summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-02 15:03:57 +0100
committerYorhel <git@yorhel.nl>2012-03-02 15:03:57 +0100
commitdd2a060a9622d4bba13bb8da9cf80caa8e1e361e (patch)
tree5da9c0c7c1e59f41590443581514823036a6f1a7
parentf47c2efd2cb4c5c868f4b2da168e565cb681b3d4 (diff)
c: Improved error handling for link objects
-rw-r--r--c/tanja.c71
-rw-r--r--c/tanja.h10
-rw-r--r--c/tanja_pthread.c19
-rw-r--r--c/test.c6
4 files changed, 82 insertions, 24 deletions
diff --git a/c/tanja.c b/c/tanja.c
index 17db8ff..e6f8633 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -815,6 +815,9 @@ struct tn_link {
mutex_t(lock); // protects wbuf_*
tn_node *node;
tn_link_context *ctx;
+ tn_link_error_cb errcb;
+ int errcode;
+ char *errmsg;
void *data;
char *rbuf;
int rbuf_len;
@@ -859,18 +862,20 @@ struct tn_link {
static inline void tn_link_write(tn_link *l, char *msg, int len) {
- // No dispatcher? Then assume we can call ctx->write() directly
- if(!l->ctx->dispatch) {
- l->ctx->write(l, msg, len, l->data);
+ // No write dispatcher? Then assume we can call ctx->write() directly
+ if(!l->ctx->wdispatch) {
+ l->errcode = l->ctx->write(l, msg, len, &l->errmsg, l->data);
+ if(l->errcode)
+ l->ctx->dispatch(l, l->data);
return;
}
- // Otherwise, append to the buffer and call the dispatcher
+ // Otherwise, append to the buffer and call the write dispatcher
mutex_lock(l->lock);
int empty = !l->wbuf_len;
lbuf_append(l->wbuf, msg, len);
mutex_unlock(l->lock);
if(empty)
- l->ctx->dispatch(l, l->data);
+ l->ctx->wdispatch(l, l->data);
}
@@ -883,6 +888,9 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data)
l->ctx = ctx;
l->data = data;
l->handshaked = 0;
+ l->errmsg = NULL;
+ l->errcode = 0;
+ l->errcb = NULL;
mutex_init(l->lock);
lbuf_init(l->rbuf);
lbuf_init(l->wbuf);
@@ -890,6 +898,11 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data)
}
+void tn_link_on_error(tn_link *l, tn_link_error_cb cb) {
+ l->errcb = cb;
+}
+
+
void tn_link_start(tn_link *l) {
tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28);
}
@@ -928,11 +941,13 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len)
}
}
- if(l) {}
- /* TODO
- if(ok != 1|2|4) {
- error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format");
- */
+ if(ok != (1|2|4)) {
+ int c = !(ok&1) ? 1 : !(ok&2)? 2 : 3;
+ l->errcode = c << 24;
+ l->errmsg = strdup(c==1 ? "No or invalid protocol version" : c==2 ? "No common input format" : "No common output format");
+ l->ctx->dispatch(l, l->data);
+ } else
+ l->handshaked = 1;
}
#undef inc
@@ -956,11 +971,24 @@ static int tn_link_handleread(tn_link *l, const char *buf, int len) {
}
+void tn_link_read_error(tn_link *l, int code, const char *msg) {
+ if(l->errcode)
+ return;
+ l->errcode = code;
+ l->errmsg = strdup(msg);
+ l->ctx->dispatch(l, l->data);
+}
+
+
// Access to this call must be serialized. (Hardly makes sense to call multiple
// _reads() in parallel anyway).
-// ->dispatch() or ->write() callbacks may be run as a result of calling this
-// function, so these callbacks must not call any tn_link* functions again!
+// ->wdispatch(), ->dispatch() or ->write() callbacks may be run as a result of
+// calling this function, so these callbacks must not call any tn_link*
+// functions again!
void tn_link_read(tn_link *l, const char *buf, int len) {
+ if(l->errcode)
+ return;
+
// Don't copy things to the read buffer if we have a full message
int n=1;
while(!l->rbuf_len && len > 0 && n) {
@@ -992,6 +1020,14 @@ void tn_link_read(tn_link *l, const char *buf, int len) {
void tn_link_dispatch(tn_link *l) {
+ // If there's an error, handle that.
+ if(l->errcode) {
+ if(l->errcb)
+ l->errcb(l, l->errcode, l->errmsg);
+ tn_link_close(l);
+ return;
+ }
+
// Call write() if we have something buffered.
// Note: We should not call write() while holding the lock, since that may
// block other threads. Instead, we take control over the write buffer and
@@ -1005,8 +1041,13 @@ void tn_link_dispatch(tn_link *l) {
lbuf_init(l->wbuf);
mutex_unlock(l->lock);
- int n = l->ctx->write(l, buf, buf_len, l->data);
- assert(n >= 0); // TODO: error reporting!
+ int n = l->ctx->write(l, buf, buf_len, &l->errmsg, l->data);
+ if(n < 0) {
+ l->errcode = -n;
+ l->ctx->dispatch(l, l->data);
+ free(buf);
+ return;
+ }
mutex_lock(l->lock);
if(n > 0) {
@@ -1041,6 +1082,8 @@ void tn_link_close(tn_link *l) {
if(l->ctx->close)
l->ctx->close(l, l->data);
+ if(l->errmsg)
+ free(l->errmsg);
free(l);
}
diff --git a/c/tanja.h b/c/tanja.h
index 17a5b8e..aa3d328 100644
--- a/c/tanja.h
+++ b/c/tanja.h
@@ -60,14 +60,17 @@ typedef struct {
} tn_session_context;
typedef struct {
- void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately
- int (*write)(tn_link *, char *, int, void *);
+ void (*dispatch)(tn_link *, void *);
+ void (*wdispatch)(tn_link *, void *); // if NULL, then write() will be called immediately
+ int (*write)(tn_link *, char *, int, char **, void *);
void (*close)(tn_link *, void *);
} tn_link_context;
typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *);
typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *);
+typedef void (*tn_link_error_cb)(tn_link *, int, char *);
+
void tn_element_free(tn_element);
void tn_tuple_ref(tn_tuple *);
void tn_tuple_unref(tn_tuple *);
@@ -94,10 +97,11 @@ void tn_session_unregister(tn_session *, int);
void tn_session_dispatch(tn_session *);
void tn_session_close(tn_session *);
-
tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *);
+void tn_link_on_error(tn_link *, tn_link_error_cb);
void tn_link_start(tn_link *);
void tn_link_read(tn_link *, const char *, int);
+void tn_link_read_error(tn_link *, int, const char *);
void tn_link_dispatch(tn_link *);
void tn_link_close(tn_link *);
diff --git a/c/tanja_pthread.c b/c/tanja_pthread.c
index 1980533..83f9ea8 100644
--- a/c/tanja_pthread.c
+++ b/c/tanja_pthread.c
@@ -5,6 +5,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
+#include <assert.h>
#include <errno.h>
#include "tanja.h"
@@ -127,12 +128,16 @@ static void fdlclose(tn_link *l, void *_dat) {
}
-static int fdlwrite(tn_link *l, char *buf, int len, void *_dat) {
+static int fdlwrite(tn_link *l, char *buf, int len, char **err, void *_dat) {
if(l) {}
pt_fdldat *dat = _dat;
int n = write(dat->fdout, buf, len);
- // interrupt? 0 byte written. disconnect? error.
- return n < 0 && errno == EINTR ? 0 : n == 0 ? -1 : n;
+ if(n > 0)
+ return n;
+ if(n < 0 && errno == EINTR)
+ return 0;
+ *err = strdup(n == 0 ? "Remote disconnected." : strerror(errno));
+ return n == 0 ? -1 : -errno;
}
@@ -149,10 +154,9 @@ static void *fdlthread(void *_dat) {
// be non-blocking so that is an error.
if(n < 0 && errno == EINTR)
continue;
- if(n <= 0) {
- // TODO: report error / disconnect
- tn_link_close(dat->l);
- } else
+ if(n <= 0)
+ tn_link_read_error(dat->l, n == 0 ? 1 : errno, n == 0 ? "Remote disconnected." : strerror(errno));
+ else
tn_link_read(dat->l, buf, n);
}
}
@@ -164,6 +168,7 @@ static void *fdlthread(void *_dat) {
tn_link *tn_link_pthread_fd(tn_node *n, int sync, int fdin, int fdout, pthread_t *t) {
static tn_link_context ctx = {
fdldispatch,
+ fdldispatch,
fdlwrite,
fdlclose
};
diff --git a/c/test.c b/c/test.c
index 23b8ba0..a28e196 100644
--- a/c/test.c
+++ b/c/test.c
@@ -18,6 +18,11 @@ void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
printf("Closing!\n");
}
+void print_error(tn_link *l, int code, char *msg) {
+ if(l) {}
+ printf("Link error #%d: %s\n", code, msg?msg:"(empty)");
+}
+
int main() {
tn_tuple *t = tn_tuple_new("i*sam", 1293, strdup("som\\\"e_strin\01g"),
@@ -43,6 +48,7 @@ int main() {
// And now a link with STDIO
tn_link *l = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &th);
+ tn_link_on_error(l, print_error);
tn_link_start(l);
pthread_join(th, NULL);