1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
use std::io::Write;
use std::sync::Arc;
use std::error::Error;
use std::net::TcpStream;
use netbuf::Buf;
use rustls::{Session,ClientSession,ClientConfig};
use msg::Message;
// Hostname connected to when the caller does not supply one.
const VNDB_HOST: &str = "api.vndb.org";
// Port used by default for raw (unencrypted) connections.
const VNDB_PORT_RAW: u16 = 19534;
// Port used by default when a TLS configuration is supplied.
const VNDB_PORT_TLS: u16 = 19535;
// A blocking connection to the API server, either over plain TCP or wrapped in a TLS session.
// Commands are queued with send() and flushed on the next recv().
pub struct Connection {
// The underlying TCP socket; both the raw and the TLS code paths read from and write to it.
stream: TcpStream,
// TLS session state. `None` means this is a raw connection.
tls: Option<ClientSession>,
// Received (and, for TLS, already decrypted) bytes that recv() has not handed out yet.
rbuf: Buf,
wbuf: Buf, // Only used for raw connections. In the case of TLS we directly write to the ClientSession buffer.
}
impl Connection {
// Connects to the remote API, but does not yet start the TLS handshake. This is deferred to
// the first recv(), to allow rustls to send the initial command during the handshake.
pub fn connect(hostname: Option<&str>, port: Option<u16>, tls: Option<&Arc<ClientConfig>>) -> Result<Connection,Box<Error>> {
let hostname = hostname.unwrap_or(VNDB_HOST);
let port = port.unwrap_or(if tls.is_some() { VNDB_PORT_TLS } else { VNDB_PORT_RAW });
Ok(Connection {
stream: TcpStream::connect((hostname, port))?,
tls: tls.map(|c| ClientSession::new(c, hostname)),
rbuf: Buf::new(),
wbuf: Buf::new(),
})
}
// Push a new command to the output buffer. Doesn't actually send anything until a recv() is
// performed. Can be used to pipeline multiple commands.
pub fn send(&mut self, cmd: &Message) {
let w = self.tls.as_mut().map(|x| x as &mut Write).unwrap_or(&mut self.wbuf);
write!(w, "{}\x04", cmd).unwrap();
}
// XXX: Both this io_tls and the io_raw code may hang if the OS send buffers are full and the
// server is providing backpressure while waiting for us to read from our recv buffers.
// Properly handling that scenario requires async I/O. In practice, the server will disconnect
// us before that happens.
fn io_tls(&mut self) -> Result<(), Box<Error>> {
let tls = self.tls.as_mut().unwrap();
while tls.wants_write() {
tls.write_tls(&mut self.stream)?;
}
if tls.wants_read() {
tls.read_tls(&mut self.stream)?;
tls.process_new_packets()?;
self.rbuf.read_from(tls)?;
}
Ok(())
}
fn io_raw(&mut self) -> Result<(), Box<Error>> {
while self.wbuf.len() > 0 {
self.wbuf.write_to(&mut self.stream)?;
}
self.rbuf.read_from(&mut self.stream)?;
Ok(())
}
// Send any outstanding commands and wait for a reply. Only returns a single response, if you
// expected multiple responses, call this method multiple times. This method blocks until at
// least one full response has been received.
pub fn recv(&mut self) -> Result<Message, Box<Error>> {
let len = {
while !self.rbuf.as_ref().contains(&4) {
if self.tls.is_some() {
self.io_tls()?
} else {
self.io_raw()?
}
}
self.rbuf.as_ref().split(|&c| c == 4).next().unwrap().len()
};
let m = Message::parse(::std::str::from_utf8(&self.rbuf[..len])?)?;
self.rbuf.consume(len+1);
Ok(m)
}
// WARNING: This method should not be called when there are still outstanding commands.
// (Might as well get rid of this convenient method to prevent that scenario?)
pub fn cmd(&mut self, cmd: &Message) -> Result<Message, Box<Error>> {
self.send(cmd);
self.recv()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Opens a TLS connection to the default host, pipelines a login and a `dbstats` command,
    // and checks that the two replies come back in that order.
    #[test]
    fn connect() {
        let mut conn = Connection::connect(None, None, Some(&::TLS_CONF)).unwrap();

        // Queue both commands before reading anything, so they go out together.
        let login = Message::parse("login {\"protocol\":1,\"client\":\"vndb-rust\",\"clientver\":\"0.1\"}").unwrap();
        conn.send(&login);
        let stats = Message::parse("dbstats").unwrap();
        conn.send(&stats);

        let login_reply = conn.recv().unwrap().to_string();
        let stats_reply = conn.recv().unwrap().to_string();

        assert_eq!(login_reply, "ok");
        assert!(stats_reply.starts_with("dbstats {"));
    }
}
|