author    Yorhel <git@yorhel.nl>  2013-06-28 14:58:45 +0200
committer Yorhel <git@yorhel.nl>  2013-06-28 14:58:45 +0200
commit    d66abc047b83ff3b5d7fca70e256fe1666bdb9da (patch)
tree      7c5bb84701e8fe1714dba6b08360b2cb9ca95da8
parent    73a6aae5d88360d2fb078f36190ff8bbc151bb0e (diff)
share,api: Started on hash queue + index + multithreaded hasher
Very much unfinished, early stages, etc. Doesn't do anything useful yet, other than queueing up files to hash and calculating some fun scheduling information.
-rw-r--r--  Makefile.am          2
-rw-r--r--  doc/api.pod         24
-rw-r--r--  src/share/conf.c     4
-rw-r--r--  src/share/fl.h       2
-rw-r--r--  src/share/hash.c   220
-rw-r--r--  src/share/hash.h    57
-rw-r--r--  src/share/local.h    1
-rw-r--r--  src/share/scan.c    30
-rw-r--r--  src/share/share.h    3
9 files changed, 327 insertions, 16 deletions
diff --git a/Makefile.am b/Makefile.am
index 92c32c5..2bec6be 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -120,6 +120,7 @@ globster_SOURCES=\
src/share/conf.c\
src/share/db.c\
src/share/fl.c\
+ src/share/hash.c\
src/share/scan.c\
src/share/share.c\
src/util/adc.c\
@@ -154,6 +155,7 @@ noinst_HEADERS=\
src/share/conf.h\
src/share/db.h\
src/share/fl.h\
+ src/share/hash.h\
src/share/scan.h\
src/share/share.h\
src/util/adc.h\
diff --git a/doc/api.pod b/doc/api.pod
index 2b3f233..0275333 100644
--- a/doc/api.pod
+++ b/doc/api.pod
@@ -685,6 +685,30 @@ Not implemented yet.
I<TODO:> Cookie(File|Hash)?
+=head2 net.blicky.Globster.Hasher (hash)
+
+I<TODO:> Add to global object structure.
+
+ property u QueueFiles r
+ property t QueueSize r
+
+ property u TotalFiles
+ property t TotalSize
+
+I<TODO:> Methods/property to pause/resume hashing
+
+I<TODO:> Property to control the maximum hash rate, and maybe the number of threads.
+
+I<TODO:> Property to control whether posix_fadvise() should be used.
+
+I<TODO:> Method to obtain the hash data of a certain file by filesystem path.
+
+I<TODO:> Method to insert the hash data of a certain file into the database.
+
+I<TODO:> Interface to obtain the files that have recently been hashed, are
+currently being hashed, or are still queued for hashing?
+
+
=head2 net.blicky.Globster.FileBrowser (browse)
C</net/blicky/Globster/Share/$id> and re-usable for remote file lists.
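The four queue/total properties declared above use D-Bus type codes: u is a 32-bit and t a 64-bit unsigned integer. One plausible reading of their bookkeeping, sketched as standalone C (the struct and helper names are invented for illustration; none of this exists in the commit):

    #include <stdint.h>

    typedef struct {
        uint32_t queue_files;  /* property u QueueFiles r */
        uint64_t queue_size;   /* property t QueueSize  r */
        uint32_t total_files;  /* property u TotalFiles   */
        uint64_t total_size;   /* property t TotalSize    */
    } hasher_props_t;

    /* A file enters the hash queue: both the queue counters and the
     * running totals grow. */
    static void props_on_queue(hasher_props_t *p, uint64_t filesize) {
        p->queue_files++;  p->queue_size += filesize;
        p->total_files++;  p->total_size += filesize;
    }

    /* A file has been hashed (or dropped from the queue): only the
     * queue counters shrink, the totals keep growing monotonically. */
    static void props_on_done(hasher_props_t *p, uint64_t filesize) {
        p->queue_files--;  p->queue_size -= filesize;
    }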
diff --git a/src/share/conf.c b/src/share/conf.c
index 5b9d476..b02c65d 100644
--- a/src/share/conf.c
+++ b/src/share/conf.c
@@ -488,8 +488,10 @@ static void share_manager_load() {
* removed from the database before the new Precedence values were saved.
*/
size_t i;
- for(i=0; i<share_conf_shares.n; i++)
+ for(i=0; i<share_conf_shares.n; i++) {
share_fixprecedence(share_conf_shares.a[i], false);
+ share_share_refresh(share_conf_shares.a[i], "/");
+ }
}
diff --git a/src/share/fl.h b/src/share/fl.h
index 4c578cf..f9a28ea 100644
--- a/src/share/fl.h
+++ b/src/share/fl.h
@@ -72,7 +72,7 @@ static inline char *share_fl_name(share_fl_t *f) {
static inline bool share_fl_hastth(share_fl_t *f) {
assert(!share_fl_isdir(f));
- return memcmp(f->tth, (uint64_t[]){0,0,0}, 24) == 0;
+ return memcmp(f->tth, (uint64_t[]){0,0,0}, 24) != 0;
}
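The corrected test treats the all-zero TTH as a "not hashed yet" sentinel; the compound literal (uint64_t[]){0,0,0} merely supplies 24 zero bytes to compare against. Shown standalone (a hypothetical helper, not part of the tree):

    #include <stdbool.h>
    #include <string.h>

    static bool tth_is_set(const char tth[24]) {
        static const char zero[24];          /* static storage: all zero bytes */
        return memcmp(tth, zero, 24) != 0;   /* true once the file is hashed */
    }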
diff --git a/src/share/hash.c b/src/share/hash.c
new file mode 100644
index 0000000..6e0cf57
--- /dev/null
+++ b/src/share/hash.c
@@ -0,0 +1,220 @@
+/* Copyright (c) 2012-2013 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include <global.h>
+#include <share/local.h>
+#include <tiger.h>
+
+/* Global TODO:
+ * - Actual hashing functions
+ * - Use a separate evtp pool (to better control parallelism)
+ * - Update the QueueFiles, QueueSize, TotalFiles and TotalSize properties
+ * - Communication with the database
+ * - Hash index
+ * - The stuff mentioned in api.pod
+ */
+
+/* Maximum number of levels to keep, including root (level 0). The ADC docs
+ * specify that this should be at least 7, and everyone else uses 10. Since
+ * keeping 10 levels of hash data does eat up more space than I'd be willing to
+ * spend on it, let's use 8. */
+#define HASH_KEEP_LEVEL 8
+
+/* Minimum leaf chunk size, 64k according to the ADC spec.
+ * Must be a power of two and larger than 1024. */
+#define HASH_MIN_CHUNK 65536
+
+
+/* Minimum number of jobs in the thread pool queue. This variable should be at
+ * least as large as the preferred number of threads to use. */
+#define HASH_JOBS 16
+
+/* Maximum number of jobs per file */
+#define HASH_JOBS_PER_FILE 8
+
+/* Minimum size for a single job.
+ * Must be a power of two and larger than 2*HASH_MIN_CHUNK. */
+#define HASH_MIN_JOB_SIZE (16*1024*1024)
+
+
+typedef struct {
+ /* w.data = the share_hash_fn_t struct. */
+ evtp_work_t w;
+ int chunk_start;
+ int chunk_num;
+} share_hash_work_t;
+
+
+/* Possible states:
+ * chunks == NULL : Looking up filename in database
+ * chunks != NULL && fd < 0 : Opening file
+ * chunks != NULL && fd >= 0: Hashing
+ * While hashing, jobnum functions as a reference count on how many jobs still
+ * need to finish. The object should not be freed unless all jobs are finished.
+ */
+typedef struct {
+ share_fl_t *fl; /* NULL when the fl item has been removed from the hash queue while hashing. */
+ t_res *chunks;
+ uint64_t filesize;
+ uint64_t chunksize;
+ int chunknum;
+ int jobnum;
+ int fd;
+ share_hash_work_t work[HASH_JOBS_PER_FILE];
+ char fn[];
+} share_hash_fn_t;
+
+
+typedef struct {
+ dbo_t dbo;
+ dbo_hash_properties_t props;
+
+ /* List of files that are currently being hashed */
+ share_hash_fn_t *fns[HASH_JOBS];
+
+ int jobs; /* Number of active jobs */
+
+ /* List of shares for which there is something to hash */
+ share_t *head, *tail;
+} obj_t;
+
+static obj_t obj[1];
+
+
+static share_hash_fn_t *share_hash_fn_create(share_fl_t *fl, const char *path) {
+ share_hash_fn_t *fn = malloc(offsetof(share_hash_fn_t, fn) + strlen(path)+1);
+ fn->fl = fl;
+ fn->chunks = NULL;
+ fn->fd = -1;
+ strcpy(fn->fn, path);
+ fn->filesize = share_fl_size(fl);
+
+ fn->chunksize = HASH_MIN_CHUNK;
+ while((fn->chunksize * (((uint64_t)1)<<(HASH_KEEP_LEVEL-1))) < fn->filesize)
+ fn->chunksize <<= 1;
+
+ fn->chunknum = (fn->filesize + fn->chunksize - 1)/fn->chunksize;
+ if(!fn->chunknum)
+ fn->chunknum = 1;
+
+ /* XXX: This feels like a roundabout way of calculating the distribution of
+ * chunks among jobs, could possibly be simplified. */
+ int chunkperjob = HASH_MIN_JOB_SIZE / fn->chunksize;
+ if(chunkperjob < 2)
+ chunkperjob = 2;
+
+ if((fn->chunknum / chunkperjob) > HASH_JOBS_PER_FILE)
+ chunkperjob = ((fn->chunknum-1) / HASH_JOBS_PER_FILE) + 1;
+
+ fn->jobnum = 0;
+ int chunksleft = fn->chunknum;
+ while(chunksleft > 0) {
+ fn->work[fn->jobnum].w.data = fn;
+ fn->work[fn->jobnum].chunk_start = fn->chunknum - chunksleft;
+ fn->work[fn->jobnum].chunk_num = chunkperjob > chunksleft ? chunksleft : chunkperjob;
+ chunksleft -= fn->work[fn->jobnum].chunk_num;
+ fn->jobnum++;
+ assert(fn->jobnum <= HASH_JOBS_PER_FILE);
+ }
+
+ ytrace("Queueing hash for '%s', fs=%"PRIu64", cs=%"PRIu64", cn=%d, cpj=%d, jn=%d",
+ fn->fn, fn->filesize, fn->chunksize, fn->chunknum, chunkperjob, fn->jobnum);
+
+ /* The jobs aren't submitted to the thread pool yet (we first have to
+ * open() the file), but we already update obj->jobs here to indicate how
+ * many jobs have been scheduled. */
+ obj->jobs += fn->jobnum;
+
+ /* TODO: Queue database check */
+ return fn;
+}
+
+
+static void share_hash_queue_process() {
+ if(obj->jobs >= HASH_JOBS || !obj->head)
+ return;
+
+ size_t slot;
+ for(slot=0; slot<HASH_JOBS; slot++)
+ if(!obj->fns[slot])
+ break;
+ assert(slot < HASH_JOBS);
+
+ /* Find and dequeue next item. The share_t object is re-inserted at the end
+ * of the queue, causing files from different shares to be hashed in a
+ * round-robin fashion. This guarantees that no share will be starved of
+ * hashing progress. */
+ share_t *s = obj->head;
+ plist_remove(*obj, h, s);
+ share_fl_t *fl = s->h.head;
+ list_remove(s->h, fl);
+ if(s->h.head)
+ plist_insert(*obj, h, s, NULL);
+
+	/* TODO: Check whether the same file (as identified by the path) is already
+	 * being hashed. This happens when the same path is present multiple times
+ * being hashed. This happens when the same path is present multiple times

+ * in the same share or in multiple shares. */
+
+ kstring_t path = {};
+ share_share_fl_path(s, fl, &path);
+ obj->fns[slot] = share_hash_fn_create(fl, path.s);
+ free(path.s);
+
+ /* Process the next item in the queue while there's still some stuff to do */
+ share_hash_queue_process();
+}
+
+
+void share_hash_add(share_t *s, share_fl_t *fl) {
+ if(share_fl_hastth(fl)) {
+ /* TODO: Add to hash index */
+ return;
+ }
+
+ if(!s->h.head)
+ plist_insert(*obj, h, s, NULL);
+ list_insert(s->h, fl, NULL);
+ share_hash_queue_process();
+}
+
+
+void share_hash_remove(share_t *s, share_fl_t *fl) {
+	if(share_fl_hastth(fl)) {
+ /* TODO: Remove from hash index */
+ return;
+ }
+
+ size_t i;
+ for(i=0; i<HASH_JOBS; i++)
+ if(obj->fns[i] && obj->fns[i]->fl == fl) {
+ /* TODO: Cancel hashing and free fn object */
+ obj->fns[i] = NULL;
+ return;
+ }
+
+ list_remove(s->h, fl);
+ if(!s->h.head)
+ plist_remove(*obj, h, s);
+}
+
+
+/* vim: set noet sw=4 ts=4: */
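To make the scheduling arithmetic in share_hash_fn_create() concrete, here is a standalone sketch that replays it for a hypothetical 1 GiB file and prints the same fields the ytrace() call logs (example values only, not code from the commit). The chunk size doubles from 64 KiB until at most 2^(HASH_KEEP_LEVEL-1) = 128 chunks remain, and those chunks are then spread over at most HASH_JOBS_PER_FILE jobs:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define HASH_KEEP_LEVEL    8
    #define HASH_MIN_CHUNK     65536
    #define HASH_JOBS_PER_FILE 8
    #define HASH_MIN_JOB_SIZE  (16*1024*1024)

    int main(void) {
        uint64_t filesize = ((uint64_t)1) << 30;  /* 1 GiB example file */

        /* Grow the leaf chunk until the file fits in HASH_KEEP_LEVEL levels. */
        uint64_t chunksize = HASH_MIN_CHUNK;
        while(chunksize * (((uint64_t)1) << (HASH_KEEP_LEVEL-1)) < filesize)
            chunksize <<= 1;                                  /* -> 8 MiB */

        int chunknum = (int)((filesize + chunksize - 1) / chunksize); /* -> 128 */
        if(!chunknum)
            chunknum = 1;

        /* Distribute the chunks over at most HASH_JOBS_PER_FILE jobs. */
        int chunkperjob = (int)(HASH_MIN_JOB_SIZE / chunksize);
        if(chunkperjob < 2)
            chunkperjob = 2;
        if(chunknum / chunkperjob > HASH_JOBS_PER_FILE)
            chunkperjob = (chunknum-1) / HASH_JOBS_PER_FILE + 1;  /* -> 16 */

        /* Ceiling division, equivalent to the chunksleft loop above. */
        int jobnum = (chunknum + chunkperjob - 1) / chunkperjob;  /* -> 8 */

        printf("cs=%"PRIu64" cn=%d cpj=%d jn=%d\n",
               chunksize, chunknum, chunkperjob, jobnum);
        return 0;
    }

For the 1 GiB example this prints cs=8388608 cn=128 cpj=16 jn=8, i.e. eight jobs of sixteen 8 MiB chunks each.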
diff --git a/src/share/hash.h b/src/share/hash.h
new file mode 100644
index 0000000..ffe9339
--- /dev/null
+++ b/src/share/hash.h
@@ -0,0 +1,57 @@
+/* Copyright (c) 2012-2013 Yoran Heling
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+ CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#ifndef SHARE_HASH_H
+#define SHARE_HASH_H
+
+__KHASH_TYPE(hashfiles, share_fl_t *, char);
+
+typedef struct {
+ /* Set of share_fl_t items which have a non-zero hash, indexed by their tth
+ * field. */
+ kh_hashfiles_t *hashfiles;
+
+ /* Total size of all (hashed) shared files. Duplicates are only counted
+ * once. */
+ uint64_t size;
+
+ /* Queue of files to be hashed. */
+ share_fl_t *head, *tail;
+
+ /* List of share_t objects for which there is something to hash */
+ share_t *next, *prev;
+} share_hash_t;
+
+
+/* Should be called whenever a new fl item has been added to the file list. If
+ * the item already has a hash, it is added to the in-memory hash index,
+ * otherwise it is added to the hash queue. When a file has been hashed, it is
+ * automatically added to the hash index and the (on-disk) database, and
+ * share_share_updatehash() is called. */
+void share_hash_add(share_t *s, share_fl_t *fl);
+
+/* Should be called whenever a file has been removed from the file list. */
+void share_hash_remove(share_t *s, share_fl_t *fl);
+
+#endif
+
+/* vim: set noet sw=4 ts=4: */
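__KHASH_TYPE above only declares the kh_hashfiles_t type; a matching KHASH_INIT with hash and equality functions over the 24-byte tth field still has to appear in some .c file. A hypothetical sketch of what that could look like (the helper macros and the stand-in struct are invented; nothing in this commit instantiates the set yet):

    #include <stdint.h>
    #include <string.h>
    #include <khash.h>

    typedef struct { char tth[24]; /* ... */ } share_fl_t;  /* stand-in */

    /* Hash on the first 64 bits of the TTH, compare the full 24 bytes. */
    #define fl_tth_hash(f)     ((khint_t)*(const uint64_t *)(const void *)(f)->tth)
    #define fl_tth_equal(a, b) (memcmp((a)->tth, (b)->tth, 24) == 0)

    KHASH_INIT(hashfiles, share_fl_t *, char, 0 /* a set, not a map */,
               fl_tth_hash, fl_tth_equal)

Lookups and insertions would then go through khash's usual kh_get() and kh_put() macros.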
diff --git a/src/share/local.h b/src/share/local.h
index 840333e..c59525f 100644
--- a/src/share/local.h
+++ b/src/share/local.h
@@ -35,6 +35,7 @@ typedef vec_t(share_fl_t *) share_flv_t;
#include <share/conf.h>
#include <share/db.h>
#include <share/fl.h>
+#include <share/hash.h>
#include <share/scan.h>
#include <share/share.h>
diff --git a/src/share/scan.c b/src/share/scan.c
index b480ddb..9eab779 100644
--- a/src/share/scan.c
+++ b/src/share/scan.c
@@ -247,28 +247,30 @@ static void share_scan_work(evtp_work_t *_w) {
}
-static void share_scan_merge_new(share_fl_t *dir, share_fl_t *fl) {
+
+static void share_scan_merge_new(share_t *s, share_fl_t *dir, share_fl_t *fl) {
fl->parent = dir;
+ if(!share_fl_isdir(fl))
+ share_hash_add(s, fl);
/* TODO: Add to sharefiles table */
- /* TODO: Add to hash queue */
}
-static void share_scan_merge_del(share_fl_t *fl) {
- /* TODO: Remove from hash index or queue */
- /* TODO: Remove from sharefiles table */
-
+static void share_scan_merge_del(share_t *s, share_fl_t *fl) {
if(share_fl_isdir(fl)) {
size_t i;
for(i=0; i<fl->sub.n; i++)
- share_scan_merge_del(fl->sub.a[i]);
+ share_scan_merge_del(s, fl->sub.a[i]);
free(fl->sub.a);
+ } else {
+ share_hash_remove(s, fl);
+ /* TODO: Remove from sharefiles table */
}
free(fl);
}
-static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) {
+static void share_scan_merge(share_t *s, share_fl_t *dst, share_flv_t *lst) {
size_t li = 0, di = 0;
while(li < lst->n || di < dst->sub.n) {
@@ -279,7 +281,7 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) {
/* lf < df, meaning an item in lst is not in dst */
if(cmp < 0) {
vec_insert_order(dst->sub, di, lf);
- share_scan_merge_new(dst, lf);
+ share_scan_merge_new(s, dst, lf);
di++;
li++;
continue;
@@ -287,7 +289,7 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) {
/* lf > df, meaning an item in dst is not in lst */
if(cmp > 0) {
- share_scan_merge_del(df);
+ share_scan_merge_del(s, df);
vec_remove_order(dst->sub, di);
continue;
}
@@ -296,8 +298,8 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) {
if(share_fl_isdir(lf) != share_fl_isdir(df)
|| (!share_fl_isdir(lf) && (share_fl_size(lf) != share_fl_size(df) || lf->lastmod != df->lastmod || lf->pathid != df->pathid))
|| casestr_cmp(share_fl_name(lf), share_fl_name(df)) != 0) {
- share_scan_merge_del(df);
- share_scan_merge_new(dst, lf);
+ share_scan_merge_del(s, df);
+ share_scan_merge_new(s, dst, lf);
dst->sub.a[di] = lf;
}
di++;
@@ -337,7 +339,7 @@ static void share_scan_work_done(evtp_work_t *_w) {
}
free(sc->work);
- share_scan_merge(sc->dir, &dest);
+ share_scan_merge(sc->s, sc->dir, &dest);
free(dest.a);
share_scan_next(sc);
}
@@ -404,7 +406,7 @@ static void share_scan_dir(share_scan_t *sc) {
free(vpath.s);
if(!work.n) {
- share_scan_merge(sc->dir, &dest);
+ share_scan_merge(sc->s, sc->dir, &dest);
share_scan_next(sc);
} else {
vec_insert_order(work, work.n, NULL);
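The merge that now threads the share_t through is the classic reconciliation walk over two sorted sequences: items present only in the fresh scan are additions, items present only in the old tree are deletions. Stripped of the file-list details, the pattern looks like this (a generic sketch, not code from the tree):

    #include <stdio.h>
    #include <string.h>

    static void reconcile(const char **old, size_t on,
                          const char **scan, size_t sn) {
        size_t oi = 0, si = 0;
        while(oi < on || si < sn) {
            int cmp = si == sn ? 1             /* scan exhausted: remove rest */
                    : oi == on ? -1            /* old exhausted: add rest */
                    : strcmp(scan[si], old[oi]);
            if(cmp < 0)
                printf("add    %s\n", scan[si++]);  /* only in the new scan */
            else if(cmp > 0)
                printf("remove %s\n", old[oi++]);   /* only in the old tree */
            else
                oi++, si++;                         /* in both: unchanged */
        }
    }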
diff --git a/src/share/share.h b/src/share/share.h
index 133c336..3e857c6 100644
--- a/src/share/share.h
+++ b/src/share/share.h
@@ -35,6 +35,9 @@ struct share_t {
share_fl_t *root;
share_scan_t *sc;
+
+ /* Managed by share/hash.c */
+ share_hash_t h;
};
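The new share_t member closes the loop: each share carries its own queue of unhashed files, and the shares themselves form a second queue inside hash.c, dequeued round-robin as described in share_hash_queue_process(). A standalone sketch of that dequeue (hypothetical types and singly-linked lists, not the plist_/list_ macros used in the tree; it assumes the invariant that every queued share holds at least one queued file):

    #include <stddef.h>

    typedef struct file file_t;
    typedef struct shr  shr_t;
    struct file { file_t *next; };
    struct shr  { shr_t *next; file_t *files; };

    /* Pop one file from the head share, then re-append the share if it
     * still has files queued, so no share is starved of hashing progress. */
    static file_t *dequeue_round_robin(shr_t **head, shr_t **tail) {
        shr_t *s = *head;
        if(!s)
            return NULL;
        *head = s->next;            /* pop the share */
        if(!*head)
            *tail = NULL;
        file_t *f = s->files;       /* pop one file from it */
        s->files = f->next;
        if(s->files) {              /* still non-empty: back of the line */
            s->next = NULL;
            if(*tail)
                (*tail)->next = s;
            else
                *head = s;
            *tail = s;
        }
        return f;
    }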