1 /**
2 * Copyright (c) 2007-2015, Timothy Stack
3 *
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice, this
10 * list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of Timothy Stack nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "config.h"
31
32 #include <stdio.h>
33 #include <stdarg.h>
34 #include <string.h>
35
36 #include <memory>
37
38 #include "base/string_util.hh"
39 #include "fmt/format.h"
40 #include "yajlpp/yajlpp.hh"
41 #include "yajlpp/yajlpp_def.hh"
42 #include "sql_util.hh"
43 #include "log_format_ext.hh"
44 #include "log_vtab_impl.hh"
45 #include "ptimec.hh"
46 #include "log_search_table.hh"
47 #include "command_executor.hh"
48 #include "lnav_util.hh"
49
50 using namespace std;
51
// Keeps the object returned by intern_string::get_table_lifetime() for as
// long as this translation unit's statics exist.
// NOTE(review): presumably this keeps the intern-string table alive while the
// definitions below are in use -- confirm against intern_string.
static auto intern_lifetime = intern_string::get_table_lifetime();

// String-attribute types owned by logline; each is constructed with the name
// given in its initializer.
string_attr_type logline::L_PREFIX("prefix");
string_attr_type logline::L_TIMESTAMP("timestamp");
string_attr_type logline::L_FILE("file");
string_attr_type logline::L_PARTITION("partition");
string_attr_type logline::L_MODULE("module");
string_attr_type logline::L_OPID("opid");
string_attr_type logline::L_META("meta");

// Cache of module name -> module_format; filled in by module_scan() and
// consulted by scan() and annotate().
external_log_format::mod_map_t external_log_format::MODULE_FORMATS;
// The formats that module_scan() walks, in order, when looking for a pattern
// that matches a message body.
std::vector<std::shared_ptr<external_log_format>> external_log_format::GRAPH_ORDERED_FORMATS;
63
origin_in_full_msg(const char * msg,ssize_t len) const64 struct line_range logline_value::origin_in_full_msg(const char *msg, ssize_t len) const
65 {
66 if (this->lv_sub_offset == 0) {
67 return this->lv_origin;
68 }
69
70 if (len == -1) {
71 len = strlen(msg);
72 }
73
74 struct line_range retval = this->lv_origin;
75 const char *last = msg, *msg_end = msg + len;
76
77 for (int lpc = 0; lpc < this->lv_sub_offset; lpc++) {
78 const auto *next = (const char *) memchr(last, '\n', msg_end - last);
79 require(next != NULL);
80
81 next += 1;
82 int amount = (next - last);
83
84 retval.lr_start += amount;
85 if (retval.lr_end != -1) {
86 retval.lr_end += amount;
87 }
88
89 last = next + 1;
90 }
91
92 if (retval.lr_end == -1) {
93 const auto *eol = (const char *) memchr(last, '\n', msg_end - last);
94
95 if (eol == nullptr) {
96 retval.lr_end = len;
97 } else {
98 retval.lr_end = eol - msg;
99 }
100 }
101
102 return retval;
103 }
104
logline_value(logline_value_meta lvm,shared_buffer_ref & sbr,struct line_range origin)105 logline_value::logline_value(logline_value_meta lvm, shared_buffer_ref &sbr,
106 struct line_range origin)
107 : lv_meta(std::move(lvm)), lv_origin(origin)
108 {
109 if (sbr.get_data() == nullptr) {
110 this->lv_meta.lvm_kind = value_kind_t::VALUE_NULL;
111 }
112
113 switch (this->lv_meta.lvm_kind) {
114 case value_kind_t::VALUE_JSON:
115 case value_kind_t::VALUE_XML:
116 case value_kind_t::VALUE_STRUCT:
117 case value_kind_t::VALUE_TEXT:
118 case value_kind_t::VALUE_QUOTED:
119 case value_kind_t::VALUE_W3C_QUOTED:
120 case value_kind_t::VALUE_TIMESTAMP:
121 this->lv_sbr.subset(sbr, origin.lr_start, origin.length());
122 break;
123
124 case value_kind_t::VALUE_NULL:
125 break;
126
127 case value_kind_t::VALUE_INTEGER:
128 strtonum(this->lv_value.i, sbr.get_data_at(
129 origin.lr_start), origin.length());
130 break;
131
132 case value_kind_t::VALUE_FLOAT: {
133 ssize_t len = origin.length();
134 char scan_value[len + 1];
135
136 memcpy(scan_value, sbr.get_data_at(origin.lr_start), len);
137 scan_value[len] = '\0';
138 this->lv_value.d = strtod(scan_value, nullptr);
139 break;
140 }
141
142 case value_kind_t::VALUE_BOOLEAN:
143 if (strncmp(sbr.get_data_at(origin.lr_start), "true", origin.length()) == 0 ||
144 strncmp(sbr.get_data_at(origin.lr_start), "yes", origin.length()) == 0) {
145 this->lv_value.i = 1;
146 }
147 else {
148 this->lv_value.i = 0;
149 }
150 break;
151
152 case value_kind_t::VALUE_UNKNOWN:
153 case value_kind_t::VALUE__MAX:
154 ensure(0);
155 break;
156 }
157 }
158
to_string() const159 std::string logline_value::to_string() const
160 {
161 char buffer[128];
162
163 switch (this->lv_meta.lvm_kind) {
164 case value_kind_t::VALUE_NULL:
165 return "null";
166
167 case value_kind_t::VALUE_JSON:
168 case value_kind_t::VALUE_XML:
169 case value_kind_t::VALUE_STRUCT:
170 case value_kind_t::VALUE_TEXT:
171 case value_kind_t::VALUE_TIMESTAMP:
172 if (this->lv_sbr.empty()) {
173 return this->lv_intern_string.to_string();
174 }
175 return std::string(this->lv_sbr.get_data(), this->lv_sbr.length());
176
177 case value_kind_t::VALUE_QUOTED:
178 case value_kind_t::VALUE_W3C_QUOTED:
179 if (this->lv_sbr.length() == 0) {
180 return "";
181 } else {
182 switch (this->lv_sbr.get_data()[0]) {
183 case '\'':
184 case '"': {
185 auto unquote_func = this->lv_meta.lvm_kind == value_kind_t::VALUE_W3C_QUOTED ?
186 unquote_w3c : unquote;
187 char unquoted_str[this->lv_sbr.length()];
188 size_t unquoted_len;
189
190 unquoted_len = unquote_func(unquoted_str,
191 this->lv_sbr.get_data(),
192 this->lv_sbr.length());
193 return std::string(unquoted_str, unquoted_len);
194 }
195 default:
196 return std::string(this->lv_sbr.get_data(), this->lv_sbr.length());
197 }
198 }
199 break;
200
201 case value_kind_t::VALUE_INTEGER:
202 snprintf(buffer, sizeof(buffer), "%" PRId64, this->lv_value.i);
203 break;
204
205 case value_kind_t::VALUE_FLOAT:
206 snprintf(buffer, sizeof(buffer), "%lf", this->lv_value.d);
207 break;
208
209 case value_kind_t::VALUE_BOOLEAN:
210 if (this->lv_value.i) {
211 return "true";
212 }
213 else {
214 return "false";
215 }
216 break;
217 case value_kind_t::VALUE_UNKNOWN:
218 case value_kind_t::VALUE__MAX:
219 ensure(0);
220 break;
221 }
222
223 return std::string(buffer);
224 }
225
226 vector<std::shared_ptr<log_format>> log_format::lf_root_formats;
227
get_root_formats()228 vector<std::shared_ptr<log_format>> &log_format::get_root_formats()
229 {
230 return lf_root_formats;
231 }
232
next_format(const std::vector<std::shared_ptr<external_log_format::pattern>> & patterns,int & index,int & locked_index)233 static bool next_format(const std::vector<std::shared_ptr<external_log_format::pattern>> &patterns,
234 int &index,
235 int &locked_index)
236 {
237 bool retval = true;
238
239 if (locked_index == -1) {
240 index += 1;
241 if (index >= (int)patterns.size()) {
242 retval = false;
243 }
244 }
245 else if (index == locked_index) {
246 retval = false;
247 }
248 else {
249 index = locked_index;
250 }
251
252 return retval;
253 }
254
next_format(pcre_format * fmt,int & index,int & locked_index)255 bool log_format::next_format(pcre_format *fmt, int &index, int &locked_index)
256 {
257 bool retval = true;
258
259 if (locked_index == -1) {
260 index += 1;
261 if (fmt[index].name == NULL) {
262 retval = false;
263 }
264 }
265 else if (index == locked_index) {
266 retval = false;
267 }
268 else {
269 index = locked_index;
270 }
271
272 return retval;
273 }
274
log_scanf(uint32_t line_number,const char * line,size_t len,pcre_format * fmt,const char * time_fmt[],struct exttm * tm_out,struct timeval * tv_out,...)275 const char *log_format::log_scanf(uint32_t line_number,
276 const char *line,
277 size_t len,
278 pcre_format *fmt,
279 const char *time_fmt[],
280 struct exttm *tm_out,
281 struct timeval *tv_out,
282 ...)
283 {
284 int curr_fmt = -1;
285 const char *retval = NULL;
286 bool done = false;
287 pcre_input pi(line, 0, len);
288 pcre_context_static<128> pc;
289 va_list args;
290 int pat_index = this->last_pattern_index();
291
292 while (!done && next_format(fmt, curr_fmt, pat_index)) {
293 va_start(args, tv_out);
294
295 pi.reset(line, 0, len);
296 if (!fmt[curr_fmt].pcre.match(pc, pi, PCRE_NO_UTF8_CHECK)) {
297 retval = NULL;
298 }
299 else {
300 pcre_context::capture_t *ts = pc[fmt[curr_fmt].pf_timestamp_index];
301
302 for (auto &iter : pc) {
303 pcre_context::capture_t *cap = va_arg(
304 args, pcre_context::capture_t *);
305
306 *cap = iter;
307 }
308
309 retval = this->lf_date_time.scan(
310 pi.get_substr_start(ts), ts->length(), NULL, tm_out, *tv_out);
311
312 if (retval) {
313 if (curr_fmt != pat_index) {
314 uint32_t lock_line;
315
316 if (this->lf_pattern_locks.empty()) {
317 lock_line = 0;
318 } else {
319 lock_line = line_number;
320 }
321
322 this->lf_pattern_locks.emplace_back(lock_line, curr_fmt);
323 }
324 this->lf_timestamp_flags = tm_out->et_flags;
325 done = true;
326 }
327 }
328
329 va_end(args);
330 }
331
332 return retval;
333 }
334
check_for_new_year(std::vector<logline> & dst,exttm etm,struct timeval log_tv)335 void log_format::check_for_new_year(std::vector<logline> &dst, exttm etm,
336 struct timeval log_tv)
337 {
338 if (dst.empty()) {
339 return;
340 }
341
342 time_t diff = dst.back().get_time() - log_tv.tv_sec;
343 int off_year = 0, off_month = 0, off_day = 0, off_hour = 0;
344 bool do_change = true;
345
346 if (diff <= 0) {
347 return;
348 }
349 if ((etm.et_flags & ETF_MONTH_SET) && diff >= (24 * 60 * 60)) {
350 off_year = 1;
351 } else if (diff >= (24 * 60 * 60)) {
352 off_month = 1;
353 } else if (!(etm.et_flags & ETF_DAY_SET) && (diff >= (60 * 60))) {
354 off_day = 1;
355 } else if (!(etm.et_flags & ETF_DAY_SET)) {
356 off_hour = 1;
357 } else {
358 do_change = false;
359 }
360
361 if (!do_change) {
362 return;
363 }
364 log_debug("%d:detected time rollover; offsets=%d %d %d %d", dst.size(),
365 off_year, off_month, off_day, off_hour);
366 for (auto &ll : dst) {
367 time_t ot = ll.get_time();
368 struct tm otm;
369
370 gmtime_r(&ot, &otm);
371 otm.tm_year -= off_year;
372 otm.tm_mon -= off_month;
373 otm.tm_mday -= off_day;
374 otm.tm_hour -= off_hour;
375 auto new_time = tm2sec(&otm);
376 if (new_time == -1) {
377 continue;
378 }
379 ll.set_time(new_time);
380 }
381 }
382
383 /*
384 * XXX This needs some cleanup.
385 */
386 struct json_log_userdata {
json_log_userdatajson_log_userdata387 json_log_userdata(shared_buffer_ref &sbr)
388 : jlu_format(NULL), jlu_line(NULL), jlu_base_line(NULL),
389 jlu_sub_line_count(1), jlu_handle(NULL), jlu_line_value(NULL),
390 jlu_line_size(0), jlu_sub_start(0), jlu_shared_buffer(sbr) {
391
392 };
393
394 external_log_format *jlu_format;
395 const logline *jlu_line;
396 logline *jlu_base_line;
397 int jlu_sub_line_count;
398 yajl_handle jlu_handle;
399 const char *jlu_line_value;
400 size_t jlu_line_size;
401 size_t jlu_sub_start;
402 shared_buffer_ref &jlu_shared_buffer;
403 };
404
405 static int read_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len);
406
read_json_null(yajlpp_parse_context * ypc)407 static int read_json_null(yajlpp_parse_context *ypc)
408 {
409 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
410 const intern_string_t field_name = ypc->get_path();
411
412 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
413 field_name, ypc->is_level(1));
414
415 return 1;
416 }
417
read_json_bool(yajlpp_parse_context * ypc,int val)418 static int read_json_bool(yajlpp_parse_context *ypc, int val)
419 {
420 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
421 const intern_string_t field_name = ypc->get_path();
422
423 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
424 field_name, ypc->is_level(1));
425
426 return 1;
427 }
428
read_json_int(yajlpp_parse_context * ypc,long long val)429 static int read_json_int(yajlpp_parse_context *ypc, long long val)
430 {
431 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
432 const intern_string_t field_name = ypc->get_path();
433
434 if (jlu->jlu_format->lf_timestamp_field == field_name) {
435 long long divisor = jlu->jlu_format->elf_timestamp_divisor;
436 struct timeval tv;
437
438 tv.tv_sec = val / divisor;
439 tv.tv_usec = (val % divisor) * (1000000.0 / divisor);
440 jlu->jlu_base_line->set_time(tv);
441 }
442 else if (jlu->jlu_format->elf_level_field == field_name) {
443 if (jlu->jlu_format->elf_level_pairs.empty()) {
444 char level_buf[128];
445
446 snprintf(level_buf, sizeof(level_buf), "%lld", val);
447
448 pcre_input pi(level_buf);
449 pcre_context::capture_t level_cap = {0, (int) strlen(level_buf)};
450
451 jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(pi, &level_cap));
452 } else {
453 vector<pair<int64_t, log_level_t> >::iterator iter;
454
455 for (iter = jlu->jlu_format->elf_level_pairs.begin();
456 iter != jlu->jlu_format->elf_level_pairs.end();
457 ++iter) {
458 if (iter->first == val) {
459 jlu->jlu_base_line->set_level(iter->second);
460 break;
461 }
462 }
463 }
464 }
465
466 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
467 field_name, ypc->is_level(1));
468
469 return 1;
470 }
471
read_json_double(yajlpp_parse_context * ypc,double val)472 static int read_json_double(yajlpp_parse_context *ypc, double val)
473 {
474 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
475 const intern_string_t field_name = ypc->get_path();
476
477 if (jlu->jlu_format->lf_timestamp_field == field_name) {
478 double divisor = jlu->jlu_format->elf_timestamp_divisor;
479 struct timeval tv;
480
481 tv.tv_sec = val / divisor;
482 tv.tv_usec = fmod(val, divisor) * (1000000.0 / divisor);
483 jlu->jlu_base_line->set_time(tv);
484 }
485
486 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
487 field_name, ypc->is_level(1));
488
489 return 1;
490 }
491
json_array_start(void * ctx)492 static int json_array_start(void *ctx)
493 {
494 yajlpp_parse_context *ypc = (yajlpp_parse_context *)ctx;
495 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
496
497 if (ypc->ypc_path_index_stack.size() == 2) {
498 const intern_string_t field_name = ypc->get_path_fragment_i(0);
499
500 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(field_name, true);
501 jlu->jlu_sub_start = yajl_get_bytes_consumed(jlu->jlu_handle) - 1;
502 }
503
504 return 1;
505 }
506
json_array_end(void * ctx)507 static int json_array_end(void *ctx)
508 {
509 yajlpp_parse_context *ypc = (yajlpp_parse_context *)ctx;
510 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
511
512 if (ypc->ypc_path_index_stack.size() == 1) {
513 const intern_string_t field_name = ypc->get_path_fragment_i(0);
514 size_t sub_end = yajl_get_bytes_consumed(jlu->jlu_handle);
515 shared_buffer_ref sbr;
516
517 sbr.subset(jlu->jlu_shared_buffer, jlu->jlu_sub_start,
518 sub_end - jlu->jlu_sub_start);
519 jlu->jlu_format->jlf_line_values.emplace_back(jlu->jlu_format->
520 get_value_meta(field_name, value_kind_t::VALUE_JSON), sbr);
521 }
522
523 return 1;
524 }
525
// Handlers used by external_log_format::scan() for JSON logs: a single path
// handler for the pattern "\w+" with the read_json_* callbacks attached.
// scan() installs its first child as the parser's static handler.
static struct json_path_container json_log_handlers = {
    json_path_handler(pcrepp("\\w+"))
        .add_cb(read_json_null)
        .add_cb(read_json_bool)
        .add_cb(read_json_int)
        .add_cb(read_json_double)
        .add_cb(read_json_field)
};
534
535 static int rewrite_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len);
536
rewrite_json_null(yajlpp_parse_context * ypc)537 static int rewrite_json_null(yajlpp_parse_context *ypc)
538 {
539 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
540 const intern_string_t field_name = ypc->get_path();
541
542 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
543 return 1;
544 }
545 jlu->jlu_format->jlf_line_values.emplace_back(jlu->jlu_format->
546 get_value_meta(field_name, value_kind_t::VALUE_NULL));
547
548 return 1;
549 }
550
rewrite_json_bool(yajlpp_parse_context * ypc,int val)551 static int rewrite_json_bool(yajlpp_parse_context *ypc, int val)
552 {
553 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
554 const intern_string_t field_name = ypc->get_path();
555
556 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
557 return 1;
558 }
559 jlu->jlu_format->jlf_line_values.emplace_back(
560 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_BOOLEAN),
561 (bool) val);
562 return 1;
563 }
564
rewrite_json_int(yajlpp_parse_context * ypc,long long val)565 static int rewrite_json_int(yajlpp_parse_context *ypc, long long val)
566 {
567 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
568 const intern_string_t field_name = ypc->get_path();
569
570 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
571 return 1;
572 }
573 jlu->jlu_format->jlf_line_values.emplace_back(
574 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_INTEGER),
575 (int64_t) val);
576 return 1;
577 }
578
rewrite_json_double(yajlpp_parse_context * ypc,double val)579 static int rewrite_json_double(yajlpp_parse_context *ypc, double val)
580 {
581 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
582 const intern_string_t field_name = ypc->get_path();
583
584 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
585 return 1;
586 }
587 jlu->jlu_format->jlf_line_values.emplace_back(
588 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_FLOAT),
589 val);
590
591 return 1;
592 }
593
// Counterpart of json_log_handlers built from the rewrite_json_* callbacks,
// which store each field's value in the format's jlf_line_values.
// NOTE(review): presumably installed when a JSON line is rendered for
// display -- the user of this table is outside the visible part of the file.
static struct json_path_container json_log_rewrite_handlers = {
    json_path_handler(pcrepp("\\w+"))
        .add_cb(rewrite_json_null)
        .add_cb(rewrite_json_bool)
        .add_cb(rewrite_json_int)
        .add_cb(rewrite_json_double)
        .add_cb(rewrite_json_field)
};
602
scan_for_partial(shared_buffer_ref & sbr,size_t & len_out) const603 bool external_log_format::scan_for_partial(shared_buffer_ref &sbr, size_t &len_out) const
604 {
605 if (this->elf_type != ELF_TYPE_TEXT) {
606 return false;
607 }
608
609 auto pat = this->elf_pattern_order[this->last_pattern_index()];
610 pcre_input pi(sbr.get_data(), 0, sbr.length());
611
612 if (!this->elf_multiline) {
613 len_out = pat->p_pcre->match_partial(pi);
614 return true;
615 }
616
617 if (pat->p_timestamp_end == -1 || pat->p_timestamp_end > (int)sbr.length()) {
618 len_out = 0;
619 return false;
620 }
621
622 len_out = pat->p_pcre->match_partial(pi);
623 return (int)len_out > pat->p_timestamp_end;
624 }
625
scan(logfile & lf,std::vector<logline> & dst,const line_info & li,shared_buffer_ref & sbr)626 log_format::scan_result_t external_log_format::scan(logfile &lf,
627 std::vector<logline> &dst,
628 const line_info &li,
629 shared_buffer_ref &sbr)
630 {
631 if (this->elf_type == ELF_TYPE_JSON) {
632 yajlpp_parse_context &ypc = *(this->jlf_parse_context);
633 logline ll(li.li_file_range.fr_offset, 0, 0, LEVEL_INFO);
634 yajl_handle handle = this->jlf_yajl_handle.get();
635 json_log_userdata jlu(sbr);
636
637 if (!this->lf_specialized && dst.size() >= 3) {
638 return log_format::SCAN_NO_MATCH;
639 }
640
641 if (li.li_partial) {
642 log_debug("skipping partial line at offset %d", li.li_file_range.fr_offset);
643 return log_format::SCAN_INCOMPLETE;
644 }
645
646 const auto *line_data = (const unsigned char *) sbr.get_data();
647
648 yajl_reset(handle);
649 ypc.set_static_handler(json_log_handlers.jpc_children[0]);
650 ypc.ypc_userdata = &jlu;
651 ypc.ypc_ignore_unused = true;
652 ypc.ypc_alt_callbacks.yajl_start_array = json_array_start;
653 ypc.ypc_alt_callbacks.yajl_start_map = json_array_start;
654 ypc.ypc_alt_callbacks.yajl_end_array = nullptr;
655 ypc.ypc_alt_callbacks.yajl_end_map = nullptr;
656 jlu.jlu_format = this;
657 jlu.jlu_base_line = ≪
658 jlu.jlu_line_value = sbr.get_data();
659 jlu.jlu_line_size = sbr.length();
660 jlu.jlu_handle = handle;
661 if (yajl_parse(handle, line_data, sbr.length()) == yajl_status_ok &&
662 yajl_complete_parse(handle) == yajl_status_ok) {
663 if (ll.get_time() == 0) {
664 return log_format::SCAN_NO_MATCH;
665 }
666
667 jlu.jlu_sub_line_count += this->jlf_line_format_init_count;
668 for (int lpc = 0; lpc < jlu.jlu_sub_line_count; lpc++) {
669 ll.set_sub_offset(lpc);
670 if (lpc > 0) {
671 ll.set_level((log_level_t) (ll.get_level_and_flags() |
672 LEVEL_CONTINUED));
673 }
674 dst.emplace_back(ll);
675 }
676 }
677 else {
678 unsigned char *msg;
679 int line_count = 2;
680
681 msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length());
682 if (msg != nullptr) {
683 log_debug("Unable to parse line at offset %d: %s", li.li_file_range.fr_offset, msg);
684 line_count = count(msg, msg + strlen((char *) msg), '\n') + 1;
685 yajl_free_error(handle, msg);
686 }
687 if (!this->lf_specialized) {
688 return log_format::SCAN_NO_MATCH;
689 }
690 for (int lpc = 0; lpc < line_count; lpc++) {
691 log_level_t level = LEVEL_INVALID;
692
693 ll.set_time(dst.back().get_time());
694 if (lpc > 0) {
695 level = (log_level_t) (level | LEVEL_CONTINUED);
696 }
697 ll.set_level(level);
698 ll.set_sub_offset(lpc);
699 dst.emplace_back(ll);
700 }
701 }
702
703 return log_format::SCAN_MATCH;
704 }
705
706 pcre_input pi(sbr.get_data(), 0, sbr.length());
707 pcre_context_static<128> pc;
708 int curr_fmt = -1, orig_lock = this->last_pattern_index();
709 int pat_index = orig_lock;
710
711 while (::next_format(this->elf_pattern_order, curr_fmt, pat_index)) {
712 auto fpat = this->elf_pattern_order[curr_fmt];
713 auto& pat = fpat->p_pcre;
714
715 if (fpat->p_module_format) {
716 continue;
717 }
718
719 if (!pat->match(pc, pi, PCRE_NO_UTF8_CHECK)) {
720 if (!this->lf_pattern_locks.empty() && pat_index != -1) {
721 curr_fmt = -1;
722 pat_index = -1;
723 }
724 continue;
725 }
726
727 pcre_context::capture_t *ts = pc[fpat->p_timestamp_field_index];
728 pcre_context::capture_t *level_cap = pc[fpat->p_level_field_index];
729 pcre_context::capture_t *mod_cap = pc[fpat->p_module_field_index];
730 pcre_context::capture_t *opid_cap = pc[fpat->p_opid_field_index];
731 pcre_context::capture_t *body_cap = pc[fpat->p_body_field_index];
732 const char *ts_str = pi.get_substr_start(ts);
733 const char *last;
734 struct exttm log_time_tm;
735 struct timeval log_tv;
736 uint8_t mod_index = 0, opid = 0;
737
738 if ((last = this->lf_date_time.scan(ts_str,
739 ts->length(),
740 this->get_timestamp_formats(),
741 &log_time_tm,
742 log_tv)) == nullptr) {
743 this->lf_date_time.unlock();
744 if ((last = this->lf_date_time.scan(ts_str,
745 ts->length(),
746 this->get_timestamp_formats(),
747 &log_time_tm,
748 log_tv)) == nullptr) {
749 continue;
750 }
751 }
752
753 log_level_t level = this->convert_level(pi, level_cap);
754
755 this->lf_timestamp_flags = log_time_tm.et_flags;
756
757 if (!((log_time_tm.et_flags & ETF_DAY_SET) &&
758 (log_time_tm.et_flags & ETF_MONTH_SET) &&
759 (log_time_tm.et_flags & ETF_YEAR_SET))) {
760 this->check_for_new_year(dst, log_time_tm, log_tv);
761 }
762
763 if (opid_cap != nullptr) {
764 opid = hash_str(pi.get_substr_start(opid_cap), opid_cap->length());
765 }
766
767 if (mod_cap != nullptr) {
768 intern_string_t mod_name = intern_string::lookup(
769 pi.get_substr_start(mod_cap), mod_cap->length());
770 auto mod_iter = MODULE_FORMATS.find(mod_name);
771
772 if (mod_iter == MODULE_FORMATS.end()) {
773 mod_index = module_scan(pi, body_cap, mod_name);
774 mod_iter = MODULE_FORMATS.find(mod_name);
775 }
776 else if (mod_iter->second.mf_mod_format) {
777 mod_index = mod_iter->second.mf_mod_format->lf_mod_index;
778 }
779
780 if (mod_index && level_cap && body_cap) {
781 auto mod_elf = dynamic_pointer_cast<external_log_format>(
782 mod_iter->second.mf_mod_format);
783
784 if (mod_elf) {
785 pcre_context_static<128> mod_pc;
786 shared_buffer_ref body_ref;
787
788 body_cap->ltrim(sbr.get_data());
789
790 pcre_input mod_pi(pi.get_substr_start(body_cap),
791 0,
792 body_cap->length());
793 int mod_pat_index = mod_elf->last_pattern_index();
794 pattern &mod_pat = *mod_elf->elf_pattern_order[mod_pat_index];
795
796 if (mod_pat.p_pcre->match(mod_pc, mod_pi)) {
797 auto mod_level_cap = mod_pc[mod_pat.p_level_field_index];
798
799 level = mod_elf->convert_level(mod_pi, mod_level_cap);
800 }
801 }
802 }
803 }
804
805 for (auto value_index : fpat->p_numeric_value_indexes) {
806 const indexed_value_def &ivd = fpat->p_value_by_index[value_index];
807 const value_def &vd = *ivd.ivd_value_def;
808 pcre_context::capture_t *num_cap = pc[ivd.ivd_index];
809
810 if (num_cap != nullptr && num_cap->is_valid()) {
811 const struct scaling_factor *scaling = nullptr;
812
813 if (ivd.ivd_unit_field_index >= 0) {
814 pcre_context::iterator unit_cap = pc[ivd.ivd_unit_field_index];
815
816 if (unit_cap != nullptr && unit_cap->is_valid()) {
817 intern_string_t unit_val = intern_string::lookup(
818 pi.get_substr_start(unit_cap), unit_cap->length());
819 std::map<const intern_string_t, scaling_factor>::const_iterator unit_iter;
820
821 unit_iter = vd.vd_unit_scaling.find(unit_val);
822 if (unit_iter != vd.vd_unit_scaling.end()) {
823 const struct scaling_factor &sf = unit_iter->second;
824
825 scaling = &sf;
826 }
827 }
828 }
829
830 const char *num_cap_start = pi.get_substr_start(num_cap);
831 const char *num_cap_end = num_cap_start + num_cap->length();
832 double dvalue = strtod(num_cap_start, (char **) &num_cap_end);
833
834 if (num_cap_end == num_cap_start + num_cap->length()) {
835 if (scaling != nullptr) {
836 scaling->scale(dvalue);
837 }
838 this->lf_value_stats[vd.vd_values_index].add_value(dvalue);
839 }
840 }
841 }
842
843 dst.emplace_back(li.li_file_range.fr_offset, log_tv, level, mod_index, opid);
844
845 if (orig_lock != curr_fmt) {
846 uint32_t lock_line;
847
848 log_debug("%zu: changing pattern lock %d -> %d",
849 dst.size() - 1, orig_lock, curr_fmt);
850 if (this->lf_pattern_locks.empty()) {
851 lock_line = 0;
852 } else {
853 lock_line = dst.size() - 1;
854 }
855 this->lf_pattern_locks.emplace_back(lock_line, curr_fmt);
856 }
857 return log_format::SCAN_MATCH;
858 }
859
860 if (this->lf_specialized && !this->elf_multiline) {
861 auto& last_line = dst.back();
862
863 dst.emplace_back(li.li_file_range.fr_offset,
864 last_line.get_timeval(),
865 log_level_t::LEVEL_INVALID);
866
867 return log_format::SCAN_MATCH;
868 }
869
870 return log_format::SCAN_NO_MATCH;
871 }
872
module_scan(const pcre_input & pi,pcre_context::capture_t * body_cap,const intern_string_t & mod_name)873 uint8_t external_log_format::module_scan(const pcre_input &pi,
874 pcre_context::capture_t *body_cap,
875 const intern_string_t &mod_name)
876 {
877 uint8_t mod_index;
878 body_cap->ltrim(pi.get_string());
879 pcre_input body_pi(pi.get_substr_start(body_cap), 0, body_cap->length());
880 auto& ext_fmts = GRAPH_ORDERED_FORMATS;
881 pcre_context_static<128> pc;
882 module_format mf;
883
884 for (auto& elf : ext_fmts) {
885 int curr_fmt = -1, fmt_lock = -1;
886
887 while (::next_format(elf->elf_pattern_order, curr_fmt, fmt_lock)) {
888 auto fpat = elf->elf_pattern_order[curr_fmt];
889 auto& pat = fpat->p_pcre;
890
891 if (!fpat->p_module_format) {
892 continue;
893 }
894
895 if (!pat->match(pc, body_pi)) {
896 continue;
897 }
898
899 log_debug("%s:module format found -- %s (%d)",
900 mod_name.get(),
901 elf->get_name().get(),
902 elf->lf_mod_index);
903
904 mod_index = elf->lf_mod_index;
905 mf.mf_mod_format = elf->specialized(curr_fmt);
906 MODULE_FORMATS[mod_name] = mf;
907
908 return mod_index;
909 }
910 }
911
912 MODULE_FORMATS[mod_name] = mf;
913
914 return 0;
915 }
916
/**
 * Collect the attributes and field values of a single log line.
 *
 * @param line_number     The line's number; selects the pattern to use.
 * @param line            The contents of the line.
 * @param sa              Receives the string attributes (timestamp, module,
 *   opid, body, ...).
 * @param values          Receives one value per entry in the pattern's
 *   p_value_by_index.
 * @param annotate_module If true, a matching module format is also asked to
 *   annotate the body of the message.
 */
