1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logmsg/logmsg.h"
26 #include "str-utils.h"
27 #include "str-repr/encode.h"
28 #include "messages.h"
29 #include "logpipe.h"
30 #include "timeutils/cache.h"
31 #include "timeutils/misc.h"
32 #include "logmsg/nvtable.h"
33 #include "stats/stats-registry.h"
34 #include "stats/stats-cluster-single.h"
35 #include "template/templates.h"
36 #include "tls-support.h"
37 #include "compat/string.h"
38 #include "rcptid.h"
39 #include "template/macros.h"
40 #include "host-id.h"
41 #include "ack-tracker/ack_tracker.h"
42 #include "apphook.h"
43
44 #include <glib/gprintf.h>
45 #include <sys/types.h>
46 #include <time.h>
47 #include <unistd.h>
48 #include <string.h>
49 #include <stdlib.h>
50 #include <ctype.h>
51 #include <syslog.h>
52
53 /*
54 * Reference/ACK counting for LogMessage structures
55 *
56 * Each LogMessage structure is allocated when received by a LogSource
57 * instance, and then freed once all destinations finish with it. Since a
58 * LogMessage is processed by different threads, reference counting must be
59 * atomic.
60 *
61 * A similar counter is used to track when a given message is considered to
62 * be delivered. In case flow-control is in use, the number of
63 * to-be-expected ACKs are counted in an atomic variable.
64 *
65 * Code is written in a way, that it is explicitly mentioned whether a
66 * function expects a reference (called "consuming" the ref), or whether it
67 * gets a 'borrowed' reference (more common). Also, a function that returns
68 * a LogMessage instance generally returns a reference too.
69 *
70 * Because of these rules, quite a number of refs/unrefs are being made
71 * during the processing of a message, even though they wouldn't cause the
72 * structure to be freed. Considering that ref/unref operations are
73 * expensive atomic operations, these have a considerable overhead.
74 *
75 * The solution we employ in this module, is that we try to exploit the
76 * fact, that each thread is usually working on a single LogMessage
77 * instance, and once it is done with it, it fetches the next one, and so
78 * on. In the LogReader/LogWriter cases, this usage pattern is quite
79 * normal.
80 *
 * Also, a quite similar usage pattern can be applied to the ACK counter
 * (the one which tracks how many flow-controlled destinations need to
 * confirm the delivery of the message).
84 *
85 * The solution implemented here is outlined below (and the same rules are
86 * used for ACKs and REFs):
87 *
 *   - ACKs and REFs are put into the same 32 bit integer value: 15 bits
 *     are used for the REF counter, another 15 bits for the ACK counter,
 *     and the two topmost bits carry the ABORT and SUSPEND flags
 *     (see the macros LOGMSG_REFCACHE_ below)
91 *
92 * - The processing of a given message in a given thread is enclosed by
93 * calls to log_msg_refcache_start() / log_msg_refcache_stop()
94 *
 *   - Calling these functions is optional: if they are not called,
 *     then normal atomic reference counting is performed
97 *
98 * - When refcache_start() is called, the atomic reference count is not
99 * changed by log_msg_ref()/unref(). Rather, a per-thread variable is
100 * used to count the _difference_ to the "would-be" value of the ref counter.
101 *
102 * - When refcache_stop() is called, the atomic reference counter is
103 * updated as a single atomic operation. This means, that if a thread
104 * calls refcache_start() early enough, then the ref/unref operations
105 * performed by the given thread can be completely atomic-op free.
106 *
107 * - There's one catch: if the producer thread (e.g. LogReader), doesn't
108 * add references to a consumer thread (e.g. LogWriter), and the
109 * consumer doesn't use the refcache infrastructure, then the counters
110 * could become negative (simply because the reader's real number of
111 * references is not represented in the refcounter). This is solved by
112 * adding a large-enough number (so called BIAS) to the ref counter in
113 * refcache_start(), which ensures that all possible writers will see a
114 * positive value. This is then subtracted in refcache_stop() the
115 * same way as the other references.
116 *
117 * Since we use the same atomic variable to store two things, updating that
118 * counter becomes somewhat more complicated, therefore a g_atomic_int_add()
119 * doesn't suffice. We're using a CAS loop (compare-and-exchange) to do our
120 * stuff, but that shouldn't have that much of an overhead.
121 */
122
/* Per-thread refcache state.  Each field is reached through the
 * identically named __tls_deref() accessor macro defined right after the
 * block, so the rest of the file can use the plain field names. */
TLS_BLOCK_START
{
  /* message that is being processed by the current thread. Its ack/ref changes are cached */
  LogMessage *logmsg_current;
  /* whether the consumer is flow-controlled, (the producer always is) */
  gboolean logmsg_cached_ack_needed;

  /* has to be signed as these can become negative */
  /* number of cached refs by the current thread */
  gint logmsg_cached_refs;
  /* number of cached acks by the current thread */
  gint logmsg_cached_acks;
  /* abort flag in the current thread for acks */
  gboolean logmsg_cached_abort;
  /* suspend flag in the current thread for acks */
  gboolean logmsg_cached_suspend;
}
TLS_BLOCK_END;

/* accessors: one per field of the TLS block above */
#define logmsg_current __tls_deref(logmsg_current)
#define logmsg_cached_refs __tls_deref(logmsg_cached_refs)
#define logmsg_cached_acks __tls_deref(logmsg_cached_acks)
#define logmsg_cached_ack_needed __tls_deref(logmsg_cached_ack_needed)
#define logmsg_cached_abort __tls_deref(logmsg_cached_abort)
#define logmsg_cached_suspend __tls_deref(logmsg_cached_suspend)
148
/*
 * Layout of the combined 32 bit ack/ref value:
 *
 *   bit  31      SUSPEND flag
 *   bit  30      ABORT flag
 *   bits 15-29   ACK counter
 *   bits 0-14    REF counter
 */
#define LOGMSG_REFCACHE_SUSPEND_SHIFT 31 /* number of bits to shift to get the SUSPEND flag */
#define LOGMSG_REFCACHE_SUSPEND_MASK 0x80000000 /* bit mask to extract the SUSPEND flag */
#define LOGMSG_REFCACHE_ABORT_SHIFT 30 /* number of bits to shift to get the ABORT flag */
#define LOGMSG_REFCACHE_ABORT_MASK 0x40000000 /* bit mask to extract the ABORT flag */
#define LOGMSG_REFCACHE_ACK_SHIFT 15 /* number of bits to shift to get the ACK counter */
#define LOGMSG_REFCACHE_ACK_MASK 0x3FFF8000 /* bit mask to extract the ACK counter */
#define LOGMSG_REFCACHE_REF_SHIFT 0 /* number of bits to shift to get the REF counter */
#define LOGMSG_REFCACHE_REF_MASK 0x00007FFF /* bit mask to extract the REF counter */
#define LOGMSG_REFCACHE_BIAS 0x00002000 /* the BIAS we add to the ref counter in refcache_start */

/* move a field value into its position within the combined value */
#define LOGMSG_REFCACHE_REF_TO_VALUE(x) (((x) << LOGMSG_REFCACHE_REF_SHIFT) & LOGMSG_REFCACHE_REF_MASK)
#define LOGMSG_REFCACHE_ACK_TO_VALUE(x) (((x) << LOGMSG_REFCACHE_ACK_SHIFT) & LOGMSG_REFCACHE_ACK_MASK)
#define LOGMSG_REFCACHE_ABORT_TO_VALUE(x) (((x) << LOGMSG_REFCACHE_ABORT_SHIFT) & LOGMSG_REFCACHE_ABORT_MASK)
#define LOGMSG_REFCACHE_SUSPEND_TO_VALUE(x) (((x) << LOGMSG_REFCACHE_SUSPEND_SHIFT) & LOGMSG_REFCACHE_SUSPEND_MASK)

