From 39515558ad68edffc6fe52936714fcefbfc58949 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Mon, 5 Sep 2016 09:49:24 +0200 Subject: Add some buffer handling + improve error handling a bit --- Cargo.lock | 7 ++++ Cargo.toml | 1 + src/eventloop.rs | 16 ++++---- src/itf_http.rs | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/listener.rs | 31 ++++++++++++--- src/main.rs | 2 + 6 files changed, 163 insertions(+), 12 deletions(-) create mode 100644 src/itf_http.rs diff --git a/Cargo.lock b/Cargo.lock index 2244df6..f670169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,7 @@ dependencies = [ "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "netbuf 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -113,6 +114,11 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "netbuf" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "nix" version = "0.6.0" @@ -226,6 +232,7 @@ dependencies = [ "checksum mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2dadd39d4b47343e10513ac2a731c979517a4761224ecb6bbd243602300c9537" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" +"checksum netbuf 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "464910395746223293a381447af789668a3732e9564778f8bd94aaefb9543952" "checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2" "checksum nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b8c256fd9471521bcb84c3cdba98921497f1a331cbc15b8030fc63b82050ce" "checksum regex 0.1.73 (registry+https://github.com/rust-lang/crates.io-index)" = "56b7ee9f764ecf412c6e2fff779bca4b22980517ae335a21aeaf4e32625a5df2" diff --git a/Cargo.toml b/Cargo.toml index 19f4424..541ce84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ getopts = "0.2" nom = "1.2" mio = "0.6" slab = "0.3.0" +netbuf = "0.3.2" diff --git a/src/eventloop.rs b/src/eventloop.rs index 2cf8ecd..84f1e77 100644 --- a/src/eventloop.rs +++ b/src/eventloop.rs @@ -66,7 +66,6 @@ impl<'a> Context<'a> { slab_insert(&mut self.parent.tokens, self.machine) } - #[allow(dead_code)] pub fn unassign(&mut self, t: Token) { assert_eq!(self.parent.tokens[t], self.machine); self.parent.tokens.remove(t); @@ -78,22 +77,22 @@ impl<'a> Context<'a> { self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap(); } - #[allow(dead_code)] pub fn reregister(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); } - #[allow(dead_code)] pub fn deregister(&mut self, io: &E) where E: mio::Evented { self.parent.poll.deregister(io).unwrap(); } - #[allow(dead_code)] pub fn remove(&mut self) { self.removed = true; } - #[allow(dead_code)] + pub fn is_removed(&self) -> bool { + self.removed + } + pub fn spawn(&mut self, machine: Box) { self.parent.spawn(machine); } @@ -111,6 +110,7 @@ impl EventLoop { pub fn spawn(&mut self, mut machine: Box) { let mtoken = slab_insert(&mut self.machines, None); + trace!("Spawning machine {}", mtoken.0); { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; machine.init(&mut ctx); @@ -124,11 +124,11 @@ impl EventLoop { debug!("Entering event loop"); loop { self.poll.poll(&mut events, None).unwrap(); - trace!("Poll returned with {} events", events.len()); for event in events.iter() { let mtoken = self.tokens[event.token()]; - let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap(); + trace!("Poll returned event {} on machine {} state {:?}", event.token().0, mtoken.0, event.kind()); + let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap(); let removed = { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; machine.handle(&mut ctx, event); @@ -137,6 +137,8 @@ impl EventLoop { if !removed { self.machines[mtoken] = Some(machine); + } else { + trace!("Removing machine {}", mtoken.0); } } } diff --git a/src/itf_http.rs b/src/itf_http.rs new file mode 100644 index 0000000..f3995cd --- /dev/null +++ b/src/itf_http.rs @@ -0,0 +1,118 @@ +use mio::{Ready,Token,Event}; +use mio::tcp::TcpStream; +use std::net::SocketAddr; +use std::io::ErrorKind; +use eventloop::{Machine,Context}; +use netbuf::Buf; + + +// TODO: timeouts +// TODO: Set a max size on the buffers and provide backpressure +pub struct ItfHttp { + sock: TcpStream, + addr: SocketAddr, + io: Token, + ioreg: Ready, + rbuf: Buf, + wbuf: Buf +} + + +impl ItfHttp { + pub fn new(sock: TcpStream, addr: SocketAddr) -> ItfHttp { + ItfHttp { + sock: sock, + addr: addr, + io: Token(0), + ioreg: Ready::readable(), + rbuf: Buf::new(), + wbuf: Buf::new() + } + } + + fn handle_read(&mut self, ctx: &mut Context) { + match self.rbuf.read_from(&mut self.sock) { + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, + _ => { + debug!("{}: Read error: {}", self.addr, err); + self.remove(ctx); + } + } + return; + }, + Ok(0) => { // This behaviour isn't documented, unsure if it's intended. + debug!("{}: Connection closed", self.addr); + self.remove(ctx); + return; + }, + _ => {} + } + // This is where we parse the stuff, generate a request object, throw the request object + // through some handlers to get a response and send it back. But for now let's just act + // like we wanted to be an echo server all along. + self.rbuf.write_to(&mut self.wbuf).unwrap(); + } + + fn handle_write(&mut self, ctx: &mut Context) { + if let Err(err) = self.wbuf.write_to(&mut self.sock) { + match err.kind() { + ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, + _ => { + debug!("{}: Write error: {}", self.addr, err); + self.remove(ctx); + } + } + } + } + + fn set_ioreg(&mut self, ctx: &mut Context) { + // Optimization: If the write buffer was empty before but contains data now, we can already + // try to write it out at this point. This way we can avoid going through the event loop if + // the OS buffers have enough space. + // TODO: Measure how effective this is in practice; if this only tends to happen on the + // first write to a new socket then it might not be worth the effort. + if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) { + self.handle_write(ctx); + if ctx.is_removed() { + return; + } + } + + let mut reg = Ready::readable(); + if self.wbuf.len() > 0 { + reg.insert(Ready::writable()); + } + if reg != self.ioreg { + self.ioreg = reg; + ctx.reregister(&self.sock, self.io, self.ioreg); + } + } + + fn remove(&mut self, ctx: &mut Context) { + ctx.deregister(&self.sock); + ctx.unassign(self.io); + ctx.remove(); + } +} + + +impl Machine for ItfHttp { + fn init(&mut self, ctx: &mut Context) { + self.io = ctx.assign(); + ctx.register(&self.sock, self.io, self.ioreg); + } + + fn handle(&mut self, ctx: &mut Context, ev: Event) { + if ev.kind().is_readable() { + self.handle_read(ctx); + } + if !ctx.is_removed() && ev.kind().is_writable() { + self.handle_write(ctx); + } + if !ctx.is_removed() { + self.set_ioreg(ctx); + } + } +} diff --git a/src/listener.rs b/src/listener.rs index e6683f3..293099e 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,13 +1,16 @@ use mio::{Ready,Token,Event}; use mio::tcp::TcpListener; use std::net::SocketAddr; -use std::io::Result; +use std::io::{Result,ErrorKind}; use config::Config; use eventloop::{Machine,Context,EventLoop}; +use itf_http; + struct Listener { sock: TcpListener, + addr: SocketAddr, io: Token } @@ -18,6 +21,7 @@ impl Listener { let sock = try!(TcpListener::bind(addr)); Ok(Listener { sock: sock, + addr: *addr, io: Token(0) }) } @@ -28,12 +32,29 @@ impl Machine for Listener { fn init(&mut self, ctx: &mut Context) { self.io = ctx.assign(); ctx.register(&self.sock, self.io, Ready::readable()); - info!("Listening on {}", self.sock.local_addr().unwrap()); + info!("Listening on {}", self.addr); } - fn handle(&mut self, _: &mut Context, _: Event) { - let(_, addr) = self.sock.accept().unwrap(); // TODO: ERROR HANDLING! - debug!("New connection from {}", addr); + fn handle(&mut self, ctx: &mut Context, _: Event) { + // TODO: max_connections or something + match self.sock.accept() { + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock | + ErrorKind::Interrupted | + ErrorKind::TimedOut => { }, + _ => { + error!("Error accepting on {}: {}", self.addr, err); + // If the error is persistent, we may be getting into an infinite loop. + // TODO: Have a back-off timer here (especially for EMFILE). + } + } + }, + Ok((sock, addr)) => { + debug!("{}: New connection", addr); + ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr))); + } + } } } diff --git a/src/main.rs b/src/main.rs index 9a90486..5b990c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,10 +11,12 @@ extern crate env_logger; extern crate getopts; extern crate mio; extern crate slab; +extern crate netbuf; mod config; mod eventloop; mod listener; +mod itf_http; use std::process::exit; use std::io::prelude::*; -- cgit v1.2.3