1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_mem.h>
23 #include <fluent-bit/flb_log.h>
24 #include <fluent-bit/flb_pack.h>
25 #include <fluent-bit/multiline/flb_ml.h>
26 #include <fluent-bit/multiline/flb_ml_rule.h>
27 #include <xxhash.h>
28 
ml_flush_stdout(struct flb_ml_parser * parser,struct flb_ml_stream * mst,void * data,char * buf_data,size_t buf_size)29 static int ml_flush_stdout(struct flb_ml_parser *parser,
30                            struct flb_ml_stream *mst,
31                            void *data, char *buf_data, size_t buf_size)
32 {
33     fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n",
34             ANSI_GREEN, ANSI_RESET);
35 
36     /* Print incoming flush buffer */
37     flb_pack_print(buf_data, buf_size);
38 
39     fprintf(stdout, "%s----------- EOF -----------%s\n",
40             ANSI_GREEN, ANSI_RESET);
41     return 0;
42 }
43 
stream_group_create(struct flb_ml_stream * mst,char * name,int len)44 static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst,
45                                                        char *name, int len)
46 {
47     struct flb_ml_stream_group *group;
48 
49     if (!name) {
50         name = "_default";
51     }
52 
53     group = flb_calloc(1, sizeof(struct flb_ml_stream_group));
54     if (!group) {
55         flb_errno();
56         return NULL;
57     }
58     group->name = flb_sds_create_len(name, len);
59     if (!group->name) {
60         flb_free(group);
61         return NULL;
62     }
63 
64     /* status */
65     group->first_line = FLB_TRUE;
66 
67     /* multiline buffer */
68     group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE);
69     if (!group->buf) {
70         flb_error("cannot allocate multiline stream buffer in group %s", name);
71         flb_sds_destroy(group->name);
72         flb_free(group);
73         return NULL;
74     }
75 
76     /* msgpack buffer */
77     msgpack_sbuffer_init(&group->mp_sbuf);
78     msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write);
79 
80     mk_list_add(&group->_head, &mst->groups);
81 
82     return group;
83 }
84 
flb_ml_stream_group_get(struct flb_ml_parser_ins * parser_i,struct flb_ml_stream * mst,msgpack_object * group_name)85 struct flb_ml_stream_group *flb_ml_stream_group_get(struct flb_ml_parser_ins *parser_i,
86                                                     struct flb_ml_stream *mst,
87                                                     msgpack_object *group_name)
88 {
89     int len;
90     char *name;
91     struct flb_ml_parser *mlp;
92     struct mk_list *head;
93     struct flb_ml_stream_group *group = NULL;
94 
95     mlp = parser_i->ml_parser;
96 
97     /* If key_group was not defined, we already have a default group */
98     if (!mlp->key_group || !group_name) {
99         group = mk_list_entry_first(&mst->groups,
100                                     struct flb_ml_stream_group,
101                                     _head);
102         return group;
103     }
104 
105     /* Lookup for a candidate group */
106     len = group_name->via.str.size;
107     name = (char *)group_name->via.str.ptr;
108 
109     mk_list_foreach(head, &mst->groups) {
110         group = mk_list_entry(head, struct flb_ml_stream_group, _head);
111         if (flb_sds_cmp(group->name, name, len) == 0) {
112             return group;
113         }
114         else {
115             group = NULL;
116             continue;
117         }
118     }
119 
120     /* No group has been found, create a new one */
121     if (mk_list_size(&mst->groups) >= FLB_ML_MAX_GROUPS) {
122         flb_error("[multiline] stream %s exceeded number of allowed groups (%i)",
123                   mst->name, FLB_ML_MAX_GROUPS);
124         return NULL;
125     }
126 
127     group = stream_group_create(mst, name, len);
128     return group;
129 }
130 
stream_group_destroy(struct flb_ml_stream_group * group)131 static void stream_group_destroy(struct flb_ml_stream_group *group)
132 {
133     if (group->name) {
134         flb_sds_destroy(group->name);
135     }
136     if (group->buf) {
137         flb_sds_destroy(group->buf);
138     }
139     msgpack_sbuffer_destroy(&group->mp_sbuf);
140     mk_list_del(&group->_head);
141     flb_free(group);
142 }
143 
stream_group_destroy_all(struct flb_ml_stream * mst)144 static void stream_group_destroy_all(struct flb_ml_stream *mst)
145 {
146     struct mk_list *tmp;
147     struct mk_list *head;
148     struct flb_ml_stream_group *group;
149 
150     mk_list_foreach_safe(head, tmp, &mst->groups) {
151         group = mk_list_entry(head, struct flb_ml_stream_group, _head);
152         stream_group_destroy(group);
153     }
154 }
155 
stream_group_init(struct flb_ml_stream * stream)156 static int stream_group_init(struct flb_ml_stream *stream)
157 {
158     struct flb_ml_stream_group *group = NULL;
159 
160     mk_list_init(&stream->groups);
161 
162     /* create a default group */
163     group = stream_group_create(stream, NULL, 0);
164     if (!group) {
165         flb_error("[multiline] error initializing default group for "
166                   "stream '%s'", stream->name);
167         return -1;
168     }
169 
170     return 0;
171 }
172 
stream_create(uint64_t id,struct flb_ml_parser_ins * parser,int (* cb_flush)(struct flb_ml_parser *,struct flb_ml_stream *,void * cb_data,char * buf_data,size_t buf_size),void * cb_data)173 static struct flb_ml_stream *stream_create(uint64_t id,
174                                            struct flb_ml_parser_ins *parser,
175                                            int (*cb_flush) (struct flb_ml_parser *,
176                                                             struct flb_ml_stream *,
177                                                             void *cb_data,
178                                                             char *buf_data,
179                                                             size_t buf_size),
180                                            void *cb_data)
181 {
182     int ret;
183     struct flb_ml_stream *stream;
184 
185     stream = flb_calloc(1, sizeof(struct flb_ml_stream));
186     if (!stream) {
187         flb_errno();
188         return NULL;
189     }
190     stream->id = id;
191     stream->parser = parser;
192 
193     /* Flush Callback and opaque data type */
194     if (cb_flush) {
195         stream->cb_flush = cb_flush;
196     }
197     else {
198         stream->cb_flush = ml_flush_stdout;
199     }
200     stream->cb_data = cb_data;
201 
202     ret = stream_group_init(stream);
203     if (ret != 0) {
204         flb_free(stream);
205         return NULL;
206     }
207 
208     mk_list_add(&stream->_head, &parser->streams);
209     return stream;
210 }
211 
flb_ml_stream_create(struct flb_ml * ml,char * name,int name_len,int (* cb_flush)(struct flb_ml_parser *,struct flb_ml_stream *,void * cb_data,char * buf_data,size_t buf_size),void * cb_data,uint64_t * stream_id)212 int flb_ml_stream_create(struct flb_ml *ml,
213                          char *name,
214                          int name_len,
215                          int (*cb_flush) (struct flb_ml_parser *,
216                                           struct flb_ml_stream *,
217                                           void *cb_data,
218                                           char *buf_data,
219                                           size_t buf_size),
220                          void *cb_data,
221                          uint64_t *stream_id)
222 {
223     uint64_t id;
224     struct mk_list *head;
225     struct mk_list *head_group;
226     struct flb_ml_stream *mst;
227     struct flb_ml_group *group;
228     struct flb_ml_parser_ins *parser;
229 
230     if (!name) {
231         return -1;
232     }
233 
234     if (name_len <= 0) {
235         name_len = strlen(name);
236     }
237 
238     /* Set the stream id by creating a hash using the name */
239     id = XXH3_64bits(name, name_len);
240 
241     /* For every group and parser, create a stream for this stream_id/hash */
242     mk_list_foreach(head, &ml->groups) {
243         group = mk_list_entry(head, struct flb_ml_group, _head);
244         mk_list_foreach(head_group, &group->parsers) {
245             parser = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
246 
247             /* Check if the stream already exists on the parser */
248             if (flb_ml_stream_get(parser, id) != NULL) {
249                 continue;
250             }
251 
252             /* Create the stream */
253             mst = stream_create(id, parser, cb_flush, cb_data);
254             if (!mst) {
255                 flb_error("[multiline] could not create stream_id=%" PRIu64
256                           "for stream '%s' on parser '%s'",
257                           stream_id, name, parser->ml_parser->name);
258                 return -1;
259             }
260         }
261     }
262 
263     *stream_id = id;
264     return 0;
265 }
266 
flb_ml_stream_get(struct flb_ml_parser_ins * parser,uint64_t stream_id)267 struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser,
268                                         uint64_t stream_id)
269 {
270     struct mk_list *head;
271     struct flb_ml_stream *mst = NULL;
272 
273     mk_list_foreach(head, &parser->streams) {
274         mst = mk_list_entry(head, struct flb_ml_stream, _head);
275         if (mst->id == stream_id) {
276             return mst;
277         }
278     }
279 
280     return NULL;
281 }
282 
flb_ml_stream_id_destroy_all(struct flb_ml * ml,uint64_t stream_id)283 void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id)
284 {
285     struct mk_list *tmp;
286     struct mk_list *head;
287     struct mk_list *head_group;
288     struct mk_list *head_stream;
289     struct flb_ml_group *group;
290     struct flb_ml_stream *mst;
291     struct flb_ml_parser_ins *parser_i;
292 
293     /* groups */
294     mk_list_foreach(head, &ml->groups) {
295         group = mk_list_entry(head, struct flb_ml_group, _head);
296 
297         /* parser instances */
298         mk_list_foreach(head_group, &group->parsers) {
299             parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
300 
301             /* streams */
302             mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) {
303                 mst = mk_list_entry(head_stream, struct flb_ml_stream, _head);
304                 if (mst->id != stream_id) {
305                     continue;
306                 }
307 
308                 /* flush any pending data */
309                 flb_ml_flush_parser_instance(ml, parser_i, stream_id);
310 
311                 /* destroy internal groups of the stream */
312                 flb_ml_stream_destroy(mst);
313             }
314         }
315     }
316 }
317 
flb_ml_stream_destroy(struct flb_ml_stream * mst)318 int flb_ml_stream_destroy(struct flb_ml_stream *mst)
319 {
320     mk_list_del(&mst->_head);
321     if (mst->name) {
322         flb_sds_destroy(mst->name);
323     }
324 
325     /* destroy groups */
326     stream_group_destroy_all(mst);
327 
328     flb_free(mst);
329 
330     return 0;
331 }
332