1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_compat.h>
23 #include <fluent-bit/flb_filter.h>
24 #include <fluent-bit/flb_filter_plugin.h>
25 #include <fluent-bit/flb_luajit.h>
26 #include <fluent-bit/flb_utils.h>
27 #include <fluent-bit/flb_pack.h>
28 #include <fluent-bit/flb_sds.h>
29 #include <fluent-bit/flb_time.h>
30 #include <msgpack.h>
31
32 #include "lua_config.h"
33
34 /* Push timestamp as a Lua table into the stack */
lua_pushtimetable(lua_State * l,struct flb_time * tm)35 static void lua_pushtimetable(lua_State *l, struct flb_time *tm)
36 {
37 lua_createtable(l, 0, 2);
38
39 /* seconds */
40 lua_pushlstring(l, "sec", 3);
41 lua_pushinteger(l, tm->tm.tv_sec);
42 lua_settable(l, -3);
43
44 /* nanoseconds */
45 lua_pushlstring(l, "nsec", 4);
46 lua_pushinteger(l, tm->tm.tv_nsec);
47 lua_settable(l, -3);
48 }
49
/*
 * Convert the msgpack object 'o' into the equivalent Lua value and push it
 * on top of the Lua stack:
 *
 *  - nil / boolean / float / str / bin / ext map to nil, boolean, number and
 *    string values respectively,
 *  - arrays become tables indexed from 1,
 *  - maps become tables keyed by the converted msgpack keys.
 *
 * Arrays and maps are converted recursively. Exactly one value is pushed.
 */
static void lua_pushmsgpack(lua_State *l, msgpack_object *o)
{
    int i;
    int size;
    lua_Integer num;

    /* room for a table plus one key/value pair being assembled */
    lua_checkstack(l, 3);

    switch(o->type) {
    case MSGPACK_OBJECT_NIL:
        lua_pushnil(l);
        break;

    case MSGPACK_OBJECT_BOOLEAN:
        lua_pushboolean(l, o->via.boolean);
        break;

    case MSGPACK_OBJECT_POSITIVE_INTEGER:
        /*
         * Push as integer only when the value survives the round trip
         * through lua_Integer; otherwise push it as a number. Going through
         * a double first and then converting an out-of-range double to an
         * integer is undefined behavior.
         */
        num = (lua_Integer) o->via.u64;
        if (num >= 0 && (uint64_t) num == o->via.u64) {
            lua_pushinteger(l, num);
        }
        else {
            lua_pushnumber(l, (lua_Number) o->via.u64);
        }
        break;

    case MSGPACK_OBJECT_NEGATIVE_INTEGER:
        /* same round-trip check as above for signed values */
        num = (lua_Integer) o->via.i64;
        if ((int64_t) num == o->via.i64) {
            lua_pushinteger(l, num);
        }
        else {
            lua_pushnumber(l, (lua_Number) o->via.i64);
        }
        break;

    case MSGPACK_OBJECT_FLOAT32:
    case MSGPACK_OBJECT_FLOAT64:
        lua_pushnumber(l, (double) o->via.f64);
        break;

    case MSGPACK_OBJECT_STR:
        lua_pushlstring(l, o->via.str.ptr, o->via.str.size);
        break;

    case MSGPACK_OBJECT_BIN:
        lua_pushlstring(l, o->via.bin.ptr, o->via.bin.size);
        break;

    case MSGPACK_OBJECT_EXT:
        lua_pushlstring(l, o->via.ext.ptr, o->via.ext.size);
        break;

    case MSGPACK_OBJECT_ARRAY:
        size = o->via.array.size;
        lua_createtable(l, size, 0);
        if (size != 0) {
            msgpack_object *p = o->via.array.ptr;
            for (i = 0; i < size; i++) {
                lua_pushmsgpack(l, p+i);
                /* Lua sequences start at index 1 */
                lua_rawseti (l, -2, i+1);
            }
        }
        break;

    case MSGPACK_OBJECT_MAP:
        size = o->via.map.size;
        lua_createtable(l, 0, size);
        if (size != 0) {
            msgpack_object_kv *p = o->via.map.ptr;
            for (i = 0; i < size; i++) {
                lua_pushmsgpack(l, &(p+i)->key);
                lua_pushmsgpack(l, &(p+i)->val);
                lua_settable(l, -3);
            }
        }
        break;

    default:
        /*
         * Unknown object type: still push one value so callers that store
         * the result into a table keep a balanced stack.
         */
        lua_pushnil(l);
        break;
    }

}
118
/*
 * Return the largest positive integer index of the table on top of the Lua
 * stack, or -1 if the value on top is not a table or the lookup failed.
 * The stack is left as it was found.
 *
 * For Lua versions older than 5.2 this calls the Lua function table.maxn.
 * CAUTION: table.maxn was removed in Lua 5.2. If LuaJIT is ever updated to
 * a release based on Lua 5.2+, only the lua_rawlen() branch below remains
 * usable and the table.maxn branch should be removed.
 */
