diff options
author | Yorhel <git@yorhel.nl> | 2016-09-07 15:04:19 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2016-09-07 15:05:04 +0200 |
commit | ccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (patch) | |
tree | 539b4849c0d11f902defb6a66445c4f74c7bf075 /src/listener.rs | |
parent | efe595493c9a8dd1e4df3b9045fff012521449c8 (diff) |
Add max-connections (global+interface) and io-timeout (interface) config options
TODO: Add a global io-timeout that is inherited by interfaces that don't
set it? Or is that needless complexity?
I've added Sync bounds to Machines and implemented all this using
Arc/Mutex in order for Machines to be runnable on multiple threads later
on.
Diffstat (limited to 'src/listener.rs')
-rw-r--r-- | src/listener.rs | 87 |
1 files changed, 81 insertions, 6 deletions
diff --git a/src/listener.rs b/src/listener.rs index fcac06d..f1a1ad9 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -3,22 +3,82 @@ use mio::tcp::TcpListener; use std::net::SocketAddr; use std::io::{Result,ErrorKind}; use std::time::Duration; -use config::Config; use eventloop::{Machine,Context,EventLoop,TToken}; +use std::sync::{Arc,Mutex}; +use config; use itf_http; + +struct ConnCount { + max: u32, + cnt: Mutex<u32>, +} + +impl ConnCount { + fn new(max: u32) -> ConnCount { + ConnCount { max: max, cnt: Mutex::new(0) } + } + + // Increases count and returns true if there is a slot free + fn acquire(&self) -> bool { + let mut cnt = self.cnt.lock().unwrap(); + if *cnt >= self.max { + false + } else { + *cnt += 1; + true + } + } + + fn release(&self) { + let mut cnt = self.cnt.lock().unwrap(); + *cnt -= 1; + } +} + + + +pub struct Interface { + connections: ConnCount, + global_connections: Arc<ConnCount>, + pub cfg: config::Interface, +} + + +impl Interface { + pub fn acquire(&self) -> bool { + if self.connections.acquire() { + if self.global_connections.acquire() { + return true; + } else { + self.connections.release(); + } + } + false + } + + // TODO: Using a guard object is much less error-prone than this manual release(), but needs + // either yet-another-Arc or unsafe code. + pub fn release(&self) { + self.connections.release(); + self.global_connections.release(); + } +} + + struct Listener { sock: TcpListener, addr: SocketAddr, io: Token, timeout: TToken, + itf: Arc<Interface>, } impl Listener { - fn new(addr: &SocketAddr) -> Result<Listener> { + fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> { trace!("Binding {}", addr); let sock = try!(TcpListener::bind(addr)); Ok(Listener { @@ -26,6 +86,7 @@ impl Listener { addr: *addr, io: Token(0), timeout: TToken(0), + itf: itf, }) } } @@ -39,7 +100,6 @@ impl Machine for Listener { } fn handle(&mut self, ctx: &mut Context, _: Event) { - // TODO: max_connections or something match self.sock.accept() { Err(err) => { match err.kind() { @@ -57,7 +117,12 @@ impl Machine for Listener { }, Ok((sock, addr)) => { debug!("{}: New connection", addr); - ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr))); + if self.itf.acquire() { + ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr))); + } else { + // TODO: Specify which connection limit + warn!("Connection limit reached, consider increasing max-connections."); + } } } } @@ -68,10 +133,20 @@ impl Machine for Listener { } -pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> { +pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> { + // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is + // ever deallocated. + let global_connections = Arc::new(ConnCount::new(cfg.max_connections)); + for itf in &cfg.itf { + let interface = Arc::new(Interface { + connections: ConnCount::new(itf.max_connections), + global_connections: global_connections.clone(), + cfg: itf.clone(), // Silly, config is never deallocated. + }); + for addr in &itf.addr { - let lst = try!(Listener::new(&addr)); + let lst = try!(Listener::new(interface.clone(), &addr)); ev.spawn(Box::new(lst)); } } |