diff options
author | Yorhel <git@yorhel.nl> | 2012-03-20 16:38:50 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-20 16:38:50 +0100 |
commit | ce8cacb6442fb45dce1ffa01db0100bb3ce3f874 (patch) | |
tree | 2aedfd7f820eb3ce058f31b77993409cc583f4e2 | |
parent | 575df902fd42508ded2afa3276f22fd83441b6cd (diff) |
link: Don't always hold the lock in the read handlers
This is true to the "Only hold a mutex for the shortest time possible"
philosophy, and fixes a possible deadlock with regard to ordering of the
node and link mutexes.
This also means that the read buffer and node-notification-list are only
freed/updated when the last reference to the link object has dropped,
since that is the only time when it is guaranteed that the link lock is
not held.
-rw-r--r-- | tanja.c | 66 |
1 files changed, 31 insertions, 35 deletions
@@ -1274,7 +1274,7 @@ struct tn_link { int errcode; char *errmsg; void *data; - lbuf rbuf; + lbuf rbuf; // not protected by the lock lbuf wbuf; lbuf tbuf; // (temporary) buffer that we gave to the context unsigned handshaked : 1; @@ -1292,6 +1292,16 @@ void tn_link_ref(tn_link *l) { void tn_link_unref(tn_link *l) { if(!atomic_dec(l->ref)) { assert(!l->active); + + // Remove link from the notification list + mutex_lock(l->node->lock); + khiter_t k = kh_get(ln, l->node->links, l); + if(k != kh_end(l->node->links)) + kh_del(ln, l->node->links, k); + mutex_unlock(l->node->lock); + + // And free + lbuf_free(l->rbuf); if(l->errmsg) free(l->errmsg); tn_node_unref(l->node); @@ -1330,18 +1340,8 @@ static void tn_link_set_error_lock(tn_link *l, int code, const char *msg) { return; lbuf_free(l->wbuf); lbuf_init(l->wbuf); - lbuf_free(l->rbuf); - lbuf_init(l->rbuf); lbuf_free(l->tbuf); lbuf_init(l->tbuf); - - // Remove link from the notification list (TODO: fix possible deadlock) - mutex_lock(l->node->lock); - khiter_t k = kh_get(ln, l->node->links, l); - if(k != kh_end(l->node->links)) - kh_del(ln, l->node->links, k); - mutex_unlock(l->node->lock); - l->errcode = code; l->errmsg = msg ? strdup(msg) : NULL; l->active = 0; @@ -1536,12 +1536,14 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) inc(1); } + mutex_lock(l->lock); if(ok != (1|2|4)) { int c = !(ok&1) ? 1 : !(ok&2)? 2 : 3; tn_link_set_error_lock(l, -c, c==1 ? "No or invalid protocol version" : c==2 ? "No common input format" : "No common output format"); } else l->handshaked = 1; + mutex_unlock(l->lock); } @@ -1549,18 +1551,18 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *arg, tn_tuple **tup) { if(len < 2 || *buf != '[') { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } inc(1); incs(); if(!len) { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } // Message type if(*buf < '1' || *buf > '7') { - tn_link_set_error_lock(l, -5, "Invalid message type."); + tn_link_set_error(l, -5, "Invalid message type."); return 0; } int type = *buf-'0'; @@ -1570,7 +1572,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a if(type == 1) { // Comma if(*buf != ',') { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } inc(1); @@ -1582,7 +1584,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a *arg = 0; inc(5); } else { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } incs(); @@ -1591,7 +1593,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a if(type == 2 || type == 4 || type == 5 || type == 6 || type == 7) { // Comma if(*buf != ',') { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } inc(1); @@ -1601,7 +1603,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a while(len > 0 && *buf >= '0' && *buf <= '9') { int n = (*arg)*10 + (*buf-'0'); if(n < *arg) { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } *arg = n; @@ -1613,7 +1615,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a if(type == 2 || type == 5 || type == 6) { // Comma if(*buf != ',') { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } inc(1); @@ -1622,7 +1624,7 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a int rd = 0; *tup = tn_json_parse(buf, len, &rd); if(!*tup) { - tn_link_set_error_lock(l, -6, "Invalid JSON tuple."); + tn_link_set_error(l, -6, "Invalid JSON tuple."); return 0; } buf += rd; @@ -1631,13 +1633,13 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a } // End-of-message if(!len || *buf != ']') { - tn_link_set_error_lock(l, -4, "Invalid message format."); + tn_link_set_error(l, -4, "Invalid message format."); return 0; } inc(1); incs(); if(len) { - tn_link_set_error_lock(l, -7, "Excessive data after message."); + tn_link_set_error(l, -7, "Excessive data after message."); return 0; } return type; @@ -1647,9 +1649,6 @@ static int tn_link_parsemessage(tn_link *l, const char *buf, int len, int32_t *a #undef incs -// TODO: There's a potential deadlock here. This function has a link lock and -// wishes to obtain a node lock, whereas the opposite order is common in the -// rest of the code. static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *tup) { switch(type) { @@ -1662,12 +1661,14 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t if(k == kh_end(l->node->links)) kh_put(ln, l->node->links, l, &ret); // Send the current pattern list + mutex_lock(l->lock); for(k = kh_begin(l->node->regs); k!=kh_end(l->node->regs); ++k) if(kh_exist(l->node->regs, k)) tn_link_write_reg(l, kh_key(l->node->regs, k), kh_val(l->node->regs, k)->pat); // Send a regdone message lbuf_append(l->wbuf, "[3]\n", 4); tn_link_write(l); + mutex_unlock(l->lock); } else { // Remove this link from the notification list if(k != kh_end(l->node->links)) @@ -1702,7 +1703,7 @@ static void tn_link_handlemessage(tn_link *l, int type, int32_t arg, tn_tuple *t // Tries to read a single message from the buffer, returns the number of bytes -// read (0 if there is no complete message). Called while the lock is held. +// read (0 if there is no complete message). static int tn_link_handleread(tn_link *l, const char *buf, int len) { // All messages end with a newline (in the current protocol), so that's // easy to detect. @@ -1726,19 +1727,17 @@ static int tn_link_handleread(tn_link *l, const char *buf, int len) { } +// Calls to this function must be serialized. void tn_link_read(tn_link *l, const char *buf, int len) { - mutex_lock(l->lock); - if(!l->active) { - mutex_unlock(l->lock); + if(!l->active) return; - } // Don't copy things to the read buffer if we have a full message int n=1; while(!l->rbuf.len && len > 0 && n) { n = tn_link_handleread(l, buf, len); if(!l->active) - goto stop; + return; buf += n; len -= n; } @@ -1757,14 +1756,11 @@ void tn_link_read(tn_link *l, const char *buf, int len) { while(l->rbuf.len-r > 0 && n && l->active) { n = tn_link_handleread(l, l->rbuf.dat+r, l->rbuf.len-r); if(!l->active) - goto stop; + return; r += n; } if(r > 0) lbuf_shift(l->rbuf, r); - -stop: - mutex_unlock(l->lock); } |