const std = @import("../std.zig");
const builtin = @import("builtin");
const root = @import("root");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
const Thread = std.Thread;

const is_windows = builtin.os.tag == .windows;

pub const Loop = struct {
    /// Frames queued by `onNextTick`, waiting for `dispatch` to hand them out.
    next_tick_queue: std.atomic.Queue(anyframe),
    /// OS-specific handles (epoll fd, kqueue fd or I/O completion port).
    os_data: OsData,
    /// Resume node whose id is `Stop`; it is what gets posted when the last
    /// pending event finishes (see `finishOneEvent`).
    final_resume_node: ResumeNode,
    /// Number of outstanding events; see `beginOneEvent` / `finishOneEvent`.
    pending_event_count: usize,
    /// Worker threads spawned by `initOsData` and joined by `run`.
    extra_threads: []Thread,
    /// TODO change this to a pool of configurable number of threads
    /// and rename it to be not file-system-specific. it will become
    /// a thread pool for turning non-CPU-bound blocking things into
    /// async things. A fallback for any missing OS-specific API.
    fs_thread: Thread,
    /// Requests submitted through `posixFsRequest`.
    fs_queue: std.atomic.Queue(Request),
    /// The `.end` request that is submitted when the loop shuts down.
    fs_end_request: Request.Node,
    fs_thread_wakeup: std.Thread.ResetEvent,

    /// For resources that have the same lifetime as the `Loop`.
    /// This is only used by `Loop` for the thread pool and associated resources.
    arena: std.heap.ArenaAllocator,

    /// State which manages frames that are sleeping on timers
    delay_queue: DelayQueue,

    /// Pre-allocated eventfds. All permanently active.
    /// This is how `Loop` sends promises to be resumed on other threads.
    available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
    /// Backing storage for the nodes pushed onto `available_eventfd_resume_nodes`.
    eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,

    pub const NextTickNode = std.atomic.Queue(anyframe).Node;

    /// Header shared by every kind of resume node. The OS-specific wrappers
    /// (`EventFd`, `Basic`) embed it as their `base` field.
    pub const ResumeNode = struct {
        id: Id,
        handle: anyframe,
        overlapped: Overlapped,

        /// Zeroed `OVERLAPPED` on Windows; a void value on every other target.
        pub const overlapped_init = switch (builtin.os.tag) {
            .windows => windows.OVERLAPPED{
                .Internal = 0,
                .InternalHigh = 0,
                .DUMMYUNIONNAME = .{
                    .DUMMYSTRUCTNAME = .{
                        .Offset = 0,
                        .OffsetHigh = 0,
                    },
                },
                .hEvent = null,
            },
            else => {},
        };
        pub const Overlapped = @TypeOf(overlapped_init);

        pub const Id = enum {
            Basic,
            Stop,
            EventFd,
        };

        /// Node type used for the pre-allocated cross-thread resume slots.
        pub const EventFd = switch (builtin.os.tag) {
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd,
            .linux => struct {
                base: ResumeNode,
                epoll_op: u32,
                eventfd: i32,
            },
            .windows => struct {
                base: ResumeNode,
                completion_key: usize,
            },
            else => struct {},
        };

        const KEventFd = struct {
            base: ResumeNode,
            kevent: os.Kevent,
        };

        /// Node type used when a single frame waits on a single fd / kevent.
        pub const Basic = switch (builtin.os.tag) {
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic,
            .linux, .windows => struct {
                base: ResumeNode,
            },
            else => @compileError("unsupported OS"),
        };

        const KEventBasic = struct {
            base: ResumeNode,
            kev: os.Kevent,
        };
    };

    var global_instance_state: Loop = undefined;
    const default_instance: ?*Loop = switch (std.io.mode) {
        .blocking => null,
        .evented => &global_instance_state,
    };
    /// The event loop in use: `root.event_loop` when the root source file
    /// declares it, otherwise the default for the current `std.io.mode`.
    pub const instance: ?*Loop = if (@hasDecl(root, "event_loop"))
        root.event_loop
    else
        default_instance;

    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn init(self: *Loop) !void {
    // The condition is comptime-known, so only the selected initializer is analyzed.
    return if (builtin.single_threaded or
        (@hasDecl(root, "event_loop_mode") and root.event_loop_mode == .single_threaded))
        self.initSingleThreaded()
    else
        self.initMultiThreaded();
}

/// After initialization, call run().
/// Equivalent to `initThreadPool(1)`.
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn initSingleThreaded(self: *Loop) !void {
    return self.initThreadPool(1);
}

/// After initialization, call run().
/// This is the same as `initThreadPool` using `Thread.getCpuCount` to determine the thread
/// pool size.
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn initMultiThreaded(self: *Loop) !void {
    if (builtin.single_threaded) {
        @compileError("initMultiThreaded unavailable when building in single-threaded mode");
    }
    return self.initThreadPool(try Thread.getCpuCount());
}

/// Thread count is the total thread count.
/// The thread pool size will be
/// max(thread_count - 1, 0)
///
/// Resets every field of `self`, allocates the per-loop resources from the
/// arena, sets up the OS backend (which spawns the extra worker threads) and,
/// in multi-threaded builds, starts the fs thread and the delay queue.
/// A `thread_count` of 0 is treated like 1: no extra threads are spawned.
pub fn initThreadPool(self: *Loop, thread_count: usize) !void {
    self.* = Loop{
        .arena = std.heap.ArenaAllocator.init(std.heap.page_allocator),
        // The initial reference is released by `run`.
        .pending_event_count = 1,
        .os_data = undefined,
        .next_tick_queue = std.atomic.Queue(anyframe).init(),
        .extra_threads = undefined,
        .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
        .eventfd_resume_nodes = undefined,
        .final_resume_node = ResumeNode{
            .id = ResumeNode.Id.Stop,
            .handle = undefined,
            .overlapped = ResumeNode.overlapped_init,
        },
        .fs_end_request = .{ .data = .{ .msg = .end, .finish = .NoAction } },
        .fs_queue = std.atomic.Queue(Request).init(),
        .fs_thread = undefined,
        .fs_thread_wakeup = undefined,
        .delay_queue = undefined,
    };
    try self.fs_thread_wakeup.init();
    errdefer self.fs_thread_wakeup.deinit();
    errdefer self.arena.deinit();

    // The calling thread is one of the `thread_count` threads. Guard the
    // subtraction so that `thread_count == 0` cannot underflow.
    const extra_thread_count: usize = if (thread_count == 0) 0 else thread_count - 1;
    // We need at least one of these in case the fs thread wants to use onNextTick
    const resume_node_count = std.math.max(extra_thread_count, 1);
    self.eventfd_resume_nodes = try self.arena.allocator().alloc(
        std.atomic.Stack(ResumeNode.EventFd).Node,
        resume_node_count,
    );

    self.extra_threads = try self.arena.allocator().alloc(Thread, extra_thread_count);

    try self.initOsData(extra_thread_count);
    errdefer self.deinitOsData();

    if (!builtin.single_threaded) {
        self.fs_thread = try Thread.spawn(.{}, posixFsRun, .{self});
    }
    errdefer if (!builtin.single_threaded) {
        self.posixFsRequest(&self.fs_end_request);
        self.fs_thread.join();
    };

    if (!builtin.single_threaded)
        try self.delay_queue.init();
}

/// Releases the OS handles, the fs wakeup event and the arena, then
/// invalidates `self`.
pub fn deinit(self: *Loop) void {
    self.deinitOsData();
    self.fs_thread_wakeup.deinit();
    self.arena.deinit();
    self.* = undefined;
}

const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError ||
    Thread.SpawnError || os.EpollCtlError || os.KEventError ||
    windows.CreateIoCompletionPortError;

/// Payload written to the final eventfd to wake a worker (8 bytes).
const wakeup_bytes = [_]u8{0x1} ** 8;

/// Creates the OS-specific polling object, initializes every pre-allocated
/// resume node in `self.eventfd_resume_nodes`, registers the final ("stop")
/// event and, unless the build is single-threaded, spawns
/// `extra_thread_count` worker threads running `workerRun`.
/// On error, everything acquired so far is released and already-spawned
/// workers are woken up and joined.
fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
    nosuspend switch (builtin.os.tag) {
        .linux => {
            errdefer {
                while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
            }
            for (self.eventfd_resume_nodes) |*eventfd_node| {
                eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                    .data = ResumeNode.EventFd{
                        .base = ResumeNode{
                            .id = .EventFd,
                            .handle = undefined,
                            .overlapped = ResumeNode.overlapped_init,
                        },
                        .eventfd = try os.eventfd(1, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK),
                        .epoll_op = os.linux.EPOLL.CTL_ADD,
                    },
                    .next = undefined,
                };
                self.available_eventfd_resume_nodes.push(eventfd_node);
            }

            self.os_data.epollfd = try os.epoll_create1(os.linux.EPOLL.CLOEXEC);
            errdefer os.close(self.os_data.epollfd);

            self.os_data.final_eventfd = try os.eventfd(0, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK);
            errdefer os.close(self.os_data.final_eventfd);

            self.os_data.final_eventfd_event = os.linux.epoll_event{
                .events = os.linux.EPOLL.IN,
                .data = os.linux.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
            };
            try os.epoll_ctl(
                self.os_data.epollfd,
                os.linux.EPOLL.CTL_ADD,
                self.os_data.final_eventfd,
                &self.os_data.final_eventfd_event,
            );

            if (builtin.single_threaded) {
                assert(extra_thread_count == 0);
                return;
            }

            var extra_thread_index: usize = 0;
            errdefer {
                // writing 8 bytes to an eventfd cannot fail
                const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
                assert(amt == wakeup_bytes.len);
                while (extra_thread_index != 0) {
                    extra_thread_index -= 1;
                    self.extra_threads[extra_thread_index].join();
                }
            }
            while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
            }
        },
        .macos, .freebsd, .netbsd, .dragonfly => {
            self.os_data.kqfd = try os.kqueue();
            errdefer os.close(self.os_data.kqfd);

            const empty_kevs = &[0]os.Kevent{};

            for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                    .data = ResumeNode.EventFd{
                        .base = ResumeNode{
                            .id = ResumeNode.Id.EventFd,
                            .handle = undefined,
                            .overlapped = ResumeNode.overlapped_init,
                        },
                        // this one is for sending events
                        .kevent = os.Kevent{
                            .ident = i,
                            .filter = os.system.EVFILT_USER,
                            .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE,
                            .fflags = 0,
                            .data = 0,
                            .udata = @ptrToInt(&eventfd_node.data.base),
                        },
                    },
                    .next = undefined,
                };
                self.available_eventfd_resume_nodes.push(eventfd_node);
                const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
                _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
                eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
                eventfd_node.data.kevent.fflags = os.system.NOTE_TRIGGER;
            }

            // Pre-add so that we cannot get error.SystemResources
            // later when we try to activate it.
            self.os_data.final_kevent = os.Kevent{
                // The resume nodes above use idents 0..len-1 with the same
                // filter, and there is always at least one of them. Using
                // `len` keeps this ident unique even when
                // `extra_thread_count` is 0.
                .ident = self.eventfd_resume_nodes.len,
                .filter = os.system.EVFILT_USER,
                .flags = os.system.EV_ADD | os.system.EV_DISABLE,
                .fflags = 0,
                .data = 0,
                .udata = @ptrToInt(&self.final_resume_node),
            };
            const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
            _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
            self.os_data.final_kevent.flags = os.system.EV_ENABLE;
            self.os_data.final_kevent.fflags = os.system.NOTE_TRIGGER;

            if (builtin.single_threaded) {
                assert(extra_thread_count == 0);
                return;
            }

            var extra_thread_index: usize = 0;
            errdefer {
                _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                while (extra_thread_index != 0) {
                    extra_thread_index -= 1;
                    self.extra_threads[extra_thread_index].join();
                }
            }
            while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
            }
        },
        .openbsd => {
            self.os_data.kqfd = try os.kqueue();
            errdefer os.close(self.os_data.kqfd);

            const empty_kevs = &[0]os.Kevent{};

            for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                    .data = ResumeNode.EventFd{
                        .base = ResumeNode{
                            .id = ResumeNode.Id.EventFd,
                            .handle = undefined,
                            .overlapped = ResumeNode.overlapped_init,
                        },
                        // this one is for sending events
                        .kevent = os.Kevent{
                            .ident = i,
                            .filter = os.system.EVFILT_TIMER,
                            .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE | os.system.EV_ONESHOT,
                            .fflags = 0,
                            .data = 0,
                            .udata = @ptrToInt(&eventfd_node.data.base),
                        },
                    },
                    .next = undefined,
                };
                self.available_eventfd_resume_nodes.push(eventfd_node);
                const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
                _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
                eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
            }

            // Pre-add so that we cannot get error.SystemResources
            // later when we try to activate it.
            self.os_data.final_kevent = os.Kevent{
                // Same reasoning as on the other kqueue targets: `len` is the
                // first ident not taken by a resume node.
                .ident = self.eventfd_resume_nodes.len,
                .filter = os.system.EVFILT_TIMER,
                .flags = os.system.EV_ADD | os.system.EV_ONESHOT | os.system.EV_DISABLE,
                .fflags = 0,
                .data = 0,
                .udata = @ptrToInt(&self.final_resume_node),
            };
            const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
            _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
            self.os_data.final_kevent.flags = os.system.EV_ENABLE;

            if (builtin.single_threaded) {
                assert(extra_thread_count == 0);
                return;
            }

            var extra_thread_index: usize = 0;
            errdefer {
                _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                while (extra_thread_index != 0) {
                    extra_thread_index -= 1;
                    self.extra_threads[extra_thread_index].join();
                }
            }
            while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
            }
        },
        .windows => {
            self.os_data.io_port = try windows.CreateIoCompletionPort(
                windows.INVALID_HANDLE_VALUE,
                null,
                undefined,
                maxInt(windows.DWORD),
            );
            errdefer windows.CloseHandle(self.os_data.io_port);

            for (self.eventfd_resume_nodes) |*eventfd_node| {
                eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                    .data = ResumeNode.EventFd{
                        .base = ResumeNode{
                            .id = ResumeNode.Id.EventFd,
                            .handle = undefined,
                            .overlapped = ResumeNode.overlapped_init,
                        },
                        // this one is for sending events
                        .completion_key = @ptrToInt(&eventfd_node.data.base),
                    },
                    .next = undefined,
                };
                self.available_eventfd_resume_nodes.push(eventfd_node);
            }

            if (builtin.single_threaded) {
                assert(extra_thread_count == 0);
                return;
            }

            var extra_thread_index: usize = 0;
            errdefer {
                // Post one stop notification per thread that was started,
                // retrying each post until it succeeds, then join them all.
                var i: usize = 0;
                while (i < extra_thread_index) : (i += 1) {
                    while (true) {
                        const overlapped = &self.final_resume_node.overlapped;
                        windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                        break;
                    }
                }
                while (extra_thread_index != 0) {
                    extra_thread_index -= 1;
                    self.extra_threads[extra_thread_index].join();
                }
            }
            while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
            }
        },
        else => {},
    };
}

