diff options
author | Yorhel <git@yorhel.nl> | 2013-06-29 12:43:32 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2013-06-29 12:43:32 +0200 |
commit | cb2f507eb4fc8c9adfc0f71d6e4c5e3e2f9c6067 (patch) | |
tree | f956b43ed9a184382466847a2321efc11862c477 /ylib/evtp.c | |
parent | 1ad9ca7a5fa51921b83ecd2f637c76e12ac8fd70 (diff) |
Diffstat (limited to 'ylib/evtp.c')
-rw-r--r-- | ylib/evtp.c | 327 |
1 files changed, 149 insertions, 178 deletions
diff --git a/ylib/evtp.c b/ylib/evtp.c index 680e957..eade2d1 100644 --- a/ylib/evtp.c +++ b/ylib/evtp.c @@ -1,24 +1,23 @@ -/* -Copyright (c) 2009 by Juliusz Chroboczek -Copyright (c) 2013 by Yoran Heling - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. +/* Copyright (c) 2013 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #if defined(EV_CONFIG_H) @@ -40,34 +39,53 @@ typedef struct evtp_queue_t { evtp_work_t *last; } evtp_queue_t; + struct evtp_t { - int maxthreads, threads, idle; - evtp_queue_t scheduled, scheduled_back; - /* Set when we request that all threads die. */ - int dying; - /* Protects everything. */ pthread_mutex_t lock; - /* Signalled whenever a new continuation is enqueued or dying is set. */ pthread_cond_t cond; - /* Signalled whenever a thread dies. */ pthread_cond_t die_cond; + evtp_queue_t work, results; + int maxthreads, threads, idle, kill; + ev_async async; #if EV_MULTIPLICITY struct ev_loop *loop; #endif - ev_async async; }; -/* (This was threadpool_run_callbacks()) */ +static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) { + evtp_work_t *item; + + if(queue->first == NULL) + return NULL; + + item = queue->first; + queue->first = item->next; + if(item->next == NULL) + queue->last = NULL; + return item; +} + + +static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) { + item->next = NULL; + if(queue->last) + queue->last->next = item; + else + queue->first = item; + queue->last = item; +} + + static void evtp_async(EV_P_ ev_async *async, int revents) { - evtp_t *threadpool = async->data; + evtp_t *tp = async->data; evtp_work_t *items; - pthread_mutex_lock(&threadpool->lock); - items = threadpool->scheduled_back.first; - threadpool->scheduled_back.first = NULL; - threadpool->scheduled_back.last = NULL; - pthread_mutex_unlock(&threadpool->lock); + pthread_mutex_lock(&tp->lock); + items = tp->results.first; + tp->results.first = NULL; + tp->results.last = NULL; + pthread_mutex_unlock(&tp->lock); while(items) { evtp_work_t *first = items; @@ -82,10 +100,8 @@ static void evtp_async(EV_P_ ev_async *async, int revents) { evtp_t *evtp_create(EV_P_ int maxthreads) { evtp_t *tp = calloc(1, sizeof(evtp_t)); - if(tp == NULL) - return NULL; - tp->maxthreads = maxthreads; + pthread_mutex_init(&tp->lock, NULL); pthread_cond_init(&tp->cond, NULL); pthread_cond_init(&tp->die_cond, NULL); @@ -102,179 +118,134 @@ evtp_t *evtp_create(EV_P_ int maxthreads) { } -int evtp_die(evtp_t *threadpool, int canblock) { - int done; +static void *evtp_thread(void *data) { + evtp_t *tp = data; +#if EV_MULTIPLICITY + struct ev_loop *loop = tp->loop; +#endif - pthread_mutex_lock(&threadpool->lock); + pthread_mutex_lock(&tp->lock); + while(1) { + if(tp->kill > 0) { + tp->kill--; + if(tp->kill > 0) + pthread_cond_signal(&tp->cond); + pthread_cond_signal(&tp->die_cond); + break; + } - threadpool->dying = 1; - pthread_cond_broadcast(&threadpool->cond); + evtp_work_t *work = evtp_dequeue(&tp->work); + if(work) { + pthread_mutex_unlock(&tp->lock); + work->work_func(work); + pthread_mutex_lock(&tp->lock); + evtp_enqueue(&tp->results, work); + ev_async_send(EV_A_ &tp->async); + continue; + } - while(threadpool->threads > 0) { - if(threadpool->scheduled_back.first || !canblock) - break; - pthread_cond_wait(&threadpool->die_cond, &threadpool->lock); + tp->idle++; + pthread_cond_wait(&tp->cond, &tp->lock); + tp->idle--; } - done = threadpool->threads == 0; - - pthread_mutex_unlock(&threadpool->lock); - return done; + tp->threads--; + pthread_mutex_unlock(&tp->lock); + return NULL; } -int evtp_destroy(evtp_t *threadpool) { - int dead; -#if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; -#endif - - pthread_mutex_lock(&threadpool->lock); - dead = - threadpool->threads == 0 && - threadpool->scheduled.first == NULL && - threadpool->scheduled_back.first == NULL; - pthread_mutex_unlock(&threadpool->lock); - - if(!dead) - return -1; - - pthread_cond_destroy(&threadpool->cond); - pthread_cond_destroy(&threadpool->die_cond); - pthread_mutex_destroy(&threadpool->lock); - ev_ref(EV_A); - ev_async_stop(EV_A_ &threadpool->async); - free(threadpool); +/* Must be called while the lock is held */ +static int evtp_spawn(evtp_t *tp) { + if(tp->kill) { /* Why spawn a new thread when we've just attempted to kill one? */ + tp->kill--; + return 1; + } + pthread_t thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + int r = pthread_create(&thread, &attr, evtp_thread, tp); + if(r) { + errno = r; + return tp->threads ? 0 : -1; + } + tp->threads++; return 1; } -static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) { - evtp_work_t *item; - - if(queue->first == NULL) - return NULL; +int evtp_maxthreads(evtp_t *tp, int maxthreads) { + int r = 1; + pthread_mutex_lock(&tp->lock); + tp->maxthreads = maxthreads; - item = queue->first; - queue->first = item->next; - if(item->next == NULL) - queue->last = NULL; - return item; -} + if(tp->threads > maxthreads) { + tp->kill = tp->threads - maxthreads; + if(tp->idle) + pthread_cond_signal(&tp->cond); + } + evtp_work_t *work = tp->work.first; + while(work && tp->threads-tp->kill < maxthreads && r >= 0) { + r = evtp_spawn(tp); + work = work->next; + } -static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) { - item->next = NULL; - if(queue->last) - queue->last->next = item; - else - queue->first = item; - queue->last = item; + pthread_mutex_unlock(&tp->lock); + return r; } -static void *thread_main(void *pool) { - evtp_t *threadpool = pool; - evtp_work_t *item; +int evtp_submit(evtp_work_t *work, evtp_t *tp, evtp_func_t work_func, evtp_func_t done_func) { #if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; + struct ev_loop *loop = tp->loop; #endif + int r = 1; - pthread_mutex_lock(&threadpool->lock); - -again: - if(threadpool->scheduled.first == NULL) { - struct timespec ts; - - if(threadpool->dying) - goto die; - - /* Beware when benchmarking. Under Linux with NPTL, idle threads - are slightly counter-productive in some benchmarks, but - extremely productive in others. */ - - /* This constant may need to be tweaked. */ - if(threadpool->idle >= 2) - goto die; + work->work_func = work_func; + work->done_func = done_func; - /* Don't bother with POSIX clocks. */ - ts.tv_sec = time(NULL) + 1; - ts.tv_nsec = 0; + pthread_mutex_lock(&tp->lock); + if(tp->idle <= tp->kill && tp->threads-tp->kill < tp->maxthreads) + r = evtp_spawn(tp); - threadpool->idle++; - pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts); - threadpool->idle--; - if(threadpool->scheduled.first == NULL) - goto die; + if(r >= 0) { + evtp_enqueue(&tp->work, work); + ev_ref(EV_A); + if(tp->idle) + pthread_cond_signal(&tp->cond); } - item = evtp_dequeue(&threadpool->scheduled); - pthread_mutex_unlock(&threadpool->lock); - - item->work_func(item); - - pthread_mutex_lock(&threadpool->lock); - evtp_enqueue(&threadpool->scheduled_back, item); - ev_async_send(EV_A_ &threadpool->async); - - goto again; - -die: - threadpool->threads--; - pthread_cond_broadcast(&threadpool->die_cond); - pthread_mutex_unlock(&threadpool->lock); - return NULL; + pthread_mutex_unlock(&tp->lock); + return r; } -/* This is called with the pool locked. */ -static int evtp_new_thread(evtp_t *threadpool) { - pthread_t thread; - pthread_attr_t attr; - int rc; - - assert(threadpool->threads < threadpool->maxthreads); - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool); - if(rc) { - errno = rc; - return -1; - } - threadpool->threads++; - return 1; -} - - -int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) { - int rc = 0; - int dosignal = 1; +int evtp_destroy(evtp_t *tp, int force) { #if EV_MULTIPLICITY - struct ev_loop *loop = threadpool->loop; + struct ev_loop *loop = tp->loop; #endif - work->work_func = work_func; - work->done_func = done_func; - work->next = NULL; - - pthread_mutex_lock(&threadpool->lock); - evtp_enqueue(&threadpool->scheduled, work); - if(threadpool->idle == 0) { - dosignal = 0; - if(threadpool->threads < threadpool->maxthreads) { - rc = evtp_new_thread(threadpool); - if(rc < 0 && threadpool->threads > 0) - rc = 0; /* we'll recover */ - } + pthread_mutex_lock(&tp->lock); + if(!force && (tp->work.first || tp->results.first)) { + pthread_mutex_unlock(&tp->lock); + return -1; } - if(dosignal) - pthread_cond_signal(&threadpool->cond); - pthread_mutex_unlock(&threadpool->lock); - ev_ref(EV_A); + tp->kill = tp->threads; + pthread_cond_signal(&tp->cond); + while(tp->threads > 0) + pthread_cond_wait(&tp->die_cond, &tp->lock); + pthread_mutex_unlock(&tp->lock); - return rc; + pthread_cond_destroy(&tp->cond); + pthread_cond_destroy(&tp->die_cond); + pthread_mutex_destroy(&tp->lock); + ev_ref(EV_A); + ev_async_stop(EV_A_ &tp->async); + free(tp); + return 1; } /* vim: set noet sw=4 ts=4: */ |