void external_log_format::annotate(uint64_t line_number, shared_buffer_ref &line, string_attrs_t &sa,
                                   std::vector<logline_value> &values, bool annotate_module) const
{
    pcre_context_static<128> pc;
    pcre_input pi(line.get_data(), 0, line.length());
    struct line_range lr;
    pcre_context::capture_t *cap, *body_cap, *module_cap = nullptr;

    // Non-text formats hand back the values and attributes already stored in
    // jlf_line_values and jlf_line_attrs.
    if (this->elf_type != ELF_TYPE_TEXT) {
        values = this->jlf_line_values;
        sa = this->jlf_line_attrs;
        return;
    }

    if (line.empty()) {
        return;
    }

    int pat_index = this->pattern_index_for_line(line_number);
    pattern &pat = *this->elf_pattern_order[pat_index];

    if (!pat.p_pcre->match(pc, pi, PCRE_NO_UTF8_CHECK)) {
        // A continued line still needs a body.
        lr.lr_start = 0;
        lr.lr_end = line.length();
        sa.emplace_back(lr, &SA_BODY);
        if (!this->elf_multiline) {
            // For single-line formats, mark everything after the partial
            // match as invalid.
            auto len = pat.p_pcre->match_partial(pi);
            sa.emplace_back(line_range{(int) len, -1},
                            &SA_INVALID,
                            (void *) "Log line does not match any pattern");
        }
        return;
    }

    // Timestamp, module and opid attributes are only added for patterns that
    // are not module formats.
    if (!pat.p_module_format) {
        cap = pc[pat.p_timestamp_field_index];
        if (cap->is_valid()) {
            lr.lr_start = cap->c_begin;
            lr.lr_end = cap->c_end;
            sa.emplace_back(lr, &logline::L_TIMESTAMP);
        }

        if (pat.p_module_field_index != -1) {
            module_cap = pc[pat.p_module_field_index];
            if (module_cap != nullptr && module_cap->is_valid()) {
                lr.lr_start = module_cap->c_begin;
                lr.lr_end = module_cap->c_end;
                sa.emplace_back(lr, &logline::L_MODULE);
            }
        }

        cap = pc[pat.p_opid_field_index];
        if (cap != nullptr && cap->is_valid()) {
            lr.lr_start = cap->c_begin;
            lr.lr_end = cap->c_end;
            sa.emplace_back(lr, &logline::L_OPID);
        }
    }

    body_cap = pc[pat.p_body_field_index];

    // Emit one value per defined field, in definition order.
    for (size_t lpc = 0; lpc < pat.p_value_by_index.size(); lpc++) {
        const indexed_value_def &ivd = pat.p_value_by_index[lpc];
        const struct scaling_factor *scaling = nullptr;
        pcre_context::capture_t *cap = pc[ivd.ivd_index];
        const value_def &vd = *ivd.ivd_value_def;

        // Look up a scaling factor if the field has a unit capture whose
        // text is listed in the value's unit scaling table.
        if (ivd.ivd_unit_field_index >= 0) {
            pcre_context::iterator unit_cap = pc[ivd.ivd_unit_field_index];

            if (unit_cap != nullptr && unit_cap->c_begin != -1) {
                intern_string_t unit_val = intern_string::lookup(
                    pi.get_substr_start(unit_cap), unit_cap->length());
                auto unit_iter = vd.vd_unit_scaling.find(unit_val);
                if (unit_iter != vd.vd_unit_scaling.end()) {
                    const struct scaling_factor &sf = unit_iter->second;

                    scaling = &sf;
                }
            }
        }

        if (cap->is_valid()) {
            values.emplace_back(vd.vd_meta,
                                line,
                                line_range{cap->c_begin, cap->c_end});
            values.back().apply_scaling(scaling);
        } else {
            // The field did not take part in the match; add a value built
            // from the metadata alone.
            values.emplace_back(vd.vd_meta);
        }
        if (pat.p_module_format) {
            values.back().lv_meta.lvm_from_module = true;
        }
    }

    bool did_mod_annotate_body = false;
    if (annotate_module && module_cap != nullptr && body_cap != nullptr &&
        body_cap->is_valid()) {
        intern_string_t mod_name = intern_string::lookup(
            pi.get_substr_start(module_cap), module_cap->length());
        auto mod_iter = MODULE_FORMATS.find(mod_name);

        if (mod_iter != MODULE_FORMATS.end() &&
            mod_iter->second.mf_mod_format != nullptr) {
            module_format &mf = mod_iter->second;
            shared_buffer_ref body_ref;

            body_cap->ltrim(line.get_data());
            body_ref.subset(line, body_cap->c_begin, body_cap->length());

            // Let the module format annotate the body alone, then shift
            // everything it added by the offset of the body in the line.
            auto pre_mod_values_size = values.size();
            auto pre_mod_sa_size = sa.size();
            mf.mf_mod_format->annotate(line_number, body_ref, sa, values, false);
            for (size_t lpc = pre_mod_values_size; lpc < values.size(); lpc++) {
                values[lpc].lv_origin.shift(0, body_cap->c_begin);
            }
            for (size_t lpc = pre_mod_sa_size; lpc < sa.size(); lpc++) {
                sa[lpc].sa_range.shift(0, body_cap->c_begin);
            }
            did_mod_annotate_body = true;
        }
    }
    // Without a module annotation the body attribute is added here: the body
    // capture if it is valid, otherwise an empty range at the end of the line.
    if (!did_mod_annotate_body) {
        if (body_cap != nullptr && body_cap->is_valid()) {
            lr.lr_start = body_cap->c_begin;
            lr.lr_end = body_cap->c_end;
        }
        else {
            lr.lr_start = line.length();
            lr.lr_end = line.length();
        }
        sa.emplace_back(lr, &SA_BODY);
    }
}
1052
rewrite(exec_context & ec,shared_buffer_ref & line,string_attrs_t & sa,string & value_out)1053 void external_log_format::rewrite(exec_context &ec,
1054 shared_buffer_ref &line,
1055 string_attrs_t &sa,
1056 string &value_out)
1057 {
1058 vector<logline_value>::iterator shift_iter;
1059 auto &values = *ec.ec_line_values;
1060
1061 value_out.assign(line.get_data(), line.length());
1062
1063 for (auto iter = values.begin(); iter != values.end(); ++iter) {
1064 if (!iter->lv_origin.is_valid()) {
1065 log_debug("not rewriting value with invalid origin -- %s", iter->lv_meta.lvm_name.get());
1066 continue;
1067 }
1068
1069 auto vd_iter = this->elf_value_defs.find(iter->lv_meta.lvm_name);
1070 if (vd_iter == this->elf_value_defs.end()) {
1071 log_debug("not rewriting undefined value -- %s", iter->lv_meta.lvm_name.get());
1072 continue;
1073 }
1074
1075 const auto &vd = *vd_iter->second;
1076
1077 if (vd.vd_rewriter.empty()) {
1078 continue;
1079 }
1080
1081 auto _sg = ec.enter_source(this->elf_name.to_string() +
1082 ":" +
1083 vd_iter->first.to_string(),
1084 1);
1085 auto field_value = execute_any(ec, vd.vd_rewriter)
1086 .orElse(err_to_ok).unwrap();
1087 struct line_range adj_origin = iter->origin_in_full_msg(
1088 value_out.c_str(), value_out.length());
1089
1090 value_out.erase(adj_origin.lr_start, adj_origin.length());
1091
1092 int32_t shift_amount = field_value.length() - adj_origin.length();
1093 value_out.insert(adj_origin.lr_start, field_value);
1094 for (shift_iter = values.begin();
1095 shift_iter != values.end(); ++shift_iter) {
1096 shift_iter->lv_origin.shift(adj_origin.lr_start, shift_amount);
1097 }
1098 shift_string_attrs(sa, adj_origin.lr_start, shift_amount);
1099 }
1100 }
1101
read_json_field(yajlpp_parse_context * ypc,const unsigned char * str,size_t len)1102 static int read_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len)
1103 {
1104 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
1105 const intern_string_t field_name = ypc->get_path();
1106 struct exttm tm_out;
1107 struct timeval tv_out;
1108
1109 if (jlu->jlu_format->lf_timestamp_field == field_name) {
1110 jlu->jlu_format->lf_date_time.scan((const char *)str, len, jlu->jlu_format->get_timestamp_formats(), &tm_out, tv_out);
1111 // Leave off the machine oriented flag since we convert it anyhow
1112 jlu->jlu_format->lf_timestamp_flags = tm_out.et_flags & ~ETF_MACHINE_ORIENTED;
1113 jlu->jlu_base_line->set_time(tv_out);
1114 }
1115 else if (jlu->jlu_format->elf_level_field == field_name) {
1116 pcre_input pi((const char *) str, 0, len);
1117 pcre_context::capture_t level_cap = {0, (int) len};
1118
1119 jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(pi, &level_cap));
1120 }
1121 else if (jlu->jlu_format->elf_opid_field == field_name) {
1122 uint8_t opid = hash_str((const char *) str, len);
1123 jlu->jlu_base_line->set_opid(opid);
1124 }
1125
1126 jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
1127 field_name, ypc->is_level(1), str, len);
1128
1129 return 1;
1130 }
1131
rewrite_json_field(yajlpp_parse_context * ypc,const unsigned char * str,size_t len)1132 static int rewrite_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len)
1133 {
1134 static const intern_string_t body_name = intern_string::lookup("body", -1);
1135 json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
1136 const intern_string_t field_name = ypc->get_path();
1137
1138 if (jlu->jlu_format->lf_timestamp_field == field_name) {
1139 char time_buf[64];
1140
1141 // TODO add a timeval kind to logline_value
1142 if (jlu->jlu_line->is_time_skewed()) {
1143 struct timeval tv;
1144 struct exttm tm;
1145
1146 jlu->jlu_format->lf_date_time.scan((const char *) str, len,
1147 jlu->jlu_format->get_timestamp_formats(),
1148 &tm, tv);
1149 sql_strftime(time_buf, sizeof(time_buf), tv, 'T');
1150 }
1151 else {
1152 sql_strftime(time_buf, sizeof(time_buf),
1153 jlu->jlu_line->get_timeval(), 'T');
1154 }
1155 tmp_shared_buffer tsb(time_buf);
1156 jlu->jlu_format->jlf_line_values.emplace_back(
1157 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT),
1158 tsb.tsb_ref);
1159 }
1160 else if (jlu->jlu_shared_buffer.contains((const char *)str)) {
1161 shared_buffer_ref sbr;
1162
1163 sbr.subset(jlu->jlu_shared_buffer,
1164 (off_t) ((const char *)str - jlu->jlu_line_value),
1165 len);
1166 if (field_name == jlu->jlu_format->elf_body_field) {
1167 jlu->jlu_format->jlf_line_values.emplace_back(
1168 jlu->jlu_format->get_value_meta(body_name, value_kind_t::VALUE_TEXT),
1169 sbr);
1170 }
1171 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
1172 return 1;
1173 }
1174
1175 jlu->jlu_format->jlf_line_values.emplace_back(
1176 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT), sbr);
1177 }
1178 else {
1179 tmp_shared_buffer tsb((const char *)str, len);
1180
1181 if (field_name == jlu->jlu_format->elf_body_field) {
1182 jlu->jlu_format->jlf_line_values.emplace_back(
1183 jlu->jlu_format->get_value_meta(body_name, value_kind_t::VALUE_TEXT),
1184 tsb.tsb_ref);
1185 }
1186 if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
1187 return 1;
1188 }
1189
1190 jlu->jlu_format->jlf_line_values.emplace_back(
1191 jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT),
1192 tsb.tsb_ref);
1193 }
1194
1195 return 1;
1196 }
1197
get_subline(const logline & ll,shared_buffer_ref & sbr,bool full_message)1198 void external_log_format::get_subline(const logline &ll, shared_buffer_ref &sbr, bool full_message)
1199 {
1200 if (this->elf_type == ELF_TYPE_TEXT) {
1201 return;
1202 }
1203
1204 if (this->jlf_cached_offset != ll.get_offset() ||
1205 this->jlf_cached_full != full_message) {
1206 yajlpp_parse_context &ypc = *(this->jlf_parse_context);
1207 yajl_handle handle = this->jlf_yajl_handle.get();
1208 json_log_userdata jlu(sbr);
1209
1210 this->jlf_share_manager.invalidate_refs();
1211 this->jlf_cached_line.clear();
1212 this->jlf_line_values.clear();
1213 this->jlf_line_offsets.clear();
1214 this->jlf_line_attrs.clear();
1215
1216 yajl_reset(handle);
1217 ypc.set_static_handler(json_log_rewrite_handlers.jpc_children[0]);
1218 ypc.ypc_userdata = &jlu;
1219 ypc.ypc_ignore_unused = true;
1220 ypc.ypc_alt_callbacks.yajl_start_array = json_array_start;
1221 ypc.ypc_alt_callbacks.yajl_end_array = json_array_end;
1222 ypc.ypc_alt_callbacks.yajl_start_map = json_array_start;
1223 ypc.ypc_alt_callbacks.yajl_end_map = json_array_end;
1224 jlu.jlu_format = this;
1225 jlu.jlu_line = ≪
1226 jlu.jlu_handle = handle;
1227 jlu.jlu_line_value = sbr.get_data();
1228
1229 yajl_status parse_status = yajl_parse(handle,
1230 (const unsigned char *)sbr.get_data(), sbr.length());
1231 if (parse_status != yajl_status_ok ||
1232 yajl_complete_parse(handle) != yajl_status_ok) {
1233 unsigned char* msg;
1234 string full_msg;
1235
1236 msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length());
1237 if (msg != nullptr) {
1238 full_msg = fmt::format(
1239 "[offset: {}] {}\n{}",
1240 ll.get_offset(),
1241 fmt::string_view{sbr.get_data(), sbr.length()},
1242 msg);
1243 yajl_free_error(handle, msg);
1244 }
1245
1246 this->jlf_cached_line.resize(full_msg.size());
1247 memcpy(this->jlf_cached_line.data(), full_msg.data(), full_msg.size());
1248 this->jlf_line_values.clear();
1249 this->jlf_line_attrs.emplace_back(
1250 line_range{0, -1},
1251 &SA_INVALID,
1252 (void *) "JSON line failed to parse");
1253 } else {
1254 std::vector<logline_value>::iterator lv_iter;
1255 bool used_values[this->jlf_line_values.size()];
1256 struct line_range lr;
1257
1258 memset(used_values, 0, sizeof(used_values));
1259
1260 for (lv_iter = this->jlf_line_values.begin();
1261 lv_iter != this->jlf_line_values.end();
1262 ++lv_iter) {
1263 lv_iter->lv_meta.lvm_format = this;
1264 }
1265
1266 int sub_offset = 1 + this->jlf_line_format_init_count;
1267 for (const auto &jfe : this->jlf_line_format) {
1268 static const intern_string_t ts_field = intern_string::lookup("__timestamp__", -1);
1269 static const intern_string_t level_field = intern_string::lookup("__level__");
1270 size_t begin_size = this->jlf_cached_line.size();
1271
1272 switch (jfe.jfe_type) {
1273 case JLF_CONSTANT:
1274 this->json_append_to_cache(jfe.jfe_default_value.c_str(),
1275 jfe.jfe_default_value.size());
1276 break;
1277 case JLF_VARIABLE:
1278 lv_iter = find_if(this->jlf_line_values.begin(),
1279 this->jlf_line_values.end(),
1280 logline_value_cmp(&jfe.jfe_value));
1281 if (lv_iter != this->jlf_line_values.end()) {
1282 string str = lv_iter->to_string();
1283 size_t nl_pos = str.find('\n');
1284
1285 lr.lr_start = this->jlf_cached_line.size();
1286
1287 lv_iter->lv_meta.lvm_hidden = lv_iter->lv_meta.lvm_user_hidden;
1288 if ((int)str.size() > jfe.jfe_max_width) {
1289 switch (jfe.jfe_overflow) {
1290 case json_format_element::overflow_t::ABBREV: {
1291 this->json_append_to_cache(
1292 str.c_str(), str.size());
1293 size_t new_size = abbreviate_str(
1294 &this->jlf_cached_line[lr.lr_start],
1295 str.size(),
1296 jfe.jfe_max_width);
1297
1298 this->jlf_cached_line.resize(
1299 lr.lr_start + new_size);
1300 break;
1301 }
1302 case json_format_element::overflow_t::TRUNCATE: {
1303 this->json_append_to_cache(
1304 str.c_str(), jfe.jfe_max_width);
1305 break;
1306 }
1307 case json_format_element::overflow_t::DOTDOT: {
1308 size_t middle = (jfe.jfe_max_width / 2) - 1;
1309 this->json_append_to_cache(
1310 str.c_str(), middle);
1311 this->json_append_to_cache("..", 2);
1312 size_t rest = (jfe.jfe_max_width - middle - 2);
1313 this->json_append_to_cache(
1314 str.c_str() + str.size() - rest, rest);
1315 break;
1316 }
1317 }
1318 }
1319 else {
1320 sub_offset += count(str.begin(), str.end(), '\n');
1321 this->json_append(jfe, str.c_str(), str.size());
1322 }
1323
1324 if (nl_pos == string::npos || full_message) {
1325 lr.lr_end = this->jlf_cached_line.size();
1326 } else {
1327 lr.lr_end = lr.lr_start + nl_pos;
1328 }
1329
1330 if (lv_iter->lv_meta.lvm_name == this->lf_timestamp_field) {
1331 this->jlf_line_attrs.emplace_back(
1332 lr, &logline::L_TIMESTAMP);
1333 }
1334 else if (lv_iter->lv_meta.lvm_name == this->elf_body_field) {
1335 this->jlf_line_attrs.emplace_back(
1336 lr, &SA_BODY);
1337 }
1338 else if (lv_iter->lv_meta.lvm_name == this->elf_opid_field) {
1339 this->jlf_line_attrs.emplace_back(
1340 lr, &logline::L_OPID);
1341 }
1342 lv_iter->lv_origin = lr;
1343 used_values[distance(this->jlf_line_values.begin(),
1344 lv_iter)] = true;
1345 }
1346 else if (jfe.jfe_value == ts_field) {
1347 struct line_range lr;
1348 ssize_t ts_len;
1349 char ts[64];
1350
1351 if (jfe.jfe_ts_format.empty()) {
1352 ts_len = sql_strftime(ts, sizeof(ts),
1353 ll.get_timeval(), 'T');
1354 } else {
1355 struct exttm et;
1356
1357 ll.to_exttm(et);
1358 ts_len = ftime_fmt(ts, sizeof(ts),
1359 jfe.jfe_ts_format.c_str(),
1360 et);
1361 }
1362 lr.lr_start = this->jlf_cached_line.size();
1363 this->json_append_to_cache(ts, ts_len);
1364 lr.lr_end = this->jlf_cached_line.size();
1365 this->jlf_line_attrs.emplace_back(lr, &logline::L_TIMESTAMP);
1366
1367 lv_iter = find_if(this->jlf_line_values.begin(),
1368 this->jlf_line_values.end(),
1369 logline_value_cmp(&this->lf_timestamp_field));
1370 if (lv_iter != this->jlf_line_values.end()) {
1371 used_values[distance(this->jlf_line_values.begin(),
1372 lv_iter)] = true;
1373 }
1374 }
1375 else if (jfe.jfe_value == level_field) {
1376 this->json_append(jfe, ll.get_level_name(), -1);
1377 }
1378 else {
1379 this->json_append(jfe,
1380 jfe.jfe_default_value.c_str(),
1381 jfe.jfe_default_value.size());
1382 }
1383
1384 switch (jfe.jfe_text_transform) {
1385 case external_log_format::json_format_element::transform_t::NONE:
1386 break;
1387 case external_log_format::json_format_element::transform_t::UPPERCASE:
1388 for (size_t cindex = begin_size; cindex < this->jlf_cached_line.size(); cindex++) {
1389 this->jlf_cached_line[cindex] = toupper(this->jlf_cached_line[cindex]);
1390 }
1391 break;
1392 case external_log_format::json_format_element::transform_t::LOWERCASE:
1393 for (size_t cindex = begin_size; cindex < this->jlf_cached_line.size(); cindex++) {
1394 this->jlf_cached_line[cindex] = tolower(this->jlf_cached_line[cindex]);
1395 }
1396 break;
1397 case external_log_format::json_format_element::transform_t::CAPITALIZE:
1398 for (size_t cindex = begin_size; cindex < begin_size + 1; cindex++) {
1399 this->jlf_cached_line[cindex] = toupper(this->jlf_cached_line[cindex]);
1400 }
1401 for (size_t cindex = begin_size + 1; cindex < this->jlf_cached_line.size(); cindex++) {
1402 this->jlf_cached_line[cindex] = tolower(this->jlf_cached_line[cindex]);
1403 }
1404 break;
1405 }
1406 break;
1407 }
1408 }
1409 this->json_append_to_cache("\n", 1);
1410
1411 for (size_t lpc = 0; lpc < this->jlf_line_values.size(); lpc++) {
1412 static const intern_string_t body_name = intern_string::lookup(
1413 "body", -1);
1414 logline_value &lv = this->jlf_line_values[lpc];
1415
1416 if (lv.lv_meta.lvm_hidden || used_values[lpc] || body_name == lv.lv_meta.lvm_name) {
1417 continue;
1418 }
1419
1420 const std::string str = lv.to_string();
1421 size_t curr_pos = 0, nl_pos, line_len = -1;
1422
1423 lv.lv_sub_offset = sub_offset;
1424 lv.lv_origin.lr_start = 2 + lv.lv_meta.lvm_name.size() + 2;
1425 do {
1426 nl_pos = str.find('\n', curr_pos);
1427 if (nl_pos != std::string::npos) {
1428 line_len = nl_pos - curr_pos;
1429 }
1430 else {
1431 line_len = str.size() - curr_pos;
1432 }
1433 this->json_append_to_cache(" ", 2);
1434 this->json_append_to_cache(lv.lv_meta.lvm_name.get(),
1435 lv.lv_meta.lvm_name.size());
1436 this->json_append_to_cache(": ", 2);
1437 this->json_append_to_cache(
1438 &str.c_str()[curr_pos], line_len);
1439 this->json_append_to_cache("\n", 1);
1440 curr_pos = nl_pos + 1;
1441 sub_offset += 1;
1442 } while (nl_pos != std::string::npos &&
1443 nl_pos < str.size());
1444 }
1445
1446 }
1447
1448 this->jlf_line_offsets.push_back(0);
1449 for (size_t lpc = 0; lpc < this->jlf_cached_line.size(); lpc++) {
1450 if (this->jlf_cached_line[lpc] == '\n') {
1451 this->jlf_line_offsets.push_back(lpc + 1);
1452 }
1453 }
1454 this->jlf_line_offsets.push_back(this->jlf_cached_line.size());
1455 this->jlf_cached_offset = ll.get_offset();
1456 this->jlf_cached_full = full_message;
1457 }
1458
1459 off_t this_off = 0, next_off = 0;
1460
1461 if (!this->jlf_line_offsets.empty() && ll.get_sub_offset() < this->jlf_line_offsets.size()) {
1462 require(ll.get_sub_offset() < this->jlf_line_offsets.size());
1463
1464 this_off = this->jlf_line_offsets[ll.get_sub_offset()];
1465 if ((ll.get_sub_offset() + 1) < (int)this->jlf_line_offsets.size()) {
1466 next_off = this->jlf_line_offsets[ll.get_sub_offset() + 1];
1467 }
1468 else {
1469 next_off = this->jlf_cached_line.size();
1470 }
1471 if (next_off > 0 && this->jlf_cached_line[next_off - 1] == '\n' &&
1472 this_off != next_off) {
1473 next_off -= 1;
1474 }
1475 }
1476
1477 if (full_message) {
1478 sbr.share(this->jlf_share_manager,
1479 &this->jlf_cached_line[0],
1480 this->jlf_cached_line.size());
1481 }
1482 else {
1483 sbr.share(this->jlf_share_manager,
1484 this->jlf_cached_line.data() + this_off,
1485 next_off - this_off);
1486 }
1487 }
1488
build(std::vector<std::string> & errors)1489 void external_log_format::build(std::vector<std::string> &errors) {
1490 if (!this->lf_timestamp_field.empty()) {
1491 auto &vd = this->elf_value_defs[this->lf_timestamp_field];
1492 if (vd.get() == nullptr) {
1493 vd = make_shared<external_log_format::value_def>(
1494 this->lf_timestamp_field,
1495 value_kind_t::VALUE_TEXT,
1496 -1,
1497 this);
1498 }
1499 vd->vd_meta.lvm_name = this->lf_timestamp_field;
1500 vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1501 vd->vd_internal = true;
1502 }
1503 if (!this->elf_level_field.empty() && this->elf_value_defs.
1504 find(this->elf_level_field) == this->elf_value_defs.end()) {
1505 auto &vd = this->elf_value_defs[this->elf_level_field];
1506 if (vd.get() == nullptr) {
1507 vd = make_shared<external_log_format::value_def>(
1508 this->elf_level_field,
1509 value_kind_t::VALUE_TEXT,
1510 -1,
1511 this);
1512 }
1513 vd->vd_meta.lvm_name = this->elf_level_field;
1514 vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1515 vd->vd_internal = true;
1516 }
1517 if (!this->elf_body_field.empty()) {
1518 auto &vd = this->elf_value_defs[this->elf_body_field];
1519 if (vd.get() == nullptr) {
1520 vd = make_shared<external_log_format::value_def>(
1521 this->elf_body_field,
1522 value_kind_t::VALUE_TEXT,
1523 -1,
1524 this);
1525 }
1526 vd->vd_meta.lvm_name = this->elf_body_field;
1527 vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1528 vd->vd_internal = true;
1529 }
1530
1531 if (!this->lf_timestamp_format.empty()) {
1532 this->lf_timestamp_format.push_back(nullptr);
1533 }
1534 try {
1535 this->elf_filename_pcre =
1536 std::make_shared<pcrepp>(this->elf_file_pattern);
1537 }
1538 catch (const pcrepp::error &e) {
1539 errors.push_back("error:" +
1540 this->elf_name.to_string() + ".file-pattern:" +
1541 e.what());
1542 }
1543 for (auto iter = this->elf_patterns.begin();
1544 iter != this->elf_patterns.end();
1545 ++iter) {
1546 pattern &pat = *iter->second;
1547
1548 if (pat.p_module_format) {
1549 this->elf_has_module_format = true;
1550 }
1551
1552 try {
1553 pat.p_pcre = std::make_unique<pcrepp>(pat.p_string, PCRE_DOTALL);
1554 }
1555 catch (const pcrepp::error &e) {
1556 errors.push_back("error:" +
1557 this->elf_name.to_string() + ".regex[" +
1558 iter->first + "]" +
1559 ":" +
1560 e.what());
1561 errors.push_back("error:" +
1562 this->elf_name.to_string() + ".regex[" +
1563 iter->first + "]" +
1564 ":" + pat.p_string);
1565 errors.push_back("error:" +
1566 this->elf_name.to_string() + ".regex[" +
1567 iter->first + "]" +
1568 ":" + string(e.e_offset, ' ') +
1569 "^");
1570 continue;
1571 }
1572 for (pcre_named_capture::iterator name_iter = pat.p_pcre->named_begin();
1573 name_iter != pat.p_pcre->named_end();
1574 ++name_iter) {
1575 const intern_string_t name = intern_string::lookup(
1576 name_iter->pnc_name, -1);
1577
1578 if (name == this->lf_timestamp_field) {
1579 pat.p_timestamp_field_index = name_iter->index();
1580 }
1581 if (name == this->elf_level_field) {
1582 pat.p_level_field_index = name_iter->index();
1583 }
1584 if (name == this->elf_module_id_field) {
1585 pat.p_module_field_index = name_iter->index();
1586 }
1587 if (name == this->elf_opid_field) {
1588 pat.p_opid_field_index = name_iter->index();
1589 }
1590 if (name == this->elf_body_field) {
1591 pat.p_body_field_index = name_iter->index();
1592 }
1593
1594 auto value_iter = this->elf_value_defs.find(name);
1595 if (value_iter != this->elf_value_defs.end()) {
1596 auto vd = value_iter->second;
1597 indexed_value_def ivd;
1598
1599 ivd.ivd_index = name_iter->index();
1600 if (!vd->vd_unit_field.empty()) {
1601 ivd.ivd_unit_field_index = pat.p_pcre->name_index(
1602 vd->vd_unit_field.get());
1603 }
1604 else {
1605 ivd.ivd_unit_field_index = -1;
1606 }
1607 if (!vd->vd_internal && vd->vd_meta.lvm_column == -1) {
1608 vd->vd_meta.lvm_column = this->elf_column_count++;
1609 }
1610 ivd.ivd_value_def = vd;
1611 pat.p_value_by_index.push_back(ivd);
1612 }
1613 }
1614
1615 stable_sort(pat.p_value_by_index.begin(), pat.p_value_by_index.end());
1616
1617 for (int lpc = 0; lpc < (int)pat.p_value_by_index.size(); lpc++) {
1618 auto &ivd = pat.p_value_by_index[lpc];
1619 auto vd = ivd.ivd_value_def;
1620
1621 if (!vd->vd_foreign_key && !vd->vd_meta.lvm_identifier) {
1622 switch (vd->vd_meta.lvm_kind) {
1623 case value_kind_t::VALUE_INTEGER:
1624 case value_kind_t::VALUE_FLOAT:
1625 pat.p_numeric_value_indexes.push_back(lpc);
1626 break;
1627 default:
1628 break;
1629 }
1630 }
1631 }
1632
1633 if (!this->elf_level_field.empty() && pat.p_level_field_index == -1) {
1634 log_warning("%s:level field '%s' not found in pattern",
1635 pat.p_config_path.c_str(),
1636 this->elf_level_field.get());
1637 }
1638 if (!this->elf_module_id_field.empty() &&
1639 pat.p_module_field_index == -1) {
1640 log_warning("%s:module field '%s' not found in pattern",
1641 pat.p_config_path.c_str(),
1642 this->elf_module_id_field.get());
1643 }
1644 if (!this->elf_body_field.empty() && pat.p_body_field_index == -1) {
1645 log_warning("%s:body field '%s' not found in pattern",
1646 pat.p_config_path.c_str(),
1647 this->elf_body_field.get());
1648 }
1649
1650 this->elf_pattern_order.push_back(iter->second);
1651 }
1652
1653 if (this->elf_type != ELF_TYPE_TEXT) {
1654 if (!this->elf_patterns.empty()) {
1655 errors.push_back("error:" +
1656 this->elf_name.to_string() +
1657 ": structured logs cannot have regexes");
1658 }
1659 if (this->elf_type == ELF_TYPE_JSON) {
1660 this->jlf_parse_context = std::make_shared<yajlpp_parse_context>(
1661 this->elf_name.to_string());
1662 this->jlf_yajl_handle.reset(
1663 yajl_alloc(&this->jlf_parse_context->ypc_callbacks,
1664 nullptr,
1665 this->jlf_parse_context.get()),
1666 yajl_handle_deleter());
1667 yajl_config(this->jlf_yajl_handle.get(), yajl_dont_validate_strings,
1668 1);
1669 }
1670
1671 }
1672 else {
1673 if (this->elf_patterns.empty()) {
1674 errors.push_back("error:" +
1675 this->elf_name.to_string() +
1676 ": no regexes specified for format");
1677 }
1678 }
1679
1680 for (auto &elf_level_pattern : this->elf_level_patterns) {
1681 try {
1682 elf_level_pattern.second.lp_pcre = std::make_shared<pcrepp>(
1683 elf_level_pattern.second.lp_regex.c_str());
1684 }
1685 catch (const pcrepp::error &e) {
1686 errors.push_back("error:" +
1687 this->elf_name.to_string() + ".level:" + e.what());
1688 }
1689 }
1690
1691 stable_sort(this->elf_level_pairs.begin(), this->elf_level_pairs.end());
1692
1693 for (auto &vd : this->elf_value_def_order) {
1694 std::vector<std::string>::iterator act_iter;
1695
1696 if (!vd->vd_internal &&
1697 vd->vd_meta.lvm_column == -1) {
1698 vd->vd_meta.lvm_column = this->elf_column_count++;
1699 }
1700
1701 if (vd->vd_meta.lvm_kind == value_kind_t::VALUE_UNKNOWN) {
1702 vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1703 }
1704
1705 for (act_iter = vd->vd_action_list.begin();
1706 act_iter != vd->vd_action_list.end();
1707 ++act_iter) {
1708 if (this->lf_action_defs.find(*act_iter) ==
1709 this->lf_action_defs.end()) {
1710 errors.push_back("error:" +
1711 this->elf_name.to_string() + ":" +
1712 vd->vd_meta.lvm_name.get() +
1713 ": cannot find action -- " + (*act_iter));
1714 }
1715 }
1716 }
1717
1718 if (this->elf_type == ELF_TYPE_TEXT && this->elf_samples.empty()) {
1719 errors.push_back("error:" +
1720 this->elf_name.to_string() +
1721 ":no sample logs provided, all formats must have samples");
1722 }
1723
1724 for (auto &elf_sample : this->elf_samples) {
1725 pcre_context_static<128> pc;
1726 pcre_input pi(elf_sample.s_line);
1727 bool found = false;
1728
1729 for (auto pat_iter = this->elf_pattern_order.begin();
1730 pat_iter != this->elf_pattern_order.end() && !found;
1731 ++pat_iter) {
1732 pattern &pat = *(*pat_iter);
1733
1734 if (!pat.p_pcre) {
1735 continue;
1736 }
1737
1738 if (!pat.p_module_format &&
1739 pat.p_pcre->name_index(this->lf_timestamp_field.to_string()) <
1740 0) {
1741 errors.push_back("error:" +
1742 this->elf_name.to_string() +
1743 ":timestamp field '" +
1744 this->lf_timestamp_field.get() +
1745 "' not found in pattern -- " +
1746 pat.p_string);
1747 continue;
1748 }
1749
1750 if (pat.p_pcre->match(pc, pi)) {
1751 if (pat.p_module_format) {
1752 found = true;
1753 continue;
1754 }
1755 pcre_context::capture_t *ts_cap =
1756 pc[this->lf_timestamp_field.get()];
1757 pcre_context::capture_t *level_cap = pc[pat.p_level_field_index];
1758 const char *ts = pi.get_substr_start(ts_cap);
1759 ssize_t ts_len = pc[this->lf_timestamp_field.get()]->length();
1760 const char *const *custom_formats = this->get_timestamp_formats();
1761 date_time_scanner dts;
1762 struct timeval tv;
1763 struct exttm tm;
1764
1765 if (ts_cap->c_begin == 0) {
1766 pat.p_timestamp_end = ts_cap->c_end;
1767 }
1768 found = true;
1769 if (ts_len == -1 ||
1770 dts.scan(ts, ts_len, custom_formats, &tm, tv) == NULL) {
1771 errors.push_back("error:" +
1772 this->elf_name.to_string() +
1773 ":invalid sample -- " +
1774 elf_sample.s_line);
1775 errors.push_back("error:" +
1776 this->elf_name.to_string() +
1777 ":unrecognized timestamp format -- " + ts);
1778
1779 if (custom_formats == NULL) {
1780 for (int lpc = 0;
1781 PTIMEC_FORMATS[lpc].pf_fmt != NULL; lpc++) {
1782 off_t off = 0;
1783
1784 PTIMEC_FORMATS[lpc].pf_func(&tm, ts, off, ts_len);
1785 errors.push_back(" format: " +
1786 string(
1787 PTIMEC_FORMATS[lpc].pf_fmt) +
1788 "; matched: " + string(ts, off));
1789 }
1790 }
1791 else {
1792 for (int lpc = 0; custom_formats[lpc] != NULL; lpc++) {
1793 off_t off = 0;
1794
1795 ptime_fmt(custom_formats[lpc], &tm, ts, off,
1796 ts_len);
1797 errors.push_back(" format: " +
1798 string(custom_formats[lpc]) +
1799 "; matched: " + string(ts, off));
1800 }
1801 }
1802 }
1803
1804 log_level_t level = this->convert_level(pi, level_cap);
1805
1806 if (elf_sample.s_level != LEVEL_UNKNOWN) {
1807 if (elf_sample.s_level != level) {
1808 errors.push_back("error:" +
1809 this->elf_name.to_string() +
1810 ":invalid sample -- " +
1811 elf_sample.s_line);
1812 errors.push_back("error:" +
1813 this->elf_name.to_string() +
1814 ":parsed level '" +
1815 level_names[level] +
1816 "' does not match expected level of '" +
1817 level_names[elf_sample.s_level] +
1818 "'");
1819 }
1820 }
1821 }
1822 }
1823
1824 if (!found) {
1825 errors.push_back("error:" +
1826 this->elf_name.to_string() +
1827 ":invalid sample -- " +
1828 elf_sample.s_line);
1829
1830 for (auto pat_iter = this->elf_pattern_order.begin();
1831 pat_iter != this->elf_pattern_order.end();
1832 ++pat_iter) {
1833 pattern &pat = *(*pat_iter);
1834
1835 if (!pat.p_pcre) {
1836 continue;
1837 }
1838
1839 size_t partial_len = pat.p_pcre->match_partial(pi);
1840
1841 if (partial_len > 0) {
1842 errors.push_back("error:" +
1843 this->elf_name.to_string() +
1844 ":partial sample matched -- " +
1845 elf_sample.s_line.substr(0, partial_len));
1846 errors.push_back("error: against pattern " +
1847 (*pat_iter)->p_config_path +
1848 " -- " +
1849 (*pat_iter)->p_string);
1850 }
1851 else {
1852 errors.push_back("error:" +
1853 this->elf_name.to_string() +
1854 ":no partial match found");
1855 }
1856 }
1857 }
1858 }
1859
1860 for (auto &elf_value_def : this->elf_value_defs) {
1861 if (elf_value_def.second->vd_foreign_key || elf_value_def.second->vd_meta.lvm_identifier) {
1862 continue;
1863 }
1864
1865 switch (elf_value_def.second->vd_meta.lvm_kind) {
1866 case value_kind_t::VALUE_INTEGER:
1867 case value_kind_t::VALUE_FLOAT:
1868 elf_value_def.second->vd_values_index = this->elf_numeric_value_defs.size();
1869 this->elf_numeric_value_defs.push_back(elf_value_def.second);
1870 break;
1871 default:
1872 break;
1873 }
1874 }
1875
1876 this->lf_value_stats.resize(this->elf_numeric_value_defs.size());
1877
1878 int format_index = 0;
1879 for (auto iter = this->jlf_line_format.begin();
1880 iter != this->jlf_line_format.end();
1881 ++iter, format_index++) {
1882 static const intern_string_t ts = intern_string::lookup("__timestamp__");
1883 static const intern_string_t level_field = intern_string::lookup("__level__");
1884 json_format_element &jfe = *iter;
1885
1886 if (startswith(jfe.jfe_value.get(), "/")) {
1887 jfe.jfe_value = intern_string::lookup(jfe.jfe_value.get() + 1);
1888 }
1889 if (!jfe.jfe_ts_format.empty()) {
1890 if (!jfe.jfe_value.empty() && jfe.jfe_value != ts) {
1891 log_warning("%s:line-format[%d]:ignoring field '%s' since "
1892 "timestamp-format was used",
1893 this->elf_name.get(), format_index,
1894 jfe.jfe_value.get());
1895 }
1896 jfe.jfe_value = ts;
1897 }
1898
1899 switch (jfe.jfe_type) {
1900 case JLF_VARIABLE: {
1901 auto vd_iter = this->elf_value_defs.find(jfe.jfe_value);
1902 if (jfe.jfe_value == ts) {
1903 this->elf_value_defs[this->lf_timestamp_field]->vd_meta.lvm_hidden = true;
1904 } else if (jfe.jfe_value == level_field) {
1905 this->elf_value_defs[this->elf_level_field]->vd_meta.lvm_hidden = true;
1906 } else if (vd_iter == this->elf_value_defs.end()) {
1907 char index_str[32];
1908
1909 snprintf(index_str, sizeof(index_str), "%d", format_index);
1910 errors.push_back("error:" +
1911 this->elf_name.to_string() +
1912 ":line-format[" +
1913 index_str +
1914 "]:line format variable is not defined -- " +
1915 jfe.jfe_value.to_string());
1916 }
1917 break;
1918 }
1919 case JLF_CONSTANT:
1920 this->jlf_line_format_init_count +=
1921 std::count(jfe.jfe_default_value.begin(),
1922 jfe.jfe_default_value.end(),
1923 '\n');
1924 break;
1925 default:
1926 break;
1927 }
1928 }
1929
1930 for (auto &hd_pair : this->elf_highlighter_patterns) {
1931 external_log_format::highlighter_def &hd = hd_pair.second;
1932 const std::string &pattern = hd.hd_pattern;
1933 const char *errptr;
1934 auto fg = styling::color_unit::make_empty();
1935 auto bg = styling::color_unit::make_empty();
1936 int eoff, attrs = 0;
1937
1938 if (!hd.hd_color.empty()) {
1939 fg = styling::color_unit::from_str(hd.hd_color)
1940 .unwrapOrElse([&](const auto& msg) {
1941 errors.push_back("error:"
1942 + this->elf_name.to_string()
1943 + ":highlighters/"
1944 + hd_pair.first.to_string()
1945 + "/color:"
1946 + msg);
1947 return styling::color_unit::make_empty();
1948 });
1949 }
1950
1951 if (!hd.hd_background_color.empty()) {
1952 bg = styling::color_unit::from_str(hd.hd_background_color)
1953 .unwrapOrElse([&](const auto& msg) {
1954 errors.push_back("error:"
1955 + this->elf_name.to_string()
1956 + ":highlighters/"
1957 + hd_pair.first.to_string()
1958 + "/color:"
1959 + msg);
1960 return styling::color_unit::make_empty();
1961 });
1962 }
1963
1964 if (hd.hd_underline) {
1965 attrs |= A_UNDERLINE;
1966 }
1967 if (hd.hd_blink) {
1968 attrs |= A_BLINK;
1969 }
1970
1971 pcre *code = pcre_compile(pattern.c_str(),
1972 PCRE_CASELESS,
1973 &errptr,
1974 &eoff,
1975 nullptr);
1976
1977 if (code == nullptr) {
1978 errors.push_back("error:"
1979 + this->elf_name.to_string()
1980 + ":highlighters/"
1981 + hd_pair.first.to_string()
1982 + ":"
1983 + string(errptr));
1984 errors.push_back("error:"
1985 + this->elf_name.to_string()
1986 + ":highlighters/"
1987 + hd_pair.first.to_string()
1988 + ":"
1989 + pattern);
1990 errors.push_back("error:"
1991 + this->elf_name.to_string()
1992 + ":highlighters/"
1993 + hd_pair.first.to_string()
1994 + ":"
1995 + string(eoff, ' ')
1996 + "^");
1997 } else {
1998 this->lf_highlighters.emplace_back(code);
1999 this->lf_highlighters.back()
2000 .with_pattern(pattern)
2001 .with_format_name(this->elf_name)
2002 .with_color(fg, bg)
2003 .with_attrs(attrs);
2004 }
2005 }
2006 }
2007
register_vtabs(log_vtab_manager * vtab_manager,std::vector<std::string> & errors)2008 void external_log_format::register_vtabs(log_vtab_manager *vtab_manager,
2009 std::vector<std::string> &errors)
2010 {
2011 vector<pair<intern_string_t, string>>::iterator search_iter;
2012 for (search_iter = this->elf_search_tables.begin();
2013 search_iter != this->elf_search_tables.end();
2014 ++search_iter) {
2015 auto re_res = pcrepp::from_str(search_iter->second,
2016 log_search_table::pattern_options());
2017
2018 if (re_res.isErr()) {
2019 errors.push_back(fmt::format(
2020 "error:{}:{}:unable to compile regex '{}': {}",
2021 this->elf_name.get(),
2022 search_iter->first.get(),
2023 search_iter->second,
2024 re_res.unwrapErr().ce_msg));
2025 continue;
2026 }
2027
2028 auto lst = std::make_shared<log_search_table>(
2029 re_res.unwrap(), search_iter->first);
2030 string errmsg;
2031
2032 errmsg = vtab_manager->register_vtab(lst);
2033 if (!errmsg.empty()) {
2034 errors.push_back(
2035 "error:" +
2036 this->elf_name.to_string() +
2037 ":" +
2038 search_iter->first.to_string() +
2039 ":unable to register table -- " +
2040 errmsg);
2041 }
2042 }
2043 }
2044
match_samples(const vector<sample> & samples) const2045 bool external_log_format::match_samples(const vector<sample> &samples) const
2046 {
2047 for (const auto &sample_iter : samples) {
2048 for (const auto &pat_iter : this->elf_pattern_order) {
2049 pattern &pat = *pat_iter;
2050
2051 if (!pat.p_pcre) {
2052 continue;
2053 }
2054
2055 pcre_context_static<128> pc;
2056 pcre_input pi(sample_iter.s_line);
2057
2058 if (pat.p_pcre->match(pc, pi)) {
2059 return true;
2060 }
2061 }
2062 }
2063
2064 return false;
2065 }
2066
2067 class external_log_table : public log_format_vtab_impl {
2068 public:
external_log_table(const external_log_format & elf)2069 external_log_table(const external_log_format &elf) :
2070 log_format_vtab_impl(elf), elt_format(elf) {
2071 };
2072
get_columns(vector<vtab_column> & cols) const2073 void get_columns(vector<vtab_column> &cols) const {
2074 const external_log_format &elf = this->elt_format;
2075
2076 cols.resize(elf.elf_column_count);
2077 for (const auto &vd : elf.elf_value_def_order) {
2078 auto type_pair = log_vtab_impl::logline_value_to_sqlite_type(vd->vd_meta.lvm_kind);
2079
2080 if (vd->vd_meta.lvm_column == -1) {
2081 continue;
2082 }
2083
2084 require(0 <= vd->vd_meta.lvm_column && vd->vd_meta.lvm_column < elf.elf_column_count);
2085
2086 cols[vd->vd_meta.lvm_column].vc_name = vd->vd_meta.lvm_name.get();
2087 cols[vd->vd_meta.lvm_column].vc_type = type_pair.first;
2088 cols[vd->vd_meta.lvm_column].vc_subtype = type_pair.second;
2089 cols[vd->vd_meta.lvm_column].vc_collator = vd->vd_collate;
2090 cols[vd->vd_meta.lvm_column].vc_comment = vd->vd_description;
2091 }
2092 };
2093
get_foreign_keys(std::vector<std::string> & keys_inout) const2094 void get_foreign_keys(std::vector<std::string> &keys_inout) const
2095 {
2096 log_vtab_impl::get_foreign_keys(keys_inout);
2097
2098 for (const auto &elf_value_def : this->elt_format.elf_value_defs) {
2099 if (elf_value_def.second->vd_foreign_key) {
2100 keys_inout.emplace_back(elf_value_def.first.to_string());
2101 }
2102 }
2103 };
2104
next(log_cursor & lc,logfile_sub_source & lss)2105 virtual bool next(log_cursor &lc, logfile_sub_source &lss)
2106 {
2107 lc.lc_curr_line = lc.lc_curr_line + 1_vl;
2108 lc.lc_sub_index = 0;
2109
2110 if (lc.is_eof()) {
2111 return true;
2112 }
2113
2114 content_line_t cl(lss.at(lc.lc_curr_line));
2115 auto lf = lss.find_file_ptr(cl);
2116 auto lf_iter = lf->begin() + cl;
2117 uint8_t mod_id = lf_iter->get_module_id();
2118
2119 if (lf_iter->is_continued()) {
2120 return false;
2121 }
2122
2123 this->elt_module_format.mf_mod_format = nullptr;
2124 if (lf->get_format_name() == this->lfvi_format.get_name()) {
2125 return true;
2126 } else if (mod_id && mod_id == this->lfvi_format.lf_mod_index) {
2127 auto format = lf->get_format();
2128
2129 return lf->read_line(lf_iter).map([this, format, cl](auto line) {
2130 std::vector<logline_value> values;
2131 shared_buffer_ref body_ref;
2132 struct line_range mod_name_range;
2133 intern_string_t mod_name;
2134
2135 this->vi_attrs.clear();
2136 format->annotate(cl, line, this->vi_attrs, values, false);
2137 this->elt_container_body = find_string_attr_range(this->vi_attrs, &SA_BODY);
2138 if (!this->elt_container_body.is_valid()) {
2139 return false;
2140 }
2141 this->elt_container_body.ltrim(line.get_data());
2142 body_ref.subset(line,
2143 this->elt_container_body.lr_start,
2144 this->elt_container_body.length());
2145 mod_name_range = find_string_attr_range(this->vi_attrs,
2146 &logline::L_MODULE);
2147 if (!mod_name_range.is_valid()) {
2148 return false;
2149 }
2150 mod_name = intern_string::lookup(
2151 &line.get_data()[mod_name_range.lr_start],
2152 mod_name_range.length());
2153 this->vi_attrs.clear();
2154 this->elt_module_format = external_log_format::MODULE_FORMATS[mod_name];
2155 if (!this->elt_module_format.mf_mod_format) {
2156 return false;
2157 }
2158 return this->elt_module_format.mf_mod_format->get_name() ==
2159 this->lfvi_format.get_name();
2160 }).unwrapOr(false);
2161 }
2162
2163 return false;
2164 };
2165
extract(shared_ptr<logfile> lf,uint64_t line_number,shared_buffer_ref & line,std::vector<logline_value> & values)2166 virtual void extract(shared_ptr<logfile> lf,
2167 uint64_t line_number,
2168 shared_buffer_ref &line,
2169 std::vector<logline_value> &values)
2170 {
2171 auto format = lf->get_format();
2172
2173 if (this->elt_module_format.mf_mod_format != nullptr) {
2174 shared_buffer_ref body_ref;
2175
2176 body_ref.subset(line, this->elt_container_body.lr_start,
2177 this->elt_container_body.length());
2178 this->vi_attrs.clear();
2179 values.clear();
2180 this->elt_module_format.mf_mod_format->annotate(line_number,
2181 body_ref,
2182 this->vi_attrs,
2183 values,
2184 false);
2185 }
2186 else {
2187 this->vi_attrs.clear();
2188 format->annotate(line_number, line, this->vi_attrs, values, false);
2189 }
2190 };
2191
2192 const external_log_format &elt_format;
2193 module_format elt_module_format;
2194 struct line_range elt_container_body;
2195 };
2196
get_vtab_impl() const2197 std::shared_ptr<log_vtab_impl> external_log_format::get_vtab_impl() const
2198 {
2199 return std::make_shared<external_log_table>(*this);
2200 }
2201
specialized(int fmt_lock)2202 std::shared_ptr<log_format> external_log_format::specialized(int fmt_lock)
2203 {
2204 auto retval = std::make_shared<external_log_format>(*this);
2205
2206 retval->lf_specialized = true;
2207 this->lf_pattern_locks.clear();
2208 if (fmt_lock != -1) {
2209 retval->lf_pattern_locks.emplace_back(0, fmt_lock);
2210 }
2211
2212 if (this->elf_type == ELF_TYPE_JSON) {
2213 this->jlf_parse_context = std::make_shared<yajlpp_parse_context>(this->elf_name.to_string());
2214 this->jlf_yajl_handle.reset(
2215 yajl_alloc(&this->jlf_parse_context->ypc_callbacks,
2216 nullptr,
2217 this->jlf_parse_context.get()),
2218 yajl_handle_deleter());
2219 yajl_config(this->jlf_yajl_handle.get(), yajl_dont_validate_strings, 1);
2220 this->jlf_cached_line.reserve(16 * 1024);
2221 }
2222
2223 this->lf_value_stats.clear();
2224 this->lf_value_stats.resize(this->elf_numeric_value_defs.size());
2225
2226 return retval;
2227 }
2228
match_name(const string & filename)2229 bool external_log_format::match_name(const string &filename)
2230 {
2231 if (this->elf_file_pattern.empty()) {
2232 return true;
2233 }
2234
2235 pcre_context_static<10> pc;
2236 pcre_input pi(filename);
2237
2238 return this->elf_filename_pcre->match(pc, pi);
2239 }
2240
pattern_index_for_line(uint64_t line_number) const2241 int log_format::pattern_index_for_line(uint64_t line_number) const
2242 {
2243 auto iter = lower_bound(this->lf_pattern_locks.cbegin(),
2244 this->lf_pattern_locks.cend(),
2245 line_number,
2246 [](const pattern_for_lines &pfl, uint32_t line) {
2247 return pfl.pfl_line < line;
2248 });
2249
2250 if (iter == this->lf_pattern_locks.end() ||
2251 iter->pfl_line != line_number) {
2252 --iter;
2253 }
2254
2255 return iter->pfl_pat_index;
2256 }
2257
get_pattern_name(uint64_t line_number) const2258 std::string log_format::get_pattern_name(uint64_t line_number) const
2259 {
2260 int pat_index = this->pattern_index_for_line(line_number);
2261 return fmt::format("builtin ({})", pat_index);
2262 }
2263
pattern_for_lines(uint32_t pfl_line,uint32_t pfl_pat_index)2264 log_format::pattern_for_lines::pattern_for_lines(
2265 uint32_t pfl_line, uint32_t pfl_pat_index) :
2266 pfl_line(pfl_line), pfl_pat_index(pfl_pat_index)
2267 {
2268 }
2269
2270 /* XXX */
2271 #include "log_format_impls.cc"
2272