summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-20 16:38:50 +0100
committerYorhel <git@yorhel.nl>2012-03-20 16:38:50 +0100
commitce8cacb6442fb45dce1ffa01db0100bb3ce3f874 (patch)
tree2aedfd7f820eb3ce058f31b77993409cc583f4e2
parent575df902fd42508ded2afa3276f22fd83441b6cd (diff)
link: Don't always hold the lock in the read handlers
This is true to the "Only hold a mutex for the shortest time possible" philosophy, and fixes a possible deadlock with regard to ordering of the node and link mutexes. This also means that the read buffer and node-notification-list are only freed/updated when the last reference to the link object has dropped, since that is the only time when it is guaranteed that the link lock is not held.
-rw-r--r--tanja.c66
1 files changed, 31 insertions, 35 deletions
diff --git a/tanja.c b/tanja.c
index 2eeb6d4..a58f512 100644
--- a/tanja.c
+++ b/tanja.c
@@ -1274,7 +1274,7 @@ struct tn_link {
int errcode;
char *errmsg;
void *data;
- lbuf rbuf;
+ lbuf rbuf; // not protected by the lock
lbuf wbuf;
lbuf tbuf; // (temporary) buffer that we gave to the context
unsigned handshaked : 1;
@@ -1292,6 +1292,16 @@ void tn_link_ref(tn_link *l) {
void tn_link_unref(tn_link *l) {
if(!atomic_dec(l->ref)) {
assert(!l->active);
+
+ // Remove link from the notification list
+ mutex_lock(l->node->lock);
+ khiter_t k = kh_get(ln, l->node->links, l);
+ if(k != kh_end(l->node->links))
+ kh_del(ln, l->node->links, k);
+ mutex_unlock(l->node->lock);
+
+ // And free
+ lbuf_free(l->rbuf);
if(l->errmsg)
free(l->errmsg);
tn_node_unref(l->node);
@@ -1330,18 +1340,8 @@ static void tn_link_set_error_lock(tn_link *l, int code, const char *msg) {
return;
lbuf_free(l->wbuf);
lbuf_init(l->wbuf);
- lbuf_free(l->rbuf);
- lbuf_init(l->rbuf);
lbuf_free(l->tbuf);
lbuf_init(l->tbuf);
-
- // Remove link from the notification list (TODO: fix possible deadlock)
- mutex_lock(l->node->lock);
- khiter_t k = kh_get(ln, l->node->links, l);
- if(k != kh_end(l->node->links))
- kh_del(ln, l->node->links, k);
- mutex_unlock(l->node->lock);
-
l->errcode = code;
l->errmsg = msg ? strdup(msg) : NULL;
l->active = 0;
@@ -1536,12 +1536,14 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len)
inc(1);
}
+ mutex_lock(l->lock);
if(ok != (1|2|4)) {
int c = !(ok&1) ? 1 : !(ok&2)? 2 : 3;
tn_link_set_error_lock(l, -c,
c==1 ? "No or invalid protocol version" : c==2 ? "No common input format" : "No common output format");
} else
l->handshaked = 1;
+ mutex_unlock(l->lock);
}
@@ -1549,18 +1551,18 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len)
static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *arg, tn_tuple **tup) {
if(len < 2 || *buf != '[') {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
inc(1);
incs();
if(!len) {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
// Message type
if(*buf < '1' || *buf > '7') {
- tn_link_set_error_lock(l, -5, "Invalid message type.");
+ tn_link_set_error(l, -5, "Invalid message type.");
return 0;
}
int type = *buf-'0';
@@ -1570,7 +1572,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
if(type == 1) {
// Comma
if(*buf != ',') {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
inc(1);
@@ -1582,7 +1584,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
*arg = 0;
inc(5);
} else {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
incs();
@@ -1591,7 +1593,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
if(type == 2 || type == 4 || type == 5 || type == 6 || type == 7) {
// Comma
if(*buf != ',') {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
inc(1);
@@ -1601,7 +1603,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
while(len > 0 && *buf >= '0' && *buf <= '9') {
int n = (*arg)*10 + (*buf-'0');
if(n < *arg) {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
*arg = n;
@@ -1613,7 +1615,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
if(type == 2 || type == 5 || type == 6) {
// Comma
if(*buf != ',') {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
inc(1);
@@ -1622,7 +1624,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
int rd = 0;
*tup = tn_json_parse(buf, len, &rd);
if(!*tup) {
- tn_link_set_error_lock(l, -6, "Invalid JSON tuple.");
+ tn_link_set_error(l, -6, "Invalid JSON tuple.");
return 0;
}
buf += rd;
@@ -1631,13 +1633,13 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
}
// End-of-message
if(!len || *buf != ']') {
- tn_link_set_error_lock(l, -4, "Invalid message format.");
+ tn_link_set_error(l, -4, "Invalid message format.");
return 0;
}
inc(1);
incs();
if(len) {
- tn_link_set_error_lock(l, -7, "Excessive data after message.");
+ tn_link_set_error(l, -7, "Excessive data after message.");
return 0;
}
return type;
@@ -1647,9 +1649,6 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a
#undef incs
-// TODO: There's a potential deadlock here. This function has a link lock and
-// wishes to obtain a node lock, whereas the opposite order is common in the
-// rest of the code.
static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *tup) {
switch(type) {
@@ -1662,12 +1661,14 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t
if(k == kh_end(l->node->links))
kh_put(ln, l->node->links, l, &ret);
// Send the current pattern list
+ mutex_lock(l->lock);
for(k = kh_begin(l->node->regs); k!=kh_end(l->node->regs); ++k)
if(kh_exist(l->node->regs, k))
tn_link_write_reg(l, kh_key(l->node->regs, k), kh_val(l->node->regs, k)->pat);
// Send a regdone message
lbuf_append(l->wbuf, "[3]\n", 4);
tn_link_write(l);
+ mutex_unlock(l->lock);
} else {
// Remove this link from the notification list
if(k != kh_end(l->node->links))
@@ -1702,7 +1703,7 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t
// Tries to read a single message from the buffer, returns the number of bytes
-// read (0 if there is no complete message). Called while the lock is held.
+// read (0 if there is no complete message).
static int tn_link_handleread(tn_link *l, const char *buf, int len) {
// All messages end with a newline (in the current protocol), so that's
// easy to detect.
@@ -1726,19 +1727,17 @@ static int tn_link_handleread(tn_link *l, const char *buf, int len) {
}
+// Calls to this function must be serialized.
void tn_link_read(tn_link *l, const char *buf, int len) {
- mutex_lock(l->lock);
- if(!l->active) {
- mutex_unlock(l->lock);
+ if(!l->active)
return;
- }
// Don't copy things to the read buffer if we have a full message
int n=1;
while(!l->rbuf.len && len > 0 && n) {
n = tn_link_handleread(l, buf, len);
if(!l->active)
- goto stop;
+ return;
buf += n;
len -= n;
}
@@ -1757,14 +1756,11 @@ void tn_link_read(tn_link *l, const char *buf, int len) {
while(l->rbuf.len-r > 0 && n && l->active) {
n = tn_link_handleread(l, l->rbuf.dat+r, l->rbuf.len-r);
if(!l->active)
- goto stop;
+ return;
r += n;
}
if(r > 0)
lbuf_shift(l->rbuf, r);
-
-stop:
- mutex_unlock(l->lock);
}