summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-18 13:17:00 +0100
committerYorhel <git@yorhel.nl>2012-03-18 13:17:00 +0100
commit2d402beccc50695bae5a2c0bca5f084fcfe3981b (patch)
tree6339a2c00e9a69d4c3bb2dbbf0dde797ecf52e01
parentfd598fcde7aeaedb583c8344065e71843f1d0597 (diff)
Added simple (manual) link tester + fix race in tanja_pthread.c
These should definitely help with further implementing and testing the link functionality.
-rw-r--r--tanja_pthread.c41
-rw-r--r--test/Makefile5
-rw-r--r--test/link.c105
-rw-r--r--test/pthread.c1
4 files changed, 139 insertions, 13 deletions
diff --git a/tanja_pthread.c b/tanja_pthread.c
index fd82520..8ae8346 100644
--- a/tanja_pthread.c
+++ b/tanja_pthread.c
@@ -24,13 +24,16 @@
// This is a very crude one-thread-per-session "event" model.
-#include <pthread.h>
-#include <signal.h>
-#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
+#include <unistd.h>
+
+#include <sys/select.h>
+#include <pthread.h>
+#include <signal.h>
+
#include "tanja.h"
@@ -157,10 +160,20 @@ static void fdlclose(tn_link *l, void *_dat) {
static int fdlwrite(tn_link *l, char *buf, int len, char **err, void *_dat) {
- if(l) {}
+ assert(l != NULL);
pt_fdldat *dat = _dat;
- // TODO: this has a race condition on signal arrival, use pselect() to block-wait instead.
- int n = write(dat->fdout, buf, len);
+
+ // Current signal mask, but without SIGUSR2
+ sigset_t mask;
+ pthread_sigmask(SIG_BLOCK, NULL, &mask);
+ sigdelset(&mask, SIGUSR2);
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(dat->fdout, &fds);
+ int n = pselect(dat->fdout+1, NULL, &fds, NULL, NULL, &mask);
+ if(n > 0)
+ n = write(dat->fdout, buf, len);
if(n > 0)
return n;
if(n < 0 && errno == EINTR)
@@ -175,16 +188,22 @@ static void *fdlthread(void *_dat) {
pt_fdldat *dat = _dat;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
+ sigset_t mask, oldmask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGUSR2);
+ pthread_sigmask(SIG_BLOCK, &mask, &oldmask);
while(dat->l) {
tn_link_dispatch(dat->l);
if(dat->l) {
- // TODO: same race condition as in fdlwrite()
- int n = read(dat->fdin, buf, READBUFSIZE);
- // Note: Don't check for EAGAIN or EWOULDBLOCK, the fd should not
- // be non-blocking so that is an error.
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(dat->fdin, &fds);
+ int n = pselect(dat->fdin+1, &fds, NULL, NULL, NULL, &oldmask);
+ if(n > 0)
+ n = read(dat->fdin, buf, READBUFSIZE);
if(n < 0 && errno == EINTR)
continue;
- if(n <= 0)
+ if(n <= 0) // pselect() shouldn't return 0, so that must come from the read()
tn_link_read_error(dat->l, n == 0 ? 1 : errno, n == 0 ? "Remote disconnected." : strerror(errno));
else
tn_link_read(dat->l, buf, n);
diff --git a/test/Makefile b/test/Makefile
index 6f61deb..39160fc 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -8,7 +8,7 @@ COMP=$(CC) $(CFLAGS) $(LDFLAGS) ../tanja.c
DEP=../tanja.c ../tanja.h ../khash.h
THREAD=-DTANJA_THREADSAFE -lpthread
-all: test
+all: json pthread link
json: $(DEP) json.c
$(COMP) json.c -o json
@@ -16,6 +16,9 @@ json: $(DEP) json.c
pthread: $(DEP) ../tanja_pthread.c pthread.c
$(COMP) $(THREAD) ../tanja_pthread.c pthread.c -o pthread
+link: $(DEP) ../tanja_pthread.c link.c
+ $(COMP) $(THREAD) ../tanja_pthread.c link.c -o link
+
test: json pthread
prove ./json ./pthread
diff --git a/test/link.c b/test/link.c
new file mode 100644
index 0000000..ecf1db6
--- /dev/null
+++ b/test/link.c
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2012 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <assert.h>
+#include <unistd.h>
+#include "tanja.h"
+
+
+extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
+extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *);
+
+static tn_link *lnk;
+static tn_session *ses;
+
+
+// When the tuple ["quit",..] is received, the link and the session will be
+// closed. Hopefully resulting in a graceful shutdown.
+static void tup_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
+ assert(s == ses);
+ assert(p == NULL);
+ assert(d == NULL);
+ fprintf(stderr, "Quit tuple received, closing\n");
+ if(lnk)
+ tn_link_close(lnk); // TODO: close() from a different context is currently very bugyy.
+ tn_session_close(s);
+ tn_tuple_unref(tup);
+}
+
+
+// Print out any received tuples for debugging. Whether this is called before
+// tup_quit() is not defined.
+static void tup_all(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
+ assert(s == ses);
+ assert(p == NULL);
+ assert(d == NULL);
+ char *t = tn_json_fmt(tup);
+ fprintf(stderr, "Tuple: %s\n", t);
+ free(t);
+ tn_tuple_unref(tup);
+}
+
+
+static void lnk_error(tn_link *l, int code, char *msg) {
+ assert(l == lnk);
+ fprintf(stderr, "Link error #%d: %s\n", code, msg?msg:"(empty)");
+ // Indicate that the other session shouldn't _close() the link anymore.
+ // TODO: It's probably better to make a tn_link refcounted and allow
+ // multiple _close() calls.
+ lnk = NULL;
+ // Signal to the other session that it can close.
+ // (tn_session_send() may be called from any context)
+ tn_session_send(ses, tn_tuple_new("s", strdup("quit")), NULL, NULL);
+}
+
+
+int main() {
+ tn_node *n = tn_node_create();
+
+ // Create a session for debugging
+ pthread_t tha;
+ ses = tn_session_pthread(n, &tha);
+ tn_session_register(ses, tn_tuple_new(""), 0, tup_all, NULL);
+ tn_session_register(ses, tn_tuple_new("s", strdup("quit")), 0, tup_quit, NULL);
+ // TODO: also register patterns with a handler that replies to a return path
+ // and one that sends out a tuple itself.
+
+ // Create a link with STDIO
+ pthread_t thb;
+ lnk = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &thb);
+ tn_link_on_error(lnk, lnk_error);
+ tn_link_start(lnk);
+
+ // Close
+ tn_node_unref(n);
+ pthread_join(tha, NULL);
+ pthread_join(thb, NULL);
+ return 0;
+}
+
+// vim:noet:sw=4:ts=4
diff --git a/test/pthread.c b/test/pthread.c
index 02ee4c7..8c50e6c 100644
--- a/test/pthread.c
+++ b/test/pthread.c
@@ -29,7 +29,6 @@
extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
-extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *);
#define t_assert(n) printf("%sok %d - " __FILE__ ":%d: " #n "\n", (n)?"":"not ", ++test_num, __LINE__)