1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_io.h>
23 #include <fluent-bit/flb_log.h>
24 #include <fluent-bit/flb_http_client.h>
25 #include <fluent-bit/flb_pack.h>
26 #include <fluent-bit/flb_time.h>
27 #include <fluent-bit/flb_gzip.h>
28 
29 #include <msgpack.h>
30 
31 #include "datadog.h"
32 #include "datadog_conf.h"
33 #include "datadog_remap.h"
34 
/*
 * Initialization callback of the plugin.
 *
 * Creates the Datadog context from the output instance configuration through
 * flb_datadog_conf_create() and attaches it to the instance.
 *
 * Returns 0 when the context was created and registered, -1 otherwise.
 * The 'data' argument is not used.
 */
static int cb_datadog_init(struct flb_output_instance *ins,
                           struct flb_config *config, void *data)
{
    struct flb_out_datadog *ctx = flb_datadog_conf_create(ins, config);

    (void) data;

    if (ctx != NULL) {
        /* Make the context available to the other callbacks */
        flb_output_set_context(ins, ctx);
        return 0;
    }

    return -1;
}
50 
timestamp_format(const struct flb_time * tms)51 static int64_t timestamp_format(const struct flb_time* tms) {
52     int64_t timestamp = 0;
53 
54     /* Format the time, use milliseconds precision not nanoseconds */
55     timestamp = tms->tm.tv_sec * 1000;
56     timestamp += tms->tm.tv_nsec / 1000000;
57 
58     /* round up if necessary */
59     if (tms->tm.tv_nsec % 1000000 >= 500000) {
60         ++timestamp;
61     }
62     return timestamp;
63 }
64 
/* Pack a single msgpack string: the length header followed by its bytes. */
static void dd_msgpack_pack_raw_str(msgpack_packer *mp_pck,
                                    const char *str, size_t str_size)
{
    msgpack_pack_str(mp_pck, str_size);
    msgpack_pack_str_body(mp_pck, str, str_size);
}

/*
 * Pack a key/value pair where both sides are strings.
 *
 * The key is written first and the value second; neither buffer needs to be
 * NUL-terminated since the explicit sizes are used.
 */
