1 /*
2  * Copyright (c) 2014 by Farsight Security, Inc.
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files (the
6  * "Software"), to deal in the Software without restriction, including
7  * without limitation the rights to use, copy, modify, merge, publish,
8  * distribute, sublicense, and/or sell copies of the Software, and to
9  * permit persons to whom the Software is furnished to do so, subject to
10  * the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included
13  * in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  *
23  */
24 
25 #ifndef FSTRM_IOTHR_H
26 #define FSTRM_IOTHR_H
27 
28 /**
29  * \defgroup fstrm_iothr fstrm_iothr
30  *
31  * The `fstrm_iothr` interface creates a background I/O thread which writes
32  * Frame Streams encapsulated data frames into an output stream specified by an
33  * \ref fstrm_writer object. It exposes non-blocking input queues that can be
34  * used by worker threads to asynchronously write data frames to the output
35  * stream. A deferred deallocation callback is invoked after the I/O thread has
36  * disposed of a queued data frame.
37  *
38  * In order to create an `fstrm_iothr` object, the caller must first configure
39  * and instantiate an `fstrm_writer` object and pass this instance to the
40  * fstrm_iothr_init() function. The `fstrm_iothr` object then takes ownership of
41  * the `fstrm_writer` object. It is responsible for serializing writes and will
42  * take care of destroying the captive `fstrm_writer` object at the same time
43  * the `fstrm_iothr` object is destroyed. The caller should not perform any
44  * operations on the captive `fstrm_writer` object after it has been passed to
45  * `fstrm_iothr_init()`.
46  *
47  * Parameters used to configure the I/O thread are passed through an
48  * `fstrm_iothr_options` object. These options have to be specified in advance
49  * and are mostly performance knobs which have reasonable defaults.
50  *
51  * Once the `fstrm_iothr` object has been created, handles to the input queues
52  * used to submit data frames can be obtained by calling
53  * `fstrm_iothr_get_input_queue()`. This function can be called up to
54  * **num_input_queues** times, and can be safely called concurrently. For
55  * instance, in an application with a fixed number of worker threads, an input
56  * queue can be dedicated to each worker thread by setting the
57  * **num_input_queues** option to the number of worker threads, and then calling
58  * `fstrm_iothr_get_input_queue()` from each worker thread's startup function to
59  * obtain a per-thread input queue.
60  *
61  * @{
62  */
63 
64 /**
65  * Initialize an `fstrm_iothr_options` object. This is needed to pass
66  * configuration parameters to fstrm_iothr_init().
67  *
68  * \return
69  *	`fstrm_iothr_options` object.
70  */
struct fstrm_iothr_options *fstrm_iothr_options_init(void);
73 
74 /**
75  * Destroy an `fstrm_iothr_options` object.
76  *
77  * \param opt
78  *	Pointer to `fstrm_iothr_options` object.
79  */
void fstrm_iothr_options_destroy(struct fstrm_iothr_options **opt);
82 
83 /**
84  * Set the `buffer_hint` parameter. This is the threshold number of bytes to
85  * accumulate in the output buffer before forcing a buffer flush.
86  *
87  * \param opt
88  *	`fstrm_iothr_options` object.
89  * \param buffer_hint
90  *	New `buffer_hint` value.
91  *
92  * \retval #fstrm_res_success
93  * \retval #fstrm_res_failure
94  */
95 fstrm_res
96 fstrm_iothr_options_set_buffer_hint(
97 	struct fstrm_iothr_options *opt,
98 	unsigned buffer_hint);
99 
#define FSTRM_IOTHR_BUFFER_HINT_MIN	1024	/**< Lower bound for the `buffer_hint` option. */
#define FSTRM_IOTHR_BUFFER_HINT_DEFAULT	8192	/**< Default for the `buffer_hint` option. */
#define FSTRM_IOTHR_BUFFER_HINT_MAX	65536	/**< Upper bound for the `buffer_hint` option. */
108 
109 /**
110  * Set the `flush_timeout` parameter. This is the number of seconds to allow
111  * unflushed data to remain in the output buffer.
112  *
113  * \param opt
114  *	`fstrm_iothr_options` object.
115  * \param flush_timeout
116  *	New `flush_timeout` value.
117  *
118  * \retval #fstrm_res_success
119  * \retval #fstrm_res_failure
120  */
121 fstrm_res
122 fstrm_iothr_options_set_flush_timeout(
123 	struct fstrm_iothr_options *opt,
124 	unsigned flush_timeout);
125 
#define FSTRM_IOTHR_FLUSH_TIMEOUT_MIN		1	/**< Lower bound for the `flush_timeout` option. */
#define FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT	1	/**< Default for the `flush_timeout` option. */
#define FSTRM_IOTHR_FLUSH_TIMEOUT_MAX		600	/**< Upper bound for the `flush_timeout` option. */
134 
135 /**
 * Set the `input_queue_size` parameter. This is the number of queue entries to
 * allocate for each input queue. This option controls the number of data frames
 * per input queue that can be outstanding for deferred processing by the
 * `fstrm_iothr` object and thus affects performance and memory usage.
140  *
141  * This parameter must be a power-of-2.
142  *
143  * \param opt
144  *	`fstrm_iothr_options` object.
145  * \param input_queue_size
146  *	New `input_queue_size` value.
147  *
148  * \retval #fstrm_res_success
149  * \retval #fstrm_res_failure
150  */
151 fstrm_res
152 fstrm_iothr_options_set_input_queue_size(
153 	struct fstrm_iothr_options *opt,
154 	unsigned input_queue_size);
155 
#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN	2	/**< Lower bound for the `input_queue_size` option. */
#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT	512	/**< Default for the `input_queue_size` option. */
#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX	16384	/**< Upper bound for the `input_queue_size` option. */
164 
165 /**
166  * Set the `num_input_queues` parameter. This is the number of input queues to
167  * create and must match the number of times that fstrm_iothr_get_input_queue()
168  * is called on the corresponding `fstrm_iothr` object.
169  *
170  * \param opt
171  *	`fstrm_iothr_options` object.
172  * \param num_input_queues
173  *	New `num_input_queues` value.
174  *
175  * \retval #fstrm_res_success
176  * \retval #fstrm_res_failure
177  */
178 fstrm_res
179 fstrm_iothr_options_set_num_input_queues(
180 	struct fstrm_iothr_options *opt,
181 	unsigned num_input_queues);
182 
#define FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN	1	/**< Lower bound for the `num_input_queues` option. */
#define FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT	1	/**< Default for the `num_input_queues` option. */
188 
189 /**
190  * Set the `output_queue_size` parameter. This is the number of queue entries to
191  * allocate for the output queue. This option controls the maximum number of
192  * data frames that can be accumulated in the output queue before a buffer flush
193  * must occur and thus affects performance and memory usage.
194  *
195  * \param opt
196  *	`fstrm_iothr_options` object.
197  * \param output_queue_size
198  *	New `output_queue_size` value.
199  *
200  * \retval #fstrm_res_success
201  * \retval #fstrm_res_failure
202  */
203 fstrm_res
204 fstrm_iothr_options_set_output_queue_size(
205 	struct fstrm_iothr_options *opt,
206 	unsigned output_queue_size);
207 
#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN	2	/**< Lower bound for the `output_queue_size` option. */
#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT	64	/**< Default for the `output_queue_size` option. */

