1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #ifndef LOGPIPE_H_INCLUDED
26 #define LOGPIPE_H_INCLUDED
27 
28 #include "syslog-ng.h"
29 #include "logmsg/logmsg.h"
30 #include "cfg.h"
31 #include "atomic.h"
32 #include "messages.h"
33 #include "signal-slot-connector/signal-slot-connector.h"
34 
/* notify code values; these are the values carried in the notify_code
 * argument of log_pipe_notify() and of the LogPipe notify() method */
#define NC_CLOSE       1
#define NC_READ_ERROR  2
#define NC_WRITE_ERROR 3
#define NC_FILE_MOVED  4
#define NC_FILE_EOF    5
#define NC_REOPEN_REQUIRED 6
#define NC_FILE_DELETED 7
43 
/* Bits stored in LogPipe::flags.  Every PIF_* value below occupies a
 * distinct bit, so they can be freely combined with bitwise OR. */

/* indicates that the LogPipe was initialized; set by log_pipe_init() and
 * cleared by log_pipe_deinit() */
#define PIF_INITIALIZED       0x0001
/* indicates that this LogPipe has already been cloned into the tree */
#define PIF_INLINED           0x0002

/* log statement flags that are copied to the head of a branch */
#define PIF_BRANCH_FINAL      0x0004
#define PIF_BRANCH_FALLBACK   0x0008
/* mask covering all of the branch flags above */
#define PIF_BRANCH_PROPERTIES (PIF_BRANCH_FINAL | PIF_BRANCH_FALLBACK)

/* after queueing a message on a pipe carrying this flag, log_pipe_queue()
 * forces *path_options->matched to TRUE (when a matched pointer is set) */
#define PIF_DROP_UNMATCHED    0x0010

/* branch starting with this pipe wants hard flow control */
#define PIF_HARD_FLOW_CONTROL 0x0020

/* this pipe is a source for messages, it is not meant to be used to
 * forward messages, syslog-ng will only use these pipes for the
 * left-hand side of the processing graph, e.g. no other pipes may be
 * sending messages to these pipes and these are expected to generate
 * messages "automatically". */

#define PIF_SOURCE            0x0040

/* private flags range, to be used by other LogPipe instances for their own
 * purposes; the argument is shifted above the low 16 bits, e.g.
 * PIF_PRIVATE(0x1) == 0x10000 */

