1// SPDX-License-Identifier: MIT 2// Copyright (c) 2015-2020 Zig Contributors 3// This file is part of [zig](https://ziglang.org/), which is MIT licensed. 4// The MIT license requires this copyright notice to be included in all copies 5// and substantial portions of the software. 6const std = @import("std"); 7const builtin = @import("builtin"); 8const ThreadPool = @This(); 9 10mutex: std.Thread.Mutex = .{}, 11is_running: bool = true, 12allocator: std.mem.Allocator, 13workers: []Worker, 14run_queue: RunQueue = .{}, 15idle_queue: IdleQueue = .{}, 16 17const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent); 18const RunQueue = std.SinglyLinkedList(Runnable); 19const Runnable = struct { 20 runFn: fn (*Runnable) void, 21}; 22 23const Worker = struct { 24 pool: *ThreadPool, 25 thread: std.Thread, 26 /// The node is for this worker only and must have an already initialized event 27 /// when the thread is spawned. 28 idle_node: IdleQueue.Node, 29 30 fn run(worker: *Worker) void { 31 const pool = worker.pool; 32 33 while (true) { 34 pool.mutex.lock(); 35 36 if (pool.run_queue.popFirst()) |run_node| { 37 pool.mutex.unlock(); 38 (run_node.data.runFn)(&run_node.data); 39 continue; 40 } 41 42 if (pool.is_running) { 43 worker.idle_node.data.reset(); 44 45 pool.idle_queue.prepend(&worker.idle_node); 46 pool.mutex.unlock(); 47 48 worker.idle_node.data.wait(); 49 continue; 50 } 51 52 pool.mutex.unlock(); 53 return; 54 } 55 } 56}; 57 58pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void { 59 self.* = .{ 60 .allocator = allocator, 61 .workers = &[_]Worker{}, 62 }; 63 if (builtin.single_threaded) 64 return; 65 66 const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1); 67 self.workers = try allocator.alloc(Worker, worker_count); 68 errdefer allocator.free(self.workers); 69 70 var worker_index: usize = 0; 71 errdefer self.destroyWorkers(worker_index); 72 while (worker_index < worker_count) : (worker_index += 1) { 73 const worker = &self.workers[worker_index]; 74 worker.pool = self; 75 76 // Each worker requires its ResetEvent to be pre-initialized. 77 try worker.idle_node.data.init(); 78 errdefer worker.idle_node.data.deinit(); 79 80 worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker}); 81 } 82} 83 84fn destroyWorkers(self: *ThreadPool, spawned: usize) void { 85 for (self.workers[0..spawned]) |*worker| { 86 worker.thread.join(); 87 worker.idle_node.data.deinit(); 88 } 89} 90 91pub fn deinit(self: *ThreadPool) void { 92 { 93 self.mutex.lock(); 94 defer self.mutex.unlock(); 95 96 self.is_running = false; 97 while (self.idle_queue.popFirst()) |idle_node| 98 idle_node.data.set(); 99 } 100 101 self.destroyWorkers(self.workers.len); 102 self.allocator.free(self.workers); 103} 104 105pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { 106 if (builtin.single_threaded) { 107 @call(.{}, func, args); 108 return; 109 } 110 111 const Args = @TypeOf(args); 112 const Closure = struct { 113 arguments: Args, 114 pool: *ThreadPool, 115 run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, 116 117 fn runFn(runnable: *Runnable) void { 118 const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); 119 const closure = @fieldParentPtr(@This(), "run_node", run_node); 120 @call(.{}, func, closure.arguments); 121 122 const mutex = &closure.pool.mutex; 123 mutex.lock(); 124 defer mutex.unlock(); 125 closure.pool.allocator.destroy(closure); 126 } 127 }; 128 129 self.mutex.lock(); 130 defer self.mutex.unlock(); 131 132 const closure = try self.allocator.create(Closure); 133 closure.* = .{ 134 .arguments = args, 135 .pool = self, 136 }; 137 138 self.run_queue.prepend(&closure.run_node); 139 140 if (self.idle_queue.popFirst()) |idle_node| 141 idle_node.data.set(); 142} 143