1 /*
2  * Copyright (c) 2013, 2014, 2016-2017 by Farsight Security, Inc.
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files (the
6  * "Software"), to deal in the Software without restriction, including
7  * without limitation the rights to use, copy, modify, merge, publish,
8  * distribute, sublicense, and/or sell copies of the Software, and to
9  * permit persons to whom the Software is furnished to do so, subject to
10  * the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included
13  * in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  *
23  */
24 
25 #include "fstrm-private.h"
26 
27 static void *fstrm__iothr_thr(void *);
28 
/*
 * Tunable parameters for an I/O thread. Opaque to callers; constructed by
 * fstrm_iothr_options_init() and adjusted only through the setter functions
 * below, which range-check every value.
 */
struct fstrm_iothr_options {
	/* Flush the output queue once this many bytes are pending. */
	unsigned			buffer_hint;
	/* Seconds to wait before flushing a non-empty output queue. */
	unsigned			flush_timeout;
	/* Capacity (entries) of each producer input queue. */
	unsigned			input_queue_size;
	/* Number of producer input queues to allocate. */
	unsigned			num_input_queues;
	/* Capacity (iovecs) of the output queue; clamped to IOV_MAX at init. */
	unsigned			output_queue_size;
	/* Input-queue fill level at which producers wake the I/O thread. */
	unsigned			queue_notify_threshold;
	/* Minimum seconds between writer reopen attempts. */
	unsigned			reopen_interval;
	/* SPSC or MPSC; selects the queue implementation. */
	fstrm_iothr_queue_model		queue_model;
};
39 
/*
 * Defaults used when the caller passes opt == NULL to fstrm_iothr_init(),
 * and the baseline copied by fstrm_iothr_options_init().
 */
static const struct fstrm_iothr_options default_fstrm_iothr_options = {
	.buffer_hint			= FSTRM_IOTHR_BUFFER_HINT_DEFAULT,
	.flush_timeout			= FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT,
	.input_queue_size		= FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT,
	.num_input_queues		= FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT,
	.output_queue_size		= FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT,
	.queue_model			= FSTRM_IOTHR_QUEUE_MODEL_DEFAULT,
	.queue_notify_threshold		= FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT,
	.reopen_interval		= FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT,
};
50 
/*
 * Public handle for one producer input queue; a thin wrapper so callers
 * never see the underlying my_queue implementation.
 */
struct fstrm_iothr_queue {
	struct my_queue			*q;
};
54 
/*
 * One submitted payload as it travels from a producer's input queue to the
 * output queue. Entries are copied by value through the queues; the payload
 * bytes themselves stay owned by the caller until free_func runs.
 */
struct fstrm__iothr_queue_entry {
	/* The deallocation callback. */
	void				(*free_func)(void *, void *);
	/* Opaque second argument passed to free_func. */
	void				*free_data;

	/* The actual payload bytes, allocated by the caller. */
	void				*data;

	/* Number of bytes in 'data'. */
	uint32_t			len_data;
};
66 
/*
 * State for one background I/O thread: its producer-facing input queues,
 * the writer it drains them into, and the output queue used to batch
 * writes into a single writev call.
 */
struct fstrm_iothr {
	/* The I/O thread. */
	pthread_t			thr;

	/* Copy of options supplied by caller. */
	struct fstrm_iothr_options	opt;

	/* Queue implementation. */
	const struct my_queue_ops	*queue_ops;

	/* Writer. */
	struct fstrm_writer		*writer;

	/* Whether the writer is opened or not. */
	bool				opened;

	/* Last time the writer's 'open' method was called. */
	time_t				last_open_attempt;

	/* Allocated array of input queues, size opt.num_input_queues. */
	struct fstrm_iothr_queue	*queues;

	/* Whether the I/O thread is shutting down. */
	volatile bool			shutting_down;

#if HAVE_CLOCK_GETTIME
	/* Optimal clockid_t's. */
	clockid_t			clkid_gettime;
	clockid_t			clkid_pthread;
#endif

