diff options
author | Yorhel <git@yorhel.nl> | 2013-02-19 11:17:47 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2013-02-19 11:18:30 +0100 |
commit | 2e05ee9924f62d182a2b33d941e08a08362e3d01 (patch) | |
tree | 39b570771ced7c3d40285a1454dea24c953ec7db | |
parent | 57905cf2828495d2512f0f0f4f38894421a89dcb (diff) |
Added evtp
-rw-r--r-- | README | 1 | ||||
-rw-r--r-- | evtp.c | 361 | ||||
-rw-r--r-- | evtp.h | 85 | ||||
-rw-r--r-- | test/Makefile | 14 | ||||
-rw-r--r-- | test/evtp.c | 107 |
5 files changed, 565 insertions, 3 deletions
@@ -3,6 +3,7 @@ This git repository holds a collection of small and independent C libraries. dbusev - Register a DBusConnection with libev dbusuv - Register a DBusConnection with libuv ecbuf - An efficient expanding circular buffer + evtp - A convenient and efficient thread pool for libev ylog - A small, flexible and efficient logging system for C yopt - A small, simple and portable getopt_long() replacement yuri - A minimal URI validation and parsing library @@ -0,0 +1,361 @@ +/* +Copyright (c) 2009 by Juliusz Chroboczek +Copyright (c) 2013 by Yoran Heling + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#if defined(EV_CONFIG_H) +#include EV_CONFIG_H +#elif defined(HAVE_CONFIG_H) +#include "config.h" +#endif + +#include "evtp.h" + +#include <stdlib.h> +#include <errno.h> +#include <assert.h> +#include <pthread.h> + + +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +#include <stdatomic.h> + +typedef atomic_int threadpool_atomic; + +static inline int atomic_test(threadpool_atomic *a) { + return !!atomic_load_explicit(a, memory_order_acquire); +} + +static inline void atomic_set(threadpool_atomic *a) { + atomic_store_explicit(a, 1, memory_order_release); +} + +static inline void atomic_reset(threadpool_atomic *a) { + atomic_store_explicit(a, 0, memory_order_release); +} + +#elif defined(__GNUC__) && \ + (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7)) + +typedef int threadpool_atomic; + +static inline int atomic_test(threadpool_atomic *a) { + return !!__atomic_load_n(a, __ATOMIC_ACQUIRE); +} + +static inline void atomic_set(threadpool_atomic *a) { + __atomic_store_n(a, 1, __ATOMIC_RELEASE); +} + +static inline void atomic_reset(threadpool_atomic *a) { + __atomic_store_n(a, 0, __ATOMIC_RELEASE); +} + +#else + +/* Since every lock acts as a memory barrier, and we only ever modify + atomics under a lock, this is mostly safe. Note however that + evtp_async might in principle get a stale copy. */ + +typedef volatile int threadpool_atomic; + +static inline int atomic_test(threadpool_atomic *a) { + return *a; +} + +static inline void atomic_set(threadpool_atomic *a) { + *a = 1; +} + +static inline void atomic_reset(threadpool_atomic *a) { + *a = 0; +} + +#endif + + +typedef struct evtp_queue_t { + evtp_work_t *first; + evtp_work_t *last; +} evtp_queue_t; + +struct evtp_t { + int maxthreads, threads, idle; + evtp_queue_t scheduled, scheduled_back; + /* Set when we request that all threads die. */ + int dying; + /* If this is false, we are guaranteed that scheduled_back is empty. */ + threadpool_atomic have_scheduled_back; + /* Protects everything except the atomics above. */ + pthread_mutex_t lock; + /* Signalled whenever a new continuation is enqueued or dying is set. */ + pthread_cond_t cond; + /* Signalled whenever a thread dies. */ + pthread_cond_t die_cond; +#if EV_MULTIPLICITY + struct ev_loop *loop; +#endif + ev_async async; +}; + + +/* (This was threadpool_run_callbacks()) */ +static void evtp_async(EV_P_ ev_async *async, int revents) { + evtp_t *threadpool = async->data; + evtp_work_t *items; + + if(!atomic_test(&threadpool->have_scheduled_back)) + return; + + pthread_mutex_lock(&threadpool->lock); + items = threadpool->scheduled_back.first; + /* Order is important. */ + threadpool->scheduled_back.first = NULL; + threadpool->scheduled_back.last = NULL; + atomic_reset(&threadpool->have_scheduled_back); + pthread_mutex_unlock(&threadpool->lock); + + while(items) { + evtp_work_t *first = items; + evtp_func_t func = first->done_func; + items = items->next; + /* Don't use 'first' after this function, application may have free()d it. */ + func(first); + } +} + + +evtp_t *evtp_create(EV_P_ int maxthreads) { + evtp_t *tp = calloc(1, sizeof(evtp_t)); + if(tp == NULL) + return NULL; + + tp->maxthreads = maxthreads; + pthread_mutex_init(&tp->lock, NULL); + pthread_cond_init(&tp->cond, NULL); + pthread_cond_init(&tp->die_cond, NULL); + + ev_async_init(&tp->async, evtp_async); + ev_async_start(EV_A_ &tp->async); + tp->async.data = tp; +#if EV_MULTIPLICITY + tp->loop = loop; +#endif + + return tp; +} + + +int evtp_die(evtp_t *threadpool, int canblock) { + int done; + + pthread_mutex_lock(&threadpool->lock); + + threadpool->dying = 1; + pthread_cond_broadcast(&threadpool->cond); + + while(threadpool->threads > 0) { + if(threadpool->scheduled_back.first || !canblock) + break; + pthread_cond_wait(&threadpool->die_cond, &threadpool->lock); + } + + done = threadpool->threads == 0; + + pthread_mutex_unlock(&threadpool->lock); + return done; +} + + +int evtp_destroy(evtp_t *threadpool) { + int dead; +#if EV_MULTIPLICITY + struct ev_loop *loop = threadpool->loop; +#endif + + pthread_mutex_lock(&threadpool->lock); + dead = + threadpool->threads == 0 && + threadpool->scheduled.first == NULL && + threadpool->scheduled_back.first == NULL; + pthread_mutex_unlock(&threadpool->lock); + + if(!dead) + return -1; + + pthread_cond_destroy(&threadpool->cond); + pthread_cond_destroy(&threadpool->die_cond); + pthread_mutex_destroy(&threadpool->lock); + ev_async_stop(EV_A_ &threadpool->async); + free(threadpool); + return 1; +} + + +static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) { + evtp_work_t *item; + + if(queue->first == NULL) + return NULL; + + item = queue->first; + queue->first = item->next; + if(item->next == NULL) + queue->last = NULL; + return item; +} + + +static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) { + item->next = NULL; + if(queue->last) + queue->last->next = item; + else + queue->first = item; + queue->last = item; +} + + +static int evtp_schedule_back(evtp_t *threadpool, evtp_work_t *item) { + int wake = 1; +#if EV_MULTIPLICITY + struct ev_loop *loop = threadpool->loop; +#endif + + pthread_mutex_lock(&threadpool->lock); + if(atomic_test(&threadpool->have_scheduled_back)) + wake = 0; + /* Order is important. */ + atomic_set(&threadpool->have_scheduled_back); + evtp_enqueue(&threadpool->scheduled_back, item); + pthread_mutex_unlock(&threadpool->lock); + + /* TODO: The 'wake' thing looks like an optimization to avoid too much + * ev_async_send() calls. But this might be unnecessary when + * ev_async_send() itself already optimizes this case. ...Right? */ + if(wake) + ev_async_send(EV_A_ &threadpool->async); + + return 0; +} + + +static void *thread_main(void *pool) { + evtp_t *threadpool = pool; + evtp_work_t *item; + +again: + pthread_mutex_lock(&threadpool->lock); + + if(threadpool->scheduled.first == NULL) { + struct timespec ts; + + if(threadpool->dying) + goto die; + + /* Beware when benchmarking. Under Linux with NPTL, idle threads + are slightly counter-productive in some benchmarks, but + extremely productive in others. */ + + /* This constant may need to be tweaked. */ + if(threadpool->idle >= 2) + goto die; + + /* Don't bother with POSIX clocks. */ + ts.tv_sec = time(NULL) + 1; + ts.tv_nsec = 0; + + threadpool->idle++; + pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts); + threadpool->idle--; + if(threadpool->scheduled.first == NULL) + goto die; + } + + item = evtp_dequeue(&threadpool->scheduled); + pthread_mutex_unlock(&threadpool->lock); + + item->work_func(item); + + /* TODO: evtp_schedule_back() needs a lock, too. Perhaps it would be more + * efficient if we obtain the lock before calling it and then avoid the + * re-lock after 'again'? That might cause more lock contention, though, + * since ev_async_send() may do a syscall and thus take a bit longer than + * just a few cycles. */ + evtp_schedule_back(threadpool, item); + + goto again; + +die: + threadpool->threads--; + pthread_cond_broadcast(&threadpool->die_cond); + pthread_mutex_unlock(&threadpool->lock); + return NULL; +} + + +/* This is called with the pool locked. */ +static int evtp_new_thread(evtp_t *threadpool) { + pthread_t thread; + pthread_attr_t attr; + int rc; + + assert(threadpool->threads < threadpool->maxthreads); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool); + if(rc) { + errno = rc; + return -1; + } + threadpool->threads++; + return 1; +} + + +int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) { + int rc = 0; + int dosignal = 1; + + work->work_func = work_func; + work->done_func = done_func; + work->next = NULL; + + pthread_mutex_lock(&threadpool->lock); + evtp_enqueue(&threadpool->scheduled, work); + if(threadpool->idle == 0) { + dosignal = 0; + if(threadpool->threads < threadpool->maxthreads) { + rc = evtp_new_thread(threadpool); + if(rc < 0 && threadpool->threads > 0) + rc = 0; /* we'll recover */ + } + } + if(dosignal) + pthread_cond_signal(&threadpool->cond); + pthread_mutex_unlock(&threadpool->lock); + + return rc; +} + +/* vim: set noet sw=4 ts=4: */ @@ -0,0 +1,85 @@ +/* +Copyright (c) 2009 by Juliusz Chroboczek +Copyright (c) 2013 by Yoran Heling + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +/* This is a thread pool implementation for convenient use with libev[1]. It + * mimics the libev and libeio[2] API to some extent. The core thread pool code + * is based on the threadpool library[3] by Juliusz Chroboczek. If you're + * looking for a generic thread pool that doesn't rely on libev, use that. + * + * 1. http://software.schmorp.de/pkg/libev + * 2. http://software.schmorp.de/pkg/libeio + * 3. https://github.com/jech/threadpool + * + * TODO: + * - Figure out shutdown + * - Convenient create_work_object_and_submit() + * - Don't hold a ref on the libev loop when nothing is queued? + * - Cancellation? + * - Allow done_func = NULL? (Implies auto-free()-work-object-when-done) + */ + +#ifndef EVTP_H +#define EVTP_H + +#include <ev.h> + +typedef struct evtp_t evtp_t; /* Opaque */ +typedef struct evtp_work_t evtp_work_t; + +typedef void (*evtp_func_t)(evtp_work_t *); + +struct evtp_work_t { + /* Public */ + void *data; + /* Private */ + evtp_func_t work_func; + evtp_func_t done_func; + evtp_t *tp; + evtp_work_t *next; +}; + + +/* Create a new thread pool. May return NULL if malloc() fails. */ +evtp_t *evtp_create(EV_P_ int maxthreads); + + +/* Submit work to the thread pool. work_func() will be called in a worker + * thread. A short while after work_func() returns, done_func() will be called + * in the thread that runs ev_run() on the main loop given to evtp_create(). + * The *work data must remain valid until done_func() is called. + */ +int evtp_submit(evtp_work_t *work, evtp_t *evtp, evtp_func_t work_func, evtp_func_t done_func); + + +/* Cause a thread pool to die. Returns whenever there is new stuff in the + callback queue, or immediately if canblock is false. Returns true when + the thread pool is dead. */ +int evtp_die(evtp_t *threadpool, int canblock); + +/* Destroy a thread pool. Does nothing and returns -1 if the pool is not + dead. */ +int evtp_destroy(evtp_t *threadpool); + +#endif + +/* vim: set noet sw=4 ts=4: */ diff --git a/test/Makefile b/test/Makefile index 4cfe088..ab8c360 100644 --- a/test/Makefile +++ b/test/Makefile @@ -11,16 +11,24 @@ yuri: ../yuri.c ../yuri.h yuri.c ecbuf: ../ecbuf.h ecbuf.c $(CC) $(CFLAGS) -I.. ecbuf.c -o ecbuf -test: yuri ecbuf +evtp: ../evtp.c ../evtp.h evtp.c + $(CC) $(CFLAGS) -I.. ../evtp.c evtp.c -lpthread -lev -o evtp + +test: yuri ecbuf evtp ./yuri ./ecbuf + ./evtp @echo All tests passed. ecbuf-bench: ../ecbuf.h ecbuf-bench.c $(CC) $(CFLAGS) -DNDEBUG -I.. ecbuf-bench.c -o ecbuf-bench -bench: ecbuf-bench +evtp-bench: ../evtp.c ../evtp.h evtp.c + $(CC) $(CFLAGS) -DNDEBUG -DBENCH -I.. ../evtp.c evtp.c -lpthread -lev -o evtp-bench + +bench: ecbuf-bench evtp-bench ./ecbuf-bench + time ./evtp-bench clean: - rm -f yuri ecbuf ecbuf-bench + rm -f yuri ecbuf evtp ecbuf-bench evtp-bench diff --git a/test/evtp.c b/test/evtp.c new file mode 100644 index 0000000..b9813c1 --- /dev/null +++ b/test/evtp.c @@ -0,0 +1,107 @@ +/* Copyright (c) 2012 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#if !defined(BENCH) && defined(NDEBUG) +#error These tests should not be compiled with -DNDEBUG! +#endif + +#define BENCH_LOOPS 100000 + +#include "evtp.h" +#include <assert.h> +#include <stdlib.h> +#include <string.h> + +static evtp_t *tp; +static char data[51]; + +static void done_cb(evtp_work_t *w); +static void work_cb(evtp_work_t *w); + + +static void done_cb(evtp_work_t *w) { + static int done = 0; +#ifdef BENCH + static int loops = 0; +#endif + int i; + + assert(*((char *)w->data) == 1); + free(w); + done++; + + if(done == 1) { + for(i=1; i<50; i++) { + w = calloc(1, sizeof(evtp_work_t)); + w->data = data+i; + evtp_submit(w, tp, work_cb, done_cb); + } + } + +#ifdef BENCH + loops++; + if(loops < BENCH_LOOPS) { + done = 0; + memset(data, 0, sizeof(data)); + w = calloc(1, sizeof(evtp_work_t)); + w->data = data; + evtp_submit(w, tp, work_cb, done_cb); + return; + } +#endif + + /* TODO: Shouldn't be necessary when evtp drops its ref when nothing is queued */ + if(done == 50) + ev_break(EV_DEFAULT_ EVBREAK_ONE); +} + + +static void work_cb(evtp_work_t *w) { + char *d = w->data; + assert(*d == 0); + *d = 1; +} + + +int main(int argc, char **argv) { + ev_default_loop(0); + tp = evtp_create(EV_DEFAULT_ 4); + + evtp_work_t *w = calloc(1, sizeof(evtp_work_t)); + w->data = data; + evtp_submit(w, tp, work_cb, done_cb); + w = NULL; + + ev_run(EV_DEFAULT_ 0); + + assert(evtp_die(tp, 1) == 1); + assert(evtp_destroy(tp) == 1); + tp = NULL; + + int i; + for(i=0; i<50; i++) + assert(data[i] == 1); + + return 0; +} + +/* vim: set noet sw=4 ts=4: */ |