1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #ifndef LOGPIPE_H_INCLUDED
26 #define LOGPIPE_H_INCLUDED
27
28 #include "syslog-ng.h"
29 #include "logmsg/logmsg.h"
30 #include "cfg.h"
31 #include "atomic.h"
32 #include "messages.h"
33 #include "signal-slot-connector/signal-slot-connector.h"
34
/* notify code values: these are the values passed as the notify_code
 * argument of log_pipe_notify() and of the LogPipe notify method */
#define NC_CLOSE 1
#define NC_READ_ERROR 2
#define NC_WRITE_ERROR 3
#define NC_FILE_MOVED 4
#define NC_FILE_EOF 5
#define NC_REOPEN_REQUIRED 6
#define NC_FILE_DELETED 7
43
/* LogPipe flag bits, kept in the flags member of LogPipe */

/* the LogPipe was initialized: set by log_pipe_init(), cleared again by
 * log_pipe_deinit() */
#define PIF_INITIALIZED       0x0001
/* this LogPipe got cloned into the tree already */
#define PIF_INLINED           0x0002

/* log statement flags that are copied to the head of a branch */
#define PIF_BRANCH_FINAL      0x0004
#define PIF_BRANCH_FALLBACK   0x0008
/* mask covering both branch flags above */
#define PIF_BRANCH_PROPERTIES (PIF_BRANCH_FINAL | PIF_BRANCH_FALLBACK)

/* checked by log_pipe_queue() after the message was dispatched, see the
 * handling of the "matched" path option there */
#define PIF_DROP_UNMATCHED    0x0010

/* branch starting with this pipe wants hard flow control */
#define PIF_HARD_FLOW_CONTROL 0x0020

/* this pipe is a source for messages, it is not meant to be used to
 * forward messages: syslog-ng will only use these pipes for the
 * left-hand side of the processing graph, e.g. no other pipes may be
 * sending messages to these pipes and these are expected to generate
 * messages "automatically". */
#define PIF_SOURCE            0x0040

/* private flags range (bit 16 and up), to be used by other LogPipe
 * instances for their own purposes */
#define PIF_PRIVATE(x)        ((x) << 16)
70
71 /**
72 *
73 * Processing pipeline
74 *
 * Within syslog-ng, the user configuration is converted into a tree-like
 * structure. Each node in this tree is a LogPipe object responsible for
 * queueing messages towards the destination. Each node is free to
 * drop/transform the message it receives.
79 *
 * The center.c module contains code that transforms the configuration
 * into the log processing tree. Each log statement in user configuration
 * becomes a linked list of pipes, then each source referenced by the
 * log statement is piped into the newly created pipe.
84 *
85 * Something like this:
86 *
87 * log statement:
88 * mpx | filter | parser | dest1 | dest2 | dest3
89 *
90 * source1 -> log statement1
91 * |-> log statement2
92 *
 * E.g. each source is sending to each log path it was referenced from. Each
 * item in the log path is a pipe, which receives messages and forwards them
 * at its discretion. Filters are pipes too, which lose data. Destinations
96 * are piping their output to the next element on the pipeline. This
97 * basically means that the pipeline is a wired representation of the user
98 * configuration without having to loop through configuration data.
99 *
100 * Reference counting
101 *
 * The pipes do not reference each other through their pipe_next member,
 * simply because there'd be too many reference loops to care about.
104 * Instead pipe_next is a borrowed reference, which is assumed to be valid
105 * as long as the configuration is not freed.
106 *
107 * Flow control
108 *
109 * Flow control is the mechanism used to control the message rate between
110 * input/output sides of syslog-ng in order to avoid message loss. If the
111 * two sides were independent, the input side could well receive messages
112 * at a much higher rate than the destination is able to cope with.
113 *
114 * This is implemented by allocating a per-source window (similar to a TCP
115 * window), which can be "filled" by the source without the danger of
116 * output queue overflow. Also, whenever a message is processed by the
117 * destination it invokes an ACK, which in turn increments the window size.
118 *
 * This basically boils down to the following:
 * * the source is free to receive as many messages as fit into its window
 * * whenever the destination has processed a message, this is signalled
 *   to the source, freeing up a slot in its window
 * * if the window is full, the source is suspended, no further messages
 *   are received.
125 *
126 * This controls the message rate but doesn't completely ruin throughput,
127 * as the source has some space without being suspended, as suspension and
128 * resuming action takes considerable amount of time (mostly latency, but
129 * CPU is certainly also used).
130 *
131 * There are currently two forms of flow control:
132 * * hard flow control
133 * * soft flow control
134 *
135 * The first is the form of flow control present in earlier syslog-ng
136 * versions and was renamed as "hard" in order to differentiate from the
137 * other form. Hard means that the source is completely suspended until
138 * the destination indeed processed a message. If the network is down,
139 * the disk is full, the source will not accept messages.
140 *
 * Soft flow control was introduced when syslog-ng became threaded and the
 * earlier priority based behaviour couldn't be mimicked any other way.
143 * Soft flow control cannot be configured, it is automatically used by
144 * file destinations if "hard" flow control is not enabled by the user.
145 * Soft flow control means that flow is only controlled as long as the
146 * destination is writable, if an error occurs (disk full, etc) messages
147 * get dropped on the floor. But as long as the destination is writable,
148 * the destination rate controls the source rate as well.
149 *
150 * The behaviour in non-threaded syslog-ng was, that destinations were
151 * prioritized over sources, and whenever a destination was writable,
152 * sources were implicitly suspended. This is not easily implementable by
153 * threads and ivykis, thus this alternative mechanism was created.
154 *
155 * Please note that soft-flow-control is a somewhat stronger guarantee
156 * than the earlier behaviour, therefore it is currently only used for
157 * destination files.
158 *
159 * Plugin overrides
160 *
 * Various methods can be overridden by external objects within
 * LogPipe and derived classes. The aim of this functionality is to
 * make it possible to attach new functions to a LogPipe at runtime.
164 *
165 * For example, it'd make sense to implement the "suppress"
166 * functionality as such plugin, which is currently implemented in
167 * LogWriter, and in case a non-LogWriter destination would need it,
168 * then a separate implementation would be needed.
169 *
170 * The way to override a method by an external object is as follows:
171 *
172 * - it should save the current value of the method address (for
173 * example "queue" for the queue method)
174 *
175 * - it should change the pointer pointing to the relevant method to
176 * its own code (e.g. change "queue" in LogPipe)
177 *
178 * - once the hook is invoked, it should take care about calling the
179 * original function
180 **/
181
/* Options that accompany a message while it travels along a processing
 * path; an instance is passed as the path_options argument of
 * log_pipe_queue() and of the LogPipe queue method. */
