1 /*
2 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 *
6 * This Source Code Form is subject to the terms of the Mozilla Public
7 * License, v. 2.0. If a copy of the MPL was not distributed with this
8 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
9 *
10 * See the COPYRIGHT file distributed with this work for additional
11 * information regarding copyright ownership.
12 */
13
14 #include <inttypes.h>
15
16 #include <isc/align.h>
17 #include <isc/atomic.h>
18 #include <isc/hp.h>
19 #include <isc/mem.h>
20 #include <isc/queue.h>
21 #include <isc/string.h>
22
23 #define BUFFER_SIZE 1024
24
25 #define MAX_THREADS 128
26
27 #define ALIGNMENT 128
28
/* The "empty slot" / null-item marker used throughout this file. */
static uintptr_t nulluintptr = (uintptr_t)NULL;
30
/*
 * One segment of the queue: a fixed ring of item slots plus a link to
 * the next segment.  deqidx and enqidx sit on opposite sides of the
 * items array — presumably to keep the consumer- and producer-owned
 * counters in different cache lines; TODO confirm intent.
 */
typedef struct node {
	atomic_uint_fast32_t deqidx;	     /* next slot to dequeue from */
	atomic_uintptr_t items[BUFFER_SIZE]; /* item slots; 0 == empty */
	atomic_uint_fast32_t enqidx;	     /* next slot to enqueue into */
	atomic_uintptr_t next;		     /* next node, or 0 at the tail */
	isc_mem_t *mctx;		     /* attached memory context */
} node_t;
38
39 /* we just need one Hazard Pointer */
40 #define HP_TAIL 0
41 #define HP_HEAD 0
42
/*
 * The queue object.  head and tail are aligned to ALIGNMENT bytes —
 * presumably to avoid false sharing between them; TODO confirm.
 */
struct isc_queue {
	alignas(ALIGNMENT) atomic_uintptr_t head; /* oldest node_t */
	alignas(ALIGNMENT) atomic_uintptr_t tail; /* newest node_t */
	isc_mem_t *mctx;   /* memory context owning this queue */
	int max_threads;   /* thread limit recorded at creation */
	int taken;	   /* its *address* marks a consumed slot */
	isc_hp_t *hp;	   /* hazard-pointer domain for node reclamation */
	void *alloced_ptr; /* original (unaligned) allocation to free */
};
52
53 static node_t *
node_new(isc_mem_t * mctx,uintptr_t item)54 node_new(isc_mem_t *mctx, uintptr_t item) {
55 node_t *node = isc_mem_get(mctx, sizeof(*node));
56 *node = (node_t){ .mctx = NULL };
57
58 atomic_init(&node->deqidx, 0);
59 atomic_init(&node->enqidx, 1);
60 atomic_init(&node->next, 0);
61 atomic_init(&node->items[0], item);
62
63 for (int i = 1; i < BUFFER_SIZE; i++) {
64 atomic_init(&node->items[i], 0);
65 }
66
67 isc_mem_attach(mctx, &node->mctx);
68
69 return (node);
70 }
71
72 static void
node_destroy(void * node0)73 node_destroy(void *node0) {
74 node_t *node = (node_t *)node0;
75
76 isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
77 }
78
79 static bool
node_cas_next(node_t * node,node_t * cmp,const node_t * val)80 node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
81 return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp,
82 (uintptr_t)val));
83 }
84
85 static bool
queue_cas_tail(isc_queue_t * queue,node_t * cmp,const node_t * val)86 queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
87 return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp,
88 (uintptr_t)val));
89 }
90
91 static bool
queue_cas_head(isc_queue_t * queue,node_t * cmp,const node_t * val)92 queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
93 return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp,
94 (uintptr_t)val));
95 }
96
97 isc_queue_t *
isc_queue_new(isc_mem_t * mctx,int max_threads)98 isc_queue_new(isc_mem_t *mctx, int max_threads) {
99 isc_queue_t *queue = NULL;
100 node_t *sentinel = NULL;
101 void *qbuf = NULL;
102 uintptr_t qptr;
103
104 /*
105 * A trick to allocate an aligned isc_queue_t structure
106 */
107 qbuf = isc_mem_get(mctx, sizeof(*queue) + ALIGNMENT);
108 qptr = (uintptr_t)qbuf;
109 queue = (isc_queue_t *)(qptr + (ALIGNMENT - (qptr % ALIGNMENT)));
110
111 if (max_threads == 0) {
112 max_threads = MAX_THREADS;
113 }
114
115 *queue = (isc_queue_t){
116 .max_threads = max_threads,
117 .alloced_ptr = qbuf,
118 };
119
120 isc_mem_attach(mctx, &queue->mctx);
121
122 queue->hp = isc_hp_new(mctx, 1, node_destroy);
123
124 sentinel = node_new(mctx, nulluintptr);
125 atomic_init(&sentinel->enqidx, 0);
126
127 atomic_init(&queue->head, (uintptr_t)sentinel);
128 atomic_init(&queue->tail, (uintptr_t)sentinel);
129
130 return (queue);
131 }
132
133 void
isc_queue_enqueue(isc_queue_t * queue,uintptr_t item)134 isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
135 REQUIRE(item != nulluintptr);
136
137 while (true) {
138 node_t *lt = NULL;
139 uint_fast32_t idx;
140 uintptr_t n = nulluintptr;
141
142 lt = (node_t *)isc_hp_protect(queue->hp, 0, &queue->tail);
143 idx = atomic_fetch_add(<->enqidx, 1);
144 if (idx > BUFFER_SIZE - 1) {
145 node_t *lnext = NULL;
146
147 if (lt != (node_t *)atomic_load(&queue->tail)) {
148 continue;
149 }
150
151 lnext = (node_t *)atomic_load(<->next);
152 if (lnext == NULL) {
153 node_t *newnode = node_new(queue->mctx, item);
154 if (node_cas_next(lt, NULL, newnode)) {
155 queue_cas_tail(queue, lt, newnode);
156 isc_hp_clear(queue->hp);
157 return;
158 }
159 node_destroy(newnode);
160 } else {
161 queue_cas_tail(queue, lt, lnext);
162 }
163
164 continue;
165 }
166
167 if (atomic_compare_exchange_strong(<->items[idx], &n, item)) {
168 isc_hp_clear(queue->hp);
169 return;
170 }
171 }
172 }
173
/*
 * Remove and return the oldest item in the queue, or nulluintptr if
 * the queue is empty.
 */
