diff options
author | Yorhel <git@yorhel.nl> | 2012-02-27 21:56:59 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-02-27 22:00:58 +0100 |
commit | c25b9aade0cff48672ba0c362fca5546a3032e51 (patch) | |
tree | c0318cb5f26436a8bfb979b2f11ceae50f5a9587 | |
parent | 9b524a9c112cb8381df3a7f6f367316924042ce6 (diff) |
c: Implemented dispatcher for tuples and replies and for sessions
There's a lot of tricky stuff going on to make this thread-safe. Meh :(
-rw-r--r-- | c/tanja.c | 130 | ||||
-rw-r--r-- | c/tanja.h | 5 |
2 files changed, 122 insertions, 13 deletions
@@ -30,8 +30,11 @@ // Forward declarations to make things work +typedef struct patternreg patternreg; static inline void tn_session_ref(tn_session *); static inline void tn_session_unref(tn_session *); +static void tn_session_recv(tn_session *, tn_tuple *, patternreg *, tn_returnpath *); +static void tn_session_reply(tn_session *, tn_tuple *, tn_returnpath *); @@ -370,22 +373,31 @@ struct tn_returnpath { union { struct { tn_session *s; - tn_tuple_cb cb; + tn_reply_cb cb; void *data; } s; } t; }; +// TODO: _ref() the tuple or "pass the reference"? void tn_reply(tn_returnpath *rp, tn_tuple *t) { - rp++; t++; - // TODO: Notify session/link + assert(rp); + assert(t); + if(rp->type == 0) + tn_session_reply(rp->t.s.s, t, rp); + else + {} // TODO: Notify link } void tn_reply_close(tn_returnpath *rp) { + assert(rp); if(!atomic_dec(rp->ref)) { - // TODO: notify session/link + if(rp->type == 0) + tn_session_reply(rp->t.s.s, NULL, rp); + else + {} // TODO: notify link } } @@ -395,7 +407,7 @@ static inline void tn_returnpath_open(tn_returnpath *rp) { } -static inline tn_returnpath *tn_returnpath_create_ses(tn_session *ses, tn_tuple_cb cb, void *dat) { +static inline tn_returnpath *tn_returnpath_create_ses(tn_session *ses, tn_reply_cb cb, void *dat) { tn_returnpath *rp = malloc(sizeof(tn_returnpath)); rp->ref = 0; // No senders yet rp->type = 0; @@ -421,7 +433,7 @@ static inline void tn_returnpath_free(tn_returnpath *rp) { // A registered pattern -typedef struct { +struct patternreg { tn_tuple *pat; // Only accessed by the node, only valid if active=1 int active; // Atomic int, whether the pattern is still registered or not int ref; @@ -431,7 +443,7 @@ typedef struct { // For sessions, links don't need any additional data tn_tuple_cb cb; void *data; -} patternreg; +}; static inline patternreg *patternreg_create_ses(tn_tuple *pat, int willreply, tn_session *ses, tn_tuple_cb cb, void *dat) { @@ -568,7 +580,10 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { continue; if(rp && r->willreply) tn_returnpath_open(rp); - // TODO: notify the respective sessions/links + if(r->type == 0) + tn_session_recv(r->recipient, tup, r, r->willreply ? rp : NULL); + else + {} // TODO: notify link } mutex_unlock(n->lock); @@ -588,11 +603,14 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { // Session +// A queued "message" for the session: either a regular tuple send() or a reply() typedef struct sesmsg sesmsg; struct sesmsg { sesmsg *next; tn_tuple *tup; + patternreg *reg; // If this is a regular send tn_returnpath *ret; // If this is a regular send that wishes to receive a reply. + tn_returnpath *rep; // If this *is* a reply }; struct tn_session { @@ -639,7 +657,7 @@ tn_session *tn_session_create(tn_node *n, tn_session_context *ctx, void *data) { } -void tn_session_send(tn_session *s, tn_tuple *t, tn_tuple_cb cb, void *dat) { +void tn_session_send(tn_session *s, tn_tuple *t, tn_reply_cb cb, void *dat) { assert(s); assert(t); tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL); @@ -660,6 +678,93 @@ void tn_session_unregister(tn_session *s, int id) { } +// Assumes s->lock is held, and will unlock it. +static inline void tn_session_appendmsg(tn_session *s, sesmsg *m) { + m->next = NULL; + int empty = !!s->q_begin; + if(empty) + s->q_begin = m; + else + s->q_end->next = m; + s->q_end = m; + mutex_unlock(s->lock); + if(empty) + s->ctx->dispatch(s, s->data); +} + +// Called from the node, when we've received a tuple. Note that *ret has +// already been open()'ed for this session. +static void tn_session_recv(tn_session *s, tn_tuple *tup, patternreg *pr, tn_returnpath *ret) { + tn_tuple_ref(tup); + patternreg_ref(pr); + sesmsg *m = malloc(sizeof(sesmsg)); + m->tup = tup; + m->reg = pr; + m->ret = ret; + m->rep = NULL; + mutex_lock(s->lock); + tn_session_appendmsg(s, m); +} + + +// Called when we've received a reply on one of our return-paths. +static void tn_session_reply(tn_session *s, tn_tuple *tup, tn_returnpath *rep) { + // Session has been close()'ed, ignore this message + mutex_lock(s->lock); + if(!s->active) { + mutex_unlock(s->lock); + tn_tuple_unref(tup); + if(!tup) + tn_returnpath_free(rep); + return; + } + // Otherwise, queue it + sesmsg *m = malloc(sizeof(sesmsg)); + m->tup = tup; + m->reg = NULL; + m->ret = NULL; + m->rep = rep; + tn_session_appendmsg(s, m); +} + + +static inline void tn_session_dispatch_one(tn_session *s, sesmsg *m) { + // Regular send() + if(m->reg) { + if(m->reg->active) // If the pattern hasn't been unregistered in the meantime + m->reg->cb(s, m->tup, m->ret, m->reg->data); // Pass our reference on m->tup to the application + else { + tn_tuple_unref(m->tup); + if(m->ret) + tn_reply_close(m->ret); + } + patternreg_unref(m->reg); + + // Reply + } else { + m->rep->t.s.cb(s, m->tup, m->rep->t.s.data); // Again, pass our reference on m->tup + if(!m->tup) + tn_returnpath_free(m->rep); + } + free(m); +} + + +void tn_session_dispatch(tn_session *s) { + sesmsg *q; + do { + // Pop item from the start of the queue + mutex_lock(s->lock); + assert(s->active || !s->q_begin); // Queue must be (and stay) empty if the session has been closed + q = s->q_begin; + s->q_begin = q->next; + mutex_unlock(s->lock); + if(q) + tn_session_dispatch_one(s, q); + } while(q); +} + + void tn_session_close(tn_session *s) { assert(s); @@ -667,12 +772,16 @@ void tn_session_close(tn_session *s) { // free the message queue and close any return-paths mutex_lock(s->lock); + atomic_dec(s->active); sesmsg *n, *c; for(n=s->q_begin; n; n=c) { c = n->next; if(n->ret) tn_reply_close(n->ret); - tn_tuple_unref(n->tup); + if(n->rep && !n->tup) // close, free the return-path + tn_returnpath_free(n->rep); + if(n->tup) + tn_tuple_unref(n->tup); free(n); } s->q_begin = s->q_end = NULL; @@ -680,7 +789,6 @@ void tn_session_close(tn_session *s) { if(s->ctx->close) s->ctx->close(s, s->data); - atomic_dec(s->active); tn_session_unref(s); } @@ -58,7 +58,8 @@ typedef struct { void (*close)(tn_session *, void *); } tn_session_context; -typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, void *); +typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *); +typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *); void tn_element_free(tn_element); void tn_tuple_ref(tn_tuple *); @@ -80,7 +81,7 @@ void tn_node_ref(tn_node *); void tn_node_unref(tn_node *); tn_session *tn_session_create(tn_node *, tn_session_context *, void *); -void tn_session_send(tn_session *, tn_tuple *, tn_tuple_cb, void *dat); +void tn_session_send(tn_session *, tn_tuple *, tn_reply_cb, void *dat); int tn_session_register(tn_session *, tn_tuple *, int, tn_tuple_cb, void *); void tn_session_unregister(tn_session *, int); void tn_session_close(tn_session *); |