summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-28 21:20:40 +0100
committerYorhel <git@yorhel.nl>2012-02-28 21:20:40 +0100
commitb98655e0b203f84e31b25a697a9f6215097ea460 (patch)
tree51b61d688f8e6c305a04d58ab0ac59399efd081a
parentc25b9aade0cff48672ba0c362fca5546a3032e51 (diff)
c: Added crude pthread-based "event loop" + fixed a few bugs
-rw-r--r--c/Makefile7
-rw-r--r--c/tanja.c15
-rw-r--r--c/tanja.h1
-rw-r--r--c/tanja_pthread.c93
-rw-r--r--c/test.c23
5 files changed, 135 insertions, 4 deletions
diff --git a/c/Makefile b/c/Makefile
index 2bf005f..42063cb 100644
--- a/c/Makefile
+++ b/c/Makefile
@@ -4,5 +4,8 @@ CFLAGS=-Wall -Wextra -g -O3 -DTANJA_THREADSAFE
tanja.o: Makefile tanja.c tanja.h
$(CC) $(CFLAGS) -c tanja.c -o tanja.o
-test: test.c tanja.o
- $(CC) $(CFLAGS) $(LDFLAGS) tanja.o test.c -o test
+tanja_pthread.o: tanja.o tanja_pthread.c
+ $(CC) $(CFLAGS) -c tanja_pthread.c -o tanja_pthread.o
+
+test: test.c tanja_pthread.o
+ $(CC) $(CFLAGS) $(LDFLAGS) tanja.o tanja_pthread.o test.c -lpthread -o test
diff --git a/c/tanja.c b/c/tanja.c
index 46945ec..4d2286a 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -450,6 +450,7 @@ static inline patternreg *patternreg_create_ses(tn_tuple *pat, int willreply, tn
patternreg *r = malloc(sizeof(patternreg));
r->ref = 1;
r->active = 1;
+ r->type = 0;
r->pat = pat;
r->willreply = willreply;
r->recipient = ses; // No need to ref() this, the pattern is unregistered when the session is closed anyway
@@ -570,6 +571,10 @@ static void tn_node_unregsession(tn_node *n, tn_session *s) {
}
+// The reference held on the tuple is passed to the node. So the following
+// works without memory leaks:
+// tn_session_send(.., tn_tuple_new(..), ..);
+//
// TODO: don't send tuples back to the link it came from
static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
mutex_lock(n->lock);
@@ -594,6 +599,9 @@ static void tn_node_send(tn_node *n, tn_tuple *tup, tn_returnpath *rp) {
tn_returnpath_open(rp);
tn_reply_close(rp);
}
+
+ // Free the tuple if no sessions/links were interested in it.
+ tn_tuple_unref(tup);
}
@@ -681,7 +689,7 @@ void tn_session_unregister(tn_session *s, int id) {
// Assumes s->lock is held, and will unlock it.
static inline void tn_session_appendmsg(tn_session *s, sesmsg *m) {
m->next = NULL;
- int empty = !!s->q_begin;
+ int empty = !s->q_begin;
if(empty)
s->q_begin = m;
else
@@ -752,16 +760,19 @@ static inline void tn_session_dispatch_one(tn_session *s, sesmsg *m) {
void tn_session_dispatch(tn_session *s) {
sesmsg *q;
+ tn_session_ref(s);
do {
// Pop item from the start of the queue
mutex_lock(s->lock);
assert(s->active || !s->q_begin); // Queue must be (and stay) empty if the session has been closed
q = s->q_begin;
- s->q_begin = q->next;
+ if(q)
+ s->q_begin = q->next;
mutex_unlock(s->lock);
if(q)
tn_session_dispatch_one(s, q);
} while(q);
+ tn_session_unref(s);
}
diff --git a/c/tanja.h b/c/tanja.h
index 5191396..6fddebc 100644
--- a/c/tanja.h
+++ b/c/tanja.h
@@ -84,6 +84,7 @@ tn_session *tn_session_create(tn_node *, tn_session_context *, void *);
void tn_session_send(tn_session *, tn_tuple *, tn_reply_cb, void *dat);
int tn_session_register(tn_session *, tn_tuple *, int, tn_tuple_cb, void *);
void tn_session_unregister(tn_session *, int);
+void tn_session_dispatch(tn_session *);
void tn_session_close(tn_session *);
// vim:noet:sw=4:ts=4
diff --git a/c/tanja_pthread.c b/c/tanja_pthread.c
new file mode 100644
index 0000000..90308e2
--- /dev/null
+++ b/c/tanja_pthread.c
@@ -0,0 +1,93 @@
+// This is a very crude one-thread-per-session "event" model.
+
+#include <pthread.h>
+#include <signal.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include "tanja.h"
+
+
+typedef struct {
+ pthread_t self;
+ tn_session *s;
+} pt_dat;
+
+
+static void dispatch(tn_session *s, void *_dat) {
+ if(s) {}
+ pt_dat *dat = _dat;
+ pthread_kill(dat->self, SIGUSR2);
+}
+
+
+// Calling session_close() from within another thread may be a bit racy, I'm
+// afraid. Have to look into that (if it's going to be allowed in the first
+// place).
+static void sclose(tn_session *s, void *_dat) {
+ if(s) {}
+ pt_dat *dat = _dat;
+ dat->s = NULL;
+ pthread_kill(dat->self, SIGUSR2);
+}
+
+
+static void *thread(void *_dat) {
+ pt_dat *dat = _dat;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
+ // TODO: sleep() -> read() if this is a link
+ while(dat->s) {
+ tn_session_dispatch(dat->s);
+ if(dat->s)
+ sleep(10);
+ }
+ free(dat);
+ return NULL;
+}
+
+
+static void sig(int x) {
+ x++;
+ // No need to do anything, just make sure that any blocking function gets
+ // interrupted.
+}
+
+
+tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
+ static int init = 0;
+ static tn_session_context ctx = {
+ dispatch,
+ sclose
+ };
+
+ if(!init++) {
+ // Make sure to handle SIGUSR2
+ struct sigaction a;
+ memset(&a, 0, sizeof(a));
+ sigemptyset(&a.sa_mask);
+ a.sa_flags = 0;
+ a.sa_handler = sig;
+ if(sigaction(SIGUSR2, &a, NULL) < 0) {
+ init = 0;
+ return NULL;
+ }
+ }
+
+ pt_dat *dat = malloc(sizeof(pt_dat));
+ dat->s = tn_session_create(n, &ctx, dat);
+
+ if(pthread_create(&(dat->self), NULL, thread, dat) < 0) {
+ tn_session_close(dat->s);
+ return NULL;
+ }
+
+ if(t)
+ memcpy(t, &(dat->self), sizeof(pthread_t));
+ else
+ pthread_detach(dat->self);
+
+ return dat->s;
+}
+
+// vim:noet:sw=4:ts=4
diff --git a/c/test.c b/c/test.c
index 5d1c621..485bca4 100644
--- a/c/test.c
+++ b/c/test.c
@@ -1,8 +1,23 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <pthread.h>
#include "tanja.h"
+
+extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
+
+void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
+ if(d) {}
+ if(p) {}
+ tn_tuple_unref(tup);
+ tn_session_close(s);
+ printf("Closing!\n");
+}
+
+
int main() {
tn_tuple *t = tn_tuple_new("i*sam", 1293, strdup("som\\\"e_strin\01g"),
tn_array_new("i", INT64_MIN),
@@ -16,7 +31,15 @@ int main() {
free(buf);
tn_tuple_unref(t);
tn_node *n = tn_node_create();
+
+ pthread_t th;
+ tn_session *s = tn_session_pthread(n, &th);
tn_node_unref(n);
+
+ tn_session_register(s, tn_tuple_new(""), 0, do_quit, NULL);
+ tn_session_send(s, tn_tuple_new("i", 1), NULL, NULL);
+
+ pthread_join(th, NULL);
return 0;
}