1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logmsg/logmsg.h"
26 #include "str-utils.h"
27 #include "str-repr/encode.h"
28 #include "messages.h"
29 #include "logpipe.h"
30 #include "timeutils/cache.h"
31 #include "timeutils/misc.h"
32 #include "logmsg/nvtable.h"
33 #include "stats/stats-registry.h"
34 #include "stats/stats-cluster-single.h"
35 #include "template/templates.h"
36 #include "tls-support.h"
37 #include "compat/string.h"
38 #include "rcptid.h"
39 #include "template/macros.h"
40 #include "host-id.h"
41 #include "ack-tracker/ack_tracker.h"
42 #include "apphook.h"
43 
44 #include <glib/gprintf.h>
45 #include <sys/types.h>
46 #include <time.h>
47 #include <unistd.h>
48 #include <string.h>
49 #include <stdlib.h>
50 #include <ctype.h>
51 #include <syslog.h>
52 
53 /*
54  * Reference/ACK counting for LogMessage structures
55  *
56  * Each LogMessage structure is allocated when received by a LogSource
57  * instance, and then freed once all destinations finish with it.  Since a
58  * LogMessage is processed by different threads, reference counting must be
59  * atomic.
60  *
 * A similar counter is used to track when a given message is considered to
 * be delivered.  In case flow-control is in use, the number of expected
 * ACKs is counted in an atomic variable.
64  *
65  * Code is written in a way, that it is explicitly mentioned whether a
66  * function expects a reference (called "consuming" the ref), or whether it
67  * gets a 'borrowed' reference (more common).  Also, a function that returns
68  * a LogMessage instance generally returns a reference too.
69  *
70  * Because of these rules, quite a number of refs/unrefs are being made
71  * during the processing of a message, even though they wouldn't cause the
72  * structure to be freed.  Considering that ref/unref operations are
73  * expensive atomic operations, these have a considerable overhead.
74  *
75  * The solution we employ in this module, is that we try to exploit the
76  * fact, that each thread is usually working on a single LogMessage
77  * instance, and once it is done with it, it fetches the next one, and so
78  * on.  In the LogReader/LogWriter cases, this usage pattern is quite
79  * normal.
80  *
 * Also, a quite similar usage pattern can be applied to the ACK counter
 * (the one which tracks how many flow-controlled destinations need to
 * confirm the delivery of the message).
84  *
85  * The solution implemented here is outlined below (and the same rules are
86  * used for ACKs and REFs):
87  *
 *    - ACKs and REFs are put into the same 32 bit integer value: 15 bits
 *      hold the ref counter, 15 bits hold the ACK counter and the top two
 *      bits hold the ABORT and SUSPEND flags (see the macros
 *      LOGMSG_REFCACHE_ below)
91  *
92  *    - The processing of a given message in a given thread is enclosed by
93  *      calls to log_msg_refcache_start() / log_msg_refcache_stop()
94  *
 *    - Calling these functions is optional; if they are not called,
 *      then normal atomic reference counting is performed
97  *
98  *    - When refcache_start() is called, the atomic reference count is not
99  *      changed by log_msg_ref()/unref(). Rather, a per-thread variable is
100  *      used to count the _difference_ to the "would-be" value of the ref counter.
101  *
102  *    - When refcache_stop() is called, the atomic reference counter is
103  *      updated as a single atomic operation. This means, that if a thread
104  *      calls refcache_start() early enough, then the ref/unref operations
105  *      performed by the given thread can be completely atomic-op free.
106  *
107  *    - There's one catch: if the producer thread (e.g. LogReader), doesn't
108  *      add references to a consumer thread (e.g.  LogWriter), and the
109  *      consumer doesn't use the refcache infrastructure, then the counters
110  *      could become negative (simply because the reader's real number of
111  *      references is not represented in the refcounter).  This is solved by
112  *      adding a large-enough number (so called BIAS) to the ref counter in
113  *      refcache_start(), which ensures that all possible writers will see a
114  *      positive value.  This is then subtracted in refcache_stop() the
115  *      same way as the other references.
116  *
117  * Since we use the same atomic variable to store two things, updating that
118  * counter becomes somewhat more complicated, therefore a g_atomic_int_add()
119  * doesn't suffice.  We're using a CAS loop (compare-and-exchange) to do our
120  * stuff, but that shouldn't have that much of an overhead.
121  */
122 
/*
 * Per-thread state of the ref/ack cache described in the comment above.
 * The fields are declared through the TLS_BLOCK_START/TLS_BLOCK_END
 * macros and are accessed via the logmsg_* shorthand macros defined right
 * after the block, each of which expands to __tls_deref(<field>).
 */
TLS_BLOCK_START
{
  /* message that is being processed by the current thread. Its ack/ref changes are cached */
  LogMessage *logmsg_current;
  /* whether the consumer is flow-controlled, (the producer always is) */
  gboolean logmsg_cached_ack_needed;

  /* has to be signed as these can become negative */
  /* number of cached refs by the current thread */
  gint logmsg_cached_refs;
  /* number of cached acks by the current thread */
  gint logmsg_cached_acks;
  /* abort flag in the current thread for acks */
  gboolean logmsg_cached_abort;
  /* suspend flag in the current thread for acks */
  gboolean logmsg_cached_suspend;
}
TLS_BLOCK_END;

/* shorthand accessors for the per-thread fields declared above */
#define logmsg_current              __tls_deref(logmsg_current)
#define logmsg_cached_refs          __tls_deref(logmsg_cached_refs)
#define logmsg_cached_acks          __tls_deref(logmsg_cached_acks)
#define logmsg_cached_ack_needed    __tls_deref(logmsg_cached_ack_needed)
#define logmsg_cached_abort         __tls_deref(logmsg_cached_abort)
#define logmsg_cached_suspend       __tls_deref(logmsg_cached_suspend)
148 
/*
 * Layout of the combined 32 bit ack/ref value, as defined by the masks:
 *
 *   bit  31      SUSPEND flag
 *   bit  30      ABORT flag
 *   bits 15..29  ACK counter (15 bits)
 *   bits  0..14  REF counter (15 bits)
 *
 * The *_TO_VALUE() macros move a field into its position inside the
 * combined value, the VALUE_TO_*() macros extract a field from it.
 */
#define LOGMSG_REFCACHE_SUSPEND_SHIFT                 31 /* number of bits to shift to get the SUSPEND flag */
#define LOGMSG_REFCACHE_SUSPEND_MASK          0x80000000 /* bit mask to extract the SUSPEND flag */
#define LOGMSG_REFCACHE_ABORT_SHIFT                   30 /* number of bits to shift to get the ABORT flag */
#define LOGMSG_REFCACHE_ABORT_MASK            0x40000000 /* bit mask to extract the ABORT flag */
#define LOGMSG_REFCACHE_ACK_SHIFT                     15 /* number of bits to shift to get the ACK counter */
#define LOGMSG_REFCACHE_ACK_MASK              0x3FFF8000 /* bit mask to extract the ACK counter */
#define LOGMSG_REFCACHE_REF_SHIFT                      0 /* number of bits to shift to get the REF counter */
#define LOGMSG_REFCACHE_REF_MASK              0x00007FFF /* bit mask to extract the REF counter */
#define LOGMSG_REFCACHE_BIAS                  0x00002000 /* the BIAS we add to the ref counter in refcache_start */

/* field -> combined value */
#define LOGMSG_REFCACHE_REF_TO_VALUE(x)    (((x) << LOGMSG_REFCACHE_REF_SHIFT)   & LOGMSG_REFCACHE_REF_MASK)
#define LOGMSG_REFCACHE_ACK_TO_VALUE(x)    (((x) << LOGMSG_REFCACHE_ACK_SHIFT)   & LOGMSG_REFCACHE_ACK_MASK)
#define LOGMSG_REFCACHE_ABORT_TO_VALUE(x)  (((x) << LOGMSG_REFCACHE_ABORT_SHIFT) & LOGMSG_REFCACHE_ABORT_MASK)
#define LOGMSG_REFCACHE_SUSPEND_TO_VALUE(x)  (((x) << LOGMSG_REFCACHE_SUSPEND_SHIFT) & LOGMSG_REFCACHE_SUSPEND_MASK)

/* combined value -> field */
#define LOGMSG_REFCACHE_VALUE_TO_REF(x)    (((x) & LOGMSG_REFCACHE_REF_MASK)   >> LOGMSG_REFCACHE_REF_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_ACK(x)    (((x) & LOGMSG_REFCACHE_ACK_MASK)   >> LOGMSG_REFCACHE_ACK_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_ABORT(x)  (((x) & LOGMSG_REFCACHE_ABORT_MASK) >> LOGMSG_REFCACHE_ABORT_SHIFT)
#define LOGMSG_REFCACHE_VALUE_TO_SUSPEND(x)  (((x) & LOGMSG_REFCACHE_SUSPEND_MASK) >> LOGMSG_REFCACHE_SUSPEND_SHIFT)
168 
169 /**********************************************************************
170  * LogMessage
171  **********************************************************************/
172 
173 static inline gboolean
log_msg_chk_flag(const LogMessage * self,gint32 flag)174 log_msg_chk_flag(const LogMessage *self, gint32 flag)
175 {
176   return self->flags & flag;
177 }
178 
179 static inline void
log_msg_set_flag(LogMessage * self,gint32 flag)180 log_msg_set_flag(LogMessage *self, gint32 flag)
181 {
182   self->flags |= flag;
183 }
184 
185 static inline void
log_msg_set_host_id(LogMessage * msg)186 log_msg_set_host_id(LogMessage *msg)
187 {
188   msg->host_id = host_id_get();
189 }
190 
/*
 * Names of the built-in name-value pairs; the index matches the value id.
 * The list is terminated by a NULL entry.
 * NOTE(review): "__RESERVED_LM_V_MAX" looks like a placeholder occupying
 * the slot of LM_V_MAX -- confirm against the LM_V_* definitions.
 */