uintptr_t
isc_queue_dequeue(isc_queue_t *queue) {
	REQUIRE(queue != NULL);

	while (true) {
		node_t *lh = NULL;
		uint_fast32_t idx;
		uintptr_t item;

		/* Protect the head node from reclamation while we use it. */
		lh = (node_t *)isc_hp_protect(queue->hp, 0, &queue->head);
		/*
		 * Empty queue: every enqueued slot has been consumed and
		 * there is no successor node to move on to.
		 */
		if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
		    atomic_load(&lh->next) == nulluintptr)
		{
			break;
		}

		/* Claim the next slot index for this dequeuer. */
		idx = atomic_fetch_add(&lh->deqidx, 1);
		if (idx > BUFFER_SIZE - 1) {
			/* This node is fully consumed: advance the head. */
			node_t *lnext = (node_t *)atomic_load(&lh->next);
			if (lnext == NULL) {
				break;
			}
			if (queue_cas_head(queue, lh, lnext)) {
				/*
				 * Retire the old head; it is freed once no
				 * hazard pointer references it.
				 */
				isc_hp_retire(queue->hp, (uintptr_t)lh);
			}

			continue;
		}

		/*
		 * Mark the slot as taken using &queue->taken as a unique
		 * sentinel address.  If the slot was still empty, the racing
		 * enqueuer loses it and retries — and so do we.
		 */
		item = atomic_exchange(&(lh->items[idx]),
				       (uintptr_t)&queue->taken);
		if (item == nulluintptr) {
			continue;
		}

		isc_hp_clear(queue->hp);
		return (item);
	}

	isc_hp_clear(queue->hp);
	return (nulluintptr);
}
216
217 void
isc_queue_destroy(isc_queue_t * queue)218 isc_queue_destroy(isc_queue_t *queue) {
219 node_t *last = NULL;
220 void *alloced = NULL;
221
222 REQUIRE(queue != NULL);
223
224 while (isc_queue_dequeue(queue) != nulluintptr) {
225 /* do nothing */
226 }
227
228 last = (node_t *)atomic_load_relaxed(&queue->head);
229 node_destroy(last);
230 isc_hp_destroy(queue->hp);
231
232 alloced = queue->alloced_ptr;
233 isc_mem_putanddetach(&queue->mctx, alloced, sizeof(*queue) + ALIGNMENT);
234 }
235