1 #ifdef HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 #include <unistd.h>
6 #include "Eina.h"
7 #include "eina_thread_queue.h"
8 #include "eina_safety_checks.h"
9 #include "eina_log.h"
10
11 #include "eina_private.h"
12
// when the compiler provides __ATOMIC_RELAXED the counters in this file are
// updated with the __atomic builtins; otherwise each counter is guarded by
// its own spinlock (see the #ifndef ATOMIC sections below)
#ifdef __ATOMIC_RELAXED
#define ATOMIC 1
#endif

// use spinlocks for read/write locks as they lead to more throughput and
// these locks are meant to be held very temporarily, if there is any
// contention at all
#define RW_SPINLOCK 1

// RWLOCK* map the queue's read and write locks onto either the spinlock api
// or the regular lock api, depending on RW_SPINLOCK above
#ifdef RW_SPINLOCK
#define RWLOCK Eina_Spinlock
#define RWLOCK_NEW(x) eina_spinlock_new(x)
#define RWLOCK_FREE(x) eina_spinlock_free(x)
#define RWLOCK_LOCK(x) eina_spinlock_take(x)
#define RWLOCK_UNLOCK(x) eina_spinlock_release(x)
#else
#define RWLOCK Eina_Lock
#define RWLOCK_NEW(x) eina_lock_new(x)
#define RWLOCK_FREE(x) eina_lock_free(x)
#define RWLOCK_LOCK(x) eina_lock_take(x)
#define RWLOCK_UNLOCK(x) eina_lock_release(x)
#endif
35
36 typedef struct _Eina_Thread_Queue_Msg_Block Eina_Thread_Queue_Msg_Block;
37
// per-queue state: the chain of message blocks plus the locks, semaphore
// and counters that coordinate senders and receivers
struct _Eina_Thread_Queue
{
   Eina_Thread_Queue_Msg_Block *data; // head of the chain of blocks being written to
   Eina_Thread_Queue_Msg_Block *last; // the last block in the chain, where new data goes
   Eina_Thread_Queue_Msg_Block *read; // block detached from data that reads are served from
   Eina_Thread_Queue *parent; // parent queue that gets a sub-queue message on send done
   RWLOCK lock_read; // a lock for when doing reads
   RWLOCK lock_write; // a lock for doing writes (also taken when detaching a block to read)
   Eina_Semaphore sem; // signalling - released once per sent message
#ifndef ATOMIC
   Eina_Spinlock lock_pending; // lock for pending field
#endif
   int pending; // how many messages left to read (++ on send, -- on wait/poll)
   int fd; // optional fd to write a single byte to on msg, -1 when unset
};
53
// one contiguous buffer of messages. blocks are chained through next while
// they belong to a queue or sit in the spare block pool
struct _Eina_Thread_Queue_Msg_Block
{
   Eina_Thread_Queue_Msg_Block *next; // next block in the list
   Eina_Lock lock_non_0_ref; // taken when a write raises ref from 0 to 1, released when a write drops it back to 0
#ifndef ATOMIC
   Eina_Spinlock lock_ref; // lock for ref field
   Eina_Spinlock lock_first; // lock for first field
#endif
   int ref; // the number of open reads/writes
   int size; // the total allocated bytes of data[]
   int first; // the byte pos of the first msg (next one to be read)
   int last; // the byte pos just after the last msg
   Eina_Bool full : 1; // is this block full yet?
   // data in memory beyond struct end - blocks are allocated as
   // sizeof(block) - sizeof(msg) + size so this member covers size bytes
   Eina_Thread_Queue_Msg data[1];
};
69
// the minimum size of any message block holding 1 or more messages. with
// this data size the whole block allocation comes to 4096 bytes
#define MIN_SIZE ((int)(4096 - sizeof(Eina_Thread_Queue_Msg_Block) + sizeof(Eina_Thread_Queue_Msg)))

