1 /**
2  * Copyright (c) 2007-2015, Timothy Stack
3  *
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  * * Redistributions of source code must retain the above copyright notice, this
10  * list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  * * Neither the name of Timothy Stack nor the names of its contributors
15  * may be used to endorse or promote products derived from this software
16  * without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "config.h"
31 
32 #include <stdio.h>
33 #include <stdarg.h>
34 #include <string.h>
35 
36 #include <memory>
37 
38 #include "base/string_util.hh"
39 #include "fmt/format.h"
40 #include "yajlpp/yajlpp.hh"
41 #include "yajlpp/yajlpp_def.hh"
42 #include "sql_util.hh"
43 #include "log_format_ext.hh"
44 #include "log_vtab_impl.hh"
45 #include "ptimec.hh"
46 #include "log_search_table.hh"
47 #include "command_executor.hh"
48 #include "lnav_util.hh"
49 
50 using namespace std;
51 
// Holds the value returned by intern_string::get_table_lifetime() for as long
// as this file's statics exist.
// NOTE(review): presumably this keeps the intern_string table alive while the
// statics below are in use -- confirm against intern_string.
static auto intern_lifetime = intern_string::get_table_lifetime();
// Definitions of the logline-scoped string attribute types; each one is
// constructed with the name of the attribute.
string_attr_type logline::L_PREFIX("prefix");
string_attr_type logline::L_TIMESTAMP("timestamp");
string_attr_type logline::L_FILE("file");
string_attr_type logline::L_PARTITION("partition");
string_attr_type logline::L_MODULE("module");
string_attr_type logline::L_OPID("opid");
string_attr_type logline::L_META("meta");

// Definitions of the static containers declared in external_log_format.
// MODULE_FORMATS is looked up by module name in external_log_format::scan().
external_log_format::mod_map_t external_log_format::MODULE_FORMATS;
std::vector<std::shared_ptr<external_log_format>> external_log_format::GRAPH_ORDERED_FORMATS;
63 
origin_in_full_msg(const char * msg,ssize_t len) const64 struct line_range logline_value::origin_in_full_msg(const char *msg, ssize_t len) const
65 {
66     if (this->lv_sub_offset == 0) {
67         return this->lv_origin;
68     }
69 
70     if (len == -1) {
71         len = strlen(msg);
72     }
73 
74     struct line_range retval = this->lv_origin;
75     const char *last = msg, *msg_end = msg + len;
76 
77     for (int lpc = 0; lpc < this->lv_sub_offset; lpc++) {
78         const auto *next = (const char *) memchr(last, '\n', msg_end - last);
79         require(next != NULL);
80 
81         next += 1;
82         int amount = (next - last);
83 
84         retval.lr_start += amount;
85         if (retval.lr_end != -1) {
86             retval.lr_end += amount;
87         }
88 
89         last = next + 1;
90     }
91 
92     if (retval.lr_end == -1) {
93         const auto *eol = (const char *) memchr(last, '\n', msg_end - last);
94 
95         if (eol == nullptr) {
96             retval.lr_end = len;
97         } else {
98             retval.lr_end = eol - msg;
99         }
100     }
101 
102     return retval;
103 }
104 
logline_value(logline_value_meta lvm,shared_buffer_ref & sbr,struct line_range origin)105 logline_value::logline_value(logline_value_meta lvm, shared_buffer_ref &sbr,
106                              struct line_range origin)
107     : lv_meta(std::move(lvm)), lv_origin(origin)
108 {
109     if (sbr.get_data() == nullptr) {
110         this->lv_meta.lvm_kind = value_kind_t::VALUE_NULL;
111     }
112 
113     switch (this->lv_meta.lvm_kind) {
114         case value_kind_t::VALUE_JSON:
115         case value_kind_t::VALUE_XML:
116         case value_kind_t::VALUE_STRUCT:
117         case value_kind_t::VALUE_TEXT:
118         case value_kind_t::VALUE_QUOTED:
119         case value_kind_t::VALUE_W3C_QUOTED:
120         case value_kind_t::VALUE_TIMESTAMP:
121             this->lv_sbr.subset(sbr, origin.lr_start, origin.length());
122             break;
123 
124         case value_kind_t::VALUE_NULL:
125             break;
126 
127         case value_kind_t::VALUE_INTEGER:
128             strtonum(this->lv_value.i, sbr.get_data_at(
129                 origin.lr_start), origin.length());
130             break;
131 
132         case value_kind_t::VALUE_FLOAT: {
133             ssize_t len = origin.length();
134             char scan_value[len + 1];
135 
136             memcpy(scan_value, sbr.get_data_at(origin.lr_start), len);
137             scan_value[len] = '\0';
138             this->lv_value.d = strtod(scan_value, nullptr);
139             break;
140         }
141 
142         case value_kind_t::VALUE_BOOLEAN:
143             if (strncmp(sbr.get_data_at(origin.lr_start), "true", origin.length()) == 0 ||
144                 strncmp(sbr.get_data_at(origin.lr_start), "yes", origin.length()) == 0) {
145                 this->lv_value.i = 1;
146             }
147             else {
148                 this->lv_value.i = 0;
149             }
150             break;
151 
152         case value_kind_t::VALUE_UNKNOWN:
153         case value_kind_t::VALUE__MAX:
154             ensure(0);
155             break;
156     }
157 }
158 
to_string() const159 std::string logline_value::to_string() const
160 {
161     char buffer[128];
162 
163     switch (this->lv_meta.lvm_kind) {
164         case value_kind_t::VALUE_NULL:
165             return "null";
166 
167         case value_kind_t::VALUE_JSON:
168         case value_kind_t::VALUE_XML:
169         case value_kind_t::VALUE_STRUCT:
170         case value_kind_t::VALUE_TEXT:
171         case value_kind_t::VALUE_TIMESTAMP:
172             if (this->lv_sbr.empty()) {
173                 return this->lv_intern_string.to_string();
174             }
175             return std::string(this->lv_sbr.get_data(), this->lv_sbr.length());
176 
177         case value_kind_t::VALUE_QUOTED:
178         case value_kind_t::VALUE_W3C_QUOTED:
179             if (this->lv_sbr.length() == 0) {
180                 return "";
181             } else {
182                 switch (this->lv_sbr.get_data()[0]) {
183                     case '\'':
184                     case '"': {
185                         auto unquote_func = this->lv_meta.lvm_kind == value_kind_t::VALUE_W3C_QUOTED ?
186                             unquote_w3c : unquote;
187                         char unquoted_str[this->lv_sbr.length()];
188                         size_t unquoted_len;
189 
190                         unquoted_len = unquote_func(unquoted_str,
191                                                     this->lv_sbr.get_data(),
192                                                     this->lv_sbr.length());
193                         return std::string(unquoted_str, unquoted_len);
194                     }
195                     default:
196                         return std::string(this->lv_sbr.get_data(), this->lv_sbr.length());
197                 }
198             }
199             break;
200 
201         case value_kind_t::VALUE_INTEGER:
202             snprintf(buffer, sizeof(buffer), "%" PRId64, this->lv_value.i);
203             break;
204 
205         case value_kind_t::VALUE_FLOAT:
206             snprintf(buffer, sizeof(buffer), "%lf", this->lv_value.d);
207             break;
208 
209         case value_kind_t::VALUE_BOOLEAN:
210             if (this->lv_value.i) {
211                 return "true";
212             }
213             else {
214                 return "false";
215             }
216             break;
217         case value_kind_t::VALUE_UNKNOWN:
218         case value_kind_t::VALUE__MAX:
219             ensure(0);
220             break;
221     }
222 
223     return std::string(buffer);
224 }
225 
226 vector<std::shared_ptr<log_format>> log_format::lf_root_formats;
227 
get_root_formats()228 vector<std::shared_ptr<log_format>> &log_format::get_root_formats()
229 {
230     return lf_root_formats;
231 }
232 
next_format(const std::vector<std::shared_ptr<external_log_format::pattern>> & patterns,int & index,int & locked_index)233 static bool next_format(const std::vector<std::shared_ptr<external_log_format::pattern>> &patterns,
234                         int &index,
235                         int &locked_index)
236 {
237     bool retval = true;
238 
239     if (locked_index == -1) {
240         index += 1;
241         if (index >= (int)patterns.size()) {
242             retval = false;
243         }
244     }
245     else if (index == locked_index) {
246         retval = false;
247     }
248     else {
249         index = locked_index;
250     }
251 
252     return retval;
253 }
254 
next_format(pcre_format * fmt,int & index,int & locked_index)255 bool log_format::next_format(pcre_format *fmt, int &index, int &locked_index)
256 {
257     bool retval = true;
258 
259     if (locked_index == -1) {
260         index += 1;
261         if (fmt[index].name == NULL) {
262             retval = false;
263         }
264     }
265     else if (index == locked_index) {
266         retval = false;
267     }
268     else {
269         index = locked_index;
270     }
271 
272     return retval;
273 }
274 
/**
 * Match a line against an array of pcre_format entries and scan the
 * timestamp out of the first one that matches.
 *
 * @param line_number Recorded as the lock line when a new pattern lock is
 *   added and other locks already exist.
 * @param line The text of the line.
 * @param len The length of the line.
 * @param fmt The array of formats to try, iterated with next_format().
 * @param time_fmt Not referenced in this function body.
 * @param tm_out Receives the broken-down time from the timestamp scan.
 * @param tv_out Receives the timeval from the timestamp scan.
 * @param ... One `pcre_context::capture_t *` for each capture in the match
 *   context; every capture of a successful match is copied out through them.
 * @return The value returned by lf_date_time.scan() for the matching format,
 *   or NULL if the last format tried did not match or its timestamp could not
 *   be scanned.
 */