static int lua_isinteger(lua_State *L, int index);
static int lua_table_maxn(lua_State *l)
{
#if defined(LUA_VERSION_NUM) && LUA_VERSION_NUM < 520
    int ret = -1;

    if (lua_type(l, -1) != LUA_TTABLE) {
        return -1;
    }

    lua_getglobal(l, "table");
    lua_getfield(l, -1, "maxn");
    lua_remove(l, -2);    /* remove table (lua_getglobal(L, "table")) */
    lua_pushvalue(l, -2); /* copy record to top of stack */
    ret = lua_pcall(l, 1, 1, 0);
    if (ret != 0) {
        /*
         * lua_pcall() returns 0 on success and a positive status code on
         * failure, leaving the error message on the stack: drop it.
         */
        flb_error("[filter_lua] failed to exec table.maxn ret=%d", ret);
        lua_pop(l, 1);
        return -1;
    }
    if (lua_type(l, -1) != LUA_TNUMBER) {
        flb_error("[filter_lua] not LUA_TNUMBER");
        lua_pop(l, 1);
        return -1;
    }

    /* ret is 0 here; it only changes when maxn is an integral number */
    if (lua_isinteger(l, -1)) {
        ret = lua_tointeger(l, -1);
    }
    lua_pop(l, 1);

    return ret;
#else
    if (lua_type(l, -1) != LUA_TTABLE) {
        return -1;
    }

    /* the table under inspection sits on top of the stack, not at slot 1 */
    return (int)lua_rawlen(l, -1);
#endif
}
159
/*
 * Decide whether the table on top of the Lua stack can be treated as an
 * array and return its length.
 *
 * A positive result from lua_table_maxn() is returned directly. Otherwise
 * every key is visited: all keys must be numbers greater than zero, and the
 * highest key must equal the number of entries. Returns the length on
 * success (0 for an empty table) and -1 when the table is not array-like.
 * The stack is left balanced in every case.
 */
static int lua_arraylength(lua_State *l)
{
    lua_Integer key;
    int entries = 0;
    int highest = 0;
    int maxn;

    maxn = lua_table_maxn(l);
    if (maxn > 0) {
        return maxn;
    }

    /* walk the table: key at -2, value at -1 after each lua_next() */
    lua_pushnil(l);
    while (lua_next(l, -2) != 0) {
        if (lua_type(l, -2) != LUA_TNUMBER) {
            /* non-numeric key: drop key and value, give up */
            lua_pop(l, 2);
            return -1;
        }

        key = lua_tonumber(l, -2);
        if (key <= 0) {
            lua_pop(l, 2);
            return -1;
        }

        if (key > highest) {
            highest = key;
        }
        entries++;

        /* drop the value, keep the key for the next lua_next() */
        lua_pop(l, 1);
    }

    /* holes in the index range mean this is not a plain array */
    if (highest != entries) {
        return -1;
    }
    return highest;
}
190
static void lua_tomsgpack(struct lua_filter *lf, msgpack_packer *pck, int index);

