diff options
author | Yorhel <git@yorhel.nl> | 2012-02-28 21:20:40 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-02-28 21:20:40 +0100 |
commit | b98655e0b203f84e31b25a697a9f6215097ea460 (patch) | |
tree | 51b61d688f8e6c305a04d58ab0ac59399efd081a | |
parent | c25b9aade0cff48672ba0c362fca5546a3032e51 (diff) |
c: Added crude pthread-based "event loop" + fixed a few bugs
-rw-r--r-- | c/Makefile | 7 | ||||
-rw-r--r-- | c/tanja.c | 15 | ||||
-rw-r--r-- | c/tanja.h | 1 | ||||
-rw-r--r-- | c/tanja_pthread.c | 93 | ||||
-rw-r--r-- | c/test.c | 23 |
5 files changed, 135 insertions, 4 deletions
@@ -4,5 +4,8 @@ CFLAGS=-Wall -Wextra -g -O3 -DTANJA_THREADSAFE tanja.o: Makefile tanja.c tanja.h $(CC) $(CFLAGS) -c tanja.c -o tanja.o -test: test.c tanja.o - $(CC) $(CFLAGS) $(LDFLAGS) tanja.o test.c -o test +tanja_pthread.o: tanja.o tanja_pthread.c + $(CC) $(CFLAGS) -c tanja_pthread.c -o tanja_pthread.o + +test: test.c tanja_pthread.o + $(CC) $(CFLAGS) $(LDFLAGS) tanja.o tanja_pthread.o test.c -lpthread -o test @@ -450,6 +450,7 @@ static inline patternreg *patternreg_create_ses(tn_tuple *pat, int willreply, tn patternreg *r = malloc(sizeof(patternreg)); r->ref = 1; r->active = 1; + r->type = 0; r->pat = pat; r->willreply = willreply; r->recipient = ses; // No need to ref() this, the pattern is unregistered when the session is closed anyway @@ -570,6 +571,10 @@ static void tn_node_unregsession(tn_node *n, tn_session *s) { } +// The reference held on the tuple is passed to the node. So the following +// works without memory leaks: +// tn_session_send(.., tn_tuple_new(..), ..); +// // TODO: don't send tuples back to the link it came from static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { mutex_lock(n->lock); @@ -594,6 +599,9 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { tn_returnpath_open(rp); tn_reply_close(rp); } + + // Free the tuple if no sessions/links were interested in it. + tn_tuple_unref(tup); } @@ -681,7 +689,7 @@ void tn_session_unregister(tn_session *s, int id) { // Assumes s->lock is held, and will unlock it. static inline void tn_session_appendmsg(tn_session *s, sesmsg *m) { m->next = NULL; - int empty = !!s->q_begin; + int empty = !s->q_begin; if(empty) s->q_begin = m; else @@ -752,16 +760,19 @@ static inline void tn_session_dispatch_one(tn_session *s, sesmsg *m) { void tn_session_dispatch(tn_session *s) { sesmsg *q; + tn_session_ref(s); do { // Pop item from the start of the queue mutex_lock(s->lock); assert(s->active || !s->q_begin); // Queue must be (and stay) empty if the session has been closed q = s->q_begin; - s->q_begin = q->next; + if(q) + s->q_begin = q->next; mutex_unlock(s->lock); if(q) tn_session_dispatch_one(s, q); } while(q); + tn_session_unref(s); } @@ -84,6 +84,7 @@ tn_session *tn_session_create(tn_node *, tn_session_context *, void *); void tn_session_send(tn_session *, tn_tuple *, tn_reply_cb, void *dat); int tn_session_register(tn_session *, tn_tuple *, int, tn_tuple_cb, void *); void tn_session_unregister(tn_session *, int); +void tn_session_dispatch(tn_session *); void tn_session_close(tn_session *); // vim:noet:sw=4:ts=4 diff --git a/c/tanja_pthread.c b/c/tanja_pthread.c new file mode 100644 index 0000000..90308e2 --- /dev/null +++ b/c/tanja_pthread.c @@ -0,0 +1,93 @@ +// This is a very crude one-thread-per-session "event" model. + +#include <pthread.h> +#include <signal.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include "tanja.h" + + +typedef struct { + pthread_t self; + tn_session *s; +} pt_dat; + + +static void dispatch(tn_session *s, void *_dat) { + if(s) {} + pt_dat *dat = _dat; + pthread_kill(dat->self, SIGUSR2); +} + + +// Calling session_close() from within another thread may be a bit racy, I'm +// afraid. Have to look into that (if it's going to be allowed in the first +// place). +static void sclose(tn_session *s, void *_dat) { + if(s) {} + pt_dat *dat = _dat; + dat->s = NULL; + pthread_kill(dat->self, SIGUSR2); +} + + +static void *thread(void *_dat) { + pt_dat *dat = _dat; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); + // TODO: sleep() -> read() if this is a link + while(dat->s) { + tn_session_dispatch(dat->s); + if(dat->s) + sleep(10); + } + free(dat); + return NULL; +} + + +static void sig(int x) { + x++; + // No need to do anything, just make sure that any blocking function gets + // interrupted. +} + + +tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { + static int init = 0; + static tn_session_context ctx = { + dispatch, + sclose + }; + + if(!init++) { + // Make sure to handle SIGUSR2 + struct sigaction a; + memset(&a, 0, sizeof(a)); + sigemptyset(&a.sa_mask); + a.sa_flags = 0; + a.sa_handler = sig; + if(sigaction(SIGUSR2, &a, NULL) < 0) { + init = 0; + return NULL; + } + } + + pt_dat *dat = malloc(sizeof(pt_dat)); + dat->s = tn_session_create(n, &ctx, dat); + + if(pthread_create(&(dat->self), NULL, thread, dat) < 0) { + tn_session_close(dat->s); + return NULL; + } + + if(t) + memcpy(t, &(dat->self), sizeof(pthread_t)); + else + pthread_detach(dat->self); + + return dat->s; +} + +// vim:noet:sw=4:ts=4 @@ -1,8 +1,23 @@ #include <stdlib.h> #include <stdio.h> #include <string.h> +#include <unistd.h> +#include <stdlib.h> +#include <pthread.h> #include "tanja.h" + +extern tn_session *tn_session_pthread(tn_node *, pthread_t *); + +void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { + if(d) {} + if(p) {} + tn_tuple_unref(tup); + tn_session_close(s); + printf("Closing!\n"); +} + + int main() { tn_tuple *t = tn_tuple_new("i*sam", 1293, strdup("som\\\"e_strin\01g"), tn_array_new("i", INT64_MIN), @@ -16,7 +31,15 @@ int main() { free(buf); tn_tuple_unref(t); tn_node *n = tn_node_create(); + + pthread_t th; + tn_session *s = tn_session_pthread(n, &th); tn_node_unref(n); + + tn_session_register(s, tn_tuple_new(""), 0, do_quit, NULL); + tn_session_send(s, tn_tuple_new("i", 1), NULL, NULL); + + pthread_join(th, NULL); return 0; } |