1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_sds.h>
23 #include <fluent-bit/flb_kv.h>
24 #include <fluent-bit/flb_utils.h>
25 #include <fluent-bit/flb_pack.h>
26 #include <fluent-bit/flb_config_map.h>
27 #include <fluent-bit/flb_gzip.h>
28 #include <mbedtls/sha256.h>
29 #include <mbedtls/base64.h>
30 
31 #include <msgpack.h>
32 
33 #include "azure_blob.h"
34 #include "azure_blob_uri.h"
35 #include "azure_blob_conf.h"
36 #include "azure_blob_appendblob.h"
37 #include "azure_blob_blockblob.h"
38 #include "azure_blob_http.h"
39 
/*
 * Private return code used by send_blob() to tell cb_azure_blob_flush() that
 * the target blob has to be created before the upload can be retried. It is
 * not one of the FLB_OK / FLB_RETRY / FLB_ERROR codes handed to the engine.
 */
#define CREATE_BLOB  1337
41 
azure_blob_format(struct flb_config * config,struct flb_input_instance * ins,void * plugin_context,void * flush_ctx,const char * tag,int tag_len,const void * data,size_t bytes,void ** out_data,size_t * out_size)42 static int azure_blob_format(struct flb_config *config,
43                              struct flb_input_instance *ins,
44                              void *plugin_context,
45                              void *flush_ctx,
46                              const char *tag, int tag_len,
47                              const void *data, size_t bytes,
48                              void **out_data, size_t *out_size)
49 {
50     flb_sds_t out_buf;
51     struct flb_azure_blob *ctx = plugin_context;
52 
53     out_buf = flb_pack_msgpack_to_json_format(data, bytes,
54                                               FLB_PACK_JSON_FORMAT_LINES,
55                                               FLB_PACK_JSON_DATE_ISO8601,
56                                               ctx->date_key);
57     if (!out_buf) {
58         return -1;
59     }
60 
61     *out_data = out_buf;
62     *out_size = flb_sds_len(out_buf);
63     return 0;
64 }
65 
send_blob(struct flb_config * config,struct flb_input_instance * i_ins,struct flb_azure_blob * ctx,char * name,char * tag,int tag_len,void * data,size_t bytes)66 static int send_blob(struct flb_config *config,
67                      struct flb_input_instance *i_ins,
68                      struct flb_azure_blob *ctx, char *name,
69                      char *tag, int tag_len, void *data, size_t bytes)
70 {
71     int ret;
72     int compressed = FLB_FALSE;
73     int content_encoding = FLB_FALSE;
74     int content_type = FLB_FALSE;
75     uint64_t ms = 0;
76     size_t b_sent;
77     void *out_buf;
78     size_t out_size;
79     flb_sds_t uri = NULL;
80     flb_sds_t blockid = NULL;
81     void *payload_buf;
82     size_t payload_size;
83     struct flb_http_client *c;
84     struct flb_upstream_conn *u_conn;
85 
86     if (ctx->btype == AZURE_BLOB_APPENDBLOB) {
87         uri = azb_append_blob_uri(ctx, tag);
88     }
89     else if (ctx->btype == AZURE_BLOB_BLOCKBLOB) {
90         blockid = azb_block_blob_id(&ms);
91         if (!blockid) {
92             flb_plg_error(ctx->ins, "could not generate block id");
93             return FLB_RETRY;
94         }
95         uri = azb_block_blob_uri(ctx, tag, blockid, ms);
96     }
97 
98     if (!uri) {
99         flb_free(blockid);
100         return FLB_RETRY;
101     }
102 
103     /* Get upstream connection */
104     u_conn = flb_upstream_conn_get(ctx->u);
105     if (!u_conn) {
106         flb_plg_error(ctx->ins,
107                       "cannot create upstream connection for append_blob");
108         flb_sds_destroy(uri);
109         flb_free(blockid);
110         return FLB_RETRY;
111     }
112 
113     /* Format the data */
114     ret = azure_blob_format(config, i_ins,
115                             ctx, NULL,
116                             tag, tag_len,
117                             data, bytes,
118                             &out_buf, &out_size);
119     if (ret != 0) {
120         flb_upstream_conn_release(u_conn);
121         flb_sds_destroy(uri);
122         flb_free(blockid);
123         return FLB_RETRY;
124     }
125 
126     /* Map buffer */
127     payload_buf = out_buf;
128     payload_size = out_size;
129 
130     if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
131         ret = flb_gzip_compress((void *) out_buf, out_size,
132                                 &payload_buf, &payload_size);
133         if (ret == -1) {
134             flb_plg_error(ctx->ins,
135                           "cannot gzip payload, disabling compression");
136         }
137         else {
138             compressed = FLB_TRUE;
139             /* JSON buffer is not longer needed */
140             flb_sds_destroy(out_buf);
141         }
142     }
143 
144     if (ctx->compress_blob == FLB_TRUE) {
145         content_encoding = AZURE_BLOB_CE_NONE;
146         content_type = AZURE_BLOB_CT_GZIP;
147     }
148     else if (compressed == FLB_TRUE) {
149         content_encoding = AZURE_BLOB_CE_GZIP;
150         content_type = AZURE_BLOB_CT_JSON;
151     }
152 
153     /* Create HTTP client context */
154     c = flb_http_client(u_conn, FLB_HTTP_PUT,
155                         uri,
156                         payload_buf, payload_size, NULL, 0, NULL, 0);
157     if (!c) {
158         flb_plg_error(ctx->ins, "cannot create HTTP client context");
159         flb_sds_destroy(out_buf);
160         flb_upstream_conn_release(u_conn);
161         flb_free(blockid);
162         return FLB_RETRY;
163     }
164 
165     /* Prepare headers and authentication */
166     azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
167                           content_type, content_encoding);
168 
169     /* Send HTTP request */
170     ret = flb_http_do(c, &b_sent);
171     flb_sds_destroy(uri);
172 
173     /* Release */
174     if (compressed == FLB_FALSE) {
175         flb_sds_destroy(out_buf);
176     }
177     else {
178         flb_free(payload_buf);
179     }
180 
181     flb_upstream_conn_release(u_conn);
182 
183     /* Validate HTTP status */
184     if (ret == -1) {
185         flb_plg_error(ctx->ins, "error sending append_blob");
186         flb_free(blockid);
187         return FLB_RETRY;
188     }
189 
190     if (c->resp.status == 201) {
191         flb_plg_info(ctx->ins, "content appended to blob successfully");
192         flb_http_client_destroy(c);
193 
194         if (ctx->btype == AZURE_BLOB_BLOCKBLOB) {
195             ret = azb_block_blob_commit(ctx, blockid, tag, ms);
196             flb_free(blockid);
197             return ret;
198         }
199         flb_free(blockid);
200         return FLB_OK;
201     }
202     else if (c->resp.status == 404) {
203         flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
204         flb_http_client_destroy(c);
205         return CREATE_BLOB;
206     }
207     else if (c->resp.payload_size > 0) {
208         flb_plg_error(ctx->ins, "cannot append content to blob\n%s",
209                       c->resp.payload);
210         if (strstr(c->resp.payload, "must be 0 for Create Append")) {
211             flb_http_client_destroy(c);
212             return CREATE_BLOB;
213         }
214     }
215     else {
216         flb_plg_error(ctx->ins, "cannot append content to blob");
217     }
218     flb_http_client_destroy(c);
219 
220     return FLB_RETRY;
221 }
222 
create_blob(struct flb_azure_blob * ctx,char * name)223 static int create_blob(struct flb_azure_blob *ctx, char *name)
224 {
225     int ret;
226     size_t b_sent;
227     flb_sds_t uri = NULL;
228     struct flb_http_client *c;
229     struct flb_upstream_conn *u_conn;
230 
231     uri = azb_uri_create_blob(ctx, name);
232     if (!uri) {
233         return FLB_RETRY;
234     }
235 
236     /* Get upstream connection */
237     u_conn = flb_upstream_conn_get(ctx->u);
238     if (!u_conn) {
239         flb_plg_error(ctx->ins,
240                       "cannot create upstream connection for create_append_blob");
241         flb_sds_destroy(uri);
242         return FLB_RETRY;
243     }
244 
245     /* Create HTTP client context */
246     c = flb_http_client(u_conn, FLB_HTTP_PUT,
247                         uri,
248                         NULL, 0, NULL, 0, NULL, 0);
249     if (!c) {
250         flb_plg_error(ctx->ins, "cannot create HTTP client context");
251         flb_upstream_conn_release(u_conn);
252         flb_sds_destroy(uri);
253         return FLB_RETRY;
254     }
255 
256     /* Prepare headers and authentication */
257     azb_http_client_setup(ctx, c, -1, FLB_TRUE,
258                           AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
259 
260     /* Send HTTP request */
261     ret = flb_http_do(c, &b_sent);
262     flb_sds_destroy(uri);
263 
264     if (ret == -1) {
265         flb_plg_error(ctx->ins, "error sending append_blob");
266         flb_http_client_destroy(c);
267         flb_upstream_conn_release(u_conn);
268         return FLB_RETRY;
269     }
270 
271     if (c->resp.status == 201) {
272         flb_plg_info(ctx->ins, "blob created successfully: %s", c->uri);
273     }
274     else {
275         if (c->resp.payload_size > 0) {
276             flb_plg_error(ctx->ins, "http_status=%i cannot create append blob\n%s",
277                           c->resp.status, c->resp.payload);
278         }
279         else {
280             flb_plg_error(ctx->ins, "http_status=%i cannot create append blob",
281                           c->resp.status);
282         }
283         flb_http_client_destroy(c);
284         flb_upstream_conn_release(u_conn);
285         return FLB_RETRY;
286     }
287 
288     flb_http_client_destroy(c);
289     flb_upstream_conn_release(u_conn);
290     return FLB_OK;
291 }
292 
create_container(struct flb_azure_blob * ctx,char * name)293 static int create_container(struct flb_azure_blob *ctx, char *name)
294 {
295     int ret;
296     size_t b_sent;
297     flb_sds_t uri;
298     struct flb_http_client *c;
299     struct flb_upstream_conn *u_conn;
300 
301     /* Get upstream connection */
302     u_conn = flb_upstream_conn_get(ctx->u);
303     if (!u_conn) {
304         flb_plg_error(ctx->ins,
305                       "cannot create upstream connection for container creation");
306         return FLB_FALSE;
307     }
308 
309     /* URI */
310     uri = azb_uri_ensure_or_create_container(ctx);
311     if (!uri) {
312         flb_upstream_conn_release(u_conn);
313         return FLB_FALSE;
314     }
315 
316     /* Create HTTP client context */
317     c = flb_http_client(u_conn, FLB_HTTP_PUT,
318                         uri,
319                         NULL, 0, NULL, 0, NULL, 0);
320     if (!c) {
321         flb_plg_error(ctx->ins, "cannot create HTTP client context");
322         flb_upstream_conn_release(u_conn);
323         return FLB_FALSE;
324     }
325 
326     /* Prepare headers and authentication */
327     azb_http_client_setup(ctx, c, -1, FLB_FALSE,
328                           AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
329 
330     /* Send HTTP request */
331     ret = flb_http_do(c, &b_sent);
332 
333     /* Release URI */
334     flb_sds_destroy(uri);
335 
336     /* Validate http response */
337     if (ret == -1) {
338         flb_plg_error(ctx->ins, "error requesting container creation");
339         flb_http_client_destroy(c);
340         flb_upstream_conn_release(u_conn);
341         return FLB_FALSE;
342     }
343 
344     if (c->resp.status == 201) {
345         flb_plg_info(ctx->ins, "container '%s' created sucessfully", name);
346     }
347     else {
348         if (c->resp.payload_size > 0) {
349             flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
350                           name, c->resp.payload);
351         }
352         else {
353             flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
354                           name, c->resp.payload);
355         }
356         flb_http_client_destroy(c);
357         flb_upstream_conn_release(u_conn);
358         return FLB_FALSE;
359     }
360 
361     flb_http_client_destroy(c);
362     flb_upstream_conn_release(u_conn);
363     return FLB_TRUE;
364 }
365 
366 /*
367  * Check that the container exists, if it doesn't and the configuration property
368  * auto_create_container is enabled, it will send a request to create it. If it
369  * could not be created or auto_create_container is disabled, it returns FLB_FALSE.
370  */
ensure_container(struct flb_azure_blob * ctx)371 static int ensure_container(struct flb_azure_blob *ctx)
372 {
373     int ret;
374     int status;
375     size_t b_sent;
376     flb_sds_t uri;
377     struct flb_http_client *c;
378     struct flb_upstream_conn *u_conn;
379 
380     uri = azb_uri_ensure_or_create_container(ctx);
381     if (!uri) {
382         return FLB_FALSE;
383     }
384 
385     /* Get upstream connection */
386     u_conn = flb_upstream_conn_get(ctx->u);
387     if (!u_conn) {
388         flb_plg_error(ctx->ins,
389                       "cannot create upstream connection for container check");
390         flb_sds_destroy(uri);
391         return FLB_FALSE;
392     }
393 
394     /* Create HTTP client context */
395     c = flb_http_client(u_conn, FLB_HTTP_GET,
396                         uri,
397                         NULL, 0, NULL, 0, NULL, 0);
398     if (!c) {
399         flb_plg_error(ctx->ins, "cannot create HTTP client context");
400         flb_upstream_conn_release(u_conn);
401         return FLB_FALSE;
402     }
403     flb_http_strip_port_from_host(c);
404 
405     /* Prepare headers and authentication */
406     azb_http_client_setup(ctx, c, -1, FLB_FALSE,
407                           AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
408 
409     /* Send HTTP request */
410     ret = flb_http_do(c, &b_sent);
411     flb_sds_destroy(uri);
412 
413     if (ret == -1) {
414         flb_plg_error(ctx->ins, "error requesting container properties");
415         flb_upstream_conn_release(u_conn);
416         return FLB_FALSE;
417     }
418 
419     status = c->resp.status;
420     flb_http_client_destroy(c);
421 
422     /* Release connection */
423     flb_upstream_conn_release(u_conn);
424 
425     /* Request was successful, validate HTTP status code */
426     if (status == 404) {
427         /* The container was not found, try to create it */
428         flb_plg_info(ctx->ins, "container '%s' not found, trying to create it",
429                      ctx->container_name);
430         ret = create_container(ctx, ctx->container_name);
431         return ret;
432     }
433     else if (status == 200) {
434         return FLB_TRUE;
435     }
436 
437     return FLB_FALSE;
438 }
439 
/*
 * Initialization callback: build the plugin context from the instance
 * configuration and enable the HTTP debug callbacks on the instance.
 *
 * 'data' is not used. Returns 0 on success, -1 if the context could not be
 * created.
 *
 * NOTE(review): the context is not stored anywhere in this function;
 * presumably flb_azure_blob_conf_create() attaches it to 'ins' - confirm.
 */
static int cb_azure_blob_init(struct flb_output_instance *ins,
                              struct flb_config *config, void *data)
{
    struct flb_azure_blob *ctx;
    (void) data;

    ctx = flb_azure_blob_conf_create(ins, config);
    if (ctx == NULL) {
        return -1;
    }

    flb_output_set_http_debug_callbacks(ins);

    return 0;
}
456 
cb_azure_blob_flush(const void * data,size_t bytes,const char * tag,int tag_len,struct flb_input_instance * i_ins,void * out_context,struct flb_config * config)457 static void cb_azure_blob_flush(const void *data, size_t bytes,
458                                 const char *tag, int tag_len,
459                                 struct flb_input_instance *i_ins,
460                                 void *out_context,
461                                 struct flb_config *config)
462 {
463     int ret;
464     struct flb_azure_blob *ctx = out_context;
465     (void) i_ins;
466     (void) config;
467 
468     /* Validate the container exists, otherwise just create it */
469     ret = ensure_container(ctx);
470     if (ret == FLB_FALSE) {
471         FLB_OUTPUT_RETURN(FLB_RETRY);
472     }
473 
474     ret = send_blob(config, i_ins, ctx, (char *) tag,
475                     (char *) tag, tag_len, (char *) data, bytes);
476     if (ret == CREATE_BLOB) {
477         ret = create_blob(ctx, (char *) tag);
478         if (ret == FLB_OK) {
479             ret = send_blob(config, i_ins, ctx, (char *) tag,
480                             (char *) tag, tag_len, (char *) data, bytes);
481         }
482     }
483 
484     /* FLB_RETRY, FLB_OK, FLB_ERROR */
485     FLB_OUTPUT_RETURN(ret);
486 }
487 
/*
 * Exit callback: destroy the plugin context, if there is one.
 *
 * 'data' is the plugin context (may be NULL); 'config' is not used.
 * Always returns 0.
 */
static int cb_azure_blob_exit(void *data, struct flb_config *config)
{
    struct flb_azure_blob *ctx = data;

    if (ctx != NULL) {
        flb_azure_blob_conf_destroy(ctx);
    }

    return 0;
}
499 
/*
 * Configuration properties map.
 *
 * Every entry gives a value type, the property name, its default value (NULL
 * when there is none) and a description. Entries that carry FLB_TRUE together
 * with an offsetof() are tied to the named field of struct flb_azure_blob.
 * NOTE(review): the exact meaning of each positional field is defined by
 * struct flb_config_map - confirm against flb_config_map.h.
 */
static struct flb_config_map config_map[] = {
    {
     FLB_CONFIG_MAP_STR, "account_name", NULL,
     0, FLB_TRUE, offsetof(struct flb_azure_blob, account_name),
     "Azure account name (mandatory)"
    },

    {
     FLB_CONFIG_MAP_STR, "container_name", NULL,
     0, FLB_TRUE, offsetof(struct flb_azure_blob, container_name),
     "Container name (mandatory)"
    },

    {
     FLB_CONFIG_MAP_BOOL, "auto_create_container", "true",
     0, FLB_TRUE, offsetof(struct flb_azure_blob, auto_create_container),
     "Auto create container if it don't exists"
    },

    {
     FLB_CONFIG_MAP_STR, "blob_type", "appendblob",
     0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_type),
     "Set the block type: appendblob or blockblob"
    },

    /*
     * 'compress' is the only entry without a context field (FLB_FALSE, 0).
     * NOTE(review): presumably it is read manually while the context is
     * created and ends up in ctx->compress_gzip - confirm in the conf code.
     */
    {
     FLB_CONFIG_MAP_STR, "compress", NULL,
     0, FLB_FALSE, 0,
     "Set payload compression in network transfer. Option available is 'gzip'"
    },

    {
     FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
     0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
     "Enable block blob GZIP compression in the final blob file. This option is "
     "not compatible with 'appendblob' block type"
    },

    {
     FLB_CONFIG_MAP_BOOL, "emulator_mode", "false",
     0, FLB_TRUE, offsetof(struct flb_azure_blob, emulator_mode),
     "Use emulator mode, enable it if you want to use Azurite"
    },

    {
     FLB_CONFIG_MAP_STR, "shared_key", NULL,
     0, FLB_TRUE, offsetof(struct flb_azure_blob, shared_key),
     "Azure shared key"
    },

    {
     FLB_CONFIG_MAP_STR, "endpoint", NULL,
     0, FLB_TRUE, offsetof(struct flb_azure_blob, endpoint),
     "Custom full URL endpoint to use an emulator"
    },

    {
     FLB_CONFIG_MAP_STR, "path", NULL,
     0, FLB_TRUE, offsetof(struct flb_azure_blob, path),
     "Set a path for your blob"
    },

    /* The value is passed to the JSON formatter in azure_blob_format() */
    {
     FLB_CONFIG_MAP_STR, "date_key", "@timestamp",
     0, FLB_TRUE, offsetof(struct flb_azure_blob, date_key),
     "Name of the key that will have the record timestamp"
    },

    /* EOF: zeroed entry that terminates the table */
    {0}
};
572 
/*
 * Plugin registration: the only non-static symbol of this file. It binds the
 * plugin name and description to the init / flush / exit callbacks and to the
 * configuration map defined above.
 */
struct flb_output_plugin out_azure_blob_plugin = {
    .name         = "azure_blob",
    .description  = "Azure Blob Storage",
    .cb_init      = cb_azure_blob_init,
    .cb_flush     = cb_azure_blob_flush,
    .cb_exit      = cb_azure_blob_exit,

    /* Test: expose the JSON-lines formatter as the test formatter callback */
    .test_formatter.callback = azure_blob_format,

    .config_map   = config_map,

    /*
     * Plugin flags.
     * NOTE(review): presumably "network output with optional TLS" - confirm
     * the meaning of both flags in the core headers.
     */
    .flags          = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
};
589