diff options
author | Yorhel <git@yorhel.nl> | 2016-09-07 15:04:19 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2016-09-07 15:05:04 +0200 |
commit | ccd38998fd8ff2bbf2f916e6e0d04087eaaf6293 (patch) | |
tree | 539b4849c0d11f902defb6a66445c4f74c7bf075 /src | |
parent | efe595493c9a8dd1e4df3b9045fff012521449c8 (diff) |
Add max-connections (global+interface) and io-timeout (interface) config options
TODO: Add a global io-timeout that is inherited by interfaces that don't
set it? Or is that needless complexity?
I've added Sync bounds to Machines and implemented all this using
Arc/Mutex in order for Machines to be runnable on multiple threads later
on.
Diffstat (limited to 'src')
-rw-r--r-- | src/config.rs | 52 | ||||
-rw-r--r-- | src/eventloop.rs | 8 | ||||
-rw-r--r-- | src/itf_http.rs | 16 | ||||
-rw-r--r-- | src/listener.rs | 87 |
4 files changed, 134 insertions, 29 deletions
diff --git a/src/config.rs b/src/config.rs index c3882d8..655ca19 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,8 @@ use std::str::FromStr; use nom; use std::net::SocketAddr; use nom::{IResult,Input,Consumer,ConsumerState,Move,Producer,HexDisplay}; -use nom::{alpha,alphanumeric,space,multispace,line_ending,not_line_ending}; +use nom::{alpha,is_alphanumeric,space,multispace,line_ending,not_line_ending}; +use std::time::Duration; #[derive(Debug,PartialEq,Eq)] @@ -23,8 +24,10 @@ named!(parse_itf<&[u8],Expr>, chain!( ||{ Expr::Interface(val) } )); +fn is_key(c: u8) -> bool { is_alphanumeric(c) || c == b'-' } + named!(parse_keyval<&[u8],Expr>, chain!( - key: map_res!(alphanumeric, std::str::from_utf8) ~ + key: map_res!(take_while!(is_key), std::str::from_utf8) ~ space ~ val: map_res!(not_line_ending, std::str::from_utf8) ~ line_ending, @@ -40,14 +43,17 @@ named!(parse_expr<&[u8],Expr>, alt!( )); -#[derive(Debug,Clone,Default)] +#[derive(Debug,Clone)] pub struct Interface { - pub addr: Vec<SocketAddr> + pub addr: Vec<SocketAddr>, + pub max_connections: u32, + pub io_timeout: Duration, } -#[derive(Debug,Clone,Default)] +#[derive(Debug,Clone)] pub struct Config { - pub itf: Vec<Interface> + pub itf: Vec<Interface>, + pub max_connections: u32, } @@ -70,12 +76,30 @@ struct ConfigConsumer { impl ConfigConsumer { + fn new() -> ConfigConsumer { + ConfigConsumer{ + state: ParseState::Root, + c_state: ConsumerState::Continue(Move::Consume(0)), + cfg: Config { + itf: Vec::new(), + max_connections: 1000, + }, + } + } + fn handle_root(&mut self, e: Expr) { match e { Expr::Interface(_) => { - self.cfg.itf.push(Default::default()); + self.cfg.itf.push(Interface { + addr: Vec::new(), + max_connections: std::u32::MAX, + io_timeout: Duration::from_secs(60), + }); self.state = ParseState::Interface }, + Expr::Keyval("max-connections", v) => { + self.cfg.max_connections = u32::from_str(v).expect("Invalid value"); + }, _ => { panic!("Not implemented yet.") } } } @@ -87,6 +111,12 @@ impl ConfigConsumer { // TODO: More flexible input format (e.g. ":80" or dns "localhost:8080") itf.addr.push(SocketAddr::from_str(a).expect("I crash on invalid strings")); }, + Expr::Keyval("max-connections", v) => { + itf.max_connections = u32::from_str(v).expect("Invalid value"); + }, + Expr::Keyval("io-timeout", v) => { + itf.io_timeout = Duration::from_secs(u16::from_str(v).expect("Invalid value") as u64); + }, Expr::Close => { if itf.addr.len() < 1 { panic!("No interface address configured"); @@ -137,11 +167,7 @@ impl<'a> Consumer<&'a[u8], (), (), nom::Move> for ConfigConsumer { impl Config { pub fn parse(file: &str) -> Result<Config, Err> { let mut p = try!(nom::FileProducer::new(file, 1024).map_err(Err::Io)); - let mut c = ConfigConsumer{ - state: ParseState::Root, - c_state: ConsumerState::Continue(Move::Consume(0)), - cfg: Default::default() - }; + let mut c = ConfigConsumer::new(); loop { match *p.apply(&mut c) { ConsumerState::Done(_, _) => { return Ok(c.cfg) } @@ -163,6 +189,7 @@ fn test_parse_expr() { assert_eq!(e(b"##/!@$% \nx"), x(Expr::Comment)); assert_eq!(e(b"}\nx"), x(Expr::Close)); assert_eq!(e(b"key val\nx"), x(Expr::Keyval("key", "val"))); + assert_eq!(e(b"k-y val\nx"), x(Expr::Keyval("k-y", "val"))); assert_eq!(e(b"ke\t v a \nx"), x(Expr::Keyval("ke", "v a"))); assert_eq!(e(b"interface http {x"), x(Expr::Interface("http"))); assert_eq!(e(b"interface \thttp{x"), x(Expr::Interface("http"))); @@ -171,6 +198,7 @@ fn test_parse_expr() { "", "something", "#something", + "som/thing else", ][..]; for s in l { if let IResult::Done(_,_) = e(s.as_bytes()) { diff --git a/src/eventloop.rs b/src/eventloop.rs index 1a41762..93343b1 100644 --- a/src/eventloop.rs +++ b/src/eventloop.rs @@ -88,7 +88,7 @@ pub struct EventLoop { timer: Timer<TToken>, timers: Slab<Option<(MToken,Timeout)>, TToken>, // A machine entry is set to None during a method call on the Machine object. - machines: Slab<Option<Box<Machine>>, MToken>, + machines: Slab<Option<Box<Machine + Sync>>, MToken>, } pub struct Context<'a> { @@ -158,7 +158,7 @@ impl<'a> Context<'a> { self.removed } - pub fn spawn(&mut self, machine: Box<Machine>) { + pub fn spawn(&mut self, machine: Box<Machine + Sync>) { self.parent.spawn(machine); } } @@ -189,7 +189,7 @@ impl EventLoop { } } - pub fn spawn(&mut self, mut machine: Box<Machine>) { + pub fn spawn(&mut self, mut machine: Box<Machine + Sync>) { let mtoken = slab_insert(&mut self.machines, None); trace!("[{}] Spawning machine", mtoken.0); { @@ -202,7 +202,7 @@ impl EventLoop { // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce. fn dispatch<F>(&mut self, mtoken: MToken, f: F) - where F: FnOnce(&mut Box<Machine>, &mut Context) { + where F: FnOnce(&mut Box<Machine + Sync>, &mut Context) { let mut machine = match self.machines.entry(mtoken) { None => { return; }, Some(mut x) => { x.replace(None).unwrap() }, diff --git a/src/itf_http.rs b/src/itf_http.rs index f234740..a89d9e0 100644 --- a/src/itf_http.rs +++ b/src/itf_http.rs @@ -2,8 +2,9 @@ use mio::{Ready,Token,Event}; use mio::tcp::TcpStream; use std::net::SocketAddr; use std::io::ErrorKind; -use std::time::Duration; +use listener::Interface; use eventloop::{Machine,Context,TToken}; +use std::sync::Arc; use netbuf::Buf; @@ -16,12 +17,12 @@ pub struct ItfHttp { rbuf: Buf, wbuf: Buf, iotimeout: TToken, /* timeout on any form of IO */ - iotimeout_val: Duration, + itf: Arc<Interface>, } impl ItfHttp { - pub fn new(sock: TcpStream, addr: SocketAddr) -> ItfHttp { + pub fn new(itf: Arc<Interface>, sock: TcpStream, addr: SocketAddr) -> ItfHttp { ItfHttp { sock: sock, addr: addr, @@ -30,7 +31,7 @@ impl ItfHttp { rbuf: Buf::new(), wbuf: Buf::new(), iotimeout: TToken(0), - iotimeout_val: Duration::from_secs(10) // TODO: Make configurable & increase + itf: itf, } } @@ -92,10 +93,11 @@ impl ItfHttp { self.ioreg = reg; ctx.reregister(&self.sock, self.io, self.ioreg); } - ctx.reset_timeout(self.iotimeout, self.iotimeout_val); + ctx.reset_timeout(self.iotimeout, self.itf.cfg.io_timeout); } fn remove(&mut self, ctx: &mut Context) { + self.itf.release(); ctx.deregister(&self.sock, self.io); ctx.cancel_timeout(self.iotimeout); ctx.remove(); @@ -106,7 +108,7 @@ impl ItfHttp { impl Machine for ItfHttp { fn init(&mut self, ctx: &mut Context) { self.io = ctx.register(&self.sock, self.ioreg); - self.iotimeout = ctx.set_timeout(self.iotimeout_val); + self.iotimeout = ctx.set_timeout(self.itf.cfg.io_timeout); } fn handle(&mut self, ctx: &mut Context, ev: Event) { @@ -122,7 +124,7 @@ impl Machine for ItfHttp { } fn timeout(&mut self, ctx: &mut Context, _: TToken) { - debug!("{}: No IO activity in {} seconds", self.addr, self.iotimeout_val.as_secs()); + debug!("{}: No IO activity in {} seconds", self.addr, self.itf.cfg.io_timeout.as_secs()); self.remove(ctx); } } diff --git a/src/listener.rs b/src/listener.rs index fcac06d..f1a1ad9 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -3,22 +3,82 @@ use mio::tcp::TcpListener; use std::net::SocketAddr; use std::io::{Result,ErrorKind}; use std::time::Duration; -use config::Config; use eventloop::{Machine,Context,EventLoop,TToken}; +use std::sync::{Arc,Mutex}; +use config; use itf_http; + +struct ConnCount { + max: u32, + cnt: Mutex<u32>, +} + +impl ConnCount { + fn new(max: u32) -> ConnCount { + ConnCount { max: max, cnt: Mutex::new(0) } + } + + // Increases count and returns true if there is a slot free + fn acquire(&self) -> bool { + let mut cnt = self.cnt.lock().unwrap(); + if *cnt >= self.max { + false + } else { + *cnt += 1; + true + } + } + + fn release(&self) { + let mut cnt = self.cnt.lock().unwrap(); + *cnt -= 1; + } +} + + + +pub struct Interface { + connections: ConnCount, + global_connections: Arc<ConnCount>, + pub cfg: config::Interface, +} + + +impl Interface { + pub fn acquire(&self) -> bool { + if self.connections.acquire() { + if self.global_connections.acquire() { + return true; + } else { + self.connections.release(); + } + } + false + } + + // TODO: Using a guard object is much less error-prone than this manual release(), but needs + // either yet-another-Arc or unsafe code. + pub fn release(&self) { + self.connections.release(); + self.global_connections.release(); + } +} + + struct Listener { sock: TcpListener, addr: SocketAddr, io: Token, timeout: TToken, + itf: Arc<Interface>, } impl Listener { - fn new(addr: &SocketAddr) -> Result<Listener> { + fn new(itf: Arc<Interface>, addr: &SocketAddr) -> Result<Listener> { trace!("Binding {}", addr); let sock = try!(TcpListener::bind(addr)); Ok(Listener { @@ -26,6 +86,7 @@ impl Listener { addr: *addr, io: Token(0), timeout: TToken(0), + itf: itf, }) } } @@ -39,7 +100,6 @@ impl Machine for Listener { } fn handle(&mut self, ctx: &mut Context, _: Event) { - // TODO: max_connections or something match self.sock.accept() { Err(err) => { match err.kind() { @@ -57,7 +117,12 @@ impl Machine for Listener { }, Ok((sock, addr)) => { debug!("{}: New connection", addr); - ctx.spawn(Box::new(itf_http::ItfHttp::new(sock, addr))); + if self.itf.acquire() { + ctx.spawn(Box::new(itf_http::ItfHttp::new(self.itf.clone(), sock, addr))); + } else { + // TODO: Specify which connection limit + warn!("Connection limit reached, consider increasing max-connections."); + } } } } @@ -68,10 +133,20 @@ impl Machine for Listener { } -pub fn setup(ev: &mut EventLoop, cfg: &Config) -> Result<()> { +pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> { + // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is + // ever deallocated. + let global_connections = Arc::new(ConnCount::new(cfg.max_connections)); + for itf in &cfg.itf { + let interface = Arc::new(Interface { + connections: ConnCount::new(itf.max_connections), + global_connections: global_connections.clone(), + cfg: itf.clone(), // Silly, config is never deallocated. + }); + for addr in &itf.addr { - let lst = try!(Listener::new(&addr)); + let lst = try!(Listener::new(interface.clone(), &addr)); ev.spawn(Box::new(lst)); } } |