use mio::{Ready,Token,Event}; use mio::tcp::TcpListener; use std::net::SocketAddr; use std::io::{Result,ErrorKind}; use std::time::Duration; use eventloop::{Machine,Context,EventLoop,TToken}; use std::sync::{Arc,Mutex}; use config; use itf_http; struct ConnCount { max: u32, cnt: Mutex, } impl ConnCount { fn new(max: u32) -> ConnCount { ConnCount { max: max, cnt: Mutex::new(0) } } // Increases count and returns true if there is a slot free fn acquire(&self) -> bool { let mut cnt = self.cnt.lock().unwrap(); if *cnt >= self.max { false } else { *cnt += 1; true } } fn release(&self) { let mut cnt = self.cnt.lock().unwrap(); *cnt -= 1; } } pub struct Interface { connections: ConnCount, global_connections: Arc, pub cfg: config::Interface, } impl Interface { pub fn acquire(&self) -> bool { if self.connections.acquire() { if self.global_connections.acquire() { return true; } else { self.connections.release(); } } false } // TODO: Using a guard object is much less error-prone than this manual release(), but needs // either yet-another-Arc or unsafe code. pub fn release(&self) { self.connections.release(); self.global_connections.release(); } } struct Listener { sock: TcpListener, addr: SocketAddr, io: Token, timeout: TToken, itf: Arc, } impl Listener { fn new(ctx: &mut Context, itf: Arc, sock: TcpListener, addr: SocketAddr) -> Listener { info!("Listening on {}", addr); let io = ctx.register(&sock, Ready::readable()); Listener { sock: sock, addr: addr, io: io, timeout: ctx.alloc_timeout(), itf: itf, } } } impl Machine for Listener { fn handle(&mut self, ctx: &mut Context, _: Event) { match self.sock.accept() { Err(err) => { match err.kind() { ErrorKind::WouldBlock | ErrorKind::Interrupted | ErrorKind::TimedOut => { /* Uninteresting temporary errors */ }, _ => { error!("Error accepting on {}: {}", self.addr, err); // Stop listening for short time. If this error is not persistent (like // EMFILE), it will solve itself by just waiting. Not much else we can do. ctx.deregister(&self.sock, self.io); ctx.reset_timeout(self.timeout, Duration::from_millis(500)); } } }, Ok((sock, addr)) => { debug!("{}: New connection", addr); if self.itf.acquire() { let itf = self.itf.clone(); ctx.spawn(move|ctx| { Box::new(itf_http::ItfHttp::new(ctx, itf.clone(), sock, addr)) }); } else { // TODO: Specify which connection limit warn!("Connection limit reached, consider increasing max-connections."); } } } } fn timeout(&mut self, ctx: &mut Context, _: TToken) { self.io = ctx.register(&self.sock, Ready::readable()); } } pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> { // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is // ever deallocated. let global_connections = Arc::new(ConnCount::new(cfg.max_connections)); for itf in &cfg.itf { let interface = Arc::new(Interface { connections: ConnCount::new(itf.max_connections), global_connections: global_connections.clone(), cfg: itf.clone(), // Silly, config is never deallocated. }); for &addr in &itf.addr { trace!("Binding {}", addr); let sock = try!(TcpListener::bind(&addr)); let itf = interface.clone(); ev.spawn(move|ctx| { Box::new(Listener::new(ctx, itf, sock, addr)) }); } } Ok(()) }