1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_input_plugin.h>
23 #include <fluent-bit/flb_pack.h>
24 #include <fluent-bit/flb_unescape.h>
25
26 #include "tail_config.h"
27 #include "tail_dockermode.h"
28 #include "tail_file_internal.h"
29
flb_tail_dmode_create(struct flb_tail_config * ctx,struct flb_input_instance * ins,struct flb_config * config)30 int flb_tail_dmode_create(struct flb_tail_config *ctx,
31 struct flb_input_instance *ins,
32 struct flb_config *config)
33 {
34 const char *tmp;
35
36 if (ctx->multiline == FLB_TRUE) {
37 flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline "
38 "is enabled");
39 return -1;
40 }
41
42 #ifdef FLB_HAVE_REGEX
43 /* First line Parser */
44 tmp = flb_input_get_property("docker_mode_parser", ins);
45 if (tmp) {
46 ctx->docker_mode_parser = flb_parser_get(tmp, config);
47 if (!ctx->docker_mode_parser) {
48 flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
49 }
50 }
51 else {
52 ctx->docker_mode_parser = NULL;
53 }
54 #endif
55
56 tmp = flb_input_get_property("docker_mode_flush", ins);
57 if (!tmp) {
58 ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH;
59 }
60 else {
61 ctx->docker_mode_flush = atoi(tmp);
62 if (ctx->docker_mode_flush <= 0) {
63 ctx->docker_mode_flush = 1;
64 }
65 }
66
67 return 0;
68 }
69
modify_json_cond(char * js,size_t js_len,char ** val,size_t * val_len,char ** out,size_t * out_len,int cond (char *,size_t),int mod (char *,size_t,char **,size_t *,void *),void * data)70 static int modify_json_cond(char *js, size_t js_len,
71 char **val, size_t *val_len,
72 char **out, size_t *out_len,
73 int cond(char*, size_t),
74 int mod(char*, size_t, char**, size_t*, void*), void *data)
75 {
76 int ret;
77 struct flb_pack_state state;
78 jsmntok_t *t;
79 jsmntok_t *t_val = NULL;
80 int i;
81 int i_root = -1;
82 int i_key = -1;
83 char *old_val;
84 size_t old_val_len;
85 char *new_val = NULL;
86 size_t new_val_len = 0;
87 size_t mod_len;
88
89 ret = flb_pack_state_init(&state);
90 if (ret != 0) {
91 ret = -1;
92 goto modify_json_cond_end;
93 }
94
95 ret = flb_json_tokenise(js, js_len, &state);
96 if (ret != 0 || state.tokens_count == 0) {
97 ret = -1;
98 goto modify_json_cond_end;
99 }
100
101 for (i = 0; i < state.tokens_count; i++) {
102 t = &state.tokens[i];
103
104 if (i_key >= 0) {
105 if (t->parent == i_key) {
106 if (t->type == JSMN_STRING) {
107 t_val = t;
108 }
109 break;
110 }
111 continue;
112 }
113
114 if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) {
115 i_root = i;
116 continue;
117 }
118 if (i_root == -1) {
119 continue;
120 }
121
122 if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) {
123 i_key = i;
124 }
125 }
126
127 if (!t_val) {
128 ret = -1;
129 goto modify_json_cond_end;
130 }
131
132 *out = js;
133 *out_len = js_len;
134
135 if (val) {
136 *val = js + t_val->start;
137 }
138 if (val_len) {
139 *val_len = t_val->end - t_val->start;
140 }
141
142 if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) {
143 old_val = js + t_val->start;
144 old_val_len = t_val->end - t_val->start;
145 ret = mod(old_val, old_val_len, &new_val, &new_val_len, data);
146 if (ret != 0) {
147 ret = -1;
148 goto modify_json_cond_end;
149 }
150
151 ret = 1;
152
153 if (new_val == old_val) {
154 goto modify_json_cond_end;
155 }
156
157 mod_len = js_len + new_val_len - old_val_len;
158 *out = flb_malloc(mod_len);
159 if (!*out) {
160 flb_errno();
161 flb_free(new_val);
162 ret = -1;
163 goto modify_json_cond_end;
164 }
165 *out_len = mod_len;
166
167 memcpy(*out, js, t_val->start);
168 memcpy(*out + t_val->start, new_val, new_val_len);
169 memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end);
170
171 flb_free(new_val);
172 }
173
174 modify_json_cond_end:
175 flb_pack_state_reset(&state);
176 if (ret < 0) {
177 *out = NULL;
178 }
179 return ret;
180 }
181
unesc_ends_with_nl(char * str,size_t len)182 static int unesc_ends_with_nl(char *str, size_t len)
183 {
184 char* unesc;
185 int unesc_len;
186 int nl;
187
188 unesc = flb_malloc(len + 1);
189 if (!unesc) {
190 flb_errno();
191 return FLB_FALSE;
192 }
193 unesc_len = flb_unescape_string(str, len, &unesc);
194 nl = unesc[unesc_len - 1] == '\n';
195 flb_free(unesc);
196 return nl;
197 }
198
prepend_sds_to_str(char * str,size_t len,char ** out,size_t * out_len,void * data)199 static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data)
200 {
201 flb_sds_t sds = data;
202
203 if (flb_sds_len(sds) == 0) {
204 *out = str;
205 *out_len = len;
206 return 0;
207 }
208
209 size_t mod_len = flb_sds_len(sds) + len;
210 *out = flb_malloc(mod_len);
211 if (!*out) {
212 flb_errno();
213 return -1;
214 }
215 *out_len = mod_len;
216
217 memcpy(*out, sds, flb_sds_len(sds));
218 memcpy(*out + flb_sds_len(sds), str, len);
219 return 0;
220 }
221
use_sds(char * str,size_t len,char ** out,size_t * out_len,void * data)222 static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data)
223 {
224 flb_sds_t sds = data;
225 size_t mod_len = flb_sds_len(sds);
226 *out = flb_malloc(mod_len);
227 if (!*out) {
228 flb_errno();
229 return -1;
230 }
231 *out_len = mod_len;
232
233 memcpy(*out, sds, flb_sds_len(sds));
234 return 0;
235 }
236
/*
 * Process one line read from a file in Docker mode.
 *
 * The "log" value of the JSON line is appended to file->dmode_buf and
 * the raw line is remembered in file->dmode_lastline, so partial
 * messages can be joined and emitted later by flb_tail_dmode_flush().
 *
 *  - now                       : current time, used to arm the flush timeout.
 *  - line / line_len           : line to process.
 *  - repl_line / repl_line_len : on success, receive the output of
 *    modify_json_cond(): either 'line' itself or a newly allocated
 *    buffer where the buffered content was prepended to the "log"
 *    value. On failure they are always reset to NULL / 0.
 *  - mp_sbuf / mp_pck          : msgpack buffer and packer handed to
 *    flb_tail_dmode_flush() when buffered data has to be emitted.
 *
 * Returns:
 *   <0 if the line could not be handled (no valid JSON with a "log"
 *      string, or a memory allocation failure),
 *    0 if the "log" value does not end with a newline (message is
 *      incomplete and stays buffered),
 *    1 if the "log" value ends with a newline.
 */