#define PIF_PRIVATE(x)       ((x) << 16)
70 
71 /**
72  *
73  * Processing pipeline
74  *
75  *   Within syslog-ng, the user configuration is converted into a tree-like
 *   structure.  Each node in this tree is a LogPipe object responsible for
77  *   queueing message towards the destination.  Each node is free to
78  *   drop/transform the message it receives.
79  *
80  *   The center.c module contains code that transforms the configuration
81  *   into the log processing tree.  Each log statement in user configuration
 *   becomes a linked list of pipes, then each source referenced by the
 *   log statement is piped into the newly created pipe.
84  *
85  *   Something like this:
86  *
87  *    log statement:
88  *       mpx | filter | parser | dest1 | dest2 | dest3
89  *
90  *    source1 -> log statement1
91  *            |-> log statement2
92  *
93  *   E.g. each source is sending to each log path it was referenced from. Each
 *   item in the log path is a pipe, which receives messages and forwards them
95  *   at its discretion. Filters are pipes too, which lose data. Destinations
96  *   are piping their output to the next element on the pipeline. This
97  *   basically means that the pipeline is a wired representation of the user
98  *   configuration without having to loop through configuration data.
99  *
100  * Reference counting
101  *
102  *   The pipes do not reference each other through their pipe_next member,
 *   simply because there'd be too many reference loops to care about.
104  *   Instead pipe_next is a borrowed reference, which is assumed to be valid
105  *   as long as the configuration is not freed.
106  *
107  * Flow control
108  *
109  *   Flow control is the mechanism used to control the message rate between
110  *   input/output sides of syslog-ng in order to avoid message loss.  If the
111  *   two sides were independent, the input side could well receive messages
112  *   at a much higher rate than the destination is able to cope with.
113  *
114  *   This is implemented by allocating a per-source window (similar to a TCP
115  *   window), which can be "filled" by the source without the danger of
116  *   output queue overflow.  Also, whenever a message is processed by the
117  *   destination it invokes an ACK, which in turn increments the window size.
118  *
119  *   This basically boils down to the following:
 *     * the source is free to receive as many messages as fit into its window
 *     * whenever the destination has processed a message, this is signalled
 *       to the source, freeing up a slot in its window
 *     * if the window is full, the source is suspended, no further messages
 *       are received.
125  *
126  *   This controls the message rate but doesn't completely ruin throughput,
127  *   as the source has some space without being suspended, as suspension and
128  *   resuming action takes considerable amount of time (mostly latency, but
129  *   CPU is certainly also used).
130  *
131  *   There are currently two forms of flow control:
132  *     * hard flow control
133  *     * soft flow control
134  *
135  *   The first is the form of flow control present in earlier syslog-ng
136  *   versions and was renamed as "hard" in order to differentiate from the
137  *   other form.  Hard means that the source is completely suspended until
138  *   the destination indeed processed a message.  If the network is down,
139  *   the disk is full, the source will not accept messages.
140  *
141  *   Soft flow control was introduced when syslog-ng became threaded and the
 *   earlier priority based behaviour couldn't be mimicked any other way.
143  *   Soft flow control cannot be configured, it is automatically used by
144  *   file destinations if "hard" flow control is not enabled by the user.
145  *   Soft flow control means that flow is only controlled as long as the
146  *   destination is writable, if an error occurs (disk full, etc) messages
147  *   get dropped on the floor.  But as long as the destination is writable,
148  *   the destination rate controls the source rate as well.
149  *
150  *   The behaviour in non-threaded syslog-ng was, that destinations were
151  *   prioritized over sources, and whenever a destination was writable,
152  *   sources were implicitly suspended.  This is not easily implementable by
153  *   threads and ivykis, thus this alternative mechanism was created.
154  *
155  *   Please note that soft-flow-control is a somewhat stronger guarantee
156  *   than the earlier behaviour, therefore it is currently only used for
157  *   destination files.
158  *
159  * Plugin overrides
160  *
161  *   Various methods can be overridden by external objects within
 *   LogPipe and derived classes. The aim of this functionality is to
163  *   make it possible to attach new functions to a LogPipe at runtime.
164  *
165  *   For example, it'd make sense to implement the "suppress"
166  *   functionality as such plugin, which is currently implemented in
167  *   LogWriter, and in case a non-LogWriter destination would need it,
168  *   then a separate implementation would be needed.
169  *
170  *   The way to override a method by an external object is as follows:
171  *
172  *     - it should save the current value of the method address (for
173  *       example "queue" for the queue method)
174  *
175  *     - it should change the pointer pointing to the relevant method to
176  *       its own code (e.g. change "queue" in LogPipe)
177  *
178  *     - once the hook is invoked, it should take care about calling the
179  *       original function
180  **/
181 
/* Options that travel together with a message along a processing path;
 * passed as the path_options argument of log_pipe_queue() and the
 * LogPipe queue() method. */
struct _LogPathOptions
{
  /* an acknowledgement is "passed" to this path, an ACK is still
   * needed to close the window slot. This was called "flow-control"
   * and meant both of these things: the user requested
   * flags(flow-control), _AND_ an acknowledgement was needed. With
   * the latest change, the one below specifies the user option,
   * while the "ack is still needed" condition is stored in
   * ack_needed.
   */

  gboolean ack_needed;

  /* The user has requested flow-control on this processing path,
   * which means that the destination should invoke log_msg_ack()
   * after it has completed processing it (e.g. after sending to the
   * actual destination, possibly after confirmation if the transport
   * supports that). If flow-control is not requested, destinations
   * are permitted to call log_msg_ack() early (e.g. at queue time).
   *
   * This is initially FALSE and can be set to TRUE anywhere _before_
   * the destination driver, which will actually carry out the
   * required action.
   */

  gboolean flow_control_requested;

  /* Optional pointer to a "matched" flag; may be NULL (both initializer
   * macros below set it to NULL).  When it is non-NULL, log_pipe_queue()
   * sets the pointed-to flag to TRUE after queueing on a pipe that has
   * PIF_DROP_UNMATCHED set, if the flag was still FALSE at that point.
   */
  gboolean *matched;
};

/* Positional initializers, in member order:
 *   { ack_needed, flow_control_requested, matched }
 * The _NOACK variant differs only in starting with ack_needed == FALSE.
 */
#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL }
#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL }
214 
/* A node of the message processing tree.  Behaviour is customized through
 * the function pointer members, which the log_pipe_*() inline wrappers in
 * this header invoke.
 */
struct _LogPipe
{
  GAtomicCounter ref_cnt;
  /* bitmask of PIF_* values */
  gint32 flags;

  /* message entry point invoked by log_pipe_queue(); when NULL,
   * log_pipe_queue() hands the message to log_pipe_forward_msg() instead
   */
  void (*queue)(LogPipe *self, LogMessage *msg, const LogPathOptions *path_options);