/* extract a field from the combined value */
#define LOGMSG_REFCACHE_VALUE_TO_REF(x) (((x) & LOGMSG_REFCACHE_REF_MASK) >> LOGMSG_REFCACHE_REF_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_ACK(x) (((x) & LOGMSG_REFCACHE_ACK_MASK) >> LOGMSG_REFCACHE_ACK_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_ABORT(x) (((x) & LOGMSG_REFCACHE_ABORT_MASK) >> LOGMSG_REFCACHE_ABORT_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_SUSPEND(x) (((x) & LOGMSG_REFCACHE_SUSPEND_MASK) >> LOGMSG_REFCACHE_SUSPEND_SHIFT)
168
169 /**********************************************************************
170 * LogMessage
171 **********************************************************************/
172
173 static inline gboolean
log_msg_chk_flag(const LogMessage * self,gint32 flag)174 log_msg_chk_flag(const LogMessage *self, gint32 flag)
175 {
176 return self->flags & flag;
177 }
178
179 static inline void
log_msg_set_flag(LogMessage * self,gint32 flag)180 log_msg_set_flag(LogMessage *self, gint32 flag)
181 {
182 self->flags |= flag;
183 }
184
185 static inline void
log_msg_set_host_id(LogMessage * msg)186 log_msg_set_host_id(LogMessage *msg)
187 {
188 msg->host_id = host_id_get();
189 }
190
/* the index matches the value id */
/* names of the built-in values; the list is terminated by a NULL entry */
const gchar *builtin_value_names[] =
{
  "HOST",
  "HOST_FROM",
  "MESSAGE",
  "PROGRAM",
  "PID",
  "MSGID",
  "SOURCE",
  "LEGACY_MSGHDR",
  "__RESERVED_LM_V_MAX",
  NULL,
};
205
206 static void
__free_macro_value(void * val)207 __free_macro_value(void *val)
208 {
209 g_string_free((GString *) val, TRUE);
210 }
211
/* handles of the match values, indexed by the match number (see log_msg_set_match()) */
static NVHandle match_handles[256];
/* registry consulted for value handles and their flags */
NVRegistry *logmsg_registry;
/* name prefix of structured data values */
const char logmsg_sd_prefix[] = ".SDATA.";
/* length of logmsg_sd_prefix, not counting the terminating NUL */
const gint logmsg_sd_prefix_len = sizeof(logmsg_sd_prefix) - 1;
/* raised by log_msg_alloc_queue_node() when a message runs out of embedded queue nodes */
gint logmsg_queue_node_max = 1;
/* statistics */
static StatsCounterItem *count_msg_clones;
static StatsCounterItem *count_payload_reallocs;
static StatsCounterItem *count_sdata_updates;
static StatsCounterItem *count_allocated_bytes;
/* GString scratch buffer used by log_msg_get_macro_value(), freed through __free_macro_value() */
static GPrivate priv_macro_value = G_PRIVATE_INIT(__free_macro_value);
223
224 void
log_msg_write_protect(LogMessage * self)225 log_msg_write_protect(LogMessage *self)
226 {
227 self->protected = TRUE;
228 }
229
230 LogMessage *
log_msg_make_writable(LogMessage ** pself,const LogPathOptions * path_options)231 log_msg_make_writable(LogMessage **pself, const LogPathOptions *path_options)
232 {
233 if (log_msg_is_write_protected(*pself))
234 {
235 LogMessage *new;
236
237 new = log_msg_clone_cow(*pself, path_options);
238 log_msg_unref(*pself);
239 *pself = new;
240 }
241 return *pself;
242 }
243
244
245 static void
log_msg_update_sdata_slow(LogMessage * self,NVHandle handle,const gchar * name,gssize name_len)246 log_msg_update_sdata_slow(LogMessage *self, NVHandle handle, const gchar *name, gssize name_len)
247 {
248 guint16 alloc_sdata;
249 guint16 prefix_and_block_len;
250 gint i;
251 const gchar *dot;
252
253 /* this was a structured data element, insert a ref to the sdata array */
254
255 stats_counter_inc(count_sdata_updates);
256 if (self->num_sdata == 255)
257 {
258 msg_error("syslog-ng only supports 255 SD elements right now, just drop an email to the mailing list that it was not enough with your use-case so we can increase it");
259 return;
260 }
261
262 if (self->alloc_sdata <= self->num_sdata)
263 {
264 alloc_sdata = MAX(self->num_sdata + 1, STRICT_ROUND_TO_NEXT_EIGHT(self->num_sdata));
265 if (alloc_sdata > 255)
266 alloc_sdata = 255;
267 }
268 else
269 alloc_sdata = self->alloc_sdata;
270
271 if (log_msg_chk_flag(self, LF_STATE_OWN_SDATA) && self->sdata)
272 {
273 if (self->alloc_sdata < alloc_sdata)
274 {
275 self->sdata = g_realloc(self->sdata, alloc_sdata * sizeof(self->sdata[0]));
276 memset(&self->sdata[self->alloc_sdata], 0, (alloc_sdata - self->alloc_sdata) * sizeof(self->sdata[0]));
277 }
278 }
279 else
280 {
281 NVHandle *sdata;
282
283 sdata = g_malloc(alloc_sdata * sizeof(self->sdata[0]));
284 if (self->num_sdata)
285 memcpy(sdata, self->sdata, self->num_sdata * sizeof(self->sdata[0]));
286 memset(&sdata[self->num_sdata], 0, sizeof(self->sdata[0]) * (self->alloc_sdata - self->num_sdata));
287 self->sdata = sdata;
288 log_msg_set_flag(self, LF_STATE_OWN_SDATA);
289 }
290 guint16 old_alloc_sdata = self->alloc_sdata;
291 self->alloc_sdata = alloc_sdata;
292 if (self->sdata)
293 {
294 self->allocated_bytes += ((self->alloc_sdata - old_alloc_sdata) * sizeof(self->sdata[0]));
295 stats_counter_add(count_allocated_bytes, (self->alloc_sdata - old_alloc_sdata) * sizeof(self->sdata[0]));
296 }
297 /* ok, we have our own SDATA array now which has at least one free slot */
298
299 if (!self->initial_parse)
300 {
301 dot = memrchr(name, '.', name_len);
302 prefix_and_block_len = dot - name;
303
304 for (i = self->num_sdata - 1; i >= 0; i--)
305 {
306 gssize sdata_name_len;
307 const gchar *sdata_name;
308
309 sdata_name_len = 0;
310 sdata_name = log_msg_get_value_name(self->sdata[i], &sdata_name_len);
311 if (sdata_name_len > prefix_and_block_len &&
312 strncmp(sdata_name, name, prefix_and_block_len) == 0)
313 {
314 /* ok we have found the last SDATA entry that has the same block */
315 break;
316 }
317 }
318 i++;
319 }
320 else
321 i = -1;
322
323 if (i >= 0)
324 {
325 memmove(&self->sdata[i+1], &self->sdata[i], (self->num_sdata - i) * sizeof(self->sdata[0]));
326 self->sdata[i] = handle;
327 }
328 else
329 {
330 self->sdata[self->num_sdata] = handle;
331 }
332 self->num_sdata++;
333 }
334
335 static inline void
log_msg_update_sdata(LogMessage * self,NVHandle handle,const gchar * name,gssize name_len)336 log_msg_update_sdata(LogMessage *self, NVHandle handle, const gchar *name, gssize name_len)
337 {
338 guint8 flags;
339
340 flags = nv_registry_get_handle_flags(logmsg_registry, handle);
341 if (G_UNLIKELY(flags & LM_VF_SDATA))
342 log_msg_update_sdata_slow(self, handle, name, name_len);
343 }
344
345 NVHandle
log_msg_get_value_handle(const gchar * value_name)346 log_msg_get_value_handle(const gchar *value_name)
347 {
348 NVHandle handle;
349
350 handle = nv_registry_alloc_handle(logmsg_registry, value_name);
351
352 /* check if name starts with sd_prefix and has at least one additional character */
353 if (strncmp(value_name, logmsg_sd_prefix, logmsg_sd_prefix_len) == 0 && value_name[6])
354 {
355 nv_registry_set_handle_flags(logmsg_registry, handle, LM_VF_SDATA);
356 }
357
358 return handle;
359 }
360
361 gboolean
log_msg_is_value_name_valid(const gchar * value)362 log_msg_is_value_name_valid(const gchar *value)
363 {
364 if (strncmp(value, logmsg_sd_prefix, logmsg_sd_prefix_len) == 0)
365 {
366 const gchar *dot;
367 int dot_found = 0;
368
369 dot = strchr(value, '.');
370 while (dot && strlen(dot) > 1)
371 {
372 dot_found++;
373 dot++;
374 dot = strchr(dot, '.');
375 }
376 return (dot_found >= 3);
377 }
378 else
379 return TRUE;
380 }
381
382 const gchar *
log_msg_get_macro_value(const LogMessage * self,gint id,gssize * value_len)383 log_msg_get_macro_value(const LogMessage *self, gint id, gssize *value_len)
384 {
385 GString *value;
386
387 value = g_private_get(&priv_macro_value);
388 if (!value)
389 {
390 value = g_string_sized_new(256);
391 g_private_replace(&priv_macro_value, value);
392 }
393 g_string_truncate(value, 0);
394
395 log_macro_expand_simple(value, id, self);
396 if (value_len)
397 *value_len = value->len;
398 return value->str;
399 }
400
401 gboolean
log_msg_is_handle_macro(NVHandle handle)402 log_msg_is_handle_macro(NVHandle handle)
403 {
404 guint16 flags;
405
406 flags = nv_registry_get_handle_flags(logmsg_registry, handle);
407 return !!(flags & LM_VF_MACRO);
408 }
409
410 gboolean
log_msg_is_handle_sdata(NVHandle handle)411 log_msg_is_handle_sdata(NVHandle handle)
412 {
413 guint16 flags;
414
415 flags = nv_registry_get_handle_flags(logmsg_registry, handle);
416 return !!(flags & LM_VF_SDATA);
417 }
418
419 gboolean
log_msg_is_handle_match(NVHandle handle)420 log_msg_is_handle_match(NVHandle handle)
421 {
422 g_assert(match_handles[0] && match_handles[255] && match_handles[0] < match_handles[255]);
423
424 /* NOTE: match_handles are allocated sequentially in log_msg_registry_init(),
425 * so this simple & fast check is enough */
426 return (match_handles[0] <= handle && handle <= match_handles[255]);
427 }
428
429 static void
log_msg_init_queue_node(LogMessage * msg,LogMessageQueueNode * node,const LogPathOptions * path_options)430 log_msg_init_queue_node(LogMessage *msg, LogMessageQueueNode *node, const LogPathOptions *path_options)
431 {
432 INIT_IV_LIST_HEAD(&node->list);
433 node->ack_needed = path_options->ack_needed;
434 node->flow_control_requested = path_options->flow_control_requested;
435 node->msg = log_msg_ref(msg);
436 log_msg_write_protect(msg);
437 }
438
439 /*
440 * Allocates a new LogMessageQueueNode instance to be enqueued in a
441 * LogQueue.
442 *
443 * NOTE: Assumed to be running in the source thread, and that the same
444 * LogMessage instance is only put into queue from the same thread (e.g.
445 * the related fields are _NOT_ locked).
446 */
447 LogMessageQueueNode *
log_msg_alloc_queue_node(LogMessage * msg,const LogPathOptions * path_options)448 log_msg_alloc_queue_node(LogMessage *msg, const LogPathOptions *path_options)
449 {
450 LogMessageQueueNode *node;
451
452 if (msg->cur_node < msg->num_nodes)
453 {
454 node = &msg->nodes[msg->cur_node++];
455 node->embedded = TRUE;
456 }
457 else
458 {
459 gint nodes = (volatile gint) logmsg_queue_node_max;
460
461 /* this is a racy update, but it doesn't really hurt if we lose an
462 * update or if we continue with a smaller value in parallel threads
463 * for some time yet, since the smaller number only means that we
464 * pre-allocate somewhat less LogMsgQueueNodes in the message
465 * structure, but will be fine regardless (if we'd overflow the
466 * pre-allocated space, we start allocating nodes dynamically from
467 * heap.
468 */
469 if (nodes < 32 && nodes <= msg->num_nodes)
470 logmsg_queue_node_max = msg->num_nodes + 1;
471 node = g_slice_new(LogMessageQueueNode);
472 node->embedded = FALSE;
473 }
474 log_msg_init_queue_node(msg, node, path_options);
475 return node;
476 }
477
478 LogMessageQueueNode *
log_msg_alloc_dynamic_queue_node(LogMessage * msg,const LogPathOptions * path_options)479 log_msg_alloc_dynamic_queue_node(LogMessage *msg, const LogPathOptions *path_options)
480 {
481 LogMessageQueueNode *node;
482 node = g_slice_new(LogMessageQueueNode);
483 node->embedded = FALSE;
484 log_msg_init_queue_node(msg, node, path_options);
485 return node;
486 }
487
488 void
log_msg_free_queue_node(LogMessageQueueNode * node)489 log_msg_free_queue_node(LogMessageQueueNode *node)
490 {
491 if (!node->embedded)
492 g_slice_free(LogMessageQueueNode, node);
493 }
494
495 static gboolean
_log_name_value_updates(LogMessage * self)496 _log_name_value_updates(LogMessage *self)
497 {
498 /* we don't log name value updates for internal messages that are
499 * initialized at this point, as that may generate an endless recursion.
500 * log_msg_new_internal() calling log_msg_set_value(), which in turn
501 * generates an internal message, again calling log_msg_set_value()
502 */
503 return (self->flags & LF_INTERNAL) == 0;
504 }
505
506 static inline gboolean
_value_invalidates_legacy_header(NVHandle handle)507 _value_invalidates_legacy_header(NVHandle handle)
508 {
509 return handle == LM_V_PROGRAM || handle == LM_V_PID;
510 }
511
512 void
log_msg_set_value(LogMessage * self,NVHandle handle,const gchar * value,gssize value_len)513 log_msg_set_value(LogMessage *self, NVHandle handle, const gchar *value, gssize value_len)
514 {
515 const gchar *name;
516 gssize name_len;
517 gboolean new_entry = FALSE;
518
519 g_assert(!log_msg_is_write_protected(self));
520
521 if (handle == LM_V_NONE)
522 return;
523
524 name_len = 0;
525 name = log_msg_get_value_name(handle, &name_len);
526
527 if (_log_name_value_updates(self))
528 {
529 msg_trace("Setting value",
530 evt_tag_str("name", name),
531 evt_tag_printf("value", "%.*s", (gint) value_len, value),
532 evt_tag_printf("msg", "%p", self));
533 }
534
535 if (value_len < 0)
536 value_len = strlen(value);
537
538 if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
539 {
540 self->payload = nv_table_clone(self->payload, name_len + value_len + 2);
541 log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
542 self->allocated_bytes += self->payload->size;
543 stats_counter_add(count_allocated_bytes, self->payload->size);
544 }
545
546 /* we need a loop here as a single realloc may not be enough. Might help
547 * if we pass how much bytes we need though. */
548
549 while (!nv_table_add_value(self->payload, handle, name, name_len, value, value_len, &new_entry))
550 {
551 /* error allocating string in payload, reallocate */
552 guint32 old_size = self->payload->size;
553 if (!nv_table_realloc(self->payload, &self->payload))
554 {
555 /* can't grow the payload, it has reached the maximum size */
556 msg_info("Cannot store value for this log message, maximum size has been reached",
557 evt_tag_int("maximum_payload", NV_TABLE_MAX_BYTES),
558 evt_tag_str("name", name),
559 evt_tag_printf("value", "%.32s%s", value, value_len > 32 ? "..." : ""));
560 break;
561 }
562 guint32 new_size = self->payload->size;
563 self->allocated_bytes += (new_size - old_size);
564 stats_counter_add(count_allocated_bytes, new_size-old_size);
565 stats_counter_inc(count_payload_reallocs);
566 }
567
568 if (new_entry)
569 log_msg_update_sdata(self, handle, name, name_len);
570
571 if (_value_invalidates_legacy_header(handle))
572 log_msg_unset_value(self, LM_V_LEGACY_MSGHDR);
573 }
574
575 void
log_msg_unset_value(LogMessage * self,NVHandle handle)576 log_msg_unset_value(LogMessage *self, NVHandle handle)
577 {
578 g_assert(!log_msg_is_write_protected(self));
579
580 if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
581 {
582 self->payload = nv_table_clone(self->payload, 0);
583 log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
584 }
585
586 while (!nv_table_unset_value(self->payload, handle))
587 {
588 /* error allocating string in payload, reallocate */
589 guint32 old_size = self->payload->size;
590 if (!nv_table_realloc(self->payload, &self->payload))
591 {
592 /* can't grow the payload, it has reached the maximum size */
593 const gchar *name = log_msg_get_value_name(handle, NULL);
594 msg_info("Cannot unset value for this log message, maximum size has been reached",
595 evt_tag_int("maximum_payload", NV_TABLE_MAX_BYTES),
596 evt_tag_str("name", name));
597 break;
598 }
599 guint32 new_size = self->payload->size;
600 self->allocated_bytes += (new_size - old_size);
601 stats_counter_add(count_allocated_bytes, new_size-old_size);
602 stats_counter_inc(count_payload_reallocs);
603 }
604
605 if (_value_invalidates_legacy_header(handle))
606 log_msg_unset_value(self, LM_V_LEGACY_MSGHDR);
607 }
608
609 void
log_msg_unset_value_by_name(LogMessage * self,const gchar * name)610 log_msg_unset_value_by_name(LogMessage *self, const gchar *name)
611 {
612 log_msg_unset_value(self, log_msg_get_value_handle(name));
613 }
614
615 void
log_msg_set_value_indirect(LogMessage * self,NVHandle handle,NVHandle ref_handle,guint8 type,guint16 ofs,guint16 len)616 log_msg_set_value_indirect(LogMessage *self, NVHandle handle, NVHandle ref_handle, guint8 type, guint16 ofs,
617 guint16 len)
618 {
619 const gchar *name;
620 gssize name_len;
621 gboolean new_entry = FALSE;
622
623 g_assert(!log_msg_is_write_protected(self));
624
625 if (handle == LM_V_NONE)
626 return;
627
628 g_assert(handle >= LM_V_MAX);
629
630 name_len = 0;
631 name = log_msg_get_value_name(handle, &name_len);
632
633 if (_log_name_value_updates(self))
634 {
635 msg_trace("Setting indirect value",
636 evt_tag_printf("msg", "%p", self),
637 evt_tag_str("name", name),
638 evt_tag_int("ref_handle", ref_handle),
639 evt_tag_int("ofs", ofs),
640 evt_tag_int("len", len));
641 }
642
643 if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
644 {
645 self->payload = nv_table_clone(self->payload, name_len + 1);
646 log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
647 }
648
649 NVReferencedSlice referenced_slice =
650 {
651 .handle = ref_handle,
652 .ofs = ofs,
653 .len = len,
654 .type = type
655 };
656
657 while (!nv_table_add_value_indirect(self->payload, handle, name, name_len, &referenced_slice, &new_entry))
658 {
659 /* error allocating string in payload, reallocate */
660 if (!nv_table_realloc(self->payload, &self->payload))
661 {
662 /* error growing the payload, skip without storing the value */
663 msg_info("Cannot store referenced value for this log message, maximum size has been reached",
664 evt_tag_str("name", name),
665 evt_tag_str("ref-name", log_msg_get_value_name(ref_handle, NULL)));
666 break;
667 }
668 stats_counter_inc(count_payload_reallocs);
669 }
670
671 if (new_entry)
672 log_msg_update_sdata(self, handle, name, name_len);
673 }
674
675 gboolean
log_msg_values_foreach(const LogMessage * self,NVTableForeachFunc func,gpointer user_data)676 log_msg_values_foreach(const LogMessage *self, NVTableForeachFunc func, gpointer user_data)
677 {
678 return nv_table_foreach(self->payload, logmsg_registry, func, user_data);
679 }
680
681 void
log_msg_set_match(LogMessage * self,gint index_,const gchar * value,gssize value_len)682 log_msg_set_match(LogMessage *self, gint index_, const gchar *value, gssize value_len)
683 {
684 g_assert(index_ < 256);
685
686 if (index_ >= self->num_matches)
687 self->num_matches = index_ + 1;
688 log_msg_set_value(self, match_handles[index_], value, value_len);
689 }
690
691 void
log_msg_set_match_indirect(LogMessage * self,gint index_,NVHandle ref_handle,guint8 type,guint16 ofs,guint16 len)692 log_msg_set_match_indirect(LogMessage *self, gint index_, NVHandle ref_handle, guint8 type, guint16 ofs, guint16 len)
693 {
694 g_assert(index_ < 256);
695
696 log_msg_set_value_indirect(self, match_handles[index_], ref_handle, type, ofs, len);
697 }
698
699 void
log_msg_clear_matches(LogMessage * self)700 log_msg_clear_matches(LogMessage *self)
701 {
702 gint i;
703
704 for (i = 0; i < self->num_matches; i++)
705 {
706 log_msg_set_value(self, match_handles[i], "", 0);
707 }
708 self->num_matches = 0;
709 }
710
/* Tags are kept in gulong words, and a single word may be stored in the
 * place of the tags pointer itself, hence the two sizes have to agree. */