const gchar *builtin_value_names[] =
{
  "HOST",
  "HOST_FROM",
  "MESSAGE",
  "PROGRAM",
  "PID",
  "MSGID",
  "SOURCE",
  "LEGACY_MSGHDR",
  "__RESERVED_LM_V_MAX",
  NULL,
};
205 
206 static void
__free_macro_value(void * val)207 __free_macro_value(void *val)
208 {
209   g_string_free((GString *) val, TRUE);
210 }
211 
/* handles of the regexp match values, indexed by match number (see log_msg_set_match()) */
static NVHandle match_handles[256];
/* registry used to map value names to handles and to store per-handle flags */
NVRegistry *logmsg_registry;
/* name prefix that marks a value as structured data */
const char logmsg_sd_prefix[] = ".SDATA.";
/* length of the prefix above, without the terminating NUL */
const gint logmsg_sd_prefix_len = sizeof(logmsg_sd_prefix) - 1;
/* raised (up to 32) by log_msg_alloc_queue_node() when a message runs out of embedded nodes */
gint logmsg_queue_node_max = 1;
/* statistics */
static StatsCounterItem *count_msg_clones;
static StatsCounterItem *count_payload_reallocs;
static StatsCounterItem *count_sdata_updates;
static StatsCounterItem *count_allocated_bytes;
/* per-thread GString used by log_msg_get_macro_value(), freed by __free_macro_value() */
static GPrivate priv_macro_value = G_PRIVATE_INIT(__free_macro_value);
223 
224 void
log_msg_write_protect(LogMessage * self)225 log_msg_write_protect(LogMessage *self)
226 {
227   self->protected = TRUE;
228 }
229 
230 LogMessage *
log_msg_make_writable(LogMessage ** pself,const LogPathOptions * path_options)231 log_msg_make_writable(LogMessage **pself, const LogPathOptions *path_options)
232 {
233   if (log_msg_is_write_protected(*pself))
234     {
235       LogMessage *new;
236 
237       new = log_msg_clone_cow(*pself, path_options);
238       log_msg_unref(*pself);
239       *pself = new;
240     }
241   return *pself;
242 }
243 
244 
245 static void
log_msg_update_sdata_slow(LogMessage * self,NVHandle handle,const gchar * name,gssize name_len)246 log_msg_update_sdata_slow(LogMessage *self, NVHandle handle, const gchar *name, gssize name_len)
247 {
248   guint16 alloc_sdata;
249   guint16 prefix_and_block_len;
250   gint i;
251   const gchar *dot;
252 
253   /* this was a structured data element, insert a ref to the sdata array */
254 
255   stats_counter_inc(count_sdata_updates);
256   if (self->num_sdata == 255)
257     {
258       msg_error("syslog-ng only supports 255 SD elements right now, just drop an email to the mailing list that it was not enough with your use-case so we can increase it");
259       return;
260     }
261 
262   if (self->alloc_sdata <= self->num_sdata)
263     {
264       alloc_sdata = MAX(self->num_sdata + 1, STRICT_ROUND_TO_NEXT_EIGHT(self->num_sdata));
265       if (alloc_sdata > 255)
266         alloc_sdata = 255;
267     }
268   else
269     alloc_sdata = self->alloc_sdata;
270 
271   if (log_msg_chk_flag(self, LF_STATE_OWN_SDATA) && self->sdata)
272     {
273       if (self->alloc_sdata < alloc_sdata)
274         {
275           self->sdata = g_realloc(self->sdata, alloc_sdata * sizeof(self->sdata[0]));
276           memset(&self->sdata[self->alloc_sdata], 0, (alloc_sdata - self->alloc_sdata) * sizeof(self->sdata[0]));
277         }
278     }
279   else
280     {
281       NVHandle *sdata;
282 
283       sdata = g_malloc(alloc_sdata * sizeof(self->sdata[0]));
284       if (self->num_sdata)
285         memcpy(sdata, self->sdata, self->num_sdata * sizeof(self->sdata[0]));
286       memset(&sdata[self->num_sdata], 0, sizeof(self->sdata[0]) * (self->alloc_sdata - self->num_sdata));
287       self->sdata = sdata;
288       log_msg_set_flag(self, LF_STATE_OWN_SDATA);
289     }
290   guint16 old_alloc_sdata = self->alloc_sdata;
291   self->alloc_sdata = alloc_sdata;
292   if (self->sdata)
293     {
294       self->allocated_bytes += ((self->alloc_sdata - old_alloc_sdata) * sizeof(self->sdata[0]));
295       stats_counter_add(count_allocated_bytes, (self->alloc_sdata - old_alloc_sdata) * sizeof(self->sdata[0]));
296     }
297   /* ok, we have our own SDATA array now which has at least one free slot */
298 
299   if (!self->initial_parse)
300     {
301       dot = memrchr(name, '.', name_len);
302       prefix_and_block_len = dot - name;
303 
304       for (i = self->num_sdata - 1; i >= 0; i--)
305         {
306           gssize sdata_name_len;
307           const gchar *sdata_name;
308 
309           sdata_name_len = 0;
310           sdata_name = log_msg_get_value_name(self->sdata[i], &sdata_name_len);
311           if (sdata_name_len > prefix_and_block_len &&
312               strncmp(sdata_name, name, prefix_and_block_len) == 0)
313             {
314               /* ok we have found the last SDATA entry that has the same block */
315               break;
316             }
317         }
318       i++;
319     }
320   else
321     i = -1;
322 
323   if (i >= 0)
324     {
325       memmove(&self->sdata[i+1], &self->sdata[i], (self->num_sdata - i) * sizeof(self->sdata[0]));
326       self->sdata[i] = handle;
327     }
328   else
329     {
330       self->sdata[self->num_sdata] = handle;
331     }
332   self->num_sdata++;
333 }
334 
335 static inline void
log_msg_update_sdata(LogMessage * self,NVHandle handle,const gchar * name,gssize name_len)336 log_msg_update_sdata(LogMessage *self, NVHandle handle, const gchar *name, gssize name_len)
337 {
338   guint8 flags;
339 
340   flags = nv_registry_get_handle_flags(logmsg_registry, handle);
341   if (G_UNLIKELY(flags & LM_VF_SDATA))
342     log_msg_update_sdata_slow(self, handle, name, name_len);
343 }
344 
345 NVHandle
log_msg_get_value_handle(const gchar * value_name)346 log_msg_get_value_handle(const gchar *value_name)
347 {
348   NVHandle handle;
349 
350   handle = nv_registry_alloc_handle(logmsg_registry, value_name);
351 
352   /* check if name starts with sd_prefix and has at least one additional character */
353   if (strncmp(value_name, logmsg_sd_prefix, logmsg_sd_prefix_len) == 0 && value_name[6])
354     {
355       nv_registry_set_handle_flags(logmsg_registry, handle, LM_VF_SDATA);
356     }
357 
358   return handle;
359 }
360 
361 gboolean
log_msg_is_value_name_valid(const gchar * value)362 log_msg_is_value_name_valid(const gchar *value)
363 {
364   if (strncmp(value, logmsg_sd_prefix, logmsg_sd_prefix_len) == 0)
365     {
366       const gchar *dot;
367       int dot_found = 0;
368 
369       dot = strchr(value, '.');
370       while (dot && strlen(dot) > 1)
371         {
372           dot_found++;
373           dot++;
374           dot = strchr(dot, '.');
375         }
376       return (dot_found >= 3);
377     }
378   else
379     return TRUE;
380 }
381 
382 const gchar *
log_msg_get_macro_value(const LogMessage * self,gint id,gssize * value_len)383 log_msg_get_macro_value(const LogMessage *self, gint id, gssize *value_len)
384 {
385   GString *value;
386 
387   value = g_private_get(&priv_macro_value);
388   if (!value)
389     {
390       value = g_string_sized_new(256);
391       g_private_replace(&priv_macro_value, value);
392     }
393   g_string_truncate(value, 0);
394 
395   log_macro_expand_simple(value, id, self);
396   if (value_len)
397     *value_len = value->len;
398   return value->str;
399 }
400 
401 gboolean
log_msg_is_handle_macro(NVHandle handle)402 log_msg_is_handle_macro(NVHandle handle)
403 {
404   guint16 flags;
405 
406   flags = nv_registry_get_handle_flags(logmsg_registry, handle);
407   return !!(flags & LM_VF_MACRO);
408 }
409 
410 gboolean
log_msg_is_handle_sdata(NVHandle handle)411 log_msg_is_handle_sdata(NVHandle handle)
412 {
413   guint16 flags;
414 
415   flags = nv_registry_get_handle_flags(logmsg_registry, handle);
416   return !!(flags & LM_VF_SDATA);
417 }
418 
419 gboolean
log_msg_is_handle_match(NVHandle handle)420 log_msg_is_handle_match(NVHandle handle)
421 {
422   g_assert(match_handles[0] && match_handles[255] && match_handles[0] < match_handles[255]);
423 
424   /* NOTE: match_handles are allocated sequentially in log_msg_registry_init(),
425    * so this simple & fast check is enough */
426   return (match_handles[0] <= handle && handle <= match_handles[255]);
427 }
428 
429 static void
log_msg_init_queue_node(LogMessage * msg,LogMessageQueueNode * node,const LogPathOptions * path_options)430 log_msg_init_queue_node(LogMessage *msg, LogMessageQueueNode *node, const LogPathOptions *path_options)
431 {
432   INIT_IV_LIST_HEAD(&node->list);
433   node->ack_needed = path_options->ack_needed;
434   node->flow_control_requested = path_options->flow_control_requested;
435   node->msg = log_msg_ref(msg);
436   log_msg_write_protect(msg);
437 }
438 
439 /*
440  * Allocates a new LogMessageQueueNode instance to be enqueued in a
441  * LogQueue.
442  *
443  * NOTE: Assumed to be running in the source thread, and that the same
444  * LogMessage instance is only put into queue from the same thread (e.g.
445  * the related fields are _NOT_ locked).
446  */
447 LogMessageQueueNode *
log_msg_alloc_queue_node(LogMessage * msg,const LogPathOptions * path_options)448 log_msg_alloc_queue_node(LogMessage *msg, const LogPathOptions *path_options)
449 {
450   LogMessageQueueNode *node;
451 
452   if (msg->cur_node < msg->num_nodes)
453     {
454       node = &msg->nodes[msg->cur_node++];
455       node->embedded = TRUE;
456     }
457   else
458     {
459       gint nodes = (volatile gint) logmsg_queue_node_max;
460 
461       /* this is a racy update, but it doesn't really hurt if we lose an
462        * update or if we continue with a smaller value in parallel threads
463        * for some time yet, since the smaller number only means that we
464        * pre-allocate somewhat less LogMsgQueueNodes in the message
465        * structure, but will be fine regardless (if we'd overflow the
466        * pre-allocated space, we start allocating nodes dynamically from
467        * heap.
468        */
469       if (nodes < 32 && nodes <= msg->num_nodes)
470         logmsg_queue_node_max = msg->num_nodes + 1;
471       node = g_slice_new(LogMessageQueueNode);
472       node->embedded = FALSE;
473     }
474   log_msg_init_queue_node(msg, node, path_options);
475   return node;
476 }
477 
478 LogMessageQueueNode *
log_msg_alloc_dynamic_queue_node(LogMessage * msg,const LogPathOptions * path_options)479 log_msg_alloc_dynamic_queue_node(LogMessage *msg, const LogPathOptions *path_options)
480 {
481   LogMessageQueueNode *node;
482   node = g_slice_new(LogMessageQueueNode);
483   node->embedded = FALSE;
484   log_msg_init_queue_node(msg, node, path_options);
485   return node;
486 }
487 
488 void
log_msg_free_queue_node(LogMessageQueueNode * node)489 log_msg_free_queue_node(LogMessageQueueNode *node)
490 {
491   if (!node->embedded)
492     g_slice_free(LogMessageQueueNode, node);
493 }
494 
495 static gboolean
_log_name_value_updates(LogMessage * self)496 _log_name_value_updates(LogMessage *self)
497 {
498   /* we don't log name value updates for internal messages that are
499    * initialized at this point, as that may generate an endless recursion.
500    * log_msg_new_internal() calling log_msg_set_value(), which in turn
501    * generates an internal message, again calling log_msg_set_value()
502    */
503   return (self->flags & LF_INTERNAL) == 0;
504 }
505 
506 static inline gboolean
_value_invalidates_legacy_header(NVHandle handle)507 _value_invalidates_legacy_header(NVHandle handle)
508 {
509   return handle == LM_V_PROGRAM || handle == LM_V_PID;
510 }
511 
512 void
log_msg_set_value(LogMessage * self,NVHandle handle,const gchar * value,gssize value_len)513 log_msg_set_value(LogMessage *self, NVHandle handle, const gchar *value, gssize value_len)
514 {
515   const gchar *name;
516   gssize name_len;
517   gboolean new_entry = FALSE;
518 
519   g_assert(!log_msg_is_write_protected(self));
520 
521   if (handle == LM_V_NONE)
522     return;
523 
524   name_len = 0;
525   name = log_msg_get_value_name(handle, &name_len);
526 
527   if (_log_name_value_updates(self))
528     {
529       msg_trace("Setting value",
530                 evt_tag_str("name", name),
531                 evt_tag_printf("value", "%.*s", (gint) value_len, value),
532                 evt_tag_printf("msg", "%p", self));
533     }
534 
535   if (value_len < 0)
536     value_len = strlen(value);
537 
538   if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
539     {
540       self->payload = nv_table_clone(self->payload, name_len + value_len + 2);
541       log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
542       self->allocated_bytes += self->payload->size;
543       stats_counter_add(count_allocated_bytes, self->payload->size);
544     }
545 
546   /* we need a loop here as a single realloc may not be enough. Might help
547    * if we pass how much bytes we need though. */
548 
549   while (!nv_table_add_value(self->payload, handle, name, name_len, value, value_len, &new_entry))
550     {
551       /* error allocating string in payload, reallocate */
552       guint32 old_size = self->payload->size;
553       if (!nv_table_realloc(self->payload, &self->payload))
554         {
555           /* can't grow the payload, it has reached the maximum size */
556           msg_info("Cannot store value for this log message, maximum size has been reached",
557                    evt_tag_int("maximum_payload", NV_TABLE_MAX_BYTES),
558                    evt_tag_str("name", name),
559                    evt_tag_printf("value", "%.32s%s", value, value_len > 32 ? "..." : ""));
560           break;
561         }
562       guint32 new_size = self->payload->size;
563       self->allocated_bytes += (new_size - old_size);
564       stats_counter_add(count_allocated_bytes, new_size-old_size);
565       stats_counter_inc(count_payload_reallocs);
566     }
567 
568   if (new_entry)
569     log_msg_update_sdata(self, handle, name, name_len);
570 
571   if (_value_invalidates_legacy_header(handle))
572     log_msg_unset_value(self, LM_V_LEGACY_MSGHDR);
573 }
574 
575 void
log_msg_unset_value(LogMessage * self,NVHandle handle)576 log_msg_unset_value(LogMessage *self, NVHandle handle)
577 {
578   g_assert(!log_msg_is_write_protected(self));
579 
580   if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
581     {
582       self->payload = nv_table_clone(self->payload, 0);
583       log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
584     }
585 
586   while (!nv_table_unset_value(self->payload, handle))
587     {
588       /* error allocating string in payload, reallocate */
589       guint32 old_size = self->payload->size;
590       if (!nv_table_realloc(self->payload, &self->payload))
591         {
592           /* can't grow the payload, it has reached the maximum size */
593           const gchar *name = log_msg_get_value_name(handle, NULL);
594           msg_info("Cannot unset value for this log message, maximum size has been reached",
595                    evt_tag_int("maximum_payload", NV_TABLE_MAX_BYTES),
596                    evt_tag_str("name", name));
597           break;
598         }
599       guint32 new_size = self->payload->size;
600       self->allocated_bytes += (new_size - old_size);
601       stats_counter_add(count_allocated_bytes, new_size-old_size);
602       stats_counter_inc(count_payload_reallocs);
603     }
604 
605   if (_value_invalidates_legacy_header(handle))
606     log_msg_unset_value(self, LM_V_LEGACY_MSGHDR);
607 }
608 
609 void
log_msg_unset_value_by_name(LogMessage * self,const gchar * name)610 log_msg_unset_value_by_name(LogMessage *self, const gchar *name)
611 {
612   log_msg_unset_value(self, log_msg_get_value_handle(name));
613 }
614 
615 void
log_msg_set_value_indirect(LogMessage * self,NVHandle handle,NVHandle ref_handle,guint8 type,guint16 ofs,guint16 len)616 log_msg_set_value_indirect(LogMessage *self, NVHandle handle, NVHandle ref_handle, guint8 type, guint16 ofs,
617                            guint16 len)
618 {
619   const gchar *name;
620   gssize name_len;
621   gboolean new_entry = FALSE;
622 
623   g_assert(!log_msg_is_write_protected(self));
624 
625   if (handle == LM_V_NONE)
626     return;
627 
628   g_assert(handle >= LM_V_MAX);
629 
630   name_len = 0;
631   name = log_msg_get_value_name(handle, &name_len);
632 
633   if (_log_name_value_updates(self))
634     {
635       msg_trace("Setting indirect value",
636                 evt_tag_printf("msg", "%p", self),
637                 evt_tag_str("name", name),
638                 evt_tag_int("ref_handle", ref_handle),
639                 evt_tag_int("ofs", ofs),
640                 evt_tag_int("len", len));
641     }
642 
643   if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
644     {
645       self->payload = nv_table_clone(self->payload, name_len + 1);
646       log_msg_set_flag(self, LF_STATE_OWN_PAYLOAD);
647     }
648 
649   NVReferencedSlice referenced_slice =
650   {
651     .handle = ref_handle,
652     .ofs = ofs,
653     .len = len,
654     .type = type
655   };
656 
657   while (!nv_table_add_value_indirect(self->payload, handle, name, name_len, &referenced_slice, &new_entry))
658     {
659       /* error allocating string in payload, reallocate */
660       if (!nv_table_realloc(self->payload, &self->payload))
661         {
662           /* error growing the payload, skip without storing the value */
663           msg_info("Cannot store referenced value for this log message, maximum size has been reached",
664                    evt_tag_str("name", name),
665                    evt_tag_str("ref-name", log_msg_get_value_name(ref_handle, NULL)));
666           break;
667         }
668       stats_counter_inc(count_payload_reallocs);
669     }
670 
671   if (new_entry)
672     log_msg_update_sdata(self, handle, name, name_len);
673 }
674 
675 gboolean
log_msg_values_foreach(const LogMessage * self,NVTableForeachFunc func,gpointer user_data)676 log_msg_values_foreach(const LogMessage *self, NVTableForeachFunc func, gpointer user_data)
677 {
678   return nv_table_foreach(self->payload, logmsg_registry, func, user_data);
679 }
680 
681 void
log_msg_set_match(LogMessage * self,gint index_,const gchar * value,gssize value_len)682 log_msg_set_match(LogMessage *self, gint index_, const gchar *value, gssize value_len)
683 {
684   g_assert(index_ < 256);
685 
686   if (index_ >= self->num_matches)
687     self->num_matches = index_ + 1;
688   log_msg_set_value(self, match_handles[index_], value, value_len);
689 }
690 
691 void
log_msg_set_match_indirect(LogMessage * self,gint index_,NVHandle ref_handle,guint8 type,guint16 ofs,guint16 len)692 log_msg_set_match_indirect(LogMessage *self, gint index_, NVHandle ref_handle, guint8 type, guint16 ofs, guint16 len)
693 {
694   g_assert(index_ < 256);
695 
696   log_msg_set_value_indirect(self, match_handles[index_], ref_handle, type, ofs, len);
697 }
698 
699 void
log_msg_clear_matches(LogMessage * self)700 log_msg_clear_matches(LogMessage *self)
701 {
702   gint i;
703 
704   for (i = 0; i < self->num_matches; i++)
705     {
706       log_msg_set_value(self, match_handles[i], "", 0);
707     }
708   self->num_matches = 0;
709 }
710 
/*
 * The tag bitmap is made of gulong words and, as long as no array has been
 * allocated, the bits are stored in the self->tags pointer field itself,
 * which is why long and pointer must have the same size.
 */
