const std = @import("../std.zig");
const builtin = @import("builtin");
const root = @import("root");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
const Thread = std.Thread;

const is_windows = builtin.os.tag == .windows;

pub const Loop = struct {
    next_tick_queue: std.atomic.Queue(anyframe),
    os_data: OsData,
    final_resume_node: ResumeNode,
    pending_event_count: usize,
    extra_threads: []Thread,
    /// TODO change this to a pool of a configurable number of threads
    /// and rename it to not be file-system-specific. It will become
    /// a thread pool for turning non-CPU-bound blocking things into
    /// async things: a fallback for any missing OS-specific API.
    fs_thread: Thread,
    fs_queue: std.atomic.Queue(Request),
    fs_end_request: Request.Node,
    fs_thread_wakeup: std.Thread.ResetEvent,

    /// For resources that have the same lifetime as the `Loop`.
    /// This is only used by `Loop` for the thread pool and associated resources.
    arena: std.heap.ArenaAllocator,

    /// State which manages frames that are sleeping on timers
    delay_queue: DelayQueue,

    /// Pre-allocated eventfds. All permanently active.
    /// This is how `Loop` sends frames to be resumed on other threads.
    available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
    eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,

    pub const NextTickNode = std.atomic.Queue(anyframe).Node;

    pub const ResumeNode = struct {
        id: Id,
        handle: anyframe,
        overlapped: Overlapped,

        pub const overlapped_init = switch (builtin.os.tag) {
            .windows => windows.OVERLAPPED{
                .Internal = 0,
                .InternalHigh = 0,
                .DUMMYUNIONNAME = .{
                    .DUMMYSTRUCTNAME = .{
                        .Offset = 0,
                        .OffsetHigh = 0,
                    },
                },
                .hEvent = null,
            },
            else => {},
        };
        pub const Overlapped = @TypeOf(overlapped_init);

        pub const Id = enum {
            Basic,
            Stop,
            EventFd,
        };

        pub const EventFd = switch (builtin.os.tag) {
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd,
            .linux => struct {
                base: ResumeNode,
                epoll_op: u32,
                eventfd: i32,
            },
            .windows => struct {
                base: ResumeNode,
                completion_key: usize,
            },
            else => struct {},
        };

        const KEventFd = struct {
            base: ResumeNode,
            kevent: os.Kevent,
        };

        pub const Basic = switch (builtin.os.tag) {
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic,
            .linux => struct {
                base: ResumeNode,
            },
            .windows => struct {
                base: ResumeNode,
            },
            else => @compileError("unsupported OS"),
        };

        const KEventBasic = struct {
            base: ResumeNode,
            kev: os.Kevent,
        };
    };

    var global_instance_state: Loop = undefined;
    const default_instance: ?*Loop = switch (std.io.mode) {
        .blocking => null,
        .evented => &global_instance_state,
    };
    pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance;
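
    // Illustrative sketch (an assumption for exposition, not part of this
    // file): a program can supply its own event loop by exposing a pub
    // `event_loop` declaration in its root source file, which `instance`
    // above picks up via `@hasDecl`:
    //
    //     // main.zig
    //     pub const io_mode = .evented;
    //     var my_loop: std.event.Loop = undefined;
    //     pub const event_loop: ?*std.event.Loop = &my_loop;
    //
    // Otherwise `default_instance` applies: null in blocking mode, a global
    // `Loop` in evented mode.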

    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
    /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
    pub fn init(self: *Loop) !void {
        if (builtin.single_threaded or
            (@hasDecl(root, "event_loop_mode") and root.event_loop_mode == .single_threaded))
        {
            return self.initSingleThreaded();
        } else {
            return self.initMultiThreaded();
        }
    }

    /// After initialization, call run().
    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
    /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
    pub fn initSingleThreaded(self: *Loop) !void {
        return self.initThreadPool(1);
    }

    /// After initialization, call run().
    /// This is the same as `initThreadPool` using `Thread.getCpuCount` to determine the thread
    /// pool size.
    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
    /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
    pub fn initMultiThreaded(self: *Loop) !void {
        if (builtin.single_threaded)
            @compileError("initMultiThreaded unavailable when building in single-threaded mode");
        const core_count = try Thread.getCpuCount();
        return self.initThreadPool(core_count);
    }

    /// `thread_count` is the total thread count. The thread pool size will be
    /// `max(thread_count - 1, 0)`.
    pub fn initThreadPool(self: *Loop, thread_count: usize) !void {
        self.* = Loop{
            .arena = std.heap.ArenaAllocator.init(std.heap.page_allocator),
            .pending_event_count = 1,
            .os_data = undefined,
            .next_tick_queue = std.atomic.Queue(anyframe).init(),
            .extra_threads = undefined,
            .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
            .eventfd_resume_nodes = undefined,
            .final_resume_node = ResumeNode{
                .id = ResumeNode.Id.Stop,
                .handle = undefined,
                .overlapped = ResumeNode.overlapped_init,
            },
            .fs_end_request = .{ .data = .{ .msg = .end, .finish = .NoAction } },
            .fs_queue = std.atomic.Queue(Request).init(),
            .fs_thread = undefined,
            .fs_thread_wakeup = undefined,
            .delay_queue = undefined,
        };
        try self.fs_thread_wakeup.init();
        errdefer self.fs_thread_wakeup.deinit();
        errdefer self.arena.deinit();

        // We need at least one of these in case the fs thread wants to use onNextTick.
        const extra_thread_count = thread_count - 1;
        const resume_node_count = std.math.max(extra_thread_count, 1);
        self.eventfd_resume_nodes = try self.arena.allocator().alloc(
            std.atomic.Stack(ResumeNode.EventFd).Node,
            resume_node_count,
        );

        self.extra_threads = try self.arena.allocator().alloc(Thread, extra_thread_count);

        try self.initOsData(extra_thread_count);
        errdefer self.deinitOsData();

        if (!builtin.single_threaded) {
            self.fs_thread = try Thread.spawn(.{}, posixFsRun, .{self});
        }
        errdefer if (!builtin.single_threaded) {
            self.posixFsRequest(&self.fs_end_request);
            self.fs_thread.join();
        };

        if (!builtin.single_threaded)
            try self.delay_queue.init();
    }
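
    // Illustrative lifecycle sketch (an assumption for exposition, not part
    // of this file). `initThreadPool` leaves `pending_event_count` at 1; that
    // initial reference is released by the first `finishOneEvent` in `run`.
    //
    //     var loop: Loop = undefined;
    //     try loop.init(); // or initSingleThreaded() / initThreadPool(n)
    //     defer loop.deinit();
    //     // schedule frames (onNextTick, runDetached, async calls), then:
    //     loop.run(); // returns once pending_event_count drops to zero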

    pub fn deinit(self: *Loop) void {
        self.deinitOsData();
        self.fs_thread_wakeup.deinit();
        self.arena.deinit();
        self.* = undefined;
    }

    const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError ||
        Thread.SpawnError || os.EpollCtlError || os.KEventError ||
        windows.CreateIoCompletionPortError;

    const wakeup_bytes = [_]u8{0x1} ** 8;

    fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
        nosuspend switch (builtin.os.tag) {
            .linux => {
                errdefer {
                    while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
                }
                for (self.eventfd_resume_nodes) |*eventfd_node| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = .EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            .eventfd = try os.eventfd(1, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK),
                            .epoll_op = os.linux.EPOLL.CTL_ADD,
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                }

                self.os_data.epollfd = try os.epoll_create1(os.linux.EPOLL.CLOEXEC);
                errdefer os.close(self.os_data.epollfd);

                self.os_data.final_eventfd = try os.eventfd(0, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK);
                errdefer os.close(self.os_data.final_eventfd);

                self.os_data.final_eventfd_event = os.linux.epoll_event{
                    .events = os.linux.EPOLL.IN,
                    .data = os.linux.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
                };
                try os.epoll_ctl(
                    self.os_data.epollfd,
                    os.linux.EPOLL.CTL_ADD,
                    self.os_data.final_eventfd,
                    &self.os_data.final_eventfd_event,
                );

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    // writing 8 bytes to an eventfd cannot fail
                    const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
                    assert(amt == wakeup_bytes.len);
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].join();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                }
            },
            .macos, .freebsd, .netbsd, .dragonfly => {
                self.os_data.kqfd = try os.kqueue();
                errdefer os.close(self.os_data.kqfd);

                const empty_kevs = &[0]os.Kevent{};

                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            // this one is for sending events
                            .kevent = os.Kevent{
                                .ident = i,
                                .filter = os.system.EVFILT_USER,
                                .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE,
                                .fflags = 0,
                                .data = 0,
                                .udata = @ptrToInt(&eventfd_node.data.base),
                            },
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                    const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
                    _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
                    eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
                    eventfd_node.data.kevent.fflags = os.system.NOTE_TRIGGER;
                }

                // Pre-add so that we cannot get error.SystemResources
                // later when we try to activate it.
                self.os_data.final_kevent = os.Kevent{
                    .ident = extra_thread_count,
                    .filter = os.system.EVFILT_USER,
                    .flags = os.system.EV_ADD | os.system.EV_DISABLE,
                    .fflags = 0,
                    .data = 0,
                    .udata = @ptrToInt(&self.final_resume_node),
                };
                const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
                _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
                self.os_data.final_kevent.flags = os.system.EV_ENABLE;
                self.os_data.final_kevent.fflags = os.system.NOTE_TRIGGER;

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].join();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                }
            },
            .openbsd => {
                self.os_data.kqfd = try os.kqueue();
                errdefer os.close(self.os_data.kqfd);

                const empty_kevs = &[0]os.Kevent{};

                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            // this one is for sending events
                            .kevent = os.Kevent{
                                .ident = i,
                                .filter = os.system.EVFILT_TIMER,
                                .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE | os.system.EV_ONESHOT,
                                .fflags = 0,
                                .data = 0,
                                .udata = @ptrToInt(&eventfd_node.data.base),
                            },
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                    const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
                    _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
                    eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
                }

                // Pre-add so that we cannot get error.SystemResources
                // later when we try to activate it.
                self.os_data.final_kevent = os.Kevent{
                    .ident = extra_thread_count,
                    .filter = os.system.EVFILT_TIMER,
                    .flags = os.system.EV_ADD | os.system.EV_ONESHOT | os.system.EV_DISABLE,
                    .fflags = 0,
                    .data = 0,
                    .udata = @ptrToInt(&self.final_resume_node),
                };
                const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
                _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
                self.os_data.final_kevent.flags = os.system.EV_ENABLE;

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].join();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                }
            },
            .windows => {
                self.os_data.io_port = try windows.CreateIoCompletionPort(
                    windows.INVALID_HANDLE_VALUE,
                    null,
                    undefined,
                    maxInt(windows.DWORD),
                );
                errdefer windows.CloseHandle(self.os_data.io_port);

                for (self.eventfd_resume_nodes) |*eventfd_node| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            // this one is for sending events
                            .completion_key = @ptrToInt(&eventfd_node.data.base),
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                }

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    var i: usize = 0;
                    while (i < extra_thread_index) : (i += 1) {
                        while (true) {
                            const overlapped = &self.final_resume_node.overlapped;
                            windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                            break;
                        }
                    }
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].join();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                }
            },
            else => {},
        };
    }

    fn deinitOsData(self: *Loop) void {
        nosuspend switch (builtin.os.tag) {
            .linux => {
                os.close(self.os_data.final_eventfd);
                while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
                os.close(self.os_data.epollfd);
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                os.close(self.os_data.kqfd);
            },
            .windows => {
                windows.CloseHandle(self.os_data.io_port);
            },
            else => {},
        };
    }

    /// resume_node must live longer than the anyframe that it holds a reference to.
    /// flags must contain EPOLL.ET
    pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
        assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
        self.beginOneEvent();
        errdefer self.finishOneEvent();
        try self.linuxModFd(
            fd,
            os.linux.EPOLL.CTL_ADD,
            flags,
            resume_node,
        );
    }

    pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
        assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
        var ev = os.linux.epoll_event{
            .events = flags,
            .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
        };
        try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev);
    }

    pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
        os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL.CTL_DEL, fd, null) catch {};
        self.finishOneEvent();
    }

    pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void {
        assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
        assert(flags & os.linux.EPOLL.ONESHOT == os.linux.EPOLL.ONESHOT);
        var resume_node = ResumeNode.Basic{
            .base = ResumeNode{
                .id = .Basic,
                .handle = @frame(),
                .overlapped = ResumeNode.overlapped_init,
            },
        };
        var need_to_delete = true;
        defer if (need_to_delete) self.linuxRemoveFd(fd);

        suspend {
            self.linuxAddFd(fd, &resume_node.base, flags) catch |err| switch (err) {
                error.FileDescriptorNotRegistered => unreachable,
                error.OperationCausesCircularLoop => unreachable,
                error.FileDescriptorIncompatibleWithEpoll => unreachable,
                error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd are not thread-safe

                error.SystemResources,
                error.UserResourceLimitReached,
                error.Unexpected,
                => {
                    need_to_delete = false;
                    // Fall back to a blocking poll(). Ideally this codepath is never hit, since
                    // epoll should be just fine. But this is better than incorrect behavior.
                    var poll_flags: i16 = 0;
                    if ((flags & os.linux.EPOLL.IN) != 0) poll_flags |= os.POLL.IN;
                    if ((flags & os.linux.EPOLL.OUT) != 0) poll_flags |= os.POLL.OUT;
                    var pfd = [1]os.pollfd{os.pollfd{
                        .fd = fd,
                        .events = poll_flags,
                        .revents = undefined,
                    }};
                    _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) {
                        error.NetworkSubsystemFailed => unreachable, // only possible on Windows

                        error.SystemResources,
                        error.Unexpected,
                        => {
                            // Even poll() didn't work. The best we can do now is sleep for a
                            // small duration and then hope that something changed.
                            std.time.sleep(1 * std.time.ns_per_ms);
                        },
                    };
                    resume @frame();
                },
            };
        }
    }

    pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void {
        switch (builtin.os.tag) {
            .linux => {
                self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN);
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_READ, os.system.EV_ONESHOT);
            },
            else => @compileError("Unsupported OS"),
        }
    }

    pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void {
        switch (builtin.os.tag) {
            .linux => {
                self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT);
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
            },
            else => @compileError("Unsupported OS"),
        }
    }

    pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void {
        switch (builtin.os.tag) {
            .linux => {
                self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT | os.linux.EPOLL.IN);
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_READ, os.system.EV_ONESHOT);
                self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
            },
            else => @compileError("Unsupported OS"),
        }
    }

    pub fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, flags: u16) void {
        var resume_node = ResumeNode.Basic{
            .base = ResumeNode{
                .id = ResumeNode.Id.Basic,
                .handle = @frame(),
                .overlapped = ResumeNode.overlapped_init,
            },
            .kev = undefined,
        };

        defer {
            // EV_ONESHOT kevents are deleted by the kernel once they fire, but
            // bsdRemoveKev must still run to balance the pending event count;
            // its EV_DELETE of an already-removed kevent is harmless.
            if (flags & os.system.EV_ONESHOT != 0) {
                self.bsdRemoveKev(ident, filter);
            }
        }

        suspend {
            self.bsdAddKev(&resume_node, ident, filter, flags) catch unreachable;
        }
    }

    /// resume_node must live longer than the anyframe that it holds a reference to.
    pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, flags: u16) !void {
        self.beginOneEvent();
        errdefer self.finishOneEvent();
        var kev = [1]os.Kevent{os.Kevent{
            .ident = ident,
            .filter = filter,
            .flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_CLEAR | flags,
            .fflags = 0,
            .data = 0,
            .udata = @ptrToInt(&resume_node.base),
        }};
        const empty_kevs = &[0]os.Kevent{};
        _ = try os.kevent(self.os_data.kqfd, &kev, empty_kevs, null);
    }

    pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void {
        var kev = [1]os.Kevent{os.Kevent{
            .ident = ident,
            .filter = filter,
            .flags = os.system.EV_DELETE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        }};
        const empty_kevs = &[0]os.Kevent{};
        _ = os.kevent(self.os_data.kqfd, &kev, empty_kevs, null) catch undefined;
        self.finishOneEvent();
    }

    fn dispatch(self: *Loop) void {
        while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
            const next_tick_node = self.next_tick_queue.get() orelse {
                self.available_eventfd_resume_nodes.push(resume_stack_node);
                return;
            };
            const eventfd_node = &resume_stack_node.data;
            eventfd_node.base.handle = next_tick_node.data;
            switch (builtin.os.tag) {
                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                    const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.kevent);
                    const empty_kevs = &[0]os.Kevent{};
                    _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                .linux => {
                    // the pending count is already accounted for
                    const epoll_events = os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN | os.linux.EPOLL.OUT |
                        os.linux.EPOLL.ET;
                    self.linuxModFd(
                        eventfd_node.eventfd,
                        eventfd_node.epoll_op,
                        epoll_events,
                        &eventfd_node.base,
                    ) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                .windows => {
                    windows.PostQueuedCompletionStatus(
                        self.os_data.io_port,
                        undefined,
                        undefined,
                        &eventfd_node.base.overlapped,
                    ) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                else => @compileError("unsupported OS"),
            }
        }
    }

    /// Bring your own linked list node. This means it can't fail.
    pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
        self.beginOneEvent(); // finished in dispatch()
        self.next_tick_queue.put(node);
        self.dispatch();
    }

    pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
        if (self.next_tick_queue.remove(node)) {
            self.finishOneEvent();
        }
    }

    pub fn run(self: *Loop) void {
        self.finishOneEvent(); // the reference we start with

        self.workerRun();

        if (!builtin.single_threaded) {
            switch (builtin.os.tag) {
                .linux,
                .macos,
                .freebsd,
                .netbsd,
                .dragonfly,
                .openbsd,
                => self.fs_thread.join(),
                else => {},
            }
        }

        for (self.extra_threads) |extra_thread| {
            extra_thread.join();
        }

        @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
        self.delay_queue.event.set();
        self.delay_queue.thread.join();
    }

    /// Runs the provided function asynchronously. The function's frame is allocated
    /// with `alloc` and freed when the function returns.
    /// `func` must return void and it can be an async function.
    /// Yields to the event loop, running the function on the next tick.
    pub fn runDetached(self: *Loop, alloc: mem.Allocator, comptime func: anytype, args: anytype) error{OutOfMemory}!void {
        if (!std.io.is_async) @compileError("Can't use runDetached in non-async mode!");
        if (@TypeOf(@call(.{}, func, args)) != void) {
            @compileError("`func` must not have a return value");
        }

        const Wrapper = struct {
            const Args = @TypeOf(args);
            fn run(func_args: Args, loop: *Loop, allocator: mem.Allocator) void {
                loop.beginOneEvent();
                loop.yield();
                @call(.{}, func, func_args); // compile error when called with non-void ret type
                suspend {
                    loop.finishOneEvent();
                    allocator.destroy(@frame());
                }
            }
        };

        var run_frame = try alloc.create(@Frame(Wrapper.run));
        run_frame.* = async Wrapper.run(args, self, alloc);
    }
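
    // Illustrative sketch (an assumption for exposition, not part of this
    // file): running a fire-and-forget task whose frame is heap-allocated by
    // `runDetached` and destroyed when the task returns.
    //
    //     fn logMessage(msg: []const u8) void {
    //         std.debug.print("{s}\n", .{msg});
    //     }
    //
    //     try loop.runDetached(std.heap.page_allocator, logMessage, .{"hello"});
    //     // logMessage starts on the next tick of `loop`.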

    /// Yielding lets the event loop run, starting any unstarted async operations.
    /// Note that async operations automatically start when a function yields for any other reason,
    /// for example, when async I/O is performed. This function is intended to be used only when
    /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O
    /// is performed.
    pub fn yield(self: *Loop) void {
        suspend {
            var my_tick_node = NextTickNode{
                .prev = undefined,
                .next = undefined,
                .data = @frame(),
            };
            self.onNextTick(&my_tick_node);
        }
    }

    /// If the build is multi-threaded and there is an event loop, then it calls `yield`. Otherwise,
    /// does nothing.
    pub fn startCpuBoundOperation() void {
        if (builtin.single_threaded) {
            return;
        } else if (instance) |event_loop| {
            event_loop.yield();
        }
    }
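
    // Illustrative sketch (an assumption for exposition, not part of this
    // file): a CPU-bound helper yields once before the heavy work so that,
    // when an event loop exists, the work resumes via the loop's worker
    // threads instead of stalling the caller.
    //
    //     fn checksum(data: []const u8) u64 {
    //         Loop.startCpuBoundOperation();
    //         return std.hash.Wyhash.hash(0, data);
    //     }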

    /// call finishOneEvent when done
    pub fn beginOneEvent(self: *Loop) void {
        _ = @atomicRmw(usize, &self.pending_event_count, .Add, 1, .SeqCst);
    }

    pub fn finishOneEvent(self: *Loop) void {
        nosuspend {
            const prev = @atomicRmw(usize, &self.pending_event_count, .Sub, 1, .SeqCst);
            if (prev != 1) return;

            // cause all the threads to stop
            self.posixFsRequest(&self.fs_end_request);

            switch (builtin.os.tag) {
                .linux => {
                    // Writing to the eventfd wakes up only one thread, so multiple
                    // writes are needed to wake up all the threads.
                    var i: usize = 0;
                    while (i < self.extra_threads.len + 1) : (i += 1) {
                        // writing 8 bytes to an eventfd cannot fail
                        const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
                        assert(amt == wakeup_bytes.len);
                    }
                    return;
                },
                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                    const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
                    const empty_kevs = &[0]os.Kevent{};
                    // cannot fail because we already added it and this just enables it
                    _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
                    return;
                },
                .windows => {
                    var i: usize = 0;
                    while (i < self.extra_threads.len + 1) : (i += 1) {
                        while (true) {
                            const overlapped = &self.final_resume_node.overlapped;
                            windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                            break;
                        }
                    }
                    return;
                },
                else => @compileError("unsupported OS"),
            }
        }
    }

    pub fn sleep(self: *Loop, nanoseconds: u64) void {
        if (builtin.single_threaded)
            @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");

        suspend {
            const now = self.delay_queue.timer.read();

            var entry: DelayQueue.Waiters.Entry = undefined;
            entry.init(@frame(), now + nanoseconds);
            self.delay_queue.waiters.insert(&entry);

            // Speculatively wake up the timer thread when we add a new entry.
            // If the timer thread is sleeping on a longer entry, we need to
            // interrupt it so that our entry can be expired in time.
            self.delay_queue.event.set();
        }
    }
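
    // Illustrative sketch (an assumption for exposition, not part of this
    // file): sleeping without blocking an OS thread. The frame parks in the
    // delay queue and is rescheduled via onNextTick once it expires.
    //
    //     var frame = async loop.sleep(50 * std.time.ns_per_ms);
    //     // ... other frames keep running on the loop ...
    //     await frame;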

    const DelayQueue = struct {
        timer: std.time.Timer,
        waiters: Waiters,
        thread: std.Thread,
        event: std.Thread.AutoResetEvent,
        is_running: bool,

        /// Initialize the delay queue by spawning the timer thread
        /// and starting any timer resources.
        fn init(self: *DelayQueue) !void {
            self.* = DelayQueue{
                .timer = try std.time.Timer.start(),
                .waiters = DelayQueue.Waiters{
                    .entries = std.atomic.Queue(anyframe).init(),
                },
                .event = std.Thread.AutoResetEvent{},
                .is_running = true,
                // Must be last so that it can read the other state, such as `is_running`.
                .thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}),
            };
        }

        /// Entry point for the timer thread,
        /// which waits for timer entries to expire and reschedules them.
        fn run(self: *DelayQueue) void {
            const loop = @fieldParentPtr(Loop, "delay_queue", self);

            while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
                const now = self.timer.read();

                if (self.waiters.popExpired(now)) |entry| {
                    loop.onNextTick(&entry.node);
                    continue;
                }

                if (self.waiters.nextExpire()) |expires| {
                    if (now >= expires)
                        continue;
                    self.event.timedWait(expires - now) catch {};
                } else {
                    self.event.wait();
                }
            }
        }

        // TODO: use a tickless hierarchical timer wheel:
        // https://github.com/wahern/timeout/
        const Waiters = struct {
            entries: std.atomic.Queue(anyframe),

            const Entry = struct {
                node: NextTickNode,
                expires: u64,

                fn init(self: *Entry, frame: anyframe, expires: u64) void {
                    self.node.data = frame;
                    self.expires = expires;
                }
            };

            /// Registers the entry into the queue of waiting frames
            fn insert(self: *Waiters, entry: *Entry) void {
                self.entries.put(&entry.node);
            }

            /// Dequeues one expired entry relative to `now`
            fn popExpired(self: *Waiters, now: u64) ?*Entry {
                const entry = self.peekExpiringEntry() orelse return null;
                if (entry.expires > now)
                    return null;

                assert(self.entries.remove(&entry.node));
                return entry;
            }

            /// Returns an estimate for the amount of time
            /// to wait until the next waiting entry expires.
            fn nextExpire(self: *Waiters) ?u64 {
                const entry = self.peekExpiringEntry() orelse return null;
                return entry.expires;
            }

            fn peekExpiringEntry(self: *Waiters) ?*Entry {
                self.entries.mutex.lock();
                defer self.entries.mutex.unlock();

                // starting from the head
                var head = self.entries.head orelse return null;

                // traverse the list of waiting entries to
                // find the node with the smallest `expires` field
                var min = head;
                while (head.next) |node| {
                    const minEntry = @fieldParentPtr(Entry, "node", min);
                    const nodeEntry = @fieldParentPtr(Entry, "node", node);
                    if (nodeEntry.expires < minEntry.expires)
                        min = node;
                    head = node;
                }

                return @fieldParentPtr(Entry, "node", min);
            }
        };
    };

    /// ------- I/O APIs -------
    pub fn accept(
        self: *Loop,
        /// This argument is a socket that has been created with `socket`, bound to a local address
        /// with `bind`, and is listening for connections after a `listen`.
        sockfd: os.socket_t,
        /// This argument is a pointer to a sockaddr structure. This structure is filled in with the
        /// address of the peer socket, as known to the communications layer. The exact format of the
        /// address returned in `addr` is determined by the socket's address family (see `socket` and
        /// the respective protocol man pages).
        addr: *os.sockaddr,
        /// This argument is a value-result argument: the caller must initialize it to contain the
        /// size (in bytes) of the structure pointed to by addr; on return it will contain the actual
        /// size of the peer address.
        ///
        /// The returned address is truncated if the buffer provided is too small; in this case, `addr_size`
        /// will return a value greater than was supplied to the call.
        addr_size: *os.socklen_t,
        /// The following values can be bitwise ORed in flags to obtain different behavior:
        /// * `SOCK.CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See
        ///   the description of the `O.CLOEXEC` flag in `open` for reasons why this may be useful.
        flags: u32,
    ) os.AcceptError!os.socket_t {
        while (true) {
            return os.accept(sockfd, addr, addr_size, flags | os.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => {
                    self.waitUntilFdReadable(sockfd);
                    continue;
                },
                else => return err,
            };
        }
    }
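
    // Illustrative sketch (an assumption for exposition; `listen_fd` is a
    // hypothetical non-blocking listening socket): an evented accept loop.
    //
    //     var addr: os.sockaddr = undefined;
    //     var addr_len: os.socklen_t = @sizeOf(os.sockaddr);
    //     while (true) {
    //         const conn = try loop.accept(listen_fd, &addr, &addr_len, os.SOCK.CLOEXEC);
    //         // hand `conn` to a detached handler, e.g. via runDetached
    //     }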

    pub fn connect(self: *Loop, sockfd: os.socket_t, sock_addr: *const os.sockaddr, len: os.socklen_t) os.ConnectError!void {
        os.connect(sockfd, sock_addr, len) catch |err| switch (err) {
            error.WouldBlock => {
                self.waitUntilFdWritable(sockfd);
                return os.getsockoptError(sockfd);
            },
            else => return err,
        };
    }

    /// Performs an async `os.open` using a separate thread.
    pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .open = .{
                        .path = file_path,
                        .flags = flags,
                        .mode = mode,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.open.result;
    }
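
    // Illustrative sketch (an assumption for exposition, not part of this
    // file): the fs-thread round trip used by openZ and the wrappers below.
    // The caller suspends, the fs thread performs the blocking syscall and
    // fills in `result`, then the stored tick node resumes this frame.
    //
    //     const fd = try loop.openZ("/tmp/example.txt", os.O.RDONLY, 0);
    //     defer loop.close(fd);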

    /// Performs an async `os.openat` using a separate thread.
    pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .openat = .{
                        .fd = fd,
                        .path = file_path,
                        .flags = flags,
                        .mode = mode,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.openat.result;
    }

    /// Performs an async `os.close` using a separate thread.
    pub fn close(self: *Loop, fd: os.fd_t) void {
        var req_node = Request.Node{
            .data = .{
                .msg = .{ .close = .{ .fd = fd } },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
    }

    /// Performs an async `os.read` using a separate thread.
    /// `fd` must block and not return EAGAIN.
    pub fn read(self: *Loop, fd: os.fd_t, buf: []u8, simulate_evented: bool) os.ReadError!usize {
        if (simulate_evented) {
            var req_node = Request.Node{
                .data = .{
                    .msg = .{
                        .read = .{
                            .fd = fd,
                            .buf = buf,
                            .result = undefined,
                        },
                    },
                    .finish = .{ .TickNode = .{ .data = @frame() } },
                },
            };
            suspend {
                self.posixFsRequest(&req_node);
            }
            return req_node.data.msg.read.result;
        } else {
            while (true) {
                return os.read(fd, buf) catch |err| switch (err) {
                    error.WouldBlock => {
                        self.waitUntilFdReadable(fd);
                        continue;
                    },
                    else => return err,
                };
            }
        }
    }
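
    // Illustrative note (an assumption for exposition): the two branches of
    // `read` above are the template for every read/write wrapper below.
    // `simulate_evented = true` ships the call to the fs thread, for fds that
    // cannot be registered with epoll/kqueue (e.g. regular files); otherwise
    // the nonblocking syscall is retried after waiting for readiness.
    //
    //     var buf: [4096]u8 = undefined;
    //     const n = try loop.read(some_fd, &buf, true); // route through the fs thread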
1082
1083    /// Performs an async `os.readv` using a separate thread.
1084    /// `fd` must block and not return EAGAIN.
1085    pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, simulate_evented: bool) os.ReadError!usize {
1086        if (simulate_evented) {
1087            var req_node = Request.Node{
1088                .data = .{
1089                    .msg = .{
1090                        .readv = .{
1091                            .fd = fd,
1092                            .iov = iov,
1093                            .result = undefined,
1094                        },
1095                    },
1096                    .finish = .{ .TickNode = .{ .data = @frame() } },
1097                },
1098            };
1099            suspend {
1100                self.posixFsRequest(&req_node);
1101            }
1102            return req_node.data.msg.readv.result;
1103        } else {
1104            while (true) {
1105                return os.readv(fd, iov) catch |err| switch (err) {
1106                    error.WouldBlock => {
1107                        self.waitUntilFdReadable(fd);
1108                        continue;
1109                    },
1110                    else => return err,
1111                };
1112            }
1113        }
1114    }
1115
1116    /// Performs an async `os.pread` using a separate thread.
1117    /// `fd` must block and not return EAGAIN.
1118    pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64, simulate_evented: bool) os.PReadError!usize {
1119        if (simulate_evented) {
1120            var req_node = Request.Node{
1121                .data = .{
1122                    .msg = .{
1123                        .pread = .{
1124                            .fd = fd,
1125                            .buf = buf,
1126                            .offset = offset,
1127                            .result = undefined,
1128                        },
1129                    },
1130                    .finish = .{ .TickNode = .{ .data = @frame() } },
1131                },
1132            };
1133            suspend {
1134                self.posixFsRequest(&req_node);
1135            }
1136            return req_node.data.msg.pread.result;
1137        } else {
1138            while (true) {
1139                return os.pread(fd, buf, offset) catch |err| switch (err) {
1140                    error.WouldBlock => {
1141                        self.waitUntilFdReadable(fd);
1142                        continue;
1143                    },
1144                    else => return err,
1145                };
1146            }
1147        }
1148    }
1149
1150    /// Performs an async `os.preadv` using a separate thread.
1151    /// `fd` must block and not return EAGAIN.
1152    pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64, simulate_evented: bool) os.ReadError!usize {
1153        if (simulate_evented) {
1154            var req_node = Request.Node{
1155                .data = .{
1156                    .msg = .{
1157                        .preadv = .{
1158                            .fd = fd,
1159                            .iov = iov,
1160                            .offset = offset,
1161                            .result = undefined,
1162                        },
1163                    },
1164                    .finish = .{ .TickNode = .{ .data = @frame() } },
1165                },
1166            };
1167            suspend {
1168                self.posixFsRequest(&req_node);
1169            }
1170            return req_node.data.msg.preadv.result;
1171        } else {
1172            while (true) {
1173                return os.preadv(fd, iov, offset) catch |err| switch (err) {
1174                    error.WouldBlock => {
1175                        self.waitUntilFdReadable(fd);
1176                        continue;
1177                    },
1178                    else => return err,
1179                };
1180            }
1181        }
1182    }
1183
1184    /// Performs an async `os.write` using a separate thread.
1185    /// `fd` must block and not return EAGAIN.
1186    pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8, simulate_evented: bool) os.WriteError!usize {
1187        if (simulate_evented) {
1188            var req_node = Request.Node{
1189                .data = .{
1190                    .msg = .{
1191                        .write = .{
1192                            .fd = fd,
1193                            .bytes = bytes,
1194                            .result = undefined,
1195                        },
1196                    },
1197                    .finish = .{ .TickNode = .{ .data = @frame() } },
1198                },
1199            };
1200            suspend {
1201                self.posixFsRequest(&req_node);
1202            }
1203            return req_node.data.msg.write.result;
1204        } else {
1205            while (true) {
1206                return os.write(fd, bytes) catch |err| switch (err) {
1207                    error.WouldBlock => {
1208                        self.waitUntilFdWritable(fd);
1209                        continue;
1210                    },
1211                    else => return err,
1212                };
1213            }
1214        }
1215    }
1216
1217    /// Performs an async `os.writev` using a separate thread.
1218    /// `fd` must block and not return EAGAIN.
1219    pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, simulate_evented: bool) os.WriteError!usize {
1220        if (simulate_evented) {
1221            var req_node = Request.Node{
1222                .data = .{
1223                    .msg = .{
1224                        .writev = .{
1225                            .fd = fd,
1226                            .iov = iov,
1227                            .result = undefined,
1228                        },
1229                    },
1230                    .finish = .{ .TickNode = .{ .data = @frame() } },
1231                },
1232            };
1233            suspend {
1234                self.posixFsRequest(&req_node);
1235            }
1236            return req_node.data.msg.writev.result;
1237        } else {
1238            while (true) {
1239                return os.writev(fd, iov) catch |err| switch (err) {
1240                    error.WouldBlock => {
1241                        self.waitUntilFdWritable(fd);
1242                        continue;
1243                    },
1244                    else => return err,
1245                };
1246            }
1247        }
1248    }
1249
1250    /// Performs an async `os.pwrite` using a separate thread.
1251    /// `fd` must block and not return EAGAIN.
1252    pub fn pwrite(self: *Loop, fd: os.fd_t, bytes: []const u8, offset: u64, simulate_evented: bool) os.PerformsWriteError!usize {
1253        if (simulate_evented) {
1254            var req_node = Request.Node{
1255                .data = .{
1256                    .msg = .{
1257                        .pwrite = .{
1258                            .fd = fd,
1259                            .bytes = bytes,
1260                            .offset = offset,
1261                            .result = undefined,
1262                        },
1263                    },
1264                    .finish = .{ .TickNode = .{ .data = @frame() } },
1265                },
1266            };
1267            suspend {
1268                self.posixFsRequest(&req_node);
1269            }
1270            return req_node.data.msg.pwrite.result;
1271        } else {
1272            while (true) {
1273                return os.pwrite(fd, bytes, offset) catch |err| switch (err) {
1274                    error.WouldBlock => {
1275                        self.waitUntilFdWritable(fd);
1276                        continue;
1277                    },
1278                    else => return err,
1279                };
1280            }
1281        }
1282    }
1283
1284    /// Performs an async `os.pwritev` using a separate thread.
1285    /// `fd` must block and not return EAGAIN.
1286    pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64, simulate_evented: bool) os.PWriteError!usize {
1287        if (simulate_evented) {
1288            var req_node = Request.Node{
1289                .data = .{
1290                    .msg = .{
1291                        .pwritev = .{
1292                            .fd = fd,
1293                            .iov = iov,
1294                            .offset = offset,
1295                            .result = undefined,
1296                        },
1297                    },
1298                    .finish = .{ .TickNode = .{ .data = @frame() } },
1299                },
1300            };
1301            suspend {
1302                self.posixFsRequest(&req_node);
1303            }
1304            return req_node.data.msg.pwritev.result;
1305        } else {
1306            while (true) {
1307                return os.pwritev(fd, iov, offset) catch |err| switch (err) {
1308                    error.WouldBlock => {
1309                        self.waitUntilFdWritable(fd);
1310                        continue;
1311                    },
1312                    else => return err,
1313                };
1314            }
1315        }
1316    }
1317
    /// Performs an async `os.sendto` using the event loop.
    /// On error.WouldBlock the frame suspends until `sockfd` is writable.
    pub fn sendto(
1319        self: *Loop,
1320        /// The file descriptor of the sending socket.
1321        sockfd: os.fd_t,
1322        /// Message to send.
1323        buf: []const u8,
1324        flags: u32,
1325        dest_addr: ?*const os.sockaddr,
1326        addrlen: os.socklen_t,
1327    ) os.SendToError!usize {
1328        while (true) {
1329            return os.sendto(sockfd, buf, flags, dest_addr, addrlen) catch |err| switch (err) {
1330                error.WouldBlock => {
1331                    self.waitUntilFdWritable(sockfd);
1332                    continue;
1333                },
1334                else => return err,
1335            };
1336        }
1337    }
1338
    /// Performs an async `os.recvfrom` using the event loop.
    /// On error.WouldBlock the frame suspends until `sockfd` is readable.
    pub fn recvfrom(
1340        self: *Loop,
1341        sockfd: os.fd_t,
1342        buf: []u8,
1343        flags: u32,
1344        src_addr: ?*os.sockaddr,
1345        addrlen: ?*os.socklen_t,
1346    ) os.RecvFromError!usize {
1347        while (true) {
1348            return os.recvfrom(sockfd, buf, flags, src_addr, addrlen) catch |err| switch (err) {
1349                error.WouldBlock => {
1350                    self.waitUntilFdReadable(sockfd);
1351                    continue;
1352                },
1353                else => return err,
1354            };
1355        }
1356    }
1357
    /// Performs an async `os.faccessatZ` using a separate thread.
    /// Unlike the read/write helpers there is no evented fallback; the request
    /// always goes through the fs thread.
1360    pub fn faccessatZ(
1361        self: *Loop,
1362        dirfd: os.fd_t,
1363        path_z: [*:0]const u8,
1364        mode: u32,
1365        flags: u32,
1366    ) os.AccessError!void {
1367        var req_node = Request.Node{
1368            .data = .{
1369                .msg = .{
1370                    .faccessat = .{
1371                        .dirfd = dirfd,
1372                        .path = path_z,
1373                        .mode = mode,
1374                        .flags = flags,
1375                        .result = undefined,
1376                    },
1377                },
1378                .finish = .{ .TickNode = .{ .data = @frame() } },
1379            },
1380        };
1381        suspend {
1382            self.posixFsRequest(&req_node);
1383        }
1384        return req_node.data.msg.faccessat.result;
1385    }
1386
    /// The body of each worker thread: first drain the next-tick queue,
    /// dispatching surplus frames to other threads, then block on the OS event
    /// mechanism (epoll/kqueue/IOCP) until a Stop node shuts the thread down.
    fn workerRun(self: *Loop) void {
1388        while (true) {
1389            while (true) {
1390                const next_tick_node = self.next_tick_queue.get() orelse break;
1391                self.dispatch();
1392                resume next_tick_node.data;
1393                self.finishOneEvent();
1394            }
1395
1396            switch (builtin.os.tag) {
1397                .linux => {
1398                    // only process 1 event so we don't steal from other threads
1399                    var events: [1]os.linux.epoll_event = undefined;
1400                    const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1);
1401                    for (events[0..count]) |ev| {
1402                        const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
1403                        const handle = resume_node.handle;
1404                        const resume_node_id = resume_node.id;
1405                        switch (resume_node_id) {
1406                            .Basic => {},
1407                            .Stop => return,
1408                            .EventFd => {
1409                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
1410                                event_fd_node.epoll_op = os.linux.EPOLL.CTL_MOD;
1411                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
1412                                self.available_eventfd_resume_nodes.push(stack_node);
1413                            },
1414                        }
1415                        resume handle;
1416                        if (resume_node_id == ResumeNode.Id.EventFd) {
1417                            self.finishOneEvent();
1418                        }
1419                    }
1420                },
1421                .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
1422                    var eventlist: [1]os.Kevent = undefined;
1423                    const empty_kevs = &[0]os.Kevent{};
1424                    const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
1425                    for (eventlist[0..count]) |ev| {
1426                        const resume_node = @intToPtr(*ResumeNode, ev.udata);
1427                        const handle = resume_node.handle;
1428                        const resume_node_id = resume_node.id;
1429                        switch (resume_node_id) {
1430                            .Basic => {
1431                                const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node);
1432                                basic_node.kev = ev;
1433                            },
1434                            .Stop => return,
1435                            .EventFd => {
1436                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
1437                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
1438                                self.available_eventfd_resume_nodes.push(stack_node);
1439                            },
1440                        }
1441                        resume handle;
1442                        if (resume_node_id == ResumeNode.Id.EventFd) {
1443                            self.finishOneEvent();
1444                        }
1445                    }
1446                },
1447                .windows => {
1448                    var completion_key: usize = undefined;
1449                    const overlapped = while (true) {
1450                        var nbytes: windows.DWORD = undefined;
1451                        var overlapped: ?*windows.OVERLAPPED = undefined;
1452                        switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
1453                            .Aborted => return,
1454                            .Normal => {},
1455                            .EOF => {},
1456                            .Cancelled => continue,
1457                        }
1458                        if (overlapped) |o| break o;
1459                    } else unreachable; // TODO else unreachable should not be necessary
1460                    const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
1461                    const handle = resume_node.handle;
1462                    const resume_node_id = resume_node.id;
1463                    switch (resume_node_id) {
1464                        .Basic => {},
1465                        .Stop => return,
1466                        .EventFd => {
1467                            const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
1468                            const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
1469                            self.available_eventfd_resume_nodes.push(stack_node);
1470                        },
1471                    }
1472                    resume handle;
1473                    self.finishOneEvent();
1474                },
1475                else => @compileError("unsupported OS"),
1476            }
1477        }
1478    }
1479
1480    fn posixFsRequest(self: *Loop, request_node: *Request.Node) void {
1481        self.beginOneEvent(); // finished in posixFsRun after processing the msg
1482        self.fs_queue.put(request_node);
1483        self.fs_thread_wakeup.set();
1484    }
1485
1486    fn posixFsCancel(self: *Loop, request_node: *Request.Node) void {
1487        if (self.fs_queue.remove(request_node)) {
1488            self.finishOneEvent();
1489        }
1490    }
1491
    /// The body of the fs thread: perform each queued blocking syscall,
    /// schedule the requester's frame for resumption, then park on
    /// `fs_thread_wakeup` until more requests arrive.
    fn posixFsRun(self: *Loop) void {
1493        nosuspend while (true) {
1494            self.fs_thread_wakeup.reset();
1495            while (self.fs_queue.get()) |node| {
1496                switch (node.data.msg) {
1497                    .end => return,
1498                    .read => |*msg| {
1499                        msg.result = os.read(msg.fd, msg.buf);
1500                    },
1501                    .readv => |*msg| {
1502                        msg.result = os.readv(msg.fd, msg.iov);
1503                    },
1504                    .write => |*msg| {
1505                        msg.result = os.write(msg.fd, msg.bytes);
1506                    },
1507                    .writev => |*msg| {
1508                        msg.result = os.writev(msg.fd, msg.iov);
1509                    },
1510                    .pwrite => |*msg| {
1511                        msg.result = os.pwrite(msg.fd, msg.bytes, msg.offset);
1512                    },
1513                    .pwritev => |*msg| {
1514                        msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
1515                    },
1516                    .pread => |*msg| {
1517                        msg.result = os.pread(msg.fd, msg.buf, msg.offset);
1518                    },
1519                    .preadv => |*msg| {
1520                        msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
1521                    },
1522                    .open => |*msg| {
1523                        if (is_windows) unreachable; // TODO
1524                        msg.result = os.openZ(msg.path, msg.flags, msg.mode);
1525                    },
1526                    .openat => |*msg| {
1527                        if (is_windows) unreachable; // TODO
1528                        msg.result = os.openatZ(msg.fd, msg.path, msg.flags, msg.mode);
1529                    },
1530                    .faccessat => |*msg| {
1531                        msg.result = os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
1532                    },
1533                    .close => |*msg| os.close(msg.fd),
1534                }
1535                switch (node.data.finish) {
1536                    .TickNode => |*tick_node| self.onNextTick(tick_node),
1537                    .NoAction => {},
1538                }
1539                self.finishOneEvent();
1540            }
1541            self.fs_thread_wakeup.wait();
1542        };
1543    }
1544
1545    const OsData = switch (builtin.os.tag) {
1546        .linux => LinuxOsData,
1547        .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData,
1548        .windows => struct {
1549            io_port: windows.HANDLE,
1550            extra_thread_count: usize,
1551        },
1552        else => struct {},
1553    };
1554
1555    const KEventData = struct {
1556        kqfd: i32,
1557        final_kevent: os.Kevent,
1558    };
1559
1560    const LinuxOsData = struct {
1561        epollfd: i32,
1562        final_eventfd: i32,
1563        final_eventfd_event: os.linux.epoll_event,
1564    };
1565
1566    pub const Request = struct {
1567        msg: Msg,
1568        finish: Finish,
1569
1570        pub const Node = std.atomic.Queue(Request).Node;
1571
1572        pub const Finish = union(enum) {
1573            TickNode: Loop.NextTickNode,
1574            NoAction,
1575        };
1576
1577        pub const Msg = union(enum) {
1578            read: Read,
1579            readv: ReadV,
1580            write: Write,
1581            writev: WriteV,
1582            pwrite: PWrite,
1583            pwritev: PWriteV,
1584            pread: PRead,
1585            preadv: PReadV,
1586            open: Open,
1587            openat: OpenAt,
1588            close: Close,
1589            faccessat: FAccessAt,
1590
            /// Special message: tells the fs thread to exit its run loop.
1592            end,
1593
1594            pub const Read = struct {
1595                fd: os.fd_t,
1596                buf: []u8,
1597                result: Error!usize,
1598
1599                pub const Error = os.ReadError;
1600            };
1601
1602            pub const ReadV = struct {
1603                fd: os.fd_t,
1604                iov: []const os.iovec,
1605                result: Error!usize,
1606
1607                pub const Error = os.ReadError;
1608            };
1609
1610            pub const Write = struct {
1611                fd: os.fd_t,
1612                bytes: []const u8,
1613                result: Error!usize,
1614
1615                pub const Error = os.WriteError;
1616            };
1617
1618            pub const WriteV = struct {
1619                fd: os.fd_t,
1620                iov: []const os.iovec_const,
1621                result: Error!usize,
1622
1623                pub const Error = os.WriteError;
1624            };
1625
1626            pub const PWrite = struct {
1627                fd: os.fd_t,
1628                bytes: []const u8,
                offset: u64,
1630                result: Error!usize,
1631
1632                pub const Error = os.PWriteError;
1633            };
1634
1635            pub const PWriteV = struct {
1636                fd: os.fd_t,
1637                iov: []const os.iovec_const,
                offset: u64,
1639                result: Error!usize,
1640
1641                pub const Error = os.PWriteError;
1642            };
1643
1644            pub const PRead = struct {
1645                fd: os.fd_t,
1646                buf: []u8,
                offset: u64,
1648                result: Error!usize,
1649
1650                pub const Error = os.PReadError;
1651            };
1652
1653            pub const PReadV = struct {
1654                fd: os.fd_t,
1655                iov: []const os.iovec,
                offset: u64,
1657                result: Error!usize,
1658
1659                pub const Error = os.PReadError;
1660            };
1661
1662            pub const Open = struct {
1663                path: [*:0]const u8,
1664                flags: u32,
1665                mode: os.mode_t,
1666                result: Error!os.fd_t,
1667
1668                pub const Error = os.OpenError;
1669            };
1670
1671            pub const OpenAt = struct {
1672                fd: os.fd_t,
1673                path: [*:0]const u8,
1674                flags: u32,
1675                mode: os.mode_t,
1676                result: Error!os.fd_t,
1677
1678                pub const Error = os.OpenError;
1679            };
1680
1681            pub const Close = struct {
1682                fd: os.fd_t,
1683            };
1684
1685            pub const FAccessAt = struct {
1686                dirfd: os.fd_t,
1687                path: [*:0]const u8,
1688                mode: u32,
1689                flags: u32,
1690                result: Error!void,
1691
1692                pub const Error = os.AccessError;
1693            };
1694        };
1695    };
1696};
1697
1698test "std.event.Loop - basic" {
1699    // https://github.com/ziglang/zig/issues/1908
1700    if (builtin.single_threaded) return error.SkipZigTest;
1701
1702    if (true) {
1703        // https://github.com/ziglang/zig/issues/4922
1704        return error.SkipZigTest;
1705    }
1706
1707    var loop: Loop = undefined;
1708    try loop.initMultiThreaded();
1709    defer loop.deinit();
1710
1711    loop.run();
1712}
1713
1714fn testEventLoop() i32 {
1715    return 1234;
1716}
1717
1718fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
1719    const value = await h;
    testing.expect(value == 1234) catch @panic("test failure");
1721    did_it.* = true;
1722}
1723
1724var testRunDetachedData: usize = 0;
1725test "std.event.Loop - runDetached" {
1726    // https://github.com/ziglang/zig/issues/1908
1727    if (builtin.single_threaded) return error.SkipZigTest;
1728    if (!std.io.is_async) return error.SkipZigTest;
1729    if (true) {
1730        // https://github.com/ziglang/zig/issues/4922
1731        return error.SkipZigTest;
1732    }
1733
1734    var loop: Loop = undefined;
1735    try loop.initMultiThreaded();
1736    defer loop.deinit();
1737
    // Schedule the execution; it won't actually start until we run the
    // event loop.
1740    try loop.runDetached(std.testing.allocator, testRunDetached, .{});
1741
    // Now we can start the event loop. The function will return only
    // after all tasks have been completed, allowing us to synchronize
    // with the previous runDetached.
1745    loop.run();
1746
1747    try testing.expect(testRunDetachedData == 1);
1748}
1749
1750fn testRunDetached() void {
1751    testRunDetachedData += 1;
1752}
1753
1754test "std.event.Loop - sleep" {
1755    // https://github.com/ziglang/zig/issues/1908
1756    if (builtin.single_threaded) return error.SkipZigTest;
1757    if (!std.io.is_async) return error.SkipZigTest;
1758
1759    const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
1760    defer testing.allocator.free(frames);
1761
1762    const wait_time = 100 * std.time.ns_per_ms;
1763    var sleep_count: usize = 0;
1764
1765    for (frames) |*frame|
1766        frame.* = async testSleep(wait_time, &sleep_count);
1767    for (frames) |*frame|
1768        await frame;
1769
1770    try testing.expect(sleep_count == frames.len);
1771}
1772
1773fn testSleep(wait_ns: u64, sleep_count: *usize) void {
1774    Loop.instance.?.sleep(wait_ns);
1775    _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
1776}
1777