summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2013-02-19 11:17:47 +0100
committerYorhel <git@yorhel.nl>2013-02-19 11:18:30 +0100
commit2e05ee9924f62d182a2b33d941e08a08362e3d01 (patch)
tree39b570771ced7c3d40285a1454dea24c953ec7db
parent57905cf2828495d2512f0f0f4f38894421a89dcb (diff)
Added evtp
-rw-r--r--README1
-rw-r--r--evtp.c361
-rw-r--r--evtp.h85
-rw-r--r--test/Makefile14
-rw-r--r--test/evtp.c107
5 files changed, 565 insertions, 3 deletions
diff --git a/README b/README
index 9c488b4..126622f 100644
--- a/README
+++ b/README
@@ -3,6 +3,7 @@ This git repository holds a collection of small and independent C libraries.
dbusev - Register a DBusConnection with libev
dbusuv - Register a DBusConnection with libuv
ecbuf - An efficient expanding circular buffer
+ evtp - A convenient and efficient thread pool for libev
ylog - A small, flexible and efficient logging system for C
yopt - A small, simple and portable getopt_long() replacement
yuri - A minimal URI validation and parsing library
diff --git a/evtp.c b/evtp.c
new file mode 100644
index 0000000..b947007
--- /dev/null
+++ b/evtp.c
@@ -0,0 +1,361 @@
+/*
+Copyright (c) 2009 by Juliusz Chroboczek
+Copyright (c) 2013 by Yoran Heling
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+#if defined(EV_CONFIG_H)
+#include EV_CONFIG_H
+#elif defined(HAVE_CONFIG_H)
+#include "config.h"
+#endif
+
#include "evtp.h"

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
+
+
/* Atomic flag abstraction used for the have_scheduled_back fast path in
   evtp_async(). Three implementations, chosen at compile time:
   C11 <stdatomic.h>, GCC >= 4.7 __atomic builtins, or a plain volatile
   int as a last resort (see the caveat on that fallback below). */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L

#include <stdatomic.h>

typedef atomic_int threadpool_atomic;

/* Read the flag. The acquire load pairs with the release stores below. */
static inline int atomic_test(threadpool_atomic *a) {
	return !!atomic_load_explicit(a, memory_order_acquire);
}

/* Raise the flag. */
static inline void atomic_set(threadpool_atomic *a) {
	atomic_store_explicit(a, 1, memory_order_release);
}

/* Clear the flag. */
static inline void atomic_reset(threadpool_atomic *a) {
	atomic_store_explicit(a, 0, memory_order_release);
}

#elif defined(__GNUC__) && \
	(__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7))

typedef int threadpool_atomic;

/* Same semantics as the C11 variants above, via GCC builtins. */
static inline int atomic_test(threadpool_atomic *a) {
	return !!__atomic_load_n(a, __ATOMIC_ACQUIRE);
}

static inline void atomic_set(threadpool_atomic *a) {
	__atomic_store_n(a, 1, __ATOMIC_RELEASE);
}

static inline void atomic_reset(threadpool_atomic *a) {
	__atomic_store_n(a, 0, __ATOMIC_RELEASE);
}

#else

/* Since every lock acts as a memory barrier, and we only ever modify
   atomics under a lock, this is mostly safe. Note however that
   evtp_async might in principle get a stale copy. */

typedef volatile int threadpool_atomic;

static inline int atomic_test(threadpool_atomic *a) {
	return *a;
}

static inline void atomic_set(threadpool_atomic *a) {
	*a = 1;
}

static inline void atomic_reset(threadpool_atomic *a) {
	*a = 0;
}

#endif
+
+
/* Singly-linked FIFO of work items. Invariant: 'last' is NULL exactly
   when 'first' is NULL (empty queue). */
typedef struct evtp_queue_t {
	evtp_work_t *first;
	evtp_work_t *last;
} evtp_queue_t;