#if GLIB_SIZEOF_LONG != GLIB_SIZEOF_VOID_P
#error "The tags bit array assumes that long is the same size as the pointer"
#endif

/*
 * LOGMSG_TAGS_BITS:      number of tag bits per gulong word
 * LOGMSG_TAGS_NDX_SHIFT: tag id >> SHIFT gives the index of the word
 * LOGMSG_TAGS_NDX_MASK:  tag id & MASK gives the bit position in the word
 */
#if GLIB_SIZEOF_LONG == 8
#define LOGMSG_TAGS_NDX_SHIFT 6
#define LOGMSG_TAGS_NDX_MASK  0x3F
#define LOGMSG_TAGS_BITS      64
#elif GLIB_SIZEOF_LONG == 4
#define LOGMSG_TAGS_NDX_SHIFT 5
#define LOGMSG_TAGS_NDX_MASK  0x1F
#define LOGMSG_TAGS_BITS      32
#else
#error "Unsupported word length, only 32 or 64 bit platforms are supported"
#endif
726 
727 static inline void
log_msg_tags_foreach_item(const LogMessage * self,gint base,gulong item,LogMessageTagsForeachFunc callback,gpointer user_data)728 log_msg_tags_foreach_item(const LogMessage *self, gint base, gulong item, LogMessageTagsForeachFunc callback,
729                           gpointer user_data)
730 {
731   gint i;
732 
733   for (i = 0; i < LOGMSG_TAGS_BITS; i++)
734     {
735       if (G_LIKELY(!item))
736         return;
737       if (item & 1)
738         {
739           LogTagId id = (LogTagId) base + i;
740 
741           callback(self, id, log_tags_get_by_id(id), user_data);
742         }
743       item >>= 1;
744     }
745 }
746 
747 
748 void
log_msg_tags_foreach(const LogMessage * self,LogMessageTagsForeachFunc callback,gpointer user_data)749 log_msg_tags_foreach(const LogMessage *self, LogMessageTagsForeachFunc callback, gpointer user_data)
750 {
751   guint i;
752 
753   if (self->num_tags == 0)
754     {
755       log_msg_tags_foreach_item(self, 0, (gulong) self->tags, callback, user_data);
756     }
757   else
758     {
759       for (i = 0; i != self->num_tags; ++i)
760         {
761           log_msg_tags_foreach_item(self, i * LOGMSG_TAGS_BITS, self->tags[i], callback, user_data);
762         }
763     }
764 }
765 
766 
767 static inline void
log_msg_set_bit(gulong * tags,gint index_,gboolean value)768 log_msg_set_bit(gulong *tags, gint index_, gboolean value)
769 {
770   if (value)
771     tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] |= ((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK)));
772   else
773     tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] &= ~((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK)));
774 }
775 
776 static inline gboolean
log_msg_get_bit(gulong * tags,gint index_)777 log_msg_get_bit(gulong *tags, gint index_)
778 {
779   return !!(tags[index_ >> LOGMSG_TAGS_NDX_SHIFT] & ((gulong) (1UL << (index_ & LOGMSG_TAGS_NDX_MASK))));
780 }
781 
782 void
log_msg_set_tag_by_id_onoff(LogMessage * self,LogTagId id,gboolean on)783 log_msg_set_tag_by_id_onoff(LogMessage *self, LogTagId id, gboolean on)
784 {
785   gulong *old_tags;
786   gint old_num_tags;
787   gboolean inline_tags;
788 
789   g_assert(!log_msg_is_write_protected(self));
790   if (!log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->num_tags)
791     {
792       self->tags = g_memdup(self->tags, sizeof(self->tags[0]) * self->num_tags);
793     }
794   log_msg_set_flag(self, LF_STATE_OWN_TAGS);
795 
796   /* if num_tags is 0, it means that we use inline storage of tags */
797   inline_tags = self->num_tags == 0;
798   if (inline_tags && id < LOGMSG_TAGS_BITS)
799     {
800       /* store this tag inline */
801       log_msg_set_bit((gulong *) &self->tags, id, on);
802     }
803   else
804     {
805       /* we can't put this tag inline, either because it is too large, or we don't have the inline space any more */
806 
807       if ((self->num_tags * LOGMSG_TAGS_BITS) <= id)
808         {
809           if (G_UNLIKELY(8159 < id))
810             {
811               msg_error("Maximum number of tags reached");
812               return;
813             }
814           old_num_tags = self->num_tags;
815           self->num_tags = (id / LOGMSG_TAGS_BITS) + 1;
816 
817           old_tags = self->tags;
818           if (old_num_tags)
819             self->tags = g_realloc(self->tags, sizeof(self->tags[0]) * self->num_tags);
820           else
821             self->tags = g_malloc(sizeof(self->tags[0]) * self->num_tags);
822           memset(&self->tags[old_num_tags], 0, (self->num_tags - old_num_tags) * sizeof(self->tags[0]));
823 
824           if (inline_tags)
825             self->tags[0] = (gulong) old_tags;
826         }
827 
828       log_msg_set_bit(self->tags, id, on);
829     }
830   if (on)
831     {
832       log_tags_inc_counter(id);
833     }
834   else
835     {
836       log_tags_dec_counter(id);
837     }
838 }
839 
840 void
log_msg_set_tag_by_id(LogMessage * self,LogTagId id)841 log_msg_set_tag_by_id(LogMessage *self, LogTagId id)
842 {
843   log_msg_set_tag_by_id_onoff(self, id, TRUE);
844 }
845 
846 void
log_msg_set_tag_by_name(LogMessage * self,const gchar * name)847 log_msg_set_tag_by_name(LogMessage *self, const gchar *name)
848 {
849   log_msg_set_tag_by_id_onoff(self, log_tags_get_by_name(name), TRUE);
850 }
851 
852 void
log_msg_clear_tag_by_id(LogMessage * self,LogTagId id)853 log_msg_clear_tag_by_id(LogMessage *self, LogTagId id)
854 {
855   log_msg_set_tag_by_id_onoff(self, id, FALSE);
856 }
857 
858 void
log_msg_clear_tag_by_name(LogMessage * self,const gchar * name)859 log_msg_clear_tag_by_name(LogMessage *self, const gchar *name)
860 {
861   log_msg_set_tag_by_id_onoff(self, log_tags_get_by_name(name), FALSE);
862 }
863 
864 gboolean
log_msg_is_tag_by_id(LogMessage * self,LogTagId id)865 log_msg_is_tag_by_id(LogMessage *self, LogTagId id)
866 {
867   if (G_UNLIKELY(8159 < id))
868     {
869       msg_error("Invalid tag", evt_tag_int("id", (gint) id));
870       return FALSE;
871     }
872   if (self->num_tags == 0 && id < LOGMSG_TAGS_BITS)
873     return log_msg_get_bit((gulong *) &self->tags, id);
874   else if (id < self->num_tags * LOGMSG_TAGS_BITS)
875     return log_msg_get_bit(self->tags, id);
876   else
877     return FALSE;
878 }
879 
880 gboolean
log_msg_is_tag_by_name(LogMessage * self,const gchar * name)881 log_msg_is_tag_by_name(LogMessage *self, const gchar *name)
882 {
883   return log_msg_is_tag_by_id(self, log_tags_get_by_name(name));
884 }
885 
886 /* structured data elements */
887 
888 static void
log_msg_sdata_append_key_escaped(GString * result,const gchar * sstr,gssize len)889 log_msg_sdata_append_key_escaped(GString *result, const gchar *sstr, gssize len)
890 {
891   /* The specification does not have any way to escape keys.
892    * The goal is to create syntactically valid structured data fields. */
893   const guchar *ustr = (const guchar *) sstr;
894 
895   for (gssize i = 0; i < len; i++)
896     {
897       if (!isascii(ustr[i]) || ustr[i] == '=' || ustr[i] == ' '
898           || ustr[i] == '[' || ustr[i] == ']' || ustr[i] == '"')
899         {
900           gchar hex_code[4];
901           g_sprintf(hex_code, "%%%02X", ustr[i]);
902           g_string_append(result, hex_code);
903         }
904       else
905         g_string_append_c(result, ustr[i]);
906     }
907 }
908 
909 static void
log_msg_sdata_append_escaped(GString * result,const gchar * sstr,gssize len)910 log_msg_sdata_append_escaped(GString *result, const gchar *sstr, gssize len)
911 {
912   gint i;
913   const guchar *ustr = (const guchar *) sstr;
914 
915   for (i = 0; i < len; i++)
916     {
917       if (ustr[i] == '"' || ustr[i] == '\\' || ustr[i] == ']')
918         {
919           g_string_append_c(result, '\\');
920           g_string_append_c(result, ustr[i]);
921         }
922       else
923         g_string_append_c(result, ustr[i]);
924     }
925 }
926 
927 void
log_msg_append_format_sdata(const LogMessage * self,GString * result,guint32 seq_num)928 log_msg_append_format_sdata(const LogMessage *self, GString *result,  guint32 seq_num)
929 {
930   const gchar *value;
931   const gchar *sdata_name, *sdata_elem, *sdata_param, *cur_elem = NULL, *dot;
932   gssize sdata_name_len, sdata_elem_len, sdata_param_len, cur_elem_len = 0, len;
933   gint i;
934   static NVHandle meta_seqid = 0;
935   gssize seqid_length;
936   gboolean has_seq_num = FALSE;
937   const gchar *seqid;
938 
939   if (!meta_seqid)
940     meta_seqid = log_msg_get_value_handle(".SDATA.meta.sequenceId");
941 
942   seqid = log_msg_get_value(self, meta_seqid, &seqid_length);
943   APPEND_ZERO(seqid, seqid, seqid_length);
944   if (seqid[0])
945     /* Message stores sequenceId */
946     has_seq_num = TRUE;
947   else
948     /* Message hasn't sequenceId */
949     has_seq_num = FALSE;
950 
951   for (i = 0; i < self->num_sdata; i++)
952     {
953       NVHandle handle = self->sdata[i];
954       guint16 handle_flags;
955       gint sd_id_len;
956 
957       sdata_name_len = 0;
958       sdata_name = log_msg_get_value_name(handle, &sdata_name_len);
959       handle_flags = nv_registry_get_handle_flags(logmsg_registry, handle);
960       value = log_msg_get_value_if_set(self, handle, &len);
961       if (!value)
962         continue;
963 
964       g_assert(handle_flags & LM_VF_SDATA);
965 
966       /* sdata_name always begins with .SDATA. */
967       g_assert(sdata_name_len > 6);
968 
969       sdata_elem = sdata_name + 7;
970       sd_id_len = (handle_flags >> 8);
971 
972       if (sd_id_len)
973         {
974           dot = sdata_elem + sd_id_len;
975           if (dot - sdata_name != sdata_name_len)
976             {
977               g_assert((dot - sdata_name < sdata_name_len) && *dot == '.');
978             }
979           else
980             {
981               /* Standalone sdata e.g. [[UserData.updatelist@18372.4]] */
982               dot = NULL;
983             }
984         }
985       else
986         {
987           dot = memrchr(sdata_elem, '.', sdata_name_len - 7);
988         }
989 
990       if (G_LIKELY(dot))
991         {
992           sdata_elem_len = dot - sdata_elem;
993 
994           sdata_param = dot + 1;
995           sdata_param_len = sdata_name_len - (dot + 1 - sdata_name);
996         }
997       else
998         {
999           sdata_elem_len = sdata_name_len - 7;
1000           if (sdata_elem_len == 0)
1001             {
1002               sdata_elem = "none";
1003               sdata_elem_len = 4;
1004             }
1005           sdata_param = "";
1006           sdata_param_len = 0;
1007         }
1008       if (!cur_elem || sdata_elem_len != cur_elem_len || strncmp(cur_elem, sdata_elem, sdata_elem_len) != 0)
1009         {
1010           if (cur_elem)
1011             {
1012               /* close the previous block */
1013               g_string_append_c(result, ']');
1014             }
1015 
1016           /* the current SD block has changed, emit a start */
1017           g_string_append_c(result, '[');
1018           log_msg_sdata_append_key_escaped(result, sdata_elem, sdata_elem_len);
1019 
1020           /* update cur_elem */
1021           cur_elem = sdata_elem;
1022           cur_elem_len = sdata_elem_len;
1023         }
1024       /* if message hasn't sequenceId and the cur_elem is the meta block Append the sequenceId for the result
1025          if seq_num isn't 0 */
1026       if (!has_seq_num && seq_num!=0 && strncmp(sdata_elem, "meta.", 5) == 0)
1027         {
1028           gchar sequence_id[16];
1029           g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
1030           g_string_append_c(result, ' ');
1031           g_string_append_len(result, "sequenceId=\"", 12);
1032           g_string_append_len(result, sequence_id, strlen(sequence_id));
1033           g_string_append_c(result, '"');
1034           has_seq_num = TRUE;
1035         }
1036       if (sdata_param_len)
1037         {
1038           if (value)
1039             {
1040               g_string_append_c(result, ' ');
1041               log_msg_sdata_append_key_escaped(result, sdata_param, sdata_param_len);
1042               g_string_append(result, "=\"");
1043               log_msg_sdata_append_escaped(result, value, len);
1044               g_string_append_c(result, '"');
1045             }
1046         }
1047     }
1048   if (cur_elem)
1049     {
1050       g_string_append_c(result, ']');
1051     }
1052   /*
1053     There was no meta block and if sequenceId must be added (seq_num!=0)
1054     create the whole meta block with sequenceId
1055   */
1056   if (!has_seq_num && seq_num!=0)
1057     {
1058       gchar sequence_id[16];
1059       g_snprintf(sequence_id, sizeof(sequence_id), "%d", seq_num);
1060       g_string_append_c(result, '[');
1061       g_string_append_len(result, "meta sequenceId=\"", 17);
1062       g_string_append_len(result, sequence_id, strlen(sequence_id));
1063       g_string_append_len(result, "\"]", 2);
1064     }
1065 }
1066 
1067 void
log_msg_format_sdata(const LogMessage * self,GString * result,guint32 seq_num)1068 log_msg_format_sdata(const LogMessage *self, GString *result,  guint32 seq_num)
1069 {
1070   g_string_truncate(result, 0);
1071   log_msg_append_format_sdata(self, result, seq_num);
1072 }
1073 
1074 gboolean
log_msg_append_tags_callback(const LogMessage * self,LogTagId tag_id,const gchar * name,gpointer user_data)1075 log_msg_append_tags_callback(const LogMessage *self, LogTagId tag_id, const gchar *name, gpointer user_data)
1076 {
1077   GString *result = (GString *) ((gpointer *) user_data)[0];
1078   gint original_length = GPOINTER_TO_UINT(((gpointer *) user_data)[1]);
1079 
1080   g_assert(result);
1081 
1082   if (result->len > original_length)
1083     g_string_append_c(result, ',');
1084 
1085   str_repr_encode_append(result, name, -1, ",");
1086   return TRUE;
1087 }
1088 
1089 void
log_msg_print_tags(const LogMessage * self,GString * result)1090 log_msg_print_tags(const LogMessage *self, GString *result)
1091 {
1092   gpointer args[] = { result, GUINT_TO_POINTER(result->len) };
1093 
1094   log_msg_tags_foreach(self, log_msg_append_tags_callback, args);
1095 }
1096 
1097 void
log_msg_set_saddr(LogMessage * self,GSockAddr * saddr)1098 log_msg_set_saddr(LogMessage *self, GSockAddr *saddr)
1099 {
1100   log_msg_set_saddr_ref(self, g_sockaddr_ref(saddr));
1101 }
1102 
1103 void
log_msg_set_saddr_ref(LogMessage * self,GSockAddr * saddr)1104 log_msg_set_saddr_ref(LogMessage *self, GSockAddr *saddr)
1105 {
1106   if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1107     g_sockaddr_unref(self->saddr);
1108   self->saddr = saddr;
1109   self->flags |= LF_STATE_OWN_SADDR;
1110 }
1111 
1112 void
log_msg_set_daddr(LogMessage * self,GSockAddr * daddr)1113 log_msg_set_daddr(LogMessage *self, GSockAddr *daddr)
1114 {
1115   log_msg_set_daddr_ref(self, g_sockaddr_ref(daddr));
1116 }
1117 
1118 void
log_msg_set_daddr_ref(LogMessage * self,GSockAddr * daddr)1119 log_msg_set_daddr_ref(LogMessage *self, GSockAddr *daddr)
1120 {
1121   if (log_msg_chk_flag(self,  LF_STATE_OWN_DADDR))
1122     g_sockaddr_unref(self->daddr);
1123   self->daddr = daddr;
1124   self->flags |= LF_STATE_OWN_DADDR;
1125 }
1126 
1127 /**
1128  * log_msg_init:
1129  * @self: LogMessage instance
1130  * @saddr: sender address
1131  *
1132  * This function initializes a LogMessage instance without allocating it
1133  * first. It is used internally by the log_msg_new function.
1134  **/
1135 static void
log_msg_init(LogMessage * self)1136 log_msg_init(LogMessage *self)
1137 {
1138   GTimeVal tv;
1139 
1140   /* ref is set to 1, ack is set to 0 */
1141   self->ack_and_ref_and_abort_and_suspended = LOGMSG_REFCACHE_REF_TO_VALUE(1);
1142   cached_g_current_time(&tv);
1143   self->timestamps[LM_TS_RECVD].ut_sec = tv.tv_sec;
1144   self->timestamps[LM_TS_RECVD].ut_usec = tv.tv_usec;
1145   self->timestamps[LM_TS_RECVD].ut_gmtoff = get_local_timezone_ofs(self->timestamps[LM_TS_RECVD].ut_sec);
1146   self->timestamps[LM_TS_STAMP] = self->timestamps[LM_TS_RECVD];
1147   unix_time_unset(&self->timestamps[LM_TS_PROCESSED]);
1148 
1149   self->sdata = NULL;
1150   self->saddr = NULL;
1151   self->daddr = NULL;
1152 
1153   self->original = NULL;
1154   self->flags |= LF_STATE_OWN_MASK;
1155   self->pri = LOG_USER | LOG_NOTICE;
1156 
1157   self->rcptid = rcptid_generate_id();
1158   log_msg_set_host_id(self);
1159 }
1160 
1161 void
log_msg_clear(LogMessage * self)1162 log_msg_clear(LogMessage *self)
1163 {
1164   if(log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD))
1165     nv_table_unref(self->payload);
1166   self->payload = nv_table_new(LM_V_MAX, 16, 256);
1167 
1168   if (log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->tags)
1169     {
1170       gboolean inline_tags = self->num_tags == 0;
1171 
1172       if (inline_tags)
1173         self->tags = NULL;
1174       else
1175         memset(self->tags, 0, self->num_tags * sizeof(self->tags[0]));
1176     }
1177   else
1178     {
1179       self->tags = NULL;
1180       self->num_tags = 0;
1181     }
1182 
1183   self->num_matches = 0;
1184   if (!log_msg_chk_flag(self, LF_STATE_OWN_SDATA))
1185     {
1186       self->sdata = NULL;
1187       self->alloc_sdata = 0;
1188     }
1189   self->num_sdata = 0;
1190 
1191   if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1192     g_sockaddr_unref(self->saddr);
1193   self->saddr = NULL;
1194   if (log_msg_chk_flag(self, LF_STATE_OWN_DADDR))
1195     g_sockaddr_unref(self->daddr);
1196   self->daddr = NULL;
1197 
1198   /* clear "local", "utf8", "internal", "mark" and similar flags, we start afresh */
1199   self->flags = LF_STATE_OWN_MASK;
1200 }
1201 
1202 static inline LogMessage *
log_msg_alloc(gsize payload_size)1203 log_msg_alloc(gsize payload_size)
1204 {
1205   LogMessage *msg;
1206   gsize payload_space = payload_size ? nv_table_get_alloc_size(LM_V_MAX, 16, payload_size) : 0;
1207   gsize alloc_size, payload_ofs = 0;
1208 
1209   /* NOTE: logmsg_node_max is updated from parallel threads without locking. */
1210   gint nodes = (volatile gint) logmsg_queue_node_max;
1211 
1212   alloc_size = sizeof(LogMessage) + sizeof(LogMessageQueueNode) * nodes;
1213   /* align to 8 boundary */
1214   if (payload_size)
1215     {
1216       alloc_size = (alloc_size + 7) & ~7;
1217       payload_ofs = alloc_size;
1218       alloc_size += payload_space;
1219     }
1220   msg = g_malloc(alloc_size);
1221 
1222   memset(msg, 0, sizeof(LogMessage));
1223 
1224   if (payload_size)
1225     msg->payload = nv_table_init_borrowed(((gchar *) msg) + payload_ofs, payload_space, LM_V_MAX);
1226 
1227   msg->num_nodes = nodes;
1228   msg->allocated_bytes = alloc_size + payload_space;
1229   stats_counter_add(count_allocated_bytes, msg->allocated_bytes);
1230   return msg;
1231 }
1232 
1233 static gboolean
_merge_value(NVHandle handle,const gchar * name,const gchar * value,gssize value_len,gpointer user_data)1234 _merge_value(NVHandle handle, const gchar *name, const gchar *value, gssize value_len, gpointer user_data)
1235 {
1236   LogMessage *msg = (LogMessage *) user_data;
1237 
1238   if (!nv_table_is_value_set(msg->payload, handle))
1239     log_msg_set_value(msg, handle, value, value_len);
1240   return FALSE;
1241 }
1242 
1243 void
log_msg_merge_context(LogMessage * self,LogMessage ** context,gsize context_len)1244 log_msg_merge_context(LogMessage *self, LogMessage **context, gsize context_len)
1245 {
1246   gint i;
1247 
1248   for (i = context_len - 1; i >= 0; i--)
1249     {
1250       LogMessage *msg_to_be_merged = context[i];
1251 
1252       log_msg_values_foreach(msg_to_be_merged, _merge_value, self);
1253     }
1254 }
1255 
1256 static void
log_msg_clone_ack(LogMessage * msg,AckType ack_type)1257 log_msg_clone_ack(LogMessage *msg, AckType ack_type)
1258 {
1259   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
1260 
1261   g_assert(msg->original);
1262   path_options.ack_needed = TRUE;
1263   log_msg_ack(msg->original, &path_options, ack_type);
1264 }
1265 
1266 /*
1267  * log_msg_clone_cow:
1268  *
1269  * Clone a copy-on-write (cow) copy of a log message.
1270  */
1271 LogMessage *
log_msg_clone_cow(LogMessage * msg,const LogPathOptions * path_options)1272 log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options)
1273 {
1274   LogMessage *self = log_msg_alloc(0);
1275   gsize allocated_bytes = self->allocated_bytes;
1276 
1277   stats_counter_inc(count_msg_clones);
1278   log_msg_write_protect(msg);
1279 
1280   memcpy(self, msg, sizeof(*msg));
1281   msg->allocated_bytes = allocated_bytes;
1282 
1283   msg_trace("Message was cloned",
1284             evt_tag_printf("original_msg", "%p", msg),
1285             evt_tag_printf("new_msg", "%p", self));
1286 
1287   /* every field _must_ be initialized explicitly if its direct
1288    * copying would cause problems (like copying a pointer by value) */
1289 
1290   /* reference the original message */
1291   self->original = log_msg_ref(msg);
1292   self->ack_and_ref_and_abort_and_suspended = LOGMSG_REFCACHE_REF_TO_VALUE(1) + LOGMSG_REFCACHE_ACK_TO_VALUE(
1293                                                 0) + LOGMSG_REFCACHE_ABORT_TO_VALUE(0);
1294   self->cur_node = 0;
1295   self->protected = FALSE;
1296 
1297   log_msg_add_ack(self, path_options);
1298   if (!path_options->ack_needed)
1299     {
1300       self->ack_func  = NULL;
1301     }
1302   else
1303     {
1304       self->ack_func = log_msg_clone_ack;
1305     }
1306 
1307   self->flags &= ~LF_STATE_MASK;
1308 
1309   if (self->num_tags == 0)
1310     self->flags |= LF_STATE_OWN_TAGS;
1311   return self;
1312 }
1313 
1314 static gsize
_determine_payload_size(gint length,MsgFormatOptions * parse_options)1315 _determine_payload_size(gint length, MsgFormatOptions *parse_options)
1316 {
1317   gsize payload_size;
1318 
1319   if ((parse_options->flags & LP_STORE_RAW_MESSAGE))
1320     payload_size = length * 4;
1321   else
1322     payload_size = length * 2;
1323 
1324   return MAX(payload_size, 256);
1325 }
1326 
1327 /**
1328  * log_msg_new:
1329  * @msg: message to parse
1330  * @length: length of @msg
1331  * @flags: parse flags (LP_*)
1332  *
1333  * This function allocates, parses and returns a new LogMessage instance.
1334  **/
1335 LogMessage *
log_msg_new(const gchar * msg,gint length,MsgFormatOptions * parse_options)1336 log_msg_new(const gchar *msg, gint length,
1337             MsgFormatOptions *parse_options)
1338 {
1339   LogMessage *self = log_msg_alloc(_determine_payload_size(length, parse_options));
1340 
1341   log_msg_init(self);
1342 
1343   msg_trace("Initial message parsing follows");
1344   msg_format_parse(parse_options, self, (guchar *) msg, length);
1345   return self;
1346 }
1347 
1348 LogMessage *
log_msg_new_empty(void)1349 log_msg_new_empty(void)
1350 {
1351   LogMessage *self = log_msg_alloc(256);
1352 
1353   log_msg_init(self);
1354   return self;
1355 }
1356 
1357 /* This function creates a new log message that should be considered local */
1358 LogMessage *
log_msg_new_local(void)1359 log_msg_new_local(void)
1360 {
1361   LogMessage *self = log_msg_new_empty();
1362 
1363   self->flags |= LF_LOCAL;
1364   return self;
1365 }
1366 
1367 
1368 /**
1369  * log_msg_new_internal:
1370  * @prio: message priority (LOG_*)
1371  * @msg: message text
1372  * @flags: parse flags (LP_*)
1373  *
1374  * This function creates a new log message for messages originating
1375  * internally to syslog-ng
1376  **/
1377 LogMessage *
log_msg_new_internal(gint prio,const gchar * msg)1378 log_msg_new_internal(gint prio, const gchar *msg)
1379 {
1380   gchar buf[32];
1381   LogMessage *self;
1382 
1383   g_snprintf(buf, sizeof(buf), "%d", (int) getpid());
1384   self = log_msg_new_local();
1385   self->flags |= LF_INTERNAL;
1386   self->initial_parse = TRUE;
1387   log_msg_set_value(self, LM_V_PROGRAM, "syslog-ng", 9);
1388   log_msg_set_value(self, LM_V_PID, buf, -1);
1389   log_msg_set_value(self, LM_V_MESSAGE, msg, -1);
1390   self->initial_parse = FALSE;
1391   self->pri = prio;
1392 
1393   return self;
1394 }
1395 
1396 /**
1397  * log_msg_new_mark:
1398  *
1399  * This function returns a new MARK message. MARK messages have the LF_MARK
1400  * flag set.
1401  **/
1402 LogMessage *
log_msg_new_mark(void)1403 log_msg_new_mark(void)
1404 {
1405   LogMessage *self = log_msg_new_local();
1406 
1407   log_msg_set_value(self, LM_V_MESSAGE, "-- MARK --", 10);
1408   self->pri = LOG_SYSLOG | LOG_INFO;
1409   self->flags |= LF_MARK | LF_INTERNAL;
1410   return self;
1411 }
1412 
1413 /**
1414  * log_msg_free:
1415  * @self: LogMessage instance
1416  *
1417  * Frees a LogMessage instance.
1418  **/
1419 static void
log_msg_free(LogMessage * self)1420 log_msg_free(LogMessage *self)
1421 {
1422   if (log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD) && self->payload)
1423     nv_table_unref(self->payload);
1424   if (log_msg_chk_flag(self, LF_STATE_OWN_TAGS) && self->tags && self->num_tags > 0)
1425     g_free(self->tags);
1426 
1427   if (log_msg_chk_flag(self, LF_STATE_OWN_SDATA) && self->sdata)
1428     g_free(self->sdata);
1429   if (log_msg_chk_flag(self, LF_STATE_OWN_SADDR))
1430     g_sockaddr_unref(self->saddr);
1431   if (log_msg_chk_flag(self, LF_STATE_OWN_DADDR))
1432     g_sockaddr_unref(self->daddr);
1433 
1434   if (self->original)
1435     log_msg_unref(self->original);
1436 
1437   stats_counter_sub(count_allocated_bytes, self->allocated_bytes);
1438 
1439   g_free(self);
1440 }
1441 
1442 /**
1443  * log_msg_drop:
1444  * @msg: LogMessage instance
1445  * @path_options: path specific options
1446  *
1447  * This function is called whenever a destination driver feels that it is
1448  * unable to process this message. It acks and unrefs the message.
1449  **/
1450 void
log_msg_drop(LogMessage * msg,const LogPathOptions * path_options,AckType ack_type)1451 log_msg_drop(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type)
1452 {
1453   log_msg_ack(msg, path_options, ack_type);
1454   log_msg_unref(msg);
1455 }
1456 
1457 static AckType
_ack_and_ref_and_abort_and_suspend_to_acktype(gint value)1458 _ack_and_ref_and_abort_and_suspend_to_acktype(gint value)
1459 {
1460   AckType type = AT_PROCESSED;
1461 
1462   if (IS_SUSPENDFLAG_ON(LOGMSG_REFCACHE_VALUE_TO_SUSPEND(value)))
1463     type = AT_SUSPENDED;
1464   else if (IS_ABORTFLAG_ON(LOGMSG_REFCACHE_VALUE_TO_ABORT(value)))
1465     type = AT_ABORTED;
1466 
1467   return type;
1468 }
1469 
1470 
1471 /***************************************************************************************
1472  * In order to read & understand this code, reading the comment on the top
1473  * of this file about ref/ack handling is strongly recommended.
1474  ***************************************************************************************/
1475 
1476 /* Function to update the combined ACK (with the abort flag) and REF counter. */
1477 static inline gint
log_msg_update_ack_and_ref_and_abort_and_suspended(LogMessage * self,gint add_ref,gint add_ack,gint add_abort,gint add_suspend)1478 log_msg_update_ack_and_ref_and_abort_and_suspended(LogMessage *self, gint add_ref, gint add_ack, gint add_abort,
1479                                                    gint add_suspend)
1480 {
1481   gint old_value, new_value;
1482   do
1483     {
1484       new_value = old_value = (volatile gint) self->ack_and_ref_and_abort_and_suspended;
1485       new_value = (new_value & ~LOGMSG_REFCACHE_REF_MASK)   + LOGMSG_REFCACHE_REF_TO_VALUE(  (LOGMSG_REFCACHE_VALUE_TO_REF(
1486                     old_value)   + add_ref));
1487       new_value = (new_value & ~LOGMSG_REFCACHE_ACK_MASK)   + LOGMSG_REFCACHE_ACK_TO_VALUE(  (LOGMSG_REFCACHE_VALUE_TO_ACK(
1488                     old_value)   + add_ack));
1489       new_value = (new_value & ~LOGMSG_REFCACHE_ABORT_MASK) + LOGMSG_REFCACHE_ABORT_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_ABORT(
1490                     old_value) | add_abort));
1491       new_value = (new_value & ~LOGMSG_REFCACHE_SUSPEND_MASK) + LOGMSG_REFCACHE_SUSPEND_TO_VALUE((
1492                     LOGMSG_REFCACHE_VALUE_TO_SUSPEND(old_value) | add_suspend));
1493     }
1494   while (!g_atomic_int_compare_and_exchange(&self->ack_and_ref_and_abort_and_suspended, old_value, new_value));
1495 
1496   return old_value;
1497 }
1498 
1499 /* Function to update the combined ACK (without abort) and REF counter. */
1500 static inline gint
log_msg_update_ack_and_ref(LogMessage * self,gint add_ref,gint add_ack)1501 log_msg_update_ack_and_ref(LogMessage *self, gint add_ref, gint add_ack)
1502 {
1503   return log_msg_update_ack_and_ref_and_abort_and_suspended(self, add_ref, add_ack, 0, 0);
1504 }
1505 
1506 /**
1507  * log_msg_ref:
1508  * @self: LogMessage instance
1509  *
1510  * Increment reference count of @self and return the new reference.
1511  **/
1512 LogMessage *
log_msg_ref(LogMessage * self)1513 log_msg_ref(LogMessage *self)
1514 {
1515   gint old_value;
1516 
1517   if (G_LIKELY(logmsg_current == self))
1518     {
1519       /* fastpath, @self is the current message, ref/unref processing is
1520        * delayed until log_msg_refcache_stop() is called */
1521 
1522       logmsg_cached_refs++;
1523       return self;
1524     }
1525 
1526   /* slow path, refcache is not used, do the ordinary way */
1527   old_value = log_msg_update_ack_and_ref(self, 1, 0);
1528   g_assert(LOGMSG_REFCACHE_VALUE_TO_REF(old_value) >= 1);
1529   return self;
1530 }
1531 
1532 /**
1533  * log_msg_unref:
1534  * @self: LogMessage instance
1535  *
1536  * Decrement reference count and free self if the reference count becomes 0.
1537  **/
1538 void
log_msg_unref(LogMessage * self)1539 log_msg_unref(LogMessage *self)
1540 {
1541   gint old_value;
1542 
1543   if (G_LIKELY(logmsg_current == self))
1544     {
1545       /* fastpath, @self is the current message, ref/unref processing is
1546        * delayed until log_msg_refcache_stop() is called */
1547 
1548       logmsg_cached_refs--;
1549       return;
1550     }
1551 
1552   old_value = log_msg_update_ack_and_ref(self, -1, 0);
1553   g_assert(LOGMSG_REFCACHE_VALUE_TO_REF(old_value) >= 1);
1554 
1555   if (LOGMSG_REFCACHE_VALUE_TO_REF(old_value) == 1)
1556     {
1557       log_msg_free(self);
1558     }
1559 }
1560 
1561 /**
1562  * log_msg_add_ack:
1563  * @m: LogMessage instance
1564  *
1565  * This function increments the number of required acknowledges.
1566  **/
1567 void
log_msg_add_ack(LogMessage * self,const LogPathOptions * path_options)1568 log_msg_add_ack(LogMessage *self, const LogPathOptions *path_options)
1569 {
1570   if (path_options->ack_needed)
1571     {
1572       if (G_LIKELY(logmsg_current == self))
1573         {
1574           /* fastpath, @self is the current message, add_ack/ack processing is
1575            * delayed until log_msg_refcache_stop() is called */
1576 
1577           logmsg_cached_acks++;
1578           logmsg_cached_ack_needed = TRUE;
1579           return;
1580         }
1581       log_msg_update_ack_and_ref(self, 0, 1);
1582     }
1583 }
1584 
1585 
1586 /**
1587  * log_msg_ack:
1588  * @msg: LogMessage instance
1589  * @path_options: path specific options
1590  * @acked: TRUE: positive ack, FALSE: negative ACK
1591  *
1592  * Indicate that the message was processed successfully and the sender can
1593  * queue further messages.
1594  **/
1595 void
log_msg_ack(LogMessage * self,const LogPathOptions * path_options,AckType ack_type)1596 log_msg_ack(LogMessage *self, const LogPathOptions *path_options, AckType ack_type)
1597 {
1598   gint old_value;
1599 
1600   if (path_options->ack_needed)
1601     {
1602       if (G_LIKELY(logmsg_current == self))
1603         {
1604           /* fastpath, @self is the current message, add_ack/ack processing is
1605            * delayed until log_msg_refcache_stop() is called */
1606 
1607           logmsg_cached_acks--;
1608           logmsg_cached_abort |= IS_ACK_ABORTED(ack_type);
1609           logmsg_cached_suspend |= IS_ACK_SUSPENDED(ack_type);
1610           return;
1611         }
1612       old_value = log_msg_update_ack_and_ref_and_abort_and_suspended(self, 0, -1, IS_ACK_ABORTED(ack_type),
1613                   IS_ACK_SUSPENDED(ack_type));
1614       if (LOGMSG_REFCACHE_VALUE_TO_ACK(old_value) == 1)
1615         {
1616           if (ack_type == AT_SUSPENDED)
1617             self->ack_func(self, AT_SUSPENDED);
1618           else if (ack_type == AT_ABORTED)
1619             self->ack_func(self, AT_ABORTED);
1620           else
1621             self->ack_func(self, _ack_and_ref_and_abort_and_suspend_to_acktype(old_value));
1622         }
1623     }
1624 }
1625 
1626 /*
1627  * Break out of an acknowledgement chain. The incoming message is
1628  * ACKed and a new path options structure is returned that can be used
1629  * to send to further consuming pipes.
1630  */
1631 const LogPathOptions *
log_msg_break_ack(LogMessage * msg,const LogPathOptions * path_options,LogPathOptions * local_options)1632 log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_options)
1633 {
1634   /* NOTE: in case the user requested flow control, we can't break the
1635    * ACK chain, as that would lead to early acks, that would cause
1636    * message loss */
1637 
1638   g_assert(!path_options->flow_control_requested);
1639 
1640   log_msg_ack(msg, path_options, AT_PROCESSED);
1641 
1642   *local_options = *path_options;
1643   local_options->ack_needed = FALSE;
1644 
1645   return local_options;
1646 }
1647 
1648 
1649 /*
1650  * Start caching ref/unref/ack/add-ack operations in the current thread for
1651  * the message specified by @self.  See the comment at the top of this file
1652  * for more information.
1653  *
1654  * This function is to be called by the producer thread (e.g. the one
1655  * that generates new messages). You should use
1656  * log_msg_refcache_start_consumer() in consumer threads instead.
1657  *
1658  * This function cannot be called for the same message from multiple
1659  * threads.
1660  *
1661  */
1662 void
log_msg_refcache_start_producer(LogMessage * self)1663 log_msg_refcache_start_producer(LogMessage *self)
1664 {
1665   g_assert(logmsg_current == NULL);
1666 
1667   logmsg_current = self;
1668   /* we're the producer of said message, and thus we want to inhibit
1669    * freeing/acking it due to our cached refs, add a bias large enough
1670    * to cover any possible unrefs/acks of the consumer side */
1671 
1672   /* we don't need to be thread-safe here, as a producer has just created this message and no parallel access is yet possible */
1673 
1674   self->ack_and_ref_and_abort_and_suspended = (self->ack_and_ref_and_abort_and_suspended & ~LOGMSG_REFCACHE_REF_MASK) +
1675                                               LOGMSG_REFCACHE_REF_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_REF(self->ack_and_ref_and_abort_and_suspended) +
1676                                                   LOGMSG_REFCACHE_BIAS));
1677   self->ack_and_ref_and_abort_and_suspended = (self->ack_and_ref_and_abort_and_suspended & ~LOGMSG_REFCACHE_ACK_MASK) +
1678                                               LOGMSG_REFCACHE_ACK_TO_VALUE((LOGMSG_REFCACHE_VALUE_TO_ACK(self->ack_and_ref_and_abort_and_suspended) +
1679                                                   LOGMSG_REFCACHE_BIAS));
1680 
1681   logmsg_cached_refs = -LOGMSG_REFCACHE_BIAS;
1682   logmsg_cached_acks = -LOGMSG_REFCACHE_BIAS;
1683   logmsg_cached_abort = FALSE;
1684   logmsg_cached_suspend = FALSE;
1685   logmsg_cached_ack_needed = TRUE;
1686 }
1687 
1688 /*
1689  * Start caching ref/unref/ack/add-ack operations in the current thread for
1690  * the message specified by @self.  See the comment at the top of this file
1691  * for more information.
1692  *
1693  * This function is to be called by the consumer threads (e.g. the
1694  * ones that consume messages).
1695  *
1696  * This function can be called from multiple consumer threads at the
1697  * same time, even for the same message.
1698  *
1699  */
1700 void
log_msg_refcache_start_consumer(LogMessage * self,const LogPathOptions * path_options)1701 log_msg_refcache_start_consumer(LogMessage *self, const LogPathOptions *path_options)
1702 {
1703   g_assert(logmsg_current == NULL);
1704 
1705   logmsg_current = self;
1706   logmsg_cached_ack_needed = path_options->ack_needed;
1707   logmsg_cached_refs = 0;
1708   logmsg_cached_acks = 0;
1709   logmsg_cached_abort = FALSE;
1710   logmsg_cached_suspend = FALSE;
1711 }
1712 
/*
 * Stop caching ref/unref/ack/add-ack operations in the current thread for
 * the message that was handed to the preceding refcache start call (e.g.
 * log_msg_refcache_start_consumer()), and fold the cached differences back
 * into the message.
 *
 * Depending on the values folded in, this may invoke the ack callback of
 * the message and may free the message.
 *
 * See the comment at the top of this file for more information.
 */
