summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config.rs4
-rw-r--r--src/eventloop.rs144
-rw-r--r--src/listener.rs49
-rw-r--r--src/main.rs15
4 files changed, 206 insertions, 6 deletions
diff --git a/src/config.rs b/src/config.rs
index ea3a14a..c3882d8 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -42,12 +42,12 @@ named!(parse_expr<&[u8],Expr>, alt!(
#[derive(Debug,Clone,Default)]
pub struct Interface {
- addr: Vec<SocketAddr>
+ pub addr: Vec<SocketAddr>
}
#[derive(Debug,Clone,Default)]
pub struct Config {
- itf: Vec<Interface>
+ pub itf: Vec<Interface>
}
diff --git a/src/eventloop.rs b/src/eventloop.rs
new file mode 100644
index 0000000..2cf8ecd
--- /dev/null
+++ b/src/eventloop.rs
@@ -0,0 +1,144 @@
+/* This is a thin wrapper around mio for building state machines on a shared event loop.
+ * It's similar in spirit to Rotor, but with a bunch of simplifications and other differences.
+ * (Rotor is currently in too much flux and its ecosystem very incomplete/alpha)
+ *
+ * The biggest difference (I think) between Rotor and this is the way it deals with the problem of
+ * communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner
+ * of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the
+ * Machine, as that what will result in a double mutable borrow. This problem is inherent in the
+ * architecture and Rust is correct to disallow the borrow; since it would be possible to mutate
+ * the Machine object through multiple aliases.
+ *
+ * Rotor solves this problem by having the method handlers return a "response", i.e. an action that
+ * it wants the EventLoop to perform. This implementation is more flexible but slightly more hacky:
+ * When calling a method in the Machine, the Machine object itself is temporarily removed from the
+ * EventLoop object, thus removing the alias and allowing both objects to be Mutably borrowed.
+ * Downside: This solution is more prone to logic errors that Rust can't catch, like removing or
+ * re-using the MToken while inside a Machine handler.
+ *
+ * TODO: Machines don't care on which thread they run, so as a scalability improvement it's
+ * possible to spawn a configurable number of threads on start-up, run a separate event loop on
+ * each, and assign each Machine to a different event loop.
+ */
+use mio;
+use mio::{Poll,Events,Token};
+use slab::Slab;
+
+
+// TODO: Upstream this to Slab crate?
+fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I {
+ s.insert(t).or_else(|t| {
+ // Grow by some small fixed number. From what I can see the Slab implementation already
+ // does exponential growth internally.
+ s.reserve_exact(8);
+ s.insert(t)
+ }).unwrap_or_else(|_| { unreachable!() })
+}
+
+
+pub trait Machine {
+ fn init(&mut self, &mut Context);
+ fn handle(&mut self, &mut Context, mio::Event);
+}
+
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MToken(pub usize);
+impl From<usize> for MToken { fn from(val: usize) -> MToken { MToken(val) } }
+impl From<MToken> for usize { fn from(val: MToken) -> usize { val.0 } }
+
+
+pub struct EventLoop {
+ poll: Poll,
+ tokens: Slab<MToken, Token>,
+ // A machine entry is set to None during a method call on the Machine object.
+ machines: Slab<Option<Box<Machine>>, MToken>
+}
+
+pub struct Context<'a> {
+ parent: &'a mut EventLoop,
+ machine: MToken,
+ removed: bool
+}
+
+
+impl<'a> Context<'a> {
+ pub fn assign(&mut self) -> Token {
+ slab_insert(&mut self.parent.tokens, self.machine)
+ }
+
+ #[allow(dead_code)]
+ pub fn unassign(&mut self, t: Token) {
+ assert_eq!(self.parent.tokens[t], self.machine);
+ self.parent.tokens.remove(t);
+ }
+
+ // This method is opinionated in always providing a PollOpt::level(). It's the only option that
+ // makes sense. :)
+ pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
+ self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap();
+ }
+
+ #[allow(dead_code)]
+ pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
+ self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap();
+ }
+
+ #[allow(dead_code)]
+ pub fn deregister<E: ?Sized>(&mut self, io: &E) where E: mio::Evented {
+ self.parent.poll.deregister(io).unwrap();
+ }
+
+ #[allow(dead_code)]
+ pub fn remove(&mut self) {
+ self.removed = true;
+ }
+
+ #[allow(dead_code)]
+ pub fn spawn(&mut self, machine: Box<Machine>) {
+ self.parent.spawn(machine);
+ }
+}
+
+
+impl EventLoop {
+ pub fn new() -> EventLoop {
+ EventLoop {
+ poll: Poll::new().unwrap(),
+ tokens: Slab::with_capacity(16),
+ machines: Slab::with_capacity(32),
+ }
+ }
+
+ pub fn spawn(&mut self, mut machine: Box<Machine>) {
+ let mtoken = slab_insert(&mut self.machines, None);
+ {
+ let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
+ machine.init(&mut ctx);
+ assert!(!ctx.removed);
+ }
+ self.machines[mtoken] = Some(machine);
+ }
+
+ pub fn run(&mut self) {
+ let mut events = Events::with_capacity(64);
+ debug!("Entering event loop");
+ loop {
+ self.poll.poll(&mut events, None).unwrap();
+ trace!("Poll returned with {} events", events.len());
+ for event in events.iter() {
+ let mtoken = self.tokens[event.token()];
+ let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap();
+
+ let removed = {
+ let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
+ machine.handle(&mut ctx, event);
+ ctx.removed
+ };
+
+ if !removed {
+ self.machines[mtoken] = Some(machine);
+ }
+ }
+ }
+ }
+}
diff --git a/src/listener.rs b/src/listener.rs
new file mode 100644
index 0000000..e6683f3
--- /dev/null
+++ b/src/listener.rs
@@ -0,0 +1,49 @@
+use mio::{Ready,Token,Event};
+use mio::tcp::TcpListener;
+use std::net::SocketAddr;
+use std::io::Result;
+use config::Config;
+use eventloop::{Machine,Context,EventLoop};
+
+
+struct Listener {
+ sock: TcpListener,
+ io: Token
+}
+
+
+impl Listener {
+ fn new(addr: &SocketAddr) -> Result<Listener> {
+ trace!("Binding {}", addr);
+ let sock = try!(TcpListener::bind(addr));
+ Ok(Listener {
+ sock: sock,
+ io: Token(0)
+ })
+ }
+}
+
+
+impl Machine for Listener {
+ fn init(&mut self, ctx: &mut Context) {
+ self.io = ctx.assign();
+ ctx.register(&self.sock, self.io, Ready::readable());
+ info!("Listening on {}", self.sock.local_addr().unwrap());
+ }
+
+ fn handle(&mut self, _: &mut Context, _: Event) {
+ let(_, addr) = self.sock.accept().unwrap(); // TODO: ERROR HANDLING!
+ debug!("New connection from {}", addr);
+ }
+}
+
+
+pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> {
+ for itf in &cfg.itf {
+ for addr in &itf.addr {
+ let lst = try!(Listener::new(&addr));
+ ev.spawn(Box::new(lst));
+ }
+ }
+ Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
index 504b7f7..9a90486 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,13 +9,16 @@
#[macro_use] extern crate log;
extern crate env_logger;
extern crate getopts;
+extern crate mio;
+extern crate slab;
+mod config;
+mod eventloop;
+mod listener;
use std::process::exit;
use std::io::prelude::*;
-mod config;
-
struct CliOpts {
config: String,
@@ -63,7 +66,11 @@ impl CliOpts {
fn main() {
let opts = CliOpts::new().parse();
- let conf = config::Config::parse(&opts.config);
- println!("{:?}", conf);
+ let conf = config::Config::parse(&opts.config).unwrap();
env_logger::init().unwrap();
+ trace!("Configuration read from {}: {:?}", opts.config, conf);
+
+ let mut main = eventloop::EventLoop::new();
+ listener::setup(&mut main, &conf).unwrap();
+ main.run();
}