  GlobalConfig *cfg;
  LogExprNode *expr_node;
  /* next pipe in the chain, set by log_pipe_append(); a borrowed
   * reference (see "Reference counting" in the comment at the top of this
   * file).  When NULL, log_pipe_forward_msg() drops the message.
   */
  LogPipe *pipe_next;
  StatsCounterItem *discarded_messages;
  const gchar *persist_name;
  gchar *plugin_name;
  SignalSlotConnector *signal_slot_connector;

  /* lifecycle hooks, each of which may be NULL:
   *   - log_pipe_init() calls pre_init() and then init(); the pipe is
   *     marked PIF_INITIALIZED only if neither of them returned FALSE
   *   - log_pipe_deinit() calls deinit(); unless it returned FALSE,
   *     PIF_INITIALIZED is cleared and post_deinit() is called
   */
  gboolean (*pre_init)(LogPipe *self);
  gboolean (*init)(LogPipe *self);
  gboolean (*deinit)(LogPipe *self);
  void (*post_deinit)(LogPipe *self);

  /* this event function is used to perform necessary operations, such as
   * starting a worker thread, etc. Therefore, syslog-ng will terminate if
   * the return value is FALSE.
   */
  gboolean (*on_config_inited)(LogPipe *self);

  const gchar *(*generate_persist_name)(const LogPipe *self);
  /* invoked by log_pipe_get_arcs() */
  GList *(*arcs)(LogPipe *self);

  /* clone this pipe when used in multiple locations in the processing
   * pipe-line. If it contains state, it should behave as if it was
   * the same instance, otherwise it can be a copy.
   *
   * log_pipe_clone() asserts that this member is not NULL.
   */
  LogPipe *(*clone)(LogPipe *self);

  void (*free_fn)(LogPipe *self);
  /* optional; invoked by log_pipe_notify() with its notify_code and
   * user_data arguments passed through unchanged
   */
  void (*notify)(LogPipe *self, gint notify_code, gpointer user_data);
  GList *info;
};
254 
/*
   cpu-cache optimization: queue method should be as close as possible to flags.

   Rationale: the pointer pointing to this LogPipe instance is
   resolved right before calling queue and the colocation of flags and
   the queue pointer ensures that they are on the same cache line. The
   queue method is probably the single most often called virtual method.

   The assertion below fails the build if the offset of queue is more
   than 4 bytes past the offset of flags, e.g. when a new member gets
   inserted between the two.
 */
G_STATIC_ASSERT(G_STRUCT_OFFSET(LogPipe, queue) - G_STRUCT_OFFSET(LogPipe, flags) <= 4);
264 
/* Optional global hook, defined outside this header.  When non-NULL,
 * log_pipe_queue() calls it for every message before anything else; if it
 * returns FALSE the message is dropped instead of being queued.
 */
extern gboolean (*pipe_single_step_hook)(LogPipe *pipe, LogMessage *msg, const LogPathOptions *path_options);

/* The functions below are implemented outside this header.
 * NOTE(review): their exact semantics (e.g. what the gboolean returned by
 * log_pipe_unref() signifies) cannot be determined from here -- consult
 * the implementation.
 */