void
log_msg_refcache_stop(void)
{
  gint old_value;
  gint current_cached_acks;
  gboolean current_cached_abort;
  gboolean current_cached_suspend;

  /* there must be an active caching session */
  g_assert(logmsg_current != NULL);

  /* validate that we didn't overflow the counters:
   *
   * Both counters must be:
   *
   * - at least 1 smaller than the bias, rationale:
   *
   *      - if we are caching "bias" number of refs, it may happen
   *        that there are bias number of unrefs, potentially running
   *        in consumer threads
   *
   *      - if the potential unrefs is larger than the bias value, it may
   *        happen that the producer sets the bias (trying to avoid
   *        the freeing of the LogMessage), but still it gets freed.
   *
   * - not smaller than the "-bias" value, rationale:
   *      - if we are caching "bias" number of unrefs the same can happen
   *        as with the ref case.
   *
   */
  g_assert((logmsg_cached_acks < LOGMSG_REFCACHE_BIAS - 1) && (logmsg_cached_acks >= -LOGMSG_REFCACHE_BIAS));
  g_assert((logmsg_cached_refs < LOGMSG_REFCACHE_BIAS - 1) && (logmsg_cached_refs >= -LOGMSG_REFCACHE_BIAS));

  /*
   * We fold the differences in ack/ref counts in five stages:
   *
   *   1) we take a ref of logmsg_current, this is needed so that the
   *      message is not freed until we return from refcache_stop()
   *
   *   2) we add in the ack diff and the abort/suspend flags that were
   *      accumulated between refcache_start and refcache_stop. This gets
   *      us a final value of the ack counter.
   *
   *   3) we call the ack handler if needed, this might change ref counters
   *      recursively (but not ack counters as that already atomically
   *      dropped to zero)
   *
   *   4) drop the ref we took in step 1) above
   *
   *   5) then we fold in the net ref results of the ack callback and
   *      refcache_stop() combined. This either causes the LogMessage to be
   *      freed (when we were the last), or it stays around because of other
   *      refs.
   */

  /* 1) take ref */
  log_msg_ref(logmsg_current);

  /* 2) fold in the ack counter diff and the abort/suspend flags into the
   * atomic value
   *
   * The cached values are copied to locals and reset before the update, so
   * that the assertion after the ack callback below can detect acks that
   * were cached while the callback was running. */

  current_cached_acks = logmsg_cached_acks;
  logmsg_cached_acks = 0;

  current_cached_abort = logmsg_cached_abort;
  logmsg_cached_abort = FALSE;

  current_cached_suspend = logmsg_cached_suspend;
  logmsg_cached_suspend = FALSE;

  /* NOTE(review): the 0 is presumably the ref diff (refs are folded in
   * step 5) -- confirm against the helper's signature */
  old_value = log_msg_update_ack_and_ref_and_abort_and_suspended(logmsg_current, 0, current_cached_acks,
              current_cached_abort, current_cached_suspend);

  /* old ack count == -diff means the old count plus our diff is zero, i.e.
   * this very update brought the ack counter down to zero */
  if ((LOGMSG_REFCACHE_VALUE_TO_ACK(old_value) == -current_cached_acks) && logmsg_cached_ack_needed)
    {
      /* start from the ack type recorded in the previous value, then let
       * the flags cached in this session override it: suspend takes
       * precedence over abort */
      AckType ack_type_cumulated = _ack_and_ref_and_abort_and_suspend_to_acktype(old_value);

      if (current_cached_suspend)
        ack_type_cumulated = AT_SUSPENDED;
      else if (current_cached_abort)
        ack_type_cumulated = AT_ABORTED;

      /* 3) call the ack handler */

      logmsg_current->ack_func(logmsg_current, ack_type_cumulated);

      /* the ack callback may not change the ack counters, it already
       * dropped to zero atomically, changing that again is an error */

      g_assert(logmsg_cached_acks == 0);
    }

  /* 4) drop our own ref */
  log_msg_unref(logmsg_current);

  /* 5) fold the combined result of our own ref/unref and ack handler's results */
  old_value = log_msg_update_ack_and_ref(logmsg_current, logmsg_cached_refs, 0);

  /* old ref count == -diff means the ref counter reached zero with this
   * update, so we held the last reference */
  if (LOGMSG_REFCACHE_VALUE_TO_REF(old_value) == -logmsg_cached_refs)
    log_msg_free(logmsg_current);

  /* end of the caching session */
  logmsg_cached_refs = 0;
  logmsg_current = NULL;
}
1820 
1821 void
log_msg_registry_init(void)1822 log_msg_registry_init(void)
1823 {
1824   gint i;
1825 
1826   logmsg_registry = nv_registry_new(builtin_value_names, NVHANDLE_MAX_VALUE);
1827   nv_registry_add_alias(logmsg_registry, LM_V_MESSAGE, "MSG");
1828   nv_registry_add_alias(logmsg_registry, LM_V_MESSAGE, "MSGONLY");
1829   nv_registry_add_alias(logmsg_registry, LM_V_HOST, "FULLHOST");
1830   nv_registry_add_alias(logmsg_registry, LM_V_HOST_FROM, "FULLHOST_FROM");
1831 
1832   for (i = 0; macros[i].name; i++)
1833     {
1834       if (nv_registry_get_handle(logmsg_registry, macros[i].name) == 0)
1835         {
1836           NVHandle handle;
1837 
1838           handle = nv_registry_alloc_handle(logmsg_registry, macros[i].name);
1839           nv_registry_set_handle_flags(logmsg_registry, handle, (macros[i].id << 8) + LM_VF_MACRO);
1840         }
1841     }
1842 
1843   /* register $0 - $255 in order */
1844   for (i = 0; i < 256; i++)
1845     {
1846       gchar buf[8];
1847 
1848       g_snprintf(buf, sizeof(buf), "%d", i);
1849       match_handles[i] = nv_registry_alloc_handle(logmsg_registry, buf);
1850     }
1851 }
1852 
/*
 * Free the global name-value registry and clear the pointer to it, so that
 * no dangling reference is left behind in logmsg_registry.
 */
