diff options
-rw-r--r-- | tanja_pthread.h (renamed from tanja_pthread.c) | 90 | ||||
-rw-r--r-- | test/Makefile | 8 | ||||
-rw-r--r-- | test/link.c | 6 | ||||
-rw-r--r-- | test/pthread.c | 5 |
4 files changed, 52 insertions, 57 deletions
diff --git a/tanja_pthread.c b/tanja_pthread.h index e0a6d0b..5b79621 100644 --- a/tanja_pthread.c +++ b/tanja_pthread.h @@ -24,6 +24,10 @@ // This is a very crude one-thread-per-session "event" model. + +#ifndef _TANJA_PTHREAD_H +#define _TANJA_PTHREAD_H + #include <stdlib.h> #include <string.h> #include <assert.h> @@ -38,28 +42,22 @@ #include "tanja.h" -static void sig(int x) { +static inline void _tn_pthread_sig(int x) { assert(x == SIGUSR2); // No need to do anything, just make sure that any blocking function gets // interrupted. } -// Make sure to handle SIGUSR2 -static int setsig() { - static int init = 0; - if(init++) - return 1; +// Make sure to handle SIGUSR2. Call this function before creating any sessions +// or nodes. Returns 0 on success, something else otherwise. +static inline int tn_pthread_setsig() { struct sigaction a; memset(&a, 0, sizeof(a)); sigemptyset(&a.sa_mask); a.sa_flags = 0; - a.sa_handler = sig; - if(sigaction(SIGUSR2, &a, NULL) < 0) { - init = 0; - return 0; - } - return 1; + a.sa_handler = _tn_pthread_sig; + return sigaction(SIGUSR2, &a, NULL); } @@ -68,18 +66,18 @@ static int setsig() { typedef struct { pthread_t self; tn_session *s; -} pt_sdat; +} _tn_pthread_s; -static void sdispatch(tn_session *s, void *_dat) { - pt_sdat *dat = _dat; +static inline void _tn_pthread_sd(tn_session *s, void *_dat) { + _tn_pthread_s *dat = _dat; assert(dat->s == s); pthread_kill(dat->self, SIGUSR2); } -static void *sthread(void *_dat) { - pt_sdat *dat = _dat; +static inline void *_tn_pthread_st(void *_dat) { + _tn_pthread_s *dat = _dat; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); sigset_t mask, oldmask; @@ -95,14 +93,11 @@ static void *sthread(void *_dat) { } -tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { - if(!setsig()) - return NULL; - - pt_sdat *dat = malloc(sizeof(pt_sdat)); - dat->s = tn_session_new(n, sdispatch, dat); +static inline tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { + _tn_pthread_s *dat = malloc(sizeof(_tn_pthread_s)); + dat->s = tn_session_new(n, _tn_pthread_sd, dat); tn_session_ref(dat->s); - if(pthread_create(&(dat->self), NULL, sthread, dat) < 0) { + if(pthread_create(&(dat->self), NULL, _tn_pthread_st, dat) < 0) { tn_session_unref(dat->s); tn_session_close(dat->s); return NULL; @@ -118,18 +113,22 @@ tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { -#define READBUFSIZE (10*1024) + +#ifndef TN_PTHREAD_READBUFSIZE +# define TN_PTHREAD_READBUFSIZE (10*1024) +#endif + typedef struct { pthread_t self; tn_link *l; int fdin; int fdout; -} pt_fdldat; +} _tn_pthread_l; -static void fdldispatch(tn_link *l, void *_dat) { - pt_fdldat *dat = _dat; +static inline void _tn_pthread_ld(tn_link *l, void *_dat) { + _tn_pthread_l *dat = _dat; assert(l == dat->l); pthread_kill(dat->self, SIGUSR2); } @@ -137,15 +136,15 @@ static void fdldispatch(tn_link *l, void *_dat) { // Just interrupt whatever is going on, the thread loop will empty the write // buffer if there's something to write. -static int fdlwdispatch(tn_link *l, char *buf, int len, void *_dat) { +static inline int _tn_pthread_ldw(tn_link *l, char *buf, int len, void *_dat) { assert(buf && len > 0); - fdldispatch(l, _dat); + _tn_pthread_ld(l, _dat); return 0; } // Returns 0 if there is nothing more to write, 1 otherwise. -static int fdlwrite(tn_link *l, sigset_t *mask, int fd) { +static inline int _tn_pthread_lwr(tn_link *l, sigset_t *mask, int fd) { char *buf = NULL; int len = tn_link_startwrite(l, &buf); if(!len) @@ -166,14 +165,14 @@ static int fdlwrite(tn_link *l, sigset_t *mask, int fd) { } -static void fdlread(tn_link *l, sigset_t *mask, int fd) { - char buf[READBUFSIZE]; +static inline void _tn_pthread_lrd(tn_link *l, sigset_t *mask, int fd) { + char buf[TN_PTHREAD_READBUFSIZE]; fd_set fds; FD_ZERO(&fds); FD_SET(fd, &fds); int n = pselect(fd+1, &fds, NULL, NULL, NULL, mask); if(n > 0) - n = read(fd, buf, READBUFSIZE); + n = read(fd, buf, TN_PTHREAD_READBUFSIZE); if(n < 0 && errno == EINTR) return; if(n <= 0) // pselect() shouldn't return 0, so that must come from the read() @@ -183,8 +182,8 @@ static void fdlread(tn_link *l, sigset_t *mask, int fd) { } -static void *fdlthread(void *_dat) { - pt_fdldat *dat = _dat; +static inline void *_tn_pthread_lt(void *_dat) { + _tn_pthread_l *dat = _dat; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); sigset_t mask, oldmask; @@ -197,14 +196,14 @@ static void *fdlthread(void *_dat) { // If there's a callback waiting for the application, dispatch that if(!tn_link_dispatch(dat->l)) goto end; - } while(fdlwrite(dat->l, &oldmask, dat->fdout)); + } while(_tn_pthread_lwr(dat->l, &oldmask, dat->fdout)); // If fdlwrite() returned 0, that could also mean that there was an error, check & dispatch. if(!tn_link_dispatch(dat->l)) goto end; // Now get to sleep again in a read call. - fdlread(dat->l, &oldmask, dat->fdin); + _tn_pthread_lrd(dat->l, &oldmask, dat->fdin); } end: tn_link_unref(dat->l); @@ -214,21 +213,18 @@ end: } -tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) { +static inline tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) { static tn_link_context ctx = { - fdldispatch, - fdlwdispatch + _tn_pthread_ld, + _tn_pthread_ldw }; - if(!setsig()) - return NULL; - - pt_fdldat *dat = malloc(sizeof(pt_fdldat)); + _tn_pthread_l *dat = malloc(sizeof(_tn_pthread_l)); dat->fdin = fdin; dat->fdout = fdout; dat->l = tn_link_new(n, &ctx, dat); tn_link_ref(dat->l); - if(pthread_create(&dat->self, NULL, fdlthread, dat) < 0) { + if(pthread_create(&dat->self, NULL, _tn_pthread_lt, dat) < 0) { tn_link_unref(dat->l); tn_link_close(dat->l); return NULL; @@ -243,4 +239,6 @@ tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) { } +#endif // _TANJA_PTHREAD_H + // vim:noet:sw=4:ts=4 diff --git a/test/Makefile b/test/Makefile index e99a412..8d71f6e 100644 --- a/test/Makefile +++ b/test/Makefile @@ -17,11 +17,11 @@ json: $(DEP) json.c tuple: $(DEP) tuple.c $(COMP) tuple.c -o tuple -pthread: $(DEP) ../tanja_pthread.c pthread.c - $(COMP) $(THREAD) ../tanja_pthread.c pthread.c -o pthread +pthread: $(DEP) ../tanja_pthread.h pthread.c + $(COMP) $(THREAD) pthread.c -o pthread -link: $(DEP) ../tanja_pthread.c link.c - $(COMP) $(THREAD) ../tanja_pthread.c link.c -o link +link: $(DEP) ../tanja_pthread.h link.c + $(COMP) $(THREAD) link.c -o link test: json tuple pthread prove ./json ./tuple ./pthread diff --git a/test/link.c b/test/link.c index 178278a..7c1e382 100644 --- a/test/link.c +++ b/test/link.c @@ -29,10 +29,7 @@ #include <assert.h> #include <unistd.h> #include "tanja.h" - - -extern tn_session *tn_session_pthread(tn_node *, pthread_t *); -extern tn_link *tn_link_pthread_fd(tn_node *, int, int, pthread_t *); +#include "tanja_pthread.h" static tn_link *lnk; static tn_session *ses; @@ -119,6 +116,7 @@ static void lnk_ready(tn_link *l) { int main() { + tn_pthread_setsig(); tn_node *n = tn_node_new(); // Create a session for debugging diff --git a/test/pthread.c b/test/pthread.c index 8c1d87f..f305f97 100644 --- a/test/pthread.c +++ b/test/pthread.c @@ -26,9 +26,7 @@ #include <string.h> #include <pthread.h> #include "tanja.h" - - -extern tn_session *tn_session_pthread(tn_node *, pthread_t *); +#include "tanja_pthread.h" #define t_assert(n) printf("%sok %d - " __FILE__ ":%d: " #n "\n", (n)?"":"not ", ++test_num, __LINE__) @@ -118,6 +116,7 @@ static void recv_reply(tn_session *s, tn_tuple *tup, void *d) { int main() { + tn_pthread_setsig(); tn_node *n = tn_node_new(); t_assert(n != NULL); |