summaryrefslogtreecommitdiff
path: root/src/listener.rs
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-07 15:04:19 +0200
committerYorhel <git@yorhel.nl>2016-09-07 15:05:04 +0200
commitccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (patch)
tree539b4849c0d11f902defb6a66445c4f74c7bf075 /src/listener.rs
parentefe595493c9a8dd1e4df3b9045fff012521449c8 (diff)
Add max-connections (global+interface) and io-timeout (interface) config options
TODO: Add a global io-timeout that is inherited by interfaces that don't set it? Or is that needless complexity? I've added Sync bounds to Machines and implemented all this using Arc/Mutex in order for Machines to be runnable on multiple threads later on.
Diffstat (limited to 'src/listener.rs')
-rw-r--r--src/listener.rs87
1 files changed, 81 insertions, 6 deletions
diff --git a/src/listener.rs b/src/listener.rs
index fcac06d..f1a1ad9 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -3,22 +3,82 @@ use mio::tcp::TcpListener;
use std::net::SocketAddr;
use std::io::{Result,ErrorKind};
use std::time::Duration;
-use config::Config;
use eventloop::{Machine,Context,EventLoop,TToken};
+use std::sync::{Arc,Mutex};
+use config;
use itf_http;
+
+struct ConnCount {
+ max: u32,
+ cnt: Mutex<u32>,
+}
+
+impl ConnCount {
+ fn new(max: u32) -> ConnCount {
+ ConnCount { max: max, cnt: Mutex::new(0) }
+ }
+
+ // Increases count and returns true if there is a slot free
+ fn acquire(&self) -> bool {
+ let mut cnt = self.cnt.lock().unwrap();
+ if *cnt >= self.max {
+ false
+ } else {
+ *cnt += 1;
+ true
+ }
+ }
+
+ fn release(&self) {
+ let mut cnt = self.cnt.lock().unwrap();
+ *cnt -= 1;
+ }
+}
+
+
+
+pub struct Interface {
+ connections: ConnCount,
+ global_connections: Arc<ConnCount>,
+ pub cfg: config::Interface,
+}
+
+
+impl Interface {
+ pub fn acquire(&self) -> bool {
+ if self.connections.acquire() {
+ if self.global_connections.acquire() {
+ return true;
+ } else {
+ self.connections.release();
+ }
+ }
+ false
+ }
+
+ // TODO: Using a guard object is much less error-prone than this manual release(), but needs
+ // either yet-another-Arc or unsafe code.
+ pub fn release(&self) {
+ self.connections.release();
+ self.global_connections.release();
+ }
+}
+
+
struct Listener {
sock: TcpListener,
addr: SocketAddr,
io: Token,
timeout: TToken,
+ itf: Arc<Interface>,
}
impl Listener {
- fn new(addr: &SocketAddr) -> Result<Listener> {
+ fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> {
trace!("Binding {}", addr);
let sock = try!(TcpListener::bind(addr));
Ok(Listener {
@@ -26,6 +86,7 @@ impl Listener {
addr: *addr,
io: Token(0),
timeout: TToken(0),
+ itf: itf,
})
}
}
@@ -39,7 +100,6 @@ impl Machine for Listener {
}
fn handle(&mut self, ctx: &mut Context, _: Event) {
- // TODO: max_connections or something
match self.sock.accept() {
Err(err) => {
match err.kind() {
@@ -57,7 +117,12 @@ impl Machine for Listener {
},
Ok((sock, addr)) => {
debug!("{}: New connection", addr);
- ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr)));
+ if self.itf.acquire() {
+ ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr)));
+ } else {
+ // TODO: Specify which connection limit
+ warn!("Connection limit reached, consider increasing max-connections.");
+ }
}
}
}
@@ -68,10 +133,20 @@ impl Machine for Listener {
}
-pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> {
+pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> {
+ // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is
+ // ever deallocated.
+ let global_connections = Arc::new(ConnCount::new(cfg.max_connections));
+
for itf in &cfg.itf {
+ let interface = Arc::new(Interface {
+ connections: ConnCount::new(itf.max_connections),
+ global_connections: global_connections.clone(),
+ cfg: itf.clone(), // Silly, config is never deallocated.
+ });
+
for addr in &itf.addr {
- let lst = try!(Listener::new(&addr));
+ let lst = try!(Listener::new(interface.clone(), &addr));
ev.spawn(Box::new(lst));
}
}