LogPipe *log_pipe_ref(LogPipe *self);
gboolean log_pipe_unref(LogPipe *self);
LogPipe *log_pipe_new(GlobalConfig *cfg);
void log_pipe_init_instance(LogPipe *self, GlobalConfig *cfg);
void log_pipe_forward_notify(LogPipe *self, gint notify_code, gpointer user_data);
/* used by log_pipe_queue() as the tag argument of its trace message */
EVTTAG *log_pipe_location_tag(LogPipe *pipe);
void log_pipe_attach_expr_node(LogPipe *self, LogExprNode *expr_node);
void log_pipe_detach_expr_node(LogPipe *self);
275 
276 static inline GlobalConfig *
log_pipe_get_config(LogPipe * s)277 log_pipe_get_config(LogPipe *s)
278 {
279   g_assert(s->cfg);
280   return s->cfg;
281 }
282 
283 static inline void
log_pipe_set_config(LogPipe * s,GlobalConfig * cfg)284 log_pipe_set_config(LogPipe *s, GlobalConfig *cfg)
285 {
286   s->cfg = cfg;
287 }
288 
289 static inline void
log_pipe_reset_config(LogPipe * s)290 log_pipe_reset_config(LogPipe *s)
291 {
292   log_pipe_set_config(s, NULL);
293 }
294 
295 static inline gboolean
log_pipe_init(LogPipe * s)296 log_pipe_init(LogPipe *s)
297 {
298   if (!(s->flags & PIF_INITIALIZED))
299     {
300       if (s->pre_init && !s->pre_init(s))
301         return FALSE;
302       if (!s->init || s->init(s))
303         {
304           s->flags |= PIF_INITIALIZED;
305           return TRUE;
306         }
307       return FALSE;
308     }
309   return TRUE;
310 }
311 
312 static inline gboolean
log_pipe_deinit(LogPipe * s)313 log_pipe_deinit(LogPipe *s)
314 {
315   if ((s->flags & PIF_INITIALIZED))
316     {
317       if (!s->deinit || s->deinit(s))
318         {
319           s->flags &= ~PIF_INITIALIZED;
320 
321           if (s->post_deinit)
322             s->post_deinit(s);
323           return TRUE;
324         }
325       return FALSE;
326     }
327   return TRUE;
328 }
329 
330 static inline gboolean
log_pipe_on_config_inited(LogPipe * s)331 log_pipe_on_config_inited(LogPipe *s)
332 {
333   if (s->on_config_inited)
334     return s->on_config_inited(s);
335   return TRUE;
336 }
337 
338 static inline void
339 log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options);
340 
341 static inline void
log_pipe_forward_msg(LogPipe * self,LogMessage * msg,const LogPathOptions * path_options)342 log_pipe_forward_msg(LogPipe *self, LogMessage *msg, const LogPathOptions *path_options)
343 {
344   if (self->pipe_next)
345     {
346       log_pipe_queue(self->pipe_next, msg, path_options);
347     }
348   else
349     {
350       log_msg_drop(msg, path_options, AT_PROCESSED);
351     }
352 }
353 
354 static inline void
log_pipe_queue(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)355 log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
356 {
357   LogPathOptions local_path_options;
358   g_assert((s->flags & PIF_INITIALIZED) != 0);
359 
360   if (G_UNLIKELY(pipe_single_step_hook))
361     {
362       if (!pipe_single_step_hook(s, msg, path_options))
363         {
364           log_msg_drop(msg, path_options, AT_PROCESSED);
365           return;
366         }
367     }
368 
369   if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL)))
370     {
371       local_path_options = *path_options;
372 
373       local_path_options.flow_control_requested = 1;
374       path_options = &local_path_options;
375 
376       msg_trace("Requesting flow control", log_pipe_location_tag(s));
377     }
378 
379   if (s->queue)
380     {
381       s->queue(s, msg, path_options);
382     }
383   else
384     {
385       log_pipe_forward_msg(s, msg, path_options);
386     }
387 
388   if (path_options->matched && !(*path_options->matched) && (s->flags & PIF_DROP_UNMATCHED))
389     {
390       (*path_options->matched) = TRUE;
391     }
392 }
393 
394 static inline LogPipe *
log_pipe_clone(LogPipe * self)395 log_pipe_clone(LogPipe *self)
396 {
397   g_assert(NULL != self->clone);
398   return self->clone(self);
399 }
400 
401 static inline void
log_pipe_notify(LogPipe * s,gint notify_code,gpointer user_data)402 log_pipe_notify(LogPipe *s, gint notify_code, gpointer user_data)
403 {
404   if (s->notify)
405     s->notify(s, notify_code, user_data);
406 }
407 
408 static inline void
log_pipe_append(LogPipe * s,LogPipe * next)409 log_pipe_append(LogPipe *s, LogPipe *next)
410 {
411   s->pipe_next = next;
412 }
413 
/* Persist name accessors, implemented outside this header.
 * NOTE(review): presumably these operate on the persist_name member and
 * the generate_persist_name() method of LogPipe -- confirm against the
 * implementation.
 */
void
log_pipe_set_persist_name(LogPipe *self, const gchar *persist_name);

const gchar *
log_pipe_get_persist_name(const LogPipe *self);

/* Implemented outside this header.
 * NOTE(review): presumably the base-class implementation of the free_fn()
 * method -- confirm against the implementation.
 */
void log_pipe_free_method(LogPipe *s);
421 
422 static inline GList *
log_pipe_get_arcs(LogPipe * s)423 log_pipe_get_arcs(LogPipe *s)
424 {
425   return s->arcs(s);
426 }
427 
/* Implemented outside this header.
 * NOTE(review): presumably records the string in the info list of
 * LogPipe -- confirm against the implementation. */
void log_pipe_add_info(LogPipe *self, const gchar *info);
429 #endif
430