summaryrefslogtreecommitdiff
path: root/ylib/evtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'ylib/evtp.c')
-rw-r--r--ylib/evtp.c327
1 files changed, 149 insertions, 178 deletions
diff --git a/ylib/evtp.c b/ylib/evtp.c
index 680e957..eade2d1 100644
--- a/ylib/evtp.c
+++ b/ylib/evtp.c
@@ -1,24 +1,23 @@
-/*
-Copyright (c) 2009 by Juliusz Chroboczek
-Copyright (c) 2013 by Yoran Heling
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
+/* Copyright (c) 2013 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#if defined(EV_CONFIG_H)
@@ -40,34 +39,53 @@ typedef struct evtp_queue_t {
evtp_work_t *last;
} evtp_queue_t;
+
struct evtp_t {
- int maxthreads, threads, idle;
- evtp_queue_t scheduled, scheduled_back;
- /* Set when we request that all threads die. */
- int dying;
- /* Protects everything. */
pthread_mutex_t lock;
- /* Signalled whenever a new continuation is enqueued or dying is set. */
pthread_cond_t cond;
- /* Signalled whenever a thread dies. */
pthread_cond_t die_cond;
+ evtp_queue_t work, results;
+ int maxthreads, threads, idle, kill;
+ ev_async async;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
- ev_async async;
};
-/* (This was threadpool_run_callbacks()) */
+static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) {
+ evtp_work_t *item;
+
+ if(queue->first == NULL)
+ return NULL;
+
+ item = queue->first;
+ queue->first = item->next;
+ if(item->next == NULL)
+ queue->last = NULL;
+ return item;
+}
+
+
+static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) {
+ item->next = NULL;
+ if(queue->last)
+ queue->last->next = item;
+ else
+ queue->first = item;
+ queue->last = item;
+}
+
+
static void evtp_async(EV_P_ ev_async *async, int revents) {
- evtp_t *threadpool = async->data;
+ evtp_t *tp = async->data;
evtp_work_t *items;
- pthread_mutex_lock(&threadpool->lock);
- items = threadpool->scheduled_back.first;
- threadpool->scheduled_back.first = NULL;
- threadpool->scheduled_back.last = NULL;
- pthread_mutex_unlock(&threadpool->lock);
+ pthread_mutex_lock(&tp->lock);
+ items = tp->results.first;
+ tp->results.first = NULL;
+ tp->results.last = NULL;
+ pthread_mutex_unlock(&tp->lock);
while(items) {
evtp_work_t *first = items;
@@ -82,10 +100,8 @@ static void evtp_async(EV_P_ ev_async *async, int revents) {
evtp_t *evtp_create(EV_P_ int maxthreads) {
evtp_t *tp = calloc(1, sizeof(evtp_t));
- if(tp == NULL)
- return NULL;
-
tp->maxthreads = maxthreads;
+
pthread_mutex_init(&tp->lock, NULL);
pthread_cond_init(&tp->cond, NULL);
pthread_cond_init(&tp->die_cond, NULL);
@@ -102,179 +118,134 @@ evtp_t *evtp_create(EV_P_ int maxthreads) {
}
-int evtp_die(evtp_t *threadpool, int canblock) {
- int done;
+static void *evtp_thread(void *data) {
+ evtp_t *tp = data;
+#if EV_MULTIPLICITY
+ struct ev_loop *loop = tp->loop;
+#endif
- pthread_mutex_lock(&threadpool->lock);
+ pthread_mutex_lock(&tp->lock);
+ while(1) {
+ if(tp->kill > 0) {
+ tp->kill--;
+ if(tp->kill > 0)
+ pthread_cond_signal(&tp->cond);
+ pthread_cond_signal(&tp->die_cond);
+ break;
+ }
- threadpool->dying = 1;
- pthread_cond_broadcast(&threadpool->cond);
+ evtp_work_t *work = evtp_dequeue(&tp->work);
+ if(work) {
+ pthread_mutex_unlock(&tp->lock);
+ work->work_func(work);
+ pthread_mutex_lock(&tp->lock);
+ evtp_enqueue(&tp->results, work);
+ ev_async_send(EV_A_ &tp->async);
+ continue;
+ }
- while(threadpool->threads > 0) {
- if(threadpool->scheduled_back.first || !canblock)
- break;
- pthread_cond_wait(&threadpool->die_cond, &threadpool->lock);
+ tp->idle++;
+ pthread_cond_wait(&tp->cond, &tp->lock);
+ tp->idle--;
}
- done = threadpool->threads == 0;
-
- pthread_mutex_unlock(&threadpool->lock);
- return done;
+ tp->threads--;
+ pthread_mutex_unlock(&tp->lock);
+ return NULL;
}
-int evtp_destroy(evtp_t *threadpool) {
- int dead;
-#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
-#endif
-
- pthread_mutex_lock(&threadpool->lock);
- dead =
- threadpool->threads == 0 &&
- threadpool->scheduled.first == NULL &&
- threadpool->scheduled_back.first == NULL;
- pthread_mutex_unlock(&threadpool->lock);
-
- if(!dead)
- return -1;
-
- pthread_cond_destroy(&threadpool->cond);
- pthread_cond_destroy(&threadpool->die_cond);
- pthread_mutex_destroy(&threadpool->lock);
- ev_ref(EV_A);
- ev_async_stop(EV_A_ &threadpool->async);
- free(threadpool);
+/* Must be called while the lock is held */
+static int evtp_spawn(evtp_t *tp) {
+ if(tp->kill) { /* Why spawn a new thread when we've just attempted to kill one? */
+ tp->kill--;
+ return 1;
+ }
+ pthread_t thread;
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ int r = pthread_create(&thread, &attr, evtp_thread, tp);
+ if(r) {
+ errno = r;
+ return tp->threads ? 0 : -1;
+ }
+ tp->threads++;
return 1;
}
-static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) {
- evtp_work_t *item;
-
- if(queue->first == NULL)
- return NULL;
+int evtp_maxthreads(evtp_t *tp, int maxthreads) {
+ int r = 1;
+ pthread_mutex_lock(&tp->lock);
+ tp->maxthreads = maxthreads;
- item = queue->first;
- queue->first = item->next;
- if(item->next == NULL)
- queue->last = NULL;
- return item;
-}
+ if(tp->threads > maxthreads) {
+ tp->kill = tp->threads - maxthreads;
+ if(tp->idle)
+ pthread_cond_signal(&tp->cond);
+ }
+ evtp_work_t *work = tp->work.first;
+ while(work && tp->threads-tp->kill < maxthreads && r >= 0) {
+ r = evtp_spawn(tp);
+ work = work->next;
+ }
-static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) {
- item->next = NULL;
- if(queue->last)
- queue->last->next = item;
- else
- queue->first = item;
- queue->last = item;
+ pthread_mutex_unlock(&tp->lock);
+ return r;
}
-static void *thread_main(void *pool) {
- evtp_t *threadpool = pool;
- evtp_work_t *item;
+int evtp_submit(evtp_work_t *work, evtp_t *tp, evtp_func_t work_func, evtp_func_t done_func) {
#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
+ struct ev_loop *loop = tp->loop;
#endif
+ int r = 1;
- pthread_mutex_lock(&threadpool->lock);
-
-again:
- if(threadpool->scheduled.first == NULL) {
- struct timespec ts;
-
- if(threadpool->dying)
- goto die;
-
- /* Beware when benchmarking. Under Linux with NPTL, idle threads
- are slightly counter-productive in some benchmarks, but
- extremely productive in others. */
-
- /* This constant may need to be tweaked. */
- if(threadpool->idle >= 2)
- goto die;
+ work->work_func = work_func;
+ work->done_func = done_func;
- /* Don't bother with POSIX clocks. */
- ts.tv_sec = time(NULL) + 1;
- ts.tv_nsec = 0;
+ pthread_mutex_lock(&tp->lock);
+ if(tp->idle <= tp->kill && tp->threads-tp->kill < tp->maxthreads)
+ r = evtp_spawn(tp);
- threadpool->idle++;
- pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts);
- threadpool->idle--;
- if(threadpool->scheduled.first == NULL)
- goto die;
+ if(r >= 0) {
+ evtp_enqueue(&tp->work, work);
+ ev_ref(EV_A);
+ if(tp->idle)
+ pthread_cond_signal(&tp->cond);
}
- item = evtp_dequeue(&threadpool->scheduled);
- pthread_mutex_unlock(&threadpool->lock);
-
- item->work_func(item);
-
- pthread_mutex_lock(&threadpool->lock);
- evtp_enqueue(&threadpool->scheduled_back, item);
- ev_async_send(EV_A_ &threadpool->async);
-
- goto again;
-
-die:
- threadpool->threads--;
- pthread_cond_broadcast(&threadpool->die_cond);
- pthread_mutex_unlock(&threadpool->lock);
- return NULL;
+ pthread_mutex_unlock(&tp->lock);
+ return r;
}
-/* This is called with the pool locked. */
-static int evtp_new_thread(evtp_t *threadpool) {
- pthread_t thread;
- pthread_attr_t attr;
- int rc;
-
- assert(threadpool->threads < threadpool->maxthreads);
-
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool);
- if(rc) {
- errno = rc;
- return -1;
- }
- threadpool->threads++;
- return 1;
-}
-
-
-int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) {
- int rc = 0;
- int dosignal = 1;
+int evtp_destroy(evtp_t *tp, int force) {
#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
+ struct ev_loop *loop = tp->loop;
#endif
- work->work_func = work_func;
- work->done_func = done_func;
- work->next = NULL;
-
- pthread_mutex_lock(&threadpool->lock);
- evtp_enqueue(&threadpool->scheduled, work);
- if(threadpool->idle == 0) {
- dosignal = 0;
- if(threadpool->threads < threadpool->maxthreads) {
- rc = evtp_new_thread(threadpool);
- if(rc < 0 && threadpool->threads > 0)
- rc = 0; /* we'll recover */
- }
+ pthread_mutex_lock(&tp->lock);
+ if(!force && (tp->work.first || tp->results.first)) {
+ pthread_mutex_unlock(&tp->lock);
+ return -1;
}
- if(dosignal)
- pthread_cond_signal(&threadpool->cond);
- pthread_mutex_unlock(&threadpool->lock);
- ev_ref(EV_A);
+ tp->kill = tp->threads;
+ pthread_cond_signal(&tp->cond);
+ while(tp->threads > 0)
+ pthread_cond_wait(&tp->die_cond, &tp->lock);
+ pthread_mutex_unlock(&tp->lock);
- return rc;
+ pthread_cond_destroy(&tp->cond);
+ pthread_cond_destroy(&tp->die_cond);
+ pthread_mutex_destroy(&tp->lock);
+ ev_ref(EV_A);
+ ev_async_stop(EV_A_ &tp->async);
+ free(tp);
+ return 1;
}
/* vim: set noet sw=4 ts=4: */