summaryrefslogtreecommitdiff
path: root/src/sock.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sock.rs')
-rw-r--r--src/sock.rs518
1 files changed, 518 insertions, 0 deletions
diff --git a/src/sock.rs b/src/sock.rs
new file mode 100644
index 0000000..8102ebc
--- /dev/null
+++ b/src/sock.rs
@@ -0,0 +1,518 @@
+use std::io::{self,Read,Write};
+use std::net::{TcpStream,SocketAddr};
+use std::process::Child;
+use std::sync::{Mutex,Condvar};
+
+use crate::err::Result;
+
+
+// Returns an error if the argument is not a "Keyword" in the control protocol. The control spec
+// mentions several types of "keyword"s; It is used to specify command names, settings and GETINFO
+// keys, and each seem to have different parsing rules. This function is intended to validate user
+// input in order to prevent protocol injection. It is therefore lenient enough in that it allows
+// for the different "keyword" contexts, but should still be strict enough to prevent the string
+// from being interpreted as something other than a keyword.
+fn is_keyword(s: &str) -> Result<()> {
+ if s.contains(|c: char| !c.is_ascii_alphanumeric() && c != '/' && c != '-' && c != '_' && c != '.') {
+ return Err(err!(Keyword, s.to_string()))
+ }
+ Ok(())
+}
+
+
+// A Reply (Sync or Async) consists of any number of intermediate lines (Mid/Data) followed by an
+// End line.
+type Reply = Vec<ReplyLine>;
+
+
+#[derive(Debug,PartialEq)]
+pub(crate) struct ReplyLine {
+ len: usize, // Length of this reply, in bytes, including the final CRLF
+ status: u16,
+ end: bool, // If this is a Mid/Data or End reply
+ text: String,
+ data: Vec<u8>
+}
+
+
+// Used in error message formatting
+impl<'a> std::fmt::Display for ReplyLine {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{} {}", self.status, self.text)
+ }
+}
+
+
+impl ReplyLine {
+ fn is_async(&self) -> bool {
+ self.status >= 600 && self.status < 700
+ }
+
+ fn parse_data(buf: &[u8]) -> Option<(usize,Vec<u8>)> {
+ // Yes, this actually does need an optimized find_bytes() implementation; With a naive
+ // iterator it will take quite a while to parse `GETINFO md/all`.
+ let mut off = match twoway::find_bytes(buf, b"\r\n.\r\n") {
+ None => return None,
+ Some(i) => i
+ };
+ let mut data = &buf[..off];
+ off += 5;
+
+ // Look for a "\r\n." sequence and remove the dot.
+ let mut ndata = Vec::new();
+ if data.len() > 0 && data[0] == b'.' {
+ data = &data[1..];
+ }
+ while let Some(idx) = twoway::find_bytes(data, b"\r\n.") {
+ ndata.extend_from_slice(&data[..idx+2]);
+ data = &data[idx+3..];
+ }
+ ndata.extend_from_slice(data);
+ Some((off, ndata))
+ }
+
+ // Returns None if there is not enough data in the buffer.
+ fn parse(buf: &[u8]) -> Result<Option<ReplyLine>> {
+ let mut off = match twoway::find_bytes(buf, b"\r\n") {
+ None => return Ok(None),
+ Some(i) if i < 4 => return Err(err!(Parse)),
+ Some(i) => i
+ };
+ let line = &buf[..off];
+ off += 2;
+
+ let status : u16 = std::str::from_utf8(&line[..3]).map_err(|_| err!(Parse))?.parse().map_err(|_| err!(Parse))?;
+ let end = match line[3] {
+ b'-' => false,
+ b'+' => false,
+ b' ' => true,
+ _ => return Err(err!(Parse)),
+ };
+ let text = std::str::from_utf8(&line[4..]).map_err(|_| err!(Parse))?;
+
+ let data = if line[3] == b'+' {
+ if let Some((len, d)) = Self::parse_data(&buf[off..]) {
+ off += len;
+ d
+ } else {
+ return Ok(None)
+ }
+ } else {
+ Vec::new()
+ };
+
+ Ok(Some(ReplyLine {
+ len: off,
+ status: status,
+ end: end,
+ text: text.to_string(),
+ data: data,
+ }))
+ }
+}
+
+
+/// Authentication method and credentials used to authenticate to the Tor control socket.
+/// The `COOKIE` and `SAFECOOKIE` authentication methods are not supported.
+pub enum Auth {
+ /// Authenticate without credentials.
+ Null,
+ /// Authenticate with a password, this corresponds to the 'HashedControlPassword' configuration
+ /// option in torrc.
+ HashedPassword(String)
+}
+
+
+
+
+// A double-quoted string where only \ and " are escaped.
+// Encoding: QuotedString("hello").to_string()
+// Decoding: QuotedString::parse("\"hello\"")
+struct QuotedString<'a>(&'a str);
+
+impl<'a> std::fmt::Display for QuotedString<'a> {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ use std::fmt::Write;
+ f.write_char('"')?;
+ for c in self.0.chars() {
+ if c == '"' || c == '\\' {
+ f.write_char('\\')?;
+ }
+ f.write_char(c)?;
+ }
+ f.write_char('"')
+ }
+}
+
+impl<'a> QuotedString<'a> {
+ fn parse(s: &str) -> Result<String> {
+ if s.len() < 2 || !s.starts_with('"') || !s.ends_with('"') {
+ return Err(err!(Parse))
+ }
+ let mut out = String::with_capacity(s.len());
+ let mut q = false;
+ for c in (&s[1..s.len()-1]).chars() {
+ match (q, c) {
+ (true , _ ) => { q = false; out.push(c) },
+ (false, '\\') => q = true,
+ (false, _ ) => out.push(c),
+ }
+ }
+ if q {
+ return Err(err!(Parse))
+ }
+ Ok(out)
+ }
+}
+
+
+
+
+/// Tor control socket.
+///
+/// A socket is created either by connecting to an existing Tor process using `Sock::connect()`, or
+/// by calling `spawn()` to spawn a new Tor process.
+///
+/// A socket can be used from multiple threads, with the following limitations:
+/// - It is possible to run commands simultaneously from multiple threads, but the order in which
+/// they are executed is, of course, not deterministic. Running a `setconf()` and `getconf()` on
+/// the same keys at the same time may not give reliable results.
+/// - You should only read asynchronous events from a single thread at a time. The API allows for
+/// multiple threads to read events, but an event will only be received by one thread.
+pub struct Sock {
+ sock: TcpStream,
+ // Should be set when tor is running as a child process and this socket has "ownership" of the process.
+ child: Option<Child>,
+ // Read buffer - only one thread can be reading from the socket at a time.
+ rdbuf: Mutex<Vec<u8>>,
+ // Read queue - if multiple threads are interested in reading from the socket, then this
+ // structure is used to coordinate which thread gets to read from the socket. If that thread
+ // happens to read a reply that it isn't interested in (e.g. it expected a sync reply and got
+ // an async one), then that reply is pushed to the queue.
+ queue: Mutex<SockQueue>,
+ queuecv: Condvar,
+ // Write mutex - this ensures only a single command can be sent at a time. This mutex is
+ // currently also held while reading the response of a command, because the read queue itself
+ // does not ensure that sync replies are consumed by different threads in the proper order.
+ // (Could probably be fixed by keeping track of a command sequence - Tor does support
+ // pipelining multiple commands, so this is a potential performance improvement)
+ writer: Mutex<bool>,
+}
+
+struct SockQueue {
+ queue: Vec<Reply>,
+ reading: bool,
+}
+
+
+impl Drop for Sock {
+ fn drop(&mut self) {
+ // We have to wait() for the child to shut down, otherwise we get a zombie process. This is
+ // a quick-and-dirty approach which doesn't really give Tor the time to do a clean
+ // shutdown. In fact, we kill the process before even shutting down the socket.
+ // A cleaner "shut down socket and wait a bit" approach should probably be implemented as a
+ // separate method.
+ if let Some(mut c) = self.child.take() {
+ c.kill().is_ok();
+ c.wait().is_ok();
+ }
+ }
+}
+
+
+impl Sock {
+ fn read_line(buf: &mut Vec<u8>, sock: &TcpStream) -> Result<ReplyLine> {
+ // This buffer handling isn't very efficient, but that's probably fine.
+ loop {
+ if let Some(r) = ReplyLine::parse(&buf[..])? {
+ buf.drain(..r.len);
+ return Ok(r);
+ }
+
+ let mut rd = [0u8; 512];
+ match (&*sock).read(&mut rd) {
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
+ Err(e) => return Err(e.into()),
+ Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected disconnect").into()),
+ Ok(l) => buf.extend_from_slice(&rd[..l])
+ }
+ }
+ }
+
+ fn read_reply(buf: &mut Vec<u8>, sock: &TcpStream) -> Result<Reply> {
+ let mut reply : Reply = Vec::new();
+ while reply.last().map(|r| !r.end).unwrap_or(true) {
+ reply.push(Self::read_line(buf, sock)?);
+ }
+ Ok(reply)
+ }
+
+ fn get_reply(&self, async_event: bool) -> Result<Reply> {
+ // Check the queue to see if our reply has already been read.
+ {
+ let mut queue = self.queue.lock().unwrap();
+ loop {
+ if let Some(idx) = queue.queue.iter().position(|r| r[0].is_async() == async_event) {
+ return Ok(queue.queue.remove(idx))
+ }
+ // Nothing in the queue, but another thread is currently reading, so let's wait.
+ if queue.reading {
+ queue = self.queuecv.wait(queue).unwrap();
+ // Nothing in the queue and nobody else is reading, let's read from the socket.
+ } else {
+ queue.reading = true;
+ break;
+ }
+ }
+ }
+
+ // We now have the read flag, so keep reading until we have our reply.
+ let ret = loop {
+ let mut buf = self.rdbuf.lock().unwrap();
+ match Self::read_reply(&mut buf, &self.sock) {
+ Err(e) => break Err(e),
+ Ok(r) => {
+ if r[0].is_async() == async_event {
+ break Ok(r)
+ // This reply isn't for us, add it to the queue
+ } else {
+ self.queue.lock().unwrap().queue.push(r);
+ self.queuecv.notify_one();
+ }
+ }
+ }
+ };
+ self.queue.lock().unwrap().reading = false;
+ self.queuecv.notify_one();
+ ret
+ }
+
+ fn cmd<S: AsRef<[u8]>>(&self, cmd: S) -> Result<Reply> {
+ let _guard = self.writer.lock().unwrap();
+ print!("C: {}", std::str::from_utf8(cmd.as_ref()).unwrap());
+
+ (&self.sock).write_all(cmd.as_ref())?;
+ let mut ret = self.get_reply(false)?;
+ if ret[0].status >= 200 && ret[0].status < 300 {
+ Ok(ret)
+ } else {
+ Err(err!(Status, ret.remove(0)))
+ }
+ }
+
+ pub(crate) fn connect_child(s: &SocketAddr, auth: &Auth, child: Option<Child>) -> Result<Sock> {
+ let sock = Sock {
+ sock: TcpStream::connect(s)?,
+ child: child,
+ rdbuf: Mutex::new(Vec::new()),
+ queue: Mutex::new(SockQueue {
+ queue: Vec::new(),
+ reading: false,
+ }),
+ queuecv: Condvar::new(),
+ writer: Mutex::new(true),
+ };
+
+ match auth {
+ Auth::Null => sock.cmd("AUTHENTICATE\r\n")?,
+ Auth::HashedPassword(s) => sock.cmd(format!("AUTHENTICATE {}\r\n", QuotedString(s)))?
+ };
+ Ok(sock)
+ }
+
+ /// Connect to a running Tor process and authenticate using the given authentication method.
+ pub fn connect(s: &SocketAddr, auth: &Auth) -> Result<Sock> {
+ Self::connect_child(s, auth, None)
+ }
+
+ /// Send a TAKEOWNERSHIP command. If this command has been acknowledged, the Tor process
+ /// will automatically shut down when this control socket is closed.
+ ///
+ /// Ownership is already implied if this socket has been created with the `spawn()` function,
+ /// so in that case you do not have to call this method.
+ pub fn takeownership(&self) -> Result<()> {
+ self.cmd("TAKEOWNERSHIP\r\n").map(|_|())
+ }
+
+ /// Send a DROPOWNERSHIP command. This reverses any earlier TAKEOWNERSHIP command and tells
+ /// the Tor process to keep running even after this control socket is closed.
+ ///
+ /// Note that, if this socket has been created with the `spawn()` function, then the Tor
+ /// process will still be killed when the socket is dropped. This method is only useful when
+ /// connected to an external Tor process.
+ pub fn dropownership(&self) -> Result<()> {
+ self.cmd("DROPOWNERSHIP\r\n").map(|_|())
+ }
+
+ /// Send a QUIT command. This tells Tor to close this control socket. Any further commands will
+ /// likely result in an error.
+ ///
+ /// The Tor process will exit if this control socket has called `takeownership()` before.
+ // TODO: Get rid of this function and implement a more thorough shutdown() instead?
+ pub fn quit(&self) -> Result<()> {
+ self.cmd("QUIT\r\n").map(|_|())
+ }
+
+
+ // XXX: This IntoIterator works with &[..] and &vec![..]. Haven't tested HashMap/BTreeMap yet,
+ // but I suspect their signature doesn't match.
+ // Warning: There's no validation on the key string format, so this command allows protocol
+ // injection.
+ fn setresetconf<'a,T>(&self, mut msg: String, settings: T) -> Result<()>
+ where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)>,
+ {
+ use std::fmt::Write;
+ for (k, v) in settings {
+ is_keyword(k)?;
+ msg.push(' ');
+ msg.push_str(k);
+ if let Some(v) = v {
+ write!(msg, "={}", QuotedString(v)).is_ok();
+ }
+ }
+ msg.push_str("\r\n");
+ self.cmd(msg).map(|_|())
+ }
+
+ /// Send a SETCONF command.
+ pub fn setconf<'a,T>(&self, settings: T) -> Result<()>
+ where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)>
+ {
+ self.setresetconf("SETCONF".to_string(), settings)
+ }
+
+ /// Send a RESETCONF command.
+ pub fn resetconf<'a,T>(&self, settings: T) -> Result<()>
+ where T: IntoIterator<Item = &'a (&'a str, Option<&'a str>)>
+ {
+ self.setresetconf("RESETCONF".to_string(), settings)
+ }
+
+ /// Returns the configuration variables for the requested keys. The values are returned in the
+ /// order they were requested. Some keys may return multiple values, these will be listed
+ /// multiple times. Some keys may return `None` when are unset or default. An error is
+ /// returned if any of the requested keys does not exist.
+ ///
+ /// This method corresponds to the GETCONF command.
+ pub fn getconf<'a,T: IntoIterator<Item = &'a &'a str>>(&self, keys: T) -> Result<Vec<(String, Option<String>)>> {
+ let mut msg = "GETCONF".to_string();
+ for k in keys {
+ is_keyword(k)?;
+ msg.push(' ');
+ msg.push_str(k);
+ }
+ msg.push_str("\r\n");
+
+ let mut res = Vec::new();
+ for line in self.cmd(msg)? {
+ if let Some(is) = line.text.find('=') {
+ let val = &line.text[is+1..];
+ let val = if val.starts_with('"') { QuotedString::parse(val)? } else { val.to_string() };
+ res.push(( (&line.text[..is]).to_string(), Some(val) ));
+ } else {
+ res.push((line.text, None));
+ }
+ }
+ Ok(res)
+ }
+
+ /// Similar to the `getconf()` method, except this is used to get run-time variables that are
+ /// not saved in the configuration. An error is returned if any of the requested keys does not
+ /// exist.
+ ///
+ /// Corresponds to the GETINFO command. Refer to the GETINFO documentation in the [tor-control
+ /// specification](https://gitweb.torproject.org/torspec.git/blob/control-spec.txt) for the
+ /// list of accepted keys.
+ pub fn getinfo<'a,T: IntoIterator<Item = &'a &'a str>>(&self, keys: T) -> Result<Vec<(String, String)>> {
+ let mut msg = "GETINFO".to_string();
+ for k in keys {
+ is_keyword(k)?;
+ msg.push(' ');
+ msg.push_str(k);
+ }
+ msg.push_str("\r\n");
+
+ let mut res = Vec::new();
+ for line in self.cmd(msg)? {
+ if line.text == "OK" {
+ break;
+ }
+ if let Some(is) = line.text.find('=') {
+ let val = if line.data.len() > 0 {
+ String::from_utf8(line.data).map_err(|_|err!(Parse))?
+ } else {
+ (&line.text[is+1..]).to_string()
+ };
+ res.push(( (&line.text[..is]).to_string(), val ));
+ } else {
+ return Err(err!(Parse));
+ }
+ }
+ Ok(res)
+ }
+
+ pub fn events(&'a self, list: ..) -> Result<Events<'a>> {
+ }
+}
+
+
+pub struct Events<'a> {
+ sock: &'a Sock
+}
+
+impl<'a> Drop for Events<'a> {
+ fn drop(&mut self) {
+ self.sock.cmd("SETEVENTS\r\n").is_ok();
+ }
+}
+
+impl<'a> Iterator for Events<'a> {
+ type Item = Result<Event>;
+ fn next(&mut self) -> Option<Self::Item> {
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn parse_replyline() {
+ assert!(ReplyLine::parse(b"\r\n").is_err());
+ assert!(ReplyLine::parse(b"250\r\n").is_err());
+ assert!(ReplyLine::parse(b"2500 Text\r\n").is_err());
+ assert!(ReplyLine::parse(b"250.Text\r\n").is_err());
+ assert!(ReplyLine::parse(b".50 Text\r\n").is_err());
+ assert!(ReplyLine::parse(b"-50 Text\r\n").is_err());
+ assert!(ReplyLine::parse(b"01x Text\r\n").is_err());
+
+ assert_eq!(ReplyLine::parse(b"").unwrap(), None);
+ assert_eq!(ReplyLine::parse(b"250 Hello\r").unwrap(), None);
+ assert_eq!(ReplyLine::parse(b"250+Hello\r\n").unwrap(), None);
+ assert_eq!(ReplyLine::parse(b"250+Hello\r\ndata\r\n..a\r\n.\r").unwrap(), None);
+
+ assert_eq!(ReplyLine::parse(b"250 \r\n").unwrap(), Some(ReplyLine { len: 6, status: 250, end: true, text: "".to_string(), data: Vec::new() }) );
+ assert_eq!(ReplyLine::parse(b"650-Hello\r\n").unwrap(), Some(ReplyLine { len: 11, status: 650, end: false, text: "Hello".to_string(), data: Vec::new() }) );
+ assert_eq!(ReplyLine::parse(b"650+Hello\r\n.data\r\n.\r\n").unwrap(), Some(ReplyLine { len: 21, status: 650, end: false, text: "Hello".to_string(), data: b"data".to_vec() }) );
+ assert_eq!(ReplyLine::parse(b"650+Hello\r\ndata\r\n..\r\n.\r\n").unwrap(), Some(ReplyLine { len: 24, status: 650, end: false, text: "Hello".to_string(), data: b"data\r\n.".to_vec() }) );
+ }
+
+ #[test]
+ fn qs_fmt() {
+ assert_eq!(QuotedString("").to_string(), "\"\"".to_string());
+ assert_eq!(QuotedString(" \\a\"b \x02").to_string(), "\" \\\\a\\\"b \x02\"".to_string());
+ }
+
+ #[test]
+ fn qs_parse() {
+ assert!(QuotedString::parse("").is_err());
+ assert!(QuotedString::parse("\"").is_err());
+ assert!(QuotedString::parse("\" \\\"").is_err());
+ assert!(QuotedString::parse("abc").is_err());
+
+ assert_eq!(QuotedString::parse("\"\"").unwrap(), "".to_string());
+ assert_eq!(QuotedString::parse("\" \\\\a\\\"b \x02\"").unwrap(), " \\a\"b \x02".to_string());
+ assert_eq!(QuotedString::parse("\"\\r\\n\"").unwrap(), "rn".to_string());
+ }
+}