1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_http_client.h>
23 #include <fluent-bit/flb_pack.h>
24 #include <fluent-bit/flb_utils.h>
25 #include <fluent-bit/flb_time.h>
26 #include <fluent-bit/flb_oauth2.h>
27 #include <fluent-bit/flb_regex.h>
28 #include <fluent-bit/flb_pthread.h>
29
30 #include <msgpack.h>
31
32 #include "gce_metadata.h"
33 #include "stackdriver.h"
34 #include "stackdriver_conf.h"
35 #include "stackdriver_operation.h"
36 #include "stackdriver_source_location.h"
37 #include "stackdriver_http_request.h"
38 #include "stackdriver_timestamp.h"
39 #include "stackdriver_helper.h"
40 #include <mbedtls/base64.h>
41 #include <mbedtls/sha256.h>
42
43 pthread_key_t oauth2_type;
44 pthread_key_t oauth2_token;
45
oauth2_cache_exit(void * ptr)46 static void oauth2_cache_exit(void *ptr)
47 {
48 if (ptr) {
49 flb_sds_destroy(ptr);
50 }
51 }
52
oauth2_cache_init()53 static void oauth2_cache_init()
54 {
55 /* oauth2 pthread key */
56 pthread_key_create(&oauth2_type, oauth2_cache_exit);
57 pthread_key_create(&oauth2_token, oauth2_cache_exit);
58 }
59
60 /* Set oauth2 type and token in pthread keys */
oauth2_cache_set(char * type,char * token)61 static void oauth2_cache_set(char *type, char *token)
62 {
63 flb_sds_t tmp;
64
65 /* oauth2 type */
66 tmp = pthread_getspecific(oauth2_type);
67 if (tmp) {
68 flb_sds_destroy(tmp);
69 }
70 tmp = flb_sds_create(type);
71 pthread_setspecific(oauth2_type, tmp);
72
73 /* oauth2 access token */
74 tmp = pthread_getspecific(oauth2_token);
75 if (tmp) {
76 flb_sds_destroy(tmp);
77 }
78 tmp = flb_sds_create(token);
79 pthread_setspecific(oauth2_token, tmp);
80 }
81
82 /* By using pthread keys cached values, compose the authorizatoin token */
oauth2_cache_to_token()83 static flb_sds_t oauth2_cache_to_token()
84 {
85 flb_sds_t type;
86 flb_sds_t token;
87 flb_sds_t output;
88
89 type = pthread_getspecific(oauth2_type);
90 if (!type) {
91 return NULL;
92 }
93
94 output = flb_sds_create(type);
95 if (!output) {
96 return NULL;
97 }
98
99 token = pthread_getspecific(oauth2_token);
100 flb_sds_printf(&output, " %s", token);
101 return output;
102 }
103
104 /*
105 * Base64 Encoding in JWT must:
106 *
107 * - remove any trailing padding '=' character
108 * - replace '+' with '-'
109 * - replace '/' with '_'
110 *
111 * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C
112 */
jwt_base64_url_encode(unsigned char * out_buf,size_t out_size,unsigned char * in_buf,size_t in_size,size_t * olen)113 int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size,
114 unsigned char *in_buf, size_t in_size,
115 size_t *olen)
116
117 {
118 int i;
119 size_t len;
120
121 /* do normal base64 encoding */
122 mbedtls_base64_encode(out_buf, out_size - 1,
123 &len, in_buf, in_size);
124
125 /* Replace '+' and '/' characters */
126 for (i = 0; i < len && out_buf[i] != '='; i++) {
127 if (out_buf[i] == '+') {
128 out_buf[i] = '-';
129 }
130 else if (out_buf[i] == '/') {
131 out_buf[i] = '_';
132 }
133 }
134
135 /* Now 'i' becomes the new length */
136 *olen = i;
137 return 0;
138 }
139
jwt_encode(char * payload,char * secret,char ** out_signature,size_t * out_size,struct flb_stackdriver * ctx)140 static int jwt_encode(char *payload, char *secret,
141 char **out_signature, size_t *out_size,
142 struct flb_stackdriver *ctx)
143 {
144 int ret;
145 int len;
146 int buf_size;
147 size_t olen;
148 char *buf;
149 char *sigd;
150 char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}";
151 unsigned char sha256_buf[32] = {0};
152 mbedtls_sha256_context sha256_ctx;
153 mbedtls_rsa_context *rsa;
154 flb_sds_t out;
155 mbedtls_pk_context pk_ctx;
156 unsigned char sig[256] = {0};
157
158 buf_size = (strlen(payload) + strlen(secret)) * 2;
159 buf = flb_malloc(buf_size);
160 if (!buf) {
161 flb_errno();
162 return -1;
163 }
164
165 /* Encode header */
166 len = strlen(headers);
167 mbedtls_base64_encode((unsigned char *) buf, buf_size - 1,
168 &olen, (unsigned char *) headers, len);
169
170 /* Create buffer to store JWT */
171 out = flb_sds_create_size(2048);
172 if (!out) {
173 flb_errno();
174 flb_free(buf);
175 return -1;
176 }
177
178 /* Append header */
179 flb_sds_cat(out, buf, olen);
180 flb_sds_cat(out, ".", 1);
181
182 /* Encode Payload */
183 len = strlen(payload);
184 jwt_base64_url_encode((unsigned char *) buf, buf_size,
185 (unsigned char *) payload, len, &olen);
186
187 /* Append Payload */
188 flb_sds_cat(out, buf, olen);
189
190 /* do sha256() of base64(header).base64(payload) */
191 mbedtls_sha256_init(&sha256_ctx);
192 mbedtls_sha256_starts(&sha256_ctx, 0);
193 mbedtls_sha256_update(&sha256_ctx, (const unsigned char *) out,
194 flb_sds_len(out));
195 mbedtls_sha256_finish(&sha256_ctx, sha256_buf);
196
197 /* In mbedTLS cert length must include the null byte */
198 len = strlen(secret) + 1;
199
200 /* Load Private Key */
201 mbedtls_pk_init(&pk_ctx);
202 ret = mbedtls_pk_parse_key(&pk_ctx,
203 (unsigned char *) secret, len, NULL, 0);
204 if (ret != 0) {
205 flb_plg_error(ctx->ins, "error loading private key");
206 flb_free(buf);
207 flb_sds_destroy(out);
208 return -1;
209 }
210
211 /* Create RSA context */
212 rsa = mbedtls_pk_rsa(pk_ctx);
213 if (!rsa) {
214 flb_plg_error(ctx->ins, "error creating RSA context");
215 flb_free(buf);
216 flb_sds_destroy(out);
217 mbedtls_pk_free(&pk_ctx);
218 return -1;
219 }
220
221 ret = mbedtls_rsa_pkcs1_sign(rsa, NULL, NULL,
222 MBEDTLS_RSA_PRIVATE, MBEDTLS_MD_SHA256,
223 0, (unsigned char *) sha256_buf, sig);
224 if (ret != 0) {
225 flb_plg_error(ctx->ins, "error signing SHA256");
226 flb_free(buf);
227 flb_sds_destroy(out);
228 mbedtls_pk_free(&pk_ctx);
229 return -1;
230 }
231
232 sigd = flb_malloc(2048);
233 if (!sigd) {
234 flb_errno();
235 flb_free(buf);
236 flb_sds_destroy(out);
237 mbedtls_pk_free(&pk_ctx);
238 return -1;
239 }
240
241 jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen);
242
243 flb_sds_cat(out, ".", 1);
244 flb_sds_cat(out, sigd, olen);
245
246 *out_signature = out;
247 *out_size = flb_sds_len(out);
248
249 flb_free(buf);
250 flb_free(sigd);
251 mbedtls_pk_free(&pk_ctx);
252
253 return 0;
254 }
255
256 /* Create a new oauth2 context and get a oauth2 token */
get_oauth2_token(struct flb_stackdriver * ctx)257 static int get_oauth2_token(struct flb_stackdriver *ctx)
258 {
259 int ret;
260 char *token;
261 char *sig_data;
262 size_t sig_size;
263 time_t issued;
264 time_t expires;
265 char payload[1024];
266
267 flb_oauth2_payload_clear(ctx->o);
268
269 /* In case of using metadata server, fetch token from there */
270 if (ctx->metadata_server_auth) {
271 return gce_metadata_read_token(ctx);
272 }
273
274 /* JWT encode for oauth2 */
275 issued = time(NULL);
276 expires = issued + FLB_STD_TOKEN_REFRESH;
277
278 snprintf(payload, sizeof(payload) - 1,
279 "{\"iss\": \"%s\", \"scope\": \"%s\", "
280 "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}",
281 ctx->client_email, FLB_STD_SCOPE,
282 FLB_STD_AUTH_URL,
283 expires, issued);
284
285 /* Compose JWT signature */
286 ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx);
287 if (ret != 0) {
288 flb_plg_error(ctx->ins, "JWT signature generation failed");
289 return -1;
290 }
291 flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data);
292
293 ret = flb_oauth2_payload_append(ctx->o,
294 "grant_type", -1,
295 "urn:ietf:params:oauth:"
296 "grant-type:jwt-bearer", -1);
297 if (ret == -1) {
298 flb_plg_error(ctx->ins, "error appending oauth2 params");
299 flb_sds_destroy(sig_data);
300 return -1;
301 }
302
303 ret = flb_oauth2_payload_append(ctx->o,
304 "assertion", -1,
305 sig_data, sig_size);
306 if (ret == -1) {
307 flb_plg_error(ctx->ins, "error appending oauth2 params");
308 flb_sds_destroy(sig_data);
309 return -1;
310 }
311 flb_sds_destroy(sig_data);
312
313 /* Retrieve access token */
314 token = flb_oauth2_token_get(ctx->o);
315 if (!token) {
316 flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
317 return -1;
318 }
319
320 return 0;
321 }
322
get_google_token(struct flb_stackdriver * ctx)323 static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
324 {
325 int ret = 0;
326 flb_sds_t output = NULL;
327
328 ret = pthread_mutex_trylock(&ctx->token_mutex);
329 if (ret == EBUSY) {
330 /*
331 * If the routine is locked we just use our pre-cached values and
332 * compose the expected authorization value.
333 *
334 * If the routine fails it will return NULL and the caller will just
335 * issue a FLB_RETRY.
336 */
337 return oauth2_cache_to_token();
338 }
339 else if (ret != 0) {
340 flb_plg_error(ctx->ins, "error locking mutex");
341 return NULL;
342 }
343
344 if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
345 ret = get_oauth2_token(ctx);
346 }
347
348 /* Copy string to prevent race conditions (get_oauth2 can free the string) */
349 if (ret == 0) {
350 /* Update pthread keys cached values */
351 oauth2_cache_set(ctx->o->token_type, ctx->o->access_token);
352
353 /* Compose outgoing buffer using cached values */
354 output = oauth2_cache_to_token();
355 }
356
357 if (pthread_mutex_unlock(&ctx->token_mutex)){
358 flb_plg_error(ctx->ins, "error unlocking mutex");
359 if (output) {
360 flb_sds_destroy(output);
361 }
362 return NULL;
363 }
364
365
366 return output;
367 }
368
validate_msgpack_unpacked_data(msgpack_object root)369 static bool validate_msgpack_unpacked_data(msgpack_object root)
370 {
371 return root.type == MSGPACK_OBJECT_ARRAY &&
372 root.via.array.size == 2 &&
373 root.via.array.ptr[1].type == MSGPACK_OBJECT_MAP;
374 }
375
replace_prefix_dot(flb_sds_t s,int tag_prefix_len)376 void replace_prefix_dot(flb_sds_t s, int tag_prefix_len)
377 {
378 int i;
379 int str_len;
380 char c;
381
382 if (!s) {
383 return;
384 }
385
386 str_len = flb_sds_len(s);
387 if (tag_prefix_len > str_len) {
388 flb_error("[output] tag_prefix shouldn't be longer than local_resource_id");
389 return;
390 }
391
392 for (i = 0; i < tag_prefix_len; i++) {
393 c = s[i];
394
395 if (c == '.') {
396 s[i] = '_';
397 }
398 }
399 }
400
get_str_value_from_msgpack_map(msgpack_object_map map,const char * key,int key_size)401 static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map,
402 const char *key, int key_size)
403 {
404 int i;
405 msgpack_object k;
406 msgpack_object v;
407 flb_sds_t ptr = NULL;
408
409 for (i = 0; i < map.size; i++) {
410 k = map.ptr[i].key;
411 v = map.ptr[i].val;
412
413 if (k.type != MSGPACK_OBJECT_STR) {
414 continue;
415 }
416
417 if (k.via.str.size == key_size &&
418 strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) {
419 /* make sure to free it after use */
420 ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size);
421 break;
422 }
423 }
424
425 return ptr;
426 }
427
428 /* parse_monitored_resource is to extract the monitoired resource labels
429 * from "logging.googleapis.com/monitored_resource" in log data
430 * and append to 'resource'/'labels' in log entry.
431 * Monitored resource type is already read from resource field in stackdriver
432 * output plugin configuration parameters.
433 *
434 * The structure of monitored_resource is:
435 * {
436 * "logging.googleapis.com/monitored_resource": {
437 * "labels": {
438 * "resource_label": <label_value>,
439 * }
440 * }
441 * }
442 * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types
443 * for required labels for each monitored resource.
444 */
445
parse_monitored_resource(struct flb_stackdriver * ctx,const void * data,size_t bytes,msgpack_packer * mp_pck)446 static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck)
447 {
448 int ret = -1;
449 size_t off = 0;
450 msgpack_object *obj;
451 msgpack_unpacked result;
452 msgpack_unpacked_init(&result);
453 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
454 if (result.data.type != MSGPACK_OBJECT_ARRAY) {
455 continue;
456 }
457 if (result.data.via.array.size != 2) {
458 continue;
459 }
460 obj = &result.data.via.array.ptr[1];
461 if (obj->type != MSGPACK_OBJECT_MAP) {
462 continue;
463 }
464 msgpack_object_kv *kv = obj->via.map.ptr;
465 msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
466 for (; kv < kvend; ++kv) {
467 if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR
468 && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) {
469 msgpack_object subobj = kv->val;
470 msgpack_object_kv *p = subobj.via.map.ptr;
471 msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size;
472 for (; p < pend; ++p) {
473 if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) {
474 continue;
475 }
476 if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) {
477 msgpack_object labels = p->val;
478 msgpack_object_kv *q = labels.via.map.ptr;
479 msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size;
480 int fields = 0;
481 for (; q < qend; ++q) {
482 if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
483 flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY);
484 }
485 ++fields;
486 }
487 if (fields > 0) {
488 msgpack_pack_map(mp_pck, fields);
489 q = labels.via.map.ptr;
490 for (; q < qend; ++q) {
491 if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
492 continue;
493 }
494 flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY);
495 msgpack_pack_str(mp_pck, q->key.via.str.size);
496 msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size);
497 msgpack_pack_str(mp_pck, q->val.via.str.size);
498 msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size);
499 }
500 msgpack_unpacked_destroy(&result);
501 ret = 0;
502 return ret;
503 }
504 }
505 }
506 }
507 }
508 }
509
510 msgpack_unpacked_destroy(&result);
511 flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY);
512 return ret;
513 }
514
515 /*
516 * Given a local_resource_id, split the content using the proper separator generating
517 * a linked list to store the spliited string
518 */
parse_local_resource_id_to_list(char * local_resource_id,char * type)519 static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type)
520 {
521 int ret = -1;
522 int max_split = -1;
523 int len_k8s_container;
524 int len_k8s_node;
525 int len_k8s_pod;
526 struct mk_list *list;
527
528 len_k8s_container = sizeof(K8S_CONTAINER) - 1;
529 len_k8s_node = sizeof(K8S_NODE) - 1;
530 len_k8s_pod = sizeof(K8S_POD) - 1;
531
532 /* Allocate list head */
533 list = flb_malloc(sizeof(struct mk_list));
534 if (!list) {
535 flb_errno();
536 return NULL;
537 }
538 mk_list_init(list);
539
540 /* Determinate the max split value based on type */
541 if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
542 /* including the prefix of tag */
543 max_split = 4;
544 }
545 else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
546 max_split = 2;
547 }
548 else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
549 max_split = 3;
550 }
551
552 /* The local_resource_id is splitted by '.' */
553 ret = flb_slist_split_string(list, local_resource_id, '.', max_split);
554
555 if (ret == -1 || mk_list_size(list) != max_split) {
556 flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type);
557 flb_slist_destroy(list);
558 flb_free(list);
559 return NULL;
560 }
561
562 return list;
563 }
564
565 /*
566 * extract_local_resource_id():
567 * - extract the value from "logging.googleapis.com/local_resource_id" field
568 * - if local_resource_id is missing from the payLoad, use the tag of the log
569 */
extract_local_resource_id(const void * data,size_t bytes,struct flb_stackdriver * ctx,const char * tag)570 static int extract_local_resource_id(const void *data, size_t bytes,
571 struct flb_stackdriver *ctx, const char *tag) {
572 msgpack_object root;
573 msgpack_object_map map;
574 msgpack_unpacked result;
575 flb_sds_t local_resource_id;
576 size_t off = 0;
577
578 msgpack_unpacked_init(&result);
579 if (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
580 root = result.data;
581
582 if (!validate_msgpack_unpacked_data(root)) {
583 msgpack_unpacked_destroy(&result);
584 flb_plg_error(ctx->ins, "unexpected record format");
585 return -1;
586 }
587
588 map = root.via.array.ptr[1].via.map;
589 local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY,
590 LEN_LOCAL_RESOURCE_ID_KEY);
591
592 if (local_resource_id == NULL) {
593 /* if local_resource_id is not found, use the tag of the log */
594 flb_plg_debug(ctx->ins, "local_resource_id not found, "
595 "tag [%s] is assigned for local_resource_id", tag);
596 local_resource_id = flb_sds_create(tag);
597 }
598
599 /* we need to create up the local_resource_id from previous log */
600 if (ctx->local_resource_id) {
601 flb_sds_destroy(ctx->local_resource_id);
602 }
603
604 ctx->local_resource_id = flb_sds_create(local_resource_id);
605 }
606 else {
607 msgpack_unpacked_destroy(&result);
608 flb_plg_error(ctx->ins, "failed to unpack data");
609 return -1;
610 }
611
612 flb_sds_destroy(local_resource_id);
613 msgpack_unpacked_destroy(&result);
614 return 0;
615 }
616
617 /*
618 * set_monitored_resource_labels():
619 * - use the extracted local_resource_id to assign the label keys for different
620 * resource types that are specified in the configuration of stackdriver_out plugin
621 */
set_monitored_resource_labels(struct flb_stackdriver * ctx,char * type)622 static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type)
623 {
624 int ret = -1;
625 int first = FLB_TRUE;
626 int counter = 0;
627 int len_k8s_container;
628 int len_k8s_node;
629 int len_k8s_pod;
630 size_t prefix_len = 0;
631 struct local_resource_id_list *ptr;
632 struct mk_list *list = NULL;
633 struct mk_list *head;
634 flb_sds_t new_local_resource_id;
635
636 if (!ctx->local_resource_id) {
637 flb_plg_error(ctx->ins, "local_resource_is is not assigned");
638 return -1;
639 }
640
641 len_k8s_container = sizeof(K8S_CONTAINER) - 1;
642 len_k8s_node = sizeof(K8S_NODE) - 1;
643 len_k8s_pod = sizeof(K8S_POD) - 1;
644
645 prefix_len = flb_sds_len(ctx->tag_prefix);
646 if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) {
647 flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of"
648 " local_resource_id [%s]", ctx->tag_prefix,
649 ctx->local_resource_id);
650 return -1;
651 }
652
653 new_local_resource_id = flb_sds_create_len(ctx->local_resource_id,
654 flb_sds_len(ctx->local_resource_id));
655 replace_prefix_dot(new_local_resource_id, prefix_len - 1);
656
657 if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
658 list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER);
659 if (!list) {
660 goto error;
661 }
662
663 /* iterate through the list */
664 mk_list_foreach(head, list) {
665 ptr = mk_list_entry(head, struct local_resource_id_list, _head);
666 if (first) {
667 first = FLB_FALSE;
668 continue;
669 }
670
671 /* Follow the order of fields in local_resource_id */
672 if (counter == 0) {
673 if (ctx->namespace_name) {
674 flb_sds_destroy(ctx->namespace_name);
675 }
676 ctx->namespace_name = flb_sds_create(ptr->val);
677 }
678 else if (counter == 1) {
679 if (ctx->pod_name) {
680 flb_sds_destroy(ctx->pod_name);
681 }
682 ctx->pod_name = flb_sds_create(ptr->val);
683 }
684 else if (counter == 2) {
685 if (ctx->container_name) {
686 flb_sds_destroy(ctx->container_name);
687 }
688 ctx->container_name = flb_sds_create(ptr->val);
689 }
690
691 counter++;
692 }
693
694 if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) {
695 goto error;
696 }
697 }
698 else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
699 list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE);
700 if (!list) {
701 goto error;
702 }
703
704 mk_list_foreach(head, list) {
705 ptr = mk_list_entry(head, struct local_resource_id_list, _head);
706 if (first) {
707 first = FLB_FALSE;
708 continue;
709 }
710
711 if (ptr != NULL) {
712 if (ctx->node_name) {
713 flb_sds_destroy(ctx->node_name);
714 }
715 ctx->node_name = flb_sds_create(ptr->val);
716 }
717 }
718
719 if (!ctx->node_name) {
720 goto error;
721 }
722 }
723 else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
724 list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD);
725 if (!list) {
726 goto error;
727 }
728
729 mk_list_foreach(head, list) {
730 ptr = mk_list_entry(head, struct local_resource_id_list, _head);
731 if (first) {
732 first = FLB_FALSE;
733 continue;
734 }
735
736 /* Follow the order of fields in local_resource_id */
737 if (counter == 0) {
738 if (ctx->namespace_name) {
739 flb_sds_destroy(ctx->namespace_name);
740 }
741 ctx->namespace_name = flb_sds_create(ptr->val);
742 }
743 else if (counter == 1) {
744 if (ctx->pod_name) {
745 flb_sds_destroy(ctx->pod_name);
746 }
747 ctx->pod_name = flb_sds_create(ptr->val);
748 }
749
750 counter++;
751 }
752
753 if (!ctx->namespace_name || !ctx->pod_name) {
754 goto error;
755 }
756 }
757
758 ret = 0;
759
760 if (list) {
761 flb_slist_destroy(list);
762 flb_free(list);
763 }
764 flb_sds_destroy(new_local_resource_id);
765
766 return ret;
767
768 error:
769 if (list) {
770 flb_slist_destroy(list);
771 flb_free(list);
772 }
773
774 if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
775 if (ctx->namespace_name) {
776 flb_sds_destroy(ctx->namespace_name);
777 }
778
779 if (ctx->pod_name) {
780 flb_sds_destroy(ctx->pod_name);
781 }
782
783 if (ctx->container_name) {
784 flb_sds_destroy(ctx->container_name);
785 }
786 }
787 else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
788 if (ctx->node_name) {
789 flb_sds_destroy(ctx->node_name);
790 }
791 }
792 else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
793 if (ctx->namespace_name) {
794 flb_sds_destroy(ctx->namespace_name);
795 }
796
797 if (ctx->pod_name) {
798 flb_sds_destroy(ctx->pod_name);
799 }
800 }
801
802 flb_sds_destroy(new_local_resource_id);
803 return -1;
804 }
805
is_tag_match_regex(struct flb_stackdriver * ctx,const char * tag,int tag_len)806 static int is_tag_match_regex(struct flb_stackdriver *ctx,
807 const char *tag, int tag_len)
808 {
809 int ret;
810 int tag_prefix_len;
811 int len_to_be_matched;
812 const char *tag_str_to_be_matcheds;
813
814 tag_prefix_len = flb_sds_len(ctx->tag_prefix);
815 if (tag_len > tag_prefix_len &&
816 flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) {
817 return 0;
818 }
819
820 tag_str_to_be_matcheds = tag + tag_prefix_len;
821 len_to_be_matched = tag_len - tag_prefix_len;
822 ret = flb_regex_match(ctx->regex,
823 (unsigned char *) tag_str_to_be_matcheds,
824 len_to_be_matched);
825
826 /* 1 -> match; 0 -> doesn't match; < 0 -> error */
827 return ret;
828 }
829
is_local_resource_id_match_regex(struct flb_stackdriver * ctx)830 static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx)
831 {
832 int ret;
833 int prefix_len;
834 int len_to_be_matched;
835 const char *str_to_be_matcheds;
836
837 if (!ctx->local_resource_id) {
838 flb_plg_warn(ctx->ins, "local_resource_id not found in the payload");
839 return -1;
840 }
841
842 prefix_len = flb_sds_len(ctx->tag_prefix);
843 str_to_be_matcheds = ctx->local_resource_id + prefix_len;
844 len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len;
845
846 ret = flb_regex_match(ctx->regex,
847 (unsigned char *) str_to_be_matcheds,
848 len_to_be_matched);
849
850 /* 1 -> match; 0 -> doesn't match; < 0 -> error */
851 return ret;
852 }
853
854 static void cb_results(const char *name, const char *value,
855 size_t vlen, void *data);
856 /*
857 * extract_resource_labels_from_regex(4) will only be called if the
858 * tag or local_resource_id field matches the regex rule
859 */
extract_resource_labels_from_regex(struct flb_stackdriver * ctx,const char * tag,int tag_len,int from_tag)860 static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx,
861 const char *tag, int tag_len,
862 int from_tag)
863 {
864 int ret = 1;
865 int prefix_len;
866 int len_to_be_matched;
867 int local_resource_id_len;
868 const char *str_to_be_matcheds;
869 struct flb_regex_search result;
870
871 prefix_len = flb_sds_len(ctx->tag_prefix);
872 if (from_tag == FLB_TRUE) {
873 local_resource_id_len = tag_len;
874 str_to_be_matcheds = tag + prefix_len;
875 }
876 else {
877 // this will be called only if the payload contains local_resource_id
878 local_resource_id_len = flb_sds_len(ctx->local_resource_id);
879 str_to_be_matcheds = ctx->local_resource_id + prefix_len;
880 }
881
882 len_to_be_matched = local_resource_id_len - prefix_len;
883 ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result);
884 if (ret <= 0) {
885 flb_plg_warn(ctx->ins, "invalid pattern for given value %s when"
886 " extracting resource labels", str_to_be_matcheds);
887 return -1;
888 }
889
890 flb_regex_parse(ctx->regex, &result, cb_results, ctx);
891
892 return ret;
893 }
894
process_local_resource_id(struct flb_stackdriver * ctx,const char * tag,int tag_len,char * type)895 static int process_local_resource_id(struct flb_stackdriver *ctx,
896 const char *tag, int tag_len, char *type)
897 {
898 int ret;
899
900 // parsing local_resource_id from tag takes higher priority
901 if (is_tag_match_regex(ctx, tag, tag_len) > 0) {
902 ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE);
903 }
904 else if (is_local_resource_id_match_regex(ctx) > 0) {
905 ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE);
906 }
907 else {
908 ret = set_monitored_resource_labels(ctx, type);
909 }
910
911 return ret;
912 }
913
914 /*
915 * parse_labels
916 * - Iterate throught the original payload (obj) and find out the entry that matches
917 * the labels_key
918 * - Used to convert all labels under labels_key to root-level `labels` field
919 */
parse_labels(struct flb_stackdriver * ctx,msgpack_object * obj)920 static msgpack_object *parse_labels(struct flb_stackdriver *ctx, msgpack_object *obj)
921 {
922 int i;
923 int len;
924 msgpack_object_kv *kv = NULL;
925
926 if (!obj || obj->type != MSGPACK_OBJECT_MAP) {
927 return NULL;
928 }
929
930 len = flb_sds_len(ctx->labels_key);
931 for (i = 0; i < obj->via.map.size; i++) {
932 kv = &obj->via.map.ptr[i];
933 if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) {
934 /* only the first matching entry will be returned */
935 return &kv->val;
936 }
937 }
938
939 //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload",
940 // ctx->labels_key);
941 return NULL;
942 }
943
cb_results(const char * name,const char * value,size_t vlen,void * data)944 static void cb_results(const char *name, const char *value,
945 size_t vlen, void *data)
946 {
947 struct flb_stackdriver *ctx = data;
948
949 if (vlen == 0) {
950 return;
951 }
952
953 if (strcmp(name, "pod_name") == 0) {
954 if (ctx->pod_name != NULL) {
955 flb_sds_destroy(ctx->pod_name);
956 }
957 ctx->pod_name = flb_sds_create_len(value, vlen);
958 }
959 else if (strcmp(name, "namespace_name") == 0) {
960 if (ctx->namespace_name != NULL) {
961 flb_sds_destroy(ctx->namespace_name);
962 }
963 ctx->namespace_name = flb_sds_create_len(value, vlen);
964 }
965 else if (strcmp(name, "container_name") == 0) {
966 if (ctx->container_name != NULL) {
967 flb_sds_destroy(ctx->container_name);
968 }
969 ctx->container_name = flb_sds_create_len(value, vlen);
970 }
971 else if (strcmp(name, "node_name") == 0) {
972 if (ctx->node_name != NULL) {
973 flb_sds_destroy(ctx->node_name);
974 }
975 ctx->node_name = flb_sds_create_len(value, vlen);
976 }
977
978 return;
979 }
980
flb_stackdriver_regex_init(struct flb_stackdriver * ctx)981 int flb_stackdriver_regex_init(struct flb_stackdriver *ctx)
982 {
983 /* If a custom regex is not set, use the defaults */
984 if (!ctx->custom_k8s_regex) {
985 ctx->custom_k8s_regex = flb_sds_create(DEFAULT_TAG_REGEX);
986 if (!ctx->custom_k8s_regex) {
987 return -1;
988 }
989 }
990
991 ctx->regex = flb_regex_create(ctx->custom_k8s_regex);
992 if (!ctx->regex) {
993 return -1;
994 }
995
996 return 0;
997 }
998
cb_stackdriver_init(struct flb_output_instance * ins,struct flb_config * config,void * data)999 static int cb_stackdriver_init(struct flb_output_instance *ins,
1000 struct flb_config *config, void *data)
1001 {
1002 int ret;
1003 int io_flags = FLB_IO_TLS;
1004 char *token;
1005 struct flb_stackdriver *ctx;
1006
1007 /* Create config context */
1008 ctx = flb_stackdriver_conf_create(ins, config);
1009 if (!ctx) {
1010 flb_plg_error(ins, "configuration failed");
1011 return -1;
1012 }
1013
1014 /* Load config map */
1015 ret = flb_output_config_map_set(ins, (void *) ctx);
1016 if (ret == -1) {
1017 return -1;
1018 }
1019
1020 /* Set context */
1021 flb_output_set_context(ins, ctx);
1022
1023 /* Network mode IPv6 */
1024 if (ins->host.ipv6 == FLB_TRUE) {
1025 io_flags |= FLB_IO_IPV6;
1026 }
1027
1028 /* Initialize oauth2 cache pthread keys */
1029 oauth2_cache_init();
1030
1031 /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
1032 pthread_mutex_init(&ctx->token_mutex, NULL);
1033
1034 /* Create Upstream context for Stackdriver Logging (no oauth2 service) */
1035 ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL,
1036 io_flags, ins->tls);
1037 ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server,
1038 FLB_IO_TCP, NULL);
1039
1040 /* Create oauth2 context */
1041 ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
1042
1043 if (!ctx->u) {
1044 flb_plg_error(ctx->ins, "upstream creation failed");
1045 return -1;
1046 }
1047 if (!ctx->metadata_u) {
1048 flb_plg_error(ctx->ins, "metadata upstream creation failed");
1049 return -1;
1050 }
1051 if (!ctx->o) {
1052 flb_plg_error(ctx->ins, "cannot create oauth2 context");
1053 return -1;
1054 }
1055 flb_output_upstream_set(ctx->u, ins);
1056
1057 /* Metadata Upstream Sync flags */
1058 ctx->metadata_u->flags &= ~FLB_IO_ASYNC;
1059
1060 if (ins->test_mode == FLB_FALSE) {
1061 /* Retrieve oauth2 token */
1062 token = get_google_token(ctx);
1063 if (!token) {
1064 flb_plg_warn(ctx->ins, "token retrieval failed");
1065 }
1066 else {
1067 flb_sds_destroy(token);
1068 }
1069 }
1070
1071 if (ctx->metadata_server_auth) {
1072 ret = gce_metadata_read_project_id(ctx);
1073 if (ret == -1) {
1074 return -1;
1075 }
1076
1077 if (!ctx->is_generic_resource_type) {
1078 ret = gce_metadata_read_zone(ctx);
1079 if (ret == -1) {
1080 return -1;
1081 }
1082
1083 ret = gce_metadata_read_instance_id(ctx);
1084 if (ret == -1) {
1085 return -1;
1086 }
1087 }
1088 }
1089
1090 /* Validate project_id */
1091 if (!ctx->project_id) {
1092 flb_plg_error(ctx->ins, "property 'project_id' is not set");
1093 return -1;
1094 }
1095
1096 if (!ctx->export_to_project_id) {
1097 ctx->export_to_project_id = flb_sds_create(ctx->project_id);
1098 }
1099
1100 ret = flb_stackdriver_regex_init(ctx);
1101 if (ret == -1) {
1102 flb_plg_error(ctx->ins, "failed to init stackdriver custom regex");
1103 return -1;
1104 }
1105
1106 return 0;
1107 }
1108
validate_severity_level(severity_t * s,const char * str,const unsigned int str_size)1109 static int validate_severity_level(severity_t * s,
1110 const char * str,
1111 const unsigned int str_size)
1112 {
1113 int i = 0;
1114
1115 const static struct {
1116 severity_t s;
1117 const unsigned int str_size;
1118 const char * str;
1119 } enum_mapping[] = {
1120 {FLB_STD_EMERGENCY, 9, "EMERGENCY"},
1121 {FLB_STD_EMERGENCY, 5, "EMERG" },
1122
1123 {FLB_STD_ALERT , 1, "A" },
1124 {FLB_STD_ALERT , 5, "ALERT" },
1125
1126 {FLB_STD_CRITICAL , 1, "C" },
1127 {FLB_STD_CRITICAL , 1, "F" },
1128 {FLB_STD_CRITICAL , 4, "CRIT" },
1129 {FLB_STD_CRITICAL , 5, "FATAL" },
1130 {FLB_STD_CRITICAL , 8, "CRITICAL" },
1131
1132 {FLB_STD_ERROR , 1, "E" },
1133 {FLB_STD_ERROR , 3, "ERR" },
1134 {FLB_STD_ERROR , 5, "ERROR" },
1135 {FLB_STD_ERROR , 6, "SEVERE" },
1136
1137 {FLB_STD_WARNING , 1, "W" },
1138 {FLB_STD_WARNING , 4, "WARN" },
1139 {FLB_STD_WARNING , 7, "WARNING" },
1140
1141 {FLB_STD_NOTICE , 1, "N" },
1142 {FLB_STD_NOTICE , 6, "NOTICE" },
1143
1144 {FLB_STD_INFO , 1, "I" },
1145 {FLB_STD_INFO , 4, "INFO" },
1146
1147 {FLB_STD_DEBUG , 1, "D" },
1148 {FLB_STD_DEBUG , 5, "DEBUG" },
1149 {FLB_STD_DEBUG , 5, "TRACE" },
1150 {FLB_STD_DEBUG , 9, "TRACE_INT"},
1151 {FLB_STD_DEBUG , 4, "FINE" },
1152 {FLB_STD_DEBUG , 5, "FINER" },
1153 {FLB_STD_DEBUG , 6, "FINEST" },
1154 {FLB_STD_DEBUG , 6, "CONFIG" },
1155
1156 {FLB_STD_DEFAULT , 7, "DEFAULT" }
1157 };
1158
1159 for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) {
1160 if (enum_mapping[i].str_size != str_size) {
1161 continue;
1162 }
1163
1164 if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) {
1165 *s = enum_mapping[i].s;
1166 return 0;
1167 }
1168 }
1169 return -1;
1170 }
1171
get_msgpack_obj(msgpack_object * subobj,const msgpack_object * o,const flb_sds_t key,const int key_size,msgpack_object_type type)1172 static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o,
1173 const flb_sds_t key, const int key_size,
1174 msgpack_object_type type)
1175 {
1176 int i = 0;
1177 msgpack_object_kv * p = NULL;
1178
1179 if (o == NULL || subobj == NULL) {
1180 return -1;
1181 }
1182
1183 for (i = 0; i < o->via.map.size; i++) {
1184 p = &o->via.map.ptr[i];
1185 if (p->val.type != type) {
1186 continue;
1187 }
1188
1189 if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) {
1190 *subobj = p->val;
1191 return 0;
1192 }
1193 }
1194 return -1;
1195 }
1196
get_string(flb_sds_t * s,const msgpack_object * o,const flb_sds_t key)1197 static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key)
1198 {
1199 msgpack_object tmp;
1200 if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) {
1201 *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size);
1202 return 0;
1203 }
1204
1205 *s = 0;
1206 return -1;
1207 }
1208
get_severity_level(severity_t * s,const msgpack_object * o,const flb_sds_t key)1209 static int get_severity_level(severity_t * s, const msgpack_object * o,
1210 const flb_sds_t key)
1211 {
1212 msgpack_object tmp;
1213 if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0
1214 && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) {
1215 return 0;
1216 }
1217 *s = 0;
1218 return -1;
1219 }
1220
validate_insert_id(msgpack_object * insert_id_value,const msgpack_object * obj)1221 static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
1222 const msgpack_object * obj)
1223 {
1224 int i = 0;
1225 msgpack_object_kv * p = NULL;
1226 insert_id_status ret = INSERTID_NOT_PRESENT;
1227
1228 if (obj == NULL) {
1229 return ret;
1230 }
1231
1232 for (i = 0; i < obj->via.map.size; i++) {
1233 p = &obj->via.map.ptr[i];
1234 if (p->key.type != MSGPACK_OBJECT_STR) {
1235 continue;
1236 }
1237 if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
1238 if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) {
1239 *insert_id_value = p->val;
1240 ret = INSERTID_VALID;
1241 }
1242 else {
1243 ret = INSERTID_INVALID;
1244 }
1245 break;
1246 }
1247 }
1248 return ret;
1249 }
1250
pack_json_payload(int insert_id_extracted,int operation_extracted,int operation_extra_size,int source_location_extracted,int source_location_extra_size,int http_request_extracted,int http_request_extra_size,timestamp_status tms_status,msgpack_packer * mp_pck,msgpack_object * obj,struct flb_stackdriver * ctx)1251 static int pack_json_payload(int insert_id_extracted,
1252 int operation_extracted, int operation_extra_size,
1253 int source_location_extracted,
1254 int source_location_extra_size,
1255 int http_request_extracted,
1256 int http_request_extra_size,
1257 timestamp_status tms_status,
1258 msgpack_packer *mp_pck, msgpack_object *obj,
1259 struct flb_stackdriver *ctx)
1260 {
1261 /* Specified fields include local_resource_id, operation, sourceLocation ... */
1262 int i, j;
1263 int to_remove = 0;
1264 int ret;
1265 int map_size;
1266 int new_map_size;
1267 int len;
1268 int len_to_be_removed;
1269 int key_not_found;
1270 flb_sds_t removed;
1271 flb_sds_t monitored_resource_key;
1272 flb_sds_t local_resource_id_key;
1273 flb_sds_t stream;
1274 msgpack_object_kv *kv = obj->via.map.ptr;
1275 msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
1276
1277 monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY);
1278 local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY);
1279 stream = flb_sds_create("stream");
1280 /*
1281 * array of elements that need to be removed from payload,
1282 * special field 'operation' will be processed individually
1283 */
1284 flb_sds_t to_be_removed[] =
1285 {
1286 monitored_resource_key,
1287 local_resource_id_key,
1288 ctx->labels_key,
1289 ctx->severity_key,
1290 ctx->trace_key,
1291 ctx->log_name_key,
1292 stream
1293 /* more special fields are required to be added, but, if this grows with more
1294 than a few records, it might need to be converted to flb_hash
1295 */
1296 };
1297
1298 if (insert_id_extracted == FLB_TRUE) {
1299 to_remove += 1;
1300 }
1301 if (operation_extracted == FLB_TRUE && operation_extra_size == 0) {
1302 to_remove += 1;
1303 }
1304 if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) {
1305 to_remove += 1;
1306 }
1307 if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) {
1308 to_remove += 1;
1309 }
1310 if (tms_status == FORMAT_TIMESTAMP_OBJECT) {
1311 to_remove += 1;
1312 }
1313 if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1314 to_remove += 2;
1315 }
1316
1317 map_size = obj->via.map.size;
1318 len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]);
1319 for (i = 0; i < map_size; i++) {
1320 kv = &obj->via.map.ptr[i];
1321 len = kv->key.via.str.size;
1322 for (j = 0; j < len_to_be_removed; j++) {
1323 removed = to_be_removed[j];
1324 /*
1325 * check length of key to avoid partial matching
1326 * e.g. labels key = labels && kv->key = labelss
1327 */
1328 if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
1329 to_remove += 1;
1330 break;
1331 }
1332 }
1333 }
1334
1335 new_map_size = map_size - to_remove;
1336
1337 ret = msgpack_pack_map(mp_pck, new_map_size);
1338 if (ret < 0) {
1339 goto error;
1340 }
1341
1342 /* points back to the beginning of map */
1343 kv = obj->via.map.ptr;
1344 for(; kv != kvend; ++kv ) {
1345 key_not_found = 1;
1346
1347 /* processing logging.googleapis.com/insertId */
1348 if (insert_id_extracted == FLB_TRUE
1349 && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
1350 continue;
1351 }
1352
1353 /* processing logging.googleapis.com/operation */
1354 if (validate_key(kv->key, OPERATION_FIELD_IN_JSON,
1355 OPERATION_KEY_SIZE)
1356 && kv->val.type == MSGPACK_OBJECT_MAP) {
1357 if (operation_extra_size > 0) {
1358 msgpack_pack_object(mp_pck, kv->key);
1359 pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size);
1360 }
1361 continue;
1362 }
1363
1364 if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON,
1365 SOURCE_LOCATION_SIZE)
1366 && kv->val.type == MSGPACK_OBJECT_MAP) {
1367
1368 if (source_location_extra_size > 0) {
1369 msgpack_pack_object(mp_pck, kv->key);
1370 pack_extra_source_location_subfields(mp_pck, &kv->val,
1371 source_location_extra_size);
1372 }
1373 continue;
1374 }
1375
1376 if (validate_key(kv->key, ctx->http_request_key,
1377 ctx->http_request_key_size)
1378 && kv->val.type == MSGPACK_OBJECT_MAP) {
1379
1380 if(http_request_extra_size > 0) {
1381 msgpack_pack_object(mp_pck, kv->key);
1382 pack_extra_http_request_subfields(mp_pck, &kv->val,
1383 http_request_extra_size);
1384 }
1385 continue;
1386 }
1387
1388 if (validate_key(kv->key, "timestamp", 9)
1389 && tms_status == FORMAT_TIMESTAMP_OBJECT) {
1390 continue;
1391 }
1392
1393 if (validate_key(kv->key, "timestampSeconds", 16)
1394 && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1395 continue;
1396 }
1397 if (validate_key(kv->key, "timestampNanos", 14)
1398 && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1399 continue;
1400 }
1401
1402 len = kv->key.via.str.size;
1403 for (j = 0; j < len_to_be_removed; j++) {
1404 removed = to_be_removed[j];
1405 if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
1406 key_not_found = 0;
1407 break;
1408 }
1409 }
1410
1411 if (key_not_found) {
1412 ret = msgpack_pack_object(mp_pck, kv->key);
1413 if (ret < 0) {
1414 goto error;
1415 }
1416 ret = msgpack_pack_object(mp_pck, kv->val);
1417 if (ret < 0) {
1418 goto error;
1419 }
1420 }
1421 }
1422
1423 flb_sds_destroy(monitored_resource_key);
1424 flb_sds_destroy(local_resource_id_key);
1425 flb_sds_destroy(stream);
1426 return 0;
1427
1428 error:
1429 flb_sds_destroy(monitored_resource_key);
1430 flb_sds_destroy(local_resource_id_key);
1431 flb_sds_destroy(stream);
1432 return ret;
1433 }
1434
stackdriver_format(struct flb_config * config,struct flb_input_instance * ins,void * plugin_context,void * flush_ctx,const char * tag,int tag_len,const void * data,size_t bytes,void ** out_data,size_t * out_size)1435 static int stackdriver_format(struct flb_config *config,
1436 struct flb_input_instance *ins,
1437 void *plugin_context,
1438 void *flush_ctx,
1439 const char *tag, int tag_len,
1440 const void *data, size_t bytes,
1441 void **out_data, size_t *out_size)
1442 {
1443 int len;
1444 int ret;
1445 int array_size = 0;
1446 /* The default value is 3: timestamp, jsonPayload, logName. */
1447 int entry_size = 3;
1448 size_t s;
1449 size_t off = 0;
1450 char path[PATH_MAX];
1451 char time_formatted[255];
1452 const char *newtag;
1453 const char *new_log_name;
1454 msgpack_object *obj;
1455 msgpack_object *labels_ptr;
1456 msgpack_unpacked result;
1457 msgpack_sbuffer mp_sbuf;
1458 msgpack_packer mp_pck;
1459 flb_sds_t out_buf;
1460 struct flb_stackdriver *ctx = plugin_context;
1461
1462 /* Parameters for severity */
1463 int severity_extracted = FLB_FALSE;
1464 severity_t severity;
1465
1466 /* Parameters for trace */
1467 int trace_extracted = FLB_FALSE;
1468 flb_sds_t trace;
1469 char stackdriver_trace[PATH_MAX];
1470 const char *new_trace;
1471
1472 /* Parameters for log name */
1473 int log_name_extracted = FLB_FALSE;
1474 flb_sds_t log_name = NULL;
1475 flb_sds_t stream = NULL;
1476 flb_sds_t stream_key;
1477
1478 /* Parameters for insertId */
1479 msgpack_object insert_id_obj;
1480 insert_id_status in_status;
1481 int insert_id_extracted;
1482
1483 /* Parameters in Operation */
1484 flb_sds_t operation_id;
1485 flb_sds_t operation_producer;
1486 int operation_first = FLB_FALSE;
1487 int operation_last = FLB_FALSE;
1488 int operation_extracted = FLB_FALSE;
1489 int operation_extra_size = 0;
1490
1491 /* Parameters for sourceLocation */
1492 flb_sds_t source_location_file;
1493 int64_t source_location_line = 0;
1494 flb_sds_t source_location_function;
1495 int source_location_extracted = FLB_FALSE;
1496 int source_location_extra_size = 0;
1497
1498 /* Parameters for httpRequest */
1499 struct http_request_field http_request;
1500 int http_request_extracted = FLB_FALSE;
1501 int http_request_extra_size = 0;
1502
1503 /* Parameters for Timestamp */
1504 struct tm tm;
1505 struct flb_time tms;
1506 timestamp_status tms_status;
1507
1508 /* Count number of records */
1509 array_size = flb_mp_count(data, bytes);
1510
1511 /*
1512 * Search each entry and validate insertId.
1513 * Reject the entry if insertId is invalid.
1514 * If all the entries are rejected, stop formatting.
1515 *
1516 */
1517 off = 0;
1518 msgpack_unpacked_init(&result);
1519 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
1520 flb_time_pop_from_msgpack(&tms, &result, &obj);
1521
1522 /* Extract insertId */
1523 in_status = validate_insert_id(&insert_id_obj, obj);
1524 if (in_status == INSERTID_INVALID) {
1525 flb_plg_error(ctx->ins,
1526 "Incorrect insertId received. InsertId should be non-empty string.");
1527 array_size -= 1;
1528 }
1529 }
1530 msgpack_unpacked_destroy(&result);
1531
1532 if (array_size == 0) {
1533 *out_size = 0;
1534 return -1;
1535 }
1536
1537 /* Create temporal msgpack buffer */
1538 msgpack_sbuffer_init(&mp_sbuf);
1539 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1540
1541 /*
1542 * Pack root map (resource & entries):
1543 *
1544 * {"resource": {"type": "...", "labels": {...},
1545 * "entries": []
1546 */
1547 msgpack_pack_map(&mp_pck, 2);
1548
1549 msgpack_pack_str(&mp_pck, 8);
1550 msgpack_pack_str_body(&mp_pck, "resource", 8);
1551
1552 /* type & labels */
1553 msgpack_pack_map(&mp_pck, 2);
1554
1555 /* type */
1556 msgpack_pack_str(&mp_pck, 4);
1557 msgpack_pack_str_body(&mp_pck, "type", 4);
1558 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource));
1559 msgpack_pack_str_body(&mp_pck, ctx->resource,
1560 flb_sds_len(ctx->resource));
1561
1562 msgpack_pack_str(&mp_pck, 6);
1563 msgpack_pack_str_body(&mp_pck, "labels", 6);
1564
1565 if (ctx->is_k8s_resource_type) {
1566 ret = extract_local_resource_id(data, bytes, ctx, tag);
1567 if (ret != 0) {
1568 flb_plg_error(ctx->ins, "fail to construct local_resource_id");
1569 msgpack_sbuffer_destroy(&mp_sbuf);
1570 return -1;
1571 }
1572 }
1573
1574 ret = parse_monitored_resource(ctx, data, bytes, &mp_pck);
1575 if (ret != 0) {
1576 if (strcmp(ctx->resource, "global") == 0) {
1577 /* global resource has field project_id */
1578 msgpack_pack_map(&mp_pck, 1);
1579 msgpack_pack_str(&mp_pck, 10);
1580 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1581 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1582 msgpack_pack_str_body(&mp_pck,
1583 ctx->project_id, flb_sds_len(ctx->project_id));
1584 }
1585 else if (ctx->is_generic_resource_type) {
1586 if (strcmp(ctx->resource, "generic_node") == 0) {
1587 /* generic_node has fields project_id, location, namespace, node_id */
1588 msgpack_pack_map(&mp_pck, 4);
1589
1590 msgpack_pack_str(&mp_pck, 7);
1591 msgpack_pack_str_body(&mp_pck, "node_id", 7);
1592 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id));
1593 msgpack_pack_str_body(&mp_pck,
1594 ctx->node_id, flb_sds_len(ctx->node_id));
1595 }
1596 else {
1597 /* generic_task has fields project_id, location, namespace, job, task_id */
1598 msgpack_pack_map(&mp_pck, 5);
1599
1600 msgpack_pack_str(&mp_pck, 3);
1601 msgpack_pack_str_body(&mp_pck, "job", 3);
1602 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job));
1603 msgpack_pack_str_body(&mp_pck,
1604 ctx->job, flb_sds_len(ctx->job));
1605
1606 msgpack_pack_str(&mp_pck, 7);
1607 msgpack_pack_str_body(&mp_pck, "task_id", 7);
1608 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id));
1609 msgpack_pack_str_body(&mp_pck,
1610 ctx->task_id, flb_sds_len(ctx->task_id));
1611 }
1612
1613 msgpack_pack_str(&mp_pck, 10);
1614 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1615 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1616 msgpack_pack_str_body(&mp_pck,
1617 ctx->project_id, flb_sds_len(ctx->project_id));
1618
1619 msgpack_pack_str(&mp_pck, 8);
1620 msgpack_pack_str_body(&mp_pck, "location", 8);
1621 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location));
1622 msgpack_pack_str_body(&mp_pck,
1623 ctx->location, flb_sds_len(ctx->location));
1624
1625 msgpack_pack_str(&mp_pck, 9);
1626 msgpack_pack_str_body(&mp_pck, "namespace", 9);
1627 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id));
1628 msgpack_pack_str_body(&mp_pck,
1629 ctx->namespace_id, flb_sds_len(ctx->namespace_id));
1630 }
1631 else if (strcmp(ctx->resource, "gce_instance") == 0) {
1632 /* gce_instance resource has fields project_id, zone, instance_id */
1633 msgpack_pack_map(&mp_pck, 3);
1634
1635 msgpack_pack_str(&mp_pck, 10);
1636 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1637 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1638 msgpack_pack_str_body(&mp_pck,
1639 ctx->project_id, flb_sds_len(ctx->project_id));
1640
1641 msgpack_pack_str(&mp_pck, 4);
1642 msgpack_pack_str_body(&mp_pck, "zone", 4);
1643 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone));
1644 msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone));
1645
1646 msgpack_pack_str(&mp_pck, 11);
1647 msgpack_pack_str_body(&mp_pck, "instance_id", 11);
1648 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id));
1649 msgpack_pack_str_body(&mp_pck,
1650 ctx->instance_id, flb_sds_len(ctx->instance_id));
1651 }
1652 else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) {
1653 /* k8s_container resource has fields project_id, location, cluster_name,
1654 * namespace_name, pod_name, container_name
1655 *
1656 * The local_resource_id for k8s_container is in format:
1657 * k8s_container.<namespace_name>.<pod_name>.<container_name>
1658 */
1659
1660 ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER);
1661 if (ret == -1) {
1662 flb_plg_error(ctx->ins, "fail to extract resource labels "
1663 "for k8s_container resource type");
1664 msgpack_sbuffer_destroy(&mp_sbuf);
1665 return -1;
1666 }
1667
1668 msgpack_pack_map(&mp_pck, 6);
1669
1670 msgpack_pack_str(&mp_pck, 10);
1671 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1672 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1673 msgpack_pack_str_body(&mp_pck,
1674 ctx->project_id, flb_sds_len(ctx->project_id));
1675
1676 msgpack_pack_str(&mp_pck, 8);
1677 msgpack_pack_str_body(&mp_pck, "location", 8);
1678 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1679 msgpack_pack_str_body(&mp_pck,
1680 ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1681
1682 msgpack_pack_str(&mp_pck, 12);
1683 msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1684 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1685 msgpack_pack_str_body(&mp_pck,
1686 ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1687
1688 msgpack_pack_str(&mp_pck, 14);
1689 msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
1690 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
1691 msgpack_pack_str_body(&mp_pck,
1692 ctx->namespace_name, flb_sds_len(ctx->namespace_name));
1693
1694 msgpack_pack_str(&mp_pck, 8);
1695 msgpack_pack_str_body(&mp_pck, "pod_name", 8);
1696 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
1697 msgpack_pack_str_body(&mp_pck,
1698 ctx->pod_name, flb_sds_len(ctx->pod_name));
1699
1700 msgpack_pack_str(&mp_pck, 14);
1701 msgpack_pack_str_body(&mp_pck, "container_name", 14);
1702 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name));
1703 msgpack_pack_str_body(&mp_pck,
1704 ctx->container_name, flb_sds_len(ctx->container_name));
1705 }
1706 else if (strcmp(ctx->resource, K8S_NODE) == 0) {
1707 /* k8s_node resource has fields project_id, location, cluster_name, node_name
1708 *
1709 * The local_resource_id for k8s_node is in format:
1710 * k8s_node.<node_name>
1711 */
1712
1713 ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE);
1714 if (ret == -1) {
1715 flb_plg_error(ctx->ins, "fail to process local_resource_id from "
1716 "log entry for k8s_node");
1717 msgpack_sbuffer_destroy(&mp_sbuf);
1718 return -1;
1719 }
1720
1721 msgpack_pack_map(&mp_pck, 4);
1722
1723 msgpack_pack_str(&mp_pck, 10);
1724 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1725 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1726 msgpack_pack_str_body(&mp_pck,
1727 ctx->project_id, flb_sds_len(ctx->project_id));
1728
1729 msgpack_pack_str(&mp_pck, 8);
1730 msgpack_pack_str_body(&mp_pck, "location", 8);
1731 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1732 msgpack_pack_str_body(&mp_pck,
1733 ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1734
1735 msgpack_pack_str(&mp_pck, 12);
1736 msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1737 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1738 msgpack_pack_str_body(&mp_pck,
1739 ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1740
1741 msgpack_pack_str(&mp_pck, 9);
1742 msgpack_pack_str_body(&mp_pck, "node_name", 9);
1743 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name));
1744 msgpack_pack_str_body(&mp_pck,
1745 ctx->node_name, flb_sds_len(ctx->node_name));
1746 }
1747 else if (strcmp(ctx->resource, K8S_POD) == 0) {
1748 /* k8s_pod resource has fields project_id, location, cluster_name,
1749 * namespace_name, pod_name.
1750 *
1751 * The local_resource_id for k8s_pod is in format:
1752 * k8s_pod.<namespace_name>.<pod_name>
1753 */
1754
1755 ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD);
1756 if (ret != 0) {
1757 flb_plg_error(ctx->ins, "fail to process local_resource_id from "
1758 "log entry for k8s_pod");
1759 msgpack_sbuffer_destroy(&mp_sbuf);
1760 return -1;
1761 }
1762
1763 msgpack_pack_map(&mp_pck, 5);
1764
1765 msgpack_pack_str(&mp_pck, 10);
1766 msgpack_pack_str_body(&mp_pck, "project_id", 10);
1767 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1768 msgpack_pack_str_body(&mp_pck,
1769 ctx->project_id, flb_sds_len(ctx->project_id));
1770
1771 msgpack_pack_str(&mp_pck, 8);
1772 msgpack_pack_str_body(&mp_pck, "location", 8);
1773 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1774 msgpack_pack_str_body(&mp_pck,
1775 ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1776
1777 msgpack_pack_str(&mp_pck, 12);
1778 msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1779 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1780 msgpack_pack_str_body(&mp_pck,
1781 ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1782
1783 msgpack_pack_str(&mp_pck, 14);
1784 msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
1785 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
1786 msgpack_pack_str_body(&mp_pck,
1787 ctx->namespace_name, flb_sds_len(ctx->namespace_name));
1788
1789 msgpack_pack_str(&mp_pck, 8);
1790 msgpack_pack_str_body(&mp_pck, "pod_name", 8);
1791 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
1792 msgpack_pack_str_body(&mp_pck,
1793 ctx->pod_name, flb_sds_len(ctx->pod_name));
1794 }
1795 else {
1796 flb_plg_error(ctx->ins, "unsupported resource type '%s'",
1797 ctx->resource);
1798 msgpack_sbuffer_destroy(&mp_sbuf);
1799 return -1;
1800 }
1801 }
1802 msgpack_pack_str(&mp_pck, 7);
1803 msgpack_pack_str_body(&mp_pck, "entries", 7);
1804
1805 /* Append entries */
1806 msgpack_pack_array(&mp_pck, array_size);
1807
1808 off = 0;
1809 msgpack_unpacked_init(&result);
1810 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
1811 /* Get timestamp */
1812 flb_time_pop_from_msgpack(&tms, &result, &obj);
1813 tms_status = extract_timestamp(obj, &tms);
1814
1815 /*
1816 * Pack entry
1817 *
1818 * {
1819 * "severity": "...",
1820 * "labels": "...",
1821 * "logName": "...",
1822 * "jsonPayload": {...},
1823 * "timestamp": "...",
1824 * "trace": "..."
1825 * }
1826 */
1827 entry_size = 3;
1828
1829 /* Extract severity */
1830 severity_extracted = FLB_FALSE;
1831 if (ctx->severity_key
1832 && get_severity_level(&severity, obj, ctx->severity_key) == 0) {
1833 severity_extracted = FLB_TRUE;
1834 entry_size += 1;
1835 }
1836
1837 /* Extract trace */
1838 trace_extracted = FLB_FALSE;
1839 if (ctx->trace_key
1840 && get_string(&trace, obj, ctx->trace_key) == 0) {
1841 trace_extracted = FLB_TRUE;
1842 entry_size += 1;
1843 }
1844
1845 /* Extract log name */
1846 log_name_extracted = FLB_FALSE;
1847 if (ctx->log_name_key
1848 && get_string(&log_name, obj, ctx->log_name_key) == 0) {
1849 log_name_extracted = FLB_TRUE;
1850 }
1851
1852 /* Extract insertId */
1853 in_status = validate_insert_id(&insert_id_obj, obj);
1854 if (in_status == INSERTID_VALID) {
1855 insert_id_extracted = FLB_TRUE;
1856 entry_size += 1;
1857 }
1858 else if (in_status == INSERTID_NOT_PRESENT) {
1859 insert_id_extracted = FLB_FALSE;
1860 }
1861 else {
1862 if (log_name_extracted == FLB_TRUE) {
1863 flb_sds_destroy(log_name);
1864 }
1865 continue;
1866 }
1867
1868 /* Extract operation */
1869 operation_id = flb_sds_create("");
1870 operation_producer = flb_sds_create("");
1871 operation_first = FLB_FALSE;
1872 operation_last = FLB_FALSE;
1873 operation_extra_size = 0;
1874 operation_extracted = extract_operation(&operation_id, &operation_producer,
1875 &operation_first, &operation_last, obj,
1876 &operation_extra_size);
1877
1878 if (operation_extracted == FLB_TRUE) {
1879 entry_size += 1;
1880 }
1881
1882 /* Extract sourceLocation */
1883 source_location_file = flb_sds_create("");
1884 source_location_line = 0;
1885 source_location_function = flb_sds_create("");
1886 source_location_extra_size = 0;
1887 source_location_extracted = extract_source_location(&source_location_file,
1888 &source_location_line,
1889 &source_location_function,
1890 obj,
1891 &source_location_extra_size);
1892
1893 if (source_location_extracted == FLB_TRUE) {
1894 entry_size += 1;
1895 }
1896
1897 /* Extract httpRequest */
1898 init_http_request(&http_request);
1899 http_request_extra_size = 0;
1900 http_request_extracted = extract_http_request(&http_request, obj,
1901 &http_request_extra_size);
1902 if (http_request_extracted == FLB_TRUE) {
1903 entry_size += 1;
1904 }
1905
1906 /* Extract labels */
1907 labels_ptr = parse_labels(ctx, obj);
1908 if (labels_ptr != NULL) {
1909 if (labels_ptr->type != MSGPACK_OBJECT_MAP) {
1910 flb_plg_error(ctx->ins, "the type of labels should be map");
1911 flb_sds_destroy(operation_id);
1912 flb_sds_destroy(operation_producer);
1913 msgpack_unpacked_destroy(&result);
1914 msgpack_sbuffer_destroy(&mp_sbuf);
1915 return -1;
1916 }
1917 entry_size += 1;
1918 }
1919
1920 msgpack_pack_map(&mp_pck, entry_size);
1921
1922 /* Add severity into the log entry */
1923 if (severity_extracted == FLB_TRUE) {
1924 msgpack_pack_str(&mp_pck, 8);
1925 msgpack_pack_str_body(&mp_pck, "severity", 8);
1926 msgpack_pack_int(&mp_pck, severity);
1927 }
1928
1929 /* Add trace into the log entry */
1930 if (trace_extracted == FLB_TRUE) {
1931 msgpack_pack_str(&mp_pck, 5);
1932 msgpack_pack_str_body(&mp_pck, "trace", 5);
1933
1934 if (ctx->autoformat_stackdriver_trace) {
1935 len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1,
1936 "projects/%s/traces/%s", ctx->project_id, trace);
1937 new_trace = stackdriver_trace;
1938 }
1939 else {
1940 len = flb_sds_len(trace);
1941 new_trace = trace;
1942 }
1943
1944 msgpack_pack_str(&mp_pck, len);
1945 msgpack_pack_str_body(&mp_pck, new_trace, len);
1946 flb_sds_destroy(trace);
1947 }
1948
1949 /* Add insertId field into the log entry */
1950 if (insert_id_extracted == FLB_TRUE) {
1951 msgpack_pack_str(&mp_pck, 8);
1952 msgpack_pack_str_body(&mp_pck, "insertId", 8);
1953 msgpack_pack_object(&mp_pck, insert_id_obj);
1954 }
1955
1956 /* Add operation field into the log entry */
1957 if (operation_extracted == FLB_TRUE) {
1958 add_operation_field(&operation_id, &operation_producer,
1959 &operation_first, &operation_last, &mp_pck);
1960 }
1961
1962 /* Add sourceLocation field into the log entry */
1963 if (source_location_extracted == FLB_TRUE) {
1964 add_source_location_field(&source_location_file, source_location_line,
1965 &source_location_function, &mp_pck);
1966 }
1967
1968 /* Add httpRequest field into the log entry */
1969 if (http_request_extracted == FLB_TRUE) {
1970 add_http_request_field(&http_request, &mp_pck);
1971 }
1972
1973 /* labels */
1974 if (labels_ptr != NULL) {
1975 msgpack_pack_str(&mp_pck, 6);
1976 msgpack_pack_str_body(&mp_pck, "labels", 6);
1977 msgpack_pack_object(&mp_pck, *labels_ptr);
1978 }
1979
1980 /* Clean up id and producer if operation extracted */
1981 flb_sds_destroy(operation_id);
1982 flb_sds_destroy(operation_producer);
1983 flb_sds_destroy(source_location_file);
1984 flb_sds_destroy(source_location_function);
1985 destroy_http_request(&http_request);
1986
1987 /* jsonPayload */
1988 msgpack_pack_str(&mp_pck, 11);
1989 msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
1990 pack_json_payload(insert_id_extracted,
1991 operation_extracted, operation_extra_size,
1992 source_location_extracted,
1993 source_location_extra_size,
1994 http_request_extracted,
1995 http_request_extra_size,
1996 tms_status,
1997 &mp_pck, obj, ctx);
1998
1999 /* avoid modifying the original tag */
2000 newtag = tag;
2001 stream_key = flb_sds_create("stream");
2002 if (ctx->is_k8s_resource_type
2003 && get_string(&stream, obj, stream_key) == 0) {
2004 if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) {
2005 newtag = "stdout";
2006 }
2007 else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) {
2008 newtag = "stderr";
2009 }
2010 }
2011
2012 if (log_name_extracted == FLB_FALSE) {
2013 new_log_name = newtag;
2014 }
2015 else {
2016 new_log_name = log_name;
2017 }
2018
2019 /* logName */
2020 len = snprintf(path, sizeof(path) - 1,
2021 "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name);
2022
2023 if (log_name_extracted == FLB_TRUE) {
2024 flb_sds_destroy(log_name);
2025 }
2026
2027 msgpack_pack_str(&mp_pck, 7);
2028 msgpack_pack_str_body(&mp_pck, "logName", 7);
2029 msgpack_pack_str(&mp_pck, len);
2030 msgpack_pack_str_body(&mp_pck, path, len);
2031 flb_sds_destroy(stream_key);
2032 flb_sds_destroy(stream);
2033
2034 /* timestamp */
2035 msgpack_pack_str(&mp_pck, 9);
2036 msgpack_pack_str_body(&mp_pck, "timestamp", 9);
2037
2038 /* Format the time */
2039 /*
2040 * If format is timestamp_object or timestamp_duo_fields,
2041 * tms has been updated.
2042 *
2043 * If timestamp is not presen,
2044 * use the default tms(current time).
2045 */
2046
2047 gmtime_r(&tms.tm.tv_sec, &tm);
2048 s = strftime(time_formatted, sizeof(time_formatted) - 1,
2049 FLB_STD_TIME_FMT, &tm);
2050 len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
2051 ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
2052 s += len;
2053
2054 msgpack_pack_str(&mp_pck, s);
2055 msgpack_pack_str_body(&mp_pck, time_formatted, s);
2056
2057 }
2058
2059 /* Convert from msgpack to JSON */
2060 out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
2061 msgpack_sbuffer_destroy(&mp_sbuf);
2062
2063 if (!out_buf) {
2064 flb_plg_error(ctx->ins, "error formatting JSON payload");
2065 msgpack_unpacked_destroy(&result);
2066 return -1;
2067 }
2068
2069 *out_data = out_buf;
2070 *out_size = flb_sds_len(out_buf);
2071
2072 return 0;
2073 }
2074
cb_stackdriver_flush(const void * data,size_t bytes,const char * tag,int tag_len,struct flb_input_instance * i_ins,void * out_context,struct flb_config * config)2075 static void cb_stackdriver_flush(const void *data, size_t bytes,
2076 const char *tag, int tag_len,
2077 struct flb_input_instance *i_ins,
2078 void *out_context,
2079 struct flb_config *config)
2080 {
2081 (void) i_ins;
2082 (void) config;
2083 int ret;
2084 int ret_code = FLB_RETRY;
2085 size_t b_sent;
2086 flb_sds_t token;
2087 flb_sds_t payload_buf;
2088 size_t payload_size;
2089 void *out_buf;
2090 size_t out_size;
2091 struct flb_stackdriver *ctx = out_context;
2092 struct flb_upstream_conn *u_conn;
2093 struct flb_http_client *c;
2094 #ifdef FLB_HAVE_METRICS
2095 char *name = (char *) flb_output_name(ctx->ins);
2096 uint64_t ts = cmt_time_now();
2097 #endif
2098
2099 /* Get upstream connection */
2100 u_conn = flb_upstream_conn_get(ctx->u);
2101 if (!u_conn) {
2102 #ifdef FLB_HAVE_METRICS
2103 cmt_counter_inc(ctx->cmt_failed_requests,
2104 ts, 1, (char *[]) {name});
2105
2106 /* OLD api */
2107 flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2108 #endif
2109 FLB_OUTPUT_RETURN(FLB_RETRY);
2110 }
2111
2112 /* Reformat msgpack to stackdriver JSON payload */
2113 ret = stackdriver_format(config, i_ins,
2114 ctx, NULL,
2115 tag, tag_len,
2116 data, bytes,
2117 &out_buf, &out_size);
2118 if (ret != 0) {
2119 #ifdef FLB_HAVE_METRICS
2120 cmt_counter_inc(ctx->cmt_failed_requests,
2121 ts, 1, (char *[]) {name});
2122
2123 /* OLD api */
2124 flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2125 #endif
2126 flb_upstream_conn_release(u_conn);
2127 FLB_OUTPUT_RETURN(FLB_RETRY);
2128 }
2129
2130 payload_buf = (flb_sds_t) out_buf;
2131 payload_size = out_size;
2132
2133 /* Get or renew Token */
2134 token = get_google_token(ctx);
2135 if (!token) {
2136 flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
2137 flb_upstream_conn_release(u_conn);
2138 flb_sds_destroy(payload_buf);
2139 #ifdef FLB_HAVE_METRICS
2140 cmt_counter_inc(ctx->cmt_failed_requests,
2141 ts, 1, (char *[]) {name});
2142
2143 /* OLD api */
2144 flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2145 #endif
2146 FLB_OUTPUT_RETURN(FLB_RETRY);
2147 }
2148
2149 /* Compose HTTP Client request */
2150 c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI,
2151 payload_buf, payload_size, NULL, 0, NULL, 0);
2152
2153 flb_http_buffer_size(c, 4192);
2154
2155 if (ctx->stackdriver_agent) {
2156 flb_http_add_header(c, "User-Agent", 10,
2157 ctx->stackdriver_agent,
2158 flb_sds_len(ctx->stackdriver_agent));
2159 }
2160 else {
2161 flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
2162 }
2163
2164 flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
2165 flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
2166
2167 /* Send HTTP request */
2168 ret = flb_http_do(c, &b_sent);
2169
2170 /* validate response */
2171 if (ret != 0) {
2172 flb_plg_warn(ctx->ins, "http_do=%i", ret);
2173 ret_code = FLB_RETRY;
2174 }
2175 else {
2176 /* The request was issued successfully, validate the 'error' field */
2177 flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
2178 if (c->resp.status == 200) {
2179 ret_code = FLB_OK;
2180 }
2181 else if (c->resp.status >= 400 && c->resp.status < 500) {
2182 ret_code = FLB_ERROR;
2183 flb_plg_warn(ctx->ins, "error\n%s",
2184 c->resp.payload);
2185 }
2186 else {
2187 if (c->resp.payload_size > 0) {
2188 /* we got an error */
2189 flb_plg_warn(ctx->ins, "error\n%s",
2190 c->resp.payload);
2191 }
2192 else {
2193 flb_plg_debug(ctx->ins, "response\n%s",
2194 c->resp.payload);
2195 }
2196 ret_code = FLB_RETRY;
2197 }
2198 }
2199
2200 /* Update specific stackdriver metrics */
2201 #ifdef FLB_HAVE_METRICS
2202 if (ret_code == FLB_OK) {
2203 cmt_counter_inc(ctx->cmt_successful_requests,
2204 ts, 1, (char *[]) {name});
2205
2206 /* OLD api */
2207 flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);
2208 }
2209 else {
2210 cmt_counter_inc(ctx->cmt_failed_requests,
2211 ts, 1, (char *[]) {name});
2212
2213 /* OLD api */
2214 flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2215 }
2216 #endif
2217
2218 /* Cleanup */
2219 flb_sds_destroy(payload_buf);
2220 flb_sds_destroy(token);
2221 flb_http_client_destroy(c);
2222 flb_upstream_conn_release(u_conn);
2223
2224 /* Done */
2225 FLB_OUTPUT_RETURN(ret_code);
2226 }
2227
cb_stackdriver_exit(void * data,struct flb_config * config)2228 static int cb_stackdriver_exit(void *data, struct flb_config *config)
2229 {
2230 struct flb_stackdriver *ctx = data;
2231
2232 if (!ctx) {
2233 return -1;
2234 }
2235
2236 flb_stackdriver_conf_destroy(ctx);
2237 return 0;
2238 }
2239
2240 struct flb_output_plugin out_stackdriver_plugin = {
2241 .name = "stackdriver",
2242 .description = "Send events to Google Stackdriver Logging",
2243 .cb_init = cb_stackdriver_init,
2244 .cb_flush = cb_stackdriver_flush,
2245 .cb_exit = cb_stackdriver_exit,
2246
2247 /* Test */
2248 .test_formatter.callback = stackdriver_format,
2249
2250 /* Plugin flags */
2251 .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
2252 };
2253