Diffstat (limited to 'src/dispatch.rs')
-rw-r--r--  src/dispatch.rs | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/src/dispatch.rs b/src/dispatch.rs
index ceaa7c2..4378f2e 100644
--- a/src/dispatch.rs
+++ b/src/dispatch.rs
@@ -1,10 +1,11 @@
use std::fmt::Display;
-use std::hash::Hash;
+use std::hash::Hash as StdHash;
use std::sync::{Arc,Mutex};
use std::collections::HashSet;
use std::time::Duration;
//use chrono::prelude::*;
use fallible_iterator::FallibleIterator;
+use chifs_common::Hash;
use crate::db::Pool;
use crate::config::Config;
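
The rename above is a standard collision fix: `chifs_common::Hash` (the digest type) now claims the bare name `Hash`, so the `std::hash::Hash` trait is pulled in under the alias `StdHash`. A minimal sketch of the pattern, with a hypothetical stand-in for the chifs_common type:

```rust
use std::hash::Hash as StdHash; // the trait, renamed so it no longer occupies `Hash`

mod chifs_common {
    // Hypothetical stand-in; the real type lives in the chifs_common crate.
    #[derive(Clone, Copy)]
    pub struct Hash(pub [u8; 32]);
}
use chifs_common::Hash; // the digest type now owns the bare name

// Trait bounds keep working through the alias.
fn track<T: StdHash + Eq>(_id: T) {}

fn main() {
    track(42i32);
    let _digest = Hash([0u8; 32]);
}
```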
@@ -14,16 +15,16 @@ use crate::action_meta;
// Not sure why I generalized this; T is probably always going to be an i32.
-struct TaskGuard<T: Display + Hash + Eq + Copy + Clone> {
+struct TaskGuard<T: Display + StdHash + Eq + Copy + Clone> {
lst: Arc<Mutex<HashSet<T>>>,
id: T
}
-impl<T: Display + Hash + Eq + Copy + Clone> TaskGuard<T> {
+impl<T: Display + StdHash + Eq + Copy + Clone> TaskGuard<T> {
fn done(self) {} // Implicitly calls drop
}
-impl<T: Display + Hash + Eq + Copy + Clone> Drop for TaskGuard<T> {
+impl<T: Display + StdHash + Eq + Copy + Clone> Drop for TaskGuard<T> {
fn drop(&mut self) {
if std::thread::panicking() {
error!("Thread for task {} has panicked", self.id);
@@ -35,7 +36,7 @@ impl<T: Display + Hash + Eq + Copy + Clone> Drop for TaskGuard<T> {
fn handle_queue<S,T,F,G>(action: &str, num: i32, conn: &mut postgres::Client, queue: &postgres::Statement, tasks: &Arc<Mutex<HashSet<T>>>, f: F, cmd: G) -> Result<(),postgres::Error>
- where T: Display + Hash + Eq + Copy + Clone + Send + 'static,
+ where T: Display + StdHash + Eq + Copy + Clone + Send + 'static,
F: Fn(&postgres::row::Row) -> (T, S),
G: FnOnce(S) + Send + Copy + 'static,
S: Send + 'static
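
From its signature, `handle_queue` takes a row-mapping closure `f` that splits each queued row into a task id plus a payload, and a worker `cmd` that consumes the payload on its own thread; the shared `tasks` set keeps an id from being scheduled twice. The body is not part of this diff, so this is only an inferred sketch of that shape, without postgres:

```rust
use std::collections::HashSet;
use std::hash::Hash;
use std::sync::{Arc, Mutex};
use std::thread;

// Rows are plain tuples here; the real code pulls them from a prepared query.
fn dispatch<S, T, R, F, G>(rows: &[R], tasks: &Arc<Mutex<HashSet<T>>>, f: F, cmd: G)
where
    T: Hash + Eq + Copy + Send + 'static,
    F: Fn(&R) -> (T, S),
    G: FnOnce(S) + Send + Copy + 'static,
    S: Send + 'static,
{
    for row in rows {
        let (id, payload) = f(row);
        // insert() returns false when the id is already in flight; skip it.
        if !tasks.lock().unwrap().insert(id) {
            continue;
        }
        let tasks = Arc::clone(tasks);
        thread::spawn(move || {
            cmd(payload); // the real code wraps this in a TaskGuard
            tasks.lock().unwrap().remove(&id);
        });
    }
}

fn main() {
    let tasks = Arc::new(Mutex::new(HashSet::new()));
    let rows = vec![(1i32, "a"), (2, "b")];
    dispatch(&rows, &tasks, |r| (r.0, r.1), |s| println!("working on {s}"));
    thread::sleep(std::time::Duration::from_millis(50)); // let workers finish
}
```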
@@ -93,7 +94,7 @@ pub fn run(conf: Arc<Config>, db: Pool) {
// action is lagging behind. It also prevents losing updates to a race condition: If we fetch
// new data while old data is being indexed, it's possible that 'next_index' is being reset to
// NULL even when we do have new data to index. Yay, parallelism.
- let meta_q = conn.prepare("SELECT id, addr, blake3_packs::bytea FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
+ let meta_q = conn.prepare("SELECT id, addr, blake3_packs FROM shares WHERE next_meta <= NOW() ORDER BY next_meta LIMIT $1::int").unwrap();
//let fetch_q = conn.prepare("SELECT id, addr FROM shares WHERE next_fetch <= NOW() AND next_index IS NULL ORDER BY next_fetch LIMIT $1::int").unwrap();
//let index_q = conn.prepare("SELECT id, addr, next_index FROM shares WHERE next_index <= NOW() ORDER BY next_index LIMIT $1::int").unwrap();
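
Dropping the `::bytea` cast goes hand in hand with the `Option<Hash>` change below: `row.get` resolves the column through whatever `FromSql` impl the target type provides, so presumably `chifs_common::Hash` implements `postgres_types::FromSql` and the query no longer needs to downgrade the column to raw bytes. A sketch of what such an impl could look like (the `Hash` layout and the 32-byte length are assumptions):

```rust
use postgres_types::{FromSql, Type};
use std::error::Error;

// Hypothetical stand-in for chifs_common::Hash.
pub struct Hash(pub [u8; 32]);

impl<'a> FromSql<'a> for Hash {
    fn from_sql(_ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
        // Accept exactly 32 bytes, the size of a BLAKE3 digest.
        let bytes: [u8; 32] = raw.try_into().map_err(|_| "expected a 32-byte digest")?;
        Ok(Hash(bytes))
    }

    fn accepts(ty: &Type) -> bool {
        *ty == Type::BYTEA
    }
}
```

Because `FromSql` is blanket-implemented for `Option<T>`, the nullable column then reads naturally as `Option<Hash>` at the call site.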
@@ -104,8 +105,8 @@ pub fn run(conf: Arc<Config>, db: Pool) {
handle_queue("Meta",
conf.meta_processes, &mut conn, &meta_q, &meta_tasks,
|row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1), row.get(2))),
- move |data: (Arc<Config>, Pool, i32, String, Option<Vec<u8>>)| {
- action_meta::run(&data.0, data.1, data.2, &data.3[..], data.4.as_deref());
+ move |data: (Arc<Config>, Pool, i32, String, Option<Hash>)| {
+ action_meta::run(&data.0, data.1, data.2, &data.3[..], data.4);
}
).unwrap();
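
The payload's last element switches from `Option<Vec<u8>>` to `Option<Hash>`, which is why the `as_deref()` disappears: there are no longer raw bytes to borrow, and the typed digest is handed to `action_meta::run` by value. A small before/after sketch (both `run` signatures are assumptions inferred from the call sites):

```rust
#[derive(Clone, Copy)]
struct Hash([u8; 32]); // hypothetical stand-in for chifs_common::Hash

// Before: run() borrowed the raw bytes, so the Option<Vec<u8>> had to be
// flattened to Option<&[u8]> at the call site.
fn run_old(blake3_packs: Option<&[u8]>) { let _ = blake3_packs; }

// After: run() takes the typed digest by value; no borrow gymnastics needed.
fn run_new(blake3_packs: Option<Hash>) { let _ = blake3_packs; }

fn main() {
    let bytes: Option<Vec<u8>> = Some(vec![0u8; 32]);
    run_old(bytes.as_deref()); // Option<Vec<u8>> -> Option<&[u8]>

    let digest: Option<Hash> = Some(Hash([0u8; 32]));
    run_new(digest); // passed straight through, as in the new closure
}
```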