1 /*
2 * Copyright (c) 2013, 2014, 2016-2017 by Farsight Security, Inc.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files (the
6 * "Software"), to deal in the Software without restriction, including
7 * without limitation the rights to use, copy, modify, merge, publish,
8 * distribute, sublicense, and/or sell copies of the Software, and to
9 * permit persons to whom the Software is furnished to do so, subject to
10 * the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included
13 * in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 *
23 */
24
25 #include "fstrm-private.h"
26
/*
 * Entry point of the I/O thread. Declared here because fstrm_iothr_init()
 * hands it to pthread_create() before the definition appears below.
 */
static void *fstrm__iothr_thr(void *);
28
/*
 * Tunable parameters of an fstrm_iothr object. fstrm_iothr_init() copies
 * the whole structure by value, so the caller's copy is not referenced
 * after initialization.
 */
struct fstrm_iothr_options {
	/* Pending output is flushed once it reaches this many bytes. */
	unsigned			buffer_hint;

	/*
	 * Seconds the I/O thread waits on its condition variable when the
	 * input queues are empty; pending output is flushed on timeout.
	 */
	unsigned			flush_timeout;

	/* Size passed to the queue implementation for each input queue. */
	unsigned			input_queue_size;

	/* Number of input queues to allocate. */
	unsigned			num_input_queues;

	/* Number of slots in the output (iovec) queue. */
	unsigned			output_queue_size;

	/*
	 * fstrm_iothr_submit() wakes the I/O thread when the 'space' value
	 * reported by the queue insert equals this threshold.
	 */
	unsigned			queue_notify_threshold;

	/* Minimum number of seconds between attempts to open the writer. */
	unsigned			reopen_interval;

	/* Queue model (SPSC or MPSC); selects the queue implementation. */
	fstrm_iothr_queue_model		queue_model;
};
39
/*
 * Library defaults. Used to seed objects returned by
 * fstrm_iothr_options_init(), and used directly by fstrm_iothr_init()
 * when the caller passes no options.
 */
static const struct fstrm_iothr_options default_fstrm_iothr_options = {
	.buffer_hint		= FSTRM_IOTHR_BUFFER_HINT_DEFAULT,
	.flush_timeout		= FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT,
	.input_queue_size	= FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT,
	.num_input_queues	= FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT,
	.output_queue_size	= FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT,
	.queue_model		= FSTRM_IOTHR_QUEUE_MODEL_DEFAULT,
	.queue_notify_threshold	= FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT,
	.reopen_interval	= FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT,
};
50
/*
 * Handle for a single input queue. Producers obtain one from
 * fstrm_iothr_get_input_queue() and pass it to fstrm_iothr_submit().
 */
struct fstrm_iothr_queue {
	/* Underlying queue object, created by the selected queue_ops. */
	struct my_queue			*q;
};
54
/*
 * One submitted payload. Entries are copied by value into an input queue
 * by fstrm_iothr_submit() and later into the output queue by the I/O
 * thread.
 */
struct fstrm__iothr_queue_entry {
	/*
	 * The deallocation callback, invoked as free_func(data, free_data).
	 * May be NULL, in which case the payload is not freed by this code.
	 */
	void				(*free_func)(void *, void *);
	void				*free_data;

	/* The actual payload bytes, allocated by the caller. */
	void				*data;

	/* Number of bytes in 'data'. */
	uint32_t			len_data;
};
66
/*
 * State of one I/O thread object: the thread itself, its writer, the
 * producer-facing input queues and the thread-private output queue.
 */
struct fstrm_iothr {
	/* The I/O thread. Created in fstrm_iothr_init(), joined in destroy. */
	pthread_t			thr;

	/* Copy of options, supplied by caller. */
	struct fstrm_iothr_options	opt;

	/* Queue implementation, selected from opt.queue_model. */
	const struct my_queue_ops	*queue_ops;

	/* Writer. Taken from the caller by fstrm_iothr_init(). */
	struct fstrm_writer		*writer;

	/* Whether the writer is opened or not. */
	bool				opened;

