1 /* 2 * Copyright (c) 2014 by Farsight Security, Inc. 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining 5 * a copy of this software and associated documentation files (the 6 * "Software"), to deal in the Software without restriction, including 7 * without limitation the rights to use, copy, modify, merge, publish, 8 * distribute, sublicense, and/or sell copies of the Software, and to 9 * permit persons to whom the Software is furnished to do so, subject to 10 * the following conditions: 11 * 12 * The above copyright notice and this permission notice shall be included 13 * in all copies or substantial portions of the Software. 14 * 15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 22 * 23 */ 24 25 #ifndef FSTRM_IOTHR_H 26 #define FSTRM_IOTHR_H 27 28 /** 29 * \defgroup fstrm_iothr fstrm_iothr 30 * 31 * The `fstrm_iothr` interface creates a background I/O thread which writes 32 * Frame Streams encapsulated data frames into an output stream specified by an 33 * \ref fstrm_writer object. It exposes non-blocking input queues that can be 34 * used by worker threads to asynchronously write data frames to the output 35 * stream. A deferred deallocation callback is invoked after the I/O thread has 36 * disposed of a queued data frame. 37 * 38 * In order to create an `fstrm_iothr` object, the caller must first configure 39 * and instantiate an `fstrm_writer` object and pass this instance to the 40 * fstrm_iothr_init() function. The `fstrm_iothr` object then takes ownership of 41 * the `fstrm_writer` object. It is responsible for serializing writes and will 42 * take care of destroying the captive `fstrm_writer` object at the same time 43 * the `fstrm_iothr` object is destroyed. The caller should not perform any 44 * operations on the captive `fstrm_writer` object after it has been passed to 45 * `fstrm_iothr_init()`. 46 * 47 * Parameters used to configure the I/O thread are passed through an 48 * `fstrm_iothr_options` object. These options have to be specified in advance 49 * and are mostly performance knobs which have reasonable defaults. 50 * 51 * Once the `fstrm_iothr` object has been created, handles to the input queues 52 * used to submit data frames can be obtained by calling 53 * `fstrm_iothr_get_input_queue()`. This function can be called up to 54 * **num_input_queues** times, and can be safely called concurrently. For 55 * instance, in an application with a fixed number of worker threads, an input 56 * queue can be dedicated to each worker thread by setting the 57 * **num_input_queues** option to the number of worker threads, and then calling 58 * `fstrm_iothr_get_input_queue()` from each worker thread's startup function to 59 * obtain a per-thread input queue. 60 * 61 * @{ 62 */ 63 64 /** 65 * Initialize an `fstrm_iothr_options` object. This is needed to pass 66 * configuration parameters to fstrm_iothr_init(). 67 * 68 * \return 69 * `fstrm_iothr_options` object. 70 */ 71 struct fstrm_iothr_options * 72 fstrm_iothr_options_init(void); 73 74 /** 75 * Destroy an `fstrm_iothr_options` object. 76 * 77 * \param opt 78 * Pointer to `fstrm_iothr_options` object. 79 */ 80 void 81 fstrm_iothr_options_destroy(struct fstrm_iothr_options **opt); 82 83 /** 84 * Set the `buffer_hint` parameter. This is the threshold number of bytes to 85 * accumulate in the output buffer before forcing a buffer flush. 86 * 87 * \param opt 88 * `fstrm_iothr_options` object. 89 * \param buffer_hint 90 * New `buffer_hint` value. 91 * 92 * \retval #fstrm_res_success 93 * \retval #fstrm_res_failure 94 */ 95 fstrm_res 96 fstrm_iothr_options_set_buffer_hint( 97 struct fstrm_iothr_options *opt, 98 unsigned buffer_hint); 99 100 /** Minimum `buffer_hint` value. */ 101 #define FSTRM_IOTHR_BUFFER_HINT_MIN 1024 102 103 /** Default `buffer_hint` value. */ 104 #define FSTRM_IOTHR_BUFFER_HINT_DEFAULT 8192 105 106 /** Maximum `buffer_hint` value. */ 107 #define FSTRM_IOTHR_BUFFER_HINT_MAX 65536 108 109 /** 110 * Set the `flush_timeout` parameter. This is the number of seconds to allow 111 * unflushed data to remain in the output buffer. 112 * 113 * \param opt 114 * `fstrm_iothr_options` object. 115 * \param flush_timeout 116 * New `flush_timeout` value. 117 * 118 * \retval #fstrm_res_success 119 * \retval #fstrm_res_failure 120 */ 121 fstrm_res 122 fstrm_iothr_options_set_flush_timeout( 123 struct fstrm_iothr_options *opt, 124 unsigned flush_timeout); 125 126 /** Minimum `flush_timeout` value. */ 127 #define FSTRM_IOTHR_FLUSH_TIMEOUT_MIN 1 128 129 /** Default `flush_timeout` value. */ 130 #define FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT 1 131 132 /** Maximum `flush_timeout` value. */ 133 #define FSTRM_IOTHR_FLUSH_TIMEOUT_MAX 600 134 135 /** 136 * Set the `input_queue_size` parameter. This is the number of queue entries to 137 * allocate per each input queue. This option controls the number of outstanding 138 * data frames per input queue that can be outstanding for deferred processing 139 * by the `fstrm_iothr` object and thus affects performance and memory usage. 140 * 141 * This parameter must be a power-of-2. 142 * 143 * \param opt 144 * `fstrm_iothr_options` object. 145 * \param input_queue_size 146 * New `input_queue_size` value. 147 * 148 * \retval #fstrm_res_success 149 * \retval #fstrm_res_failure 150 */ 151 fstrm_res 152 fstrm_iothr_options_set_input_queue_size( 153 struct fstrm_iothr_options *opt, 154 unsigned input_queue_size); 155 156 /** Minimum `input_queue_size` value. */ 157 #define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN 2 158 159 /** Default `input_queue_size` value. */ 160 #define FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT 512 161 162 /** Maximum `input_queue_size` value. */ 163 #define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX 16384 164 165 /** 166 * Set the `num_input_queues` parameter. This is the number of input queues to 167 * create and must match the number of times that fstrm_iothr_get_input_queue() 168 * is called on the corresponding `fstrm_iothr` object. 169 * 170 * \param opt 171 * `fstrm_iothr_options` object. 172 * \param num_input_queues 173 * New `num_input_queues` value. 174 * 175 * \retval #fstrm_res_success 176 * \retval #fstrm_res_failure 177 */ 178 fstrm_res 179 fstrm_iothr_options_set_num_input_queues( 180 struct fstrm_iothr_options *opt, 181 unsigned num_input_queues); 182 183 /** Minimum `num_input_queues` value. */ 184 #define FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN 1 185 186 /** Default `num_input_queues` value. */ 187 #define FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT 1 188 189 /** 190 * Set the `output_queue_size` parameter. This is the number of queue entries to 191 * allocate for the output queue. This option controls the maximum number of 192 * data frames that can be accumulated in the output queue before a buffer flush 193 * must occur and thus affects performance and memory usage. 194 * 195 * \param opt 196 * `fstrm_iothr_options` object. 197 * \param output_queue_size 198 * New `output_queue_size` value. 199 * 200 * \retval #fstrm_res_success 201 * \retval #fstrm_res_failure 202 */ 203 fstrm_res 204 fstrm_iothr_options_set_output_queue_size( 205 struct fstrm_iothr_options *opt, 206 unsigned output_queue_size); 207 208 /** Minimum `output_queue_size` value. */ 209 #define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN 2 210 211 /** Default `output_queue_size` value. */ 212 #define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT 64 213 214 /** Maximum `output_queue_size` value. */ 215 #define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX IOV_MAX 216 217 /** 218 * Queue models. 219 * \see fstrm_iothr_options_set_queue_model() 220 */ 221 typedef enum { 222 /** Single Producer, Single Consumer. */ 223 FSTRM_IOTHR_QUEUE_MODEL_SPSC, 224 225 /** Multiple Producer, Single Consumer. */ 226 FSTRM_IOTHR_QUEUE_MODEL_MPSC, 227 } fstrm_iothr_queue_model; 228 229 230 /** 231 * Set the `queue_model` parameter. This controls what queueing semantics to use 232 * for `fstrm_iothr_queue` objects. Single Producer queues 233 * (#FSTRM_IOTHR_QUEUE_MODEL_SPSC) may only have a single thread at a time 234 * calling fstrm_iothr_submit() on a given `fstrm_iothr_queue` object, while 235 * Multiple Producer queues (#FSTRM_IOTHR_QUEUE_MODEL_MPSC) may have multiple 236 * threads concurrently calling fstrm_iothr_submit() on a given 237 * `fstrm_iothr_queue` object. 238 * 239 * \param opt 240 * `fstrm_iothr_options` object. 241 * \param queue_model 242 * New `queue_model` value. 243 * 244 * \retval #fstrm_res_success 245 * \retval #fstrm_res_failure 246 */ 247 fstrm_res 248 fstrm_iothr_options_set_queue_model( 249 struct fstrm_iothr_options *opt, 250 fstrm_iothr_queue_model queue_model); 251 252 /** Default `queue_model` value. */ 253 #define FSTRM_IOTHR_QUEUE_MODEL_DEFAULT FSTRM_IOTHR_QUEUE_MODEL_SPSC 254 255 /** 256 * Set the `queue_notify_threshold` parameter. This controls the number of 257 * outstanding queue entries to allow on an input queue before waking the I/O 258 * thread, which will cause the outstanding queue entries to begin draining. 259 * 260 * \param opt 261 * `fstrm_iothr_options` object. 262 * \param queue_notify_threshold 263 * New `queue_notify_threshold` value. 264 * 265 * \retval #fstrm_res_success 266 * \retval #fstrm_res_failure 267 */ 268 fstrm_res 269 fstrm_iothr_options_set_queue_notify_threshold( 270 struct fstrm_iothr_options *opt, 271 unsigned queue_notify_threshold); 272 273 /** Minimum `queue_notify_threshold` value. */ 274 #define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN 1 275 276 /** Default `queue_notify_threshold` value. */ 277 #define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT 32 278 279 /** 280 * Set the `reopen_interval` parameter. This controls the number of seconds to 281 * wait between attempts to reopen a closed `fstrm_writer` output stream. 282 * 283 * \param opt 284 * `fstrm_iothr_options` object. 285 * \param reopen_interval 286 * New `queue_notify_threshold` value. 287 * 288 * \retval #fstrm_res_success 289 * \retval #fstrm_res_failure 290 */ 291 fstrm_res 292 fstrm_iothr_options_set_reopen_interval( 293 struct fstrm_iothr_options *opt, 294 unsigned reopen_interval); 295 296 /** Minimum `reopen_interval` value. */ 297 #define FSTRM_IOTHR_REOPEN_INTERVAL_MIN 1 298 299 /** Default `reopen_interval` value. */ 300 #define FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT 5 301 302 /** Maximum `reopen_interval` value. */ 303 #define FSTRM_IOTHR_REOPEN_INTERVAL_MAX 600 304 305 /** 306 * Initialize an `fstrm_iothr` object. This creates a background I/O thread 307 * which asynchronously writes data frames submitted by other threads which call 308 * fstrm_iothr_submit(). 309 * 310 * \param opt 311 * `fstrm_iothr_options` object. May be NULL, in which case default values 312 * will be used. 313 * 314 * \param writer 315 * Pointer to `fstrm_writer` object. Must be non-NULL. 316 * 317 * \return 318 * `fstrm_iothr` object. 319 * \retval 320 * NULL on failure. 321 */ 322 struct fstrm_iothr * 323 fstrm_iothr_init( 324 const struct fstrm_iothr_options *opt, 325 struct fstrm_writer **writer); 326 327 /** 328 * Destroy an `fstrm_iothr` object. This signals the background I/O thread to 329 * flush or discard any queued data frames and deallocates any resources used 330 * internally. This function blocks until the I/O thread has terminated. 331 * 332 * \param iothr 333 * Pointer to `fstrm_iothr` object. 334 */ 335 void 336 fstrm_iothr_destroy(struct fstrm_iothr **iothr); 337 338 /** 339 * Obtain an `fstrm_iothr_queue` object for submitting data frames to the 340 * `fstrm_iothr` object. `fstrm_iothr_queue` objects are child objects of their 341 * parent `fstrm_iothr` object and will be destroyed when fstrm_iothr_destroy() 342 * is called on the parent `fstrm_iothr` object. 343 * 344 * This function is thread-safe and may be called simultaneously from any 345 * thread. For example, in a program which employs a fixed number of worker 346 * threads to handle requests, fstrm_iothr_get_input_queue() may be called from 347 * a thread startup routine without synchronization. 348 * 349 * `fstrm_iothr` objects allocate a fixed total number of `fstrm_iothr_queue` 350 * objects during the call to fstrm_iothr_init(). To adjust this parameter, use 351 * fstrm_iothr_options_set_num_input_queues(). 352 * 353 * This function will fail if it is called more than **num_input_queues** times. 354 * By default, only one input queue is initialized per `fstrm_iothr` object. 355 * 356 * For optimum performance in a threaded program, each worker thread submitting 357 * data frames should have a dedicated `fstrm_iothr_queue` object. This allows 358 * each worker thread to have its own queue which is processed independently by 359 * the I/O thread. If the queue model for the `fstrm_iothr` object is set to 360 * #FSTRM_IOTHR_QUEUE_MODEL_SPSC, this results in contention-free access to the 361 * input queue. 362 * 363 * \param iothr 364 * `fstrm_iothr` object. 365 * 366 * \return 367 * `fstrm_iothr_queue` object. 368 * \retval 369 * NULL on failure. 370 */ 371 struct fstrm_iothr_queue * 372 fstrm_iothr_get_input_queue(struct fstrm_iothr *iothr); 373 374 /** 375 * Obtain an `fstrm_iothr_queue` object for submitting data frames to the 376 * `fstrm_iothr` object. This function is like fstrm_iothr_get_input_queue() 377 * except it indexes into the `fstrm_iothr_queue`'s array of input queues. 378 * 379 * \param iothr 380 * `fstrm_iothr` object. 381 * \param idx 382 * Index of the `fstrm_iothr_queue` object to retrieve. This value is 383 * limited by the **num_input_queues** option. 384 * 385 * \return 386 * `fstrm_iothr_queue` object. 387 * \retval 388 * NULL on failure. 389 */ 390 struct fstrm_iothr_queue * 391 fstrm_iothr_get_input_queue_idx(struct fstrm_iothr *iothr, size_t idx); 392 393 /** 394 * Submit a data frame to the background I/O thread. If successfully queued and 395 * the I/O thread has an active output stream opened, the data frame will be 396 * asynchronously written to the output stream. 397 * 398 * When this function returns #fstrm_res_success, responsibility for 399 * deallocating the data frame specified by the `data` parameter passes to the 400 * `fstrm` library. The caller **MUST** ensure that the `data` object remains 401 * valid after fstrm_iothr_submit() returns. The callback function specified by 402 * the `free_func` parameter will be invoked once the data frame is no longer 403 * needed by the `fstrm` library. For example, if the data frame is dynamically 404 * allocated, the data frame may be deallocated in the callback function. 405 * 406 * Note that if this function returns #fstrm_res_failure, the responsibility for 407 * deallocating the data frame remains with the caller. 408 * 409 * As a convenience, if `data` is allocated with the system's `malloc()`, 410 * `fstrm_free_wrapper` may be provided as the `free_func` parameter with the 411 * `free_data` parameter set to `NULL`. This will cause the system's `free()` to 412 * be invoked to deallocate `data`. 413 * 414 * `free_func` may be NULL, in which case no callback function will be invoked 415 * to dispose of `buf`. This behavior may be useful if `data` is a global, 416 * statically allocated object. 417 * 418 * \param iothr 419 * `fstrm_iothr` object. 420 * \param ioq 421 * `fstrm_iothr_queue` object. 422 * \param data 423 * Data frame bytes. 424 * \param len 425 * Number of bytes in `data`. 426 * \param free_func 427 * Callback function to deallocate the data frame. The `data` and 428 * `free_data` parameters passed to this callback will be the same values 429 * originally supplied in the call to fstrm_iothr_submit(). 430 * \param free_data 431 * Parameter to pass to `free_func`. 432 * 433 * \retval #fstrm_res_success 434 * The data frame was successfully queued. 435 * \retval #fstrm_res_again 436 * The queue is full. 437 * \retval #fstrm_res_failure 438 * Permanent failure. 439 */ 440 fstrm_res 441 fstrm_iothr_submit( 442 struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq, 443 void *data, size_t len, 444 void (*free_func)(void *buf, void *free_data), void *free_data); 445 446 /** 447 * Wrapper function for the system's `free()`, suitable for use as the 448 * `free_func` callback for fstrm_iothr_submit(). 449 * 450 * \param data 451 * Object to call `free()` on. 452 * \param free_data 453 * Unused. 454 */ 455 void 456 fstrm_free_wrapper(void *data, void *free_data); 457 458 /**@}*/ 459 460 #endif /* FSTRM_IOTHR_H */ 461