1 /*
2 * Copyright (c) 2008-2019 by Farsight Security, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* Import. */
18
19 #include "private.h"
20
21 /* Forward. */
22
23 static nmsg_output_t output_open_stream(nmsg_stream_type, int, size_t);
24 static nmsg_output_t output_open_stream_base(nmsg_stream_type, size_t);
25 static nmsg_res output_write_callback(nmsg_output_t, nmsg_message_t);
26
27 /* Export. */
28
29 nmsg_output_t
nmsg_output_open_file(int fd,size_t bufsz)30 nmsg_output_open_file(int fd, size_t bufsz) {
31 return (output_open_stream(nmsg_stream_type_file, fd, bufsz));
32 }
33
34 nmsg_output_t
nmsg_output_open_sock(int fd,size_t bufsz)35 nmsg_output_open_sock(int fd, size_t bufsz) {
36 return (output_open_stream(nmsg_stream_type_sock, fd, bufsz));
37 }
38
39 #ifdef HAVE_LIBZMQ
40 nmsg_output_t
nmsg_output_open_zmq(void * s,size_t bufsz)41 nmsg_output_open_zmq(void *s, size_t bufsz) {
42 struct nmsg_output *output;
43
44 output = output_open_stream_base(nmsg_stream_type_zmq, bufsz);
45 if (output == NULL)
46 return (output);
47
48 output->stream->zmq = s;
49
50 return (output);
51 }
52 #else /* HAVE_LIBZMQ */
53 nmsg_output_t
nmsg_output_open_zmq(void * s,size_t bufsz)54 nmsg_output_open_zmq(void *s __attribute__((unused)),
55 size_t bufsz __attribute__((unused)))
56 {
57 return (NULL);
58 }
59 #endif /* HAVE_LIBZMQ */
60
61 nmsg_output_t
nmsg_output_open_pres(int fd)62 nmsg_output_open_pres(int fd) {
63 struct nmsg_output *output;
64
65 output = calloc(1, sizeof(*output));
66 if (output == NULL)
67 return (NULL);
68 output->type = nmsg_output_type_pres;
69 output->write_fp = _output_pres_write;
70
71 output->pres = calloc(1, sizeof(*(output->pres)));
72 if (output->pres == NULL) {
73 free(output);
74 return (NULL);
75 }
76 output->pres->fp = fdopen(fd, "w");
77 if (output->pres->fp == NULL) {
78 free(output->pres);
79 free(output);
80 return (NULL);
81 }
82 output->pres->endline = strdup("\n");
83 pthread_mutex_init(&output->pres->lock, NULL);
84
85 return (output);
86 }
87
88 #ifdef HAVE_YAJL
89 nmsg_output_t
nmsg_output_open_json(int fd)90 nmsg_output_open_json(int fd) {
91 struct nmsg_output *output;
92 int newfd;
93
94 output = calloc(1, sizeof(*output));
95 if (output == NULL)
96 return (NULL);
97 output->type = nmsg_output_type_json;
98 output->write_fp = _output_json_write;
99
100 output->json = calloc(1, sizeof(*(output->json)));
101 if (output->json == NULL) {
102 free(output);
103 return (NULL);
104 }
105
106 output->json->orig_fd = fd;
107
108 newfd = dup(fd);
109 if (newfd == -1) {
110 free(output->json);
111 free(output);
112 return (NULL);
113 }
114
115 output->json->fp = fdopen(newfd, "w");
116 if (output->json->fp == NULL) {
117 free(output->json);
118 free(output);
119 return (NULL);
120 }
121 pthread_mutex_init(&output->json->lock, NULL);
122
123 return (output);
124 }
125 #else /* HAVE_YAJL */
126 nmsg_output_t
nmsg_output_open_json(int fd)127 nmsg_output_open_json(__attribute__((unused)) int fd) {
128 return (NULL);
129 }
130 #endif /* HAVE_YAJL */
131
132 nmsg_output_t
nmsg_output_open_callback(nmsg_cb_message cb,void * user)133 nmsg_output_open_callback(nmsg_cb_message cb, void *user) {
134 struct nmsg_output *output;
135
136 output = calloc(1, sizeof(*output));
137 if (output == NULL)
138 return (NULL);
139 output->type = nmsg_output_type_callback;
140 output->write_fp = output_write_callback;
141
142 output->callback = calloc(1, sizeof(*(output->callback)));
143 if (output->callback == NULL) {
144 free(output);
145 return (NULL);
146 }
147 output->callback->cb = cb;
148 output->callback->user = user;
149
150 return (output);
151 }
152
153 nmsg_res
nmsg_output_flush(nmsg_output_t output)154 nmsg_output_flush(nmsg_output_t output) {
155 return (output->flush_fp(output));
156 }
157
158 nmsg_res
nmsg_output_write(nmsg_output_t output,nmsg_message_t msg)159 nmsg_output_write(nmsg_output_t output, nmsg_message_t msg) {
160 nmsg_res res;
161
162 res = _nmsg_message_serialize(msg);
163 if (res != nmsg_res_success)
164 return (res);
165
166 if (output->do_filter == true &&
167 (output->filter_vid != msg->np->vid ||
168 output->filter_msgtype != msg->np->msgtype))
169 {
170 return (nmsg_res_success);
171 }
172
173 res = output->write_fp(output, msg);
174 return (res);
175 }
176
177 nmsg_res
nmsg_output_close(nmsg_output_t * output)178 nmsg_output_close(nmsg_output_t *output) {
179 nmsg_res res;
180
181 res = nmsg_res_success;
182 switch ((*output)->type) {
183 case nmsg_output_type_stream:
184 res = _output_nmsg_flush(*output);
185 if ((*output)->stream->random != NULL)
186 nmsg_random_destroy(&((*output)->stream->random));
187 #ifdef HAVE_LIBZMQ
188 if ((*output)->stream->type == nmsg_stream_type_zmq)
189 zmq_close((*output)->stream->zmq);
190 #else /* HAVE_LIBZMQ */
191 assert((*output)->stream->type != nmsg_stream_type_zmq);
192 #endif /* HAVE_LIBZMQ */
193 if ((*output)->stream->type == nmsg_stream_type_file ||
194 (*output)->stream->type == nmsg_stream_type_sock)
195 {
196 if (_nmsg_global_autoclose)
197 close((*output)->stream->fd);
198 }
199 nmsg_container_destroy(&(*output)->stream->c);
200 free((*output)->stream);
201 break;
202 case nmsg_output_type_pres:
203 fclose((*output)->pres->fp);
204 free((*output)->pres->endline);
205 free((*output)->pres);
206 break;
207 case nmsg_output_type_json:
208 if (_nmsg_global_autoclose)
209 close((*output)->json->orig_fd);
210 fclose((*output)->json->fp);
211 free((*output)->json);
212 break;
213 case nmsg_output_type_callback:
214 free((*output)->callback);
215 break;
216 }
217 free(*output);
218 *output = NULL;
219 return (res);
220 }
221
222 void
nmsg_output_set_buffered(nmsg_output_t output,bool buffered)223 nmsg_output_set_buffered(nmsg_output_t output, bool buffered) {
224 if (output->type == nmsg_output_type_stream) {
225 output->stream->buffered = buffered;
226 } else if (output->type == nmsg_output_type_pres ||
227 output->type == nmsg_output_type_json) {
228 output->pres->flush = !(buffered);
229 }
230 }
231
232 void
nmsg_output_set_filter_msgtype(nmsg_output_t output,unsigned vid,unsigned msgtype)233 nmsg_output_set_filter_msgtype(nmsg_output_t output, unsigned vid, unsigned msgtype) {
234 if (vid == 0 && msgtype == 0)
235 output->do_filter = false;
236 else
237 output->do_filter = true;
238
239 output->filter_vid = vid;
240 output->filter_msgtype = msgtype;
241 }
242
243 nmsg_res
nmsg_output_set_filter_msgtype_byname(nmsg_output_t output,const char * vname,const char * mname)244 nmsg_output_set_filter_msgtype_byname(nmsg_output_t output,
245 const char *vname, const char *mname)
246 {
247 unsigned vid, msgtype;
248
249 if (vname == NULL || mname == NULL)
250 return (nmsg_res_failure);
251
252 vid = nmsg_msgmod_vname_to_vid(vname);
253 if (vid == 0)
254 return (nmsg_res_failure);
255 msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
256 if (msgtype == 0)
257 return (nmsg_res_failure);
258
259 nmsg_output_set_filter_msgtype(output, vid, msgtype);
260
261 return (nmsg_res_success);
262 }
263
264 void
nmsg_output_set_rate(nmsg_output_t output,nmsg_rate_t rate)265 nmsg_output_set_rate(nmsg_output_t output, nmsg_rate_t rate) {
266 if (output->type != nmsg_output_type_stream)
267 return;
268
269 pthread_mutex_lock(&output->stream->lock);
270 output->stream->rate = rate;
271 pthread_mutex_unlock(&output->stream->lock);
272 }
273
274 void
nmsg_output_set_zlibout(nmsg_output_t output,bool zlibout)275 nmsg_output_set_zlibout(nmsg_output_t output, bool zlibout) {
276 if (output->type != nmsg_output_type_stream)
277 return;
278 output->stream->do_zlib = zlibout;
279 }
280
281 void
nmsg_output_set_endline(nmsg_output_t output,const char * endline)282 nmsg_output_set_endline(nmsg_output_t output, const char *endline) {
283 if (output->type == nmsg_output_type_pres) {
284 if (output->pres->endline != NULL)
285 free(output->pres->endline);
286 output->pres->endline = strdup(endline);
287 }
288 }
289
290 void
nmsg_output_set_source(nmsg_output_t output,unsigned source)291 nmsg_output_set_source(nmsg_output_t output, unsigned source) {
292 if (output->type == nmsg_output_type_stream)
293 output->stream->source = source;
294 }
295
296 void
nmsg_output_set_operator(nmsg_output_t output,unsigned operator)297 nmsg_output_set_operator(nmsg_output_t output, unsigned operator) {
298 if (output->type == nmsg_output_type_stream)
299 output->stream->operator = operator;
300 }
301
302 void
nmsg_output_set_group(nmsg_output_t output,unsigned group)303 nmsg_output_set_group(nmsg_output_t output, unsigned group) {
304 if (output->type == nmsg_output_type_stream)
305 output->stream->group = group;
306 }
307
308 void
_output_stop(nmsg_output_t output)309 _output_stop(nmsg_output_t output) {
310 output->stop = true;
311 }
312
313 /* Private functions. */
314
315 static nmsg_output_t
output_open_stream(nmsg_stream_type type,int fd,size_t bufsz)316 output_open_stream(nmsg_stream_type type, int fd, size_t bufsz) {
317 struct nmsg_output *output;
318
319 output = output_open_stream_base(type, bufsz);
320 if (output == NULL)
321 return (output);
322
323 /* fd */
324 if (type == nmsg_stream_type_file ||
325 type == nmsg_stream_type_sock)
326 {
327 output->stream->fd = fd;
328 }
329
330 return (output);
331 }
332
333 static nmsg_output_t
output_open_stream_base(nmsg_stream_type type,size_t bufsz)334 output_open_stream_base(nmsg_stream_type type, size_t bufsz) {
335 struct nmsg_output *output;
336
337 /* nmsg_output */
338 output = calloc(1, sizeof(*output));
339 if (output == NULL)
340 return (NULL);
341 output->type = nmsg_output_type_stream;
342 output->write_fp = _output_nmsg_write;
343 output->flush_fp = _output_nmsg_flush;
344
345 /* nmsg_stream_output */
346 output->stream = calloc(1, sizeof(*(output->stream)));
347 if (output->stream == NULL) {
348 free(output);
349 return (NULL);
350 }
351 pthread_mutex_init(&output->stream->lock, NULL);
352 output->stream->type = type;
353 output->stream->buffered = true;
354
355 /* seed the rng, needed for fragment and sequence IDs */
356 output->stream->random = nmsg_random_init();
357 if (output->stream->random == NULL) {
358 free(output->stream);
359 free(output);
360 return (NULL);
361 }
362
363 /* enable container sequencing */
364 if (output->stream->type == nmsg_stream_type_sock ||
365 output->stream->type == nmsg_stream_type_zmq)
366 {
367 output->stream->do_sequence = true;
368
369 /* generate sequence ID */
370 nmsg_random_buf(output->stream->random,
371 (uint8_t *) &output->stream->sequence_id,
372 sizeof(output->stream->sequence_id));
373 }
374
375 /* bufsz */
376 if (bufsz < NMSG_WBUFSZ_MIN)
377 bufsz = NMSG_WBUFSZ_MIN;
378 if (bufsz > NMSG_WBUFSZ_MAX)
379 bufsz = NMSG_WBUFSZ_MAX;
380 output->stream->bufsz = bufsz;
381
382 /* nmsg container */
383 output->stream->c = nmsg_container_init(bufsz);
384 if (output->stream->c == NULL) {
385 nmsg_random_destroy(&output->stream->random);
386 free(output->stream);
387 free(output);
388 return (NULL);
389 }
390 nmsg_container_set_sequence(output->stream->c, output->stream->do_sequence);
391
392 return (output);
393 }
394
395 static nmsg_res
output_write_callback(nmsg_output_t output,nmsg_message_t msg)396 output_write_callback(nmsg_output_t output, nmsg_message_t msg) {
397 output->callback->cb(msg, output->callback->user);
398 return (nmsg_res_success);
399 }
400