/* This is a thin wrapper around mio for building state machines on a shared event loop.
 * It's similar in spirit to Rotor, but with a bunch of simplifications and other differences.
 * (Rotor is currently in too much flux and its ecosystem very incomplete/alpha)
 *
 * = Machine -> Eventloop interaction
 *
 * The biggest difference (I think) between Rotor and this is the way it deals with the problem of
 * communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner
 * of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the
 * Machine, as that would result in a double mutable borrow. This problem is inherent in the
 * architecture and Rust is correct to disallow the borrow, since it would be possible to mutate
 * the Machine object through multiple aliases.
 *
 * Rotor solves this problem by having the method handlers return a "response", i.e. an action that
 * it wants the EventLoop to perform. This implementation is more flexible but slightly more hacky:
 * When calling a method in the Machine, the Machine object itself is temporarily removed from the
 * EventLoop object, thus removing the alias and allowing both objects to be mutably borrowed.
 * Downside: This solution is more prone to logic errors that Rust can't catch, like removing or
 * re-using the MToken while inside a Machine handler.
 *
 * = Some rants on mio::timer
 *
 * This implementation wraps mio::timer in a way to provide an API that is similar to IO
 * registration. The mio-provided API has a few odd warts that make it hard to use:
 * - On registration you get a Timeout object, but on poll you get your custom token
 * - On poll you only get your custom token, not the previous Timeout object. In fact, the Timeout
 *   object has implicitly become invalid.
 * - The Timeout object is needed in order to change the timeout or cancel it
 *
 * This wrapping provides extra overhead because we need to keep track of each timer registration.
* Some other weak points of mio::timer:
 * - It spawns a thread in order to signal timeouts. To me it seems much easier and efficient to
 *   simply provide a timeout value that can be passed to Poll::poll().
 * - It does an up-front allocation for all timer objects, which is a waste of memory if the upper
 *   bound on the number of timers is much higher than what is actually reached (likely), and
 *   forces me to calculate this upper bound in the first place. (It also requires me to add
 *   ugly-looking unwrap()'s to function calls that should never fail)
 *
 *
 * TODO: Machines don't care on which thread they run, so as a scalability improvement it's
 *   possible to spawn a configurable number of threads on start-up, run a separate event loop on
 *   each, and assign each Machine to a different event loop.
 */
use mio;
use mio::{Poll,Events,Token};
use mio::timer::{Timer,Timeout};
use std;
use std::time::Duration;
use slab::Slab;


// TODO: Upstream this to Slab crate?
//
// Inserts `t` into the slab `s` and returns the index it was stored at. If the first insert is
// rejected (the slab hands the value back), the slab is grown by a fixed 8 slots and the insert
// is retried; the retry is expected to always succeed.
fn slab_insert<T, I: Into<usize> + From<usize>>(s: &mut Slab<T, I>, t: T) -> I {
    s.insert(t).or_else(|t| {
        // Grow by some small fixed number. From what I can see the Slab implementation already
        // does exponential growth internally.
        s.reserve_exact(8);
        s.insert(t)
    }).unwrap_or_else(|_| { unreachable!() })
}


