diff options
author | Yorhel <git@yorhel.nl> | 2012-03-01 19:55:08 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-01 19:55:08 +0100 |
commit | bdde01b591a6b0a65fb100d83a7a7a604d2da820 (patch) | |
tree | a5239e63ed4056ae99aa1364136428d9d1cb15f6 | |
parent | b98655e0b203f84e31b25a697a9f6215097ea460 (diff) |
c: Started on the link implementation
Emphasis on "started", I don't even have the means yet to test it.
There's also no error reporting at all yet.
-rw-r--r-- | c/tanja.c | 190 | ||||
-rw-r--r-- | c/tanja.h | 12 |
2 files changed, 202 insertions, 0 deletions
@@ -803,4 +803,194 @@ void tn_session_close(tn_session *s) { tn_session_unref(s); } + + + + + +// Link + +// TODO: probably needs to be refcounted to correctly handle return paths +struct tn_link { + mutex_t(lock); // protects wbuf_* + tn_node *node; + tn_link_context *ctx; + void *data; + char *rbuf; + int rbuf_len; + int rbuf_size; + char *wbuf; + int wbuf_len; + int wbuf_size; + unsigned handshaked : 1; + unsigned sync : 1; +}; + + +// Tiny buffer abstraction + +#define lbuf_init(b) do {\ + b = NULL;\ + b##_size = b##_len = 0;\ + } while(0) + +#define lbuf_free(b) do {\ + if(b)\ + free(b);\ + lbuf_init(b);\ + } while(0) + +#define lbuf_grow(b) do {\ + if(b##_size < 16)\ + b##_size = 16;\ + else\ + b##_size *= 2;\ + b = realloc(b, b##_size);\ + } while(0) + +#define lbuf_append(b, buf, len) do {\ + if(b##_size < b##_len + len)\ + lbuf_grow(b);\ + memcpy(b+b##_len, buf, len);\ + b##_len += len;\ + } while(0) + + +static inline void tn_link_write(tn_link *l, char *msg, int len) { + // No dispatcher? Then assume we can call ctx->write() directly + if(!l->ctx->dispatch) { + l->ctx->write(l, msg, len, l->data); + return; + } + // Otherwise, append to the buffer and call the dispatcher + mutex_lock(l->lock); + int empty = !l->wbuf_len; + lbuf_append(l->wbuf, msg, len); + mutex_unlock(l->lock); + if(empty) + l->ctx->dispatch(l, l->data); +} + + +// TODO: error & "link_ready" callback +// TODO: configure a maximum buffer size? +tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data) { + tn_link *l = malloc(sizeof(tn_link)); + l->node = n; + l->sync = !!sync; + l->ctx = ctx; + l->data = data; + l->handshaked = 0; + lbuf_init(l->rbuf); + lbuf_init(l->wbuf); + tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28); + return l; +} + + +#define inc(n) do { buf+=n; len-=n; } while(0) + +static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) { + int ok = 0; + while(len > 0) { + // Space, ignore + if(*buf == ' ') + continue; + + // Version + if(len > 4 && strncmp(buf, "ver,", 4) == 0) { + inc(3); + while(len && *buf != ' ') { + if(len >= 3 && strncmp(buf, ",1.", 3) == 0) + ok |= 1; + inc(1); + } + } + + // Ser[io] + if(len > 5 && (strncmp(buf, "seri,", 5) == 0 || strncmp(buf, "sero,", 5) == 0)) { + int flag = buf[3] == 'i' ? 2 : 4; + inc(4); + while(len && *buf != ' ') { + if(len >= 5 && strncmp(buf, ",json", 5) == 0) + ok |= flag; + inc(1); + } + } + } + + /* TODO + if(ok != 1|2|4) { + error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format"); + */ +} + +#undef inc + + +static int tn_link_handleread(tn_link *l, const char *buf, int len) { + // All messages end with a newline (in the current protocol), so that's + // easy to detect. + char *end; + if((end = memchr(buf, '\n', len)) == NULL) + return 0; + len = end-buf; + // Throw away '\r' if there is one + int msglen = len > 0 && buf[len-1] == '\r' ? len-1 : len; + if(!l->handshaked) + tn_link_handlehandshake(l, buf, msglen); + else + {} // TODO: regular message handling + + return len+1; +} + + +// Access to this call must be serialized. (Hardly makes sense to call multiple +// _reads() in parallel anyway). +// ->dispatch() or ->write() callbacks may be run as a result of calling this +// function, so these callbacks must not call any tn_link* functions again! +void tn_link_read(tn_link *l, const char *buf, int len) { + // Don't copy things to the read buffer if we have a full message + int n=1; + while(!l->rbuf_len && len > 0 && n) { + n = tn_link_handleread(l, buf, len); + buf += n; + len -= n; + } + + // Copy leftover data to the read buffer + if(len) + lbuf_append(l->rbuf, buf, len); + + // Handle / remove stuff from the read buffer + // TODO: tn_link_handleread() does a linear search on the buffer to find + // '\n'. It's not very efficient if it has to do this all over again for + // every increase in the buffer, since only the newly appended bytes + // actually have to be checked. + int r=0; + n=1; + while(l->rbuf_len-r > 0 && n) { + n = tn_link_handleread(l, l->rbuf+r, l->rbuf_len-r); + r += n; + } + if(r > 0) { + memmove(l->rbuf, l->rbuf+r, l->rbuf_len-r); + l->rbuf_len -= r; + } +} + + +void tn_link_close(tn_link *l) { + mutex_lock(l->lock); + lbuf_free(l->wbuf); + mutex_unlock(l->lock); + lbuf_free(l->rbuf); + + if(l->ctx->close) + l->ctx->close(l, l->data); + free(l); +} + + // vim:noet:sw=4:ts=4 @@ -51,6 +51,7 @@ typedef struct { typedef struct tn_returnpath tn_returnpath; typedef struct tn_node tn_node; typedef struct tn_session tn_session; +typedef struct tn_link tn_link; typedef struct { @@ -58,6 +59,12 @@ typedef struct { void (*close)(tn_session *, void *); } tn_session_context; +typedef struct { + void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately + void (*write)(tn_link *, char *, int, void *); + void (*close)(tn_link *, void *); +} tn_link_context; + typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *); typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *); @@ -87,4 +94,9 @@ void tn_session_unregister(tn_session *, int); void tn_session_dispatch(tn_session *); void tn_session_close(tn_session *); + +tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *); +void tn_link_read(tn_link *, const char *, int); +void tn_link_close(tn_link *); + // vim:noet:sw=4:ts=4 |