use mio::{Ready,Token,Event}; use mio::tcp::TcpStream; use std::net::SocketAddr; use std::io::ErrorKind; use listener::Interface; use eventloop::{Machine,Context,TToken}; use std::sync::Arc; use netbuf::Buf; // TODO: Set a max size on the buffers and provide backpressure pub struct ItfHttp { sock: TcpStream, addr: SocketAddr, io: Token, ioreg: Ready, rbuf: Buf, wbuf: Buf, iotimeout: TToken, /* timeout on any form of IO */ itf: Arc, } impl ItfHttp { pub fn new(ctx: &mut Context, itf: Arc, sock: TcpStream, addr: SocketAddr) -> ItfHttp { let io = ctx.register(&sock, Ready::readable()); ItfHttp { sock: sock, addr: addr, io: io, ioreg: Ready::readable(), rbuf: Buf::new(), wbuf: Buf::new(), iotimeout: ctx.set_timeout(itf.cfg.io_timeout), itf: itf, } } fn handle_read(&mut self, ctx: &mut Context) { match self.rbuf.read_from(&mut self.sock) { Err(err) => { match err.kind() { ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, _ => { debug!("{}: Read error: {}", self.addr, err); self.remove(ctx); } } return; }, Ok(0) => { debug!("{}: Connection closed", self.addr); self.remove(ctx); return; }, _ => {} } // This is where we parse the stuff, generate a request object, throw the request object // through some handlers to get a response and send it back. But for now let's just act // like we wanted to be an echo server all along. self.rbuf.write_to(&mut self.wbuf).unwrap(); } fn handle_write(&mut self, ctx: &mut Context) { if let Err(err) = self.wbuf.write_to(&mut self.sock) { match err.kind() { ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, _ => { debug!("{}: Write error: {}", self.addr, err); self.remove(ctx); } } } } fn set_ioreg(&mut self, ctx: &mut Context) { // Optimization: If the write buffer was empty before but contains data now, we can already // try to write it out at this point. This way we can avoid going through the event loop if // the OS buffers have enough space. // TODO: Measure how effective this is in practice; if this only tends to happen on the // first write to a new socket then it might not be worth the effort. if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) { self.handle_write(ctx); if ctx.is_removed() { return; } } let mut reg = Ready::readable(); if self.wbuf.len() > 0 { reg.insert(Ready::writable()); } if reg != self.ioreg { self.ioreg = reg; ctx.reregister(&self.sock, self.io, self.ioreg); } ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout); } fn remove(&mut self, ctx: &mut Context) { self.itf.release(); ctx.deregister(&self.sock, self.io); ctx.cancel_timeout(self.iotimeout); ctx.remove(); } } impl Machine for ItfHttp { fn handle(&mut self, ctx: &mut Context, ev: Event) { if ev.kind().is_readable() { self.handle_read(ctx); } if !ctx.is_removed() && ev.kind().is_writable() { self.handle_write(ctx); } if !ctx.is_removed() { self.set_ioreg(ctx); } } fn timeout(&mut self, ctx: &mut Context, _: TToken) { debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs()); self.remove(ctx); } }