use mio::{Ready,Token,Event}; use mio::tcp::TcpStream; use std::io::{Result,ErrorKind,Error}; use std::time::Duration; use netbuf::Buf; use eventloop::{Context,TToken}; pub struct IoStream { pub sock: TcpStream, pub rbuf: Buf, pub wbuf: Buf, io: Token, ioreg: Ready, rtime: TToken, wtime: TToken, rtimeout: Duration, wtimeout: Duration, rmax: usize, rexpect: bool, } impl IoStream { pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream { let io = ctx.reg_alloc(); let rtime = ctx.timeout_alloc(); ctx.reg_set(&sock, io, Ready::readable()); ctx.timeout_set(rtime, rtimeout); IoStream { sock: sock, io: io, ioreg: Ready::readable(), rbuf: Buf::new(), wbuf: Buf::new(), rtime: rtime, wtime: ctx.timeout_alloc(), rtimeout: rtimeout, wtimeout: wtimeout, rmax: rmax, rexpect: true, } } /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the /// other end, and will throw an error if nothing is read within the timeout. /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not /// expecting any data, in order catch a closed connection early and optimize I/O when /// receiving multiple requests over one stream. Until the read buffer is full, that is. #[allow(dead_code)] pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) { self.rexpect = x; self.set_ioreg(ctx); } /// Updates the I/O registration with the current state and starts/stops timers as necessary. /// Should be called whenever something is consumed from the read buffer or something is added /// to the write buffer. pub fn set_ioreg(&mut self, ctx: &mut Context) { let oldreg = self.ioreg; // Optimization: If the write buffer was empty before but contains data now, we can already // try to write it out at this point. This way we can avoid going through the event loop if // the OS buffers have enough space. // TODO: Measure how effective this is in practice; if this only tends to happen on the // first write to a new socket then it might not be worth the effort. if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) { // Ignore errors, if we catch an error we can handle it in the next loop iteration. let _ = self.wbuf.write_to(&mut self.sock); } let mut reg = Ready::none(); if self.rbuf.len() < self.rmax { reg.insert(Ready::readable()); if self.rexpect && !oldreg.contains(Ready::readable()) { ctx.timeout_set(self.rtime, self.rtimeout); } } else { ctx.timeout_unset(self.rtime); } if self.wbuf.len() > 0 { reg.insert(Ready::writable()); if !oldreg.contains(Ready::writable()) { ctx.timeout_set(self.wtime, self.wtimeout); } } else { ctx.timeout_unset(self.wtime); } if reg != oldreg { if reg == Ready::none() { ctx.reg_unset(&self.sock, self.io); } else { ctx.reg_set(&self.sock, self.io, reg); } } self.ioreg = reg; } fn handle_io(&mut self, cond: bool, io: F, tim: G) -> Result where F: FnOnce(&mut Self) -> Result, G: FnOnce(&mut Self) { if !cond { return Ok(0); } // Error conversion: // - Ok(0) is converted into a ConnectionReset error // - Temporary errors are converted into Ok(0) match io(self) { Err(err) => { match err.kind() { ErrorKind::WouldBlock | ErrorKind::Interrupted => { Ok(0) }, _ => { Err(err) } } }, Ok(0) => { Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer")) }, Ok(n) => { tim(self); Ok(n) } } } /// Handle an IO event. Returns the (read_bytes, write_bytes) on success. Both can be 0 if the /// event was not intended for this steam or if the read/write operations didn't feel like /// doing anything. It's possible that data was read from the stream even if this method /// returns an error. pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> { if ev.token() != self.io { return Ok((0,0)); } let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax; let canw = ev.kind().is_writable() && self.wbuf.len() > 0; let rd = try!(self.handle_io(canr, |s|{ s.rbuf.read_from(&mut s.sock) }, |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } } )); let wr = try!(self.handle_io(canw, |s|{ s.wbuf.write_to(&mut s.sock) }, |s|{ ctx.timeout_set(s.wtime, s.wtimeout) } )); if rd > 0 || wr > 0 { self.set_ioreg(ctx); } Ok((rd,wr)) } pub fn timeout(&mut self, t: TToken) -> Result<()> { if t == self.rtime { Err(Error::new(ErrorKind::TimedOut, "Read timeout")) } else if t == self.wtime { Err(Error::new(ErrorKind::TimedOut, "Write timeout")) } else { Ok(()) } } pub fn remove(&mut self, ctx: &mut Context) { ctx.reg_free(&self.sock, self.io); ctx.timeout_free(self.rtime); ctx.timeout_free(self.wtime); } }