	/*
	 * Conditional variable and lock, used by producer thread
	 * (fstrm_iothr_submit) to signal sleeping I/O thread that the low
	 * watermark (opt.queue_notify_threshold) has been reached.
	 */
	pthread_cond_t			cv;
	pthread_mutex_t			cv_lock;

	/* Used to return unique queues from fstrm_iothr_get_queue(). */
	pthread_mutex_t			get_queue_lock;
	unsigned			get_queue_idx;

	/*
	 * Output queue: outq_idx entries are pending in parallel arrays
	 * outq_iov (for writev) and outq_entries (for deferred frees);
	 * outq_nbytes tracks pending bytes against opt.buffer_hint.
	 */
	unsigned			outq_idx;
	struct iovec			*outq_iov;
	struct fstrm__iothr_queue_entry	*outq_entries;
	unsigned			outq_nbytes;
};
116 
117 struct fstrm_iothr_options *
fstrm_iothr_options_init(void)118 fstrm_iothr_options_init(void)
119 {
120 	struct fstrm_iothr_options *opt;
121 	opt = my_malloc(sizeof(*opt));
122 	memmove(opt, &default_fstrm_iothr_options, sizeof(*opt));
123 	return opt;
124 }
125 
/*
 * Deallocate an options object created by fstrm_iothr_options_init().
 * Safe to call with *opt == NULL.
 *
 * NOTE(review): '*opt' is not visibly reset to NULL here; presumably
 * my_free() is a macro that also NULLs its argument — confirm against
 * fstrm-private.h.
 */