// log domain used by the ERR() and DBG() macros below, -1 until registered
static int _eina_thread_queue_log_dom = -1;
// a pool of spare message blocks that are only of the minimum size so we
// avoid reallocation via malloc/free etc. to avoid free memory pages and
// pressure on the malloc subsystem
static int _eina_thread_queue_block_pool_count = 0;
static Eina_Spinlock _eina_thread_queue_block_pool_lock;
static Eina_Thread_Queue_Msg_Block *_eina_thread_queue_block_pool = NULL;

// route ERR() to this file's own log domain
#ifdef ERR
# undef ERR
#endif
#define ERR(...) EINA_LOG_DOM_ERR(_eina_thread_queue_log_dom, __VA_ARGS__)

// route DBG() to this file's own log domain
#ifdef DBG
# undef DBG
#endif
#define DBG(...) EINA_LOG_DOM_DBG(_eina_thread_queue_log_dom, __VA_ARGS__)
90
// APIs to get message blocks from the pool or put them back in
92 static Eina_Thread_Queue_Msg_Block *
_eina_thread_queue_msg_block_new(int size)93 _eina_thread_queue_msg_block_new(int size)
94 {
95 Eina_Thread_Queue_Msg_Block *blk;
96
97 eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
98 if (_eina_thread_queue_block_pool)
99 {
100 blk = _eina_thread_queue_block_pool;
101 if (blk->size >= size)
102 {
103 blk->first = 0;
104 blk->last = 0;
105 blk->ref = 0;
106 blk->full = 0;
107 _eina_thread_queue_block_pool = blk->next;
108 blk->next = NULL;
109 _eina_thread_queue_block_pool_count--;
110 eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
111 return blk;
112 }
113 blk = NULL;
114 }
115 eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
116
117 blk = malloc(sizeof(Eina_Thread_Queue_Msg_Block) -
118 sizeof(Eina_Thread_Queue_Msg) +
119 size);
120 if (!blk)
121 {
122 ERR("Thread queue block buffer of size %i allocation failed", size);
123 return NULL;
124 }
125 blk->next = NULL;
126 #ifndef ATOMIC
127 eina_spinlock_new(&(blk->lock_ref));
128 eina_spinlock_new(&(blk->lock_first));
129 #endif
130 eina_lock_new(&(blk->lock_non_0_ref));
131 blk->size = size;
132 blk->first = 0;
133 blk->last = 0;
134 blk->ref = 0;
135 blk->full = 0;
136 return blk;
137 }
138
139 static void
_eina_thread_queue_msg_block_real_free(Eina_Thread_Queue_Msg_Block * blk)140 _eina_thread_queue_msg_block_real_free(Eina_Thread_Queue_Msg_Block *blk)
141 {
142 eina_lock_take(&(blk->lock_non_0_ref));
143 eina_lock_release(&(blk->lock_non_0_ref));
144 eina_lock_free(&(blk->lock_non_0_ref));
145 #ifndef ATOMIC
146 eina_spinlock_take(&(blk->lock_ref));
147 eina_spinlock_release(&(blk->lock_ref));
148 eina_spinlock_free(&(blk->lock_ref));
149 eina_spinlock_take(&(blk->lock_first));
150 eina_spinlock_release(&(blk->lock_first));
151 eina_spinlock_free(&(blk->lock_first));
152 #endif
153 free(blk);
154 }
155
156 static void
_eina_thread_queue_msg_block_free(Eina_Thread_Queue_Msg_Block * blk)157 _eina_thread_queue_msg_block_free(Eina_Thread_Queue_Msg_Block *blk)
158 {
159 if (blk->size == MIN_SIZE)
160 {
161 eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
162 if (_eina_thread_queue_block_pool_count < 20)
163 {
164 _eina_thread_queue_block_pool_count++;
165 blk->next = _eina_thread_queue_block_pool;
166 _eina_thread_queue_block_pool = blk;
167 eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
168 }
169 else
170 {
171 eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
172 _eina_thread_queue_msg_block_real_free(blk);
173 }
174 }
175 else _eina_thread_queue_msg_block_real_free(blk);
176 }
177
// create the spinlock guarding the spare block pool and return the result
// of that creation call
static Eina_Bool
_eina_thread_queue_msg_block_pool_init(void)
{
   return eina_spinlock_new(&_eina_thread_queue_block_pool_lock);
}
183
184 static void
_eina_thread_queue_msg_block_pool_shutdown(void)185 _eina_thread_queue_msg_block_pool_shutdown(void)
186 {
187 eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
188 while (_eina_thread_queue_block_pool)
189 {
190 Eina_Thread_Queue_Msg_Block *blk, *blknext;
191
192 for (;;)
193 {
194 blk = _eina_thread_queue_block_pool;
195 if (!blk) break;
196 blknext = blk->next;
197 _eina_thread_queue_msg_block_real_free(blk);
198 _eina_thread_queue_block_pool = blknext;
199 }
200 }
201 eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
202 eina_spinlock_free(&_eina_thread_queue_block_pool_lock);
203 }
204
205 // utility functions for waiting/waking threads
206 static void
_eina_thread_queue_wait(Eina_Thread_Queue * thq)207 _eina_thread_queue_wait(Eina_Thread_Queue *thq)
208 {
209 if (!eina_semaphore_lock(&(thq->sem)))
210 ERR("Thread queue semaphore lock/wait failed - bad things will happen");
211 }
212
213 static void
_eina_thread_queue_wake(Eina_Thread_Queue * thq)214 _eina_thread_queue_wake(Eina_Thread_Queue *thq)
215 {
216 if (!eina_semaphore_release(&(thq->sem), 1))
217 ERR("Thread queue semaphore release/wakeup faile - bad things will happen");
218 }
219
220 // how to allocate or release memory within one of the message blocks for
221 // an arbitrary sized bit of message data. the size always includes the
222 // message header which tells you the size of that message
223 static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_alloc(Eina_Thread_Queue * thq,int size,Eina_Thread_Queue_Msg_Block ** blkret)224 _eina_thread_queue_msg_alloc(Eina_Thread_Queue *thq, int size, Eina_Thread_Queue_Msg_Block **blkret)
225 {
226 Eina_Thread_Queue_Msg_Block *blk;
227 Eina_Thread_Queue_Msg *msg = NULL;
228 int ref;
229
230 // round up to nearest 8
231 size = ((size + 7) >> 3) << 3;
232 if (!thq->data)
233 {
234 if (size < MIN_SIZE)
235 thq->data = _eina_thread_queue_msg_block_new(MIN_SIZE);
236 else
237 thq->data = _eina_thread_queue_msg_block_new(size);
238 thq->last = thq->data;
239 }
240 blk = thq->last;
241 if (blk->full)
242 {
243 if (size < MIN_SIZE)
244 blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
245 else
246 blk->next = _eina_thread_queue_msg_block_new(size);
247 blk = blk->next;
248 thq->last = blk;
249 }
250 if ((blk->size - blk->last) >= size)
251 {
252 blk->last += size;
253 if (blk->last == blk->size) blk->full = 1;
254 msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + (blk->last - size));
255 }
256 else
257 {
258 if (size < MIN_SIZE)
259 blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
260 else
261 blk->next = _eina_thread_queue_msg_block_new(size);
262 blk = blk->next;
263 thq->last = blk;
264 blk->last += size;
265 if (blk->last == blk->size) blk->full = 1;
266 msg = (Eina_Thread_Queue_Msg *)(&(blk->data[0]));
267 }
268 msg->size = size;
269 #ifdef ATOMIC
270 ref = __atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
271 #else
272 eina_spinlock_take(&(blk->lock_ref));
273 blk->ref++;
274 ref = blk->ref;
275 eina_spinlock_release(&(blk->lock_ref));
276 #endif
277 if (ref == 1) eina_lock_take(&(blk->lock_non_0_ref));
278 *blkret = blk;
279 return msg;
280 }
281
282 static void
_eina_thread_queue_msg_alloc_done(Eina_Thread_Queue_Msg_Block * blk)283 _eina_thread_queue_msg_alloc_done(Eina_Thread_Queue_Msg_Block *blk)
284 {
285 int ref;
286 #ifdef ATOMIC
287 ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
288 #else
289 eina_spinlock_take(&(blk->lock_ref));
290 blk->ref--;
291 ref = blk->ref;
292 eina_spinlock_release(&(blk->lock_ref));
293 #endif
294 if (ref == 0) eina_lock_release(&(blk->lock_non_0_ref));
295 }
296
297 static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_fetch(Eina_Thread_Queue * thq,Eina_Thread_Queue_Msg_Block ** blkret)298 _eina_thread_queue_msg_fetch(Eina_Thread_Queue *thq, Eina_Thread_Queue_Msg_Block **blkret)
299 {
300 Eina_Thread_Queue_Msg_Block *blk;
301 Eina_Thread_Queue_Msg *msg;
302 int ref, first;
303
304 if (!thq->read)
305 {
306 RWLOCK_LOCK(&(thq->lock_write));
307 blk = thq->data;
308 if (!blk)
309 {
310 RWLOCK_UNLOCK(&(thq->lock_write));
311 return NULL;
312 }
313 #ifdef ATOMIC
314 __atomic_load(&(blk->ref), &ref, __ATOMIC_RELAXED);
315 #else
316 eina_spinlock_take(&(blk->lock_ref));
317 ref = blk->ref;
318 eina_spinlock_release(&(blk->lock_ref));
319 #endif
320 if (ref > 0) eina_lock_take(&(blk->lock_non_0_ref));
321 thq->read = blk;
322 if (thq->last == blk) thq->last = blk->next;
323 thq->data = blk->next;
324 blk->next = NULL;
325 if (ref > 0) eina_lock_release(&(blk->lock_non_0_ref));
326 RWLOCK_UNLOCK(&(thq->lock_write));
327 }
328 blk = thq->read;
329 #ifdef ATOMIC
330 __atomic_load(&blk->first, &first, __ATOMIC_RELAXED);
331 msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + first);
332 first = __atomic_add_fetch(&(blk->first), msg->size, __ATOMIC_RELAXED);
333 #else
334 eina_spinlock_take(&blk->lock_first);
335 msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + blk->first);
336 first = blk->first += msg->size;
337 eina_spinlock_release(&blk->lock_first);
338 #endif
339 if (first >= blk->last) thq->read = NULL;
340 *blkret = blk;
341 #ifdef ATOMIC
342 __atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
343 #else
344 eina_spinlock_take(&(blk->lock_ref));
345 blk->ref++;
346 eina_spinlock_release(&(blk->lock_ref));
347 #endif
348 return msg;
349 }
350
351 static void
_eina_thread_queue_msg_fetch_done(Eina_Thread_Queue_Msg_Block * blk)352 _eina_thread_queue_msg_fetch_done(Eina_Thread_Queue_Msg_Block *blk)
353 {
354 int ref, first;
355
356 #ifdef ATOMIC
357 ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
358 __atomic_load(&blk->first, &first, __ATOMIC_RELAXED);
359 #else
360 eina_spinlock_take(&(blk->lock_ref));
361 blk->ref--;
362 ref = blk->ref;
363 eina_spinlock_release(&(blk->lock_ref));
364 eina_spinlock_take(&blk->lock_first);
365 first = blk->first;
366 eina_spinlock_release(&blk->lock_first);
367 #endif
368 if ((first >= blk->last) && (ref == 0))
369 _eina_thread_queue_msg_block_free(blk);
370 }
371
372
373 //////////////////////////////////////////////////////////////////////////////
374 Eina_Bool
eina_thread_queue_init(void)375 eina_thread_queue_init(void)
376 {
377 _eina_thread_queue_log_dom = eina_log_domain_register("eina_thread_queue",
378 EINA_LOG_COLOR_DEFAULT);
379 if (_eina_thread_queue_log_dom < 0)
380 {
381 EINA_LOG_ERR("Could not register log domain: eina_thread_queue");
382 return EINA_FALSE;
383 }
384 if (!_eina_thread_queue_msg_block_pool_init())
385 {
386 ERR("Cannot init thread queue block pool spinlock");
387 return EINA_FALSE;
388 }
389 return EINA_TRUE;
390 }
391
392 Eina_Bool
eina_thread_queue_shutdown(void)393 eina_thread_queue_shutdown(void)
394 {
395 _eina_thread_queue_msg_block_pool_shutdown();
396 eina_log_domain_unregister(_eina_thread_queue_log_dom);
397 return EINA_TRUE;
398 }
399
400 EAPI Eina_Thread_Queue *
eina_thread_queue_new(void)401 eina_thread_queue_new(void)
402 {
403 Eina_Thread_Queue *thq;
404
405 thq = calloc(1, sizeof(Eina_Thread_Queue));
406 if (!thq)
407 {
408 ERR("Allocation of Thread queue structure failed");
409 return NULL;
410 }
411 thq->fd = -1;
412 if (!eina_semaphore_new(&(thq->sem), 0))
413 {
414 ERR("Cannot init new semaphore for eina_threadqueue");
415 free(thq);
416 return NULL;
417 }
418 RWLOCK_NEW(&(thq->lock_read));
419 RWLOCK_NEW(&(thq->lock_write));
420 #ifndef ATOMIC
421 eina_spinlock_new(&(thq->lock_pending));
422 #endif
423 return thq;
424 }
425
426 EAPI void
eina_thread_queue_free(Eina_Thread_Queue * thq)427 eina_thread_queue_free(Eina_Thread_Queue *thq)
428 {
429 if (!thq) return;
430
431 #ifndef ATOMIC
432 eina_spinlock_free(&(thq->lock_pending));
433 #endif
434 RWLOCK_FREE(&(thq->lock_read));
435 RWLOCK_FREE(&(thq->lock_write));
436 eina_semaphore_free(&(thq->sem));
437 free(thq);
438 }
439
440 EAPI void *
eina_thread_queue_send(Eina_Thread_Queue * thq,int size,void ** allocref)441 eina_thread_queue_send(Eina_Thread_Queue *thq, int size, void **allocref)
442 {
443 Eina_Thread_Queue_Msg *msg;
444 Eina_Thread_Queue_Msg_Block *blk;
445
446 RWLOCK_LOCK(&(thq->lock_write));
447 msg = _eina_thread_queue_msg_alloc(thq, size, &blk);
448 RWLOCK_UNLOCK(&(thq->lock_write));
449 *allocref = blk;
450 #ifdef ATOMIC
451 __atomic_add_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
452 #else
453 eina_spinlock_take(&(thq->lock_pending));
454 thq->pending++;
455 eina_spinlock_release(&(thq->lock_pending));
456 #endif
457 return msg;
458 }
459
460 EAPI void
eina_thread_queue_send_done(Eina_Thread_Queue * thq,void * allocref)461 eina_thread_queue_send_done(Eina_Thread_Queue *thq, void *allocref)
462 {
463 _eina_thread_queue_msg_alloc_done(allocref);
464 _eina_thread_queue_wake(thq);
465 if (thq->parent)
466 {
467 void *ref;
468 Eina_Thread_Queue_Msg_Sub *msg;
469
470 msg = eina_thread_queue_send(thq->parent,
471 sizeof(Eina_Thread_Queue_Msg_Sub), &ref);
472 if (msg)
473 {
474 msg->queue = thq;
475 eina_thread_queue_send_done(thq->parent, ref);
476 }
477 }
478 if (thq->fd >= 0)
479 {
480 char dummy = 0;
481 if (write(thq->fd, &dummy, 1) != 1)
482 ERR("Eina Threadqueue write to fd %i failed", thq->fd);
483 }
484 }
485
486 EAPI void *
eina_thread_queue_wait(Eina_Thread_Queue * thq,void ** allocref)487 eina_thread_queue_wait(Eina_Thread_Queue *thq, void **allocref)
488 {
489 Eina_Thread_Queue_Msg *msg;
490 Eina_Thread_Queue_Msg_Block *blk;
491
492 _eina_thread_queue_wait(thq);
493 RWLOCK_LOCK(&(thq->lock_read));
494 msg = _eina_thread_queue_msg_fetch(thq, &blk);
495 RWLOCK_UNLOCK(&(thq->lock_read));
496 *allocref = blk;
497 #ifdef ATOMIC
498 __atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
499 #else
500 eina_spinlock_take(&(thq->lock_pending));
501 thq->pending--;
502 eina_spinlock_release(&(thq->lock_pending));
503 #endif
504 return msg;
505 }
506
// finish with a message obtained from eina_thread_queue_wait() or
// eina_thread_queue_poll(): allocref is the reference those calls returned.
// the queue itself is not needed for this and is ignored
EAPI void
eina_thread_queue_wait_done(Eina_Thread_Queue *thq EINA_UNUSED, void *allocref)
{
   _eina_thread_queue_msg_fetch_done(allocref);
}
512
513 EAPI void *
eina_thread_queue_poll(Eina_Thread_Queue * thq,void ** allocref)514 eina_thread_queue_poll(Eina_Thread_Queue *thq, void **allocref)
515 {
516 Eina_Thread_Queue_Msg *msg;
517 Eina_Thread_Queue_Msg_Block *blk;
518
519 RWLOCK_LOCK(&(thq->lock_read));
520 msg = _eina_thread_queue_msg_fetch(thq, &blk);
521 RWLOCK_UNLOCK(&(thq->lock_read));
522 if (msg)
523 {
524 _eina_thread_queue_wait(thq);
525 *allocref = blk;
526 #ifdef ATOMIC
527 __atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
528 #else
529 eina_spinlock_take(&(thq->lock_pending));
530 thq->pending--;
531 eina_spinlock_release(&(thq->lock_pending));
532 #endif
533 }
534 return msg;
535 }
536
537 EAPI int
eina_thread_queue_pending_get(const Eina_Thread_Queue * thq)538 eina_thread_queue_pending_get(const Eina_Thread_Queue *thq)
539 {
540 int pending;
541
542 #ifdef ATOMIC
543 __atomic_load(&(thq->pending), &pending, __ATOMIC_RELAXED);
544 #else
545 eina_spinlock_take((Eina_Spinlock *)&(thq->lock_pending));
546 pending = thq->pending;
547 eina_spinlock_release((Eina_Spinlock *)&(thq->lock_pending));
548 #endif
549 return pending;
550 }
551
// set the parent queue of thq. the parent is sent a sub-queue message each
// time a send on thq is finished. the field is stored without any locking
EAPI void
eina_thread_queue_parent_set(Eina_Thread_Queue *thq, Eina_Thread_Queue *thq_parent)
{
   thq->parent = thq_parent;
}
557
// return the parent queue stored in thq (NULL for a queue that never had one
// set, as the queue structure starts out zeroed)
EAPI Eina_Thread_Queue *
eina_thread_queue_parent_get(const Eina_Thread_Queue *thq)
{
   return thq->parent;
}
563
// set the fd that gets one byte written to it each time a send on thq is
// finished. a negative fd turns this off. the field is stored without any
// locking
EAPI void
eina_thread_queue_fd_set(Eina_Thread_Queue *thq, int fd)
{
   thq->fd = fd;
}
569
// return the fd stored in thq (-1 for a new queue that never had one set)
EAPI int
eina_thread_queue_fd_get(const Eina_Thread_Queue *thq)
{
   return thq->fd;
}
575