summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-21 11:12:40 +0100
committerYorhel <git@yorhel.nl>2012-03-21 11:13:17 +0100
commit868d779c5b9c66f1d983c73ba4e53863964805b0 (patch)
tree1491da5b0875738c93807d5103d858626bada4e6
parent1bfbf4a063b4e1a99ecfa81503c80ccbfe91ee5d (diff)
link: Forward remote returnpaths to the local code
Functionality-wise, this finalizes the link code. There's still a lot of testing to do, and no doubt can there be a few more useful features.
-rw-r--r--tanja.c64
-rw-r--r--test/link.c27
2 files changed, 82 insertions, 9 deletions
diff --git a/tanja.c b/tanja.c
index 155dea1..44e931f 100644
--- a/tanja.c
+++ b/tanja.c
@@ -70,6 +70,9 @@ static void tn_link_reply(tn_link *, tn_tuple *, tn_returnpath *);
// Map of integers to patternreg pointers
KHASH_INIT(pr, int, patternreg*, 1, kh_int_hash_func2, kh_int_hash_equal)
+// Map of integers to returnpath pointers
+KHASH_INIT(rp, int, tn_returnpath*, 1, kh_int_hash_func2, kh_int_hash_equal)
+
// Map of integers to integers
KHASH_INIT(ii, int, int, 1, kh_int_hash_func2, kh_int_hash_equal)
@@ -1324,7 +1327,9 @@ struct tn_link {
unsigned readydone : 1;
unsigned active : 1;
int catchall; // registration ID of our catch-all tuple (0 if not registered)
+ int lastid; // last used ID for an outgoing tuple with a returnpath
khash_t(ii) *regs; // map of remote pattern IDs to local IDs
+ khash_t(rp) *rets; // map of remote returnpath IDs to local returnpaths
void *lasttup; // last received tuple (void because it's only used for pointer comparison)
int ref;
};
@@ -1371,8 +1376,10 @@ tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) {
lbuf_init(l->rbuf);
lbuf_init(l->wbuf);
lbuf_init(l->tbuf);
+ l->lastid = 0;
l->catchall = 0;
l->regs = kh_init(ii);
+ l->rets = kh_init(rp);
return l;
}
@@ -1405,7 +1412,13 @@ void tn_link_set_error(tn_link *l, int code, const char *msg) {
mutex_unlock(l->node->lock);
+ // Close any local return paths
+ for(k=kh_begin(l->rets); k!=kh_end(l->regs); k++)
+ if(kh_exist(l->rets, k))
+ tn_reply_close(kh_val(l->rets, k));
+
kh_destroy(ii, l->regs);
+ kh_destroy(rp, l->rets);
l->regs = NULL;
l->catchall = 0;
lbuf_free(l->wbuf);
@@ -1570,10 +1583,25 @@ static void tn_link_node_recv(tn_link *l, tn_tuple *tup, tn_returnpath *r) {
mutex_lock(l->lock);
if(l->active && tup != l->lasttup) {
l->lasttup = tup;
- // TODO: actually do something with the return path.
- tn_reply_close(r);
- lbuf_append(l->wbuf, "[5,0,", 5);
+ // Handle return path
+ int id = 0;
+ if(r) {
+ do {
+ if(++l->lastid <= 0)
+ l->lastid = 1;
+ } while(kh_get(rp, l->rets, l->lastid) != kh_end(l->rets));
+ id = l->lastid;
+
+ int ret;
+ khiter_t k = kh_put(rp, l->rets, id, &ret);
+ kh_val(l->rets, k) = r;
+ }
+
+ // Send message
+ char prefix[30];
+ int n = snprintf(prefix, 30, "[5,%d,", id);
+ lbuf_append(l->wbuf, prefix, n);
tn_json_fmt_buf(tup, &l->wbuf);
lbuf_append(l->wbuf, "]\n", 2);
tn_link_write(l);
@@ -1850,12 +1878,34 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t
tn_node_send(l->node, tup, arg > 0 ? tn_returnpath_create_link(l, arg) : NULL, l);
break;
- case 6: // reply tid tuple
- // TODO
+ case 6: { // reply tid tuple
+ tn_returnpath *r = NULL;
+ mutex_lock(l->lock);
+ if(l->active) {
+ khiter_t k = kh_get(rp, l->rets, arg);
+ if(k != kh_end(l->rets))
+ r = kh_val(l->rets, k);
+ }
+ tn_reply(r, tup);
+ mutex_unlock(l->lock);
break;
- case 7: // close tid
- // TODO
+ }
+
+ case 7: { // close tid
+ tn_returnpath *r = NULL;
+ mutex_lock(l->lock);
+ if(l->active) {
+ khiter_t k = kh_get(rp, l->rets, arg);
+ if(k != kh_end(l->rets)) {
+ r = kh_val(l->rets, k);
+ kh_del(rp, l->rets, k);
+ }
+ }
+ tn_reply_close(r);
+ mutex_unlock(l->lock);
break;
+ }
+
default:
assert(0);
}
diff --git a/test/link.c b/test/link.c
index ff64a27..8ce964d 100644
--- a/test/link.c
+++ b/test/link.c
@@ -76,6 +76,30 @@ static void tup_hallo(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
}
+static void tup_request_reply(tn_session *s, tn_tuple *tup, void *d) {
+ assert(s == ses);
+ if(tup) {
+ char *t = tn_json_fmt(tup);
+ fprintf(stderr, "Reply to path #%d: %s\n", (int)(long)d, t);
+ free(t);
+ tn_tuple_unref(tup);
+ } else
+ fprintf(stderr, "Path #%d closed.\n", (int)(long)d);
+}
+
+
+// Upon receicing this, the session will send out a "please" tuple, for which
+// it expects to receive a reply.
+static void tup_request(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
+ static long num = 0;
+ assert(s == ses);
+ assert(p == NULL);
+ assert(d == NULL);
+ tn_session_send(ses, tn_tuple_new("s", strdup("please")), tup_request_reply, (void *)++num);
+ tn_tuple_unref(tup);
+}
+
+
static void lnk_error(tn_link *l, int code, char *msg) {
assert(l == lnk);
fprintf(stderr, "Link error #%d: %s\n", code, msg?msg:"(empty)");
@@ -103,8 +127,7 @@ int main() {
tn_session_register(ses, tn_tuple_new(""), 0, tup_all, NULL);
tn_session_register(ses, tn_tuple_new("s", strdup("quit")), 0, tup_quit, NULL);
tn_session_register(ses, tn_tuple_new("s", strdup("hallo")), 1, tup_hallo, NULL);
- // TODO: also register patterns with a handler that replies to a return path
- // and one that sends out a tuple itself.
+ tn_session_register(ses, tn_tuple_new("s", strdup("request")), 0, tup_request, NULL);
// Create a link with STDIO
pthread_t thb;