void
fstrm_iothr_options_destroy(struct fstrm_iothr_options **opt)
{
	if (*opt != NULL)
		my_free(*opt);
}
132 
133 fstrm_res
fstrm_iothr_options_set_buffer_hint(struct fstrm_iothr_options * opt,unsigned buffer_hint)134 fstrm_iothr_options_set_buffer_hint(struct fstrm_iothr_options *opt,
135 				    unsigned buffer_hint)
136 {
137 	if (buffer_hint < FSTRM_IOTHR_BUFFER_HINT_MIN ||
138 	    buffer_hint > FSTRM_IOTHR_BUFFER_HINT_MAX)
139 	{
140 		return fstrm_res_failure;
141 	}
142 	opt->buffer_hint = buffer_hint;
143 	return fstrm_res_success;
144 }
145 
146 fstrm_res
fstrm_iothr_options_set_flush_timeout(struct fstrm_iothr_options * opt,unsigned flush_timeout)147 fstrm_iothr_options_set_flush_timeout(struct fstrm_iothr_options *opt,
148 				      unsigned flush_timeout)
149 {
150 	if (flush_timeout < FSTRM_IOTHR_FLUSH_TIMEOUT_MIN ||
151 	    flush_timeout > FSTRM_IOTHR_FLUSH_TIMEOUT_MAX)
152 	{
153 		return fstrm_res_failure;
154 	}
155 	opt->flush_timeout = flush_timeout;
156 	return fstrm_res_success;
157 }
158 
159 fstrm_res
fstrm_iothr_options_set_input_queue_size(struct fstrm_iothr_options * opt,unsigned input_queue_size)160 fstrm_iothr_options_set_input_queue_size(struct fstrm_iothr_options *opt,
161 					 unsigned input_queue_size)
162 {
163 	if (input_queue_size < FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN ||
164 	    input_queue_size > FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX ||
165 	    input_queue_size & 1)
166 	{
167 		return fstrm_res_failure;
168 	}
169 	opt->input_queue_size = input_queue_size;
170 	return fstrm_res_success;
171 }
172 
173 fstrm_res
fstrm_iothr_options_set_num_input_queues(struct fstrm_iothr_options * opt,unsigned num_input_queues)174 fstrm_iothr_options_set_num_input_queues(struct fstrm_iothr_options *opt,
175 					 unsigned num_input_queues)
176 {
177 	if (num_input_queues < FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN)
178 		return fstrm_res_failure;
179 	opt->num_input_queues = num_input_queues;
180 	return fstrm_res_success;
181 }
182 
183 fstrm_res
fstrm_iothr_options_set_output_queue_size(struct fstrm_iothr_options * opt,unsigned output_queue_size)184 fstrm_iothr_options_set_output_queue_size(struct fstrm_iothr_options *opt,
185 					  unsigned output_queue_size)
186 {
187 	if (output_queue_size < FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN ||
188 	    output_queue_size > FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX)
189 	{
190 		return fstrm_res_failure;
191 	}
192 	opt->output_queue_size = output_queue_size;
193 	return fstrm_res_success;
194 }
195 
196 fstrm_res
fstrm_iothr_options_set_queue_model(struct fstrm_iothr_options * opt,fstrm_iothr_queue_model queue_model)197 fstrm_iothr_options_set_queue_model(struct fstrm_iothr_options *opt,
198 				    fstrm_iothr_queue_model queue_model)
199 {
200 	if (queue_model != FSTRM_IOTHR_QUEUE_MODEL_SPSC &&
201 	    queue_model != FSTRM_IOTHR_QUEUE_MODEL_MPSC)
202 	{
203 		return fstrm_res_failure;
204 	}
205 	opt->queue_model = queue_model;
206 	return fstrm_res_success;
207 }
208 
209 fstrm_res
fstrm_iothr_options_set_queue_notify_threshold(struct fstrm_iothr_options * opt,unsigned queue_notify_threshold)210 fstrm_iothr_options_set_queue_notify_threshold(struct fstrm_iothr_options *opt,
211 					       unsigned queue_notify_threshold)
212 {
213 	if (queue_notify_threshold < FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN)
214 		return fstrm_res_failure;
215 	opt->queue_notify_threshold = queue_notify_threshold;
216 	return fstrm_res_success;
217 }
218 
219 fstrm_res
fstrm_iothr_options_set_reopen_interval(struct fstrm_iothr_options * opt,unsigned reopen_interval)220 fstrm_iothr_options_set_reopen_interval(struct fstrm_iothr_options *opt,
221 					unsigned reopen_interval)
222 {
223 	if (reopen_interval < FSTRM_IOTHR_REOPEN_INTERVAL_MIN ||
224 	    reopen_interval > FSTRM_IOTHR_REOPEN_INTERVAL_MAX)
225 	{
226 		return fstrm_res_failure;
227 	}
228 	opt->reopen_interval = reopen_interval;
229 	return fstrm_res_success;
230 }
231 
/*
 * Create an I/O thread: copy (or default) the options, allocate the input
 * and output queues, set up the condition variable, take ownership of the
 * caller's writer (setting *writer to NULL), and start the thread.
 * Returns NULL on failure.
 */
struct fstrm_iothr *
fstrm_iothr_init(const struct fstrm_iothr_options *opt,
		 struct fstrm_writer **writer)
{
	struct fstrm_iothr *iothr = NULL;

	int res;
	pthread_condattr_t ca;

	/* Initialize fstrm_iothr and copy options. */
	iothr = my_calloc(1, sizeof(*iothr));
	if (opt == NULL)
		opt = &default_fstrm_iothr_options;
	memmove(&iothr->opt, opt, sizeof(iothr->opt));

	/*
	 * Some platforms have a ridiculously low IOV_MAX, literally the lowest
	 * value even allowed by POSIX, which is lower than our conservative
	 * FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT. Accommodate these platforms by
	 * silently clamping output_queue_size to IOV_MAX.
	 */
	if (iothr->opt.output_queue_size > IOV_MAX)
		iothr->opt.output_queue_size = IOV_MAX;

	/*
	 * Set the queue implementation.
	 *
	 * The memory barrier based queue implementation is the only one of our
	 * queue implementations that supports SPSC, so if it is not available,
	 * use the mutex based queue implementation instead. The mutex
	 * implementation is technically MPSC, but MPSC is strictly stronger
	 * than SPSC.
	 */
	if (iothr->opt.queue_model == FSTRM_IOTHR_QUEUE_MODEL_SPSC) {
#ifdef MY_HAVE_MEMORY_BARRIERS
		iothr->queue_ops = &my_queue_mb_ops;
#else
		iothr->queue_ops = &my_queue_mutex_ops;
#endif
	} else {
		iothr->queue_ops = &my_queue_mutex_ops;
	}

#if HAVE_CLOCK_GETTIME
	/* Detect best clocks. */
	if (!fstrm__get_best_monotonic_clocks(&iothr->clkid_gettime,
					      &iothr->clkid_pthread,
					      NULL))
	{
		goto fail;
	}
#endif

