1 /*
2  * libetp implementation
3  *
4  * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without modifica-
8  * tion, are permitted provided that the following conditions are met:
9  *
10  *   1.  Redistributions of source code must retain the above copyright notice,
11  *       this list of conditions and the following disclaimer.
12  *
13  *   2.  Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in the
15  *       documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19  * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO
20  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21  * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25  * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26  * OF THE POSSIBILITY OF SUCH DAMAGE.
27  *
28  * Alternatively, the contents of this file may be used under the terms of
29  * the GNU General Public License ("GPL") version 2 or any later version,
30  * in which case the provisions of the GPL are applicable instead of
31  * the above. If you wish to allow the use of your version of this file
32  * only under the terms of the GPL and not to allow others to use your
33  * version of this file under the BSD license, indicate your decision
34  * by deleting the provisions above and replace them with the notice
35  * and other provisions required by the GPL. If you do not delete the
36  * provisions above, a recipient may use your version of this file under
37  * either the BSD or the GPL.
38  */
39 
40 #if HAVE_SYS_PRCTL_H
41 # include <sys/prctl.h>
42 #endif
43 
44 #ifdef EIO_STACKSIZE
45 # define X_STACKSIZE EIO_STACKSIZE
46 #endif
47 #include "xthread.h"
48 
/* linkage of the etp_* api functions, overridable by the including file */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range accepted by etp_submit, a single level unless overridden */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that makes the worker picking it up exit */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that is queued directly as a result instead of being executed */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/*
 * notification hooks: WANT_POLL is invoked when a result is pushed onto an
 * empty result queue, DONE_POLL when the result queue has been drained.
 * the argument is parenthesised so that any pointer expression can be passed.
 */
#ifndef ETP_WANT_POLL
# define ETP_WANT_POLL(pool) ((pool)->want_poll_cb ((pool)->userdata))
#endif
#ifndef ETP_DONE_POLL
# define ETP_DONE_POLL(pool) ((pool)->done_poll_cb ((pool)->userdata))
#endif

/* number of distinct priority levels, i.e. the number of per-priority queues */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* one second expressed in the unit of etp_tvdiff: microseconds >> 10, rounded up */
#define ETP_TICKS ((1000000 + 1023) >> 10)
76 
/* request flag bits */
enum {
  ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 0x08, /* group request has been delayed */
};
81 
82 /* calculate time difference in ~1/ETP_TICKS of a second */
83 ecb_inline int
etp_tvdiff(struct timeval * tv1,struct timeval * tv2)84 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
85 {
86   return  (tv2->tv_sec  - tv1->tv_sec ) * ETP_TICKS
87        + ((tv2->tv_usec - tv1->tv_usec) >> 10);
88 }
89 
/* scratch buffer that is grown on demand and reused between calls */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 if there is none */
  int len;   /* number of bytes available at ptr */
};

/*
 * return a buffer of at least len bytes, reusing the existing allocation
 * when it is already large enough.
 *
 * when the buffer has to grow, the old contents are discarded (the old
 * block is freed, not reallocated). returns 0 if the allocation fails; in
 * that case the recorded length is reset to 0 so that a later call tries to
 * allocate again instead of handing out the null pointer as a valid buffer.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);

      /* only remember the new size if we actually own that much memory */
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
107 
108 /*
109  * a somewhat faster data structure might be nice, but
110  * with 8 priorities this actually needs <20 insns
111  * per shift, the most expensive operation.
112  */
113 typedef struct
114 {
115   ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
116   int size;
117 } etp_reqq;
118 
119 typedef struct etp_pool *etp_pool;
120 
121 typedef struct etp_worker
122 {
123   etp_pool pool;
124 
125   struct etp_tmpbuf tmpbuf;
126 
127   /* locked by pool->wrklock */
128   struct etp_worker *prev, *next;
129 
130   xthread_t tid;
131 
132 #ifdef ETP_WORKER_COMMON
133   ETP_WORKER_COMMON
134 #endif
135 } etp_worker;
136 
137 struct etp_pool
138 {
139    void *userdata;
140 
141    etp_reqq req_queue;
142    etp_reqq res_queue;
143 
144    unsigned int started, idle, wanted;
145 
146    unsigned int max_poll_time;     /* pool->reslock */
147    unsigned int max_poll_reqs;     /* pool->reslock */
148 
149    unsigned int nreqs;    /* pool->reqlock */
150    unsigned int nready;   /* pool->reqlock */
151    unsigned int npending; /* pool->reqlock */
152    unsigned int max_idle;      /* maximum number of threads that can pool->idle indefinitely */
153    unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154 
155    void (*want_poll_cb) (void *userdata);
156    void (*done_poll_cb) (void *userdata);
157 
158    xmutex_t wrklock;
159    xmutex_t reslock;
160    xmutex_t reqlock;
161    xcond_t  reqwait;
162 
163    etp_worker wrk_first;
164 };
165 
/*
 * lock/unlock the mutex protecting the worker list.
 * NOTE(review): the wrk argument is not used by either macro - both expand
 * to an access through a variable named "pool", which therefore has to be
 * in scope wherever they are used.
 */
