summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ylib/evtp.c327
-rw-r--r--ylib/evtp.h87
2 files changed, 204 insertions, 210 deletions
diff --git a/ylib/evtp.c b/ylib/evtp.c
index 680e957..eade2d1 100644
--- a/ylib/evtp.c
+++ b/ylib/evtp.c
@@ -1,24 +1,23 @@
-/*
-Copyright (c) 2009 by Juliusz Chroboczek
-Copyright (c) 2013 by Yoran Heling
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
+/* Copyright (c) 2013 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#if defined(EV_CONFIG_H)
@@ -40,34 +39,53 @@ typedef struct evtp_queue_t {
evtp_work_t *last;
} evtp_queue_t;
+
struct evtp_t {
- int maxthreads, threads, idle;
- evtp_queue_t scheduled, scheduled_back;
- /* Set when we request that all threads die. */
- int dying;
- /* Protects everything. */
pthread_mutex_t lock;
- /* Signalled whenever a new continuation is enqueued or dying is set. */
pthread_cond_t cond;
- /* Signalled whenever a thread dies. */
pthread_cond_t die_cond;
+ evtp_queue_t work, results;
+ int maxthreads, threads, idle, kill;
+ ev_async async;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
- ev_async async;
};
-/* (This was threadpool_run_callbacks()) */
+static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) {
+ evtp_work_t *item;
+
+ if(queue->first == NULL)
+ return NULL;
+
+ item = queue->first;
+ queue->first = item->next;
+ if(item->next == NULL)
+ queue->last = NULL;
+ return item;
+}
+
+
+static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) {
+ item->next = NULL;
+ if(queue->last)
+ queue->last->next = item;
+ else
+ queue->first = item;
+ queue->last = item;
+}
+
+
static void evtp_async(EV_P_ ev_async *async, int revents) {
- evtp_t *threadpool = async->data;
+ evtp_t *tp = async->data;
evtp_work_t *items;
- pthread_mutex_lock(&threadpool->lock);
- items = threadpool->scheduled_back.first;
- threadpool->scheduled_back.first = NULL;
- threadpool->scheduled_back.last = NULL;
- pthread_mutex_unlock(&threadpool->lock);
+ pthread_mutex_lock(&tp->lock);
+ items = tp->results.first;
+ tp->results.first = NULL;
+ tp->results.last = NULL;
+ pthread_mutex_unlock(&tp->lock);
while(items) {
evtp_work_t *first = items;
@@ -82,10 +100,8 @@ static void evtp_async(EV_P_ ev_async *async, int revents) {
evtp_t *evtp_create(EV_P_ int maxthreads) {
evtp_t *tp = calloc(1, sizeof(evtp_t));
- if(tp == NULL)
- return NULL;
-
tp->maxthreads = maxthreads;
+
pthread_mutex_init(&tp->lock, NULL);
pthread_cond_init(&tp->cond, NULL);
pthread_cond_init(&tp->die_cond, NULL);
@@ -102,179 +118,134 @@ evtp_t *evtp_create(EV_P_ int maxthreads) {
}
-int evtp_die(evtp_t *threadpool, int canblock) {
- int done;
+static void *evtp_thread(void *data) {
+ evtp_t *tp = data;
+#if EV_MULTIPLICITY
+ struct ev_loop *loop = tp->loop;
+#endif
- pthread_mutex_lock(&threadpool->lock);
+ pthread_mutex_lock(&tp->lock);
+ while(1) {
+ if(tp->kill > 0) {
+ tp->kill--;
+ if(tp->kill > 0)
+ pthread_cond_signal(&tp->cond);
+ pthread_cond_signal(&tp->die_cond);
+ break;
+ }
- threadpool->dying = 1;
- pthread_cond_broadcast(&threadpool->cond);
+ evtp_work_t *work = evtp_dequeue(&tp->work);
+ if(work) {
+ pthread_mutex_unlock(&tp->lock);
+ work->work_func(work);
+ pthread_mutex_lock(&tp->lock);
+ evtp_enqueue(&tp->results, work);
+ ev_async_send(EV_A_ &tp->async);
+ continue;
+ }
- while(threadpool->threads > 0) {
- if(threadpool->scheduled_back.first || !canblock)
- break;
- pthread_cond_wait(&threadpool->die_cond, &threadpool->lock);
+ tp->idle++;
+ pthread_cond_wait(&tp->cond, &tp->lock);
+ tp->idle--;
}
- done = threadpool->threads == 0;
-
- pthread_mutex_unlock(&threadpool->lock);
- return done;
+ tp->threads--;
+ pthread_mutex_unlock(&tp->lock);
+ return NULL;
}
-int evtp_destroy(evtp_t *threadpool) {
- int dead;
-#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
-#endif
-
- pthread_mutex_lock(&threadpool->lock);
- dead =
- threadpool->threads == 0 &&
- threadpool->scheduled.first == NULL &&
- threadpool->scheduled_back.first == NULL;
- pthread_mutex_unlock(&threadpool->lock);
-
- if(!dead)
- return -1;
-
- pthread_cond_destroy(&threadpool->cond);
- pthread_cond_destroy(&threadpool->die_cond);
- pthread_mutex_destroy(&threadpool->lock);
- ev_ref(EV_A);
- ev_async_stop(EV_A_ &threadpool->async);
- free(threadpool);
+/* Must be called while the lock is held */
+static int evtp_spawn(evtp_t *tp) {
+ if(tp->kill) { /* Why spawn a new thread when we've just attempted to kill one? */
+ tp->kill--;
+ return 1;
+ }
+ pthread_t thread;
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ int r = pthread_create(&thread, &attr, evtp_thread, tp);
+ if(r) {
+ errno = r;
+ return tp->threads ? 0 : -1;
+ }
+ tp->threads++;
return 1;
}
-static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) {
- evtp_work_t *item;
-
- if(queue->first == NULL)
- return NULL;
+int evtp_maxthreads(evtp_t *tp, int maxthreads) {
+ int r = 1;
+ pthread_mutex_lock(&tp->lock);
+ tp->maxthreads = maxthreads;
- item = queue->first;
- queue->first = item->next;
- if(item->next == NULL)
- queue->last = NULL;
- return item;
-}
+ if(tp->threads > maxthreads) {
+ tp->kill = tp->threads - maxthreads;
+ if(tp->idle)
+ pthread_cond_signal(&tp->cond);
+ }
+ evtp_work_t *work = tp->work.first;
+ while(work && tp->threads-tp->kill < maxthreads && r >= 0) {
+ r = evtp_spawn(tp);
+ work = work->next;
+ }
-static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) {
- item->next = NULL;
- if(queue->last)
- queue->last->next = item;
- else
- queue->first = item;
- queue->last = item;
+ pthread_mutex_unlock(&tp->lock);
+ return r;
}
-static void *thread_main(void *pool) {
- evtp_t *threadpool = pool;
- evtp_work_t *item;
+int evtp_submit(evtp_work_t *work, evtp_t *tp, evtp_func_t work_func, evtp_func_t done_func) {
#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
+ struct ev_loop *loop = tp->loop;
#endif
+ int r = 1;
- pthread_mutex_lock(&threadpool->lock);
-
-again:
- if(threadpool->scheduled.first == NULL) {
- struct timespec ts;
-
- if(threadpool->dying)
- goto die;
-
- /* Beware when benchmarking. Under Linux with NPTL, idle threads
- are slightly counter-productive in some benchmarks, but
- extremely productive in others. */
-
- /* This constant may need to be tweaked. */
- if(threadpool->idle >= 2)
- goto die;
+ work->work_func = work_func;
+ work->done_func = done_func;
- /* Don't bother with POSIX clocks. */
- ts.tv_sec = time(NULL) + 1;
- ts.tv_nsec = 0;
+ pthread_mutex_lock(&tp->lock);
+ if(tp->idle <= tp->kill && tp->threads-tp->kill < tp->maxthreads)
+ r = evtp_spawn(tp);
- threadpool->idle++;
- pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts);
- threadpool->idle--;
- if(threadpool->scheduled.first == NULL)
- goto die;
+ if(r >= 0) {
+ evtp_enqueue(&tp->work, work);
+ ev_ref(EV_A);
+ if(tp->idle)
+ pthread_cond_signal(&tp->cond);
}
- item = evtp_dequeue(&threadpool->scheduled);
- pthread_mutex_unlock(&threadpool->lock);
-
- item->work_func(item);
-
- pthread_mutex_lock(&threadpool->lock);
- evtp_enqueue(&threadpool->scheduled_back, item);
- ev_async_send(EV_A_ &threadpool->async);
-
- goto again;
-
-die:
- threadpool->threads--;
- pthread_cond_broadcast(&threadpool->die_cond);
- pthread_mutex_unlock(&threadpool->lock);
- return NULL;
+ pthread_mutex_unlock(&tp->lock);
+ return r;
}
-/* This is called with the pool locked. */
-static int evtp_new_thread(evtp_t *threadpool) {
- pthread_t thread;
- pthread_attr_t attr;
- int rc;
-
- assert(threadpool->threads < threadpool->maxthreads);
-
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool);
- if(rc) {
- errno = rc;
- return -1;
- }
- threadpool->threads++;
- return 1;
-}
-
-
-int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) {
- int rc = 0;
- int dosignal = 1;
+int evtp_destroy(evtp_t *tp, int force) {
#if EV_MULTIPLICITY
- struct ev_loop *loop = threadpool->loop;
+ struct ev_loop *loop = tp->loop;
#endif
- work->work_func = work_func;
- work->done_func = done_func;
- work->next = NULL;
-
- pthread_mutex_lock(&threadpool->lock);
- evtp_enqueue(&threadpool->scheduled, work);
- if(threadpool->idle == 0) {
- dosignal = 0;
- if(threadpool->threads < threadpool->maxthreads) {
- rc = evtp_new_thread(threadpool);
- if(rc < 0 && threadpool->threads > 0)
- rc = 0; /* we'll recover */
- }
+ pthread_mutex_lock(&tp->lock);
+ if(!force && (tp->work.first || tp->results.first)) {
+ pthread_mutex_unlock(&tp->lock);
+ return -1;
}
- if(dosignal)
- pthread_cond_signal(&threadpool->cond);
- pthread_mutex_unlock(&threadpool->lock);
- ev_ref(EV_A);
+ tp->kill = tp->threads;
+ pthread_cond_signal(&tp->cond);
+ while(tp->threads > 0)
+ pthread_cond_wait(&tp->die_cond, &tp->lock);
+ pthread_mutex_unlock(&tp->lock);
- return rc;
+ pthread_cond_destroy(&tp->cond);
+ pthread_cond_destroy(&tp->die_cond);
+ pthread_mutex_destroy(&tp->lock);
+ ev_ref(EV_A);
+ ev_async_stop(EV_A_ &tp->async);
+ free(tp);
+ return 1;
}
/* vim: set noet sw=4 ts=4: */
diff --git a/ylib/evtp.h b/ylib/evtp.h
index 84d1ebd..25aa675 100644
--- a/ylib/evtp.h
+++ b/ylib/evtp.h
@@ -1,36 +1,40 @@
-/*
-Copyright (c) 2009 by Juliusz Chroboczek
-Copyright (c) 2013 by Yoran Heling
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
+/* Copyright (c) 2013 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/* This is a thread pool implementation for convenient use with libev[1]. It
- * mimics the libev and libeio[2] API to some extent. The core thread pool code
- * is based on the threadpool library[3] by Juliusz Chroboczek. If you're
- * looking for a generic thread pool that doesn't rely on libev, use that.
+ * mimics the libev and libeio[2] API to some extent.
+ *
+ * The initial thread pool code was based on the threadpool library[3] by
+ * Juliusz Chroboczek, but the current implementation is quite different.
+ * Still, if you're looking for a generic thread pool that doesn't rely on
+ * libev, that might be a good choice.
*
* 1. http://software.schmorp.de/pkg/libev
* 2. http://software.schmorp.de/pkg/libeio
* 3. https://github.com/jech/threadpool
*
* TODO:
+ * - Automatically kill some idle threads? The application can already use
+ * evtp_maxthreads() to achieve the same effect.
* - Cancellation?
* - Allow done_func = NULL? (Implies auto-free()-work-object-when-done)
*/
@@ -65,6 +69,25 @@ struct evtp_work_t {
evtp_t *evtp_create(EV_P_ int maxthreads);
+/* Dynamically change the maximum number of threads. New threads will be
+ * created when this value is increased and there is enough work to do. If this
+ * value is decreased, some threads will be killed to ensure that the number of
+ * threads will remain below the configured maximum.
+ *
+ * Note that a thread can only be killed when it's not running a work_func(),
+ * so there may be a small delay before the new maxthreads value is honored.
+ * Temporarily setting maxthreads to '0' is a valid way to pause processing of
+ * queued work objects.
+ * Temporarily setting maxthreads to '0' directly followed by resetting it to
+ * its previous value is a valid way to kill all idle threads.
+ *
+ * Returns -1 if we have work queued, but pthread_create() failed and we have
+ * no other threads running (fatal), returns 0 if pthread_create() failed but
+ * we still have a worker thread (recoverable), or 1 if everything went fine.
+ */
+int evtp_maxthreads(evtp_t *evtp, int maxthreads);
+
+
/* Submit work to the thread pool. Returns -1 if pthread_create() failed and we
* have no threads running (fatal), 0 if pthread_create() failed but we still
* have a worker thread (recoverable), or 1 if everything went fine.
@@ -91,14 +114,14 @@ static inline evtp_work_t *evtp_submit_new(evtp_t *evtp, evtp_func_t work_func,
}
-/* Cause a thread pool to die. Returns whenever there is new stuff in the
- callback queue, or immediately if canblock is false. Returns true when
- the thread pool is dead. */
-int evtp_die(evtp_t *threadpool, int canblock);
-
-/* Destroy a thread pool. Does nothing and returns -1 if the pool is not
- dead. */
-int evtp_destroy(evtp_t *threadpool);
+/* Destroy a thread pool. If there is still work scheduled, this function does
+ * nothing and returns -1. If force is non-zero, then the thread pool is
+ * destroyed even if there is still work scheduled. In that case, the work_func
+ * and/or done_func callbacks will NOT be run for those objects.
+ *
+ * This function blocks until all threads have been destroyed and returns 1 on
+ * success. */
+int evtp_destroy(evtp_t *threadpool, int force);
#endif