summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-02 10:29:39 +0100
committerYorhel <git@yorhel.nl>2012-03-02 10:29:39 +0100
commitf47c2efd2cb4c5c868f4b2da168e565cb681b3d4 (patch)
treeda990c23528c94adda54abb3db19f5225d233ed8
parentbdde01b591a6b0a65fb100d83a7a7a604d2da820 (diff)
c: Added pthread context for links + fixed a few bugs
It can do a successful handshake now, yay!
-rw-r--r--c/tanja.c62
-rw-r--r--c/tanja.h4
-rw-r--r--c/tanja_pthread.c161
-rw-r--r--c/test.c9
4 files changed, 198 insertions, 38 deletions
diff --git a/c/tanja.c b/c/tanja.c
index 8fc73bf..17db8ff 100644
--- a/c/tanja.c
+++ b/c/tanja.c
@@ -840,17 +840,19 @@ struct tn_link {
lbuf_init(b);\
} while(0)
-#define lbuf_grow(b) do {\
+#define lbuf_grow(b, m) do {\
if(b##_size < 16)\
b##_size = 16;\
else\
b##_size *= 2;\
+ if(b##_size < m)\
+ b##_size = m;\
b = realloc(b, b##_size);\
} while(0)
#define lbuf_append(b, buf, len) do {\
if(b##_size < b##_len + len)\
- lbuf_grow(b);\
+ lbuf_grow(b, b##_len + len);\
memcpy(b+b##_len, buf, len);\
b##_len += len;\
} while(0)
@@ -881,21 +883,28 @@ tn_link *tn_link_create(tn_node *n, int sync, tn_link_context *ctx, void *data)
l->ctx = ctx;
l->data = data;
l->handshaked = 0;
+ mutex_init(l->lock);
lbuf_init(l->rbuf);
lbuf_init(l->wbuf);
- tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28);
return l;
}
+void tn_link_start(tn_link *l) {
+ tn_link_write(l, "ver,1.0 seri,json sero,json\n", 28);
+}
+
+
#define inc(n) do { buf+=n; len-=n; } while(0)
static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len) {
int ok = 0;
while(len > 0) {
// Space, ignore
- if(*buf == ' ')
+ if(*buf == ' ') {
+ inc(1);
continue;
+ }
// Version
if(len > 4 && strncmp(buf, "ver,", 4) == 0) {
@@ -912,13 +921,14 @@ static inline void tn_link_handlehandshake(tn_link *l, const char *buf, int len)
int flag = buf[3] == 'i' ? 2 : 4;
inc(4);
while(len && *buf != ' ') {
- if(len >= 5 && strncmp(buf, ",json", 5) == 0)
+ if(len >= 5 && strncmp(buf, ",json", 5) == 0 && (len == 5 || buf[5] == ' ' || buf[5] == ','))
ok |= flag;
inc(1);
}
}
}
+ if(l) {}
/* TODO
if(ok != 1|2|4) {
error(!(ok&1) ? "No or invalid version" : !(ok&2) : "No common input format" : "No common output format");
@@ -981,6 +991,48 @@ void tn_link_read(tn_link *l, const char *buf, int len) {
}
+void tn_link_dispatch(tn_link *l) {
+ // Call write() if we have something buffered.
+ // Note: We should not call write() while holding the lock, since that may
+ // block other threads. Instead, we take control over the write buffer and
+ // temporarily replace it with an empty one so that other threads can keep
+ // writing without blocking or losing the new data.
+ mutex_lock(l->lock);
+ if(l->wbuf_len) {
+ char *buf = l->wbuf;
+ int buf_len = l->wbuf_len;
+ int buf_size = l->wbuf_size;
+ lbuf_init(l->wbuf);
+ mutex_unlock(l->lock);
+
+ int n = l->ctx->write(l, buf, buf_len, l->data);
+ assert(n >= 0); // TODO: error reporting!
+
+ mutex_lock(l->lock);
+ if(n > 0) {
+ memmove(buf, buf+n, buf_len-n);
+ buf_len -= n;
+ }
+ // If our buffer has been emptied, then just free it and leave the write buffer alone.
+ if(!buf_len)
+ lbuf_free(buf);
+ else {
+ // Oops, someone else has written to the buffer while we were being
+ // busy. Append this data to our buffer.
+ if(l->wbuf_len)
+ lbuf_append(buf, l->wbuf, l->wbuf_len);
+ // Now give back our temporary buffer
+ if(l->wbuf)
+ free(l->wbuf);
+ l->wbuf = buf;
+ l->wbuf_len = buf_len;
+ l->wbuf_size = buf_size;
+ }
+ }
+ mutex_unlock(l->lock);
+}
+
+
void tn_link_close(tn_link *l) {
mutex_lock(l->lock);
lbuf_free(l->wbuf);
diff --git a/c/tanja.h b/c/tanja.h
index dbe690e..17a5b8e 100644
--- a/c/tanja.h
+++ b/c/tanja.h
@@ -61,7 +61,7 @@ typedef struct {
typedef struct {
void (*dispatch)(tn_link *, void *); // if NULL, then write() will be called immediately
- void (*write)(tn_link *, char *, int, void *);
+ int (*write)(tn_link *, char *, int, void *);
void (*close)(tn_link *, void *);
} tn_link_context;
@@ -96,7 +96,9 @@ void tn_session_close(tn_session *);
tn_link *tn_link_create(tn_node *, int, tn_link_context *, void *);
+void tn_link_start(tn_link *);
void tn_link_read(tn_link *, const char *, int);
+void tn_link_dispatch(tn_link *);
void tn_link_close(tn_link *);
// vim:noet:sw=4:ts=4
diff --git a/c/tanja_pthread.c b/c/tanja_pthread.c
index 90308e2..1980533 100644
--- a/c/tanja_pthread.c
+++ b/c/tanja_pthread.c
@@ -5,18 +5,46 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
+#include <errno.h>
#include "tanja.h"
+static void sig(int x) {
+ x++;
+ // No need to do anything, just make sure that any blocking function gets
+ // interrupted.
+}
+
+
+// Make sure to handle SIGUSR2
+static int setsig() {
+ static int init = 0;
+ if(init++)
+ return 1;
+ struct sigaction a;
+ memset(&a, 0, sizeof(a));
+ sigemptyset(&a.sa_mask);
+ a.sa_flags = 0;
+ a.sa_handler = sig;
+ if(sigaction(SIGUSR2, &a, NULL) < 0) {
+ init = 0;
+ return 0;
+ }
+ return 1;
+}
+
+
+
+
typedef struct {
pthread_t self;
tn_session *s;
-} pt_dat;
+} pt_sdat;
-static void dispatch(tn_session *s, void *_dat) {
+static void sdispatch(tn_session *s, void *_dat) {
if(s) {}
- pt_dat *dat = _dat;
+ pt_sdat *dat = _dat;
pthread_kill(dat->self, SIGUSR2);
}
@@ -26,17 +54,16 @@ static void dispatch(tn_session *s, void *_dat) {
// place).
static void sclose(tn_session *s, void *_dat) {
if(s) {}
- pt_dat *dat = _dat;
+ pt_sdat *dat = _dat;
dat->s = NULL;
pthread_kill(dat->self, SIGUSR2);
}
-static void *thread(void *_dat) {
- pt_dat *dat = _dat;
+static void *sthread(void *_dat) {
+ pt_sdat *dat = _dat;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
- // TODO: sleep() -> read() if this is a link
while(dat->s) {
tn_session_dispatch(dat->s);
if(dat->s)
@@ -47,37 +74,18 @@ static void *thread(void *_dat) {
}
-static void sig(int x) {
- x++;
- // No need to do anything, just make sure that any blocking function gets
- // interrupted.
-}
-
-
tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
- static int init = 0;
static tn_session_context ctx = {
- dispatch,
+ sdispatch,
sclose
};
- if(!init++) {
- // Make sure to handle SIGUSR2
- struct sigaction a;
- memset(&a, 0, sizeof(a));
- sigemptyset(&a.sa_mask);
- a.sa_flags = 0;
- a.sa_handler = sig;
- if(sigaction(SIGUSR2, &a, NULL) < 0) {
- init = 0;
- return NULL;
- }
- }
+ if(!setsig())
+ return NULL;
- pt_dat *dat = malloc(sizeof(pt_dat));
+ pt_sdat *dat = malloc(sizeof(pt_sdat));
dat->s = tn_session_create(n, &ctx, dat);
-
- if(pthread_create(&(dat->self), NULL, thread, dat) < 0) {
+ if(pthread_create(&(dat->self), NULL, sthread, dat) < 0) {
tn_session_close(dat->s);
return NULL;
}
@@ -90,4 +98,95 @@ tn_session *tn_session_pthread(tn_node *n, pthread_t *t) {
return dat->s;
}
+
+
+#define READBUFSIZE (10*1024)
+
+typedef struct {
+ pthread_t self;
+ tn_link *l;
+ int fdin;
+ int fdout;
+} pt_fdldat;
+
+
+static void fdldispatch(tn_link *l, void *_dat) {
+ if(l) {}
+ pt_fdldat *dat = _dat;
+ pthread_kill(dat->self, SIGUSR2);
+}
+
+
+// TODO: Same note as for link_close() applies here, I guess. But for links it
+// actually has value...
+static void fdlclose(tn_link *l, void *_dat) {
+ if(l) {}
+ pt_fdldat *dat = _dat;
+ dat->l = NULL;
+ pthread_kill(dat->self, SIGUSR2);
+}
+
+
+static int fdlwrite(tn_link *l, char *buf, int len, void *_dat) {
+ if(l) {}
+ pt_fdldat *dat = _dat;
+ int n = write(dat->fdout, buf, len);
+ // interrupt? 0 byte written. disconnect? error.
+ return n < 0 && errno == EINTR ? 0 : n == 0 ? -1 : n;
+}
+
+
+static void *fdlthread(void *_dat) {
+ char buf[READBUFSIZE];
+ pt_fdldat *dat = _dat;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, PTHREAD_CANCEL_DEFERRED);
+ while(dat->l) {
+ tn_link_dispatch(dat->l);
+ if(dat->l) {
+ int n = read(dat->fdin, buf, READBUFSIZE);
+ // Note: Don't check for EAGAIN or EWOULDBLOCK, the fd should not
+ // be non-blocking so that is an error.
+ if(n < 0 && errno == EINTR)
+ continue;
+ if(n <= 0) {
+ // TODO: report error / disconnect
+ tn_link_close(dat->l);
+ } else
+ tn_link_read(dat->l, buf, n);
+ }
+ }
+ free(dat);
+ return NULL;
+}
+
+
+tn_link *tn_link_pthread_fd(tn_node *n, int sync, int fdin, int fdout, pthread_t *t) {
+ static tn_link_context ctx = {
+ fdldispatch,
+ fdlwrite,
+ fdlclose
+ };
+
+ if(!setsig())
+ return NULL;
+
+ pt_fdldat *dat = malloc(sizeof(pt_fdldat));
+ dat->fdin = fdin;
+ dat->fdout = fdout;
+ dat->l = tn_link_create(n, sync, &ctx, dat);
+ if(pthread_create(&dat->self, NULL, fdlthread, dat) < 0) {
+ tn_link_close(dat->l);
+ return NULL;
+ }
+
+ if(t)
+ memcpy(t, &dat->self, sizeof(pthread_t));
+ else
+ pthread_detach(dat->self);
+
+ return dat->l;
+}
+
+
// vim:noet:sw=4:ts=4
diff --git a/c/test.c b/c/test.c
index 485bca4..23b8ba0 100644
--- a/c/test.c
+++ b/c/test.c
@@ -8,6 +8,7 @@
extern tn_session *tn_session_pthread(tn_node *, pthread_t *);
+extern tn_link *tn_link_pthread_fd(tn_node *, int, int, int, pthread_t *);
void do_quit(tn_session *s, tn_tuple *tup, tn_returnpath *p, void *d) {
if(d) {}
@@ -34,12 +35,18 @@ int main() {
pthread_t th;
tn_session *s = tn_session_pthread(n, &th);
- tn_node_unref(n);
tn_session_register(s, tn_tuple_new(""), 0, do_quit, NULL);
tn_session_send(s, tn_tuple_new("i", 1), NULL, NULL);
pthread_join(th, NULL);
+
+ // And now a link with STDIO
+ tn_link *l = tn_link_pthread_fd(n, 1, STDIN_FILENO, STDOUT_FILENO, &th);
+ tn_link_start(l);
+ pthread_join(th, NULL);
+
+ tn_node_unref(n);
return 0;
}