/// Closes the handles created by `initOsData`.
fn deinitOsData(self: *Loop) void {
    nosuspend switch (builtin.os.tag) {
        .linux => {
            os.close(self.os_data.final_eventfd);
            while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
            os.close(self.os_data.epollfd);
        },
        .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
            os.close(self.os_data.kqfd);
        },
        .windows => {
            windows.CloseHandle(self.os_data.io_port);
        },
        else => {},
    };
}

/// resume_node must live longer than the anyframe that it holds a reference to.
/// flags must contain EPOLLET
/// Counts one pending event and registers `fd` with epoll; the pending event
/// is released again if registration fails.
pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
    assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
    self.beginOneEvent();
    errdefer self.finishOneEvent();
    try self.linuxModFd(fd, os.linux.EPOLL.CTL_ADD, flags, resume_node);
}

/// Issues `epoll_ctl(op)` for `fd`, storing `resume_node` as the event's user data.
/// `flags` must contain EPOLLET.
pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
    assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
    var event = os.linux.epoll_event{
        .events = flags,
        .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
    };
    try os.epoll_ctl(self.os_data.epollfd, op, fd, &event);
}

/// Removes `fd` from the epoll set (errors are ignored) and releases the
/// pending event taken by `linuxAddFd`.
pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
    os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL.CTL_DEL, fd, null) catch {};
    self.finishOneEvent();
}

