/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

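/*
 * A lock-free, multi-producer multi-consumer queue, built as a linked
 * list of fixed-size segments in the style of a "FAA-array" queue:
 * producers and consumers claim item slots with atomic fetch-and-add
 * on per-segment indices, and hazard pointers keep segments from being
 * reclaimed while another thread may still reference them.
 *
 * A usage sketch (assuming an attached isc_mem_t context 'mctx'):
 *
 *	isc_queue_t *queue = isc_queue_new(mctx, 0);
 *	isc_queue_enqueue(queue, (uintptr_t)item);
 *	item = (void *)isc_queue_dequeue(queue);
 *	isc_queue_destroy(queue);
 */
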
#include <inttypes.h>

#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/hp.h>
#include <isc/mem.h>
#include <isc/queue.h>
#include <isc/string.h>

/* Number of item slots in each queue segment */
#define BUFFER_SIZE 1024

/* Default thread limit when isc_queue_new() is passed 0 */
#define MAX_THREADS 128

/* Alignment used to keep 'head' and 'tail' on separate cache lines */
#define ALIGNMENT 128

static uintptr_t nulluintptr = (uintptr_t)NULL;
typedef struct node {
	atomic_uint_fast32_t deqidx;
	atomic_uintptr_t items[BUFFER_SIZE];
	atomic_uint_fast32_t enqidx;
	atomic_uintptr_t next;
	isc_mem_t *mctx;
} node_t;

/*
 * We only need one hazard pointer: an operation protects either the
 * head or the tail segment, never both at once.
 */
#define HP_TAIL 0
#define HP_HEAD 0

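/*
 * The queue proper.  'head' and 'tail' are aligned to separate cache
 * lines to avoid false sharing between consumers and producers; the
 * address of 'taken' doubles as the marker value a dequeuer swaps into
 * a slot it has consumed.
 */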
struct isc_queue {
	alignas(ALIGNMENT) atomic_uintptr_t head;
	alignas(ALIGNMENT) atomic_uintptr_t tail;
	isc_mem_t *mctx;
	int max_threads;
	int taken;
	isc_hp_t *hp;
	void *alloced_ptr;
};

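/*
 * Allocate a new segment holding 'item' in slot 0.  'enqidx' starts
 * at 1 because slot 0 is already filled.
 */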
static node_t *
node_new(isc_mem_t *mctx, uintptr_t item) {
	node_t *node = isc_mem_get(mctx, sizeof(*node));
	*node = (node_t){ .mctx = NULL };

	atomic_init(&node->deqidx, 0);
	atomic_init(&node->enqidx, 1);
	atomic_init(&node->next, 0);
	atomic_init(&node->items[0], item);

	for (int i = 1; i < BUFFER_SIZE; i++) {
		atomic_init(&node->items[i], 0);
	}

	isc_mem_attach(mctx, &node->mctx);

	return (node);
}

static void
node_destroy(void *node0) {
	node_t *node = (node_t *)node0;

	isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
}

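/*
 * Compare-and-swap wrappers for the 'next', 'tail' and 'head'
 * pointers; each returns true if the pointer was swung from 'cmp'
 * to 'val'.
 */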
static bool
node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
	return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp,
					       (uintptr_t)val));
}

static bool
queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
	return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp,
					       (uintptr_t)val));
}

static bool
queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
	return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp,
					       (uintptr_t)val));
}

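/*
 * Create a queue: one hazard-pointer domain, one empty sentinel
 * segment, and head == tail pointing at that segment.
 */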
isc_queue_t *
isc_queue_new(isc_mem_t *mctx, int max_threads) {
	isc_queue_t *queue = NULL;
	node_t *sentinel = NULL;
	void *qbuf = NULL;
	uintptr_t qptr;

	/*
	 * A trick to allocate an aligned isc_queue_t structure: get
	 * ALIGNMENT extra bytes, round the pointer up to the next
	 * ALIGNMENT boundary, and remember the original allocation
	 * in 'alloced_ptr' so it can be freed later.
	 */
	qbuf = isc_mem_get(mctx, sizeof(*queue) + ALIGNMENT);
	qptr = (uintptr_t)qbuf;
	queue = (isc_queue_t *)(qptr + (ALIGNMENT - (qptr % ALIGNMENT)));

	if (max_threads == 0) {
		max_threads = MAX_THREADS;
	}

	*queue = (isc_queue_t){
		.max_threads = max_threads,
		.alloced_ptr = qbuf,
	};

	isc_mem_attach(mctx, &queue->mctx);

	queue->hp = isc_hp_new(mctx, 1, node_destroy);

	sentinel = node_new(mctx, nulluintptr);
	/* The sentinel carries no item, so reset the enqueue index. */
	atomic_init(&sentinel->enqidx, 0);

	atomic_init(&queue->head, (uintptr_t)sentinel);
	atomic_init(&queue->tail, (uintptr_t)sentinel);

	return (queue);
}

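/*
 * Enqueue: claim a slot in the tail segment with fetch-and-add, then
 * CAS the item into it.  If the segment is full, either link a new
 * segment holding the item or help advance the tail, and retry.
 */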
void
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
	REQUIRE(item != nulluintptr);

	while (true) {
		node_t *lt = NULL;
		uint_fast32_t idx;
		uintptr_t n = nulluintptr;

		lt = (node_t *)isc_hp_protect(queue->hp, 0, &queue->tail);
		idx = atomic_fetch_add(&lt->enqidx, 1);
		if (idx > BUFFER_SIZE - 1) {
			/* This segment is full. */
			node_t *lnext = NULL;

			if (lt != (node_t *)atomic_load(&queue->tail)) {
				/* The tail already advanced; start over. */
				continue;
			}

			lnext = (node_t *)atomic_load(&lt->next);
			if (lnext == NULL) {
				node_t *newnode = node_new(queue->mctx, item);
				if (node_cas_next(lt, NULL, newnode)) {
					queue_cas_tail(queue, lt, newnode);
					isc_hp_clear(queue->hp);
					return;
				}
				/* Another thread linked a segment first. */
				node_destroy(newnode);
			} else {
				/* Help advance the tail. */
				queue_cas_tail(queue, lt, lnext);
			}

			continue;
		}

		if (atomic_compare_exchange_strong(&lt->items[idx], &n, item)) {
			isc_hp_clear(queue->hp);
			return;
		}
	}
}

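/*
 * Dequeue: claim a slot in the head segment with fetch-and-add and
 * swap its contents with the 'taken' marker.  Returns the item, or
 * 'nulluintptr' if the queue is empty.  Exhausted head segments are
 * retired through the hazard-pointer domain.
 */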
uintptr_t
isc_queue_dequeue(isc_queue_t *queue) {
	REQUIRE(queue != NULL);

	while (true) {
		node_t *lh = NULL;
		uint_fast32_t idx;
		uintptr_t item;

		lh = (node_t *)isc_hp_protect(queue->hp, 0, &queue->head);
		if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
		    atomic_load(&lh->next) == nulluintptr)
		{
			/* The queue is empty. */
			break;
		}

		idx = atomic_fetch_add(&lh->deqidx, 1);
		if (idx > BUFFER_SIZE - 1) {
			/* This segment is drained; move to the next one. */
			node_t *lnext = (node_t *)atomic_load(&lh->next);
			if (lnext == NULL) {
				break;
			}
			if (queue_cas_head(queue, lh, lnext)) {
				isc_hp_retire(queue->hp, (uintptr_t)lh);
			}

			continue;
		}

		item = atomic_exchange(&(lh->items[idx]),
				       (uintptr_t)&queue->taken);
		if (item == nulluintptr) {
			/*
			 * The paired enqueue has not stored its item
			 * yet; the 'taken' marker forces it to retry.
			 */
			continue;
		}

		isc_hp_clear(queue->hp);
		return (item);
	}

	isc_hp_clear(queue->hp);
	return (nulluintptr);
}

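/*
 * Destroy a queue.  Any remaining items are drained and discarded;
 * the caller must ensure no other threads are still using the queue.
 */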
void
isc_queue_destroy(isc_queue_t *queue) {
	node_t *last = NULL;
	void *alloced = NULL;

	REQUIRE(queue != NULL);

	/* Drain any remaining items. */
	while (isc_queue_dequeue(queue) != nulluintptr) {
		/* do nothing */
	}

	last = (node_t *)atomic_load_relaxed(&queue->head);
	node_destroy(last);
	isc_hp_destroy(queue->hp);

	/* Free the original (unaligned) allocation. */
	alloced = queue->alloced_ptr;
	isc_mem_putanddetach(&queue->mctx, alloced, sizeof(*queue) + ALIGNMENT);
}