// SPDX-License-Identifier: MIT
// Copyright (c) 2015-2020 Zig Contributors
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("std");
const builtin = @import("builtin");
const ThreadPool = @This();

/// Protects `is_running`, `run_queue`, and `idle_queue`.
mutex: std.Thread.Mutex = .{},
/// Set to false by `deinit` under the mutex; workers exit once they observe
/// it false and the run queue is empty.
is_running: bool = true,
/// Used to allocate the worker array and the per-`spawn` closures.
allocator: std.mem.Allocator,
workers: []Worker,
/// Pending tasks; `spawn` prepends and workers pop from the front (LIFO).
run_queue: RunQueue = .{},
/// Parked workers waiting for work; each node's event is set to wake one.
idle_queue: IdleQueue = .{},

const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent);
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
    runFn: fn (*Runnable) void,
};
const Worker = struct {
    pool: *ThreadPool,
    thread: std.Thread,
    /// The node is for this worker only and must have an already initialized event
    /// when the thread is spawned.
    idle_node: IdleQueue.Node,

    /// Worker thread entry point: pop and run tasks until the queue is
    /// empty, then park on this worker's reset event until `spawn` or
    /// `deinit` wakes it. Returns once the pool has stopped running and
    /// the run queue has drained.
    fn run(worker: *Worker) void {
        const pool = worker.pool;

        while (true) {
            pool.mutex.lock();

            if (pool.run_queue.popFirst()) |run_node| {
                // Execute the task without holding the lock so other
                // workers can dequeue concurrently.
                pool.mutex.unlock();
                (run_node.data.runFn)(&run_node.data);
                continue;
            }

            if (pool.is_running) {
                // No work available: park. reset() and prepend() both
                // happen while the mutex is held, so a set() from spawn()
                // or deinit() cannot be lost before wait() is reached.
                worker.idle_node.data.reset();

                pool.idle_queue.prepend(&worker.idle_node);
                pool.mutex.unlock();

                worker.idle_node.data.wait();
                continue;
            }

            // Pool is shutting down and the queue is empty: exit.
            pool.mutex.unlock();
            return;
        }
    }
};
/// Initializes the pool and spawns one worker per CPU (at least one).
/// In single-threaded builds no workers are created and `spawn` runs
/// tasks inline. On error, any already-spawned workers are joined and
/// all acquired resources are released.
pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void {
    self.* = .{
        .allocator = allocator,
        .workers = &[_]Worker{},
    };
    if (builtin.single_threaded)
        return;

    const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
    self.workers = try allocator.alloc(Worker, worker_count);
    errdefer allocator.free(self.workers);

    var worker_index: usize = 0;
    // Joins and deinitializes only the workers spawned so far.
    errdefer self.destroyWorkers(worker_index);
    while (worker_index < worker_count) : (worker_index += 1) {
        const worker = &self.workers[worker_index];
        worker.pool = self;

        // Each worker requires its ResetEvent to be pre-initialized.
        try worker.idle_node.data.init();
        // Runs only if the spawn below fails in this same iteration;
        // earlier workers are covered by destroyWorkers above.
        errdefer worker.idle_node.data.deinit();

        worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker});
    }
}
/// Joins the first `spawned` worker threads and deinitializes their
/// events. Threads must already have been signaled to exit (see `deinit`)
/// or this will block; the event is deinitialized only after the join so
/// the worker can no longer touch it.
fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
    for (self.workers[0..spawned]) |*worker| {
        worker.thread.join();
        worker.idle_node.data.deinit();
    }
}
/// Shuts the pool down: marks it not running, wakes every parked worker,
/// joins all worker threads, and frees the worker array. Tasks still in
/// the run queue are executed by the workers before they exit (see
/// `Worker.run`, which drains the queue before honoring `is_running`).
pub fn deinit(self: *ThreadPool) void {
    {
        self.mutex.lock();
        defer self.mutex.unlock();

        // Flip the flag and wake all idle workers under the mutex so no
        // worker can park after the flag is set without seeing it.
        self.is_running = false;
        while (self.idle_queue.popFirst()) |idle_node|
            idle_node.data.set();
    }

    self.destroyWorkers(self.workers.len);
    self.allocator.free(self.workers);
}
/// Schedules `func(args...)` to run on a worker thread. In
/// single-threaded builds the call runs inline instead. The argument
/// values are copied into a heap-allocated closure that frees itself
/// after the task runs; `error.OutOfMemory` is returned if that
/// allocation fails.
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
    if (builtin.single_threaded) {
        @call(.{}, func, args);
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *ThreadPool,
        run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },

        fn runFn(runnable: *Runnable) void {
            // Recover the closure from the intrusive queue node.
            const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
            const closure = @fieldParentPtr(@This(), "run_node", run_node);
            @call(.{}, func, closure.arguments);

            // The closure frees itself. The pool mutex is held for the
            // destroy so allocator access stays serialized with the
            // create() in spawn().
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();
            closure.pool.allocator.destroy(closure);
        }
    };

    self.mutex.lock();
    defer self.mutex.unlock();

    const closure = try self.allocator.create(Closure);
    closure.* = .{
        .arguments = args,
        .pool = self,
    };

    self.run_queue.prepend(&closure.run_node);

    // Wake one parked worker, if any, to pick up the new task.
    if (self.idle_queue.popFirst()) |idle_node|
        idle_node.data.set();
}