/// Suspends the calling frame until epoll reports `flags` on `fd`.
/// `flags` must contain both EPOLLET and EPOLLONESHOT.
/// If the fd cannot be registered because of a resource error, falls back to
/// a blocking `poll()` and resumes the frame from within this call.
pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void {
    assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
    assert(flags & os.linux.EPOLL.ONESHOT == os.linux.EPOLL.ONESHOT);
    var waiter = ResumeNode.Basic{
        .base = ResumeNode{
            .id = .Basic,
            .handle = @frame(),
            .overlapped = ResumeNode.overlapped_init,
        },
    };
    // Cleared on the fallback path, where the fd never made it into epoll.
    var registered_with_epoll = true;
    defer if (registered_with_epoll) self.linuxRemoveFd(fd);

    suspend {
        self.linuxAddFd(fd, &waiter.base, flags) catch |err| switch (err) {
            error.FileDescriptorNotRegistered => unreachable,
            error.OperationCausesCircularLoop => unreachable,
            error.FileDescriptorIncompatibleWithEpoll => unreachable,
            error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe

            error.SystemResources,
            error.UserResourceLimitReached,
            error.Unexpected,
            => {
                registered_with_epoll = false;
                // Fall back to a blocking poll(). Ideally this codepath is never hit, since
                // epoll should be just fine. But this is better than incorrect behavior.
                var poll_events: i16 = 0;
                if ((flags & os.linux.EPOLL.IN) != 0) poll_events |= os.POLL.IN;
                if ((flags & os.linux.EPOLL.OUT) != 0) poll_events |= os.POLL.OUT;
                var poll_fds = [1]os.pollfd{os.pollfd{
                    .fd = fd,
                    .events = poll_events,
                    .revents = undefined,
                }};
                _ = os.poll(&poll_fds, -1) catch |poll_err| switch (poll_err) {
                    error.NetworkSubsystemFailed => unreachable, // only possible on windows

                    error.SystemResources,
                    error.Unexpected,
                    => {
                        // Even poll() didn't work. The best we can do now is sleep for a
                        // small duration and then hope that something changed.
                        std.time.sleep(1 * std.time.ns_per_ms);
                    },
                };
                resume @frame();
            },
        };
    }
}

/// Suspends the calling frame until `fd` is reported readable.
pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void {
    switch (builtin.os.tag) {
        .linux => {
            const flags = os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN;
            self.linuxWaitFd(fd, flags);
        },
        .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
            const ident = @intCast(usize, fd);
            self.bsdWaitKev(ident, os.system.EVFILT_READ, os.system.EV_ONESHOT);
        },
        else => @compileError("Unsupported OS"),
    }
}

/// Suspends the calling frame until `fd` is reported writable.
pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void {
    switch (builtin.os.tag) {
        .linux => {
            const flags = os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT;
            self.linuxWaitFd(fd, flags);
        },
        .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
            const ident = @intCast(usize, fd);
            self.bsdWaitKev(ident, os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
        },
        else => @compileError("Unsupported OS"),
    }
}

/// On Linux, waits with a single epoll registration for both EPOLLIN and
/// EPOLLOUT. On the kqueue targets this performs two consecutive waits:
/// first for the read filter, then for the write filter.
pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void {
    switch (builtin.os.tag) {
        .linux => {
            const flags = os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT | os.linux.EPOLL.IN;
            self.linuxWaitFd(fd, flags);
        },
        .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
            const ident = @intCast(usize, fd);
            self.bsdWaitKev(ident, os.system.EVFILT_READ, os.system.EV_ONESHOT);
            self.bsdWaitKev(ident, os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
        },
        else => @compileError("Unsupported OS"),
    }
}

/// Suspends the calling frame until the kevent `(ident, filter)` fires.
/// When `flags` contains EV_ONESHOT, `bsdRemoveKev` runs after the frame is
/// resumed, which also releases the pending event taken by `bsdAddKev`.
/// NOTE(review): the original comment here said that ONESHOT kevents do not
/// need to be deleted manually, which reads as the opposite of the condition
/// below. Behavior is left unchanged; confirm the intent.
pub fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, flags: u16) void {
    var waiter = ResumeNode.Basic{
        .base = ResumeNode{
            .id = ResumeNode.Id.Basic,
            .handle = @frame(),
            .overlapped = ResumeNode.overlapped_init,
        },
        .kev = undefined,
    };

    defer {
        if (flags & os.system.EV_ONESHOT != 0) {
            self.bsdRemoveKev(ident, filter);
        }
    }

    suspend {
        self.bsdAddKev(&waiter, ident, filter, flags) catch unreachable;
    }
}

/// resume_node must live longer than the anyframe that it holds a reference to.
/// Counts one pending event and adds an enabled, edge-cleared kevent for
/// `(ident, filter)`; the pending event is released again on failure.
pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, flags: u16) !void {
    self.beginOneEvent();
    errdefer self.finishOneEvent();
    var changelist = [1]os.Kevent{os.Kevent{
        .ident = ident,
        .filter = filter,
        .flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_CLEAR | flags,
        .fflags = 0,
        .data = 0,
        .udata = @ptrToInt(&resume_node.base),
    }};
    const no_events = &[0]os.Kevent{};
    _ = try os.kevent(self.os_data.kqfd, &changelist, no_events, null);
}

