summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/eventloop.rs41
-rw-r--r--src/itf_http.rs66
-rw-r--r--src/listener.rs8
3 files changed, 61 insertions, 54 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs
index befd104..9192401 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -66,9 +66,18 @@ fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I {
const TIMER_TOKEN: Token = Token(std::usize::MAX-1);
+pub enum Action {
+ Remove
+}
+
pub trait Machine {
- fn handle(&mut self, &mut Context, mio::Event);
- fn timeout(&mut self, &mut Context, TToken);
+ fn handle(&mut self, &mut Context, mio::Event) -> Option<Action>;
+ fn timeout(&mut self, &mut Context, TToken) -> Option<Action>;
+ // Called right before the Machine is destroyed, allows it to deregister() and
+ // cancel_timeout().
+ // TODO: This cleanup is better done automatically, either at removal of the Machine or through
+ // a guard object given to the Machine.
+ fn remove(&mut self, &mut Context) { }
}
@@ -93,7 +102,6 @@ pub struct EventLoop {
pub struct Context<'a> {
parent: &'a mut EventLoop,
machine: MToken,
- removed: bool
}
@@ -149,14 +157,6 @@ impl<'a> Context<'a> {
}
}
- pub fn remove(&mut self) {
- self.removed = true;
- }
-
- pub fn is_removed(&self) -> bool {
- self.removed
- }
-
pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> {
self.parent.spawn(f);
}
@@ -192,32 +192,31 @@ impl EventLoop {
let mtoken = slab_insert(&mut self.machines, None);
trace!("[{}] Spawning machine", mtoken.0);
let machine = {
- let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
- let m = f(&mut ctx);
- assert!(!ctx.removed);
- m
+ let mut ctx = Context{ parent: self, machine: mtoken };
+ f(&mut ctx)
};
self.machines[mtoken] = Some(machine);
}
// XXX: I don't get why "&mut Machine" doesn't work in the FnOnce.
fn dispatch<F>(&mut self, mtoken: MToken, f: F)
- where F: FnOnce(&mut Box<Machine>, &mut Context) {
+ where F: FnOnce(&mut Box<Machine>, &mut Context) -> Option<Action> {
let mut machine = match self.machines.entry(mtoken) {
None => { return; },
Some(mut x) => { x.replace(None).unwrap() },
};
- let removed = {
- let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
- f(&mut machine, &mut ctx);
- ctx.removed
+ let action = {
+ let mut ctx = Context{ parent: self, machine: mtoken };
+ f(&mut machine, &mut ctx)
};
- if !removed {
+ if action.is_none() {
self.machines[mtoken] = Some(machine);
} else {
trace!("[{}] Removing machine", mtoken.0);
+ let mut ctx = Context{ parent: self, machine: mtoken };
+ machine.remove(&mut ctx);
}
}
diff --git a/src/itf_http.rs b/src/itf_http.rs
index cda923f..012323c 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -2,11 +2,12 @@ use mio::{Ready,Token,Event};
use mio::tcp::TcpStream;
use std::net::SocketAddr;
use std::io::ErrorKind;
-use listener::Interface;
-use eventloop::{Machine,Context,TToken};
use std::sync::Arc;
use netbuf::Buf;
+use listener::Interface;
+use eventloop::{Machine,Context,TToken,Action};
+
// TODO: Set a max size on the buffers and provide backpressure
pub struct ItfHttp {
@@ -21,6 +22,15 @@ pub struct ItfHttp {
}
+macro_rules! try_act {
+ ( $e: expr ) => {{
+ if let Some(x) = $e {
+ return Some(x);
+ }
+ }};
+}
+
+
impl ItfHttp {
pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
let io = ctx.register(&sock, Ready::readable());
@@ -36,22 +46,21 @@ impl ItfHttp {
}
}
- fn handle_read(&mut self, ctx: &mut Context) {
+ fn handle_read(&mut self) -> Option<Action> {
match self.rbuf.read_from(&mut self.sock) {
Err(err) => {
match err.kind() {
ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
_ => {
debug!("{}: Read error: {}", self.addr, err);
- self.remove(ctx);
+ return Some(Action::Remove);
}
}
- return;
+ return None;
},
Ok(0) => {
debug!("{}: Connection closed", self.addr);
- self.remove(ctx);
- return;
+ return Some(Action::Remove);
},
_ => {}
}
@@ -59,31 +68,30 @@ impl ItfHttp {
// through some handlers to get a response and send it back. But for now let's just act
// like we wanted to be an echo server all along.
self.rbuf.write_to(&mut self.wbuf).unwrap();
+ None
}
- fn handle_write(&mut self, ctx: &mut Context) {
+ fn handle_write(&mut self) -> Option<Action> {
if let Err(err) = self.wbuf.write_to(&mut self.sock) {
match err.kind() {
ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
_ => {
debug!("{}: Write error: {}", self.addr, err);
- self.remove(ctx);
+ return Some(Action::Remove);
}
}
}
+ None
}
- fn set_ioreg(&mut self, ctx: &mut Context) {
+ fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> {
// Optimization: If the write buffer was empty before but contains data now, we can already
// try to write it out at this point. This way we can avoid going through the event loop if
// the OS buffers have enough space.
// TODO: Measure how effective this is in practice; if this only tends to happen on the
// first write to a new socket then it might not be worth the effort.
if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) {
- self.handle_write(ctx);
- if ctx.is_removed() {
- return;
- }
+ try_act!(self.handle_write());
}
let mut reg = Ready::readable();
@@ -95,32 +103,30 @@ impl ItfHttp {
ctx.reregister(&self.sock, self.io, self.ioreg);
}
ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout);
- }
-
- fn remove(&mut self, ctx: &mut Context) {
- self.itf.release();
- ctx.deregister(&self.sock, self.io);
- ctx.cancel_timeout(self.iotimeout);
- ctx.remove();
+ None
}
}
impl Machine for ItfHttp {
- fn handle(&mut self, ctx: &mut Context, ev: Event) {
+ fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
if ev.kind().is_readable() {
- self.handle_read(ctx);
+ try_act!(self.handle_read());
}
- if !ctx.is_removed() && ev.kind().is_writable() {
- self.handle_write(ctx);
- }
- if !ctx.is_removed() {
- self.set_ioreg(ctx);
+ if ev.kind().is_writable() {
+ try_act!(self.handle_write());
}
+ self.set_ioreg(ctx)
}
- fn timeout(&mut self, ctx: &mut Context, _: TToken) {
+ fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> {
debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs());
- self.remove(ctx);
+ Some(Action::Remove)
+ }
+
+ fn remove(&mut self, ctx: &mut Context) {
+ self.itf.release();
+ ctx.deregister(&self.sock, self.io);
+ ctx.cancel_timeout(self.iotimeout);
}
}
diff --git a/src/listener.rs b/src/listener.rs
index 2823fe0..d6dc45f 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -3,9 +3,9 @@ use mio::tcp::TcpListener;
use std::net::SocketAddr;
use std::io::{Result,ErrorKind};
use std::time::Duration;
-use eventloop::{Machine,Context,EventLoop,TToken};
use std::sync::{Arc,Mutex};
+use eventloop::{Machine,Context,EventLoop,TToken,Action};
use config;
use itf_http;
@@ -93,7 +93,7 @@ impl Listener {
impl Machine for Listener {
- fn handle(&mut self, ctx: &mut Context, _: Event) {
+ fn handle(&mut self, ctx: &mut Context, _: Event) -> Option<Action> {
match self.sock.accept() {
Err(err) => {
match err.kind() {
@@ -120,10 +120,12 @@ impl Machine for Listener {
}
}
}
+ None
}
- fn timeout(&mut self, ctx: &mut Context, _: TToken) {
+ fn timeout(&mut self, ctx: &mut Context, _: TToken) -> Option<Action> {
self.io = ctx.register(&self.sock, Ready::readable());
+ None
}
}