	/* Initialize the input queues. */
	iothr->queues = my_calloc(iothr->opt.num_input_queues,
				  sizeof(struct fstrm_iothr_queue));
	for (size_t i = 0; i < iothr->opt.num_input_queues; i++) {
		iothr->queues[i].q = iothr->queue_ops->init(iothr->opt.input_queue_size,
			sizeof(struct fstrm__iothr_queue_entry));
		if (iothr->queues[i].q == NULL)
			goto fail;
	}

	/* Initialize the output queue. */
	iothr->outq_iov = my_calloc(iothr->opt.output_queue_size,
				    sizeof(struct iovec));
	iothr->outq_entries = my_calloc(iothr->opt.output_queue_size,
					sizeof(struct fstrm__iothr_queue_entry));

	/* Initialize the condition variable. */
	res = pthread_condattr_init(&ca);
	assert(res == 0);

#if HAVE_CLOCK_GETTIME && HAVE_PTHREAD_CONDATTR_SETCLOCK
	/* Time the condvar against the monotonic clock detected above. */
	res = pthread_condattr_setclock(&ca, iothr->clkid_pthread);
	assert(res == 0);
#endif

	res = pthread_cond_init(&iothr->cv, &ca);
	assert(res == 0);

	res = pthread_condattr_destroy(&ca);
	assert(res == 0);

	/* Initialize the mutex protecting the condition variable. */
	res = pthread_mutex_init(&iothr->cv_lock, NULL);
	assert(res == 0);

	/* Initialize the mutex protecting fstrm_iothr_get_queue(). */
	res = pthread_mutex_init(&iothr->get_queue_lock, NULL);
	assert(res == 0);

	/* Take the caller's writer. */
	iothr->writer = *writer;
	*writer = NULL;

	/* Start the I/O thread. */
	res = pthread_create(&iothr->thr, NULL, fstrm__iothr_thr, iothr);
	assert(res == 0);

	return iothr;
fail:
	/*
	 * NOTE(review): this path is reachable before the condition variable,
	 * mutexes, and thread are initialized, yet fstrm_iothr_destroy()
	 * unconditionally signals the cv and joins the thread — confirm that
	 * destroy tolerates a partially-constructed iothr on this path.
	 */
	fstrm_iothr_destroy(&iothr);
	return NULL;
}
337 
338 static inline void
fstrm__iothr_queue_entry_free_bytes(struct fstrm__iothr_queue_entry * entry)339 fstrm__iothr_queue_entry_free_bytes(struct fstrm__iothr_queue_entry *entry)
340 {
341 	if (entry->free_func != NULL)
342 		entry->free_func(entry->data, entry->free_data);
343 }
344 
345 static void
fstrm__iothr_free_queues(struct fstrm_iothr * iothr)346 fstrm__iothr_free_queues(struct fstrm_iothr *iothr)
347 {
348 	size_t i;
349 	for (i = 0; i < iothr->opt.num_input_queues; i++) {
350 		struct my_queue *queue;
351 		struct fstrm__iothr_queue_entry entry;
352 
353 		queue = iothr->queues[i].q;
354 		while (iothr->queue_ops->remove(queue, &entry, NULL))
355 			fstrm__iothr_queue_entry_free_bytes(&entry);
356 		iothr->queue_ops->destroy(&queue);
357 	}
358 	my_free(iothr->queues);
359 }
360 
/*
 * Tear down an I/O thread: signal shutdown, join the thread (which drains,
 * flushes, and closes before exiting), destroy the synchronization objects,
 * destroy the writer, and free all allocations. Safe with *iothr == NULL.
 *
 * NOTE(review): shutting_down is set and the cv signaled without holding
 * cv_lock; presumably any missed wakeup is bounded by the I/O thread's
 * timed wait — confirm this is the intended design.
 */
