#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include <unistd.h>
#include "Eina.h"
#include "eina_thread_queue.h"
#include "eina_safety_checks.h"
#include "eina_log.h"

#include "eina_private.h"

#ifdef __ATOMIC_RELAXED
#define ATOMIC 1
#endif

// use spinlocks for the read/write locks as they lead to more throughput,
// and these locks are only meant to be held very temporarily, so any
// contention at all is short-lived
#define RW_SPINLOCK 1

#ifdef RW_SPINLOCK
#define RWLOCK           Eina_Spinlock
#define RWLOCK_NEW(x)    eina_spinlock_new(x)
#define RWLOCK_FREE(x)   eina_spinlock_free(x)
#define RWLOCK_LOCK(x)   eina_spinlock_take(x)
#define RWLOCK_UNLOCK(x) eina_spinlock_release(x)
#else
#define RWLOCK           Eina_Lock
#define RWLOCK_NEW(x)    eina_lock_new(x)
#define RWLOCK_FREE(x)   eina_lock_free(x)
#define RWLOCK_LOCK(x)   eina_lock_take(x)
#define RWLOCK_UNLOCK(x) eina_lock_release(x)
#endif

typedef struct _Eina_Thread_Queue_Msg_Block Eina_Thread_Queue_Msg_Block;

struct _Eina_Thread_Queue
{
   Eina_Thread_Queue_Msg_Block  *data; // all the data being written to
   Eina_Thread_Queue_Msg_Block  *last; // the last block where new data goes
   Eina_Thread_Queue_Msg_Block  *read; // the block reads currently consume from
   Eina_Thread_Queue            *parent; // parent queue to wake on send
   RWLOCK                        lock_read; // a lock for when doing reads
   RWLOCK                        lock_write; // a lock for doing writes
   Eina_Semaphore                sem; // signalling - one per message
#ifndef ATOMIC
   Eina_Spinlock                 lock_pending; // lock for pending field
#endif
   int                           pending; // how many messages left to read
   int                           fd; // optional fd to write byte to on msg
};

struct _Eina_Thread_Queue_Msg_Block
{
   Eina_Thread_Queue_Msg_Block  *next; // next block in the list
   Eina_Lock                     lock_non_0_ref; // held while the ref count is non-zero
#ifndef ATOMIC
   Eina_Spinlock                 lock_ref; // lock for ref field
   Eina_Spinlock                 lock_first; // lock for first field
#endif
   int                           ref; // the number of open reads/writes
   int                           size; // the total allocated bytes of data[]
   int                           first; // the byte pos of the first msg
   int                           last; // the byte pos just after the last msg
   Eina_Bool                     full : 1; // is this block full yet?
   Eina_Thread_Queue_Msg         data[1]; // data in memory beyond struct end
};

// the minimum size of any message block holding 1 or more messages
#define MIN_SIZE ((int)(4096 - sizeof(Eina_Thread_Queue_Msg_Block) + sizeof(Eina_Thread_Queue_Msg)))
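// note: sized so a block allocated in _eina_thread_queue_msg_block_new()
// (sizeof(Eina_Thread_Queue_Msg_Block) - sizeof(Eina_Thread_Queue_Msg) +
// MIN_SIZE) comes out at exactly 4096 bytes. e.g. if the block header were
// 48 bytes and the msg header 8 (sizes vary per platform/ABI), payload
// space would be 4096 - 48 + 8 == 4056 bytes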

// a pool of spare message blocks of just the minimum size, kept so we
// avoid repeated malloc/free churn and the resulting pressure on the
// malloc subsystem
static int _eina_thread_queue_log_dom = -1;
static int _eina_thread_queue_block_pool_count = 0;
static Eina_Spinlock _eina_thread_queue_block_pool_lock;
static Eina_Thread_Queue_Msg_Block *_eina_thread_queue_block_pool = NULL;

#ifdef ERR
# undef ERR
#endif
#define ERR(...) EINA_LOG_DOM_ERR(_eina_thread_queue_log_dom, __VA_ARGS__)

#ifdef DBG
# undef DBG
#endif
#define DBG(...) EINA_LOG_DOM_DBG(_eina_thread_queue_log_dom, __VA_ARGS__)

// APIs to get message blocks from the pool or put them back in
static Eina_Thread_Queue_Msg_Block *
_eina_thread_queue_msg_block_new(int size)
{
   Eina_Thread_Queue_Msg_Block *blk;

   eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
   if (_eina_thread_queue_block_pool)
     {
        blk = _eina_thread_queue_block_pool;
        if (blk->size >= size)
          {
             blk->first = 0;
             blk->last = 0;
             blk->ref = 0;
             blk->full = 0;
             _eina_thread_queue_block_pool = blk->next;
             blk->next = NULL;
             _eina_thread_queue_block_pool_count--;
             eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
             return blk;
          }
        blk = NULL;
     }
   eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));

   blk = malloc(sizeof(Eina_Thread_Queue_Msg_Block) -
                sizeof(Eina_Thread_Queue_Msg) +
                size);
   if (!blk)
     {
        ERR("Thread queue block buffer of size %i allocation failed", size);
        return NULL;
     }
   blk->next = NULL;
#ifndef ATOMIC
   eina_spinlock_new(&(blk->lock_ref));
   eina_spinlock_new(&(blk->lock_first));
#endif
   eina_lock_new(&(blk->lock_non_0_ref));
   blk->size = size;
   blk->first = 0;
   blk->last = 0;
   blk->ref = 0;
   blk->full = 0;
   return blk;
}

static void
_eina_thread_queue_msg_block_real_free(Eina_Thread_Queue_Msg_Block *blk)
{
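   // take and release each lock once so any thread still holding one of
   // them has finished with the block before we free the locks under it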
   eina_lock_take(&(blk->lock_non_0_ref));
   eina_lock_release(&(blk->lock_non_0_ref));
   eina_lock_free(&(blk->lock_non_0_ref));
#ifndef ATOMIC
   eina_spinlock_take(&(blk->lock_ref));
   eina_spinlock_release(&(blk->lock_ref));
   eina_spinlock_free(&(blk->lock_ref));
   eina_spinlock_take(&(blk->lock_first));
   eina_spinlock_release(&(blk->lock_first));
   eina_spinlock_free(&(blk->lock_first));
#endif
   free(blk);
}

static void
_eina_thread_queue_msg_block_free(Eina_Thread_Queue_Msg_Block *blk)
{
   if (blk->size == MIN_SIZE)
     {
        eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
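        // keep at most 20 spare blocks in the pool; beyond that really free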
        if (_eina_thread_queue_block_pool_count < 20)
          {
             _eina_thread_queue_block_pool_count++;
             blk->next = _eina_thread_queue_block_pool;
             _eina_thread_queue_block_pool = blk;
             eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
          }
        else
          {
             eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
             _eina_thread_queue_msg_block_real_free(blk);
          }
     }
   else _eina_thread_queue_msg_block_real_free(blk);
}

static Eina_Bool
_eina_thread_queue_msg_block_pool_init(void)
{
   return eina_spinlock_new(&_eina_thread_queue_block_pool_lock);
}

static void
_eina_thread_queue_msg_block_pool_shutdown(void)
{
   eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
   while (_eina_thread_queue_block_pool)
     {
        Eina_Thread_Queue_Msg_Block *blk = _eina_thread_queue_block_pool;

        _eina_thread_queue_block_pool = blk->next;
        _eina_thread_queue_msg_block_real_free(blk);
     }
   eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
   eina_spinlock_free(&_eina_thread_queue_block_pool_lock);
}

// utility functions for waiting/waking threads
static void
_eina_thread_queue_wait(Eina_Thread_Queue *thq)
{
   if (!eina_semaphore_lock(&(thq->sem)))
     ERR("Thread queue semaphore lock/wait failed - bad things will happen");
}

static void
_eina_thread_queue_wake(Eina_Thread_Queue *thq)
{
   if (!eina_semaphore_release(&(thq->sem), 1))
     ERR("Thread queue semaphore release/wakeup failed - bad things will happen");
}

// how to allocate or release memory within one of the message blocks for
// an arbitrarily sized bit of message data. the size always includes the
// message header, which tells you the size of that message
static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_alloc(Eina_Thread_Queue *thq, int size, Eina_Thread_Queue_Msg_Block **blkret)
{
   Eina_Thread_Queue_Msg_Block *blk;
   Eina_Thread_Queue_Msg *msg = NULL;
   int ref;

   // round up to the nearest multiple of 8, e.g. 13 -> 16, so messages
   // stay 8-byte aligned
   size = ((size + 7) >> 3) << 3;
   if (!thq->data)
     {
        if (size < MIN_SIZE)
          thq->data = _eina_thread_queue_msg_block_new(MIN_SIZE);
        else
          thq->data = _eina_thread_queue_msg_block_new(size);
        thq->last = thq->data;
     }
   blk = thq->last;
   if (blk->full)
     {
        if (size < MIN_SIZE)
          blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
        else
          blk->next = _eina_thread_queue_msg_block_new(size);
        blk = blk->next;
        thq->last = blk;
     }
   if ((blk->size - blk->last) >= size)
     {
        blk->last += size;
        if (blk->last == blk->size) blk->full = 1;
        msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + (blk->last - size));
     }
   else
     {
        if (size < MIN_SIZE)
          blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
        else
          blk->next = _eina_thread_queue_msg_block_new(size);
        blk = blk->next;
        thq->last = blk;
        blk->last += size;
        if (blk->last == blk->size) blk->full = 1;
        msg = (Eina_Thread_Queue_Msg *)(&(blk->data[0]));
     }
   msg->size = size;
#ifdef ATOMIC
   ref = __atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(blk->lock_ref));
   blk->ref++;
   ref = blk->ref;
   eina_spinlock_release(&(blk->lock_ref));
#endif
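   // first active reference on this block: hold lock_non_0_ref so a reader
   // that wants to detach the block waits until all in-flight writes finish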
   if (ref == 1) eina_lock_take(&(blk->lock_non_0_ref));
   *blkret = blk;
   return msg;
}

static void
_eina_thread_queue_msg_alloc_done(Eina_Thread_Queue_Msg_Block *blk)
{
   int ref;
#ifdef ATOMIC
   ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(blk->lock_ref));
   blk->ref--;
   ref = blk->ref;
   eina_spinlock_release(&(blk->lock_ref));
#endif
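   // last active reference gone: release lock_non_0_ref so waiting readers
   // can proceed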
   if (ref == 0) eina_lock_release(&(blk->lock_non_0_ref));
}

static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_fetch(Eina_Thread_Queue *thq, Eina_Thread_Queue_Msg_Block **blkret)
{
   Eina_Thread_Queue_Msg_Block *blk;
   Eina_Thread_Queue_Msg *msg;
   int ref, first;

   if (!thq->read)
     {
        RWLOCK_LOCK(&(thq->lock_write));
        blk = thq->data;
        if (!blk)
          {
             RWLOCK_UNLOCK(&(thq->lock_write));
             return NULL;
          }
#ifdef ATOMIC
        __atomic_load(&(blk->ref), &ref, __ATOMIC_RELAXED);
#else
        eina_spinlock_take(&(blk->lock_ref));
        ref = blk->ref;
        eina_spinlock_release(&(blk->lock_ref));
#endif
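        // writers still hold references to this block, so taking
        // lock_non_0_ref blocks us until their in-flight writes complete
        // before we detach the block from the queue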
        if (ref > 0) eina_lock_take(&(blk->lock_non_0_ref));
        thq->read = blk;
        if (thq->last == blk) thq->last = blk->next;
        thq->data = blk->next;
        blk->next = NULL;
        if (ref > 0) eina_lock_release(&(blk->lock_non_0_ref));
        RWLOCK_UNLOCK(&(thq->lock_write));
     }
   blk = thq->read;
#ifdef ATOMIC
   __atomic_load(&blk->first, &first, __ATOMIC_RELAXED);
   msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + first);
   first = __atomic_add_fetch(&(blk->first), msg->size, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&blk->lock_first);
   msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + blk->first);
   first = blk->first += msg->size;
   eina_spinlock_release(&blk->lock_first);
#endif
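   // whole block consumed: stop reading from it; the last fetch_done on it
   // frees the block once its ref count drops back to 0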
   if (first >= blk->last) thq->read = NULL;
   *blkret = blk;
#ifdef ATOMIC
   __atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(blk->lock_ref));
   blk->ref++;
   eina_spinlock_release(&(blk->lock_ref));
#endif
   return msg;
}

static void
_eina_thread_queue_msg_fetch_done(Eina_Thread_Queue_Msg_Block *blk)
{
   int ref, first;

#ifdef ATOMIC
   ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
   __atomic_load(&blk->first, &first, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(blk->lock_ref));
   blk->ref--;
   ref = blk->ref;
   eina_spinlock_release(&(blk->lock_ref));
   eina_spinlock_take(&blk->lock_first);
   first = blk->first;
   eina_spinlock_release(&blk->lock_first);
#endif
   if ((first >= blk->last) && (ref == 0))
     _eina_thread_queue_msg_block_free(blk);
}


//////////////////////////////////////////////////////////////////////////////
Eina_Bool
eina_thread_queue_init(void)
{
   _eina_thread_queue_log_dom = eina_log_domain_register("eina_thread_queue",
                                                         EINA_LOG_COLOR_DEFAULT);
   if (_eina_thread_queue_log_dom < 0)
     {
        EINA_LOG_ERR("Could not register log domain: eina_thread_queue");
        return EINA_FALSE;
     }
   if (!_eina_thread_queue_msg_block_pool_init())
     {
        ERR("Cannot init thread queue block pool spinlock");
        return EINA_FALSE;
     }
   return EINA_TRUE;
}

Eina_Bool
eina_thread_queue_shutdown(void)
{
   _eina_thread_queue_msg_block_pool_shutdown();
   eina_log_domain_unregister(_eina_thread_queue_log_dom);
   return EINA_TRUE;
}

EAPI Eina_Thread_Queue *
eina_thread_queue_new(void)
{
   Eina_Thread_Queue *thq;

   thq = calloc(1, sizeof(Eina_Thread_Queue));
   if (!thq)
     {
        ERR("Allocation of Thread queue structure failed");
        return NULL;
     }
   thq->fd = -1;
   if (!eina_semaphore_new(&(thq->sem), 0))
     {
        ERR("Cannot init new semaphore for eina_thread_queue");
        free(thq);
        return NULL;
     }
   RWLOCK_NEW(&(thq->lock_read));
   RWLOCK_NEW(&(thq->lock_write));
#ifndef ATOMIC
   eina_spinlock_new(&(thq->lock_pending));
#endif
   return thq;
}

EAPI void
eina_thread_queue_free(Eina_Thread_Queue *thq)
{
   if (!thq) return;

#ifndef ATOMIC
   eina_spinlock_free(&(thq->lock_pending));
#endif
   RWLOCK_FREE(&(thq->lock_read));
   RWLOCK_FREE(&(thq->lock_write));
   eina_semaphore_free(&(thq->sem));
   free(thq);
}

EAPI void *
eina_thread_queue_send(Eina_Thread_Queue *thq, int size, void **allocref)
{
   Eina_Thread_Queue_Msg *msg;
   Eina_Thread_Queue_Msg_Block *blk;

   RWLOCK_LOCK(&(thq->lock_write));
   msg = _eina_thread_queue_msg_alloc(thq, size, &blk);
   RWLOCK_UNLOCK(&(thq->lock_write));
   *allocref = blk;
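   // the block rides back to the caller via allocref so that
   // eina_thread_queue_send_done() can drop the write reference taken above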
#ifdef ATOMIC
   __atomic_add_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(thq->lock_pending));
   thq->pending++;
   eina_spinlock_release(&(thq->lock_pending));
#endif
   return msg;
}

EAPI void
eina_thread_queue_send_done(Eina_Thread_Queue *thq, void *allocref)
{
   _eina_thread_queue_msg_alloc_done(allocref);
   _eina_thread_queue_wake(thq);
   if (thq->parent)
     {
        void *ref;
        Eina_Thread_Queue_Msg_Sub *msg;

        msg = eina_thread_queue_send(thq->parent,
                                     sizeof(Eina_Thread_Queue_Msg_Sub), &ref);
        if (msg)
          {
             msg->queue = thq;
             eina_thread_queue_send_done(thq->parent, ref);
          }
     }
   if (thq->fd >= 0)
     {
        char dummy = 0;
        if (write(thq->fd, &dummy, 1) != 1)
          ERR("Eina Threadqueue write to fd %i failed", thq->fd);
     }
}

EAPI void *
eina_thread_queue_wait(Eina_Thread_Queue *thq, void **allocref)
{
   Eina_Thread_Queue_Msg *msg;
   Eina_Thread_Queue_Msg_Block *blk;

   _eina_thread_queue_wait(thq);
   RWLOCK_LOCK(&(thq->lock_read));
   msg = _eina_thread_queue_msg_fetch(thq, &blk);
   RWLOCK_UNLOCK(&(thq->lock_read));
   *allocref = blk;
#ifdef ATOMIC
   __atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
   eina_spinlock_take(&(thq->lock_pending));
   thq->pending--;
   eina_spinlock_release(&(thq->lock_pending));
#endif
   return msg;
}

EAPI void
eina_thread_queue_wait_done(Eina_Thread_Queue *thq EINA_UNUSED, void *allocref)
{
   _eina_thread_queue_msg_fetch_done(allocref);
}

EAPI void *
eina_thread_queue_poll(Eina_Thread_Queue *thq, void **allocref)
{
   Eina_Thread_Queue_Msg *msg;
   Eina_Thread_Queue_Msg_Block *blk;

   RWLOCK_LOCK(&(thq->lock_read));
   msg = _eina_thread_queue_msg_fetch(thq, &blk);
   RWLOCK_UNLOCK(&(thq->lock_read));
   if (msg)
     {
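        // a message was fetched, so balance the semaphore count its sender
        // posted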
        _eina_thread_queue_wait(thq);
        *allocref = blk;
#ifdef ATOMIC
        __atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
        eina_spinlock_take(&(thq->lock_pending));
        thq->pending--;
        eina_spinlock_release(&(thq->lock_pending));
#endif
     }
   return msg;
}

EAPI int
eina_thread_queue_pending_get(const Eina_Thread_Queue *thq)
{
   int pending;

#ifdef ATOMIC
   __atomic_load(&(thq->pending), &pending, __ATOMIC_RELAXED);
#else
   eina_spinlock_take((Eina_Spinlock *)&(thq->lock_pending));
   pending = thq->pending;
   eina_spinlock_release((Eina_Spinlock *)&(thq->lock_pending));
#endif
   return pending;
}

EAPI void
eina_thread_queue_parent_set(Eina_Thread_Queue *thq, Eina_Thread_Queue *thq_parent)
{
   thq->parent = thq_parent;
}

EAPI Eina_Thread_Queue *
eina_thread_queue_parent_get(const Eina_Thread_Queue *thq)
{
   return thq->parent;
}

EAPI void
eina_thread_queue_fd_set(Eina_Thread_Queue *thq, int fd)
{
   thq->fd = fd;
}

EAPI int
eina_thread_queue_fd_get(const Eina_Thread_Queue *thq)
{
   return thq->fd;
}
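
/* Usage sketch (illustrative only - My_Msg and the variable names are
 * hypothetical, and error handling is omitted; a message type embeds an
 * Eina_Thread_Queue_Msg as its first member):
 *
 *   typedef struct
 *     {
 *        Eina_Thread_Queue_Msg head; // queue message header, must be first
 *        int value;                  // payload follows the header
 *     } My_Msg;
 *
 *   // producer thread:
 *   void *ref;
 *   My_Msg *msg = eina_thread_queue_send(thq, sizeof(My_Msg), &ref);
 *   if (msg)
 *     {
 *        msg->value = 42;
 *        eina_thread_queue_send_done(thq, ref); // commit + wake a waiter
 *     }
 *
 *   // consumer thread:
 *   void *ref2;
 *   My_Msg *in = eina_thread_queue_wait(thq, &ref2); // blocks for a message
 *   if (in)
 *     {
 *        // ... use in->value ...
 *        eina_thread_queue_wait_done(thq, ref2); // release the block ref
 *     }
 */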