summaryrefslogtreecommitdiff
path: root/src/eventloop.rs
blob: 93343b155c5981953422687ec0986ec2df05aec2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
/* This is a thin wrapper around mio for building state machines on a shared event loop.
 * It's similar in spirit to Rotor, but with a bunch of simplifications and other differences.
 * (Rotor is currently in too much flux and its ecosystem very incomplete/alpha)
 *
 * = Machine -> Eventloop interaction
 *
 * The biggest difference (I think) between Rotor and this is the way it deals with the problem of
 * communicating actions back from the Machine to the EventLoop. Since the EventLoop is the owner
 * of the Machine, it is not possible to give a mutable borrow of the EventLoop to a method in the
 * Machine, as that what will result in a double mutable borrow. This problem is inherent in the
 * architecture and Rust is correct to disallow the borrow; since it would be possible to mutate
 * the Machine object through multiple aliases.
 *
 * Rotor solves this problem by having the method handlers return a "response", i.e. an action that
 * it wants the EventLoop to perform. This implementation is more flexible but slightly more hacky:
 * When calling a method in the Machine, the Machine object itself is temporarily removed from the
 * EventLoop object, thus removing the alias and allowing both objects to be Mutably borrowed.
 * Downside: This solution is more prone to logic errors that Rust can't catch, like removing or
 * re-using the MToken while inside a Machine handler.
 *
 * = Some rants on mio::timer
 *
 * This implementation wraps mio::timer in a way to provide an API that is similar to IO
 * registration. The mio-provided API has a few odd warts that make it hard to use:
 * - On registration you get a Timeout object, but on poll you get your custom token
 * - On poll you only get your custom token, not the previous Timeout object. In fact, the Timeout
 *   object has implicitly become invalid.
 * - The Timeout object is needed in order to change the timeout or cancel it
 *
 * This wrapping provides extra overhead because we need to keep track of each timer registration.
 * Some other weak points of mio::timer:
 * - It spawns a thread in order to signal timeouts. To me it seems much easier and efficient to
 *   simply provide a timeout value that can be passed to Poll::poll().
 * - It does an up-front allocation for all timer objects, which is a waste of memory if the upper
 *   bound on the number of timers is much higher than what is actually reached (likely), and
 *   forces me to calculate this upper bound in the first place. (It also requires me to add
 *   ugly-looking unwrap()'s to function calls that should never fail)
 *
 *
 * TODO: Machines don't care on which thread they run, so as a scalability improvement it's
 * possible to spawn a configurable number of threads on start-up, run a separate event loop on
 * each, and assign each Machine to a different event loop.
 */
use mio;
use mio::{Poll,Events,Token};
use mio::timer::{Timer,Timeout};
use std;
use std::time::Duration;
use slab::Slab;


// TODO: Upstream this to Slab crate?
fn slab_insert<T, I:Into<usize>+From<usize>>(s: &mut Slab<T,I>, t: T) -> I {
    s.insert(t).or_else(|t| {
        // Grow by some small fixed number. From what I can see the Slab implementation already
        // does exponential growth internally.
        s.reserve_exact(8);
        s.insert(t)
    }).unwrap_or_else(|_| { unreachable!() })
}


// Reserved token for the timer. Assuming this value is never allocated inside a Slab.
// Note that mio already uses MAX internally, and this is undocumented.
// Such a fragile API. -.-
const TIMER_TOKEN: Token = Token(std::usize::MAX-1);


pub trait Machine {
    fn init(&mut self, &mut Context);
    fn handle(&mut self, &mut Context, mio::Event);
    fn timeout(&mut self, &mut Context, TToken);
}


#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MToken(pub usize);
impl From<usize> for MToken { fn from(val: usize) -> MToken { MToken(val) } }
impl From<MToken> for usize { fn from(val: MToken) -> usize { val.0 } }

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TToken(pub usize);
impl From<usize> for TToken { fn from(val: usize) -> TToken { TToken(val) } }
impl From<TToken> for usize { fn from(val: TToken) -> usize { val.0 } }


pub struct EventLoop {
    poll: Poll,
    tokens: Slab<MToken, Token>,
    timer: Timer<TToken>,
    timers: Slab<Option<(MToken,Timeout)>, TToken>,
    // A machine entry is set to None during a method call on the Machine object.
    machines: Slab<Option<Box<Machine + Sync>>, MToken>,
}

pub struct Context<'a> {
    parent: &'a mut EventLoop,
    machine: MToken,
    removed: bool
}


impl<'a> Context<'a> {
    // This method is opinionated in always providing a PollOpt::level(). It's the only option that
    // makes sense. :)
    pub fn register<E: ?Sized>(&mut self, io: &E, interest: mio::Ready) -> Token where E: mio::Evented {
        let token = slab_insert(&mut self.parent.tokens, self.machine);
        self.parent.poll.register(io, token, interest, mio::PollOpt::level()).unwrap();
        token
    }

    pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: mio::Ready) where E: mio::Evented {
        assert_eq!(self.parent.tokens[token], self.machine);
        self.parent.poll.reregister(io, token, interest, mio::PollOpt::level()).unwrap();
    }

    pub fn deregister<E: ?Sized>(&mut self, io: &E, token: Token) where E: mio::Evented {
        assert_eq!(self.parent.tokens[token], self.machine);
        self.parent.tokens.remove(token);
        self.parent.poll.deregister(io).unwrap();
    }

    // The _timeout() methods behave the same as the above register methods, and follow the same
    // control flow, independent of whether the timeout has passed in between or not:
    // 1. set_timeout() or alloc_timeout() to get and initialize a token,
    // 2. Followed by any number of reset_timeout() to change the duration of the timer if it is
    //    still active, or set the new timeout after it has fired.
    // 3. cancel_timeout() to stop the timer if it still active, and deallocate the timeout

    // Allocates a timeout token but does not set a timeout. The token can be activated with
    // reset_timeout()
    pub fn alloc_timeout(&mut self) -> TToken {
        slab_insert(&mut self.parent.timers, None)
    }

    pub fn set_timeout(&mut self, d: Duration) -> TToken {
        let token = self.alloc_timeout();
        self.reset_timeout(token, d);
        token
    }

    pub fn reset_timeout(&mut self, token: TToken, d: Duration) {
        if let Some((_, ref timeout)) = self.parent.timers[token] {
            self.parent.timer.cancel_timeout(timeout);
        }
        self.parent.timers[token] = Some((self.machine, self.parent.timer.set_timeout(d, token).unwrap()));
    }

    pub fn cancel_timeout(&mut self, token: TToken) {
        if let Some( Some((_, timeout)) ) = self.parent.timers.remove(token) {
            self.parent.timer.cancel_timeout(&timeout);
        }
    }

    pub fn remove(&mut self) {
        self.removed = true;
    }

    pub fn is_removed(&self) -> bool {
        self.removed
    }

    pub fn spawn(&mut self, machine: Box<Machine + Sync>) {
        self.parent.spawn(machine);
    }
}


impl EventLoop {
    pub fn new() -> EventLoop {
        let timer = mio::timer::Builder::default()
            // Timers are used for (1) aborting a connection or process after some timeout and
            // (2) back-off after listen error. (1) will be fine with 1-second precision, and
            // (2) should be incredibly rare. The current value is terribly imprecise for a
            // generic timer, but should suffice for our limited use case.
            .tick_duration(Duration::from_millis(200))
            // TODO: Calculate a more exact upper limit. I'd prefer mio to just resize internal
            // structures on demand, but it can't do that (yet). The default value allocates
            // 2MB of memory, which is far too wasteful.
            .capacity(4_000).build();

        let poll = Poll::new().unwrap();
        poll.register(&timer, TIMER_TOKEN, mio::Ready::readable(), mio::PollOpt::edge()).unwrap();

        EventLoop {
            poll: poll,
            tokens: Slab::with_capacity(16),
            timer: timer,
            timers: Slab::with_capacity(16),
            machines: Slab::with_capacity(16),
        }
    }

    pub fn spawn(&mut self, mut machine: Box<Machine + Sync>) {
        let mtoken = slab_insert(&mut self.machines, None);
        trace!("[{}] Spawning machine", mtoken.0);
        {
            let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
            machine.init(&mut ctx);
            assert!(!ctx.removed);
        }
        self.machines[mtoken] = Some(machine);
    }

    // XXX: I don't get why "&mut Machine" doesn't work in the FnOnce.
    fn dispatch<F>(&mut self, mtoken: MToken, f: F)
            where F: FnOnce(&mut Box<Machine + Sync>, &mut Context) {
        let mut machine = match self.machines.entry(mtoken) {
            None => { return; },
            Some(mut x) => { x.replace(None).unwrap() },
        };

        let removed = {
            let mut ctx = Context{ parent: self, machine: mtoken, removed: false };
            f(&mut machine, &mut ctx);
            ctx.removed
        };

        if !removed {
            self.machines[mtoken] = Some(machine);
        } else {
            trace!("[{}] Removing machine", mtoken.0);
        }
    }

    fn dispatch_io(&mut self, event: mio::Event) {
        if let Some(&mtoken) = self.tokens.get(event.token()) {
            self.dispatch(mtoken, |m, ctx| {
                trace!("[{}] Calling handle for event {} state {:?}", mtoken.0, event.token().0, event.kind());
                m.handle(ctx, event)
            });
        }
    }

    fn dispatch_timeout(&mut self) {
        while let Some(ttoken) = self.timer.poll() {
            let mtoken = self.timers.entry(ttoken).and_then(|mut e| { e.replace(None).map(|(t,_)| { t }) });
            if let Some(mtoken) = mtoken {
                self.dispatch(mtoken, |m, ctx| {
                    trace!("[{}] Calling timeout", mtoken.0);
                    m.timeout(ctx, ttoken)
                });
            }
        }
    }

    pub fn run(&mut self) {
        let mut events = Events::with_capacity(64);
        debug!("Entering event loop");
        loop {
            self.poll.poll(&mut events, None).unwrap();
            for event in events.iter() {
                if event.token() == TIMER_TOKEN {
                    self.dispatch_timeout();
                } else {
                    self.dispatch_io(event);
                }
            }
        }
    }
}