#define ETP_WORKER_LOCK(wrk)   X_LOCK   (pool->wrklock)
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168 
169 /* worker threads management */
170 
171 static void
etp_worker_clear(etp_worker * wrk)172 etp_worker_clear (etp_worker *wrk)
173 {
174 }
175 
176 static void ecb_cold
etp_worker_free(etp_worker * wrk)177 etp_worker_free (etp_worker *wrk)
178 {
179   free (wrk->tmpbuf.ptr);
180 
181   wrk->next->prev = wrk->prev;
182   wrk->prev->next = wrk->next;
183 
184   free (wrk);
185 }
186 
187 ETP_API_DECL unsigned int
etp_nreqs(etp_pool pool)188 etp_nreqs (etp_pool pool)
189 {
190   int retval;
191   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
192   retval = pool->nreqs;
193   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194   return retval;
195 }
196 
197 ETP_API_DECL unsigned int
etp_nready(etp_pool pool)198 etp_nready (etp_pool pool)
199 {
200   unsigned int retval;
201 
202   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
203   retval = pool->nready;
204   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205 
206   return retval;
207 }
208 
209 ETP_API_DECL unsigned int
etp_npending(etp_pool pool)210 etp_npending (etp_pool pool)
211 {
212   unsigned int retval;
213 
214   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
215   retval = pool->npending;
216   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217 
218   return retval;
219 }
220 
221 ETP_API_DECL unsigned int
etp_nthreads(etp_pool pool)222 etp_nthreads (etp_pool pool)
223 {
224   unsigned int retval;
225 
226   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
227   retval = pool->started;
228   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229 
230   return retval;
231 }
232 
233 static void ecb_noinline ecb_cold
reqq_init(etp_reqq * q)234 reqq_init (etp_reqq *q)
235 {
236   int pri;
237 
238   for (pri = 0; pri < ETP_NUM_PRI; ++pri)
239     q->qs[pri] = q->qe[pri] = 0;
240 
241   q->size = 0;
242 }
243 
244 static int ecb_noinline
reqq_push(etp_reqq * q,ETP_REQ * req)245 reqq_push (etp_reqq *q, ETP_REQ *req)
246 {
247   int pri = req->pri;
248   req->next = 0;
249 
250   if (q->qe[pri])
251     {
252       q->qe[pri]->next = req;
253       q->qe[pri] = req;
254     }
255   else
256     q->qe[pri] = q->qs[pri] = req;
257 
258   return q->size++;
259 }
260 
261 static ETP_REQ * ecb_noinline
reqq_shift(etp_reqq * q)262 reqq_shift (etp_reqq *q)
263 {
264   int pri;
265 
266   if (!q->size)
267     return 0;
268 
269   --q->size;
270 
271   for (pri = ETP_NUM_PRI; pri--; )
272     {
273       ETP_REQ *req = q->qs[pri];
274 
275       if (req)
276         {
277           if (!(q->qs[pri] = (ETP_REQ *)req->next))
278             q->qe[pri] = 0;
279 
280           return req;
281         }
282     }
283 
284   abort ();
285 }
286 
287 ETP_API_DECL int ecb_cold
etp_init(etp_pool pool,void * userdata,void (* want_poll)(void * userdata),void (* done_poll)(void * userdata))288 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
289 {
290   X_MUTEX_CREATE (pool->wrklock);
291   X_MUTEX_CREATE (pool->reslock);
292   X_MUTEX_CREATE (pool->reqlock);
293   X_COND_CREATE  (pool->reqwait);
294 
295   reqq_init (&pool->req_queue);
296   reqq_init (&pool->res_queue);
297 
298   pool->wrk_first.next =
299   pool->wrk_first.prev = &pool->wrk_first;
300 
301   pool->started  = 0;
302   pool->idle     = 0;
303   pool->nreqs    = 0;
304   pool->nready   = 0;
305   pool->npending = 0;
306   pool->wanted   = 4;
307 
308   pool->max_idle = 4;      /* maximum number of threads that can pool->idle indefinitely */
309   pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310 
311   pool->userdata     = userdata;
312   pool->want_poll_cb = want_poll;
313   pool->done_poll_cb = done_poll;
314 
315   return 0;
316 }
317 
318 static void ecb_noinline ecb_cold
etp_proc_init(void)319 etp_proc_init (void)
320 {
321 #if HAVE_PRCTL_SET_NAME
322   /* provide a more sensible "thread name" */
323   char name[16 + 1];
324   const int namelen = sizeof (name) - 1;
325   int len;
326 
327   prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
328   name [namelen] = 0;
329   len = strlen (name);
330   strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
331   prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
332 #endif
333 }
334 
X_THREAD_PROC(etp_proc)335 X_THREAD_PROC (etp_proc)
336 {
337   ETP_REQ *req;
338   struct timespec ts;
339   etp_worker *self = (etp_worker *)thr_arg;
340   etp_pool pool = self->pool;
341 
342   etp_proc_init ();
343 
344   /* try to distribute timeouts somewhat evenly */
345   ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
346 
347   for (;;)
348     {
349       ts.tv_sec = 0;
350 
351       X_LOCK (pool->reqlock);
352 
353       for (;;)
354         {
355           req = reqq_shift (&pool->req_queue);
356 
357           if (ecb_expect_true (req))
358             break;
359 
360           if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
361             {
362               X_UNLOCK (pool->reqlock);
363               X_LOCK (pool->wrklock);
364               --pool->started;
365               X_UNLOCK (pool->wrklock);
366               goto quit;
367             }
368 
369           ++pool->idle;
370 
371           if (pool->idle <= pool->max_idle)
372             /* we are allowed to pool->idle, so do so without any timeout */
373             X_COND_WAIT (pool->reqwait, pool->reqlock);
374           else
375             {
376               /* initialise timeout once */
377               if (!ts.tv_sec)
378                 ts.tv_sec = time (0) + pool->idle_timeout;
379 
380               if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
381                 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
382             }
383 
384           --pool->idle;
385         }
386 
387       --pool->nready;
388 
389       X_UNLOCK (pool->reqlock);
390 
391       if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
392         goto quit;
393 
394       ETP_EXECUTE (self, req);
395 
396       X_LOCK (pool->reslock);
397 
398       ++pool->npending;
399 
400       if (!reqq_push (&pool->res_queue, req))
401         ETP_WANT_POLL (pool);
402 
403       etp_worker_clear (self);
404 
405       X_UNLOCK (pool->reslock);
406     }
407 
408 quit:
409   free (req);
410 
411   X_LOCK (pool->wrklock);
412   etp_worker_free (self);
413   X_UNLOCK (pool->wrklock);
414 
415   return 0;
416 }
417 
418 static void ecb_cold
etp_start_thread(etp_pool pool)419 etp_start_thread (etp_pool pool)
420 {
421   etp_worker *wrk = calloc (1, sizeof (etp_worker));
422 
423   /*TODO*/
424   assert (("unable to allocate worker thread data", wrk));
425 
426   wrk->pool = pool;
427 
428   X_LOCK (pool->wrklock);
429 
430   if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
431     {
432       wrk->prev = &pool->wrk_first;
433       wrk->next = pool->wrk_first.next;
434       pool->wrk_first.next->prev = wrk;
435       pool->wrk_first.next = wrk;
436       ++pool->started;
437     }
438   else
439     free (wrk);
440 
441   X_UNLOCK (pool->wrklock);
442 }
443 
444 static void
etp_maybe_start_thread(etp_pool pool)445 etp_maybe_start_thread (etp_pool pool)
446 {
447   if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
448     return;
449 
450   /* todo: maybe use pool->idle here, but might be less exact */
451   if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
452     return;
453 
454   etp_start_thread (pool);
455 }
456 
457 static void ecb_cold
etp_end_thread(etp_pool pool)458 etp_end_thread (etp_pool pool)
459 {
460   ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
461 
462   req->type = ETP_TYPE_QUIT;
463   req->pri  = ETP_PRI_MAX - ETP_PRI_MIN;
464 
465   X_LOCK (pool->reqlock);
466   reqq_push (&pool->req_queue, req);
467   X_COND_SIGNAL (pool->reqwait);
468   X_UNLOCK (pool->reqlock);
469 
470   X_LOCK (pool->wrklock);
471   --pool->started;
472   X_UNLOCK (pool->wrklock);
473 }
474 
475 ETP_API_DECL int
etp_poll(etp_pool pool)476 etp_poll (etp_pool pool)
477 {
478   unsigned int maxreqs;
479   unsigned int maxtime;
480   struct timeval tv_start, tv_now;
481 
482   X_LOCK (pool->reslock);
483   maxreqs = pool->max_poll_reqs;
484   maxtime = pool->max_poll_time;
485   X_UNLOCK (pool->reslock);
486 
487   if (maxtime)
488     gettimeofday (&tv_start, 0);
489 
490   for (;;)
491     {
492       ETP_REQ *req;
493 
494       etp_maybe_start_thread (pool);
495 
496       X_LOCK (pool->reslock);
497       req = reqq_shift (&pool->res_queue);
498 
499       if (ecb_expect_true (req))
500         {
501           --pool->npending;
502 
503           if (!pool->res_queue.size)
504             ETP_DONE_POLL (pool);
505         }
506 
507       X_UNLOCK (pool->reslock);
508 
509       if (ecb_expect_false (!req))
510         return 0;
511 
512       X_LOCK (pool->reqlock);
513       --pool->nreqs;
514       X_UNLOCK (pool->reqlock);
515 
516       if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
517         {
518           req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
519           continue;
520         }
521       else
522         {
523           int res = ETP_FINISH (req);
524           if (ecb_expect_false (res))
525             return res;
526         }
527 
528       if (ecb_expect_false (maxreqs && !--maxreqs))
529         break;
530 
531       if (maxtime)
532         {
533           gettimeofday (&tv_now, 0);
534 
535           if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
536             break;
537         }
538     }
539 
540   errno = EAGAIN;
541   return -1;
542 }
543 
544 ETP_API_DECL void
545 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
546 
547 ETP_API_DECL void
etp_cancel(etp_pool pool,ETP_REQ * req)548 etp_cancel (etp_pool pool, ETP_REQ *req)
549 {
550   req->cancelled = 1;
551 
552   etp_grp_cancel (pool, req);
553 }
554 
555 ETP_API_DECL void
etp_grp_cancel(etp_pool pool,ETP_REQ * grp)556 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
557 {
558   for (grp = grp->grp_first; grp; grp = grp->grp_next)
559     etp_cancel (pool, grp);
560 }
561 
562 ETP_API_DECL void
etp_submit(etp_pool pool,ETP_REQ * req)563 etp_submit (etp_pool pool, ETP_REQ *req)
564 {
565   req->pri -= ETP_PRI_MIN;
566 
567   if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
568   if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
569 
570   if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
571     {
572       /* I hope this is worth it :/ */
573       X_LOCK (pool->reqlock);
574       ++pool->nreqs;
575       X_UNLOCK (pool->reqlock);
576 
577       X_LOCK (pool->reslock);
578 
579       ++pool->npending;
580 
581       if (!reqq_push (&pool->res_queue, req))
582         ETP_WANT_POLL (pool);
583 
584       X_UNLOCK (pool->reslock);
585     }
586   else
587     {
588       X_LOCK (pool->reqlock);
589       ++pool->nreqs;
590       ++pool->nready;
591       reqq_push (&pool->req_queue, req);
592       X_COND_SIGNAL (pool->reqwait);
593       X_UNLOCK (pool->reqlock);
594 
595       etp_maybe_start_thread (pool);
596     }
597 }
598 
599 ETP_API_DECL void ecb_cold
etp_set_max_poll_time(etp_pool pool,double seconds)600 etp_set_max_poll_time (etp_pool pool, double seconds)
601 {
602   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reslock);
603   pool->max_poll_time = seconds * ETP_TICKS;
604   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
605 }
606 
607 ETP_API_DECL void ecb_cold
etp_set_max_poll_reqs(etp_pool pool,unsigned int maxreqs)608 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
609 {
610   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reslock);
611   pool->max_poll_reqs = maxreqs;
612   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
613 }
614 
615 ETP_API_DECL void ecb_cold
etp_set_max_idle(etp_pool pool,unsigned int threads)616 etp_set_max_idle (etp_pool pool, unsigned int threads)
617 {
618   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
619   pool->max_idle = threads;
620   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
621 }
622 
623 ETP_API_DECL void ecb_cold
etp_set_idle_timeout(etp_pool pool,unsigned int seconds)624 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
625 {
626   if (WORDACCESS_UNSAFE) X_LOCK   (pool->reqlock);
627   pool->idle_timeout = seconds;
628   if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
629 }
630 
631 ETP_API_DECL void ecb_cold
etp_set_min_parallel(etp_pool pool,unsigned int threads)632 etp_set_min_parallel (etp_pool pool, unsigned int threads)
633 {
634   if (pool->wanted < threads)
635     pool->wanted = threads;
636 }
637 
638 ETP_API_DECL void ecb_cold
etp_set_max_parallel(etp_pool pool,unsigned int threads)639 etp_set_max_parallel (etp_pool pool, unsigned int threads)
640 {
641   if (pool->wanted > threads)
642     pool->wanted = threads;
643 
644   while (pool->started > pool->wanted)
645     etp_end_thread (pool);
646 }
647 
648