summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-05 09:49:24 +0200
committerYorhel <git@yorhel.nl>2016-09-05 09:49:24 +0200
commit39515558ad68edffc6fe52936714fcefbfc58949 (patch)
treecb6891169c101638ff798b268b0aa66328845e3a
parent72813e1af8aaeef36a22592b83204dcbb48afdd9 (diff)
Add some buffer handling + improve error handling a bit
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml1
-rw-r--r--src/eventloop.rs16
-rw-r--r--src/itf_http.rs118
-rw-r--r--src/listener.rs31
-rw-r--r--src/main.rs2
6 files changed, 163 insertions, 12 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2244df6..f670169 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6,6 +6,7 @@ dependencies = [
"getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "netbuf 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -114,6 +115,11 @@ dependencies = [
]
[[package]]
+name = "netbuf"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "nix"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -226,6 +232,7 @@ dependencies = [
"checksum mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2dadd39d4b47343e10513ac2a731c979517a4761224ecb6bbd243602300c9537"
"checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
+"checksum netbuf 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "464910395746223293a381447af789668a3732e9564778f8bd94aaefb9543952"
"checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2"
"checksum nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b8c256fd9471521bcb84c3cdba98921497f1a331cbc15b8030fc63b82050ce"
"checksum regex 0.1.73 (registry+https://github.com/rust-lang/crates.io-index)" = "56b7ee9f764ecf412c6e2fff779bca4b22980517ae335a21aeaf4e32625a5df2"
diff --git a/Cargo.toml b/Cargo.toml
index 19f4424..541ce84 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,4 @@ getopts = "0.2"
nom = "1.2"
mio = "0.6"
slab = "0.3.0"
+netbuf = "0.3.2"
diff --git a/src/eventloop.rs b/src/eventloop.rs
index 2cf8ecd..84f1e77 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -66,7 +66,6 @@ impl<'a> Context<'a> {
slab_insert(&mut self.parent.tokens, self.machine)
}
- #[allow(dead_code)]
pub fn unassign(&mut self, t: Token) {
assert_eq!(self.parent.tokens[t], self.machine);
self.parent.tokens.remove(t);
@@ -78,22 +77,22 @@ impl<'a> Context<'a> {
self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap();
}
- #[allow(dead_code)]
pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap();
}
- #[allow(dead_code)]
pub fn deregister<E: ?Sized>(&mut self, io: &E) where E: mio::Evented {
self.parent.poll.deregister(io).unwrap();
}
- #[allow(dead_code)]
pub fn remove(&mut self) {
self.removed = true;
}
- #[allow(dead_code)]
+ pub fn is_removed(&self) -> bool {
+ self.removed
+ }
+
pub fn spawn(&mut self, machine: Box<Machine>) {
self.parent.spawn(machine);
}
@@ -111,6 +110,7 @@ impl EventLoop {
pub fn spawn(&mut self, mut machine: Box<Machine>) {
let mtoken = slab_insert(&mut self.machines, None);
+ trace!("Spawning machine {}", mtoken.0);
{
let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
machine.init(&mut ctx);
@@ -124,11 +124,11 @@ impl EventLoop {
debug!("Entering event loop");
loop {
self.poll.poll(&mut events, None).unwrap();
- trace!("Poll returned with {} events", events.len());
for event in events.iter() {
let mtoken = self.tokens[event.token()];
- let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap();
+ trace!("Poll returned event {} on machine {} state {:?}", event.token().0, mtoken.0, event.kind());
+ let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap();
let removed = {
let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
machine.handle(&mut ctx, event);
@@ -137,6 +137,8 @@ impl EventLoop {
if !removed {
self.machines[mtoken] = Some(machine);
+ } else {
+ trace!("Removing machine {}", mtoken.0);
}
}
}
diff --git a/src/itf_http.rs b/src/itf_http.rs
new file mode 100644
index 0000000..f3995cd
--- /dev/null
+++ b/src/itf_http.rs
@@ -0,0 +1,118 @@
+use mio::{Ready,Token,Event};
+use mio::tcp::TcpStream;
+use std::net::SocketAddr;
+use std::io::ErrorKind;
+use eventloop::{Machine,Context};
+use netbuf::Buf;
+
+
+// TODO: timeouts
+// TODO: Set a max size on the buffers and provide backpressure
+pub struct ItfHttp {
+ sock: TcpStream,
+ addr: SocketAddr,
+ io: Token,
+ ioreg: Ready,
+ rbuf: Buf,
+ wbuf: Buf
+}
+
+
+impl ItfHttp {
+ pub fn new(sock: TcpStream, addr: SocketAddr) -> ItfHttp {
+ ItfHttp {
+ sock: sock,
+ addr: addr,
+ io: Token(0),
+ ioreg: Ready::readable(),
+ rbuf: Buf::new(),
+ wbuf: Buf::new()
+ }
+ }
+
+ fn handle_read(&mut self, ctx: &mut Context) {
+ match self.rbuf.read_from(&mut self.sock) {
+ Err(err) => {
+ match err.kind() {
+ ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
+ _ => {
+ debug!("{}: Read error: {}", self.addr, err);
+ self.remove(ctx);
+ }
+ }
+ return;
+ },
+ Ok(0) => { // This behaviour isn't documented, unsure if it's intended.
+ debug!("{}: Connection closed", self.addr);
+ self.remove(ctx);
+ return;
+ },
+ _ => {}
+ }
+ // This is where we parse the stuff, generate a request object, throw the request object
+ // through some handlers to get a response and send it back. But for now let's just act
+ // like we wanted to be an echo server all along.
+ self.rbuf.write_to(&mut self.wbuf).unwrap();
+ }
+
+ fn handle_write(&mut self, ctx: &mut Context) {
+ if let Err(err) = self.wbuf.write_to(&mut self.sock) {
+ match err.kind() {
+ ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
+ _ => {
+ debug!("{}: Write error: {}", self.addr, err);
+ self.remove(ctx);
+ }
+ }
+ }
+ }
+
+ fn set_ioreg(&mut self, ctx: &mut Context) {
+ // Optimization: If the write buffer was empty before but contains data now, we can already
+ // try to write it out at this point. This way we can avoid going through the event loop if
+ // the OS buffers have enough space.
+ // TODO: Measure how effective this is in practice; if this only tends to happen on the
+ // first write to a new socket then it might not be worth the effort.
+ if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) {
+ self.handle_write(ctx);
+ if ctx.is_removed() {
+ return;
+ }
+ }
+
+ let mut reg = Ready::readable();
+ if self.wbuf.len() > 0 {
+ reg.insert(Ready::writable());
+ }
+ if reg != self.ioreg {
+ self.ioreg = reg;
+ ctx.reregister(&self.sock, self.io, self.ioreg);
+ }
+ }
+
+ fn remove(&mut self, ctx: &mut Context) {
+ ctx.deregister(&self.sock);
+ ctx.unassign(self.io);
+ ctx.remove();
+ }
+}
+
+
+impl Machine for ItfHttp {
+ fn init(&mut self, ctx: &mut Context) {
+ self.io = ctx.assign();
+ ctx.register(&self.sock, self.io, self.ioreg);
+ }
+
+ fn handle(&mut self, ctx: &mut Context, ev: Event) {
+ if ev.kind().is_readable() {
+ self.handle_read(ctx);
+ }
+ if !ctx.is_removed() && ev.kind().is_writable() {
+ self.handle_write(ctx);
+ }
+ if !ctx.is_removed() {
+ self.set_ioreg(ctx);
+ }
+ }
+}
diff --git a/src/listener.rs b/src/listener.rs
index e6683f3..293099e 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -1,13 +1,16 @@
use mio::{Ready,Token,Event};
use mio::tcp::TcpListener;
use std::net::SocketAddr;
-use std::io::Result;
+use std::io::{Result,ErrorKind};
use config::Config;
use eventloop::{Machine,Context,EventLoop};
+use itf_http;
+
struct Listener {
sock: TcpListener,
+ addr: SocketAddr,
io: Token
}
@@ -18,6 +21,7 @@ impl Listener {
let sock = try!(TcpListener::bind(addr));
Ok(Listener {
sock: sock,
+ addr: *addr,
io: Token(0)
})
}
@@ -28,12 +32,29 @@ impl Machine for Listener {
fn init(&mut self, ctx: &mut Context) {
self.io = ctx.assign();
ctx.register(&self.sock, self.io, Ready::readable());
- info!("Listening on {}", self.sock.local_addr().unwrap());
+ info!("Listening on {}", self.addr);
}
- fn handle(&mut self, _: &mut Context, _: Event) {
- let(_, addr) = self.sock.accept().unwrap(); // TODO: ERROR HANDLING!
- debug!("New connection from {}", addr);
+ fn handle(&mut self, ctx: &mut Context, _: Event) {
+ // TODO: max_connections or something
+ match self.sock.accept() {
+ Err(err) => {
+ match err.kind() {
+ ErrorKind::WouldBlock |
+ ErrorKind::Interrupted |
+ ErrorKind::TimedOut => { },
+ _ => {
+ error!("Error accepting on {}: {}", self.addr, err);
+ // If the error is persistent, we may be getting into an infinite loop.
+ // TODO: Have a back-off timer here (especially for EMFILE).
+ }
+ }
+ },
+ Ok((sock, addr)) => {
+ debug!("{}: New connection", addr);
+ ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr)));
+ }
+ }
}
}
diff --git a/src/main.rs b/src/main.rs
index 9a90486..5b990c9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,10 +11,12 @@ extern crate env_logger;
extern crate getopts;
extern crate mio;
extern crate slab;
+extern crate netbuf;
mod config;
mod eventloop;
mod listener;
+mod itf_http;
use std::process::exit;
use std::io::prelude::*;