/*
 * Pack the table on top of the Lua stack as a msgpack array, using the
 * length reported by lua_objlen() and reading elements 1..length with
 * lua_rawgeti(). The 'index' argument is not used.
 */
static void lua_toarray(struct lua_filter *lf, msgpack_packer *pck, int index)
{
    lua_State *state = lf->lua->state;
    int count;
    int pos = 1;

    /* obtain the length through the stack, as an integer */
    lua_pushnumber(state, (lua_Number)lua_objlen(state, -1)); // lua_len
    count = (int)lua_tointeger(state, -1);
    lua_pop(state, 1);

    msgpack_pack_array(pck, count);
    while (pos <= count) {
        /* push element, serialize it from the top of the stack, drop it */
        lua_rawgeti(state, -1, pos);
        lua_tomsgpack(lf, pck, 0);
        lua_pop(state, 1);
        pos++;
    }
}
try_to_convert_data_type(struct lua_filter * lf,msgpack_packer * pck,int index)209 static void try_to_convert_data_type(struct lua_filter *lf,
210 msgpack_packer *pck,
211 int index)
212 {
213 size_t len;
214 const char *tmp = NULL;
215 lua_State *l = lf->lua->state;
216
217 struct mk_list *tmp_list = NULL;
218 struct mk_list *head = NULL;
219 struct l2c_type *l2c = NULL;
220
221 // convert to int
222 if ((lua_type(l, -2) == LUA_TSTRING)
223 && lua_type(l, -1) == LUA_TNUMBER){
224 tmp = lua_tolstring(l, -2, &len);
225
226 mk_list_foreach_safe(head, tmp_list, &lf->l2c_types) {
227 l2c = mk_list_entry(head, struct l2c_type, _head);
228 if (!strncmp(l2c->key, tmp, len) && l2c->type == L2C_TYPE_INT) {
229 lua_tomsgpack(lf, pck, -1);
230 msgpack_pack_int64(pck, (int64_t)lua_tonumber(l, -1));
231 return;
232 }
233 }
234 }
235 else if ((lua_type(l, -2) == LUA_TSTRING)
236 && lua_type(l, -1) == LUA_TTABLE){
237 tmp = lua_tolstring(l, -2, &len);
238
239 mk_list_foreach_safe(head, tmp_list, &lf->l2c_types) {
240 l2c = mk_list_entry(head, struct l2c_type, _head);
241 if (!strncmp(l2c->key, tmp, len) && l2c->type == L2C_TYPE_ARRAY) {
242 lua_tomsgpack(lf, pck, -1);
243 lua_toarray(lf, pck, 0);
244 return;
245 }
246 }
247 }
248
249 /* not matched */
250 lua_tomsgpack(lf, pck, -1);
251 lua_tomsgpack(lf, pck, 0);
252 }
253
/*
 * Check whether the value at stack position 'index' is a number whose
 * integer conversion compares equal to its numeric value.
 * Returns 1 in that case and 0 otherwise (including non-number values).
 */
static int lua_isinteger(lua_State *L, int index)
{
    lua_Number as_number;
    lua_Integer as_integer;

    if (lua_type(L, index) != LUA_TNUMBER) {
        return 0;
    }

    as_number = lua_tonumber(L, index);
    as_integer = lua_tointeger(L, index);

    return (as_integer == as_number) ? 1 : 0;
}
269
/*
 * Serialize one Lua value into 'pck'.
 *
 * 'index' is an offset added to -1 to locate the value on the Lua stack:
 * 0 selects the top of the stack and -1 the slot right below it (used for
 * the key while iterating a table with lua_next()).
 *
 *  - strings, numbers and booleans map to their msgpack counterparts;
 *    numbers with an integral value are packed as int64, others as double,
 *  - tables that lua_arraylength() accepts are packed as arrays, any other
 *    table as a map (with per-key type conversion when the filter has
 *    l2c_types configured),
 *  - nil and every value that cannot be serialized (functions, userdata,
 *    threads, ...) are packed as nil, so the element count announced by an
 *    enclosing array or map always matches what was written.
 *
 * The Lua stack is left as it was found.
 */
