summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-06 19:37:34 +0200
committerYorhel <git@yorhel.nl>2016-09-06 19:37:34 +0200
commitefe595493c9a8dd1e4df3b9045fff012521449c8 (patch)
treecb1d03aaa426707495c58427c69b81c9f62abaea
parent39515558ad68edffc6fe52936714fcefbfc58949 (diff)
Add accept() backoff timer + tcp idle timer + minor eventloop API improvements
-rw-r--r--src/eventloop.rs169
-rw-r--r--src/itf_http.rs26
-rw-r--r--src/listener.rs25
3 files changed, 176 insertions, 44 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs
index 84f1e77..1a41762 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -2,6 +2,8 @@
* It's similar in spirit to Rotor, but with a bunch of simplifications and other differences.
* (Rotor is currently in too much flux and its ecosystem very incomplete/alpha)
*
+ * = Machine -> Eventloop interaction
+ *
* The biggest difference (I think) between Rotor and this is the way it deals with the problem of
* communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner
* of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the
@@ -16,12 +18,34 @@
* Downside: This solution is more prone to logic errors that Rust can't catch, like removing or
* re-using the MToken while inside a Machine handler.
*
+ * = Some rants on mio::timer
+ *
+ * This implementation wraps mio::timer in a way to provide an API that is similar to IO
+ * registration. The mio-provided API has a few odd warts that make it hard to use:
+ * - On registration you get a Timeout object, but on poll you get your custom token
+ * - On poll you only get your custom token, not the previous Timeout object. In fact, the Timeout
+ * object has implicitly become invalid.
+ * - The Timeout object is needed in order to change the timeout or cancel it
+ *
+ * This wrapping provides extra overhead because we need to keep track of each timer registration.
+ * Some other weak points of mio::timer:
+ * - It spawns a thread in order to signal timeouts. To me it seems much easier and more efficient to
+ * simply provide a timeout value that can be passed to Poll::poll().
+ * - It does an up-front allocation for all timer objects, which is a waste of memory if the upper
+ * bound on the number of timers is much higher than what is actually reached (likely), and
+ * forces me to calculate this upper bound in the first place. (It also requires me to add
+ * ugly-looking unwrap()'s to function calls that should never fail)
+ *
+ *
* TODO: Machines don't care on which thread they run, so as a scalability improvement it's
* possible to spawn a configurable number of threads on start-up, run a separate event loop on
* each, and assign each Machine to a different event loop.
*/
use mio;
use mio::{Poll,Events,Token};
+use mio::timer::{Timer,Timeout};
+use std;
+use std::time::Duration;
use slab::Slab;
@@ -36,9 +60,16 @@ fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I {
}
+// Reserved token for the timer. Assuming this value is never allocated inside a Slab.
+// Note that mio already uses MAX internally, and this is undocumented.
+// Such a fragile API. -.-
+const TIMER_TOKEN: Token = Token(std::usize::MAX-1);
+
+
pub trait Machine {
fn init(&mut self, &mut Context);
fn handle(&mut self, &mut Context, mio::Event);
+ fn timeout(&mut self, &mut Context, TToken);
}
@@ -46,12 +77,18 @@ pub trait Machine {
impl From<usize> for MToken { fn from(val: usize) -> MToken { MToken(val) } }
impl From<MToken> for usize { fn from(val: MToken) -> usize { val.0 } }
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TToken(pub usize);
+impl From<usize> for TToken { fn from(val: usize) -> TToken { TToken(val) } }
+impl From<TToken> for usize { fn from(val: TToken) -> usize { val.0 } }
+
pub struct EventLoop {
poll: Poll,
tokens: Slab<MToken, Token>,
+ timer: Timer<TToken>,
+ timers: Slab<Option<(MToken,Timeout)>, TToken>,
// A machine entry is set to None during a method call on the Machine object.
- machines: Slab<Option<Box<Machine>>, MToken>
+ machines: Slab<Option<Box<Machine>>, MToken>,
}
pub struct Context<'a> {
@@ -62,29 +99,57 @@ pub struct Context<'a> {
impl<'a> Context<'a> {
- pub fn assign(&mut self) -> Token {
- slab_insert(&mut self.parent.tokens, self.machine)
- }
-
- pub fn unassign(&mut self, t: Token) {
- assert_eq!(self.parent.tokens[t], self.machine);
- self.parent.tokens.remove(t);
- }
-
// This method is opinionated in always providing a PollOpt::level(). It's the only option that
// makes sense. :)
- pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
+ pub fn register<E: ?Sized>(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented {
+ let token = slab_insert(&mut self.parent.tokens, self.machine);
self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap();
+ token
}
pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
+ assert_eq!(self.parent.tokens[token], self.machine);
self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap();
}
- pub fn deregister<E: ?Sized>(&mut self, io: &E) where E: mio::Evented {
+ pub fn deregister<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented {
+ assert_eq!(self.parent.tokens[token], self.machine);
+ self.parent.tokens.remove(token);
self.parent.poll.deregister(io).unwrap();
}
+ // The _timeout() methods behave the same as the above register methods, and follow the same
+ // control flow, independent of whether the timeout has passed in between or not:
+ // 1. set_timeout() or alloc_timeout() to get and initialize a token,
+ // 2. Followed by any number of reset_timeout() to change the duration of the timer if it is
+ // still active, or set the new timeout after it has fired.
+ // 3. cancel_timeout() to stop the timer if it is still active, and deallocate the timeout
+
+ // Allocates a timeout token but does not set a timeout. The token can be activated with
+ // reset_timeout()
+ pub fn alloc_timeout(&mut self) -> TToken {
+ slab_insert(&mut self.parent.timers, None)
+ }
+
+ pub fn set_timeout(&mut self, d: Duration) -> TToken {
+ let token = self.alloc_timeout();
+ self.reset_timeout(token, d);
+ token
+ }
+
+ pub fn reset_timeout(&mut self, token: TToken, d: Duration) {
+ if let Some((_, ref timeout)) = self.parent.timers[token] {
+ self.parent.timer.cancel_timeout(timeout);
+ }
+ self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap()));
+ }
+
+ pub fn cancel_timeout(&mut self, token: TToken) {
+ if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) {
+ self.parent.timer.cancel_timeout(&timeout);
+ }
+ }
+
pub fn remove(&mut self) {
self.removed = true;
}
@@ -101,16 +166,32 @@ impl<'a> Context<'a> {
impl EventLoop {
pub fn new() -> EventLoop {
+ let timer = mio::timer::Builder::default()
+ // Timers are used for (1) aborting a connection or process after some timeout and
+ // (2) back-off after listen error. (1) will be fine with 1-second precision, and
+ // (2) should be incredibly rare. The current value is terribly imprecise for a
+ // generic timer, but should suffice for our limited use case.
+ .tick_duration(Duration::from_millis(200))
+ // TODO: Calculate a more exact upper limit. I'd prefer mio to just resize internal
+ // structures on demand, but it can't do that (yet). The default value allocates
+ // 2MB of memory, which is far too wasteful.
+ .capacity(4_000).build();
+
+ let poll = Poll::new().unwrap();
+ poll.register(&timer, TIMER_TOKEN, mio::Ready::readable(), mio::PollOpt::edge()).unwrap();
+
EventLoop {
- poll: Poll::new().unwrap(),
+ poll: poll,
tokens: Slab::with_capacity(16),
- machines: Slab::with_capacity(32),
+ timer: timer,
+ timers: Slab::with_capacity(16),
+ machines: Slab::with_capacity(16),
}
}
pub fn spawn(&mut self, mut machine: Box<Machine>) {
let mtoken = slab_insert(&mut self.machines, None);
- trace!("Spawning machine {}", mtoken.0);
+ trace!("[{}] Spawning machine", mtoken.0);
{
let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
machine.init(&mut ctx);
@@ -119,26 +200,58 @@ impl EventLoop {
self.machines[mtoken] = Some(machine);
}
+ // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce.
+ fn dispatch<F>(&mut self, mtoken: MToken, f: F)
+ where F: FnOnce(&mut Box<Machine>, &mut Context) {
+ let mut machine = match self.machines.entry(mtoken) {
+ None => { return; },
+ Some(mut x) => { x.replace(None).unwrap() },
+ };
+
+ let removed = {
+ let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
+ f(&mut machine, &mut ctx);
+ ctx.removed
+ };
+
+ if !removed {
+ self.machines[mtoken] = Some(machine);
+ } else {
+ trace!("[{}] Removing machine", mtoken.0);
+ }
+ }
+
+ fn dispatch_io(&mut self, event: mio::Event) {
+ if let Some(&mtoken) = self.tokens.get(event.token()) {
+ self.dispatch(mtoken, |m, ctx| {
+ trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind());
+ m.handle(ctx, event)
+ });
+ }
+ }
+
+ fn dispatch_timeout(&mut self) {
+ while let Some(ttoken) = self.timer.poll() {
+ let mtoken = self.timers.entry(ttoken).and_then(|mut e| { e.replace(None).map(|(t,_)| { t }) });
+ if let Some(mtoken) = mtoken {
+ self.dispatch(mtoken, |m, ctx| {
+ trace!("[{}] Calling timeout", mtoken.0);
+ m.timeout(ctx, ttoken)
+ });
+ }
+ }
+ }
+
pub fn run(&mut self) {
let mut events = Events::with_capacity(64);
debug!("Entering event loop");
loop {
self.poll.poll(&mut events, None).unwrap();
for event in events.iter() {
- let mtoken = self.tokens[event.token()];
- trace!("Poll returned event {} on machine {} state {:?}", event.token().0, mtoken.0, event.kind());
-
- let mut machine = self.machines.entry(mtoken).unwrap().replace(None).unwrap();
- let removed = {
- let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
- machine.handle(&mut ctx, event);
- ctx.removed
- };
-
- if !removed {
- self.machines[mtoken] = Some(machine);
+ if event.token() == TIMER_TOKEN {
+ self.dispatch_timeout();
} else {
- trace!("Removing machine {}", mtoken.0);
+ self.dispatch_io(event);
}
}
}
diff --git a/src/itf_http.rs b/src/itf_http.rs
index f3995cd..f234740 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -2,11 +2,11 @@ use mio::{Ready,Token,Event};
use mio::tcp::TcpStream;
use std::net::SocketAddr;
use std::io::ErrorKind;
-use eventloop::{Machine,Context};
+use std::time::Duration;
+use eventloop::{Machine,Context,TToken};
use netbuf::Buf;
-// TODO: timeouts
// TODO: Set a max size on the buffers and provide backpressure
pub struct ItfHttp {
sock: TcpStream,
@@ -14,7 +14,9 @@ pub struct ItfHttp {
io: Token,
ioreg: Ready,
rbuf: Buf,
- wbuf: Buf
+ wbuf: Buf,
+ iotimeout: TToken, /* timeout on any form of IO */
+ iotimeout_val: Duration,
}
@@ -26,7 +28,9 @@ impl ItfHttp {
io: Token(0),
ioreg: Ready::readable(),
rbuf: Buf::new(),
- wbuf: Buf::new()
+ wbuf: Buf::new(),
+ iotimeout: TToken(0),
+ iotimeout_val: Duration::from_secs(10) // TODO: Make configurable & increase
}
}
@@ -88,11 +92,12 @@ impl ItfHttp {
self.ioreg = reg;
ctx.reregister(&self.sock, self.io, self.ioreg);
}
+ ctx.reset_timeout(self.iotimeout, self.iotimeout_val);
}
fn remove(&mut self, ctx: &mut Context) {
- ctx.deregister(&self.sock);
- ctx.unassign(self.io);
+ ctx.deregister(&self.sock, self.io);
+ ctx.cancel_timeout(self.iotimeout);
ctx.remove();
}
}
@@ -100,8 +105,8 @@ impl ItfHttp {
impl Machine for ItfHttp {
fn init(&mut self, ctx: &mut Context) {
- self.io = ctx.assign();
- ctx.register(&self.sock, self.io, self.ioreg);
+ self.io = ctx.register(&self.sock, self.ioreg);
+ self.iotimeout = ctx.set_timeout(self.iotimeout_val);
}
fn handle(&mut self, ctx: &mut Context, ev: Event) {
@@ -115,4 +120,9 @@ impl Machine for ItfHttp {
self.set_ioreg(ctx);
}
}
+
+ fn timeout(&mut self, ctx: &mut Context, _: TToken) {
+ debug!("{}: No IO activity in {} seconds", self.addr, self.iotimeout_val.as_secs());
+ self.remove(ctx);
+ }
}
diff --git a/src/listener.rs b/src/listener.rs
index 293099e..fcac06d 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -2,8 +2,9 @@ use mio::{Ready,Token,Event};
use mio::tcp::TcpListener;
use std::net::SocketAddr;
use std::io::{Result,ErrorKind};
+use std::time::Duration;
use config::Config;
-use eventloop::{Machine,Context,EventLoop};
+use eventloop::{Machine,Context,EventLoop,TToken};
use itf_http;
@@ -11,7 +12,8 @@ use itf_http;
struct Listener {
sock: TcpListener,
addr: SocketAddr,
- io: Token
+ io: Token,
+ timeout: TToken,
}
@@ -22,7 +24,8 @@ impl Listener {
Ok(Listener {
sock: sock,
addr: *addr,
- io: Token(0)
+ io: Token(0),
+ timeout: TToken(0),
})
}
}
@@ -30,8 +33,8 @@ impl Listener {
impl Machine for Listener {
fn init(&mut self, ctx: &mut Context) {
- self.io = ctx.assign();
- ctx.register(&self.sock, self.io, Ready::readable());
+ self.io = ctx.register(&self.sock, Ready::readable());
+ self.timeout = ctx.alloc_timeout();
info!("Listening on {}", self.addr);
}
@@ -42,11 +45,13 @@ impl Machine for Listener {
match err.kind() {
ErrorKind::WouldBlock |
ErrorKind::Interrupted |
- ErrorKind::TimedOut => { },
+ ErrorKind::TimedOut => { /* Uninteresting temporary errors */ },
_ => {
error!("Error accepting on {}: {}", self.addr, err);
- // If the error is persistent, we may be getting into an infinite loop.
- // TODO: Have a back-off timer here (especially for EMFILE).
+ // Stop listening for a short time. If this error is not persistent (like
+ // EMFILE), it will solve itself by just waiting. Not much else we can do.
+ ctx.deregister(&self.sock, self.io);
+ ctx.reset_timeout(self.timeout, Duration::from_millis(500));
}
}
},
@@ -56,6 +61,10 @@ impl Machine for Listener {
}
}
}
+
+ fn timeout(&mut self, ctx: &mut Context, _: TToken) {
+ self.io = ctx.register(&self.sock, Ready::readable());
+ }
}