	/* Last time (seconds) the writer's 'open' method was called. */
	time_t				last_open_attempt;

	/* Allocated array of input queues, size opt.num_input_queues. */
	struct fstrm_iothr_queue	*queues;

	/*
	 * Whether the I/O thread is shutting down. Set by
	 * fstrm_iothr_destroy(); read by the I/O thread and by
	 * fstrm_iothr_submit().
	 */
	volatile bool			shutting_down;

#if HAVE_CLOCK_GETTIME
	/*
	 * Optimal clockid_t's: clkid_gettime times the reopen interval,
	 * clkid_pthread is the clock used for condition variable deadlines.
	 */
	clockid_t			clkid_gettime;
	clockid_t			clkid_pthread;
#endif

	/*
	 * Condition variable and lock, used by producer threads
	 * (fstrm_iothr_submit) to signal the sleeping I/O thread that the
	 * low watermark (opt.queue_notify_threshold) has been reached.
	 * fstrm_iothr_destroy() also signals it to request shutdown.
	 */
	pthread_cond_t			cv;
	pthread_mutex_t			cv_lock;

	/* Used to return unique queues from fstrm_iothr_get_input_queue(). */
	pthread_mutex_t			get_queue_lock;
	unsigned			get_queue_idx;

	/*
	 * Output queue: outq_idx entries are pending. outq_iov is what gets
	 * written; outq_entries is the parallel array kept so the payloads
	 * can be freed after the write. outq_nbytes is the pending byte
	 * count, including sizeof(uint32_t) of overhead per entry.
	 */
	unsigned			outq_idx;
	struct iovec			*outq_iov;
	struct fstrm__iothr_queue_entry	*outq_entries;
	unsigned			outq_nbytes;
};
116
117 struct fstrm_iothr_options *
fstrm_iothr_options_init(void)118 fstrm_iothr_options_init(void)
119 {
120 struct fstrm_iothr_options *opt;
121 opt = my_malloc(sizeof(*opt));
122 memmove(opt, &default_fstrm_iothr_options, sizeof(*opt));
123 return opt;
124 }
125
/*
 * Release an options object created by fstrm_iothr_options_init().
 *
 * opt: address of the caller's handle. A NULL 'opt', or a handle that is
 *      already NULL, is a no-op. On return the handle is NULL, so a second
 *      call on the same handle is harmless.
 */
