summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-20 21:55:20 +0100
committerYorhel <git@yorhel.nl>2012-03-20 21:55:20 +0100
commitd16390981ed9105d168980a8a2d6f2365167c754 (patch)
tree13fc1418e1736c4bf931d52d9b661ed7cf326c12
parentfea086f3657630b2f9db9c80cab0452d4d3706c6 (diff)
link: Send tuples from the local node to the network
Doesn't do any return-path stuff yet.
-rw-r--r--tanja.c39
1 files changed, 30 insertions, 9 deletions
diff --git a/tanja.c b/tanja.c
index a37e2b0..6b9fafb 100644
--- a/tanja.c
+++ b/tanja.c
@@ -60,8 +60,9 @@
typedef struct patternreg patternreg;
static void tn_session_recv(tn_session *, tn_tuple *, patternreg *, tn_returnpath *);
static void tn_session_reply(tn_session *, tn_tuple *, tn_returnpath *);
-static void tn_link_node_reg(tn_link *l, int id, patternreg *p);
-static void tn_link_node_unreg(tn_link *l, int id, patternreg *p);
+static void tn_link_node_reg(tn_link *, int, patternreg *);
+static void tn_link_node_unreg(tn_link *, int, patternreg *);
+static void tn_link_node_recv(tn_link *, tn_tuple *, tn_returnpath *);
@@ -915,7 +916,7 @@ static inline patternreg *patternreg_create_link(tn_tuple *pat, tn_link *l) {
r->active = 1;
r->type = 1;
r->pat = pat;
- r->willreply = 0;
+ r->willreply = 1;
r->recipient = l;
return r;
}
@@ -1042,7 +1043,7 @@ static void tn_node_unregsession(tn_node *n, tn_session *s) {
// tn_session_send(.., tn_tuple_new(..), ..);
//
// TODO: don't send tuples back to the link it came from
-static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
+static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp, tn_link *src) {
mutex_lock(n->lock);
khiter_t k = kh_begin(n->regs);
for(; k != kh_end(n->regs); ++k) {
@@ -1053,8 +1054,8 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
tn_returnpath_open(rp);
if(r->type == 0)
tn_session_recv(r->recipient, tup, r, r->willreply ? rp : NULL);
- else
- {} // TODO: notify link
+ else if(r->recipient != src)
+ tn_link_node_recv(r->recipient, tup, rp);
}
mutex_unlock(n->lock);
@@ -1133,7 +1134,7 @@ tn_session *tn_session_create(tn_node *n, tn_session_dispatch_cb dis, void *data
void tn_session_send(tn_session *s, tn_tuple *t, tn_reply_cb cb, void *dat) {
assert(s);
assert(t);
- tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL);
+ tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL, NULL);
}
@@ -1306,6 +1307,7 @@ struct tn_link {
unsigned active : 1;
int catchall; // registration ID of our catch-all tuple (0 if not registered)
khash_t(ii) *regs; // map of remote pattern IDs to local IDs
+ void *lasttup; // last received tuple (void because it's only used for pointer comparison)
int ref;
};
@@ -1330,7 +1332,6 @@ void tn_link_unref(tn_link *l) {
}
-// TODO: "link_ready" callback
// TODO: configure a maximum buffer size?
tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) {
tn_link *l = malloc(sizeof(tn_link));
@@ -1346,6 +1347,7 @@ tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) {
l->active = 1;
l->sync = 1;
l->synced = l->readydone = 0;
+ l->lasttup = NULL;
tn_node_ref(n);
mutex_init(l->lock);
lbuf_init(l->rbuf);
@@ -1544,6 +1546,25 @@ static void tn_link_node_unreg(tn_link *l, int id, patternreg *p) {
}
+// We've received a tuple from the node (couldn't have been ours, the node
+// filters those out automatically).
+static void tn_link_node_recv(tn_link *l, tn_tuple *tup, tn_returnpath *r) {
+ mutex_lock(l->lock);
+ if(l->active && tup != l->lasttup) {
+ l->lasttup = tup;
+ // TODO: actually do something with the return path.
+ tn_reply_close(r);
+
+ lbuf_append(l->wbuf, "[5,0,", 5);
+ tn_json_fmt_buf(tup, &l->wbuf);
+ lbuf_append(l->wbuf, "]\n", 2);
+ tn_link_write(l);
+ } else
+ tn_reply_close(r);
+ mutex_unlock(l->lock);
+}
+
+
#define inc(n) do { buf+=n; len-=n; } while(0)
// Called while the lock is held.
@@ -1786,7 +1807,7 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t
case 5: // tuple tid tuple
// TODO: create a return path if tid > 0
- tn_node_send(l->node, tup, NULL);
+ tn_node_send(l->node, tup, NULL, l);
break;
case 6: // reply tid tuple
// TODO