1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 /** @defgroup
36  *  @ingroup
37  *  @brief
38  *
39  *
40  *
41  * @{
42  *
43  *----------------------------------------------------------------------------*/
44 
45 #include "dnscore/dnscore-config.h"
46 #include <stddef.h>
47 #include <unistd.h>
48 #include <sys/types.h>
49 #include <sys/mman.h>
50 #include <dnscore/format.h>
51 
52 #include "dnscore/dnscore.h"
53 #include "dnscore/fdtools.h"
54 #include "dnscore/mutex.h"
55 #include "dnscore/shared-circular-buffer.h"
56 
57 #define L1_DATA_LINE_SIZE 0x40
58 #define L1_DATA_LINE_MASK (L1_DATA_LINE_SIZE - 1)
59 
60 #define DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE 0
61 #define DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT 0
62 
63 struct shared_circular_buffer
64 {
65     mutex_t mtx;
66     cond_t cond_r;
67     cond_t cond_w;
68     size_t mask;
69     size_t total_size;
70     size_t additional_buffer_size;
71     u8 *additional_buffer_ptr;
72 #if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
73     debug_memory_by_tag_context_t *memctx;
74 #endif
75 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
76     s64 last_report_time;
77     s64 peak_usage;
78 #endif
79 #ifndef WIN32
80     volatile s64 enqueue_index __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
81     volatile s64 dequeue_index __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
82     struct shared_circular_buffer_slot base[] __attribute__ ((aligned (L1_DATA_LINE_SIZE)));
83 #else
84     volatile s64 enqueue_index;
85     volatile s64 dequeue_index;
86     struct shared_circular_buffer_slot base[];
87 #endif
88 };
89 
90 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
91 static void
shared_circular_buffer_stats(struct shared_circular_buffer * buffer)92 shared_circular_buffer_stats(struct shared_circular_buffer* buffer)
93 {
94     s64 now = timeus();
95     if(now - buffer->last_report_time > 60 * ONE_SECOND_US)
96     {
97         buffer->last_report_time = now;
98         s64 size = buffer->mask + 1;
99         s64 used = buffer->enqueue_index - buffer->dequeue_index;
100         if(used < 0)
101         {
102             used = size - used;
103         }
104         if(buffer->peak_usage < used)
105         {
106             buffer->peak_usage = used;
107         }
108 
109         formatln("shared_circular_buffer@%p: free=%lli, used=%lli, peak=%lli, enqueue=%lli, dequeue=%lli",
110                  buffer, size - used, used, buffer->peak_usage, buffer->enqueue_index, buffer->dequeue_index);
111     }
112 }
113 #endif
114 
shared_circular_buffer_additional_space_ptr(struct shared_circular_buffer * buffer)115 u8 *shared_circular_buffer_additional_space_ptr(struct shared_circular_buffer* buffer)
116 {
117     return buffer->additional_buffer_ptr;
118 }
119 
shared_circular_buffer_additional_space_size(struct shared_circular_buffer * buffer)120 size_t shared_circular_buffer_additional_space_size(struct shared_circular_buffer* buffer)
121 {
122     return buffer->additional_buffer_size;
123 }
124 
125 struct shared_circular_buffer*
shared_circular_buffer_create_ex(u8 log_2_buffer_size,u32 additional_space_bytes)126 shared_circular_buffer_create_ex(u8 log_2_buffer_size, u32 additional_space_bytes)
127 {
128     struct shared_circular_buffer *buffer;
129 
130     const size_t header_size = (sizeof(struct shared_circular_buffer) + L1_DATA_LINE_MASK) & ~L1_DATA_LINE_MASK;
131     size_t buffer_size = sizeof(struct shared_circular_buffer_slot) << log_2_buffer_size;
132 
133     size_t additional_space_real_bytes = (additional_space_bytes + 4095) & ~4095;
134 
135     const size_t total_buffer_size = ((header_size + buffer_size + 4095) & ~4095) + additional_space_real_bytes;
136 
137     buffer = (struct shared_circular_buffer*)mmap(NULL, total_buffer_size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
138 
139     if(buffer != ((struct shared_circular_buffer*)MAP_FAILED))
140     {
141         memset(buffer, 0, header_size);
142 
143         if(mutex_init_process_shared(&buffer->mtx) != 0)
144         {
145             munmap(buffer, total_buffer_size);
146             return NULL;
147         }
148         if(cond_init_process_shared(&buffer->cond_r) != 0)
149         {
150             mutex_destroy(&buffer->mtx);
151             munmap(buffer, total_buffer_size);
152             return NULL;
153         }
154 
155         if(cond_init_process_shared(&buffer->cond_w) != 0)
156         {
157             cond_finalize(&buffer->cond_r);
158             mutex_destroy(&buffer->mtx);
159             munmap(buffer, total_buffer_size);
160             return NULL;
161         }
162 
163         buffer->enqueue_index = 0;
164         buffer->dequeue_index = 0;
165         buffer->mask = (1 << log_2_buffer_size) - 1;
166         buffer->total_size = total_buffer_size;
167         buffer->additional_buffer_size = additional_space_real_bytes;
168         buffer->additional_buffer_ptr = &((u8*)buffer)[total_buffer_size - additional_space_real_bytes];
169 
170 #if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
171         buffer->memctx = debug_memory_by_tag_new_instance("shrqueue");
172 #endif
173 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
174         buffer->last_report_time = 0;
175         buffer->peak_usage = 0;
176 #endif
177 
178         return buffer;
179     }
180     else
181     {
182         return NULL;
183     }
184 }
185 
186 struct shared_circular_buffer*
shared_circular_buffer_create(u8 log_2_buffer_size)187 shared_circular_buffer_create(u8 log_2_buffer_size)
188 {
189     struct shared_circular_buffer* ret = shared_circular_buffer_create_ex(log_2_buffer_size, 0);
190     return ret;
191 }
192 
193 void
shared_circular_buffer_destroy(struct shared_circular_buffer * buffer)194 shared_circular_buffer_destroy(struct shared_circular_buffer* buffer)
195 {
196     if(buffer != NULL)
197     {
198 #if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
199         debug_memory_by_tag_delete(buffer->memctx);
200 #endif
201         cond_finalize(&buffer->cond_w);
202         cond_finalize(&buffer->cond_r);
203         mutex_destroy(&buffer->mtx);
204         munmap(buffer, buffer->total_size);
205     }
206 }
207 
208 struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_enqueue(struct shared_circular_buffer * buffer)209 shared_circular_buffer_prepare_enqueue(struct shared_circular_buffer* buffer)
210 {
211     struct shared_circular_buffer_slot *ret;
212 
213     mutex_lock(&buffer->mtx);
214 
215     for(;;)
216     {
217         s64 di = buffer->dequeue_index;
218         s64 ei = buffer->enqueue_index;
219 
220         if((ei >= di) && ((ei - di) <= (s64)buffer->mask))
221         {
222 
223             ret = (struct shared_circular_buffer_slot*)&buffer->base[ei & buffer->mask];
224             ret->state = 0;
225 #if DEBUG
226             memset(ret->data, 'E', sizeof(ret->data));
227 #endif
228             buffer->enqueue_index = ei + 1;
229 
230 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
231             shared_circular_buffer_stats(buffer);
232 #endif
233             break;
234         }
235 
236         cond_wait(&buffer->cond_w, &buffer->mtx); // wait to write
237     }
238 
239     // cond_notify(&buffer->cond_r); // notify reader
240 
241     mutex_unlock(&buffer->mtx);
242 
243     return ret;
244 }
245 
246 struct shared_circular_buffer_slot*
shared_circular_buffer_try_prepare_enqueue(struct shared_circular_buffer * buffer)247 shared_circular_buffer_try_prepare_enqueue(struct shared_circular_buffer* buffer)
248 {
249     struct shared_circular_buffer_slot *ret;
250 
251     if(mutex_trylock(&buffer->mtx))
252     {
253         s64 di = buffer->dequeue_index;
254         s64 ei = buffer->enqueue_index;
255 
256         if((ei >= di) && ((ei - di) <= (s64)buffer->mask))
257         {
258             ret = (struct shared_circular_buffer_slot*)&buffer->base[ei & buffer->mask];
259             ret->state = 0;
260 #if DEBUG
261             memset(ret->data, 'e', sizeof(ret->data));
262 #endif
263             buffer->enqueue_index = ei + 1;
264 
265 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
266             shared_circular_buffer_stats(buffer);
267 #endif
268         }
269         else
270         {
271             ret = NULL;
272         }
273 
274         mutex_unlock(&buffer->mtx);
275 
276         return ret;
277     }
278 
279     return NULL;
280 }
281 
282 void
shared_circular_buffer_commit_enqueue(struct shared_circular_buffer * buffer,struct shared_circular_buffer_slot * slot)283 shared_circular_buffer_commit_enqueue(struct shared_circular_buffer* buffer, struct shared_circular_buffer_slot* slot)
284 {
285     mutex_lock(&buffer->mtx);
286     slot->state = 1;
287 #if DEBUG
288 #if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
289     debug_memory_by_tag_alloc_notify(buffer->memctx, GENERIC_TAG, sizeof(*slot));
290 #endif
291 #endif
292 
293 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
294     shared_circular_buffer_stats(buffer);
295 #endif
296     cond_notify(&buffer->cond_r);
297     mutex_unlock(&buffer->mtx);
298 }
299 
300 size_t
shared_circular_buffer_get_index(struct shared_circular_buffer * buffer,struct shared_circular_buffer_slot * slot)301 shared_circular_buffer_get_index(struct shared_circular_buffer* buffer, struct shared_circular_buffer_slot* slot)
302 {
303     return slot - buffer->base;
304 }
305 
306 bool
shared_circular_buffer_empty(struct shared_circular_buffer * buffer)307 shared_circular_buffer_empty(struct shared_circular_buffer* buffer)
308 {
309     bool ret;
310     mutex_lock(&buffer->mtx);
311     ret = buffer->dequeue_index == buffer->enqueue_index;
312     mutex_unlock(&buffer->mtx);
313     return ret;
314 }
315 
316 size_t
shared_circular_buffer_size(struct shared_circular_buffer * buffer)317 shared_circular_buffer_size(struct shared_circular_buffer* buffer)
318 {
319     size_t ret;
320     mutex_lock(&buffer->mtx);
321     ret = buffer->enqueue_index - buffer->dequeue_index;
322     mutex_unlock(&buffer->mtx);
323     return ret;
324 }
325 
326 size_t
shared_circular_buffer_avail(struct shared_circular_buffer * buffer)327 shared_circular_buffer_avail(struct shared_circular_buffer* buffer)
328 {
329     return buffer->mask - shared_circular_buffer_size(buffer);
330 }
331 
332 void
shared_circular_buffer_lock(struct shared_circular_buffer * buffer)333 shared_circular_buffer_lock(struct shared_circular_buffer* buffer)
334 {
335     mutex_lock(&buffer->mtx);
336 }
337 
338 void
shared_circular_buffer_unlock(struct shared_circular_buffer * buffer)339 shared_circular_buffer_unlock(struct shared_circular_buffer* buffer)
340 {
341     mutex_unlock(&buffer->mtx);
342 }
343 
344 struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_dequeue(struct shared_circular_buffer * buffer)345 shared_circular_buffer_prepare_dequeue(struct shared_circular_buffer* buffer)
346 {
347     struct shared_circular_buffer_slot *ret;
348 
349     mutex_lock(&buffer->mtx);
350 
351     for(;;)
352     {
353         s64 di = buffer->dequeue_index;
354         s64 ei = buffer->enqueue_index;
355         if(di < ei)
356         {
357             ret = (struct shared_circular_buffer_slot*)&buffer->base[di & buffer->mask];
358             u8 * volatile state = &ret->state;
359 
360             while(*state != 1)
361             {
362                 cond_wait(&buffer->cond_r, &buffer->mtx); // wait to read // there is only one dequeuer so there is no need to reload this slot
363             }
364 
365 #if DEBUG
366             *state = 2;
367 #endif
368 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
369             shared_circular_buffer_stats(buffer);
370 #endif
371             break;
372         }
373 
374         cond_wait(&buffer->cond_r, &buffer->mtx); // wait to read
375     }
376 
377     mutex_unlock(&buffer->mtx);
378 
379     return ret;
380 }
381 
382 struct shared_circular_buffer_slot*
shared_circular_buffer_prepare_dequeue_with_timeout(struct shared_circular_buffer * buffer,s64 timeoutus)383 shared_circular_buffer_prepare_dequeue_with_timeout(struct shared_circular_buffer* buffer, s64 timeoutus)
384 {
385     struct shared_circular_buffer_slot *ret;
386 
387     mutex_lock(&buffer->mtx);
388 
389     for(;;)
390     {
391         s64 di = buffer->dequeue_index;
392         s64 ei = buffer->enqueue_index;
393         if(di < ei)
394         {
395             ret = (struct shared_circular_buffer_slot*)&buffer->base[di & buffer->mask];
396             u8 * volatile state = &ret->state;
397 
398             while(*state != 1)
399             {
400                 if(cond_timedwait(&buffer->cond_r, &buffer->mtx, timeoutus) != 0) // wait to read // there is only one dequeuer so there is no need to reload this slot
401                 {
402                     ret = NULL;
403                     break;
404                 }
405             }
406 
407 #if DEBUG
408             *state = 2;
409 #endif
410 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
411             shared_circular_buffer_stats(buffer);
412 #endif
413             break;
414         }
415 
416         if(cond_timedwait(&buffer->cond_r, &buffer->mtx, timeoutus) != 0) // wait to read
417         {
418             ret = NULL;
419             break;
420         }
421     }
422 
423     mutex_unlock(&buffer->mtx);
424 
425     return ret;
426 }
427 
428 void
shared_circular_buffer_commit_dequeue(struct shared_circular_buffer * buffer)429 shared_circular_buffer_commit_dequeue(struct shared_circular_buffer* buffer)
430 {
431     mutex_lock(&buffer->mtx);
432 
433 #if DEBUG
434     struct shared_circular_buffer_slot *ret;
435     ret = (struct shared_circular_buffer_slot*)&buffer->base[buffer->dequeue_index & buffer->mask];
436     memset(ret->data, 'D', sizeof(ret->data));
437 
438 #if DEBUG_SHARED_CIRCULAR_BUFFER_MEM_USAGE && DNSCORE_DEBUG_HAS_BLOCK_TAG
439     debug_memory_by_tag_free_notify(buffer->memctx, GENERIC_TAG, sizeof(*ret));
440 #endif
441 #endif
442 
443     if(++buffer->dequeue_index == buffer->enqueue_index)
444     {
445         if(buffer->enqueue_index > 65535) // don't advise for less than a few pages
446         {
447             intptr ptr = (intptr)&buffer->base[0];
448             ptr += 4095;
449             ptr &=~4095;
450             size_t size = buffer->total_size - buffer->additional_buffer_size;
451             if(size > 8192)
452             {
453                 size -= 8192;
454                 madvise((void*)ptr, size, MADV_DONTNEED);
455             }
456         }
457 
458         buffer->enqueue_index = 0;
459         buffer->dequeue_index = 0;
460 
461 #if DEBUG_SHARED_CIRCULAR_BUFFER_SELF_REPORT
462         shared_circular_buffer_stats(buffer);
463 #endif
464     }
465 
466     cond_notify(&buffer->cond_w); // notify writers
467 
468     mutex_unlock(&buffer->mtx);
469 }
470 
471 /** @} */
472