void
fstrm_iothr_destroy(struct fstrm_iothr **iothr)
{
	if (*iothr != NULL) {
		/*
		 * Signal the I/O thread that a shutdown is in progress.
		 * This waits for the I/O thread to finish.
		 */
		(*iothr)->shutting_down = true;
		pthread_cond_signal(&(*iothr)->cv);
		pthread_join((*iothr)->thr, NULL);
		pthread_cond_destroy(&(*iothr)->cv);
		pthread_mutex_destroy(&(*iothr)->cv_lock);
		pthread_mutex_destroy(&(*iothr)->get_queue_lock);

		/* Destroy the writer by calling its 'destroy' method. */
		(void)fstrm_writer_destroy(&(*iothr)->writer);

		/* Cleanup our allocations. */
		fstrm__iothr_free_queues(*iothr);
		my_free((*iothr)->outq_iov);
		my_free((*iothr)->outq_entries);
		my_free(*iothr);
	}
}
386 
387 struct fstrm_iothr_queue *
fstrm_iothr_get_input_queue(struct fstrm_iothr * iothr)388 fstrm_iothr_get_input_queue(struct fstrm_iothr *iothr)
389 {
390 	struct fstrm_iothr_queue *q = NULL;
391 
392 	pthread_mutex_lock(&iothr->get_queue_lock);
393 	if (iothr->get_queue_idx < iothr->opt.num_input_queues) {
394 		q = &iothr->queues[iothr->get_queue_idx];
395 		iothr->get_queue_idx++;
396 	}
397 	pthread_mutex_unlock(&iothr->get_queue_lock);
398 
399 	return q;
400 }
401 
402 struct fstrm_iothr_queue *
fstrm_iothr_get_input_queue_idx(struct fstrm_iothr * iothr,size_t idx)403 fstrm_iothr_get_input_queue_idx(struct fstrm_iothr *iothr, size_t idx)
404 {
405 	struct fstrm_iothr_queue *q = NULL;
406 
407 	if (idx < iothr->opt.num_input_queues)
408 		q = &iothr->queues[idx];
409 
410 	return q;
411 }
412 
/*
 * Convenience deallocator matching the (payload, opaque) callback shape
 * expected by fstrm_iothr_submit(): simply free() the payload.
 */
