summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-01 19:55:08 +0100
committerYorhel <git@yorhel.nl>2012-03-01 19:55:08 +0100
commitbdde01b591a6b0a65fb100d83a7a7a604d2da820 (patch)
treea5239e63ed4056ae99aa1364136428d9d1cb15f6
parentb98655e0b203f84e31b25a697a9f6215097ea460 (diff)
c: Started on the link implementation
Emphasis on "started", I don't even have the means yet to test it. There's also no error reporting at all yet.
-rw-r--r--c/tanja.c190
-rw-r--r--c/tanja.h12
2 files changed, 202 insertions, 0 deletions
diff --git a/c/tanja.c b/c/tanja.c
index 4d2286a..8fc73bf 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -803,4 +803,194 @@ void tn_session_close(tn_session *s) {
tn_session_unref(s);
}
+
+
+
+
+
+// Link
+
+// TODO: probably needs to be refcounted to correctly handle return paths
+struct tn_link {
+ mutex_t(lock); // protects wbuf_*
+ tn_node *node;
+ tn_link_context *ctx;
+ void *data;
+ char *rbuf;
+ int rbuf_len;
+ int rbuf_size;
+ char *wbuf;
+ int wbuf_len;
+ int wbuf_size;
+ unsigned handshaked : 1;
+ unsigned sync : 1;
+};
+
+
+// Tiny buffer abstraction
+
+#define lbuf_init(b) do {\
+ b = NULL;\
+ b##_size = b##_len = 0;\
+ } while(0)
+
+#define lbuf_free(b) do {\
+ if(b)\
+ free(b);\
+ lbuf_init(b);\
+ } while(0)
+
+#define lbuf_grow(b) do {\
+ if(b##_size < 16)\
+ b##_size = 16;\
+ else\
+ b##_size *= 2;\
+ b = realloc(b, b##_size);\
+ } while(0)
+
+#define lbuf_append(b, buf, len) do {\
+ if(b##_size < b##_len + len)\
+ lbuf_grow(b);\
+ memcpy(b+b##_len, buf, len);\
+ b##_len += len;\
+ } while(0)
+
+
+static inline void tn_link_write(tn_link *l, char *msg, int len) {
+ // No dispatcher? Then assume we can call ctx->write() directly
+ if(!l->ctx->dispatch) {
+ l->ctx->write(l, msg, len, l->data);
+ return;
+ }
+ // Otherwise, append to the buffer and call the dispatcher
+ mutex_lock(l->lock);
+ int empty = !l->wbuf_len;
+ lbuf_append(l->wbuf, msg, len);
+ mutex_unlock(l->lock);
+ if(empty)
+ l->ctx->dispatch(l, l->data);
+}
+
+
+// TODO: error & "link_ready" callback
+// TODO: configure a maximum buffer size?
+tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data) {
+ tn_link *l = malloc(sizeof(tn_link));
+ l->node = n;
+ l->sync = !!sync;
+ l->ctx = ctx;
+ l->data = data;
+ l->handshaked = 0;
+ lbuf_init(l->rbuf);
+ lbuf_init(l->wbuf);
+ tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28);
+ return l;
+}
+
+
+#define inc(n) do { buf+=n; len-=n; } while(0)
+
+static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) {
+ int ok = 0;
+ while(len > 0) {
+ // Space, ignore
+ if(*buf == ' ')
+ continue;
+
+ // Version
+ if(len > 4 && strncmp(buf, "ver,", 4) == 0) {
+ inc(3);
+ while(len && *buf != ' ') {
+ if(len >= 3 && strncmp(buf, ",1.", 3) == 0)
+ ok |= 1;
+ inc(1);
+ }
+ }
+
+ // Ser[io]
+ if(len > 5 && (strncmp(buf, "seri,", 5) == 0 || strncmp(buf, "sero,", 5) == 0)) {
+ int flag = buf[3] == 'i' ? 2 : 4;
+ inc(4);
+ while(len && *buf != ' ') {
+ if(len >= 5 && strncmp(buf, ",json", 5) == 0)
+ ok |= flag;
+ inc(1);
+ }
+ }
+ }
+
+ /* TODO
+ if(ok != 1|2|4) {
+ error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format");
+ */
+}
+
+#undef inc
+
+
+static int tn_link_handleread(tn_link *l, const char *buf, int len) {
+ // All messages end with a newline (in the current protocol), so that's
+ // easy to detect.
+ char *end;
+ if((end = memchr(buf, '\n', len)) == NULL)
+ return 0;
+ len = end-buf;
+ // Throw away '\r' if there is one
+ int msglen = len > 0 && buf[len-1] == '\r' ? len-1 : len;
+ if(!l->handshaked)
+ tn_link_handlehandshake(l, buf, msglen);
+ else
+ {} // TODO: regular message handling
+
+ return len+1;
+}
+
+
+// Access to this call must be serialized. (Hardly makes sense to call multiple
+// _reads() in parallel anyway).
+// ->dispatch() or ->write() callbacks may be run as a result of calling this
+// function, so these callbacks must not call any tn_link* functions again!
+void tn_link_read(tn_link *l, const char *buf, int len) {
+ // Don't copy things to the read buffer if we have a full message
+ int n=1;
+ while(!l->rbuf_len && len > 0 && n) {
+ n = tn_link_handleread(l, buf, len);
+ buf += n;
+ len -= n;
+ }
+
+ // Copy leftover data to the read buffer
+ if(len)
+ lbuf_append(l->rbuf, buf, len);
+
+ // Handle / remove stuff from the read buffer
+ // TODO: tn_link_handleread() does a linear search on the buffer to find
+ // '\n'. It's not very efficient if it has to do this all over again for
+ // every increase in the buffer, since only the newly appended bytes
+ // actually have to be checked.
+ int r=0;
+ n=1;
+ while(l->rbuf_len-r > 0 && n) {
+ n = tn_link_handleread(l, l->rbuf+r, l->rbuf_len-r);
+ r += n;
+ }
+ if(r > 0) {
+ memmove(l->rbuf, l->rbuf+r, l->rbuf_len-r);
+ l->rbuf_len -= r;
+ }
+}
+
+
+void tn_link_close(tn_link *l) {
+ mutex_lock(l->lock);
+ lbuf_free(l->wbuf);
+ mutex_unlock(l->lock);
+ lbuf_free(l->rbuf);
+
+ if(l->ctx->close)
+ l->ctx->close(l, l->data);
+ free(l);
+}
+
+
// vim:noet:sw=4:ts=4
diff --git a/c/tanja.h b/c/tanja.h
index 6fddebc..dbe690e 100644
--- a/c/tanja.h
+++ b/c/tanja.h
@@ -51,6 +51,7 @@ typedef struct {
typedef struct tn_returnpath tn_returnpath;
typedef struct tn_node tn_node;
typedef struct tn_session tn_session;
+typedef struct tn_link tn_link;
typedef struct {
@@ -58,6 +59,12 @@ typedef struct {
void (*close)(tn_session *, void *);
} tn_session_context;
+typedef struct {
+ void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately
+ void (*write)(tn_link *, char *, int, void *);
+ void (*close)(tn_link *, void *);
+} tn_link_context;
+
typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *);
typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *);
@@ -87,4 +94,9 @@ void tn_session_unregister(tn_session *, int);
void tn_session_dispatch(tn_session *);
void tn_session_close(tn_session *);
+
+tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *);
+void tn_link_read(tn_link *, const char *, int);
+void tn_link_close(tn_link *);
+
// vim:noet:sw=4:ts=4