1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_input_plugin.h>
23 #include <fluent-bit/flb_pack.h>
24 #include <fluent-bit/flb_unescape.h>
25 
26 #include "tail_config.h"
27 #include "tail_dockermode.h"
28 #include "tail_file_internal.h"
29 
flb_tail_dmode_create(struct flb_tail_config * ctx,struct flb_input_instance * ins,struct flb_config * config)30 int flb_tail_dmode_create(struct flb_tail_config *ctx,
31                           struct flb_input_instance *ins,
32                           struct flb_config *config)
33 {
34     const char *tmp;
35 
36     if (ctx->multiline == FLB_TRUE) {
37         flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline "
38                       "is enabled");
39         return -1;
40     }
41 
42 #ifdef FLB_HAVE_REGEX
43     /* First line Parser */
44     tmp = flb_input_get_property("docker_mode_parser", ins);
45     if (tmp) {
46         ctx->docker_mode_parser = flb_parser_get(tmp, config);
47         if (!ctx->docker_mode_parser) {
48             flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
49         }
50     }
51     else {
52         ctx->docker_mode_parser = NULL;
53     }
54 #endif
55 
56     tmp = flb_input_get_property("docker_mode_flush", ins);
57     if (!tmp) {
58         ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH;
59     }
60     else {
61         ctx->docker_mode_flush = atoi(tmp);
62         if (ctx->docker_mode_flush <= 0) {
63             ctx->docker_mode_flush = 1;
64         }
65     }
66 
67     return 0;
68 }
69 
modify_json_cond(char * js,size_t js_len,char ** val,size_t * val_len,char ** out,size_t * out_len,int cond (char *,size_t),int mod (char *,size_t,char **,size_t *,void *),void * data)70 static int modify_json_cond(char *js, size_t js_len,
71                             char **val, size_t *val_len,
72                             char **out, size_t *out_len,
73                             int cond(char*, size_t),
74                             int mod(char*, size_t, char**, size_t*, void*), void *data)
75 {
76     int ret;
77     struct flb_pack_state state;
78     jsmntok_t *t;
79     jsmntok_t *t_val = NULL;
80     int i;
81     int i_root = -1;
82     int i_key = -1;
83     char *old_val;
84     size_t old_val_len;
85     char *new_val = NULL;
86     size_t new_val_len = 0;
87     size_t mod_len;
88 
89     ret = flb_pack_state_init(&state);
90     if (ret != 0) {
91         ret = -1;
92         goto modify_json_cond_end;
93     }
94 
95     ret = flb_json_tokenise(js, js_len, &state);
96     if (ret != 0 || state.tokens_count == 0) {
97         ret = -1;
98         goto modify_json_cond_end;
99     }
100 
101     for (i = 0; i < state.tokens_count; i++) {
102         t = &state.tokens[i];
103 
104         if (i_key >= 0) {
105             if (t->parent == i_key) {
106                 if (t->type == JSMN_STRING) {
107                    t_val = t;
108                 }
109                 break;
110             }
111             continue;
112         }
113 
114         if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) {
115             i_root = i;
116             continue;
117         }
118         if (i_root == -1) {
119             continue;
120         }
121 
122         if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) {
123             i_key = i;
124         }
125     }
126 
127     if (!t_val) {
128         ret = -1;
129         goto modify_json_cond_end;
130     }
131 
132     *out = js;
133     *out_len = js_len;
134 
135     if (val) {
136         *val = js + t_val->start;
137     }
138     if (val_len) {
139         *val_len = t_val->end - t_val->start;
140     }
141 
142     if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) {
143         old_val = js + t_val->start;
144         old_val_len = t_val->end - t_val->start;
145         ret = mod(old_val, old_val_len, &new_val, &new_val_len, data);
146         if (ret != 0) {
147             ret = -1;
148             goto modify_json_cond_end;
149         }
150 
151         ret = 1;
152 
153         if (new_val == old_val) {
154             goto modify_json_cond_end;
155         }
156 
157         mod_len = js_len + new_val_len - old_val_len;
158         *out = flb_malloc(mod_len);
159         if (!*out) {
160             flb_errno();
161             flb_free(new_val);
162             ret = -1;
163             goto modify_json_cond_end;
164         }
165         *out_len = mod_len;
166 
167         memcpy(*out, js, t_val->start);
168         memcpy(*out + t_val->start, new_val, new_val_len);
169         memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end);
170 
171         flb_free(new_val);
172     }
173 
174  modify_json_cond_end:
175     flb_pack_state_reset(&state);
176     if (ret < 0) {
177         *out = NULL;
178     }
179     return ret;
180 }
181 
unesc_ends_with_nl(char * str,size_t len)182 static int unesc_ends_with_nl(char *str, size_t len)
183 {
184     char* unesc;
185     int unesc_len;
186     int nl;
187 
188     unesc = flb_malloc(len + 1);
189     if (!unesc) {
190         flb_errno();
191         return FLB_FALSE;
192     }
193     unesc_len = flb_unescape_string(str, len, &unesc);
194     nl = unesc[unesc_len - 1] == '\n';
195     flb_free(unesc);
196     return nl;
197 }
198 
prepend_sds_to_str(char * str,size_t len,char ** out,size_t * out_len,void * data)199 static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data)
200 {
201     flb_sds_t sds = data;
202 
203     if (flb_sds_len(sds) == 0) {
204         *out = str;
205         *out_len = len;
206         return 0;
207     }
208 
209     size_t mod_len = flb_sds_len(sds) + len;
210     *out = flb_malloc(mod_len);
211     if (!*out) {
212         flb_errno();
213         return -1;
214     }
215     *out_len = mod_len;
216 
217     memcpy(*out, sds, flb_sds_len(sds));
218     memcpy(*out + flb_sds_len(sds), str, len);
219     return 0;
220 }
221 
use_sds(char * str,size_t len,char ** out,size_t * out_len,void * data)222 static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data)
223 {
224     flb_sds_t sds = data;
225     size_t mod_len = flb_sds_len(sds);
226     *out = flb_malloc(mod_len);
227     if (!*out) {
228         flb_errno();
229         return -1;
230     }
231     *out_len = mod_len;
232 
233     memcpy(*out, sds, flb_sds_len(sds));
234     return 0;
235 }
236 
flb_tail_dmode_process_content(time_t now,char * line,size_t line_len,char ** repl_line,size_t * repl_line_len,struct flb_tail_file * file,struct flb_tail_config * ctx,msgpack_sbuffer * mp_sbuf,msgpack_packer * mp_pck)237 int flb_tail_dmode_process_content(time_t now,
238                                    char* line, size_t line_len,
239                                    char **repl_line, size_t *repl_line_len,
240                                    struct flb_tail_file *file,
241                                    struct flb_tail_config *ctx,
242                                    msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck
243                                    )
244 {
245     char* val = NULL;
246     size_t val_len;
247     int ret;
248     void *out_buf = NULL;
249     size_t out_size;
250     struct flb_time out_time = {0};
251     *repl_line = NULL;
252     *repl_line_len = 0;
253     flb_sds_t tmp;
254     flb_sds_t tmp_copy;
255 
256 #ifdef FLB_HAVE_REGEX
257     if (ctx->docker_mode_parser) {
258         ret = flb_parser_do(ctx->docker_mode_parser, line, line_len,
259                             &out_buf, &out_size, &out_time);
260         flb_free(out_buf);
261 
262         /*
263         * Set dmode_firstline if the line meets the first-line requirement
264         */
265         if(ret >= 0) {
266             file->dmode_firstline = true;
267         }
268 
269         /*
270         * Process if buffer contains full log line
271         */
272         if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) {
273             /*
274             * Buffered log should be flushed out
275             * as current line meets first-line requirement
276             */
277             if(ret >= 0) {
278                 flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
279             }
280 
281             /*
282             * Flush the buffer if multiline has not been detected yet
283             */
284             if (!file->dmode_firstline) {
285                 flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
286             }
287         }
288     }
289 #endif
290 
291     ret = modify_json_cond(line, line_len,
292                            &val, &val_len,
293                            repl_line, repl_line_len,
294                            unesc_ends_with_nl,
295                            prepend_sds_to_str, file->dmode_buf);
296     if (ret >= 0) {
297         /* line is a valid json */
298         flb_sds_len_set(file->dmode_lastline, 0);
299 
300         /* concatenate current log line with buffered one */
301         tmp = flb_sds_cat(file->dmode_buf, val, val_len);
302         if (!tmp) {
303             flb_errno();
304             return -1;
305         }
306         file->dmode_buf = tmp;
307 
308         tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len);
309         if (!tmp_copy) {
310             flb_errno();
311             return -1;
312         }
313 
314         file->dmode_lastline = tmp_copy;
315         file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);
316 
317         if (ret == 0) {
318             /* Line not ended with newline */
319             file->dmode_complete = false;
320         }
321         else {
322             /* Line ended with newline */
323             file->dmode_complete = true;
324 #ifdef FLB_HAVE_REGEX
325             if (!ctx->docker_mode_parser) {
326                 flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
327             }
328 #else
329             flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
330 #endif
331         }
332     }
333     return ret;
334 }
335 
flb_tail_dmode_flush(msgpack_sbuffer * mp_sbuf,msgpack_packer * mp_pck,struct flb_tail_file * file,struct flb_tail_config * ctx)336 void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
337                           struct flb_tail_file *file, struct flb_tail_config *ctx)
338 {
339     int ret;
340     char *repl_line = NULL;
341     size_t repl_line_len = 0;
342     void *out_buf = NULL;
343     size_t out_size;
344     struct flb_time out_time = {0};
345     time_t now = time(NULL);
346 
347     if (flb_sds_len(file->dmode_lastline) == 0) {
348         return;
349     }
350 
351     flb_time_zero(&out_time);
352 
353     ret = modify_json_cond(file->dmode_lastline, flb_sds_len(file->dmode_lastline),
354                            NULL, NULL,
355                            &repl_line, &repl_line_len,
356                            NULL,
357                            use_sds, file->dmode_buf);
358     if (ret < 0) {
359         return;
360     }
361 
362     flb_sds_len_set(file->dmode_buf, 0);
363     flb_sds_len_set(file->dmode_lastline, 0);
364     file->dmode_flush_timeout = 0;
365 
366 #ifdef FLB_HAVE_REGEX
367     if (ctx->parser) {
368         ret = flb_parser_do(ctx->parser, repl_line, repl_line_len,
369                             &out_buf, &out_size, &out_time);
370         if (ret >= 0) {
371             if (flb_time_to_double(&out_time) == 0) {
372                 flb_time_get(&out_time);
373             }
374             if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) {
375                 goto dmode_flush_end;
376             }
377             flb_tail_pack_line_map(mp_sbuf, mp_pck, &out_time,
378                                    (char**) &out_buf, &out_size, file, 0);
379             goto dmode_flush_end;        }
380     }
381 #endif
382     flb_time_get(&out_time);
383     flb_tail_file_pack_line(mp_sbuf, mp_pck, &out_time,
384                             repl_line, repl_line_len, file, 0);
385 
386  dmode_flush_end:
387     flb_free(repl_line);
388     flb_free(out_buf);
389 }
390 
file_pending_flush(struct flb_tail_config * ctx,struct flb_tail_file * file,time_t now)391 static void file_pending_flush(struct flb_tail_config *ctx,
392                                struct flb_tail_file *file, time_t now)
393 {
394     msgpack_sbuffer mp_sbuf;
395     msgpack_packer mp_pck;
396 
397     if (file->dmode_flush_timeout > now) {
398         return;
399     }
400 
401     if (flb_sds_len(file->dmode_lastline) == 0) {
402         return;
403     }
404 
405     msgpack_sbuffer_init(&mp_sbuf);
406     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
407 
408     flb_tail_dmode_flush(&mp_sbuf, &mp_pck, file, ctx);
409 
410     flb_input_chunk_append_raw(ctx->ins,
411                                file->tag_buf,
412                                file->tag_len,
413                                mp_sbuf.data,
414                                mp_sbuf.size);
415     msgpack_sbuffer_destroy(&mp_sbuf);
416 }
417 
flb_tail_dmode_pending_flush_all(struct flb_tail_config * ctx)418 int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx)
419 {
420     time_t expired;
421     struct mk_list *head;
422     struct flb_tail_file *file;
423 
424     expired = time(NULL) + 3600;
425 
426     /* Iterate promoted event files with pending bytes */
427     mk_list_foreach(head, &ctx->files_static) {
428         file = mk_list_entry(head, struct flb_tail_file, _head);
429         file_pending_flush(ctx, file, expired);
430     }
431 
432     /* Iterate promoted event files with pending bytes */
433     mk_list_foreach(head, &ctx->files_event) {
434         file = mk_list_entry(head, struct flb_tail_file, _head);
435         file_pending_flush(ctx, file, expired);
436     }
437 
438     return 0;
439 }
440 
flb_tail_dmode_pending_flush(struct flb_input_instance * ins,struct flb_config * config,void * context)441 int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
442                                  struct flb_config *config, void *context)
443 {
444     time_t now;
445     struct mk_list *head;
446     struct flb_tail_file *file;
447     struct flb_tail_config *ctx = context;
448 
449     now = time(NULL);
450 
451     /* Iterate static event files with pending bytes */
452     mk_list_foreach(head, &ctx->files_static) {
453         file = mk_list_entry(head, struct flb_tail_file, _head);
454         file_pending_flush(ctx, file, now);
455     }
456 
457     /* Iterate promoted event files with pending bytes */
458     mk_list_foreach(head, &ctx->files_event) {
459         file = mk_list_entry(head, struct flb_tail_file, _head);
460         file_pending_flush(ctx, file, now);
461     }
462 
463     return 0;
464 }
465