void
fstrm_free_wrapper(void *data, void *free_data)
{
	(void)free_data;	/* unused; present only to match the callback type */
	free(data);
}
419 
420 fstrm_res
fstrm_iothr_submit(struct fstrm_iothr * iothr,struct fstrm_iothr_queue * ioq,void * data,size_t len,void (* free_func)(void *,void *),void * free_data)421 fstrm_iothr_submit(struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq,
422 		   void *data, size_t len,
423 		   void (*free_func)(void *, void *), void *free_data)
424 {
425 	unsigned space = 0;
426 	struct fstrm__iothr_queue_entry entry;
427 
428 	if (unlikely(iothr->shutting_down))
429 		return fstrm_res_failure;
430 
431 	if (unlikely(len < 1 || len >= UINT32_MAX || data == NULL))
432 		return fstrm_res_invalid;
433 
434 	entry.data = data;
435 	entry.len_data = (uint32_t) len;
436 	entry.free_func = free_func;
437 	entry.free_data = free_data;
438 
439 	if (likely(len > 0) && iothr->queue_ops->insert(ioq->q, &entry, &space)) {
440 		if (space == iothr->opt.queue_notify_threshold)
441 			pthread_cond_signal(&iothr->cv);
442 		return fstrm_res_success;
443 	} else {
444 		return fstrm_res_again;
445 	}
446 }
447 
448 static void
fstrm__iothr_close(struct fstrm_iothr * iothr)449 fstrm__iothr_close(struct fstrm_iothr *iothr)
450 {
451 	if (iothr->opened) {
452 		iothr->opened = false;
453 		fstrm_writer_close(iothr->writer);
454 	}
455 }
456 
457 static void
fstrm__iothr_flush_output(struct fstrm_iothr * iothr)458 fstrm__iothr_flush_output(struct fstrm_iothr *iothr)
459 {
460 	fstrm_res res;
461 
462 	/* Do the actual write. */
463 	if (likely(iothr->opened && iothr->outq_idx > 0)) {
464 		res = fstrm_writer_writev(iothr->writer, iothr->outq_iov,
465 					  iothr->outq_idx);
466 		if (res != fstrm_res_success)
467 			fstrm__iothr_close(iothr);
468 	}
469 
470 	/* Perform the deferred deallocations. */
471 	for (unsigned i = 0; i < iothr->outq_idx; i++)
472 		fstrm__iothr_queue_entry_free_bytes(&iothr->outq_entries[i]);
473 
474 	/* Zero counters and indices. */
475 	iothr->outq_idx = 0;
476 	iothr->outq_nbytes = 0;
477 }
478 
479 static void
fstrm__iothr_maybe_flush_output(struct fstrm_iothr * iothr,size_t nbytes)480 fstrm__iothr_maybe_flush_output(struct fstrm_iothr *iothr, size_t nbytes)
481 {
482 	assert(iothr->outq_idx <= iothr->opt.output_queue_size);
483 	if (iothr->outq_idx > 0) {
484 		if (iothr->outq_idx >= iothr->opt.output_queue_size ||
485 		    iothr->outq_nbytes + nbytes >= iothr->opt.buffer_hint)
486 		{
487 			/*
488 			 * If the output queue is full, or there are more than
489 			 * 'buffer_hint' bytes of data ready to be sent, flush
490 			 * the output.
491 			 */
492 			fstrm__iothr_flush_output(iothr);
493 		}
494 	}
495 }
496 
497 static void
fstrm__iothr_process_queue_entry(struct fstrm_iothr * iothr,struct fstrm__iothr_queue_entry * entry)498 fstrm__iothr_process_queue_entry(struct fstrm_iothr *iothr,
499 				 struct fstrm__iothr_queue_entry *entry)
500 {
501 	if (likely(iothr->opened)) {
502 		size_t nbytes = sizeof(uint32_t) + entry->len_data;
503 
504 		fstrm__iothr_maybe_flush_output(iothr, nbytes);
505 
506 		/* Copy the entry to the array of outstanding queue entries. */
507 		iothr->outq_entries[iothr->outq_idx] = *entry;
508 
509 		/* Add an iovec for the entry. */
510 		iothr->outq_iov[iothr->outq_idx].iov_base = (void *)entry->data;
511 		iothr->outq_iov[iothr->outq_idx].iov_len = (size_t)entry->len_data;
512 
513 		/* Increment the number of output queue entries. */
514 		iothr->outq_idx++;
515 
516 		/* There are now nbytes more data waiting to be sent. */
517 		iothr->outq_nbytes += nbytes;
518 	} else {
519 		/* Writer is closed, just discard the payload. */
520 		fstrm__iothr_queue_entry_free_bytes(entry);
521 	}
522 }
523 
524 static unsigned
fstrm__iothr_process_queues(struct fstrm_iothr * iothr)525 fstrm__iothr_process_queues(struct fstrm_iothr *iothr)
526 {
527 	struct fstrm__iothr_queue_entry entry;
528 	unsigned total = 0;
529 
530 	/*
531 	 * Remove input queue entries from each thread's circular queue, and
532 	 * add them to our output queue.
533 	 */
534 	for (unsigned i = 0; i < iothr->opt.num_input_queues; i++) {
535 		if (iothr->queue_ops->remove(iothr->queues[i].q, &entry, NULL)) {
536 			fstrm__iothr_process_queue_entry(iothr, &entry);
537 			total++;
538 		}
539 	}
540 
541 	return total;
542 }
543 
544 static fstrm_res
fstrm__iothr_open(struct fstrm_iothr * iothr)545 fstrm__iothr_open(struct fstrm_iothr *iothr)
546 {
547 	fstrm_res res;
548 
549 	res = fstrm_writer_open(iothr->writer);
550 	if (res == fstrm_res_success)
551 		iothr->opened = true;
552 	else
553 		iothr->opened = false;
554 	return res;
555 }
556 
557 static void
fstrm__iothr_maybe_open(struct fstrm_iothr * iothr)558 fstrm__iothr_maybe_open(struct fstrm_iothr *iothr)
559 {
560 	/* If we're already connected, there's nothing to do. */
561 	if (likely(iothr->opened))
562 		return;
563 
564 	time_t since;
565 	struct timespec ts;
566 
567 	/* Check if the reopen interval has expired yet. */
568 #if HAVE_CLOCK_GETTIME
569 	int rv = clock_gettime(iothr->clkid_gettime, &ts);
570 	assert(rv == 0);
571 #else
572 	my_gettime(-1, &ts);
573 #endif
574 	since = ts.tv_sec - iothr->last_open_attempt;
575 	if (since < (time_t) iothr->opt.reopen_interval)
576 		return;
577 
578 	/* Attempt to open the transport. */
579 	iothr->last_open_attempt = ts.tv_sec;
580 	if (fstrm__iothr_open(iothr) != fstrm_res_success)
581 		return;
582 }
583 
584 static void
fstrm__iothr_thr_setup(void)585 fstrm__iothr_thr_setup(void)
586 {
587 	sigset_t set;
588 	int s;
589 
590 	sigemptyset(&set);
591 	sigaddset(&set, SIGPIPE);
592 	s = pthread_sigmask(SIG_BLOCK, &set, NULL);
593 	assert(s == 0);
594 }
595 
/*
 * Main loop of the I/O thread. Repeatedly: drain the input queues into the
 * output queue; when idle, sleep on the condition variable with a
 * 'flush_timeout' deadline and flush any buffered output when the timeout
 * fires. On shutdown, drain everything, flush, close the writer, and exit.
 */