#if GLIB_SIZEOF_LONG != GLIB_SIZEOF_VOID_P
#error "The tags bit array assumes that long is the same size as the pointer"
#endif

/*
 * LOGMSG_TAGS_NDX_SHIFT: right shift that turns a tag id into a word index
 * LOGMSG_TAGS_NDX_MASK:  mask that selects the bit position within the word
 * LOGMSG_TAGS_BITS:      number of tag bits held by one word
 */
#if GLIB_SIZEOF_LONG == 8
#define LOGMSG_TAGS_NDX_SHIFT 6
#define LOGMSG_TAGS_NDX_MASK 0x3F
#define LOGMSG_TAGS_BITS 64
#elif GLIB_SIZEOF_LONG == 4
#define LOGMSG_TAGS_NDX_SHIFT 5
#define LOGMSG_TAGS_NDX_MASK 0x1F
#define LOGMSG_TAGS_BITS 32
#else
#error "Unsupported word length, only 32 or 64 bit platforms are supported"
#endif
726
727 static inline void
log_msg_tags_foreach_item(const LogMessage * self,gint base,gulong item,LogMessageTagsForeachFunc callback,gpointer user_data)728 log_msg_tags_foreach_item(const LogMessage *self, gint base, gulong item, LogMessageTagsForeachFunc callback,
729 gpointer user_data)
730 {
731 gint i;
732
733 for (i = 0; i < LOGMSG_TAGS_BITS; i++)
734 {
735 if (G_LIKELY(!item))
736 return;
737 if (item & 1)
738 {
739 LogTagId id = (LogTagId) base + i;
740
741 callback(self, id, log_tags_get_by_id(id), user_data);
742 }
743 item >>= 1;
744 }
745 }
746
747
748 void
log_msg_tags_foreach(const LogMessage * self,LogMessageTagsForeachFunc callback,gpointer user_data)749 log_msg_tags_foreach(const LogMessage *self, LogMessageTagsForeachFunc callback, gpointer user_data)
750 {
751 guint i;
752
753 if (self->num_tags == 0)
754 {
755 log_msg_tags_foreach_item(self, 0, (gulong) self->tags, callback, user_data);
756 }
757 else
758 {
759 for (i = 0; i != self->num_tags; ++i)
760 {
761 log_msg_tags_foreach_item(self, i * LOGMSG_TAGS_BITS, self->tags[i], callback, user_data);
762 }
763 }
764 }
765
766
767 static inline void
log_msg_set_bit(gulong * tags,gint index_,gboolean value)768 log_msg_set_bit(gulong *tags, gint index_, gboolean value)
769 {
770 if (value)
771 tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] |= ((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK)));
772 else
773 tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] &= ~((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK)));
774 }
775
776 static inline gboolean
log_msg_get_bit(gulong * tags,gint index_)777 log_msg_get_bit(gulong *tags, gint index_)
778 {
779 return !!(tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] & ((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK))));
780 }
781
782 void
log_msg_set_tag_by_id_onoff(LogMessage * self,LogTagId id,gboolean on)783 log_msg_set_tag_by_id_onoff(LogMessage *self, LogTagId id, gboolean on)
784 {
785 gulong *old_tags;
786 gint old_num_tags;
787 gboolean inline_tags;
788
789 g_assert(!log_msg_is_write_protected(self));
790 if (!log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->num_tags)
791 {
792 self->tags = g_memdup(self->tags, sizeof(self->tags[0]) * self->num_tags);
793 }
794 log_msg_set_flag(self, LF_STATE_OWN_TAGS);
795
796 /* if num_tags is 0, it means that we use inline storage of tags */
797 inline_tags = self->num_tags == 0;
798 if (inline_tags && id < LOGMSG_TAGS_BITS)
799 {
800 /* store this tag inline */
801 log_msg_set_bit((gulong *) &self->tags, id, on);
802 }
803 else
804 {
805 /* we can't put this tag inline, either because it is too large, or we don't have the inline space any more */
806
807 if ((self->num_tags * LOGMSG_TAGS_BITS) <= id)
808 {
809 if (G_UNLIKELY(8159 < id))
810 {
811 msg_error("Maximum number of tags reached");
812 return;
813 }
814 old_num_tags = self->num_tags;
815 self->num_tags = (id / LOGMSG_TAGS_BITS) + 1;
816
817 old_tags = self->tags;
818 if (old_num_tags)
819 self->tags = g_realloc(self->tags, sizeof(self->tags[0]) * self->num_tags);
820 else
821 self->tags = g_malloc(sizeof(self->tags[0]) * self->num_tags);
822 memset(&self->tags[old_num_tags], 0, (self->num_tags - old_num_tags) * sizeof(self->tags[0]));
823
824 if (inline_tags)
825 self->tags[0] = (gulong) old_tags;
826 }
827
828 log_msg_set_bit(self->tags, id, on);
829 }
830 if (on)
831 {
832 log_tags_inc_counter(id);
833 }
834 else
835 {
836 log_tags_dec_counter(id);
837 }
838 }
839
840 void
log_msg_set_tag_by_id(LogMessage * self,LogTagId id)841 log_msg_set_tag_by_id(LogMessage *self, LogTagId id)
842 {
843 log_msg_set_tag_by_id_onoff(self, id, TRUE);
844 }
845
846 void
log_msg_set_tag_by_name(LogMessage * self,const gchar * name)847 log_msg_set_tag_by_name(LogMessage *self, const gchar *name)
848 {
849 log_msg_set_tag_by_id_onoff(self, log_tags_get_by_name(name), TRUE);
850 }
851
852 void
log_msg_clear_tag_by_id(LogMessage * self,LogTagId id)853 log_msg_clear_tag_by_id(LogMessage *self, LogTagId id)
854 {
855 log_msg_set_tag_by_id_onoff(self, id, FALSE);
856 }
857
858 void
log_msg_clear_tag_by_name(LogMessage * self,const gchar * name)859 log_msg_clear_tag_by_name(LogMessage *self, const gchar *name)
860 {
861 log_msg_set_tag_by_id_onoff(self, log_tags_get_by_name(name), FALSE);
862 }
863
864 gboolean
log_msg_is_tag_by_id(LogMessage * self,LogTagId id)865 log_msg_is_tag_by_id(LogMessage *self, LogTagId id)
866 {
867 if (G_UNLIKELY(8159 < id))
868 {
869 msg_error("Invalid tag", evt_tag_int("id", (gint) id));
870 return FALSE;
871 }
872 if (self->num_tags == 0 && id < LOGMSG_TAGS_BITS)
873 return log_msg_get_bit((gulong *) &self->tags, id);
874 else if (id < self->num_tags * LOGMSG_TAGS_BITS)
875 return log_msg_get_bit(self->tags, id);
876 else
877 return FALSE;
878 }
879
880 gboolean
log_msg_is_tag_by_name(LogMessage * self,const gchar * name)881 log_msg_is_tag_by_name(LogMessage *self, const gchar *name)
882 {
883 return log_msg_is_tag_by_id(self, log_tags_get_by_name(name));
884 }
885
886 /* structured data elements */
887
888 static void
log_msg_sdata_append_key_escaped(GString * result,const gchar * sstr,gssize len)889 log_msg_sdata_append_key_escaped(GString *result, const gchar *sstr, gssize len)
890 {
891 /* The specification does not have any way to escape keys.
892 * The goal is to create syntactically valid structured data fields. */
893 const guchar *ustr = (const guchar *) sstr;
894
895 for (gssize i = 0; i < len; i++)
896 {
897 if (!isascii(ustr[i]) || ustr[i] == '=' || ustr[i] == ' '
898 || ustr[i] == '[' || ustr[i] == ']' || ustr[i] == '"')
899 {
900 gchar hex_code[4];
901 g_sprintf(hex_code, "%%%02X", ustr[i]);
902 g_string_append(result, hex_code);
903 }
904 else
905 g_string_append_c(result, ustr[i]);
906 }
907 }
908
909 static void
log_msg_sdata_append_escaped(GString * result,const gchar * sstr,gssize len)910 log_msg_sdata_append_escaped(GString *result, const gchar *sstr, gssize len)
911 {
912 gint i;
913 const guchar *ustr = (const guchar *) sstr;
914
915 for (i = 0; i < len; i++)
916 {
917 if (ustr[i] == '"' || ustr[i] == '\\' || ustr[i] == ']')
918 {
919 g_string_append_c(result, '\\');
920 g_string_append_c(result, ustr[i]);
921 }
922 else
923 g_string_append_c(result, ustr[i]);
924 }
925 }
926
927 void
log_msg_append_format_sdata(const LogMessage * self,GString * result,guint32 seq_num)928 log_msg_append_format_sdata(const LogMessage *self, GString *result, guint32 seq_num)
929 {
930 const gchar *value;
931 const gchar *sdata_name, *sdata_elem, *sdata_param, *cur_elem = NULL, *dot;
932 gssize sdata_name_len, sdata_elem_len, sdata_param_len, cur_elem_len = 0, len;
933 gint i;
934 static NVHandle meta_seqid = 0;
935 gssize seqid_length;
936 gboolean has_seq_num = FALSE;
937 const gchar *seqid;
938
939 if (!meta_seqid)
940 meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
941
942 seqid = log_msg_get_value(self, meta_seqid, &seqid_length);
943 APPEND_ZERO(seqid, seqid, seqid_length);
944 if (seqid[0])
945 /* Message stores sequenceId */
946 has_seq_num = TRUE;
947 else
948 /* Message hasn't sequenceId */
949 has_seq_num = FALSE;
950
951 for (i = 0; i < self->num_sdata; i++)
952 {
953 NVHandle handle = self->sdata[i];
954 guint16 handle_flags;
955 gint sd_id_len;
956
957 sdata_name_len = 0;
958 sdata_name = log_msg_get_value_name(handle, &sdata_name_len);
959 handle_flags = nv_registry_get_handle_flags(logmsg_registry, handle);
960 value = log_msg_get_value_if_set(self, handle, &len);
961 if (!value)
962 continue;
963
964 g_assert(handle_flags & LM_VF_SDATA);
965
966 /* sdata_name always begins with .SDATA. */
967 g_assert(sdata_name_len > 6);
968
969 sdata_elem = sdata_name + 7;
970 sd_id_len = (handle_flags >> 8);
971
972 if (sd_id_len)
973 {
974 dot = sdata_elem + sd_id_len;
975 if (dot - sdata_name != sdata_name_len)
976 {
977 g_assert((dot - sdata_name < sdata_name_len) && *dot == '.');
978 }
979 else
980 {
981 /* Standalone sdata e.g. [[UserData.updatelist@18372.4]] */
982 dot = NULL;
983 }
984 }
985 else
986 {
987 dot = memrchr(sdata_elem, '.', sdata_name_len - 7);
988 }
989
990 if (G_LIKELY(dot))
991 {
992 sdata_elem_len = dot - sdata_elem;
993
994 sdata_param = dot + 1;
995 sdata_param_len = sdata_name_len - (dot + 1 - sdata_name);
996 }
997 else
998 {
999 sdata_elem_len = sdata_name_len - 7;
1000 if (sdata_elem_len == 0)
1001 {
1002 sdata_elem = "none";
1003 sdata_elem_len = 4;
1004 }
1005 sdata_param = "";
1006 sdata_param_len = 0;
1007 }
1008 if (!cur_elem || sdata_elem_len != cur_elem_len || strncmp(cur_elem, sdata_elem, sdata_elem_len) != 0)
1009 {
1010 if (cur_elem)
1011 {
1012 /* close the previous block */
1013 g_string_append_c(result, ']');
1014 }
1015
1016 /* the current SD block has changed, emit a start */
1017 g_string_append_c(result, '[');
1018 log_msg_sdata_append_key_escaped(result, sdata_elem, sdata_elem_len);
1019
1020 /* update cur_elem */
1021 cur_elem = sdata_elem;
1022 cur_elem_len = sdata_elem_len;
1023 }
1024 /* if message hasn't sequenceId and the cur_elem is the meta block Append the sequenceId for the result
1025 if seq_num isn't 0 */
1026 if (!has_seq_num && seq_num!=0 && strncmp(sdata_elem, "meta.", 5) == 0)
1027 {
1028 gchar sequence_id[16];
1029 g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
1030 g_string_append_c(result, ' ');
1031 g_string_append_len(result, "sequenceId=\"", 12);
1032 g_string_append_len(result, sequence_id, strlen(sequence_id));
1033 g_string_append_c(result, '"');
1034 has_seq_num = TRUE;
1035 }
1036 if (sdata_param_len)
1037 {
1038 if (value)
1039 {
1040 g_string_append_c(result, ' ');
1041 log_msg_sdata_append_key_escaped(result, sdata_param, sdata_param_len);
1042 g_string_append(result, "=\"");
1043 log_msg_sdata_append_escaped(result, value, len);
1044 g_string_append_c(result, '"');
1045 }
1046 }
1047 }
1048 if (cur_elem)
1049 {
1050 g_string_append_c(result, ']');
1051 }
1052 /*
1053 There was no meta block and if sequenceId must be added (seq_num!=0)
1054 create the whole meta block with sequenceId
1055 */
1056 if (!has_seq_num && seq_num!=0)
1057 {
1058 gchar sequence_id[16];
1059 g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
1060 g_string_append_c(result, '[');
1061 g_string_append_len(result, "meta sequenceId=\"", 17);
1062 g_string_append_len(result, sequence_id, strlen(sequence_id));
1063 g_string_append_len(result, "\"]", 2);
1064 }
1065 }
1066
1067 void
log_msg_format_sdata(const LogMessage * self,GString * result,guint32 seq_num)1068 log_msg_format_sdata(const LogMessage *self, GString *result, guint32 seq_num)
1069 {
1070 g_string_truncate(result, 0);
1071 log_msg_append_format_sdata(self, result, seq_num);
1072 }
1073
1074 gboolean
log_msg_append_tags_callback(const LogMessage * self,LogTagId tag_id,const gchar * name,gpointer user_data)1075 log_msg_append_tags_callback(const LogMessage *self, LogTagId tag_id, const gchar *name, gpointer user_data)
1076 {
1077 GString *result = (GString *) ((gpointer *) user_data)[0];
1078 gint original_length = GPOINTER_TO_UINT(((gpointer *) user_data)[1]);
1079
1080 g_assert(result);
1081
1082 if (result->len > original_length)
1083 g_string_append_c(result, ',');
1084
1085 str_repr_encode_append(result, name, -1, ",");
1086 return TRUE;
1087 }
1088
1089 void
log_msg_print_tags(const LogMessage * self,GString * result)1090 log_msg_print_tags(const LogMessage *self, GString *result)
1091 {
1092 gpointer args[] = { result, GUINT_TO_POINTER(result->len) };
1093
1094 log_msg_tags_foreach(self, log_msg_append_tags_callback, args);
1095 }
1096
1097 void
log_msg_set_saddr(LogMessage * self,GSockAddr * saddr)1098 log_msg_set_saddr(LogMessage *self, GSockAddr *saddr)
1099 {
1100 log_msg_set_saddr_ref(self, g_sockaddr_ref(saddr));
1101 }
1102
1103 void
log_msg_set_saddr_ref(LogMessage * self,GSockAddr * saddr)1104 log_msg_set_saddr_ref(LogMessage *self, GSockAddr *saddr)
1105 {
1106 if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1107 g_sockaddr_unref(self->saddr);
1108 self->saddr = saddr;
1109 self->flags |= LF_STATE_OWN_SADDR;
1110 }
1111
1112 void
log_msg_set_daddr(LogMessage * self,GSockAddr * daddr)1113 log_msg_set_daddr(LogMessage *self, GSockAddr *daddr)
1114 {
1115 log_msg_set_daddr_ref(self, g_sockaddr_ref(daddr));
1116 }
1117
1118 void
log_msg_set_daddr_ref(LogMessage * self,GSockAddr * daddr)1119 log_msg_set_daddr_ref(LogMessage *self, GSockAddr *daddr)
1120 {
1121 if (log_msg_chk_flag(self, LF_STATE_OWN_DADDR))
1122 g_sockaddr_unref(self->daddr);
1123 self->daddr = daddr;
1124 self->flags |= LF_STATE_OWN_DADDR;
1125 }
1126
1127 /**
1128 * log_msg_init:
1129 * @self: LogMessage instance
1130 * @saddr: sender address
1131 *
1132 * This function initializes a LogMessage instance without allocating it
1133 * first. It is used internally by the log_msg_new function.
1134 **/
1135 static void
log_msg_init(LogMessage * self)1136 log_msg_init(LogMessage *self)
1137 {
1138 GTimeVal tv;
1139
1140 /* ref is set to 1, ack is set to 0 */
1141 self->ack_and_ref_and_abort_and_suspended = LOGMSG_REFCACHE_REF_TO_VALUE(1);
1142 cached_g_current_time(&tv);
1143 self->timestamps[LM_TS_RECVD].ut_sec = tv.tv_sec;
1144 self->timestamps[LM_TS_RECVD].ut_usec = tv.tv_usec;
1145 self->timestamps[LM_TS_RECVD].ut_gmtoff = get_local_timezone_ofs(self->timestamps[LM_TS_RECVD].ut_sec);
1146 self->timestamps[LM_TS_STAMP] = self->timestamps[LM_TS_RECVD];
1147 unix_time_unset(&self->timestamps[LM_TS_PROCESSED]);
1148
1149 self->sdata = NULL;
1150 self->saddr = NULL;
1151 self->daddr = NULL;
1152
1153 self->original = NULL;
1154 self->flags |= LF_STATE_OWN_MASK;
1155 self->pri = LOG_USER | LOG_NOTICE;
1156
1157 self->rcptid = rcptid_generate_id();
1158 log_msg_set_host_id(self);
1159 }
1160
1161 void
log_msg_clear(LogMessage * self)1162 log_msg_clear(LogMessage *self)
1163 {
1164 if(log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
1165 nv_table_unref(self->payload);
1166 self->payload = nv_table_new(LM_V_MAX, 16, 256);
1167
1168 if (log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->tags)
1169 {
1170 gboolean inline_tags = self->num_tags == 0;
1171
1172 if (inline_tags)
1173 self->tags = NULL;
1174 else
1175 memset(self->tags, 0, self->num_tags * sizeof(self->tags[0]));
1176 }
1177 else
1178 {
1179 self->tags = NULL;
1180 self->num_tags = 0;
1181 }
1182
1183 self->num_matches = 0;
1184 if (!log_msg_chk_flag(self, LF_STATE_OWN_SDATA))
1185 {
1186 self->sdata = NULL;
1187 self->alloc_sdata = 0;
1188 }
1189 self->num_sdata = 0;
1190
1191 if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1192 g_sockaddr_unref(self->saddr);
1193 self->saddr = NULL;
1194 if (log_msg_chk_flag(self, LF_STATE_OWN_DADDR))
1195 g_sockaddr_unref(self->daddr);
1196 self->daddr = NULL;
1197
1198 /* clear "local", "utf8", "internal", "mark" and similar flags, we start afresh */
1199 self->flags = LF_STATE_OWN_MASK;
1200 }
1201
1202 static inline LogMessage *
log_msg_alloc(gsize payload_size)1203 log_msg_alloc(gsize payload_size)
1204 {
1205 LogMessage *msg;
1206 gsize payload_space = payload_size ? nv_table_get_alloc_size(LM_V_MAX, 16, payload_size) : 0;
1207 gsize alloc_size, payload_ofs = 0;
1208
1209 /* NOTE: logmsg_node_max is updated from parallel threads without locking. */
1210 gint nodes = (volatile gint) logmsg_queue_node_max;
1211
1212 alloc_size = sizeof(LogMessage) + sizeof(LogMessageQueueNode) * nodes;
1213 /* align to 8 boundary */
1214 if (payload_size)
1215 {
1216 alloc_size = (alloc_size + 7) & ~7;
1217 payload_ofs = alloc_size;
1218 alloc_size += payload_space;
1219 }
1220 msg = g_malloc(alloc_size);
1221
1222 memset(msg, 0, sizeof(LogMessage));
1223
1224 if (payload_size)
1225 msg->payload = nv_table_init_borrowed(((gchar *) msg) + payload_ofs, payload_space, LM_V_MAX);
1226
1227 msg->num_nodes = nodes;
1228 msg->allocated_bytes = alloc_size + payload_space;
1229 stats_counter_add(count_allocated_bytes, msg->allocated_bytes);
1230 return msg;
1231 }
1232
1233 static gboolean
_merge_value(NVHandle handle,const gchar * name,const gchar * value,gssize value_len,gpointer user_data)1234 _merge_value(NVHandle handle, const gchar *name, const gchar *value, gssize value_len, gpointer user_data)
1235 {
1236 LogMessage *msg = (LogMessage *) user_data;
1237
1238 if (!nv_table_is_value_set(msg->payload, handle))
1239 log_msg_set_value(msg, handle, value, value_len);
1240 return FALSE;
1241 }
1242
1243 void
log_msg_merge_context(LogMessage * self,LogMessage ** context,gsize context_len)1244 log_msg_merge_context(LogMessage *self, LogMessage **context, gsize context_len)
1245 {
1246 gint i;
1247
1248 for (i = context_len - 1; i >= 0; i--)
1249 {
1250 LogMessage *msg_to_be_merged = context[i];
1251
1252 log_msg_values_foreach(msg_to_be_merged, _merge_value, self);
1253 }
1254 }
1255
1256 static void
log_msg_clone_ack(LogMessage * msg,AckType ack_type)1257 log_msg_clone_ack(LogMessage *msg, AckType ack_type)
1258 {
1259 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
1260
1261 g_assert(msg->original);
1262 path_options.ack_needed = TRUE;
1263 log_msg_ack(msg->original, &path_options, ack_type);
1264 }
1265
1266 /*
1267 * log_msg_clone_cow:
1268 *
1269 * Clone a copy-on-write (cow) copy of a log message.
1270 */
1271 LogMessage *
log_msg_clone_cow(LogMessage * msg,const LogPathOptions * path_options)1272 log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options)
1273 {
1274 LogMessage *self = log_msg_alloc(0);
1275 gsize allocated_bytes = self->allocated_bytes;
1276
1277 stats_counter_inc(count_msg_clones);
1278 log_msg_write_protect(msg);
1279
1280 memcpy(self, msg, sizeof(*msg));
1281 msg->allocated_bytes = allocated_bytes;
1282
1283 msg_trace("Message was cloned",
1284 evt_tag_printf("original_msg", "%p", msg),
1285 evt_tag_printf("new_msg", "%p", self));
1286
1287 /* every field _must_ be initialized explicitly if its direct
1288 * copying would cause problems (like copying a pointer by value) */
1289
1290 /* reference the original message */
1291 self->original = log_msg_ref(msg);
1292 self->ack_and_ref_and_abort_and_suspended = LOGMSG_REFCACHE_REF_TO_VALUE(1) + LOGMSG_REFCACHE_ACK_TO_VALUE(
1293 0) + LOGMSG_REFCACHE_ABORT_TO_VALUE(0);
1294 self->cur_node = 0;
1295 self->protected = FALSE;
1296
1297 log_msg_add_ack(self, path_options);
1298 if (!path_options->ack_needed)
1299 {
1300 self->ack_func = NULL;
1301 }
1302 else
1303 {
1304 self->ack_func = log_msg_clone_ack;
1305 }
1306
1307 self->flags &= ~LF_STATE_MASK;
1308
1309 if (self->num_tags == 0)
1310 self->flags |= LF_STATE_OWN_TAGS;
1311 return self;
1312 }
1313
1314 static gsize
_determine_payload_size(gint length,MsgFormatOptions * parse_options)1315 _determine_payload_size(gint length, MsgFormatOptions *parse_options)
1316 {
1317 gsize payload_size;
1318
1319 if ((parse_options->flags & LP_STORE_RAW_MESSAGE))
1320 payload_size = length * 4;
1321 else
1322 payload_size = length * 2;
1323
1324 return MAX(payload_size, 256);
1325 }
1326
1327 /**
1328 * log_msg_new:
1329 * @msg: message to parse
1330 * @length: length of @msg
1331 * @flags: parse flags (LP_*)
1332 *
1333 * This function allocates, parses and returns a new LogMessage instance.
1334 **/
1335 LogMessage *
log_msg_new(const gchar * msg,gint length,MsgFormatOptions * parse_options)1336 log_msg_new(const gchar *msg, gint length,
1337 MsgFormatOptions *parse_options)
1338 {
1339 LogMessage *self = log_msg_alloc(_determine_payload_size(length, parse_options));
1340
1341 log_msg_init(self);
1342
1343 msg_trace("Initial message parsing follows");
1344 msg_format_parse(parse_options, self, (guchar *) msg, length);
1345 return self;
1346 }
1347
1348 LogMessage *
log_msg_new_empty(void)1349 log_msg_new_empty(void)
1350 {
1351 LogMessage *self = log_msg_alloc(256);
1352
1353 log_msg_init(self);
1354 return self;
1355 }
1356
1357 /* This function creates a new log message that should be considered local */
1358 LogMessage *
log_msg_new_local(void)1359 log_msg_new_local(void)
1360 {
1361 LogMessage *self = log_msg_new_empty();
1362
1363 self->flags |= LF_LOCAL;
1364 return self;
1365 }
1366
1367
1368 /**
1369 * log_msg_new_internal:
1370 * @prio: message priority (LOG_*)
1371 * @msg: message text
1372 * @flags: parse flags (LP_*)
1373 *
1374 * This function creates a new log message for messages originating
1375 * internally to syslog-ng
1376 **/
1377 LogMessage *
log_msg_new_internal(gint prio,const gchar * msg)1378 log_msg_new_internal(gint prio, const gchar *msg)
1379 {
1380 gchar buf[32];
1381 LogMessage *self;
1382
1383 g_snprintf(buf, sizeof(buf), "%d", (int) getpid());
1384 self = log_msg_new_local();
1385 self->flags |= LF_INTERNAL;
1386 self->initial_parse = TRUE;
1387 log_msg_set_value(self, LM_V_PROGRAM, "syslog-ng", 9);
1388 log_msg_set_value(self, LM_V_PID, buf, -1);
1389 log_msg_set_value(self, LM_V_MESSAGE, msg, -1);
1390 self->initial_parse = FALSE;
1391 self->pri = prio;
1392
1393 return self;
1394 }
1395
1396 /**
1397 * log_msg_new_mark:
1398 *
1399 * This function returns a new MARK message. MARK messages have the LF_MARK
1400 * flag set.
1401 **/
1402 LogMessage *
log_msg_new_mark(void)1403 log_msg_new_mark(void)
1404 {
1405 LogMessage *self = log_msg_new_local();
1406
1407 log_msg_set_value(self, LM_V_MESSAGE, "-- MARK --", 10);
1408 self->pri = LOG_SYSLOG | LOG_INFO;
1409 self->flags |= LF_MARK | LF_INTERNAL;
1410 return self;
1411 }
1412
1413 /**
1414 * log_msg_free:
1415 * @self: LogMessage instance
1416 *
1417 * Frees a LogMessage instance.
1418 **/
1419 static void
log_msg_free(LogMessage * self)1420 log_msg_free(LogMessage *self)
1421 {
1422 if (log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD) && self->payload)
1423 nv_table_unref(self->payload);
1424 if (log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->tags && self->num_tags > 0)
1425 g_free(self->tags);
1426
1427 if (log_msg_chk_flag(self, LF_STATE_OWN_SDATA) && self->sdata)
1428 g_free(self->sdata);
1429 if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1430 g_sockaddr_unref(self->saddr);
1431 if (log_msg_chk_flag(self, LF_STATE_OWN_DADDR))
1432 g_sockaddr_unref(self->daddr);
1433
1434 if (self->original)
1435 log_msg_unref(self->original);
1436
1437 stats_counter_sub(count_allocated_bytes, self->allocated_bytes);
1438
1439 g_free(self);
1440 }
1441
1442 /**
1443 * log_msg_drop:
1444 * @msg: LogMessage instance
1445 * @path_options: path specific options
1446 *
1447 * This function is called whenever a destination driver feels that it is
1448 * unable to process this message. It acks and unrefs the message.
1449 **/
1450 void
log_msg_drop(LogMessage * msg,const LogPathOptions * path_options,AckType ack_type)1451 log_msg_drop(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type)
1452 {
1453 log_msg_ack(msg, path_options, ack_type);
1454 log_msg_unref(msg);
1455 }
1456
1457 static AckType
_ack_and_ref_and_abort_and_suspend_to_acktype(gint value)1458 _ack_and_ref_and_abort_and_suspend_to_acktype(gint value)
1459 {
1460 AckType type = AT_PROCESSED;
1461
1462 if (IS_SUSPENDFLAG_ON(LOGMSG_REFCACHE_VALUE_TO_SUSPEND(value)))
1463 type = AT_SUSPENDED;
1464 else if (IS_ABORTFLAG_ON(LOGMSG_REFCACHE_VALUE_TO_ABORT(value)))
1465 type = AT_ABORTED;
1466
1467 return type;
1468 }
1469
1470
1471 /***************************************************************************************
1472 * In order to read & understand this code, reading the comment on the top
1473 * of this file about ref/ack handling is strongly recommended.
1474 ***************************************************************************************/
1475
1476 /* Function to update the combined ACK (with the abort flag) and REF counter. */
1477 static inline gint
log_msg_update_ack_and_ref_and_abort_and_suspended(LogMessage * self,gint add_ref,gint add_ack,gint add_abort,gint add_suspend)1478 log_msg_update_ack_and_ref_and_abort_and_suspended(LogMessage *self, gint add_ref, gint add_ack, gint add_abort,
1479 gint add_suspend)
1480 {
1481 gint old_value, new_value;
1482 do
1483 {
1484 new_value = old_value = (volatile gint) self->ack_and_ref_and_abort_and_suspended;
1485 new_value = (new_value & ~LOGMSG_REFCACHE_REF_MASK) + LOGMSG_REFCACHE_REF_TO_VALUE( (LOGMSG_REFCACHE_VALUE_TO_REF(
1486 old_value) + add_ref));
1487 new_value = (new_value & ~LOGMSG_REFCACHE_ACK_MASK) + LOGMSG_REFCACHE_ACK_TO_VALUE( (LOGMSG_REFCACHE_VALUE_TO_ACK(
1488 old_value) + add_ack));
1489 new_value = (new_value & ~LOGMSG_REFCACHE_ABORT_MASK) + LOGMSG_REFCACHE_ABORT_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_ABORT(
1490 old_value) | add_abort));
1491 new_value = (new_value & ~LOGMSG_REFCACHE_SUSPEND_MASK) + LOGMSG_REFCACHE_SUSPEND_TO_VALUE((
1492 LOGMSG_REFCACHE_VALUE_TO_SUSPEND(old_value) | add_suspend));
1493 }
1494 while (!g_atomic_int_compare_and_exchange(&self->ack_and_ref_and_abort_and_suspended, old_value, new_value));
1495
1496 return old_value;
1497 }
1498
1499 /* Function to update the combined ACK (without abort) and REF counter. */
1500 static inline gint
log_msg_update_ack_and_ref(LogMessage * self,gint add_ref,gint add_ack)1501 log_msg_update_ack_and_ref(LogMessage *self, gint add_ref, gint add_ack)
1502 {
1503 return log_msg_update_ack_and_ref_and_abort_and_suspended(self, add_ref, add_ack, 0, 0);
1504 }
1505
1506 /**
1507 * log_msg_ref:
1508 * @self: LogMessage instance
1509 *
1510 * Increment reference count of @self and return the new reference.
1511 **/
1512 LogMessage *
log_msg_ref(LogMessage * self)1513 log_msg_ref(LogMessage *self)
1514 {
1515 gint old_value;
1516
1517 if (G_LIKELY(logmsg_current == self))
1518 {
1519 /* fastpath, @self is the current message, ref/unref processing is
1520 * delayed until log_msg_refcache_stop() is called */
1521
1522 logmsg_cached_refs++;
1523 return self;
1524 }
1525
1526 /* slow path, refcache is not used, do the ordinary way */
1527 old_value = log_msg_update_ack_and_ref(self, 1, 0);
1528 g_assert(LOGMSG_REFCACHE_VALUE_TO_REF(old_value) >= 1);
1529 return self;
1530 }
1531
1532 /**
1533 * log_msg_unref:
1534 * @self: LogMessage instance
1535 *
1536 * Decrement reference count and free self if the reference count becomes 0.
1537 **/
1538 void
log_msg_unref(LogMessage * self)1539 log_msg_unref(LogMessage *self)
1540 {
1541 gint old_value;
1542
1543 if (G_LIKELY(logmsg_current == self))
1544 {
1545 /* fastpath, @self is the current message, ref/unref processing is
1546 * delayed until log_msg_refcache_stop() is called */
1547
1548 logmsg_cached_refs--;
1549 return;
1550 }
1551
1552 old_value = log_msg_update_ack_and_ref(self, -1, 0);
1553 g_assert(LOGMSG_REFCACHE_VALUE_TO_REF(old_value) >= 1);
1554
1555 if (LOGMSG_REFCACHE_VALUE_TO_REF(old_value) == 1)
1556 {
1557 log_msg_free(self);
1558 }
1559 }
1560
1561 /**
1562 * log_msg_add_ack:
1563 * @m: LogMessage instance
1564 *
1565 * This function increments the number of required acknowledges.
1566 **/
1567 void
log_msg_add_ack(LogMessage * self,const LogPathOptions * path_options)1568 log_msg_add_ack(LogMessage *self, const LogPathOptions *path_options)
1569 {
1570 if (path_options->ack_needed)
1571 {
1572 if (G_LIKELY(logmsg_current == self))
1573 {
1574 /* fastpath, @self is the current message, add_ack/ack processing is
1575 * delayed until log_msg_refcache_stop() is called */
1576
1577 logmsg_cached_acks++;
1578 logmsg_cached_ack_needed = TRUE;
1579 return;
1580 }
1581 log_msg_update_ack_and_ref(self, 0, 1);
1582 }
1583 }
1584
1585
1586 /**
1587 * log_msg_ack:
1588 * @msg: LogMessage instance
1589 * @path_options: path specific options
1590 * @acked: TRUE: positive ack, FALSE: negative ACK
1591 *
1592 * Indicate that the message was processed successfully and the sender can
1593 * queue further messages.
1594 **/
1595 void
log_msg_ack(LogMessage * self,const LogPathOptions * path_options,AckType ack_type)1596 log_msg_ack(LogMessage *self, const LogPathOptions *path_options, AckType ack_type)
1597 {
1598 gint old_value;
1599
1600 if (path_options->ack_needed)
1601 {
1602 if (G_LIKELY(logmsg_current == self))
1603 {
1604 /* fastpath, @self is the current message, add_ack/ack processing is
1605 * delayed until log_msg_refcache_stop() is called */
1606
1607 logmsg_cached_acks--;
1608 logmsg_cached_abort |= IS_ACK_ABORTED(ack_type);
1609 logmsg_cached_suspend |= IS_ACK_SUSPENDED(ack_type);
1610 return;
1611 }
1612 old_value = log_msg_update_ack_and_ref_and_abort_and_suspended(self, 0, -1, IS_ACK_ABORTED(ack_type),
1613 IS_ACK_SUSPENDED(ack_type));
1614 if (LOGMSG_REFCACHE_VALUE_TO_ACK(old_value) == 1)
1615 {
1616 if (ack_type == AT_SUSPENDED)
1617 self->ack_func(self, AT_SUSPENDED);
1618 else if (ack_type == AT_ABORTED)
1619 self->ack_func(self, AT_ABORTED);
1620 else
1621 self->ack_func(self, _ack_and_ref_and_abort_and_suspend_to_acktype(old_value));
1622 }
1623 }
1624 }
1625
1626 /*
1627 * Break out of an acknowledgement chain. The incoming message is
1628 * ACKed and a new path options structure is returned that can be used
1629 * to send to further consuming pipes.
1630 */
1631 const LogPathOptions *
log_msg_break_ack(LogMessage * msg,const LogPathOptions * path_options,LogPathOptions * local_options)1632 log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_options)
1633 {
1634 /* NOTE: in case the user requested flow control, we can't break the
1635 * ACK chain, as that would lead to early acks, that would cause
1636 * message loss */
1637
1638 g_assert(!path_options->flow_control_requested);
1639
1640 log_msg_ack(msg, path_options, AT_PROCESSED);
1641
1642 *local_options = *path_options;
1643 local_options->ack_needed = FALSE;
1644
1645 return local_options;
1646 }
1647
1648
1649 /*
1650 * Start caching ref/unref/ack/add-ack operations in the current thread for
1651 * the message specified by @self. See the comment at the top of this file
1652 * for more information.
1653 *
1654 * This function is to be called by the producer thread (e.g. the one
1655 * that generates new messages). You should use
1656 * log_msg_refcache_start_consumer() in consumer threads instead.
1657 *
1658 * This function cannot be called for the same message from multiple
1659 * threads.
1660 *
1661 */
1662 void
log_msg_refcache_start_producer(LogMessage * self)1663 log_msg_refcache_start_producer(LogMessage *self)
1664 {
1665 g_assert(logmsg_current == NULL);
1666
1667 logmsg_current = self;
1668 /* we're the producer of said message, and thus we want to inhibit
1669 * freeing/acking it due to our cached refs, add a bias large enough
1670 * to cover any possible unrefs/acks of the consumer side */
1671
1672 /* we don't need to be thread-safe here, as a producer has just created this message and no parallel access is yet possible */
1673
1674 self->ack_and_ref_and_abort_and_suspended = (self->ack_and_ref_and_abort_and_suspended & ~LOGMSG_REFCACHE_REF_MASK) +
1675 LOGMSG_REFCACHE_REF_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_REF(self->ack_and_ref_and_abort_and_suspended) +
1676 LOGMSG_REFCACHE_BIAS));
1677 self->ack_and_ref_and_abort_and_suspended = (self->ack_and_ref_and_abort_and_suspended & ~LOGMSG_REFCACHE_ACK_MASK) +
1678 LOGMSG_REFCACHE_ACK_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_ACK(self->ack_and_ref_and_abort_and_suspended) +
1679 LOGMSG_REFCACHE_BIAS));
1680
1681 logmsg_cached_refs = -LOGMSG_REFCACHE_BIAS;
1682 logmsg_cached_acks = -LOGMSG_REFCACHE_BIAS;
1683 logmsg_cached_abort = FALSE;
1684 logmsg_cached_suspend = FALSE;
1685 logmsg_cached_ack_needed = TRUE;
1686 }
1687
1688 /*
1689 * Start caching ref/unref/ack/add-ack operations in the current thread for
1690 * the message specified by @self. See the comment at the top of this file
1691 * for more information.
1692 *
1693 * This function is to be called by the consumer threads (e.g. the
1694 * ones that consume messages).
1695 *
1696 * This function can be called from multiple consumer threads at the
1697 * same time, even for the same message.
1698 *
1699 */
1700 void
log_msg_refcache_start_consumer(LogMessage * self,const LogPathOptions * path_options)1701 log_msg_refcache_start_consumer(LogMessage *self, const LogPathOptions *path_options)
1702 {
1703 g_assert(logmsg_current == NULL);
1704
1705 logmsg_current = self;
1706 logmsg_cached_ack_needed = path_options->ack_needed;
1707 logmsg_cached_refs = 0;
1708 logmsg_cached_acks = 0;
1709 logmsg_cached_abort = FALSE;
1710 logmsg_cached_suspend = FALSE;
1711 }
1712
1713 /*
1714 * Stop caching ref/unref/ack/add-ack operations in the current thread for
1715 * the message specified by the log_msg_refcache_start() function.
1716 *
1717 * See the comment at the top of this file for more information.
1718 */
1719 void
log_msg_refcache_stop(void)1720 log_msg_refcache_stop(void)
1721 {
1722 gint old_value;
1723 gint current_cached_acks;
1724 gboolean current_cached_abort;
1725 gboolean current_cached_suspend;
1726
1727 g_assert(logmsg_current != NULL);
1728
1729 /* validate that we didn't overflow the counters:
1730 *
1731 * Both counters must be:
1732 *
1733 * - at least 1 smaller than the bias, rationale:
1734 *
1735 * - if we are caching "bias" number of refs, it may happen
1736 * that there are bias number of unrefs, potentially running
1737 * in consumer threads
1738 *
1739 * - if the potential unrefs is larger than the bias value, it may
1740 * happen that the producer sets the bias (trying to avoid
1741 * the freeing of the LogMessage), but still it gets freed.
1742 *
1743 * - not smaller than the "-bias" value, rationale:
1744 * - if we are caching "bias" number of unrefs the same can happen
1745 * as with the ref case.
1746 *
1747 */
1748 g_assert((logmsg_cached_acks < LOGMSG_REFCACHE_BIAS - 1) && (logmsg_cached_acks >= -LOGMSG_REFCACHE_BIAS));
1749 g_assert((logmsg_cached_refs < LOGMSG_REFCACHE_BIAS - 1) && (logmsg_cached_refs >= -LOGMSG_REFCACHE_BIAS));
1750
1751 /*
1752 * We fold the differences in ack/ref counts in three stages:
1753 *
1754 * 1) we take a ref of logmsg_current, this is needed so that the
1755 * message is not freed until we return from refcache_stop()
1756 *
1757 * 2) we add in all the diffs that were accumulated between
1758 * refcache_start and refcache_stop. This gets us a final value of the
1759 * ack counter, ref must be >= as we took a ref ourselves.
1760 *
1761 * 3) we call the ack handler if needed, this might change ref counters
1762 * recursively (but not ack counters as that already atomically
1763 * dropped to zero)
1764 *
1765 * 4) drop the ref we took in step 1) above
1766 *
1767 * 4) then we fold in the net ref results of the ack callback and
1768 * refcache_stop() combined. This either causes the LogMessage to be
1769 * freed (when we were the last), or it stays around because of other
1770 * refs.
1771 */
1772
1773 /* 1) take ref */
1774 log_msg_ref(logmsg_current);
1775
1776 /* 2) fold in ref/ack counter diffs into the atomic value */
1777
1778 current_cached_acks = logmsg_cached_acks;
1779 logmsg_cached_acks = 0;
1780
1781 current_cached_abort = logmsg_cached_abort;
1782 logmsg_cached_abort = FALSE;
1783
1784 current_cached_suspend = logmsg_cached_suspend;
1785 logmsg_cached_suspend = FALSE;
1786
1787 old_value = log_msg_update_ack_and_ref_and_abort_and_suspended(logmsg_current, 0, current_cached_acks,
1788 current_cached_abort, current_cached_suspend);
1789
1790 if ((LOGMSG_REFCACHE_VALUE_TO_ACK(old_value) == -current_cached_acks) && logmsg_cached_ack_needed)
1791 {
1792 AckType ack_type_cumulated = _ack_and_ref_and_abort_and_suspend_to_acktype(old_value);
1793
1794 if (current_cached_suspend)
1795 ack_type_cumulated = AT_SUSPENDED;
1796 else if (current_cached_abort)
1797 ack_type_cumulated = AT_ABORTED;
1798
1799 /* 3) call the ack handler */
1800
1801 logmsg_current->ack_func(logmsg_current, ack_type_cumulated);
1802
1803 /* the ack callback may not change the ack counters, it already
1804 * dropped to zero atomically, changing that again is an error */
1805
1806 g_assert(logmsg_cached_acks == 0);
1807 }
1808
1809 /* 4) drop our own ref */
1810 log_msg_unref(logmsg_current);
1811
1812 /* 5) fold the combined result of our own ref/unref and ack handler's results */
1813 old_value = log_msg_update_ack_and_ref(logmsg_current, logmsg_cached_refs, 0);
1814
1815 if (LOGMSG_REFCACHE_VALUE_TO_REF(old_value) == -logmsg_cached_refs)
1816 log_msg_free(logmsg_current);
1817 logmsg_cached_refs = 0;
1818 logmsg_current = NULL;
1819 }
1820
1821 void
log_msg_registry_init(void)1822 log_msg_registry_init(void)
1823 {
1824 gint i;
1825
1826 logmsg_registry = nv_registry_new(builtin_value_names, NVHANDLE_MAX_VALUE);
1827 nv_registry_add_alias(logmsg_registry, LM_V_MESSAGE, "MSG");
1828 nv_registry_add_alias(logmsg_registry, LM_V_MESSAGE, "MSGONLY");
1829 nv_registry_add_alias(logmsg_registry, LM_V_HOST, "FULLHOST");
1830 nv_registry_add_alias(logmsg_registry, LM_V_HOST_FROM, "FULLHOST_FROM");
1831
1832 for (i = 0; macros[i].name; i++)
1833 {
1834 if (nv_registry_get_handle(logmsg_registry, macros[i].name) == 0)
1835 {
1836 NVHandle handle;
1837
1838 handle = nv_registry_alloc_handle(logmsg_registry, macros[i].name);
1839 nv_registry_set_handle_flags(logmsg_registry, handle, (macros[i].id << 8) + LM_VF_MACRO);
1840 }
1841 }
1842
1843 /* register $0 - $255 in order */
1844 for (i = 0; i < 256; i++)
1845 {
1846 gchar buf[8];
1847
1848 g_snprintf(buf, sizeof(buf), "%d", i);
1849 match_handles[i] = nv_registry_alloc_handle(logmsg_registry, buf);
1850 }
1851 }
1852
1853 void
log_msg_registry_deinit(void)1854 log_msg_registry_deinit(void)
1855 {
1856 nv_registry_free(logmsg_registry);
1857 logmsg_registry = NULL;
1858 }
1859
1860 void
log_msg_registry_foreach(GHFunc func,gpointer user_data)1861 log_msg_registry_foreach(GHFunc func, gpointer user_data)
1862 {
1863 nv_registry_foreach(logmsg_registry, func, user_data);
1864 }
1865
1866 static void
log_msg_register_stats(void)1867 log_msg_register_stats(void)
1868 {
1869 stats_lock();
1870 StatsClusterKey sc_key;
1871 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "msg_clones", NULL );
1872 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_msg_clones);
1873
1874 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "payload_reallocs", NULL );
1875 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_payload_reallocs);
1876
1877 stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "sdata_updates", NULL );
1878 stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_sdata_updates);
1879
1880 stats_cluster_single_key_set(&sc_key, SCS_GLOBAL, "msg_allocated_bytes", NULL);
1881 stats_register_counter(1, &sc_key, SC_TYPE_SINGLE_VALUE, &count_allocated_bytes);
1882 stats_unlock();
1883 }
1884
1885 void
log_msg_global_init(void)1886 log_msg_global_init(void)
1887 {
1888 log_msg_registry_init();
1889
1890 /* NOTE: we always initialize counters as they are on stats-level(0),
1891 * however we need to defer that as the stats subsystem may not be
1892 * operational yet */
1893 register_application_hook(AH_RUNNING, (ApplicationHookFunc) log_msg_register_stats, NULL, AHM_RUN_ONCE);
1894 }
1895
1896
1897 const gchar *
log_msg_get_handle_name(NVHandle handle,gssize * length)1898 log_msg_get_handle_name(NVHandle handle, gssize *length)
1899 {
1900 return nv_registry_get_handle_name(logmsg_registry, handle, length);
1901 }
1902
/*
 * Global teardown of the log message subsystem; the counterpart of
 * log_msg_global_init(). Currently this only releases the registry.
 */
