1 /*
2  * Copyright (c) 2008-2015, 2017-2019 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef NMSG_INPUT_H
18 #define NMSG_INPUT_H
19 
20 /*! \file nmsg/input.h
21  * \brief Convert input streams to nmsg format.
22  *
23  * Nmsg can import data into a stream of payloads from several different input
24  * sources:
25  *
26  *	\li Wire-format NMSG containers which contain one or more binary
27  *	payloads that can be read from file or datagram socket sources. This is
28  *	the native NMSG interchange format.
29  *
30  *	\li libpcap packets from a pcap savefile or network interface that will
31  *	be reassembled into IP datagrams and passed to a message format specific
32  *	function for conversion into nmsg payloads.
33  *
34  *	\li Presentation format data (ASCII lines) read from a file, converted
35  *	by a message format specific function into nmsg payloads.
36  *
37  * <b>MP:</b>
38  *	\li Clients must ensure synchronized access when reading from an
39  *	nmsg_input_t object.
40  *
41  * <b>Reliability:</b>
42  *	\li Clients must not touch the underlying file descriptor or pcap_t
43  *	object. Cleanup will be handled by the nmsg_input_close() function.
44  *
45  * <b>Resources:</b>
46  *	\li An internal buffer will be allocated and used until an nmsg_input_t
47  *	object is destroyed.
48  */
49 
50 /**
51  * An enum identifying the underlying implementation of an nmsg_input_t object.
52  * This is used for nmsg_io's close event notification.
53  */
54 typedef enum {
55 	nmsg_input_type_stream,	/*!< NMSG payloads from file or socket */
56 	nmsg_input_type_pcap,	/*!< pcap packets from file or interface */
57 	nmsg_input_type_pres,	/*!< presentation form */
58 	nmsg_input_type_callback,	/*!< messages from a user-provided callback */
59 	nmsg_input_type_json,	/*!< JSON form */
60 } nmsg_input_type;
61 
62 /**
63  * Initialize a new NMSG stream input from a byte-stream file source.
64  *
65  * \param[in] fd Readable file descriptor from a byte-stream source.
66  *
67  * \return New input (release with nmsg_input_close()), or NULL on failure.
68  */
69 nmsg_input_t
70 nmsg_input_open_file(int fd);
71 
72 /**
73  * Initialize a new NMSG stream input from a datagram socket source.
74  *
75  * \param[in] fd Readable datagram socket.
76  *
77  * \return New input (release with nmsg_input_close()), or NULL on failure.
78  */
79 nmsg_input_t
80 nmsg_input_open_sock(int fd);
81 
82 /**
83  * Initialize a new NMSG stream input from a ZMQ socket source.
84  *
85  * \param[in] s ZMQ socket.
86  *
87  * \return New input (release with nmsg_input_close()), or NULL on failure.
88  */
89 nmsg_input_t
90 nmsg_input_open_zmq(void *s);
91 
92 /**
93  * Create a ZMQ socket and initialize a new NMSG stream input from it.
94  *
95  * This function is a wrapper for nmsg_input_open_zmq(). Instead of taking an
96  * already initialized ZMQ socket object, it takes an endpoint argument like
97  * zmq_connect() and zmq_bind() do which is a string containing a
98  * "transport://address" specification and initializes a ZMQ socket object.
99  * However, this endpoint string is first examined for the following
100  * nmsg-specific suffixes, which select additional functionality:
101  *
102  * The caller may select between a bound or connected ZMQ socket by appending
103  * ",accept" or ",connect" to the endpoint argument. (If not given, this
104  * function behaves as if ",connect" was passed.) That is, ",accept" uses
105  * zmq_bind() to obtain a ZMQ endpoint, and ",connect" uses zmq_connect().
106  *
107  * The caller may additionally select between a SUB socket or a PULL
108  * socket by appending ",pubsub" or ",pushpull". (If not given, this function
109  * behaves as if ",pubsub" was passed.)
110  *
111  * \see nmsg_output_open_zmq_endpoint()
112  *
113  * \param[in] zmq_ctx ZMQ context object.
114  *
115  * \param[in] ep ZMQ endpoint (with nmsg-specific extensions)
116  *
117  * \return New input (release with nmsg_input_close()), or NULL on failure.
118  */
119 nmsg_input_t
120 nmsg_input_open_zmq_endpoint(void *zmq_ctx, const char *ep);
121 
122 /**
123  * Initialize a new nmsg input closure. This allows a user-provided callback to
124  * function as an nmsg input, for instance to participate in an nmsg_io loop.
125  * The callback is responsible for creating an nmsg_message_t object and
126  * returning it to the caller.
127  *
128  * \param[in] cb Non-NULL function pointer.
129  *
130  * \param[in] user Optionally NULL pointer which will be passed to the callback.
131  *
132  * \return New input (release with nmsg_input_close()), or NULL on failure.
133  */
134 nmsg_input_t
135 nmsg_input_open_callback(nmsg_cb_message_read cb, void *user);
136 
137 /**
138  * Initialize a new "null source" NMSG stream input.
139  *
140  * A "null source" means the actual gathering of input is not performed by
141  * the library but rather by the caller. A "null source" nmsg_input_t thus
142  * serves only to hold the state associated with the stream.
143  *
144  * Calling nmsg_input_loop() or nmsg_input_read() on a "null source" input
145  * will fail. Callers instead need to use nmsg_input_read_null().
146  *
147  * \return New input (release with nmsg_input_close()), or NULL on failure.
148  */
149 nmsg_input_t
150 nmsg_input_open_null(void);
151 
152 /**
153  * Initialize a new NMSG presentation form input from a file descriptor.
154  *
155  * \param[in] fd Readable file descriptor.
156  *
157  * \param[in] msgmod Handle that implements the desired presentation form
158  *	to NMSG conversion.
159  *
160  * \return New input (release with nmsg_input_close()), or NULL on failure.
161  */
162 nmsg_input_t
163 nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod);
164 
165 /**
166  * Initialize a new NMSG JSON form input from a file descriptor.
167  *
168  * See nmsg_output_open_json() for details of the JSON format.
169  *
170  * \param[in] fd Readable file descriptor.
171  *
172  * \return New input (release with nmsg_input_close()), or NULL on failure.
173  */
174 nmsg_input_t
175 nmsg_input_open_json(int fd);
176 
177 /**
178  * Initialize a new NMSG pcap input from a pcap descriptor.
179  *
180  * \param[in] pcap Descriptor returned by libpcap. Supported data link types are
181  * those supported by nmsg_ipdg_parse_pcap().
182  *
183  * \param[in] msgmod Handle that implements the desired IP datagram to
184  *	NMSG conversion.
185  *
186  * \return New input (release with nmsg_input_close()), or NULL on failure.
187  */
188 nmsg_input_t
189 nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod);
190 
191 /**
192  * Close an nmsg_input_t object and release all associated resources. Cleanup
193  * of the underlying file descriptor or pcap_t object is handled here.
194  * \param[in] input Valid pointer to an nmsg_input_t object.
195  *
196  * \return #nmsg_res_success
197  */
198 nmsg_res
199 nmsg_input_close(nmsg_input_t *input);
200 
201 /**
202  * Loop over an input stream and call a user-provided function for each payload.
203  *
204  * \param[in] input Valid nmsg_input_t that is not a "null source" input.
205  *
206  * \param[in] count Non-negative to indicate a finite number of payloads to
207  *	process or negative to indicate all available payloads should be
208  *	processed.
209  *
210  * \param[in] cb Non-NULL function pointer that will be called once for each
211  *	payload.
212  *
213  * \param[in] user Optionally NULL pointer which will be passed to the callback.
214  *
215  * \return Any of nmsg_input_read()'s return values.
216  */
217 nmsg_res
218 nmsg_input_loop(nmsg_input_t input, int count, nmsg_cb_message cb, void *user);
219 
220 /**
221  * Break out of an nmsg_input_loop() early.
222  *
223  * \param[in] input Valid nmsg_input_t.
224  */
225 void
226 nmsg_input_breakloop(nmsg_input_t input);
227 
228 /**
229  * Read one NMSG message from an input stream.
230  *
231  * \param[in] input Valid nmsg_input_t that is not a "null source" input.
232  *
233  * \param[out] msg Pointer to where an nmsg_message_t object may be stored.
234  *
235  * \return #nmsg_res_success
236  * \return #nmsg_res_failure
237  * \return #nmsg_res_again
238  * \return #nmsg_res_eof
239  * \return #nmsg_res_magic_mismatch
240  * \return #nmsg_res_version_mismatch
241  */
242 nmsg_res
243 nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg);
244 
245 /**
246  * Read zero, one, or more NMSG messages from a "null source" input. The caller
247  * must supply a buffer containing the serialized NMSG container. This function
248  * may return #nmsg_res_success with *n_msg set to zero, which indicates that
249  * the NMSG container contained a fragment.
250  *
251  * \param[in] input "Null source" input from nmsg_input_open_null().
252  *
253  * \param[in] buf Input buffer containing a serialized NMSG container.
254  *
255  * \param[in] buf_len Length of input buffer.
256  *
257  * \param[in] ts Current "time". May be NULL to indicate the current wall clock
258  * time.
259  *
260  * \param[out] msg Pointer to where an array of nmsg_message_t objects may be
261  * stored.
262  *
263  * \param[out] n_msg Pointer to where the size of the output array will be
264  * stored.
265  *
266  * \return #nmsg_res_success
267  * \return #nmsg_res_again
268  * \return #nmsg_res_magic_mismatch
269  * \return #nmsg_res_version_mismatch
270  */
271 nmsg_res
272 nmsg_input_read_null(nmsg_input_t input, uint8_t *buf, size_t buf_len,
273 		     struct timespec *ts, nmsg_message_t **msg, size_t *n_msg);
274 
275 /**
276  * Filter an nmsg_input_t for a given vendor ID / message type.
277  *
278  * NMSG messages whose vid and msgtype fields do not match the filter will
279  * be silently discarded when reading from the input.
280  *
281  * Calling this function with vid=0 and msgtype=0 will disable the filter.
282  *
283  * \param[in] input nmsg_input_t object.
284  *
285  * \param[in] vid Vendor ID.
286  *
287  * \param[in] msgtype Message type.
288  */
289 void
290 nmsg_input_set_filter_msgtype(nmsg_input_t input,
291 			      unsigned vid, unsigned msgtype);
292 
293 /**
294  * Filter an nmsg_input_t for a vendor / message type, both given by name.
295  *
296  * \param[in] input nmsg_input_t object.
297  *
298  * \param[in] vname Vendor ID name.
299  * \param[in] mname Message type name.
300  * \return nmsg_res result code. TODO(review): list the specific values.
301  */
302 nmsg_res
303 nmsg_input_set_filter_msgtype_byname(nmsg_input_t input,
304 				     const char *vname, const char *mname);
305 
306 /**
307  * Set a source filter for input NMSG payloads. This is only effective for file,
308  * socket, and ZMQ inputs. Only NMSG payloads whose source field matches the
309  * source filter will be output by nmsg_input_read() or nmsg_input_loop().
310  *
311  * \param[in] input NMSG stream nmsg_input_t object.
312  *
313  * \param[in] source Source ID to match, or 0 to disable the filter.
314  */
315 void
316 nmsg_input_set_filter_source(nmsg_input_t input, unsigned source);
317 
318 /**
319  * Set an operator filter for input NMSG payloads. This has no effect on
320  * non-NMSG inputs. Only NMSG payloads whose operator field matches the
321  * operator filter will be output by nmsg_input_read() or nmsg_input_loop().
322  *
323  * \param[in] input NMSG stream nmsg_input_t object.
324  *
325  * \param[in] operator_ Operator ID to match, or 0 to disable the filter.
326  */
327 void
328 nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator_);
329 
330 /**
331  * Set a group filter for input NMSG payloads. This has no effect on non-NMSG
332  * inputs. Only NMSG payloads whose group field matches the group filter will
333  * be output by nmsg_input_read() or nmsg_input_loop().
334  *
335  * \param[in] input NMSG stream nmsg_input_t object.
336  *
337  * \param[in] group Group ID to match, or 0 to disable the filter.
338  */
339 void
340 nmsg_input_set_filter_group(nmsg_input_t input, unsigned group);
341 
342 /**
343  * Configure blocking or non-blocking I/O for a stream input.
344  *
345  * \param[in] input NMSG stream nmsg_input_t object.
346  *
347  * \param[in] flag true for blocking I/O (clears O_NONBLOCK on the underlying
348  *	file descriptor), false for non-blocking I/O (sets O_NONBLOCK).
349  *
350  * \return #nmsg_res_success
351  * \return #nmsg_res_failure
352  */
353 nmsg_res
354 nmsg_input_set_blocking_io(nmsg_input_t input, bool flag);
355 
356 /**
357  * Set the target ingress byte rate for a stream input. If the target byte
358  * rate is positive, reading from the input may sleep in order to maintain the
359  * target consumption rate.
360  *
361  * Setting this value to zero (the only non-positive value an unsigned size_t
362  * can hold) will disable ingress byte rate control.
363  *
364  * \param[in] input NMSG stream nmsg_input_t object.
365  *
366  * \param[in] rate Target byte rate in bytes/second.
367  *
368  * \return #nmsg_res_success
369  * \return #nmsg_res_failure
370  */
371 nmsg_res
372 nmsg_input_set_byte_rate(nmsg_input_t input, size_t rate);
373 
374 /**
375  * Enable or disable seqsrc verification on an NMSG stream nmsg_input_t object.
376  * Note that for stream nmsg_input_t objects, seqsrc verification is enabled
377  * by default.
378  * \see nmsg_input_get_count_container_dropped()
379  * \param[in] input NMSG stream nmsg_input_t object.
380  *
381  * \param[in] verify boolean value, true to enable verification, false to
382  *	disable verification.
383  *
384  * \return #nmsg_res_success
385  * \return #nmsg_res_failure
386  */
387 nmsg_res
388 nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify);
389 
390 /**
391  * For UDP datagram socket nmsg_input_t objects, retrieve the total number of
392  * NMSG containers that have been received since the nmsg_input_t object was
393  * created.
394  *
395  * \param[in] input UDP socket based NMSG input object.
396  *
397  * \param[out] count Where to store the total number of NMSG containers
398  *	received by the nmsg_input_t object during its lifetime.
399  *
400  * \return #nmsg_res_success
401  * \return #nmsg_res_failure
402  */
403 nmsg_res
404 nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count);
405 
406 /**
407  * For UDP datagram socket nmsg_input_t objects, retrieve the total number of
408  * NMSG containers that have been dropped since the nmsg_input_t object was
409  * created. Sequence number tracking must have been previously enabled by a
410  * call to nmsg_input_set_verify_seqsrc().
411  *
412  * \param[in] input UDP socket based NMSG input object.
413  *
414  * \param[out] count Number of NMSG containers determined to have been dropped
415  *	by the nmsg_input_t object since sequence number tracking was enabled.
416  *
417  * \return #nmsg_res_success
418  * \return #nmsg_res_failure
419  */
420 nmsg_res
421 nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count);
422 
423 #endif /* NMSG_INPUT_H */
424