diff options
author | Yorhel <git@yorhel.nl> | 2012-03-20 21:55:20 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-20 21:55:20 +0100 |
commit | d16390981ed9105d168980a8a2d6f2365167c754 (patch) | |
tree | 13fc1418e1736c4bf931d52d9b661ed7cf326c12 | |
parent | fea086f3657630b2f9db9c80cab0452d4d3706c6 (diff) |
link: Send tuples from the local node to the network
Doesn't do any return-path stuff yet.
-rw-r--r-- | tanja.c | 39 |
1 files changed, 30 insertions, 9 deletions
@@ -60,8 +60,9 @@ typedef struct patternreg patternreg; static void tn_session_recv(tn_session *, tn_tuple *, patternreg *, tn_returnpath *); static void tn_session_reply(tn_session *, tn_tuple *, tn_returnpath *); -static void tn_link_node_reg(tn_link *l, int id, patternreg *p); -static void tn_link_node_unreg(tn_link *l, int id, patternreg *p); +static void tn_link_node_reg(tn_link *, int, patternreg *); +static void tn_link_node_unreg(tn_link *, int, patternreg *); +static void tn_link_node_recv(tn_link *, tn_tuple *, tn_returnpath *); @@ -915,7 +916,7 @@ static inline patternreg *patternreg_create_link(tn_tuple *pat, tn_link *l) { r->active = 1; r->type = 1; r->pat = pat; - r->willreply = 0; + r->willreply = 1; r->recipient = l; return r; } @@ -1042,7 +1043,7 @@ static void tn_node_unregsession(tn_node *n, tn_session *s) { // tn_session_send(.., tn_tuple_new(..), ..); // // TODO: don't send tuples back to the link it came from -static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { +static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp, tn_link *src) { mutex_lock(n->lock); khiter_t k = kh_begin(n->regs); for(; k != kh_end(n->regs); ++k) { @@ -1053,8 +1054,8 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { tn_returnpath_open(rp); if(r->type == 0) tn_session_recv(r->recipient, tup, r, r->willreply ? rp : NULL); - else - {} // TODO: notify link + else if(r->recipient != src) + tn_link_node_recv(r->recipient, tup, rp); } mutex_unlock(n->lock); @@ -1133,7 +1134,7 @@ tn_session *tn_session_create(tn_node *n, tn_session_dispatch_cb dis, void *data void tn_session_send(tn_session *s, tn_tuple *t, tn_reply_cb cb, void *dat) { assert(s); assert(t); - tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL); + tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL, NULL); } @@ -1306,6 +1307,7 @@ struct tn_link { unsigned active : 1; int catchall; // registration ID of our catch-all tuple (0 if not registered) khash_t(ii) *regs; // map of remote pattern IDs to local IDs + void *lasttup; // last received tuple (void because it's only used for pointer comparison) int ref; }; @@ -1330,7 +1332,6 @@ void tn_link_unref(tn_link *l) { } -// TODO: "link_ready" callback // TODO: configure a maximum buffer size? tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) { tn_link *l = malloc(sizeof(tn_link)); @@ -1346,6 +1347,7 @@ tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) { l->active = 1; l->sync = 1; l->synced = l->readydone = 0; + l->lasttup = NULL; tn_node_ref(n); mutex_init(l->lock); lbuf_init(l->rbuf); @@ -1544,6 +1546,25 @@ static void tn_link_node_unreg(tn_link *l, int id, patternreg *p) { } +// We've received a tuple from the node (couldn't have been ours, the node +// filters those out automatically). +static void tn_link_node_recv(tn_link *l, tn_tuple *tup, tn_returnpath *r) { + mutex_lock(l->lock); + if(l->active && tup != l->lasttup) { + l->lasttup = tup; + // TODO: actually do something with the return path. + tn_reply_close(r); + + lbuf_append(l->wbuf, "[5,0,", 5); + tn_json_fmt_buf(tup, &l->wbuf); + lbuf_append(l->wbuf, "]\n", 2); + tn_link_write(l); + } else + tn_reply_close(r); + mutex_unlock(l->lock); +} + + #define inc(n) do { buf+=n; len-=n; } while(0) // Called while the lock is held. @@ -1786,7 +1807,7 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t case 5: // tuple tid tuple // TODO: create a return path if tid > 0 - tn_node_send(l->node, tup, NULL); + tn_node_send(l->node, tup, NULL, l); break; case 6: // reply tid tuple // TODO |