summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2021-02-09 13:44:56 +0100
committerYorhel <git@yorhel.nl>2021-02-09 13:44:58 +0100
commit5830b90c1337bf0049a114038966a390a64b503d (patch)
tree2e3edbe0993aa65e4613dece69c3b7f2081c0069
parenta057860d5ad3408fb58c0c5e3fc5be94aaf973ff (diff)
More experiments with cli parsing and connection limiting
I have no clue how this suspend/resume and async/await stuff works, I'm still trying to get a feel for it.
-rw-r--r--Config.zig153
-rw-r--r--Request.zig28
-rw-r--r--main.zig269
3 files changed, 270 insertions, 180 deletions
diff --git a/Config.zig b/Config.zig
new file mode 100644
index 0000000..b8c990f
--- /dev/null
+++ b/Config.zig
@@ -0,0 +1,153 @@
+const std = @import("std");
+const Address = std.net.Address;
+const alloc = @import("main.zig").alloc;
+const Self = @This();
+
+version: bool = false,
+help: bool = false,
+listeners: std.ArrayList(Listener) = std.ArrayList(Listener).init(alloc),
+backend: Backend = .{},
+
+_current_opt: []const u8 = "",
+_args: std.process.ArgIteratorPosix,
+
+pub const Listener = struct {
+ addr: ?Address,
+ max_connections: u32 = 0,
+};
+
+pub const Backend = struct {
+ spawn: ?[]const u8 = null,
+ max_processes: u32 = 0,
+};
+
+// Should be marked as noreturn, but https://github.com/ziglang/zig/issues/5728
+pub fn err(comptime fmt: []const u8, arg: anytype) void {
+ _ = std.io.getStdErr().writer().print(fmt, arg) catch {};
+ std.process.exit(1);
+}
+
+pub fn read() !Self {
+ const eq = std.mem.eql;
+ var args = std.process.args();
+ var self = Self{
+ ._args = std.process.ArgIteratorPosix.init(),
+ };
+ _ = self._args.next(); // Program name
+ while (self._args.next()) |arg| {
+ self._current_opt = arg;
+ if (eq(u8, arg, "-h")) self.help = true
+ else if (eq(u8, arg, "-V")) self.version = true
+
+ // Font-end options
+ else if (eq(u8, arg, "listen")) try self.listeners.append(self.readListen())
+ else if (eq(u8, arg, "max-connections")) self.listener().max_connections = self.readInt(u32)
+
+ // Backend options
+ else if (eq(u8, arg, "spawn")) self.readSpawn()
+
+ else err("Unrecognized option: '{s}'.\n", .{arg});
+ }
+ return self;
+}
+
+// Return the currently-being-configured listener, or exit with an error if we're not configuring a listener.
+fn listener(self: *Self) *Listener {
+ if (self.listeners.items.len == 0 or self.backend.spawn != null) {
+ err("Option '{s}' can only be used when configuring a listener.\n", .{self._current_opt});
+ unreachable;
+ }
+ return &self.listeners.items[self.listeners.items.len - 1];
+}
+
+fn optArg(self: *Self) [:0]const u8 {
+ if (self._args.next()) |arg| {
+ return arg;
+ }
+ err("Option '{s}' requires an argument.\n", .{self._current_opt});
+ unreachable;
+}
+
+fn parseTypedAddr(arg: []const u8, ltype: *[]const u8) !?Address {
+ var str: []const u8 = arg;
+ if (std.mem.indexOfScalar(u8, str, ':')) |i| {
+ ltype.* = str[0..i];
+ str = str[i + 1 ..];
+ } else {
+ ltype.* = str;
+ return null;
+ }
+ if (str.len == 0) {
+ return null;
+ }
+
+ if (str[0] == '/' or str[0] == '.') {
+ return try Address.initUnix(str);
+ }
+
+ var port: u16 = 5000;
+ if (std.mem.lastIndexOfScalar(u8, str, ':')) |i| {
+ if (std.fmt.parseUnsigned(u16, str[i + 1 ..], 10)) |p| {
+ port = p;
+ str = str[0..i];
+ } else |_| {}
+ }
+ return try Address.resolveIp(if (str.len == 0) "0.0.0.0" else str, port);
+}
+
+fn readListen(self: *Self) Listener {
+ const arg = self.optArg();
+ var ltype: []const u8 = undefined;
+ if (parseTypedAddr(arg, &ltype)) |a| {
+ if (!std.mem.eql(u8, ltype, "fastcgi")) {
+ err("Unknown listener type '{s}', only fastcgi is supported at the moment.\n", .{ltype});
+ }
+ return Listener{ .addr = a };
+ } else |e| {
+ err("Error parsing address '{s}': {}\n", .{ arg, e });
+ unreachable;
+ }
+}
+
+fn readInt(self: *Self, comptime T: type) T {
+ return std.fmt.parseInt(T, self.optArg(), 10) catch |_| {
+ err("Option '{s}' expects an integer argument.\n", .{self._current_opt});
+ unreachable;
+ };
+}
+
+fn readSpawn(self: *Self) void {
+ if (self.backend.spawn != null) err("Only a single 'spawn' option is supported.\n", .{});
+ const arg = self.optArg();
+ if (!std.mem.startsWith(u8, arg, "fastcgi:")) err("The 'spawn' option only supports 'fastcgi:' paths at the moment.\n", .{});
+ const path = arg[8..];
+ if (path.len == 0) err("No spawn path given.\n", .{});
+ self.backend.spawn = path;
+}
+
+test "parse typed address" {
+ const T = struct { in: []const u8, lt: []const u8, a: ?[]const u8 };
+ for ([_]T{
+ .{ .in = "fastcgi", .lt = "fastcgi", .a = null },
+ .{ .in = "fastcgi:", .lt = "fastcgi", .a = null },
+ .{ .in = "fastcgi:/", .lt = "fastcgi", .a = "/" },
+ .{ .in = "fastcgi:./some/path", .lt = "fastcgi", .a = "./some/path" },
+ .{ .in = "fastcgi::300", .lt = "fastcgi", .a = "0.0.0.0:300" },
+ .{ .in = "fastcgi:127.0.0.2", .lt = "fastcgi", .a = "127.0.0.2:5000" },
+ .{ .in = "fastcgi::::10", .lt = "fastcgi", .a = "[::]:10" },
+ }) |t| {
+ var ltype: []const u8 = undefined;
+ const addr = parseTypedAddr(t.in, &ltype) catch unreachable;
+ std.testing.expectEqualStrings(t.lt, ltype);
+ if (t.a) |a| {
+ var buf: [128]u8 = undefined;
+ var naddr = std.fmt.bufPrint(&buf, "{}", .{addr.?}) catch unreachable;
+ if (std.mem.indexOfScalar(u8, naddr, 0)) |n| {
+ naddr = naddr[0..n]; // Formatting UNIX addresses may include zero bytes, remove those.
+ }
+ std.testing.expectEqualStrings(a, naddr);
+ } else {
+ std.testing.expect(addr == null);
+ }
+ }
+}
diff --git a/Request.zig b/Request.zig
deleted file mode 100644
index 02dbbd5..0000000
--- a/Request.zig
+++ /dev/null
@@ -1,28 +0,0 @@
-const std = @import("std");
-const main = @import("main.zig");
-const alloc = main.alloc;
-const Self = @This();
-
-front: ?*main.Front,
-
-/// FastCGI requestId at the front-end side.
-front_id: u16,
-
-/// Whether we should keep the front-end connection alive after this request has been processed.
-front_keep_conn: bool,
-
-// TODO: backend pointer
-back: ?*void = null,
-
-/// List of parameters in FastCGI format.
-params: std.ArrayList(u8) = std.ArrayList(u8).init(alloc),
-
-/// Free resources associated with this request.
-/// Does nothing when 'front' or 'back' are still set, as that means this request is still referenced from either side.
-pub fn deinit(self: *Self) void {
- if (self.front != null or self.back != null) {
- return;
- }
- self.params.deinit();
- alloc.destroy(self);
-}
diff --git a/main.zig b/main.zig
index 21268b6..11cabb6 100644
--- a/main.zig
+++ b/main.zig
@@ -1,6 +1,6 @@
const std = @import("std");
const fastcgi = @import("fastcgi.zig");
-const Request = @import("Request.zig");
+const Config = @import("Config.zig");
const Address = std.net.Address;
pub const io_mode = .evented;
@@ -10,93 +10,11 @@ pub const alloc = &general_purpose_allocator.allocator;
const mainloop = std.event.Loop.instance.?;
-const Config = struct {
- version: bool = false,
- help: bool = false,
- listeners: std.ArrayList(Listener) = std.ArrayList(Listener).init(alloc),
-
- const Listener = struct {
- addr: ?Address,
- };
-
- // Should be marked as noreturn, but https://github.com/ziglang/zig/issues/5728
- fn err(comptime fmt: []const u8, arg: anytype) void {
- _ = std.io.getStdErr().writer().print(fmt, arg) catch {};
- std.process.exit(1);
- }
-
- fn read() !Config {
- const eq = std.mem.eql;
- var args = std.process.args();
- var conf = Config{};
- _ = args.nextPosix(); // Program name
- while (args.nextPosix()) |arg| {
- if (eq(u8, arg, "-h")) {
- conf.help = true;
- } else if (eq(u8, arg, "-V")) {
- conf.version = true;
- } else if (eq(u8, arg, "listen")) {
- try conf.listeners.append(try readListen(&args));
- } else {
- err("Unrecognized option: '{s}'.\n", .{arg});
- }
- }
- return conf;
- }
-
- fn optArg(args: *std.process.ArgIterator, name: []const u8) [:0]const u8 {
- if (args.nextPosix()) |arg| {
- return arg;
- }
- err("Option '{s}' requires an argument.\n", .{name});
- unreachable;
- }
-
- fn parseTypedAddr(arg: []const u8, ltype: *[]const u8) !?Address {
- var str: []const u8 = arg;
- if (std.mem.indexOfScalar(u8, str, ':')) |i| {
- ltype.* = str[0..i];
- str = str[i + 1 ..];
- } else {
- ltype.* = str;
- return null;
- }
- if (str.len == 0) {
- return null;
- }
-
- if (str[0] == '/' or str[0] == '.') {
- return try Address.initUnix(str);
- }
-
- var port: u16 = 5000;
- if (std.mem.lastIndexOfScalar(u8, str, ':')) |i| {
- if (std.fmt.parseUnsigned(u16, str[i + 1 ..], 10)) |p| {
- port = p;
- str = str[0..i];
- } else |_| {}
- }
- return try Address.resolveIp(if (str.len == 0) "0.0.0.0" else str, port);
- }
-
- fn readListen(args: *std.process.ArgIterator) !Listener {
- const arg = optArg(args, "listen");
- var ltype: []const u8 = undefined;
- if (parseTypedAddr(arg, &ltype)) |a| {
- if (!std.mem.eql(u8, ltype, "fastcgi")) {
- err("Unknown listerer type '{s}'.\n", .{ltype});
- }
- return Listener{ .addr = a };
- } else |e| {
- err("Error parsing address '{s}': {}\n", .{ arg, e });
- unreachable;
- }
- }
-};
-
pub fn main() !void {
const conf = try Config.read();
+ std.debug.print("Listener = {}, Front = {}\n", .{@sizeOf(Listener), @sizeOf(Front)});
+
if (conf.version) {
std.io.getStdOut().writeAll("fcgy 0.1\n") catch unreachable;
std.process.exit(0);
@@ -113,116 +31,163 @@ pub fn main() !void {
}
for (conf.listeners.items) |l| {
- var server = std.net.StreamServer.init(.{ .reuse_address = true });
+ const listener = alloc.create(Listener) catch unreachable;
+ listener.* = .{
+ .conf = &l,
+ .server = std.net.StreamServer.init(.{ .reuse_address = true }),
+ };
if (l.addr) |a| {
- try server.listen(l.addr.?);
+ try listener.server.listen(l.addr.?);
std.debug.print("Listening on {}\n", .{l.addr});
} else {
Config.err("Cannot mix standard I/O and server-type listeners.\n", .{});
}
- try mainloop.runDetached(alloc, listener, .{ &l, server });
+ listener.frameNextTick = .{ .data = &listener.frame };
+ listener.frame = async listener.listen();
}
mainloop.run();
}
-fn listener(conf: *const Config.Listener, server_: std.net.StreamServer) void {
- var server = server_;
- defer server.deinit();
- while (true) {
- const con = server.accept() catch unreachable;
- const front = alloc.create(FrontNode) catch unreachable;
- front.* = .{
- .data = Front{
- .addr = con.address,
- .stream = con.stream,
- },
- };
- front_list.append(front);
- mainloop.runDetached(alloc, Front.run, .{&front.data}) catch unreachable;
+const Listener = struct {
+ conf: *const Config.Listener,
+ server: std.net.StreamServer,
+ connections: std.TailQueue(Front) = .{},
+ num_connections: u32 = 0,
+ frame: @Frame(listen) = undefined,
+ frameNextTick: std.event.Loop.NextTickNode = undefined,
+
+ const Self = @This();
+ const FrontNode = std.TailQueue(Front).Node;
+
+ fn listen(self: *Self) void {
+ while (true) {
+ while (self.conf.max_connections > 0 and self.num_connections >= self.conf.max_connections) {
+ std.debug.print("max-connections ({}) reached, suspending listener\n", .{self.conf.max_connections});
+ suspend;
+ std.debug.print("resuming listener\n", .{});
+ }
+ const con = self.server.accept() catch unreachable;
+ const front = alloc.create(FrontNode) catch unreachable;
+ // XXX: Hopefully the copy of the large Front struct is optimized away, may want to verify.
+ front.* = .{
+ .data = Front.init(self, con.address, con.stream)
+ };
+ self.connections.append(front);
+ self.num_connections += 1;
+ mainloop.runDetached(alloc, Front.run, .{&front.data}) catch unreachable;
+ }
}
-}
-// XXX: Do we even need to keep track of open Front connections?
-const FrontNode = std.TailQueue(Front).Node;
-var front_list: std.TailQueue(Front) = .{};
+ // Remove a connection from the list and free the struct.
+ fn remove_connection(self: *Self, front: *Front) void {
+ const node = @fieldParentPtr(FrontNode, "data", front);
+ self.connections.remove(node);
+ self.num_connections -= 1;
+ alloc.destroy(node);
+
+ // Resume the listener if we just made room for another connection.
+ if (self.conf.max_connections > 0 and self.num_connections + 1 == self.conf.max_connections)
+ mainloop.onNextTick(&self.frameNextTick);
+ }
+};
pub const Front = struct {
addr: Address,
stream: std.net.Stream,
+ listener: *Listener,
requests: std.AutoHashMap(u16, *Request) = std.AutoHashMap(u16, *Request).init(alloc),
-
- fn run(self: *Front) void {
- std.debug.print("Connection from {}\n", .{self.addr});
- self.loop() catch |e| {
- std.debug.print("Connection from {} closed: {}\n", .{ self.addr, e });
+ has_shutdown: bool = false,
+ rd: std.io.BufferedReader(4096, std.net.Stream.Reader),
+ wr: std.io.BufferedWriter(4096, std.net.Stream.Writer),
+
+ const Self = @This();
+
+ fn init(listener: *Listener, addr: Address, stream: std.net.Stream) Self {
+ return Self{
+ .addr = addr,
+ .stream = stream,
+ .listener = listener,
+ .rd = std.io.bufferedReader(stream.reader()),
+ .wr = std.io.bufferedWriter(stream.writer()),
};
+ }
- const node = @fieldParentPtr(FrontNode, "data", self);
- front_list.remove(node);
- self.stream.close();
- alloc.destroy(node);
+ fn run(self: *Self) void {
+ std.debug.print("{*} Connection from {}\n", .{ self, self.addr });
+ self.loop() catch |e| std.debug.print("{*} Connection closed: {}\n", .{ self, e });
+ if (self.has_shutdown) std.debug.print("{*} Connection has been shutdown\n", .{self});
+
+ self.shutdown();
+ self.listener.remove_connection(self);
}
- fn loop(self: *Front) !void {
- var rd = std.io.bufferedReader(self.stream.reader()).reader();
- while (true) {
- const r = try fastcgi.Record.read(rd);
+ /// Close the socket and stop the loop()
+ fn shutdown(self: *Self) void {
+ if (!self.has_shutdown) self.stream.close();
+ self.has_shutdown = true;
+ }
+
+ fn loop(self: *Self) !void {
+ while (!self.has_shutdown) {
+ const r = try fastcgi.Record.read(self.rd.reader());
std.debug.print("Received FastCGI record: {}\n", .{r});
switch (r.type) {
- .begin_request => try self.handleBegin(r.request_id, try r.readBegin(rd)),
- .params => try self.handleParams(&r, rd),
- else => try r.skip(rd),
+ .begin_request => try self.handleBegin(&r),
+ .params => try self.handleParams(&r),
+ else => try r.skip(self.rd.reader()),
}
}
}
- fn handleBegin(self: *Front, id: u16, b: fastcgi.Record.Begin) !void {
+ fn handleBegin(self: *Self, r: *const fastcgi.Record) !void {
+ const b = try r.readBegin(self.rd.reader());
// TODO: Handle other role types (which servers support that, though?)
if (b.role != .responder) return error.unsupportedFastCgiRole;
std.debug.print("Begin request: {}\n", .{b});
var req = try alloc.create(Request);
req.* = .{
.front = self,
- .front_id = id,
+ .front_id = r.request_id,
.front_keep_conn = b.keep_conn,
};
errdefer req.deinit();
- try self.requests.putNoClobber(id, req);
+ try self.requests.putNoClobber(r.request_id, req);
}
- fn handleParams(self: *Front, r: *const fastcgi.Record, rd: anytype) !void {
+ fn handleParams(self: *Self, r: *const fastcgi.Record) !void {
const req = self.requests.get(r.request_id) orelse return;
// We don't assign a back-end until we have all params, so this shouldn't happen.
if (req.back != null) return error.invalidState;
- if (r.content_length > 0) return r.readArrayList(rd, &req.params);
+ if (r.content_length > 0) return r.readArrayList(self.rd.reader(), &req.params);
// TODO: We received all params now, assign a backend and process the request
- try r.skip(rd);
+ try r.skip(self.rd.reader());
}
};
-test "parse typed address" {
- const T = struct { in: []const u8, lt: []const u8, a: ?[]const u8 };
- for ([_]T{
- .{ .in = "fastcgi", .lt = "fastcgi", .a = null },
- .{ .in = "fastcgi:", .lt = "fastcgi", .a = null },
- .{ .in = "fastcgi:/", .lt = "fastcgi", .a = "/" },
- .{ .in = "fastcgi:./some/path", .lt = "fastcgi", .a = "./some/path" },
- .{ .in = "fastcgi::300", .lt = "fastcgi", .a = "0.0.0.0:300" },
- .{ .in = "fastcgi:127.0.0.2", .lt = "fastcgi", .a = "127.0.0.2:5000" },
- .{ .in = "fastcgi::::10", .lt = "fastcgi", .a = "[::]:10" },
- }) |t| {
- var ltype: []const u8 = undefined;
- const addr = Config.parseTypedAddr(t.in, &ltype) catch unreachable;
- std.testing.expectEqualStrings(t.lt, ltype);
- if (t.a) |a| {
- var buf: [128]u8 = undefined;
- var naddr = std.fmt.bufPrint(&buf, "{}", .{addr.?}) catch unreachable;
- if (std.mem.indexOfScalar(u8, naddr, 0)) |n| {
- naddr = naddr[0..n]; // Formatting UNIX addresses may include zero bytes, remove those.
- }
- std.testing.expectEqualStrings(a, naddr);
- } else {
- std.testing.expect(addr == null);
+pub const Request = struct {
+ front: ?*Front,
+
+ /// FastCGI requestId at the front-end side.
+ front_id: u16,
+
+ /// Whether we should keep the front-end connection alive after this request has been processed.
+ front_keep_conn: bool,
+
+ // TODO: backend pointer
+ back: ?*void = null,
+
+ /// List of parameters in FastCGI format.
+ params: std.ArrayList(u8) = std.ArrayList(u8).init(alloc),
+
+ const Self = @This();
+
+ /// Free resources associated with this request.
+ /// Does nothing when 'front' or 'back' are still set, as that means this request is still referenced from either side.
+ pub fn deinit(self: *Self) void {
+ if (self.front != null or self.back != null) {
+ return;
}
+ self.params.deinit();
+ alloc.destroy(self);
}
-}
+};