diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/eventloop.rs | 85 | ||||
-rw-r--r-- | src/iostream.rs | 177 | ||||
-rw-r--r-- | src/itf_http.rs | 115 | ||||
-rw-r--r-- | src/listener.rs | 11 | ||||
-rw-r--r-- | src/main.rs | 1 |
5 files changed, 252 insertions, 137 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs index 9192401..48c5984 100644 --- a/src/eventloop.rs +++ b/src/eventloop.rs @@ -92,7 +92,7 @@ impl From<TToken> for usize { fn from(val: TToken) -> usize { val.0 } } pub struct EventLoop { poll: Poll, - tokens: Slab<MToken, Token>, + regs: Slab<Option<MToken>, Token>, timer: Timer<TToken>, timers: Slab<Option<(MToken,Timeout)>, TToken>, // A machine entry is set to None during a method call on the Machine object. @@ -106,57 +106,59 @@ pub struct Context<'a> { impl<'a> Context<'a> { - // This method is opinionated in always providing a PollOpt::level(). It's the only option that - // makes sense. :) - pub fn register<E: ?Sized>(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented { - let token = slab_insert(&mut self.parent.tokens, self.machine); - self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap(); - token + pub fn reg_alloc(&mut self) -> Token { + slab_insert(&mut self.parent.regs, None) } - pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { - assert_eq!(self.parent.tokens[token], self.machine); - self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); + pub fn reg_free<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented { + if let Some(Some(_)) = self.parent.regs.remove(token) { + self.parent.poll.deregister(io).unwrap(); + } } - pub fn deregister<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented { - assert_eq!(self.parent.tokens[token], self.machine); - self.parent.tokens.remove(token); - self.parent.poll.deregister(io).unwrap(); + pub fn reg_set<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { + let r = self.parent.regs.entry(token).expect("Invalid token in reg_set").replace(Some(self.machine)); + match r { + None => { self.parent.poll. register(io, token, interest, mio::PollOpt::level()).unwrap(); } + Some(_) => { self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); }, + } + } + + pub fn reg_unset<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented { + let mut r = self.parent.regs.get_mut(token).expect("Invalid token in reg_unset"); + if let Some(_) = *r { + self.parent.poll.deregister(io).unwrap(); + } + *r = None; } - // The _timeout() methods behave the same as the above register methods, and follow the same - // control flow, independent of whether the timeout has passed in between or not: - // 1. set_timeout() or alloc_timeout() to get and initialize a token, - // 2. Followed by any number of reset_timeout() to change the duration of the timer if it is - // still active, or set the new timeout after it has fired. - // 3. cancel_timeout() to stop the timer if it still active, and deallocate the timeout - // Allocates a timeout token but does not set a timeout. The token can be activated with - // reset_timeout() - pub fn alloc_timeout(&mut self) -> TToken { + pub fn timeout_alloc(&mut self) -> TToken { slab_insert(&mut self.parent.timers, None) } - pub fn set_timeout(&mut self, d: Duration) -> TToken { - let token = self.alloc_timeout(); - self.reset_timeout(token, d); - token + pub fn timeout_free(&mut self, token: TToken) { + if let Some(Some((_, ref timeout))) = self.parent.timers.remove(token) { + self.parent.timer.cancel_timeout(timeout); + } } - pub fn reset_timeout(&mut self, token: TToken, d: Duration) { - if let Some((_, ref timeout)) = self.parent.timers[token] { + pub fn timeout_set(&mut self, token: TToken, d: Duration) { + let newval = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap())); + let oldval = self.parent.timers.entry(token).expect("Invalid token in timeout_unset").replace(newval); + if let Some((_, ref timeout)) = oldval { self.parent.timer.cancel_timeout(timeout); } - self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap())); } - pub fn cancel_timeout(&mut self, token: TToken) { - if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) { - self.parent.timer.cancel_timeout(&timeout); + pub fn timeout_unset(&mut self, token: TToken) { + let oldval = self.parent.timers.entry(token).expect("Invalid token in timeout_unset").replace(None); + if let Some((_, ref timeout)) = oldval { + self.parent.timer.cancel_timeout(timeout); } } + pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> { self.parent.spawn(f); } @@ -181,7 +183,7 @@ impl EventLoop { EventLoop { poll: poll, - tokens: Slab::with_capacity(16), + regs: Slab::with_capacity(16), timer: timer, timers: Slab::with_capacity(16), machines: Slab::with_capacity(16), @@ -221,7 +223,7 @@ impl EventLoop { } fn dispatch_io(&mut self, event: mio::Event) { - if let Some(&mtoken) = self.tokens.get(event.token()) { + if let Some(&Some(mtoken)) = self.regs.get(event.token()) { self.dispatch(mtoken, |m, ctx| { trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind()); m.handle(ctx, event) @@ -245,12 +247,13 @@ impl EventLoop { let mut events = Events::with_capacity(64); debug!("Entering event loop"); loop { - self.poll.poll(&mut events, None).unwrap(); - for event in events.iter() { - if event.token() == TIMER_TOKEN { - self.dispatch_timeout(); - } else { - self.dispatch_io(event); + if let Ok(_) = self.poll.poll(&mut events, None) { + for event in events.iter() { + if event.token() == TIMER_TOKEN { + self.dispatch_timeout(); + } else { + self.dispatch_io(event); + } } } } diff --git a/src/iostream.rs b/src/iostream.rs new file mode 100644 index 0000000..22d5ef0 --- /dev/null +++ b/src/iostream.rs @@ -0,0 +1,177 @@ +use mio::{Ready,Token,Event}; +use mio::tcp::TcpStream; +use std::io::{Result,ErrorKind,Error}; +use std::time::Duration; +use netbuf::Buf; + +use eventloop::{Context,TToken}; + + +pub struct IoStream { + pub sock: TcpStream, + pub rbuf: Buf, + pub wbuf: Buf, + io: Token, + ioreg: Ready, + rtime: TToken, + wtime: TToken, + rtimeout: Duration, + wtimeout: Duration, + rmax: usize, + rexpect: bool, +} + + +impl IoStream { + pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream { + let io = ctx.reg_alloc(); + let rtime = ctx.timeout_alloc(); + ctx.reg_set(&sock, io, Ready::readable()); + ctx.timeout_set(rtime, rtimeout); + IoStream { + sock: sock, + io: io, + ioreg: Ready::readable(), + rbuf: Buf::new(), + wbuf: Buf::new(), + rtime: rtime, + wtime: ctx.timeout_alloc(), + rtimeout: rtimeout, + wtimeout: wtimeout, + rmax: rmax, + rexpect: true, + } + } + + /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the + /// other end, and will throw an error if nothing is read within the timeout. + /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not + /// expecting any data, in order catch a closed connection early and optimize I/O when + /// receiving multiple requests over one stream. Until the read buffer is full, that is. + #[allow(dead_code)] + pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) { + self.rexpect = x; + self.set_ioreg(ctx); + } + + /// Updates the I/O registration with the current state and starts/stops timers as necessary. + /// Should be called whenever something is consumed from the read buffer or something is added + /// to the write buffer. + pub fn set_ioreg(&mut self, ctx: &mut Context) { + let oldreg = self.ioreg; + + // Optimization: If the write buffer was empty before but contains data now, we can already + // try to write it out at this point. This way we can avoid going through the event loop if + // the OS buffers have enough space. + // TODO: Measure how effective this is in practice; if this only tends to happen on the + // first write to a new socket then it might not be worth the effort. + if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) { + // Ignore errors, if we catch an error we can handle it in the next loop iteration. + let _ = self.wbuf.write_to(&mut self.sock); + } + + let mut reg = Ready::none(); + + if self.rbuf.len() < self.rmax { + reg.insert(Ready::readable()); + if self.rexpect && !oldreg.contains(Ready::readable()) { + ctx.timeout_set(self.rtime, self.rtimeout); + } + } else { + ctx.timeout_unset(self.rtime); + } + + if self.wbuf.len() > 0 { + reg.insert(Ready::writable()); + if !oldreg.contains(Ready::writable()) { + ctx.timeout_set(self.wtime, self.wtimeout); + } + } else { + ctx.timeout_unset(self.wtime); + } + + if reg != oldreg { + if reg == Ready::none() { + ctx.reg_unset(&self.sock, self.io); + } else { + ctx.reg_set(&self.sock, self.io, reg); + } + } + self.ioreg = reg; + } + + fn handle_io<F,G>(&mut self, cond: bool, io: F, tim: G) -> Result<usize> + where F: FnOnce(&mut Self) -> Result<usize>, G: FnOnce(&mut Self) { + if !cond { + return Ok(0); + } + + // Error conversion: + // - Ok(0) is converted into a ConnectionReset error + // - Temporary errors are converted into Ok(0) + match io(self) { + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock | + ErrorKind::Interrupted => { + Ok(0) + }, + _ => { + Err(err) + } + } + }, + Ok(0) => { + Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer")) + }, + Ok(n) => { + tim(self); + Ok(n) + } + } + } + + /// Handle an IO event. Returns the (read_bytes, write_bytes) on success. Both can be 0 if the + /// event was not intended for this steam or if the read/write operations didn't feel like + /// doing anything. It's possible that data was read from the stream even if this method + /// returns an error. + pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> { + if ev.token() != self.io { + return Ok((0,0)); + } + + let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax; + let canw = ev.kind().is_writable() && self.wbuf.len() > 0; + + let rd = try!(self.handle_io(canr, + |s|{ s.rbuf.read_from(&mut s.sock) }, + |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } } + )); + + let wr = try!(self.handle_io(canw, + |s|{ s.wbuf.write_to(&mut s.sock) }, + |s|{ ctx.timeout_set(s.wtime, s.wtimeout) } + )); + + if rd > 0 || wr > 0 { + self.set_ioreg(ctx); + } + Ok((rd,wr)) + } + + pub fn timeout(&mut self, t: TToken) -> Result<()> { + if t == self.rtime { + Err(Error::new(ErrorKind::TimedOut, "Read timeout")) + } else if t == self.wtime { + Err(Error::new(ErrorKind::TimedOut, "Write timeout")) + } else { + Ok(()) + } + } + + pub fn remove(&mut self, ctx: &mut Context) { + ctx.reg_free(&self.sock, self.io); + ctx.timeout_free(self.rtime); + ctx.timeout_free(self.wtime); + } +} diff --git a/src/itf_http.rs b/src/itf_http.rs index 012323c..0d261a2 100644 --- a/src/itf_http.rs +++ b/src/itf_http.rs @@ -1,31 +1,28 @@ -use mio::{Ready,Token,Event}; +use mio::Event; use mio::tcp::TcpStream; use std::net::SocketAddr; -use std::io::ErrorKind; use std::sync::Arc; -use netbuf::Buf; +use iostream::IoStream; use listener::Interface; use eventloop::{Machine,Context,TToken,Action}; -// TODO: Set a max size on the buffers and provide backpressure pub struct ItfHttp { - sock: TcpStream, + io: IoStream, addr: SocketAddr, - io: Token, - ioreg: Ready, - rbuf: Buf, - wbuf: Buf, - iotimeout: TToken, /* timeout on any form of IO */ itf: Arc<Interface>, } -macro_rules! try_act { - ( $e: expr ) => {{ - if let Some(x) = $e { - return Some(x); +macro_rules! try_rm { + ( $s: expr, $e: expr ) => {{ + match $e { + Err(e) => { + debug!("{}: {}", $s.addr, e); + return Some(Action::Remove); + }, + Ok(v) => v } }}; } @@ -33,100 +30,36 @@ macro_rules! try_act { impl ItfHttp { pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp { - let io = ctx.register(&sock, Ready::readable()); + let io = IoStream::new(ctx, sock, itf.cfg.io_timeout, itf.cfg.io_timeout, 4096); ItfHttp { - sock: sock, - addr: addr, io: io, - ioreg: Ready::readable(), - rbuf: Buf::new(), - wbuf: Buf::new(), - iotimeout: ctx.set_timeout(itf.cfg.io_timeout), + addr: addr, itf: itf, } } +} + + +impl Machine for ItfHttp { + fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> { + try_rm!(self, self.io.handle(ctx, ev)); - fn handle_read(&mut self) -> Option<Action> { - match self.rbuf.read_from(&mut self.sock) { - Err(err) => { - match err.kind() { - ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, - _ => { - debug!("{}: Read error: {}", self.addr, err); - return Some(Action::Remove); - } - } - return None; - }, - Ok(0) => { - debug!("{}: Connection closed", self.addr); - return Some(Action::Remove); - }, - _ => {} - } // This is where we parse the stuff, generate a request object, throw the request object // through some handlers to get a response and send it back. But for now let's just act // like we wanted to be an echo server all along. - self.rbuf.write_to(&mut self.wbuf).unwrap(); - None - } + let _ = self.io.rbuf.write_to(&mut self.io.wbuf); + self.io.set_ioreg(ctx); - fn handle_write(&mut self) -> Option<Action> { - if let Err(err) = self.wbuf.write_to(&mut self.sock) { - match err.kind() { - ErrorKind::WouldBlock | ErrorKind::Interrupted => { }, - _ => { - debug!("{}: Write error: {}", self.addr, err); - return Some(Action::Remove); - } - } - } None } - fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> { - // Optimization: If the write buffer was empty before but contains data now, we can already - // try to write it out at this point. This way we can avoid going through the event loop if - // the OS buffers have enough space. - // TODO: Measure how effective this is in practice; if this only tends to happen on the - // first write to a new socket then it might not be worth the effort. - if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) { - try_act!(self.handle_write()); - } - - let mut reg = Ready::readable(); - if self.wbuf.len() > 0 { - reg.insert(Ready::writable()); - } - if reg != self.ioreg { - self.ioreg = reg; - ctx.reregister(&self.sock, self.io, self.ioreg); - } - ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout); + fn timeout(&mut self, _: &mut Context, t: TToken) -> Option<Action> { + try_rm!(self, self.io.timeout(t)); None } -} - - -impl Machine for ItfHttp { - fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> { - if ev.kind().is_readable() { - try_act!(self.handle_read()); - } - if ev.kind().is_writable() { - try_act!(self.handle_write()); - } - self.set_ioreg(ctx) - } - - fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> { - debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs()); - Some(Action::Remove) - } fn remove(&mut self, ctx: &mut Context) { self.itf.release(); - ctx.deregister(&self.sock, self.io); - ctx.cancel_timeout(self.iotimeout); + self.io.remove(ctx); } } diff --git a/src/listener.rs b/src/listener.rs index d6dc45f..77c690a 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -80,12 +80,13 @@ struct Listener { impl Listener { fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpListener, addr: SocketAddr) -> Listener { info!("Listening on {}", addr); - let io = ctx.register(&sock, Ready::readable()); + let io = ctx.reg_alloc(); + ctx.reg_set(&sock, io, Ready::readable()); Listener { sock: sock, addr: addr, io: io, - timeout: ctx.alloc_timeout(), + timeout: ctx.timeout_alloc(), itf: itf, } } @@ -104,8 +105,8 @@ impl Machine for Listener { error!("Error accepting on {}: {}", self.addr, err); // Stop listening for short time. If this error is not persistent (like // EMFILE), it will solve itself by just waiting. Not much else we can do. - ctx.deregister(&self.sock, self.io); - ctx.reset_timeout(self.timeout, Duration::from_millis(500)); + ctx.reg_unset(&self.sock, self.io); + ctx.timeout_set(self.timeout, Duration::from_millis(500)); } } }, @@ -124,7 +125,7 @@ impl Machine for Listener { } fn timeout(&mut self, ctx: &mut Context, _: TToken) -> Option<Action> { - self.io = ctx.register(&self.sock, Ready::readable()); + ctx.reg_set(&self.sock, self.io, Ready::readable()); None } } diff --git a/src/main.rs b/src/main.rs index 5b990c9..8672f4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ extern crate mio; extern crate slab; extern crate netbuf; +mod iostream; mod config; mod eventloop; mod listener; |