author     Yorhel <git@yorhel.nl>                     2022-01-11 12:40:20 +0100
committer  Yorhel <git@yorhel.nl>                     2022-02-07 14:13:36 +0100
commit     c27dca1fba1e37474cf6631a42f678f168009b6c (patch)
tree       a57be69c700a137ac667f748c4dc5e7a72c49848
parent     35dd631e557fc19d16aa933ed1ae45e6b30da9cc (diff)
Parallel scanning: early proof-of-concept implementation
And it's not looking good; this implementation seems to be 3x slower in
the hot-cache scenario with -J8, which is a major regression. There's
way too much lock contention and context switching.
I haven't tested with actual disk I/O yet, nor have I measured how much
parallelism this approach will actually get us in practice, or whether
its disk access patterns make a whole lot of sense. Maybe this
low-memory approach will not work out and I'll end up rewriting this to
scan disjoint subtrees after all.
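(For reference: the -J flag added to src/main.zig below takes the thread
count as its argument and defaults to 1, i.e. the old single-threaded
behaviour, so the measurement above amounts to running something like
"ncdu -J8 /path/to/tree" against a hot cache.)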
TODO:
- Validate how much parallelism we can actually get with this algorithm
- Lots of benchmarking and tuning (and most likely some re-architecting)
- Re-implement exclude pattern matching
- Document -J option
- Make OOM handling thread-safe (a rough sketch of one option follows this list)
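For that last item, a minimal sketch of one possible direction. This is
not part of the patch (which only adds a TODO comment to ui.zig below);
it simply wraps the existing oom() entry point in a mutex, reusing
ui.zig's existing inited flag and deinit(), and assuming the usual std
import:

    var oom_lock = std.Thread.Mutex{};

    // Hypothetical: serialize OOM recovery so that two scan threads
    // can't tear down and reinitialize ncurses at the same time.
    pub fn oom() void {
        oom_lock.lock();
        defer oom_lock.unlock();
        const haveui = inited;
        deinit();
        // ... existing recovery path, which uses haveui ...
    }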
-rw-r--r--   src/main.zig |   7
-rw-r--r--   src/scan.zig | 369
-rw-r--r--   src/ui.zig   |   1
3 files changed, 300 insertions, 77 deletions
diff --git a/src/main.zig b/src/main.zig
index 37c7e2d..bffe98e 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -42,6 +42,7 @@ pub const config = struct {
     pub var same_fs: bool = false;
     pub var extended: bool = false;
+    pub var parallel: u8 = 1;
     pub var follow_symlinks: bool = false;
     pub var exclude_caches: bool = false;
     pub var exclude_kernfs: bool = false;
@@ -158,7 +159,11 @@ const Args = struct {
 fn argConfig(args: *Args, opt: Args.Option) bool {
     if (opt.is("-q") or opt.is("--slow-ui-updates")) config.update_delay = 2*std.time.ns_per_s
     else if (opt.is("--fast-ui-updates")) config.update_delay = 100*std.time.ns_per_ms
-    else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
+    else if (opt.is("-J")) {
+        const val = args.arg();
+        config.parallel = std.fmt.parseInt(u8, val, 10) catch ui.die("Invalid argument to -J: {s}, expected number.\n", .{val});
+        if (config.parallel == 0) ui.die("Number of threads (-J) cannot be 0.\n", .{});
+    } else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
     else if (opt.is("--cross-file-system")) config.same_fs = false
     else if (opt.is("-e") or opt.is("--extended")) config.extended = true
     else if (opt.is("--no-extended")) config.extended = false
diff --git a/src/scan.zig b/src/scan.zig
index b2a6d8b..baf1281 100644
--- a/src/scan.zig
+++ b/src/scan.zig
@@ -53,11 +53,15 @@ const Stat = struct {
     }
 };

-var kernfs_cache: std.AutoHashMap(u64,bool) = std.AutoHashMap(u64,bool).init(main.allocator);
-
 // This function only works on Linux
 fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
-    if (kernfs_cache.get(dev)) |e| return e;
+    const state = struct {
+        var cache = std.AutoHashMap(u64,bool).init(main.allocator);
+        var lock = std.Thread.Mutex{};
+    };
+    state.lock.lock();
+    defer state.lock.unlock();
+    if (state.cache.get(dev)) |e| return e;
     var buf: c_statfs.struct_statfs = undefined;
     if (c_statfs.fstatfs(dir.fd, &buf) != 0) return false; // silently ignoring errors isn't too nice.
     const iskern = switch (buf.f_type) {
@@ -77,7 +81,7 @@ fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
         => true,
         else => false,
     };
-    kernfs_cache.put(dev, iskern) catch {};
+    state.cache.put(dev, iskern) catch {};
     return iskern;
 }
@@ -103,10 +107,10 @@ fn writeJsonString(wr: anytype, s: []const u8) !void {
     try wr.writeByte('"');
 }

-// A ScanDir represents an in-memory directory listing (i.e. model.Dir) where
+// A MemDir represents an in-memory directory listing (i.e. model.Dir) where
 // entries read from disk can be merged into, without doing an O(n) lookup for
 // each entry.
-const ScanDir = struct {
+const MemDir = struct {
     dir: *model.Dir,

     // Lookup table for name -> *entry.
@@ -263,7 +267,7 @@ const ScanDir = struct {
 //
 const Context = struct {
     // When scanning to RAM
-    parents: ?std.ArrayList(ScanDir) = null,
+    parents: ?std.ArrayList(MemDir) = null,

     // When scanning to a file
     wr: ?*Writer = null,
@@ -303,8 +307,8 @@ const Context = struct {

     fn initMem(dir: ?*model.Dir) *Self {
         var self = main.allocator.create(Self) catch unreachable;
-        self.* = .{ .parents = std.ArrayList(ScanDir).init(main.allocator) };
-        if (dir) |d| self.parents.?.append(ScanDir.init(d)) catch unreachable;
+        self.* = .{ .parents = std.ArrayList(MemDir).init(main.allocator) };
+        if (dir) |d| self.parents.?.append(MemDir.init(d)) catch unreachable;
         return self;
     }

@@ -427,7 +431,7 @@ const Context = struct {
             p.items[p.items.len-1].addStat(self.name, &self.stat);
             if (e.dir()) |d| // Enter the directory
-                p.append(ScanDir.init(d)) catch unreachable;
+                p.append(MemDir.init(d)) catch unreachable;
         } else if (self.wr) |wr|
             self.writeStat(wr.writer(), dir_dev) catch |e| writeErr(e);

@@ -451,90 +455,304 @@ const Context = struct {
 // Context that is currently being used for scanning.
 var active_context: *Context = undefined;

-// Read and index entries of the given dir.
-fn scanDir(ctx: *Context, dir: std.fs.Dir, dir_dev: u64) void {
-    var it = main.allocator.create(std.fs.Dir.Iterator) catch unreachable;
-    defer main.allocator.destroy(it);
-    it.* = dir.iterate();
-    while(true) {
-        const entry = it.next() catch {
-            ctx.setDirlistError();
-            return;
-        } orelse break;
-
-        ctx.stat.dir = false;
-        ctx.pushPath(entry.name);
-        defer ctx.popPath();
-        main.handleEvent(false, false);
-
-        // XXX: This algorithm is extremely slow, can be optimized with some clever pattern parsing.
-        const excluded = blk: {
-            for (main.config.exclude_patterns.items) |pat| {
-                var path = ctx.pathZ();
-                while (path.len > 0) {
-                    if (c_fnmatch.fnmatch(pat, path, 0) == 0) break :blk true;
-                    if (std.mem.indexOfScalar(u8, path, '/')) |idx| path = path[idx+1..:0]
-                    else break;
-                }
-            }
-            break :blk false;
-        };
-        if (excluded) {
-            ctx.addSpecial(.excluded);
-            continue;
+
+// The following filesystem scanning implementation is designed to support
+// some degree of parallelism while generating a serialized tree without
+// consuming ~too~ much memory.
+//
+// It would likely be easier and more efficient to have each thread work on a
+// completely separate branch of the filesystem tree, but our current JSON
+// export format requires that entries are output in a certain order, which
+// means we either need to construct the full tree in memory before generating
+// any output (which I'd really rather not do), or we're stuck scanning the
+// filesystem in the required order and lose some opportunities for
+// parallelism. This is an attempt at doing the latter.
+const scanner = struct {
+    var tail: *Level = undefined;
+    // Currently used to protect both the scan stack state and the output
+    // Context, may be worth trying to split in two.
+    var lock = std.Thread.Mutex{};
+    var cond = std.Thread.Condition{};
+
+    // Number of stat() calls to batch in a single task; this little thread
+    // pool implementation is pretty damn inefficient, so batching helps cut
+    // down on synchronization overhead. Can be removed if we ever manage to
+    // migrate to a more efficient thread pool.
+    const BATCH: usize = 128;
+
+    // Maximum number of name lists to keep for each level in the stack. Higher
+    // number means potentially higher degree of parallelism, but comes at the
+    // cost of potentially higher memory and file descriptor use.
+    const SUBDIRS_PER_LEVEL: u8 = 8;
+
+    const StatEntry = struct {
+        name: [:0]u8,
+        stat: Stat,
+    };
+
+    const SpecialEntry = struct {
+        name: [:0]u8,
+        t: Context.Special,
+    };
+
+    const NextLevel = struct {
+        name: [:0]u8,
+        stat: Stat,
+        level: *Level, // XXX: Only 'dir', 'names', and 'specials' are really relevant here
+    };
+
+    // Represents a directory that is being scanned.
+    const Level = struct {
+        dir: std.fs.Dir,
+        dir_dev: u64,
+        dirListError: bool = false,
+        names: std.ArrayListUnmanaged([:0]u8) = .{}, // Queue of names to stat()
+        names_busy: u8 = 0, // Number of threads running stat()
+        files: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of files we can output
+        specials: std.ArrayListUnmanaged(SpecialEntry) = .{}, // Queue of "special" items we can output
+        dirs: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of dirs we can read
+        dirs_busy: u8 = 0, // Number of 'dirs' being processed at the moment
+        next: std.ArrayListUnmanaged(NextLevel) = .{}, // Queue of subdirs to scan next
+        sub: ?*Level = null, // Subdirectory currently being scanned
+        parent: ?*Level,
+
+        // Assumption: all queues are empty
+        fn destroy(lvl: *Level) void {
+            lvl.dir.close();
+            lvl.names.deinit(main.allocator);
+            lvl.files.deinit(main.allocator);
+            lvl.specials.deinit(main.allocator);
+            lvl.dirs.deinit(main.allocator);
+            lvl.next.deinit(main.allocator);
+            main.allocator.destroy(lvl);
         }
+    };

-        ctx.stat = Stat.read(dir, ctx.name, false) catch {
-            ctx.addSpecial(.err);
-            continue;
-        };
+    // Drain the output queue ('files', 'specials') if we can.
+    // Assumes we hold the lock.
+    fn outputQueue(lvl: *Level) void {
+        if (lvl.sub != null) return;

-        if (main.config.same_fs and ctx.stat.dev != dir_dev) {
-            ctx.addSpecial(.other_fs);
-            continue;
+        if (lvl.dirListError) {
+            active_context.setDirlistError();
+            lvl.dirListError = false;
         }

-        if (main.config.follow_symlinks and ctx.stat.symlink) {
-            if (Stat.read(dir, ctx.name, true)) |nstat| {
-                if (!nstat.dir) {
-                    ctx.stat = nstat;
-                    // Symlink targets may reside on different filesystems,
-                    // this will break hardlink detection and counting so let's disable it.
-                    if (ctx.stat.hlinkc and ctx.stat.dev != dir_dev)
-                        ctx.stat.hlinkc = false;
-                }
-            } else |_| {}
+        for (lvl.specials.items) |e| {
+            active_context.stat.dir = false;
+            active_context.pushPath(e.name);
+            active_context.addSpecial(e.t);
+            active_context.popPath();
+            main.allocator.free(e.name);
+        }
+        for (lvl.files.items) |e| {
+            // TODO: ctx API is inefficient here, no need to copy that Stat
+            active_context.stat.dir = false;
+            active_context.pushPath(e.name);
+            active_context.stat = e.stat;
+            active_context.addStat(lvl.dir_dev);
+            active_context.popPath();
+            main.allocator.free(e.name);
+        }
+        lvl.specials.clearRetainingCapacity();
+        lvl.files.clearRetainingCapacity();
+    }
+
+    // Leave the current dir if we're done with it and find a new dir to enter.
+    fn navigate() void {
+        //std.debug.print("ctx={s}, names={} dirs={} next={}\n", .{ active_context.path.items, tail.names.items.len, tail.dirs.items.len, tail.next.items.len });
+
+        // Assumption: outputQueue() has been called on the tail, so
+        // 'files' and 'specials' are always empty.
+        while (tail.parent != null and tail.sub == null
+            and tail.names.items.len == 0 and tail.names_busy == 0
+            and tail.dirs.items.len == 0 and tail.dirs_busy == 0
+            and tail.next.items.len == 0
+        ) {
+            //std.debug.print("Pop\n", .{});
+            active_context.popPath();
+            const lvl = tail;
+            lvl.parent.?.sub = null;
+            tail = lvl.parent.?;
+            lvl.destroy();
+            outputQueue(tail);
         }

+        if (tail.sub == null and tail.next.items.len > 0) {
+            const sub = tail.next.pop();
+            //std.debug.print("Push {s}\n", .{sub.name});
+            active_context.pushPath(sub.name);
+            active_context.stat = sub.stat;
+            active_context.addStat(tail.dir_dev);
+            main.allocator.free(sub.name);
+            tail.sub = sub.level;
+            tail = sub.level;
+        }
+
+        // TODO: Only wake up threads when there's enough new work queued, all
+        // that context switching is SLOW.
+        cond.broadcast();
+    }

-        var edir =
-            if (ctx.stat.dir) dir.openDirZ(ctx.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
-                ctx.addSpecial(.err);
-                continue;
-            } else null;
-        defer if (edir != null) edir.?.close();
+    fn readNamesDir(lvl: *Level) void {
+        var it = lvl.dir.iterate();
+        while (true) {
+            const entry = it.next() catch {
+                lvl.dirListError = true;
+                break;
+            } orelse break;

-        if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and ctx.stat.dir and isKernfs(edir.?, ctx.stat.dev)) {
-            ctx.addSpecial(.kernfs);
-            continue;
+            // TODO: Check for exclude patterns
+
+            lvl.names.append(main.allocator, main.allocator.dupeZ(u8, entry.name) catch unreachable) catch unreachable;
         }
+    }

-        if (main.config.exclude_caches and ctx.stat.dir) {
-            if (edir.?.openFileZ("CACHEDIR.TAG", .{})) |f| {
+    fn readNames(parent: *Level) void {
+        const stat = parent.dirs.pop();
+        lock.unlock();
+
+        var dir = parent.dir.openDirZ(stat.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
+            lock.lock();
+            parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
+            return;
+        };
+
+        if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and isKernfs(dir, stat.stat.dev)) {
+            lock.lock();
+            parent.specials.append(main.allocator, .{ .name = stat.name, .t = .kernfs }) catch unreachable;
+            return;
+        }
+
+        if (main.config.exclude_caches) {
+            if (dir.openFileZ("CACHEDIR.TAG", .{})) |f| {
                 const sig = "Signature: 8a477f597d28d172789f06886806bc55";
                 var buf: [sig.len]u8 = undefined;
                 if (f.reader().readAll(&buf)) |len| {
                     if (len == sig.len and std.mem.eql(u8, &buf, sig)) {
-                        ctx.addSpecial(.excluded);
-                        continue;
+                        lock.lock();
+                        parent.specials.append(main.allocator, .{ .name = stat.name, .t = .excluded }) catch unreachable;
+                        return;
                     }
                 } else |_| {}
             } else |_| {}
         }

-        ctx.addStat(dir_dev);
-        if (ctx.stat.dir) scanDir(ctx, edir.?, ctx.stat.dev);
+        var lvl = main.allocator.create(Level) catch unreachable;
+        lvl.* = .{ .dir = dir, .dir_dev = stat.stat.dev, .parent = parent };
+        readNamesDir(lvl);
+
+        lock.lock();
+        // Treat empty directories as files
+        if (lvl.names.items.len == 0 and lvl.specials.items.len == 0) {
+            if (lvl.dirListError) { // XXX: this loses information about this entry being a directory :(
+                parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
+            } else
+                parent.files.append(main.allocator, stat) catch unreachable;
+            dir.close();
+            main.allocator.destroy(lvl);
+        } else {
+            parent.next.append(main.allocator, .{ .name = stat.name, .stat = stat.stat, .level = lvl }) catch unreachable;
+        }
     }
-}
+
+    fn statNames(lvl: *Level) void {
+        var names: [BATCH][:0]u8 = undefined;
+        var stats: [BATCH]Stat = undefined;
+        var errs: [BATCH]bool = undefined;
+        const len = std.math.min(names.len, lvl.names.items.len);
+        std.mem.copy([]u8, &names, lvl.names.items[lvl.names.items.len-len..]);
+        lvl.names.items.len -= len;
+        lock.unlock();
+
+        var i: usize = 0;
+        while (i < len) : (i += 1) {
+            if (Stat.read(lvl.dir, names[i], false)) |s| {
+                errs[i] = false;
+                if (main.config.follow_symlinks and s.symlink) {
+                    if (Stat.read(lvl.dir, names[i], true)) |nstat| {
+                        if (!nstat.dir) {
+                            stats[i] = nstat;
+                            // Symlink targets may reside on different filesystems,
+                            // this will break hardlink detection and counting so let's disable it.
+                            if (nstat.hlinkc and nstat.dev != lvl.dir_dev)
+                                stats[i].hlinkc = false;
+                        }
+                    } else |_| stats[i] = s;
+                } else stats[i] = s;
+
+            } else |_|
+                errs[i] = true;
+        }
+
+        lock.lock();
+        i = 0;
+        while (i < len) : (i += 1) {
+            if (errs[i])
+                lvl.specials.append(main.allocator, .{ .name = names[i], .t = .err }) catch unreachable
+            else if (main.config.same_fs and stats[i].dev != lvl.dir_dev)
+                lvl.specials.append(main.allocator, .{ .name = names[i], .t = .other_fs }) catch unreachable
+            else if (stats[i].dir)
+                lvl.dirs.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable
+            else
+                lvl.files.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable;
+        }
+    }
+
+    fn runThread() void {
+        lock.lock();
+        outer: while (true) {
+            var curlvl: ?*Level = tail;
+            while (curlvl) |lvl| : (curlvl = lvl.parent) {
+
+                // If we have subdirectories to read, do that first to keep the 'names' queues filled up.
+                if (lvl.dirs.items.len > 0 and lvl.dirs_busy + lvl.next.items.len < SUBDIRS_PER_LEVEL) {
+                    lvl.dirs_busy += 1;
+                    readNames(lvl);
+                    lvl.dirs_busy -= 1;
+                    outputQueue(lvl);
+                    navigate();
+                    continue :outer;
+                }
+
+                // Then look for names to stat
+                if (lvl.names.items.len > 0) {
+                    lvl.names_busy += 1;
+                    statNames(lvl);
+                    lvl.names_busy -= 1;
+                    outputQueue(lvl);
+                    navigate();
+                    continue :outer;
+                }
+            }
+
+            // If we're here, then we found no work to do.
+            if (tail.parent == null and tail.dirs_busy == 0 and tail.names_busy == 0) {
+                cond.broadcast(); // only necessary if we don't always wake up threads when there's work to do.
+                break;
+            }
+            cond.wait(&lock);
+        }
+        lock.unlock();
+    }
+    // TODO: progress UI
+
+    // Scan the given dir. The active_context is assumed to have been
+    // initialized already and the entry for the given *dir has already been
+    // output.
+    // The given dir is closed when done.
+    fn scan(dir: std.fs.Dir, dir_dev: u64) void {
+        tail = main.allocator.create(Level) catch unreachable;
+        tail.* = .{ .dir = dir, .dir_dev = dir_dev, .parent = null };
+        readNamesDir(tail);
+        var threads = main.allocator.alloc(std.Thread, main.config.parallel-1) catch unreachable;
+        for (threads) |*t| t.* = std.Thread.spawn(.{ .stack_size = 128*1024 }, runThread, .{}) catch unreachable;
+        runThread();
+        for (threads) |*t| t.join();
+        tail.destroy();
+        tail = undefined;
+    }
+};

 pub fn scanRoot(path: []const u8, out: ?std.fs.File) !void {
     active_context = if (out) |f| Context.initFile(f) else Context.initMem(null);
@@ -569,8 +787,7 @@ pub fn scan() void {
         main.handleEvent(true, true);
         return;
     };
-    defer dir.close();
-    scanDir(active_context, dir, active_context.stat.dev);
+    scanner.scan(dir, active_context.stat.dev);
     active_context.popPath();
     active_context.final();
 }
diff --git a/src/ui.zig b/src/ui.zig
--- a/src/ui.zig
+++ b/src/ui.zig
@@ -42,6 +42,7 @@ pub fn quit() noreturn {
 // Also, init() and other ncurses-related functions may have hidden allocation,
 // no clue if ncurses will consistently report OOM, but we're not handling that
 // right now.
+// TODO: Make thread-safe.
 pub fn oom() void {
     const haveui = inited;
     deinit();
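The BATCH constant above is the main defense against the lock contention
mentioned in the commit message: statNames() drains up to 128 names in a
single lock acquisition, runs the stat() calls unlocked, then re-locks
once to publish the results. Stripped of the ncdu specifics, the pattern
looks roughly like this (a sketch with made-up names, built on the same
std.Thread primitives the patch uses):

    const std = @import("std");

    var lock = std.Thread.Mutex{};
    var queue: std.ArrayListUnmanaged(u64) = .{}; // stand-in for a Level's 'names' queue
    var total: u64 = 0; // results, merged under the lock

    const BATCH: usize = 128;

    // One worker step: amortize a single lock round-trip over up to
    // BATCH items. Returns false when the queue was empty.
    fn drainBatch() bool {
        lock.lock();
        const len = std.math.min(BATCH, queue.items.len);
        if (len == 0) {
            lock.unlock();
            return false;
        }
        var batch: [BATCH]u64 = undefined;
        std.mem.copy(u64, &batch, queue.items[queue.items.len - len ..]);
        queue.items.len -= len;
        lock.unlock();

        // The expensive part runs without the lock; in the real code
        // this is a Stat.read() call per name.
        var sum: u64 = 0;
        for (batch[0..len]) |item| sum += item;

        lock.lock();
        total += sum; // one re-lock to publish, instead of one lock per item
        lock.unlock();
        return true;
    }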