summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2022-01-11 12:40:20 +0100
committerYorhel <git@yorhel.nl>2022-02-07 14:13:36 +0100
commitc27dca1fba1e37474cf6631a42f678f168009b6c (patch)
treea57be69c700a137ac667f748c4dc5e7a72c49848
parent35dd631e557fc19d16aa933ed1ae45e6b30da9cc (diff)
Parallel scanning: early proof-of-concept implementation
And it's not looking well; this implementation seems to be 3x slower in the hot cache scenario with -J8, which is a major regression. There's way too much lock contention and context switching. Haven't tested with actual disk I/O yet and I've not yet measured how much parallelism this approach will actually get us in practice, nor whether the disk access patterns of this approach make a whole lot of sense. Maybe this low-memory approach will not work out and I'll end up rewriting this to scan disjoint subtrees after all. TODO: - Validate how much parallelism we can actually get with this algorithm - Lots of benchmarking and tuning (and most likely some re-architecting) - Re-implement exclude pattern matching - Document -J option - Make OOM handling thread-safe
-rw-r--r--src/main.zig7
-rw-r--r--src/scan.zig369
-rw-r--r--src/ui.zig1
3 files changed, 300 insertions, 77 deletions
diff --git a/src/main.zig b/src/main.zig
index 37c7e2d..bffe98e 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -42,6 +42,7 @@ pub const config = struct {
pub var same_fs: bool = false;
pub var extended: bool = false;
+ pub var parallel: u8 = 1;
pub var follow_symlinks: bool = false;
pub var exclude_caches: bool = false;
pub var exclude_kernfs: bool = false;
@@ -158,7 +159,11 @@ const Args = struct {
fn argConfig(args: *Args, opt: Args.Option) bool {
if (opt.is("-q") or opt.is("--slow-ui-updates")) config.update_delay = 2*std.time.ns_per_s
else if (opt.is("--fast-ui-updates")) config.update_delay = 100*std.time.ns_per_ms
- else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
+ else if (opt.is("-J")) {
+ const val = args.arg();
+ config.parallel = std.fmt.parseInt(u8, val, 10) catch ui.die("Invalid argument to -J: {s}, expected number.\n", .{val});
+ if (config.parallel == 0) ui.die("Number of threads (-J) cannot be 0.\n", .{});
+ } else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
else if (opt.is("--cross-file-system")) config.same_fs = false
else if (opt.is("-e") or opt.is("--extended")) config.extended = true
else if (opt.is("--no-extended")) config.extended = false
diff --git a/src/scan.zig b/src/scan.zig
index b2a6d8b..baf1281 100644
--- a/src/scan.zig
+++ b/src/scan.zig
@@ -53,11 +53,15 @@ const Stat = struct {
}
};
-var kernfs_cache: std.AutoHashMap(u64,bool) = std.AutoHashMap(u64,bool).init(main.allocator);
-
// This function only works on Linux
fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
- if (kernfs_cache.get(dev)) |e| return e;
+ const state = struct {
+ var cache = std.AutoHashMap(u64,bool).init(main.allocator);
+ var lock = std.Thread.Mutex{};
+ };
+ state.lock.lock();
+ defer state.lock.unlock();
+ if (state.cache.get(dev)) |e| return e;
var buf: c_statfs.struct_statfs = undefined;
if (c_statfs.fstatfs(dir.fd, &buf) != 0) return false; // silently ignoring errors isn't too nice.
const iskern = switch (buf.f_type) {
@@ -77,7 +81,7 @@ fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
=> true,
else => false,
};
- kernfs_cache.put(dev, iskern) catch {};
+ state.cache.put(dev, iskern) catch {};
return iskern;
}
@@ -103,10 +107,10 @@ fn writeJsonString(wr: anytype, s: []const u8) !void {
try wr.writeByte('"');
}
-// A ScanDir represents an in-memory directory listing (i.e. model.Dir) where
+// A MemDir represents an in-memory directory listing (i.e. model.Dir) where
// entries read from disk can be merged into, without doing an O(1) lookup for
// each entry.
-const ScanDir = struct {
+const MemDir = struct {
dir: *model.Dir,
// Lookup table for name -> *entry.
@@ -263,7 +267,7 @@ const ScanDir = struct {
//
const Context = struct {
// When scanning to RAM
- parents: ?std.ArrayList(ScanDir) = null,
+ parents: ?std.ArrayList(MemDir) = null,
// When scanning to a file
wr: ?*Writer = null,
@@ -303,8 +307,8 @@ const Context = struct {
fn initMem(dir: ?*model.Dir) *Self {
var self = main.allocator.create(Self) catch unreachable;
- self.* = .{ .parents = std.ArrayList(ScanDir).init(main.allocator) };
- if (dir) |d| self.parents.?.append(ScanDir.init(d)) catch unreachable;
+ self.* = .{ .parents = std.ArrayList(MemDir).init(main.allocator) };
+ if (dir) |d| self.parents.?.append(MemDir.init(d)) catch unreachable;
return self;
}
@@ -427,7 +431,7 @@ const Context = struct {
p.items[p.items.len-1].addStat(self.name, &self.stat);
if (e.dir()) |d| // Enter the directory
- p.append(ScanDir.init(d)) catch unreachable;
+ p.append(MemDir.init(d)) catch unreachable;
} else if (self.wr) |wr|
self.writeStat(wr.writer(), dir_dev) catch |e| writeErr(e);
@@ -451,90 +455,304 @@ const Context = struct {
// Context that is currently being used for scanning.
var active_context: *Context = undefined;
-// Read and index entries of the given dir.
-fn scanDir(ctx: *Context, dir: std.fs.Dir, dir_dev: u64) void {
- var it = main.allocator.create(std.fs.Dir.Iterator) catch unreachable;
- defer main.allocator.destroy(it);
- it.* = dir.iterate();
- while(true) {
- const entry = it.next() catch {
- ctx.setDirlistError();
- return;
- } orelse break;
-
- ctx.stat.dir = false;
- ctx.pushPath(entry.name);
- defer ctx.popPath();
- main.handleEvent(false, false);
-
- // XXX: This algorithm is extremely slow, can be optimized with some clever pattern parsing.
- const excluded = blk: {
- for (main.config.exclude_patterns.items) |pat| {
- var path = ctx.pathZ();
- while (path.len > 0) {
- if (c_fnmatch.fnmatch(pat, path, 0) == 0) break :blk true;
- if (std.mem.indexOfScalar(u8, path, '/')) |idx| path = path[idx+1..:0]
- else break;
- }
- }
- break :blk false;
- };
- if (excluded) {
- ctx.addSpecial(.excluded);
- continue;
+
+// The following filesystem scanning implementation is designed to support
+// some degree of parallelism while generating a serialized tree without
+// consuming ~too~ much memory.
+//
+// It would likely be easier and more efficient to have each thread work on a
+// completely sparate branch of the filesystem tree, but our current JSON
+// export format requires that entries are output in a certain order, which
+// means we either need to construct the full tree in memory before generating
+// any output (which I'd really rather not do), or we're stuck scanning the
+// filesystem in the required order and lose some opportunities for
+// parallelism. This is an attempt at doing the latter.
+const scanner = struct {
+ var tail: *Level = undefined;
+ // Currently used to protect both the scan stack state and the output
+ // Context, may be worth trying to split in two.
+ var lock = std.Thread.Mutex{};
+ var cond = std.Thread.Condition{};
+
+ // Number of stat() calls to batch in a single task; This little thread
+ // pool implementation is pretty damn inefficient, so batching helps cut
+ // down on synchronization overhead. Can be removed if we ever manage to
+ // migrate to a more efficient thread pool.
+ const BATCH: usize = 128;
+
+ // Maximum number of name lists to keep for each level in the stack. Higher
+ // number means potentially higher degree of parallelism, but comes at the
+ // cost of potentially higher memory and file descriptor use.
+ const SUBDIRS_PER_LEVEL: u8 = 8;
+
+ const StatEntry = struct {
+ name: [:0]u8,
+ stat: Stat,
+ };
+
+ const SpecialEntry = struct {
+ name: [:0]u8,
+ t: Context.Special,
+ };
+
+ const NextLevel = struct {
+ name: [:0]u8,
+ stat: Stat,
+ level: *Level, // XXX: Only 'dir', 'names', and 'specials' are really relevant here
+ };
+
+ // Represents a directory that is being scanned.
+ const Level = struct {
+ dir: std.fs.Dir,
+ dir_dev: u64,
+ dirListError: bool = false,
+ names: std.ArrayListUnmanaged([:0]u8) = .{}, // Queue of names to stat()
+ names_busy: u8 = 0, // Number of threads running stat()
+ files: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of files we can output
+ specials: std.ArrayListUnmanaged(SpecialEntry) = .{}, // Queue of "special" items we can output
+ dirs: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of dirs we can read
+ dirs_busy: u8 = 0, // Number of 'dirs' being processed at the moment
+ next: std.ArrayListUnmanaged(NextLevel) = .{}, // Queue of subdirs to scan next
+ sub: ?*Level = null, // Subdirectory currently being scanned
+ parent: ?*Level,
+
+ // Assumption: all queues are empty
+ fn destroy(lvl: *Level) void {
+ lvl.dir.close();
+ lvl.names.deinit(main.allocator);
+ lvl.files.deinit(main.allocator);
+ lvl.specials.deinit(main.allocator);
+ lvl.dirs.deinit(main.allocator);
+ lvl.next.deinit(main.allocator);
+ main.allocator.destroy(lvl);
}
+ };
- ctx.stat = Stat.read(dir, ctx.name, false) catch {
- ctx.addSpecial(.err);
- continue;
- };
+ // Drain the output queue ('files', 'specials') if we can.
+ // Assumes we hold the lock.
+ fn outputQueue(lvl: *Level) void {
+ if (lvl.sub != null) return;
- if (main.config.same_fs and ctx.stat.dev != dir_dev) {
- ctx.addSpecial(.other_fs);
- continue;
+ if (lvl.dirListError) {
+ active_context.setDirlistError();
+ lvl.dirListError = false;
}
- if (main.config.follow_symlinks and ctx.stat.symlink) {
- if (Stat.read(dir, ctx.name, true)) |nstat| {
- if (!nstat.dir) {
- ctx.stat = nstat;
- // Symlink targets may reside on different filesystems,
- // this will break hardlink detection and counting so let's disable it.
- if (ctx.stat.hlinkc and ctx.stat.dev != dir_dev)
- ctx.stat.hlinkc = false;
- }
- } else |_| {}
+ for (lvl.specials.items) |e| {
+ active_context.stat.dir = false;
+ active_context.pushPath(e.name);
+ active_context.addSpecial(e.t);
+ active_context.popPath();
+ main.allocator.free(e.name);
+ }
+ for (lvl.files.items) |e| {
+ // TODO: ctx API is inefficient here, no need to copy that Stat
+ active_context.stat.dir = false;
+ active_context.pushPath(e.name);
+ active_context.stat = e.stat;
+ active_context.addStat(lvl.dir_dev);
+ active_context.popPath();
+ main.allocator.free(e.name);
+ }
+ lvl.specials.clearRetainingCapacity();
+ lvl.files.clearRetainingCapacity();
+ }
+
+ // Leave the current dir if we're done with it and find a new dir to enter.
+ fn navigate() void {
+ //std.debug.print("ctx={s}, names={} dirs={} next={}\n", .{ active_context.path.items, tail.names.items.len, tail.dirs.items.len, tail.next.items.len });
+
+ // Assumption: outputQueue() has been called on the tail, so
+ // 'files' and 'specials' are always empty.
+ while (tail.parent != null and tail.sub == null
+ and tail.names.items.len == 0 and tail.names_busy == 0
+ and tail.dirs.items.len == 0 and tail.dirs_busy == 0
+ and tail.next.items.len == 0
+ ) {
+ //std.debug.print("Pop\n", .{});
+ active_context.popPath();
+ const lvl = tail;
+ lvl.parent.?.sub = null;
+ tail = lvl.parent.?;
+ lvl.destroy();
+ outputQueue(tail);
}
+ if (tail.sub == null and tail.next.items.len > 0) {
+ const sub = tail.next.pop();
+ //std.debug.print("Push {s}\n", .{sub.name});
+ active_context.pushPath(sub.name);
+ active_context.stat = sub.stat;
+ active_context.addStat(tail.dir_dev);
+ main.allocator.free(sub.name);
+ tail.sub = sub.level;
+ tail = sub.level;
+ }
+
+ // TODO: Only wake up threads when there's enough new work queued, all
+ // that context switching is SLOW.
+ cond.broadcast();
+ }
- var edir =
- if (ctx.stat.dir) dir.openDirZ(ctx.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
- ctx.addSpecial(.err);
- continue;
- } else null;
- defer if (edir != null) edir.?.close();
+ fn readNamesDir(lvl: *Level) void {
+ var it = lvl.dir.iterate();
+ while (true) {
+ const entry = it.next() catch {
+ lvl.dirListError = true;
+ break;
+ } orelse break;
- if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and ctx.stat.dir and isKernfs(edir.?, ctx.stat.dev)) {
- ctx.addSpecial(.kernfs);
- continue;
+ // TODO: Check for exclude patterns
+
+ lvl.names.append(main.allocator, main.allocator.dupeZ(u8, entry.name) catch unreachable) catch unreachable;
}
+ }
+
+ fn readNames(parent: *Level) void {
+ const stat = parent.dirs.pop();
+ lock.unlock();
- if (main.config.exclude_caches and ctx.stat.dir) {
- if (edir.?.openFileZ("CACHEDIR.TAG", .{})) |f| {
+ var dir = parent.dir.openDirZ(stat.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
+ lock.lock();
+ parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
+ return;
+ };
+
+ if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and isKernfs(dir, stat.stat.dev)) {
+ lock.lock();
+ parent.specials.append(main.allocator, .{ .name = stat.name, .t = .kernfs }) catch unreachable;
+ return;
+ }
+
+ if (main.config.exclude_caches) {
+ if (dir.openFileZ("CACHEDIR.TAG", .{})) |f| {
const sig = "Signature: 8a477f597d28d172789f06886806bc55";
var buf: [sig.len]u8 = undefined;
if (f.reader().readAll(&buf)) |len| {
if (len == sig.len and std.mem.eql(u8, &buf, sig)) {
- ctx.addSpecial(.excluded);
- continue;
+ lock.lock();
+ parent.specials.append(main.allocator, .{ .name = stat.name, .t = .excluded }) catch unreachable;
+ return;
}
} else |_| {}
} else |_| {}
}
- ctx.addStat(dir_dev);
- if (ctx.stat.dir) scanDir(ctx, edir.?, ctx.stat.dev);
+ var lvl = main.allocator.create(Level) catch unreachable;
+ lvl.* = .{ .dir = dir, .dir_dev = stat.stat.dev, .parent = parent };
+ readNamesDir(lvl);
+
+ lock.lock();
+ // Treat empty directories as files
+ if (lvl.names.items.len == 0 and lvl.specials.items.len == 0) {
+ if (lvl.dirListError) { // XXX: this loses information about this entry being a directory :(
+ parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
+ } else
+ parent.files.append(main.allocator, stat) catch unreachable;
+ dir.close();
+ main.allocator.destroy(lvl);
+ } else {
+ parent.next.append(main.allocator, .{ .name = stat.name, .stat = stat.stat, .level = lvl }) catch unreachable;
+ }
}
-}
+
+ fn statNames(lvl: *Level) void {
+ var names: [BATCH][:0]u8 = undefined;
+ var stats: [BATCH]Stat = undefined;
+ var errs: [BATCH]bool = undefined;
+ const len = std.math.min(names.len, lvl.names.items.len);
+ std.mem.copy([]u8, &names, lvl.names.items[lvl.names.items.len-len..]);
+ lvl.names.items.len -= len;
+ lock.unlock();
+
+ var i: usize = 0;
+ while (i < len) : (i += 1) {
+ if (Stat.read(lvl.dir, names[i], false)) |s| {
+ errs[i] = false;
+ if (main.config.follow_symlinks and s.symlink) {
+ if (Stat.read(lvl.dir, names[i], true)) |nstat| {
+ if (!nstat.dir) {
+ stats[i] = nstat;
+ // Symlink targets may reside on different filesystems,
+ // this will break hardlink detection and counting so let's disable it.
+ if (nstat.hlinkc and nstat.dev != lvl.dir_dev)
+ stats[i].hlinkc = false;
+ }
+ } else |_| stats[i] = s;
+ } else stats[i] = s;
+
+ } else |_|
+ errs[i] = true;
+ }
+
+ lock.lock();
+ i = 0;
+ while (i < len) : (i += 1) {
+ if (errs[i])
+ lvl.specials.append(main.allocator, .{ .name = names[i], .t = .err }) catch unreachable
+ else if (main.config.same_fs and stats[i].dev != lvl.dir_dev)
+ lvl.specials.append(main.allocator, .{ .name = names[i], .t = .other_fs }) catch unreachable
+ else if (stats[i].dir)
+ lvl.dirs.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable
+ else
+ lvl.files.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable;
+ }
+ }
+
+ fn runThread() void {
+ lock.lock();
+ outer: while (true) {
+ var curlvl: ?*Level = tail;
+ while (curlvl) |lvl| : (curlvl = lvl.parent) {
+
+ // If we have subdirectories to read, do that first to keep the 'names' queues filled up.
+ if (lvl.dirs.items.len > 0 and lvl.dirs_busy + lvl.next.items.len < SUBDIRS_PER_LEVEL) {
+ lvl.dirs_busy += 1;
+ readNames(lvl);
+ lvl.dirs_busy -= 1;
+ outputQueue(lvl);
+ navigate();
+ continue :outer;
+ }
+
+ // Then look for names to stat
+ if (lvl.names.items.len > 0) {
+ lvl.names_busy += 1;
+ statNames(lvl);
+ lvl.names_busy -= 1;
+ outputQueue(lvl);
+ navigate();
+ continue :outer;
+ }
+ }
+
+ // If we're here, then we found no work to do.
+ if (tail.parent == null and tail.dirs_busy == 0 and tail.names_busy == 0) {
+ cond.broadcast(); // only necessary if we don't always wake up threads when there's work to do.
+ break;
+ }
+ cond.wait(&lock);
+ }
+ lock.unlock();
+ }
+ // TODO: progress UI
+
+ // Scan the given dir. The active_context is assumed to have been
+ // initialized already and the entry for the given *dir has already been
+ // output.
+ // The given dir is closed when done.
+ fn scan(dir: std.fs.Dir, dir_dev: u64) void {
+ tail = main.allocator.create(Level) catch unreachable;
+ tail.* = .{ .dir = dir, .dir_dev = dir_dev, .parent = null };
+ readNamesDir(tail);
+ var threads = main.allocator.alloc(std.Thread, main.config.parallel-1) catch unreachable;
+ for (threads) |*t| t.* = std.Thread.spawn(.{ .stack_size = 128*1024 }, runThread, .{}) catch unreachable;
+ runThread();
+ for (threads) |*t| t.join();
+ tail.destroy();
+ tail = undefined;
+ }
+};
+
pub fn scanRoot(path: []const u8, out: ?std.fs.File) !void {
active_context = if (out) |f| Context.initFile(f) else Context.initMem(null);
@@ -569,8 +787,7 @@ pub fn scan() void {
main.handleEvent(true, true);
return;
};
- defer dir.close();
- scanDir(active_context, dir, active_context.stat.dev);
+ scanner.scan(dir, active_context.stat.dev);
active_context.popPath();
active_context.final();
}
diff --git a/src/ui.zig b/src/ui.zig
index 1d92077..5409470 100644
--- a/src/ui.zig
+++ b/src/ui.zig
@@ -42,6 +42,7 @@ pub fn quit() noreturn {
// Also, init() and other ncurses-related functions may have hidden allocation,
// no clue if ncurses will consistently report OOM, but we're not handling that
// right now.
+// TODO: Make thread-safe.
pub fn oom() void {
const haveui = inited;
deinit();