1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_mem.h>
23 #include <fluent-bit/flb_log.h>
24 #include <fluent-bit/flb_pack.h>
25 #include <fluent-bit/multiline/flb_ml.h>
26 #include <fluent-bit/multiline/flb_ml_rule.h>
27 #include <xxhash.h>
28
ml_flush_stdout(struct flb_ml_parser * parser,struct flb_ml_stream * mst,void * data,char * buf_data,size_t buf_size)29 static int ml_flush_stdout(struct flb_ml_parser *parser,
30 struct flb_ml_stream *mst,
31 void *data, char *buf_data, size_t buf_size)
32 {
33 fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n",
34 ANSI_GREEN, ANSI_RESET);
35
36 /* Print incoming flush buffer */
37 flb_pack_print(buf_data, buf_size);
38
39 fprintf(stdout, "%s----------- EOF -----------%s\n",
40 ANSI_GREEN, ANSI_RESET);
41 return 0;
42 }
43
stream_group_create(struct flb_ml_stream * mst,char * name,int len)44 static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst,
45 char *name, int len)
46 {
47 struct flb_ml_stream_group *group;
48
49 if (!name) {
50 name = "_default";
51 }
52
53 group = flb_calloc(1, sizeof(struct flb_ml_stream_group));
54 if (!group) {
55 flb_errno();
56 return NULL;
57 }
58 group->name = flb_sds_create_len(name, len);
59 if (!group->name) {
60 flb_free(group);
61 return NULL;
62 }
63
64 /* status */
65 group->first_line = FLB_TRUE;
66
67 /* multiline buffer */
68 group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE);
69 if (!group->buf) {
70 flb_error("cannot allocate multiline stream buffer in group %s", name);
71 flb_sds_destroy(group->name);
72 flb_free(group);
73 return NULL;
74 }
75
76 /* msgpack buffer */
77 msgpack_sbuffer_init(&group->mp_sbuf);
78 msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write);
79
80 mk_list_add(&group->_head, &mst->groups);
81
82 return group;
83 }
84
flb_ml_stream_group_get(struct flb_ml_parser_ins * parser_i,struct flb_ml_stream * mst,msgpack_object * group_name)85 struct flb_ml_stream_group *flb_ml_stream_group_get(struct flb_ml_parser_ins *parser_i,
86 struct flb_ml_stream *mst,
87 msgpack_object *group_name)
88 {
89 int len;
90 char *name;
91 struct flb_ml_parser *mlp;
92 struct mk_list *head;
93 struct flb_ml_stream_group *group = NULL;
94
95 mlp = parser_i->ml_parser;
96
97 /* If key_group was not defined, we already have a default group */
98 if (!mlp->key_group || !group_name) {
99 group = mk_list_entry_first(&mst->groups,
100 struct flb_ml_stream_group,
101 _head);
102 return group;
103 }
104
105 /* Lookup for a candidate group */
106 len = group_name->via.str.size;
107 name = (char *)group_name->via.str.ptr;
108
109 mk_list_foreach(head, &mst->groups) {
110 group = mk_list_entry(head, struct flb_ml_stream_group, _head);
111 if (flb_sds_cmp(group->name, name, len) == 0) {
112 return group;
113 }
114 else {
115 group = NULL;
116 continue;
117 }
118 }
119
120 /* No group has been found, create a new one */
121 if (mk_list_size(&mst->groups) >= FLB_ML_MAX_GROUPS) {
122 flb_error("[multiline] stream %s exceeded number of allowed groups (%i)",
123 mst->name, FLB_ML_MAX_GROUPS);
124 return NULL;
125 }
126
127 group = stream_group_create(mst, name, len);
128 return group;
129 }
130
stream_group_destroy(struct flb_ml_stream_group * group)131 static void stream_group_destroy(struct flb_ml_stream_group *group)
132 {
133 if (group->name) {
134 flb_sds_destroy(group->name);
135 }
136 if (group->buf) {
137 flb_sds_destroy(group->buf);
138 }
139 msgpack_sbuffer_destroy(&group->mp_sbuf);
140 mk_list_del(&group->_head);
141 flb_free(group);
142 }
143
stream_group_destroy_all(struct flb_ml_stream * mst)144 static void stream_group_destroy_all(struct flb_ml_stream *mst)
145 {
146 struct mk_list *tmp;
147 struct mk_list *head;
148 struct flb_ml_stream_group *group;
149
150 mk_list_foreach_safe(head, tmp, &mst->groups) {
151 group = mk_list_entry(head, struct flb_ml_stream_group, _head);
152 stream_group_destroy(group);
153 }
154 }
155
stream_group_init(struct flb_ml_stream * stream)156 static int stream_group_init(struct flb_ml_stream *stream)
157 {
158 struct flb_ml_stream_group *group = NULL;
159
160 mk_list_init(&stream->groups);
161
162 /* create a default group */
163 group = stream_group_create(stream, NULL, 0);
164 if (!group) {
165 flb_error("[multiline] error initializing default group for "
166 "stream '%s'", stream->name);
167 return -1;
168 }
169
170 return 0;
171 }
172
stream_create(uint64_t id,struct flb_ml_parser_ins * parser,int (* cb_flush)(struct flb_ml_parser *,struct flb_ml_stream *,void * cb_data,char * buf_data,size_t buf_size),void * cb_data)173 static struct flb_ml_stream *stream_create(uint64_t id,
174 struct flb_ml_parser_ins *parser,
175 int (*cb_flush) (struct flb_ml_parser *,
176 struct flb_ml_stream *,
177 void *cb_data,
178 char *buf_data,
179 size_t buf_size),
180 void *cb_data)
181 {
182 int ret;
183 struct flb_ml_stream *stream;
184
185 stream = flb_calloc(1, sizeof(struct flb_ml_stream));
186 if (!stream) {
187 flb_errno();
188 return NULL;
189 }
190 stream->id = id;
191 stream->parser = parser;
192
193 /* Flush Callback and opaque data type */
194 if (cb_flush) {
195 stream->cb_flush = cb_flush;
196 }
197 else {
198 stream->cb_flush = ml_flush_stdout;
199 }
200 stream->cb_data = cb_data;
201
202 ret = stream_group_init(stream);
203 if (ret != 0) {
204 flb_free(stream);
205 return NULL;
206 }
207
208 mk_list_add(&stream->_head, &parser->streams);
209 return stream;
210 }
211
flb_ml_stream_create(struct flb_ml * ml,char * name,int name_len,int (* cb_flush)(struct flb_ml_parser *,struct flb_ml_stream *,void * cb_data,char * buf_data,size_t buf_size),void * cb_data,uint64_t * stream_id)212 int flb_ml_stream_create(struct flb_ml *ml,
213 char *name,
214 int name_len,
215 int (*cb_flush) (struct flb_ml_parser *,
216 struct flb_ml_stream *,
217 void *cb_data,
218 char *buf_data,
219 size_t buf_size),
220 void *cb_data,
221 uint64_t *stream_id)
222 {
223 uint64_t id;
224 struct mk_list *head;
225 struct mk_list *head_group;
226 struct flb_ml_stream *mst;
227 struct flb_ml_group *group;
228 struct flb_ml_parser_ins *parser;
229
230 if (!name) {
231 return -1;
232 }
233
234 if (name_len <= 0) {
235 name_len = strlen(name);
236 }
237
238 /* Set the stream id by creating a hash using the name */
239 id = XXH3_64bits(name, name_len);
240
241 /* For every group and parser, create a stream for this stream_id/hash */
242 mk_list_foreach(head, &ml->groups) {
243 group = mk_list_entry(head, struct flb_ml_group, _head);
244 mk_list_foreach(head_group, &group->parsers) {
245 parser = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
246
247 /* Check if the stream already exists on the parser */
248 if (flb_ml_stream_get(parser, id) != NULL) {
249 continue;
250 }
251
252 /* Create the stream */
253 mst = stream_create(id, parser, cb_flush, cb_data);
254 if (!mst) {
255 flb_error("[multiline] could not create stream_id=%" PRIu64
256 "for stream '%s' on parser '%s'",
257 stream_id, name, parser->ml_parser->name);
258 return -1;
259 }
260 }
261 }
262
263 *stream_id = id;
264 return 0;
265 }
266
flb_ml_stream_get(struct flb_ml_parser_ins * parser,uint64_t stream_id)267 struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser,
268 uint64_t stream_id)
269 {
270 struct mk_list *head;
271 struct flb_ml_stream *mst = NULL;
272
273 mk_list_foreach(head, &parser->streams) {
274 mst = mk_list_entry(head, struct flb_ml_stream, _head);
275 if (mst->id == stream_id) {
276 return mst;
277 }
278 }
279
280 return NULL;
281 }
282
flb_ml_stream_id_destroy_all(struct flb_ml * ml,uint64_t stream_id)283 void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id)
284 {
285 struct mk_list *tmp;
286 struct mk_list *head;
287 struct mk_list *head_group;
288 struct mk_list *head_stream;
289 struct flb_ml_group *group;
290 struct flb_ml_stream *mst;
291 struct flb_ml_parser_ins *parser_i;
292
293 /* groups */
294 mk_list_foreach(head, &ml->groups) {
295 group = mk_list_entry(head, struct flb_ml_group, _head);
296
297 /* parser instances */
298 mk_list_foreach(head_group, &group->parsers) {
299 parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
300
301 /* streams */
302 mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) {
303 mst = mk_list_entry(head_stream, struct flb_ml_stream, _head);
304 if (mst->id != stream_id) {
305 continue;
306 }
307
308 /* flush any pending data */
309 flb_ml_flush_parser_instance(ml, parser_i, stream_id);
310
311 /* destroy internal groups of the stream */
312 flb_ml_stream_destroy(mst);
313 }
314 }
315 }
316 }
317
flb_ml_stream_destroy(struct flb_ml_stream * mst)318 int flb_ml_stream_destroy(struct flb_ml_stream *mst)
319 {
320 mk_list_del(&mst->_head);
321 if (mst->name) {
322 flb_sds_destroy(mst->name);
323 }
324
325 /* destroy groups */
326 stream_group_destroy_all(mst);
327
328 flb_free(mst);
329
330 return 0;
331 }
332