1const std = @import("../std.zig"); 2const builtin = @import("builtin"); 3const assert = std.debug.assert; 4const testing = std.testing; 5const mem = std.mem; 6const Loop = std.event.Loop; 7 8/// Thread-safe async/await lock. 9/// Functions which are waiting for the lock are suspended, and 10/// are resumed when the lock is released, in order. 11/// Allows only one actor to hold the lock. 12/// TODO: make this API also work in blocking I/O mode. 13pub const Lock = struct { 14 mutex: std.Thread.Mutex = std.Thread.Mutex{}, 15 head: usize = UNLOCKED, 16 17 const UNLOCKED = 0; 18 const LOCKED = 1; 19 20 const global_event_loop = Loop.instance orelse 21 @compileError("std.event.Lock currently only works with event-based I/O"); 22 23 const Waiter = struct { 24 // forced Waiter alignment to ensure it doesn't clash with LOCKED 25 next: ?*Waiter align(2), 26 tail: *Waiter, 27 node: Loop.NextTickNode, 28 }; 29 30 pub fn initLocked() Lock { 31 return Lock{ .head = LOCKED }; 32 } 33 34 pub fn acquire(self: *Lock) Held { 35 self.mutex.lock(); 36 37 // self.head transitions from multiple stages depending on the value: 38 // UNLOCKED -> LOCKED: 39 // acquire Lock ownership when theres no waiters 40 // LOCKED -> <Waiter head ptr>: 41 // Lock is already owned, enqueue first Waiter 42 // <head ptr> -> <head ptr>: 43 // Lock is owned with pending waiters. Push our waiter to the queue. 44 45 if (self.head == UNLOCKED) { 46 self.head = LOCKED; 47 self.mutex.unlock(); 48 return Held{ .lock = self }; 49 } 50 51 var waiter: Waiter = undefined; 52 waiter.next = null; 53 waiter.tail = &waiter; 54 55 const head = switch (self.head) { 56 UNLOCKED => unreachable, 57 LOCKED => null, 58 else => @intToPtr(*Waiter, self.head), 59 }; 60 61 if (head) |h| { 62 h.tail.next = &waiter; 63 h.tail = &waiter; 64 } else { 65 self.head = @ptrToInt(&waiter); 66 } 67 68 suspend { 69 waiter.node = Loop.NextTickNode{ 70 .prev = undefined, 71 .next = undefined, 72 .data = @frame(), 73 }; 74 self.mutex.unlock(); 75 } 76 77 return Held{ .lock = self }; 78 } 79 80 pub const Held = struct { 81 lock: *Lock, 82 83 pub fn release(self: Held) void { 84 const waiter = blk: { 85 self.lock.mutex.lock(); 86 defer self.lock.mutex.unlock(); 87 88 // self.head goes through the reverse transition from acquire(): 89 // <head ptr> -> <new head ptr>: 90 // pop a waiter from the queue to give Lock ownership when theres still others pending 91 // <head ptr> -> LOCKED: 92 // pop the laster waiter from the queue, while also giving it lock ownership when awaken 93 // LOCKED -> UNLOCKED: 94 // last lock owner releases lock while no one else is waiting for it 95 96 switch (self.lock.head) { 97 UNLOCKED => { 98 unreachable; // Lock unlocked while unlocking 99 }, 100 LOCKED => { 101 self.lock.head = UNLOCKED; 102 break :blk null; 103 }, 104 else => { 105 const waiter = @intToPtr(*Waiter, self.lock.head); 106 self.lock.head = if (waiter.next == null) LOCKED else @ptrToInt(waiter.next); 107 if (waiter.next) |next| 108 next.tail = waiter.tail; 109 break :blk waiter; 110 }, 111 } 112 }; 113 114 if (waiter) |w| { 115 global_event_loop.onNextTick(&w.node); 116 } 117 } 118 }; 119}; 120 121test "std.event.Lock" { 122 if (!std.io.is_async) return error.SkipZigTest; 123 124 // TODO https://github.com/ziglang/zig/issues/1908 125 if (builtin.single_threaded) return error.SkipZigTest; 126 127 // TODO https://github.com/ziglang/zig/issues/3251 128 if (builtin.os.tag == .freebsd) return error.SkipZigTest; 129 130 var lock = Lock{}; 131 testLock(&lock); 132 133 const expected_result = [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; 134 try testing.expectEqualSlices(i32, &expected_result, &shared_test_data); 135} 136fn testLock(lock: *Lock) void { 137 var handle1 = async lockRunner(lock); 138 var handle2 = async lockRunner(lock); 139 var handle3 = async lockRunner(lock); 140 141 await handle1; 142 await handle2; 143 await handle3; 144} 145 146var shared_test_data = [1]i32{0} ** 10; 147var shared_test_index: usize = 0; 148 149fn lockRunner(lock: *Lock) void { 150 Lock.global_event_loop.yield(); 151 152 var i: usize = 0; 153 while (i < shared_test_data.len) : (i += 1) { 154 const handle = lock.acquire(); 155 defer handle.release(); 156 157 shared_test_index = 0; 158 while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { 159 shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; 160 } 161 } 162} 163