/*------------------------------------------------------------------------------
 *
 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
 * The YADIFA TM software product is provided under the BSD 3-clause license:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *        * Redistributions of source code must retain the above copyright
 *          notice, this list of conditions and the following disclaimer.
 *        * Redistributions in binary form must reproduce the above copyright
 *          notice, this list of conditions and the following disclaimer in the
 *          documentation and/or other materials provided with the distribution.
 *        * Neither the name of EURid nor the names of its contributors may be
 *          used to endorse or promote products derived from this software
 *          without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *------------------------------------------------------------------------------
 *
 */

/** @defgroup
 *  @ingroup
 *  @brief A circular buffer backed by a shared-memory mapping, usable across
 *         processes.
 *
 *  Writers reserve a slot, fill it, then commit it; a single reader takes the
 *  oldest committed slot and releases it when done.
 *
 * @{
 *
 *----------------------------------------------------------------------------*/

#include "dnscore/dnscore-config.h"
#include <stddef.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <dnscore/format.h>

#include "dnscore/dnscore.h"
#include "dnscore/fdtools.h"
#include "dnscore/mutex.h"
#include "dnscore/shared-circular-buffer.h"

#define L1_DATA_LINE_SIZE 0x40
#define L1_DATA_LINE_MASK (L1_DATA_LINE_SIZE - 1)

#define DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE 0
#define DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT 0

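/*
 * The whole structure lives in a single MAP_SHARED|MAP_ANONYMOUS mapping so it
 * can be shared between a process and its forked children.  The enqueue and
 * dequeue indices are kept on separate L1 cache lines to avoid false sharing,
 * the slots follow as a flexible array member, and an optional extra region
 * (additional_buffer_ptr/additional_buffer_size) is reserved at the tail of
 * the mapping for the caller's own use.
 */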
struct shared_circular_buffer
{
    mutex_t mtx;
    cond_t cond_r;
    cond_t cond_w;
    size_t mask;
    size_t total_size;
    size_t additional_buffer_size;
    u8 *additional_buffer_ptr;
#if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
    debug_memory_by_tag_context_t *memctx;
#endif
#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
    s64 last_report_time;
    s64 peak_usage;
#endif
#ifndef WIN32
    volatile s64 enqueue_index __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
    volatile s64 dequeue_index __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
    struct shared_circular_buffer_slot base[] __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
#else
    volatile s64 enqueue_index;
    volatile s64 dequeue_index;
    struct shared_circular_buffer_slot base[];
#endif
};

#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
static void
shared_circular_buffer_stats(struct shared_circular_buffer *buffer)
{
    s64 now = timeus();
    if(now - buffer->last_report_time > 60 * ONE_SECOND_US)
    {
        buffer->last_report_time = now;
        s64 size = buffer->mask + 1;
        s64 used = buffer->enqueue_index - buffer->dequeue_index;
        if(used < 0)
        {
            used = size - used;
        }
        if(buffer->peak_usage < used)
        {
            buffer->peak_usage = used;
        }

        formatln("shared_circular_buffer@%p: free=%lli, used=%lli, peak=%lli, enqueue=%lli, dequeue=%lli",
                 buffer, size - used, used, buffer->peak_usage, buffer->enqueue_index, buffer->dequeue_index);
    }
}
#endif

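/*
 * Accessors for the optional extra shared region reserved by
 * shared_circular_buffer_create_ex(); both values are set at creation time and
 * never change afterwards.
 */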
u8 *shared_circular_buffer_additional_space_ptr(struct shared_circular_buffer *buffer)
{
    return buffer->additional_buffer_ptr;
}

size_t shared_circular_buffer_additional_space_size(struct shared_circular_buffer *buffer)
{
    return buffer->additional_buffer_size;
}

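/*
 * Creates a shared circular buffer of 2^log_2_buffer_size slots, plus
 * additional_space_bytes of caller-usable shared memory (rounded up to whole
 * pages).  The header, the slot array and the extra space are carved out of a
 * single anonymous shared mapping, and the mutex and condition variables are
 * initialised as process-shared so the buffer can be used between a process
 * and its forked children.  Returns NULL on failure.
 */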
struct shared_circular_buffer*
shared_circular_buffer_create_ex(u8 log_2_buffer_size, u32 additional_space_bytes)
{
    struct shared_circular_buffer *buffer;

    const size_t header_size = (sizeof(struct shared_circular_buffer) + L1_DATA_LINE_MASK) & ~L1_DATA_LINE_MASK;
    size_t buffer_size = sizeof(struct shared_circular_buffer_slot) << log_2_buffer_size;

    size_t additional_space_real_bytes = (additional_space_bytes + 4095) & ~4095;

    const size_t total_buffer_size = ((header_size + buffer_size + 4095) & ~4095) + additional_space_real_bytes;

    buffer = (struct shared_circular_buffer*)mmap(NULL, total_buffer_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);

    if(buffer != ((struct shared_circular_buffer*)MAP_FAILED))
    {
        memset(buffer, 0, header_size);

        if(mutex_init_process_shared(&buffer->mtx) != 0)
        {
            munmap(buffer, total_buffer_size);
            return NULL;
        }
        if(cond_init_process_shared(&buffer->cond_r) != 0)
        {
            mutex_destroy(&buffer->mtx);
            munmap(buffer, total_buffer_size);
            return NULL;
        }

        if(cond_init_process_shared(&buffer->cond_w) != 0)
        {
            cond_finalize(&buffer->cond_r);
            mutex_destroy(&buffer->mtx);
            munmap(buffer, total_buffer_size);
            return NULL;
        }

        buffer->enqueue_index = 0;
        buffer->dequeue_index = 0;
        buffer->mask = (1 << log_2_buffer_size) - 1;
        buffer->total_size = total_buffer_size;
        buffer->additional_buffer_size = additional_space_real_bytes;
        buffer->additional_buffer_ptr = &((u8*)buffer)[total_buffer_size - additional_space_real_bytes];

#if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
        buffer->memctx = debug_memory_by_tag_new_instance("shrqueue");
#endif
#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
        buffer->last_report_time = 0;
        buffer->peak_usage = 0;
#endif

        return buffer;
    }
    else
    {
        return NULL;
    }
}

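/*
 * Convenience wrapper: creates a shared circular buffer of 2^log_2_buffer_size
 * slots with no additional shared space.
 */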
struct shared_circular_buffer*
shared_circular_buffer_create(u8 log_2_buffer_size)
{
    struct shared_circular_buffer* ret = shared_circular_buffer_create_ex(log_2_buffer_size, 0);
    return ret;
}

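/*
 * Destroys the process-shared mutex and condition variables and unmaps the
 * whole shared mapping.  The caller must ensure no other process or thread is
 * still using the buffer.
 */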
void
shared_circular_buffer_destroy(struct shared_circular_buffer *buffer)
{
    if(buffer != NULL)
    {
#if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
        debug_memory_by_tag_delete(buffer->memctx);
#endif
        cond_finalize(&buffer->cond_w);
        cond_finalize(&buffer->cond_r);
        mutex_destroy(&buffer->mtx);
        munmap(buffer, buffer->total_size);
    }
}

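/*
 * Reserves the next free slot for writing, blocking while the buffer is full.
 * The slot is returned with state 0 (reserved, not yet readable); the caller
 * fills slot->data and then calls shared_circular_buffer_commit_enqueue() to
 * make it visible to the reader.
 *
 * Minimal producer sketch (hypothetical names; 'msg'/'msg_len' are the
 * caller's payload and must fit in the slot's data array declared in
 * shared-circular-buffer.h):
 *
 *     struct shared_circular_buffer_slot *slot = shared_circular_buffer_prepare_enqueue(buffer);
 *     memcpy(slot->data, msg, msg_len);                       // fill the reserved slot
 *     shared_circular_buffer_commit_enqueue(buffer, slot);    // publish it to the reader
 */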
struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_enqueue(struct shared_circular_buffer *buffer)
{
    struct shared_circular_buffer_slot *ret;

    mutex_lock(&buffer->mtx);

    for(;;)
    {
        s64 di = buffer->dequeue_index;
        s64 ei = buffer->enqueue_index;

        if((ei >= di) && ((ei - di) <= (s64)buffer->mask))
        {
            ret = (struct shared_circular_buffer_slot*)&buffer->base[ei & buffer->mask];
            ret->state = 0;
#if DEBUG
            memset(ret->data, 'E', sizeof(ret->data));
#endif
            buffer->enqueue_index = ei + 1;

#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
            shared_circular_buffer_stats(buffer);
#endif
            break;
        }

        cond_wait(&buffer->cond_w, &buffer->mtx); // wait to write
    }

    // cond_notify(&buffer->cond_r); // notify reader

    mutex_unlock(&buffer->mtx);

    return ret;
}

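/*
 * Non-blocking variant of shared_circular_buffer_prepare_enqueue(): returns
 * NULL if the lock cannot be taken immediately or if the buffer is full.
 */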
struct shared_circular_buffer_slot*
shared_circular_buffer_try_prepare_enqueue(struct shared_circular_buffer *buffer)
{
    struct shared_circular_buffer_slot *ret;

    if(mutex_trylock(&buffer->mtx))
    {
        s64 di = buffer->dequeue_index;
        s64 ei = buffer->enqueue_index;

        if((ei >= di) && ((ei - di) <= (s64)buffer->mask))
        {
            ret = (struct shared_circular_buffer_slot*)&buffer->base[ei & buffer->mask];
            ret->state = 0;
#if DEBUG
            memset(ret->data, 'e', sizeof(ret->data));
#endif
            buffer->enqueue_index = ei + 1;

#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
            shared_circular_buffer_stats(buffer);
#endif
        }
        else
        {
            ret = NULL;
        }

        mutex_unlock(&buffer->mtx);

        return ret;
    }

    return NULL;
}

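/*
 * Marks a previously reserved slot as readable (state 1) and wakes the reader.
 */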
void
shared_circular_buffer_commit_enqueue(struct shared_circular_buffer *buffer, struct shared_circular_buffer_slot *slot)
{
    mutex_lock(&buffer->mtx);
    slot->state = 1;
#if DEBUG
#if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
    debug_memory_by_tag_alloc_notify(buffer->memctx, GENERIC_TAG, sizeof(*slot));
#endif
#endif

#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
    shared_circular_buffer_stats(buffer);
#endif
    cond_notify(&buffer->cond_r);
    mutex_unlock(&buffer->mtx);
}

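/*
 * Returns the position of a slot within the slot array (0 .. mask).
 */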
size_t
shared_circular_buffer_get_index(struct shared_circular_buffer *buffer, struct shared_circular_buffer_slot *slot)
{
    return slot - buffer->base;
}

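/*
 * Lock-protected snapshots of the buffer state (empty / used / available).
 * The values can change as soon as the lock is released, so they are only
 * indicative while other writers are active.
 */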
bool
shared_circular_buffer_empty(struct shared_circular_buffer *buffer)
{
    bool ret;
    mutex_lock(&buffer->mtx);
    ret = buffer->dequeue_index == buffer->enqueue_index;
    mutex_unlock(&buffer->mtx);
    return ret;
}

size_t
shared_circular_buffer_size(struct shared_circular_buffer *buffer)
{
    size_t ret;
    mutex_lock(&buffer->mtx);
    ret = buffer->enqueue_index - buffer->dequeue_index;
    mutex_unlock(&buffer->mtx);
    return ret;
}

size_t
shared_circular_buffer_avail(struct shared_circular_buffer *buffer)
{
    return buffer->mask - shared_circular_buffer_size(buffer);
}

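/*
 * Locks/unlocks the buffer's internal mutex for callers that need to serialise
 * their own bookkeeping with the buffer.  Note that the other functions in
 * this file take the same mutex themselves, so they must not be called while
 * the lock is held externally.
 */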
void
shared_circular_buffer_lock(struct shared_circular_buffer *buffer)
{
    mutex_lock(&buffer->mtx);
}

void
shared_circular_buffer_unlock(struct shared_circular_buffer *buffer)
{
    mutex_unlock(&buffer->mtx);
}

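/*
 * Returns the oldest committed slot, blocking until one is available.  The
 * slot stays owned by the reader until shared_circular_buffer_commit_dequeue()
 * releases it.  This path assumes a single dequeuer.
 *
 * Minimal consumer sketch (hypothetical names; 'handle_message' is the
 * caller's own processing function):
 *
 *     struct shared_circular_buffer_slot *slot = shared_circular_buffer_prepare_dequeue(buffer);
 *     handle_message(slot->data);                     // consume the payload in place
 *     shared_circular_buffer_commit_dequeue(buffer);  // release the slot and wake writers
 */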
struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_dequeue(struct shared_circular_buffer *buffer)
{
    struct shared_circular_buffer_slot *ret;

    mutex_lock(&buffer->mtx);

    for(;;)
    {
        s64 di = buffer->dequeue_index;
        s64 ei = buffer->enqueue_index;
        if(di < ei)
        {
            ret = (struct shared_circular_buffer_slot*)&buffer->base[di & buffer->mask];
            u8 * volatile state = &ret->state;

            while(*state != 1)
            {
                cond_wait(&buffer->cond_r, &buffer->mtx); // wait to read // there is only one dequeuer so there is no need to reload this slot
            }

#if DEBUG
            *state = 2;
#endif
#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
            shared_circular_buffer_stats(buffer);
#endif
            break;
        }

        cond_wait(&buffer->cond_r, &buffer->mtx); // wait to read
    }

    mutex_unlock(&buffer->mtx);

    return ret;
}

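/*
 * Same as shared_circular_buffer_prepare_dequeue(), but gives up and returns
 * NULL if no committed slot becomes available within timeoutus microseconds.
 */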
struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_dequeue_with_timeout(struct shared_circular_buffer *buffer, s64 timeoutus)
{
    struct shared_circular_buffer_slot *ret;

    mutex_lock(&buffer->mtx);

    for(;;)
    {
        s64 di = buffer->dequeue_index;
        s64 ei = buffer->enqueue_index;
        if(di < ei)
        {
            ret = (struct shared_circular_buffer_slot*)&buffer->base[di & buffer->mask];
            u8 * volatile state = &ret->state;

            while(*state != 1)
            {
                if(cond_timedwait(&buffer->cond_r, &buffer->mtx, timeoutus) != 0) // wait to read // there is only one dequeuer so there is no need to reload this slot
                {
                    ret = NULL;
                    break;
                }
            }

#if DEBUG
            if(ret != NULL) // do not mark the slot as read if the wait timed out
            {
                *state = 2;
            }
#endif
#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
            shared_circular_buffer_stats(buffer);
#endif
            break;
        }

        if(cond_timedwait(&buffer->cond_r, &buffer->mtx, timeoutus) != 0) // wait to read
        {
            ret = NULL;
            break;
        }
    }

    mutex_unlock(&buffer->mtx);

    return ret;
}

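/*
 * Releases the slot returned by the last prepare_dequeue call and wakes
 * waiting writers.  When the buffer becomes empty the indices are reset to 0
 * and, once enough slots have been cycled through, the slot pages are handed
 * back to the kernel with madvise(MADV_DONTNEED).
 */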
void
shared_circular_buffer_commit_dequeue(struct shared_circular_buffer *buffer)
{
    mutex_lock(&buffer->mtx);

#if DEBUG
    struct shared_circular_buffer_slot *ret;
    ret = (struct shared_circular_buffer_slot*)&buffer->base[buffer->dequeue_index & buffer->mask];
    memset(ret->data, 'D', sizeof(ret->data));

#if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
    debug_memory_by_tag_free_notify(buffer->memctx, GENERIC_TAG, sizeof(*ret));
#endif
#endif

    if(++buffer->dequeue_index == buffer->enqueue_index)
    {
        if(buffer->enqueue_index > 65535) // don't advise for less than a few pages
        {
            intptr ptr = (intptr)&buffer->base[0];
            ptr += 4095;
            ptr &= ~4095;
            size_t size = buffer->total_size - buffer->additional_buffer_size;
            if(size > 8192)
            {
                size -= 8192;
                madvise((void*)ptr, size, MADV_DONTNEED);
            }
        }

        buffer->enqueue_index = 0;
        buffer->dequeue_index = 0;

#if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
        shared_circular_buffer_stats(buffer);
#endif
    }

    cond_notify(&buffer->cond_w); // notify writers

    mutex_unlock(&buffer->mtx);
}

/** @} */