summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-08 13:35:30 +0200
committerYorhel <git@yorhel.nl>2016-09-08 13:35:30 +0200
commit60e0eaab3674e88c7ff13a82ee44d524dbe59477 (patch)
treeabc06d33569764e75abd8975bb96d87387b776d6
parent4a94d72c7f16e7a347c3239d9327587e80ae32c6 (diff)
Add IoStream abstraction + max read buf size + revamp IO & timer registration
-rw-r--r--src/eventloop.rs85
-rw-r--r--src/iostream.rs177
-rw-r--r--src/itf_http.rs115
-rw-r--r--src/listener.rs11
-rw-r--r--src/main.rs1
5 files changed, 252 insertions, 137 deletions
diff --git a/src/eventloop.rs b/src/eventloop.rs
index 9192401..48c5984 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -92,7 +92,7 @@ impl From<TToken> for usize { fn from(val: TToken) -> usize { val.0 } }
pub struct EventLoop {
poll: Poll,
- tokens: Slab<MToken, Token>,
+ regs: Slab<Option<MToken>, Token>,
timer: Timer<TToken>,
timers: Slab<Option<(MToken,Timeout)>, TToken>,
// A machine entry is set to None during a method call on the Machine object.
@@ -106,57 +106,59 @@ pub struct Context<'a> {
impl<'a> Context<'a> {
- // This method is opinionated in always providing a PollOpt::level(). It's the only option that
- // makes sense. :)
- pub fn register<E: ?Sized>(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented {
- let token = slab_insert(&mut self.parent.tokens, self.machine);
- self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap();
- token
+ pub fn reg_alloc(&mut self) -> Token {
+ slab_insert(&mut self.parent.regs, None)
}
- pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
- assert_eq!(self.parent.tokens[token], self.machine);
- self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap();
+ pub fn reg_free<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented {
+ if let Some(Some(_)) = self.parent.regs.remove(token) {
+ self.parent.poll.deregister(io).unwrap();
+ }
}
- pub fn deregister<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented {
- assert_eq!(self.parent.tokens[token], self.machine);
- self.parent.tokens.remove(token);
- self.parent.poll.deregister(io).unwrap();
+ pub fn reg_set<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
+ let r = self.parent.regs.entry(token).expect("Invalid token in reg_set").replace(Some(self.machine));
+ match r {
+ None => { self.parent.poll. register(io, token, interest, mio::PollOpt::level()).unwrap(); }
+ Some(_) => { self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap(); },
+ }
+ }
+
+ pub fn reg_unset<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented {
+ let mut r = self.parent.regs.get_mut(token).expect("Invalid token in reg_unset");
+ if let Some(_) = *r {
+ self.parent.poll.deregister(io).unwrap();
+ }
+ *r = None;
}
- // The _timeout() methods behave the same as the above register methods, and follow the same
- // control flow, independent of whether the timeout has passed in between or not:
- // 1. set_timeout() or alloc_timeout() to get and initialize a token,
- // 2. Followed by any number of reset_timeout() to change the duration of the timer if it is
- // still active, or set the new timeout after it has fired.
- // 3. cancel_timeout() to stop the timer if it still active, and deallocate the timeout
- // Allocates a timeout token but does not set a timeout. The token can be activated with
- // reset_timeout()
- pub fn alloc_timeout(&mut self) -> TToken {
+ pub fn timeout_alloc(&mut self) -> TToken {
slab_insert(&mut self.parent.timers, None)
}
- pub fn set_timeout(&mut self, d: Duration) -> TToken {
- let token = self.alloc_timeout();
- self.reset_timeout(token, d);
- token
+ pub fn timeout_free(&mut self, token: TToken) {
+ if let Some(Some((_, ref timeout))) = self.parent.timers.remove(token) {
+ self.parent.timer.cancel_timeout(timeout);
+ }
}
- pub fn reset_timeout(&mut self, token: TToken, d: Duration) {
- if let Some((_, ref timeout)) = self.parent.timers[token] {
+ pub fn timeout_set(&mut self, token: TToken, d: Duration) {
+ let newval = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap()));
+ let oldval = self.parent.timers.entry(token).expect("Invalid token in timeout_unset").replace(newval);
+ if let Some((_, ref timeout)) = oldval {
self.parent.timer.cancel_timeout(timeout);
}
- self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap()));
}
- pub fn cancel_timeout(&mut self, token: TToken) {
- if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) {
- self.parent.timer.cancel_timeout(&timeout);
+ pub fn timeout_unset(&mut self, token: TToken) {
+ let oldval = self.parent.timers.entry(token).expect("Invalid token in timeout_unset").replace(None);
+ if let Some((_, ref timeout)) = oldval {
+ self.parent.timer.cancel_timeout(timeout);
}
}
+
pub fn spawn<F>(&mut self, f: F) where F: Send + 'static + FnOnce(&mut Context) -> Box<Machine> {
self.parent.spawn(f);
}
@@ -181,7 +183,7 @@ impl EventLoop {
EventLoop {
poll: poll,
- tokens: Slab::with_capacity(16),
+ regs: Slab::with_capacity(16),
timer: timer,
timers: Slab::with_capacity(16),
machines: Slab::with_capacity(16),
@@ -221,7 +223,7 @@ impl EventLoop {
}
fn dispatch_io(&mut self, event: mio::Event) {
- if let Some(&mtoken) = self.tokens.get(event.token()) {
+ if let Some(&Some(mtoken)) = self.regs.get(event.token()) {
self.dispatch(mtoken, |m, ctx| {
trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind());
m.handle(ctx, event)
@@ -245,12 +247,13 @@ impl EventLoop {
let mut events = Events::with_capacity(64);
debug!("Entering event loop");
loop {
- self.poll.poll(&mut events, None).unwrap();
- for event in events.iter() {
- if event.token() == TIMER_TOKEN {
- self.dispatch_timeout();
- } else {
- self.dispatch_io(event);
+ if let Ok(_) = self.poll.poll(&mut events, None) {
+ for event in events.iter() {
+ if event.token() == TIMER_TOKEN {
+ self.dispatch_timeout();
+ } else {
+ self.dispatch_io(event);
+ }
}
}
}
diff --git a/src/iostream.rs b/src/iostream.rs
new file mode 100644
index 0000000..22d5ef0
--- /dev/null
+++ b/src/iostream.rs
@@ -0,0 +1,177 @@
+use mio::{Ready,Token,Event};
+use mio::tcp::TcpStream;
+use std::io::{Result,ErrorKind,Error};
+use std::time::Duration;
+use netbuf::Buf;
+
+use eventloop::{Context,TToken};
+
+
+pub struct IoStream {
+ pub sock: TcpStream,
+ pub rbuf: Buf,
+ pub wbuf: Buf,
+ io: Token,
+ ioreg: Ready,
+ rtime: TToken,
+ wtime: TToken,
+ rtimeout: Duration,
+ wtimeout: Duration,
+ rmax: usize,
+ rexpect: bool,
+}
+
+
+impl IoStream {
+ pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream {
+ let io = ctx.reg_alloc();
+ let rtime = ctx.timeout_alloc();
+ ctx.reg_set(&sock, io, Ready::readable());
+ ctx.timeout_set(rtime, rtimeout);
+ IoStream {
+ sock: sock,
+ io: io,
+ ioreg: Ready::readable(),
+ rbuf: Buf::new(),
+ wbuf: Buf::new(),
+ rtime: rtime,
+ wtime: ctx.timeout_alloc(),
+ rtimeout: rtimeout,
+ wtimeout: wtimeout,
+ rmax: rmax,
+ rexpect: true,
+ }
+ }
+
+ /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the
+ /// other end, and will throw an error if nothing is read within the timeout.
+ /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not
+ /// expecting any data, in order catch a closed connection early and optimize I/O when
+ /// receiving multiple requests over one stream. Until the read buffer is full, that is.
+ #[allow(dead_code)]
+ pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) {
+ self.rexpect = x;
+ self.set_ioreg(ctx);
+ }
+
+ /// Updates the I/O registration with the current state and starts/stops timers as necessary.
+ /// Should be called whenever something is consumed from the read buffer or something is added
+ /// to the write buffer.
+ pub fn set_ioreg(&mut self, ctx: &mut Context) {
+ let oldreg = self.ioreg;
+
+ // Optimization: If the write buffer was empty before but contains data now, we can already
+ // try to write it out at this point. This way we can avoid going through the event loop if
+ // the OS buffers have enough space.
+ // TODO: Measure how effective this is in practice; if this only tends to happen on the
+ // first write to a new socket then it might not be worth the effort.
+ if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) {
+ // Ignore errors, if we catch an error we can handle it in the next loop iteration.
+ let _ = self.wbuf.write_to(&mut self.sock);
+ }
+
+ let mut reg = Ready::none();
+
+ if self.rbuf.len() < self.rmax {
+ reg.insert(Ready::readable());
+ if self.rexpect && !oldreg.contains(Ready::readable()) {
+ ctx.timeout_set(self.rtime, self.rtimeout);
+ }
+ } else {
+ ctx.timeout_unset(self.rtime);
+ }
+
+ if self.wbuf.len() > 0 {
+ reg.insert(Ready::writable());
+ if !oldreg.contains(Ready::writable()) {
+ ctx.timeout_set(self.wtime, self.wtimeout);
+ }
+ } else {
+ ctx.timeout_unset(self.wtime);
+ }
+
+ if reg != oldreg {
+ if reg == Ready::none() {
+ ctx.reg_unset(&self.sock, self.io);
+ } else {
+ ctx.reg_set(&self.sock, self.io, reg);
+ }
+ }
+ self.ioreg = reg;
+ }
+
+ fn handle_io<F,G>(&mut self, cond: bool, io: F, tim: G) -> Result<usize>
+ where F: FnOnce(&mut Self) -> Result<usize>, G: FnOnce(&mut Self) {
+ if !cond {
+ return Ok(0);
+ }
+
+ // Error conversion:
+ // - Ok(0) is converted into a ConnectionReset error
+ // - Temporary errors are converted into Ok(0)
+ match io(self) {
+ Err(err) => {
+ match err.kind() {
+ ErrorKind::WouldBlock |
+ ErrorKind::Interrupted => {
+ Ok(0)
+ },
+ _ => {
+ Err(err)
+ }
+ }
+ },
+ Ok(0) => {
+ Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer"))
+ },
+ Ok(n) => {
+ tim(self);
+ Ok(n)
+ }
+ }
+ }
+
+ /// Handle an IO event. Returns the (read_bytes, write_bytes) on success. Both can be 0 if the
+ /// event was not intended for this steam or if the read/write operations didn't feel like
+ /// doing anything. It's possible that data was read from the stream even if this method
+ /// returns an error.
+ pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> {
+ if ev.token() != self.io {
+ return Ok((0,0));
+ }
+
+ let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax;
+ let canw = ev.kind().is_writable() && self.wbuf.len() > 0;
+
+ let rd = try!(self.handle_io(canr,
+ |s|{ s.rbuf.read_from(&mut s.sock) },
+ |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } }
+ ));
+
+ let wr = try!(self.handle_io(canw,
+ |s|{ s.wbuf.write_to(&mut s.sock) },
+ |s|{ ctx.timeout_set(s.wtime, s.wtimeout) }
+ ));
+
+ if rd > 0 || wr > 0 {
+ self.set_ioreg(ctx);
+ }
+ Ok((rd,wr))
+ }
+
+ pub fn timeout(&mut self, t: TToken) -> Result<()> {
+ if t == self.rtime {
+ Err(Error::new(ErrorKind::TimedOut, "Read timeout"))
+ } else if t == self.wtime {
+ Err(Error::new(ErrorKind::TimedOut, "Write timeout"))
+ } else {
+ Ok(())
+ }
+ }
+
+ pub fn remove(&mut self, ctx: &mut Context) {
+ ctx.reg_free(&self.sock, self.io);
+ ctx.timeout_free(self.rtime);
+ ctx.timeout_free(self.wtime);
+ }
+}
diff --git a/src/itf_http.rs b/src/itf_http.rs
index 012323c..0d261a2 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -1,31 +1,28 @@
-use mio::{Ready,Token,Event};
+use mio::Event;
use mio::tcp::TcpStream;
use std::net::SocketAddr;
-use std::io::ErrorKind;
use std::sync::Arc;
-use netbuf::Buf;
+use iostream::IoStream;
use listener::Interface;
use eventloop::{Machine,Context,TToken,Action};
-// TODO: Set a max size on the buffers and provide backpressure
pub struct ItfHttp {
- sock: TcpStream,
+ io: IoStream,
addr: SocketAddr,
- io: Token,
- ioreg: Ready,
- rbuf: Buf,
- wbuf: Buf,
- iotimeout: TToken, /* timeout on any form of IO */
itf: Arc<Interface>,
}
-macro_rules! try_act {
- ( $e: expr ) => {{
- if let Some(x) = $e {
- return Some(x);
+macro_rules! try_rm {
+ ( $s: expr, $e: expr ) => {{
+ match $e {
+ Err(e) => {
+ debug!("{}: {}", $s.addr, e);
+ return Some(Action::Remove);
+ },
+ Ok(v) => v
}
}};
}
@@ -33,100 +30,36 @@ macro_rules! try_act {
impl ItfHttp {
pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
- let io = ctx.register(&sock, Ready::readable());
+ let io = IoStream::new(ctx, sock, itf.cfg.io_timeout, itf.cfg.io_timeout, 4096);
ItfHttp {
- sock: sock,
- addr: addr,
io: io,
- ioreg: Ready::readable(),
- rbuf: Buf::new(),
- wbuf: Buf::new(),
- iotimeout: ctx.set_timeout(itf.cfg.io_timeout),
+ addr: addr,
itf: itf,
}
}
+}
+
+
+impl Machine for ItfHttp {
+ fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
+ try_rm!(self, self.io.handle(ctx, ev));
- fn handle_read(&mut self) -> Option<Action> {
- match self.rbuf.read_from(&mut self.sock) {
- Err(err) => {
- match err.kind() {
- ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
- _ => {
- debug!("{}: Read error: {}", self.addr, err);
- return Some(Action::Remove);
- }
- }
- return None;
- },
- Ok(0) => {
- debug!("{}: Connection closed", self.addr);
- return Some(Action::Remove);
- },
- _ => {}
- }
// This is where we parse the stuff, generate a request object, throw the request object
// through some handlers to get a response and send it back. But for now let's just act
// like we wanted to be an echo server all along.
- self.rbuf.write_to(&mut self.wbuf).unwrap();
- None
- }
+ let _ = self.io.rbuf.write_to(&mut self.io.wbuf);
+ self.io.set_ioreg(ctx);
- fn handle_write(&mut self) -> Option<Action> {
- if let Err(err) = self.wbuf.write_to(&mut self.sock) {
- match err.kind() {
- ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
- _ => {
- debug!("{}: Write error: {}", self.addr, err);
- return Some(Action::Remove);
- }
- }
- }
None
}
- fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> {
- // Optimization: If the write buffer was empty before but contains data now, we can already
- // try to write it out at this point. This way we can avoid going through the event loop if
- // the OS buffers have enough space.
- // TODO: Measure how effective this is in practice; if this only tends to happen on the
- // first write to a new socket then it might not be worth the effort.
- if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) {
- try_act!(self.handle_write());
- }
-
- let mut reg = Ready::readable();
- if self.wbuf.len() > 0 {
- reg.insert(Ready::writable());
- }
- if reg != self.ioreg {
- self.ioreg = reg;
- ctx.reregister(&self.sock, self.io, self.ioreg);
- }
- ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout);
+ fn timeout(&mut self, _: &mut Context, t: TToken) -> Option<Action> {
+ try_rm!(self, self.io.timeout(t));
None
}
-}
-
-
-impl Machine for ItfHttp {
- fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
- if ev.kind().is_readable() {
- try_act!(self.handle_read());
- }
- if ev.kind().is_writable() {
- try_act!(self.handle_write());
- }
- self.set_ioreg(ctx)
- }
-
- fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> {
- debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs());
- Some(Action::Remove)
- }
fn remove(&mut self, ctx: &mut Context) {
self.itf.release();
- ctx.deregister(&self.sock, self.io);
- ctx.cancel_timeout(self.iotimeout);
+ self.io.remove(ctx);
}
}
diff --git a/src/listener.rs b/src/listener.rs
index d6dc45f..77c690a 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -80,12 +80,13 @@ struct Listener {
impl Listener {
fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpListener, addr: SocketAddr) -> Listener {
info!("Listening on {}", addr);
- let io = ctx.register(&sock, Ready::readable());
+ let io = ctx.reg_alloc();
+ ctx.reg_set(&sock, io, Ready::readable());
Listener {
sock: sock,
addr: addr,
io: io,
- timeout: ctx.alloc_timeout(),
+ timeout: ctx.timeout_alloc(),
itf: itf,
}
}
@@ -104,8 +105,8 @@ impl Machine for Listener {
error!("Error accepting on {}: {}", self.addr, err);
// Stop listening for short time. If this error is not persistent (like
// EMFILE), it will solve itself by just waiting. Not much else we can do.
- ctx.deregister(&self.sock, self.io);
- ctx.reset_timeout(self.timeout, Duration::from_millis(500));
+ ctx.reg_unset(&self.sock, self.io);
+ ctx.timeout_set(self.timeout, Duration::from_millis(500));
}
}
},
@@ -124,7 +125,7 @@ impl Machine for Listener {
}
fn timeout(&mut self, ctx: &mut Context, _: TToken) -> Option<Action> {
- self.io = ctx.register(&self.sock, Ready::readable());
+ ctx.reg_set(&self.sock, self.io, Ready::readable());
None
}
}
diff --git a/src/main.rs b/src/main.rs
index 5b990c9..8672f4b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -13,6 +13,7 @@ extern crate mio;
extern crate slab;
extern crate netbuf;
+mod iostream;
mod config;
mod eventloop;
mod listener;