// src/dispatch.rs

use std::fmt::Display;
use std::hash::Hash as StdHash;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;
use std::time::Duration;
//use chrono::prelude::*;
use fallible_iterator::FallibleIterator;
use chifs_common::Hash;

use crate::db::Pool;
use crate::config::Config;
use crate::action_meta;
//use crate::action_fetch;
//use crate::action_index;


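// RAII guard for an entry in the shared set of running task ids: dropping it
// removes `id` from the set again, so the slot is freed even if the task thread
// panics partway through.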
// Not sure why I generalized this; T is probably always going to be an i32.
struct TaskGuard<T: Display + StdHash + Eq + Copy + Clone> {
    lst: Arc<Mutex<HashSet<T>>>,
    id: T
}

impl<T: Display + StdHash + Eq + Copy + Clone> TaskGuard<T> {
    fn done(self) {} // Implicitly calls drop
}

impl<T: Display + StdHash + Eq + Copy + Clone> Drop for TaskGuard<T> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            error!("Thread for task {} has panicked", self.id);
            std::thread::sleep(Duration::from_secs(5)); // Just to make sure we don't consume all resources if we end up in a panic loop
        }
        self.lst.lock().unwrap().remove(&self.id);
    }
}


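// Poll one work queue: run the prepared `queue` statement and, for every returned
// row that isn't already being worked on, spawn a thread that runs `cmd` on the
// data extracted by `f`. At most `num` tasks are kept in flight per queue.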
fn handle_queue<S, T, F, G>(
    action: &str, num: i32, conn: &mut postgres::Client, queue: &postgres::Statement,
    tasks: &Arc<Mutex<HashSet<T>>>, f: F, cmd: G
) -> Result<(), postgres::Error>
    where T: Display + StdHash + Eq + Copy + Clone + Send + 'static,
          F: Fn(&postgres::row::Row) -> (T, S),
          G: FnOnce(S) + Send + Copy + 'static,
          S: Send + 'static
{
    // We have to lock the tasks list even during the database query, otherwise the following
    // sequence can happen (T = task thread, D = dispatcher):
    //
    // T: *working*
    // D: Reading queue from database
    // T: Done, updating database
    // T: Removed from task list
    // D: Read old value from database, task is not in the list anymore, spawn the same task again
    let mut tasks_lock = tasks.lock().unwrap();

    // Don't bother getting the queue if we're too busy
    if tasks_lock.len() >= num as usize {
        return Ok(());
    }

    let res = conn.query(queue, &[&num])?;
    trace!("{} queue query returned {} rows", action, res.len());

    for task in &res {
        if tasks_lock.len() >= num as usize {
            break;
        }
        let (id, data) = f(&task);
        if !tasks_lock.insert(id) {
            continue;
        }
        trace!("Dispatching {} task id {}", action, id);
        let guard = TaskGuard { lst: tasks.clone(), id };
        std::thread::spawn(move || {
            cmd(data);
            guard.done();
        });
    }
    Ok(())
}


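// Dispatcher main loop: LISTEN on 'share_action', then alternate between polling
// the queue queries and sleeping until a notification or a 60-second timeout.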
pub fn run(conf: Arc<Config>, db: Pool) {
    let meta_tasks  : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
    //let fetch_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
    //let index_tasks : Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));

    let mut conn = db.get();
    conn.batch_execute("LISTEN share_action;").unwrap();

    // TODO: These queries probably need (partial) indices on the next_* fields to avoid a
    // sequential table scan, but I don't have enough test data at the moment to convince Postgres
    // to actually use such an index.
    //
    // The additional clause in fetch_q is to ensure that we don't fetch data while the 'Index'
    // action is lagging behind. It also prevents losing updates due to a race condition: if we fetch
    // new data while old data is being indexed, it's possible that 'next_index' is being reset to
    // NULL even when we do have new data to index. Yay, parallelism.
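    //
    // Something along these lines might do the trick, though the index name and
    // the NOT NULL predicate are guesses until there's enough data to test against:
    //
    //   CREATE INDEX shares_next_meta_idx ON shares (next_meta)
    //       WHERE next_meta IS NOT NULL;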
    let meta_q  = conn.prepare("SELECT id, addr, blake3_packs FROM shares WHERE next_meta  <= NOW()                        ORDER BY next_meta  LIMIT $1::int").unwrap();
    //let fetch_q = conn.prepare("SELECT id, addr             FROM shares WHERE next_fetch <= NOW() AND next_index IS NULL ORDER BY next_fetch LIMIT $1::int").unwrap();
    //let index_q = conn.prepare("SELECT id, addr, next_index FROM shares WHERE next_index <= NOW()                        ORDER BY next_index LIMIT $1::int").unwrap();

    loop {
        // Consume all outstanding notifications before we read the queues
        conn.notifications().iter().count().unwrap();

        handle_queue("Meta",
            conf.meta_processes, &mut conn, &meta_q, &meta_tasks,
            |row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1), row.get(2))),
            move |data: (Arc<Config>, Pool, i32, String, Option<Hash>)| {
                action_meta::run(&data.0, data.1, data.2, &data.3[..], data.4);
            }
        ).unwrap();

        /*
        handle_queue("Fetch",
            conf.fetch_processes, &fetch_q, &fetch_tasks,
            |row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1))),
            move |data: (Arc<Config>, Pool, i32, String)| {
                action_fetch::run(&data.0, data.1, data.2, &data.3[..]);
            }
        ).unwrap();

        handle_queue("Index",
            conf.index_processes, &index_q, &index_tasks,
            |row| (row.get(0), (conf.clone(), db.clone(), row.get(0), row.get(1), row.get(2))),
            move |data: (Arc<Config>, Pool, i32, String, DateTime<Utc>)| {
                let path = action_fetch::index_path(&data.0, &data.3[..], data.4);
                action_index::run(data.1, data.2, &path[..]);
            }
        ).unwrap();
        */

        trace!("Dispatcher going to sleep");
        let r = conn.notifications().timeout_iter(Duration::from_secs(60)).next().unwrap();
        trace!("Dispatcher woken up by {}", if r.is_none() { "timeout" } else { "notification"});

        // It's possible that we wake up from a NOTIFY that was triggered from a task thread before
        // that thread has finished removing itself from the tasks list; let's give it a few msecs
        // to settle so that we have enough slots for our next queue run.
        std::thread::sleep(Duration::from_millis(50));
    }
}