struct _LogPathOptions
{
  /* an acknowledgement is "passed" to this path, an ACK is still
   * needed to close the window slot. This was called "flow-control"
   * and meant both of these things: the user requested
   * flags(flow-control), _AND_ an acknowledgement was needed. With
   * the latest change, the one below specifies the user option,
   * while the "ack is still needed" condition is stored in
   * ack_needed.
   */

  gboolean ack_needed;

  /* The user has requested flow-control on this processing path,
   * which means that the destination should invoke log_msg_ack()
   * after it has completed processing it (e.g. after sending to the
   * actual destination, possibly after confirmation if the transport
   * supports that). If flow-control is not requested, destinations
   * are permitted to call log_msg_ack() early (e.g. at queue time).
   *
   * This is initially FALSE and can be set to TRUE anywhere _before_
   * the destination driver, which will actually carry out the
   * required action.
   *
   * log_pipe_queue() sets this to TRUE on a local copy of the options
   * when the pipe has PIF_HARD_FLOW_CONTROL set.
   */

  gboolean flow_control_requested;

  /* Optional (may be NULL) pointer to a "matched" indicator.
   * log_pipe_queue() inspects it after dispatching the message and may
   * set the pointed-to value to TRUE, see the PIF_DROP_UNMATCHED
   * handling there. */
  gboolean *matched;
};

/* Positional initializers, in member order: ack_needed,
 * flow_control_requested, matched. The _NOACK variant only differs in
 * ack_needed being FALSE. */
#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL }
#define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL }
214
/* Base "class" of every node in the processing pipeline. Behaviour is
 * customized through the function pointer members, which are invoked by
 * the log_pipe_*() inline wrappers in this header. */
struct _LogPipe
{
  /* reference counter, presumably maintained by log_pipe_ref() and
   * log_pipe_unref() -- their definitions are not in this header */
  GAtomicCounter ref_cnt;
  /* combination of PIF_* bits */
  gint32 flags;

  /* message delivery method, kept right after flags on purpose (a
   * static assert in this header checks the layout). May be NULL, in
   * which case log_pipe_queue() forwards the message to pipe_next. */
  void (*queue)(LogPipe *self, LogMessage *msg, const LogPathOptions *path_options);