const char *log_format::log_scanf(uint32_t line_number,
                                  const char *line,
                                  size_t len,
                                  pcre_format *fmt,
                                  const char *time_fmt[],
                                  struct exttm *tm_out,
                                  struct timeval *tv_out,
                                  ...)
{
    int curr_fmt = -1;
    const char *retval = NULL;
    bool done = false;
    pcre_input pi(line, 0, len);
    pcre_context_static<128> pc;
    va_list args;
    int pat_index = this->last_pattern_index();

    while (!done && next_format(fmt, curr_fmt, pat_index)) {
        // The argument list is restarted for every format that is tried and
        // closed again at the bottom of the loop.
        va_start(args, tv_out);

        pi.reset(line, 0, len);
        if (!fmt[curr_fmt].pcre.match(pc, pi, PCRE_NO_UTF8_CHECK)) {
            retval = NULL;
        }
        else {
            pcre_context::capture_t *ts = pc[fmt[curr_fmt].pf_timestamp_index];

            // Copy each capture into the caller-supplied capture pointers, in
            // order.
            for (auto &iter : pc) {
                pcre_context::capture_t *cap = va_arg(
                        args, pcre_context::capture_t *);

                *cap = iter;
            }

            retval = this->lf_date_time.scan(
                    pi.get_substr_start(ts), ts->length(), NULL, tm_out, *tv_out);

            if (retval) {
                // Record a new pattern lock when a format other than the
                // currently locked one matched.  The first lock is recorded
                // against line zero.
                if (curr_fmt != pat_index) {
                    uint32_t lock_line;

                    if (this->lf_pattern_locks.empty()) {
                        lock_line = 0;
                    } else {
                        lock_line = line_number;
                    }

                    this->lf_pattern_locks.emplace_back(lock_line, curr_fmt);
                }
                this->lf_timestamp_flags = tm_out->et_flags;
                done = true;
            }
        }

        va_end(args);
    }

    return retval;
}
334 
check_for_new_year(std::vector<logline> & dst,exttm etm,struct timeval log_tv)335 void log_format::check_for_new_year(std::vector<logline> &dst, exttm etm,
336                                     struct timeval log_tv)
337 {
338     if (dst.empty()) {
339         return;
340     }
341 
342     time_t diff = dst.back().get_time() - log_tv.tv_sec;
343     int off_year = 0, off_month = 0, off_day = 0, off_hour = 0;
344     bool do_change = true;
345 
346     if (diff <= 0) {
347         return;
348     }
349     if ((etm.et_flags & ETF_MONTH_SET) && diff >= (24 * 60 * 60)) {
350         off_year = 1;
351     } else if (diff >= (24 * 60 * 60)) {
352         off_month = 1;
353     } else if (!(etm.et_flags & ETF_DAY_SET) && (diff >= (60 * 60))) {
354         off_day = 1;
355     } else if (!(etm.et_flags & ETF_DAY_SET)) {
356         off_hour = 1;
357     } else {
358         do_change = false;
359     }
360 
361     if (!do_change) {
362         return;
363     }
364     log_debug("%d:detected time rollover; offsets=%d %d %d %d", dst.size(),
365               off_year, off_month, off_day, off_hour);
366     for (auto &ll : dst) {
367         time_t     ot = ll.get_time();
368         struct tm otm;
369 
370         gmtime_r(&ot, &otm);
371         otm.tm_year -= off_year;
372         otm.tm_mon  -= off_month;
373         otm.tm_mday -= off_day;
374         otm.tm_hour -= off_hour;
375         auto new_time = tm2sec(&otm);
376         if (new_time == -1) {
377             continue;
378         }
379         ll.set_time(new_time);
380     }
381 }
382 
383 /*
384  * XXX This needs some cleanup.
385  */
386 struct json_log_userdata {
json_log_userdatajson_log_userdata387     json_log_userdata(shared_buffer_ref &sbr)
388             : jlu_format(NULL), jlu_line(NULL), jlu_base_line(NULL),
389               jlu_sub_line_count(1), jlu_handle(NULL), jlu_line_value(NULL),
390               jlu_line_size(0), jlu_sub_start(0), jlu_shared_buffer(sbr) {
391 
392     };
393 
394     external_log_format *jlu_format;
395     const logline *jlu_line;
396     logline *jlu_base_line;
397     int jlu_sub_line_count;
398     yajl_handle jlu_handle;
399     const char *jlu_line_value;
400     size_t jlu_line_size;
401     size_t jlu_sub_start;
402     shared_buffer_ref &jlu_shared_buffer;
403 };
404 
405 static int read_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len);
406 
read_json_null(yajlpp_parse_context * ypc)407 static int read_json_null(yajlpp_parse_context *ypc)
408 {
409     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
410     const intern_string_t field_name = ypc->get_path();
411 
412     jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
413         field_name, ypc->is_level(1));
414 
415     return 1;
416 }
417 
read_json_bool(yajlpp_parse_context * ypc,int val)418 static int read_json_bool(yajlpp_parse_context *ypc, int val)
419 {
420     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
421     const intern_string_t field_name = ypc->get_path();
422 
423     jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
424         field_name, ypc->is_level(1));
425 
426     return 1;
427 }
428 
read_json_int(yajlpp_parse_context * ypc,long long val)429 static int read_json_int(yajlpp_parse_context *ypc, long long val)
430 {
431     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
432     const intern_string_t field_name = ypc->get_path();
433 
434     if (jlu->jlu_format->lf_timestamp_field == field_name) {
435         long long divisor = jlu->jlu_format->elf_timestamp_divisor;
436         struct timeval tv;
437 
438         tv.tv_sec = val / divisor;
439         tv.tv_usec = (val % divisor) * (1000000.0 / divisor);
440         jlu->jlu_base_line->set_time(tv);
441     }
442     else if (jlu->jlu_format->elf_level_field == field_name) {
443         if (jlu->jlu_format->elf_level_pairs.empty()) {
444             char level_buf[128];
445 
446             snprintf(level_buf, sizeof(level_buf), "%lld", val);
447 
448             pcre_input pi(level_buf);
449             pcre_context::capture_t level_cap = {0, (int) strlen(level_buf)};
450 
451             jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(pi, &level_cap));
452         } else {
453             vector<pair<int64_t, log_level_t> >::iterator iter;
454 
455             for (iter = jlu->jlu_format->elf_level_pairs.begin();
456                  iter != jlu->jlu_format->elf_level_pairs.end();
457                  ++iter) {
458                 if (iter->first == val) {
459                     jlu->jlu_base_line->set_level(iter->second);
460                     break;
461                 }
462             }
463         }
464     }
465 
466     jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
467         field_name, ypc->is_level(1));
468 
469     return 1;
470 }
471 
read_json_double(yajlpp_parse_context * ypc,double val)472 static int read_json_double(yajlpp_parse_context *ypc, double val)
473 {
474     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
475     const intern_string_t field_name = ypc->get_path();
476 
477     if (jlu->jlu_format->lf_timestamp_field == field_name) {
478         double divisor = jlu->jlu_format->elf_timestamp_divisor;
479         struct timeval tv;
480 
481         tv.tv_sec = val / divisor;
482         tv.tv_usec = fmod(val, divisor) * (1000000.0 / divisor);
483         jlu->jlu_base_line->set_time(tv);
484     }
485 
486     jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
487         field_name, ypc->is_level(1));
488 
489     return 1;
490 }
491 
json_array_start(void * ctx)492 static int json_array_start(void *ctx)
493 {
494     yajlpp_parse_context *ypc = (yajlpp_parse_context *)ctx;
495     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
496 
497     if (ypc->ypc_path_index_stack.size() == 2) {
498         const intern_string_t field_name = ypc->get_path_fragment_i(0);
499 
500         jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(field_name, true);
501         jlu->jlu_sub_start = yajl_get_bytes_consumed(jlu->jlu_handle) - 1;
502     }
503 
504     return 1;
505 }
506 
json_array_end(void * ctx)507 static int json_array_end(void *ctx)
508 {
509     yajlpp_parse_context *ypc = (yajlpp_parse_context *)ctx;
510     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
511 
512     if (ypc->ypc_path_index_stack.size() == 1) {
513         const intern_string_t field_name = ypc->get_path_fragment_i(0);
514         size_t sub_end = yajl_get_bytes_consumed(jlu->jlu_handle);
515         shared_buffer_ref sbr;
516 
517         sbr.subset(jlu->jlu_shared_buffer, jlu->jlu_sub_start,
518             sub_end - jlu->jlu_sub_start);
519         jlu->jlu_format->jlf_line_values.emplace_back(jlu->jlu_format->
520             get_value_meta(field_name, value_kind_t::VALUE_JSON), sbr);
521     }
522 
523     return 1;
524 }
525 
// Handlers installed by external_log_format::scan() when it parses a JSON log
// line: a single path handler for the pattern "\w+" that routes null,
// boolean, integer, double, and read_json_field callbacks to the read_json_*
// functions.
static struct json_path_container json_log_handlers = {
    json_path_handler(pcrepp("\\w+"))
        .add_cb(read_json_null)
        .add_cb(read_json_bool)
        .add_cb(read_json_int)
        .add_cb(read_json_double)
        .add_cb(read_json_field)
};
534 
535 static int rewrite_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len);
536 
rewrite_json_null(yajlpp_parse_context * ypc)537 static int rewrite_json_null(yajlpp_parse_context *ypc)
538 {
539     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
540     const intern_string_t field_name = ypc->get_path();
541 
542     if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
543         return 1;
544     }
545     jlu->jlu_format->jlf_line_values.emplace_back(jlu->jlu_format->
546         get_value_meta(field_name, value_kind_t::VALUE_NULL));
547 
548     return 1;
549 }
550 
rewrite_json_bool(yajlpp_parse_context * ypc,int val)551 static int rewrite_json_bool(yajlpp_parse_context *ypc, int val)
552 {
553     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
554     const intern_string_t field_name = ypc->get_path();
555 
556     if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
557         return 1;
558     }
559     jlu->jlu_format->jlf_line_values.emplace_back(
560         jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_BOOLEAN),
561         (bool) val);
562     return 1;
563 }
564 
rewrite_json_int(yajlpp_parse_context * ypc,long long val)565 static int rewrite_json_int(yajlpp_parse_context *ypc, long long val)
566 {
567     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
568     const intern_string_t field_name = ypc->get_path();
569 
570     if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
571         return 1;
572     }
573     jlu->jlu_format->jlf_line_values.emplace_back(
574         jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_INTEGER),
575         (int64_t) val);
576     return 1;
577 }
578 
rewrite_json_double(yajlpp_parse_context * ypc,double val)579 static int rewrite_json_double(yajlpp_parse_context *ypc, double val)
580 {
581     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
582     const intern_string_t field_name = ypc->get_path();
583 
584     if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
585         return 1;
586     }
587     jlu->jlu_format->jlf_line_values.emplace_back(
588         jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_FLOAT),
589         val);
590 
591     return 1;
592 }
593 
// A single path handler for the pattern "\w+" that routes null, boolean,
// integer, double, and rewrite_json_field callbacks to the rewrite_json_*
// functions, which append entries to the format's jlf_line_values.
static struct json_path_container json_log_rewrite_handlers = {
    json_path_handler(pcrepp("\\w+"))
        .add_cb(rewrite_json_null)
        .add_cb(rewrite_json_bool)
        .add_cb(rewrite_json_int)
        .add_cb(rewrite_json_double)
        .add_cb(rewrite_json_field)
};
602 
scan_for_partial(shared_buffer_ref & sbr,size_t & len_out) const603 bool external_log_format::scan_for_partial(shared_buffer_ref &sbr, size_t &len_out) const
604 {
605     if (this->elf_type != ELF_TYPE_TEXT) {
606         return false;
607     }
608 
609     auto pat = this->elf_pattern_order[this->last_pattern_index()];
610     pcre_input pi(sbr.get_data(), 0, sbr.length());
611 
612     if (!this->elf_multiline) {
613         len_out = pat->p_pcre->match_partial(pi);
614         return true;
615     }
616 
617     if (pat->p_timestamp_end == -1 || pat->p_timestamp_end > (int)sbr.length()) {
618         len_out = 0;
619         return false;
620     }
621 
622     len_out = pat->p_pcre->match_partial(pi);
623     return (int)len_out > pat->p_timestamp_end;
624 }
625 
/**
 * Try to match one line of a log file against this format and, on success,
 * append the resulting logline entries to dst.
 *
 * JSON formats (ELF_TYPE_JSON) parse the line with yajl using the
 * json_log_handlers callbacks.  All other formats match the line against the
 * patterns in elf_pattern_order.
 *
 * @param lf The log file being scanned; not referenced in this function body.
 * @param dst The lines indexed so far; new entries are appended to it.
 * @param li Supplies the file offset of the line and the partial-line flag.
 * @param sbr The buffer holding the text of the line.
 * @return SCAN_MATCH if entries were added, SCAN_INCOMPLETE for a partial
 *   JSON line, and SCAN_NO_MATCH otherwise.
 */
