1 /* 2 * Copyright (c) 2008-2015, 2017-2019 by Farsight Security, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef NMSG_INPUT_H 18 #define NMSG_INPUT_H 19 20 /*! \file nmsg/input.h 21 * \brief Convert input streams to nmsg format. 22 * 23 * Nmsg can import data into a stream of payloads from several different input 24 * sources: 25 * 26 * \li Wire-format NMSG containers which contain one or more binary 27 * payloads that can be read from file or datagram socket sources. This is 28 * the native NMSG interchange format. 29 * 30 * \li libpcap packets from a pcap savefile or network interface that will 31 * be reassembled into IP datagrams and passed to a message format specific 32 * function for conversion into nmsg payloads. 33 * 34 * \li Presentation format data (ASCII lines) read from a file, converted 35 * by a message format specific function into nmsg payloads. 36 * 37 * <b>MP:</b> 38 * \li Clients must ensure synchronized access when reading from an 39 * nmsg_input_t object. 40 * 41 * <b>Reliability:</b> 42 * \li Clients must not touch the underlying file descriptor or pcap_t 43 * object. Cleanup will be handled by the nmsg_input_close() function. 44 * 45 * <b>Resources:</b> 46 * \li An internal buffer will be allocated and used until an nmsg_input_t 47 * object is destroyed. 48 */ 49 50 /** 51 * An enum identifying the underlying implementation of an nmsg_input_t object. 52 * This is used for nmsg_io's close event notification. 53 */ 54 typedef enum { 55 nmsg_input_type_stream, /*%< NMSG payloads from file or socket */ 56 nmsg_input_type_pcap, /*%< pcap packets from file or interface */ 57 nmsg_input_type_pres, /*%< presentation form */ 58 nmsg_input_type_callback, 59 nmsg_input_type_json, /*%< JSON form */ 60 } nmsg_input_type; 61 62 /** 63 * Initialize a new NMSG stream input from a byte-stream file source. 64 * 65 * \param[in] fd Readable file descriptor from a byte-stream source. 66 * 67 * \return Opaque pointer that is NULL on failure or non-NULL on success. 68 */ 69 nmsg_input_t 70 nmsg_input_open_file(int fd); 71 72 /** 73 * Initialize a new NMSG stream input from a datagram socket source. 74 * 75 * \param[in] fd Readable datagram socket. 76 * 77 * \return Opaque pointer that is NULL on failure or non-NULL on success. 78 */ 79 nmsg_input_t 80 nmsg_input_open_sock(int fd); 81 82 /** 83 * Initialize a new NMSG stream input from a ZMQ socket source. 84 * 85 * \param[in] s ZMQ socket. 86 * 87 * \return Opaque pointer that is NULL on failure or non-NULL on success. 88 */ 89 nmsg_input_t 90 nmsg_input_open_zmq(void *s); 91 92 /** 93 * Create an ZMQ socket and initialize a new NMSG stream input from it. 94 * 95 * This function is a wrapper for nmsg_input_open_zmq(). Instead of taking an 96 * already initialized ZMQ socket object, it takes an endpoint argument like 97 * zmq_connect() and zmq_bind() do which is a string containing a 98 * "transport://address" specification and initializes an ZMQ socket object. 99 * However, this endpoint string will be munged in order to support additional 100 * functionality: 101 * 102 * The caller may select between a bound or connected ZMQ socket by appending 103 * ",accept" or ",connect" to the endpoint argument. (If not given, this 104 * function behaves as if ",connect" was passed.) That is, ",accept" uses 105 * zmq_bind() to obtain a ZMQ endpoint, and ",connect" uses zmq_connect(). 106 * 107 * The caller may additionally select between a SUB socket or a PULL 108 * socket by appending ",pubsub" or ",pushpull". (If not given, this function 109 * behaves as if ",pubsub" was passed.) 110 * 111 * \see nmsg_output_open_zmq_endpoint() 112 * 113 * \param[in] zmq_ctx ZMQ context object. 114 * 115 * \param[in] ep ZMQ endpoint (with nmsg-specific extensions) 116 * 117 * \return Opaque pointer that is NULL on failure or non-NULL on success. 118 */ 119 nmsg_input_t 120 nmsg_input_open_zmq_endpoint(void *zmq_ctx, const char *ep); 121 122 /** 123 * Initialize a new nmsg input closure. This allows a user-provided callback to 124 * function as an nmsg input, for instance to participate in an nmsg_io loop. 125 * The callback is responsible for creating an nmsg_message_t object and 126 * returning it to the caller. 127 * 128 * \param[in] cb Non-NULL function pointer. 129 * 130 * \param[in] user Optionally NULL pointer which will be passed to the callback. 131 * 132 * \return Opaque pointer that is NULL on failure or non-NULL on success. 133 */ 134 nmsg_input_t 135 nmsg_input_open_callback(nmsg_cb_message_read cb, void *user); 136 137 /** 138 * Initialize a new "null source" NMSG stream input. 139 * 140 * A "null source" means the actual gathering of input is not performed by 141 * the library but rather by the caller. A "null source" nmsg_input_t thus 142 * serves only to hold the state associated with the stream. 143 * 144 * Calling nmsg_input_loop() or nmsg_input_read() on a "null source" input 145 * will fail. Callers instead need to use nmsg_input_read_null(). 146 * 147 * \return Opaque pointer that is NULL on failure or non-NULL on success. 148 */ 149 nmsg_input_t 150 nmsg_input_open_null(void); 151 152 /** 153 * Initialize a new NMSG presentation form input from a file descriptor. 154 * 155 * \param[in] fd Readable file descriptor. 156 * 157 * \param[in] msgmod Handle that implements the desired presentation form 158 * to NMSG conversion. 159 * 160 * \return Opaque pointer that is NULL on failure or non-NULL on success. 161 */ 162 nmsg_input_t 163 nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod); 164 165 /** 166 * Initialize a new NMSG JSON form input from a file descriptor. 167 * 168 * See nmsg_output_open_json for details of the JSON format. 169 * 170 * \param[in] fd Readable file descriptor. 171 * 172 * \return Opaque pointer that is NULL on failure or non-NULL on success. 173 */ 174 nmsg_input_t 175 nmsg_input_open_json(int fd); 176 177 /** 178 * Initialize a new NMSG pcap input from a pcap descriptor. 179 * 180 * \param[in] pcap Descriptor returned by libpcap. Supported data link types are 181 * those supported by nmsg_ipdg_parse_pcap(). 182 * 183 * \param[in] msgmod Handle that implements the desired IP datagram to 184 * NMSG conversion. 185 * 186 * \return Opaque pointer that is NULL on failure or non-NULL on success. 187 */ 188 nmsg_input_t 189 nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod); 190 191 /** 192 * Close an nmsg_input_t object and release all associated resources. 193 * 194 * \param[in] input Valid pointer to an nmsg_input_t object. 195 * 196 * \return #nmsg_res_success 197 */ 198 nmsg_res 199 nmsg_input_close(nmsg_input_t *input); 200 201 /** 202 * Loop over an input stream and call a user-provided function for each payload. 203 * 204 * \param[in] input Valid nmsg_input_t. 205 * 206 * \param[in] count Non-negative to indicate a finite number of payloads to 207 * process or negative to indicate all available payloads should be 208 * processed. 209 * 210 * \param[in] cb Non-NULL function pointer that will be called once for each 211 * payload. 212 * 213 * \param[in] user Optionally NULL pointer which will be passed to the callback. 214 * 215 * \return Any of nmsg_input_read()'s return values. 216 */ 217 nmsg_res 218 nmsg_input_loop(nmsg_input_t input, int count, nmsg_cb_message cb, void *user); 219 220 /** 221 * Break out of an #nmsg_input_loop() early. 222 * 223 * \param[in] input Valid nmsg_input_t. 224 */ 225 void 226 nmsg_input_breakloop(nmsg_input_t input); 227 228 /** 229 * Read one NMSG message from an input stream. 230 * 231 * \param[in] input Valid nmsg_input_t. 232 * 233 * \param[out] msg Pointer to where an nmsg_message_t object may be stored. 234 * 235 * \return #nmsg_res_success 236 * \return #nmsg_res_failure 237 * \return #nmsg_res_again 238 * \return #nmsg_res_eof 239 * \return #nmsg_res_magic_mismatch 240 * \return #nmsg_res_version_mismatch 241 */ 242 nmsg_res 243 nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg); 244 245 /** 246 * Read zero, one, or more NMSG messages from a "null source" input. The caller 247 * must supply a buffer containing the serialized NMSG container. This function 248 * may return #nmsg_res_success with n_msg set to zero, which indicates that the 249 * NMSG container contained a fragment. 250 * 251 * \param[in] input Valid "null source" nmsg_input_t. 252 * 253 * \param[in] buf Input buffer containing a serialized NMSG container. 254 * 255 * \param[in] buf_len Length of input buffer. 256 * 257 * \param[in] ts Current "time". May be NULL to indicate the current wall clock 258 * time. 259 * 260 * \param[out] msg Pointer to where an array of nmsg_message_t objects may be 261 * stored. 262 * 263 * \param[out] n_msg Pointer to where the size of the output array will be 264 * stored. 265 * 266 * \return #nmsg_res_success 267 * \return #nmsg_res_again 268 * \return #nmsg_res_magic_mismatch 269 * \return #nmsg_res_version_mismatch 270 */ 271 nmsg_res 272 nmsg_input_read_null(nmsg_input_t input, uint8_t *buf, size_t buf_len, 273 struct timespec *ts, nmsg_message_t **msg, size_t *n_msg); 274 275 /** 276 * Filter an nmsg_input_t for a given vendor ID / message type. 277 * 278 * NMSG messages whose vid and and msgtype fields do not match the filter will 279 * be silently discarded when reading from the input. 280 * 281 * Calling this function with vid=0 and msgtype=0 will disable the filter. 282 * 283 * \param[in] input nmsg_input_t object. 284 * 285 * \param[in] vid Vendor ID. 286 * 287 * \param[in] msgtype Message type. 288 */ 289 void 290 nmsg_input_set_filter_msgtype(nmsg_input_t input, 291 unsigned vid, unsigned msgtype); 292 293 /** 294 * Filter an nmsg_input_t for a given vendor ID / message type. 295 * 296 * \param[in] input nmsg_input_t object. 297 * 298 * \param[in] vname Vendor ID name. 299 * 300 * \param[in] mname Message type name. 301 */ 302 nmsg_res 303 nmsg_input_set_filter_msgtype_byname(nmsg_input_t input, 304 const char *vname, const char *mname); 305 306 /** 307 * Set a source filter for input NMSG payloads. This is only effective for file, 308 * socket, and xs inputs. Only NMSG payloads whose source field matches the 309 * source filter will be output by nmsg_input_read() or nmsg_input_loop(). 310 * 311 * \param[in] input NMSG stream nmsg_input_t object. 312 * 313 * \param[in] source Source ID filter, 0 to disable. 314 */ 315 void 316 nmsg_input_set_filter_source(nmsg_input_t input, unsigned source); 317 318 /** 319 * Set an operator filter for input NMSG payloads. This has no effect on 320 * non-NMSG inputs. Only NMSG payloads whose operator field matches the 321 * operator filter will be output by nmsg_input_read() or nmsg_input_loop(). 322 * 323 * \param[in] input NMSG stream nmsg_input_t object. 324 * 325 * \param[in] operator_ Operator ID filter, 0 to disable. 326 */ 327 void 328 nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator_); 329 330 /** 331 * Set a group filter for input NMSG payloads. This has no effect on non-NMSG 332 * inputs. Only NMSG payloads whose group field matches the group filter will 333 * be output by nmsg_input_read() or nmsg_input_loop(). 334 * 335 * \param[in] input NMSG stream nmsg_input_t object. 336 * 337 * \param[in] group Group ID filter, 0 to disable. 338 */ 339 void 340 nmsg_input_set_filter_group(nmsg_input_t input, unsigned group); 341 342 /** 343 * Configure non-blocking I/O for a stream input. 344 * 345 * \param[in] input NMSG stream nmsg_input_t object. 346 * 347 * \param[in] flag boolean value, true to clear O_NONBLOCK on the 348 * underlying file descriptor, false to set O_NONBLOCK. 349 * 350 * \return #nmsg_res_success 351 * \return #nmsg_res_failure 352 */ 353 nmsg_res 354 nmsg_input_set_blocking_io(nmsg_input_t input, bool flag); 355 356 /** 357 * Set the target ingress byte rate for a stream input. If the target byte 358 * rate is positive, reading from the input may sleep in order to maintain the 359 * target consumption rate. 360 * 361 * Setting this value to a non-positive value will disable ingress byte rate 362 * control. 363 * 364 * \param[in] input NMSG stream nmsg_input_t object. 365 * 366 * \param[in] rate Target byte rate in bytes/second. 367 * 368 * \return #nmsg_res_success 369 * \return #nmsg_res_failure 370 */ 371 nmsg_res 372 nmsg_input_set_byte_rate(nmsg_input_t input, size_t rate); 373 374 /** 375 * Enable or disable seqsrc verification on an NMSG stream nmsg_input_t object. 376 * Note that for stream nmsg_input_t objects, seqsrc verification is enabled 377 * by default. 378 * 379 * \param[in] input NMSG stream nmsg_input_t object. 380 * 381 * \param[in] verify boolean value, true to enable verification, false to 382 * disable verification. 383 * 384 * \return #nmsg_res_success 385 * \return #nmsg_res_failure 386 */ 387 nmsg_res 388 nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify); 389 390 /** 391 * For UDP datagram socket nmsg_input_t objects, retrieve the total number of 392 * NMSG containers that have been received since the nmsg_input_t object was 393 * created. 394 * 395 * \param[in] input UDP socket based NMSG input object. 396 * 397 * \param[out] count Total number of NMSG containers received by the 398 * nmsg_input_t object during its lifetime. 399 * 400 * \return #nmsg_res_success 401 * \return #nmsg_res_failure 402 */ 403 nmsg_res 404 nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count); 405 406 /** 407 * For UDP datagram socket nmsg_input_t objects, retrieve the total number of 408 * NMSG containers that been dropped since the nmsg_input_t object was 409 * created. Sequence number tracking must have been previously enabled by a 410 * call to #nmsg_input_set_verify_seqsrc(). 411 * 412 * \param[in] input UDP socket based NMSG input object. 413 * 414 * \param[out] count Number of NMSG containers determined to have been dropped 415 * by the nmsg_input_t object since sequence number tracking was enabled. 416 * 417 * \return #nmsg_res_success 418 * \return #nmsg_res_failure 419 */ 420 nmsg_res 421 nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count); 422 423 #endif /* NMSG_INPUT_H */ 424