/// Deletes the kevent `(ident, filter)` and releases one pending event.
/// A failure of the delete itself is deliberately ignored.
pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void {
    var changelist = [1]os.Kevent{os.Kevent{
        .ident = ident,
        .filter = filter,
        .flags = os.system.EV_DELETE,
        .fflags = 0,
        .data = 0,
        .udata = 0,
    }};
    const no_events = &[0]os.Kevent{};
    _ = os.kevent(self.os_data.kqfd, &changelist, no_events, null) catch undefined;
    self.finishOneEvent();
}

/// Pairs queued next-tick frames with free resume nodes and posts each pair
/// to the OS backend. Stops as soon as either the free-node stack or the
/// next-tick queue is empty, or when posting fails; in the failure case both
/// the frame and the node are put back.
fn dispatch(self: *Loop) void {
    while (self.available_eventfd_resume_nodes.pop()) |stack_node| {
        const tick_node = self.next_tick_queue.get() orelse {
            self.available_eventfd_resume_nodes.push(stack_node);
            return;
        };
        const slot = &stack_node.data;
        slot.base.handle = tick_node.data;
        switch (builtin.os.tag) {
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                const changelist = @as(*const [1]os.Kevent, &slot.kevent);
                const no_events = &[0]os.Kevent{};
                _ = os.kevent(self.os_data.kqfd, changelist, no_events, null) catch {
                    self.next_tick_queue.unget(tick_node);
                    self.available_eventfd_resume_nodes.push(stack_node);
                    return;
                };
            },
            .linux => {
                // the pending count is already accounted for
                const epoll_events = os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN | os.linux.EPOLL.OUT |
                    os.linux.EPOLL.ET;
                self.linuxModFd(slot.eventfd, slot.epoll_op, epoll_events, &slot.base) catch {
                    self.next_tick_queue.unget(tick_node);
                    self.available_eventfd_resume_nodes.push(stack_node);
                    return;
                };
            },
            .windows => {
                windows.PostQueuedCompletionStatus(
                    self.os_data.io_port,
                    undefined,
                    undefined,
                    &slot.base.overlapped,
                ) catch {
                    self.next_tick_queue.unget(tick_node);
                    self.available_eventfd_resume_nodes.push(stack_node);
                    return;
                };
            },
            else => @compileError("unsupported OS"),
        }
    }
}

/// Bring your own linked list node. This means it can't fail.
pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
    // NOTE(review): the original comment says this is "finished in dispatch()",
    // but no matching finishOneEvent is visible there; presumably whoever
    // resumes the frame releases it. Confirm against workerRun.
    self.beginOneEvent();
    self.next_tick_queue.put(node);
    self.dispatch();
}

/// Takes `node` back out of the next-tick queue. If it was still queued, the
/// pending event counted by `onNextTick` is released.
pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
    if (self.next_tick_queue.remove(node)) {
        self.finishOneEvent();
    }
}

/// Runs the event loop on the calling thread and returns once all helper
/// threads have been joined. Releases the initial pending-event reference
/// that `initThreadPool` starts with.
pub fn run(self: *Loop) void {
    self.finishOneEvent(); // the reference we start with

    self.workerRun();

    if (!builtin.single_threaded) {
        switch (builtin.os.tag) {
            .linux,
            .macos,
            .freebsd,
            .netbsd,
            .dragonfly,
            .openbsd,
            => self.fs_thread.join(),
            else => {},
        }
    }

    for (self.extra_threads) |extra_thread| {
        extra_thread.join();
    }

    // `initThreadPool` only initializes the delay queue in multi-threaded
    // builds; in single-threaded builds it is left undefined and must not be
    // touched here.
    if (!builtin.single_threaded) {
        @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
        self.delay_queue.event.set();
        self.delay_queue.thread.join();
    }
}

/// Runs the provided function asynchronously. The function's frame is allocated
/// with `allocator` and freed when the function returns.
/// `func` must return void and it can be an async function.
/// Yields to the event loop, running the function on the next tick.
pub fn runDetached(self: *Loop, alloc: mem.Allocator, comptime func: anytype, args: anytype) error{OutOfMemory}!void {
    if (!std.io.is_async) @compileError("Can't use runDetached in non-async mode!");
    if (@TypeOf(@call(.{}, func, args)) != void) {
        @compileError("`func` must not have a return value");
    }

    const Detached = struct {
        const Args = @TypeOf(args);

        /// Holds one pending event for as long as `func` runs, then frees
        /// its own heap-allocated frame from inside a final suspend.
        fn run(func_args: Args, loop: *Loop, allocator: mem.Allocator) void {
            loop.beginOneEvent();
            loop.yield();
            @call(.{}, func, func_args); // compile error when called with non-void ret type
            suspend {
                loop.finishOneEvent();
                allocator.destroy(@frame());
            }
        }
    };

    const frame_ptr = try alloc.create(@Frame(Detached.run));
    frame_ptr.* = async Detached.run(args, self, alloc);
}

/// Yielding lets the event loop run, starting any unstarted async operations.
/// Note that async operations automatically start when a function yields for any other reason,
/// for example, when async I/O is performed. This function is intended to be used only when
/// CPU bound tasks would be waiting in the event loop but never get started because no async I/O
/// is performed.
pub fn yield(self: *Loop) void {
    suspend {
        // The node lives in this suspended frame, so it remains valid while queued.
        var tick_node = NextTickNode{
            .prev = undefined,
            .next = undefined,
            .data = @frame(),
        };
        self.onNextTick(&tick_node);
    }
}

/// If the build is multi-threaded and there is an event loop, then it calls `yield`. Otherwise,
/// does nothing.
pub fn startCpuBoundOperation() void {
    if (builtin.single_threaded) {
        return;
    } else if (instance) |event_loop| {
        event_loop.yield();
    }
}

/// call finishOneEvent when done
pub fn beginOneEvent(self: *Loop) void {
    _ = @atomicRmw(usize, &self.pending_event_count, .Add, 1, .SeqCst);
}

/// Releases one pending event. When the count drops to zero, the fs thread
/// is sent its end request and every loop thread is told to stop.
pub fn finishOneEvent(self: *Loop) void {
    nosuspend {
        const prev = @atomicRmw(usize, &self.pending_event_count, .Sub, 1, .SeqCst);
        if (prev != 1) return;

        // cause all the threads to stop
        self.posixFsRequest(&self.fs_end_request);

        switch (builtin.os.tag) {
            .linux => {
                // writing to the eventfd will only wake up one thread, thus multiple writes
                // are needed to wakeup all the threads
                var i: usize = 0;
                while (i < self.extra_threads.len + 1) : (i += 1) {
                    // writing 8 bytes to an eventfd cannot fail
                    const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
                    assert(amt == wakeup_bytes.len);
                }
                return;
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
                const empty_kevs = &[0]os.Kevent{};
                // cannot fail because we already added it and this just enables it
                _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
                return;
            },
            .windows => {
                // One stop notification per loop thread (extra threads plus
                // the one inside `run`); each post is retried until it succeeds.
                var i: usize = 0;
                while (i < self.extra_threads.len + 1) : (i += 1) {
                    while (true) {
                        const overlapped = &self.final_resume_node.overlapped;
                        windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                        break;
                    }
                }
                return;
            },
            else => @compileError("unsupported OS"),
        }
    }
}