/* The thread pool. Opaque to the application (see evtp.h). */
struct evtp_t {
	/* Thread accounting: 'threads' currently alive, of which 'idle' are
	   blocked in pthread_cond_timedwait() waiting for new work. */
	int maxthreads, threads, idle;
	/* scheduled: work waiting for a worker thread.
	   scheduled_back: finished work waiting for the ev loop's done_func. */
	evtp_queue_t scheduled, scheduled_back;
	/* Set when we request that all threads die. */
	int dying;
	/* If this is false, we are guaranteed that scheduled_back is empty. */
	threadpool_atomic have_scheduled_back;
	/* Protects everything except the atomics above. */
	pthread_mutex_t lock;
	/* Signalled whenever a new continuation is enqueued or dying is set. */
	pthread_cond_t cond;
	/* Signalled whenever a thread dies. */
	pthread_cond_t die_cond;
#if EV_MULTIPLICITY
	struct ev_loop *loop;
#endif
	/* Wakes the ev loop so evtp_async() can drain scheduled_back. */
	ev_async async;
};
+
+
/* ev_async callback; runs in the thread driving the ev loop. Detaches the
   whole scheduled_back list under the lock and invokes each item's
   done_func. (This was threadpool_run_callbacks()) */
static void evtp_async(EV_P_ ev_async *async, int revents) {
	evtp_t *threadpool = async->data;
	evtp_work_t *items;

	/* Lock-free fast path: nothing has been scheduled back, skip the mutex. */
	if(!atomic_test(&threadpool->have_scheduled_back))
		return;

	pthread_mutex_lock(&threadpool->lock);
	items = threadpool->scheduled_back.first;
	/* Order is important: clear the flag only after the list has been
	   detached, so evtp_schedule_back() can't leave items queued while the
	   flag reads false. */
	threadpool->scheduled_back.first = NULL;
	threadpool->scheduled_back.last = NULL;
	atomic_reset(&threadpool->have_scheduled_back);
	pthread_mutex_unlock(&threadpool->lock);

	/* Run the callbacks outside the lock; they may call evtp_submit(). */
	while(items) {
		evtp_work_t *first = items;
		evtp_func_t func = first->done_func;
		items = items->next;
		/* Don't use 'first' after this function, application may have free()d it. */
		func(first);
	}
}
+
+
+evtp_t *evtp_create(EV_P_ int maxthreads) {
+ evtp_t *tp = calloc(1, sizeof(evtp_t));
+ if(tp == NULL)
+ return NULL;
+
+ tp->maxthreads = maxthreads;
+ pthread_mutex_init(&tp->lock, NULL);
+ pthread_cond_init(&tp->cond, NULL);
+ pthread_cond_init(&tp->die_cond, NULL);
+
+ ev_async_init(&tp->async, evtp_async);
+ ev_async_start(EV_A_ &tp->async);
+ tp->async.data = tp;
+#if EV_MULTIPLICITY
+ tp->loop = loop;
+#endif
+
+ return tp;
+}
+
+
+int evtp_die(evtp_t *threadpool, int canblock) {
+ int done;
+
+ pthread_mutex_lock(&threadpool->lock);
+
+ threadpool->dying = 1;
+ pthread_cond_broadcast(&threadpool->cond);
+
+ while(threadpool->threads > 0) {
+ if(threadpool->scheduled_back.first || !canblock)
+ break;
+ pthread_cond_wait(&threadpool->die_cond, &threadpool->lock);
+ }
+
+ done = threadpool->threads == 0;
+
+ pthread_mutex_unlock(&threadpool->lock);
+ return done;
+}
+
+
+int evtp_destroy(evtp_t *threadpool) {
+ int dead;
+#if EV_MULTIPLICITY
+ struct ev_loop *loop = threadpool->loop;
+#endif
+
+ pthread_mutex_lock(&threadpool->lock);
+ dead =
+ threadpool->threads == 0 &&
+ threadpool->scheduled.first == NULL &&
+ threadpool->scheduled_back.first == NULL;
+ pthread_mutex_unlock(&threadpool->lock);
+
+ if(!dead)
+ return -1;
+
+ pthread_cond_destroy(&threadpool->cond);
+ pthread_cond_destroy(&threadpool->die_cond);
+ pthread_mutex_destroy(&threadpool->lock);
+ ev_async_stop(EV_A_ &threadpool->async);
+ free(threadpool);
+ return 1;
+}
+
+
+static evtp_work_t *evtp_dequeue(evtp_queue_t *queue) {
+ evtp_work_t *item;
+
+ if(queue->first == NULL)
+ return NULL;
+
+ item = queue->first;
+ queue->first = item->next;
+ if(item->next == NULL)
+ queue->last = NULL;
+ return item;
+}
+
+
+static void evtp_enqueue(evtp_queue_t *queue, evtp_work_t *item) {
+ item->next = NULL;
+ if(queue->last)
+ queue->last->next = item;
+ else
+ queue->first = item;
+ queue->last = item;
+}
+
+
/* Runs in a worker thread: hand a finished item back to the ev loop by
   putting it on scheduled_back and waking the async watcher. */
