diff options
Diffstat (limited to 'src/itf_http.rs')
-rw-r--r-- | src/itf_http.rs | 115 |
1 files changed, 24 insertions, 91 deletions
diff --git a/src/itf_http.rs b/src/itf_http.rs index 012323c..0d261a2 100644 --- a/src/itf_http.rs +++ b/src/itf_http.rs @@ -1,31 +1,28 @@ -use mio::{Ready,Token,Event}; +use mio::Event; use mio::tcp::TcpStream; use std::net::SocketAddr; -use std::io::ErrorKind; use std::sync::Arc; -use netbuf::Buf; +use iostream::IoStream; use listener::Interface; use eventloop::{Machine,Context,TToken,Action}; -// TODO: Set a max size on the buffers and provide backpressure pub struct ItfHttp { - sock: TcpStream, + io: IoStream, addr: SocketAddr, - io: Token, - ioreg: Ready, - rbuf: Buf, - wbuf: Buf, - iotimeout: TToken, /* timeout on any form of IO */ itf: Arc<Interface>, } -macro_rules! try_act { - ( $e: expr ) => {{ - if let Some(x) = $e { - return Some(x); +macro_rules! try_rm { + ( $s: expr, $e: expr ) => {{ + match $e { + Err(e) => { + debug!("{}: {}", $s.addr, e); + return Some(Action::Remove); + }, + Ok(v) => v } }}; } @@ -33,100 +30,36 @@ macro_rules! try_act { impl ItfHttp { pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp { - let io = ctx.register(&sock, Ready::readable()); + let io = IoStream::new(ctx, sock, itf.cfg.io_timeout, itf.cfg.io_timeout, 4096); ItfHttp { - sock: sock, - addr: addr, io: io, - ioreg: Ready::readable(), - rbuf: Buf::new(), - wbuf: Buf::new(), - iotimeout: ctx.set_timeout(itf.cfg.io_timeout), + addr: addr, itf: itf, } } +} + + +impl Machine for ItfHttp { + fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> { + try_rm!(self, self.io.handle(ctx, ev)); - fn handle_read(&mut self) -> Option<Action> { - match self.rbuf.read_from(&mut self.sock) { - Err(err) => { - match err.kind() { - ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, - _ => { - debug!("{}: Read error: {}", self.addr, err); - return Some(Action::Remove); - } - } - return None; - }, - Ok(0) => { - debug!("{}: Connection closed", self.addr); - return Some(Action::Remove); - }, - _ => {} - } // This is where we parse the stuff, generate a request object, throw the request object // through some handlers to get a response and send it back. But for now let's just act // like we wanted to be an echo server all along. - self.rbuf.write_to(&mut self.wbuf).unwrap(); - None - } + let _ = self.io.rbuf.write_to(&mut self.io.wbuf); + self.io.set_ioreg(ctx); - fn handle_write(&mut self) -> Option<Action> { - if let Err(err) = self.wbuf.write_to(&mut self.sock) { - match err.kind() { - ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, - _ => { - debug!("{}: Write error: {}", self.addr, err); - return Some(Action::Remove); - } - } - } None } - fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> { - // Optimization: If the write buffer was empty before but contains data now, we can already - // try to write it out at this point. This way we can avoid going through the event loop if - // the OS buffers have enough space. - // TODO: Measure how effective this is in practice; if this only tends to happen on the - // first write to a new socket then it might not be worth the effort. - if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) { - try_act!(self.handle_write()); - } - - let mut reg = Ready::readable(); - if self.wbuf.len() > 0 { - reg.insert(Ready::writable()); - } - if reg != self.ioreg { - self.ioreg = reg; - ctx.reregister(&self.sock, self.io, self.ioreg); - } - ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout); + fn timeout(&mut self, _: &mut Context, t: TToken) -> Option<Action> { + try_rm!(self, self.io.timeout(t)); None } -} - - -impl Machine for ItfHttp { - fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> { - if ev.kind().is_readable() { - try_act!(self.handle_read()); - } - if ev.kind().is_writable() { - try_act!(self.handle_write()); - } - self.set_ioreg(ctx) - } - - fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> { - debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs()); - Some(Action::Remove) - } fn remove(&mut self, ctx: &mut Context) { self.itf.release(); - ctx.deregister(&self.sock, self.io); - ctx.cancel_timeout(self.iotimeout); + self.io.remove(ctx); } } |