void
log_msg_registry_deinit(void)
{
  nv_registry_free(logmsg_registry);
  logmsg_registry = NULL;
}
1859 
/*
 * Iterate over the global name-value registry by forwarding to
 * nv_registry_foreach().
 *
 * @func: callback passed on to nv_registry_foreach()
 * @user_data: opaque pointer passed on unchanged along with @func
 */
void
log_msg_registry_foreach(GHFunc func, gpointer user_data)
{
  nv_registry_foreach(logmsg_registry, func, user_data);
}
1865 
1866 static void
log_msg_register_stats(void)1867 log_msg_register_stats(void)
1868 {
1869   stats_lock();
1870   StatsClusterKey sc_key;
1871   stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "msg_clones", NULL );
1872   stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_msg_clones);
1873 
1874   stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "payload_reallocs", NULL );
1875   stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_payload_reallocs);
1876 
1877   stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "sdata_updates", NULL );
1878   stats_register_counter(0, &sc_key, SC_TYPE_PROCESSED, &count_sdata_updates);
1879 
1880   stats_cluster_single_key_set(&sc_key, SCS_GLOBAL, "msg_allocated_bytes", NULL);
1881   stats_register_counter(1, &sc_key, SC_TYPE_SINGLE_VALUE, &count_allocated_bytes);
1882   stats_unlock();
1883 }
1884 
/*
 * Global initialization of the LogMessage subsystem: sets up the
 * name-value registry right away and schedules the registration of the
 * stats counters for the AH_RUNNING application hook.
 */