/// Suspends the calling frame for at least `nanoseconds`, using the delay
/// queue's timer thread. Not available in single-threaded builds.
pub fn sleep(self: *Loop, nanoseconds: u64) void {
    if (builtin.single_threaded)
        @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");

    suspend {
        const now = self.delay_queue.timer.read();
        // Saturate instead of overflowing when the requested duration is huge.
        const expires = std.math.add(u64, now, nanoseconds) catch maxInt(u64);

        var entry: DelayQueue.Waiters.Entry = undefined;
        entry.init(@frame(), expires);
        self.delay_queue.waiters.insert(&entry);

        // Speculatively wake up the timer thread when we add a new entry.
        // If the timer thread is sleeping on a longer entry, we need to
        // interrupt it so that our entry can be expired in time.
        self.delay_queue.event.set();
    }
}

const DelayQueue = struct {
    timer: std.time.Timer,
    waiters: Waiters,
    thread: std.Thread,
    event: std.Thread.AutoResetEvent,
    is_running: bool,

    /// Initialize the delay queue by spawning the timer thread
    /// and starting any timer resources.
    fn init(self: *DelayQueue) !void {
        self.* = DelayQueue{
            .timer = try std.time.Timer.start(),
            .waiters = DelayQueue.Waiters{
                .entries = std.atomic.Queue(anyframe).init(),
            },
            .thread = undefined,
            .event = std.Thread.AutoResetEvent{},
            .is_running = true,
        };
        // Spawn only after every other field has been stored, so the timer
        // thread can never observe uninitialized state such as `is_running`.
        // `run` itself does not read `self.thread`.
        self.thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self});
    }

    /// Entry point for the timer thread
    /// which waits for timer entries to expire and reschedules them.
    fn run(self: *DelayQueue) void {
        const loop = @fieldParentPtr(Loop, "delay_queue", self);

        while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
            const now = self.timer.read();

            if (self.waiters.popExpired(now)) |entry| {
                loop.onNextTick(&entry.node);
                continue;
            }

            if (self.waiters.nextExpire()) |expires| {
                if (now >= expires)
                    continue;
                self.event.timedWait(expires - now) catch {};
            } else {
                self.event.wait();
            }
        }
    }

    // TODO: use a tickless hierarchical timer wheel:
    // https://github.com/wahern/timeout/
    const Waiters = struct {
        entries: std.atomic.Queue(anyframe),

        const Entry = struct {
            node: NextTickNode,
            expires: u64,

            fn init(self: *Entry, frame: anyframe, expires: u64) void {
                self.node.data = frame;
                self.expires = expires;
            }
        };

        /// Registers the entry into the queue of waiting frames
        fn insert(self: *Waiters, entry: *Entry) void {
            self.entries.put(&entry.node);
        }

        /// Dequeues one expired event relative to `now`
        fn popExpired(self: *Waiters, now: u64) ?*Entry {
            const entry = self.peekExpiringEntry() orelse return null;
            if (entry.expires > now)
                return null;

            // `assert` is an ordinary function call, so the removal is
            // performed in every build mode.
            assert(self.entries.remove(&entry.node));
            return entry;
        }

        /// Returns an estimate for the amount of time
        /// to wait until the next waiting entry expires.
        fn nextExpire(self: *Waiters) ?u64 {
            const entry = self.peekExpiringEntry() orelse return null;
            return entry.expires;
        }

        /// Returns the queued entry with the smallest `expires`, or null
        /// when the queue is empty. The queue mutex is held while scanning.
        fn peekExpiringEntry(self: *Waiters) ?*Entry {
            self.entries.mutex.lock();
            defer self.entries.mutex.unlock();

            // starting from the head
            var head = self.entries.head orelse return null;

            // traverse the list of waiting entries to
            // find the Node with the smallest `expires` field
            var min = head;
            while (head.next) |node| {
                const minEntry = @fieldParentPtr(Entry, "node", min);
                const nodeEntry = @fieldParentPtr(Entry, "node", node);
                if (nodeEntry.expires < minEntry.expires)
                    min = node;
                head = node;
            }

            return @fieldParentPtr(Entry, "node", min);
        }
    };
};

/// ------- I/O APIs -------
pub fn accept(
    self: *Loop,
    /// This argument is a socket that has been created with `socket`, bound to a local address
    /// with `bind`, and is listening for connections after a `listen`.
    sockfd: os.socket_t,
    /// This argument is a pointer to a sockaddr structure.  This structure is filled in with  the
    /// address  of  the  peer  socket, as known to the communications layer.  The exact format of the
    /// address returned addr is determined by the socket's address  family  (see  `socket`  and  the
    /// respective  protocol  man  pages).
    addr: *os.sockaddr,
    /// This argument is a value-result argument: the caller must initialize it to contain  the
    /// size (in bytes) of the structure pointed to by addr; on return it will contain the actual size
    /// of the peer address.
    ///
    /// The returned address is truncated if the buffer provided is too small; in this  case,  `addr_size`
    /// will return a value greater than was supplied to the call.
    addr_size: *os.socklen_t,
    /// The following values can be bitwise ORed in flags to obtain different behavior:
    /// * `SOCK.CLOEXEC`  - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor.   See  the
    ///   description  of the `O.CLOEXEC` flag in `open` for reasons why this may be useful.
    flags: u32,
) os.AcceptError!os.socket_t {
    // Retry the non-blocking accept each time the socket becomes readable.
    while (true) {
        return os.accept(sockfd, addr, addr_size, flags | os.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => {
                self.waitUntilFdReadable(sockfd);
                continue;
            },
            else => return err,
        };
    }
}

/// Connects `sockfd`. If the connect would block, waits until the socket is
/// writable and then reports the result via `os.getsockoptError`.
pub fn connect(self: *Loop, sockfd: os.socket_t, sock_addr: *const os.sockaddr, len: os.socklen_t) os.ConnectError!void {
    os.connect(sockfd, sock_addr, len) catch |err| switch (err) {
        error.WouldBlock => {
            self.waitUntilFdWritable(sockfd);
            return os.getsockoptError(sockfd);
        },
        else => return err,
    };
}