/**
 * Upper bound for the `output_queue_size` option.
 *
 * Expands to `IOV_MAX`, so that macro must be visible wherever this one is
 * used (POSIX provides it through `<limits.h>`).
 */
#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX	IOV_MAX
216 
/**
 * Queue models that can be selected for `fstrm_iothr_queue` objects.
 * \see fstrm_iothr_options_set_queue_model()
 */
typedef enum {
	FSTRM_IOTHR_QUEUE_MODEL_SPSC,	/**< Single Producer, Single Consumer. */
	FSTRM_IOTHR_QUEUE_MODEL_MPSC,	/**< Multiple Producer, Single Consumer. */
} fstrm_iothr_queue_model;
228 
229 
230 /**
231  * Set the `queue_model` parameter. This controls what queueing semantics to use
232  * for `fstrm_iothr_queue` objects. Single Producer queues
233  * (#FSTRM_IOTHR_QUEUE_MODEL_SPSC) may only have a single thread at a time
234  * calling fstrm_iothr_submit() on a given `fstrm_iothr_queue` object, while
235  * Multiple Producer queues (#FSTRM_IOTHR_QUEUE_MODEL_MPSC) may have multiple
236  * threads concurrently calling fstrm_iothr_submit() on a given
237  * `fstrm_iothr_queue` object.
238  *
239  * \param opt
240  *	`fstrm_iothr_options` object.
241  * \param queue_model
242  *	New `queue_model` value.
243  *
244  * \retval #fstrm_res_success
245  * \retval #fstrm_res_failure
246  */
247 fstrm_res
248 fstrm_iothr_options_set_queue_model(
249 	struct fstrm_iothr_options *opt,
250 	fstrm_iothr_queue_model queue_model);
251 
/** Default for the `queue_model` option (Single Producer, Single Consumer). */
#define FSTRM_IOTHR_QUEUE_MODEL_DEFAULT	FSTRM_IOTHR_QUEUE_MODEL_SPSC
254 
255 /**
256  * Set the `queue_notify_threshold` parameter. This controls the number of
257  * outstanding queue entries to allow on an input queue before waking the I/O
258  * thread, which will cause the outstanding queue entries to begin draining.
259  *
260  * \param opt
261  *	`fstrm_iothr_options` object.
262  * \param queue_notify_threshold
263  *	New `queue_notify_threshold` value.
264  *
265  * \retval #fstrm_res_success
266  * \retval #fstrm_res_failure
267  */
268 fstrm_res
269 fstrm_iothr_options_set_queue_notify_threshold(
270 	struct fstrm_iothr_options *opt,
271 	unsigned queue_notify_threshold);
272 
#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN		1	/**< Lower bound for the `queue_notify_threshold` option. */
#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT	32	/**< Default for the `queue_notify_threshold` option. */
278 
279 /**
280  * Set the `reopen_interval` parameter. This controls the number of seconds to
281  * wait between attempts to reopen a closed `fstrm_writer` output stream.
282  *
283  * \param opt
284  *	`fstrm_iothr_options` object.
285  * \param reopen_interval
 *	New `reopen_interval` value.
287  *
288  * \retval #fstrm_res_success
289  * \retval #fstrm_res_failure
290  */
291 fstrm_res
292 fstrm_iothr_options_set_reopen_interval(
293 	struct fstrm_iothr_options *opt,
294 	unsigned reopen_interval);
295 
#define FSTRM_IOTHR_REOPEN_INTERVAL_MIN		1	/**< Lower bound for the `reopen_interval` option. */
#define FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT	5	/**< Default for the `reopen_interval` option. */
#define FSTRM_IOTHR_REOPEN_INTERVAL_MAX		600	/**< Upper bound for the `reopen_interval` option. */
304 
305 /**
306  * Initialize an `fstrm_iothr` object. This creates a background I/O thread
307  * which asynchronously writes data frames submitted by other threads which call
308  * fstrm_iothr_submit().
309  *
310  * \param opt
311  *	`fstrm_iothr_options` object. May be NULL, in which case default values
312  *	will be used.
313  *
314  * \param writer
315  *	Pointer to `fstrm_writer` object. Must be non-NULL.
316  *
317  * \return
318  *	`fstrm_iothr` object.
319  * \retval
320  *	NULL on failure.
321  */
struct fstrm_iothr *fstrm_iothr_init(const struct fstrm_iothr_options *opt,
				     struct fstrm_writer **writer);