void
fstrm_iothr_options_destroy(struct fstrm_iothr_options **opt)
{
	if (opt == NULL || *opt == NULL)
		return;

	my_free(*opt);

	/* Clear the handle explicitly rather than relying on my_free(). */
	*opt = NULL;
}
132
133 fstrm_res
fstrm_iothr_options_set_buffer_hint(struct fstrm_iothr_options * opt,unsigned buffer_hint)134 fstrm_iothr_options_set_buffer_hint(struct fstrm_iothr_options *opt,
135 unsigned buffer_hint)
136 {
137 if (buffer_hint < FSTRM_IOTHR_BUFFER_HINT_MIN ||
138 buffer_hint > FSTRM_IOTHR_BUFFER_HINT_MAX)
139 {
140 return fstrm_res_failure;
141 }
142 opt->buffer_hint = buffer_hint;
143 return fstrm_res_success;
144 }
145
146 fstrm_res
fstrm_iothr_options_set_flush_timeout(struct fstrm_iothr_options * opt,unsigned flush_timeout)147 fstrm_iothr_options_set_flush_timeout(struct fstrm_iothr_options *opt,
148 unsigned flush_timeout)
149 {
150 if (flush_timeout < FSTRM_IOTHR_FLUSH_TIMEOUT_MIN ||
151 flush_timeout > FSTRM_IOTHR_FLUSH_TIMEOUT_MAX)
152 {
153 return fstrm_res_failure;
154 }
155 opt->flush_timeout = flush_timeout;
156 return fstrm_res_success;
157 }
158
159 fstrm_res
fstrm_iothr_options_set_input_queue_size(struct fstrm_iothr_options * opt,unsigned input_queue_size)160 fstrm_iothr_options_set_input_queue_size(struct fstrm_iothr_options *opt,
161 unsigned input_queue_size)
162 {
163 if (input_queue_size < FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN ||
164 input_queue_size > FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX ||
165 input_queue_size & 1)
166 {
167 return fstrm_res_failure;
168 }
169 opt->input_queue_size = input_queue_size;
170 return fstrm_res_success;
171 }
172
173 fstrm_res
fstrm_iothr_options_set_num_input_queues(struct fstrm_iothr_options * opt,unsigned num_input_queues)174 fstrm_iothr_options_set_num_input_queues(struct fstrm_iothr_options *opt,
175 unsigned num_input_queues)
176 {
177 if (num_input_queues < FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN)
178 return fstrm_res_failure;
179 opt->num_input_queues = num_input_queues;
180 return fstrm_res_success;
181 }
182
183 fstrm_res
fstrm_iothr_options_set_output_queue_size(struct fstrm_iothr_options * opt,unsigned output_queue_size)184 fstrm_iothr_options_set_output_queue_size(struct fstrm_iothr_options *opt,
185 unsigned output_queue_size)
186 {
187 if (output_queue_size < FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN ||
188 output_queue_size > FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX)
189 {
190 return fstrm_res_failure;
191 }
192 opt->output_queue_size = output_queue_size;
193 return fstrm_res_success;
194 }
195
196 fstrm_res
fstrm_iothr_options_set_queue_model(struct fstrm_iothr_options * opt,fstrm_iothr_queue_model queue_model)197 fstrm_iothr_options_set_queue_model(struct fstrm_iothr_options *opt,
198 fstrm_iothr_queue_model queue_model)
199 {
200 if (queue_model != FSTRM_IOTHR_QUEUE_MODEL_SPSC &&
201 queue_model != FSTRM_IOTHR_QUEUE_MODEL_MPSC)
202 {
203 return fstrm_res_failure;
204 }
205 opt->queue_model = queue_model;
206 return fstrm_res_success;
207 }
208
209 fstrm_res
fstrm_iothr_options_set_queue_notify_threshold(struct fstrm_iothr_options * opt,unsigned queue_notify_threshold)210 fstrm_iothr_options_set_queue_notify_threshold(struct fstrm_iothr_options *opt,
211 unsigned queue_notify_threshold)
212 {
213 if (queue_notify_threshold < FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN)
214 return fstrm_res_failure;
215 opt->queue_notify_threshold = queue_notify_threshold;
216 return fstrm_res_success;
217 }
218
219 fstrm_res
fstrm_iothr_options_set_reopen_interval(struct fstrm_iothr_options * opt,unsigned reopen_interval)220 fstrm_iothr_options_set_reopen_interval(struct fstrm_iothr_options *opt,
221 unsigned reopen_interval)
222 {
223 if (reopen_interval < FSTRM_IOTHR_REOPEN_INTERVAL_MIN ||
224 reopen_interval > FSTRM_IOTHR_REOPEN_INTERVAL_MAX)
225 {
226 return fstrm_res_failure;
227 }
228 opt->reopen_interval = reopen_interval;
229 return fstrm_res_success;
230 }
231
232 struct fstrm_iothr *
fstrm_iothr_init(const struct fstrm_iothr_options * opt,struct fstrm_writer ** writer)233 fstrm_iothr_init(const struct fstrm_iothr_options *opt,
234 struct fstrm_writer **writer)
235 {
236 struct fstrm_iothr *iothr = NULL;
237
238 int res;
239 pthread_condattr_t ca;
240
241 /* Initialize fstrm_iothr and copy options. */
242 iothr = my_calloc(1, sizeof(*iothr));
243 if (opt == NULL)
244 opt = &default_fstrm_iothr_options;
245 memmove(&iothr->opt, opt, sizeof(iothr->opt));
246
247 /*
248 * Some platforms have a ridiculously low IOV_MAX, literally the lowest
249 * value even allowed by POSIX, which is lower than our conservative
250 * FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT. Accommodate these platforms by
251 * silently clamping output_queue_size to IOV_MAX.
252 */
253 if (iothr->opt.output_queue_size > IOV_MAX)
254 iothr->opt.output_queue_size = IOV_MAX;
255
256 /*
257 * Set the queue implementation.
258 *
259 * The memory barrier based queue implementation is the only one of our
260 * queue implementations that supports SPSC, so if it is not available,
261 * use the mutex based queue implementation instead. The mutex
262 * implementation is technically MPSC, but MPSC is strictly stronger
263 * than SPSC.
264 */
265 if (iothr->opt.queue_model == FSTRM_IOTHR_QUEUE_MODEL_SPSC) {
266 #ifdef MY_HAVE_MEMORY_BARRIERS
267 iothr->queue_ops = &my_queue_mb_ops;
268 #else
269 iothr->queue_ops = &my_queue_mutex_ops;
270 #endif
271 } else {
272 iothr->queue_ops = &my_queue_mutex_ops;
273 }
274
275 #if HAVE_CLOCK_GETTIME
276 /* Detect best clocks. */
277 if (!fstrm__get_best_monotonic_clocks(&iothr->clkid_gettime,
278 &iothr->clkid_pthread,
279 NULL))
280 {
281 goto fail;
282 }
283 #endif
284
285 /* Initialize the input queues. */
286 iothr->queues = my_calloc(iothr->opt.num_input_queues,
287 sizeof(struct fstrm_iothr_queue));
288 for (size_t i = 0; i < iothr->opt.num_input_queues; i++) {
289 iothr->queues[i].q = iothr->queue_ops->init(iothr->opt.input_queue_size,
290 sizeof(struct fstrm__iothr_queue_entry));
291 if (iothr->queues[i].q == NULL)
292 goto fail;
293 }
294
295 /* Initialize the output queue. */
296 iothr->outq_iov = my_calloc(iothr->opt.output_queue_size,
297 sizeof(struct iovec));
298 iothr->outq_entries = my_calloc(iothr->opt.output_queue_size,
299 sizeof(struct fstrm__iothr_queue_entry));
300
301 /* Initialize the condition variable. */
302 res = pthread_condattr_init(&ca);
303 assert(res == 0);
304
305 #if HAVE_CLOCK_GETTIME && HAVE_PTHREAD_CONDATTR_SETCLOCK
306 res = pthread_condattr_setclock(&ca, iothr->clkid_pthread);
307 assert(res == 0);
308 #endif
309
310 res = pthread_cond_init(&iothr->cv, &ca);
311 assert(res == 0);
312
313 res = pthread_condattr_destroy(&ca);
314 assert(res == 0);
315
316 /* Initialize the mutex protecting the condition variable. */
317 res = pthread_mutex_init(&iothr->cv_lock, NULL);
318 assert(res == 0);
319
320 /* Initialize the mutex protecting fstrm_iothr_get_queue(). */
321 res = pthread_mutex_init(&iothr->get_queue_lock, NULL);
322 assert(res == 0);
323
324 /* Take the caller's writer. */
325 iothr->writer = *writer;
326 *writer = NULL;
327
328 /* Start the I/O thread. */
329 res = pthread_create(&iothr->thr, NULL, fstrm__iothr_thr, iothr);
330 assert(res == 0);
331
332 return iothr;
333 fail:
334 fstrm_iothr_destroy(&iothr);
335 return NULL;
336 }
337
338 static inline void
fstrm__iothr_queue_entry_free_bytes(struct fstrm__iothr_queue_entry * entry)339 fstrm__iothr_queue_entry_free_bytes(struct fstrm__iothr_queue_entry *entry)
340 {
341 if (entry->free_func != NULL)
342 entry->free_func(entry->data, entry->free_data);
343 }
344
345 static void
fstrm__iothr_free_queues(struct fstrm_iothr * iothr)346 fstrm__iothr_free_queues(struct fstrm_iothr *iothr)
347 {
348 size_t i;
349 for (i = 0; i < iothr->opt.num_input_queues; i++) {
350 struct my_queue *queue;
351 struct fstrm__iothr_queue_entry entry;
352
353 queue = iothr->queues[i].q;
354 while (iothr->queue_ops->remove(queue, &entry, NULL))
355 fstrm__iothr_queue_entry_free_bytes(&entry);
356 iothr->queue_ops->destroy(&queue);
357 }
358 my_free(iothr->queues);
359 }
360
361 void
fstrm_iothr_destroy(struct fstrm_iothr ** iothr)362 fstrm_iothr_destroy(struct fstrm_iothr **iothr)
363 {
364 if (*iothr != NULL) {
365 /*
366 * Signal the I/O thread that a shutdown is in progress.
367 * This waits for the I/O thread to finish.
368 */
369 (*iothr)->shutting_down = true;
370 pthread_cond_signal(&(*iothr)->cv);
371 pthread_join((*iothr)->thr, NULL);
372 pthread_cond_destroy(&(*iothr)->cv);
373 pthread_mutex_destroy(&(*iothr)->cv_lock);
374 pthread_mutex_destroy(&(*iothr)->get_queue_lock);
375
376 /* Destroy the writer by calling its 'destroy' method. */
377 (void)fstrm_writer_destroy(&(*iothr)->writer);
378
379 /* Cleanup our allocations. */
380 fstrm__iothr_free_queues(*iothr);
381 my_free((*iothr)->outq_iov);
382 my_free((*iothr)->outq_entries);
383 my_free(*iothr);
384 }
385 }
386
387 struct fstrm_iothr_queue *
fstrm_iothr_get_input_queue(struct fstrm_iothr * iothr)388 fstrm_iothr_get_input_queue(struct fstrm_iothr *iothr)
389 {
390 struct fstrm_iothr_queue *q = NULL;
391
392 pthread_mutex_lock(&iothr->get_queue_lock);
393 if (iothr->get_queue_idx < iothr->opt.num_input_queues) {
394 q = &iothr->queues[iothr->get_queue_idx];
395 iothr->get_queue_idx++;
396 }
397 pthread_mutex_unlock(&iothr->get_queue_lock);
398
399 return q;
400 }
401
402 struct fstrm_iothr_queue *
fstrm_iothr_get_input_queue_idx(struct fstrm_iothr * iothr,size_t idx)403 fstrm_iothr_get_input_queue_idx(struct fstrm_iothr *iothr, size_t idx)
404 {
405 struct fstrm_iothr_queue *q = NULL;
406
407 if (idx < iothr->opt.num_input_queues)
408 q = &iothr->queues[idx];
409
410 return q;
411 }
412
/*
 * Deallocation callback that matches the 'free_func' signature of
 * fstrm_iothr_submit() and simply releases 'data' with free().
 *
 * data:      pointer to pass to free().
 * free_data: ignored.
 */