static void lua_tomsgpack(struct lua_filter *lf, msgpack_packer *pck, int index)
{
    int len;
    int i;
    int pos = -1 + index;
    lua_State *l = lf->lua->state;

    switch (lua_type(l, pos)) {
    case LUA_TSTRING:
        {
            const char *str;
            size_t str_len;

            str = lua_tolstring(l, pos, &str_len);

            msgpack_pack_str(pck, str_len);
            msgpack_pack_str_body(pck, str, str_len);
        }
        break;
    case LUA_TNUMBER:
        {
            if (lua_isinteger(l, pos)) {
                int64_t num = lua_tointeger(l, pos);
                msgpack_pack_int64(pck, num);
            }
            else {
                double num = lua_tonumber(l, pos);
                msgpack_pack_double(pck, num);
            }
        }
        break;
    case LUA_TBOOLEAN:
        if (lua_toboolean(l, pos)) {
            msgpack_pack_true(pck);
        }
        else {
            msgpack_pack_false(pck);
        }
        break;
    case LUA_TTABLE:
        /* slots for: table reference, key, value and one spare */
        if (!lua_checkstack(l, 4)) {
            msgpack_pack_nil(pck);
            break;
        }

        /*
         * Everything below (lua_arraylength, lua_rawgeti, lua_next) works on
         * the table at the top of the stack, so push a reference to the
         * selected table there. Without this, a table located below the top
         * (index != 0) would be processed using the wrong stack slot.
         */
        lua_pushvalue(l, pos);

        len = lua_arraylength(l);
        if (len > 0) {
            msgpack_pack_array(pck, len);
            for (i = 1; i <= len; i++) {
                lua_rawgeti(l, -1, i);
                lua_tomsgpack(lf, pck, 0);
                lua_pop(l, 1);
            }
        }
        else {
            /* first pass: count the entries to size the map header */
            len = 0;
            lua_pushnil(l);
            while (lua_next(l, -2) != 0) {
                lua_pop(l, 1);
                len++;
            }
            msgpack_pack_map(pck, len);

            /* second pass: pack every key/value pair */
            lua_pushnil(l);

            if (lf->l2c_types_num > 0) {
                /* type conversion */
                while (lua_next(l, -2) != 0) {
                    try_to_convert_data_type(lf, pck, index);
                    lua_pop(l, 1);
                }
            }
            else {
                while (lua_next(l, -2) != 0) {
                    lua_tomsgpack(lf, pck, -1);
                    lua_tomsgpack(lf, pck, 0);
                    lua_pop(l, 1);
                }
            }
        }

        /* drop the table reference pushed above */
        lua_pop(l, 1);
        break;
    case LUA_TNIL:
    case LUA_TLIGHTUSERDATA:
    case LUA_TFUNCTION:
    case LUA_TUSERDATA:
    case LUA_TTHREAD:
    default:
        /*
         * nil, or a value that cannot be serialized: emit nil rather than
         * nothing, otherwise an enclosing array/map would be left with
         * fewer elements than its header declares.
         */
        msgpack_pack_nil(pck);
        break;
    }
}
358
/*
 * Check whether the global named 'func' is a Lua function.
 * Returns FLB_TRUE if it is, FLB_FALSE otherwise. The Lua stack is left as
 * it was found.
 */