static int evtp_schedule_back(evtp_t *threadpool, evtp_work_t *item) {
	int wake = 1;
#if EV_MULTIPLICITY
	struct ev_loop *loop = threadpool->loop;
#endif

	pthread_mutex_lock(&threadpool->lock);
	/* Only the first item on an empty scheduled_back needs to wake the
	   loop; evtp_async() drains the entire list in one pass. */
	if(atomic_test(&threadpool->have_scheduled_back))
		wake = 0;
	/* Order is important: raise the flag before the item is enqueued so
	   the unlocked fast path in evtp_async() can never miss an item. */
	atomic_set(&threadpool->have_scheduled_back);
	evtp_enqueue(&threadpool->scheduled_back, item);
	pthread_mutex_unlock(&threadpool->lock);

	/* TODO: The 'wake' thing looks like an optimization to avoid too many
	 * ev_async_send() calls. But this might be unnecessary when
	 * ev_async_send() itself already optimizes this case. ...Right? */
	if(wake)
		ev_async_send(EV_A_ &threadpool->async);

	return 0;
}
+
+
/* Worker thread entry point: loop dequeueing scheduled work, running its
   work_func, and handing the item back to the ev loop. The thread exits
   when the pool is dying or after idling with no work for ~1 second while
   enough other idle threads remain. */
static void *thread_main(void *pool) {
	evtp_t *threadpool = pool;
	evtp_work_t *item;

again:
	pthread_mutex_lock(&threadpool->lock);

	if(threadpool->scheduled.first == NULL) {
		struct timespec ts;

		if(threadpool->dying)
			goto die;

		/* Beware when benchmarking. Under Linux with NPTL, idle threads
		   are slightly counter-productive in some benchmarks, but
		   extremely productive in others. */

		/* Keep at most two threads around waiting for work.
		   This constant may need to be tweaked. */
		if(threadpool->idle >= 2)
			goto die;

		/* Absolute timeout ~1s from now. Don't bother with POSIX clocks. */
		ts.tv_sec = time(NULL) + 1;
		ts.tv_nsec = 0;

		threadpool->idle++;
		pthread_cond_timedwait(&threadpool->cond, &threadpool->lock, &ts);
		threadpool->idle--;
		/* Timed out, woke up spuriously, or another thread won the race
		   for the new item: exit instead of looping. */
		if(threadpool->scheduled.first == NULL)
			goto die;
	}

	item = evtp_dequeue(&threadpool->scheduled);
	pthread_mutex_unlock(&threadpool->lock);

	/* Run the application's work outside the lock. */
	item->work_func(item);

	/* TODO: evtp_schedule_back() needs a lock, too. Perhaps it would be more
	 * efficient if we obtain the lock before calling it and then avoid the
	 * re-lock after 'again'? That might cause more lock contention, though,
	 * since ev_async_send() may do a syscall and thus take a bit longer than
	 * just a few cycles. */
	evtp_schedule_back(threadpool, item);

	goto again;

die:
	/* Reached with the lock held. */
	threadpool->threads--;
	pthread_cond_broadcast(&threadpool->die_cond);
	pthread_mutex_unlock(&threadpool->lock);
	return NULL;
}
+
+
+/* This is called with the pool locked. */
+static int evtp_new_thread(evtp_t *threadpool) {
+ pthread_t thread;
+ pthread_attr_t attr;
+ int rc;
+
+ assert(threadpool->threads < threadpool->maxthreads);
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ rc = pthread_create(&thread, &attr, thread_main, (void*)threadpool);
+ if(rc) {
+ errno = rc;
+ return -1;
+ }
+ threadpool->threads++;
+ return 1;
+}
+
+
+int evtp_submit(evtp_work_t *work, evtp_t *threadpool, evtp_func_t work_func, evtp_func_t done_func) {
+ int rc = 0;
+ int dosignal = 1;
+
+ work->work_func = work_func;
+ work->done_func = done_func;
+ work->next = NULL;
+
+ pthread_mutex_lock(&threadpool->lock);
+ evtp_enqueue(&threadpool->scheduled, work);
+ if(threadpool->idle == 0) {
+ dosignal = 0;
+ if(threadpool->threads < threadpool->maxthreads) {
+ rc = evtp_new_thread(threadpool);
+ if(rc < 0 && threadpool->threads > 0)
+ rc = 0; /* we'll recover */
+ }
+ }
+ if(dosignal)
+ pthread_cond_signal(&threadpool->cond);
+ pthread_mutex_unlock(&threadpool->lock);
+
+ return rc;
+}
+
+/* vim: set noet sw=4 ts=4: */
diff --git a/evtp.h b/evtp.h
new file mode 100644
index 0000000..d2d6461
--- /dev/null
+++ b/evtp.h
@@ -0,0 +1,85 @@
+/*
+Copyright (c) 2009 by Juliusz Chroboczek
+Copyright (c) 2013 by Yoran Heling
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+/* This is a thread pool implementation for convenient use with libev[1]. It
+ * mimics the libev and libeio[2] API to some extent. The core thread pool code
+ * is based on the threadpool library[3] by Juliusz Chroboczek. If you're
+ * looking for a generic thread pool that doesn't rely on libev, use that.
+ *
+ * 1. http://software.schmorp.de/pkg/libev
+ * 2. http://software.schmorp.de/pkg/libeio
+ * 3. https://github.com/jech/threadpool
+ *
+ * TODO:
+ * - Figure out shutdown
+ * - Convenient create_work_object_and_submit()
+ * - Don't hold a ref on the libev loop when nothing is queued?
+ * - Cancellation?
+ * - Allow done_func = NULL? (Implies auto-free()-work-object-when-done)
+ */
+
#ifndef EVTP_H
#define EVTP_H