int flb_tail_dmode_process_content(time_t now,
                                   char* line, size_t line_len,
                                   char **repl_line, size_t *repl_line_len,
                                   struct flb_tail_file *file,
                                   struct flb_tail_config *ctx,
                                   msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck
                                   )
{
    char *val = NULL;
    size_t val_len = 0;
    int ret;
    flb_sds_t tmp;
#ifdef FLB_HAVE_REGEX
    void *out_buf = NULL;
    size_t out_size;
    struct flb_time out_time = {0};
#endif

    *repl_line = NULL;
    *repl_line_len = 0;

#ifdef FLB_HAVE_REGEX
    if (ctx->docker_mode_parser) {
        /* Only the result code matters here, the parsed record is dropped */
        ret = flb_parser_do(ctx->docker_mode_parser, line, line_len,
                            &out_buf, &out_size, &out_time);
        flb_free(out_buf);

        /*
         * Set dmode_firstline if the line meets the first-line requirement
         */
        if (ret >= 0) {
            file->dmode_firstline = true;
        }

        /*
         * Process if buffer contains full log line
         */
        if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) {
            /*
             * Buffered log should be flushed out
             * as current line meets first-line requirement
             */
            if (ret >= 0) {
                flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
            }

            /*
             * Flush the buffer if multiline has not been detected yet
             */
            if (!file->dmode_firstline) {
                flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
            }
        }
    }
