From 60e0eaab3674e88c7ff13a82ee44d524dbe59477 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Thu, 8 Sep 2016 13:35:30 +0200 Subject: Add IoStream abstraction + max read buf size + revamp IO & timer registration --- src/iostream.rs | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 src/iostream.rs (limited to 'src/iostream.rs') diff --git a/src/iostream.rs b/src/iostream.rs new file mode 100644 index 0000000..22d5ef0 --- /dev/null +++ b/src/iostream.rs @@ -0,0 +1,177 @@ +use mio::{Ready,Token,Event}; +use mio::tcp::TcpStream; +use std::io::{Result,ErrorKind,Error}; +use std::time::Duration; +use netbuf::Buf; + +use eventloop::{Context,TToken}; + + +pub struct IoStream { + pub sock: TcpStream, + pub rbuf: Buf, + pub wbuf: Buf, + io: Token, + ioreg: Ready, + rtime: TToken, + wtime: TToken, + rtimeout: Duration, + wtimeout: Duration, + rmax: usize, + rexpect: bool, +} + + +impl IoStream { + pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream { + let io = ctx.reg_alloc(); + let rtime = ctx.timeout_alloc(); + ctx.reg_set(&sock, io, Ready::readable()); + ctx.timeout_set(rtime, rtimeout); + IoStream { + sock: sock, + io: io, + ioreg: Ready::readable(), + rbuf: Buf::new(), + wbuf: Buf::new(), + rtime: rtime, + wtime: ctx.timeout_alloc(), + rtimeout: rtimeout, + wtimeout: wtimeout, + rmax: rmax, + rexpect: true, + } + } + + /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the + /// other end, and will throw an error if nothing is read within the timeout. + /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not + /// expecting any data, in order catch a closed connection early and optimize I/O when + /// receiving multiple requests over one stream. Until the read buffer is full, that is. + #[allow(dead_code)] + pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) { + self.rexpect = x; + self.set_ioreg(ctx); + } + + /// Updates the I/O registration with the current state and starts/stops timers as necessary. + /// Should be called whenever something is consumed from the read buffer or something is added + /// to the write buffer. + pub fn set_ioreg(&mut self, ctx: &mut Context) { + let oldreg = self.ioreg; + + // Optimization: If the write buffer was empty before but contains data now, we can already + // try to write it out at this point. This way we can avoid going through the event loop if + // the OS buffers have enough space. + // TODO: Measure how effective this is in practice; if this only tends to happen on the + // first write to a new socket then it might not be worth the effort. + if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) { + // Ignore errors, if we catch an error we can handle it in the next loop iteration. + let _ = self.wbuf.write_to(&mut self.sock); + } + + let mut reg = Ready::none(); + + if self.rbuf.len() < self.rmax { + reg.insert(Ready::readable()); + if self.rexpect && !oldreg.contains(Ready::readable()) { + ctx.timeout_set(self.rtime, self.rtimeout); + } + } else { + ctx.timeout_unset(self.rtime); + } + + if self.wbuf.len() > 0 { + reg.insert(Ready::writable()); + if !oldreg.contains(Ready::writable()) { + ctx.timeout_set(self.wtime, self.wtimeout); + } + } else { + ctx.timeout_unset(self.wtime); + } + + if reg != oldreg { + if reg == Ready::none() { + ctx.reg_unset(&self.sock, self.io); + } else { + ctx.reg_set(&self.sock, self.io, reg); + } + } + self.ioreg = reg; + } + + fn handle_io(&mut self, cond: bool, io: F, tim: G) -> Result + where F: FnOnce(&mut Self) -> Result, G: FnOnce(&mut Self) { + if !cond { + return Ok(0); + } + + // Error conversion: + // - Ok(0) is converted into a ConnectionReset error + // - Temporary errors are converted into Ok(0) + match io(self) { + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock | + ErrorKind::Interrupted => { + Ok(0) + }, + _ => { + Err(err) + } + } + }, + Ok(0) => { + Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer")) + }, + Ok(n) => { + tim(self); + Ok(n) + } + } + } + + /// Handle an IO event. Returns the (read_bytes, write_bytes) on success. Both can be 0 if the + /// event was not intended for this steam or if the read/write operations didn't feel like + /// doing anything. It's possible that data was read from the stream even if this method + /// returns an error. + pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> { + if ev.token() != self.io { + return Ok((0,0)); + } + + let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax; + let canw = ev.kind().is_writable() && self.wbuf.len() > 0; + + let rd = try!(self.handle_io(canr, + |s|{ s.rbuf.read_from(&mut s.sock) }, + |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } } + )); + + let wr = try!(self.handle_io(canw, + |s|{ s.wbuf.write_to(&mut s.sock) }, + |s|{ ctx.timeout_set(s.wtime, s.wtimeout) } + )); + + if rd > 0 || wr > 0 { + self.set_ioreg(ctx); + } + Ok((rd,wr)) + } + + pub fn timeout(&mut self, t: TToken) -> Result<()> { + if t == self.rtime { + Err(Error::new(ErrorKind::TimedOut, "Read timeout")) + } else if t == self.wtime { + Err(Error::new(ErrorKind::TimedOut, "Write timeout")) + } else { + Ok(()) + } + } + + pub fn remove(&mut self, ctx: &mut Context) { + ctx.reg_free(&self.sock, self.io); + ctx.timeout_free(self.rtime); + ctx.timeout_free(self.wtime); + } +} -- cgit v1.2.3