summaryrefslogtreecommitdiff
path: root/src/iostream.rs
blob: 22d5ef0cd8d6660a88b3c9da847902c4f68dd84e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
use mio::{Ready,Token,Event};
use mio::tcp::TcpStream;
use std::io::{Result,ErrorKind,Error};
use std::time::Duration;
use netbuf::Buf;

use eventloop::{Context,TToken};


pub struct IoStream {
    pub sock: TcpStream,
    pub rbuf: Buf,
    pub wbuf: Buf,
    io: Token,
    ioreg: Ready,
    rtime: TToken,
    wtime: TToken,
    rtimeout: Duration,
    wtimeout: Duration,
    rmax: usize,
    rexpect: bool,
}


impl IoStream {
    pub fn new(ctx: &mut Context, sock: TcpStream, rtimeout: Duration, wtimeout: Duration, rmax: usize) -> IoStream {
        let io = ctx.reg_alloc();
        let rtime = ctx.timeout_alloc();
        ctx.reg_set(&sock, io, Ready::readable());
        ctx.timeout_set(rtime, rtimeout);
        IoStream {
            sock: sock,
            io: io,
            ioreg: Ready::readable(),
            rbuf: Buf::new(),
            wbuf: Buf::new(),
            rtime: rtime,
            wtime: ctx.timeout_alloc(),
            rtimeout: rtimeout,
            wtimeout: wtimeout,
            rmax: rmax,
            rexpect: true,
        }
    }

    /// Modifies the expect_read flag. If this flag is set, we are expecting to read data from the
    /// other end, and will throw an error if nothing is read within the timeout.
    /// This flag merely affects the timeout behaviour; we keep trying to read even if we are not
    /// expecting any data, in order catch a closed connection early and optimize I/O when
    /// receiving multiple requests over one stream. Until the read buffer is full, that is.
    #[allow(dead_code)]
    pub fn set_expect_read(&mut self, ctx: &mut Context, x: bool) {
        self.rexpect = x;
        self.set_ioreg(ctx);
    }

    /// Updates the I/O registration with the current state and starts/stops timers as necessary.
    /// Should be called whenever something is consumed from the read buffer or something is added
    /// to the write buffer.
    pub fn set_ioreg(&mut self, ctx: &mut Context) {
        let oldreg = self.ioreg;

        // Optimization: If the write buffer was empty before but contains data now, we can already
        // try to write it out at this point. This way we can avoid going through the event loop if
        // the OS buffers have enough space.
        // TODO: Measure how effective this is in practice; if this only tends to happen on the
        // first write to a new socket then it might not be worth the effort.
        if self.wbuf.len() > 0 && !oldreg.contains(Ready::writable()) {
            // Ignore errors, if we catch an error we can handle it in the next loop iteration.
            let _ = self.wbuf.write_to(&mut self.sock);
        }

        let mut reg = Ready::none();

        if self.rbuf.len() < self.rmax {
            reg.insert(Ready::readable());
            if self.rexpect && !oldreg.contains(Ready::readable()) {
                ctx.timeout_set(self.rtime, self.rtimeout);
            }
        } else {
            ctx.timeout_unset(self.rtime);
        }

        if self.wbuf.len() > 0 {
            reg.insert(Ready::writable());
            if !oldreg.contains(Ready::writable()) {
                ctx.timeout_set(self.wtime, self.wtimeout);
            }
        } else {
            ctx.timeout_unset(self.wtime);
        }

        if reg != oldreg {
            if reg == Ready::none() {
                ctx.reg_unset(&self.sock, self.io);
            } else {
                ctx.reg_set(&self.sock, self.io, reg);
            }
        }
        self.ioreg = reg;
    }

    fn handle_io<F,G>(&mut self, cond: bool, io: F, tim: G) -> Result<usize>
            where F: FnOnce(&mut Self) -> Result<usize>, G: FnOnce(&mut Self) {
        if !cond {
            return Ok(0);
        }

        // Error conversion:
        // - Ok(0) is converted into a ConnectionReset error
        // - Temporary errors are converted into Ok(0)
        match io(self) {
            Err(err) => {
                match err.kind() {
                    ErrorKind::WouldBlock |
                    ErrorKind::Interrupted => {
                        Ok(0)
                    },
                    _ => {
                        Err(err)
                    }
                }
            },
            Ok(0) => {
                Err(Error::new(ErrorKind::ConnectionReset, "Connection reset by peer"))
            },
            Ok(n) => {
                tim(self);
                Ok(n)
            }
        }
    }

    /// Handle an IO event. Returns the (read_bytes, write_bytes) on success. Both can be 0 if the
    /// event was not intended for this steam or if the read/write operations didn't feel like
    /// doing anything. It's possible that data was read from the stream even if this method
    /// returns an error.
    pub fn handle(&mut self, ctx: &mut Context, ev: Event) -> Result<(usize, usize)> {
        if ev.token() != self.io {
            return Ok((0,0));
        }

        let canr = ev.kind().is_readable() && self.rbuf.len() < self.rmax;
        let canw = ev.kind().is_writable() && self.wbuf.len() > 0;

        let rd = try!(self.handle_io(canr,
            |s|{ s.rbuf.read_from(&mut s.sock) },
            |s|{ if s.rexpect { ctx.timeout_set(s.rtime, s.rtimeout) } }
        ));

        let wr = try!(self.handle_io(canw,
            |s|{ s.wbuf.write_to(&mut s.sock) },
            |s|{ ctx.timeout_set(s.wtime, s.wtimeout) }
        ));

        if rd > 0 || wr > 0 {
            self.set_ioreg(ctx);
        }
        Ok((rd,wr))
    }

    pub fn timeout(&mut self, t: TToken) -> Result<()> {
        if t == self.rtime {
            Err(Error::new(ErrorKind::TimedOut, "Read timeout"))
        } else if t == self.wtime {
            Err(Error::new(ErrorKind::TimedOut, "Write timeout"))
        } else {
            Ok(())
        }
    }

    pub fn remove(&mut self, ctx: &mut Context) {
        ctx.reg_free(&self.sock, self.io);
        ctx.timeout_free(self.rtime);
        ctx.timeout_free(self.wtime);
    }
}