summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tanja_pthread.h (renamed from tanja_pthread.c)90
-rw-r--r--test/Makefile8
-rw-r--r--test/link.c6
-rw-r--r--test/pthread.c5
4 files changed, 52 insertions, 57 deletions
diff --git a/tanja_pthread.c b/tanja_pthread.h
index e0a6d0b..5b79621 100644
--- a/tanja_pthread.c
+++ b/tanja_pthread.h
@@ -24,6 +24,10 @@
// This is a very crude one-thread-per-session "event" model.
+
+#ifndef _TANJA_PTHREAD_H
+#define _TANJA_PTHREAD_H
+
#include <stdlib.h>
#include <string.h>
#include <assert.h>
@@ -38,28 +42,22 @@
#include "tanja.h"
-static void sig(int x) {
+static inline void _tn_pthread_sig(int x) {
assert(x == SIGUSR2);
// No need to do anything, just make sure that any blocking function gets
// interrupted.
}
-// Make sure to handle SIGUSR2
-static int setsig() {
- static int init = 0;
- if(init++)
- return 1;
+// Make sure to handle SIGUSR2. Call this function before creating any sessions
+// or nodes. Returns 0 on success, something else otherwise.
+static inline int tn_pthread_setsig() {
struct sigaction a;
memset(&a, 0, sizeof(a));
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
- a.sa_handler = sig;
- if(sigaction(SIGUSR2, &a, NULL) < 0) {
- init = 0;
- return 0;
- }
- return 1;
+ a.sa_handler = _tn_pthread_sig;
+ return sigaction(SIGUSR2, &a, NULL);
}
@@ -68,18 +66,18 @@ static int setsig() {
typedef struct {
pthread_t self;
tn_session *s;
-} pt_sdat;
+} _tn_pthread_s;
-static void sdispatch(tn_session *s, void *_dat) {
- pt_sdat *dat = _dat;
+static inline void _tn_pthread_sd(tn_session *s, void *_dat) {
+ _tn_pthread_s *dat = _dat;
assert(dat->s == s);
pthread_kill(dat->self, SIGUSR2);
}
-static void *sthread(void *_dat) {
- pt_sdat *dat = _dat;
+static inline void *_tn_pthread_st(void *_dat) {
+ _tn_pthread_s *dat = _dat;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
sigset_t mask, oldmask;
@@ -95,14 +93,11 @@ static void *sthread(void *_dat) {
}
-tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
- if(!setsig())
- return NULL;
-
- pt_sdat *dat = malloc(sizeof(pt_sdat));
- dat->s = tn_session_new(n, sdispatch, dat);
+static inline tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
+ _tn_pthread_s *dat = malloc(sizeof(_tn_pthread_s));
+ dat->s = tn_session_new(n, _tn_pthread_sd, dat);
tn_session_ref(dat->s);
- if(pthread_create(&(dat->self), NULL, sthread, dat) < 0) {
+ if(pthread_create(&(dat->self), NULL, _tn_pthread_st, dat) < 0) {
tn_session_unref(dat->s);
tn_session_close(dat->s);
return NULL;
@@ -118,18 +113,22 @@ tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
-#define READBUFSIZE (10*1024)
+
+#ifndef TN_PTHREAD_READBUFSIZE
+# define TN_PTHREAD_READBUFSIZE (10*1024)
+#endif
+
typedef struct {
pthread_t self;
tn_link *l;
int fdin;
int fdout;
-} pt_fdldat;
+} _tn_pthread_l;
-static void fdldispatch(tn_link *l, void *_dat) {
- pt_fdldat *dat = _dat;
+static inline void _tn_pthread_ld(tn_link *l, void *_dat) {
+ _tn_pthread_l *dat = _dat;
assert(l == dat->l);
pthread_kill(dat->self, SIGUSR2);
}
@@ -137,15 +136,15 @@ static void fdldispatch(tn_link *l, void *_dat) {
// Just interrupt whatever is going on, the thread loop will empty the write
// buffer if there's something to write.
-static int fdlwdispatch(tn_link *l, char *buf, int len, void *_dat) {
+static inline int _tn_pthread_ldw(tn_link *l, char *buf, int len, void *_dat) {
assert(buf && len > 0);
- fdldispatch(l, _dat);
+ _tn_pthread_ld(l, _dat);
return 0;
}
// Returns 0 if there is nothing more to write, 1 otherwise.
-static int fdlwrite(tn_link *l, sigset_t *mask, int fd) {
+static inline int _tn_pthread_lwr(tn_link *l, sigset_t *mask, int fd) {
char *buf = NULL;
int len = tn_link_startwrite(l, &buf);
if(!len)
@@ -166,14 +165,14 @@ static int fdlwrite(tn_link *l, sigset_t *mask, int fd) {
}
-static void fdlread(tn_link *l, sigset_t *mask, int fd) {
- char buf[READBUFSIZE];
+static inline void _tn_pthread_lrd(tn_link *l, sigset_t *mask, int fd) {
+ char buf[TN_PTHREAD_READBUFSIZE];
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
int n = pselect(fd+1, &fds, NULL, NULL, NULL, mask);
if(n > 0)
- n = read(fd, buf, READBUFSIZE);
+ n = read(fd, buf, TN_PTHREAD_READBUFSIZE);
if(n < 0 && errno == EINTR)
return;
if(n <= 0) // pselect() shouldn't return 0, so that must come from the read()
@@ -183,8 +182,8 @@ static void fdlread(tn_link *l, sigset_t *mask, int fd) {
}
-static void *fdlthread(void *_dat) {
- pt_fdldat *dat = _dat;
+static inline void *_tn_pthread_lt(void *_dat) {
+ _tn_pthread_l *dat = _dat;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
sigset_t mask, oldmask;
@@ -197,14 +196,14 @@ static void *fdlthread(void *_dat) {
// If there's a callback waiting for the application, dispatch that
if(!tn_link_dispatch(dat->l))
goto end;
- } while(fdlwrite(dat->l, &oldmask, dat->fdout));
+ } while(_tn_pthread_lwr(dat->l, &oldmask, dat->fdout));
// If fdlwrite() returned 0, that could also mean that there was an error, check & dispatch.
if(!tn_link_dispatch(dat->l))
goto end;
// Now get to sleep again in a read call.
- fdlread(dat->l, &oldmask, dat->fdin);
+ _tn_pthread_lrd(dat->l, &oldmask, dat->fdin);
}
end:
tn_link_unref(dat->l);
@@ -214,21 +213,18 @@ end:
}
-tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) {
+static inline tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) {
static tn_link_context ctx = {
- fdldispatch,
- fdlwdispatch
+ _tn_pthread_ld,
+ _tn_pthread_ldw
};
- if(!setsig())
- return NULL;
-
- pt_fdldat *dat = malloc(sizeof(pt_fdldat));
+ _tn_pthread_l *dat = malloc(sizeof(_tn_pthread_l));
dat->fdin = fdin;
dat->fdout = fdout;
dat->l = tn_link_new(n, &ctx, dat);
tn_link_ref(dat->l);
- if(pthread_create(&dat->self, NULL, fdlthread, dat) < 0) {
+ if(pthread_create(&dat->self, NULL, _tn_pthread_lt, dat) < 0) {
tn_link_unref(dat->l);
tn_link_close(dat->l);
return NULL;
@@ -243,4 +239,6 @@ tn_link *tn_link_pthread_fd(tn_node *n, int fdin, int fdout, pthread_t *t) {
}
+#endif // _TANJA_PTHREAD_H
+
// vim:noet:sw=4:ts=4
diff --git a/test/Makefile b/test/Makefile
index e99a412..8d71f6e 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -17,11 +17,11 @@ json: $(DEP) json.c
tuple: $(DEP) tuple.c
$(COMP) tuple.c -o tuple
-pthread: $(DEP) ../tanja_pthread.c pthread.c
- $(COMP) $(THREAD) ../tanja_pthread.c pthread.c -o pthread
+pthread: $(DEP) ../tanja_pthread.h pthread.c
+ $(COMP) $(THREAD) pthread.c -o pthread
-link: $(DEP) ../tanja_pthread.c link.c
- $(COMP) $(THREAD) ../tanja_pthread.c link.c -o link
+link: $(DEP) ../tanja_pthread.h link.c
+ $(COMP) $(THREAD) link.c -o link
test: json tuple pthread
prove ./json ./tuple ./pthread
diff --git a/test/link.c b/test/link.c
index 178278a..7c1e382 100644
--- a/test/link.c
+++ b/test/link.c
@@ -29,10 +29,7 @@
#include <assert.h>
#include <unistd.h>
#include "tanja.h"
-
-
-extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
-extern tn_link *tn_link_pthread_fd(tn_node *, int, int, pthread_t *);
+#include "tanja_pthread.h"
static tn_link *lnk;
static tn_session *ses;
@@ -119,6 +116,7 @@ static void lnk_ready(tn_link *l) {
int main() {
+ tn_pthread_setsig();
tn_node *n = tn_node_new();
// Create a session for debugging
diff --git a/test/pthread.c b/test/pthread.c
index 8c1d87f..f305f97 100644
--- a/test/pthread.c
+++ b/test/pthread.c
@@ -26,9 +26,7 @@
#include <string.h>
#include <pthread.h>
#include "tanja.h"
-
-
-extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
+#include "tanja_pthread.h"
#define t_assert(n) printf("%sok %d - " __FILE__ ":%d: " #n "\n", (n)?"":"not ", ++test_num, __LINE__)
@@ -118,6 +116,7 @@ static void recv_reply(tn_session *s, tn_tuple *tup, void *d) {
int main() {
+ tn_pthread_setsig();
tn_node *n = tn_node_new();
t_assert(n != NULL);