#endif

    ret = modify_json_cond(line, line_len,
                           &val, &val_len,
                           repl_line, repl_line_len,
                           unesc_ends_with_nl,
                           prepend_sds_to_str, file->dmode_buf);
    if (ret < 0) {
        /* modify_json_cond() already set *repl_line to NULL */
        *repl_line_len = 0;
        return ret;
    }

    /* line is a valid json */
    flb_sds_len_set(file->dmode_lastline, 0);

    /* concatenate current log line with buffered one */
    tmp = flb_sds_cat(file->dmode_buf, val, val_len);
    if (!tmp) {
        flb_errno();
        goto error;
    }
    file->dmode_buf = tmp;

    /* remember the raw line, flb_tail_dmode_flush() rebuilds the record from it */
    tmp = flb_sds_copy(file->dmode_lastline, line, line_len);
    if (!tmp) {
        flb_errno();
        goto error;
    }
    file->dmode_lastline = tmp;

    file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);

    if (ret == 0) {
        /* Line not ended with newline */
        file->dmode_complete = false;
    }
    else {
        /* Line ended with newline */
        file->dmode_complete = true;
#ifdef FLB_HAVE_REGEX
        /* With a first-line parser the flush is deferred to the next line */
        if (!ctx->docker_mode_parser) {
            flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
        }
#else
        flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
#endif
    }

    return ret;

 error:
    /*
     * Keep the same error convention as modify_json_cond(): no output is
     * handed back on failure. A replacement buffer is only owned by us
     * when it differs from the caller's line.
     */
    if (*repl_line != line) {
        flb_free(*repl_line);
    }
    *repl_line = NULL;
    *repl_line_len = 0;
    return -1;
}
335
/*
 * Emit the message buffered for 'file' in Docker mode.
 *
 * The record is rebuilt from file->dmode_lastline, replacing its "log"
 * value with the content accumulated in file->dmode_buf. Afterwards the
 * buffers and the flush timeout of the file are reset.
 *
 * When built with FLB_HAVE_REGEX and ctx->parser is set, the rebuilt
 * line goes through that parser first: on success the parsed map is
 * packed, unless ctx->ignore_older is enabled and the record timestamp
 * is older than that limit, in which case the record is discarded. In
 * every other case the rebuilt line is packed as is with the current
 * time.
 *
 * Nothing is done when there is no buffered line or when the record
 * cannot be rebuilt.
 */
void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
                          struct flb_tail_file *file, struct flb_tail_config *ctx)
{
    int rc;
    char *merged = NULL;
    size_t merged_len = 0;
    void *parsed = NULL;
    size_t parsed_size;
    struct flb_time ts = {0};
    time_t now = time(NULL);

    /* Nothing pending */
    if (flb_sds_len(file->dmode_lastline) == 0) {
        return;
    }

    flb_time_zero(&ts);

    /* Put the accumulated content in place of the last line "log" value */
    rc = modify_json_cond(file->dmode_lastline,
                          flb_sds_len(file->dmode_lastline),
                          NULL, NULL,
                          &merged, &merged_len,
                          NULL,
                          use_sds, file->dmode_buf);
    if (rc < 0) {
        return;
    }

