const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const expect = std.testing.expect;

/// Many-reader, many-writer, non-allocating, thread-safe stack.
/// Uses a spinlock to protect push() and pop().
/// When building in single-threaded mode, this is a simple linked list
/// with no locking.
pub fn Stack(comptime T: type) type {
    return struct {
        root: ?*Node,
        lock: @TypeOf(lock_init),

        // In single-threaded builds no lock is needed, so the lock field
        // collapses to a zero-sized void value.
        const lock_init = if (builtin.single_threaded) {} else false;

        pub const Self = @This();

        pub const Node = struct {
            next: ?*Node,
            data: T,
        };

        pub fn init() Self {
            return Self{
                .root = null,
                .lock = lock_init,
            };
        }

        /// Push operation, but only if this node becomes the first item in
        /// the stack. If the stack was not empty, the push fails and the
        /// node that is already at the root is returned; on success,
        /// returns null.
        pub fn pushFirst(self: *Self, node: *Node) ?*Node {
            node.next = null;
            return @cmpxchgStrong(?*Node, &self.root, null, node, .SeqCst, .SeqCst);
        }

        pub fn push(self: *Self, node: *Node) void {
            if (builtin.single_threaded) {
                node.next = self.root;
                self.root = node;
            } else {
                // Spin until the exchange observes the lock as previously
                // unheld (false).
                while (@atomicRmw(bool, &self.lock, .Xchg, true, .SeqCst)) {}
                // Release on exit; the exchange must report that we held it.
                defer assert(@atomicRmw(bool, &self.lock, .Xchg, false, .SeqCst));

                node.next = self.root;
                self.root = node;
            }
        }

        pub fn pop(self: *Self) ?*Node {
            if (builtin.single_threaded) {
                const root = self.root orelse return null;
                self.root = root.next;
                return root;
            } else {
                // Same spinlock protocol as push().
                while (@atomicRmw(bool, &self.lock, .Xchg, true, .SeqCst)) {}
                defer assert(@atomicRmw(bool, &self.lock, .Xchg, false, .SeqCst));

                const root = self.root orelse return null;
                self.root = root.next;
                return root;
            }
        }

        pub fn isEmpty(self: *Self) bool {
            return @atomicLoad(?*Node, &self.root, .SeqCst) == null;
        }
    };
}
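// A minimal usage sketch, not part of the original file: it exercises
// init/push/pop/isEmpty with stack-allocated nodes, which is safe here
// because both nodes outlive every push and pop. The stack is LIFO, so
// nodes come back in reverse push order. The test name is illustrative.
test "std.atomic.stack basic usage" {
    var stack = Stack(i32).init();
    try expect(stack.isEmpty());

    var node_a = Stack(i32).Node{ .next = undefined, .data = 1 };
    var node_b = Stack(i32).Node{ .next = undefined, .data = 2 };
    stack.push(&node_a);
    stack.push(&node_b);
    try expect(!stack.isEmpty());

    // LIFO: node_b was pushed last, so it pops first.
    try expect(stack.pop().?.data == 2);
    try expect(stack.pop().?.data == 1);
    try expect(stack.pop() == null);
    try expect(stack.isEmpty());
}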
const Context = struct {
    allocator: std.mem.Allocator,
    stack: *Stack(i32),
    put_sum: isize,
    get_sum: isize,
    get_count: usize,
    puts_done: bool,
};
// TODO add lazy evaluated build options and then put puts_per_thread behind
// some option such as "AggressiveMultithreadedFuzzTest". In the AppVeyor CI
// we would use a less aggressive setting: at 1 core we still want this test
// to pass, but there is so much thrashing that we need a smaller value. We
// would also use a less aggressive setting when running in valgrind.
const puts_per_thread = 500;
const put_thread_count = 3;

test "std.atomic.stack" {
    var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
    defer std.heap.page_allocator.free(plenty_of_memory);

    var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory);
    var a = fixed_buffer_allocator.threadSafeAllocator();

    var stack = Stack(i32).init();
    var context = Context{
        .allocator = a,
        .stack = &stack,
        .put_sum = 0,
        .get_sum = 0,
        .puts_done = false,
        .get_count = 0,
    };

    if (builtin.single_threaded) {
        {
            var i: usize = 0;
            while (i < put_thread_count) : (i += 1) {
                try expect(startPuts(&context) == 0);
            }
        }
        context.puts_done = true;
        {
            var i: usize = 0;
            while (i < put_thread_count) : (i += 1) {
                try expect(startGets(&context) == 0);
            }
        }
    } else {
        var putters: [put_thread_count]std.Thread = undefined;
        for (putters) |*t| {
            t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
        }
        var getters: [put_thread_count]std.Thread = undefined;
        for (getters) |*t| {
            t.* = try std.Thread.spawn(.{}, startGets, .{&context});
        }

        for (putters) |t|
            t.join();
        @atomicStore(bool, &context.puts_done, true, .SeqCst);
        for (getters) |t|
            t.join();
    }

    if (context.put_sum != context.get_sum) {
        std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum });
    }

    if (context.get_count != puts_per_thread * put_thread_count) {
        std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{
            context.get_count,
            @as(u32, puts_per_thread),
            @as(u32, put_thread_count),
        });
    }
}

fn startPuts(ctx: *Context) u8 {
    var put_count: usize = puts_per_thread;
    var prng = std.rand.DefaultPrng.init(0xdeadbeef);
    const random = prng.random();
    while (put_count != 0) : (put_count -= 1) {
        std.time.sleep(1); // let the OS scheduler be our fuzzer
        const x = @bitCast(i32, random.int(u32));
        const node = ctx.allocator.create(Stack(i32).Node) catch unreachable;
        node.* = Stack(i32).Node{
            .next = undefined,
            .data = x,
        };
        ctx.stack.push(node);
        _ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst);
    }
    return 0;
}

fn startGets(ctx: *Context) u8 {
    while (true) {
        // Read puts_done *before* draining: if it was already true and the
        // stack then drains empty, no new items can appear afterwards.
        const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst);

        while (ctx.stack.pop()) |node| {
            std.time.sleep(1); // let the OS scheduler be our fuzzer
            _ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst);
            _ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst);
        }

        if (last) return 0;
    }
}
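// A minimal sketch covering pushFirst(), which the fuzz test above does not
// exercise: on an empty stack the compare-and-swap installs the node and
// returns null; on a non-empty stack it fails and returns the node that
// already occupies the root slot. The test name is illustrative.
test "std.atomic.stack pushFirst" {
    var stack = Stack(i32).init();

    var first = Stack(i32).Node{ .next = undefined, .data = 1 };
    var second = Stack(i32).Node{ .next = undefined, .data = 2 };

    // Empty stack: the push succeeds and nothing is displaced.
    try expect(stack.pushFirst(&first) == null);
    // Non-empty stack: the push fails and reports the current root.
    try expect(stack.pushFirst(&second).? == &first);
    // `second` was never inserted; only `first` is in the stack.
    try expect(stack.pop().? == &first);
    try expect(stack.isEmpty());
}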