diff options
author | Yorhel <git@yorhel.nl> | 2012-03-02 10:29:39 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-02 10:29:39 +0100 |
commit | 9250293d6fe028439bae39ed42b9f8c634f893b1 (patch) | |
tree | 0315362543874f91fc458a962eea8f3fcb6042c8 | |
parent | e72f46e9a98986ebb80f204d27bb50d046c88ab3 (diff) |
Added pthread context for links + fixed a few bugs
It can do a successful handshake now, yay!
-rw-r--r-- | tanja.c | 62 | ||||
-rw-r--r-- | tanja.h | 4 | ||||
-rw-r--r-- | tanja_pthread.c | 161 | ||||
-rw-r--r-- | test.c | 9 |
4 files changed, 198 insertions, 38 deletions
@@ -840,17 +840,19 @@ struct tn_link { lbuf_init(b);\ } while(0) -#define lbuf_grow(b) do {\ +#define lbuf_grow(b, m) do {\ if(b##_size < 16)\ b##_size = 16;\ else\ b##_size *= 2;\ + if(b##_size < m)\ + b##_size = m;\ b = realloc(b, b##_size);\ } while(0) #define lbuf_append(b, buf, len) do {\ if(b##_size < b##_len + len)\ - lbuf_grow(b);\ + lbuf_grow(b, b##_len + len);\ memcpy(b+b##_len, buf, len);\ b##_len += len;\ } while(0) @@ -881,21 +883,28 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data) l->ctx = ctx; l->data = data; l->handshaked = 0; + mutex_init(l->lock); lbuf_init(l->rbuf); lbuf_init(l->wbuf); - tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28); return l; } +void tn_link_start(tn_link *l) { + tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28); +} + + #define inc(n) do { buf+=n; len-=n; } while(0) static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) { int ok = 0; while(len > 0) { // Space, ignore - if(*buf == ' ') + if(*buf == ' ') { + inc(1); continue; + } // Version if(len > 4 && strncmp(buf, "ver,", 4) == 0) { @@ -912,13 +921,14 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) int flag = buf[3] == 'i' ? 2 : 4; inc(4); while(len && *buf != ' ') { - if(len >= 5 && strncmp(buf, ",json", 5) == 0) + if(len >= 5 && strncmp(buf, ",json", 5) == 0 && (len == 5 || buf[5] == ' ' || buf[5] == ',')) ok |= flag; inc(1); } } } + if(l) {} /* TODO if(ok != 1|2|4) { error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format"); @@ -981,6 +991,48 @@ void tn_link_read(tn_link *l, const char *buf, int len) { } +void tn_link_dispatch(tn_link *l) { + // Call write() if we have something buffered. + // Note: We should not call write() while holding the lock, since that may + // block other threads. Instead, we take control over the write buffer and + // temporarily replace it with an empty one so that other threads can keep + // writing without blocking or losing the new data. + mutex_lock(l->lock); + if(l->wbuf_len) { + char *buf = l->wbuf; + int buf_len = l->wbuf_len; + int buf_size = l->wbuf_size; + lbuf_init(l->wbuf); + mutex_unlock(l->lock); + + int n = l->ctx->write(l, buf, buf_len, l->data); + assert(n >= 0); // TODO: error reporting! + + mutex_lock(l->lock); + if(n > 0) { + memmove(buf, buf+n, buf_len-n); + buf_len -= n; + } + // If our buffer has been emptied, then just free it and leave the write buffer alone. + if(!buf_len) + lbuf_free(buf); + else { + // Oops, someone else has written to the buffer while we were being + // busy. Append this data to our buffer. + if(l->wbuf_len) + lbuf_append(buf, l->wbuf, l->wbuf_len); + // Now give back our temporary buffer + if(l->wbuf) + free(l->wbuf); + l->wbuf = buf; + l->wbuf_len = buf_len; + l->wbuf_size = buf_size; + } + } + mutex_unlock(l->lock); +} + + void tn_link_close(tn_link *l) { mutex_lock(l->lock); lbuf_free(l->wbuf); @@ -61,7 +61,7 @@ typedef struct { typedef struct { void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately - void (*write)(tn_link *, char *, int, void *); + int (*write)(tn_link *, char *, int, void *); void (*close)(tn_link *, void *); } tn_link_context; @@ -96,7 +96,9 @@ void tn_session_close(tn_session *); tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *); +void tn_link_start(tn_link *); void tn_link_read(tn_link *, const char *, int); +void tn_link_dispatch(tn_link *); void tn_link_close(tn_link *); // vim:noet:sw=4:ts=4 diff --git a/tanja_pthread.c b/tanja_pthread.c index 90308e2..1980533 100644 --- a/tanja_pthread.c +++ b/tanja_pthread.c @@ -5,18 +5,46 @@ #include <unistd.h> #include <stdlib.h> #include <string.h> +#include <errno.h> #include "tanja.h" +static void sig(int x) { + x++; + // No need to do anything, just make sure that any blocking function gets + // interrupted. +} + + +// Make sure to handle SIGUSR2 +static int setsig() { + static int init = 0; + if(init++) + return 1; + struct sigaction a; + memset(&a, 0, sizeof(a)); + sigemptyset(&a.sa_mask); + a.sa_flags = 0; + a.sa_handler = sig; + if(sigaction(SIGUSR2, &a, NULL) < 0) { + init = 0; + return 0; + } + return 1; +} + + + + typedef struct { pthread_t self; tn_session *s; -} pt_dat; +} pt_sdat; -static void dispatch(tn_session *s, void *_dat) { +static void sdispatch(tn_session *s, void *_dat) { if(s) {} - pt_dat *dat = _dat; + pt_sdat *dat = _dat; pthread_kill(dat->self, SIGUSR2); } @@ -26,17 +54,16 @@ static void dispatch(tn_session *s, void *_dat) { // place). static void sclose(tn_session *s, void *_dat) { if(s) {} - pt_dat *dat = _dat; + pt_sdat *dat = _dat; dat->s = NULL; pthread_kill(dat->self, SIGUSR2); } -static void *thread(void *_dat) { - pt_dat *dat = _dat; +static void *sthread(void *_dat) { + pt_sdat *dat = _dat; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); - // TODO: sleep() -> read() if this is a link while(dat->s) { tn_session_dispatch(dat->s); if(dat->s) @@ -47,37 +74,18 @@ static void *thread(void *_dat) { } -static void sig(int x) { - x++; - // No need to do anything, just make sure that any blocking function gets - // interrupted. -} - - tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { - static int init = 0; static tn_session_context ctx = { - dispatch, + sdispatch, sclose }; - if(!init++) { - // Make sure to handle SIGUSR2 - struct sigaction a; - memset(&a, 0, sizeof(a)); - sigemptyset(&a.sa_mask); - a.sa_flags = 0; - a.sa_handler = sig; - if(sigaction(SIGUSR2, &a, NULL) < 0) { - init = 0; - return NULL; - } - } + if(!setsig()) + return NULL; - pt_dat *dat = malloc(sizeof(pt_dat)); + pt_sdat *dat = malloc(sizeof(pt_sdat)); dat->s = tn_session_create(n, &ctx, dat); - - if(pthread_create(&(dat->self), NULL, thread, dat) < 0) { + if(pthread_create(&(dat->self), NULL, sthread, dat) < 0) { tn_session_close(dat->s); return NULL; } @@ -90,4 +98,95 @@ tn_session *tn_session_pthread(tn_node *n, pthread_t *t) { return dat->s; } + + +#define READBUFSIZE (10*1024) + +typedef struct { + pthread_t self; + tn_link *l; + int fdin; + int fdout; +} pt_fdldat; + + +static void fdldispatch(tn_link *l, void *_dat) { + if(l) {} + pt_fdldat *dat = _dat; + pthread_kill(dat->self, SIGUSR2); +} + + +// TODO: Same note as for link_close() applies here, I guess. But for links it +// actually has value... +static void fdlclose(tn_link *l, void *_dat) { + if(l) {} + pt_fdldat *dat = _dat; + dat->l = NULL; + pthread_kill(dat->self, SIGUSR2); +} + + +static int fdlwrite(tn_link *l, char *buf, int len, void *_dat) { + if(l) {} + pt_fdldat *dat = _dat; + int n = write(dat->fdout, buf, len); + // interrupt? 0 byte written. disconnect? error. + return n < 0 && errno == EINTR ? 0 : n == 0 ? -1 : n; +} + + +static void *fdlthread(void *_dat) { + char buf[READBUFSIZE]; + pt_fdldat *dat = _dat; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); + while(dat->l) { + tn_link_dispatch(dat->l); + if(dat->l) { + int n = read(dat->fdin, buf, READBUFSIZE); + // Note: Don't check for EAGAIN or EWOULDBLOCK, the fd should not + // be non-blocking so that is an error. + if(n < 0 && errno == EINTR) + continue; + if(n <= 0) { + // TODO: report error / disconnect + tn_link_close(dat->l); + } else + tn_link_read(dat->l, buf, n); + } + } + free(dat); + return NULL; +} + + +tn_link *tn_link_pthread_fd(tn_node *n, int sync, int fdin, int fdout, pthread_t *t) { + static tn_link_context ctx = { + fdldispatch, + fdlwrite, + fdlclose + }; + + if(!setsig()) + return NULL; + + pt_fdldat *dat = malloc(sizeof(pt_fdldat)); + dat->fdin = fdin; + dat->fdout = fdout; + dat->l = tn_link_create(n, sync, &ctx, dat); + if(pthread_create(&dat->self, NULL, fdlthread, dat) < 0) { + tn_link_close(dat->l); + return NULL; + } + + if(t) + memcpy(t, &dat->self, sizeof(pthread_t)); + else + pthread_detach(dat->self); + + return dat->l; +} + + // vim:noet:sw=4:ts=4 @@ -8,6 +8,7 @@ extern tn_session *tn_session_pthread(tn_node *, pthread_t *); +extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *); void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { if(d) {} @@ -34,12 +35,18 @@ int main() { pthread_t th; tn_session *s = tn_session_pthread(n, &th); - tn_node_unref(n); tn_session_register(s, tn_tuple_new(""), 0, do_quit, NULL); tn_session_send(s, tn_tuple_new("i", 1), NULL, NULL); pthread_join(th, NULL); + + // And now a link with STDIO + tn_link *l = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &th); + tn_link_start(l); + pthread_join(th, NULL); + + tn_node_unref(n); return 0; } |