From cb2f507eb4fc8c9adfc0f71d6e4c5e3e2f9c6067 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Sat, 29 Jun 2013 12:43:32 +0200 Subject: ylib/evtp update --- ylib/evtp.c | 327 +++++++++++++++++++++++++++--------------------------------- ylib/evtp.h | 87 ++++++++++------ 2 files changed, 204 insertions(+), 210 deletions(-) diff --git a/ylib/evtp.c b/ylib/evtp.c index 680e957..eade2d1 100644 --- a/ylib/evtp.c +++ b/ylib/evtp.c @@ -1,24 +1,23 @@ -/* -Copyright (c) 2009 by Juliusz Chroboczek -Copyright (c) 2013 by Yoran Heling - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. +/* Copyright (c) 2013 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #if defined(EV_CONFIG_H) @@ -40,34 +39,53 @@ typedef struct evtp_queue_t { evtp_work_t *last; } evtp_queue_t; + struct evtp_t { - int maxthreads, threads, idle; - evtp_queue_t scheduled, scheduled_back; - /* Set when we request that all threads die. */ - int dying; - /* Protects everything. */ pthread_mutex_t lock; - /* Signalled whenever a new continuation is enqueued or dying is set. */ pthread_cond_t cond; - /* Signalled whenever a thread dies. */ pthread_cond_t die_cond; + evtp_queue_t work, results; + int maxthreads, threads, idle, kill; + ev_async async; #if EV_MULTIPLICITY struct ev_loop *loop; #endif - ev_async async; }; -/* (This was threadpool_run_callbacks()) */ +static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) { + evtp_work_t *item; + + if(queue->first == NULL) + return NULL; + + item = queue->first; + queue->first = item->next; + if(item->next == NULL) + queue->last = NULL; + return item; +} + + +static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) { + item->next = NULL; + if(queue->last) + queue->last->next = item; + else + queue->first = item; + queue->last = item; +} + + static void evtp_async(EV_P_ ev_async *async, int revents) { - evtp_t *threadpool = async->data; + evtp_t *tp = async->data; evtp_work_t *items; - pthread_mutex_lock(&threadpool->lock); - items = threadpool->scheduled_back.first; - threadpool->scheduled_back.first = NULL; - threadpool->scheduled_back.last = NULL; - pthread_mutex_unlock(&threadpool->lock); + pthread_mutex_lock(&tp->lock); + items = tp->results.first; + tp->results.first = NULL; + tp->results.last = NULL; + pthread_mutex_unlock(&tp->lock); while(items) { evtp_work_t *first = items; @@ -82,10 +100,8 @@ static void evtp_async(EV_P_ ev_async *async, int revents) { evtp_t *evtp_create(EV_P_ int maxthreads) { evtp_t *tp = calloc(1, sizeof(evtp_t)); - if(tp == NULL) - return NULL; - tp->maxthreads = maxthreads; + pthread_mutex_init(&tp->lock, NULL); pthread_cond_init(&tp->cond, NULL); pthread_cond_init(&tp->die_cond, NULL); @@ -102,179 +118,134 @@ evtp_t *evtp_create(EV_P_ int maxthreads) { } -int evtp_die(evtp_t *threadpool, int canblock) { - int done; +static void *evtp_thread(void *data) { + evtp_t *tp = data; +#if EV_MULTIPLICITY + struct ev_loop *loop = tp->loop; +#endif - pthread_mutex_lock(&threadpool->lock); + pthread_mutex_lock(&tp->lock); + while(1) { + if(tp->kill > 0) { + tp->kill--; + if(tp->kill > 0) + pthread_cond_signal(&tp->cond); + pthread_cond_signal(&tp->die_cond); + break; + } - threadpool->dying = 1; - pthread_cond_broadcast(&threadpool->cond); + evtp_work_t *work = evtp_dequeue(&tp->work); + if(work) { + pthread_mutex_unlock(&tp->lock); + work->work_func(work); + pthread_mutex_lock(&tp->lock); + evtp_enqueue(&tp->results, work); + ev_async_send(EV_A_ &tp->async); + continue; + } - while(threadpool->threads > 0) { - if(threadpool->scheduled_back.first || !canblock) - break; - pthread_cond_wait(&threadpool->die_cond, &threadpool->lock); + tp->idle++; + pthread_cond_wait(&tp->cond, &tp->lock); + tp->idle--; } - done = threadpool->threads == 0; - - pthread_mutex_unlock(&threadpool->lock); - return done; + tp->threads--; + pthread_mutex_unlock(&tp->lock); + return NULL; } -int evtp_destroy(evtp_t *threadpool) { - int dead; -#if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; -#endif - - pthread_mutex_lock(&threadpool->lock); - dead = - threadpool->threads == 0 && - threadpool->scheduled.first == NULL && - threadpool->scheduled_back.first == NULL; - pthread_mutex_unlock(&threadpool->lock); - - if(!dead) - return -1; - - pthread_cond_destroy(&threadpool->cond); - pthread_cond_destroy(&threadpool->die_cond); - pthread_mutex_destroy(&threadpool->lock); - ev_ref(EV_A); - ev_async_stop(EV_A_ &threadpool->async); - free(threadpool); +/* Must be called while the lock is held */ +static int evtp_spawn(evtp_t *tp) { + if(tp->kill) { /* Why spawn a new thread when we've just attempted to kill one? */ + tp->kill--; + return 1; + } + pthread_t thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + int r = pthread_create(&thread, &attr, evtp_thread, tp); + if(r) { + errno = r; + return tp->threads ? 0 : -1; + } + tp->threads++; return 1; } -static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) { - evtp_work_t *item; - - if(queue->first == NULL) - return NULL; +int evtp_maxthreads(evtp_t *tp, int maxthreads) { + int r = 1; + pthread_mutex_lock(&tp->lock); + tp->maxthreads = maxthreads; - item = queue->first; - queue->first = item->next; - if(item->next == NULL) - queue->last = NULL; - return item; -} + if(tp->threads > maxthreads) { + tp->kill = tp->threads - maxthreads; + if(tp->idle) + pthread_cond_signal(&tp->cond); + } + evtp_work_t *work = tp->work.first; + while(work && tp->threads-tp->kill < maxthreads && r >= 0) { + r = evtp_spawn(tp); + work = work->next; + } -static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) { - item->next = NULL; - if(queue->last) - queue->last->next = item; - else - queue->first = item; - queue->last = item; + pthread_mutex_unlock(&tp->lock); + return r; } -static void *thread_main(void *pool) { - evtp_t *threadpool = pool; - evtp_work_t *item; +int evtp_submit(evtp_work_t *work, evtp_t *tp, evtp_func_t work_func, evtp_func_t done_func) { #if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; + struct ev_loop *loop = tp->loop; #endif + int r = 1; - pthread_mutex_lock(&threadpool->lock); - -again: - if(threadpool->scheduled.first == NULL) { - struct timespec ts; - - if(threadpool->dying) - goto die; - - /* Beware when benchmarking. Under Linux with NPTL, idle threads - are slightly counter-productive in some benchmarks, but - extremely productive in others. */ - - /* This constant may need to be tweaked. */ - if(threadpool->idle >= 2) - goto die; + work->work_func = work_func; + work->done_func = done_func; - /* Don't bother with POSIX clocks. */ - ts.tv_sec = time(NULL) + 1; - ts.tv_nsec = 0; + pthread_mutex_lock(&tp->lock); + if(tp->idle <= tp->kill && tp->threads-tp->kill < tp->maxthreads) + r = evtp_spawn(tp); - threadpool->idle++; - pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts); - threadpool->idle--; - if(threadpool->scheduled.first == NULL) - goto die; + if(r >= 0) { + evtp_enqueue(&tp->work, work); + ev_ref(EV_A); + if(tp->idle) + pthread_cond_signal(&tp->cond); } - item = evtp_dequeue(&threadpool->scheduled); - pthread_mutex_unlock(&threadpool->lock); - - item->work_func(item); - - pthread_mutex_lock(&threadpool->lock); - evtp_enqueue(&threadpool->scheduled_back, item); - ev_async_send(EV_A_ &threadpool->async); - - goto again; - -die: - threadpool->threads--; - pthread_cond_broadcast(&threadpool->die_cond); - pthread_mutex_unlock(&threadpool->lock); - return NULL; + pthread_mutex_unlock(&tp->lock); + return r; } -/* This is called with the pool locked. */ -static int evtp_new_thread(evtp_t *threadpool) { - pthread_t thread; - pthread_attr_t attr; - int rc; - - assert(threadpool->threads < threadpool->maxthreads); - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool); - if(rc) { - errno = rc; - return -1; - } - threadpool->threads++; - return 1; -} - - -int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) { - int rc = 0; - int dosignal = 1; +int evtp_destroy(evtp_t *tp, int force) { #if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; + struct ev_loop *loop = tp->loop; #endif - work->work_func = work_func; - work->done_func = done_func; - work->next = NULL; - - pthread_mutex_lock(&threadpool->lock); - evtp_enqueue(&threadpool->scheduled, work); - if(threadpool->idle == 0) { - dosignal = 0; - if(threadpool->threads < threadpool->maxthreads) { - rc = evtp_new_thread(threadpool); - if(rc < 0 && threadpool->threads > 0) - rc = 0; /* we'll recover */ - } + pthread_mutex_lock(&tp->lock); + if(!force && (tp->work.first || tp->results.first)) { + pthread_mutex_unlock(&tp->lock); + return -1; } - if(dosignal) - pthread_cond_signal(&threadpool->cond); - pthread_mutex_unlock(&threadpool->lock); - ev_ref(EV_A); + tp->kill = tp->threads; + pthread_cond_signal(&tp->cond); + while(tp->threads > 0) + pthread_cond_wait(&tp->die_cond, &tp->lock); + pthread_mutex_unlock(&tp->lock); - return rc; + pthread_cond_destroy(&tp->cond); + pthread_cond_destroy(&tp->die_cond); + pthread_mutex_destroy(&tp->lock); + ev_ref(EV_A); + ev_async_stop(EV_A_ &tp->async); + free(tp); + return 1; } /* vim: set noet sw=4 ts=4: */ diff --git a/ylib/evtp.h b/ylib/evtp.h index 84d1ebd..25aa675 100644 --- a/ylib/evtp.h +++ b/ylib/evtp.h @@ -1,36 +1,40 @@ -/* -Copyright (c) 2009 by Juliusz Chroboczek -Copyright (c) 2013 by Yoran Heling - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. +/* Copyright (c) 2013 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ /* This is a thread pool implementation for convenient use with libev[1]. It - * mimics the libev and libeio[2] API to some extent. The core thread pool code - * is based on the threadpool library[3] by Juliusz Chroboczek. If you're - * looking for a generic thread pool that doesn't rely on libev, use that. + * mimics the libev and libeio[2] API to some extent. + * + * The initial thread pool code was based on the threadpool library[3] by + * Juliusz Chroboczek, but the current implementation is quite different. + * Still, if you're looking for a generic thread pool that doesn't rely on + * libev, that might be a good choice. * * 1. http://software.schmorp.de/pkg/libev * 2. http://software.schmorp.de/pkg/libeio * 3. https://github.com/jech/threadpool * * TODO: + * - Automatically kill some idle threads? The application can already use + * evtp_maxthreads() to achieve the same effect. * - Cancellation? * - Allow done_func = NULL? (Implies auto-free()-work-object-when-done) */ @@ -65,6 +69,25 @@ struct evtp_work_t { evtp_t *evtp_create(EV_P_ int maxthreads); +/* Dynamically change the maximum number of threads. New threads will be + * created when this value is increased and there is enough work to do. If this + * value is decreased, some threads will be killed to ensure that the number of + * threads will remain below the configured maximum. + * + * Note that a thread can only be killed when it's not running a work_func(), + * so there may be a small delay before the new maxthreads value is honored. + * Temporarily setting maxthreads to '0' is a valid way to pause processing of + * queued work objects. + * Temporarily setting maxthreads to '0' directly followed by resetting it to + * its previous value is a valid way to kill all idle threads. + * + * Returns -1 if we have work queued, but pthread_create() failed and we have + * no other threads running (fatal), returns 0 if pthread_create() failed but + * we still have a worker thread (recoverable), or 1 if everything went fine. + */ +int evtp_maxthreads(evtp_t *evtp, int maxthreads); + + /* Submit work to the thread pool. Returns -1 if pthread_create() failed and we * have no threads running (fatal), 0 if pthread_create() failed but we still * have a worker thread (recoverable), or 1 if everything went fine. @@ -91,14 +114,14 @@ static inline evtp_work_t *evtp_submit_new(evtp_t *evtp, evtp_func_t work_func, } -/* Cause a thread pool to die. Returns whenever there is new stuff in the - callback queue, or immediately if canblock is false. Returns true when - the thread pool is dead. */ -int evtp_die(evtp_t *threadpool, int canblock); - -/* Destroy a thread pool. Does nothing and returns -1 if the pool is not - dead. */ -int evtp_destroy(evtp_t *threadpool); +/* Destroy a thread pool. If there is still work scheduled, this function does + * nothing and returns -1. If force is non-zero, then the thread pool is + * destroyed even if there is still work scheduled. In that case, the work_func + * and/or done_func callbacks will NOT be run for those objects. + * + * This function blocks until all threads have been destroyed and returns 1 on + * success. */ +int evtp_destroy(evtp_t *threadpool, int force); #endif -- cgit v1.2.3