static int is_valid_func(lua_State *lua, flb_sds_t func)
{
    int ret = FLB_FALSE;

    lua_getglobal(lua, func);
    if (lua_isfunction(lua, -1)) {
        ret = FLB_TRUE;
    }

    /*
     * Discard the value pushed by lua_getglobal(). The count must be 1:
     * lua_pop(L, -1) expands to lua_settop(L, 0) and wipes the whole stack.
     */
    lua_pop(lua, 1);

    return ret;
}
371
cb_lua_init(struct flb_filter_instance * f_ins,struct flb_config * config,void * data)372 static int cb_lua_init(struct flb_filter_instance *f_ins,
373 struct flb_config *config,
374 void *data)
375 {
376 int ret;
377 (void) data;
378 struct lua_filter *ctx;
379 struct flb_luajit *lj;
380
381 /* Create context */
382 ctx = lua_config_create(f_ins, config);
383 if (!ctx) {
384 flb_error("[filter_lua] filter cannot be loaded");
385 return -1;
386 }
387
388 /* Create LuaJIT state/vm */
389 lj = flb_luajit_create(config);
390 if (!lj) {
391 lua_config_destroy(ctx);
392 return -1;
393 }
394 ctx->lua = lj;
395
396 /* Load Script */
397 ret = flb_luajit_load_script(ctx->lua, ctx->script);
398 if (ret == -1) {
399 lua_config_destroy(ctx);
400 return -1;
401 }
402 lua_pcall(ctx->lua->state, 0, 0, 0);
403
404 if (is_valid_func(ctx->lua->state, ctx->call) != FLB_TRUE) {
405 flb_plg_error(ctx->ins, "function %s is not found", ctx->call);
406 lua_config_destroy(ctx);
407 return -1;
408 }
409
410 /* Set context */
411 flb_filter_set_context(f_ins, ctx);
412
413 return 0;
414 }
415
/*
 * Append the record(s) serialized in 'data' ('bytes' long) to the output.
 *
 * 'data' must hold a msgpack object that is either a non-empty map (one
 * record) or a non-empty array of non-empty maps (several records). Each
 * record is written as a two-element array: the timestamp 'ts' followed by
 * the map. For the single-map case the raw bytes are copied into 'sbuf';
 * for the array case every map is re-packed through 'pck'.
 *
 * Returns FLB_TRUE on success and FLB_FALSE when 'data' cannot be unpacked
 * or has any other shape. In the array case, records that precede an
 * invalid element have already been written when FLB_FALSE is returned.
 */
