summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-27 20:42:29 +0100
committerYorhel <git@yorhel.nl>2012-02-27 20:42:29 +0100
commit9b524a9c112cb8381df3a7f6f367316924042ce6 (patch)
tree47a11f5b9f0fa77932514fe214f5cf08c45f0ae8
parent51e390e648fde3ca0173ba2cb40b798544d07891 (diff)
c: Unregister patterns and close/free queued messages in tn_session_close()
-rw-r--r--c/tanja.c51
1 files changed, 40 insertions, 11 deletions
diff --git a/c/tanja.c b/c/tanja.c
index f1ef460..6527e65 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -526,18 +526,33 @@ static int tn_node_register(tn_node *n, patternreg *r) {
}
+// TODO: notify links
+static inline void tn_node_dounreg(tn_node *n, patternreg *r, khiter_t k) {
+ kh_del(pr, n->regs, k);
+ atomic_dec(r->active);
+ tn_tuple_unref(r->pat);
+ r->pat = NULL;
+ patternreg_unref(r);
+}
+
static void tn_node_unregister(tn_node *n, int id) {
mutex_lock(n->lock);
khiter_t k = kh_get(pr, n->regs, id);
if(k != kh_end(n->regs)) {
patternreg *r = kh_val(n->regs, k);
- kh_del(pr, n->regs, k);
- atomic_dec(r->active);
- tn_tuple_unref(r->pat);
- r->pat = NULL;
- patternreg_unref(r);
+ tn_node_dounreg(n, r, k);
+ }
+ mutex_unlock(n->lock);
+}
- // TODO: notify links
+
+static void tn_node_unregsession(tn_node *n, tn_session *s) {
+ mutex_lock(n->lock);
+ khiter_t k = kh_begin(n->regs);
+ for(; k != kh_end(n->regs); ++k) {
+ patternreg *r = kh_exist(n->regs, k) ? kh_val(n->regs, k) : NULL;
+ if(r && r->type == 0 && r->recipient == s)
+ tn_node_dounreg(n, r, k); // khash allows deletion from within a loop, yay!
}
mutex_unlock(n->lock);
}
@@ -548,8 +563,8 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
mutex_lock(n->lock);
khiter_t k = kh_begin(n->regs);
for(; k != kh_end(n->regs); ++k) {
- patternreg *r = kh_val(n->regs, k);
- if(!kh_exist(n->regs, k) || !tn_tuple_match(r->pat, tup))
+ patternreg *r = kh_exist(n->regs, k) ? kh_val(n->regs, k) : NULL;
+ if(!r || !tn_tuple_match(r->pat, tup))
continue;
if(rp && r->willreply)
tn_returnpath_open(rp);
@@ -581,7 +596,7 @@ struct sesmsg {
};
struct tn_session {
- mutex_t(lock);
+ mutex_t(lock); // Protects q_begin and q_end
tn_node *node;
tn_session_context *ctx;
sesmsg *q_begin;
@@ -647,8 +662,22 @@ void tn_session_unregister(tn_session *s, int id) {
void tn_session_close(tn_session *s) {
assert(s);
- // TODO: unregister any patterns
- // TODO: free the message queue and close any return-paths
+
+ tn_node_unregsession(s->node, s);
+
+ // free the message queue and close any return-paths
+ mutex_lock(s->lock);
+ sesmsg *n, *c;
+ for(n=s->q_begin; n; n=c) {
+ c = n->next;
+ if(n->ret)
+ tn_reply_close(n->ret);
+ tn_tuple_unref(n->tup);
+ free(n);
+ }
+ s->q_begin = s->q_end = NULL;
+ mutex_unlock(s->lock);
+
if(s->ctx->close)
s->ctx->close(s, s->data);
atomic_dec(s->active);