diff options
author | Yorhel <git@yorhel.nl> | 2012-03-21 11:12:40 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-21 11:13:17 +0100 |
commit | 868d779c5b9c66f1d983c73ba4e53863964805b0 (patch) | |
tree | 1491da5b0875738c93807d5103d858626bada4e6 | |
parent | 1bfbf4a063b4e1a99ecfa81503c80ccbfe91ee5d (diff) |
link: Forward remote returnpaths to the local code
Functionality-wise, this finalizes the link code. There's still a lot of
testing to do, and no doubt can there be a few more useful features.
-rw-r--r-- | tanja.c | 64 | ||||
-rw-r--r-- | test/link.c | 27 |
2 files changed, 82 insertions, 9 deletions
@@ -70,6 +70,9 @@ static void tn_link_reply(tn_link *, tn_tuple *, tn_returnpath *); // Map of integers to patternreg pointers KHASH_INIT(pr, int, patternreg*, 1, kh_int_hash_func2, kh_int_hash_equal) +// Map of integers to returnpath pointers +KHASH_INIT(rp, int, tn_returnpath*, 1, kh_int_hash_func2, kh_int_hash_equal) + // Map of integers to integers KHASH_INIT(ii, int, int, 1, kh_int_hash_func2, kh_int_hash_equal) @@ -1324,7 +1327,9 @@ struct tn_link { unsigned readydone : 1; unsigned active : 1; int catchall; // registration ID of our catch-all tuple (0 if not registered) + int lastid; // last used ID for an outgoing tuple with a returnpath khash_t(ii) *regs; // map of remote pattern IDs to local IDs + khash_t(rp) *rets; // map of remote returnpath IDs to local returnpaths void *lasttup; // last received tuple (void because it's only used for pointer comparison) int ref; }; @@ -1371,8 +1376,10 @@ tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) { lbuf_init(l->rbuf); lbuf_init(l->wbuf); lbuf_init(l->tbuf); + l->lastid = 0; l->catchall = 0; l->regs = kh_init(ii); + l->rets = kh_init(rp); return l; } @@ -1405,7 +1412,13 @@ void tn_link_set_error(tn_link *l, int code, const char *msg) { mutex_unlock(l->node->lock); + // Close any local return paths + for(k=kh_begin(l->rets); k!=kh_end(l->regs); k++) + if(kh_exist(l->rets, k)) + tn_reply_close(kh_val(l->rets, k)); + kh_destroy(ii, l->regs); + kh_destroy(rp, l->rets); l->regs = NULL; l->catchall = 0; lbuf_free(l->wbuf); @@ -1570,10 +1583,25 @@ static void tn_link_node_recv(tn_link *l, tn_tuple *tup, tn_returnpath *r) { mutex_lock(l->lock); if(l->active && tup != l->lasttup) { l->lasttup = tup; - // TODO: actually do something with the return path. - tn_reply_close(r); - lbuf_append(l->wbuf, "[5,0,", 5); + // Handle return path + int id = 0; + if(r) { + do { + if(++l->lastid <= 0) + l->lastid = 1; + } while(kh_get(rp, l->rets, l->lastid) != kh_end(l->rets)); + id = l->lastid; + + int ret; + khiter_t k = kh_put(rp, l->rets, id, &ret); + kh_val(l->rets, k) = r; + } + + // Send message + char prefix[30]; + int n = snprintf(prefix, 30, "[5,%d,", id); + lbuf_append(l->wbuf, prefix, n); tn_json_fmt_buf(tup, &l->wbuf); lbuf_append(l->wbuf, "]\n", 2); tn_link_write(l); @@ -1850,12 +1878,34 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t tn_node_send(l->node, tup, arg > 0 ? tn_returnpath_create_link(l, arg) : NULL, l); break; - case 6: // reply tid tuple - // TODO + case 6: { // reply tid tuple + tn_returnpath *r = NULL; + mutex_lock(l->lock); + if(l->active) { + khiter_t k = kh_get(rp, l->rets, arg); + if(k != kh_end(l->rets)) + r = kh_val(l->rets, k); + } + tn_reply(r, tup); + mutex_unlock(l->lock); break; - case 7: // close tid - // TODO + } + + case 7: { // close tid + tn_returnpath *r = NULL; + mutex_lock(l->lock); + if(l->active) { + khiter_t k = kh_get(rp, l->rets, arg); + if(k != kh_end(l->rets)) { + r = kh_val(l->rets, k); + kh_del(rp, l->rets, k); + } + } + tn_reply_close(r); + mutex_unlock(l->lock); break; + } + default: assert(0); } diff --git a/test/link.c b/test/link.c index ff64a27..8ce964d 100644 --- a/test/link.c +++ b/test/link.c @@ -76,6 +76,30 @@ static void tup_hallo(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { } +static void tup_request_reply(tn_session *s, tn_tuple *tup, void *d) { + assert(s == ses); + if(tup) { + char *t = tn_json_fmt(tup); + fprintf(stderr, "Reply to path #%d: %s\n", (int)(long)d, t); + free(t); + tn_tuple_unref(tup); + } else + fprintf(stderr, "Path #%d closed.\n", (int)(long)d); +} + + +// Upon receicing this, the session will send out a "please" tuple, for which +// it expects to receive a reply. +static void tup_request(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { + static long num = 0; + assert(s == ses); + assert(p == NULL); + assert(d == NULL); + tn_session_send(ses, tn_tuple_new("s", strdup("please")), tup_request_reply, (void *)++num); + tn_tuple_unref(tup); +} + + static void lnk_error(tn_link *l, int code, char *msg) { assert(l == lnk); fprintf(stderr, "Link error #%d: %s\n", code, msg?msg:"(empty)"); @@ -103,8 +127,7 @@ int main() { tn_session_register(ses, tn_tuple_new(""), 0, tup_all, NULL); tn_session_register(ses, tn_tuple_new("s", strdup("quit")), 0, tup_quit, NULL); tn_session_register(ses, tn_tuple_new("s", strdup("hallo")), 1, tup_hallo, NULL); - // TODO: also register patterns with a handler that replies to a return path - // and one that sends out a tuple itself. + tn_session_register(ses, tn_tuple_new("s", strdup("request")), 0, tup_request, NULL); // Create a link with STDIO pthread_t thb; |