diff options
author | Yorhel <git@yorhel.nl> | 2012-02-27 20:42:29 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-02-27 20:42:29 +0100 |
commit | 9b524a9c112cb8381df3a7f6f367316924042ce6 (patch) | |
tree | 47a11f5b9f0fa77932514fe214f5cf08c45f0ae8 | |
parent | 51e390e648fde3ca0173ba2cb40b798544d07891 (diff) |
c: Unregister patterns and close/free queued messages in tn_session_close()
-rw-r--r-- | c/tanja.c | 51 |
1 files changed, 40 insertions, 11 deletions
@@ -526,18 +526,33 @@ static int tn_node_register(tn_node *n, patternreg *r) { } +// TODO: notify links +static inline void tn_node_dounreg(tn_node *n, patternreg *r, khiter_t k) { + kh_del(pr, n->regs, k); + atomic_dec(r->active); + tn_tuple_unref(r->pat); + r->pat = NULL; + patternreg_unref(r); +} + static void tn_node_unregister(tn_node *n, int id) { mutex_lock(n->lock); khiter_t k = kh_get(pr, n->regs, id); if(k != kh_end(n->regs)) { patternreg *r = kh_val(n->regs, k); - kh_del(pr, n->regs, k); - atomic_dec(r->active); - tn_tuple_unref(r->pat); - r->pat = NULL; - patternreg_unref(r); + tn_node_dounreg(n, r, k); + } + mutex_unlock(n->lock); +} - // TODO: notify links + +static void tn_node_unregsession(tn_node *n, tn_session *s) { + mutex_lock(n->lock); + khiter_t k = kh_begin(n->regs); + for(; k != kh_end(n->regs); ++k) { + patternreg *r = kh_exist(n->regs, k) ? kh_val(n->regs, k) : NULL; + if(r && r->type == 0 && r->recipient == s) + tn_node_dounreg(n, r, k); // khash allows deletion from within a loop, yay! } mutex_unlock(n->lock); } @@ -548,8 +563,8 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) { mutex_lock(n->lock); khiter_t k = kh_begin(n->regs); for(; k != kh_end(n->regs); ++k) { - patternreg *r = kh_val(n->regs, k); - if(!kh_exist(n->regs, k) || !tn_tuple_match(r->pat, tup)) + patternreg *r = kh_exist(n->regs, k) ? kh_val(n->regs, k) : NULL; + if(!r || !tn_tuple_match(r->pat, tup)) continue; if(rp && r->willreply) tn_returnpath_open(rp); @@ -581,7 +596,7 @@ struct sesmsg { }; struct tn_session { - mutex_t(lock); + mutex_t(lock); // Protects q_begin and q_end tn_node *node; tn_session_context *ctx; sesmsg *q_begin; @@ -647,8 +662,22 @@ void tn_session_unregister(tn_session *s, int id) { void tn_session_close(tn_session *s) { assert(s); - // TODO: unregister any patterns - // TODO: free the message queue and close any return-paths + + tn_node_unregsession(s->node, s); + + // free the message queue and close any return-paths + mutex_lock(s->lock); + sesmsg *n, *c; + for(n=s->q_begin; n; n=c) { + c = n->next; + if(n->ret) + tn_reply_close(n->ret); + tn_tuple_unref(n->tup); + free(n); + } + s->q_begin = s->q_end = NULL; + mutex_unlock(s->lock); + if(s->ctx->close) s->ctx->close(s, s->data); atomic_dec(s->active); |