// Reserved token for the timer. Assuming this value is never allocated inside a Slab.
// Note that mio already uses MAX internally, and this is undocumented.
// Such a fragile API.
-.- const TIMER_TOKEN: Token = Token(std::usize::MAX-1); pub trait Machine { fn handle(&mut self, &mut Context, mio::Event); fn timeout(&mut self, &mut Context, TToken); } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MToken(pub usize); impl From for MToken { fn from(val: usize) -> MToken { MToken(val) } } impl From for usize { fn from(val: MToken) -> usize { val.0 } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TToken(pub usize); impl From for TToken { fn from(val: usize) -> TToken { TToken(val) } } impl From for usize { fn from(val: TToken) -> usize { val.0 } } pub struct EventLoop { poll: Poll, tokens: Slab, timer: Timer, timers: Slab, TToken>, // A machine entry is set to None during a method call on the Machine object. machines: Slab>, MToken>, } pub struct Context<'a> { parent: &'a mut EventLoop, machine: MToken, removed: bool } impl<'a> Context<'a> { // This method is opinionated in always providing a PollOpt::level(). It's the only option that // makes sense. :) pub fn register(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented { let token = slab_insert(&mut self.parent.tokens, self.machine); self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap(); token } pub fn reregister(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { assert_eq!(self.parent.tokens[token], self.machine); self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); } pub fn deregister(&mut self, io: &E, token: Token) where E: mio::Evented { assert_eq!(self.parent.tokens[token], self.machine); self.parent.tokens.remove(token); self.parent.poll.deregister(io).unwrap(); } // The _timeout() methods behave the same as the above register methods, and follow the same // control flow, independent of whether the timeout has passed in between or not: // 1. set_timeout() or alloc_timeout() to get and initialize a token, // 2. 
Followed by any number of reset_timeout() to change the duration of the timer if it is // still active, or set the new timeout after it has fired. // 3. cancel_timeout() to stop the timer if it still active, and deallocate the timeout // Allocates a timeout token but does not set a timeout. The token can be activated with // reset_timeout() pub fn alloc_timeout(&mut self) -> TToken { slab_insert(&mut self.parent.timers, None) } pub fn set_timeout(&mut self, d: Duration) -> TToken { let token = self.alloc_timeout(); self.reset_timeout(token, d); token } pub fn reset_timeout(&mut self, token: TToken, d: Duration) { if let Some((_, ref timeout)) = self.parent.timers[token] { self.parent.timer.cancel_timeout(timeout); } self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap())); } pub fn cancel_timeout(&mut self, token: TToken) { if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) { self.parent.timer.cancel_timeout(&timeout); } } pub fn remove(&mut self) { self.removed = true; } pub fn is_removed(&self) -> bool { self.removed } pub fn spawn(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box { self.parent.spawn(f); } } impl EventLoop { pub fn new() -> EventLoop { let timer = mio::timer::Builder::default() // Timers are used for (1) aborting a connection or process after some timeout and // (2) back-off after listen error. (1) will be fine with 1-second precision, and // (2) should be incredibly rare. The current value is terribly imprecise for a // generic timer, but should suffice for our limited use case. .tick_duration(Duration::from_millis(200)) // TODO: Calculate a more exact upper limit. I'd prefer mio to just resize internal // structures on demand, but it can't do that (yet). The default value allocates // 2MB of memory, which is far too wasteful. 
.capacity(4_000).build(); let poll = Poll::new().unwrap(); poll.register(&timer, TIMER_TOKEN, mio::Ready::readable(), mio::PollOpt::edge()).unwrap(); EventLoop { poll: poll, tokens: Slab::with_capacity(16), timer: timer, timers: Slab::with_capacity(16), machines: Slab::with_capacity(16), } } pub fn spawn(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box { let mtoken = slab_insert(&mut self.machines, None); trace!("[{}] Spawning machine", mtoken.0); let machine = { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; let m = f(&mut ctx); assert!(!ctx.removed); m }; self.machines[mtoken] = Some(machine); } // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce. fn dispatch(&mut self, mtoken: MToken, f: F) where F: FnOnce(&mut Box, &mut Context) { let mut machine = match self.machines.entry(mtoken) { None => { return; }, Some(mut x) => { x.replace(None).unwrap() }, }; let removed = { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; f(&mut machine, &mut ctx); ctx.removed }; if !removed { self.machines[mtoken] = Some(machine); } else { trace!("[{}] Removing machine", mtoken.0); } } fn dispatch_io(&mut self, event: mio::Event) { if let Some(&mtoken) = self.tokens.get(event.token()) { self.dispatch(mtoken, |m, ctx| { trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind()); m.handle(ctx, event) }); } } fn dispatch_timeout(&mut self) { while let Some(ttoken) = self.timer.poll() { let mtoken = self.timers.entry(ttoken).and_then(|mut e| { e.replace(None).map(|(t,_)| { t }) }); if let Some(mtoken) = mtoken { self.dispatch(mtoken, |m, ctx| { trace!("[{}] Calling timeout", mtoken.0); m.timeout(ctx, ttoken) }); } } } pub fn run(&mut self) { let mut events = Events::with_capacity(64); debug!("Entering event loop"); loop { self.poll.poll(&mut events, None).unwrap(); for event in events.iter() { if event.token() == TIMER_TOKEN { self.dispatch_timeout(); } else { 
self.dispatch_io(event); } } } } }