static int pack_result (struct flb_time *ts, msgpack_packer *pck, msgpack_sbuffer *sbuf,
                        char *data, size_t bytes)
{
    int status = FLB_FALSE;
    int unpack_ret;
    int count;
    int idx;
    size_t offset = 0;
    msgpack_object *entry;
    msgpack_object root;
    msgpack_unpacked unpacked;

    msgpack_unpacked_init(&unpacked);
    unpack_ret = msgpack_unpack_next(&unpacked, data, bytes, &offset);
    if (unpack_ret != MSGPACK_UNPACK_SUCCESS) {
        goto done;
    }

    root = unpacked.data;

    switch (root.type) {
    case MSGPACK_OBJECT_ARRAY:
        /* several records: every element has to be a non-empty map */
        count = root.via.array.size;
        if (count <= 0) {
            goto done;
        }

        for (idx = 0; idx < count; idx++) {
            entry = root.via.array.ptr + idx;
            if (entry->type != MSGPACK_OBJECT_MAP) {
                goto done;
            }
            if (entry->via.map.size <= 0) {
                goto done;
            }

            /* main array */
            msgpack_pack_array(pck, 2);

            /* timestamp in Fluent Bit format */
            flb_time_append_to_msgpack(ts, pck, 0);

            /* Pack lua table */
            msgpack_pack_object(pck, *entry);
        }
        status = FLB_TRUE;
        break;

    case MSGPACK_OBJECT_MAP:
        /* single record */
        if (root.via.map.size <= 0) {
            goto done;
        }

        /* main array */
        msgpack_pack_array(pck, 2);

        flb_time_append_to_msgpack(ts, pck, 0);

        /* Pack lua table: the serialized map is copied verbatim */
        msgpack_sbuffer_write(sbuf, data, bytes);
        status = FLB_TRUE;
        break;

    default:
        /* neither an array nor a map */
        break;
    }

done:
    msgpack_unpacked_destroy(&unpacked);
    return status;
}
488
cb_lua_filter(const void * data,size_t bytes,const char * tag,int tag_len,void ** out_buf,size_t * out_bytes,struct flb_filter_instance * f_ins,void * filter_context,struct flb_config * config)489 static int cb_lua_filter(const void *data, size_t bytes,
490 const char *tag, int tag_len,
491 void **out_buf, size_t *out_bytes,
492 struct flb_filter_instance *f_ins,
493 void *filter_context,
494 struct flb_config *config)
495 {
496 int ret;
497 size_t off = 0;
498 (void) f_ins;
499 (void) config;
500 double ts = 0;
501 msgpack_object *p;
502 msgpack_object root;
503 msgpack_unpacked result;
504 msgpack_sbuffer tmp_sbuf;
505 msgpack_packer tmp_pck;
506 struct flb_time t_orig;
507 struct flb_time t;
508 struct lua_filter *ctx = filter_context;
509 /* Lua return values */
510 int l_code;
511 double l_timestamp;
512
513 /* Create temporary msgpack buffer */
514 msgpack_sbuffer_init(&tmp_sbuf);
515 msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
516
517 msgpack_unpacked_init(&result);
518 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
519 msgpack_packer data_pck;
520 msgpack_sbuffer data_sbuf;
521
522 msgpack_sbuffer_init(&data_sbuf);
523 msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write);
524
525 root = result.data;
526
527 /* Get timestamp */
528 flb_time_pop_from_msgpack(&t, &result, &p);
529 t_orig = t;
530
531 /* Prepare function call, pass 3 arguments, expect 3 return values */
532 lua_getglobal(ctx->lua->state, ctx->call);
533 lua_pushstring(ctx->lua->state, tag);
534
535 /* Timestamp */
536 if (ctx->time_as_table == FLB_TRUE) {
537 lua_pushtimetable(ctx->lua->state, &t);
538 }
539 else {
540 ts = flb_time_to_double(&t);
541 lua_pushnumber(ctx->lua->state, ts);
542 }
543
544 lua_pushmsgpack(ctx->lua->state, p);
545 if (ctx->protected_mode) {
546 ret = lua_pcall(ctx->lua->state, 3, 3, 0);
547 if (ret != 0) {
548 flb_plg_error(ctx->ins, "error code %d: %s",
549 ret, lua_tostring(ctx->lua->state, -1));
550 lua_pop(ctx->lua->state, 1);
551 msgpack_sbuffer_destroy(&tmp_sbuf);
552 msgpack_sbuffer_destroy(&data_sbuf);
553 msgpack_unpacked_destroy(&result);
554 return FLB_FILTER_NOTOUCH;
555 }
556 }
557 else {
558 lua_call(ctx->lua->state, 3, 3);
559 }
560
561 /* Initialize Return values */
562 l_code = 0;
563 l_timestamp = ts;
564
565 lua_tomsgpack(ctx, &data_pck, 0);
566 lua_pop(ctx->lua->state, 1);
567
568 /* Lua table */
569 if (ctx->time_as_table == FLB_TRUE) {
570 if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) {
571 /* Retrieve seconds */
572 lua_getfield(ctx->lua->state, -1, "sec");
573 t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1);
574 lua_pop(ctx->lua->state, 1);
575
576 /* Retrieve nanoseconds */
577 lua_getfield(ctx->lua->state, -1, "nsec");
578 t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1);
579 lua_pop(ctx->lua->state, 2);
580 }
581 else {
582 flb_plg_error(ctx->ins, "invalid lua timestamp type returned");
583 t = t_orig;
584 }
585 }
586 else {
587 l_timestamp = (double) lua_tonumber(ctx->lua->state, -1);
588 lua_pop(ctx->lua->state, 1);
589 }
590
591 l_code = (int) lua_tointeger(ctx->lua->state, -1);
592 lua_pop(ctx->lua->state, 1);
593
594 if (l_code == -1) { /* Skip record */
595 msgpack_sbuffer_destroy(&data_sbuf);
596 continue;
597 }
598 else if (l_code == 0) { /* Keep record, repack */
599 msgpack_pack_object(&tmp_pck, root);
600 }
601 else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */
602 if (l_code == 1) {
603 if (ctx->time_as_table == FLB_FALSE) {
604 flb_time_from_double(&t, l_timestamp);
605 }
606 }
607 else if (l_code == 2) {
608 /* Keep the timestamp */
609 t = t_orig;
610 }
611 ret = pack_result(&t, &tmp_pck, &tmp_sbuf,
612 data_sbuf.data, data_sbuf.size);
613 if (ret == FLB_FALSE) {
614 flb_plg_error(ctx->ins, "invalid table returned at %s(), %s",
615 ctx->call, ctx->script);
616 msgpack_sbuffer_destroy(&tmp_sbuf);
617 msgpack_sbuffer_destroy(&data_sbuf);
618 msgpack_unpacked_destroy(&result);
619 return FLB_FILTER_NOTOUCH;
620 }
621 }
622 else { /* Unexpected return code, keep original content */
623 flb_plg_error(ctx->ins, "unexpected Lua script return code %i, "
624 "original record will be kept." , l_code);
625 msgpack_pack_object(&tmp_pck, root);
626 }
627 msgpack_sbuffer_destroy(&data_sbuf);
628 }
629 msgpack_unpacked_destroy(&result);
630
631 /* link new buffers */
632 *out_buf = tmp_sbuf.data;
633 *out_bytes = tmp_sbuf.size;
634
635 return FLB_FILTER_MODIFIED;
636 }
637
cb_lua_exit(void * data,struct flb_config * config)638 static int cb_lua_exit(void *data, struct flb_config *config)
639 {
640 struct lua_filter *ctx;
641
642 ctx = data;
643 flb_luajit_destroy(ctx->lua);
644 lua_config_destroy(ctx);
645
646 return 0;
647 }
648
/*
 * Configuration properties accepted by the Lua filter. Each entry gives the
 * value type, the property name, its default value and a description.
 * Entries that carry an offsetof() expression refer to a member of
 * struct lua_filter. The list is terminated by a zeroed entry.
 *
 * NOTE(review): the exact meaning of every positional field is defined by
 * struct flb_config_map, which is declared outside this file - confirm
 * there before reordering or adding fields.
 */