326 
327 /**
328  * Destroy an `fstrm_iothr` object. This signals the background I/O thread to
329  * flush or discard any queued data frames and deallocates any resources used
330  * internally. This function blocks until the I/O thread has terminated.
331  *
332  * \param iothr
333  *	Pointer to `fstrm_iothr` object.
334  */
void fstrm_iothr_destroy(struct fstrm_iothr **iothr);
337 
338 /**
339  * Obtain an `fstrm_iothr_queue` object for submitting data frames to the
340  * `fstrm_iothr` object. `fstrm_iothr_queue` objects are child objects of their
341  * parent `fstrm_iothr` object and will be destroyed when fstrm_iothr_destroy()
342  * is called on the parent `fstrm_iothr` object.
343  *
344  * This function is thread-safe and may be called simultaneously from any
345  * thread. For example, in a program which employs a fixed number of worker
346  * threads to handle requests, fstrm_iothr_get_input_queue() may be called from
347  * a thread startup routine without synchronization.
348  *
349  * `fstrm_iothr` objects allocate a fixed total number of `fstrm_iothr_queue`
350  * objects during the call to fstrm_iothr_init(). To adjust this parameter, use
351  * fstrm_iothr_options_set_num_input_queues().
352  *
353  * This function will fail if it is called more than **num_input_queues** times.
354  * By default, only one input queue is initialized per `fstrm_iothr` object.
355  *
356  * For optimum performance in a threaded program, each worker thread submitting
357  * data frames should have a dedicated `fstrm_iothr_queue` object. This allows
358  * each worker thread to have its own queue which is processed independently by
359  * the I/O thread. If the queue model for the `fstrm_iothr` object is set to
360  * #FSTRM_IOTHR_QUEUE_MODEL_SPSC, this results in contention-free access to the
361  * input queue.
362  *
363  * \param iothr
364  *	`fstrm_iothr` object.
365  *
366  * \return
367  *	`fstrm_iothr_queue` object.
368  * \retval
369  *	NULL on failure.
370  */
struct fstrm_iothr_queue *fstrm_iothr_get_input_queue(struct fstrm_iothr *iothr);
373 
374 /**
375  * Obtain an `fstrm_iothr_queue` object for submitting data frames to the
376  * `fstrm_iothr` object. This function is like fstrm_iothr_get_input_queue()
 * except it indexes into the `fstrm_iothr` object's array of input queues.
378  *
379  * \param iothr
380  *	`fstrm_iothr` object.
381  * \param idx
382  *	Index of the `fstrm_iothr_queue` object to retrieve. This value is
383  *	limited by the **num_input_queues** option.
384  *
385  * \return
386  *	`fstrm_iothr_queue` object.
387  * \retval
388  *	NULL on failure.
389  */
struct fstrm_iothr_queue *fstrm_iothr_get_input_queue_idx(struct fstrm_iothr *iothr,
							  size_t idx);