    /* Reset the Docker mode state of the file */
    flb_sds_len_set(file->dmode_buf, 0);
    flb_sds_len_set(file->dmode_lastline, 0);
    file->dmode_flush_timeout = 0;

#ifdef FLB_HAVE_REGEX
    if (ctx->parser) {
        rc = flb_parser_do(ctx->parser, merged, merged_len,
                           &parsed, &parsed_size, &ts);
        if (rc >= 0) {
            /* The parser did not provide a timestamp: use the current time */
            if (flb_time_to_double(&ts) == 0) {
                flb_time_get(&ts);
            }

            /* Pack the record unless it is older than the configured limit */
            if (ctx->ignore_older <= 0 ||
                (now - ctx->ignore_older) <= ts.tm.tv_sec) {
                flb_tail_pack_line_map(mp_sbuf, mp_pck, &ts,
                                       (char**) &parsed, &parsed_size, file, 0);
            }
            goto release;
        }
    }
#endif

    /* No parser, or the parser failed: pack the raw line */
    flb_time_get(&ts);
    flb_tail_file_pack_line(mp_sbuf, mp_pck, &ts,
                            merged, merged_len, file, 0);

 release:
    flb_free(merged);
    flb_free(parsed);
}
390
file_pending_flush(struct flb_tail_config * ctx,struct flb_tail_file * file,time_t now)391 static void file_pending_flush(struct flb_tail_config *ctx,
392 struct flb_tail_file *file, time_t now)
393 {
394 msgpack_sbuffer mp_sbuf;
395 msgpack_packer mp_pck;
396
397 if (file->dmode_flush_timeout > now) {
398 return;
399 }
400
401 if (flb_sds_len(file->dmode_lastline) == 0) {
402 return;
403 }
404
405 msgpack_sbuffer_init(&mp_sbuf);
406 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
407
408 flb_tail_dmode_flush(&mp_sbuf, &mp_pck, file, ctx);
409
410 flb_input_chunk_append_raw(ctx->ins,
411 file->tag_buf,
412 file->tag_len,
413 mp_sbuf.data,
414 mp_sbuf.size);
415 msgpack_sbuffer_destroy(&mp_sbuf);
416 }
417
flb_tail_dmode_pending_flush_all(struct flb_tail_config * ctx)418 int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx)
419 {
420 time_t expired;
421 struct mk_list *head;
422 struct flb_tail_file *file;
423
424 expired = time(NULL) + 3600;
425
426 /* Iterate promoted event files with pending bytes */
427 mk_list_foreach(head, &ctx->files_static) {
428 file = mk_list_entry(head, struct flb_tail_file, _head);
429 file_pending_flush(ctx, file, expired);
430 }
431
432 /* Iterate promoted event files with pending bytes */
433 mk_list_foreach(head, &ctx->files_event) {
434 file = mk_list_entry(head, struct flb_tail_file, _head);
435 file_pending_flush(ctx, file, expired);
436 }
437
438 return 0;
439 }
440
flb_tail_dmode_pending_flush(struct flb_input_instance * ins,struct flb_config * config,void * context)441 int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
442 struct flb_config *config, void *context)
443 {
444 time_t now;
445 struct mk_list *head;
446 struct flb_tail_file *file;
447 struct flb_tail_config *ctx = context;
448
449 now = time(NULL);
450
451 /* Iterate static event files with pending bytes */
452 mk_list_foreach(head, &ctx->files_static) {
453 file = mk_list_entry(head, struct flb_tail_file, _head);
454 file_pending_flush(ctx, file, now);
455 }
456
457 /* Iterate promoted event files with pending bytes */
458 mk_list_foreach(head, &ctx->files_event) {
459 file = mk_list_entry(head, struct flb_tail_file, _head);
460 file_pending_flush(ctx, file, now);
461 }
462
463 return 0;
464 }
465