#include <ev.h>

typedef struct evtp_t evtp_t; /* Opaque */
typedef struct evtp_work_t evtp_work_t;

/* Callback type shared by work_func (runs in a worker thread) and
   done_func (runs in the ev loop thread). */
typedef void (*evtp_func_t)(evtp_work_t *);

/* One unit of work. Allocated and owned by the application; it must stay
   valid until done_func has been called (after which the application may
   free it, even from inside done_func itself). */
struct evtp_work_t {
	/* Public */
	void *data;
	/* Private */
	evtp_func_t work_func;
	evtp_func_t done_func;
	evtp_t *tp;
	evtp_work_t *next;
};


/* Create a new thread pool. May return NULL if malloc() fails.
   At most maxthreads worker threads are started, on demand. */
evtp_t *evtp_create(EV_P_ int maxthreads);


/* Submit work to the thread pool. work_func() will be called in a worker
 * thread. A short while after work_func() returns, done_func() will be called
 * in the thread that runs ev_run() on the main loop given to evtp_create().
 * The *work data must remain valid until done_func() is called.
 * Returns 0 on success; presumably -1 with errno set when no worker thread
 * could be created — verify against evtp.c.
 */
int evtp_submit(evtp_work_t *work, evtp_t *evtp, evtp_func_t work_func, evtp_func_t done_func);


/* Cause a thread pool to die. Returns whenever there is new stuff in the
   callback queue, or immediately if canblock is false. Returns true when
   the thread pool is dead. */
int evtp_die(evtp_t *threadpool, int canblock);

/* Destroy a thread pool. Does nothing and returns -1 if the pool is not
   dead (threads still alive or work still queued in either direction). */
int evtp_destroy(evtp_t *threadpool);

