summaryrefslogtreecommitdiff
path: root/src/itf_http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/itf_http.rs')
-rw-r--r--src/itf_http.rs66
1 files changed, 36 insertions, 30 deletions
diff --git a/src/itf_http.rs b/src/itf_http.rs
index cda923f..012323c 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -2,11 +2,12 @@ use mio::{Ready,Token,Event};
use mio::tcp::TcpStream;
use std::net::SocketAddr;
use std::io::ErrorKind;
-use listener::Interface;
-use eventloop::{Machine,Context,TToken};
use std::sync::Arc;
use netbuf::Buf;
+use listener::Interface;
+use eventloop::{Machine,Context,TToken,Action};
+
// TODO: Set a max size on the buffers and provide backpressure
pub struct ItfHttp {
@@ -21,6 +22,15 @@ pub struct ItfHttp {
}
+macro_rules! try_act {
+ ( $e: expr ) => {{
+ if let Some(x) = $e {
+ return Some(x);
+ }
+ }};
+}
+
+
impl ItfHttp {
pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
let io = ctx.register(&sock, Ready::readable());
@@ -36,22 +46,21 @@ impl ItfHttp {
}
}
- fn handle_read(&mut self, ctx: &mut Context) {
+ fn handle_read(&mut self) -> Option<Action> {
match self.rbuf.read_from(&mut self.sock) {
Err(err) => {
match err.kind() {
ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
_ => {
debug!("{}: Read error: {}", self.addr, err);
- self.remove(ctx);
+ return Some(Action::Remove);
}
}
- return;
+ return None;
},
Ok(0) => {
debug!("{}: Connection closed", self.addr);
- self.remove(ctx);
- return;
+ return Some(Action::Remove);
},
_ => {}
}
@@ -59,31 +68,30 @@ impl ItfHttp {
// through some handlers to get a response and send it back. But for now let's just act
// like we wanted to be an echo server all along.
self.rbuf.write_to(&mut self.wbuf).unwrap();
+ None
}
- fn handle_write(&mut self, ctx: &mut Context) {
+ fn handle_write(&mut self) -> Option<Action> {
if let Err(err) = self.wbuf.write_to(&mut self.sock) {
match err.kind() {
ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
_ => {
debug!("{}: Write error: {}", self.addr, err);
- self.remove(ctx);
+ return Some(Action::Remove);
}
}
}
+ None
}
- fn set_ioreg(&mut self, ctx: &mut Context) {
+ fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> {
// Optimization: If the write buffer was empty before but contains data now, we can already
// try to write it out at this point. This way we can avoid going through the event loop if
// the OS buffers have enough space.
// TODO: Measure how effective this is in practice; if this only tends to happen on the
// first write to a new socket then it might not be worth the effort.
if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) {
- self.handle_write(ctx);
- if ctx.is_removed() {
- return;
- }
+ try_act!(self.handle_write());
}
let mut reg = Ready::readable();
@@ -95,32 +103,30 @@ impl ItfHttp {
ctx.reregister(&self.sock, self.io, self.ioreg);
}
ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout);
- }
-
- fn remove(&mut self, ctx: &mut Context) {
- self.itf.release();
- ctx.deregister(&self.sock, self.io);
- ctx.cancel_timeout(self.iotimeout);
- ctx.remove();
+ None
}
}
impl Machine for ItfHttp {
- fn handle(&mut self, ctx: &mut Context, ev: Event) {
+ fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
if ev.kind().is_readable() {
- self.handle_read(ctx);
+ try_act!(self.handle_read());
}
- if !ctx.is_removed() && ev.kind().is_writable() {
- self.handle_write(ctx);
- }
- if !ctx.is_removed() {
- self.set_ioreg(ctx);
+ if ev.kind().is_writable() {
+ try_act!(self.handle_write());
}
+ self.set_ioreg(ctx)
}
- fn timeout(&mut self, ctx: &mut Context, _: TToken) {
+ fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> {
debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs());
- self.remove(ctx);
+ Some(Action::Remove)
+ }
+
+ fn remove(&mut self, ctx: &mut Context) {
+ self.itf.release();
+ ctx.deregister(&self.sock, self.io);
+ ctx.cancel_timeout(self.iotimeout);
}
}