static void *
fstrm__iothr_thr(void *arg)
{
	struct fstrm_iothr *iothr = (struct fstrm_iothr *)arg;

	fstrm__iothr_thr_setup();
	fstrm__iothr_maybe_open(iothr);

	for (;;) {
		int res;
		unsigned count;

		/*
		 * NOTE(review): shutting_down may be set by the destroyer
		 * without holding cv_lock; presumably the timed wait below
		 * bounds how long a missed signal can delay shutdown —
		 * confirm this is the intended design.
		 */
		if (unlikely(iothr->shutting_down)) {
			/* Drain every remaining entry before exiting. */
			while (fstrm__iothr_process_queues(iothr));
			fstrm__iothr_flush_output(iothr);
			fstrm__iothr_close(iothr);
			break;
		}

		/* Retry the writer connection if it is down. */
		fstrm__iothr_maybe_open(iothr);

		/* Keep draining as long as producers supply entries. */
		count = fstrm__iothr_process_queues(iothr);
		if (count != 0)
			continue;

		/*
		 * All input queues are empty: wait until a producer signals
		 * the notify threshold, or until flush_timeout elapses.
		 * The deadline clock must match the one configured on the
		 * condvar at init (clkid_pthread when available).
		 */
		struct timespec ts;
#if HAVE_CLOCK_GETTIME
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
		int rv = clock_gettime(iothr->clkid_pthread, &ts);
#else
		int rv = clock_gettime(CLOCK_REALTIME, &ts);
#endif
		assert(rv == 0);
#else
		my_gettime(-1, &ts);
#endif
		ts.tv_sec += iothr->opt.flush_timeout;

		pthread_mutex_lock(&iothr->cv_lock);
		res = pthread_cond_timedwait(&iothr->cv, &iothr->cv_lock, &ts);
		pthread_mutex_unlock(&iothr->cv_lock);

		/* Nothing arrived within the timeout: push out buffered data. */
		if (res == ETIMEDOUT)
			fstrm__iothr_flush_output(iothr);
	}

	return NULL;
}
644