author    Yorhel <git@yorhel.nl>  2020-06-02 10:13:38 +0200
committer Yorhel <git@yorhel.nl>  2020-06-02 10:13:40 +0200
commit    c96a72a24da24d7213ea66b361e3bb406839726f
tree      3c601e81483a414070ca1a5fba7618d20326d967
parent    5a94e815037034deed218326bd8b0edce19ab927
Add initial pack indexer
Logging and error reporting are bad and a few error cases aren't handled yet, but we've got to start somewhere.
-rw-r--r--  Cargo.lock             1
-rw-r--r--  Cargo.toml             2
-rw-r--r--  sql/schema.sql         7
-rw-r--r--  src/config.rs          9
-rw-r--r--  src/db.rs             10
-rw-r--r--  src/dispatch/meta.rs  11
-rw-r--r--  src/dispatch/mod.rs   33
-rw-r--r--  src/dispatch/pack.rs 279
8 files changed, 317 insertions(+), 35 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index acf2cb7..7e079d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -195,6 +195,7 @@ dependencies = [
"postgres 0.17.3 (registry+https://github.com/rust-lang/crates.io-index)",
"postgres-types 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "zstd 0.4.28+zstd.1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 0f37aec..47b043c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,4 +30,4 @@ regex = "1.3"
#serde_derive = "1.0"
#serde_urlencoded = "0.6"
#serde_yaml = "0.8"
-#zstd = "0.4"
+zstd = "0.4"
diff --git a/sql/schema.sql b/sql/schema.sql
index f1751fb..33da3c9 100644
--- a/sql/schema.sql
+++ b/sql/schema.sql
@@ -116,9 +116,9 @@ CREATE INDEX pack_indices_pack ON pack_indices (pack) INCLUDE (blake3);
CREATE TABLE packs (
pack hash256 not null CONSTRAINT packs_pkey PRIMARY KEY,
- needindex timestamp not null default now(), -- NULL if this pack has been fetched and indexed in the DB, otherwise timestamp when this pack has been first seen (for FIFO pack fetching?)
+ needindex timestamp default now(), -- NULL if this pack has been fetched and indexed in the DB, otherwise timestamp when this pack has been first seen (for FIFO pack fetching?)
files integer, -- Number of files in this pack, allowing for efficient sum(files) to get unique file count of a share.
- size integer -- Sum of file sizes in this pack, allowing for efficient sum(size) to get total share size.
+ size bigint -- Sum of file sizes in this pack, allowing for efficient sum(size) to get total share size.
);
-- Files in packs
@@ -129,8 +129,7 @@ CREATE TABLE files (
blake3 hash256 not null,
size bigint not null,
mime text,
- meta text,
- search tsvector not null, -- TODO: generated column
+ meta bytea,
CONSTRAINT files_pkey PRIMARY KEY(pack, blake3)
);
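The `files` and `size` columns exist so that per-share totals don't require scanning the `files` table. A minimal sketch of the aggregate they enable, written against the postgres crate like the rest of the codebase (the function name and the `&[u8]` hash parameter are illustrative, not part of this commit):

use postgres::{Client, Error};

// Sum the precomputed per-pack counters for one share's pack index.
// `blake3_packs` is the share's pack index hash (bytea/hash256 in the schema).
fn share_totals(db: &mut Client, blake3_packs: &[u8]) -> Result<(i64, i64), Error> {
    let row = db.query_one(
        "SELECT COALESCE(sum(p.files), 0)::bigint, COALESCE(sum(p.size), 0)::bigint
           FROM pack_indices pi JOIN packs p ON p.pack = pi.pack
          WHERE pi.blake3 = $1",
        &[&blake3_packs],
    )?;
    Ok((row.get(0), row.get(1)))
}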
diff --git a/src/config.rs b/src/config.rs
index c6893a9..2c02aec 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -6,8 +6,7 @@ pub struct Config {
pub database_url: String,
pub tor_address: SocketAddr,
pub meta_processes: i32,
- pub fetch_processes: i32,
- pub index_processes: i32,
+ pub pack_processes: i32,
pub log_file: String,
pub log_level: log::LevelFilter,
pub listen: (SocketAddr,Option<u32>),
@@ -22,8 +21,7 @@ impl Default for Config {
database_url: "postgresql://chifs@localhost:5432".to_string(),
tor_address: "127.0.0.1:9050".parse().unwrap(),
meta_processes: 10,
- fetch_processes: 10,
- index_processes: 1,
+ pack_processes: 10,
log_file: "none".to_string(),
log_level: log::LevelFilter::Debug,
listen: ("127.0.0.1:9080".parse().unwrap(), None),
@@ -52,8 +50,7 @@ impl Config {
"DatabaseUrl" => self.database_url = line.arg,
"TorAddress" => self.tor_address = line.sockaddr()?,
"MetaProcesses" => self.meta_processes = line.u64()? as i32, // TODO: bounds check
- "FetchProcesses"=> self.fetch_processes = line.u64()? as i32, // TODO: bounds check
- "IndexProcesses"=> self.index_processes = line.u64()? as i32, // TODO: bounds check
+ "PackProcesses" => self.pack_processes = line.u64()? as i32, // TODO: bounds check
"LogFile" => self.log_file = line.arg,
"LogLevel" => self.log_level = line.log_level()?,
"Listen" => self.listen = line.listenaddr()?,
diff --git a/src/db.rs b/src/db.rs
index be57a53..3cc50a6 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -67,19 +67,17 @@ impl DerefMut for Conn {
impl Pool {
pub fn new(conf: &Config) -> Pool {
// Maximum number of connections:
- // - Each index task will take a connection for its entire duration, so must be larger than
- // conf.index_processes.
- // - The dispatcher will keep one connection for its entire lifetime, too.
+ // - The dispatcher will keep one connection for its entire lifetime.
// - All other actions minimize their use of the database, so we won't need many connections
- // for those (2*num_cpus would likely be the maximum sensible value here). Currently
- // hardcoded to 1.
+ // for those (2*num_cpus would likely be the maximum sensible value here).
+    //   Let's stick with a hardcoded maximum of 4 connections for now.
Pool(Arc::new(PoolImpl {
state: Mutex::new(PoolState {
conns: Vec::new(),
lent: 0,
}),
cv: Condvar::new(),
- max: conf.index_processes as usize + 1 + 1,
+ max: 4,
dburl: conf.database_url.to_string(),
}))
// TODO: Ensure the latest database migrations have been performed.
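The sizing comment boils down to: one long-lived dispatcher connection plus a handful of short-lived checkouts by the actions. A generic sketch of the Mutex/Condvar checkout pattern that reasoning applies to (not the actual db.rs implementation; the types and names are illustrative):

use std::sync::{Condvar, Mutex};

struct PoolState<T> { idle: Vec<T>, lent: usize }
struct SimplePool<T> { state: Mutex<PoolState<T>>, cv: Condvar, max: usize }

impl<T> SimplePool<T> {
    // Hand out an idle connection, open a new one while under `max`,
    // or block until another caller returns one.
    fn get(&self, mut connect: impl FnMut() -> T) -> T {
        let mut st = self.state.lock().unwrap();
        loop {
            if let Some(c) = st.idle.pop() { st.lent += 1; return c; }
            if st.lent < self.max { st.lent += 1; return connect(); }
            st = self.cv.wait(st).unwrap();
        }
    }

    // Return a connection and wake one waiter.
    fn put(&self, conn: T) {
        let mut st = self.state.lock().unwrap();
        st.lent -= 1;
        st.idle.push(conn);
        self.cv.notify_one();
    }
}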
diff --git a/src/dispatch/meta.rs b/src/dispatch/meta.rs
index f00acf9..ee56051 100644
--- a/src/dispatch/meta.rs
+++ b/src/dispatch/meta.rs
@@ -1,3 +1,12 @@
+/* This implements the 'Meta' action, which does the following:
+ *
+ * - Fetch a share's .chifs-share/meta file
+ * - If the blake3.packs hash is different from the DB, also fetch that file
+ * - Update the `shares` table with the newly fetched metadata
+ * - Insert all newly found pack hashes
+ * (not their contents, that's the responsibility of the 'Pack' action)
+ * - Delete all packs and files not found in the new blake3.packs list
+ */
use std::io::Read;
use std::time::Instant;
use std::convert::TryFrom;
@@ -75,7 +84,7 @@ fn update(conf: &Config, db: &Pool, log: &Log, shareid: i32, addr: &str, blake3:
let t_packs = Instant::now();
let mut packs : Vec<u8> = Vec::new();
- if blake3 != Some(new_blake3) && db.get().query_one("SELECT 1 FROM pack_indices WHERE blake3 = $1 LIMIT 1", &[&new_blake3]).is_err() {
+ if blake3 != Some(new_blake3) && db.get().query_opt("SELECT 1 FROM pack_indices WHERE blake3 = $1 LIMIT 1", &[&new_blake3]).unwrap().is_none() {
fetch_packs(&conn, &new_blake3, &mut packs)?;
}
let t_packs_end = Instant::now();
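On the query_one() to query_opt() change above: with the postgres crate, query_one() returns an Err when no row matches, so the old `.is_err()` check couldn't tell "hash not indexed yet" apart from a real database error, while query_opt() returns Ok(None) for the no-row case. A minimal sketch of the idiom (the function name and the `&[u8]` parameter are illustrative):

use postgres::{Client, Error};

// True if some pack_indices row already carries this blake3 hash.
// Database failures stay visible as Err instead of being swallowed.
fn pack_index_known(db: &mut Client, blake3: &[u8]) -> Result<bool, Error> {
    let row = db.query_opt(
        "SELECT 1 FROM pack_indices WHERE blake3 = $1 LIMIT 1",
        &[&blake3],
    )?;
    Ok(row.is_some())
}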
diff --git a/src/dispatch/mod.rs b/src/dispatch/mod.rs
index bd2349d..ce6f13a 100644
--- a/src/dispatch/mod.rs
+++ b/src/dispatch/mod.rs
@@ -3,18 +3,17 @@ use std::hash::Hash as StdHash;
use std::sync::{Arc,Mutex};
use std::collections::HashSet;
use std::time::Duration;
-//use chrono::prelude::*;
use fallible_iterator::FallibleIterator;
use chifs_common::Hash;
-mod meta;
mod log;
+mod meta;
+mod pack;
use crate::db::Pool;
use crate::config::Config;
-// Not sure why I generalized this, T is prolly always going to be an i32.
struct TaskGuard<T: Display + StdHash + Eq + Copy + Clone> {
lst: Arc<Mutex<HashSet<T>>>,
id: T
@@ -79,24 +78,16 @@ fn handle_queue<S,T,F,G>(action: &str, num: i32, conn: &mut postgres::Client, qu
pub fn run(conf: Arc<Config>, db: Pool) {
- let meta_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
- //let fetch_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
- //let index_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
+ let meta_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
+ let pack_tasks : Arc<Mutex<HashSet<Hash>>> = Arc::new(Mutex::new(HashSet::new()));
let mut conn = db.get();
conn.batch_execute("LISTEN share_action;").unwrap();
- // TODO: These queries probably need (partial) indices on the next_* fields to avoid a
- // sequential table scan, but I don't have enough test data at the moment to convince Postgres
- // to actually use such an index.
- //
- // The additional clause in fetch_q is to ensure that we don't fetch data while the 'Index'
- // action is lagging behind. It also prevents losing updates to a race condition: If we fetch
- // new data while old data is being indexed, it's possible that 'next_index' is being reset to
- // NULL even when we do have new data to index. Yay, parallelism.
- let meta_q = conn.prepare("SELECT id, addr, blake3_packs FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
- //let fetch_q = conn.prepare("SELECT id, addr FROM shares WHERE next_fetch <= NOW() AND next_index IS NULL ORDER BY next_fetch LIMIT $1::int").unwrap();
- //let index_q = conn.prepare("SELECT id, addr, next_index FROM shares WHERE next_index <= NOW() ORDER BY next_index LIMIT $1::int").unwrap();
+ // TODO: These queries probably need (partial) indices to avoid sequential table scans.
+    // They'll be executed a lot, so these need to be fast.
+ let meta_q = conn.prepare("SELECT id, addr, blake3_packs FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
+ let pack_q = conn.prepare("SELECT pi.blake3 FROM pack_indices pi JOIN packs p ON p.pack = pi.pack WHERE p.needindex IS NOT NULL AND p.needindex < NOW() GROUP BY pi.blake3 ORDER BY MIN(p.needindex) LIMIT $1::int").unwrap();
loop {
// Consume all outstanding notifications before we read the queues
@@ -110,6 +101,14 @@ pub fn run(conf: Arc<Config>, db: Pool) {
}
).unwrap();
+ handle_queue("Pack",
+ conf.pack_processes, &mut conn, &pack_q, &pack_tasks,
+ |row| (row.get(0), (conf.clone(), db.clone(), row.get(0))),
+ move |data: (Arc<Config>, Pool, Hash)| {
+ pack::run(&data.0, data.1, data.2);
+ }
+ ).unwrap();
+
trace!("Dispatcher going to sleep");
let r = conn.notifications().timeout_iter(Duration::from_secs(60)).next().unwrap();
trace!("Dispatcher woken up by {}", if r.is_none() { "timeout" } else { "notification"});
diff --git a/src/dispatch/pack.rs b/src/dispatch/pack.rs
new file mode 100644
index 0000000..a153d5b
--- /dev/null
+++ b/src/dispatch/pack.rs
@@ -0,0 +1,279 @@
+/* This implements the 'Pack' action, which is responsible for fetching pack files and indexing
+ * their contents in the database.
+ *
+ * A single run() takes the blake3 hash of a pack_index and tries to fetch/index several of its
+ * packs. It would be much simpler to call run() for each individual pack, but that has a few
+ * disadvantages:
+ * - Multiple run()s are performed in parallel, but it makes a lot more sense to fetch packs from
+ * different shares in parallel rather than multiple packs from a single share. Splitting the
+ * work into pack_index hashes makes sure we only have a single 'Pack' action connected to a
+ * share at a time.
+ * - We can fetch multiple packs from a share using pipelining, reducing the effects of Tor network
+ * latency.
+ * - Pack files can be relatively small, updating the database for each pack can have high
+ * overhead.
+ */
+use std::time::Instant;
+use std::io::{Read,Write};
+use bendy::decoding::{Object,Decoder,FromBencode};
+use chifs_common::{Hash,util::VecFill};
+
+use std::sync::Arc;
+use crate::httpclient::{Client,Error};
+use crate::db::Pool;
+use crate::config::Config;
+
+// Maximum number of shares to try fetching a single pack_index from.
+// Shares will be tried in random order, moving to the next share on error.
+const MAX_SHARES : i32 = 5;
+
+// Maximum number of packs to try fetching in a single run().
+const MAX_PACKS : i32 = 10_000;
+
+// Number of pack requests to send in advance.
+// Ideally this should approximate (available_bandwidth/avg_pack_size)*network_latency, but a
+// hardcoded high-enough number ought to work well enough.
+const PIPELINE : usize = 32;
+
+const MAX_PACKSIZE : usize = 16*1024*1024;
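To put rough numbers on the PIPELINE heuristic above (all figures are assumptions for illustration, not measurements):

// (bandwidth / average pack size) * round-trip latency ~= requests worth
// keeping in flight. With ~1 MiB/s through Tor, ~100 KiB packs and a ~2 s
// round trip, that's about 21, so a hardcoded 32 leaves some headroom.
fn pipeline_depth(bandwidth_bytes_per_s: f64, avg_pack_bytes: f64, latency_s: f64) -> usize {
    ((bandwidth_bytes_per_s / avg_pack_bytes) * latency_s).ceil() as usize
}

fn main() {
    assert_eq!(pipeline_depth(1024.0 * 1024.0, 100.0 * 1024.0, 2.0), 21);
}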
+
+
+// Curious that Object does not implement ToBencode.
+// No matter, a custom encoder is simple enough.
+fn encode_object(obj: Object, buf: &mut Vec<u8>) -> Result<(), Error> {
+ match obj {
+ Object::List(mut l) => {
+ buf.push(b'l');
+ while let Some(e) = l.next_object()? {
+ encode_object(e, buf)?
+ }
+ buf.push(b'e');
+ },
+ Object::Dict(mut d) => {
+ buf.push(b'd');
+ while let Some((k,v)) = d.next_pair()? {
+ encode_object(Object::Bytes(k), buf)?;
+ encode_object(v, buf)?;
+ }
+ buf.push(b'e');
+ },
+ Object::Integer(i) => {
+ buf.push(b'i');
+ buf.extend_from_slice(i.as_bytes());
+ buf.push(b'e');
+ },
+ Object::Bytes(b) => {
+            let _ = write!(buf, "{}:", b.len());
+ buf.extend_from_slice(b);
+ },
+ }
+ Ok(())
+}
+
+
+struct File<'a> {
+ paths: Vec<&'a str>,
+ blake3: Hash,
+ size: i64,
+ mime: Option<&'a str>,
+ pieces: Option<&'a [u8]>,
+ meta: Option<Vec<u8>>,
+}
+
+fn parse_file<'obj,'ser>(obj: Object<'obj,'ser>) -> Result<File<'ser>,Error> {
+ let mut file = File {
+ paths: Vec::new(),
+ blake3: [0u8;32].into(),
+ size: -1,
+ mime: None,
+ pieces: None,
+ meta: None,
+ };
+ let mut meta = b"d".to_vec();
+
+ let mut dict = obj.try_into_dictionary()?;
+ while let Some((k,v)) = dict.next_pair()? {
+ match k {
+ b"paths" => {
+ let mut lst = v.try_into_list()?;
+ while let Some(p) = lst.next_object()? {
+ let path = std::str::from_utf8(p.try_into_bytes()?).map_err(|_| Error::Custom("Invalid UTF-8 in path".to_string()))?;
+ if !chifs_common::util::is_valid_path(&path) {
+ return Err(Error::Custom(format!("Invalid path: {}", path)));
+ }
+ file.paths.push(path)
+ }
+ },
+ b"blake3" => file.blake3 = Hash::decode_bencode_object(v)?,
+ b"size" => file.size = i64::decode_bencode_object(v)?,
+ b"mime" => file.mime = Some(std::str::from_utf8(v.try_into_bytes()?).map_err(|_| Error::Custom("Invalid UTF-8 in mime field".to_string()))?),
+ b"blake3 pieces" => file.pieces = Some(v.try_into_bytes()?),
+ _ => {
+ if std::str::from_utf8(k).is_err() {
+ return Err(Error::Custom("Invalid UTF-8 in metadata key".to_string()));
+ }
+ encode_object(Object::Bytes(k), &mut meta)?;
+ encode_object(v, &mut meta)?;
+ }
+ }
+ }
+
+ if meta != b"d" {
+ meta.push(b'e');
+ file.meta = Some(meta);
+ }
+
+ if file.size < 0 {
+ return Err(Error::Custom("Invalid or missing file size in file metadata".to_string()));
+ }
+ if file.paths.is_empty() {
+ return Err(Error::Custom("Invalid or missing paths in file metadata".to_string()));
+ }
+ if file.blake3 == [0u8;32].into() {
+ return Err(Error::Custom("Missing blake3 hash in file metadata".to_string()));
+ }
+ // TODO: Validate pieces
+ Ok(file)
+}
+
+
+fn index_pack(db: &Pool, pack: &Hash, zstd: Vec<u8>) -> Result<(),Error> {
+ let t_start = Instant::now();
+ let zstd_len = zstd.len();
+
+    // Don't decompress the zstd buffer in a single call; we have to make sure the output doesn't exceed MAX_PACKSIZE.
+ let mut buf = Vec::new();
+ let mut dec = zstd::Decoder::with_buffer(&zstd[..])?;
+ while buf.len() < MAX_PACKSIZE && buf.fill(&mut dec, (16*1024).min(3+MAX_PACKSIZE-buf.len()))? > 0 {}
+ drop(zstd);
+
+ if buf.len() > MAX_PACKSIZE {
+ return Err(Error::Custom("pack file too large".to_string()));
+ }
+ if !buf.starts_with(b"d6:!chifs12:share-pack-1") {
+ return Err(Error::Custom("unrecognized pack format".to_string()));
+ }
+
+ let mut dec = Decoder::new(&buf[..]);
+ let mut dict = dec.next_object()?.unwrap().try_into_dictionary()?;
+ let mut files = loop {
+ match dict.next_pair()? {
+ None => return Err(Error::Custom("pack does not contain a \"files\" listing".to_string())),
+ Some((k,v)) if k == b"files" => break v.try_into_list()?,
+ _ => (),
+ }
+ };
+
+ let mut db = db.get();
+ let mut txn = db.transaction().unwrap();
+ let mut file_num = 0i32;
+ let mut file_size = 0i64;
+    // XXX: q_file can fail if the pack has been deleted in the meantime; not panicking would be nice.
+ let q_file = txn.prepare("INSERT INTO files (pack, paths, blake3, size, mime, meta) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT (pack,blake3) DO NOTHING").unwrap();
+ // TODO
+ //let q_hash = txn.prepare("INSERT INTO blake3_pieces (blake3, pieces) VALUES ($1,$2) ON CONFLICT (blake3) DO NOTHING").unwrap();
+
+ while let Some(file) = files.next_object()? {
+        // XXX: A validation error on a single file causes the entire pack to be marked as failed;
+        // it would be nice to skip just that file instead (but then we need better error reporting)
+ let f = parse_file(file)?;
+ txn.execute(&q_file, &[pack, &f.paths, &f.blake3, &f.size, &f.mime, &f.meta]).unwrap();
+ file_num += 1;
+ file_size = f.size.saturating_add(file_size);
+ }
+
+ drop(files);
+ dict.consume_all()?;
+
+ txn.execute("UPDATE packs SET needindex = NULL, files = $1, size = $2 WHERE pack = $3", &[&file_num, &file_size, pack]).unwrap();
+ txn.commit().unwrap();
+ trace!("Pack {} indexed in {:.1}ms, compressed {} bytes, uncompressed {} bytes, {} files, {} bytes",
+ pack, (Instant::now()-t_start).as_secs_f32()*1000., zstd_len, buf.len(), file_num, file_size);
+ Ok(())
+}
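The bounded decompression at the top of index_pack() goes through the VecFill helper so the output can never grow far past MAX_PACKSIZE. An equivalent sketch of the same idea using only std's Read combinators (not the code above, and the function name is illustrative):

use std::io::Read;

// Decompress at most `max` bytes; Ok(None) means the stream would exceed the limit.
// take(max + 1) lets one extra byte through purely to detect the overrun.
fn decompress_bounded(zstd_data: &[u8], max: usize) -> std::io::Result<Option<Vec<u8>>> {
    let mut dec = zstd::Decoder::with_buffer(zstd_data)?;
    let mut buf = Vec::new();
    dec.by_ref().take(max as u64 + 1).read_to_end(&mut buf)?;
    Ok(if buf.len() > max { None } else { Some(buf) })
}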
+
+
+fn fetch_packs(conf: &Config, db: &Pool, addr: &str, packs: &mut Vec<Hash>) -> Result<(),Error> {
+ let conn = Arc::new(Client::socks_connect(&conf.tor_address, addr)?);
+
+ let (tx,rx) = std::sync::mpsc::sync_channel(PIPELINE);
+ let conn_wr = conn.clone();
+ let thread = std::thread::spawn(move || -> Result<(),Error> {
+ for pack in rx.iter() {
+ conn_wr.get(&format!("/.chifs-share/blake3/{}", pack))?;
+ }
+ Ok(())
+ });
+
+ let mut send_num = packs.len();
+ let mut send_queue = |p: &Vec<Hash>| -> Result<(),std::sync::mpsc::SendError<Hash>> {
+ while send_num > 0 && p.len() - send_num < PIPELINE {
+ trace!("Queueing GET {}/.chifs-share/blake3/{} ({}/{})", addr, p[send_num-1], send_num, p.len());
+ send_num -= 1;
+ tx.send(p[send_num])?
+ }
+ Ok(())
+ };
+
+ while let Some(pack) = packs.last().cloned() {
+ if send_queue(packs).is_err() {
+ break;
+ }
+
+ let res = conn.recv(MAX_PACKSIZE as u64)?;
+ if res.code != 200 {
+ return Err(Error::BadStatus(res.code));
+ }
+
+ let mut buf = Vec::new();
+ (&*conn).read_to_end(&mut buf)?;
+ let hash = Hash::hash_buf(&buf);
+ if hash != pack {
+ return Err(Error::Custom(format!("BLAKE3 mismatch, got {}", hash)));
+ }
+        // An error while indexing the pack does not count as an error for this particular share,
+        // since we've already validated that the pack hash matches its contents.
+ //
+ // XXX: Each pack is added to the DB in its own transaction, which might be a little
+ // inefficient for very small (e.g. single-file) packs. Buffering up multiple packs and
+ // batch-adding them to the DB could improve performance a bit.
+ if let Err(err) = index_pack(db, &pack, buf) {
+ info!("Failed to index pack {}: {}", pack, err);
+ // TODO: log error, set 'needindex' to NULL and pretend this is an empty pack
+ }
+ packs.pop();
+ }
+
+ drop(tx);
+ thread.join().unwrap()
+}
+
+
+pub fn run(conf: &Config, db: Pool, blake3: Hash) {
+ // TODO: It's possible that another pack_index being fetched concurrently shares packs with the
+ // one we're working on. Would be nice to avoid fetching the same packs twice.
+ let (shares, mut packs) = {
+ let mut db = db.get();
+
+ let shares : Vec<(i32,String)> = db
+ .query("SELECT id, addr FROM shares WHERE blake3_packs = $1 ORDER BY RANDOM() LIMIT $2::int", &[&blake3, &MAX_SHARES]).unwrap()
+ .into_iter().map(|r| (r.get(0), r.get(1))).collect();
+
+ let packs : Vec<Hash> = db
+ .query("SELECT p.pack FROM pack_indices pi JOIN packs p ON p.pack = pi.pack
+ WHERE pi.blake3 = $1 AND p.needindex IS NOT NULL AND p.needindex < NOW() LIMIT $2::int", &[&blake3, &MAX_PACKS]).unwrap()
+ .into_iter().map(|r| r.get(0)).collect();
+
+ (shares,packs)
+ };
+
+ let mut share_idx = 0;
+ while !packs.is_empty() {
+ let share = &shares[share_idx];
+ if let Err(e) = fetch_packs(conf, &db, &share.1, &mut packs) {
+ info!("Packs failed for share {} ({}) pack {:?}: {}", share.0, share.1, packs.last(), e);
+ }
+ // TODO: Set packs.needindex to a future date and quit if we've tried all shares without any progress.
+ share_idx = (share_idx + 1) % shares.len();
+ }
+}