static struct flb_config_map config_map[] = {
    {
     /* script file to load */
     FLB_CONFIG_MAP_STR, "script", NULL,
     0, FLB_FALSE, 0,
     "The path of lua script."
    },
    {
     /* name of the Lua function invoked for every record */
     FLB_CONFIG_MAP_STR, "call", NULL,
     0, FLB_TRUE, offsetof(struct lua_filter, call),
     "Lua function name that will be triggered to do filtering."
    },
    {
     /* keys whose values are forced to integers */
     FLB_CONFIG_MAP_STR, "type_int_key", NULL,
     0, FLB_FALSE, 0,
     "If these keys are matched, the fields are converted to integer. "
     "If more than one key, delimit by space."
    },
    {
     /* keys whose values are forced to arrays */
     FLB_CONFIG_MAP_STR, "type_array_key", NULL,
     0, FLB_FALSE, 0,
     "If these keys are matched, the fields are converted to array. "
     "If more than one key, delimit by space."
    },
    {
     /* selects lua_pcall() over lua_call() in the filter callback */
     FLB_CONFIG_MAP_BOOL, "protected_mode", "true",
     0, FLB_TRUE, offsetof(struct lua_filter, protected_mode),
     "If enabled, Lua script will be executed in protected mode. "
     "It prevents to crash when invalid Lua script is executed."
    },
    {
     /* exchange timestamps with Lua as a table instead of a number */
     FLB_CONFIG_MAP_BOOL, "time_as_table", "false",
     0, FLB_TRUE, offsetof(struct lua_filter, time_as_table),
     "If enabled, Fluent-bit will pass the timestamp as a Lua table "
     "with keys \"sec\" for seconds since epoch and \"nsec\" for nanoseconds."
    },
    /* end-of-list marker */
    {0}
};
686
687 struct flb_filter_plugin filter_lua_plugin = {
688 .name = "lua",
689 .description = "Lua Scripting Filter",
690 .cb_init = cb_lua_init,
691 .cb_filter = cb_lua_filter,
692 .cb_exit = cb_lua_exit,
693 .config_map = config_map,
694 .flags = 0
695 };
696