summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-27 21:56:59 +0100
committerYorhel <git@yorhel.nl>2012-02-27 22:00:58 +0100
commitc25b9aade0cff48672ba0c362fca5546a3032e51 (patch)
treec0318cb5f26436a8bfb979b2f11ceae50f5a9587
parent9b524a9c112cb8381df3a7f6f367316924042ce6 (diff)
c: Implemented dispatcher for tuples and replies and for sessions
There's a lot of tricky stuff going on to make this thread-safe. Meh :(
-rw-r--r--c/tanja.c130
-rw-r--r--c/tanja.h5
2 files changed, 122 insertions, 13 deletions
diff --git a/c/tanja.c b/c/tanja.c
index 6527e65..46945ec 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -30,8 +30,11 @@
// Forward declarations to make things work
+typedef struct patternreg patternreg;
static inline void tn_session_ref(tn_session *);
static inline void tn_session_unref(tn_session *);
+static void tn_session_recv(tn_session *, tn_tuple *, patternreg *, tn_returnpath *);
+static void tn_session_reply(tn_session *, tn_tuple *, tn_returnpath *);
@@ -370,22 +373,31 @@ struct tn_returnpath {
union {
struct {
tn_session *s;
- tn_tuple_cb cb;
+ tn_reply_cb cb;
void *data;
} s;
} t;
};
+// TODO: _ref() the tuple or "pass the reference"?
void tn_reply(tn_returnpath *rp, tn_tuple *t) {
- rp++; t++;
- // TODO: Notify session/link
+ assert(rp);
+ assert(t);
+ if(rp->type == 0)
+ tn_session_reply(rp->t.s.s, t, rp);
+ else
+ {} // TODO: Notify link
}
void tn_reply_close(tn_returnpath *rp) {
+ assert(rp);
if(!atomic_dec(rp->ref)) {
- // TODO: notify session/link
+ if(rp->type == 0)
+ tn_session_reply(rp->t.s.s, NULL, rp);
+ else
+ {} // TODO: notify link
}
}
@@ -395,7 +407,7 @@ static inline void tn_returnpath_open(tn_returnpath *rp) {
}
-static inline tn_returnpath *tn_returnpath_create_ses(tn_session *ses, tn_tuple_cb cb, void *dat) {
+static inline tn_returnpath *tn_returnpath_create_ses(tn_session *ses, tn_reply_cb cb, void *dat) {
tn_returnpath *rp = malloc(sizeof(tn_returnpath));
rp->ref = 0; // No senders yet
rp->type = 0;
@@ -421,7 +433,7 @@ static inline void tn_returnpath_free(tn_returnpath *rp) {
// A registered pattern
-typedef struct {
+struct patternreg {
tn_tuple *pat; // Only accessed by the node, only valid if active=1
int active; // Atomic int, whether the pattern is still registered or not
int ref;
@@ -431,7 +443,7 @@ typedef struct {
// For sessions, links don't need any additional data
tn_tuple_cb cb;
void *data;
-} patternreg;
+};
static inline patternreg *patternreg_create_ses(tn_tuple *pat, int willreply, tn_session *ses, tn_tuple_cb cb, void *dat) {
@@ -568,7 +580,10 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
continue;
if(rp && r->willreply)
tn_returnpath_open(rp);
- // TODO: notify the respective sessions/links
+ if(r->type == 0)
+ tn_session_recv(r->recipient, tup, r, r->willreply ? rp : NULL);
+ else
+ {} // TODO: notify link
}
mutex_unlock(n->lock);
@@ -588,11 +603,14 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
// Session
+// A queued "message" for the session: either a regular tuple send() or a reply()
typedef struct sesmsg sesmsg;
struct sesmsg {
sesmsg *next;
tn_tuple *tup;
+ patternreg *reg; // If this is a regular send
tn_returnpath *ret; // If this is a regular send that wishes to receive a reply.
+ tn_returnpath *rep; // If this *is* a reply
};
struct tn_session {
@@ -639,7 +657,7 @@ tn_session *tn_session_create(tn_node *n, tn_session_context *ctx, void *data) {
}
-void tn_session_send(tn_session *s, tn_tuple *t, tn_tuple_cb cb, void *dat) {
+void tn_session_send(tn_session *s, tn_tuple *t, tn_reply_cb cb, void *dat) {
assert(s);
assert(t);
tn_node_send(s->node, t, cb ? tn_returnpath_create_ses(s, cb, dat) : NULL);
@@ -660,6 +678,93 @@ void tn_session_unregister(tn_session *s, int id) {
}
+// Assumes s->lock is held, and will unlock it.
+static inline void tn_session_appendmsg(tn_session *s, sesmsg *m) {
+ m->next = NULL;
+ int empty = !!s->q_begin;
+ if(empty)
+ s->q_begin = m;
+ else
+ s->q_end->next = m;
+ s->q_end = m;
+ mutex_unlock(s->lock);
+ if(empty)
+ s->ctx->dispatch(s, s->data);
+}
+
+// Called from the node, when we've received a tuple. Note that *ret has
+// already been open()'ed for this session.
+static void tn_session_recv(tn_session *s, tn_tuple *tup, patternreg *pr, tn_returnpath *ret) {
+ tn_tuple_ref(tup);
+ patternreg_ref(pr);
+ sesmsg *m = malloc(sizeof(sesmsg));
+ m->tup = tup;
+ m->reg = pr;
+ m->ret = ret;
+ m->rep = NULL;
+ mutex_lock(s->lock);
+ tn_session_appendmsg(s, m);
+}
+
+
+// Called when we've received a reply on one of our return-paths.
+static void tn_session_reply(tn_session *s, tn_tuple *tup, tn_returnpath *rep) {
+ // Session has been close()'ed, ignore this message
+ mutex_lock(s->lock);
+ if(!s->active) {
+ mutex_unlock(s->lock);
+ tn_tuple_unref(tup);
+ if(!tup)
+ tn_returnpath_free(rep);
+ return;
+ }
+ // Otherwise, queue it
+ sesmsg *m = malloc(sizeof(sesmsg));
+ m->tup = tup;
+ m->reg = NULL;
+ m->ret = NULL;
+ m->rep = rep;
+ tn_session_appendmsg(s, m);
+}
+
+
+static inline void tn_session_dispatch_one(tn_session *s, sesmsg *m) {
+ // Regular send()
+ if(m->reg) {
+ if(m->reg->active) // If the pattern hasn't been unregistered in the meantime
+ m->reg->cb(s, m->tup, m->ret, m->reg->data); // Pass our reference on m->tup to the application
+ else {
+ tn_tuple_unref(m->tup);
+ if(m->ret)
+ tn_reply_close(m->ret);
+ }
+ patternreg_unref(m->reg);
+
+ // Reply
+ } else {
+ m->rep->t.s.cb(s, m->tup, m->rep->t.s.data); // Again, pass our reference on m->tup
+ if(!m->tup)
+ tn_returnpath_free(m->rep);
+ }
+ free(m);
+}
+
+
+void tn_session_dispatch(tn_session *s) {
+ sesmsg *q;
+ do {
+ // Pop item from the start of the queue
+ mutex_lock(s->lock);
+ assert(s->active || !s->q_begin); // Queue must be (and stay) empty if the session has been closed
+ q = s->q_begin;
+ s->q_begin = q->next;
+ mutex_unlock(s->lock);
+ if(q)
+ tn_session_dispatch_one(s, q);
+ } while(q);
+}
+
+
void tn_session_close(tn_session *s) {
assert(s);
@@ -667,12 +772,16 @@ void tn_session_close(tn_session *s) {
// free the message queue and close any return-paths
mutex_lock(s->lock);
+ atomic_dec(s->active);
sesmsg *n, *c;
for(n=s->q_begin; n; n=c) {
c = n->next;
if(n->ret)
tn_reply_close(n->ret);
- tn_tuple_unref(n->tup);
+ if(n->rep && !n->tup) // close, free the return-path
+ tn_returnpath_free(n->rep);
+ if(n->tup)
+ tn_tuple_unref(n->tup);
free(n);
}
s->q_begin = s->q_end = NULL;
@@ -680,7 +789,6 @@ void tn_session_close(tn_session *s) {
if(s->ctx->close)
s->ctx->close(s, s->data);
- atomic_dec(s->active);
tn_session_unref(s);
}
diff --git a/c/tanja.h b/c/tanja.h
index 9f2f6fb..5191396 100644
--- a/c/tanja.h
+++ b/c/tanja.h
@@ -58,7 +58,8 @@ typedef struct {
void (*close)(tn_session *, void *);
} tn_session_context;
-typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, void *);
+typedef void (*tn_reply_cb)(tn_session *, tn_tuple *, void *);
+typedef void (*tn_tuple_cb)(tn_session *, tn_tuple *, tn_returnpath *, void *);
void tn_element_free(tn_element);
void tn_tuple_ref(tn_tuple *);
@@ -80,7 +81,7 @@ void tn_node_ref(tn_node *);
void tn_node_unref(tn_node *);
tn_session *tn_session_create(tn_node *, tn_session_context *, void *);
-void tn_session_send(tn_session *, tn_tuple *, tn_tuple_cb, void *dat);
+void tn_session_send(tn_session *, tn_tuple *, tn_reply_cb, void *dat);
int tn_session_register(tn_session *, tn_tuple *, int, tn_tuple_cb, void *);
void tn_session_unregister(tn_session *, int);
void tn_session_close(tn_session *);