/// Performs an async `os.open` using a separate thread.
pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
    var req_node = Request.Node{
        .data = .{
            .msg = .{
                .open = .{
                    .path = file_path,
                    .flags = flags,
                    .mode = mode,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&req_node);
    }
    return req_node.data.msg.open.result;
}

/// Performs an async `os.openat` using a separate thread.
pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
    var request = Request.Node{
        .data = .{
            .msg = .{
                .openat = .{
                    .fd = fd,
                    .path = file_path,
                    .flags = flags,
                    .mode = mode,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    // Queue the request from inside `suspend`; the result is read after resumption.
    suspend {
        self.posixFsRequest(&request);
    }
    return request.data.msg.openat.result;
}

/// Performs an async `os.close` using a separate thread.
/// The calling frame is suspended after the request has been queued.
pub fn close(self: *Loop, fd: os.fd_t) void {
    var request = Request.Node{
        .data = .{
            .msg = .{ .close = .{ .fd = fd } },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&request);
    }
}

/// Reads from `fd` into `buf`.
/// With `simulate_evented` set, `os.read` is performed using a separate thread
/// and this frame is suspended until it is rescheduled; in that mode `fd` must
/// block and not return EAGAIN.
/// Otherwise `os.read` is called directly and retried after
/// `waitUntilFdReadable` whenever it reports `error.WouldBlock`.
pub fn read(self: *Loop, fd: os.fd_t, buf: []u8, simulate_evented: bool) os.ReadError!usize {
    if (!simulate_evented) {
        while (true) {
            if (os.read(fd, buf)) |bytes_read| {
                return bytes_read;
            } else |err| switch (err) {
                error.WouldBlock => self.waitUntilFdReadable(fd),
                else => return err,
            }
        }
    }

    var request = Request.Node{
        .data = .{
            .msg = .{
                .read = .{
                    .fd = fd,
                    .buf = buf,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&request);
    }
    return request.data.msg.read.result;
}

/// Performs an async `os.readv` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, simulate_evented: bool) os.ReadError!usize {
    // Direct path: call `os.readv` and retry once `fd` becomes readable.
    if (!simulate_evented) {
        while (true) {
            if (os.readv(fd, iov)) |bytes_read| {
                return bytes_read;
            } else |err| switch (err) {
                error.WouldBlock => self.waitUntilFdReadable(fd),
                else => return err,
            }
        }
    }

    // Simulated path: queue the request and suspend until rescheduled.
    var request = Request.Node{
        .data = .{
            .msg = .{
                .readv = .{
                    .fd = fd,
                    .iov = iov,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&request);
    }
    return request.data.msg.readv.result;
}

/// Reads from `fd` at `offset` into `buf`.
/// With `simulate_evented` set, `os.pread` is performed using a separate thread
/// and this frame is suspended until it is rescheduled; in that mode `fd` must
/// block and not return EAGAIN.
/// Otherwise `os.pread` is called directly and retried after
/// `waitUntilFdReadable` whenever it reports `error.WouldBlock`.
pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64, simulate_evented: bool) os.PReadError!usize {
    if (!simulate_evented) {
        while (true) {
            if (os.pread(fd, buf, offset)) |bytes_read| {
                return bytes_read;
            } else |err| switch (err) {
                error.WouldBlock => self.waitUntilFdReadable(fd),
                else => return err,
            }
        }
    }

    var request = Request.Node{
        .data = .{
            .msg = .{
                .pread = .{
                    .fd = fd,
                    .buf = buf,
                    .offset = offset,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&request);
    }
    return request.data.msg.pread.result;
}

/// Performs an async `os.preadv` using a separate thread.
/// `fd` must block and not return EAGAIN.
// The error set is `os.PReadError` (not `os.ReadError`): both the stored request
// result and `os.preadv` produce `PReadError`, matching the sibling `pread`.
pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64, simulate_evented: bool) os.PReadError!usize {
    if (simulate_evented) {
        // Queue the request and suspend; the result is read after this frame
        // has been rescheduled.
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .preadv = .{
                        .fd = fd,
                        .iov = iov,
                        .offset = offset,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.preadv.result;
    } else {
        // Call `os.preadv` directly; on `error.WouldBlock` wait for
        // readability and retry.
        while (true) {
            return os.preadv(fd, iov, offset) catch |err| switch (err) {
                error.WouldBlock => {
                    self.waitUntilFdReadable(fd);
                    continue;
                },
                else => return err,
            };
        }
    }
}

/// Writes `bytes` to `fd`.
/// With `simulate_evented` set, `os.write` is performed using a separate thread
/// and this frame is suspended until it is rescheduled; in that mode `fd` must
/// block and not return EAGAIN.
/// Otherwise `os.write` is called directly and retried after
/// `waitUntilFdWritable` whenever it reports `error.WouldBlock`.
pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8, simulate_evented: bool) os.WriteError!usize {
    if (simulate_evented) {
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .write = .{
                        .fd = fd,
                        .bytes = bytes,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.write.result;
    } else {
        while (true) {
            return os.write(fd, bytes) catch |err| switch (err) {
                error.WouldBlock => {
                    self.waitUntilFdWritable(fd);
                    continue;
                },
                else => return err,
            };
        }
    }
}

/// Performs an async `os.writev` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, simulate_evented: bool) os.WriteError!usize {
    if (simulate_evented) {
        // Queue the request and suspend; the result is read after this frame
        // has been rescheduled.
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .writev = .{
                        .fd = fd,
                        .iov = iov,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.writev.result;
    } else {
        // Call `os.writev` directly; on `error.WouldBlock` wait for
        // writability and retry.
        while (true) {
            return os.writev(fd, iov) catch |err| switch (err) {
                error.WouldBlock => {
                    self.waitUntilFdWritable(fd);
                    continue;
                },
                else => return err,
            };
        }
    }
}

/// Writes `bytes` to `fd` at `offset`.
/// With `simulate_evented` set, `os.pwrite` is performed using a separate thread
/// and this frame is suspended until it is rescheduled; in that mode `fd` must
/// block and not return EAGAIN.
/// Otherwise `os.pwrite` is called directly and retried after
/// `waitUntilFdWritable` whenever it reports `error.WouldBlock`.
///
/// The error set is `os.PWriteError`, the same set used by `pwritev` and by
/// the `pwrite` request message.
pub fn pwrite(self: *Loop, fd: os.fd_t, bytes: []const u8, offset: u64, simulate_evented: bool) os.PWriteError!usize {
    if (simulate_evented) {
        var req_node = Request.Node{
            .data = .{
                .msg = .{
                    .pwrite = .{
                        .fd = fd,
                        .bytes = bytes,
                        .offset = offset,
                        .result = undefined,
                    },
                },
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            self.posixFsRequest(&req_node);
        }
        return req_node.data.msg.pwrite.result;
    } else {
        while (true) {
            return os.pwrite(fd, bytes, offset) catch |err| switch (err) {
                error.WouldBlock => {
                    self.waitUntilFdWritable(fd);
                    continue;
                },
                else => return err,
            };
        }
    }
}

/// Performs an async `os.pwritev` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64, simulate_evented: bool) os.PWriteError!usize {
    // Direct path: call `os.pwritev` and retry once `fd` becomes writable.
    if (!simulate_evented) {
        while (true) {
            if (os.pwritev(fd, iov, offset)) |bytes_written| {
                return bytes_written;
            } else |err| switch (err) {
                error.WouldBlock => self.waitUntilFdWritable(fd),
                else => return err,
            }
        }
    }

    // Simulated path: queue the request and suspend until rescheduled.
    var request = Request.Node{
        .data = .{
            .msg = .{
                .pwritev = .{
                    .fd = fd,
                    .iov = iov,
                    .offset = offset,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    suspend {
        self.posixFsRequest(&request);
    }
    return request.data.msg.pwritev.result;
}

/// Calls `os.sendto`, waiting for `sockfd` to become writable and retrying
/// whenever the call reports `error.WouldBlock`.
pub fn sendto(
    self: *Loop,
    /// The file descriptor of the sending socket.
    sockfd: os.fd_t,
    /// Message to send.
    buf: []const u8,
    flags: u32,
    dest_addr: ?*const os.sockaddr,
    addrlen: os.socklen_t,
) os.SendToError!usize {
    while (true) {
        if (os.sendto(sockfd, buf, flags, dest_addr, addrlen)) |bytes_sent| {
            return bytes_sent;
        } else |err| switch (err) {
            error.WouldBlock => self.waitUntilFdWritable(sockfd),
            else => return err,
        }
    }
}

/// Calls `os.recvfrom`, waiting for `sockfd` to become readable and retrying
/// whenever the call reports `error.WouldBlock`.
pub fn recvfrom(
    self: *Loop,
    sockfd: os.fd_t,
    buf: []u8,
    flags: u32,
    src_addr: ?*os.sockaddr,
    addrlen: ?*os.socklen_t,
) os.RecvFromError!usize {
    while (true) {
        if (os.recvfrom(sockfd, buf, flags, src_addr, addrlen)) |bytes_received| {
            return bytes_received;
        } else |err| switch (err) {
            error.WouldBlock => self.waitUntilFdReadable(sockfd),
            else => return err,
        }
    }
}

/// Performs an async `os.faccessatZ` using a separate thread.
/// The calling frame is suspended after the request has been queued.
pub fn faccessatZ(
    self: *Loop,
    dirfd: os.fd_t,
    path_z: [*:0]const u8,
    mode: u32,
    flags: u32,
) os.AccessError!void {
    var req_node = Request.Node{
        .data = .{
            .msg = .{
                .faccessat = .{
                    .dirfd = dirfd,
                    .path = path_z,
                    .mode = mode,
                    .flags = flags,
                    .result = undefined,
                },
            },
            .finish = .{ .TickNode = .{ .data = @frame() } },
        },
    };
    // Queue the request from inside `suspend`; the result is read after resumption.
    suspend {
        self.posixFsRequest(&req_node);
    }
    return req_node.data.msg.faccessat.result;
}

/// Worker loop: first drains `next_tick_queue`, resuming each queued frame,
/// then blocks on the OS-specific event source for one event and resumes the
/// frame attached to it. Returns when a `Stop` resume node is received (or,
/// on Windows, when the completion port reports `.Aborted`).
fn workerRun(self: *Loop) void {
    while (true) {
        // Run every frame that is currently scheduled for the next tick.
        while (true) {
            const next_tick_node = self.next_tick_queue.get() orelse break;
            self.dispatch();
            resume next_tick_node.data;
            self.finishOneEvent();
        }

        switch (builtin.os.tag) {
            .linux => {
                // only process 1 event so we don't steal from other threads
                var events: [1]os.linux.epoll_event = undefined;
                const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1);
                for (events[0..count]) |ev| {
                    const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
                    // `handle` and `id` are copied out before the node is
                    // recycled or the frame is resumed.
                    const handle = resume_node.handle;
                    const resume_node_id = resume_node.id;
                    switch (resume_node_id) {
                        .Basic => {},
                        .Stop => return,
                        .EventFd => {
                            // Return the eventfd node to the pool of available nodes.
                            const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                            event_fd_node.epoll_op = os.linux.EPOLL.CTL_MOD;
                            const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                            self.available_eventfd_resume_nodes.push(stack_node);
                        },
                    }
                    resume handle;
                    if (resume_node_id == ResumeNode.Id.EventFd) {
                        self.finishOneEvent();
                    }
                }
            },
            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
                var eventlist: [1]os.Kevent = undefined;
                const empty_kevs = &[0]os.Kevent{};
                const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
                for (eventlist[0..count]) |ev| {
                    const resume_node = @intToPtr(*ResumeNode, ev.udata);
                    const handle = resume_node.handle;
                    const resume_node_id = resume_node.id;
                    switch (resume_node_id) {
                        .Basic => {
                            // Store the received kevent in the node for the resumed frame.
                            const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node);
                            basic_node.kev = ev;
                        },
                        .Stop => return,
                        .EventFd => {
                            // Return the node to the pool of available nodes.
                            const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                            const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                            self.available_eventfd_resume_nodes.push(stack_node);
                        },
                    }
                    resume handle;
                    if (resume_node_id == ResumeNode.Id.EventFd) {
                        self.finishOneEvent();
                    }
                }
            },
            .windows => {
                var completion_key: usize = undefined;
                // Poll the completion port until a non-null OVERLAPPED pointer arrives.
                const overlapped = while (true) {
                    var nbytes: windows.DWORD = undefined;
                    var overlapped: ?*windows.OVERLAPPED = undefined;
                    switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
                        .Aborted => return,
                        .Normal => {},
                        .EOF => {},
                        .Cancelled => continue,
                    }
                    if (overlapped) |o| break o;
                } else unreachable; // TODO else unreachable should not be necessary
                const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
                const handle = resume_node.handle;
                const resume_node_id = resume_node.id;
                switch (resume_node_id) {
                    .Basic => {},
                    .Stop => return,
                    .EventFd => {
                        // Return the node to the pool of available nodes.
                        const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                        const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                        self.available_eventfd_resume_nodes.push(stack_node);
                    },
                }
                resume handle;
                // NOTE(review): unlike the epoll/kqueue branches, this branch calls
                // `finishOneEvent` for every node kind, not only `EventFd` - confirm intended.
                self.finishOneEvent();
            },
            else => @compileError("unsupported OS"),
        }
    }
}

/// Queues `request_node` on `fs_queue` and signals `fs_thread_wakeup`.
/// Counts as one pending event until the request has been processed.
fn posixFsRequest(self: *Loop, request_node: *Request.Node) void {
    self.beginOneEvent(); // finished in posixFsRun after processing the msg
    self.fs_queue.put(request_node);
    self.fs_thread_wakeup.set();
}

/// Removes `request_node` from `fs_queue`; if it was still queued, the
/// pending event started by `posixFsRequest` is finished here.
fn posixFsCancel(self: *Loop, request_node: *Request.Node) void {
    if (self.fs_queue.remove(request_node)) {
        self.finishOneEvent();
    }
}

/// Drains `fs_queue`, executing each request with the corresponding `os`
/// call and storing the outcome in the request's `result` field, then applies
/// the request's `finish` action and finishes the pending event. When the
/// queue is empty it waits on `fs_thread_wakeup`. Returns when an `.end`
/// message is dequeued.
fn posixFsRun(self: *Loop) void {
    nosuspend while (true) {
        // Reset before draining, so a `set` that arrives while draining
        // makes the `wait` below return immediately.
        self.fs_thread_wakeup.reset();
        while (self.fs_queue.get()) |node| {
            switch (node.data.msg) {
                .end => return,
                .read => |*msg| {
                    msg.result = os.read(msg.fd, msg.buf);
                },
                .readv => |*msg| {
                    msg.result = os.readv(msg.fd, msg.iov);
                },
                .write => |*msg| {
                    msg.result = os.write(msg.fd, msg.bytes);
                },
                .writev => |*msg| {
                    msg.result = os.writev(msg.fd, msg.iov);
                },
                .pwrite => |*msg| {
                    msg.result = os.pwrite(msg.fd, msg.bytes, msg.offset);
                },
                .pwritev => |*msg| {
                    msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
                },
                .pread => |*msg| {
                    msg.result = os.pread(msg.fd, msg.buf, msg.offset);
                },
                .preadv => |*msg| {
                    msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
                },
                .open => |*msg| {
                    if (is_windows) unreachable; // TODO
                    msg.result = os.openZ(msg.path, msg.flags, msg.mode);
                },
                .openat => |*msg| {
                    if (is_windows) unreachable; // TODO
                    msg.result = os.openatZ(msg.fd, msg.path, msg.flags, msg.mode);
                },
                .faccessat => |*msg| {
                    msg.result = os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
                },
                .close => |*msg| os.close(msg.fd),
            }
            switch (node.data.finish) {
                // Schedule the requesting frame to be resumed on the next tick.
                .TickNode => |*tick_node| self.onNextTick(tick_node),
                .NoAction => {},
            }
            self.finishOneEvent();
        }
        self.fs_thread_wakeup.wait();
    };
}

/// Per-OS state of the loop, selected at compile time from `builtin.os.tag`.
const OsData = switch (builtin.os.tag) {
    .linux => LinuxOsData,
    .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData,
    .windows => struct {
        io_port: windows.HANDLE,
        extra_thread_count: usize,
    },
    else => struct {},
};

/// `OsData` for the kqueue-based targets.
const KEventData = struct {
    kqfd: i32,
    final_kevent: os.Kevent,
};

/// `OsData` for Linux (epoll).
const LinuxOsData = struct {
    epollfd: i32,
    final_eventfd: i32,
    final_eventfd_event: os.linux.epoll_event,
};

/// A unit of work for the fs thread: the operation to perform (`msg`) and
/// what to do once it has been performed (`finish`).
pub const Request = struct {
    msg: Msg,
    finish: Finish,

    pub const Node = std.atomic.Queue(Request).Node;

    pub const Finish = union(enum) {
        /// The node is passed to `onNextTick` after the request was processed.
        TickNode: Loop.NextTickNode,
        /// Nothing is done after the request was processed.
        NoAction,
    };

    pub const Msg = union(enum) {
        read: Read,
        readv: ReadV,
        write: Write,
        writev: WriteV,
        pwrite: PWrite,
        pwritev: PWriteV,
        pread: PRead,
        preadv: PReadV,
        open: Open,
        openat: OpenAt,
        close: Close,
        faccessat: FAccessAt,

        /// special - means the fs thread should exit
        end,

        pub const Read = struct {
            fd: os.fd_t,
            buf: []u8,
            result: Error!usize,

            pub const Error = os.ReadError;
        };

        pub const ReadV = struct {
            fd: os.fd_t,
            iov: []const os.iovec,
            result: Error!usize,

            pub const Error = os.ReadError;
        };

        pub const Write = struct {
            fd: os.fd_t,
            bytes: []const u8,
            result: Error!usize,

            pub const Error = os.WriteError;
        };

        pub const WriteV = struct {
            fd: os.fd_t,
            iov: []const os.iovec_const,
            result: Error!usize,

            pub const Error = os.WriteError;
        };

        pub const PWrite = struct {
            fd: os.fd_t,
            bytes: []const u8,
            /// `u64` to match the `offset` parameter of `Loop.pwrite`.
            offset: u64,
            result: Error!usize,

            pub const Error = os.PWriteError;
        };

        pub const PWriteV = struct {
            fd: os.fd_t,
            iov: []const os.iovec_const,
            /// `u64` to match the `offset` parameter of `Loop.pwritev`.
            offset: u64,
            result: Error!usize,

            pub const Error = os.PWriteError;
        };

        pub const PRead = struct {
            fd: os.fd_t,
            buf: []u8,
            /// `u64` to match the `offset` parameter of `Loop.pread`.
            offset: u64,
            result: Error!usize,

            pub const Error = os.PReadError;
        };

        pub const PReadV = struct {
            fd: os.fd_t,
            iov: []const os.iovec,
            /// `u64` to match the `offset` parameter of `Loop.preadv`.
            offset: u64,
            result: Error!usize,

            pub const Error = os.PReadError;
        };

        pub const Open = struct {
            path: [*:0]const u8,
            flags: u32,
            mode: os.mode_t,
            result: Error!os.fd_t,

            pub const Error = os.OpenError;
        };

        pub const OpenAt = struct {
            fd: os.fd_t,
            path: [*:0]const u8,
            flags: u32,
            mode: os.mode_t,
            result: Error!os.fd_t,

            pub const Error = os.OpenError;
        };

        pub const Close = struct {
            fd: os.fd_t,
        };

        pub const FAccessAt = struct {
            dirfd: os.fd_t,
            path: [*:0]const u8,
            mode: u32,
            flags: u32,
            result: Error!void,

            pub const Error = os.AccessError;
        };
    };
};
}; // end of `Loop`

test "std.event.Loop - basic" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;

    if (true) {
        // https://github.com/ziglang/zig/issues/4922
        return error.SkipZigTest;
    }

    var loop: Loop = undefined;
    try loop.initMultiThreaded();
    defer loop.deinit();

    loop.run();
}

fn testEventLoop() i32 {
    return 1234;
}

/// Awaits `h`, checks that it produced 1234 and then sets `did_it`.
/// The function returns `void`, so a failed expectation panics instead of
/// being propagated with `try`.
fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
    const value = await h;
    testing.expect(value == 1234) catch @panic("testEventLoop2: awaited value was not 1234");
    did_it.* = true;
}

var testRunDetachedData: usize = 0;
test "std.event.Loop - runDetached" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;
    if (!std.io.is_async) return error.SkipZigTest;
    if (true) {
        // https://github.com/ziglang/zig/issues/4922
        return error.SkipZigTest;
    }

    var loop: Loop = undefined;
    try loop.initMultiThreaded();
    defer loop.deinit();

    // Schedule the execution, won't actually start until we start the
    // event loop.
    try loop.runDetached(std.testing.allocator, testRunDetached, .{});

    // Now we can start the event loop. The function will return only
    // after all tasks have been completed, allowing us to synchronize
    // with the previous runDetached.
    loop.run();

    try testing.expect(testRunDetachedData == 1);
}

fn testRunDetached() void {
    testRunDetachedData += 1;
}

test "std.event.Loop - sleep" {
    // https://github.com/ziglang/zig/issues/1908
    if (builtin.single_threaded) return error.SkipZigTest;
    if (!std.io.is_async) return error.SkipZigTest;

    const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
    defer testing.allocator.free(frames);

    const wait_time = 100 * std.time.ns_per_ms;
    var sleep_count: usize = 0;

    for (frames) |*frame|
        frame.* = async testSleep(wait_time, &sleep_count);
    for (frames) |*frame|
        await frame;

    try testing.expect(sleep_count == frames.len);
}

/// Sleeps on the global loop instance for `wait_ns`, then atomically
/// increments `sleep_count`.
fn testSleep(wait_ns: u64, sleep_count: *usize) void {
    Loop.instance.?.sleep(wait_ns);
    _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
}