Diffstat (limited to 'src/iostream.rs')
-rw-r--r--  src/iostream.rs  177
1 file changed, 177 insertions, 0 deletions
diff --git a/src/iostream.rs b/src/iostream.rs
new file mode 100644
index 0000000..22d5ef0
--- /dev/null
+++ b/src/iostream.rs
@@ -0,0 +1,177 @@
+use mio::{Ready,Token,Event};
+use mio::tcp::TcpStream;
+use std::io::{Result,ErrorKind,Error};
+use std::time::Duration;
+use netbuf::Buf;
+
+use eventloop::{Context,TToken};
+
+
+pub struct IoStream {
+ pub sock: TcpStream,
+ pub rbuf: Buf,
+ pub wbuf: Buf,
+ io: Token,
+ ioreg: Ready,
+ rtime: TToken,
+ wtime: TToken,
+ rtimeout: Duration,
+ wtimeout: Duration,
+ rmax: usize,
+ rexpect: bool,
+}
+
+
+impl IoStream {
+ pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream {
+ let io = ctx.reg_alloc();
+ let rtime = ctx.timeout_alloc();
+ ctx.reg_set(&sock, io, Ready::readable());
+ ctx.timeout_set(rtime, rtimeout);
+ IoStream {
+ sock: sock,
+ io: io,
+ ioreg: Ready::readable(),
+ rbuf: Buf::new(),
+ wbuf: Buf::new(),
+ rtime: rtime,
+ wtime: ctx.timeout_alloc(),
+ rtimeout: rtimeout,
+ wtimeout: wtimeout,
+ rmax: rmax,
+ rexpect: true,
+ }
+ }
+
+ /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the
+ /// other end, and will return an error if nothing is read within the timeout.
+ /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not
+ /// expecting any data, in order to catch a closed connection early and to optimize I/O when
+ /// receiving multiple requests over one stream, at least until the read buffer is full.
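+ ///
+ /// A rough usage sketch (hypothetical caller; `ctx` is the eventloop `Context` this
+ /// stream was registered with):
+ ///
+ /// ```ignore
+ /// stream.set_expect_read(ctx, false); // idle: the read timeout won't be (re)armed
+ /// // ... queue and send a response ...
+ /// stream.set_expect_read(ctx, true);  // waiting for the next request again
+ /// ```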
+ #[allow(dead_code)]
+ pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) {
+ self.rexpect = x;
+ self.set_ioreg(ctx);
+ }
+
+ /// Updates the I/O registration with the current state and starts/stops timers as necessary.
+ /// Should be called whenever something is consumed from the read buffer or something is added
+ /// to the write buffer.
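+ ///
+ /// A minimal sketch of the intended call pattern (hypothetical caller; assumes
+ /// netbuf's `Buf::extend()` to append bytes to the write buffer):
+ ///
+ /// ```ignore
+ /// stream.wbuf.extend(b"HTTP/1.1 204 No Content\r\n\r\n");
+ /// stream.set_ioreg(ctx); // may write immediately; registers writable and arms the write timeout if data remains
+ /// ```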
+ pub fn set_ioreg(&mut self, ctx: &mut Context) {
+ let oldreg = self.ioreg;
+
+ // Optimization: If the write buffer was empty before but contains data now, we can already
+ // try to write it out at this point. This way we can avoid going through the event loop if
+ // the OS buffers have enough space.
+ // TODO: Measure how effective this is in practice; if this only tends to happen on the
+ // first write to a new socket then it might not be worth the effort.
+ if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) {
+ // Ignore errors; if we catch an error here we can handle it in the next loop iteration.
+ let _ = self.wbuf.write_to(&mut self.sock);
+ }
+
+ let mut reg = Ready::none();
+
+ if self.rbuf.len() < self.rmax {
+ reg.insert(Ready::readable());
+ if self.rexpect && !oldreg.contains(Ready::readable()) {
+ ctx.timeout_set(self.rtime, self.rtimeout);
+ }
+ } else {
+ ctx.timeout_unset(self.rtime);
+ }
+
+ if self.wbuf.len() > 0 {
+ reg.insert(Ready::writable());
+ if !oldreg.contains(Ready::writable()) {
+ ctx.timeout_set(self.wtime, self.wtimeout);
+ }
+ } else {
+ ctx.timeout_unset(self.wtime);
+ }
+
+ if reg != oldreg {
+ if reg == Ready::none() {
+ ctx.reg_unset(&self.sock, self.io);
+ } else {
+ ctx.reg_set(&self.sock, self.io, reg);
+ }
+ }
+ self.ioreg = reg;
+ }
+
+ fn handle_io<F,G>(&mut self, cond: bool, io: F, tim: G) -> Result<usize>
+ where F: FnOnce(&mut Self) -> Result<usize>, G: FnOnce(&mut Self) {
+ if !cond {
+ return Ok(0);
+ }
+
+ // Error conversion:
+ // - Ok(0) is converted into a ConnectionReset error
+ // - Temporary errors are converted into Ok(0)
+ match io(self) {
+ Err(err) => {
+ match err.kind() {
+ ErrorKind::WouldBlock |
+ ErrorKind::Interrupted => {
+ Ok(0)
+ },
+ _ => {
+ Err(err)
+ }
+ }
+ },
+ Ok(0) => {
+ Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer"))
+ },
+ Ok(n) => {
+ tim(self);
+ Ok(n)
+ }
+ }
+ }
+
+ /// Handle an I/O event. Returns (read_bytes, write_bytes) on success. Both can be 0 if the
+ /// event was not intended for this stream or if the read/write operations had nothing to do.
+ /// It's possible that data was read from the stream even if this method returns an error.
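+ ///
+ /// A hedged dispatch sketch (hypothetical event loop around it; only calls defined
+ /// in this file are used):
+ ///
+ /// ```ignore
+ /// match stream.handle(ctx, ev) {
+ ///     Ok((rd, _wr)) => if rd > 0 { /* parse stream.rbuf, then stream.set_ioreg(ctx) */ },
+ ///     Err(_) => stream.remove(ctx), // connection reset, timeout conversion or other I/O error
+ /// }
+ /// ```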
+ pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> {
+ if ev.token() != self.io {
+ return Ok((0,0));
+ }
+
+ let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax;
+ let canw = ev.kind().is_writable() && self.wbuf.len() > 0;
+
+ let rd = try!(self.handle_io(canr,
+ |s|{ s.rbuf.read_from(&mut s.sock) },
+ |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } }
+ ));
+
+ let wr = try!(self.handle_io(canw,
+ |s|{ s.wbuf.write_to(&mut s.sock) },
+ |s|{ ctx.timeout_set(s.wtime, s.wtimeout) }
+ ));
+
+ if rd > 0 || wr > 0 {
+ self.set_ioreg(ctx);
+ }
+ Ok((rd,wr))
+ }
+
+ pub fn timeout(&mut self, t: TToken) -> Result<()> {
+ if t == self.rtime {
+ Err(Error::new(ErrorKind::TimedOut, "Read timeout"))
+ } else if t == self.wtime {
+ Err(Error::new(ErrorKind::TimedOut, "Write timeout"))
+ } else {
+ Ok(())
+ }
+ }
+
+ pub fn remove(&mut self, ctx: &mut Context) {
+ ctx.reg_free(&self.sock, self.io);
+ ctx.timeout_free(self.rtime);
+ ctx.timeout_free(self.wtime);
+ }
+}