summaryrefslogtreecommitdiff
path: root/src/itf_http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/itf_http.rs')
-rw-r--r--src/itf_http.rs115
1 files changed, 24 insertions, 91 deletions
diff --git a/src/itf_http.rs b/src/itf_http.rs
index 012323c..0d261a2 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -1,31 +1,28 @@
-use mio::{Ready,Token,Event};
+use mio::Event;
use mio::tcp::TcpStream;
use std::net::SocketAddr;
-use std::io::ErrorKind;
use std::sync::Arc;
-use netbuf::Buf;
+use iostream::IoStream;
use listener::Interface;
use eventloop::{Machine,Context,TToken,Action};
-// TODO: Set a max size on the buffers and provide backpressure
pub struct ItfHttp {
- sock: TcpStream,
+ io: IoStream,
addr: SocketAddr,
- io: Token,
- ioreg: Ready,
- rbuf: Buf,
- wbuf: Buf,
- iotimeout: TToken, /* timeout on any form of IO */
itf: Arc<Interface>,
}
-macro_rules! try_act {
- ( $e: expr ) => {{
- if let Some(x) = $e {
- return Some(x);
+macro_rules! try_rm {
+ ( $s: expr, $e: expr ) => {{
+ match $e {
+ Err(e) => {
+ debug!("{}: {}", $s.addr, e);
+ return Some(Action::Remove);
+ },
+ Ok(v) => v
}
}};
}
@@ -33,100 +30,36 @@ macro_rules! try_act {
impl ItfHttp {
pub fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
- let io = ctx.register(&sock, Ready::readable());
+ let io = IoStream::new(ctx, sock, itf.cfg.io_timeout, itf.cfg.io_timeout, 4096);
ItfHttp {
- sock: sock,
- addr: addr,
io: io,
- ioreg: Ready::readable(),
- rbuf: Buf::new(),
- wbuf: Buf::new(),
- iotimeout: ctx.set_timeout(itf.cfg.io_timeout),
+ addr: addr,
itf: itf,
}
}
+}
+
+
+impl Machine for ItfHttp {
+ fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
+ try_rm!(self, self.io.handle(ctx, ev));
- fn handle_read(&mut self) -> Option<Action> {
- match self.rbuf.read_from(&mut self.sock) {
- Err(err) => {
- match err.kind() {
- ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
- _ => {
- debug!("{}: Read error: {}", self.addr, err);
- return Some(Action::Remove);
- }
- }
- return None;
- },
- Ok(0) => {
- debug!("{}: Connection closed", self.addr);
- return Some(Action::Remove);
- },
- _ => {}
- }
// This is where we parse the stuff, generate a request object, throw the request object
// through some handlers to get a response and send it back. But for now let's just act
// like we wanted to be an echo server all along.
- self.rbuf.write_to(&mut self.wbuf).unwrap();
- None
- }
+ let _ = self.io.rbuf.write_to(&mut self.io.wbuf);
+ self.io.set_ioreg(ctx);
- fn handle_write(&mut self) -> Option<Action> {
- if let Err(err) = self.wbuf.write_to(&mut self.sock) {
- match err.kind() {
- ErrorKind::WouldBlock | ErrorKind::Interrupted => { },
- _ => {
- debug!("{}: Write error: {}", self.addr, err);
- return Some(Action::Remove);
- }
- }
- }
None
}
- fn set_ioreg(&mut self, ctx: &mut Context) -> Option<Action> {
- // Optimization: If the write buffer was empty before but contains data now, we can already
- // try to write it out at this point. This way we can avoid going through the event loop if
- // the OS buffers have enough space.
- // TODO: Measure how effective this is in practice; if this only tends to happen on the
- // first write to a new socket then it might not be worth the effort.
- if self.wbuf.len() > 0 && !self.ioreg.contains(Ready::writable()) {
- try_act!(self.handle_write());
- }
-
- let mut reg = Ready::readable();
- if self.wbuf.len() > 0 {
- reg.insert(Ready::writable());
- }
- if reg != self.ioreg {
- self.ioreg = reg;
- ctx.reregister(&self.sock, self.io, self.ioreg);
- }
- ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout);
+ fn timeout(&mut self, _: &mut Context, t: TToken) -> Option<Action> {
+ try_rm!(self, self.io.timeout(t));
None
}
-}
-
-
-impl Machine for ItfHttp {
- fn handle(&mut self, ctx: &mut Context, ev: Event) -> Option<Action> {
- if ev.kind().is_readable() {
- try_act!(self.handle_read());
- }
- if ev.kind().is_writable() {
- try_act!(self.handle_write());
- }
- self.set_ioreg(ctx)
- }
-
- fn timeout(&mut self, _: &mut Context, _: TToken) -> Option<Action> {
- debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs());
- Some(Action::Remove)
- }
fn remove(&mut self, ctx: &mut Context) {
self.itf.release();
- ctx.deregister(&self.sock, self.io);
- ctx.cancel_timeout(self.iotimeout);
+ self.io.remove(ctx);
}
}