diff options
author | Yorhel <git@yorhel.nl> | 2012-03-02 15:03:57 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-02 15:03:57 +0100 |
commit | dd2a060a9622d4bba13bb8da9cf80caa8e1e361e (patch) | |
tree | 5da9c0c7c1e59f41590443581514823036a6f1a7 | |
parent | f47c2efd2cb4c5c868f4b2da168e565cb681b3d4 (diff) |
c: Improved error handling for link objects
-rw-r--r-- | c/tanja.c | 71 | ||||
-rw-r--r-- | c/tanja.h | 10 | ||||
-rw-r--r-- | c/tanja_pthread.c | 19 | ||||
-rw-r--r-- | c/test.c | 6 |
4 files changed, 82 insertions, 24 deletions
@@ -815,6 +815,9 @@ struct tn_link { mutex_t(lock); // protects wbuf_* tn_node *node; tn_link_context *ctx; + tn_link_error_cb errcb; + int errcode; + char *errmsg; void *data; char *rbuf; int rbuf_len; @@ -859,18 +862,20 @@ struct tn_link { static inline void tn_link_write(tn_link *l, char *msg, int len) { - // No dispatcher? Then assume we can call ctx->write() directly - if(!l->ctx->dispatch) { - l->ctx->write(l, msg, len, l->data); + // No write dispatcher? Then assume we can call ctx->write() directly + if(!l->ctx->wdispatch) { + l->errcode = l->ctx->write(l, msg, len, &l->errmsg, l->data); + if(l->errcode) + l->ctx->dispatch(l, l->data); return; } - // Otherwise, append to the buffer and call the dispatcher + // Otherwise, append to the buffer and call the write dispatcher mutex_lock(l->lock); int empty = !l->wbuf_len; lbuf_append(l->wbuf, msg, len); mutex_unlock(l->lock); if(empty) - l->ctx->dispatch(l, l->data); + l->ctx->wdispatch(l, l->data); } @@ -883,6 +888,9 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data) l->ctx = ctx; l->data = data; l->handshaked = 0; + l->errmsg = NULL; + l->errcode = 0; + l->errcb = NULL; mutex_init(l->lock); lbuf_init(l->rbuf); lbuf_init(l->wbuf); @@ -890,6 +898,11 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data) } +void tn_link_on_error(tn_link *l, tn_link_error_cb cb) { + l->errcb = cb; +} + + void tn_link_start(tn_link *l) { tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28); } @@ -928,11 +941,13 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) } } - if(l) {} - /* TODO - if(ok != 1|2|4) { - error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format"); - */ + if(ok != (1|2|4)) { + int c = !(ok&1) ? 1 : !(ok&2)? 2 : 3; + l->errcode = c << 24; + l->errmsg = strdup(c==1 ? "No or invalid protocol version" : c==2 ? "No common input format" : "No common output format"); + l->ctx->dispatch(l, l->data); + } else + l->handshaked = 1; } #undef inc @@ -956,11 +971,24 @@ static int tn_link_handleread(tn_link *l, const char *buf, int len) { } +void tn_link_read_error(tn_link *l, int code, const char *msg) { + if(l->errcode) + return; + l->errcode = code; + l->errmsg = strdup(msg); + l->ctx->dispatch(l, l->data); +} + + // Access to this call must be serialized. (Hardly makes sense to call multiple // _reads() in parallel anyway). -// ->dispatch() or ->write() callbacks may be run as a result of calling this -// function, so these callbacks must not call any tn_link* functions again! +// ->wdispatch(), ->dispatch() or ->write() callbacks may be run as a result of +// calling this function, so these callbacks must not call any tn_link* +// functions again! void tn_link_read(tn_link *l, const char *buf, int len) { + if(l->errcode) + return; + // Don't copy things to the read buffer if we have a full message int n=1; while(!l->rbuf_len && len > 0 && n) { @@ -992,6 +1020,14 @@ void tn_link_read(tn_link *l, const char *buf, int len) { void tn_link_dispatch(tn_link *l) { + // If there's an error, handle that. + if(l->errcode) { + if(l->errcb) + l->errcb(l, l->errcode, l->errmsg); + tn_link_close(l); + return; + } + // Call write() if we have something buffered. // Note: We should not call write() while holding the lock, since that may // block other threads. Instead, we take control over the write buffer and @@ -1005,8 +1041,13 @@ void tn_link_dispatch(tn_link *l) { lbuf_init(l->wbuf); mutex_unlock(l->lock); - int n = l->ctx->write(l, buf, buf_len, l->data); - assert(n >= 0); // TODO: error reporting! + int n = l->ctx->write(l, buf, buf_len, &l->errmsg, l->data); + if(n < 0) { + l->errcode = -n; + l->ctx->dispatch(l, l->data); + free(buf); + return; + } mutex_lock(l->lock); if(n > 0) { @@ -1041,6 +1082,8 @@ void tn_link_close(tn_link *l) { if(l->ctx->close) l->ctx->close(l, l->data); + if(l->errmsg) + free(l->errmsg); free(l); } @@ -60,14 +60,17 @@ typedef struct { } tn_session_context; typedef struct { - void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately - int (*write)(tn_link *, char *, int, void *); + void (*dispatch)(tn_link *, void *); + void (*wdispatch)(tn_link *, void *); // if NULL, then write() will be called immediately + int (*write)(tn_link *, char *, int, char **, void *); void (*close)(tn_link *, void *); } tn_link_context; typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *); typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *); +typedef void (*tn_link_error_cb)(tn_link *, int, char *); + void tn_element_free(tn_element); void tn_tuple_ref(tn_tuple *); void tn_tuple_unref(tn_tuple *); @@ -94,10 +97,11 @@ void tn_session_unregister(tn_session *, int); void tn_session_dispatch(tn_session *); void tn_session_close(tn_session *); - tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *); +void tn_link_on_error(tn_link *, tn_link_error_cb); void tn_link_start(tn_link *); void tn_link_read(tn_link *, const char *, int); +void tn_link_read_error(tn_link *, int, const char *); void tn_link_dispatch(tn_link *); void tn_link_close(tn_link *); diff --git a/c/tanja_pthread.c b/c/tanja_pthread.c index 1980533..83f9ea8 100644 --- a/c/tanja_pthread.c +++ b/c/tanja_pthread.c @@ -5,6 +5,7 @@ #include <unistd.h> #include <stdlib.h> #include <string.h> +#include <assert.h> #include <errno.h> #include "tanja.h" @@ -127,12 +128,16 @@ static void fdlclose(tn_link *l, void *_dat) { } -static int fdlwrite(tn_link *l, char *buf, int len, void *_dat) { +static int fdlwrite(tn_link *l, char *buf, int len, char **err, void *_dat) { if(l) {} pt_fdldat *dat = _dat; int n = write(dat->fdout, buf, len); - // interrupt? 0 byte written. disconnect? error. - return n < 0 && errno == EINTR ? 0 : n == 0 ? -1 : n; + if(n > 0) + return n; + if(n < 0 && errno == EINTR) + return 0; + *err = strdup(n == 0 ? "Remote disconnected." : strerror(errno)); + return n == 0 ? -1 : -errno; } @@ -149,10 +154,9 @@ static void *fdlthread(void *_dat) { // be non-blocking so that is an error. if(n < 0 && errno == EINTR) continue; - if(n <= 0) { - // TODO: report error / disconnect - tn_link_close(dat->l); - } else + if(n <= 0) + tn_link_read_error(dat->l, n == 0 ? 1 : errno, n == 0 ? "Remote disconnected." : strerror(errno)); + else tn_link_read(dat->l, buf, n); } } @@ -164,6 +168,7 @@ static void *fdlthread(void *_dat) { tn_link *tn_link_pthread_fd(tn_node *n, int sync, int fdin, int fdout, pthread_t *t) { static tn_link_context ctx = { fdldispatch, + fdldispatch, fdlwrite, fdlclose }; @@ -18,6 +18,11 @@ void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { printf("Closing!\n"); } +void print_error(tn_link *l, int code, char *msg) { + if(l) {} + printf("Link error #%d: %s\n", code, msg?msg:"(empty)"); +} + int main() { tn_tuple *t = tn_tuple_new("i*sam", 1293, strdup("som\\\"e_strin\01g"), @@ -43,6 +48,7 @@ int main() { // And now a link with STDIO tn_link *l = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &th); + tn_link_on_error(l, print_error); tn_link_start(l); pthread_join(th, NULL); |