summaryrefslogtreecommitdiff
path: root/src/listener.rs
blob: d6dc45fdd460b6e9166bf6b2e622e55004788835 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use mio::{Ready,Token,Event};
use mio::tcp::TcpListener;
use std::net::SocketAddr;
use std::io::{Result,ErrorKind};
use std::time::Duration;
use std::sync::{Arc,Mutex};

use eventloop::{Machine,Context,EventLoop,TToken,Action};
use config;
use itf_http;



struct ConnCount {
    max: u32,
    cnt: Mutex<u32>,
}

impl ConnCount {
    fn new(max: u32) -> ConnCount {
        ConnCount { max: max, cnt: Mutex::new(0) }
    }

    // Increases count and returns true if there is a slot free
    fn acquire(&self) -> bool {
        let mut cnt = self.cnt.lock().unwrap();
        if *cnt >= self.max {
            false
        } else {
            *cnt += 1;
            true
        }
    }

    fn release(&self) {
        let mut cnt = self.cnt.lock().unwrap();
        *cnt -= 1;
    }
}



pub struct Interface {
    connections: ConnCount,
    global_connections: Arc<ConnCount>,
    pub cfg: config::Interface,
}


impl Interface {
    pub fn acquire(&self) -> bool {
        if self.connections.acquire() {
            if self.global_connections.acquire() {
                return true;
            } else {
                self.connections.release();
            }
        }
        false
    }

    // TODO: Using a guard object is much less error-prone than this manual release(), but needs
    // either yet-another-Arc or unsafe code.
    pub fn release(&self) {
        self.connections.release();
        self.global_connections.release();
    }
}


struct Listener {
    sock: TcpListener,
    addr: SocketAddr,
    io: Token,
    timeout: TToken,
    itf: Arc<Interface>,
}


impl Listener {
    fn new(ctx: &mut Context, itf: Arc<Interface>, sock: TcpListener, addr: SocketAddr) -> Listener {
        info!("Listening on {}", addr);
        let io = ctx.register(&sock, Ready::readable());
        Listener {
            sock: sock,
            addr: addr,
            io: io,
            timeout: ctx.alloc_timeout(),
            itf: itf,
        }
    }
}


impl Machine for Listener {
    fn handle(&mut self, ctx: &mut Context, _: Event) -> Option<Action> {
        match self.sock.accept() {
            Err(err) => {
                match err.kind() {
                    ErrorKind::WouldBlock |
                    ErrorKind::Interrupted |
                    ErrorKind::TimedOut => { /* Uninteresting temporary errors */ },
                    _ => {
                        error!("Error accepting on {}: {}", self.addr, err);
                        // Stop listening for short time. If this error is not persistent (like
                        // EMFILE), it will solve itself by just waiting. Not much else we can do.
                        ctx.deregister(&self.sock, self.io);
                        ctx.reset_timeout(self.timeout, Duration::from_millis(500));
                    }
                }
            },
            Ok((sock, addr)) => {
                debug!("{}: New connection", addr);
                if self.itf.acquire() {
                    let itf = self.itf.clone();
                    ctx.spawn(move|ctx| { Box::new(itf_http::ItfHttp::new(ctx, itf.clone(), sock, addr)) });
                } else {
                    // TODO: Specify which connection limit
                    warn!("Connection limit reached, consider increasing max-connections.");
                }
            }
        }
        None
    }

    fn timeout(&mut self, ctx: &mut Context, _: TToken) -> Option<Action> {
        self.io = ctx.register(&self.sock, Ready::readable());
        None
    }
}


pub fn setup(ev: &mut EventLoop, cfg: &config::Config) -> Result<()> {
    // XXX: All this ARC stuff is silly, listeners stay alive forever so none of this memory is
    // ever deallocated.
    let global_connections = Arc::new(ConnCount::new(cfg.max_connections));

    for itf in &cfg.itf {
        let interface = Arc::new(Interface {
            connections: ConnCount::new(itf.max_connections),
            global_connections: global_connections.clone(),
            cfg: itf.clone(), // Silly, config is never deallocated.
        });

        for &addr in &itf.addr {
            trace!("Binding {}", addr);
            let sock = try!(TcpListener::bind(&addr));
            let itf = interface.clone();
            ev.spawn(move|ctx| { Box::new(Listener::new(ctx, itf, sock, addr)) });
        }
    }
    Ok(())
}