  /* read/written through log_pipe_get_config()/log_pipe_set_config() */
  GlobalConfig *cfg;
  LogExprNode *expr_node;
  /* next pipe in the chain; a borrowed reference, not a counted one */
  LogPipe *pipe_next;
  StatsCounterItem *discarded_messages;
  const gchar *persist_name;
  gchar *plugin_name;
  SignalSlotConnector *signal_slot_connector;

  /* Lifecycle methods, all of them optional (may be NULL).
   * log_pipe_init() calls pre_init and then init, log_pipe_deinit()
   * calls deinit and, if that succeeded, post_deinit. A FALSE return
   * value indicates failure. */
  gboolean (*pre_init)(LogPipe *self);
  gboolean (*init)(LogPipe *self);
  gboolean (*deinit)(LogPipe *self);
  void (*post_deinit)(LogPipe *self);

  /* this event function is used to perform necessary operation, such as
   * starting worker thread, and etc. therefore, syslog-ng will terminate if
   * return value is false.
   */
  gboolean (*on_config_inited)(LogPipe *self);

  const gchar *(*generate_persist_name)(const LogPipe *self);
  /* invoked by log_pipe_get_arcs() without a NULL check */
  GList *(*arcs)(LogPipe *self);

  /* clone this pipe when used in multiple locations in the processing
   * pipe-line. If it contains state, it should behave as if it was
   * the same instance, otherwise it can be a copy.
   *
   * log_pipe_clone() asserts that this member is set.
   */
  LogPipe *(*clone)(LogPipe *self);

  void (*free_fn)(LogPipe *self);
  /* optional notification handler, invoked by log_pipe_notify() with
   * one of the NC_* codes */
  void (*notify)(LogPipe *self, gint notify_code, gpointer user_data);
  GList *info;
};
254
/*
   cpu-cache optimization: queue method should be as close as possible to flags.

   Rationale: the pointer pointing to this LogPipe instance is
   resolved right before calling queue and the colocation of flags and
   the queue pointer ensures that they are on the same cache line. The
   queue method is probably the single most often called virtual method.

   The assertion below fails the build if the queue member starts more
   than 4 bytes after the start of the (32 bit) flags member, i.e. if
   anything gets inserted between the two.
 */
G_STATIC_ASSERT(G_STRUCT_OFFSET(LogPipe, queue) - G_STRUCT_OFFSET(LogPipe, flags) <= 4);
264
/* Optional global hook: when it is non-NULL, log_pipe_queue() calls it
 * for each pipe before dispatching the message, and drops the message
 * if the hook returns FALSE. */
extern gboolean (*pipe_single_step_hook)(LogPipe *pipe, LogMessage *msg, const LogPathOptions *path_options);

/* Out-of-line part of the LogPipe API. The definitions are not part of
 * this header, so only the signatures are known here. */
LogPipe *log_pipe_ref(LogPipe *self);
gboolean log_pipe_unref(LogPipe *self);
LogPipe *log_pipe_new(GlobalConfig *cfg);
void log_pipe_init_instance(LogPipe *self, GlobalConfig *cfg);
void log_pipe_forward_notify(LogPipe *self, gint notify_code, gpointer user_data);
/* used by log_pipe_queue() as the tag of its trace message */
EVTTAG *log_pipe_location_tag(LogPipe *pipe);
void log_pipe_attach_expr_node(LogPipe *self, LogExprNode *expr_node);
void log_pipe_detach_expr_node(LogPipe *self);
275
/* Return the configuration stored in the pipe. The configuration must
 * have been set already: a NULL cfg trips the assertion. */
static inline GlobalConfig *
log_pipe_get_config(LogPipe *s)
{
  g_assert(s->cfg);
  return s->cfg;
}
282
/* Store @cfg in the pipe; the pointer is saved as is, NULL is accepted
 * (see log_pipe_reset_config()). */
