summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-20 20:19:21 +0100
committerYorhel <git@yorhel.nl>2012-03-20 20:19:21 +0100
commita80c9431a934e1f6f0c89d91b51fbbdbb26de18e (patch)
tree312b3ea4e259db8c449cb0dc8f2c982427b674ee
parentce8cacb6442fb45dce1ffa01db0100bb3ce3f874 (diff)
link: Changed locking stuff a bit + implemented remote pattern sync
For pattern registration related stuff, the caller is now responsible for locking the node mutex. This is to allow the caller to perform some more actions in a single critical section, which the link requires. I also use l->active now to indicate that state data should not be modified/relied upon, to allow _set_error() to immediately unregister stuff with the node without requiring a lock.
-rw-r--r--tanja.c177
1 file changed, 131 insertions(+), 46 deletions(-)
diff --git a/tanja.c b/tanja.c
index a58f512..8b13c93 100644
--- a/tanja.c
+++ b/tanja.c
@@ -54,6 +54,8 @@
#endif
+
+
// Forward declarations to make things work
typedef struct patternreg patternreg;
static void tn_session_recv(tn_session *, tn_tuple *, patternreg *, tn_returnpath *);
@@ -63,6 +65,21 @@ static void tn_link_node_unreg(tn_link *l, int id, patternreg *p);
+// Map of integers to patternreg pointers
+KHASH_INIT(pr, int, patternreg*, 1, kh_int_hash_func2, kh_int_hash_equal)
+
+// Map of integers to integers
+KHASH_INIT(ii, int, int, 1, kh_int_hash_func2, kh_int_hash_equal)
+
+// Set of tn_link pointers
+#define ln_hash_f(p) kh_int64_hash_func((khint64_t)p)
+#define ln_hash_eq(a,b) ((a) == (b))
+KHASH_INIT(ln, tn_link *, char, 0, ln_hash_f, ln_hash_eq)
+#undef ln_hash_f
+#undef ln_hash_eq
+
+
+
// Tiny buffer abstraction for internal use
// Note that these macros aren't very safe with respect to argument naming /
// evaluation. Only pass variables directly.
@@ -892,6 +909,18 @@ static inline patternreg *patternreg_create_ses(tn_tuple *pat, int willreply, tn
}
+static inline patternreg *patternreg_create_link(tn_tuple *pat, tn_link *l) {
+ patternreg *r = malloc(sizeof(patternreg));
+ r->ref = 1;
+ r->active = 1;
+ r->type = 1;
+ r->pat = pat;
+ r->willreply = 0;
+ r->recipient = l;
+ return r;
+}
+
+
static inline void patternreg_ref(patternreg *p) {
atomic_inc(p->ref);
}
@@ -910,15 +939,6 @@ static inline void patternreg_unref(patternreg *p) {
// Node
-KHASH_INIT(pr, int, patternreg*, 1, kh_int_hash_func2, kh_int_hash_equal)
-
-
-#define ln_hash_f(p) kh_int64_hash_func((khint64_t)p)
-#define ln_hash_eq(a,b) ((a) == (b))
-KHASH_INIT(ln, tn_link *, char, 0, ln_hash_f, ln_hash_eq)
-#undef ln_hash_f
-#undef ln_hash_eq
-
// TODO: Organize the patterns in a tree of hash tables to speed up matching?
// Note: Links access some of these fields (and the lock) directly.
@@ -958,10 +978,8 @@ void tn_node_unref(tn_node *n) {
}
-// TODO: should this do a _ref() on the pattern? Or is "passing the reference" more convenient?
+// The caller is responsible for obtaining the node lock
static int tn_node_register(tn_node *n, patternreg *r) {
- mutex_lock(n->lock);
-
// Get new ID
do {
if(++n->lastid <= 0)
@@ -980,7 +998,6 @@ static int tn_node_register(tn_node *n, patternreg *r) {
if(kh_exist(n->links, k))
tn_link_node_reg(kh_key(n->links, k), id, r);
- mutex_unlock(n->lock);
return id;
}
@@ -1000,26 +1017,23 @@ static inline void tn_node_dounreg(tn_node *n, patternreg *r, khiter_t k) {
patternreg_unref(r);
}
+// Likewise, caller is responsible for obtaining the node lock
static void tn_node_unregister(tn_node *n, int id) {
- mutex_lock(n->lock);
khiter_t k = kh_get(pr, n->regs, id);
if(k != kh_end(n->regs)) {
patternreg *r = kh_val(n->regs, k);
tn_node_dounreg(n, r, k);
}
- mutex_unlock(n->lock);
}
static void tn_node_unregsession(tn_node *n, tn_session *s) {
- mutex_lock(n->lock);
khiter_t k = kh_begin(n->regs);
for(; k != kh_end(n->regs); ++k) {
patternreg *r = kh_exist(n->regs, k) ? kh_val(n->regs, k) : NULL;
if(r && r->type == 0 && r->recipient == s)
tn_node_dounreg(n, r, k); // khash allows deletion from within a loop, yay!
}
- mutex_unlock(n->lock);
}
@@ -1127,13 +1141,18 @@ int tn_session_register(tn_session *s, tn_tuple *pat, int willreply, tn_tuple_cb
assert(s);
assert(pat);
assert(cb);
- return tn_node_register(s->node, patternreg_create_ses(pat, willreply, s, cb, dat));
+ mutex_lock(s->node->lock);
+ int id = tn_node_register(s->node, patternreg_create_ses(pat, willreply, s, cb, dat));
+ mutex_unlock(s->node->lock);
+ return id;
}
void tn_session_unregister(tn_session *s, int id) {
assert(s);
+ mutex_lock(s->node->lock);
tn_node_unregister(s->node, id);
+ mutex_unlock(s->node->lock);
}
@@ -1234,7 +1253,9 @@ int tn_session_dispatch(tn_session *s) {
void tn_session_close(tn_session *s) {
assert(s);
+ mutex_lock(s->node->lock);
tn_node_unregsession(s->node, s);
+ mutex_unlock(s->node->lock);
// free the message queue and close any return-paths
mutex_lock(s->lock);
@@ -1280,6 +1301,8 @@ struct tn_link {
unsigned handshaked : 1;
unsigned sync : 1;
unsigned active : 1;
+ int catchall; // registration ID of our catch-all tuple (0 if not registered)
+ khash_t(ii) *regs; // map of remote pattern IDs to local IDs
int ref;
};
@@ -1293,13 +1316,6 @@ void tn_link_unref(tn_link *l) {
if(!atomic_dec(l->ref)) {
assert(!l->active);
- // Remove link from the notification list
- mutex_lock(l->node->lock);
- khiter_t k = kh_get(ln, l->node->links, l);
- if(k != kh_end(l->node->links))
- kh_del(ln, l->node->links, k);
- mutex_unlock(l->node->lock);
-
// And free
lbuf_free(l->rbuf);
if(l->errmsg)
@@ -1330,32 +1346,53 @@ tn_link *tn_link_create(tn_node *n, tn_link_context *ctx, void *data) {
lbuf_init(l->rbuf);
lbuf_init(l->wbuf);
lbuf_init(l->tbuf);
+ l->catchall = 0;
+ l->regs = kh_init(ii);
return l;
}
-// Assumes the lock is held.
-static void tn_link_set_error_lock(tn_link *l, int code, const char *msg) {
- if(!l->active)
+void tn_link_set_error(tn_link *l, int code, const char *msg) {
+ // Set active to 0 inside a lock. After unlocking, this guarantees nobody
+ // else will touch the state data protected by the active flag.
+ mutex_lock(l->lock);
+ int a = l->active;
+ l->active = 0;
+ mutex_unlock(l->lock);
+ if(!a)
return;
+
+ mutex_lock(l->node->lock);
+
+ // Unregister any patterns
+ khiter_t k = kh_begin(l->regs);
+ for(; k!=kh_end(l->regs); k++)
+ if(kh_exist(l->regs, k))
+ tn_node_unregister(l->node, kh_val(l->regs, k));
+
+ if(l->catchall)
+ tn_node_unregister(l->node, l->catchall);
+
+ // Remove link from the notification list
+ k = kh_get(ln, l->node->links, l);
+ if(k != kh_end(l->node->links))
+ kh_del(ln, l->node->links, k);
+
+ mutex_unlock(l->node->lock);
+
+ kh_destroy(ii, l->regs);
+ l->regs = NULL;
+ l->catchall = 0;
lbuf_free(l->wbuf);
lbuf_init(l->wbuf);
lbuf_free(l->tbuf);
lbuf_init(l->tbuf);
l->errcode = code;
l->errmsg = msg ? strdup(msg) : NULL;
- l->active = 0;
l->ctx->dispatch(l, l->data);
}
-void tn_link_set_error(tn_link *l, int code, const char *msg) {
- mutex_lock(l->lock);
- tn_link_set_error_lock(l, code, msg);
- mutex_unlock(l->lock);
-}
-
-
// Assumes that the lock is held. Should be called to indicate that something
// has been written to the write buffer.
static inline void tn_link_write(tn_link *l) {
@@ -1384,7 +1421,7 @@ int tn_link_startwrite(tn_link *l, char **buf) {
// If we have nothing to write or if there's already a write busy in the
// background, just return 0.
- if(!l->wbuf.len || l->tbuf.len) {
+ if(!l->active || !l->wbuf.len || l->tbuf.len) {
mutex_unlock(l->lock);
return 0;
}
@@ -1451,6 +1488,7 @@ void tn_link_set_sync(tn_link *l, int sync) {
void tn_link_start(tn_link *l) {
mutex_lock(l->lock);
+ assert(l->active && !l->handshaked);
lbuf_append(l->wbuf, "ver,1.0 seri,json sero,json\n", 28);
tn_link_write(l);
mutex_unlock(l->lock);
@@ -1536,14 +1574,28 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len)
inc(1);
}
- mutex_lock(l->lock);
if(ok != (1|2|4)) {
int c = !(ok&1) ? 1 : !(ok&2)? 2 : 3;
- tn_link_set_error_lock(l, -c,
+ tn_link_set_error(l, -c,
c==1 ? "No or invalid protocol version" : c==2 ? "No common input format" : "No common output format");
} else
l->handshaked = 1;
- mutex_unlock(l->lock);
+
+ // If we want sync with their pattern list, request this. Otherwise, register the catch-all pattern with the node.
+ if(l->sync) {
+ mutex_lock(l->lock);
+ if(l->active) {
+ lbuf_append(l->wbuf, "[1,true]\n", 9); // patternsync
+ tn_link_write(l);
+ }
+ mutex_unlock(l->lock);
+ } else {
+ mutex_lock(l->node->lock);
+ mutex_lock(l->lock);
+ l->catchall = tn_node_register(l->node, patternreg_create_link(tn_tuple_new(""), l));
+ mutex_unlock(l->lock);
+ mutex_unlock(l->node->lock);
+ }
}
@@ -1654,38 +1706,71 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t
case 1: // patternsync 1/0
mutex_lock(l->node->lock);
+ mutex_lock(l->lock);
khiter_t k = kh_get(ln, l->node->links, l);
- if(arg) {
+ if(l->active && arg) {
int ret;
// Add this link to the node for notifications
if(k == kh_end(l->node->links))
kh_put(ln, l->node->links, l, &ret);
// Send the current pattern list
- mutex_lock(l->lock);
for(k = kh_begin(l->node->regs); k!=kh_end(l->node->regs); ++k)
if(kh_exist(l->node->regs, k))
tn_link_write_reg(l, kh_key(l->node->regs, k), kh_val(l->node->regs, k)->pat);
// Send a regdone message
lbuf_append(l->wbuf, "[3]\n", 4);
tn_link_write(l);
- mutex_unlock(l->lock);
- } else {
+ } else if(l->active) {
// Remove this link from the notification list
if(k != kh_end(l->node->links))
kh_del(ln, l->node->links, k);
}
+ mutex_unlock(l->lock);
mutex_unlock(l->node->lock);
break;
case 2: // register pid pat
- // TODO
+ if(!l->sync) {
+ tn_tuple_unref(tup);
+ break;
+ }
+ mutex_lock(l->node->lock);
+ mutex_lock(l->lock);
+ if(l->active) {
+ int id = tn_node_register(l->node, patternreg_create_link(tup, l));
+ khiter_t k = kh_get(ii, l->regs, arg);
+ if(k != kh_end(l->regs))
+ tn_node_unregister(l->node, kh_val(l->regs, k));
+ else {
+ int ret;
+ k = kh_put(ii, l->regs, id, &ret);
+ }
+ kh_val(l->regs, k) = id;
+ }
+ mutex_unlock(l->lock);
+ mutex_unlock(l->node->lock);
break;
+
case 3: // regdone
// TODO
break;
+
case 4: // unregister pid
- // TODO
+ if(!l->sync)
+ break;
+ mutex_lock(l->node->lock);
+ mutex_lock(l->lock);
+ if(l->active) {
+ khiter_t k = kh_get(ii, l->regs, arg);
+ if(k != kh_end(l->regs)) {
+ tn_node_unregister(l->node, kh_val(l->regs, k));
+ kh_del(ii, l->regs, k);
+ }
+ }
+ mutex_unlock(l->lock);
+ mutex_unlock(l->node->lock);
break;
+
case 5: // tuple tid tuple
// TODO: create a return path if tid > 0
tn_node_send(l->node, tup, NULL);