#endif
+
+/* vim: set noet sw=4 ts=4: */
diff --git a/test/Makefile b/test/Makefile
index 4cfe088..ab8c360 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -11,16 +11,24 @@ yuri: ../yuri.c ../yuri.h yuri.c
ecbuf: ../ecbuf.h ecbuf.c
$(CC) $(CFLAGS) -I.. ecbuf.c -o ecbuf
-test: yuri ecbuf
+evtp: ../evtp.c ../evtp.h evtp.c
+ $(CC) $(CFLAGS) -I.. ../evtp.c evtp.c -lpthread -lev -o evtp
+
+test: yuri ecbuf evtp
./yuri
./ecbuf
+ ./evtp
@echo All tests passed.
ecbuf-bench: ../ecbuf.h ecbuf-bench.c
$(CC) $(CFLAGS) -DNDEBUG -I.. ecbuf-bench.c -o ecbuf-bench
-bench: ecbuf-bench
+evtp-bench: ../evtp.c ../evtp.h evtp.c
+ $(CC) $(CFLAGS) -DNDEBUG -DBENCH -I.. ../evtp.c evtp.c -lpthread -lev -o evtp-bench
+
+bench: ecbuf-bench evtp-bench
./ecbuf-bench
+ time ./evtp-bench
clean:
- rm -f yuri ecbuf ecbuf-bench
+ rm -f yuri ecbuf evtp ecbuf-bench evtp-bench
diff --git a/test/evtp.c b/test/evtp.c
new file mode 100644
index 0000000..b9813c1
--- /dev/null
+++ b/test/evtp.c
@@ -0,0 +1,107 @@
+/* Copyright (c) 2012 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#if !defined(BENCH) && defined(NDEBUG)
+#error These tests should not be compiled with -DNDEBUG!
+#endif
+
+#define BENCH_LOOPS 100000
+
+#include "evtp.h"
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+static evtp_t *tp;
+static char data[51];
+
+static void done_cb(evtp_work_t *w);
+static void work_cb(evtp_work_t *w);
+
+
/* Completion callback; runs in the ev loop thread. Checks that the worker
   flipped the item's data byte, then drives the test: the first completion
   fans out 49 more submissions; once all 50 are done the loop is stopped.
   In BENCH mode the whole 50-item cycle is restarted BENCH_LOOPS times. */
static void done_cb(evtp_work_t *w) {
	static int done = 0;       /* completions seen in the current cycle */
#ifdef BENCH
	static int loops = 0;      /* cycles finished so far */
#endif
	int i;

	/* work_cb() must have set the flag byte this item points at. */
	assert(*((char *)w->data) == 1);
	free(w);
	done++;

	if(done == 1) {
		/* First item finished: submit the remaining 49. */
		for(i=1; i<50; i++) {
			w = calloc(1, sizeof(evtp_work_t));
			w->data = data+i;
			evtp_submit(w, tp, work_cb, done_cb);
		}
	}

#ifdef BENCH
	loops++;
	if(loops < BENCH_LOOPS) {
		/* Reset the flags and kick off the next cycle with a single item. */
		done = 0;
		memset(data, 0, sizeof(data));
		w = calloc(1, sizeof(evtp_work_t));
		w->data = data;
		evtp_submit(w, tp, work_cb, done_cb);
		return;
	}
#endif

	/* TODO: Shouldn't be necessary when evtp drops its ref when nothing is queued */
	if(done == 50)
		ev_break(EV_DEFAULT_ EVBREAK_ONE);
}
+
+
+static void work_cb(evtp_work_t *w) {
+ char *d = w->data;
+ assert(*d == 0);
+ *d = 1;
+}
+
+
/* Test driver: create a 4-thread pool on the default loop, submit one item,
   let done_cb() fan out the other 49 (and repeat under BENCH), then shut the
   pool down and verify every flag byte was set exactly once. */
int main(int argc, char **argv) {
	ev_default_loop(0);
	tp = evtp_create(EV_DEFAULT_ 4);

	evtp_work_t *w = calloc(1, sizeof(evtp_work_t));
	w->data = data;
	evtp_submit(w, tp, work_cb, done_cb);
	w = NULL;   /* ownership passed to the pool until done_cb() frees it */

	ev_run(EV_DEFAULT_ 0);

	/* ev_run() only returns after all work completed, so the pool must be
	   able to die and be destroyed without blocking forever. */
	assert(evtp_die(tp, 1) == 1);
	assert(evtp_destroy(tp) == 1);
	tp = NULL;

	int i;
	for(i=0; i<50; i++)
		assert(data[i] == 1);

	return 0;
}
+
+/* vim: set noet sw=4 ts=4: */