1 /*
2  * Copyright (c) 2009-2012, 2015 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 #include "transparent.h"
22 
23 /* Export. */
24 
25 struct nmsg_message *
nmsg_message_init(struct nmsg_msgmod * mod)26 nmsg_message_init(struct nmsg_msgmod *mod) {
27 	struct nmsg_message *msg;
28 	nmsg_res res;
29 
30 	/* allocate space */
31 	msg = calloc(1, sizeof(*msg));
32 	if (msg == NULL)
33 		return (NULL);
34 
35 	/* initialize ->mod */
36 	msg->mod = mod;
37 
38 	/* initialize ->message */
39 	res = _nmsg_message_init_message(msg);
40 	if (res != nmsg_res_success) {
41 		free(msg);
42 		return (NULL);
43 	}
44 
45 	/* initialize ->np */
46 	res = _nmsg_message_init_payload(msg);
47 	if (res != nmsg_res_success) {
48 		free(msg->message);
49 		free(msg);
50 		return (NULL);
51 	}
52 
53 	return (msg);
54 }
55 
56 nmsg_res
_nmsg_message_dup_protobuf(const struct nmsg_message * msg,ProtobufCMessage ** dst)57 _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst) {
58 	ProtobufCBufferSimple sbuf = {{0}};
59 
60 	sbuf.base.append = protobuf_c_buffer_simple_append;
61 	sbuf.len = 0;
62 	sbuf.data = malloc(1024);
63 	if (sbuf.data == NULL)
64 		return (nmsg_res_memfail);
65 	sbuf.must_free_data = 1;
66 	sbuf.alloced = 1024;
67 
68 	protobuf_c_message_pack_to_buffer(msg->message, (ProtobufCBuffer *) &sbuf);
69 	if (sbuf.data == NULL)
70 		return (nmsg_res_memfail);
71 
72 	*dst = protobuf_c_message_unpack(msg->mod->plugin->pbdescr, NULL,
73 					 sbuf.len, sbuf.data);
74 	free(sbuf.data);
75 	if (*dst == NULL)
76 		return (nmsg_res_memfail);
77 
78 	return (nmsg_res_success);
79 }
80 
81 struct nmsg_message *
_nmsg_message_dup(struct nmsg_message * msg)82 _nmsg_message_dup(struct nmsg_message *msg) {
83 	nmsg_res res;
84 	struct nmsg_message *msgdup;
85 
86 	/* allocate space */
87 	msgdup = calloc(1, sizeof(*msgdup));
88 	if (msgdup == NULL)
89 		return (NULL);
90 
91 	/* initialize ->mod */
92 	msgdup->mod = msg->mod;
93 
94 	/* initialize ->message */
95 	if (msg->message != NULL &&
96 	    msg->mod->plugin->type == nmsg_msgmod_type_transparent &&
97 	    msg->mod->plugin->pbdescr != NULL)
98 	{
99 		res = _nmsg_message_dup_protobuf(msg, &(msgdup->message));
100 		if (res != nmsg_res_success) {
101 			free(msgdup);
102 			return (NULL);
103 		}
104 	}
105 
106 	/* initialize ->np */
107 	if (msg->np != NULL) {
108 		msgdup->np = malloc(sizeof(*msg->np));
109 		if (msgdup->np == NULL) {
110 			free(msgdup->message);
111 			free(msgdup);
112 			return (NULL);
113 		}
114 		memcpy(msgdup->np, msg->np, sizeof(*msg->np));
115 
116 		if (msg->np->has_payload && msg->np->payload.data != NULL) {
117 			msgdup->np->payload.data = malloc(msg->np->payload.len);
118 			if (msgdup->np->payload.data == NULL) {
119 				free(msgdup->np);
120 				free(msgdup->message);
121 				free(msgdup);
122 				return (NULL);
123 			}
124 			memcpy(msgdup->np->payload.data, msg->np->payload.data,
125 			       msg->np->payload.len);
126 		}
127 
128 		if (msgdup->np->base.n_unknown_fields != 0) {
129 			msgdup->np->base.n_unknown_fields = 0;
130 			msgdup->np->base.unknown_fields = NULL;
131 		}
132 	}
133 
134 	/* initialize ->msg_clos */
135 	if (msg->mod != NULL && msg->mod->plugin->msg_load != NULL)
136 		msg->mod->plugin->msg_load(msgdup, &msgdup->msg_clos);
137 
138 	return (msgdup);
139 }
140 
141 struct nmsg_message *
_nmsg_message_from_payload(Nmsg__NmsgPayload * np)142 _nmsg_message_from_payload(Nmsg__NmsgPayload *np) {
143 	struct nmsg_message *msg;
144 
145 	/* allocate space */
146 	msg = calloc(1, sizeof(*msg));
147 	if (msg == NULL)
148 		return (NULL);
149 
150 	/* initialize ->mod */
151 	msg->mod = nmsg_msgmod_lookup(np->vid, np->msgtype);
152 
153 	/* initialize ->message */
154 	msg->message = NULL;
155 
156 	/* initialize ->np */
157 	msg->np = np;
158 
159 	/* initialize ->msg_clos */
160 	if (msg->mod != NULL && msg->mod->plugin->msg_load != NULL)
161 		msg->mod->plugin->msg_load(msg, &msg->msg_clos);
162 
163 	/* strip unknown fields */
164 	if (np->base.n_unknown_fields != 0) {
165 		unsigned i;
166 
167 		for (i = 0; i < np->base.n_unknown_fields; i++)
168 			free(np->base.unknown_fields[i].data);
169 
170 		free(np->base.unknown_fields);
171 		np->base.unknown_fields = NULL;
172 		np->base.n_unknown_fields = 0;
173 	}
174 
175 	return (msg);
176 }
177 
178 struct nmsg_message *
nmsg_message_from_raw_payload(unsigned vid,unsigned msgtype,uint8_t * data,size_t sz,const struct timespec * ts)179 nmsg_message_from_raw_payload(unsigned vid, unsigned msgtype,
180 			      uint8_t *data, size_t sz,
181 			      const struct timespec *ts)
182 {
183 	nmsg_message_t msg;
184 
185 	/* allocate message object */
186 	msg = calloc(1, sizeof(*msg));
187 	if (msg == NULL)
188 		return (NULL);
189 
190 	/* allocate the NmsgPayload */
191 	msg->np = calloc(1, sizeof(*(msg->np)));
192 	if (msg->np == NULL) {
193 		free(msg);
194 		return (NULL);
195 	}
196 
197 	/* initialize ->np */
198 	nmsg__nmsg_payload__init(msg->np);
199 	msg->np->base.n_unknown_fields = 0;
200 	msg->np->base.unknown_fields = NULL;
201 	msg->np->vid = vid;
202 	msg->np->msgtype = msgtype;
203 	msg->np->has_payload = true;
204 	msg->np->payload.data = data;
205 	msg->np->payload.len = sz;
206 	nmsg_message_set_time(msg, ts);
207 
208 	/* initialize ->mod */
209 	msg->mod = nmsg_msgmod_lookup(vid, msgtype);
210 
211 	/* initialize ->msg_clos */
212 	if (msg->mod != NULL && msg->mod->plugin->msg_load != NULL)
213 		msg->mod->plugin->msg_load(msg, &msg->msg_clos);
214 
215 	return (msg);
216 }
217 
218 #ifdef HAVE_YAJL
219 nmsg_res
nmsg_message_from_json(const char * json,nmsg_message_t * msg)220 nmsg_message_from_json(const char *json, nmsg_message_t *msg) {
221 	nmsg_res res;
222 	yajl_val node;
223 
224 	yajl_val vname_v;
225 	const char *vname_path[] = { "vname", (const char *) 0 };
226 	yajl_val mname_v;
227 	const char *mname_path[] = { "mname", (const char *) 0 };
228 	struct nmsg_msgmod *mod;
229 
230 	yajl_val source_v;
231 	const char *source_path[] = { "source", (const char *) 0 };
232 
233 	yajl_val operator_v;
234 	const char *operator_path[] = { "operator", (const char *) 0 };
235 
236 	yajl_val group_v;
237 	const char *group_path[] = { "group", (const char *) 0 };
238 
239 	yajl_val time_v;
240 	const char *time_path[] = { "time", (const char *) 0 };
241 	struct timespec ts;
242 
243 	yajl_val message_v;
244 	const char *message_path[] = { "message", (const char *) 0 };
245 
246 	*msg = NULL;
247 
248 	node = yajl_tree_parse(json, 0, 0);
249 
250 	if (node == NULL)
251 		return (nmsg_res_parse_error);
252 
253 	vname_v = yajl_tree_get(node, vname_path, yajl_t_string);
254 	mname_v = yajl_tree_get(node, mname_path, yajl_t_string);
255 	if (vname_v == NULL || mname_v == NULL) {
256 		res = (nmsg_res_parse_error);
257 		goto err;
258 	} else {
259 		char *vname, *mname;
260 
261 		vname = YAJL_GET_STRING(vname_v);
262 		mname = YAJL_GET_STRING(mname_v);
263 
264 		mod = nmsg_msgmod_lookup_byname(vname, mname);
265 		if (mod == NULL) {
266 			res = (nmsg_res_parse_error);
267 			goto err;
268 		}
269 	}
270 
271 	*msg = nmsg_message_init(mod);
272 	if (*msg == NULL) {
273 		res = (nmsg_res_failure);
274 		goto err;
275 	}
276 
277 	source_v = yajl_tree_get(node, source_path, yajl_t_any);
278 	if (source_v) {
279 		uint32_t source;
280 
281 		if (YAJL_IS_STRING(source_v)) {
282 			sscanf(YAJL_GET_STRING(source_v), "%x", &source);
283 		} else if (YAJL_IS_INTEGER(source_v)) {
284 			source = YAJL_GET_INTEGER(source_v);
285 		} else {
286 			res = (nmsg_res_parse_error);
287 			goto err;
288 		}
289 		nmsg_message_set_source(*msg, source);
290 	}
291 
292 	operator_v = yajl_tree_get(node, operator_path, yajl_t_any);
293 	if (operator_v) {
294 		uint32_t operator;
295 
296 		if (YAJL_IS_STRING(operator_v)) {
297 			operator = nmsg_alias_by_value(nmsg_alias_operator, YAJL_GET_STRING(operator_v));
298 		} else if (YAJL_IS_INTEGER(operator_v)) {
299 			operator = YAJL_GET_INTEGER(operator_v);
300 		} else {
301 			res = (nmsg_res_parse_error);
302 			goto err;
303 		}
304 		nmsg_message_set_operator(*msg, operator);
305 	}
306 
307 	group_v = yajl_tree_get(node, group_path, yajl_t_any);
308 	if (group_v) {
309 		uint32_t group;
310 
311 		if (YAJL_IS_STRING(group_v)) {
312 			group = nmsg_alias_by_value(nmsg_alias_group, YAJL_GET_STRING(group_v));
313 		} else if (YAJL_IS_INTEGER(group_v)) {
314 			group = YAJL_GET_INTEGER(group_v);
315 		} else {
316 			res = (nmsg_res_parse_error);
317 			goto err;
318 		}
319 		nmsg_message_set_group(*msg, group);
320 	}
321 
322 	time_v = yajl_tree_get(node, time_path, yajl_t_any);
323 	if (time_v) {
324 		if (YAJL_IS_STRING(time_v)) {
325 			struct tm tm;
326 			char * remainder;
327 
328 			remainder = strptime(YAJL_GET_STRING(time_v), "%Y-%m-%d %T", &tm);
329 			if (remainder == NULL) {
330 				res = (nmsg_res_parse_error);
331 				goto err;
332 			}
333 
334 			ts.tv_sec = timegm(&tm);
335 
336 			if (sscanf(remainder, ".%ld", &ts.tv_nsec) == 0) {
337 				ts.tv_nsec = 0;
338 			}
339 		} else if (YAJL_IS_INTEGER(time_v)) {
340 			ts.tv_sec = YAJL_GET_INTEGER(time_v);
341 			ts.tv_nsec = 0;
342 		} else if (YAJL_IS_DOUBLE(time_v)) {
343 			nmsg_timespec_from_double(YAJL_GET_DOUBLE(time_v), &ts);
344 		} else {
345 			res = (nmsg_res_parse_error);
346 			goto err;
347 		}
348 	} else {
349 		nmsg_timespec_get(&ts);
350 	}
351 	nmsg_message_set_time(*msg, &ts);
352 
353 	switch (mod->plugin->type) {
354 	case nmsg_msgmod_type_transparent:
355 		message_v = yajl_tree_get(node, message_path, yajl_t_object);
356 		if (message_v) {
357 			res = (_nmsg_msgmod_json_to_message(message_v, *msg));
358 			if (res != nmsg_res_success) {
359 				goto err;
360 			}
361 		} else {
362 			res = (nmsg_res_parse_error);
363 			goto err;
364 		}
365 		break;
366 	default:
367 		res = (nmsg_res_notimpl);
368 		goto err;
369 	}
370 
371 	yajl_tree_free(node);
372 
373 	return (nmsg_res_success);
374 err:
375 	if (*msg != NULL) {
376 		nmsg_message_destroy(msg);
377 	}
378 
379 	yajl_tree_free(node);
380 	return (res);
381 }
382 #else /* HAVE_YAJL */
383 nmsg_res
nmsg_message_from_json(const char * json,nmsg_message_t * msg)384 nmsg_message_from_json(__attribute__((unused)) const char *json,
385                        __attribute__((unused)) nmsg_message_t *msg) {
386 	return (nmsg_res_notimpl);
387 }
388 #endif /* HAVE_YAJL */
389 
390 nmsg_res
_nmsg_message_init_message(struct nmsg_message * msg)391 _nmsg_message_init_message(struct nmsg_message *msg) {
392 	if (msg->mod->plugin->type == nmsg_msgmod_type_transparent &&
393 	    msg->mod->plugin->pbdescr != NULL)
394 	{
395 		msg->message = calloc(1, msg->mod->plugin->pbdescr->sizeof_message);
396 		if (msg->message == NULL)
397 			return (nmsg_res_memfail);
398 		msg->message->descriptor = msg->mod->plugin->pbdescr;
399 	} else {
400 		msg->message = NULL;
401 	}
402 	return (nmsg_res_success);
403 }
404 
405 nmsg_res
_nmsg_message_init_payload(struct nmsg_message * msg)406 _nmsg_message_init_payload(struct nmsg_message *msg) {
407 	struct timespec ts;
408 
409 	msg->np = malloc(sizeof(*msg->np));
410 	if (msg->np == NULL)
411 		return (nmsg_res_memfail);
412 	nmsg__nmsg_payload__init(msg->np);
413 	msg->np->vid = msg->mod->plugin->vendor.id;
414 	msg->np->msgtype = msg->mod->plugin->msgtype.id;
415 	nmsg_timespec_get(&ts);
416 	msg->np->time_sec = ts.tv_sec;
417 	msg->np->time_nsec = ts.tv_nsec;
418 
419 	return (nmsg_res_success);
420 }
421 
422 void
nmsg_message_destroy(struct nmsg_message ** msg)423 nmsg_message_destroy(struct nmsg_message **msg) {
424 	if ((*msg)->mod != NULL && (*msg)->mod->plugin->msg_fini != NULL)
425 		(*msg)->mod->plugin->msg_fini(*msg, (*msg)->msg_clos);
426 
427 	if ((*msg)->message != NULL) {
428 		protobuf_c_message_free_unpacked((*msg)->message, NULL);
429 		(*msg)->message = NULL;
430 	}
431 	if ((*msg)->np != NULL)
432 		_nmsg_payload_free(&(*msg)->np);
433 
434 	nmsg_message_free_allocations(*msg);
435 
436 	free(*msg);
437 	*msg = NULL;
438 }
439 
440 nmsg_res
_nmsg_message_deserialize(struct nmsg_message * msg)441 _nmsg_message_deserialize(struct nmsg_message *msg) {
442 	if (msg->message != NULL)
443 		return (nmsg_res_success);
444 
445 	if (msg->np != NULL) {
446 		if (msg->mod == NULL || msg->np->has_payload == 0)
447 			return (nmsg_res_failure);
448 		msg->message = protobuf_c_message_unpack(msg->mod->plugin->pbdescr, NULL,
449 							 msg->np->payload.len,
450 							 msg->np->payload.data);
451 		if (msg->message == NULL)
452 			return (nmsg_res_memfail);
453 		return (nmsg_res_success);
454 	}
455 	return (nmsg_res_failure);
456 }
457 
458 nmsg_res
_nmsg_message_serialize(struct nmsg_message * msg)459 _nmsg_message_serialize(struct nmsg_message *msg) {
460 	ProtobufCBufferSimple sbuf = {{0}};
461 	nmsg_res res;
462 	size_t sz;
463 
464 	if (msg->message != NULL &&
465 	    (msg->updated || msg->np == NULL))
466 	{
467 		if (msg->np == NULL) {
468 			res = _nmsg_message_init_payload(msg);
469 			if (res != nmsg_res_success)
470 				return (res);
471 		}
472 
473 		sbuf.base.append = protobuf_c_buffer_simple_append;
474 		sbuf.len = 0;
475 		sbuf.data = malloc(1024);
476 		if (sbuf.data == NULL)
477 			return (nmsg_res_memfail);
478 		sbuf.must_free_data = 1;
479 		sbuf.alloced = 1024;
480 
481 		sz = protobuf_c_message_pack_to_buffer((ProtobufCMessage *) msg->message,
482 						       (ProtobufCBuffer *) &sbuf);
483 		if (msg->np->payload.data != NULL)
484 			free(msg->np->payload.data);
485 
486 		msg->np->has_payload = true;
487 		msg->np->payload.data = sbuf.data;
488 		msg->np->payload.len = sz;
489 
490 		msg->updated = false;
491 	}
492 
493 	return (nmsg_res_success);
494 }
495 
496 nmsg_res
nmsg_message_to_pres(struct nmsg_message * msg,char ** pres,const char * endline)497 nmsg_message_to_pres(struct nmsg_message *msg, char **pres, const char *endline) {
498 	if (msg->mod == NULL)
499 		return (nmsg_res_failure);
500 	switch (msg->mod->plugin->type) {
501 	case nmsg_msgmod_type_transparent:
502 		return (_nmsg_message_payload_to_pres(msg, pres, endline));
503 	case nmsg_msgmod_type_opaque:
504 		if (msg->mod->plugin->payload_to_pres != NULL)
505 			return (msg->mod->plugin->payload_to_pres(msg->np, pres, endline));
506 	default:
507 		return (nmsg_res_notimpl);
508 	}
509 }
510 
511 nmsg_res
nmsg_message_to_json(nmsg_message_t msg,char ** json)512 nmsg_message_to_json(nmsg_message_t msg, char **json) {
513 	if (msg->mod == NULL)
514 		return (nmsg_res_failure);
515 	switch (msg->mod->plugin->type) {
516 	case nmsg_msgmod_type_transparent:
517 		return (_nmsg_message_payload_to_json(msg, json));
518 	case nmsg_msgmod_type_opaque:
519 		return (nmsg_res_notimpl);
520 	default:
521 		return (nmsg_res_notimpl);
522 	}
523 }
524 
525 nmsg_res
nmsg_message_add_allocation(struct nmsg_message * msg,void * ptr)526 nmsg_message_add_allocation(struct nmsg_message *msg, void *ptr) {
527 	void *tmp;
528 
529 	msg->n_allocs += 1;
530 	tmp = msg->allocs;
531 	msg->allocs = realloc(msg->allocs, sizeof(ptr) * msg->n_allocs);
532 	if (msg->allocs == NULL) {
533 		msg->allocs = tmp;
534 		msg->n_allocs -= 1;
535 		return (nmsg_res_memfail);
536 	}
537 	msg->allocs[msg->n_allocs - 1] = ptr;
538 
539 	return (nmsg_res_success);
540 }
541 
542 void
nmsg_message_free_allocations(struct nmsg_message * msg)543 nmsg_message_free_allocations(struct nmsg_message *msg) {
544 	size_t n;
545 
546 	for (n = 0; n < msg->n_allocs; n++)
547 		free(msg->allocs[n]);
548 	free(msg->allocs);
549 	msg->allocs = NULL;
550 	msg->n_allocs = 0;
551 }
552 
553 nmsg_msgmod_t
nmsg_message_get_msgmod(nmsg_message_t msg)554 nmsg_message_get_msgmod(nmsg_message_t msg) {
555 	return (msg->mod);
556 }
557 
558 int32_t
nmsg_message_get_vid(nmsg_message_t msg)559 nmsg_message_get_vid(nmsg_message_t msg) {
560 	return (msg->np->vid);
561 }
562 
563 int32_t
nmsg_message_get_msgtype(nmsg_message_t msg)564 nmsg_message_get_msgtype(nmsg_message_t msg) {
565 	return (msg->np->msgtype);
566 }
567 
568 void *
nmsg_message_get_payload(nmsg_message_t msg)569 nmsg_message_get_payload(nmsg_message_t msg) {
570 	nmsg_res res;
571 
572 	res = _nmsg_message_deserialize(msg);
573 	assert(res == nmsg_res_success && msg->message != NULL);
574 	return ((void *) msg->message);
575 }
576 
577 size_t
nmsg_message_get_payload_size(nmsg_message_t msg)578 nmsg_message_get_payload_size(nmsg_message_t msg) {
579 
580 	assert(msg->np != NULL);
581 	return (_nmsg_payload_size(msg->np));
582 }
583 
584 void
nmsg_message_update(nmsg_message_t msg)585 nmsg_message_update(nmsg_message_t msg) {
586 	msg->updated = true;
587 }
588 
589 void
nmsg_message_compact_payload(nmsg_message_t msg)590 nmsg_message_compact_payload(nmsg_message_t msg) {
591 	if (msg->message != NULL) {
592 		protobuf_c_message_free_unpacked(msg->message, NULL);
593 		msg->message = NULL;
594 	}
595 }
596 
597 void
nmsg_message_get_time(nmsg_message_t msg,struct timespec * ts)598 nmsg_message_get_time(nmsg_message_t msg, struct timespec *ts) {
599 	ts->tv_sec = msg->np->time_sec;
600 	ts->tv_nsec = msg->np->time_nsec;
601 }
602 
603 void
nmsg_message_set_time(nmsg_message_t msg,const struct timespec * ts)604 nmsg_message_set_time(nmsg_message_t msg, const struct timespec *ts) {
605 	if (ts == NULL) {
606 		struct timespec now;
607 		nmsg_timespec_get(&now);
608 		nmsg_message_set_time(msg, &now);
609 	} else {
610 		msg->np->time_sec = ts->tv_sec;
611 		msg->np->time_nsec = ts->tv_nsec;
612 	}
613 }
614 
615 uint32_t
nmsg_message_get_source(nmsg_message_t msg)616 nmsg_message_get_source(nmsg_message_t msg) {
617 	if (msg->np->has_source)
618 		return (msg->np->source);
619 	return (0);
620 }
621 
622 uint32_t
nmsg_message_get_operator(nmsg_message_t msg)623 nmsg_message_get_operator(nmsg_message_t msg) {
624 	if (msg->np->has_operator_)
625 		return (msg->np->operator_);
626 	return (0);
627 }
628 
629 uint32_t
nmsg_message_get_group(nmsg_message_t msg)630 nmsg_message_get_group(nmsg_message_t msg) {
631 	if (msg->np->has_group)
632 		return (msg->np->group);
633 	return (0);
634 }
635 
636 void
nmsg_message_set_source(nmsg_message_t msg,uint32_t source)637 nmsg_message_set_source(nmsg_message_t msg, uint32_t source) {
638 	if (source == 0) {
639 		msg->np->has_source = 0;
640 	} else {
641 		msg->np->has_source = 1;
642 		msg->np->source = source;
643 	}
644 }
645 
646 void
nmsg_message_set_operator(nmsg_message_t msg,uint32_t operator)647 nmsg_message_set_operator(nmsg_message_t msg, uint32_t operator) {
648 	if (operator == 0) {
649 		msg->np->has_operator_ = 0;
650 	} else {
651 		msg->np->has_operator_ = 1;
652 		msg->np->operator_ = operator;
653 	}
654 }
655 
656 void
nmsg_message_set_group(nmsg_message_t msg,uint32_t group)657 nmsg_message_set_group(nmsg_message_t msg, uint32_t group) {
658 	if (group == 0) {
659 		msg->np->has_group = 0;
660 	} else {
661 		msg->np->has_group = 1;
662 		msg->np->group = group;
663 	}
664 }
665