diff options
author | Yorhel <git@yorhel.nl> | 2013-06-28 14:58:45 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2013-06-28 14:58:45 +0200 |
commit | d66abc047b83ff3b5d7fca70e256fe1666bdb9da (patch) | |
tree | 7c5bb84701e8fe1714dba6b08360b2cb9ca95da8 | |
parent | 73a6aae5d88360d2fb078f36190ff8bbc151bb0e (diff) |
share,api: Started on hash queue + index + multithreaded hasher
Very much unfinished, early stages, etc. Doesn't do anything useful yet,
other than queueing up files to hash and calculating some fun scheduling
information.
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | doc/api.pod | 24 | ||||
-rw-r--r-- | src/share/conf.c | 4 | ||||
-rw-r--r-- | src/share/fl.h | 2 | ||||
-rw-r--r-- | src/share/hash.c | 220 | ||||
-rw-r--r-- | src/share/hash.h | 57 | ||||
-rw-r--r-- | src/share/local.h | 1 | ||||
-rw-r--r-- | src/share/scan.c | 30 | ||||
-rw-r--r-- | src/share/share.h | 3 |
9 files changed, 327 insertions, 16 deletions
diff --git a/Makefile.am b/Makefile.am index 92c32c5..2bec6be 100644 --- a/Makefile.am +++ b/Makefile.am @@ -120,6 +120,7 @@ globster_SOURCES=\ src/share/conf.c\ src/share/db.c\ src/share/fl.c\ + src/share/hash.c\ src/share/scan.c\ src/share/share.c\ src/util/adc.c\ @@ -154,6 +155,7 @@ noinst_HEADERS=\ src/share/conf.h\ src/share/db.h\ src/share/fl.h\ + src/share/hash.h\ src/share/scan.h\ src/share/share.h\ src/util/adc.h\ diff --git a/doc/api.pod b/doc/api.pod index 2b3f233..0275333 100644 --- a/doc/api.pod +++ b/doc/api.pod @@ -685,6 +685,30 @@ Not implemented yet. I<TODO:> Cookie(File|Hash)? +=head2 net.blicky.Globster.Hasher (hash) + +I<TODO:> Add to global object structure. + + property u QueueFiles r + property t QueueSize r + + property u TotalFiles + property t TotalSize + +I<TODO:> Methods/property to pause/resume hashing + +I<TODO:> Property to control the maximum hash rate, and maybe the number of threads. + +I<TODO:> Property to control whether posix_fadvise() should be used. + +I<TODO:> Method to obtain the hash data of a certain file by filesystem path. + +I<TODO:> Method to insert the hash data of a certain file into the database. + +I<TODO:> Interface to obtain the files that have recently been hashed, are +currently being hashed, or are still queued for hashing? + + =head2 net.blicky.Globster.FileBrowser (browse) C</net/blicky/Globster/Share/$id> and re-usable for remote file lists. diff --git a/src/share/conf.c b/src/share/conf.c index 5b9d476..b02c65d 100644 --- a/src/share/conf.c +++ b/src/share/conf.c @@ -488,8 +488,10 @@ static void share_manager_load() { * removed from the database before the new Precedence values were saved. 
*/ size_t i; - for(i=0; i<share_conf_shares.n; i++) + for(i=0; i<share_conf_shares.n; i++) { share_fixprecedence(share_conf_shares.a[i], false); + share_share_refresh(share_conf_shares.a[i], "/"); + } } diff --git a/src/share/fl.h b/src/share/fl.h index 4c578cf..f9a28ea 100644 --- a/src/share/fl.h +++ b/src/share/fl.h @@ -72,7 +72,7 @@ static inline char *share_fl_name(share_fl_t *f) { static inline bool share_fl_hastth(share_fl_t *f) { assert(!share_fl_isdir(f)); - return memcmp(f->tth, (uint64_t[]){0,0,0}, 24) == 0; + return memcmp(f->tth, (uint64_t[]){0,0,0}, 24) != 0; } diff --git a/src/share/hash.c b/src/share/hash.c new file mode 100644 index 0000000..6e0cf57 --- /dev/null +++ b/src/share/hash.c @@ -0,0 +1,220 @@ +/* Copyright (c) 2012-2013 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+*/ + +#include <global.h> +#include <share/local.h> +#include <tiger.h> + +/* Global TODO: + * - Actual hashing functions + * - Use a separate evtp pool (to better control parallelism) + * - Update the QueueFiles, QueueSize, TotalFiles and TotalSize properties + * - Communication with the database + * - Hash index + * - The stuff mentioned in api.pod + */ + +/* Maximum number of levels to keep, including root (level 0). The ADC docs + * specify that this should be at least 7, and everyone else uses 10. Since + * keeping 10 levels of hash data does eat up more space than I'd be willing to + * spend on it, let's use 8. */ +#define HASH_KEEP_LEVEL 8 + +/* Minimum leaf chunk size, 64k according to the ADC spec. + * Must be a power of two and larger than 1024. */ +#define HASH_MIN_CHUNK 65536 + + +/* Minimum number of jobs in the thread pool queue. This variable should be at + * least as large as the preferred number of threads to use. */ +#define HASH_JOBS 16 + +/* Maximum number of jobs per file */ +#define HASH_JOBS_PER_FILE 8 + +/* Minimum size for a single job. + * Must be a power of two and larger than 2*HASH_MIN_CHUNK. */ +#define HASH_MIN_JOB_SIZE (16*1024*1024) + + +typedef struct { + /* w.data = the share_hash_fn_t struct. */ + evtp_work_t w; + int chunk_start; + int chunk_num; +} share_hash_work_t; + + +/* Possible states: + * chunks == NULL : Looking up filename in database + * chunks != NULL && fd < 0 : Opening file + * chunks != NULL && fd >= 0: Hashing + * While hashing, jobnum functions as a reference count on how many jobs still + * need to finish. The object should not be freed unless all jobs are finished. + */ +typedef struct { + share_fl_t *fl; /* NULL when the fl item has been removed from the hash queue while hashing. 
*/ + t_res *chunks; + uint64_t filesize; + uint64_t chunksize; + int chunknum; + int jobnum; + int fd; + share_hash_work_t work[HASH_JOBS_PER_FILE]; + char fn[]; +} share_hash_fn_t; + + +typedef struct { + dbo_t dbo; + dbo_hash_properties_t props; + + /* List of files that are currently being hashed */ + share_hash_fn_t *fns[HASH_JOBS]; + + int jobs; /* Number of active jobs */ + + /* List of shares for which there is something to hash */ + share_t *head, *tail; +} obj_t; + +static obj_t obj[1]; + + +static share_hash_fn_t *share_hash_fn_create(share_fl_t *fl, const char *path) { + share_hash_fn_t *fn = malloc(offsetof(share_hash_fn_t, fn) + strlen(path)+1); + fn->fl = fl; + fn->chunks = NULL; + fn->fd = -1; + strcpy(fn->fn, path); + fn->filesize = share_fl_size(fl); + + fn->chunksize = HASH_MIN_CHUNK; + while((fn->chunksize * (((uint64_t)1)<<(HASH_KEEP_LEVEL-1))) < fn->filesize) + fn->chunksize <<= 1; + + fn->chunknum = (fn->filesize + fn->chunksize - 1)/fn->chunksize; + if(!fn->chunknum) + fn->chunknum = 1; + + /* XXX: This feels like a roundabout way of calculating the distribution of + * chunks among jobs, could possibly be simplified. */ + int chunkperjob = HASH_MIN_JOB_SIZE / fn->chunksize; + if(chunkperjob < 2) + chunkperjob = 2; + + if((fn->chunknum / chunkperjob) > HASH_JOBS_PER_FILE) + chunkperjob = ((fn->chunknum-1) / HASH_JOBS_PER_FILE) + 1; + + fn->jobnum = 0; + int chunksleft = fn->chunknum; + while(chunksleft > 0) { + fn->work[fn->jobnum].w.data = fn; + fn->work[fn->jobnum].chunk_start = fn->chunknum - chunksleft; + fn->work[fn->jobnum].chunk_num = chunkperjob > chunksleft ? 
chunksleft : chunkperjob; + chunksleft -= fn->work[fn->jobnum].chunk_num; + fn->jobnum++; + assert(fn->jobnum <= HASH_JOBS_PER_FILE); + } + + ytrace("Queueing hash for '%s', fs=%"PRIu64", cs=%"PRIu64", cn=%d, cpj=%d, jn=%d", + fn->fn, fn->filesize, fn->chunksize, fn->chunknum, chunkperjob, fn->jobnum); + + /* The jobs aren't submitted to the thread pool yet (we first have to + * open() the file), but we already update obj->jobs here to indicate how + * many jobs have been scheduled. */ + obj->jobs += fn->jobnum; + + /* TODO: Queue database check */ + return fn; +} + + +static void share_hash_queue_process() { + if(obj->jobs >= HASH_JOBS || !obj->head) + return; + + size_t slot; + for(slot=0; slot<HASH_JOBS; slot++) + if(!obj->fns[slot]) + break; + assert(slot < HASH_JOBS); + + /* Find and dequeue next item. The share_t object is re-inserted at the end + * of the queue, causing files from different shares to be hashed in a + * round-robin fashion. This guarantees that no share will be starved of + * hashing progress. */ + share_t *s = obj->head; + plist_remove(*obj, h, s); + share_fl_t *fl = s->h.head; + list_remove(s->h, fl); + if(s->h.head) + plist_insert(*obj, h, s, NULL); + + /* TODO: Check whether the same file (as identified by the path) is already + * being hashed. This happens when the same path is present multiple times + * in the same share or in multiple shares. 
*/ + + kstring_t path = {}; + share_share_fl_path(s, fl, &path); + obj->fns[slot] = share_hash_fn_create(fl, path.s); + free(path.s); + + /* Process the next item in the queue while there's still some stuff to do */ + share_hash_queue_process(); +} + + +void share_hash_add(share_t *s, share_fl_t *fl) { + if(share_fl_hastth(fl)) { + /* TODO: Add to hash index */ + return; + } + + if(!s->h.head) + plist_insert(*obj, h, s, NULL); + list_insert(s->h, fl, NULL); + share_hash_queue_process(); +} + + +void share_hash_remove(share_t *s, share_fl_t *fl) { + if(!share_fl_hastth(fl)) { + /* TODO: Remove from hash index */ + return; + } + + size_t i; + for(i=0; i<HASH_JOBS; i++) + if(obj->fns[i] && obj->fns[i]->fl == fl) { + /* TODO: Cancel hashing and free fn object */ + obj->fns[i] = NULL; + return; + } + + list_remove(s->h, fl); + if(!s->h.head) + plist_remove(*obj, h, s); +} + + +/* vim: set noet sw=4 ts=4: */ diff --git a/src/share/hash.h b/src/share/hash.h new file mode 100644 index 0000000..ffe9339 --- /dev/null +++ b/src/share/hash.h @@ -0,0 +1,57 @@ +/* Copyright (c) 2012-2013 Yoran Heling + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
+ IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SHARE_HASH_H +#define SHARE_HASH_H + +__KHASH_TYPE(hashfiles, share_fl_t *, char); + +typedef struct { + /* Set of share_fl_t items which have a non-zero hash, indexed by their tth + * field. */ + kh_hashfiles_t *hashfiles; + + /* Total size of all (hashed) shared files. Duplicates are only counted + * once. */ + uint64_t size; + + /* Queue of files to be hashed. */ + share_fl_t *head, *tail; + + /* List of share_t objects for which there is something to hash */ + share_t *next, *prev; +} share_hash_t; + + +/* Should be called whenever a new fl item has been added to the file list. If + * the item already has a hash, it is added to the in-memory hash index, + * otherwise it is added to the hash queue. When a file has been hashed, it is + * automatically added to the hash index and the (on-disk) database, and + * share_share_updatehash() is called. */ +void share_hash_add(share_t *s, share_fl_t *fl); + +/* Should be called whenever a file has been removed from the file list. 
*/ +void share_hash_remove(share_t *s, share_fl_t *fl); + +#endif + +/* vim: set noet sw=4 ts=4: */ diff --git a/src/share/local.h b/src/share/local.h index 840333e..c59525f 100644 --- a/src/share/local.h +++ b/src/share/local.h @@ -35,6 +35,7 @@ typedef vec_t(share_fl_t *) share_flv_t; #include <share/conf.h> #include <share/db.h> #include <share/fl.h> +#include <share/hash.h> #include <share/scan.h> #include <share/share.h> diff --git a/src/share/scan.c b/src/share/scan.c index b480ddb..9eab779 100644 --- a/src/share/scan.c +++ b/src/share/scan.c @@ -247,28 +247,30 @@ static void share_scan_work(evtp_work_t *_w) { } -static void share_scan_merge_new(share_fl_t *dir, share_fl_t *fl) { + +static void share_scan_merge_new(share_t *s, share_fl_t *dir, share_fl_t *fl) { fl->parent = dir; + if(!share_fl_isdir(fl)) + share_hash_add(s, fl); /* TODO: Add to sharefiles table */ - /* TODO: Add to hash queue */ } -static void share_scan_merge_del(share_fl_t *fl) { - /* TODO: Remove from hash index or queue */ - /* TODO: Remove from sharefiles table */ - +static void share_scan_merge_del(share_t *s, share_fl_t *fl) { if(share_fl_isdir(fl)) { size_t i; for(i=0; i<fl->sub.n; i++) - share_scan_merge_del(fl->sub.a[i]); + share_scan_merge_del(s, fl->sub.a[i]); free(fl->sub.a); + } else { + share_hash_remove(s, fl); + /* TODO: Remove from sharefiles table */ } free(fl); } -static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) { +static void share_scan_merge(share_t *s, share_fl_t *dst, share_flv_t *lst) { size_t li = 0, di = 0; while(li < lst->n || di < dst->sub.n) { @@ -279,7 +281,7 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) { /* lf < df, meaning an item in lst is not in dst */ if(cmp < 0) { vec_insert_order(dst->sub, di, lf); - share_scan_merge_new(dst, lf); + share_scan_merge_new(s, dst, lf); di++; li++; continue; @@ -287,7 +289,7 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) { /* lf > df, meaning an item in dst is not in 
lst */ if(cmp > 0) { - share_scan_merge_del(df); + share_scan_merge_del(s, df); vec_remove_order(dst->sub, di); continue; } @@ -296,8 +298,8 @@ static void share_scan_merge(share_fl_t *dst, share_flv_t *lst) { if(share_fl_isdir(lf) != share_fl_isdir(df) || (!share_fl_isdir(lf) && (share_fl_size(lf) != share_fl_size(df) || lf->lastmod != df->lastmod || lf->pathid != lf->pathid)) || casestr_cmp(share_fl_name(lf), share_fl_name(df)) != 0) { - share_scan_merge_del(df); - share_scan_merge_new(dst, lf); + share_scan_merge_del(s, df); + share_scan_merge_new(s, dst, lf); dst->sub.a[di] = lf; } di++; @@ -337,7 +339,7 @@ static void share_scan_work_done(evtp_work_t *_w) { } free(sc->work); - share_scan_merge(sc->dir, &dest); + share_scan_merge(sc->s, sc->dir, &dest); free(dest.a); share_scan_next(sc); } @@ -404,7 +406,7 @@ static void share_scan_dir(share_scan_t *sc) { free(vpath.s); if(!work.n) { - share_scan_merge(sc->dir, &dest); + share_scan_merge(sc->s, sc->dir, &dest); share_scan_next(sc); } else { vec_insert_order(work, work.n, NULL); diff --git a/src/share/share.h b/src/share/share.h index 133c336..3e857c6 100644 --- a/src/share/share.h +++ b/src/share/share.h @@ -35,6 +35,9 @@ struct share_t { share_fl_t *root; share_scan_t *sc; + + /* Managed by share/hash.c */ + share_hash_t h; }; |