void
log_msg_global_deinit(void)
{
  log_msg_registry_deinit();
}
1908
1909 gint
log_msg_lookup_time_stamp_name(const gchar * name)1910 log_msg_lookup_time_stamp_name(const gchar *name)
1911 {
1912 if (strcmp(name, "stamp") == 0)
1913 return LM_TS_STAMP;
1914 else if (strcmp(name, "recvd") == 0)
1915 return LM_TS_RECVD;
1916 return -1;
1917 }
1918
1919 gssize
log_msg_get_size(LogMessage * self)1920 log_msg_get_size(LogMessage *self)
1921 {
1922 if (!self)
1923 return 0;
1924
1925 return
1926 sizeof(LogMessage) + // msg.static fields
1927 + self->alloc_sdata * sizeof(self->sdata[0]) +
1928 g_sockaddr_len(self->saddr) + g_sockaddr_len(self->daddr) +
1929 ((self->num_tags) ? sizeof(self->tags[0]) * self->num_tags : 0) +
1930 nv_table_get_memory_consumption(self->payload); // msg.payload (nvtable)
1931 }
1932
1933 #ifdef __linux__
1934
1935 const gchar *
1936 __log_msg_get_value(const LogMessage *self, NVHandle handle, gssize *value_len)
1937 __attribute__((alias("log_msg_get_value")));
1938
1939 const gchar *
1940 __log_msg_get_value_by_name(const LogMessage *self, const gchar *name, gssize *value_len)
1941 __attribute__((alias("log_msg_get_value_by_name")));
1942
1943 void
1944 __log_msg_set_value_by_name(LogMessage *self, const gchar *name, const gchar *value, gssize length)
1945 __attribute__((alias("log_msg_set_value_by_name")));
1946
1947 #endif
1948