diff options
Diffstat (limited to 'src/sock.rs')
-rw-r--r-- | src/sock.rs | 518 |
1 files changed, 518 insertions, 0 deletions
diff --git a/src/sock.rs b/src/sock.rs new file mode 100644 index 0000000..8102ebc --- /dev/null +++ b/src/sock.rs @@ -0,0 +1,518 @@ +use std::io::{self,Read,Write}; +use std::net::{TcpStream,SocketAddr}; +use std::process::Child; +use std::sync::{Mutex,Condvar}; + +use crate::err::Result; + + +// Returns an error if the argument is not a "Keyword" in the control protocol. The control spec +// mentions several types of "keyword"s; It is used to specify command names, settings and GETINFO +// keys, and each seem to have different parsing rules. This function is intended to validate user +// input in order to prevent protocol injection. It is therefore lenient enough in that it allows +// for the different "keyword" contexts, but should still be strict enough to prevent the string +// from being interpreted as something other than a keyword. +fn is_keyword(s: &str) -> Result<()> { + if s.contains(|c: char| !c.is_ascii_alphanumeric() && c != '/' && c != '-' && c != '_' && c != '.') { + return Err(err!(Keyword, s.to_string())) + } + Ok(()) +} + + +// A Reply (Sync or Async) consists of any number of intermediate lines (Mid/Data) followed by an +// End line. +type Reply = Vec<ReplyLine>; + + +#[derive(Debug,PartialEq)] +pub(crate) struct ReplyLine { + len: usize, // Length of this reply, in bytes, including the final CRLF + status: u16, + end: bool, // If this is a Mid/Data or End reply + text: String, + data: Vec<u8> +} + + +// Used in error message formatting +impl<'a> std::fmt::Display for ReplyLine { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} {}", self.status, self.text) + } +} + + +impl ReplyLine { + fn is_async(&self) -> bool { + self.status >= 600 && self.status < 700 + } + + fn parse_data(buf: &[u8]) -> Option<(usize,Vec<u8>)> { + // Yes, this actually does need an optimized find_bytes() implementation; With a naive + // iterator it will take quite a while to parse `GETINFO md/all`. + let mut off = match twoway::find_bytes(buf, b"\r\n.\r\n") { + None => return None, + Some(i) => i + }; + let mut data = &buf[..off]; + off += 5; + + // Look for a "\r\n." sequence and remove the dot. + let mut ndata = Vec::new(); + if data.len() > 0 && data[0] == b'.' { + data = &data[1..]; + } + while let Some(idx) = twoway::find_bytes(data, b"\r\n.") { + ndata.extend_from_slice(&data[..idx+2]); + data = &data[idx+3..]; + } + ndata.extend_from_slice(data); + Some((off, ndata)) + } + + // Returns None if there is not enough data in the buffer. + fn parse(buf: &[u8]) -> Result<Option<ReplyLine>> { + let mut off = match twoway::find_bytes(buf, b"\r\n") { + None => return Ok(None), + Some(i) if i < 4 => return Err(err!(Parse)), + Some(i) => i + }; + let line = &buf[..off]; + off += 2; + + let status : u16 = std::str::from_utf8(&line[..3]).map_err(|_| err!(Parse))?.parse().map_err(|_| err!(Parse))?; + let end = match line[3] { + b'-' => false, + b'+' => false, + b' ' => true, + _ => return Err(err!(Parse)), + }; + let text = std::str::from_utf8(&line[4..]).map_err(|_| err!(Parse))?; + + let data = if line[3] == b'+' { + if let Some((len, d)) = Self::parse_data(&buf[off..]) { + off += len; + d + } else { + return Ok(None) + } + } else { + Vec::new() + }; + + Ok(Some(ReplyLine { + len: off, + status: status, + end: end, + text: text.to_string(), + data: data, + })) + } +} + + +/// Authentication method and credentials used to authenticate to the Tor control socket. +/// The `COOKIE` and `SAFECOOKIE` authentication methods are not supported. +pub enum Auth { + /// Authenticate without credentials. + Null, + /// Authenticate with a password, this corresponds to the 'HashedControlPassword' configuration + /// option in torrc. + HashedPassword(String) +} + + + + +// A double-quoted string where only \ and " are escaped. +// Encoding: QuotedString("hello").to_string() +// Decoding: QuotedString::parse("\"hello\"") +struct QuotedString<'a>(&'a str); + +impl<'a> std::fmt::Display for QuotedString<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + use std::fmt::Write; + f.write_char('"')?; + for c in self.0.chars() { + if c == '"' || c == '\\' { + f.write_char('\\')?; + } + f.write_char(c)?; + } + f.write_char('"') + } +} + +impl<'a> QuotedString<'a> { + fn parse(s: &str) -> Result<String> { + if s.len() < 2 || !s.starts_with('"') || !s.ends_with('"') { + return Err(err!(Parse)) + } + let mut out = String::with_capacity(s.len()); + let mut q = false; + for c in (&s[1..s.len()-1]).chars() { + match (q, c) { + (true , _ ) => { q = false; out.push(c) }, + (false, '\\') => q = true, + (false, _ ) => out.push(c), + } + } + if q { + return Err(err!(Parse)) + } + Ok(out) + } +} + + + + +/// Tor control socket. +/// +/// A socket is created either by connecting to an existing Tor process using `Sock::connect()`, or +/// by calling `spawn()` to spawn a new Tor process. +/// +/// A socket can be used from multiple threads, with the following limitations: +/// - It is possible to run commands simultaneously from multiple threads, but the order in which +/// they are executed is, of course, not deterministic. Running a `setconf()` and `getconf()` on +/// the same keys at the same time may not give reliable results. +/// - You should only read asynchronous events from a single thread at a time. The API allows for +/// multiple threads to read events, but an event will only be received by one thread. +pub struct Sock { + sock: TcpStream, + // Should be set when tor is running as a child process and this socket has "ownership" of the process. + child: Option<Child>, + // Read buffer - only one thread can be reading from the socket at a time. + rdbuf: Mutex<Vec<u8>>, + // Read queue - if multiple threads are interested in reading from the socket, then this + // structure is used to coordinate which thread gets to read from the socket. If that thread + // happens to read a reply that it isn't interested in (e.g. it expected a sync reply and got + // an async one), then that reply is pushed to the queue. + queue: Mutex<SockQueue>, + queuecv: Condvar, + // Write mutex - this ensures only a single command can be sent at a time. This mutex is + // currently also held while reading the response of a command, because the read queue itself + // does not ensure that sync replies are consumed by different threads in the proper order. + // (Could probably be fixed by keeping track of a command sequence - Tor does support + // pipelining multiple commands, so this is a potential performance improvement) + writer: Mutex<bool>, +} + +struct SockQueue { + queue: Vec<Reply>, + reading: bool, +} + + +impl Drop for Sock { + fn drop(&mut self) { + // We have to wait() for the child to shut down, otherwise we get a zombie process. This is + // a quick-and-dirty approach which doesn't really give Tor the time to do a clean + // shutdown. In fact, we kill the process before even shutting down the socket. + // A cleaner "shut down socket and wait a bit" approach should probably be implemented as a + // separate method. + if let Some(mut c) = self.child.take() { + c.kill().is_ok(); + c.wait().is_ok(); + } + } +} + + +impl Sock { + fn read_line(buf: &mut Vec<u8>, sock: &TcpStream) -> Result<ReplyLine> { + // This buffer handling isn't very efficient, but that's probably fine. + loop { + if let Some(r) = ReplyLine::parse(&buf[..])? { + buf.drain(..r.len); + return Ok(r); + } + + let mut rd = [0u8; 512]; + match (&*sock).read(&mut rd) { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (), + Err(e) => return Err(e.into()), + Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected disconnect").into()), + Ok(l) => buf.extend_from_slice(&rd[..l]) + } + } + } + + fn read_reply(buf: &mut Vec<u8>, sock: &TcpStream) -> Result<Reply> { + let mut reply : Reply = Vec::new(); + while reply.last().map(|r| !r.end).unwrap_or(true) { + reply.push(Self::read_line(buf, sock)?); + } + Ok(reply) + } + + fn get_reply(&self, async_event: bool) -> Result<Reply> { + // Check the queue to see if our reply has already been read. + { + let mut queue = self.queue.lock().unwrap(); + loop { + if let Some(idx) = queue.queue.iter().position(|r| r[0].is_async() == async_event) { + return Ok(queue.queue.remove(idx)) + } + // Nothing in the queue, but another thread is currently reading, so let's wait. + if queue.reading { + queue = self.queuecv.wait(queue).unwrap(); + // Nothing in the queue and nobody else is reading, let's read from the socket. + } else { + queue.reading = true; + break; + } + } + } + + // We now have the read flag, so keep reading until we have our reply. + let ret = loop { + let mut buf = self.rdbuf.lock().unwrap(); + match Self::read_reply(&mut buf, &self.sock) { + Err(e) => break Err(e), + Ok(r) => { + if r[0].is_async() == async_event { + break Ok(r) + // This reply isn't for us, add it to the queue + } else { + self.queue.lock().unwrap().queue.push(r); + self.queuecv.notify_one(); + } + } + } + }; + self.queue.lock().unwrap().reading = false; + self.queuecv.notify_one(); + ret + } + + fn cmd<S: AsRef<[u8]>>(&self, cmd: S) -> Result<Reply> { + let _guard = self.writer.lock().unwrap(); + print!("C: {}", std::str::from_utf8(cmd.as_ref()).unwrap()); + + (&self.sock).write_all(cmd.as_ref())?; + let mut ret = self.get_reply(false)?; + if ret[0].status >= 200 && ret[0].status < 300 { + Ok(ret) + } else { + Err(err!(Status, ret.remove(0))) + } + } + + pub(crate) fn connect_child(s: &SocketAddr, auth: &Auth, child: Option<Child>) -> Result<Sock> { + let sock = Sock { + sock: TcpStream::connect(s)?, + child: child, + rdbuf: Mutex::new(Vec::new()), + queue: Mutex::new(SockQueue { + queue: Vec::new(), + reading: false, + }), + queuecv: Condvar::new(), + writer: Mutex::new(true), + }; + + match auth { + Auth::Null => sock.cmd("AUTHENTICATE\r\n")?, + Auth::HashedPassword(s) => sock.cmd(format!("AUTHENTICATE {}\r\n", QuotedString(s)))? + }; + Ok(sock) + } + + /// Connect to a running Tor process and authenticate using the given authentication method. + pub fn connect(s: &SocketAddr, auth: &Auth) -> Result<Sock> { + Self::connect_child(s, auth, None) + } + + /// Send a TAKEOWNERSHIP command. If this command has been acknowledged, the Tor process + /// will automatically shut down when this control socket is closed. + /// + /// Ownership is already implied if this socket has been created with the `spawn()` function, + /// so in that case you do not have to call this method. + pub fn takeownership(&self) -> Result<()> { + self.cmd("TAKEOWNERSHIP\r\n").map(|_|()) + } + + /// Send a DROPOWNERSHIP command. This reverses any earlier TAKEOWNERSHIP command and tells + /// the Tor process to keep running even after this control socket is closed. + /// + /// Note that, if this socket has been created with the `spawn()` function, then the Tor + /// process will still be killed when the socket is dropped. This method is only useful when + /// connected to an external Tor process. + pub fn dropownership(&self) -> Result<()> { + self.cmd("DROPOWNERSHIP\r\n").map(|_|()) + } + + /// Send a QUIT command. This tells Tor to close this control socket. Any further commands will + /// likely result in an error. + /// + /// The Tor process will exit if this control socket has called `takeownership()` before. + // TODO: Get rid of this function and implement a more thorough shutdown() instead? + pub fn quit(&self) -> Result<()> { + self.cmd("QUIT\r\n").map(|_|()) + } + + + // XXX: This IntoIterator works with &[..] and &vec![..]. Haven't tested HashMap/BTreeMap yet, + // but I suspect their signature doesn't match. + // Warning: There's no validation on the key string format, so this command allows protocol + // injection. + fn setresetconf<'a,T>(&self, mut msg: String, settings: T) -> Result<()> + where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)>, + { + use std::fmt::Write; + for (k, v) in settings { + is_keyword(k)?; + msg.push(' '); + msg.push_str(k); + if let Some(v) = v { + write!(msg, "={}", QuotedString(v)).is_ok(); + } + } + msg.push_str("\r\n"); + self.cmd(msg).map(|_|()) + } + + /// Send a SETCONF command. + pub fn setconf<'a,T>(&self, settings: T) -> Result<()> + where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)> + { + self.setresetconf("SETCONF".to_string(), settings) + } + + /// Send a RESETCONF command. + pub fn resetconf<'a,T>(&self, settings: T) -> Result<()> + where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)> + { + self.setresetconf("RESETCONF".to_string(), settings) + } + + /// Returns the configuration variables for the requested keys. The values are returned in the + /// order they were requested. Some keys may return multiple values, these will be listed + /// multiple times. Some keys may return `None` when are unset or default. An error is + /// returned if any of the requested keys does not exist. + /// + /// This method corresponds to the GETCONF command. + pub fn getconf<'a,T: IntoIterator<Item = &'a &'a str>>(&self, keys: T) -> Result<Vec<(String, Option<String>)>> { + let mut msg = "GETCONF".to_string(); + for k in keys { + is_keyword(k)?; + msg.push(' '); + msg.push_str(k); + } + msg.push_str("\r\n"); + + let mut res = Vec::new(); + for line in self.cmd(msg)? { + if let Some(is) = line.text.find('=') { + let val = &line.text[is+1..]; + let val = if val.starts_with('"') { QuotedString::parse(val)? } else { val.to_string() }; + res.push(( (&line.text[..is]).to_string(), Some(val) )); + } else { + res.push((line.text, None)); + } + } + Ok(res) + } + + /// Similar to the `getconf()` method, except this is used to get run-time variables that are + /// not saved in the configuration. An error is returned if any of the requested keys does not + /// exist. + /// + /// Corresponds to the GETINFO command. Refer to the GETINFO documentation in the [tor-control + /// specification](https://gitweb.torproject.org/torspec.git/blob/control-spec.txt) for the + /// list of accepted keys. + pub fn getinfo<'a,T: IntoIterator<Item = &'a &'a str>>(&self, keys: T) -> Result<Vec<(String, String)>> { + let mut msg = "GETINFO".to_string(); + for k in keys { + is_keyword(k)?; + msg.push(' '); + msg.push_str(k); + } + msg.push_str("\r\n"); + + let mut res = Vec::new(); + for line in self.cmd(msg)? { + if line.text == "OK" { + break; + } + if let Some(is) = line.text.find('=') { + let val = if line.data.len() > 0 { + String::from_utf8(line.data).map_err(|_|err!(Parse))? + } else { + (&line.text[is+1..]).to_string() + }; + res.push(( (&line.text[..is]).to_string(), val )); + } else { + return Err(err!(Parse)); + } + } + Ok(res) + } + + pub fn events(&'a self, list: ..) -> Result<Events<'a>> { + } +} + + +pub struct Events<'a> { + sock: &'a Sock +} + +impl<'a> Drop for Events<'a> { + fn drop(&mut self) { + self.sock.cmd("SETEVENTS\r\n").is_ok(); + } +} + +impl<'a> Iterator for Events<'a> { + type Item = Result<Event>; + fn next(&mut self) -> Option<Self::Item> { + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_replyline() { + assert!(ReplyLine::parse(b"\r\n").is_err()); + assert!(ReplyLine::parse(b"250\r\n").is_err()); + assert!(ReplyLine::parse(b"2500 Text\r\n").is_err()); + assert!(ReplyLine::parse(b"250.Text\r\n").is_err()); + assert!(ReplyLine::parse(b".50 Text\r\n").is_err()); + assert!(ReplyLine::parse(b"-50 Text\r\n").is_err()); + assert!(ReplyLine::parse(b"01x Text\r\n").is_err()); + + assert_eq!(ReplyLine::parse(b"").unwrap(), None); + assert_eq!(ReplyLine::parse(b"250 Hello\r").unwrap(), None); + assert_eq!(ReplyLine::parse(b"250+Hello\r\n").unwrap(), None); + assert_eq!(ReplyLine::parse(b"250+Hello\r\ndata\r\n..a\r\n.\r").unwrap(), None); + + assert_eq!(ReplyLine::parse(b"250 \r\n").unwrap(), Some(ReplyLine { len: 6, status: 250, end: true, text: "".to_string(), data: Vec::new() }) ); + assert_eq!(ReplyLine::parse(b"650-Hello\r\n").unwrap(), Some(ReplyLine { len: 11, status: 650, end: false, text: "Hello".to_string(), data: Vec::new() }) ); + assert_eq!(ReplyLine::parse(b"650+Hello\r\n.data\r\n.\r\n").unwrap(), Some(ReplyLine { len: 21, status: 650, end: false, text: "Hello".to_string(), data: b"data".to_vec() }) ); + assert_eq!(ReplyLine::parse(b"650+Hello\r\ndata\r\n..\r\n.\r\n").unwrap(), Some(ReplyLine { len: 24, status: 650, end: false, text: "Hello".to_string(), data: b"data\r\n.".to_vec() }) ); + } + + #[test] + fn qs_fmt() { + assert_eq!(QuotedString("").to_string(), "\"\"".to_string()); + assert_eq!(QuotedString(" \\a\"b \x02").to_string(), "\" \\\\a\\\"b \x02\"".to_string()); + } + + #[test] + fn qs_parse() { + assert!(QuotedString::parse("").is_err()); + assert!(QuotedString::parse("\"").is_err()); + assert!(QuotedString::parse("\" \\\"").is_err()); + assert!(QuotedString::parse("abc").is_err()); + + assert_eq!(QuotedString::parse("\"\"").unwrap(), "".to_string()); + assert_eq!(QuotedString::parse("\" \\\\a\\\"b \x02\"").unwrap(), " \\a\"b \x02".to_string()); + assert_eq!(QuotedString::parse("\"\\r\\n\"").unwrap(), "rn".to_string()); + } +} |