1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_sds.h>
23 #include <fluent-bit/flb_kv.h>
24 #include <fluent-bit/flb_utils.h>
25 #include <fluent-bit/flb_pack.h>
26 #include <fluent-bit/flb_config_map.h>
27 #include <fluent-bit/flb_gzip.h>
28 #include <mbedtls/sha256.h>
29 #include <mbedtls/base64.h>
30
31 #include <msgpack.h>
32
33 #include "azure_blob.h"
34 #include "azure_blob_uri.h"
35 #include "azure_blob_conf.h"
36 #include "azure_blob_appendblob.h"
37 #include "azure_blob_blockblob.h"
38 #include "azure_blob_http.h"
39
40 #define CREATE_BLOB 1337
41
azure_blob_format(struct flb_config * config,struct flb_input_instance * ins,void * plugin_context,void * flush_ctx,const char * tag,int tag_len,const void * data,size_t bytes,void ** out_data,size_t * out_size)42 static int azure_blob_format(struct flb_config *config,
43 struct flb_input_instance *ins,
44 void *plugin_context,
45 void *flush_ctx,
46 const char *tag, int tag_len,
47 const void *data, size_t bytes,
48 void **out_data, size_t *out_size)
49 {
50 flb_sds_t out_buf;
51 struct flb_azure_blob *ctx = plugin_context;
52
53 out_buf = flb_pack_msgpack_to_json_format(data, bytes,
54 FLB_PACK_JSON_FORMAT_LINES,
55 FLB_PACK_JSON_DATE_ISO8601,
56 ctx->date_key);
57 if (!out_buf) {
58 return -1;
59 }
60
61 *out_data = out_buf;
62 *out_size = flb_sds_len(out_buf);
63 return 0;
64 }
65
send_blob(struct flb_config * config,struct flb_input_instance * i_ins,struct flb_azure_blob * ctx,char * name,char * tag,int tag_len,void * data,size_t bytes)66 static int send_blob(struct flb_config *config,
67 struct flb_input_instance *i_ins,
68 struct flb_azure_blob *ctx, char *name,
69 char *tag, int tag_len, void *data, size_t bytes)
70 {
71 int ret;
72 int compressed = FLB_FALSE;
73 int content_encoding = FLB_FALSE;
74 int content_type = FLB_FALSE;
75 uint64_t ms = 0;
76 size_t b_sent;
77 void *out_buf;
78 size_t out_size;
79 flb_sds_t uri = NULL;
80 flb_sds_t blockid = NULL;
81 void *payload_buf;
82 size_t payload_size;
83 struct flb_http_client *c;
84 struct flb_upstream_conn *u_conn;
85
86 if (ctx->btype == AZURE_BLOB_APPENDBLOB) {
87 uri = azb_append_blob_uri(ctx, tag);
88 }
89 else if (ctx->btype == AZURE_BLOB_BLOCKBLOB) {
90 blockid = azb_block_blob_id(&ms);
91 if (!blockid) {
92 flb_plg_error(ctx->ins, "could not generate block id");
93 return FLB_RETRY;
94 }
95 uri = azb_block_blob_uri(ctx, tag, blockid, ms);
96 }
97
98 if (!uri) {
99 flb_free(blockid);
100 return FLB_RETRY;
101 }
102
103 /* Get upstream connection */
104 u_conn = flb_upstream_conn_get(ctx->u);
105 if (!u_conn) {
106 flb_plg_error(ctx->ins,
107 "cannot create upstream connection for append_blob");
108 flb_sds_destroy(uri);
109 flb_free(blockid);
110 return FLB_RETRY;
111 }
112
113 /* Format the data */
114 ret = azure_blob_format(config, i_ins,
115 ctx, NULL,
116 tag, tag_len,
117 data, bytes,
118 &out_buf, &out_size);
119 if (ret != 0) {
120 flb_upstream_conn_release(u_conn);
121 flb_sds_destroy(uri);
122 flb_free(blockid);
123 return FLB_RETRY;
124 }
125
126 /* Map buffer */
127 payload_buf = out_buf;
128 payload_size = out_size;
129
130 if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
131 ret = flb_gzip_compress((void *) out_buf, out_size,
132 &payload_buf, &payload_size);
133 if (ret == -1) {
134 flb_plg_error(ctx->ins,
135 "cannot gzip payload, disabling compression");
136 }
137 else {
138 compressed = FLB_TRUE;
139 /* JSON buffer is not longer needed */
140 flb_sds_destroy(out_buf);
141 }
142 }
143
144 if (ctx->compress_blob == FLB_TRUE) {
145 content_encoding = AZURE_BLOB_CE_NONE;
146 content_type = AZURE_BLOB_CT_GZIP;
147 }
148 else if (compressed == FLB_TRUE) {
149 content_encoding = AZURE_BLOB_CE_GZIP;
150 content_type = AZURE_BLOB_CT_JSON;
151 }
152
153 /* Create HTTP client context */
154 c = flb_http_client(u_conn, FLB_HTTP_PUT,
155 uri,
156 payload_buf, payload_size, NULL, 0, NULL, 0);
157 if (!c) {
158 flb_plg_error(ctx->ins, "cannot create HTTP client context");
159 flb_sds_destroy(out_buf);
160 flb_upstream_conn_release(u_conn);
161 flb_free(blockid);
162 return FLB_RETRY;
163 }
164
165 /* Prepare headers and authentication */
166 azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
167 content_type, content_encoding);
168
169 /* Send HTTP request */
170 ret = flb_http_do(c, &b_sent);
171 flb_sds_destroy(uri);
172
173 /* Release */
174 if (compressed == FLB_FALSE) {
175 flb_sds_destroy(out_buf);
176 }
177 else {
178 flb_free(payload_buf);
179 }
180
181 flb_upstream_conn_release(u_conn);
182
183 /* Validate HTTP status */
184 if (ret == -1) {
185 flb_plg_error(ctx->ins, "error sending append_blob");
186 flb_free(blockid);
187 return FLB_RETRY;
188 }
189
190 if (c->resp.status == 201) {
191 flb_plg_info(ctx->ins, "content appended to blob successfully");
192 flb_http_client_destroy(c);
193
194 if (ctx->btype == AZURE_BLOB_BLOCKBLOB) {
195 ret = azb_block_blob_commit(ctx, blockid, tag, ms);
196 flb_free(blockid);
197 return ret;
198 }
199 flb_free(blockid);
200 return FLB_OK;
201 }
202 else if (c->resp.status == 404) {
203 flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
204 flb_http_client_destroy(c);
205 return CREATE_BLOB;
206 }
207 else if (c->resp.payload_size > 0) {
208 flb_plg_error(ctx->ins, "cannot append content to blob\n%s",
209 c->resp.payload);
210 if (strstr(c->resp.payload, "must be 0 for Create Append")) {
211 flb_http_client_destroy(c);
212 return CREATE_BLOB;
213 }
214 }
215 else {
216 flb_plg_error(ctx->ins, "cannot append content to blob");
217 }
218 flb_http_client_destroy(c);
219
220 return FLB_RETRY;
221 }
222
create_blob(struct flb_azure_blob * ctx,char * name)223 static int create_blob(struct flb_azure_blob *ctx, char *name)
224 {
225 int ret;
226 size_t b_sent;
227 flb_sds_t uri = NULL;
228 struct flb_http_client *c;
229 struct flb_upstream_conn *u_conn;
230
231 uri = azb_uri_create_blob(ctx, name);
232 if (!uri) {
233 return FLB_RETRY;
234 }
235
236 /* Get upstream connection */
237 u_conn = flb_upstream_conn_get(ctx->u);
238 if (!u_conn) {
239 flb_plg_error(ctx->ins,
240 "cannot create upstream connection for create_append_blob");
241 flb_sds_destroy(uri);
242 return FLB_RETRY;
243 }
244
245 /* Create HTTP client context */
246 c = flb_http_client(u_conn, FLB_HTTP_PUT,
247 uri,
248 NULL, 0, NULL, 0, NULL, 0);
249 if (!c) {
250 flb_plg_error(ctx->ins, "cannot create HTTP client context");
251 flb_upstream_conn_release(u_conn);
252 flb_sds_destroy(uri);
253 return FLB_RETRY;
254 }
255
256 /* Prepare headers and authentication */
257 azb_http_client_setup(ctx, c, -1, FLB_TRUE,
258 AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
259
260 /* Send HTTP request */
261 ret = flb_http_do(c, &b_sent);
262 flb_sds_destroy(uri);
263
264 if (ret == -1) {
265 flb_plg_error(ctx->ins, "error sending append_blob");
266 flb_http_client_destroy(c);
267 flb_upstream_conn_release(u_conn);
268 return FLB_RETRY;
269 }
270
271 if (c->resp.status == 201) {
272 flb_plg_info(ctx->ins, "blob created successfully: %s", c->uri);
273 }
274 else {
275 if (c->resp.payload_size > 0) {
276 flb_plg_error(ctx->ins, "http_status=%i cannot create append blob\n%s",
277 c->resp.status, c->resp.payload);
278 }
279 else {
280 flb_plg_error(ctx->ins, "http_status=%i cannot create append blob",
281 c->resp.status);
282 }
283 flb_http_client_destroy(c);
284 flb_upstream_conn_release(u_conn);
285 return FLB_RETRY;
286 }
287
288 flb_http_client_destroy(c);
289 flb_upstream_conn_release(u_conn);
290 return FLB_OK;
291 }
292
create_container(struct flb_azure_blob * ctx,char * name)293 static int create_container(struct flb_azure_blob *ctx, char *name)
294 {
295 int ret;
296 size_t b_sent;
297 flb_sds_t uri;
298 struct flb_http_client *c;
299 struct flb_upstream_conn *u_conn;
300
301 /* Get upstream connection */
302 u_conn = flb_upstream_conn_get(ctx->u);
303 if (!u_conn) {
304 flb_plg_error(ctx->ins,
305 "cannot create upstream connection for container creation");
306 return FLB_FALSE;
307 }
308
309 /* URI */
310 uri = azb_uri_ensure_or_create_container(ctx);
311 if (!uri) {
312 flb_upstream_conn_release(u_conn);
313 return FLB_FALSE;
314 }
315
316 /* Create HTTP client context */
317 c = flb_http_client(u_conn, FLB_HTTP_PUT,
318 uri,
319 NULL, 0, NULL, 0, NULL, 0);
320 if (!c) {
321 flb_plg_error(ctx->ins, "cannot create HTTP client context");
322 flb_upstream_conn_release(u_conn);
323 return FLB_FALSE;
324 }
325
326 /* Prepare headers and authentication */
327 azb_http_client_setup(ctx, c, -1, FLB_FALSE,
328 AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
329
330 /* Send HTTP request */
331 ret = flb_http_do(c, &b_sent);
332
333 /* Release URI */
334 flb_sds_destroy(uri);
335
336 /* Validate http response */
337 if (ret == -1) {
338 flb_plg_error(ctx->ins, "error requesting container creation");
339 flb_http_client_destroy(c);
340 flb_upstream_conn_release(u_conn);
341 return FLB_FALSE;
342 }
343
344 if (c->resp.status == 201) {
345 flb_plg_info(ctx->ins, "container '%s' created sucessfully", name);
346 }
347 else {
348 if (c->resp.payload_size > 0) {
349 flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
350 name, c->resp.payload);
351 }
352 else {
353 flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
354 name, c->resp.payload);
355 }
356 flb_http_client_destroy(c);
357 flb_upstream_conn_release(u_conn);
358 return FLB_FALSE;
359 }
360
361 flb_http_client_destroy(c);
362 flb_upstream_conn_release(u_conn);
363 return FLB_TRUE;
364 }
365
366 /*
367 * Check that the container exists, if it doesn't and the configuration property
368 * auto_create_container is enabled, it will send a request to create it. If it
369 * could not be created or auto_create_container is disabled, it returns FLB_FALSE.
370 */
ensure_container(struct flb_azure_blob * ctx)371 static int ensure_container(struct flb_azure_blob *ctx)
372 {
373 int ret;
374 int status;
375 size_t b_sent;
376 flb_sds_t uri;
377 struct flb_http_client *c;
378 struct flb_upstream_conn *u_conn;
379
380 uri = azb_uri_ensure_or_create_container(ctx);
381 if (!uri) {
382 return FLB_FALSE;
383 }
384
385 /* Get upstream connection */
386 u_conn = flb_upstream_conn_get(ctx->u);
387 if (!u_conn) {
388 flb_plg_error(ctx->ins,
389 "cannot create upstream connection for container check");
390 flb_sds_destroy(uri);
391 return FLB_FALSE;
392 }
393
394 /* Create HTTP client context */
395 c = flb_http_client(u_conn, FLB_HTTP_GET,
396 uri,
397 NULL, 0, NULL, 0, NULL, 0);
398 if (!c) {
399 flb_plg_error(ctx->ins, "cannot create HTTP client context");
400 flb_upstream_conn_release(u_conn);
401 return FLB_FALSE;
402 }
403 flb_http_strip_port_from_host(c);
404
405 /* Prepare headers and authentication */
406 azb_http_client_setup(ctx, c, -1, FLB_FALSE,
407 AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
408
409 /* Send HTTP request */
410 ret = flb_http_do(c, &b_sent);
411 flb_sds_destroy(uri);
412
413 if (ret == -1) {
414 flb_plg_error(ctx->ins, "error requesting container properties");
415 flb_upstream_conn_release(u_conn);
416 return FLB_FALSE;
417 }
418
419 status = c->resp.status;
420 flb_http_client_destroy(c);
421
422 /* Release connection */
423 flb_upstream_conn_release(u_conn);
424
425 /* Request was successful, validate HTTP status code */
426 if (status == 404) {
427 /* The container was not found, try to create it */
428 flb_plg_info(ctx->ins, "container '%s' not found, trying to create it",
429 ctx->container_name);
430 ret = create_container(ctx, ctx->container_name);
431 return ret;
432 }
433 else if (status == 200) {
434 return FLB_TRUE;
435 }
436
437 return FLB_FALSE;
438 }
439
cb_azure_blob_init(struct flb_output_instance * ins,struct flb_config * config,void * data)440 static int cb_azure_blob_init(struct flb_output_instance *ins,
441 struct flb_config *config, void *data)
442 {
443 struct flb_azure_blob *ctx = NULL;
444 (void) ins;
445 (void) config;
446 (void) data;
447
448 ctx = flb_azure_blob_conf_create(ins, config);
449 if (!ctx) {
450 return -1;
451 }
452
453 flb_output_set_http_debug_callbacks(ins);
454 return 0;
455 }
456
cb_azure_blob_flush(const void * data,size_t bytes,const char * tag,int tag_len,struct flb_input_instance * i_ins,void * out_context,struct flb_config * config)457 static void cb_azure_blob_flush(const void *data, size_t bytes,
458 const char *tag, int tag_len,
459 struct flb_input_instance *i_ins,
460 void *out_context,
461 struct flb_config *config)
462 {
463 int ret;
464 struct flb_azure_blob *ctx = out_context;
465 (void) i_ins;
466 (void) config;
467
468 /* Validate the container exists, otherwise just create it */
469 ret = ensure_container(ctx);
470 if (ret == FLB_FALSE) {
471 FLB_OUTPUT_RETURN(FLB_RETRY);
472 }
473
474 ret = send_blob(config, i_ins, ctx, (char *) tag,
475 (char *) tag, tag_len, (char *) data, bytes);
476 if (ret == CREATE_BLOB) {
477 ret = create_blob(ctx, (char *) tag);
478 if (ret == FLB_OK) {
479 ret = send_blob(config, i_ins, ctx, (char *) tag,
480 (char *) tag, tag_len, (char *) data, bytes);
481 }
482 }
483
484 /* FLB_RETRY, FLB_OK, FLB_ERROR */
485 FLB_OUTPUT_RETURN(ret);
486 }
487
cb_azure_blob_exit(void * data,struct flb_config * config)488 static int cb_azure_blob_exit(void *data, struct flb_config *config)
489 {
490 struct flb_azure_blob *ctx = data;
491
492 if (!ctx) {
493 return 0;
494 }
495
496 flb_azure_blob_conf_destroy(ctx);
497 return 0;
498 }
499
500 /* Configuration properties map */
501 static struct flb_config_map config_map[] = {
502 {
503 FLB_CONFIG_MAP_STR, "account_name", NULL,
504 0, FLB_TRUE, offsetof(struct flb_azure_blob, account_name),
505 "Azure account name (mandatory)"
506 },
507
508 {
509 FLB_CONFIG_MAP_STR, "container_name", NULL,
510 0, FLB_TRUE, offsetof(struct flb_azure_blob, container_name),
511 "Container name (mandatory)"
512 },
513
514 {
515 FLB_CONFIG_MAP_BOOL, "auto_create_container", "true",
516 0, FLB_TRUE, offsetof(struct flb_azure_blob, auto_create_container),
517 "Auto create container if it don't exists"
518 },
519
520 {
521 FLB_CONFIG_MAP_STR, "blob_type", "appendblob",
522 0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_type),
523 "Set the block type: appendblob or blockblob"
524 },
525
526 {
527 FLB_CONFIG_MAP_STR, "compress", NULL,
528 0, FLB_FALSE, 0,
529 "Set payload compression in network transfer. Option available is 'gzip'"
530 },
531
532 {
533 FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
534 0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
535 "Enable block blob GZIP compression in the final blob file. This option is "
536 "not compatible with 'appendblob' block type"
537 },
538
539 {
540 FLB_CONFIG_MAP_BOOL, "emulator_mode", "false",
541 0, FLB_TRUE, offsetof(struct flb_azure_blob, emulator_mode),
542 "Use emulator mode, enable it if you want to use Azurite"
543 },
544
545 {
546 FLB_CONFIG_MAP_STR, "shared_key", NULL,
547 0, FLB_TRUE, offsetof(struct flb_azure_blob, shared_key),
548 "Azure shared key"
549 },
550
551 {
552 FLB_CONFIG_MAP_STR, "endpoint", NULL,
553 0, FLB_TRUE, offsetof(struct flb_azure_blob, endpoint),
554 "Custom full URL endpoint to use an emulator"
555 },
556
557 {
558 FLB_CONFIG_MAP_STR, "path", NULL,
559 0, FLB_TRUE, offsetof(struct flb_azure_blob, path),
560 "Set a path for your blob"
561 },
562
563 {
564 FLB_CONFIG_MAP_STR, "date_key", "@timestamp",
565 0, FLB_TRUE, offsetof(struct flb_azure_blob, date_key),
566 "Name of the key that will have the record timestamp"
567 },
568
569 /* EOF */
570 {0}
571 };
572
573 /* Plugin registration */
574 struct flb_output_plugin out_azure_blob_plugin = {
575 .name = "azure_blob",
576 .description = "Azure Blob Storage",
577 .cb_init = cb_azure_blob_init,
578 .cb_flush = cb_azure_blob_flush,
579 .cb_exit = cb_azure_blob_exit,
580
581 /* Test */
582 .test_formatter.callback = azure_blob_format,
583
584 .config_map = config_map,
585
586 /* Plugin flags */
587 .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
588 };
589