summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2016-09-07 15:04:19 +0200
committerYorhel <git@yorhel.nl>2016-09-07 15:05:04 +0200
commitccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (patch)
tree539b4849c0d11f902defb6a66445c4f74c7bf075 /src
parentefe595493c9a8dd1e4df3b9045fff012521449c8 (diff)
Add max-connections (global+interface) and io-timeout (interface) config options
TODO: Add a global io-timeout that is inherited by interfaces that don't set it? Or is that needless complexity? I've added Sync bounds to Machines and implemented all this using Arc/Mutex in order for Machines to be runnable on multiple threads later on.
Diffstat (limited to 'src')
-rw-r--r--src/config.rs52
-rw-r--r--src/eventloop.rs8
-rw-r--r--src/itf_http.rs16
-rw-r--r--src/listener.rs87
4 files changed, 134 insertions, 29 deletions
diff --git a/src/config.rs b/src/config.rs
index c3882d8..655ca19 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -3,7 +3,8 @@ use std::str::FromStr;
use nom;
use std::net::SocketAddr;
use nom::{IResult,Input,Consumer,ConsumerState,Move,Producer,HexDisplay};
-use nom::{alpha,alphanumeric,space,multispace,line_ending,not_line_ending};
+use nom::{alpha,is_alphanumeric,space,multispace,line_ending,not_line_ending};
+use std::time::Duration;
#[derive(Debug,PartialEq,Eq)]
@@ -23,8 +24,10 @@ named!(parse_itf<&[u8],Expr>, chain!(
||{ Expr::Interface(val) }
));
+fn is_key(c: u8) -> bool { is_alphanumeric(c) || c == b'-' }
+
named!(parse_keyval<&[u8],Expr>, chain!(
- key: map_res!(alphanumeric, std::str::from_utf8) ~
+ key: map_res!(take_while!(is_key), std::str::from_utf8) ~
space ~
val: map_res!(not_line_ending, std::str::from_utf8) ~
line_ending,
@@ -40,14 +43,17 @@ named!(parse_expr<&[u8],Expr>, alt!(
));
-#[derive(Debug,Clone,Default)]
+#[derive(Debug,Clone)]
pub struct Interface {
- pub addr: Vec<SocketAddr>
+ pub addr: Vec<SocketAddr>,
+ pub max_connections: u32,
+ pub io_timeout: Duration,
}
-#[derive(Debug,Clone,Default)]
+#[derive(Debug,Clone)]
pub struct Config {
- pub itf: Vec<Interface>
+ pub itf: Vec<Interface>,
+ pub max_connections: u32,
}
@@ -70,12 +76,30 @@ struct ConfigConsumer {
impl ConfigConsumer {
+ fn new() -> ConfigConsumer {
+ ConfigConsumer{
+ state: ParseState::Root,
+ c_state: ConsumerState::Continue(Move::Consume(0)),
+ cfg: Config {
+ itf: Vec::new(),
+ max_connections: 1000,
+ },
+ }
+ }
+
fn handle_root(&mut self, e: Expr) {
match e {
Expr::Interface(_) => {
- self.cfg.itf.push(Default::default());
+ self.cfg.itf.push(Interface {
+ addr: Vec::new(),
+ max_connections: std::u32::MAX,
+ io_timeout: Duration::from_secs(60),
+ });
self.state = ParseState::Interface
},
+ Expr::Keyval("max-connections", v) => {
+ self.cfg.max_connections = u32::from_str(v).expect("Invalid value");
+ },
_ => { panic!("Not implemented yet.") }
}
}
@@ -87,6 +111,12 @@ impl ConfigConsumer {
// TODO: More flexible input format (e.g. ":80" or dns "localhost:8080")
itf.addr.push(SocketAddr::from_str(a).expect("I crash on invalid strings"));
},
+ Expr::Keyval("max-connections", v) => {
+ itf.max_connections = u32::from_str(v).expect("Invalid value");
+ },
+ Expr::Keyval("io-timeout", v) => {
+ itf.io_timeout = Duration::from_secs(u16::from_str(v).expect("Invalid value") as u64);
+ },
Expr::Close => {
if itf.addr.len() < 1 {
panic!("No interface address configured");
@@ -137,11 +167,7 @@ impl<'a> Consumer<&'a[u8], (), (), nom::Move> for ConfigConsumer {
impl Config {
pub fn parse(file: &str) -> Result<Config, Err> {
let mut p = try!(nom::FileProducer::new(file, 1024).map_err(Err::Io));
- let mut c = ConfigConsumer{
- state: ParseState::Root,
- c_state: ConsumerState::Continue(Move::Consume(0)),
- cfg: Default::default()
- };
+ let mut c = ConfigConsumer::new();
loop {
match *p.apply(&mut c) {
ConsumerState::Done(_, _) => { return Ok(c.cfg) }
@@ -163,6 +189,7 @@ fn test_parse_expr() {
assert_eq!(e(b"##/!@$% \nx"), x(Expr::Comment));
assert_eq!(e(b"}\nx"), x(Expr::Close));
assert_eq!(e(b"key val\nx"), x(Expr::Keyval("key", "val")));
+ assert_eq!(e(b"k-y val\nx"), x(Expr::Keyval("k-y", "val")));
assert_eq!(e(b"ke\t v a \nx"), x(Expr::Keyval("ke", "v a")));
assert_eq!(e(b"interface http {x"), x(Expr::Interface("http")));
assert_eq!(e(b"interface \thttp{x"), x(Expr::Interface("http")));
@@ -171,6 +198,7 @@ fn test_parse_expr() {
"",
"something",
"#something",
+ "som/thing else",
][..];
for s in l {
if let IResult::Done(_,_) = e(s.as_bytes()) {
diff --git a/src/eventloop.rs b/src/eventloop.rs
index 1a41762..93343b1 100644
--- a/src/eventloop.rs
+++ b/src/eventloop.rs
@@ -88,7 +88,7 @@ pub struct EventLoop {
timer: Timer<TToken>,
timers: Slab<Option<(MToken,Timeout)>, TToken>,
// A machine entry is set to None during a method call on the Machine object.
- machines: Slab<Option<Box<Machine>>, MToken>,
+ machines: Slab<Option<Box<Machine + Sync>>, MToken>,
}
pub struct Context<'a> {
@@ -158,7 +158,7 @@ impl<'a> Context<'a> {
self.removed
}
- pub fn spawn(&mut self, machine: Box<Machine>) {
+ pub fn spawn(&mut self, machine: Box<Machine + Sync>) {
self.parent.spawn(machine);
}
}
@@ -189,7 +189,7 @@ impl EventLoop {
}
}
- pub fn spawn(&mut self, mut machine: Box<Machine>) {
+ pub fn spawn(&mut self, mut machine: Box<Machine + Sync>) {
let mtoken = slab_insert(&mut self.machines, None);
trace!("[{}] Spawning machine", mtoken.0);
{
@@ -202,7 +202,7 @@ impl EventLoop {
// XXX: I don't get why "&mut Machine" doesn't work in the FnOnce.
fn dispatch<F>(&mut self, mtoken: MToken, f: F)
- where F: FnOnce(&mut Box<Machine>, &mut Context) {
+ where F: FnOnce(&mut Box<Machine + Sync>, &mut Context) {
let mut machine = match self.machines.entry(mtoken) {
None => { return; },
Some(mut x) => { x.replace(None).unwrap() },
diff --git a/src/itf_http.rs b/src/itf_http.rs
index f234740..a89d9e0 100644
--- a/src/itf_http.rs
+++ b/src/itf_http.rs
@@ -2,8 +2,9 @@ use mio::{Ready,Token,Event};
use mio::tcp::TcpStream;
use std::net::SocketAddr;
use std::io::ErrorKind;
-use std::time::Duration;
+use listener::Interface;
use eventloop::{Machine,Context,TToken};
+use std::sync::Arc;
use netbuf::Buf;
@@ -16,12 +17,12 @@ pub struct ItfHttp {
rbuf: Buf,
wbuf: Buf,
iotimeout: TToken, /* timeout on any form of IO */
- iotimeout_val: Duration,
+ itf: Arc<Interface>,
}
impl ItfHttp {
- pub fn new(sock: TcpStream, addr: SocketAddr) -> ItfHttp {
+ pub fn new(itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp {
ItfHttp {
sock: sock,
addr: addr,
@@ -30,7 +31,7 @@ impl ItfHttp {
rbuf: Buf::new(),
wbuf: Buf::new(),
iotimeout: TToken(0),
- iotimeout_val: Duration::from_secs(10) // TODO: Make configurable & increase
+ itf: itf,
}
}
@@ -92,10 +93,11 @@ impl ItfHttp {
self.ioreg = reg;
ctx.reregister(&self.sock, self.io, self.ioreg);
}
- ctx.reset_timeout(self.iotimeout, self.iotimeout_val);
+ ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout);
}
fn remove(&mut self, ctx: &mut Context) {
+ self.itf.release();
ctx.deregister(&self.sock, self.io);
ctx.cancel_timeout(self.iotimeout);
ctx.remove();
@@ -106,7 +108,7 @@ impl ItfHttp {
impl Machine for ItfHttp {
fn init(&mut self, ctx: &mut Context) {
self.io = ctx.register(&self.sock, self.ioreg);
- self.iotimeout = ctx.set_timeout(self.iotimeout_val);
+ self.iotimeout = ctx.set_timeout(self.itf.cfg.io_timeout);
}
fn handle(&mut self, ctx: &mut Context, ev: Event) {
@@ -122,7 +124,7 @@ impl Machine for ItfHttp {
}
fn timeout(&mut self, ctx: &mut Context, _: TToken) {
- debug!("{}: No IO activity in {} seconds", self.addr, self.iotimeout_val.as_secs());
+ debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs());
self.remove(ctx);
}
}
diff --git a/src/listener.rs b/src/listener.rs
index fcac06d..f1a1ad9 100644
--- a/src/listener.rs
+++ b/src/listener.rs
@@ -3,22 +3,82 @@ use mio::tcp::TcpListener;
use std::net::SocketAddr;
use std::io::{Result,ErrorKind};
use std::time::Duration;
-use config::Config;
use eventloop::{Machine,Context,EventLoop,TToken};
+use std::sync::{Arc,Mutex};
+use config;
use itf_http;
+
+struct ConnCount {
+ max: u32,
+ cnt: Mutex<u32>,
+}
+
+impl ConnCount {
+ fn new(max: u32) -> ConnCount {
+ ConnCount { max: max, cnt: Mutex::new(0) }
+ }
+
+ // Increases count and returns true if there is a slot free
+ fn acquire(&self) -> bool {
+ let mut cnt = self.cnt.lock().unwrap();
+ if *cnt >= self.max {
+ false
+ } else {
+ *cnt += 1;
+ true
+ }
+ }
+
+ fn release(&self) {
+ let mut cnt = self.cnt.lock().unwrap();
+ *cnt -= 1;
+ }
+}
+
+
+
+pub struct Interface {
+ connections: ConnCount,
+ global_connections: Arc<ConnCount>,
+ pub cfg: config::Interface,
+}
+
+
+impl Interface {
+ pub fn acquire(&self) -> bool {
+ if self.connections.acquire() {
+ if self.global_connections.acquire() {
+ return true;
+ } else {
+ self.connections.release();
+ }
+ }
+ false
+ }
+
+ // TODO: Using a guard object is much less error-prone than this manual release(), but needs
+ // either yet-another-Arc or unsafe code.
+ pub fn release(&self) {
+ self.connections.release();
+ self.global_connections.release();
+ }
+}
+
+
struct Listener {
sock: TcpListener,
addr: SocketAddr,
io: Token,
timeout: TToken,
+ itf: Arc<Interface>,
}
impl Listener {
- fn new(addr: &SocketAddr) -> Result<Listener> {
+ fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> {
trace!("Binding {}", addr);
let sock = try!(TcpListener::bind(addr));
Ok(Listener {
@@ -26,6 +86,7 @@ impl Listener {
addr: *addr,
io: Token(0),
timeout: TToken(0),
+ itf: itf,
})
}
}
@@ -39,7 +100,6 @@ impl Machine for Listener {
}
fn handle(&mut self, ctx: &mut Context, _: Event) {
- // TODO: max_connections or something
match self.sock.accept() {
Err(err) => {
match err.kind() {
@@ -57,7 +117,12 @@ impl Machine for Listener {
},
Ok((sock, addr)) => {
debug!("{}: New connection", addr);
- ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr)));
+ if self.itf.acquire() {
+ ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr)));
+ } else {
+ // TODO: Specify which connection limit
+ warn!("Connection limit reached, consider increasing max-connections.");
+ }
}
}
}
@@ -68,10 +133,20 @@ impl Machine for Listener {
}
-pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> {
+pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> {
+ // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is
+ // ever deallocated.
+ let global_connections = Arc::new(ConnCount::new(cfg.max_connections));
+
for itf in &cfg.itf {
+ let interface = Arc::new(Interface {
+ connections: ConnCount::new(itf.max_connections),
+ global_connections: global_connections.clone(),
+ cfg: itf.clone(), // Silly, config is never deallocated.
+ });
+
for addr in &itf.addr {
- let lst = try!(Listener::new(&addr));
+ let lst = try!(Listener::new(interface.clone(), &addr));
ev.spawn(Box::new(lst));
}
}