static void dd_msgpack_pack_key_value_str(msgpack_packer* mp_pck,
                                          const char *key, size_t key_size,
                                          const char *val, size_t val_size)
{
    dd_msgpack_pack_raw_str(mp_pck, key, key_size);
    dd_msgpack_pack_raw_str(mp_pck, val, val_size);
}
74 
dd_compare_msgpack_obj_key_with_str(const msgpack_object obj,const char * key,size_t key_size)75 static int dd_compare_msgpack_obj_key_with_str(const msgpack_object obj, const char *key, size_t key_size) {
76 
77     if (obj.via.str.size == key_size && memcmp(obj.via.str.ptr,key, key_size) == 0) {
78         return FLB_TRUE;
79     }
80 
81     return FLB_FALSE;
82 }
83 
datadog_format(struct flb_config * config,struct flb_input_instance * ins,void * plugin_context,void * flush_ctx,const char * tag,int tag_len,const void * data,size_t bytes,void ** out_data,size_t * out_size)84 static int datadog_format(struct flb_config *config,
85                           struct flb_input_instance *ins,
86                           void *plugin_context,
87                           void *flush_ctx,
88                           const char *tag, int tag_len,
89                           const void *data, size_t bytes,
90                           void **out_data, size_t *out_size)
91 {
92     int i;
93     int ind;
94     int byte_cnt;
95     int remap_cnt;
96     /* for msgpack global structs */
97     int array_size = 0;
98     size_t off = 0;
99     msgpack_unpacked result;
100     msgpack_sbuffer mp_sbuf;
101     msgpack_packer mp_pck;
102     /* for sub msgpack objs */
103     int map_size;
104     struct flb_time tms;
105     int64_t timestamp;
106     msgpack_object *obj;
107     msgpack_object map;
108     msgpack_object root;
109     msgpack_object k;
110     msgpack_object v;
111     struct flb_out_datadog *ctx = plugin_context;
112 
113     /* output buffer */
114     flb_sds_t out_buf;
115     flb_sds_t remapped_tags = NULL;
116 
117     /* Count number of records */
118     array_size = flb_mp_count(data, bytes);
119 
120     /* Create temporary msgpack buffer */
121     msgpack_sbuffer_init(&mp_sbuf);
122     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
123 
124     /* Prepare array for all entries */
125     msgpack_pack_array(&mp_pck, array_size);
126 
127     off = 0;
128     msgpack_unpacked_init(&result);
129     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
130         root = result.data;
131 
132         /* Get timestamp and object */
133         flb_time_pop_from_msgpack(&tms, &result, &obj);
134         timestamp = timestamp_format(&tms);
135 
136         map = root.via.array.ptr[1];
137         map_size = map.via.map.size;
138 
139         /*
140          * msgpack requires knowing/allocating exact map size in advance, so we need to
141          * loop through the map twice. First time here to count how many attr we can
142          * remap to tags, and second time later where we actually perform the remapping.
143          */
144         remap_cnt = 0, byte_cnt = ctx->dd_tags ? flb_sds_len(ctx->dd_tags) : 0;
145         if (ctx->remap) {
146             for (i = 0; i < map_size; i++) {
147                 if (dd_attr_need_remapping(map.via.map.ptr[i].key,
148                                            map.via.map.ptr[i].val) >= 0) {
149                     remap_cnt++;
150                     /*
151                      * here we also *estimated* the size of buffer needed to hold the
152                      * remapped tags. We can't know the size for sure until we do the
153                      * remapping, the estimation here is just for efficiency, so that
154                      * appending tags won't cause repeated resizing/copying
155                      */
156                     byte_cnt += 2 * (map.via.map.ptr[i].key.via.str.size +
157                                      map.via.map.ptr[i].val.via.str.size);
158                 }
159             }
160 
161             if (!remapped_tags) {
162                 remapped_tags = flb_sds_create_size(byte_cnt);
163             }
164 
165             /*
166              * we reuse this buffer across messages, which means we have to clear it
167              * for each message flb_sds doesn't have a clear function, so we copy a
168              * empty string to achieve the same effect
169              */
170             remapped_tags = flb_sds_copy(remapped_tags, "", 0);
171         }
172 
173         /*
174          * build new object(map) with additional space for datadog entries for those
175          * remapped attributes, we need to remove them from the map. Note: If there were
176          * no dd_tags specified AND there will be remapped attributes, we need to add 1
177          * to account for the new presense of the dd_tags
178          */
179         if (remap_cnt && (ctx->dd_tags == NULL)) {
180             msgpack_pack_map(&mp_pck,
181                              ctx->nb_additional_entries + map_size + 1 - remap_cnt);
182         }
183         else {
184             msgpack_pack_map(&mp_pck, ctx->nb_additional_entries + map_size - remap_cnt);
185         }
186 
187         /* timestamp */
188         msgpack_pack_str(&mp_pck, flb_sds_len(ctx->json_date_key));
189         msgpack_pack_str_body(&mp_pck,
190                               ctx->json_date_key,
191                               flb_sds_len(ctx->json_date_key));
192         msgpack_pack_int64(&mp_pck, timestamp);
193 
194         /* include_tag_key */
195         if (ctx->include_tag_key == FLB_TRUE) {
196             dd_msgpack_pack_key_value_str(&mp_pck,
197                                           ctx->tag_key, flb_sds_len(ctx->tag_key),
198                                           tag, tag_len);
199         }
200 
201         /* dd_source */
202         if (ctx->dd_source != NULL) {
203             dd_msgpack_pack_key_value_str(&mp_pck,
204                                           FLB_DATADOG_DD_SOURCE_KEY,
205                                           sizeof(FLB_DATADOG_DD_SOURCE_KEY) -1,
206                                           ctx->dd_source, flb_sds_len(ctx->dd_source));
207         }
208 
209         /* dd_service */
210         if (ctx->dd_service != NULL) {
211             dd_msgpack_pack_key_value_str(&mp_pck,
212                                           FLB_DATADOG_DD_SERVICE_KEY,
213                                           sizeof(FLB_DATADOG_DD_SERVICE_KEY) -1,
214                                           ctx->dd_service, flb_sds_len(ctx->dd_service));
215         }
216 
217         /* Append initial object k/v */
218         ind = 0;
219         for (i = 0; i < map_size; i++) {
220             k = map.via.map.ptr[i].key;
221             v = map.via.map.ptr[i].val;
222 
223             /*
224              * actually perform the remapping here. For matched attr, we remap and
225              * append them to remapped_tags buffer, then skip the rest of processing
226              * (so they won't be packed as attr)
227              */
228             if (ctx->remap && (ind = dd_attr_need_remapping(k, v)) >=0 ) {
229                 remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
230                                             remapped_tags);
231                 continue;
232             }
233 
234             /* Mapping between input keys to specific datadog keys */
235             if (ctx->dd_message_key != NULL &&
236                 dd_compare_msgpack_obj_key_with_str(k, ctx->dd_message_key,
237                                                     flb_sds_len(ctx->dd_message_key)) == FLB_TRUE) {
238                 msgpack_pack_str(&mp_pck, sizeof(FLB_DATADOG_DD_MESSAGE_KEY)-1);
239                 msgpack_pack_str_body(&mp_pck, FLB_DATADOG_DD_MESSAGE_KEY,
240                                       sizeof(FLB_DATADOG_DD_MESSAGE_KEY)-1);
241             }
242             else {
243                 msgpack_pack_object(&mp_pck, k);
244             }
245 
246             msgpack_pack_object(&mp_pck, v);
247         }
248 
249         /* here we concatenate ctx->dd_tags and remapped_tags, depending on their presence */
250         if (remap_cnt) {
251             if (ctx->dd_tags != NULL) {
252                 flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
253                             strlen(FLB_DATADOG_TAG_SEPERATOR));
254                 flb_sds_cat(remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags));
255             }
256             dd_msgpack_pack_key_value_str(&mp_pck,
257                                           FLB_DATADOG_DD_TAGS_KEY,
258                                           sizeof(FLB_DATADOG_DD_TAGS_KEY) -1,
259                                           remapped_tags, flb_sds_len(remapped_tags));
260         }
261         else if (ctx->dd_tags != NULL) {
262             dd_msgpack_pack_key_value_str(&mp_pck,
263                                           FLB_DATADOG_DD_TAGS_KEY,
264                                           sizeof(FLB_DATADOG_DD_TAGS_KEY) -1,
265                                           ctx->dd_tags, flb_sds_len(ctx->dd_tags));
266         }
267     }
268 
269     /* Convert from msgpack to JSON */
270     out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
271     msgpack_sbuffer_destroy(&mp_sbuf);
272 
273     if (!out_buf) {
274         flb_plg_error(ctx->ins, "error formatting JSON payload");
275         if (remapped_tags) {
276             flb_sds_destroy(remapped_tags);
277         }
278         msgpack_unpacked_destroy(&result);
279         return -1;
280     }
281 
282     *out_data = out_buf;
283     *out_size = flb_sds_len(out_buf);
284 
285     /* Cleanup */
286     msgpack_unpacked_destroy(&result);
287     if (remapped_tags) {
288         flb_sds_destroy(remapped_tags);
289     }
290 
291     return 0;
292 }
293 
cb_datadog_flush(const void * data,size_t bytes,const char * tag,int tag_len,struct flb_input_instance * i_ins,void * out_context,struct flb_config * config)294 static void cb_datadog_flush(const void *data, size_t bytes,
295                              const char *tag, int tag_len,
296                              struct flb_input_instance *i_ins,
297                              void *out_context,
298                              struct flb_config *config)
299 {
300     struct flb_out_datadog *ctx = out_context;
301     struct flb_upstream_conn *upstream_conn;
302     struct flb_http_client *client;
303     void *out_buf;
304     size_t out_size;
305     flb_sds_t payload_buf;
306     size_t payload_size = 0;
307     void *final_payload_buf = NULL;
308     size_t final_payload_size = 0;
309     size_t b_sent;
310     int ret = FLB_ERROR;
311     int compressed = FLB_FALSE;
312 
313     /* Get upstream connection */
314     upstream_conn = flb_upstream_conn_get(ctx->upstream);
315     if (!upstream_conn) {
316         FLB_OUTPUT_RETURN(FLB_RETRY);
317     }
318 
319     /* Convert input data into a Datadog JSON payload */
320     ret = datadog_format(config, i_ins,
321                          ctx, NULL,
322                          tag, tag_len,
323                          data, bytes,
324                          &out_buf, &out_size);
325     if (ret == -1) {
326         flb_upstream_conn_release(upstream_conn);
327         FLB_OUTPUT_RETURN(FLB_ERROR);
328     }
329 
330     payload_buf = (flb_sds_t) out_buf;
331     payload_size = out_size;
332 
333     /* Should we compress the payload ? */
334     if (ctx->compress_gzip == FLB_TRUE) {
335         ret = flb_gzip_compress((void *) payload_buf, payload_size,
336                                 &final_payload_buf, &final_payload_size);
337         if (ret == -1) {
338             flb_error("[out_http] cannot gzip payload, disabling compression");
339         } else {
340             compressed = FLB_TRUE;
341         }
342     } else {
343         final_payload_buf = payload_buf;
344         final_payload_size = payload_size;
345     }
346 
347     /* Create HTTP client context */
348     client = flb_http_client(upstream_conn, FLB_HTTP_POST, ctx->uri,
349                              final_payload_buf, final_payload_size,
350                              ctx->host, ctx->port,
351                              ctx->proxy, 0);
352     if (!client) {
353         flb_upstream_conn_release(upstream_conn);
354         FLB_OUTPUT_RETURN(FLB_ERROR);
355     }
356 
357     flb_http_add_header(client, "User-Agent", 10, "Fluent-Bit", 10);
358     flb_http_add_header(client,
359                         FLB_DATADOG_CONTENT_TYPE, sizeof(FLB_DATADOG_CONTENT_TYPE) - 1,
360                         FLB_DATADOG_MIME_JSON, sizeof(FLB_DATADOG_MIME_JSON) - 1);
361 
362     /* Content Encoding: gzip */
363     if (compressed == FLB_TRUE) {
364         flb_http_set_content_encoding_gzip(client);
365     }
366     /* TODO: Append other headers if needed*/
367 
368     /* finaly send the query */
369     ret = flb_http_do(client, &b_sent);
370     if (ret == 0) {
371         if (client->resp.status < 200 || client->resp.status > 205) {
372             flb_plg_error(ctx->ins, "%s%s:%i HTTP status=%i",
373                           ctx->scheme, ctx->host, ctx->port,
374                           client->resp.status);
375             ret = FLB_RETRY;
376         }
377         else {
378             if (client->resp.payload) {
379                 flb_plg_info(ctx->ins, "%s%s, port=%i, HTTP status=%i payload=%s",
380                              ctx->scheme, ctx->host, ctx->port,
381                              client->resp.status, client->resp.payload);
382             }
383             else {
384                 flb_plg_info(ctx->ins, "%s%s, port=%i, HTTP status=%i",
385                              ctx->scheme, ctx->host, ctx->port,
386                              client->resp.status);
387             }
388             ret = FLB_OK;
389         }
390     }
391     else {
392         flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
393                       ctx->host, ctx->port, ret);
394         ret = FLB_RETRY;
395     }
396 
397     /*
398      * If the final_payload_buf buffer is different than payload_buf, means
399      * we generated a different payload and must be freed.
400      */
401     if (final_payload_buf != payload_buf) {
402         flb_free(final_payload_buf);
403     }
404     /* Destroy HTTP client context */
405     flb_sds_destroy(payload_buf);
406     flb_http_client_destroy(client);
407     flb_upstream_conn_release(upstream_conn);
408 
409     FLB_OUTPUT_RETURN(ret);
410 }
411 
412 
/*
 * Exit callback of the plugin.
 *
 * Releases the plugin context through flb_datadog_conf_destroy() when one
 * was set; a NULL context is accepted and ignored. Always returns 0.
 */
static int cb_datadog_exit(void *data, struct flb_config *config)
{
    struct flb_out_datadog *ctx = data;

    if (ctx != NULL) {
        flb_datadog_conf_destroy(ctx);
    }

    return 0;
}
424 
425 struct flb_output_plugin out_datadog_plugin = {
426     .name         = "datadog",
427     .description  = "Send events to DataDog HTTP Event Collector",
428     .cb_init      = cb_datadog_init,
429     .cb_flush     = cb_datadog_flush,
430     .cb_exit      = cb_datadog_exit,
431 
432     /* Test */
433     .test_formatter.callback = datadog_format,
434 
435     /* Plugin flags */
436     .flags        = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
437 };
438