void
log_msg_global_init(void)
{
  log_msg_registry_init();

  /* NOTE: we always initialize counters as they are on stats-level(0),
   * however we need to defer that as the stats subsystem may not be
   * operational yet */
  /* NOTE(review): log_msg_register_stats() takes no arguments and is cast
   * to ApplicationHookFunc here -- confirm that the hook's call signature
   * tolerates this */
  register_application_hook(AH_RUNNING, (ApplicationHookFunc) log_msg_register_stats, NULL, AHM_RUN_ONCE);
}
1895 
1896 
/*
 * Look up the name that belongs to @handle in the global name-value
 * registry by forwarding to nv_registry_get_handle_name().
 *
 * @handle: the handle to look up
 * @length: passed on unchanged; presumably receives the length of the
 *   returned name -- confirm against nv_registry_get_handle_name()
 *
 * Returns whatever nv_registry_get_handle_name() returns.
 */
const gchar *
log_msg_get_handle_name(NVHandle handle, gssize *length)
{
  return nv_registry_get_handle_name(logmsg_registry, handle, length);
}
1902 
/*
 * Global teardown of the LogMessage subsystem: releases the name-value
 * registry via log_msg_registry_deinit().
 */
void
log_msg_global_deinit(void)
{
  log_msg_registry_deinit();
}
1908 
1909 gint
log_msg_lookup_time_stamp_name(const gchar * name)1910 log_msg_lookup_time_stamp_name(const gchar *name)
1911 {
1912   if (strcmp(name, "stamp") == 0)
1913     return LM_TS_STAMP;
1914   else if (strcmp(name, "recvd") == 0)
1915     return LM_TS_RECVD;
1916   return -1;
1917 }
1918 
1919 gssize
log_msg_get_size(LogMessage * self)1920 log_msg_get_size(LogMessage *self)
1921 {
1922   if (!self)
1923     return 0;
1924 
1925   return
1926     sizeof(LogMessage) + // msg.static fields
1927     + self->alloc_sdata * sizeof(self->sdata[0]) +
1928     g_sockaddr_len(self->saddr) + g_sockaddr_len(self->daddr) +
1929     ((self->num_tags) ? sizeof(self->tags[0]) * self->num_tags : 0) +
1930     nv_table_get_memory_consumption(self->payload); // msg.payload (nvtable)
1931 }
1932 
#ifdef __linux__

/*
 * On Linux only, declare "__"-prefixed names for three of the value
 * accessors.  Each declaration carries the alias attribute, so the
 * prefixed symbol refers to the very same function as its un-prefixed
 * counterpart named in the attribute.
 *
 * NOTE(review): presumably kept for code that links against the prefixed
 * names -- confirm before removing.
 */

const gchar *
__log_msg_get_value(const LogMessage *self, NVHandle handle, gssize *value_len)
__attribute__((alias("log_msg_get_value")));

const gchar *
__log_msg_get_value_by_name(const LogMessage *self, const gchar *name, gssize *value_len)
__attribute__((alias("log_msg_get_value_by_name")));

void
__log_msg_set_value_by_name(LogMessage *self, const gchar *name, const gchar *value, gssize length)
__attribute__((alias("log_msg_set_value_by_name")));

#endif
1948