void
fstrm_free_wrapper(void *data,
		   void *free_data __attribute__((__unused__)))
{
	void *payload = data;

	free(payload);
}
419
420 fstrm_res
fstrm_iothr_submit(struct fstrm_iothr * iothr,struct fstrm_iothr_queue * ioq,void * data,size_t len,void (* free_func)(void *,void *),void * free_data)421 fstrm_iothr_submit(struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq,
422 void *data, size_t len,
423 void (*free_func)(void *, void *), void *free_data)
424 {
425 unsigned space = 0;
426 struct fstrm__iothr_queue_entry entry;
427
428 if (unlikely(iothr->shutting_down))
429 return fstrm_res_failure;
430
431 if (unlikely(len < 1 || len >= UINT32_MAX || data == NULL))
432 return fstrm_res_invalid;
433
434 entry.data = data;
435 entry.len_data = (uint32_t) len;
436 entry.free_func = free_func;
437 entry.free_data = free_data;
438
439 if (likely(len > 0) && iothr->queue_ops->insert(ioq->q, &entry, &space)) {
440 if (space == iothr->opt.queue_notify_threshold)
441 pthread_cond_signal(&iothr->cv);
442 return fstrm_res_success;
443 } else {
444 return fstrm_res_again;
445 }
446 }
447
448 static void
fstrm__iothr_close(struct fstrm_iothr * iothr)449 fstrm__iothr_close(struct fstrm_iothr *iothr)
450 {
451 if (iothr->opened) {
452 iothr->opened = false;
453 fstrm_writer_close(iothr->writer);
454 }
455 }
456
457 static void
fstrm__iothr_flush_output(struct fstrm_iothr * iothr)458 fstrm__iothr_flush_output(struct fstrm_iothr *iothr)
459 {
460 fstrm_res res;
461
462 /* Do the actual write. */
463 if (likely(iothr->opened && iothr->outq_idx > 0)) {
464 res = fstrm_writer_writev(iothr->writer, iothr->outq_iov,
465 iothr->outq_idx);
466 if (res != fstrm_res_success)
467 fstrm__iothr_close(iothr);
468 }
469
470 /* Perform the deferred deallocations. */
471 for (unsigned i = 0; i < iothr->outq_idx; i++)
472 fstrm__iothr_queue_entry_free_bytes(&iothr->outq_entries[i]);
473
474 /* Zero counters and indices. */
475 iothr->outq_idx = 0;
476 iothr->outq_nbytes = 0;
477 }
478
479 static void
fstrm__iothr_maybe_flush_output(struct fstrm_iothr * iothr,size_t nbytes)480 fstrm__iothr_maybe_flush_output(struct fstrm_iothr *iothr, size_t nbytes)
481 {
482 assert(iothr->outq_idx <= iothr->opt.output_queue_size);
483 if (iothr->outq_idx > 0) {
484 if (iothr->outq_idx >= iothr->opt.output_queue_size ||
485 iothr->outq_nbytes + nbytes >= iothr->opt.buffer_hint)
486 {
487 /*
488 * If the output queue is full, or there are more than
489 * 'buffer_hint' bytes of data ready to be sent, flush
490 * the output.
491 */
492 fstrm__iothr_flush_output(iothr);
493 }
494 }
495 }
496
497 static void
fstrm__iothr_process_queue_entry(struct fstrm_iothr * iothr,struct fstrm__iothr_queue_entry * entry)498 fstrm__iothr_process_queue_entry(struct fstrm_iothr *iothr,
499 struct fstrm__iothr_queue_entry *entry)
500 {
501 if (likely(iothr->opened)) {
502 size_t nbytes = sizeof(uint32_t) + entry->len_data;
503
504 fstrm__iothr_maybe_flush_output(iothr, nbytes);
505
506 /* Copy the entry to the array of outstanding queue entries. */
507 iothr->outq_entries[iothr->outq_idx] = *entry;
508
509 /* Add an iovec for the entry. */
510 iothr->outq_iov[iothr->outq_idx].iov_base = (void *)entry->data;
511 iothr->outq_iov[iothr->outq_idx].iov_len = (size_t)entry->len_data;
512
513 /* Increment the number of output queue entries. */
514 iothr->outq_idx++;
515
516 /* There are now nbytes more data waiting to be sent. */
517 iothr->outq_nbytes += nbytes;
518 } else {
519 /* Writer is closed, just discard the payload. */
520 fstrm__iothr_queue_entry_free_bytes(entry);
521 }
522 }
523
524 static unsigned
fstrm__iothr_process_queues(struct fstrm_iothr * iothr)525 fstrm__iothr_process_queues(struct fstrm_iothr *iothr)
526 {
527 struct fstrm__iothr_queue_entry entry;
528 unsigned total = 0;
529
530 /*
531 * Remove input queue entries from each thread's circular queue, and
532 * add them to our output queue.
533 */
534 for (unsigned i = 0; i < iothr->opt.num_input_queues; i++) {
535 if (iothr->queue_ops->remove(iothr->queues[i].q, &entry, NULL)) {
536 fstrm__iothr_process_queue_entry(iothr, &entry);
537 total++;
538 }
539 }
540
541 return total;
542 }
543
544 static fstrm_res
fstrm__iothr_open(struct fstrm_iothr * iothr)545 fstrm__iothr_open(struct fstrm_iothr *iothr)
546 {
547 fstrm_res res;
548
549 res = fstrm_writer_open(iothr->writer);
550 if (res == fstrm_res_success)
551 iothr->opened = true;
552 else
553 iothr->opened = false;
554 return res;
555 }
556
557 static void
fstrm__iothr_maybe_open(struct fstrm_iothr * iothr)558 fstrm__iothr_maybe_open(struct fstrm_iothr *iothr)
559 {
560 /* If we're already connected, there's nothing to do. */
561 if (likely(iothr->opened))
562 return;
563
564 time_t since;
565 struct timespec ts;
566
567 /* Check if the reopen interval has expired yet. */
568 #if HAVE_CLOCK_GETTIME
569 int rv = clock_gettime(iothr->clkid_gettime, &ts);
570 assert(rv == 0);
571 #else
572 my_gettime(-1, &ts);
573 #endif
574 since = ts.tv_sec - iothr->last_open_attempt;
575 if (since < (time_t) iothr->opt.reopen_interval)
576 return;
577
578 /* Attempt to open the transport. */
579 iothr->last_open_attempt = ts.tv_sec;
580 if (fstrm__iothr_open(iothr) != fstrm_res_success)
581 return;
582 }
583
584 static void
fstrm__iothr_thr_setup(void)585 fstrm__iothr_thr_setup(void)
586 {
587 sigset_t set;
588 int s;
589
590 sigemptyset(&set);
591 sigaddset(&set, SIGPIPE);
592 s = pthread_sigmask(SIG_BLOCK, &set, NULL);
593 assert(s == 0);
594 }
595
/*
 * Main function of the I/O thread.
 *
 * arg: the struct fstrm_iothr this thread serves.
 *
 * Loops until shutdown is requested: (re)opens the writer when needed,
 * moves entries from the input queues to the output queue, and when the
 * input queues are empty sleeps on the condition variable for up to
 * opt.flush_timeout seconds, flushing pending output if the sleep times
 * out. Always returns NULL.
 */
