diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/config.rs | 4 | ||||
-rw-r--r-- | src/eventloop.rs | 144 | ||||
-rw-r--r-- | src/listener.rs | 49 | ||||
-rw-r--r-- | src/main.rs | 15 |
4 files changed, 206 insertions, 6 deletions
diff --git a/src/config.rs b/src/config.rs index ea3a14a..c3882d8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -42,12 +42,12 @@ named!(parse_expr<&[u8],Expr>, alt!( #[derive(Debug,Clone,Default)] pub struct Interface { - addr: Vec<SocketAddr> + pub addr: Vec<SocketAddr> } #[derive(Debug,Clone,Default)] pub struct Config { - itf: Vec<Interface> + pub itf: Vec<Interface> } diff --git a/src/eventloop.rs b/src/eventloop.rs new file mode 100644 index 0000000..2cf8ecd --- /dev/null +++ b/src/eventloop.rs @@ -0,0 +1,144 @@ +/* This is a thin wrapper around mio for building state machines on a shared event loop. + * It's similar in spirit to Rotor, but with a bunch of simplifications and other differences. + * (Rotor is currently in too much flux and its ecosystem very incomplete/alpha) + * + * The biggest difference (I think) between Rotor and this is the way it deals with the problem of + * communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner + * of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the + * Machine, as that what will result in a double mutable borrow. This problem is inherent in the + * architecture and Rust is correct to disallow the borrow; since it would be possible to mutate + * the Machine object through multiple aliases. + * + * Rotor solves this problem by having the method handlers return a "response", i.e. an action that + * it wants the EventLoop to perform. This implementation is more flexible but slightly more hacky: + * When calling a method in the Machine, the Machine object itself is temporarily removed from the + * EventLoop object, thus removing the alias and allowing both objects to be Mutably borrowed. + * Downside: This solution is more prone to logic errors that Rust can't catch, like removing or + * re-using the MToken while inside a Machine handler. + * + * TODO: Machines don't care on which thread they run, so as a scalability improvement it's + * possible to spawn a configurable number of threads on start-up, run a separate event loop on + * each, and assign each Machine to a different event loop. + */ +use mio; +use mio::{Poll,Events,Token}; +use slab::Slab; + + +// TODO: Upstream this to Slab crate? +fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I { + s.insert(t).or_else(|t| { + // Grow by some small fixed number. From what I can see the Slab implementation already + // does exponential growth internally. + s.reserve_exact(8); + s.insert(t) + }).unwrap_or_else(|_| { unreachable!() }) +} + + +pub trait Machine { + fn init(&mut self, &mut Context); + fn handle(&mut self, &mut Context, mio::Event); +} + + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MToken(pub usize); +impl From<usize> for MToken { fn from(val: usize) -> MToken { MToken(val) } } +impl From<MToken> for usize { fn from(val: MToken) -> usize { val.0 } } + + +pub struct EventLoop { + poll: Poll, + tokens: Slab<MToken, Token>, + // A machine entry is set to None during a method call on the Machine object. + machines: Slab<Option<Box<Machine>>, MToken> +} + +pub struct Context<'a> { + parent: &'a mut EventLoop, + machine: MToken, + removed: bool +} + + +impl<'a> Context<'a> { + pub fn assign(&mut self) -> Token { + slab_insert(&mut self.parent.tokens, self.machine) + } + + #[allow(dead_code)] + pub fn unassign(&mut self, t: Token) { + assert_eq!(self.parent.tokens[t], self.machine); + self.parent.tokens.remove(t); + } + + // This method is opinionated in always providing a PollOpt::level(). It's the only option that + // makes sense. :) + pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { + self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap(); + } + + #[allow(dead_code)] + pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented { + self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); + } + + #[allow(dead_code)] + pub fn deregister<E: ?Sized>(&mut self, io: &E) where E: mio::Evented { + self.parent.poll.deregister(io).unwrap(); + } + + #[allow(dead_code)] + pub fn remove(&mut self) { + self.removed = true; + } + + #[allow(dead_code)] + pub fn spawn(&mut self, machine: Box<Machine>) { + self.parent.spawn(machine); + } +} + + +impl EventLoop { + pub fn new() -> EventLoop { + EventLoop { + poll: Poll::new().unwrap(), + tokens: Slab::with_capacity(16), + machines: Slab::with_capacity(32), + } + } + + pub fn spawn(&mut self, mut machine: Box<Machine>) { + let mtoken = slab_insert(&mut self.machines, None); + { + let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; + machine.init(&mut ctx); + assert!(!ctx.removed); + } + self.machines[mtoken] = Some(machine); + } + + pub fn run(&mut self) { + let mut events = Events::with_capacity(64); + debug!("Entering event loop"); + loop { + self.poll.poll(&mut events, None).unwrap(); + trace!("Poll returned with {} events", events.len()); + for event in events.iter() { + let mtoken = self.tokens[event.token()]; + let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap(); + + let removed = { + let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; + machine.handle(&mut ctx, event); + ctx.removed + }; + + if !removed { + self.machines[mtoken] = Some(machine); + } + } + } + } +} diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..e6683f3 --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,49 @@ +use mio::{Ready,Token,Event}; +use mio::tcp::TcpListener; +use std::net::SocketAddr; +use std::io::Result; +use config::Config; +use eventloop::{Machine,Context,EventLoop}; + + +struct Listener { + sock: TcpListener, + io: Token +} + + +impl Listener { + fn new(addr: &SocketAddr) -> Result<Listener> { + trace!("Binding {}", addr); + let sock = try!(TcpListener::bind(addr)); + Ok(Listener { + sock: sock, + io: Token(0) + }) + } +} + + +impl Machine for Listener { + fn init(&mut self, ctx: &mut Context) { + self.io = ctx.assign(); + ctx.register(&self.sock, self.io, Ready::readable()); + info!("Listening on {}", self.sock.local_addr().unwrap()); + } + + fn handle(&mut self, _: &mut Context, _: Event) { + let(_, addr) = self.sock.accept().unwrap(); // TODO: ERROR HANDLING! + debug!("New connection from {}", addr); + } +} + + +pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> { + for itf in &cfg.itf { + for addr in &itf.addr { + let lst = try!(Listener::new(&addr)); + ev.spawn(Box::new(lst)); + } + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 504b7f7..9a90486 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,16 @@ #[macro_use] extern crate log; extern crate env_logger; extern crate getopts; +extern crate mio; +extern crate slab; +mod config; +mod eventloop; +mod listener; use std::process::exit; use std::io::prelude::*; -mod config; - struct CliOpts { config: String, @@ -63,7 +66,11 @@ impl CliOpts { fn main() { let opts = CliOpts::new().parse(); - let conf = config::Config::parse(&opts.config); - println!("{:?}", conf); + let conf = config::Config::parse(&opts.config).unwrap(); env_logger::init().unwrap(); + trace!("Configuration read from {}: {:?}", opts.config, conf); + + let mut main = eventloop::EventLoop::new(); + listener::setup(&mut main, &conf).unwrap(); + main.run(); } |