diff options
author | Yorhel <git@yorhel.nl> | 2016-09-07 16:51:37 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2016-09-07 16:51:37 +0200 |
commit | 863080971958332975098b918dd74ec9c2c44036 (patch) | |
tree | bb784b4468339aa2e5266449e0da334be0dd23bc /src | |
parent | ccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (diff) |
eventloop: Make spawn() accept a closure that creates a Machine
This provides two advantages: Machines don't have to be Send/Sync
anymore (but can still be spawned on different threads), and the init()
and new() methods are now merged, allowing the Machine to access its
context on initialization.
I guess Rotor had a point here after all. :)
Diffstat (limited to 'src')
-rw-r--r-- | src/eventloop.rs | 18 | ||||
-rw-r--r-- | src/itf_http.rs | 14 | ||||
-rw-r--r-- | src/listener.rs | 33 |
3 files changed, 29 insertions, 36 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs index 93343b1..befd104 100644 --- a/src/eventloop.rs +++ b/src/eventloop.rs @@ -67,7 +67,6 @@ const TIMER_TOKEN: Token = Token(std::usize::MAX-1); pub trait Machine { - fn init(&mut self, &mut Context); fn handle(&mut self, &mut Context, mio::Event); fn timeout(&mut self, &mut Context, TToken); } @@ -88,7 +87,7 @@ pub struct EventLoop { timer: Timer<TToken>, timers: Slab<Option<(MToken,Timeout)>, TToken>, // A machine entry is set to None during a method call on the Machine object. - machines: Slab<Option<Box<Machine + Sync>>, MToken>, + machines: Slab<Option<Box<Machine>>, MToken>, } pub struct Context<'a> { @@ -158,8 +157,8 @@ impl<'a> Context<'a> { self.removed } - pub fn spawn(&mut self, machine: Box<Machine + Sync>) { - self.parent.spawn(machine); + pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> { + self.parent.spawn(f); } } @@ -189,20 +188,21 @@ impl EventLoop { } } - pub fn spawn(&mut self, mut machine: Box<Machine + Sync>) { + pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> { let mtoken = slab_insert(&mut self.machines, None); trace!("[{}] Spawning machine", mtoken.0); - { + let machine = { let mut ctx = Context{ parent: self, machine: mtoken, removed: false }; - machine.init(&mut ctx); + let m = f(&mut ctx); assert!(!ctx.removed); - } + m + }; self.machines[mtoken] = Some(machine); } // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce. fn dispatch<F>(&mut self, mtoken: MToken, f: F) - where F: FnOnce(&mut Box<Machine + Sync>, &mut Context) { + where F: FnOnce(&mut Box<Machine>, &mut Context) { let mut machine = match self.machines.entry(mtoken) { None => { return; }, Some(mut x) => { x.replace(None).unwrap() }, diff --git a/src/itf_http.rs b/src/itf_http.rs index a89d9e0..cda923f 100644 --- a/src/itf_http.rs +++ b/src/itf_http.rs @@ -22,15 +22,16 @@ pub struct ItfHttp { impl ItfHttp { - pub fn new(itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp { + pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp { + let io = ctx.register(&sock, Ready::readable()); ItfHttp { sock: sock, addr: addr, - io: Token(0), + io: io, ioreg: Ready::readable(), rbuf: Buf::new(), wbuf: Buf::new(), - iotimeout: TToken(0), + iotimeout: ctx.set_timeout(itf.cfg.io_timeout), itf: itf, } } @@ -47,7 +48,7 @@ impl ItfHttp { } return; }, - Ok(0) => { // This behaviour isn't documented, unsure if it's intended. + Ok(0) => { debug!("{}: Connection closed", self.addr); self.remove(ctx); return; @@ -106,11 +107,6 @@ impl ItfHttp { impl Machine for ItfHttp { - fn init(&mut self, ctx: &mut Context) { - self.io = ctx.register(&self.sock, self.ioreg); - self.iotimeout = ctx.set_timeout(self.itf.cfg.io_timeout); - } - fn handle(&mut self, ctx: &mut Context, ev: Event) { if ev.kind().is_readable() { self.handle_read(ctx); diff --git a/src/listener.rs b/src/listener.rs index f1a1ad9..2823fe0 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -78,27 +78,21 @@ struct Listener { impl Listener { - fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> { - trace!("Binding {}", addr); - let sock = try!(TcpListener::bind(addr)); - Ok(Listener { + fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpListener, addr: SocketAddr) -> Listener { + info!("Listening on {}", addr); + let io = ctx.register(&sock, Ready::readable()); + Listener { sock: sock, - addr: *addr, - io: Token(0), - timeout: TToken(0), + addr: addr, + io: io, + timeout: ctx.alloc_timeout(), itf: itf, - }) + } } } impl Machine for Listener { - fn init(&mut self, ctx: &mut Context) { - self.io = ctx.register(&self.sock, Ready::readable()); - self.timeout = ctx.alloc_timeout(); - info!("Listening on {}", self.addr); - } - fn handle(&mut self, ctx: &mut Context, _: Event) { match self.sock.accept() { Err(err) => { @@ -118,7 +112,8 @@ impl Machine for Listener { Ok((sock, addr)) => { debug!("{}: New connection", addr); if self.itf.acquire() { - ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr))); + let itf = self.itf.clone(); + ctx.spawn(move|ctx| { Box::new(itf_http::ItfHttp::new(ctx, itf.clone(), sock, addr)) }); } else { // TODO: Specify which connection limit warn!("Connection limit reached, consider increasing max-connections."); @@ -145,9 +140,11 @@ pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> { cfg: itf.clone(), // Silly, config is never deallocated. }); - for addr in &itf.addr { - let lst = try!(Listener::new(interface.clone(), &addr)); - ev.spawn(Box::new(lst)); + for &addr in &itf.addr { + trace!("Binding {}", addr); + let sock = try!(TcpListener::bind(&addr)); + let itf = interface.clone(); + ev.spawn(move|ctx| { Box::new(Listener::new(ctx, itf, sock, addr)) }); } } Ok(()) |