author     Yorhel <git@yorhel.nl>  2020-05-24 16:07:43 +0200
committer  Yorhel <git@yorhel.nl>  2020-05-24 16:29:33 +0200
commit     38c1de17fae7ccda6ad44dd2530abbbd708303a8 (patch)
tree       c99a12846bda65e5153ce400abc3a3bac875e772
parent     55561ec66efabc0f6a51dbe3c5295b4df8a414e2 (diff)
Update the "Meta" fetching action and have it sync blake3.packs too
Doing this in a single action ensures that share.blake3_packs always has corresponding rows in pack_indices and that outdated/deleted files are instantly removed from the DB. Also updated the httpclient to support a maximum response body size, to avoid consuming more RAM than we'd like.
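The response-body cap mentioned above works roughly like the following sketch. This is illustrative only, not the code from this commit: the actual change in src/httpclient.rs (further down in the diff) threads a read/max counter through the existing ReadData state instead of using a separate wrapper type, and the LimitedReader name here is made up for the example.

    use std::io::{self, Read};

    // Hypothetical wrapper: count body bytes as they are consumed and fail once a
    // caller-chosen maximum is exceeded, so a misbehaving server can't make the
    // client buffer an arbitrarily large response.
    struct LimitedReader<R: Read> {
        inner: R,
        read: u64, // body bytes consumed so far
        max: u64,  // maximum body size we're willing to accept
    }

    impl<R: Read> Read for LimitedReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let n = self.inner.read(buf)?;
            self.read += n as u64;
            if self.read > self.max {
                return Err(io::Error::new(io::ErrorKind::InvalidInput, "Response body too large"));
            }
            Ok(n)
        }
    }

Callers such as fetch_meta() and fetch_packs() then pick a limit appropriate to the endpoint (MAXMETALEN and MAXPACKSLEN below), and a Content-Length that already exceeds the limit is rejected before any body bytes are read.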
-rw-r--r--  Cargo.lock          44
-rw-r--r--  Cargo.toml           1
-rw-r--r--  sql/schema.sql       4
-rw-r--r--  src/action_meta.rs  136
-rw-r--r--  src/dispatch.rs      8
-rw-r--r--  src/httpclient.rs   45
-rw-r--r--  src/share_log.rs     2
7 files changed, 200 insertions, 40 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ad64233..f513b32 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -74,6 +74,14 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "bendy"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -169,6 +177,7 @@ dependencies = [
name = "chifs-hub"
version = "0.1.0"
dependencies = [
+ "bendy 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"blake3 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"chifs-common 0.1.0 (git+https://code.blicky.net/chifs/chifs-common-rs)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -266,6 +275,26 @@ dependencies = [
]
[[package]]
+name = "failure"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "backtrace 0.3.48 (registry+https://github.com/rust-lang/crates.io-index)",
+ "failure_derive 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "failure_derive"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro2 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "quote 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 1.0.23 (registry+https://github.com/rust-lang/crates.io-index)",
+ "synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -897,6 +926,17 @@ dependencies = [
]
[[package]]
+name = "synstructure"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro2 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "quote 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 1.0.23 (registry+https://github.com/rust-lang/crates.io-index)",
+ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "syslog"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1095,6 +1135,7 @@ dependencies = [
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
"checksum backtrace 0.3.48 (registry+https://github.com/rust-lang/crates.io-index)" = "0df2f85c8a2abbe3b7d7e748052fdd9b76a0458fdeb16ad4223f5eca78c7c130"
"checksum base64 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53d1ccbaf7d9ec9537465a97bf19edc1a4e158ecb49fc16178202238c569cc42"
+"checksum bendy 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "16d3bd02b709d5a61b1380df54b409119298335609d0f56960fbd4d206e5a621"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum blake3 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "423897d97e11b810c9da22458400b28ec866991c711409073662eb34dc44bfff"
"checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
@@ -1116,6 +1157,8 @@ dependencies = [
"checksum data-encoding 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72aa14c04dfae8dd7d8a2b1cb7ca2152618cd01336dbfe704b8dcbf8d41dbd69"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3"
+"checksum failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
+"checksum failure_derive 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
"checksum fallible-iterator 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
"checksum fern 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e69ab0d5aca163e388c3a49d284fed6c3d0810700e77c5ae2756a50ec1a4daaa"
@@ -1195,6 +1238,7 @@ dependencies = [
"checksum stringprep 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
"checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee"
"checksum syn 1.0.23 (registry+https://github.com/rust-lang/crates.io-index)" = "95b5f192649e48a5302a13f2feb224df883b98933222369e4b3b0fe2a5447269"
+"checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545"
"checksum syslog 4.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a0641142b4081d3d44beffa4eefd7346a228cdf91ed70186db2ca2cef762d327"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
diff --git a/Cargo.toml b/Cargo.toml
index 9144ecf..7e97fa7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ license = "MIT"
edition = "2018"
[dependencies]
+bendy = "0.3"
blake3 = "0.3"
chrono = { version = "0.4" }
chifs-common = { git = "https://code.blicky.net/chifs/chifs-common-rs" }
diff --git a/sql/schema.sql b/sql/schema.sql
index 487ab2e..8d4f775 100644
--- a/sql/schema.sql
+++ b/sql/schema.sql
@@ -50,6 +50,8 @@ CREATE TABLE shares (
total_size bigint
);
+CREATE INDEX shares_blake3_packs ON shares (blake3_packs);
+
CREATE TYPE share_action AS ENUM ('Meta');
@@ -110,6 +112,8 @@ CREATE TABLE pack_indices (
CONSTRAINT share_packs_pkey PRIMARY KEY(blake3, pack)
);
+CREATE INDEX pack_indices_pack ON pack_indices (pack) INCLUDE (blake3);
+
CREATE TABLE packs (
pack hash256 not null CONSTRAINT packs_pkey PRIMARY KEY,
needindex timestamp not null default now(), -- NULL if this pack has been fetched and indexed in the DB, otherwise timestamp when this pack has been first seen (for FIFO pack fetching?)
diff --git a/src/action_meta.rs b/src/action_meta.rs
index 04bb4c0..4e31e1b 100644
--- a/src/action_meta.rs
+++ b/src/action_meta.rs
@@ -1,47 +1,131 @@
-//use chrono::prelude::*;
-use blake3::Hash;
+use std::io::Read;
+use std::time::Instant;
use crate::httpclient::{Client,Error};
use crate::config::Config;
use crate::db::Pool;
use crate::share_log;
+const MAXMETALEN : u64 = 1024*1024;
+const MAXPACKSLEN : u64 = 64*1024*1024; // 64 MiB is 2 million pack files. Ought to be enough, right?
-struct Meta {
- version: u8,
- blake3_packs: Hash,
- title: Option<String>,
- contact: Option<String>
+
+fn fetch_meta(conn: &Client) -> Result<(Option<String>,Option<String>,Vec<u8>), Error> {
+ let t_start = Instant::now();
+ conn.get("/.chifs-share/meta")?;
+
+ let res = conn.recv(MAXMETALEN)?;
+ if res.code != 200 {
+ return Err(Error::BadStatus(res.code));
+ }
+
+ let mut title = None;
+ let mut contact = None;
+ let mut packs = None;
+ {
+ let mut buf = Vec::new();
+ (&*conn).read_to_end(&mut buf)?;
+ trace!("Fetched {}/.chifs-share/meta in {}s ({} bytes)", &conn.host, (Instant::now()-t_start).as_secs_f32(), buf.len());
+
+ if !buf.starts_with(b"d6:!chifs12:share-meta-1") {
+ return Err(Error::Custom("Unrecognized share meta format".to_string()));
+ }
+
+ let mut dec = bendy::decoding::Decoder::new(&buf[..]);
+ let mut dict = dec.next_object()?.unwrap().try_into_dictionary()?;
+ while let Some((k,v)) = dict.next_pair()? {
+ match k {
+ b"title" => title = Some(String::from_utf8(v.try_into_bytes()?.to_vec()).map_err(|_| Error::Custom("Invalid UTF-8 in title field".to_string()))?),
+ b"contact" => contact = Some(String::from_utf8(v.try_into_bytes()?.to_vec()).map_err(|_| Error::Custom("Invalid UTF-8 in contact field".to_string()))?),
+ b"blake3.packs" => packs = Some(v.try_into_bytes()?.to_vec()),
+ _ => ()
+ }
+ }
+ };
+ let packs = packs.ok_or_else(|| Error::Custom("No 'blake3.packs' found in metadata".to_string()))?;
+ Ok((title, contact, packs))
}
-fn fetch_meta(conf: &Config, addr: &str) -> Result<Meta, Error> {
- let conn = Client::socks_connect(&conf.tor_address, addr)?;
- conn.get("/.chifs-share/meta.json")?;
- let res = conn.recv()?;
+fn fetch_packs(conn: &Client, blake3: &[u8], buf: &mut Vec<u8>) -> Result<(), Error> {
+ let t_start = Instant::now();
+ conn.get("/.chifs-share/blake3.packs")?;
+ let res = conn.recv(MAXPACKSLEN)?;
if res.code != 200 {
return Err(Error::BadStatus(res.code));
}
- Err(Error::BadStatus(0)) // TODO
+ (&*conn).read_to_end(buf)?;
+ trace!("Fetched {}/.chifs-share/blake3.packs in {}s ({} bytes)", &conn.host, (Instant::now()-t_start).as_secs_f32(), buf.len());
+ if buf.len() % 32 != 0 {
+ return Err(Error::Custom("blake3.packs size is not a multiple of 32".to_string()));
+ }
+ if blake3::hash(&buf[..]).as_bytes() != blake3 {
+ return Err(Error::Custom("blake3.packs does not match the blake3 hash indicated in the metadata".to_string()));
+ }
+ Ok(())
+}
+
+
+fn update(conf: &Config, db: &Pool, shareid: i32, addr: &str, blake3: Option<&[u8]>) -> Result<(), Error> {
+ let conn = Client::socks_connect(&conf.tor_address, addr)?;
+ let (title, contact, new_blake3) = fetch_meta(&conn)?;
+
+ let mut packs : Vec<u8> = Vec::new();
+ if blake3 != Some(&new_blake3[..]) {
+ if db.get().query_one("SELECT 1 FROM pack_indices WHERE blake3 = $1::bytea::hash256 LIMIT 1", &[&new_blake3]).is_err() {
+ fetch_packs(&conn, &new_blake3[..], &mut packs)?;
+ }
+ }
+
+ let mut db = db.get();
+ let mut txn = db.transaction().unwrap();
- /*
- let meta : Meta = from_reader(&conn)?;
- if meta.version < 1 {
- return Err(Error::Custom(format!("Invalid metdata version {}", meta.version)));
+ let t_ins = Instant::now();
+ if !packs.is_empty() {
+ let q_ind = txn.prepare("INSERT INTO pack_indices (blake3, pack) VALUES ($1::bytea::hash256, $2::bytea::hash256) ON CONFLICT (blake3, pack) DO NOTHING").unwrap();
+ let q_packs = txn.prepare("INSERT INTO packs (pack) VALUES ($1::bytea::hash256) ON CONFLICT (pack) DO NOTHING").unwrap();
+ for p in packs.chunks_exact(32) {
+ txn.execute(&q_ind, &[&new_blake3, &p]).unwrap();
+ txn.execute(&q_packs, &[&p]).unwrap();
+ }
}
- Ok(meta)*/
+
+ let t_upd = Instant::now();
+ txn.execute(
+ "UPDATE shares SET title = $1, contact = $2, blake3_packs = $3::bytea::hash256 WHERE id = $4 AND (title,contact,blake3_packs) IS DISTINCT FROM ($1,$2,$3::bytea::hash256)",
+ &[&title, &contact, &new_blake3, &shareid]
+ ).unwrap();
+
+ let t_del = Instant::now();
+ if blake3 != Some(&new_blake3[..]) {
+ // Delete unreferenced pack_indices, packs and files. The latter is done automatically as
+ // part of an ON DELETE foreign key reference.
+ txn.execute("
+ WITH del(blake3) AS (SELECT $1::bytea::hash256 WHERE NOT EXISTS(SELECT 1 FROM shares WHERE blake3_packs = $1::bytea::hash256))
+ , x AS (
+ DELETE FROM packs
+ WHERE pack IN(SELECT pack FROM pack_indices WHERE blake3 IN(SELECT blake3 FROM del))
+ AND NOT EXISTS(SELECT 1 FROM pack_indices pi WHERE blake3 NOT IN(SELECT blake3 FROM del) AND pi.pack = packs.pack)
+ ) DELETE FROM pack_indices WHERE blake3 IN(SELECT blake3 FROM del)
+ ", &[&blake3]).unwrap();
+ }
+
+ let t_commit = Instant::now();
+ txn.commit().unwrap();
+
+ trace!("DB updates for Meta {} ({}); pack insert: {}ms ({}), share update: {}ms, pack del: {}ms, commit: {}ms",
+ shareid, addr,
+ (t_upd-t_ins).as_secs_f32()*1000., packs.len(),
+ (t_del-t_upd).as_secs_f32()*1000.,
+ (t_commit-t_del).as_secs_f32()*1000.,
+ (Instant::now()-t_commit).as_secs_f32()*1000.);
+
+ Ok(())
}
-pub fn run(conf: &Config, db: Pool, shareid: i32, addr: &str) {
+pub fn run(conf: &Config, db: Pool, shareid: i32, addr: &str, blake3: Option<&[u8]>) {
debug!("Fetching Meta for share {} ({})", shareid, addr);
- share_log::Action::Meta.run(db.clone(), shareid, |_|
- fetch_meta(conf, addr)
- .map(|meta| {
- db.get().execute("UPDATE shares SET title = $1, contact = $2, blake3_packs = $3 WHERE id = $4",
- &[&meta.title, &meta.contact, &&meta.blake3_packs.as_bytes()[..], &shareid]
- ).unwrap();
- })
- )
+ share_log::Action::Meta.run(db.clone(), shareid, |_| update(conf, &db, shareid, addr, blake3));
}
diff --git a/src/dispatch.rs b/src/dispatch.rs
index beb2c46..ceaa7c2 100644
--- a/src/dispatch.rs
+++ b/src/dispatch.rs
@@ -93,7 +93,7 @@ pub fn run(conf: Arc<Config>, db: Pool) {
// action is lagging behind. It also prevents losing updates to a race condition: If we fetch
// new data while old data is being indexed, it's possible that 'next_index' is being reset to
// NULL even when we do have new data to index. Yay, parallelism.
- let meta_q = conn.prepare("SELECT id, addr FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
+ let meta_q = conn.prepare("SELECT id, addr, blake3_packs::bytea FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
//let fetch_q = conn.prepare("SELECT id, addr FROM shares WHERE next_fetch <= NOW() AND next_index IS NULL ORDER BY next_fetch LIMIT $1::int").unwrap();
//let index_q = conn.prepare("SELECT id, addr, next_index FROM shares WHERE next_index <= NOW() ORDER BY next_index LIMIT $1::int").unwrap();
@@ -103,9 +103,9 @@ pub fn run(conf: Arc<Config>, db: Pool) {
handle_queue("Meta",
conf.meta_processes, &mut conn, &meta_q, &meta_tasks,
- |row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1))),
- move |data: (Arc<Config>, Pool, i32, String)| {
- action_meta::run(&data.0, data.1, data.2, &data.3[..]);
+ |row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1), row.get(2))),
+ move |data: (Arc<Config>, Pool, i32, String, Option<Vec<u8>>)| {
+ action_meta::run(&data.0, data.1, data.2, &data.3[..], data.4.as_deref());
}
).unwrap();
diff --git a/src/httpclient.rs b/src/httpclient.rs
index 53bd1f3..1de29f7 100644
--- a/src/httpclient.rs
+++ b/src/httpclient.rs
@@ -20,11 +20,13 @@ pub enum Error {
IO(io::Error),
Http(httparse::Error),
HttpHead(&'static str),
+ BodySize,
// The following errors are not used by this library itself, but are common errors when
// handling with responses and are thus included here to make it easier to re-use this Error
// type for users.
BadStatus(u16),
+ Bencode(bendy::decoding::Error),
Custom(String),
}
@@ -50,7 +52,9 @@ impl fmt::Display for Error {
IO(ref e) => write!(f, "IO error: {}", e),
Http(ref e) => write!(f, "Error reading HTTP response: {}", e),
HttpHead(s) => write!(f, "Error parsing HTTP {} header", s),
+ BodySize => write!(f, "Response body too large"),
BadStatus(s) => write!(f, "Unexpected HTTP response status ({})", s),
+ Bencode(e) => write!(f, "Error parsing bencode: {}", e),
Custom(s) => write!(f, "{}", s),
}
}
@@ -62,6 +66,12 @@ impl From<io::Error> for Error {
}
}
+impl From<bendy::decoding::Error> for Error {
+ fn from(e: bendy::decoding::Error) -> Error {
+ Error::Bencode(e)
+ }
+}
+
pub struct Response {
pub code: u16,
@@ -131,8 +141,10 @@ enum ReadState {
}
struct ReadData {
- buf: Vec<u8>,
- state: ReadState
+ buf: Vec<u8>,
+ state: ReadState,
+ read: u64, // Number of body bytes read so far
+ max: u64 // Maximum size of the body we're willing to accept
}
@@ -156,12 +168,11 @@ struct ReadData {
// This client is a bit overdesigned for the Hub use-case, we don't really need pipelining and
// keep-alive, but I expect that this code will also be used by download clients.
//
-// TODO:
-// - Chunked encoding (not terribly important for now, just an optimization)
-// - This code needs tests!
+// TODO: Actually support multiple requests when the server does a 'Connection: close'? The ChiFS
+// spec does mandate pipelining support, so considering that an error is probably alright.
pub struct Client {
+ pub host: String,
conn: Stream,
- host: String,
rd: Mutex<ReadData>,
}
@@ -231,7 +242,7 @@ impl Client {
let conn = Client { conn,
host: hostname.to_string(),
- rd: Mutex::new(ReadData { buf: Vec::new(), state: ReadState::None }),
+ rd: Mutex::new(ReadData { buf: Vec::new(), state: ReadState::None, read: 0, max: u64::max_value() }),
};
// A compliant implementation would wait for a server reply before sending the request, but
@@ -275,7 +286,7 @@ impl Client {
/// Receive a response from the remote server. The body of the response can be read using the
/// `Read` trait of this Client object. If the response doesn't include a body, then the first
/// `read()` will indicate EOF.
- pub fn recv(&self) -> Result<Response, Error> {
+ pub fn recv(&self, max_body: u64) -> Result<Response, Error> {
// Make sure to fully consume the body of any previous response.
std::io::copy(&mut (&*self), &mut std::io::sink())?;
@@ -300,6 +311,14 @@ impl Client {
else if !res.keepalive { ReadState::Close }
else { ReadState::None };
+ if let ReadState::Length(len) = rd.state {
+ if len > max_body {
+ return Err(Error::BodySize);
+ }
+ }
+
+ rd.read = 0;
+ rd.max = max_body;
return Ok(res)
}
}
@@ -343,7 +362,7 @@ impl Read for &Client {
}
let rbuf = &mut rd.buf;
- match rd.state {
+ let res = match rd.state {
ReadState::None => Ok(0),
ReadState::Close => {
if !rbuf.is_empty() {
@@ -393,7 +412,15 @@ impl Read for &Client {
}
Ok(len)
},
+ };
+
+ if let Ok(len) = res {
+ rd.read += len as u64;
+ if rd.read > rd.max {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput, "Response body too large"));
+ }
}
+ res
}
}
diff --git a/src/share_log.rs b/src/share_log.rs
index ba5e0f1..c17d061 100644
--- a/src/share_log.rs
+++ b/src/share_log.rs
@@ -46,7 +46,7 @@ impl Action {
db.get().execute("
INSERT INTO share_log (share, action, error, start, duration)
- VALUES ($1, $2, $3, $4, $5, )",
+ VALUES ($1, $2, $3, $4, $5 )",
&[&log.share, &log.action, &log.error, &log.start, &log.duration]
).unwrap();
}