diff options
author | Yorhel <git@yorhel.nl> | 2016-09-06 19:37:34 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2016-09-06 19:37:34 +0200 |
commit | efe595493c9a8dd1e4df3b9045fff012521449c8 (patch) | |
tree | cb1d03aaa426707495c58427c69b81c9f62abaea | |
parent | 39515558ad68edffc6fe52936714fcefbfc58949 (diff) |
Add accept() backoff timer + tcp idle timer + minor eventloop API improvements
-rw-r--r-- | src/eventloop.rs | 169 | ||||
-rw-r--r-- | src/itf_http.rs | 26 | ||||
-rw-r--r-- | src/listener.rs | 25 |
3 files changed, 176 insertions, 44 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs index 84f1e77..1a41762 100644 --- a/src/eventloop.rs +++ b/src/eventloop.rs @@ -2,6 +2,8 @@ * It's similar in spirit to Rotor, but with a bunch of simplifications and other differences. * (Rotor is currently in too much flux and its ecosystem very incomplete/alpha) * + * = Machine -> Eventloop interaction + * * The biggest difference (I think) between Rotor and this is the way it deals with the problem of * communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner * of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the @@ -16,12 +18,34 @@ * Downside: This solution is more prone to logic errors that Rust can't catch, like removing or * re-using the MToken while inside a Machine handler. * + * = Some rants on mio::timer + * + * This implementation wraps mio::timer in a way to provide an API that is similar to IO + * registration. The mio-provided API has a few odd warts that make it hard to use: + * - On registration you get a Timeout object, but on poll you get your custom token + * - On poll you only get your custom token, not the previous Timeout object. In fact, the Timeout + * object has implicitly become invalid. + * - The Timeout object is needed in order to change the timeout or cancel it + * + * This wrapping provides extra overhead because we need to keep track of each timer registration. + * Some other weak points of mio::timer: + * - It spawns a thread in order to signal timeouts. To me it seems much easier and efficient to + * simply provide a timeout value that can be passed to Poll::poll(). + * - It does an up-front allocation for all timer objects, which is a waste of memory if the upper + * bound on the number of timers is much higher than what is actually reached (likely), and + * forces me to calculate this upper bound in the first place. (It also requires me to add + * ugly-looking unwrap()'s to function calls that should never fail) + * + * * TODO: Machines don't care on which thread they run, so as a scalability improvement it's * possible to spawn a configurable number of threads on start-up, run a separate event loop on * each, and assign each Machine to a different event loop. */ use mio; use mio::{Poll,Events,Token}; +use mio::timer::{Timer,Timeout}; +use std; +use std::time::Duration; use slab::Slab; @@ -36,9 +60,16 @@ fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I { } +// Reserved token for the timer. Assuming this value is never allocated inside a Slab. +// Note that mio already uses MAX internally, and this is undocumented. +// Such a fragile API. -.- +const TIMER_TOKEN: Token = Token(std::usize::MAX-1); + + pub trait Machine { fn init(&mut self, &mut Context); fn handle(&mut self, &mut Context, mio::Event); + fn timeout(&mut self, &mut Context, TToken); } @@ -46,12 +77,18 @@ pub trait Machine { impl From<usize> for MToken { fn from(val: usize) -> MToken { MToken(val) } } impl From<MToken> for usize { fn from(val: MToken) -> usize { val.0 } } +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TToken(pub usize); +impl From<usize> for TToken { fn from(val: usize) -> TToken { TToken(val) } } +impl From<TToken> for usize { fn from(val: TToken) -> usize { val.0 } } + pub struct EventLoop { poll: Poll, tokens: Slab<MToken, Token>, + timer: Timer<TToken>, + timers: Slab<Option<(MToken,Timeout)>, TToken>, // A machine entry is set to None during a method call on the Machine object. - machines: Slab<Option<Box<Machine>>, MToken> + machines: Slab<Option<Box<Machine>>, MToken>, } pub struct Context<'a> { @@ -62,29 +99,57 @@ pub struct Context<'a> { impl<'a> Context<'a> { - pub fn assign(&mut self) -> Token { - slab_insert(&mut self.parent.tokens, self.machine) - } - - pub fn unassign(&mut self, t: Token) { - assert_eq!(self.parent.tokens[t], self.machine); - self.parent.tokens.remove(t); - } - // This method is opinionated in always providing a PollOpt::level(). It's the only option that // makes sense. :) - pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { + pub fn register<E: ?Sized>(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented { + let token = slab_insert(&mut self.parent.tokens, self.machine); self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap(); + token } pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { + assert_eq!(self.parent.tokens[token], self.machine); self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); } - pub fn deregister<E: ?Sized>(&mut self, io: &E) where E: mio::Evented { + pub fn deregister<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented { + assert_eq!(self.parent.tokens[token], self.machine); + self.parent.tokens.remove(token); self.parent.poll.deregister(io).unwrap(); } + // The _timeout() methods behave the same as the above register methods, and follow the same + // control flow, independent of whether the timeout has passed in between or not: + // 1. set_timeout() or alloc_timeout() to get and initialize a token, + // 2. Followed by any number of reset_timeout() to change the duration of the timer if it is + // still active, or set the new timeout after it has fired. + // 3. cancel_timeout() to stop the timer if it still active, and deallocate the timeout + + // Allocates a timeout token but does not set a timeout. The token can be activated with + // reset_timeout() + pub fn alloc_timeout(&mut self) -> TToken { + slab_insert(&mut self.parent.timers, None) + } + + pub fn set_timeout(&mut self, d: Duration) -> TToken { + let token = self.alloc_timeout(); + self.reset_timeout(token, d); + token + } + + pub fn reset_timeout(&mut self, token: TToken, d: Duration) { + if let Some((_, ref timeout)) = self.parent.timers[token] { + self.parent.timer.cancel_timeout(timeout); + } + self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap())); + } + + pub fn cancel_timeout(&mut self, token: TToken) { + if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) { + self.parent.timer.cancel_timeout(&timeout); + } + } + pub fn remove(&mut self) { self.removed = true; } @@ -101,16 +166,32 @@ impl<'a> Context<'a> { impl EventLoop { pub fn new() -> EventLoop { + let timer = mio::timer::Builder::default() + // Timers are used for (1) aborting a connection or process after some timeout and + // (2) back-off after listen error. (1) will be fine with 1-second precision, and + // (2) should be incredibly rare. The current value is terribly imprecise for a + // generic timer, but should suffice for our limited use case. + .tick_duration(Duration::from_millis(200)) + // TODO: Calculate a more exact upper limit. I'd prefer mio to just resize internal + // structures on demand, but it can't do that (yet). The default value allocates + // 2MB of memory, which is far too wasteful. + .capacity(4_000).build(); + + let poll = Poll::new().unwrap(); + poll.register(&timer, TIMER_TOKEN, mio::Ready::readable(), mio::PollOpt::edge()).unwrap(); + EventLoop { - poll: Poll::new().unwrap(), + poll: poll, tokens: Slab::with_capacity(16), - machines: Slab::with_capacity(32), + timer: timer, + timers: Slab::with_capacity(16), + machines: Slab::with_capacity(16), } } pub fn spawn(&mut self, mut machine: Box<Machine>) { let mtoken = slab_insert(&mut self.machines, None); - trace!("Spawning machine {}", mtoken.0); + trace!("[{}] Spawning machine", mtoken.0); { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; machine.init(&mut ctx); @@ -119,26 +200,58 @@ impl EventLoop { self.machines[mtoken] = Some(machine); } + // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce. + fn dispatch<F>(&mut self, mtoken: MToken, f: F) + where F: FnOnce(&mut Box<Machine>, &mut Context) { + let mut machine = match self.machines.entry(mtoken) { + None => { return; }, + Some(mut x) => { x.replace(None).unwrap() }, + }; + + let removed = { + let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; + f(&mut machine, &mut ctx); + ctx.removed + }; + + if !removed { + self.machines[mtoken] = Some(machine); + } else { + trace!("[{}] Removing machine", mtoken.0); + } + } + + fn dispatch_io(&mut self, event: mio::Event) { + if let Some(&mtoken) = self.tokens.get(event.token()) { + self.dispatch(mtoken, |m, ctx| { + trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind()); + m.handle(ctx, event) + }); + } + } + + fn dispatch_timeout(&mut self) { + while let Some(ttoken) = self.timer.poll() { + let mtoken = self.timers.entry(ttoken).and_then(|mut e| { e.replace(None).map(|(t,_)| { t }) }); + if let Some(mtoken) = mtoken { + self.dispatch(mtoken, |m, ctx| { + trace!("[{}] Calling timeout", mtoken.0); + m.timeout(ctx, ttoken) + }); + } + } + } + pub fn run(&mut self) { let mut events = Events::with_capacity(64); debug!("Entering event loop"); loop { self.poll.poll(&mut events, None).unwrap(); for event in events.iter() { - let mtoken = self.tokens[event.token()]; - trace!("Poll returned event {} on machine {} state {:?}", event.token().0, mtoken.0, event.kind()); - - let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap(); - let removed = { - let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; - machine.handle(&mut ctx, event); - ctx.removed - }; - - if !removed { - self.machines[mtoken] = Some(machine); + if event.token() == TIMER_TOKEN { + self.dispatch_timeout(); } else { - trace!("Removing machine {}", mtoken.0); + self.dispatch_io(event); } } } diff --git a/src/itf_http.rs b/src/itf_http.rs index f3995cd..f234740 100644 --- a/src/itf_http.rs +++ b/src/itf_http.rs @@ -2,11 +2,11 @@ use mio::{Ready,Token,Event}; use mio::tcp::TcpStream; use std::net::SocketAddr; use std::io::ErrorKind; -use eventloop::{Machine,Context}; +use std::time::Duration; +use eventloop::{Machine,Context,TToken}; use netbuf::Buf; -// TODO: timeouts // TODO: Set a max size on the buffers and provide backpressure pub struct ItfHttp { sock: TcpStream, @@ -14,7 +14,9 @@ pub struct ItfHttp { io: Token, ioreg: Ready, rbuf: Buf, - wbuf: Buf + wbuf: Buf, + iotimeout: TToken, /* timeout on any form of IO */ + iotimeout_val: Duration, } @@ -26,7 +28,9 @@ impl ItfHttp { io: Token(0), ioreg: Ready::readable(), rbuf: Buf::new(), - wbuf: Buf::new() + wbuf: Buf::new(), + iotimeout: TToken(0), + iotimeout_val: Duration::from_secs(10) // TODO: Make configurable & increase } } @@ -88,11 +92,12 @@ impl ItfHttp { self.ioreg = reg; ctx.reregister(&self.sock, self.io, self.ioreg); } + ctx.reset_timeout(self.iotimeout, self.iotimeout_val); } fn remove(&mut self, ctx: &mut Context) { - ctx.deregister(&self.sock); - ctx.unassign(self.io); + ctx.deregister(&self.sock, self.io); + ctx.cancel_timeout(self.iotimeout); ctx.remove(); } } @@ -100,8 +105,8 @@ impl ItfHttp { impl Machine for ItfHttp { fn init(&mut self, ctx: &mut Context) { - self.io = ctx.assign(); - ctx.register(&self.sock, self.io, self.ioreg); + self.io = ctx.register(&self.sock, self.ioreg); + self.iotimeout = ctx.set_timeout(self.iotimeout_val); } fn handle(&mut self, ctx: &mut Context, ev: Event) { @@ -115,4 +120,9 @@ impl Machine for ItfHttp { self.set_ioreg(ctx); } } + + fn timeout(&mut self, ctx: &mut Context, _: TToken) { + debug!("{}: No IO activity in {} seconds", self.addr, self.iotimeout_val.as_secs()); + self.remove(ctx); + } } diff --git a/src/listener.rs b/src/listener.rs index 293099e..fcac06d 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -2,8 +2,9 @@ use mio::{Ready,Token,Event}; use mio::tcp::TcpListener; use std::net::SocketAddr; use std::io::{Result,ErrorKind}; +use std::time::Duration; use config::Config; -use eventloop::{Machine,Context,EventLoop}; +use eventloop::{Machine,Context,EventLoop,TToken}; use itf_http; @@ -11,7 +12,8 @@ use itf_http; struct Listener { sock: TcpListener, addr: SocketAddr, - io: Token + io: Token, + timeout: TToken, } @@ -22,7 +24,8 @@ impl Listener { Ok(Listener { sock: sock, addr: *addr, - io: Token(0) + io: Token(0), + timeout: TToken(0), }) } } @@ -30,8 +33,8 @@ impl Listener { impl Machine for Listener { fn init(&mut self, ctx: &mut Context) { - self.io = ctx.assign(); - ctx.register(&self.sock, self.io, Ready::readable()); + self.io = ctx.register(&self.sock, Ready::readable()); + self.timeout = ctx.alloc_timeout(); info!("Listening on {}", self.addr); } @@ -42,11 +45,13 @@ impl Machine for Listener { match err.kind() { ErrorKind::WouldBlock | ErrorKind::Interrupted | - ErrorKind::TimedOut => { }, + ErrorKind::TimedOut => { /* Uninteresting temporary errors */ }, _ => { error!("Error accepting on {}: {}", self.addr, err); - // If the error is persistent, we may be getting into an infinite loop. - // TODO: Have a back-off timer here (especially for EMFILE). + // Stop listening for short time. If this error is not persistent (like + // EMFILE), it will solve itself by just waiting. Not much else we can do. + ctx.deregister(&self.sock, self.io); + ctx.reset_timeout(self.timeout, Duration::from_millis(500)); } } }, @@ -56,6 +61,10 @@ impl Machine for Listener { } } } + + fn timeout(&mut self, ctx: &mut Context, _: TToken) { + self.io = ctx.register(&self.sock, Ready::readable()); + } } |