static void *
fstrm__iothr_thr(void *arg)
{
	struct fstrm_iothr *iothr = (struct fstrm_iothr *)arg;

	fstrm__iothr_thr_setup();
	fstrm__iothr_maybe_open(iothr);

	for (;;) {
		int res;
		unsigned count;

		if (unlikely(iothr->shutting_down)) {
			/*
			 * Drain every input queue completely, write out what
			 * is pending, then close the writer and exit.
			 */
			while (fstrm__iothr_process_queues(iothr));
			fstrm__iothr_flush_output(iothr);
			fstrm__iothr_close(iothr);
			break;
		}

		fstrm__iothr_maybe_open(iothr);

		/* Keep polling without sleeping while there is input. */
		count = fstrm__iothr_process_queues(iothr);
		if (count != 0)
			continue;

		/*
		 * All input queues were empty. Compute an absolute deadline
		 * 'flush_timeout' seconds from now, using the same clock the
		 * condition variable was configured with when that is
		 * supported.
		 */
		struct timespec ts;
#if HAVE_CLOCK_GETTIME
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
		int rv = clock_gettime(iothr->clkid_pthread, &ts);
#else
		int rv = clock_gettime(CLOCK_REALTIME, &ts);
#endif
		assert(rv == 0);
#else
		my_gettime(-1, &ts);
#endif
		ts.tv_sec += iothr->opt.flush_timeout;

		/*
		 * Sleep until a producer or fstrm_iothr_destroy() signals us,
		 * or until the deadline passes.
		 *
		 * NOTE(review): 'shutting_down' is tested above without
		 * holding cv_lock, and destroy sets it and signals without
		 * the lock too. A signal sent between that test and this wait
		 * is therefore missed, and shutdown is then only noticed when
		 * the wait times out.
		 */
		pthread_mutex_lock(&iothr->cv_lock);
		res = pthread_cond_timedwait(&iothr->cv, &iothr->cv_lock, &ts);
		pthread_mutex_unlock(&iothr->cv_lock);

		/* Nothing arrived within the timeout: flush what we have. */
		if (res == ETIMEDOUT)
			fstrm__iothr_flush_output(iothr);
	}

	return NULL;
}
644