1 /*
2  * Copyright (c) 2008-2019 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_output_t	output_open_stream(nmsg_stream_type, int, size_t);
24 static nmsg_output_t	output_open_stream_base(nmsg_stream_type, size_t);
25 static nmsg_res		output_write_callback(nmsg_output_t, nmsg_message_t);
26 
27 /* Export. */
28 
29 nmsg_output_t
nmsg_output_open_file(int fd,size_t bufsz)30 nmsg_output_open_file(int fd, size_t bufsz) {
31 	return (output_open_stream(nmsg_stream_type_file, fd, bufsz));
32 }
33 
34 nmsg_output_t
nmsg_output_open_sock(int fd,size_t bufsz)35 nmsg_output_open_sock(int fd, size_t bufsz) {
36 	return (output_open_stream(nmsg_stream_type_sock, fd, bufsz));
37 }
38 
39 #ifdef HAVE_LIBZMQ
40 nmsg_output_t
nmsg_output_open_zmq(void * s,size_t bufsz)41 nmsg_output_open_zmq(void *s, size_t bufsz) {
42 	struct nmsg_output *output;
43 
44 	output = output_open_stream_base(nmsg_stream_type_zmq, bufsz);
45 	if (output == NULL)
46 		return (output);
47 
48 	output->stream->zmq = s;
49 
50 	return (output);
51 }
52 #else /* HAVE_LIBZMQ */
53 nmsg_output_t
nmsg_output_open_zmq(void * s,size_t bufsz)54 nmsg_output_open_zmq(void *s __attribute__((unused)),
55 		    size_t bufsz __attribute__((unused)))
56 {
57 	return (NULL);
58 }
59 #endif /* HAVE_LIBZMQ */
60 
61 nmsg_output_t
nmsg_output_open_pres(int fd)62 nmsg_output_open_pres(int fd) {
63 	struct nmsg_output *output;
64 
65 	output = calloc(1, sizeof(*output));
66 	if (output == NULL)
67 		return (NULL);
68 	output->type = nmsg_output_type_pres;
69 	output->write_fp = _output_pres_write;
70 
71 	output->pres = calloc(1, sizeof(*(output->pres)));
72 	if (output->pres == NULL) {
73 		free(output);
74 		return (NULL);
75 	}
76 	output->pres->fp = fdopen(fd, "w");
77 	if (output->pres->fp == NULL) {
78 		free(output->pres);
79 		free(output);
80 		return (NULL);
81 	}
82 	output->pres->endline = strdup("\n");
83 	pthread_mutex_init(&output->pres->lock, NULL);
84 
85 	return (output);
86 }
87 
88 #ifdef HAVE_YAJL
89 nmsg_output_t
nmsg_output_open_json(int fd)90 nmsg_output_open_json(int fd) {
91 	struct nmsg_output *output;
92 	int newfd;
93 
94 	output = calloc(1, sizeof(*output));
95 	if (output == NULL)
96 		return (NULL);
97 	output->type = nmsg_output_type_json;
98 	output->write_fp = _output_json_write;
99 
100 	output->json = calloc(1, sizeof(*(output->json)));
101 	if (output->json == NULL) {
102 		free(output);
103 		return (NULL);
104 	}
105 
106 	output->json->orig_fd = fd;
107 
108 	newfd = dup(fd);
109 	if (newfd == -1) {
110 		free(output->json);
111 		free(output);
112 		return (NULL);
113 	}
114 
115 	output->json->fp = fdopen(newfd, "w");
116 	if (output->json->fp == NULL) {
117 		free(output->json);
118 		free(output);
119 		return (NULL);
120 	}
121 	pthread_mutex_init(&output->json->lock, NULL);
122 
123 	return (output);
124 }
125 #else /* HAVE_YAJL */
126 nmsg_output_t
nmsg_output_open_json(int fd)127 nmsg_output_open_json(__attribute__((unused)) int fd) {
128 	return (NULL);
129 }
130 #endif /* HAVE_YAJL */
131 
132 nmsg_output_t
nmsg_output_open_callback(nmsg_cb_message cb,void * user)133 nmsg_output_open_callback(nmsg_cb_message cb, void *user) {
134 	struct nmsg_output *output;
135 
136 	output = calloc(1, sizeof(*output));
137 	if (output == NULL)
138 		return (NULL);
139 	output->type = nmsg_output_type_callback;
140 	output->write_fp = output_write_callback;
141 
142 	output->callback = calloc(1, sizeof(*(output->callback)));
143 	if (output->callback == NULL) {
144 		free(output);
145 		return (NULL);
146 	}
147 	output->callback->cb = cb;
148 	output->callback->user = user;
149 
150 	return (output);
151 }
152 
153 nmsg_res
nmsg_output_flush(nmsg_output_t output)154 nmsg_output_flush(nmsg_output_t output) {
155 	return (output->flush_fp(output));
156 }
157 
158 nmsg_res
nmsg_output_write(nmsg_output_t output,nmsg_message_t msg)159 nmsg_output_write(nmsg_output_t output, nmsg_message_t msg) {
160 	nmsg_res res;
161 
162 	res = _nmsg_message_serialize(msg);
163 	if (res != nmsg_res_success)
164 		return (res);
165 
166 	if (output->do_filter == true &&
167 	    (output->filter_vid != msg->np->vid ||
168 	     output->filter_msgtype != msg->np->msgtype))
169 	{
170 		return (nmsg_res_success);
171 	}
172 
173 	res = output->write_fp(output, msg);
174 	return (res);
175 }
176 
177 nmsg_res
nmsg_output_close(nmsg_output_t * output)178 nmsg_output_close(nmsg_output_t *output) {
179 	nmsg_res res;
180 
181 	res = nmsg_res_success;
182 	switch ((*output)->type) {
183 	case nmsg_output_type_stream:
184 		res = _output_nmsg_flush(*output);
185 		if ((*output)->stream->random != NULL)
186 			nmsg_random_destroy(&((*output)->stream->random));
187 #ifdef HAVE_LIBZMQ
188 		if ((*output)->stream->type == nmsg_stream_type_zmq)
189 			zmq_close((*output)->stream->zmq);
190 #else /* HAVE_LIBZMQ */
191 		assert((*output)->stream->type != nmsg_stream_type_zmq);
192 #endif /* HAVE_LIBZMQ */
193 		if ((*output)->stream->type == nmsg_stream_type_file ||
194 		    (*output)->stream->type == nmsg_stream_type_sock)
195 		{
196 			if (_nmsg_global_autoclose)
197 				close((*output)->stream->fd);
198 		}
199 		nmsg_container_destroy(&(*output)->stream->c);
200 		free((*output)->stream);
201 		break;
202 	case nmsg_output_type_pres:
203 		fclose((*output)->pres->fp);
204 		free((*output)->pres->endline);
205 		free((*output)->pres);
206 		break;
207 	case nmsg_output_type_json:
208 		if (_nmsg_global_autoclose)
209 			close((*output)->json->orig_fd);
210 		fclose((*output)->json->fp);
211 		free((*output)->json);
212 		break;
213 	case nmsg_output_type_callback:
214 		free((*output)->callback);
215 		break;
216 	}
217 	free(*output);
218 	*output = NULL;
219 	return (res);
220 }
221 
222 void
nmsg_output_set_buffered(nmsg_output_t output,bool buffered)223 nmsg_output_set_buffered(nmsg_output_t output, bool buffered) {
224 	if (output->type == nmsg_output_type_stream) {
225 		output->stream->buffered = buffered;
226 	} else if (output->type == nmsg_output_type_pres ||
227 			output->type == nmsg_output_type_json) {
228 		output->pres->flush = !(buffered);
229 	}
230 }
231 
232 void
nmsg_output_set_filter_msgtype(nmsg_output_t output,unsigned vid,unsigned msgtype)233 nmsg_output_set_filter_msgtype(nmsg_output_t output, unsigned vid, unsigned msgtype) {
234 	if (vid == 0 && msgtype == 0)
235 		output->do_filter = false;
236 	else
237 		output->do_filter = true;
238 
239 	output->filter_vid = vid;
240 	output->filter_msgtype = msgtype;
241 }
242 
243 nmsg_res
nmsg_output_set_filter_msgtype_byname(nmsg_output_t output,const char * vname,const char * mname)244 nmsg_output_set_filter_msgtype_byname(nmsg_output_t output,
245 				      const char *vname, const char *mname)
246 {
247 	unsigned vid, msgtype;
248 
249 	if (vname == NULL || mname == NULL)
250 		return (nmsg_res_failure);
251 
252 	vid = nmsg_msgmod_vname_to_vid(vname);
253 	if (vid == 0)
254 		return (nmsg_res_failure);
255 	msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
256 	if (msgtype == 0)
257 		return (nmsg_res_failure);
258 
259 	nmsg_output_set_filter_msgtype(output, vid, msgtype);
260 
261 	return (nmsg_res_success);
262 }
263 
264 void
nmsg_output_set_rate(nmsg_output_t output,nmsg_rate_t rate)265 nmsg_output_set_rate(nmsg_output_t output, nmsg_rate_t rate) {
266 	if (output->type != nmsg_output_type_stream)
267 		return;
268 
269 	pthread_mutex_lock(&output->stream->lock);
270 	output->stream->rate = rate;
271 	pthread_mutex_unlock(&output->stream->lock);
272 }
273 
274 void
nmsg_output_set_zlibout(nmsg_output_t output,bool zlibout)275 nmsg_output_set_zlibout(nmsg_output_t output, bool zlibout) {
276 	if (output->type != nmsg_output_type_stream)
277 		return;
278 	output->stream->do_zlib = zlibout;
279 }
280 
281 void
nmsg_output_set_endline(nmsg_output_t output,const char * endline)282 nmsg_output_set_endline(nmsg_output_t output, const char *endline) {
283 	if (output->type == nmsg_output_type_pres) {
284 		if (output->pres->endline != NULL)
285 			free(output->pres->endline);
286 		output->pres->endline = strdup(endline);
287 	}
288 }
289 
290 void
nmsg_output_set_source(nmsg_output_t output,unsigned source)291 nmsg_output_set_source(nmsg_output_t output, unsigned source) {
292 	if (output->type == nmsg_output_type_stream)
293 		output->stream->source = source;
294 }
295 
296 void
nmsg_output_set_operator(nmsg_output_t output,unsigned operator)297 nmsg_output_set_operator(nmsg_output_t output, unsigned operator) {
298 	if (output->type == nmsg_output_type_stream)
299 		output->stream->operator = operator;
300 }
301 
302 void
nmsg_output_set_group(nmsg_output_t output,unsigned group)303 nmsg_output_set_group(nmsg_output_t output, unsigned group) {
304 	if (output->type == nmsg_output_type_stream)
305 		output->stream->group = group;
306 }
307 
308 void
_output_stop(nmsg_output_t output)309 _output_stop(nmsg_output_t output) {
310 	output->stop = true;
311 }
312 
313 /* Private functions. */
314 
315 static nmsg_output_t
output_open_stream(nmsg_stream_type type,int fd,size_t bufsz)316 output_open_stream(nmsg_stream_type type, int fd, size_t bufsz) {
317 	struct nmsg_output *output;
318 
319 	output = output_open_stream_base(type, bufsz);
320 	if (output == NULL)
321 		return (output);
322 
323 	/* fd */
324 	if (type == nmsg_stream_type_file ||
325 	    type == nmsg_stream_type_sock)
326 	{
327 		output->stream->fd = fd;
328 	}
329 
330 	return (output);
331 }
332 
333 static nmsg_output_t
output_open_stream_base(nmsg_stream_type type,size_t bufsz)334 output_open_stream_base(nmsg_stream_type type, size_t bufsz) {
335 	struct nmsg_output *output;
336 
337 	/* nmsg_output */
338 	output = calloc(1, sizeof(*output));
339 	if (output == NULL)
340 		return (NULL);
341 	output->type = nmsg_output_type_stream;
342 	output->write_fp = _output_nmsg_write;
343 	output->flush_fp = _output_nmsg_flush;
344 
345 	/* nmsg_stream_output */
346 	output->stream = calloc(1, sizeof(*(output->stream)));
347 	if (output->stream == NULL) {
348 		free(output);
349 		return (NULL);
350 	}
351 	pthread_mutex_init(&output->stream->lock, NULL);
352 	output->stream->type = type;
353 	output->stream->buffered = true;
354 
355 	/* seed the rng, needed for fragment and sequence IDs */
356 	output->stream->random = nmsg_random_init();
357 	if (output->stream->random == NULL) {
358 		free(output->stream);
359 		free(output);
360 		return (NULL);
361 	}
362 
363 	/* enable container sequencing */
364 	if (output->stream->type == nmsg_stream_type_sock ||
365 	    output->stream->type == nmsg_stream_type_zmq)
366 	{
367 		output->stream->do_sequence = true;
368 
369 		/* generate sequence ID */
370 		nmsg_random_buf(output->stream->random,
371 				(uint8_t *) &output->stream->sequence_id,
372 				sizeof(output->stream->sequence_id));
373 	}
374 
375 	/* bufsz */
376 	if (bufsz < NMSG_WBUFSZ_MIN)
377 		bufsz = NMSG_WBUFSZ_MIN;
378 	if (bufsz > NMSG_WBUFSZ_MAX)
379 		bufsz = NMSG_WBUFSZ_MAX;
380 	output->stream->bufsz = bufsz;
381 
382 	/* nmsg container */
383 	output->stream->c = nmsg_container_init(bufsz);
384 	if (output->stream->c == NULL) {
385 		nmsg_random_destroy(&output->stream->random);
386 		free(output->stream);
387 		free(output);
388 		return (NULL);
389 	}
390 	nmsg_container_set_sequence(output->stream->c, output->stream->do_sequence);
391 
392 	return (output);
393 }
394 
395 static nmsg_res
output_write_callback(nmsg_output_t output,nmsg_message_t msg)396 output_write_callback(nmsg_output_t output, nmsg_message_t msg) {
397 	output->callback->cb(msg, output->callback->user);
398 	return (nmsg_res_success);
399 }
400