summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-07 16:51:37 +0200
committerYorhel <git@yorhel.nl>2016-09-07 16:51:37 +0200
commit863080971958332975098b918dd74ec9c2c44036 (patch)
treebb784b4468339aa2e5266449e0da334be0dd23bc
parentccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (diff)
eventloop: Make spawn() accept a closure that creates a Machine
This provides two advantages: Machines don't have to be Send/Sync anymore (but can still be spawned on different threads), and the init() and new() methods are now merged, allowing the Machine to access its context on initialization. I guess Rotor had a point here after all. :)
-rw-r--r--src/eventloop.rs18
-rw-r--r--src/itf_http.rs14
-rw-r--r--src/listener.rs33
3 files changed, 29 insertions, 36 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs
index 93343b1..befd104 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -67,7 +67,6 @@ const TIMER_TOKEN: Token = Token(std::usize::MAX-1);
pub trait Machine {
- fn init(&mut self, &mut Context);
fn handle(&mut self, &mut Context, mio::Event);
fn timeout(&mut self, &mut Context, TToken);
}
@@ -88,7 +87,7 @@ pub struct EventLoop {
timer: Timer<TToken>,
timers: Slab<Option<(MToken,Timeout)>, TToken>,
// A machine entry is set to None during a method call on the Machine object.
- machines: Slab<Option<Box<Machine + Sync>>, MToken>,
+ machines: Slab<Option<Box<Machine>>, MToken>,
}
pub struct Context<'a> {
@@ -158,8 +157,8 @@ impl<'a> Context<'a> {
self.removed
}
- pub fn spawn(&mut self, machine: Box<Machine + Sync>) {
- self.parent.spawn(machine);
+ pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> {
+ self.parent.spawn(f);
}
}
@@ -189,20 +188,21 @@ impl EventLoop {
}
}
- pub fn spawn(&mut self, mut machine: Box<Machine + Sync>) {
+ pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> {
let mtoken = slab_insert(&mut self.machines, None);
trace!("[{}] Spawning machine", mtoken.0);
- {
+ let machine = {
let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
- machine.init(&mut ctx);
+ let m = f(&mut ctx);
assert!(!ctx.removed);
- }
+ m
+ };
self.machines[mtoken] = Some(machine);
}
// XXX: I don't get why "&mut Machine" doesn't work in the FnOnce.
fn dispatch<F>(&mut self, mtoken: MToken, f: F)
- where F: FnOnce(&mut Box<Machine + Sync>, &mut Context) {
+ where F: FnOnce(&mut Box<Machine>, &mut Context) {
let mut machine = match self.machines.entry(mtoken) {
None => { return; },
Some(mut x) => { x.replace(None).unwrap() },
diff --git a/src/itf_http.rs b/src/itf_http.rs
index a89d9e0..cda923f 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -22,15 +22,16 @@ pub struct ItfHttp {
impl ItfHttp {
- pub fn new(itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
+ pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
+ let io = ctx.register(&sock, Ready::readable());
ItfHttp {
sock: sock,
addr: addr,
- io: Token(0),
+ io: io,
ioreg: Ready::readable(),
rbuf: Buf::new(),
wbuf: Buf::new(),
- iotimeout: TToken(0),
+ iotimeout: ctx.set_timeout(itf.cfg.io_timeout),
itf: itf,
}
}
@@ -47,7 +48,7 @@ impl ItfHttp {
}
return;
},
- Ok(0) => { // This behaviour isn't documented, unsure if it's intended.
+ Ok(0) => {
debug!("{}: Connection closed", self.addr);
self.remove(ctx);
return;
@@ -106,11 +107,6 @@ impl ItfHttp {
impl Machine for ItfHttp {
- fn init(&mut self, ctx: &mut Context) {
- self.io = ctx.register(&self.sock, self.ioreg);
- self.iotimeout = ctx.set_timeout(self.itf.cfg.io_timeout);
- }
-
fn handle(&mut self, ctx: &mut Context, ev: Event) {
if ev.kind().is_readable() {
self.handle_read(ctx);
diff --git a/src/listener.rs b/src/listener.rs
index f1a1ad9..2823fe0 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -78,27 +78,21 @@ struct Listener {
impl Listener {
- fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> {
- trace!("Binding {}", addr);
- let sock = try!(TcpListener::bind(addr));
- Ok(Listener {
+ fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpListener, addr: SocketAddr) -> Listener {
+ info!("Listening on {}", addr);
+ let io = ctx.register(&sock, Ready::readable());
+ Listener {
sock: sock,
- addr: *addr,
- io: Token(0),
- timeout: TToken(0),
+ addr: addr,
+ io: io,
+ timeout: ctx.alloc_timeout(),
itf: itf,
- })
+ }
}
}
impl Machine for Listener {
- fn init(&mut self, ctx: &mut Context) {
- self.io = ctx.register(&self.sock, Ready::readable());
- self.timeout = ctx.alloc_timeout();
- info!("Listening on {}", self.addr);
- }
-
fn handle(&mut self, ctx: &mut Context, _: Event) {
match self.sock.accept() {
Err(err) => {
@@ -118,7 +112,8 @@ impl Machine for Listener {
Ok((sock, addr)) => {
debug!("{}: New connection", addr);
if self.itf.acquire() {
- ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr)));
+ let itf = self.itf.clone();
+ ctx.spawn(move|ctx| { Box::new(itf_http::ItfHttp::new(ctx, itf.clone(), sock, addr)) });
} else {
// TODO: Specify which connection limit
warn!("Connection limit reached, consider increasing max-connections.");
@@ -145,9 +140,11 @@ pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> {
cfg: itf.clone(), // Silly, config is never deallocated.
});
- for addr in &itf.addr {
- let lst = try!(Listener::new(interface.clone(), &addr));
- ev.spawn(Box::new(lst));
+ for &addr in &itf.addr {
+ trace!("Binding {}", addr);
+ let sock = try!(TcpListener::bind(&addr));
+ let itf = interface.clone();
+ ev.spawn(move|ctx| { Box::new(Listener::new(ctx, itf, sock, addr)) });
}
}
Ok(())