log_format::scan_result_t external_log_format::scan(logfile &lf,
                                                    std::vector<logline> &dst,
                                                    const line_info &li,
                                                    shared_buffer_ref &sbr)
{
    if (this->elf_type == ELF_TYPE_JSON) {
        yajlpp_parse_context &ypc = *(this->jlf_parse_context);
        logline ll(li.li_file_range.fr_offset, 0, 0, LEVEL_INFO);
        yajl_handle handle = this->jlf_yajl_handle.get();
        json_log_userdata jlu(sbr);

        // A format that is not specialized stops matching once dst already
        // holds three or more lines.
        if (!this->lf_specialized && dst.size() >= 3) {
            return log_format::SCAN_NO_MATCH;
        }

        if (li.li_partial) {
            log_debug("skipping partial line at offset %d", li.li_file_range.fr_offset);
            return log_format::SCAN_INCOMPLETE;
        }

        const auto *line_data = (const unsigned char *) sbr.get_data();

        // Reset the parser and point the callbacks at this line.
        // json_array_start is used for the start of both arrays and maps; no
        // end callbacks are installed for scanning.
        yajl_reset(handle);
        ypc.set_static_handler(json_log_handlers.jpc_children[0]);
        ypc.ypc_userdata = &jlu;
        ypc.ypc_ignore_unused = true;
        ypc.ypc_alt_callbacks.yajl_start_array = json_array_start;
        ypc.ypc_alt_callbacks.yajl_start_map = json_array_start;
        ypc.ypc_alt_callbacks.yajl_end_array = nullptr;
        ypc.ypc_alt_callbacks.yajl_end_map = nullptr;
        jlu.jlu_format = this;
        jlu.jlu_base_line = &ll;
        jlu.jlu_line_value = sbr.get_data();
        jlu.jlu_line_size = sbr.length();
        jlu.jlu_handle = handle;
        if (yajl_parse(handle, line_data, sbr.length()) == yajl_status_ok &&
            yajl_complete_parse(handle) == yajl_status_ok) {
            // The callbacks store the time in ll; a time of zero after a
            // successful parse is treated as no match.
            if (ll.get_time() == 0) {
                return log_format::SCAN_NO_MATCH;
            }

            // Emit one entry per sub-line; every entry after the first is
            // flagged as a continuation.
            jlu.jlu_sub_line_count += this->jlf_line_format_init_count;
            for (int lpc = 0; lpc < jlu.jlu_sub_line_count; lpc++) {
                ll.set_sub_offset(lpc);
                if (lpc > 0) {
                    ll.set_level((log_level_t) (ll.get_level_and_flags() |
                        LEVEL_CONTINUED));
                }
                dst.emplace_back(ll);
            }
        }
        else {
            unsigned char *msg;
            int line_count = 2;

            // On a parse error, the number of entries to add is derived from
            // the number of lines in the yajl error message.
            msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length());
            if (msg != nullptr) {
                log_debug("Unable to parse line at offset %d: %s", li.li_file_range.fr_offset, msg);
                line_count = count(msg, msg + strlen((char *) msg), '\n') + 1;
                yajl_free_error(handle, msg);
            }
            if (!this->lf_specialized) {
                return log_format::SCAN_NO_MATCH;
            }
            // Add LEVEL_INVALID entries that reuse the time of the last line
            // in dst.
            for (int lpc = 0; lpc < line_count; lpc++) {
                log_level_t level = LEVEL_INVALID;

                ll.set_time(dst.back().get_time());
                if (lpc > 0) {
                    level = (log_level_t) (level | LEVEL_CONTINUED);
                }
                ll.set_level(level);
                ll.set_sub_offset(lpc);
                dst.emplace_back(ll);
            }
        }

        return log_format::SCAN_MATCH;
    }

    pcre_input pi(sbr.get_data(), 0, sbr.length());
    pcre_context_static<128> pc;
    int curr_fmt = -1, orig_lock = this->last_pattern_index();
    int pat_index = orig_lock;

    // Walk the patterns, starting with the locked one if there is a lock.
    while (::next_format(this->elf_pattern_order, curr_fmt, pat_index)) {
        auto fpat = this->elf_pattern_order[curr_fmt];
        auto& pat = fpat->p_pcre;

        // Patterns flagged as module formats are skipped here.
        if (fpat->p_module_format) {
            continue;
        }

        if (!pat->match(pc, pi, PCRE_NO_UTF8_CHECK)) {
            // If the locked pattern failed, clear the lock for this call so
            // the remaining iterations walk every pattern from the start.
            if (!this->lf_pattern_locks.empty() && pat_index != -1) {
                curr_fmt = -1;
                pat_index = -1;
            }
            continue;
        }

        pcre_context::capture_t *ts = pc[fpat->p_timestamp_field_index];
        pcre_context::capture_t *level_cap = pc[fpat->p_level_field_index];
        pcre_context::capture_t *mod_cap = pc[fpat->p_module_field_index];
        pcre_context::capture_t *opid_cap = pc[fpat->p_opid_field_index];
        pcre_context::capture_t *body_cap = pc[fpat->p_body_field_index];
        const char *ts_str = pi.get_substr_start(ts);
        const char *last;
        struct exttm log_time_tm;
        struct timeval log_tv;
        uint8_t mod_index = 0, opid = 0;

        // Scan the timestamp; on failure, unlock the date/time scanner and
        // retry once before moving on to the next pattern.
        if ((last = this->lf_date_time.scan(ts_str,
                                            ts->length(),
                                            this->get_timestamp_formats(),
                                            &log_time_tm,
                                            log_tv)) == nullptr) {
            this->lf_date_time.unlock();
            if ((last = this->lf_date_time.scan(ts_str,
                                                ts->length(),
                                                this->get_timestamp_formats(),
                                                &log_time_tm,
                                                log_tv)) == nullptr) {
                continue;
            }
        }

        log_level_t level = this->convert_level(pi, level_cap);

        this->lf_timestamp_flags = log_time_tm.et_flags;

        // If the timestamp did not set all of day, month, and year, check
        // whether the earlier lines need to be shifted back in time.
        if (!((log_time_tm.et_flags & ETF_DAY_SET) &&
              (log_time_tm.et_flags & ETF_MONTH_SET) &&
              (log_time_tm.et_flags & ETF_YEAR_SET))) {
            this->check_for_new_year(dst, log_time_tm, log_tv);
        }

        // The operation ID is stored as a hash of the captured text.
        if (opid_cap != nullptr) {
            opid = hash_str(pi.get_substr_start(opid_cap), opid_cap->length());
        }

        if (mod_cap != nullptr) {
            intern_string_t mod_name = intern_string::lookup(
                    pi.get_substr_start(mod_cap), mod_cap->length());
            auto mod_iter = MODULE_FORMATS.find(mod_name);

            // Unknown module names are resolved with module_scan(), then
            // looked up again.
            if (mod_iter == MODULE_FORMATS.end()) {
                mod_index = module_scan(pi, body_cap, mod_name);
                mod_iter = MODULE_FORMATS.find(mod_name);
            }
            else if (mod_iter->second.mf_mod_format) {
                mod_index = mod_iter->second.mf_mod_format->lf_mod_index;
            }

            // When a module format applies, the level is taken from matching
            // the body against the module format's current pattern.
            if (mod_index && level_cap && body_cap) {
                auto mod_elf = dynamic_pointer_cast<external_log_format>(
                    mod_iter->second.mf_mod_format);

                if (mod_elf) {
                    pcre_context_static<128> mod_pc;
                    shared_buffer_ref body_ref;

                    body_cap->ltrim(sbr.get_data());

                    pcre_input mod_pi(pi.get_substr_start(body_cap),
                                      0,
                                      body_cap->length());
                    int mod_pat_index = mod_elf->last_pattern_index();
                    pattern &mod_pat = *mod_elf->elf_pattern_order[mod_pat_index];

                    if (mod_pat.p_pcre->match(mod_pc, mod_pi)) {
                        auto mod_level_cap = mod_pc[mod_pat.p_level_field_index];

                        level = mod_elf->convert_level(mod_pi, mod_level_cap);
                    }
                }
            }
        }

        // Feed each captured numeric value, scaled by its unit if one was
        // captured and is known, into the value statistics.
        for (auto value_index : fpat->p_numeric_value_indexes) {
            const indexed_value_def &ivd = fpat->p_value_by_index[value_index];
            const value_def &vd = *ivd.ivd_value_def;
            pcre_context::capture_t *num_cap = pc[ivd.ivd_index];

            if (num_cap != nullptr && num_cap->is_valid()) {
                const struct scaling_factor *scaling = nullptr;

                if (ivd.ivd_unit_field_index >= 0) {
                    pcre_context::iterator unit_cap = pc[ivd.ivd_unit_field_index];

                    if (unit_cap != nullptr && unit_cap->is_valid()) {
                        intern_string_t unit_val = intern_string::lookup(
                            pi.get_substr_start(unit_cap), unit_cap->length());
                        std::map<const intern_string_t, scaling_factor>::const_iterator unit_iter;

                        unit_iter = vd.vd_unit_scaling.find(unit_val);
                        if (unit_iter != vd.vd_unit_scaling.end()) {
                            const struct scaling_factor &sf = unit_iter->second;

                            scaling = &sf;
                        }
                    }
                }

                const char *num_cap_start = pi.get_substr_start(num_cap);
                const char *num_cap_end = num_cap_start + num_cap->length();
                double dvalue = strtod(num_cap_start, (char **) &num_cap_end);

                // Only record the value if strtod() consumed exactly the
                // captured text.
                if (num_cap_end == num_cap_start + num_cap->length()) {
                    if (scaling != nullptr) {
                        scaling->scale(dvalue);
                    }
                    this->lf_value_stats[vd.vd_values_index].add_value(dvalue);
                }
            }
        }

        dst.emplace_back(li.li_file_range.fr_offset, log_tv, level, mod_index, opid);

        // Record a new pattern lock if a different pattern matched.  The
        // first lock is recorded against line zero.
        if (orig_lock != curr_fmt) {
            uint32_t lock_line;

            log_debug("%zu: changing pattern lock %d -> %d",
                      dst.size() - 1, orig_lock, curr_fmt);
            if (this->lf_pattern_locks.empty()) {
                lock_line = 0;
            } else {
                lock_line = dst.size() - 1;
            }
            this->lf_pattern_locks.emplace_back(lock_line, curr_fmt);
        }
        return log_format::SCAN_MATCH;
    }

    // No pattern matched.  A specialized, single-line format still adds the
    // line, using the time of the previous line and LEVEL_INVALID.
    if (this->lf_specialized && !this->elf_multiline) {
        auto& last_line = dst.back();

        dst.emplace_back(li.li_file_range.fr_offset,
                         last_line.get_timeval(),
                         log_level_t::LEVEL_INVALID);

        return log_format::SCAN_MATCH;
    }

    return log_format::SCAN_NO_MATCH;
}
872 
module_scan(const pcre_input & pi,pcre_context::capture_t * body_cap,const intern_string_t & mod_name)873 uint8_t external_log_format::module_scan(const pcre_input &pi,
874                                          pcre_context::capture_t *body_cap,
875                                          const intern_string_t &mod_name)
876 {
877     uint8_t mod_index;
878     body_cap->ltrim(pi.get_string());
879     pcre_input body_pi(pi.get_substr_start(body_cap), 0, body_cap->length());
880     auto& ext_fmts = GRAPH_ORDERED_FORMATS;
881     pcre_context_static<128> pc;
882     module_format mf;
883 
884     for (auto& elf : ext_fmts) {
885         int curr_fmt = -1, fmt_lock = -1;
886 
887         while (::next_format(elf->elf_pattern_order, curr_fmt, fmt_lock)) {
888             auto fpat = elf->elf_pattern_order[curr_fmt];
889             auto& pat = fpat->p_pcre;
890 
891             if (!fpat->p_module_format) {
892                 continue;
893             }
894 
895             if (!pat->match(pc, body_pi)) {
896                 continue;
897             }
898 
899             log_debug("%s:module format found -- %s (%d)",
900                       mod_name.get(),
901                       elf->get_name().get(),
902                       elf->lf_mod_index);
903 
904             mod_index = elf->lf_mod_index;
905             mf.mf_mod_format = elf->specialized(curr_fmt);
906             MODULE_FORMATS[mod_name] = mf;
907 
908             return mod_index;
909         }
910     }
911 
912     MODULE_FORMATS[mod_name] = mf;
913 
914     return 0;
915 }
916 
annotate(uint64_t line_number,shared_buffer_ref & line,string_attrs_t & sa,std::vector<logline_value> & values,bool annotate_module) const917 void external_log_format::annotate(uint64_t line_number, shared_buffer_ref &line, string_attrs_t &sa,
918                                    std::vector<logline_value> &values, bool annotate_module) const
919 {
920     pcre_context_static<128> pc;
921     pcre_input pi(line.get_data(), 0, line.length());
922     struct line_range lr;
923     pcre_context::capture_t *cap, *body_cap, *module_cap = nullptr;
924 
925     if (this->elf_type != ELF_TYPE_TEXT) {
926         values = this->jlf_line_values;
927         sa = this->jlf_line_attrs;
928         return;
929     }
930 
931     if (line.empty()) {
932         return;
933     }
934 
935     int pat_index = this->pattern_index_for_line(line_number);
936     pattern &pat = *this->elf_pattern_order[pat_index];
937 
938     if (!pat.p_pcre->match(pc, pi, PCRE_NO_UTF8_CHECK)) {
939         // A continued line still needs a body.
940         lr.lr_start = 0;
941         lr.lr_end = line.length();
942         sa.emplace_back(lr, &SA_BODY);
943         if (!this->elf_multiline) {
944             auto len = pat.p_pcre->match_partial(pi);
945             sa.emplace_back(line_range{(int) len, -1},
946                             &SA_INVALID,
947                             (void *) "Log line does not match any pattern");
948         }
949         return;
950     }
951 
952     if (!pat.p_module_format) {
953         cap = pc[pat.p_timestamp_field_index];
954         if (cap->is_valid()) {
955             lr.lr_start = cap->c_begin;
956             lr.lr_end = cap->c_end;
957             sa.emplace_back(lr, &logline::L_TIMESTAMP);
958         }
959 
960         if (pat.p_module_field_index != -1) {
961             module_cap = pc[pat.p_module_field_index];
962             if (module_cap != nullptr && module_cap->is_valid()) {
963                 lr.lr_start = module_cap->c_begin;
964                 lr.lr_end = module_cap->c_end;
965                 sa.emplace_back(lr, &logline::L_MODULE);
966             }
967         }
968 
969         cap = pc[pat.p_opid_field_index];
970         if (cap != nullptr && cap->is_valid()) {
971             lr.lr_start = cap->c_begin;
972             lr.lr_end = cap->c_end;
973             sa.emplace_back(lr, &logline::L_OPID);
974         }
975     }
976 
977     body_cap = pc[pat.p_body_field_index];
978 
979     for (size_t lpc = 0; lpc < pat.p_value_by_index.size(); lpc++) {
980         const indexed_value_def &ivd = pat.p_value_by_index[lpc];
981         const struct scaling_factor *scaling = nullptr;
982         pcre_context::capture_t *cap = pc[ivd.ivd_index];
983         const value_def &vd = *ivd.ivd_value_def;
984 
985         if (ivd.ivd_unit_field_index >= 0) {
986             pcre_context::iterator unit_cap = pc[ivd.ivd_unit_field_index];
987 
988             if (unit_cap != nullptr && unit_cap->c_begin != -1) {
989                 intern_string_t unit_val = intern_string::lookup(
990                     pi.get_substr_start(unit_cap), unit_cap->length());
991                 auto unit_iter = vd.vd_unit_scaling.find(unit_val);
992                 if (unit_iter != vd.vd_unit_scaling.end()) {
993                     const struct scaling_factor &sf = unit_iter->second;
994 
995                     scaling = &sf;
996                 }
997             }
998         }
999 
1000         if (cap->is_valid()) {
1001             values.emplace_back(vd.vd_meta,
1002                                 line,
1003                                 line_range{cap->c_begin, cap->c_end});
1004             values.back().apply_scaling(scaling);
1005         } else {
1006             values.emplace_back(vd.vd_meta);
1007         }
1008         if (pat.p_module_format) {
1009             values.back().lv_meta.lvm_from_module = true;
1010         }
1011     }
1012 
1013     bool did_mod_annotate_body = false;
1014     if (annotate_module && module_cap != nullptr && body_cap != nullptr &&
1015             body_cap->is_valid()) {
1016         intern_string_t mod_name = intern_string::lookup(
1017                 pi.get_substr_start(module_cap), module_cap->length());
1018         auto mod_iter = MODULE_FORMATS.find(mod_name);
1019 
1020         if (mod_iter != MODULE_FORMATS.end() &&
1021             mod_iter->second.mf_mod_format != nullptr) {
1022             module_format &mf = mod_iter->second;
1023             shared_buffer_ref body_ref;
1024 
1025             body_cap->ltrim(line.get_data());
1026             body_ref.subset(line, body_cap->c_begin, body_cap->length());
1027 
1028             auto pre_mod_values_size = values.size();
1029             auto pre_mod_sa_size = sa.size();
1030             mf.mf_mod_format->annotate(line_number, body_ref, sa, values, false);
1031             for (size_t lpc = pre_mod_values_size; lpc < values.size(); lpc++) {
1032                 values[lpc].lv_origin.shift(0, body_cap->c_begin);
1033             }
1034             for (size_t lpc = pre_mod_sa_size; lpc < sa.size(); lpc++) {
1035                 sa[lpc].sa_range.shift(0, body_cap->c_begin);
1036             }
1037             did_mod_annotate_body = true;
1038         }
1039     }
1040     if (!did_mod_annotate_body) {
1041         if (body_cap != nullptr && body_cap->is_valid()) {
1042             lr.lr_start = body_cap->c_begin;
1043             lr.lr_end = body_cap->c_end;
1044         }
1045         else {
1046             lr.lr_start = line.length();
1047             lr.lr_end = line.length();
1048         }
1049         sa.emplace_back(lr, &SA_BODY);
1050     }
1051 }
1052 
rewrite(exec_context & ec,shared_buffer_ref & line,string_attrs_t & sa,string & value_out)1053 void external_log_format::rewrite(exec_context &ec,
1054                                   shared_buffer_ref &line,
1055                                   string_attrs_t &sa,
1056                                   string &value_out)
1057 {
1058     vector<logline_value>::iterator shift_iter;
1059     auto &values = *ec.ec_line_values;
1060 
1061     value_out.assign(line.get_data(), line.length());
1062 
1063     for (auto iter = values.begin(); iter != values.end(); ++iter) {
1064         if (!iter->lv_origin.is_valid()) {
1065             log_debug("not rewriting value with invalid origin -- %s", iter->lv_meta.lvm_name.get());
1066             continue;
1067         }
1068 
1069         auto vd_iter = this->elf_value_defs.find(iter->lv_meta.lvm_name);
1070         if (vd_iter == this->elf_value_defs.end()) {
1071             log_debug("not rewriting undefined value -- %s", iter->lv_meta.lvm_name.get());
1072             continue;
1073         }
1074 
1075         const auto &vd = *vd_iter->second;
1076 
1077         if (vd.vd_rewriter.empty()) {
1078             continue;
1079         }
1080 
1081         auto _sg = ec.enter_source(this->elf_name.to_string() +
1082                                    ":" +
1083                                    vd_iter->first.to_string(),
1084                                    1);
1085         auto field_value = execute_any(ec, vd.vd_rewriter)
1086             .orElse(err_to_ok).unwrap();
1087         struct line_range adj_origin = iter->origin_in_full_msg(
1088             value_out.c_str(), value_out.length());
1089 
1090         value_out.erase(adj_origin.lr_start, adj_origin.length());
1091 
1092         int32_t shift_amount = field_value.length() - adj_origin.length();
1093         value_out.insert(adj_origin.lr_start, field_value);
1094         for (shift_iter = values.begin();
1095              shift_iter != values.end(); ++shift_iter) {
1096             shift_iter->lv_origin.shift(adj_origin.lr_start, shift_amount);
1097         }
1098         shift_string_attrs(sa, adj_origin.lr_start, shift_amount);
1099     }
1100 }
1101 
read_json_field(yajlpp_parse_context * ypc,const unsigned char * str,size_t len)1102 static int read_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len)
1103 {
1104     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
1105     const intern_string_t field_name = ypc->get_path();
1106     struct exttm tm_out;
1107     struct timeval tv_out;
1108 
1109     if (jlu->jlu_format->lf_timestamp_field == field_name) {
1110         jlu->jlu_format->lf_date_time.scan((const char *)str, len, jlu->jlu_format->get_timestamp_formats(), &tm_out, tv_out);
1111         // Leave off the machine oriented flag since we convert it anyhow
1112         jlu->jlu_format->lf_timestamp_flags = tm_out.et_flags & ~ETF_MACHINE_ORIENTED;
1113         jlu->jlu_base_line->set_time(tv_out);
1114     }
1115     else if (jlu->jlu_format->elf_level_field == field_name) {
1116         pcre_input pi((const char *) str, 0, len);
1117         pcre_context::capture_t level_cap = {0, (int) len};
1118 
1119         jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(pi, &level_cap));
1120     }
1121     else if (jlu->jlu_format->elf_opid_field == field_name) {
1122         uint8_t opid = hash_str((const char *) str, len);
1123         jlu->jlu_base_line->set_opid(opid);
1124     }
1125 
1126     jlu->jlu_sub_line_count += jlu->jlu_format->value_line_count(
1127         field_name, ypc->is_level(1), str, len);
1128 
1129     return 1;
1130 }
1131 
rewrite_json_field(yajlpp_parse_context * ypc,const unsigned char * str,size_t len)1132 static int rewrite_json_field(yajlpp_parse_context *ypc, const unsigned char *str, size_t len)
1133 {
1134     static const intern_string_t body_name = intern_string::lookup("body", -1);
1135     json_log_userdata *jlu = (json_log_userdata *)ypc->ypc_userdata;
1136     const intern_string_t field_name = ypc->get_path();
1137 
1138     if (jlu->jlu_format->lf_timestamp_field == field_name) {
1139         char time_buf[64];
1140 
1141         // TODO add a timeval kind to logline_value
1142         if (jlu->jlu_line->is_time_skewed()) {
1143             struct timeval tv;
1144             struct exttm tm;
1145 
1146             jlu->jlu_format->lf_date_time.scan((const char *) str, len,
1147                                                jlu->jlu_format->get_timestamp_formats(),
1148                                                &tm, tv);
1149             sql_strftime(time_buf, sizeof(time_buf), tv, 'T');
1150         }
1151         else {
1152             sql_strftime(time_buf, sizeof(time_buf),
1153                          jlu->jlu_line->get_timeval(), 'T');
1154         }
1155         tmp_shared_buffer tsb(time_buf);
1156         jlu->jlu_format->jlf_line_values.emplace_back(
1157             jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT),
1158             tsb.tsb_ref);
1159     }
1160     else if (jlu->jlu_shared_buffer.contains((const char *)str)) {
1161         shared_buffer_ref sbr;
1162 
1163         sbr.subset(jlu->jlu_shared_buffer,
1164                 (off_t) ((const char *)str - jlu->jlu_line_value),
1165                 len);
1166         if (field_name == jlu->jlu_format->elf_body_field) {
1167             jlu->jlu_format->jlf_line_values.emplace_back(
1168                 jlu->jlu_format->get_value_meta(body_name, value_kind_t::VALUE_TEXT),
1169                 sbr);
1170         }
1171         if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
1172             return 1;
1173         }
1174 
1175         jlu->jlu_format->jlf_line_values.emplace_back(
1176             jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT), sbr);
1177     }
1178     else {
1179         tmp_shared_buffer tsb((const char *)str, len);
1180 
1181         if (field_name == jlu->jlu_format->elf_body_field) {
1182             jlu->jlu_format->jlf_line_values.emplace_back(
1183                 jlu->jlu_format->get_value_meta(body_name, value_kind_t::VALUE_TEXT),
1184                 tsb.tsb_ref);
1185         }
1186         if (!ypc->is_level(1) && !jlu->jlu_format->has_value_def(field_name)) {
1187             return 1;
1188         }
1189 
1190         jlu->jlu_format->jlf_line_values.emplace_back(
1191             jlu->jlu_format->get_value_meta(field_name, value_kind_t::VALUE_TEXT),
1192             tsb.tsb_ref);
1193     }
1194 
1195     return 1;
1196 }
1197 
get_subline(const logline & ll,shared_buffer_ref & sbr,bool full_message)1198 void external_log_format::get_subline(const logline &ll, shared_buffer_ref &sbr, bool full_message)
1199 {
1200     if (this->elf_type == ELF_TYPE_TEXT) {
1201         return;
1202     }
1203 
1204     if (this->jlf_cached_offset != ll.get_offset() ||
1205         this->jlf_cached_full != full_message) {
1206         yajlpp_parse_context &ypc = *(this->jlf_parse_context);
1207         yajl_handle handle = this->jlf_yajl_handle.get();
1208         json_log_userdata jlu(sbr);
1209 
1210         this->jlf_share_manager.invalidate_refs();
1211         this->jlf_cached_line.clear();
1212         this->jlf_line_values.clear();
1213         this->jlf_line_offsets.clear();
1214         this->jlf_line_attrs.clear();
1215 
1216         yajl_reset(handle);
1217         ypc.set_static_handler(json_log_rewrite_handlers.jpc_children[0]);
1218         ypc.ypc_userdata = &jlu;
1219         ypc.ypc_ignore_unused = true;
1220         ypc.ypc_alt_callbacks.yajl_start_array = json_array_start;
1221         ypc.ypc_alt_callbacks.yajl_end_array = json_array_end;
1222         ypc.ypc_alt_callbacks.yajl_start_map = json_array_start;
1223         ypc.ypc_alt_callbacks.yajl_end_map = json_array_end;
1224         jlu.jlu_format = this;
1225         jlu.jlu_line = &ll;
1226         jlu.jlu_handle = handle;
1227         jlu.jlu_line_value = sbr.get_data();
1228 
1229         yajl_status parse_status = yajl_parse(handle,
1230             (const unsigned char *)sbr.get_data(), sbr.length());
1231         if (parse_status != yajl_status_ok ||
1232             yajl_complete_parse(handle) != yajl_status_ok) {
1233             unsigned char* msg;
1234             string full_msg;
1235 
1236             msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length());
1237             if (msg != nullptr) {
1238                 full_msg = fmt::format(
1239                     "[offset: {}] {}\n{}",
1240                     ll.get_offset(),
1241                     fmt::string_view{sbr.get_data(), sbr.length()},
1242                     msg);
1243                 yajl_free_error(handle, msg);
1244             }
1245 
1246             this->jlf_cached_line.resize(full_msg.size());
1247             memcpy(this->jlf_cached_line.data(), full_msg.data(), full_msg.size());
1248             this->jlf_line_values.clear();
1249             this->jlf_line_attrs.emplace_back(
1250                 line_range{0, -1},
1251                 &SA_INVALID,
1252                 (void *) "JSON line failed to parse");
1253         } else {
1254             std::vector<logline_value>::iterator lv_iter;
1255             bool used_values[this->jlf_line_values.size()];
1256             struct line_range lr;
1257 
1258             memset(used_values, 0, sizeof(used_values));
1259 
1260             for (lv_iter = this->jlf_line_values.begin();
1261                  lv_iter != this->jlf_line_values.end();
1262                  ++lv_iter) {
1263                 lv_iter->lv_meta.lvm_format = this;
1264             }
1265 
1266             int sub_offset = 1 + this->jlf_line_format_init_count;
1267             for (const auto &jfe : this->jlf_line_format) {
1268                 static const intern_string_t ts_field = intern_string::lookup("__timestamp__", -1);
1269                 static const intern_string_t level_field = intern_string::lookup("__level__");
1270                 size_t begin_size = this->jlf_cached_line.size();
1271 
1272                 switch (jfe.jfe_type) {
1273                 case JLF_CONSTANT:
1274                     this->json_append_to_cache(jfe.jfe_default_value.c_str(),
1275                             jfe.jfe_default_value.size());
1276                     break;
1277                 case JLF_VARIABLE:
1278                     lv_iter = find_if(this->jlf_line_values.begin(),
1279                                       this->jlf_line_values.end(),
1280                                       logline_value_cmp(&jfe.jfe_value));
1281                     if (lv_iter != this->jlf_line_values.end()) {
1282                         string str = lv_iter->to_string();
1283                         size_t nl_pos = str.find('\n');
1284 
1285                         lr.lr_start = this->jlf_cached_line.size();
1286 
1287                         lv_iter->lv_meta.lvm_hidden = lv_iter->lv_meta.lvm_user_hidden;
1288                         if ((int)str.size() > jfe.jfe_max_width) {
1289                             switch (jfe.jfe_overflow) {
1290                                 case json_format_element::overflow_t::ABBREV: {
1291                                     this->json_append_to_cache(
1292                                         str.c_str(), str.size());
1293                                     size_t new_size = abbreviate_str(
1294                                         &this->jlf_cached_line[lr.lr_start],
1295                                         str.size(),
1296                                         jfe.jfe_max_width);
1297 
1298                                     this->jlf_cached_line.resize(
1299                                         lr.lr_start + new_size);
1300                                     break;
1301                                 }
1302                                 case json_format_element::overflow_t::TRUNCATE: {
1303                                     this->json_append_to_cache(
1304                                         str.c_str(), jfe.jfe_max_width);
1305                                     break;
1306                                 }
1307                                 case json_format_element::overflow_t::DOTDOT: {
1308                                     size_t middle = (jfe.jfe_max_width / 2) - 1;
1309                                     this->json_append_to_cache(
1310                                         str.c_str(), middle);
1311                                     this->json_append_to_cache("..", 2);
1312                                     size_t rest = (jfe.jfe_max_width - middle - 2);
1313                                     this->json_append_to_cache(
1314                                         str.c_str() + str.size() - rest, rest);
1315                                     break;
1316                                 }
1317                             }
1318                         }
1319                         else {
1320                             sub_offset += count(str.begin(), str.end(), '\n');
1321                             this->json_append(jfe, str.c_str(), str.size());
1322                         }
1323 
1324                         if (nl_pos == string::npos || full_message) {
1325                             lr.lr_end = this->jlf_cached_line.size();
1326                         } else {
1327                             lr.lr_end = lr.lr_start + nl_pos;
1328                         }
1329 
1330                         if (lv_iter->lv_meta.lvm_name == this->lf_timestamp_field) {
1331                             this->jlf_line_attrs.emplace_back(
1332                                 lr, &logline::L_TIMESTAMP);
1333                         }
1334                         else if (lv_iter->lv_meta.lvm_name == this->elf_body_field) {
1335                             this->jlf_line_attrs.emplace_back(
1336                                 lr, &SA_BODY);
1337                         }
1338                         else if (lv_iter->lv_meta.lvm_name == this->elf_opid_field) {
1339                             this->jlf_line_attrs.emplace_back(
1340                                     lr, &logline::L_OPID);
1341                         }
1342                         lv_iter->lv_origin = lr;
1343                         used_values[distance(this->jlf_line_values.begin(),
1344                                              lv_iter)] = true;
1345                     }
1346                     else if (jfe.jfe_value == ts_field) {
1347                         struct line_range lr;
1348                         ssize_t ts_len;
1349                         char ts[64];
1350 
1351                         if (jfe.jfe_ts_format.empty()) {
1352                             ts_len = sql_strftime(ts, sizeof(ts),
1353                                                   ll.get_timeval(), 'T');
1354                         } else {
1355                             struct exttm et;
1356 
1357                             ll.to_exttm(et);
1358                             ts_len = ftime_fmt(ts, sizeof(ts),
1359                                                jfe.jfe_ts_format.c_str(),
1360                                                et);
1361                         }
1362                         lr.lr_start = this->jlf_cached_line.size();
1363                         this->json_append_to_cache(ts, ts_len);
1364                         lr.lr_end = this->jlf_cached_line.size();
1365                         this->jlf_line_attrs.emplace_back(lr, &logline::L_TIMESTAMP);
1366 
1367                         lv_iter = find_if(this->jlf_line_values.begin(),
1368                                           this->jlf_line_values.end(),
1369                                           logline_value_cmp(&this->lf_timestamp_field));
1370                         if (lv_iter != this->jlf_line_values.end()) {
1371                             used_values[distance(this->jlf_line_values.begin(),
1372                                                  lv_iter)] = true;
1373                         }
1374                     }
1375                     else if (jfe.jfe_value == level_field) {
1376                         this->json_append(jfe, ll.get_level_name(), -1);
1377                     }
1378                     else {
1379                         this->json_append(jfe,
1380                                           jfe.jfe_default_value.c_str(),
1381                                           jfe.jfe_default_value.size());
1382                     }
1383 
1384                     switch (jfe.jfe_text_transform) {
1385                         case external_log_format::json_format_element::transform_t::NONE:
1386                             break;
1387                         case external_log_format::json_format_element::transform_t::UPPERCASE:
1388                             for (size_t cindex = begin_size; cindex < this->jlf_cached_line.size(); cindex++) {
1389                                 this->jlf_cached_line[cindex] = toupper(this->jlf_cached_line[cindex]);
1390                             }
1391                             break;
1392                         case external_log_format::json_format_element::transform_t::LOWERCASE:
1393                             for (size_t cindex = begin_size; cindex < this->jlf_cached_line.size(); cindex++) {
1394                                 this->jlf_cached_line[cindex] = tolower(this->jlf_cached_line[cindex]);
1395                             }
1396                             break;
1397                         case external_log_format::json_format_element::transform_t::CAPITALIZE:
1398                             for (size_t cindex = begin_size; cindex < begin_size + 1; cindex++) {
1399                                 this->jlf_cached_line[cindex] = toupper(this->jlf_cached_line[cindex]);
1400                             }
1401                             for (size_t cindex = begin_size + 1; cindex < this->jlf_cached_line.size(); cindex++) {
1402                                 this->jlf_cached_line[cindex] = tolower(this->jlf_cached_line[cindex]);
1403                             }
1404                             break;
1405                     }
1406                     break;
1407                 }
1408             }
1409             this->json_append_to_cache("\n", 1);
1410 
1411             for (size_t lpc = 0; lpc < this->jlf_line_values.size(); lpc++) {
1412                 static const intern_string_t body_name = intern_string::lookup(
1413                     "body", -1);
1414                 logline_value &lv = this->jlf_line_values[lpc];
1415 
1416                 if (lv.lv_meta.lvm_hidden || used_values[lpc] || body_name == lv.lv_meta.lvm_name) {
1417                     continue;
1418                 }
1419 
1420                 const std::string str = lv.to_string();
1421                 size_t curr_pos = 0, nl_pos, line_len = -1;
1422 
1423                 lv.lv_sub_offset = sub_offset;
1424                 lv.lv_origin.lr_start = 2 + lv.lv_meta.lvm_name.size() + 2;
1425                 do {
1426                     nl_pos = str.find('\n', curr_pos);
1427                     if (nl_pos != std::string::npos) {
1428                         line_len = nl_pos - curr_pos;
1429                     }
1430                     else {
1431                         line_len = str.size() - curr_pos;
1432                     }
1433                     this->json_append_to_cache("  ", 2);
1434                     this->json_append_to_cache(lv.lv_meta.lvm_name.get(),
1435                                                lv.lv_meta.lvm_name.size());
1436                     this->json_append_to_cache(": ", 2);
1437                     this->json_append_to_cache(
1438                         &str.c_str()[curr_pos], line_len);
1439                     this->json_append_to_cache("\n", 1);
1440                     curr_pos = nl_pos + 1;
1441                     sub_offset += 1;
1442                 } while (nl_pos != std::string::npos &&
1443                          nl_pos < str.size());
1444             }
1445 
1446         }
1447 
1448         this->jlf_line_offsets.push_back(0);
1449         for (size_t lpc = 0; lpc < this->jlf_cached_line.size(); lpc++) {
1450             if (this->jlf_cached_line[lpc] == '\n') {
1451                 this->jlf_line_offsets.push_back(lpc + 1);
1452             }
1453         }
1454         this->jlf_line_offsets.push_back(this->jlf_cached_line.size());
1455         this->jlf_cached_offset = ll.get_offset();
1456         this->jlf_cached_full = full_message;
1457     }
1458 
1459     off_t this_off = 0, next_off = 0;
1460 
1461     if (!this->jlf_line_offsets.empty() && ll.get_sub_offset() < this->jlf_line_offsets.size()) {
1462         require(ll.get_sub_offset() < this->jlf_line_offsets.size());
1463 
1464         this_off = this->jlf_line_offsets[ll.get_sub_offset()];
1465         if ((ll.get_sub_offset() + 1) < (int)this->jlf_line_offsets.size()) {
1466             next_off = this->jlf_line_offsets[ll.get_sub_offset() + 1];
1467         }
1468         else {
1469             next_off = this->jlf_cached_line.size();
1470         }
1471         if (next_off > 0 && this->jlf_cached_line[next_off - 1] == '\n' &&
1472             this_off != next_off) {
1473             next_off -= 1;
1474         }
1475     }
1476 
1477     if (full_message) {
1478         sbr.share(this->jlf_share_manager,
1479                   &this->jlf_cached_line[0],
1480                   this->jlf_cached_line.size());
1481     }
1482     else {
1483         sbr.share(this->jlf_share_manager,
1484                   this->jlf_cached_line.data() + this_off,
1485                   next_off - this_off);
1486     }
1487 }
1488 
build(std::vector<std::string> & errors)1489 void external_log_format::build(std::vector<std::string> &errors) {
1490     if (!this->lf_timestamp_field.empty()) {
1491         auto &vd = this->elf_value_defs[this->lf_timestamp_field];
1492         if (vd.get() == nullptr) {
1493             vd = make_shared<external_log_format::value_def>(
1494                 this->lf_timestamp_field,
1495                 value_kind_t::VALUE_TEXT,
1496                 -1,
1497                 this);
1498         }
1499         vd->vd_meta.lvm_name = this->lf_timestamp_field;
1500         vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1501         vd->vd_internal = true;
1502     }
1503     if (!this->elf_level_field.empty() && this->elf_value_defs.
1504         find(this->elf_level_field) == this->elf_value_defs.end()) {
1505         auto &vd = this->elf_value_defs[this->elf_level_field];
1506         if (vd.get() == nullptr) {
1507             vd = make_shared<external_log_format::value_def>(
1508                 this->elf_level_field,
1509                 value_kind_t::VALUE_TEXT,
1510                 -1,
1511                 this);
1512         }
1513         vd->vd_meta.lvm_name = this->elf_level_field;
1514         vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1515         vd->vd_internal = true;
1516     }
1517     if (!this->elf_body_field.empty()) {
1518         auto &vd = this->elf_value_defs[this->elf_body_field];
1519         if (vd.get() == nullptr) {
1520             vd = make_shared<external_log_format::value_def>(
1521                 this->elf_body_field,
1522                 value_kind_t::VALUE_TEXT,
1523                 -1,
1524                 this);
1525         }
1526         vd->vd_meta.lvm_name = this->elf_body_field;
1527         vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1528         vd->vd_internal = true;
1529     }
1530 
1531     if (!this->lf_timestamp_format.empty()) {
1532         this->lf_timestamp_format.push_back(nullptr);
1533     }
1534     try {
1535         this->elf_filename_pcre =
1536             std::make_shared<pcrepp>(this->elf_file_pattern);
1537     }
1538     catch (const pcrepp::error &e) {
1539         errors.push_back("error:" +
1540                          this->elf_name.to_string() + ".file-pattern:" +
1541                          e.what());
1542     }
1543     for (auto iter = this->elf_patterns.begin();
1544          iter != this->elf_patterns.end();
1545          ++iter) {
1546         pattern &pat = *iter->second;
1547 
1548         if (pat.p_module_format) {
1549             this->elf_has_module_format = true;
1550         }
1551 
1552         try {
1553             pat.p_pcre = std::make_unique<pcrepp>(pat.p_string, PCRE_DOTALL);
1554         }
1555         catch (const pcrepp::error &e) {
1556             errors.push_back("error:" +
1557                              this->elf_name.to_string() + ".regex[" +
1558                              iter->first + "]" +
1559                              ":" +
1560                              e.what());
1561             errors.push_back("error:" +
1562                              this->elf_name.to_string() + ".regex[" +
1563                              iter->first + "]" +
1564                              ":" + pat.p_string);
1565             errors.push_back("error:" +
1566                              this->elf_name.to_string() + ".regex[" +
1567                              iter->first + "]" +
1568                              ":" + string(e.e_offset, ' ') +
1569                              "^");
1570             continue;
1571         }
1572         for (pcre_named_capture::iterator name_iter = pat.p_pcre->named_begin();
1573              name_iter != pat.p_pcre->named_end();
1574              ++name_iter) {
1575             const intern_string_t name = intern_string::lookup(
1576                 name_iter->pnc_name, -1);
1577 
1578             if (name == this->lf_timestamp_field) {
1579                 pat.p_timestamp_field_index = name_iter->index();
1580             }
1581             if (name == this->elf_level_field) {
1582                 pat.p_level_field_index = name_iter->index();
1583             }
1584             if (name == this->elf_module_id_field) {
1585                 pat.p_module_field_index = name_iter->index();
1586             }
1587             if (name == this->elf_opid_field) {
1588                 pat.p_opid_field_index = name_iter->index();
1589             }
1590             if (name == this->elf_body_field) {
1591                 pat.p_body_field_index = name_iter->index();
1592             }
1593 
1594             auto value_iter = this->elf_value_defs.find(name);
1595             if (value_iter != this->elf_value_defs.end()) {
1596                 auto vd = value_iter->second;
1597                 indexed_value_def ivd;
1598 
1599                 ivd.ivd_index = name_iter->index();
1600                 if (!vd->vd_unit_field.empty()) {
1601                     ivd.ivd_unit_field_index = pat.p_pcre->name_index(
1602                         vd->vd_unit_field.get());
1603                 }
1604                 else {
1605                     ivd.ivd_unit_field_index = -1;
1606                 }
1607                 if (!vd->vd_internal && vd->vd_meta.lvm_column == -1) {
1608                     vd->vd_meta.lvm_column = this->elf_column_count++;
1609                 }
1610                 ivd.ivd_value_def = vd;
1611                 pat.p_value_by_index.push_back(ivd);
1612             }
1613         }
1614 
1615         stable_sort(pat.p_value_by_index.begin(), pat.p_value_by_index.end());
1616 
1617         for (int lpc = 0; lpc < (int)pat.p_value_by_index.size(); lpc++) {
1618             auto &ivd = pat.p_value_by_index[lpc];
1619             auto vd = ivd.ivd_value_def;
1620 
1621             if (!vd->vd_foreign_key && !vd->vd_meta.lvm_identifier) {
1622                 switch (vd->vd_meta.lvm_kind) {
1623                     case value_kind_t::VALUE_INTEGER:
1624                     case value_kind_t::VALUE_FLOAT:
1625                         pat.p_numeric_value_indexes.push_back(lpc);
1626                         break;
1627                     default:
1628                         break;
1629                 }
1630             }
1631         }
1632 
1633         if (!this->elf_level_field.empty() && pat.p_level_field_index == -1) {
1634             log_warning("%s:level field '%s' not found in pattern",
1635                         pat.p_config_path.c_str(),
1636                         this->elf_level_field.get());
1637         }
1638         if (!this->elf_module_id_field.empty() &&
1639             pat.p_module_field_index == -1) {
1640             log_warning("%s:module field '%s' not found in pattern",
1641                         pat.p_config_path.c_str(),
1642                         this->elf_module_id_field.get());
1643         }
1644         if (!this->elf_body_field.empty() && pat.p_body_field_index == -1) {
1645             log_warning("%s:body field '%s' not found in pattern",
1646                         pat.p_config_path.c_str(),
1647                         this->elf_body_field.get());
1648         }
1649 
1650         this->elf_pattern_order.push_back(iter->second);
1651     }
1652 
1653     if (this->elf_type != ELF_TYPE_TEXT) {
1654         if (!this->elf_patterns.empty()) {
1655             errors.push_back("error:" +
1656                              this->elf_name.to_string() +
1657                              ": structured logs cannot have regexes");
1658         }
1659         if (this->elf_type == ELF_TYPE_JSON) {
1660             this->jlf_parse_context = std::make_shared<yajlpp_parse_context>(
1661                 this->elf_name.to_string());
1662             this->jlf_yajl_handle.reset(
1663                 yajl_alloc(&this->jlf_parse_context->ypc_callbacks,
1664                            nullptr,
1665                            this->jlf_parse_context.get()),
1666                 yajl_handle_deleter());
1667             yajl_config(this->jlf_yajl_handle.get(), yajl_dont_validate_strings,
1668                         1);
1669         }
1670 
1671     }
1672     else {
1673         if (this->elf_patterns.empty()) {
1674             errors.push_back("error:" +
1675                              this->elf_name.to_string() +
1676                              ": no regexes specified for format");
1677         }
1678     }
1679 
1680     for (auto &elf_level_pattern : this->elf_level_patterns) {
1681         try {
1682             elf_level_pattern.second.lp_pcre = std::make_shared<pcrepp>(
1683                 elf_level_pattern.second.lp_regex.c_str());
1684         }
1685         catch (const pcrepp::error &e) {
1686             errors.push_back("error:" +
1687                              this->elf_name.to_string() + ".level:" + e.what());
1688         }
1689     }
1690 
1691     stable_sort(this->elf_level_pairs.begin(), this->elf_level_pairs.end());
1692 
1693     for (auto &vd : this->elf_value_def_order) {
1694         std::vector<std::string>::iterator act_iter;
1695 
1696         if (!vd->vd_internal &&
1697             vd->vd_meta.lvm_column == -1) {
1698             vd->vd_meta.lvm_column = this->elf_column_count++;
1699         }
1700 
1701         if (vd->vd_meta.lvm_kind == value_kind_t::VALUE_UNKNOWN) {
1702             vd->vd_meta.lvm_kind = value_kind_t::VALUE_TEXT;
1703         }
1704 
1705         for (act_iter = vd->vd_action_list.begin();
1706              act_iter != vd->vd_action_list.end();
1707              ++act_iter) {
1708             if (this->lf_action_defs.find(*act_iter) ==
1709                 this->lf_action_defs.end()) {
1710                 errors.push_back("error:" +
1711                                  this->elf_name.to_string() + ":" +
1712                                      vd->vd_meta.lvm_name.get() +
1713                                  ": cannot find action -- " + (*act_iter));
1714             }
1715         }
1716     }
1717 
1718     if (this->elf_type == ELF_TYPE_TEXT && this->elf_samples.empty()) {
1719         errors.push_back("error:" +
1720                          this->elf_name.to_string() +
1721                          ":no sample logs provided, all formats must have samples");
1722     }
1723 
1724     for (auto &elf_sample : this->elf_samples) {
1725         pcre_context_static<128> pc;
1726         pcre_input pi(elf_sample.s_line);
1727         bool found = false;
1728 
1729         for (auto pat_iter = this->elf_pattern_order.begin();
1730              pat_iter != this->elf_pattern_order.end() && !found;
1731              ++pat_iter) {
1732             pattern &pat = *(*pat_iter);
1733 
1734             if (!pat.p_pcre) {
1735                 continue;
1736             }
1737 
1738             if (!pat.p_module_format &&
1739                 pat.p_pcre->name_index(this->lf_timestamp_field.to_string()) <
1740                 0) {
1741                 errors.push_back("error:" +
1742                                  this->elf_name.to_string() +
1743                                  ":timestamp field '" +
1744                                  this->lf_timestamp_field.get() +
1745                                  "' not found in pattern -- " +
1746                                  pat.p_string);
1747                 continue;
1748             }
1749 
1750             if (pat.p_pcre->match(pc, pi)) {
1751                 if (pat.p_module_format) {
1752                     found = true;
1753                     continue;
1754                 }
1755                 pcre_context::capture_t *ts_cap =
1756                     pc[this->lf_timestamp_field.get()];
1757                 pcre_context::capture_t *level_cap = pc[pat.p_level_field_index];
1758                 const char *ts = pi.get_substr_start(ts_cap);
1759                 ssize_t ts_len = pc[this->lf_timestamp_field.get()]->length();
1760                 const char *const *custom_formats = this->get_timestamp_formats();
1761                 date_time_scanner dts;
1762                 struct timeval tv;
1763                 struct exttm tm;
1764 
1765                 if (ts_cap->c_begin == 0) {
1766                     pat.p_timestamp_end = ts_cap->c_end;
1767                 }
1768                 found = true;
1769                 if (ts_len == -1 ||
1770                     dts.scan(ts, ts_len, custom_formats, &tm, tv) == NULL) {
1771                     errors.push_back("error:" +
1772                                      this->elf_name.to_string() +
1773                                      ":invalid sample -- " +
1774                                          elf_sample.s_line);
1775                     errors.push_back("error:" +
1776                                      this->elf_name.to_string() +
1777                                      ":unrecognized timestamp format -- " + ts);
1778 
1779                     if (custom_formats == NULL) {
1780                         for (int lpc = 0;
1781                              PTIMEC_FORMATS[lpc].pf_fmt != NULL; lpc++) {
1782                             off_t off = 0;
1783 
1784                             PTIMEC_FORMATS[lpc].pf_func(&tm, ts, off, ts_len);
1785                             errors.push_back("  format: " +
1786                                              string(
1787                                                  PTIMEC_FORMATS[lpc].pf_fmt) +
1788                                              "; matched: " + string(ts, off));
1789                         }
1790                     }
1791                     else {
1792                         for (int lpc = 0; custom_formats[lpc] != NULL; lpc++) {
1793                             off_t off = 0;
1794 
1795                             ptime_fmt(custom_formats[lpc], &tm, ts, off,
1796                                       ts_len);
1797                             errors.push_back("  format: " +
1798                                              string(custom_formats[lpc]) +
1799                                              "; matched: " + string(ts, off));
1800                         }
1801                     }
1802                 }
1803 
1804                 log_level_t level = this->convert_level(pi, level_cap);
1805 
1806                 if (elf_sample.s_level != LEVEL_UNKNOWN) {
1807                     if (elf_sample.s_level != level) {
1808                         errors.push_back("error:" +
1809                                          this->elf_name.to_string() +
1810                                          ":invalid sample -- " +
1811                                              elf_sample.s_line);
1812                         errors.push_back("error:" +
1813                                          this->elf_name.to_string() +
1814                                          ":parsed level '" +
1815                                          level_names[level] +
1816                                          "' does not match expected level of '" +
1817                                          level_names[elf_sample.s_level] +
1818                                          "'");
1819                     }
1820                 }
1821             }
1822         }
1823 
1824         if (!found) {
1825             errors.push_back("error:" +
1826                              this->elf_name.to_string() +
1827                              ":invalid sample         -- " +
1828                                  elf_sample.s_line);
1829 
1830             for (auto pat_iter = this->elf_pattern_order.begin();
1831                  pat_iter != this->elf_pattern_order.end();
1832                  ++pat_iter) {
1833                 pattern &pat = *(*pat_iter);
1834 
1835                 if (!pat.p_pcre) {
1836                     continue;
1837                 }
1838 
1839                 size_t partial_len = pat.p_pcre->match_partial(pi);
1840 
1841                 if (partial_len > 0) {
1842                     errors.push_back("error:" +
1843                                      this->elf_name.to_string() +
1844                                      ":partial sample matched -- " +
1845                                          elf_sample.s_line.substr(0, partial_len));
1846                     errors.push_back("error:  against pattern " +
1847                                      (*pat_iter)->p_config_path +
1848                                      " -- " +
1849                                      (*pat_iter)->p_string);
1850                 }
1851                 else {
1852                     errors.push_back("error:" +
1853                                      this->elf_name.to_string() +
1854                                      ":no partial match found");
1855                 }
1856             }
1857         }
1858     }
1859 
1860     for (auto &elf_value_def : this->elf_value_defs) {
1861         if (elf_value_def.second->vd_foreign_key || elf_value_def.second->vd_meta.lvm_identifier) {
1862             continue;
1863         }
1864 
1865         switch (elf_value_def.second->vd_meta.lvm_kind) {
1866             case value_kind_t::VALUE_INTEGER:
1867             case value_kind_t::VALUE_FLOAT:
1868                 elf_value_def.second->vd_values_index = this->elf_numeric_value_defs.size();
1869                 this->elf_numeric_value_defs.push_back(elf_value_def.second);
1870                 break;
1871             default:
1872                 break;
1873         }
1874     }
1875 
1876     this->lf_value_stats.resize(this->elf_numeric_value_defs.size());
1877 
1878     int format_index = 0;
1879     for (auto iter = this->jlf_line_format.begin();
1880          iter != this->jlf_line_format.end();
1881          ++iter, format_index++) {
1882         static const intern_string_t ts = intern_string::lookup("__timestamp__");
1883         static const intern_string_t level_field = intern_string::lookup("__level__");
1884         json_format_element &jfe = *iter;
1885 
1886         if (startswith(jfe.jfe_value.get(), "/")) {
1887             jfe.jfe_value = intern_string::lookup(jfe.jfe_value.get() + 1);
1888         }
1889         if (!jfe.jfe_ts_format.empty()) {
1890             if (!jfe.jfe_value.empty() && jfe.jfe_value != ts) {
1891                 log_warning("%s:line-format[%d]:ignoring field '%s' since "
1892                             "timestamp-format was used",
1893                             this->elf_name.get(), format_index,
1894                             jfe.jfe_value.get());
1895             }
1896             jfe.jfe_value = ts;
1897         }
1898 
1899         switch (jfe.jfe_type) {
1900             case JLF_VARIABLE: {
1901                 auto vd_iter = this->elf_value_defs.find(jfe.jfe_value);
1902                 if (jfe.jfe_value == ts) {
1903                     this->elf_value_defs[this->lf_timestamp_field]->vd_meta.lvm_hidden = true;
1904                 } else if (jfe.jfe_value == level_field) {
1905                     this->elf_value_defs[this->elf_level_field]->vd_meta.lvm_hidden = true;
1906                 } else if (vd_iter == this->elf_value_defs.end()) {
1907                     char index_str[32];
1908 
1909                     snprintf(index_str, sizeof(index_str), "%d", format_index);
1910                     errors.push_back("error:" +
1911                                      this->elf_name.to_string() +
1912                                      ":line-format[" +
1913                                      index_str +
1914                                      "]:line format variable is not defined -- " +
1915                                      jfe.jfe_value.to_string());
1916                 }
1917                 break;
1918             }
1919             case JLF_CONSTANT:
1920                 this->jlf_line_format_init_count +=
1921                     std::count(jfe.jfe_default_value.begin(),
1922                                jfe.jfe_default_value.end(),
1923                                '\n');
1924                 break;
1925             default:
1926                 break;
1927         }
1928     }
1929 
1930     for (auto &hd_pair : this->elf_highlighter_patterns) {
1931         external_log_format::highlighter_def &hd = hd_pair.second;
1932         const std::string &pattern = hd.hd_pattern;
1933         const char *errptr;
1934         auto fg = styling::color_unit::make_empty();
1935         auto bg = styling::color_unit::make_empty();
1936         int eoff, attrs = 0;
1937 
1938         if (!hd.hd_color.empty()) {
1939             fg = styling::color_unit::from_str(hd.hd_color)
1940                 .unwrapOrElse([&](const auto& msg) {
1941                     errors.push_back("error:"
1942                                      + this->elf_name.to_string()
1943                                      + ":highlighters/"
1944                                      + hd_pair.first.to_string()
1945                                      + "/color:"
1946                                      + msg);
1947                     return styling::color_unit::make_empty();
1948                 });
1949         }
1950 
1951         if (!hd.hd_background_color.empty()) {
1952             bg = styling::color_unit::from_str(hd.hd_background_color)
1953                 .unwrapOrElse([&](const auto& msg) {
1954                     errors.push_back("error:"
1955                                      + this->elf_name.to_string()
1956                                      + ":highlighters/"
1957                                      + hd_pair.first.to_string()
1958                                      + "/color:"
1959                                      + msg);
1960                     return styling::color_unit::make_empty();
1961                 });
1962         }
1963 
1964         if (hd.hd_underline) {
1965             attrs |= A_UNDERLINE;
1966         }
1967         if (hd.hd_blink) {
1968             attrs |= A_BLINK;
1969         }
1970 
1971         pcre *code = pcre_compile(pattern.c_str(),
1972                                   PCRE_CASELESS,
1973                                   &errptr,
1974                                   &eoff,
1975                                   nullptr);
1976 
1977         if (code == nullptr) {
1978             errors.push_back("error:"
1979                              + this->elf_name.to_string()
1980                              + ":highlighters/"
1981                              + hd_pair.first.to_string()
1982                              + ":"
1983                              + string(errptr));
1984             errors.push_back("error:"
1985                              + this->elf_name.to_string()
1986                              + ":highlighters/"
1987                              + hd_pair.first.to_string()
1988                              + ":"
1989                              + pattern);
1990             errors.push_back("error:"
1991                              + this->elf_name.to_string()
1992                              + ":highlighters/"
1993                              + hd_pair.first.to_string()
1994                              + ":"
1995                              + string(eoff, ' ')
1996                              + "^");
1997         } else {
1998             this->lf_highlighters.emplace_back(code);
1999             this->lf_highlighters.back()
2000                 .with_pattern(pattern)
2001                 .with_format_name(this->elf_name)
2002                 .with_color(fg, bg)
2003                 .with_attrs(attrs);
2004         }
2005     }
2006 }
2007 
register_vtabs(log_vtab_manager * vtab_manager,std::vector<std::string> & errors)2008 void external_log_format::register_vtabs(log_vtab_manager *vtab_manager,
2009                                          std::vector<std::string> &errors)
2010 {
2011     vector<pair<intern_string_t, string>>::iterator search_iter;
2012     for (search_iter = this->elf_search_tables.begin();
2013          search_iter != this->elf_search_tables.end();
2014          ++search_iter) {
2015         auto re_res = pcrepp::from_str(search_iter->second,
2016                                        log_search_table::pattern_options());
2017 
2018         if (re_res.isErr()) {
2019             errors.push_back(fmt::format(
2020                 "error:{}:{}:unable to compile regex '{}': {}",
2021                 this->elf_name.get(),
2022                 search_iter->first.get(),
2023                 search_iter->second,
2024                 re_res.unwrapErr().ce_msg));
2025             continue;
2026         }
2027 
2028         auto lst = std::make_shared<log_search_table>(
2029             re_res.unwrap(), search_iter->first);
2030         string errmsg;
2031 
2032         errmsg = vtab_manager->register_vtab(lst);
2033         if (!errmsg.empty()) {
2034             errors.push_back(
2035                 "error:" +
2036                 this->elf_name.to_string() +
2037                 ":" +
2038                 search_iter->first.to_string() +
2039                 ":unable to register table -- " +
2040                 errmsg);
2041         }
2042     }
2043 }
2044 
match_samples(const vector<sample> & samples) const2045 bool external_log_format::match_samples(const vector<sample> &samples) const
2046 {
2047     for (const auto &sample_iter : samples) {
2048         for (const auto &pat_iter : this->elf_pattern_order) {
2049             pattern &pat = *pat_iter;
2050 
2051             if (!pat.p_pcre) {
2052                 continue;
2053             }
2054 
2055             pcre_context_static<128> pc;
2056             pcre_input pi(sample_iter.s_line);
2057 
2058             if (pat.p_pcre->match(pc, pi)) {
2059                 return true;
2060             }
2061         }
2062     }
2063 
2064     return false;
2065 }
2066 
2067 class external_log_table : public log_format_vtab_impl {
2068 public:
external_log_table(const external_log_format & elf)2069     external_log_table(const external_log_format &elf) :
2070         log_format_vtab_impl(elf), elt_format(elf) {
2071     };
2072 
get_columns(vector<vtab_column> & cols) const2073     void get_columns(vector<vtab_column> &cols) const {
2074         const external_log_format &elf = this->elt_format;
2075 
2076         cols.resize(elf.elf_column_count);
2077         for (const auto &vd : elf.elf_value_def_order) {
2078             auto type_pair = log_vtab_impl::logline_value_to_sqlite_type(vd->vd_meta.lvm_kind);
2079 
2080             if (vd->vd_meta.lvm_column == -1) {
2081                 continue;
2082             }
2083 
2084             require(0 <= vd->vd_meta.lvm_column && vd->vd_meta.lvm_column < elf.elf_column_count);
2085 
2086             cols[vd->vd_meta.lvm_column].vc_name = vd->vd_meta.lvm_name.get();
2087             cols[vd->vd_meta.lvm_column].vc_type = type_pair.first;
2088             cols[vd->vd_meta.lvm_column].vc_subtype = type_pair.second;
2089             cols[vd->vd_meta.lvm_column].vc_collator = vd->vd_collate;
2090             cols[vd->vd_meta.lvm_column].vc_comment = vd->vd_description;
2091         }
2092     };
2093 
get_foreign_keys(std::vector<std::string> & keys_inout) const2094     void get_foreign_keys(std::vector<std::string> &keys_inout) const
2095     {
2096         log_vtab_impl::get_foreign_keys(keys_inout);
2097 
2098         for (const auto &elf_value_def : this->elt_format.elf_value_defs) {
2099             if (elf_value_def.second->vd_foreign_key) {
2100                 keys_inout.emplace_back(elf_value_def.first.to_string());
2101             }
2102         }
2103     };
2104 
next(log_cursor & lc,logfile_sub_source & lss)2105     virtual bool next(log_cursor &lc, logfile_sub_source &lss)
2106     {
2107         lc.lc_curr_line = lc.lc_curr_line + 1_vl;
2108         lc.lc_sub_index = 0;
2109 
2110         if (lc.is_eof()) {
2111             return true;
2112         }
2113 
2114         content_line_t cl(lss.at(lc.lc_curr_line));
2115         auto lf = lss.find_file_ptr(cl);
2116         auto lf_iter = lf->begin() + cl;
2117         uint8_t mod_id = lf_iter->get_module_id();
2118 
2119         if (lf_iter->is_continued()) {
2120             return false;
2121         }
2122 
2123         this->elt_module_format.mf_mod_format = nullptr;
2124         if (lf->get_format_name() == this->lfvi_format.get_name()) {
2125             return true;
2126         } else if (mod_id && mod_id == this->lfvi_format.lf_mod_index) {
2127             auto format = lf->get_format();
2128 
2129             return lf->read_line(lf_iter).map([this, format, cl](auto line) {
2130                 std::vector<logline_value> values;
2131                 shared_buffer_ref body_ref;
2132                 struct line_range mod_name_range;
2133                 intern_string_t mod_name;
2134 
2135                 this->vi_attrs.clear();
2136                 format->annotate(cl, line, this->vi_attrs, values, false);
2137                 this->elt_container_body = find_string_attr_range(this->vi_attrs, &SA_BODY);
2138                 if (!this->elt_container_body.is_valid()) {
2139                     return false;
2140                 }
2141                 this->elt_container_body.ltrim(line.get_data());
2142                 body_ref.subset(line,
2143                                 this->elt_container_body.lr_start,
2144                                 this->elt_container_body.length());
2145                 mod_name_range = find_string_attr_range(this->vi_attrs,
2146                                                         &logline::L_MODULE);
2147                 if (!mod_name_range.is_valid()) {
2148                     return false;
2149                 }
2150                 mod_name = intern_string::lookup(
2151                     &line.get_data()[mod_name_range.lr_start],
2152                     mod_name_range.length());
2153                 this->vi_attrs.clear();
2154                 this->elt_module_format = external_log_format::MODULE_FORMATS[mod_name];
2155                 if (!this->elt_module_format.mf_mod_format) {
2156                     return false;
2157                 }
2158                 return this->elt_module_format.mf_mod_format->get_name() ==
2159                        this->lfvi_format.get_name();
2160             }).unwrapOr(false);
2161         }
2162 
2163         return false;
2164     };
2165 
extract(shared_ptr<logfile> lf,uint64_t line_number,shared_buffer_ref & line,std::vector<logline_value> & values)2166     virtual void extract(shared_ptr<logfile> lf,
2167                          uint64_t line_number,
2168                          shared_buffer_ref &line,
2169                          std::vector<logline_value> &values)
2170     {
2171         auto format = lf->get_format();
2172 
2173         if (this->elt_module_format.mf_mod_format != nullptr) {
2174             shared_buffer_ref body_ref;
2175 
2176             body_ref.subset(line, this->elt_container_body.lr_start,
2177                             this->elt_container_body.length());
2178             this->vi_attrs.clear();
2179             values.clear();
2180             this->elt_module_format.mf_mod_format->annotate(line_number,
2181                                                             body_ref,
2182                                                             this->vi_attrs,
2183                                                             values,
2184                                                             false);
2185         }
2186         else {
2187             this->vi_attrs.clear();
2188             format->annotate(line_number, line, this->vi_attrs, values, false);
2189         }
2190     };
2191 
2192     const external_log_format &elt_format;
2193     module_format elt_module_format;
2194     struct line_range elt_container_body;
2195 };
2196 
get_vtab_impl() const2197 std::shared_ptr<log_vtab_impl> external_log_format::get_vtab_impl() const
2198 {
2199     return std::make_shared<external_log_table>(*this);
2200 }
2201 
specialized(int fmt_lock)2202 std::shared_ptr<log_format> external_log_format::specialized(int fmt_lock)
2203 {
2204     auto retval = std::make_shared<external_log_format>(*this);
2205 
2206     retval->lf_specialized = true;
2207     this->lf_pattern_locks.clear();
2208     if (fmt_lock != -1) {
2209         retval->lf_pattern_locks.emplace_back(0, fmt_lock);
2210     }
2211 
2212     if (this->elf_type == ELF_TYPE_JSON) {
2213         this->jlf_parse_context = std::make_shared<yajlpp_parse_context>(this->elf_name.to_string());
2214         this->jlf_yajl_handle.reset(
2215             yajl_alloc(&this->jlf_parse_context->ypc_callbacks,
2216                        nullptr,
2217                        this->jlf_parse_context.get()),
2218             yajl_handle_deleter());
2219         yajl_config(this->jlf_yajl_handle.get(), yajl_dont_validate_strings, 1);
2220         this->jlf_cached_line.reserve(16 * 1024);
2221     }
2222 
2223     this->lf_value_stats.clear();
2224     this->lf_value_stats.resize(this->elf_numeric_value_defs.size());
2225 
2226     return retval;
2227 }
2228 
match_name(const string & filename)2229 bool external_log_format::match_name(const string &filename)
2230 {
2231     if (this->elf_file_pattern.empty()) {
2232         return true;
2233     }
2234 
2235     pcre_context_static<10> pc;
2236     pcre_input pi(filename);
2237 
2238     return this->elf_filename_pcre->match(pc, pi);
2239 }
2240 
pattern_index_for_line(uint64_t line_number) const2241 int log_format::pattern_index_for_line(uint64_t line_number) const
2242 {
2243     auto iter = lower_bound(this->lf_pattern_locks.cbegin(),
2244                             this->lf_pattern_locks.cend(),
2245                             line_number,
2246                             [](const pattern_for_lines &pfl, uint32_t line) {
2247         return pfl.pfl_line < line;
2248     });
2249 
2250     if (iter == this->lf_pattern_locks.end() ||
2251         iter->pfl_line != line_number) {
2252         --iter;
2253     }
2254 
2255     return iter->pfl_pat_index;
2256 }
2257 
get_pattern_name(uint64_t line_number) const2258 std::string log_format::get_pattern_name(uint64_t line_number) const
2259 {
2260     int pat_index = this->pattern_index_for_line(line_number);
2261     return fmt::format("builtin ({})", pat_index);
2262 }
2263 
pattern_for_lines(uint32_t pfl_line,uint32_t pfl_pat_index)2264 log_format::pattern_for_lines::pattern_for_lines(
2265     uint32_t pfl_line, uint32_t pfl_pat_index) :
2266     pfl_line(pfl_line), pfl_pat_index(pfl_pat_index)
2267 {
2268 }
2269 
2270 /* XXX */
2271 #include "log_format_impls.cc"
2272