392 
393 /**
394  * Submit a data frame to the background I/O thread. If successfully queued and
395  * the I/O thread has an active output stream opened, the data frame will be
396  * asynchronously written to the output stream.
397  *
398  * When this function returns #fstrm_res_success, responsibility for
399  * deallocating the data frame specified by the `data` parameter passes to the
400  * `fstrm` library. The caller **MUST** ensure that the `data` object remains
401  * valid after fstrm_iothr_submit() returns. The callback function specified by
402  * the `free_func` parameter will be invoked once the data frame is no longer
403  * needed by the `fstrm` library. For example, if the data frame is dynamically
404  * allocated, the data frame may be deallocated in the callback function.
405  *
406  * Note that if this function returns #fstrm_res_failure, the responsibility for
407  * deallocating the data frame remains with the caller.
408  *
409  * As a convenience, if `data` is allocated with the system's `malloc()`,
410  * `fstrm_free_wrapper` may be provided as the `free_func` parameter with the
411  * `free_data` parameter set to `NULL`. This will cause the system's `free()` to
412  * be invoked to deallocate `data`.
413  *
414  * `free_func` may be NULL, in which case no callback function will be invoked
 * to dispose of `data`. This behavior may be useful if `data` is a global,
416  * statically allocated object.
417  *
418  * \param iothr
419  *      `fstrm_iothr` object.
420  * \param ioq
421  *      `fstrm_iothr_queue` object.
422  * \param data
423  *      Data frame bytes.
424  * \param len
425  *      Number of bytes in `data`.
426  * \param free_func
427  *      Callback function to deallocate the data frame. The `data` and
428  *      `free_data` parameters passed to this callback will be the same values
429  *      originally supplied in the call to fstrm_iothr_submit().
430  * \param free_data
431  *      Parameter to pass to `free_func`.
432  *
433  * \retval #fstrm_res_success
434  *      The data frame was successfully queued.
435  * \retval #fstrm_res_again
436  *      The queue is full.
437  * \retval #fstrm_res_failure
438  *      Permanent failure.
439  */
440 fstrm_res
441 fstrm_iothr_submit(
442 	struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq,
443 	void *data, size_t len,
444 	void (*free_func)(void *buf, void *free_data), void *free_data);
445 
446 /**
447  * Wrapper function for the system's `free()`, suitable for use as the
448  * `free_func` callback for fstrm_iothr_submit().
449  *
450  * \param data
451  *	Object to call `free()` on.
452  * \param free_data
453  *	Unused.
454  */
void fstrm_free_wrapper(void *data, void *free_data);
457 
458 /**@}*/
459 
460 #endif /* FSTRM_IOTHR_H */
461