diff options
author | Yorhel <git@yorhel.nl> | 2012-03-18 13:17:00 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-18 13:17:00 +0100 |
commit | 2d402beccc50695bae5a2c0bca5f084fcfe3981b (patch) | |
tree | 6339a2c00e9a69d4c3bb2dbbf0dde797ecf52e01 | |
parent | fd598fcde7aeaedb583c8344065e71843f1d0597 (diff) |
Added simple (manual) link tester + fix race in tanja_pthread.c
These should definitely help with further implementing and testing the
link functionality.
-rw-r--r-- | tanja_pthread.c | 41 | ||||
-rw-r--r-- | test/Makefile | 5 | ||||
-rw-r--r-- | test/link.c | 105 | ||||
-rw-r--r-- | test/pthread.c | 1 |
4 files changed, 139 insertions, 13 deletions
diff --git a/tanja_pthread.c b/tanja_pthread.c index fd82520..8ae8346 100644 --- a/tanja_pthread.c +++ b/tanja_pthread.c @@ -24,13 +24,16 @@ // This is a very crude one-thread-per-session "event" model. -#include <pthread.h> -#include <signal.h> -#include <unistd.h> #include <stdlib.h> #include <string.h> #include <assert.h> #include <errno.h> +#include <unistd.h> + +#include <sys/select.h> +#include <pthread.h> +#include <signal.h> + #include "tanja.h" @@ -157,10 +160,20 @@ static void fdlclose(tn_link *l, void *_dat) { static int fdlwrite(tn_link *l, char *buf, int len, char **err, void *_dat) { - if(l) {} + assert(l != NULL); pt_fdldat *dat = _dat; - // TODO: this has a race condition on signal arrival, use pselect() to block-wait instead. - int n = write(dat->fdout, buf, len); + + // Current signal mask, but without SIGUSR2 + sigset_t mask; + pthread_sigmask(SIG_BLOCK, NULL, &mask); + sigdelset(&mask, SIGUSR2); + + fd_set fds; + FD_ZERO(&fds); + FD_SET(dat->fdout, &fds); + int n = pselect(dat->fdout+1, NULL, &fds, NULL, NULL, &mask); + if(n > 0) + n = write(dat->fdout, buf, len); if(n > 0) return n; if(n < 0 && errno == EINTR) @@ -175,16 +188,22 @@ static void *fdlthread(void *_dat) { pt_fdldat *dat = _dat; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED); + sigset_t mask, oldmask; + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + pthread_sigmask(SIG_BLOCK, &mask, &oldmask); while(dat->l) { tn_link_dispatch(dat->l); if(dat->l) { - // TODO: same race condition as in fdlwrite() - int n = read(dat->fdin, buf, READBUFSIZE); - // Note: Don't check for EAGAIN or EWOULDBLOCK, the fd should not - // be non-blocking so that is an error. + fd_set fds; + FD_ZERO(&fds); + FD_SET(dat->fdin, &fds); + int n = pselect(dat->fdin+1, &fds, NULL, NULL, NULL, &oldmask); + if(n > 0) + n = read(dat->fdin, buf, READBUFSIZE); if(n < 0 && errno == EINTR) continue; - if(n <= 0) + if(n <= 0) // pselect() shouldn't return 0, so that must come from the read() tn_link_read_error(dat->l, n == 0 ? 1 : errno, n == 0 ? "Remote disconnected." : strerror(errno)); else tn_link_read(dat->l, buf, n); diff --git a/test/Makefile b/test/Makefile index 6f61deb..39160fc 100644 --- a/test/Makefile +++ b/test/Makefile @@ -8,7 +8,7 @@ COMP=$(CC) $(CFLAGS) $(LDFLAGS) ../tanja.c DEP=../tanja.c ../tanja.h ../khash.h THREAD=-DTANJA_THREADSAFE -lpthread -all: test +all: json pthread link json: $(DEP) json.c $(COMP) json.c -o json @@ -16,6 +16,9 @@ json: $(DEP) json.c pthread: $(DEP) ../tanja_pthread.c pthread.c $(COMP) $(THREAD) ../tanja_pthread.c pthread.c -o pthread +link: $(DEP) ../tanja_pthread.c link.c + $(COMP) $(THREAD) ../tanja_pthread.c link.c -o link + test: json pthread prove ./json ./pthread diff --git a/test/link.c b/test/link.c new file mode 100644 index 0000000..ecf1db6 --- /dev/null +++ b/test/link.c @@ -0,0 +1,105 @@ +/* + Copyright (c) 2012 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <pthread.h> +#include <assert.h> +#include <unistd.h> +#include "tanja.h" + + +extern tn_session *tn_session_pthread(tn_node *, pthread_t *); +extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *); + +static tn_link *lnk; +static tn_session *ses; + + +// When the tuple ["quit",..] is received, the link and the session will be +// closed. Hopefully resulting in a graceful shutdown. +static void tup_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { + assert(s == ses); + assert(p == NULL); + assert(d == NULL); + fprintf(stderr, "Quit tuple received, closing\n"); + if(lnk) + tn_link_close(lnk); // TODO: close() from a different context is currently very bugyy. + tn_session_close(s); + tn_tuple_unref(tup); +} + + +// Print out any received tuples for debugging. Whether this is called before +// tup_quit() is not defined. +static void tup_all(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) { + assert(s == ses); + assert(p == NULL); + assert(d == NULL); + char *t = tn_json_fmt(tup); + fprintf(stderr, "Tuple: %s\n", t); + free(t); + tn_tuple_unref(tup); +} + + +static void lnk_error(tn_link *l, int code, char *msg) { + assert(l == lnk); + fprintf(stderr, "Link error #%d: %s\n", code, msg?msg:"(empty)"); + // Indicate that the other session shouldn't _close() the link anymore. + // TODO: It's probably better to make a tn_link refcounted and allow + // multiple _close() calls. + lnk = NULL; + // Signal to the other session that it can close. + // (tn_session_send() may be called from any context) + tn_session_send(ses, tn_tuple_new("s", strdup("quit")), NULL, NULL); +} + + +int main() { + tn_node *n = tn_node_create(); + + // Create a session for debugging + pthread_t tha; + ses = tn_session_pthread(n, &tha); + tn_session_register(ses, tn_tuple_new(""), 0, tup_all, NULL); + tn_session_register(ses, tn_tuple_new("s", strdup("quit")), 0, tup_quit, NULL); + // TODO: also register patterns with a handler that replies to a return path + // and one that sends out a tuple itself. + + // Create a link with STDIO + pthread_t thb; + lnk = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &thb); + tn_link_on_error(lnk, lnk_error); + tn_link_start(lnk); + + // Close + tn_node_unref(n); + pthread_join(tha, NULL); + pthread_join(thb, NULL); + return 0; +} + +// vim:noet:sw=4:ts=4 diff --git a/test/pthread.c b/test/pthread.c index 02ee4c7..8c50e6c 100644 --- a/test/pthread.c +++ b/test/pthread.c @@ -29,7 +29,6 @@ extern tn_session *tn_session_pthread(tn_node *, pthread_t *); -extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *); #define t_assert(n) printf("%sok %d - " __FILE__ ":%d: " #n "\n", (n)?"":"not ", ++test_num, __LINE__) |