use std::io::{self,Read,Write}; use std::net::{TcpStream,SocketAddr}; use std::process::Child; use std::sync::{Mutex,Condvar}; use crate::err::Result; // Returns an error if the argument is not a "Keyword" in the control protocol. The control spec // mentions several types of "keyword"s; It is used to specify command names, settings and GETINFO // keys, and each seem to have different parsing rules. This function is intended to validate user // input in order to prevent protocol injection. It is therefore lenient enough in that it allows // for the different "keyword" contexts, but should still be strict enough to prevent the string // from being interpreted as something other than a keyword. fn is_keyword(s: &str) -> Result<()> { if s.contains(|c: char| !c.is_ascii_alphanumeric() && c != '/' && c != '-' && c != '_' && c != '.') { return Err(err!(Keyword, s.to_string())) } Ok(()) } // A Reply (Sync or Async) consists of any number of intermediate lines (Mid/Data) followed by an // End line. type Reply = Vec; #[derive(Debug,PartialEq)] pub(crate) struct ReplyLine { len: usize, // Length of this reply, in bytes, including the final CRLF status: u16, end: bool, // If this is a Mid/Data or End reply text: String, data: Vec } // Used in error message formatting impl<'a> std::fmt::Display for ReplyLine { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{} {}", self.status, self.text) } } impl ReplyLine { fn is_async(&self) -> bool { self.status >= 600 && self.status < 700 } fn parse_data(buf: &[u8]) -> Option<(usize,Vec)> { // Yes, this actually does need an optimized find_bytes() implementation; With a naive // iterator it will take quite a while to parse `GETINFO md/all`. let mut off = match twoway::find_bytes(buf, b"\r\n.\r\n") { None => return None, Some(i) => i }; let mut data = &buf[]; off += 5; // Look for a "\r\n." sequence and remove the dot. let mut ndata = Vec::new(); if data.len() > 0 && data[0] == b'.' { data = &data[1..]; } while let Some(idx) = twoway::find_bytes(data, b"\r\n.") { ndata.extend_from_slice(&data[..idx+2]); data = &data[idx+3..]; } ndata.extend_from_slice(data); Some((off, ndata)) } // Returns None if there is not enough data in the buffer. fn parse(buf: &[u8]) -> Result> { let mut off = match twoway::find_bytes(buf, b"\r\n") { None => return Ok(None), Some(i) if i < 4 => return Err(err!(Parse)), Some(i) => i }; let line = &buf[]; off += 2; let status : u16 = std::str::from_utf8(&line[..3]).map_err(|_| err!(Parse))?.parse().map_err(|_| err!(Parse))?; let end = match line[3] { b'-' => false, b'+' => false, b' ' => true, _ => return Err(err!(Parse)), }; let text = std::str::from_utf8(&line[4..]).map_err(|_| err!(Parse))?; let data = if line[3] == b'+' { if let Some((len, d)) = Self::parse_data(&buf[off..]) { off += len; d } else { return Ok(None) } } else { Vec::new() }; Ok(Some(ReplyLine { len: off, status: status, end: end, text: text.to_string(), data: data, })) } } /// Authentication method and credentials used to authenticate to the Tor control socket. /// The `COOKIE` and `SAFECOOKIE` authentication methods are not supported. pub enum Auth { /// Authenticate without credentials. Null, /// Authenticate with a password, this corresponds to the 'HashedControlPassword' configuration /// option in torrc. HashedPassword(String) } #[derive(Debug)] pub enum Event { // Log messages Debug(String), Info(String), Notice(String), Warn(String), Err(String), } // A double-quoted string where only \ and " are escaped. // Encoding: QuotedString("hello").to_string() // Decoding: QuotedString::parse("\"hello\"") struct QuotedString<'a>(&'a str); impl<'a> std::fmt::Display for QuotedString<'a> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { use std::fmt::Write; f.write_char('"')?; for c in self.0.chars() { if c == '"' || c == '\\' { f.write_char('\\')?; } f.write_char(c)?; } f.write_char('"') } } impl<'a> QuotedString<'a> { fn parse(s: &str) -> Result { if s.len() < 2 || !s.starts_with('"') || !s.ends_with('"') { return Err(err!(Parse)) } let mut out = String::with_capacity(s.len()); let mut q = false; for c in (&s[1..s.len()-1]).chars() { match (q, c) { (true , _ ) => { q = false; out.push(c) }, (false, '\\') => q = true, (false, _ ) => out.push(c), } } if q { return Err(err!(Parse)) } Ok(out) } } /// Tor control socket. /// /// A socket is created either by connecting to an existing Tor process using `Sock::connect()`, or /// by calling `spawn()` to spawn a new Tor process. /// /// A socket can be used from multiple threads, with the following limitations: /// - It is possible to run commands simultaneously from multiple threads, but the order in which /// they are executed is, of course, not deterministic. Running a `setconf()` and `getconf()` on /// the same keys at the same time may not give reliable results. /// - You should only read asynchronous events from a single thread at a time. The API allows for /// multiple threads to read events, but an event will only be received by one thread. pub struct Sock { sock: TcpStream, // Should be set when tor is running as a child process and this socket has "ownership" of the process. child: Option, // Read buffer - only one thread can be reading from the socket at a time. rdbuf: Mutex>, // Read queue - if multiple threads are interested in reading from the socket, then this // structure is used to coordinate which thread gets to read from the socket. If that thread // happens to read a reply that it isn't interested in (e.g. it expected a sync reply and got // an async one), then that reply is pushed to the queue. queue: Mutex, queuecv: Condvar, // Write mutex - this ensures only a single command can be sent at a time. This mutex is // currently also held while reading the response of a command, because the read queue itself // does not ensure that sync replies are consumed by different threads in the proper order. // (Could probably be fixed by keeping track of a command sequence - Tor does support // pipelining multiple commands, so this is a potential performance improvement) writer: Mutex, } struct SockQueue { queue: Vec, reading: bool, } impl Drop for Sock { fn drop(&mut self) { // We have to wait() for the child to shut down, otherwise we get a zombie process. This is // a quick-and-dirty approach which doesn't really give Tor the time to do a clean // shutdown. In fact, we kill the process before even shutting down the socket. // A cleaner "shut down socket and wait a bit" approach should probably be implemented as a // separate method. if let Some(mut c) = self.child.take() { c.kill().is_ok(); c.wait().is_ok(); } } } impl Sock { fn read_line(buf: &mut Vec, sock: &TcpStream) -> Result { // This buffer handling isn't very efficient, but that's probably fine. loop { if let Some(r) = ReplyLine::parse(&buf[..])? { buf.drain(..r.len); return Ok(r); } let mut rd = [0u8; 512]; match (&*sock).read(&mut rd) { Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (), Err(e) => return Err(e.into()), Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected disconnect").into()), Ok(l) => buf.extend_from_slice(&rd[..l]) } } } fn read_reply(buf: &mut Vec, sock: &TcpStream) -> Result { let mut reply : Reply = Vec::new(); while reply.last().map(|r| !r.end).unwrap_or(true) { reply.push(Self::read_line(buf, sock)?); } Ok(reply) } fn get_reply(&self, async_event: bool) -> Result { // Check the queue to see if our reply has already been read. { let mut queue = self.queue.lock().unwrap(); loop { if let Some(idx) = queue.queue.iter().position(|r| r[0].is_async() == async_event) { return Ok(queue.queue.remove(idx)) } // Nothing in the queue, but another thread is currently reading, so let's wait. if queue.reading { queue = self.queuecv.wait(queue).unwrap(); // Nothing in the queue and nobody else is reading, let's read from the socket. } else { queue.reading = true; break; } } } // We now have the read flag, so keep reading until we have our reply. let ret = loop { let mut buf = self.rdbuf.lock().unwrap(); match Self::read_reply(&mut buf, &self.sock) { Err(e) => break Err(e), Ok(r) => { if r[0].is_async() == async_event { break Ok(r) // This reply isn't for us, add it to the queue } else { self.queue.lock().unwrap().queue.push(r); self.queuecv.notify_one(); } } } }; self.queue.lock().unwrap().reading = false; self.queuecv.notify_one(); ret } fn cmd>(&self, cmd: S) -> Result { let _guard = self.writer.lock().unwrap(); print!("C: {}", std::str::from_utf8(cmd.as_ref()).unwrap()); (&self.sock).write_all(cmd.as_ref())?; let mut ret = self.get_reply(false)?; if ret[0].status >= 200 && ret[0].status < 300 { Ok(ret) } else { Err(err!(Status, ret.remove(0))) } } pub(crate) fn connect_child(s: &SocketAddr, auth: &Auth, child: Option) -> Result { let sock = Sock { sock: TcpStream::connect(s)?, child: child, rdbuf: Mutex::new(Vec::new()), queue: Mutex::new(SockQueue { queue: Vec::new(), reading: false, }), queuecv: Condvar::new(), writer: Mutex::new(true), }; match auth { Auth::Null => sock.cmd("AUTHENTICATE\r\n")?, Auth::HashedPassword(s) => sock.cmd(format!("AUTHENTICATE {}\r\n", QuotedString(s)))? }; Ok(sock) } /// Connect to a running Tor process and authenticate using the given authentication method. pub fn connect(s: &SocketAddr, auth: &Auth) -> Result { Self::connect_child(s, auth, None) } /// Send a TAKEOWNERSHIP command. If this command has been acknowledged, the Tor process /// will automatically shut down when this control socket is closed. /// /// Ownership is already implied if this socket has been created with the `spawn()` function, /// so in that case you do not have to call this method. pub fn takeownership(&self) -> Result<()> { self.cmd("TAKEOWNERSHIP\r\n").map(|_|()) } /// Send a DROPOWNERSHIP command. This reverses any earlier TAKEOWNERSHIP command and tells /// the Tor process to keep running even after this control socket is closed. /// /// Note that, if this socket has been created with the `spawn()` function, then the Tor /// process will still be killed when the socket is dropped. This method is only useful when /// connected to an external Tor process. pub fn dropownership(&self) -> Result<()> { self.cmd("DROPOWNERSHIP\r\n").map(|_|()) } /// Send a QUIT command. This tells Tor to close this control socket. Any further commands will /// likely result in an error. /// /// The Tor process will exit if this control socket has called `takeownership()` before. // TODO: Get rid of this function and implement a more thorough shutdown() instead? pub fn quit(&self) -> Result<()> { self.cmd("QUIT\r\n").map(|_|()) } // XXX: This IntoIterator works with &[..] and &vec![..]. Haven't tested HashMap/BTreeMap yet, // but I suspect their signature doesn't match. fn setresetconf<'a,T>(&self, mut msg: String, settings: T) -> Result<()> where T: IntoIterator)>, { use std::fmt::Write; for (k, v) in settings { is_keyword(k)?; msg.push(' '); msg.push_str(k); if let Some(v) = v { write!(msg, "={}", QuotedString(v)).is_ok(); } } msg.push_str("\r\n"); self.cmd(msg).map(|_|()) } /// Send a SETCONF command. pub fn setconf<'a,T>(&self, settings: T) -> Result<()> where T: IntoIterator)> { self.setresetconf("SETCONF".to_string(), settings) } /// Send a RESETCONF command. pub fn resetconf<'a,T>(&self, settings: T) -> Result<()> where T: IntoIterator)> { self.setresetconf("RESETCONF".to_string(), settings) } /// Returns the configuration variables for the requested keys. The values are returned in the /// order they were requested. Some keys may return multiple values, these will be listed /// multiple times. Some keys may return `None` when are unset or default. An error is /// returned if any of the requested keys does not exist. /// /// This method corresponds to the GETCONF command. pub fn getconf<'a,T: IntoIterator>(&self, keys: T) -> Result)>> { let mut msg = "GETCONF".to_string(); for k in keys { is_keyword(k)?; msg.push(' '); msg.push_str(k); } msg.push_str("\r\n"); let mut res = Vec::new(); for line in self.cmd(msg)? { if let Some(is) = line.text.find('=') { let val = &line.text[is+1..]; let val = if val.starts_with('"') { QuotedString::parse(val)? } else { val.to_string() }; res.push(( (&line.text[]).to_string(), Some(val) )); } else { res.push((line.text, None)); } } Ok(res) } /// Similar to the `getconf()` method, except this is used to get run-time variables that are /// not saved in the configuration. An error is returned if any of the requested keys does not /// exist. /// /// Corresponds to the GETINFO command. Refer to the GETINFO documentation in the [tor-control /// specification]( for the /// list of accepted keys. pub fn getinfo<'a,T: IntoIterator>(&self, keys: T) -> Result> { let mut msg = "GETINFO".to_string(); for k in keys { is_keyword(k)?; msg.push(' '); msg.push_str(k); } msg.push_str("\r\n"); let mut res = Vec::new(); for line in self.cmd(msg)? { if line.text == "OK" { break; } if let Some(is) = line.text.find('=') { let val = if > 0 { String::from_utf8(|_|err!(Parse))? } else { (&line.text[is+1..]).to_string() }; res.push(( (&line.text[]).to_string(), val )); } else { return Err(err!(Parse)); } } Ok(res) } // TODO: Create an enum for supported event types, rather than this string thing. We don't // support reading all types of events anyway. pub fn setevents<'a,T: IntoIterator>(&self, events: T) -> Result<()> { let mut msg = "SETEVENTS".to_string(); for e in events { is_keyword(e)?; msg.push(' '); msg.push_str(e); } msg.push_str("\r\n"); self.cmd(msg).map(|_|()) } /// Read an event from the socket. This method blocks until an event has been received. pub fn read_event(&self) -> Result { let mut ret = self.get_reply(true)?; let ev = ret.remove(0); fn logmsg(r: ReplyLine, skip: usize) -> String { if { (&r.text[skip..]).trim().to_owned() } else { String::from_utf8_lossy(& } } if ev.status == 650 && ev.text.starts_with("DEBUG") { Ok(Event::Debug(logmsg(ev, 6))) } else if ev.status == 650 && ev.text.starts_with("NOTICE") { Ok(Event::Notice(logmsg(ev, 7))) } else if ev.status == 650 && ev.text.starts_with("INFO") { Ok(Event::Info(logmsg(ev, 5))) } else if ev.status == 650 && ev.text.starts_with("WARN") { Ok(Event::Warn(logmsg(ev, 5))) } else if ev.status == 650 && ev.text.starts_with("ERR") { Ok(Event::Err(logmsg(ev, 4))) } else { Err(err!(UnknownEvent, ev.text)) } } } #[cfg(test)] mod tests { use super::*; #[test] fn parse_replyline() { assert!(ReplyLine::parse(b"\r\n").is_err()); assert!(ReplyLine::parse(b"250\r\n").is_err()); assert!(ReplyLine::parse(b"2500 Text\r\n").is_err()); assert!(ReplyLine::parse(b"250.Text\r\n").is_err()); assert!(ReplyLine::parse(b".50 Text\r\n").is_err()); assert!(ReplyLine::parse(b"-50 Text\r\n").is_err()); assert!(ReplyLine::parse(b"01x Text\r\n").is_err()); assert_eq!(ReplyLine::parse(b"").unwrap(), None); assert_eq!(ReplyLine::parse(b"250 Hello\r").unwrap(), None); assert_eq!(ReplyLine::parse(b"250+Hello\r\n").unwrap(), None); assert_eq!(ReplyLine::parse(b"250+Hello\r\ndata\r\n..a\r\n.\r").unwrap(), None); assert_eq!(ReplyLine::parse(b"250 \r\n").unwrap(), Some(ReplyLine { len: 6, status: 250, end: true, text: "".to_string(), data: Vec::new() }) ); assert_eq!(ReplyLine::parse(b"650-Hello\r\n").unwrap(), Some(ReplyLine { len: 11, status: 650, end: false, text: "Hello".to_string(), data: Vec::new() }) ); assert_eq!(ReplyLine::parse(b"650+Hello\r\\r\n.\r\n").unwrap(), Some(ReplyLine { len: 21, status: 650, end: false, text: "Hello".to_string(), data: b"data".to_vec() }) ); assert_eq!(ReplyLine::parse(b"650+Hello\r\ndata\r\n..\r\n.\r\n").unwrap(), Some(ReplyLine { len: 24, status: 650, end: false, text: "Hello".to_string(), data: b"data\r\n.".to_vec() }) ); } #[test] fn qs_fmt() { assert_eq!(QuotedString("").to_string(), "\"\"".to_string()); assert_eq!(QuotedString(" \\a\"b \x02").to_string(), "\" \\\\a\\\"b \x02\"".to_string()); } #[test] fn qs_parse() { assert!(QuotedString::parse("").is_err()); assert!(QuotedString::parse("\"").is_err()); assert!(QuotedString::parse("\" \\\"").is_err()); assert!(QuotedString::parse("abc").is_err()); assert_eq!(QuotedString::parse("\"\"").unwrap(), "".to_string()); assert_eq!(QuotedString::parse("\" \\\\a\\\"b \x02\"").unwrap(), " \\a\"b \x02".to_string()); assert_eq!(QuotedString::parse("\"\\r\\n\"").unwrap(), "rn".to_string()); } }