diff options
author | Yorhel <git@yorhel.nl> | 2021-02-09 13:44:56 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2021-02-09 13:44:58 +0100 |
commit | 5830b90c1337bf0049a114038966a390a64b503d (patch) | |
tree | 2e3edbe0993aa65e4613dece69c3b7f2081c0069 | |
parent | a057860d5ad3408fb58c0c5e3fc5be94aaf973ff (diff) |
More experiments with cli parsing and connection limiting
I have no clue how this suspend/resume and async/await stuff works, I'm
still trying to get a feel for it.
-rw-r--r-- | Config.zig | 153 | ||||
-rw-r--r-- | Request.zig | 28 | ||||
-rw-r--r-- | main.zig | 269 |
3 files changed, 270 insertions, 180 deletions
diff --git a/Config.zig b/Config.zig new file mode 100644 index 0000000..b8c990f --- /dev/null +++ b/Config.zig @@ -0,0 +1,153 @@ +const std = @import("std"); +const Address = std.net.Address; +const alloc = @import("main.zig").alloc; +const Self = @This(); + +version: bool = false, +help: bool = false, +listeners: std.ArrayList(Listener) = std.ArrayList(Listener).init(alloc), +backend: Backend = .{}, + +_current_opt: []const u8 = "", +_args: std.process.ArgIteratorPosix, + +pub const Listener = struct { + addr: ?Address, + max_connections: u32 = 0, +}; + +pub const Backend = struct { + spawn: ?[]const u8 = null, + max_processes: u32 = 0, +}; + +// Should be marked as noreturn, but https://github.com/ziglang/zig/issues/5728 +pub fn err(comptime fmt: []const u8, arg: anytype) void { + _ = std.io.getStdErr().writer().print(fmt, arg) catch {}; + std.process.exit(1); +} + +pub fn read() !Self { + const eq = std.mem.eql; + var args = std.process.args(); + var self = Self{ + ._args = std.process.ArgIteratorPosix.init(), + }; + _ = self._args.next(); // Program name + while (self._args.next()) |arg| { + self._current_opt = arg; + if (eq(u8, arg, "-h")) self.help = true + else if (eq(u8, arg, "-V")) self.version = true + + // Font-end options + else if (eq(u8, arg, "listen")) try self.listeners.append(self.readListen()) + else if (eq(u8, arg, "max-connections")) self.listener().max_connections = self.readInt(u32) + + // Backend options + else if (eq(u8, arg, "spawn")) self.readSpawn() + + else err("Unrecognized option: '{s}'.\n", .{arg}); + } + return self; +} + +// Return the currently-being-configured listener, or exit with an error if we're not configuring a listener. +fn listener(self: *Self) *Listener { + if (self.listeners.items.len == 0 or self.backend.spawn != null) { + err("Option '{s}' can only be used when configuring a listener.\n", .{self._current_opt}); + unreachable; + } + return &self.listeners.items[self.listeners.items.len - 1]; +} + +fn optArg(self: *Self) [:0]const u8 { + if (self._args.next()) |arg| { + return arg; + } + err("Option '{s}' requires an argument.\n", .{self._current_opt}); + unreachable; +} + +fn parseTypedAddr(arg: []const u8, ltype: *[]const u8) !?Address { + var str: []const u8 = arg; + if (std.mem.indexOfScalar(u8, str, ':')) |i| { + ltype.* = str[0..i]; + str = str[i + 1 ..]; + } else { + ltype.* = str; + return null; + } + if (str.len == 0) { + return null; + } + + if (str[0] == '/' or str[0] == '.') { + return try Address.initUnix(str); + } + + var port: u16 = 5000; + if (std.mem.lastIndexOfScalar(u8, str, ':')) |i| { + if (std.fmt.parseUnsigned(u16, str[i + 1 ..], 10)) |p| { + port = p; + str = str[0..i]; + } else |_| {} + } + return try Address.resolveIp(if (str.len == 0) "0.0.0.0" else str, port); +} + +fn readListen(self: *Self) Listener { + const arg = self.optArg(); + var ltype: []const u8 = undefined; + if (parseTypedAddr(arg, <ype)) |a| { + if (!std.mem.eql(u8, ltype, "fastcgi")) { + err("Unknown listener type '{s}', only fastcgi is supported at the moment.\n", .{ltype}); + } + return Listener{ .addr = a }; + } else |e| { + err("Error parsing address '{s}': {}\n", .{ arg, e }); + unreachable; + } +} + +fn readInt(self: *Self, comptime T: type) T { + return std.fmt.parseInt(T, self.optArg(), 10) catch |_| { + err("Option '{s}' expects an integer argument.\n", .{self._current_opt}); + unreachable; + }; +} + +fn readSpawn(self: *Self) void { + if (self.backend.spawn != null) err("Only a single 'spawn' option is supported.\n", .{}); + const arg = self.optArg(); + if (!std.mem.startsWith(u8, arg, "fastcgi:")) err("The 'spawn' option only supports 'fastcgi:' paths at the moment.\n", .{}); + const path = arg[8..]; + if (path.len == 0) err("No spawn path given.\n", .{}); + self.backend.spawn = path; +} + +test "parse typed address" { + const T = struct { in: []const u8, lt: []const u8, a: ?[]const u8 }; + for ([_]T{ + .{ .in = "fastcgi", .lt = "fastcgi", .a = null }, + .{ .in = "fastcgi:", .lt = "fastcgi", .a = null }, + .{ .in = "fastcgi:/", .lt = "fastcgi", .a = "/" }, + .{ .in = "fastcgi:./some/path", .lt = "fastcgi", .a = "./some/path" }, + .{ .in = "fastcgi::300", .lt = "fastcgi", .a = "0.0.0.0:300" }, + .{ .in = "fastcgi:127.0.0.2", .lt = "fastcgi", .a = "127.0.0.2:5000" }, + .{ .in = "fastcgi::::10", .lt = "fastcgi", .a = "[::]:10" }, + }) |t| { + var ltype: []const u8 = undefined; + const addr = parseTypedAddr(t.in, <ype) catch unreachable; + std.testing.expectEqualStrings(t.lt, ltype); + if (t.a) |a| { + var buf: [128]u8 = undefined; + var naddr = std.fmt.bufPrint(&buf, "{}", .{addr.?}) catch unreachable; + if (std.mem.indexOfScalar(u8, naddr, 0)) |n| { + naddr = naddr[0..n]; // Formatting UNIX addresses may include zero bytes, remove those. + } + std.testing.expectEqualStrings(a, naddr); + } else { + std.testing.expect(addr == null); + } + } +} diff --git a/Request.zig b/Request.zig deleted file mode 100644 index 02dbbd5..0000000 --- a/Request.zig +++ /dev/null @@ -1,28 +0,0 @@ -const std = @import("std"); -const main = @import("main.zig"); -const alloc = main.alloc; -const Self = @This(); - -front: ?*main.Front, - -/// FastCGI requestId at the front-end side. -front_id: u16, - -/// Whether we should keep the front-end connection alive after this request has been processed. -front_keep_conn: bool, - -// TODO: backend pointer -back: ?*void = null, - -/// List of parameters in FastCGI format. -params: std.ArrayList(u8) = std.ArrayList(u8).init(alloc), - -/// Free resources associated with this request. -/// Does nothing when 'front' or 'back' are still set, as that means this request is still referenced from either side. -pub fn deinit(self: *Self) void { - if (self.front != null or self.back != null) { - return; - } - self.params.deinit(); - alloc.destroy(self); -} @@ -1,6 +1,6 @@ const std = @import("std"); const fastcgi = @import("fastcgi.zig"); -const Request = @import("Request.zig"); +const Config = @import("Config.zig"); const Address = std.net.Address; pub const io_mode = .evented; @@ -10,93 +10,11 @@ pub const alloc = &general_purpose_allocator.allocator; const mainloop = std.event.Loop.instance.?; -const Config = struct { - version: bool = false, - help: bool = false, - listeners: std.ArrayList(Listener) = std.ArrayList(Listener).init(alloc), - - const Listener = struct { - addr: ?Address, - }; - - // Should be marked as noreturn, but https://github.com/ziglang/zig/issues/5728 - fn err(comptime fmt: []const u8, arg: anytype) void { - _ = std.io.getStdErr().writer().print(fmt, arg) catch {}; - std.process.exit(1); - } - - fn read() !Config { - const eq = std.mem.eql; - var args = std.process.args(); - var conf = Config{}; - _ = args.nextPosix(); // Program name - while (args.nextPosix()) |arg| { - if (eq(u8, arg, "-h")) { - conf.help = true; - } else if (eq(u8, arg, "-V")) { - conf.version = true; - } else if (eq(u8, arg, "listen")) { - try conf.listeners.append(try readListen(&args)); - } else { - err("Unrecognized option: '{s}'.\n", .{arg}); - } - } - return conf; - } - - fn optArg(args: *std.process.ArgIterator, name: []const u8) [:0]const u8 { - if (args.nextPosix()) |arg| { - return arg; - } - err("Option '{s}' requires an argument.\n", .{name}); - unreachable; - } - - fn parseTypedAddr(arg: []const u8, ltype: *[]const u8) !?Address { - var str: []const u8 = arg; - if (std.mem.indexOfScalar(u8, str, ':')) |i| { - ltype.* = str[0..i]; - str = str[i + 1 ..]; - } else { - ltype.* = str; - return null; - } - if (str.len == 0) { - return null; - } - - if (str[0] == '/' or str[0] == '.') { - return try Address.initUnix(str); - } - - var port: u16 = 5000; - if (std.mem.lastIndexOfScalar(u8, str, ':')) |i| { - if (std.fmt.parseUnsigned(u16, str[i + 1 ..], 10)) |p| { - port = p; - str = str[0..i]; - } else |_| {} - } - return try Address.resolveIp(if (str.len == 0) "0.0.0.0" else str, port); - } - - fn readListen(args: *std.process.ArgIterator) !Listener { - const arg = optArg(args, "listen"); - var ltype: []const u8 = undefined; - if (parseTypedAddr(arg, <ype)) |a| { - if (!std.mem.eql(u8, ltype, "fastcgi")) { - err("Unknown listerer type '{s}'.\n", .{ltype}); - } - return Listener{ .addr = a }; - } else |e| { - err("Error parsing address '{s}': {}\n", .{ arg, e }); - unreachable; - } - } -}; - pub fn main() !void { const conf = try Config.read(); + std.debug.print("Listener = {}, Front = {}\n", .{@sizeOf(Listener), @sizeOf(Front)}); + if (conf.version) { std.io.getStdOut().writeAll("fcgy 0.1\n") catch unreachable; std.process.exit(0); @@ -113,116 +31,163 @@ pub fn main() !void { } for (conf.listeners.items) |l| { - var server = std.net.StreamServer.init(.{ .reuse_address = true }); + const listener = alloc.create(Listener) catch unreachable; + listener.* = .{ + .conf = &l, + .server = std.net.StreamServer.init(.{ .reuse_address = true }), + }; if (l.addr) |a| { - try server.listen(l.addr.?); + try listener.server.listen(l.addr.?); std.debug.print("Listening on {}\n", .{l.addr}); } else { Config.err("Cannot mix standard I/O and server-type listeners.\n", .{}); } - try mainloop.runDetached(alloc, listener, .{ &l, server }); + listener.frameNextTick = .{ .data = &listener.frame }; + listener.frame = async listener.listen(); } mainloop.run(); } -fn listener(conf: *const Config.Listener, server_: std.net.StreamServer) void { - var server = server_; - defer server.deinit(); - while (true) { - const con = server.accept() catch unreachable; - const front = alloc.create(FrontNode) catch unreachable; - front.* = .{ - .data = Front{ - .addr = con.address, - .stream = con.stream, - }, - }; - front_list.append(front); - mainloop.runDetached(alloc, Front.run, .{&front.data}) catch unreachable; +const Listener = struct { + conf: *const Config.Listener, + server: std.net.StreamServer, + connections: std.TailQueue(Front) = .{}, + num_connections: u32 = 0, + frame: @Frame(listen) = undefined, + frameNextTick: std.event.Loop.NextTickNode = undefined, + + const Self = @This(); + const FrontNode = std.TailQueue(Front).Node; + + fn listen(self: *Self) void { + while (true) { + while (self.conf.max_connections > 0 and self.num_connections >= self.conf.max_connections) { + std.debug.print("max-connections ({}) reached, suspending listener\n", .{self.conf.max_connections}); + suspend; + std.debug.print("resuming listener\n", .{}); + } + const con = self.server.accept() catch unreachable; + const front = alloc.create(FrontNode) catch unreachable; + // XXX: Hopefully the copy of the large Front struct is optimized away, may want to verify. + front.* = .{ + .data = Front.init(self, con.address, con.stream) + }; + self.connections.append(front); + self.num_connections += 1; + mainloop.runDetached(alloc, Front.run, .{&front.data}) catch unreachable; + } } -} -// XXX: Do we even need to keep track of open Front connections? -const FrontNode = std.TailQueue(Front).Node; -var front_list: std.TailQueue(Front) = .{}; + // Remove a connection from the list and free the struct. + fn remove_connection(self: *Self, front: *Front) void { + const node = @fieldParentPtr(FrontNode, "data", front); + self.connections.remove(node); + self.num_connections -= 1; + alloc.destroy(node); + + // Resume the listener if we just made room for another connection. + if (self.conf.max_connections > 0 and self.num_connections + 1 == self.conf.max_connections) + mainloop.onNextTick(&self.frameNextTick); + } +}; pub const Front = struct { addr: Address, stream: std.net.Stream, + listener: *Listener, requests: std.AutoHashMap(u16, *Request) = std.AutoHashMap(u16, *Request).init(alloc), - - fn run(self: *Front) void { - std.debug.print("Connection from {}\n", .{self.addr}); - self.loop() catch |e| { - std.debug.print("Connection from {} closed: {}\n", .{ self.addr, e }); + has_shutdown: bool = false, + rd: std.io.BufferedReader(4096, std.net.Stream.Reader), + wr: std.io.BufferedWriter(4096, std.net.Stream.Writer), + + const Self = @This(); + + fn init(listener: *Listener, addr: Address, stream: std.net.Stream) Self { + return Self{ + .addr = addr, + .stream = stream, + .listener = listener, + .rd = std.io.bufferedReader(stream.reader()), + .wr = std.io.bufferedWriter(stream.writer()), }; + } - const node = @fieldParentPtr(FrontNode, "data", self); - front_list.remove(node); - self.stream.close(); - alloc.destroy(node); + fn run(self: *Self) void { + std.debug.print("{*} Connection from {}\n", .{ self, self.addr }); + self.loop() catch |e| std.debug.print("{*} Connection closed: {}\n", .{ self, e }); + if (self.has_shutdown) std.debug.print("{*} Connection has been shutdown\n", .{self}); + + self.shutdown(); + self.listener.remove_connection(self); } - fn loop(self: *Front) !void { - var rd = std.io.bufferedReader(self.stream.reader()).reader(); - while (true) { - const r = try fastcgi.Record.read(rd); + /// Close the socket and stop the loop() + fn shutdown(self: *Self) void { + if (!self.has_shutdown) self.stream.close(); + self.has_shutdown = true; + } + + fn loop(self: *Self) !void { + while (!self.has_shutdown) { + const r = try fastcgi.Record.read(self.rd.reader()); std.debug.print("Received FastCGI record: {}\n", .{r}); switch (r.type) { - .begin_request => try self.handleBegin(r.request_id, try r.readBegin(rd)), - .params => try self.handleParams(&r, rd), - else => try r.skip(rd), + .begin_request => try self.handleBegin(&r), + .params => try self.handleParams(&r), + else => try r.skip(self.rd.reader()), } } } - fn handleBegin(self: *Front, id: u16, b: fastcgi.Record.Begin) !void { + fn handleBegin(self: *Self, r: *const fastcgi.Record) !void { + const b = try r.readBegin(self.rd.reader()); // TODO: Handle other role types (which servers support that, though?) if (b.role != .responder) return error.unsupportedFastCgiRole; std.debug.print("Begin request: {}\n", .{b}); var req = try alloc.create(Request); req.* = .{ .front = self, - .front_id = id, + .front_id = r.request_id, .front_keep_conn = b.keep_conn, }; errdefer req.deinit(); - try self.requests.putNoClobber(id, req); + try self.requests.putNoClobber(r.request_id, req); } - fn handleParams(self: *Front, r: *const fastcgi.Record, rd: anytype) !void { + fn handleParams(self: *Self, r: *const fastcgi.Record) !void { const req = self.requests.get(r.request_id) orelse return; // We don't assign a back-end until we have all params, so this shouldn't happen. if (req.back != null) return error.invalidState; - if (r.content_length > 0) return r.readArrayList(rd, &req.params); + if (r.content_length > 0) return r.readArrayList(self.rd.reader(), &req.params); // TODO: We received all params now, assign a backend and process the request - try r.skip(rd); + try r.skip(self.rd.reader()); } }; -test "parse typed address" { - const T = struct { in: []const u8, lt: []const u8, a: ?[]const u8 }; - for ([_]T{ - .{ .in = "fastcgi", .lt = "fastcgi", .a = null }, - .{ .in = "fastcgi:", .lt = "fastcgi", .a = null }, - .{ .in = "fastcgi:/", .lt = "fastcgi", .a = "/" }, - .{ .in = "fastcgi:./some/path", .lt = "fastcgi", .a = "./some/path" }, - .{ .in = "fastcgi::300", .lt = "fastcgi", .a = "0.0.0.0:300" }, - .{ .in = "fastcgi:127.0.0.2", .lt = "fastcgi", .a = "127.0.0.2:5000" }, - .{ .in = "fastcgi::::10", .lt = "fastcgi", .a = "[::]:10" }, - }) |t| { - var ltype: []const u8 = undefined; - const addr = Config.parseTypedAddr(t.in, <ype) catch unreachable; - std.testing.expectEqualStrings(t.lt, ltype); - if (t.a) |a| { - var buf: [128]u8 = undefined; - var naddr = std.fmt.bufPrint(&buf, "{}", .{addr.?}) catch unreachable; - if (std.mem.indexOfScalar(u8, naddr, 0)) |n| { - naddr = naddr[0..n]; // Formatting UNIX addresses may include zero bytes, remove those. - } - std.testing.expectEqualStrings(a, naddr); - } else { - std.testing.expect(addr == null); +pub const Request = struct { + front: ?*Front, + + /// FastCGI requestId at the front-end side. + front_id: u16, + + /// Whether we should keep the front-end connection alive after this request has been processed. + front_keep_conn: bool, + + // TODO: backend pointer + back: ?*void = null, + + /// List of parameters in FastCGI format. + params: std.ArrayList(u8) = std.ArrayList(u8).init(alloc), + + const Self = @This(); + + /// Free resources associated with this request. + /// Does nothing when 'front' or 'back' are still set, as that means this request is still referenced from either side. + pub fn deinit(self: *Self) void { + if (self.front != null or self.back != null) { + return; } + self.params.deinit(); + alloc.destroy(self); } -} +}; |