static inline void
log_pipe_set_config(LogPipe *s, GlobalConfig *cfg)
{
  s->cfg = cfg;
}
288
289 static inline void
log_pipe_reset_config(LogPipe * s)290 log_pipe_reset_config(LogPipe *s)
291 {
292 log_pipe_set_config(s, NULL);
293 }
294
295 static inline gboolean
log_pipe_init(LogPipe * s)296 log_pipe_init(LogPipe *s)
297 {
298 if (!(s->flags & PIF_INITIALIZED))
299 {
300 if (s->pre_init && !s->pre_init(s))
301 return FALSE;
302 if (!s->init || s->init(s))
303 {
304 s->flags |= PIF_INITIALIZED;
305 return TRUE;
306 }
307 return FALSE;
308 }
309 return TRUE;
310 }
311
312 static inline gboolean
log_pipe_deinit(LogPipe * s)313 log_pipe_deinit(LogPipe *s)
314 {
315 if ((s->flags & PIF_INITIALIZED))
316 {
317 if (!s->deinit || s->deinit(s))
318 {
319 s->flags &= ~PIF_INITIALIZED;
320
321 if (s->post_deinit)
322 s->post_deinit(s);
323 return TRUE;
324 }
325 return FALSE;
326 }
327 return TRUE;
328 }
329
330 static inline gboolean
log_pipe_on_config_inited(LogPipe * s)331 log_pipe_on_config_inited(LogPipe *s)
332 {
333 if (s->on_config_inited)
334 return s->on_config_inited(s);
335 return TRUE;
336 }
337
338 static inline void
339 log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options);
340
341 static inline void
log_pipe_forward_msg(LogPipe * self,LogMessage * msg,const LogPathOptions * path_options)342 log_pipe_forward_msg(LogPipe *self, LogMessage *msg, const LogPathOptions *path_options)
343 {
344 if (self->pipe_next)
345 {
346 log_pipe_queue(self->pipe_next, msg, path_options);
347 }
348 else
349 {
350 log_msg_drop(msg, path_options, AT_PROCESSED);
351 }
352 }
353
354 static inline void
log_pipe_queue(LogPipe * s,LogMessage * msg,const LogPathOptions * path_options)355 log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
356 {
357 LogPathOptions local_path_options;
358 g_assert((s->flags & PIF_INITIALIZED) != 0);
359
360 if (G_UNLIKELY(pipe_single_step_hook))
361 {
362 if (!pipe_single_step_hook(s, msg, path_options))
363 {
364 log_msg_drop(msg, path_options, AT_PROCESSED);
365 return;
366 }
367 }
368
369 if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL)))
370 {
371 local_path_options = *path_options;
372
373 local_path_options.flow_control_requested = 1;
374 path_options = &local_path_options;
375
376 msg_trace("Requesting flow control", log_pipe_location_tag(s));
377 }
378
379 if (s->queue)
380 {
381 s->queue(s, msg, path_options);
382 }
383 else
384 {
385 log_pipe_forward_msg(s, msg, path_options);
386 }
387
388 if (path_options->matched && !(*path_options->matched) && (s->flags & PIF_DROP_UNMATCHED))
389 {
390 (*path_options->matched) = TRUE;
391 }
392 }
393
/* Clone the pipe through its clone method and return whatever that
 * method returns. The method is mandatory here: a pipe without one
 * trips the assertion. */
static inline LogPipe *
log_pipe_clone(LogPipe *self)
{
  g_assert(NULL != self->clone);
  return self->clone(self);
}
400
401 static inline void
log_pipe_notify(LogPipe * s,gint notify_code,gpointer user_data)402 log_pipe_notify(LogPipe *s, gint notify_code, gpointer user_data)
403 {
404 if (s->notify)
405 s->notify(s, notify_code, user_data);
406 }
407
/* Make @next the pipe following @s by storing it in pipe_next. Any
 * previous value of pipe_next is overwritten, and no reference is taken
 * on @next by this function. */
static inline void
log_pipe_append(LogPipe *s, LogPipe *next)
{
  s->pipe_next = next;
}
413
/* Accessors for the persist name of the pipe, defined out of line.
 * NOTE(review): presumably these operate on the persist_name member and
 * the generate_persist_name method of LogPipe -- confirm against the
 * implementation. */
void
log_pipe_set_persist_name(LogPipe *self, const gchar *persist_name);

const gchar *
log_pipe_get_persist_name(const LogPipe *self);

/* NOTE(review): looks like the base implementation for the free_fn
 * method of LogPipe -- confirm against the implementation. */
void log_pipe_free_method(LogPipe *s);
421
422 static inline GList *
log_pipe_get_arcs(LogPipe * s)423 log_pipe_get_arcs(LogPipe *s)
424 {
425 return s->arcs(s);
426 }
427
/* Defined out of line. NOTE(review): presumably adds @info to the info
 * list of the pipe -- confirm against the implementation. */
void log_pipe_add_info(LogPipe *self, const gchar *info);
429 #endif
430