1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_http_client.h>
23 #include <fluent-bit/flb_pack.h>
24 #include <fluent-bit/flb_utils.h>
25 #include <fluent-bit/flb_time.h>
26 #include <fluent-bit/flb_oauth2.h>
27 #include <fluent-bit/flb_regex.h>
28 #include <fluent-bit/flb_pthread.h>
29 
30 #include <msgpack.h>
31 
32 #include "gce_metadata.h"
33 #include "stackdriver.h"
34 #include "stackdriver_conf.h"
35 #include "stackdriver_operation.h"
36 #include "stackdriver_source_location.h"
37 #include "stackdriver_http_request.h"
38 #include "stackdriver_timestamp.h"
39 #include "stackdriver_helper.h"
40 #include <mbedtls/base64.h>
41 #include <mbedtls/sha256.h>
42 
43 pthread_key_t oauth2_type;
44 pthread_key_t oauth2_token;
45 
oauth2_cache_exit(void * ptr)46 static void oauth2_cache_exit(void *ptr)
47 {
48     if (ptr) {
49         flb_sds_destroy(ptr);
50     }
51 }
52 
oauth2_cache_init()53 static void oauth2_cache_init()
54 {
55     /* oauth2 pthread key */
56     pthread_key_create(&oauth2_type, oauth2_cache_exit);
57     pthread_key_create(&oauth2_token, oauth2_cache_exit);
58 }
59 
60 /* Set oauth2 type and token in pthread keys */
oauth2_cache_set(char * type,char * token)61 static void oauth2_cache_set(char *type, char *token)
62 {
63     flb_sds_t tmp;
64 
65     /* oauth2 type */
66     tmp = pthread_getspecific(oauth2_type);
67     if (tmp) {
68         flb_sds_destroy(tmp);
69     }
70     tmp = flb_sds_create(type);
71     pthread_setspecific(oauth2_type, tmp);
72 
73     /* oauth2 access token */
74     tmp = pthread_getspecific(oauth2_token);
75     if (tmp) {
76         flb_sds_destroy(tmp);
77     }
78     tmp = flb_sds_create(token);
79     pthread_setspecific(oauth2_token, tmp);
80 }
81 
82 /* By using pthread keys cached values, compose the authorizatoin token */
oauth2_cache_to_token()83 static flb_sds_t oauth2_cache_to_token()
84 {
85     flb_sds_t type;
86     flb_sds_t token;
87     flb_sds_t output;
88 
89     type = pthread_getspecific(oauth2_type);
90     if (!type) {
91         return NULL;
92     }
93 
94     output = flb_sds_create(type);
95     if (!output) {
96         return NULL;
97     }
98 
99     token = pthread_getspecific(oauth2_token);
100     flb_sds_printf(&output, " %s", token);
101     return output;
102 }
103 
104 /*
105  * Base64 Encoding in JWT must:
106  *
107  * - remove any trailing padding '=' character
108  * - replace '+' with '-'
109  * - replace '/' with '_'
110  *
111  * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C
112  */
jwt_base64_url_encode(unsigned char * out_buf,size_t out_size,unsigned char * in_buf,size_t in_size,size_t * olen)113 int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size,
114                           unsigned char *in_buf, size_t in_size,
115                           size_t *olen)
116 
117 {
118     int i;
119     size_t len;
120 
121     /* do normal base64 encoding */
122     mbedtls_base64_encode(out_buf, out_size - 1,
123                           &len, in_buf, in_size);
124 
125     /* Replace '+' and '/' characters */
126     for (i = 0; i < len && out_buf[i] != '='; i++) {
127         if (out_buf[i] == '+') {
128             out_buf[i] = '-';
129         }
130         else if (out_buf[i] == '/') {
131             out_buf[i] = '_';
132         }
133     }
134 
135     /* Now 'i' becomes the new length */
136     *olen = i;
137     return 0;
138 }
139 
jwt_encode(char * payload,char * secret,char ** out_signature,size_t * out_size,struct flb_stackdriver * ctx)140 static int jwt_encode(char *payload, char *secret,
141                       char **out_signature, size_t *out_size,
142                       struct flb_stackdriver *ctx)
143 {
144     int ret;
145     int len;
146     int buf_size;
147     size_t olen;
148     char *buf;
149     char *sigd;
150     char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}";
151     unsigned char sha256_buf[32] = {0};
152     mbedtls_sha256_context sha256_ctx;
153     mbedtls_rsa_context *rsa;
154     flb_sds_t out;
155     mbedtls_pk_context pk_ctx;
156     unsigned char sig[256] = {0};
157 
158     buf_size = (strlen(payload) + strlen(secret)) * 2;
159     buf = flb_malloc(buf_size);
160     if (!buf) {
161         flb_errno();
162         return -1;
163     }
164 
165     /* Encode header */
166     len = strlen(headers);
167     mbedtls_base64_encode((unsigned char *) buf, buf_size - 1,
168                           &olen, (unsigned char *) headers, len);
169 
170     /* Create buffer to store JWT */
171     out = flb_sds_create_size(2048);
172     if (!out) {
173         flb_errno();
174         flb_free(buf);
175         return -1;
176     }
177 
178     /* Append header */
179     flb_sds_cat(out, buf, olen);
180     flb_sds_cat(out, ".", 1);
181 
182     /* Encode Payload */
183     len = strlen(payload);
184     jwt_base64_url_encode((unsigned char *) buf, buf_size,
185                           (unsigned char *) payload, len, &olen);
186 
187     /* Append Payload */
188     flb_sds_cat(out, buf, olen);
189 
190     /* do sha256() of base64(header).base64(payload) */
191     mbedtls_sha256_init(&sha256_ctx);
192     mbedtls_sha256_starts(&sha256_ctx, 0);
193     mbedtls_sha256_update(&sha256_ctx, (const unsigned char *) out,
194                           flb_sds_len(out));
195     mbedtls_sha256_finish(&sha256_ctx, sha256_buf);
196 
197     /* In mbedTLS cert length must include the null byte */
198     len = strlen(secret) + 1;
199 
200     /* Load Private Key */
201     mbedtls_pk_init(&pk_ctx);
202     ret = mbedtls_pk_parse_key(&pk_ctx,
203                                (unsigned char *) secret, len, NULL, 0);
204     if (ret != 0) {
205         flb_plg_error(ctx->ins, "error loading private key");
206         flb_free(buf);
207         flb_sds_destroy(out);
208         return -1;
209     }
210 
211     /* Create RSA context */
212     rsa = mbedtls_pk_rsa(pk_ctx);
213     if (!rsa) {
214         flb_plg_error(ctx->ins, "error creating RSA context");
215         flb_free(buf);
216         flb_sds_destroy(out);
217         mbedtls_pk_free(&pk_ctx);
218         return -1;
219     }
220 
221     ret = mbedtls_rsa_pkcs1_sign(rsa, NULL, NULL,
222                                  MBEDTLS_RSA_PRIVATE, MBEDTLS_MD_SHA256,
223                                  0, (unsigned char *) sha256_buf, sig);
224     if (ret != 0) {
225         flb_plg_error(ctx->ins, "error signing SHA256");
226         flb_free(buf);
227         flb_sds_destroy(out);
228         mbedtls_pk_free(&pk_ctx);
229         return -1;
230     }
231 
232     sigd = flb_malloc(2048);
233     if (!sigd) {
234         flb_errno();
235         flb_free(buf);
236         flb_sds_destroy(out);
237         mbedtls_pk_free(&pk_ctx);
238         return -1;
239     }
240 
241     jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen);
242 
243     flb_sds_cat(out, ".", 1);
244     flb_sds_cat(out, sigd, olen);
245 
246     *out_signature = out;
247     *out_size = flb_sds_len(out);
248 
249     flb_free(buf);
250     flb_free(sigd);
251     mbedtls_pk_free(&pk_ctx);
252 
253     return 0;
254 }
255 
256 /* Create a new oauth2 context and get a oauth2 token */
get_oauth2_token(struct flb_stackdriver * ctx)257 static int get_oauth2_token(struct flb_stackdriver *ctx)
258 {
259     int ret;
260     char *token;
261     char *sig_data;
262     size_t sig_size;
263     time_t issued;
264     time_t expires;
265     char payload[1024];
266 
267     flb_oauth2_payload_clear(ctx->o);
268 
269     /* In case of using metadata server, fetch token from there */
270     if (ctx->metadata_server_auth) {
271         return gce_metadata_read_token(ctx);
272     }
273 
274     /* JWT encode for oauth2 */
275     issued = time(NULL);
276     expires = issued + FLB_STD_TOKEN_REFRESH;
277 
278     snprintf(payload, sizeof(payload) - 1,
279              "{\"iss\": \"%s\", \"scope\": \"%s\", "
280              "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}",
281              ctx->client_email, FLB_STD_SCOPE,
282              FLB_STD_AUTH_URL,
283              expires, issued);
284 
285     /* Compose JWT signature */
286     ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx);
287     if (ret != 0) {
288         flb_plg_error(ctx->ins, "JWT signature generation failed");
289         return -1;
290     }
291     flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data);
292 
293     ret = flb_oauth2_payload_append(ctx->o,
294                                     "grant_type", -1,
295                                     "urn:ietf:params:oauth:"
296                                     "grant-type:jwt-bearer", -1);
297     if (ret == -1) {
298         flb_plg_error(ctx->ins, "error appending oauth2 params");
299         flb_sds_destroy(sig_data);
300         return -1;
301     }
302 
303     ret = flb_oauth2_payload_append(ctx->o,
304                                     "assertion", -1,
305                                     sig_data, sig_size);
306     if (ret == -1) {
307         flb_plg_error(ctx->ins, "error appending oauth2 params");
308         flb_sds_destroy(sig_data);
309         return -1;
310     }
311     flb_sds_destroy(sig_data);
312 
313     /* Retrieve access token */
314     token = flb_oauth2_token_get(ctx->o);
315     if (!token) {
316         flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
317         return -1;
318     }
319 
320     return 0;
321 }
322 
get_google_token(struct flb_stackdriver * ctx)323 static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
324 {
325     int ret = 0;
326     flb_sds_t output = NULL;
327 
328     ret = pthread_mutex_trylock(&ctx->token_mutex);
329     if (ret == EBUSY) {
330         /*
331          * If the routine is locked we just use our pre-cached values and
332          * compose the expected authorization value.
333          *
334          * If the routine fails it will return NULL and the caller will just
335          * issue a FLB_RETRY.
336          */
337         return oauth2_cache_to_token();
338     }
339     else if (ret != 0) {
340         flb_plg_error(ctx->ins, "error locking mutex");
341         return NULL;
342     }
343 
344     if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
345         ret = get_oauth2_token(ctx);
346     }
347 
348     /* Copy string to prevent race conditions (get_oauth2 can free the string) */
349     if (ret == 0) {
350         /* Update pthread keys cached values */
351         oauth2_cache_set(ctx->o->token_type, ctx->o->access_token);
352 
353         /* Compose outgoing buffer using cached values */
354         output = oauth2_cache_to_token();
355     }
356 
357     if (pthread_mutex_unlock(&ctx->token_mutex)){
358         flb_plg_error(ctx->ins, "error unlocking mutex");
359         if (output) {
360             flb_sds_destroy(output);
361         }
362         return NULL;
363     }
364 
365 
366     return output;
367 }
368 
validate_msgpack_unpacked_data(msgpack_object root)369 static bool validate_msgpack_unpacked_data(msgpack_object root)
370 {
371     return root.type == MSGPACK_OBJECT_ARRAY &&
372            root.via.array.size == 2 &&
373            root.via.array.ptr[1].type == MSGPACK_OBJECT_MAP;
374 }
375 
replace_prefix_dot(flb_sds_t s,int tag_prefix_len)376 void replace_prefix_dot(flb_sds_t s, int tag_prefix_len)
377 {
378     int i;
379     int str_len;
380     char c;
381 
382     if (!s) {
383         return;
384     }
385 
386     str_len = flb_sds_len(s);
387     if (tag_prefix_len > str_len) {
388         flb_error("[output] tag_prefix shouldn't be longer than local_resource_id");
389         return;
390     }
391 
392     for (i = 0; i < tag_prefix_len; i++) {
393         c = s[i];
394 
395         if (c == '.') {
396             s[i] = '_';
397         }
398     }
399 }
400 
get_str_value_from_msgpack_map(msgpack_object_map map,const char * key,int key_size)401 static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map,
402                                                 const char *key, int key_size)
403 {
404     int i;
405     msgpack_object k;
406     msgpack_object v;
407     flb_sds_t ptr = NULL;
408 
409     for (i = 0; i < map.size; i++) {
410         k = map.ptr[i].key;
411         v = map.ptr[i].val;
412 
413         if (k.type != MSGPACK_OBJECT_STR) {
414             continue;
415         }
416 
417         if (k.via.str.size == key_size &&
418             strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) {
419             /* make sure to free it after use */
420             ptr =  flb_sds_create_len(v.via.str.ptr, v.via.str.size);
421             break;
422         }
423     }
424 
425     return ptr;
426 }
427 
428 /* parse_monitored_resource is to extract the monitoired resource labels
429  * from "logging.googleapis.com/monitored_resource" in log data
430  * and append to 'resource'/'labels' in log entry.
431  * Monitored resource type is already read from resource field in stackdriver
432  * output plugin configuration parameters.
433  *
434  * The structure of monitored_resource is:
435  * {
436  *   "logging.googleapis.com/monitored_resource": {
437  *      "labels": {
438  *         "resource_label": <label_value>,
439  *      }
440  *    }
441  * }
442  * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types
443  * for required labels for each monitored resource.
444  */
445 
parse_monitored_resource(struct flb_stackdriver * ctx,const void * data,size_t bytes,msgpack_packer * mp_pck)446 static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck)
447 {
448     int ret = -1;
449     size_t off = 0;
450     msgpack_object *obj;
451     msgpack_unpacked result;
452     msgpack_unpacked_init(&result);
453     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
454         if (result.data.type != MSGPACK_OBJECT_ARRAY) {
455             continue;
456         }
457         if (result.data.via.array.size != 2) {
458             continue;
459         }
460         obj = &result.data.via.array.ptr[1];
461         if (obj->type != MSGPACK_OBJECT_MAP) {
462             continue;
463         }
464         msgpack_object_kv *kv = obj->via.map.ptr;
465         msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
466         for (; kv < kvend; ++kv) {
467           if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR
468           && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) {
469             msgpack_object subobj = kv->val;
470             msgpack_object_kv *p = subobj.via.map.ptr;
471             msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size;
472             for (; p < pend; ++p) {
473               if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) {
474                 continue;
475               }
476               if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) {
477                   msgpack_object labels = p->val;
478                   msgpack_object_kv *q = labels.via.map.ptr;
479                   msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size;
480                   int fields = 0;
481                   for (; q < qend; ++q) {
482                     if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
483                         flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY);
484                     }
485                     ++fields;
486                   }
487                   if (fields > 0) {
488                     msgpack_pack_map(mp_pck, fields);
489                     q = labels.via.map.ptr;
490                     for (; q < qend; ++q) {
491                       if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
492                           continue;
493                       }
494                       flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY);
495                       msgpack_pack_str(mp_pck, q->key.via.str.size);
496                       msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size);
497                       msgpack_pack_str(mp_pck, q->val.via.str.size);
498                       msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size);
499                     }
500                     msgpack_unpacked_destroy(&result);
501                     ret = 0;
502                     return ret;
503                   }
504               }
505             }
506           }
507         }
508     }
509 
510     msgpack_unpacked_destroy(&result);
511     flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY);
512     return ret;
513 }
514 
515 /*
516  * Given a local_resource_id, split the content using the proper separator generating
517  * a linked list to store the spliited string
518  */
parse_local_resource_id_to_list(char * local_resource_id,char * type)519 static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type)
520 {
521     int ret = -1;
522     int max_split = -1;
523     int len_k8s_container;
524     int len_k8s_node;
525     int len_k8s_pod;
526     struct mk_list *list;
527 
528     len_k8s_container = sizeof(K8S_CONTAINER) - 1;
529     len_k8s_node = sizeof(K8S_NODE) - 1;
530     len_k8s_pod = sizeof(K8S_POD) - 1;
531 
532     /* Allocate list head */
533     list = flb_malloc(sizeof(struct mk_list));
534     if (!list) {
535         flb_errno();
536         return NULL;
537     }
538     mk_list_init(list);
539 
540     /* Determinate the max split value based on type */
541     if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
542         /* including the prefix of tag */
543         max_split = 4;
544     }
545     else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
546         max_split = 2;
547     }
548     else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
549         max_split = 3;
550     }
551 
552     /* The local_resource_id is splitted by '.' */
553     ret = flb_slist_split_string(list, local_resource_id, '.', max_split);
554 
555     if (ret == -1 || mk_list_size(list) != max_split) {
556         flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type);
557         flb_slist_destroy(list);
558         flb_free(list);
559         return NULL;
560     }
561 
562     return list;
563 }
564 
565 /*
566  *    extract_local_resource_id():
567  *  - extract the value from "logging.googleapis.com/local_resource_id" field
568  *  - if local_resource_id is missing from the payLoad, use the tag of the log
569  */
extract_local_resource_id(const void * data,size_t bytes,struct flb_stackdriver * ctx,const char * tag)570 static int extract_local_resource_id(const void *data, size_t bytes,
571                                      struct flb_stackdriver *ctx, const char *tag) {
572     msgpack_object root;
573     msgpack_object_map map;
574     msgpack_unpacked result;
575     flb_sds_t local_resource_id;
576     size_t off = 0;
577 
578     msgpack_unpacked_init(&result);
579     if (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
580         root = result.data;
581 
582         if (!validate_msgpack_unpacked_data(root)) {
583             msgpack_unpacked_destroy(&result);
584             flb_plg_error(ctx->ins, "unexpected record format");
585             return -1;
586         }
587 
588         map = root.via.array.ptr[1].via.map;
589         local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY,
590                                                            LEN_LOCAL_RESOURCE_ID_KEY);
591 
592         if (local_resource_id == NULL) {
593             /* if local_resource_id is not found, use the tag of the log */
594             flb_plg_debug(ctx->ins, "local_resource_id not found, "
595                                     "tag [%s] is assigned for local_resource_id", tag);
596             local_resource_id = flb_sds_create(tag);
597         }
598 
599         /* we need to create up the local_resource_id from previous log */
600         if (ctx->local_resource_id) {
601             flb_sds_destroy(ctx->local_resource_id);
602         }
603 
604         ctx->local_resource_id = flb_sds_create(local_resource_id);
605     }
606     else {
607         msgpack_unpacked_destroy(&result);
608         flb_plg_error(ctx->ins, "failed to unpack data");
609         return -1;
610     }
611 
612     flb_sds_destroy(local_resource_id);
613     msgpack_unpacked_destroy(&result);
614     return 0;
615 }
616 
617 /*
618  *    set_monitored_resource_labels():
619  *  - use the extracted local_resource_id to assign the label keys for different
620  *    resource types that are specified in the configuration of stackdriver_out plugin
621  */
set_monitored_resource_labels(struct flb_stackdriver * ctx,char * type)622 static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type)
623 {
624     int ret = -1;
625     int first = FLB_TRUE;
626     int counter = 0;
627     int len_k8s_container;
628     int len_k8s_node;
629     int len_k8s_pod;
630     size_t prefix_len = 0;
631     struct local_resource_id_list *ptr;
632     struct mk_list *list = NULL;
633     struct mk_list *head;
634     flb_sds_t new_local_resource_id;
635 
636     if (!ctx->local_resource_id) {
637         flb_plg_error(ctx->ins, "local_resource_is is not assigned");
638         return -1;
639     }
640 
641     len_k8s_container = sizeof(K8S_CONTAINER) - 1;
642     len_k8s_node = sizeof(K8S_NODE) - 1;
643     len_k8s_pod = sizeof(K8S_POD) - 1;
644 
645     prefix_len = flb_sds_len(ctx->tag_prefix);
646     if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) {
647         flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of"
648                       " local_resource_id [%s]", ctx->tag_prefix,
649                       ctx->local_resource_id);
650         return -1;
651     }
652 
653     new_local_resource_id = flb_sds_create_len(ctx->local_resource_id,
654                                                flb_sds_len(ctx->local_resource_id));
655     replace_prefix_dot(new_local_resource_id, prefix_len - 1);
656 
657     if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
658         list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER);
659         if (!list) {
660             goto error;
661         }
662 
663         /* iterate through the list */
664         mk_list_foreach(head, list) {
665             ptr = mk_list_entry(head, struct local_resource_id_list, _head);
666             if (first) {
667                 first = FLB_FALSE;
668                 continue;
669             }
670 
671             /* Follow the order of fields in local_resource_id */
672             if (counter == 0) {
673                 if (ctx->namespace_name) {
674                     flb_sds_destroy(ctx->namespace_name);
675                 }
676                 ctx->namespace_name = flb_sds_create(ptr->val);
677             }
678             else if (counter == 1) {
679                 if (ctx->pod_name) {
680                     flb_sds_destroy(ctx->pod_name);
681                 }
682                 ctx->pod_name = flb_sds_create(ptr->val);
683             }
684             else if (counter == 2) {
685                 if (ctx->container_name) {
686                     flb_sds_destroy(ctx->container_name);
687                 }
688                 ctx->container_name = flb_sds_create(ptr->val);
689             }
690 
691             counter++;
692         }
693 
694         if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) {
695             goto error;
696         }
697     }
698     else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
699         list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE);
700         if (!list) {
701             goto error;
702         }
703 
704         mk_list_foreach(head, list) {
705             ptr = mk_list_entry(head, struct local_resource_id_list, _head);
706             if (first) {
707                 first = FLB_FALSE;
708                 continue;
709             }
710 
711             if (ptr != NULL) {
712                 if (ctx->node_name) {
713                     flb_sds_destroy(ctx->node_name);
714                 }
715                 ctx->node_name = flb_sds_create(ptr->val);
716             }
717         }
718 
719         if (!ctx->node_name) {
720             goto error;
721         }
722     }
723     else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
724         list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD);
725         if (!list) {
726             goto error;
727         }
728 
729         mk_list_foreach(head, list) {
730             ptr = mk_list_entry(head, struct local_resource_id_list, _head);
731             if (first) {
732                 first = FLB_FALSE;
733                 continue;
734             }
735 
736             /* Follow the order of fields in local_resource_id */
737             if (counter == 0) {
738                 if (ctx->namespace_name) {
739                     flb_sds_destroy(ctx->namespace_name);
740                 }
741                 ctx->namespace_name = flb_sds_create(ptr->val);
742             }
743             else if (counter == 1) {
744                 if (ctx->pod_name) {
745                     flb_sds_destroy(ctx->pod_name);
746                 }
747                 ctx->pod_name = flb_sds_create(ptr->val);
748             }
749 
750             counter++;
751         }
752 
753         if (!ctx->namespace_name || !ctx->pod_name) {
754             goto error;
755         }
756     }
757 
758     ret = 0;
759 
760     if (list) {
761         flb_slist_destroy(list);
762         flb_free(list);
763     }
764     flb_sds_destroy(new_local_resource_id);
765 
766     return ret;
767 
768  error:
769     if (list) {
770         flb_slist_destroy(list);
771         flb_free(list);
772     }
773 
774     if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
775         if (ctx->namespace_name) {
776             flb_sds_destroy(ctx->namespace_name);
777         }
778 
779         if (ctx->pod_name) {
780             flb_sds_destroy(ctx->pod_name);
781         }
782 
783         if (ctx->container_name) {
784             flb_sds_destroy(ctx->container_name);
785         }
786     }
787     else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
788         if (ctx->node_name) {
789             flb_sds_destroy(ctx->node_name);
790         }
791     }
792     else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
793         if (ctx->namespace_name) {
794             flb_sds_destroy(ctx->namespace_name);
795         }
796 
797         if (ctx->pod_name) {
798             flb_sds_destroy(ctx->pod_name);
799         }
800     }
801 
802     flb_sds_destroy(new_local_resource_id);
803     return -1;
804 }
805 
is_tag_match_regex(struct flb_stackdriver * ctx,const char * tag,int tag_len)806 static int is_tag_match_regex(struct flb_stackdriver *ctx,
807                               const char *tag, int tag_len)
808 {
809     int ret;
810     int tag_prefix_len;
811     int len_to_be_matched;
812     const char *tag_str_to_be_matcheds;
813 
814     tag_prefix_len = flb_sds_len(ctx->tag_prefix);
815     if (tag_len > tag_prefix_len &&
816         flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) {
817         return 0;
818     }
819 
820     tag_str_to_be_matcheds = tag + tag_prefix_len;
821     len_to_be_matched = tag_len - tag_prefix_len;
822     ret = flb_regex_match(ctx->regex,
823                           (unsigned char *) tag_str_to_be_matcheds,
824                           len_to_be_matched);
825 
826     /* 1 -> match;  0 -> doesn't match;  < 0 -> error */
827     return ret;
828 }
829 
is_local_resource_id_match_regex(struct flb_stackdriver * ctx)830 static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx)
831 {
832     int ret;
833     int prefix_len;
834     int len_to_be_matched;
835     const char *str_to_be_matcheds;
836 
837     if (!ctx->local_resource_id) {
838         flb_plg_warn(ctx->ins, "local_resource_id not found in the payload");
839         return -1;
840     }
841 
842     prefix_len = flb_sds_len(ctx->tag_prefix);
843     str_to_be_matcheds = ctx->local_resource_id + prefix_len;
844     len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len;
845 
846     ret = flb_regex_match(ctx->regex,
847                           (unsigned char *) str_to_be_matcheds,
848                           len_to_be_matched);
849 
850     /* 1 -> match;  0 -> doesn't match;  < 0 -> error */
851     return ret;
852 }
853 
854 static void cb_results(const char *name, const char *value,
855                        size_t vlen, void *data);
856 /*
857  * extract_resource_labels_from_regex(4) will only be called if the
858  * tag or local_resource_id field matches the regex rule
859  */
extract_resource_labels_from_regex(struct flb_stackdriver * ctx,const char * tag,int tag_len,int from_tag)860 static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx,
861                                               const char *tag, int tag_len,
862                                               int from_tag)
863 {
864     int ret = 1;
865     int prefix_len;
866     int len_to_be_matched;
867     int local_resource_id_len;
868     const char *str_to_be_matcheds;
869     struct flb_regex_search result;
870 
871     prefix_len = flb_sds_len(ctx->tag_prefix);
872     if (from_tag == FLB_TRUE) {
873         local_resource_id_len = tag_len;
874         str_to_be_matcheds = tag + prefix_len;
875     }
876     else {
877         // this will be called only if the payload contains local_resource_id
878         local_resource_id_len = flb_sds_len(ctx->local_resource_id);
879         str_to_be_matcheds = ctx->local_resource_id + prefix_len;
880     }
881 
882     len_to_be_matched = local_resource_id_len - prefix_len;
883     ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result);
884     if (ret <= 0) {
885         flb_plg_warn(ctx->ins, "invalid pattern for given value %s when"
886                      " extracting resource labels", str_to_be_matcheds);
887         return -1;
888     }
889 
890     flb_regex_parse(ctx->regex, &result, cb_results, ctx);
891 
892     return ret;
893 }
894 
process_local_resource_id(struct flb_stackdriver * ctx,const char * tag,int tag_len,char * type)895 static int process_local_resource_id(struct flb_stackdriver *ctx,
896                                      const char *tag, int tag_len, char *type)
897 {
898     int ret;
899 
900     // parsing local_resource_id from tag takes higher priority
901     if (is_tag_match_regex(ctx, tag, tag_len) > 0) {
902         ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE);
903     }
904     else if (is_local_resource_id_match_regex(ctx) > 0) {
905         ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE);
906     }
907     else {
908         ret = set_monitored_resource_labels(ctx, type);
909     }
910 
911     return ret;
912 }
913 
914 /*
915  * parse_labels
916  * - Iterate throught the original payload (obj) and find out the entry that matches
917  *   the labels_key
918  * - Used to convert all labels under labels_key to root-level `labels` field
919  */
parse_labels(struct flb_stackdriver * ctx,msgpack_object * obj)920 static msgpack_object *parse_labels(struct flb_stackdriver *ctx, msgpack_object *obj)
921 {
922     int i;
923     int len;
924     msgpack_object_kv *kv = NULL;
925 
926     if (!obj || obj->type != MSGPACK_OBJECT_MAP) {
927         return NULL;
928     }
929 
930     len = flb_sds_len(ctx->labels_key);
931     for (i = 0; i < obj->via.map.size; i++) {
932         kv = &obj->via.map.ptr[i];
933         if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) {
934             /* only the first matching entry will be returned */
935             return &kv->val;
936         }
937     }
938 
939     //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload",
940     //              ctx->labels_key);
941     return NULL;
942 }
943 
cb_results(const char * name,const char * value,size_t vlen,void * data)944 static void cb_results(const char *name, const char *value,
945                        size_t vlen, void *data)
946 {
947     struct flb_stackdriver *ctx = data;
948 
949     if (vlen == 0) {
950         return;
951     }
952 
953     if (strcmp(name, "pod_name") == 0) {
954         if (ctx->pod_name != NULL) {
955             flb_sds_destroy(ctx->pod_name);
956         }
957         ctx->pod_name = flb_sds_create_len(value, vlen);
958     }
959     else if (strcmp(name, "namespace_name") == 0) {
960         if (ctx->namespace_name != NULL) {
961             flb_sds_destroy(ctx->namespace_name);
962         }
963         ctx->namespace_name = flb_sds_create_len(value, vlen);
964     }
965     else if (strcmp(name, "container_name") == 0) {
966         if (ctx->container_name != NULL) {
967             flb_sds_destroy(ctx->container_name);
968         }
969         ctx->container_name = flb_sds_create_len(value, vlen);
970     }
971     else if (strcmp(name, "node_name") == 0) {
972         if (ctx->node_name != NULL) {
973             flb_sds_destroy(ctx->node_name);
974         }
975         ctx->node_name = flb_sds_create_len(value, vlen);
976     }
977 
978     return;
979 }
980 
flb_stackdriver_regex_init(struct flb_stackdriver * ctx)981 int flb_stackdriver_regex_init(struct flb_stackdriver *ctx)
982 {
983     /* If a custom regex is not set, use the defaults */
984     if (!ctx->custom_k8s_regex) {
985         ctx->custom_k8s_regex = flb_sds_create(DEFAULT_TAG_REGEX);
986         if (!ctx->custom_k8s_regex) {
987             return -1;
988         }
989     }
990 
991     ctx->regex = flb_regex_create(ctx->custom_k8s_regex);
992     if (!ctx->regex) {
993         return -1;
994     }
995 
996     return 0;
997 }
998 
cb_stackdriver_init(struct flb_output_instance * ins,struct flb_config * config,void * data)999 static int cb_stackdriver_init(struct flb_output_instance *ins,
1000                           struct flb_config *config, void *data)
1001 {
1002     int ret;
1003     int io_flags = FLB_IO_TLS;
1004     char *token;
1005     struct flb_stackdriver *ctx;
1006 
1007     /* Create config context */
1008     ctx = flb_stackdriver_conf_create(ins, config);
1009     if (!ctx) {
1010         flb_plg_error(ins, "configuration failed");
1011         return -1;
1012     }
1013 
1014     /* Load config map */
1015     ret = flb_output_config_map_set(ins, (void *) ctx);
1016     if (ret == -1) {
1017         return -1;
1018     }
1019 
1020     /* Set context */
1021     flb_output_set_context(ins, ctx);
1022 
1023     /* Network mode IPv6 */
1024     if (ins->host.ipv6 == FLB_TRUE) {
1025         io_flags |= FLB_IO_IPV6;
1026     }
1027 
1028     /* Initialize oauth2 cache pthread keys */
1029     oauth2_cache_init();
1030 
1031     /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
1032     pthread_mutex_init(&ctx->token_mutex, NULL);
1033 
1034     /* Create Upstream context for Stackdriver Logging (no oauth2 service) */
1035     ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL,
1036                                      io_flags, ins->tls);
1037     ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server,
1038                                               FLB_IO_TCP, NULL);
1039 
1040     /* Create oauth2 context */
1041     ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
1042 
1043     if (!ctx->u) {
1044         flb_plg_error(ctx->ins, "upstream creation failed");
1045         return -1;
1046     }
1047     if (!ctx->metadata_u) {
1048         flb_plg_error(ctx->ins, "metadata upstream creation failed");
1049         return -1;
1050     }
1051     if (!ctx->o) {
1052         flb_plg_error(ctx->ins, "cannot create oauth2 context");
1053         return -1;
1054     }
1055     flb_output_upstream_set(ctx->u, ins);
1056 
1057     /* Metadata Upstream Sync flags */
1058     ctx->metadata_u->flags &= ~FLB_IO_ASYNC;
1059 
1060     if (ins->test_mode == FLB_FALSE) {
1061         /* Retrieve oauth2 token */
1062         token = get_google_token(ctx);
1063         if (!token) {
1064             flb_plg_warn(ctx->ins, "token retrieval failed");
1065         }
1066         else {
1067             flb_sds_destroy(token);
1068         }
1069     }
1070 
1071     if (ctx->metadata_server_auth) {
1072         ret = gce_metadata_read_project_id(ctx);
1073         if (ret == -1) {
1074             return -1;
1075         }
1076 
1077         if (!ctx->is_generic_resource_type) {
1078             ret = gce_metadata_read_zone(ctx);
1079             if (ret == -1) {
1080                 return -1;
1081             }
1082 
1083             ret = gce_metadata_read_instance_id(ctx);
1084             if (ret == -1) {
1085                 return -1;
1086             }
1087         }
1088     }
1089 
1090     /* Validate project_id */
1091     if (!ctx->project_id) {
1092         flb_plg_error(ctx->ins, "property 'project_id' is not set");
1093         return -1;
1094     }
1095 
1096     if (!ctx->export_to_project_id) {
1097         ctx->export_to_project_id = flb_sds_create(ctx->project_id);
1098     }
1099 
1100     ret = flb_stackdriver_regex_init(ctx);
1101     if (ret == -1) {
1102         flb_plg_error(ctx->ins, "failed to init stackdriver custom regex");
1103         return -1;
1104     }
1105 
1106     return 0;
1107 }
1108 
validate_severity_level(severity_t * s,const char * str,const unsigned int str_size)1109 static int validate_severity_level(severity_t * s,
1110                                    const char * str,
1111                                    const unsigned int str_size)
1112 {
1113     int i = 0;
1114 
1115     const static struct {
1116         severity_t s;
1117         const unsigned int str_size;
1118         const char * str;
1119     }   enum_mapping[] = {
1120         {FLB_STD_EMERGENCY, 9, "EMERGENCY"},
1121         {FLB_STD_EMERGENCY, 5, "EMERG"    },
1122 
1123         {FLB_STD_ALERT    , 1, "A"        },
1124         {FLB_STD_ALERT    , 5, "ALERT"    },
1125 
1126         {FLB_STD_CRITICAL , 1, "C"        },
1127         {FLB_STD_CRITICAL , 1, "F"        },
1128         {FLB_STD_CRITICAL , 4, "CRIT"     },
1129         {FLB_STD_CRITICAL , 5, "FATAL"    },
1130         {FLB_STD_CRITICAL , 8, "CRITICAL" },
1131 
1132         {FLB_STD_ERROR    , 1, "E"        },
1133         {FLB_STD_ERROR    , 3, "ERR"      },
1134         {FLB_STD_ERROR    , 5, "ERROR"    },
1135         {FLB_STD_ERROR    , 6, "SEVERE"   },
1136 
1137         {FLB_STD_WARNING  , 1, "W"        },
1138         {FLB_STD_WARNING  , 4, "WARN"     },
1139         {FLB_STD_WARNING  , 7, "WARNING"  },
1140 
1141         {FLB_STD_NOTICE   , 1, "N"        },
1142         {FLB_STD_NOTICE   , 6, "NOTICE"   },
1143 
1144         {FLB_STD_INFO     , 1, "I"        },
1145         {FLB_STD_INFO     , 4, "INFO"     },
1146 
1147         {FLB_STD_DEBUG    , 1, "D"        },
1148         {FLB_STD_DEBUG    , 5, "DEBUG"    },
1149         {FLB_STD_DEBUG    , 5, "TRACE"    },
1150         {FLB_STD_DEBUG    , 9, "TRACE_INT"},
1151         {FLB_STD_DEBUG    , 4, "FINE"     },
1152         {FLB_STD_DEBUG    , 5, "FINER"    },
1153         {FLB_STD_DEBUG    , 6, "FINEST"   },
1154         {FLB_STD_DEBUG    , 6, "CONFIG"   },
1155 
1156         {FLB_STD_DEFAULT  , 7, "DEFAULT"  }
1157     };
1158 
1159     for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) {
1160         if (enum_mapping[i].str_size != str_size) {
1161             continue;
1162         }
1163 
1164         if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) {
1165             *s = enum_mapping[i].s;
1166             return 0;
1167         }
1168     }
1169     return -1;
1170 }
1171 
get_msgpack_obj(msgpack_object * subobj,const msgpack_object * o,const flb_sds_t key,const int key_size,msgpack_object_type type)1172 static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o,
1173                            const flb_sds_t key, const int key_size,
1174                            msgpack_object_type type)
1175 {
1176     int i = 0;
1177     msgpack_object_kv * p = NULL;
1178 
1179     if (o == NULL || subobj == NULL) {
1180         return -1;
1181     }
1182 
1183     for (i = 0; i < o->via.map.size; i++) {
1184         p = &o->via.map.ptr[i];
1185         if (p->val.type != type) {
1186             continue;
1187         }
1188 
1189         if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) {
1190             *subobj = p->val;
1191             return 0;
1192         }
1193     }
1194     return -1;
1195 }
1196 
get_string(flb_sds_t * s,const msgpack_object * o,const flb_sds_t key)1197 static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key)
1198 {
1199     msgpack_object tmp;
1200     if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) {
1201         *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size);
1202         return 0;
1203     }
1204 
1205     *s = 0;
1206     return -1;
1207 }
1208 
get_severity_level(severity_t * s,const msgpack_object * o,const flb_sds_t key)1209 static int get_severity_level(severity_t * s, const msgpack_object * o,
1210                               const flb_sds_t key)
1211 {
1212     msgpack_object tmp;
1213     if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0
1214         && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) {
1215         return 0;
1216     }
1217     *s = 0;
1218     return -1;
1219 }
1220 
validate_insert_id(msgpack_object * insert_id_value,const msgpack_object * obj)1221 static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
1222                                            const msgpack_object * obj)
1223 {
1224     int i = 0;
1225     msgpack_object_kv * p = NULL;
1226     insert_id_status ret = INSERTID_NOT_PRESENT;
1227 
1228     if (obj == NULL) {
1229         return ret;
1230     }
1231 
1232     for (i = 0; i < obj->via.map.size; i++) {
1233         p = &obj->via.map.ptr[i];
1234         if (p->key.type != MSGPACK_OBJECT_STR) {
1235             continue;
1236         }
1237         if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
1238             if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) {
1239                 *insert_id_value = p->val;
1240                 ret = INSERTID_VALID;
1241             }
1242             else {
1243                 ret = INSERTID_INVALID;
1244             }
1245             break;
1246         }
1247     }
1248     return ret;
1249 }
1250 
pack_json_payload(int insert_id_extracted,int operation_extracted,int operation_extra_size,int source_location_extracted,int source_location_extra_size,int http_request_extracted,int http_request_extra_size,timestamp_status tms_status,msgpack_packer * mp_pck,msgpack_object * obj,struct flb_stackdriver * ctx)1251 static int pack_json_payload(int insert_id_extracted,
1252                              int operation_extracted, int operation_extra_size,
1253                              int source_location_extracted,
1254                              int source_location_extra_size,
1255                              int http_request_extracted,
1256                              int http_request_extra_size,
1257                              timestamp_status tms_status,
1258                              msgpack_packer *mp_pck, msgpack_object *obj,
1259                              struct flb_stackdriver *ctx)
1260 {
1261     /* Specified fields include local_resource_id, operation, sourceLocation ... */
1262     int i, j;
1263     int to_remove = 0;
1264     int ret;
1265     int map_size;
1266     int new_map_size;
1267     int len;
1268     int len_to_be_removed;
1269     int key_not_found;
1270     flb_sds_t removed;
1271     flb_sds_t monitored_resource_key;
1272     flb_sds_t local_resource_id_key;
1273     flb_sds_t stream;
1274     msgpack_object_kv *kv = obj->via.map.ptr;
1275     msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
1276 
1277     monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY);
1278     local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY);
1279     stream = flb_sds_create("stream");
1280     /*
1281      * array of elements that need to be removed from payload,
1282      * special field 'operation' will be processed individually
1283      */
1284     flb_sds_t to_be_removed[] =
1285     {
1286         monitored_resource_key,
1287         local_resource_id_key,
1288         ctx->labels_key,
1289         ctx->severity_key,
1290         ctx->trace_key,
1291         ctx->log_name_key,
1292         stream
1293         /* more special fields are required to be added, but, if this grows with more
1294            than a few records, it might need to be converted to flb_hash
1295          */
1296     };
1297 
1298     if (insert_id_extracted == FLB_TRUE) {
1299         to_remove += 1;
1300     }
1301     if (operation_extracted == FLB_TRUE && operation_extra_size == 0) {
1302         to_remove += 1;
1303     }
1304     if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) {
1305         to_remove += 1;
1306     }
1307     if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) {
1308         to_remove += 1;
1309     }
1310     if (tms_status == FORMAT_TIMESTAMP_OBJECT) {
1311         to_remove += 1;
1312     }
1313     if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1314         to_remove += 2;
1315     }
1316 
1317     map_size = obj->via.map.size;
1318     len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]);
1319     for (i = 0; i < map_size; i++) {
1320         kv = &obj->via.map.ptr[i];
1321         len = kv->key.via.str.size;
1322         for (j = 0; j < len_to_be_removed; j++) {
1323             removed = to_be_removed[j];
1324             /*
1325              * check length of key to avoid partial matching
1326              * e.g. labels key = labels && kv->key = labelss
1327              */
1328             if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
1329                 to_remove += 1;
1330                 break;
1331             }
1332         }
1333     }
1334 
1335     new_map_size = map_size - to_remove;
1336 
1337     ret = msgpack_pack_map(mp_pck, new_map_size);
1338     if (ret < 0) {
1339         goto error;
1340     }
1341 
1342     /* points back to the beginning of map */
1343     kv = obj->via.map.ptr;
1344     for(; kv != kvend; ++kv	) {
1345         key_not_found = 1;
1346 
1347         /* processing logging.googleapis.com/insertId */
1348         if (insert_id_extracted == FLB_TRUE
1349             && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
1350             continue;
1351         }
1352 
1353         /* processing logging.googleapis.com/operation */
1354         if (validate_key(kv->key, OPERATION_FIELD_IN_JSON,
1355                          OPERATION_KEY_SIZE)
1356             && kv->val.type == MSGPACK_OBJECT_MAP) {
1357             if (operation_extra_size > 0) {
1358                 msgpack_pack_object(mp_pck, kv->key);
1359                 pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size);
1360             }
1361             continue;
1362         }
1363 
1364         if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON,
1365                          SOURCE_LOCATION_SIZE)
1366             && kv->val.type == MSGPACK_OBJECT_MAP) {
1367 
1368             if (source_location_extra_size > 0) {
1369                 msgpack_pack_object(mp_pck, kv->key);
1370                 pack_extra_source_location_subfields(mp_pck, &kv->val,
1371                                                      source_location_extra_size);
1372             }
1373             continue;
1374         }
1375 
1376         if (validate_key(kv->key, ctx->http_request_key,
1377                          ctx->http_request_key_size)
1378             && kv->val.type == MSGPACK_OBJECT_MAP) {
1379 
1380             if(http_request_extra_size > 0) {
1381                 msgpack_pack_object(mp_pck, kv->key);
1382                 pack_extra_http_request_subfields(mp_pck, &kv->val,
1383                                                   http_request_extra_size);
1384             }
1385             continue;
1386         }
1387 
1388         if (validate_key(kv->key, "timestamp", 9)
1389             && tms_status == FORMAT_TIMESTAMP_OBJECT) {
1390             continue;
1391         }
1392 
1393         if (validate_key(kv->key, "timestampSeconds", 16)
1394             && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1395             continue;
1396         }
1397         if (validate_key(kv->key, "timestampNanos", 14)
1398             && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
1399             continue;
1400         }
1401 
1402         len = kv->key.via.str.size;
1403         for (j = 0; j < len_to_be_removed; j++) {
1404             removed = to_be_removed[j];
1405             if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
1406                 key_not_found = 0;
1407                 break;
1408             }
1409         }
1410 
1411         if (key_not_found) {
1412             ret = msgpack_pack_object(mp_pck, kv->key);
1413             if (ret < 0) {
1414                 goto error;
1415             }
1416             ret = msgpack_pack_object(mp_pck, kv->val);
1417             if (ret < 0) {
1418                 goto error;
1419             }
1420         }
1421     }
1422 
1423     flb_sds_destroy(monitored_resource_key);
1424     flb_sds_destroy(local_resource_id_key);
1425     flb_sds_destroy(stream);
1426     return 0;
1427 
1428     error:
1429         flb_sds_destroy(monitored_resource_key);
1430         flb_sds_destroy(local_resource_id_key);
1431         flb_sds_destroy(stream);
1432         return ret;
1433 }
1434 
stackdriver_format(struct flb_config * config,struct flb_input_instance * ins,void * plugin_context,void * flush_ctx,const char * tag,int tag_len,const void * data,size_t bytes,void ** out_data,size_t * out_size)1435 static int stackdriver_format(struct flb_config *config,
1436                               struct flb_input_instance *ins,
1437                               void *plugin_context,
1438                               void *flush_ctx,
1439                               const char *tag, int tag_len,
1440                               const void *data, size_t bytes,
1441                               void **out_data, size_t *out_size)
1442 {
1443     int len;
1444     int ret;
1445     int array_size = 0;
1446     /* The default value is 3: timestamp, jsonPayload, logName. */
1447     int entry_size = 3;
1448     size_t s;
1449     size_t off = 0;
1450     char path[PATH_MAX];
1451     char time_formatted[255];
1452     const char *newtag;
1453     const char *new_log_name;
1454     msgpack_object *obj;
1455     msgpack_object *labels_ptr;
1456     msgpack_unpacked result;
1457     msgpack_sbuffer mp_sbuf;
1458     msgpack_packer mp_pck;
1459     flb_sds_t out_buf;
1460     struct flb_stackdriver *ctx = plugin_context;
1461 
1462     /* Parameters for severity */
1463     int severity_extracted = FLB_FALSE;
1464     severity_t severity;
1465 
1466     /* Parameters for trace */
1467     int trace_extracted = FLB_FALSE;
1468     flb_sds_t trace;
1469     char stackdriver_trace[PATH_MAX];
1470     const char *new_trace;
1471 
1472     /* Parameters for log name */
1473     int log_name_extracted = FLB_FALSE;
1474     flb_sds_t log_name = NULL;
1475     flb_sds_t stream = NULL;
1476     flb_sds_t stream_key;
1477 
1478     /* Parameters for insertId */
1479     msgpack_object insert_id_obj;
1480     insert_id_status in_status;
1481     int insert_id_extracted;
1482 
1483     /* Parameters in Operation */
1484     flb_sds_t operation_id;
1485     flb_sds_t operation_producer;
1486     int operation_first = FLB_FALSE;
1487     int operation_last = FLB_FALSE;
1488     int operation_extracted = FLB_FALSE;
1489     int operation_extra_size = 0;
1490 
1491     /* Parameters for sourceLocation */
1492     flb_sds_t source_location_file;
1493     int64_t source_location_line = 0;
1494     flb_sds_t source_location_function;
1495     int source_location_extracted = FLB_FALSE;
1496     int source_location_extra_size = 0;
1497 
1498     /* Parameters for httpRequest */
1499     struct http_request_field http_request;
1500     int http_request_extracted = FLB_FALSE;
1501     int http_request_extra_size = 0;
1502 
1503     /* Parameters for Timestamp */
1504     struct tm tm;
1505     struct flb_time tms;
1506     timestamp_status tms_status;
1507 
1508     /* Count number of records */
1509     array_size = flb_mp_count(data, bytes);
1510 
1511     /*
1512      * Search each entry and validate insertId.
1513      * Reject the entry if insertId is invalid.
1514      * If all the entries are rejected, stop formatting.
1515      *
1516      */
1517     off = 0;
1518     msgpack_unpacked_init(&result);
1519     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
1520         flb_time_pop_from_msgpack(&tms, &result, &obj);
1521 
1522         /* Extract insertId */
1523         in_status = validate_insert_id(&insert_id_obj, obj);
1524         if (in_status == INSERTID_INVALID) {
1525             flb_plg_error(ctx->ins,
1526                           "Incorrect insertId received. InsertId should be non-empty string.");
1527             array_size -= 1;
1528         }
1529     }
1530     msgpack_unpacked_destroy(&result);
1531 
1532     if (array_size == 0) {
1533         *out_size = 0;
1534         return -1;
1535     }
1536 
1537     /* Create temporal msgpack buffer */
1538     msgpack_sbuffer_init(&mp_sbuf);
1539     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1540 
1541     /*
1542      * Pack root map (resource & entries):
1543      *
1544      * {"resource": {"type": "...", "labels": {...},
1545      *  "entries": []
1546      */
1547     msgpack_pack_map(&mp_pck, 2);
1548 
1549     msgpack_pack_str(&mp_pck, 8);
1550     msgpack_pack_str_body(&mp_pck, "resource", 8);
1551 
1552     /* type & labels */
1553     msgpack_pack_map(&mp_pck, 2);
1554 
1555     /* type */
1556     msgpack_pack_str(&mp_pck, 4);
1557     msgpack_pack_str_body(&mp_pck, "type", 4);
1558     msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource));
1559     msgpack_pack_str_body(&mp_pck, ctx->resource,
1560                           flb_sds_len(ctx->resource));
1561 
1562     msgpack_pack_str(&mp_pck, 6);
1563     msgpack_pack_str_body(&mp_pck, "labels", 6);
1564 
1565     if (ctx->is_k8s_resource_type) {
1566         ret = extract_local_resource_id(data, bytes, ctx, tag);
1567         if (ret != 0) {
1568             flb_plg_error(ctx->ins, "fail to construct local_resource_id");
1569             msgpack_sbuffer_destroy(&mp_sbuf);
1570             return -1;
1571         }
1572     }
1573 
1574     ret = parse_monitored_resource(ctx, data, bytes, &mp_pck);
1575     if (ret != 0) {
1576         if (strcmp(ctx->resource, "global") == 0) {
1577             /* global resource has field project_id */
1578             msgpack_pack_map(&mp_pck, 1);
1579             msgpack_pack_str(&mp_pck, 10);
1580             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1581             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1582             msgpack_pack_str_body(&mp_pck,
1583                                   ctx->project_id, flb_sds_len(ctx->project_id));
1584         }
1585         else if (ctx->is_generic_resource_type) {
1586             if (strcmp(ctx->resource, "generic_node") == 0) {
1587                 /* generic_node has fields project_id, location, namespace, node_id */
1588                 msgpack_pack_map(&mp_pck, 4);
1589 
1590                 msgpack_pack_str(&mp_pck, 7);
1591                 msgpack_pack_str_body(&mp_pck, "node_id", 7);
1592                 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id));
1593                 msgpack_pack_str_body(&mp_pck,
1594                                       ctx->node_id, flb_sds_len(ctx->node_id));
1595             }
1596             else {
1597                  /* generic_task has fields project_id, location, namespace, job, task_id */
1598                 msgpack_pack_map(&mp_pck, 5);
1599 
1600                 msgpack_pack_str(&mp_pck, 3);
1601                 msgpack_pack_str_body(&mp_pck, "job", 3);
1602                 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job));
1603                 msgpack_pack_str_body(&mp_pck,
1604                                       ctx->job, flb_sds_len(ctx->job));
1605 
1606                 msgpack_pack_str(&mp_pck, 7);
1607                 msgpack_pack_str_body(&mp_pck, "task_id", 7);
1608                 msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id));
1609                 msgpack_pack_str_body(&mp_pck,
1610                                       ctx->task_id, flb_sds_len(ctx->task_id));
1611             }
1612 
1613             msgpack_pack_str(&mp_pck, 10);
1614             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1615             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1616             msgpack_pack_str_body(&mp_pck,
1617                                   ctx->project_id, flb_sds_len(ctx->project_id));
1618 
1619             msgpack_pack_str(&mp_pck, 8);
1620             msgpack_pack_str_body(&mp_pck, "location", 8);
1621             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location));
1622             msgpack_pack_str_body(&mp_pck,
1623                                   ctx->location, flb_sds_len(ctx->location));
1624 
1625             msgpack_pack_str(&mp_pck, 9);
1626             msgpack_pack_str_body(&mp_pck, "namespace", 9);
1627             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id));
1628             msgpack_pack_str_body(&mp_pck,
1629                                   ctx->namespace_id, flb_sds_len(ctx->namespace_id));
1630         }
1631         else if (strcmp(ctx->resource, "gce_instance") == 0) {
1632             /* gce_instance resource has fields project_id, zone, instance_id */
1633             msgpack_pack_map(&mp_pck, 3);
1634 
1635             msgpack_pack_str(&mp_pck, 10);
1636             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1637             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1638             msgpack_pack_str_body(&mp_pck,
1639                                   ctx->project_id, flb_sds_len(ctx->project_id));
1640 
1641             msgpack_pack_str(&mp_pck, 4);
1642             msgpack_pack_str_body(&mp_pck, "zone", 4);
1643             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone));
1644             msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone));
1645 
1646             msgpack_pack_str(&mp_pck, 11);
1647             msgpack_pack_str_body(&mp_pck, "instance_id", 11);
1648             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id));
1649             msgpack_pack_str_body(&mp_pck,
1650                                   ctx->instance_id, flb_sds_len(ctx->instance_id));
1651         }
1652         else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) {
1653             /* k8s_container resource has fields project_id, location, cluster_name,
1654              *                                   namespace_name, pod_name, container_name
1655              *
1656              * The local_resource_id for k8s_container is in format:
1657              *    k8s_container.<namespace_name>.<pod_name>.<container_name>
1658              */
1659 
1660             ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER);
1661             if (ret == -1) {
1662                 flb_plg_error(ctx->ins, "fail to extract resource labels "
1663                               "for k8s_container resource type");
1664                 msgpack_sbuffer_destroy(&mp_sbuf);
1665                 return -1;
1666             }
1667 
1668             msgpack_pack_map(&mp_pck, 6);
1669 
1670             msgpack_pack_str(&mp_pck, 10);
1671             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1672             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1673             msgpack_pack_str_body(&mp_pck,
1674                                   ctx->project_id, flb_sds_len(ctx->project_id));
1675 
1676             msgpack_pack_str(&mp_pck, 8);
1677             msgpack_pack_str_body(&mp_pck, "location", 8);
1678             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1679             msgpack_pack_str_body(&mp_pck,
1680                                   ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1681 
1682             msgpack_pack_str(&mp_pck, 12);
1683             msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1684             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1685             msgpack_pack_str_body(&mp_pck,
1686                                   ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1687 
1688             msgpack_pack_str(&mp_pck, 14);
1689             msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
1690             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
1691             msgpack_pack_str_body(&mp_pck,
1692                                   ctx->namespace_name, flb_sds_len(ctx->namespace_name));
1693 
1694             msgpack_pack_str(&mp_pck, 8);
1695             msgpack_pack_str_body(&mp_pck, "pod_name", 8);
1696             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
1697             msgpack_pack_str_body(&mp_pck,
1698                                   ctx->pod_name, flb_sds_len(ctx->pod_name));
1699 
1700             msgpack_pack_str(&mp_pck, 14);
1701             msgpack_pack_str_body(&mp_pck, "container_name", 14);
1702             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name));
1703             msgpack_pack_str_body(&mp_pck,
1704                                   ctx->container_name, flb_sds_len(ctx->container_name));
1705         }
1706         else if (strcmp(ctx->resource, K8S_NODE) == 0) {
1707             /* k8s_node resource has fields project_id, location, cluster_name, node_name
1708              *
1709              * The local_resource_id for k8s_node is in format:
1710              *      k8s_node.<node_name>
1711              */
1712 
1713             ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE);
1714             if (ret == -1) {
1715                 flb_plg_error(ctx->ins, "fail to process local_resource_id from "
1716                               "log entry for k8s_node");
1717                 msgpack_sbuffer_destroy(&mp_sbuf);
1718                 return -1;
1719             }
1720 
1721             msgpack_pack_map(&mp_pck, 4);
1722 
1723             msgpack_pack_str(&mp_pck, 10);
1724             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1725             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1726             msgpack_pack_str_body(&mp_pck,
1727                                   ctx->project_id, flb_sds_len(ctx->project_id));
1728 
1729             msgpack_pack_str(&mp_pck, 8);
1730             msgpack_pack_str_body(&mp_pck, "location", 8);
1731             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1732             msgpack_pack_str_body(&mp_pck,
1733                                   ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1734 
1735             msgpack_pack_str(&mp_pck, 12);
1736             msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1737             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1738             msgpack_pack_str_body(&mp_pck,
1739                                   ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1740 
1741             msgpack_pack_str(&mp_pck, 9);
1742             msgpack_pack_str_body(&mp_pck, "node_name", 9);
1743             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name));
1744             msgpack_pack_str_body(&mp_pck,
1745                                   ctx->node_name, flb_sds_len(ctx->node_name));
1746         }
1747         else if (strcmp(ctx->resource, K8S_POD) == 0) {
1748             /* k8s_pod resource has fields project_id, location, cluster_name,
1749              *                             namespace_name, pod_name.
1750              *
1751              * The local_resource_id for k8s_pod is in format:
1752              *      k8s_pod.<namespace_name>.<pod_name>
1753              */
1754 
1755             ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD);
1756             if (ret != 0) {
1757                 flb_plg_error(ctx->ins, "fail to process local_resource_id from "
1758                               "log entry for k8s_pod");
1759                 msgpack_sbuffer_destroy(&mp_sbuf);
1760                 return -1;
1761             }
1762 
1763             msgpack_pack_map(&mp_pck, 5);
1764 
1765             msgpack_pack_str(&mp_pck, 10);
1766             msgpack_pack_str_body(&mp_pck, "project_id", 10);
1767             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
1768             msgpack_pack_str_body(&mp_pck,
1769                                   ctx->project_id, flb_sds_len(ctx->project_id));
1770 
1771             msgpack_pack_str(&mp_pck, 8);
1772             msgpack_pack_str_body(&mp_pck, "location", 8);
1773             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
1774             msgpack_pack_str_body(&mp_pck,
1775                                   ctx->cluster_location, flb_sds_len(ctx->cluster_location));
1776 
1777             msgpack_pack_str(&mp_pck, 12);
1778             msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
1779             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
1780             msgpack_pack_str_body(&mp_pck,
1781                                   ctx->cluster_name, flb_sds_len(ctx->cluster_name));
1782 
1783             msgpack_pack_str(&mp_pck, 14);
1784             msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
1785             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
1786             msgpack_pack_str_body(&mp_pck,
1787                                   ctx->namespace_name, flb_sds_len(ctx->namespace_name));
1788 
1789             msgpack_pack_str(&mp_pck, 8);
1790             msgpack_pack_str_body(&mp_pck, "pod_name", 8);
1791             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
1792             msgpack_pack_str_body(&mp_pck,
1793                                   ctx->pod_name, flb_sds_len(ctx->pod_name));
1794         }
1795         else {
1796             flb_plg_error(ctx->ins, "unsupported resource type '%s'",
1797                           ctx->resource);
1798             msgpack_sbuffer_destroy(&mp_sbuf);
1799             return -1;
1800         }
1801     }
1802     msgpack_pack_str(&mp_pck, 7);
1803     msgpack_pack_str_body(&mp_pck, "entries", 7);
1804 
1805     /* Append entries */
1806     msgpack_pack_array(&mp_pck, array_size);
1807 
1808     off = 0;
1809     msgpack_unpacked_init(&result);
1810     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
1811         /* Get timestamp */
1812         flb_time_pop_from_msgpack(&tms, &result, &obj);
1813         tms_status = extract_timestamp(obj, &tms);
1814 
1815         /*
1816          * Pack entry
1817          *
1818          * {
1819          *  "severity": "...",
1820          *  "labels": "...",
1821          *  "logName": "...",
1822          *  "jsonPayload": {...},
1823          *  "timestamp": "...",
1824          *  "trace": "..."
1825          * }
1826          */
1827         entry_size = 3;
1828 
1829         /* Extract severity */
1830         severity_extracted = FLB_FALSE;
1831         if (ctx->severity_key
1832             && get_severity_level(&severity, obj, ctx->severity_key) == 0) {
1833             severity_extracted = FLB_TRUE;
1834             entry_size += 1;
1835         }
1836 
1837         /* Extract trace */
1838         trace_extracted = FLB_FALSE;
1839         if (ctx->trace_key
1840             && get_string(&trace, obj, ctx->trace_key) == 0) {
1841             trace_extracted = FLB_TRUE;
1842             entry_size += 1;
1843         }
1844 
1845         /* Extract log name */
1846         log_name_extracted = FLB_FALSE;
1847         if (ctx->log_name_key
1848             && get_string(&log_name, obj, ctx->log_name_key) == 0) {
1849             log_name_extracted = FLB_TRUE;
1850         }
1851 
1852         /* Extract insertId */
1853         in_status = validate_insert_id(&insert_id_obj, obj);
1854         if (in_status == INSERTID_VALID) {
1855             insert_id_extracted = FLB_TRUE;
1856             entry_size += 1;
1857         }
1858         else if (in_status == INSERTID_NOT_PRESENT) {
1859             insert_id_extracted = FLB_FALSE;
1860         }
1861         else {
1862             if (log_name_extracted == FLB_TRUE) {
1863                 flb_sds_destroy(log_name);
1864             }
1865             continue;
1866         }
1867 
1868         /* Extract operation */
1869         operation_id = flb_sds_create("");
1870         operation_producer = flb_sds_create("");
1871         operation_first = FLB_FALSE;
1872         operation_last = FLB_FALSE;
1873         operation_extra_size = 0;
1874         operation_extracted = extract_operation(&operation_id, &operation_producer,
1875                                                 &operation_first, &operation_last, obj,
1876                                                 &operation_extra_size);
1877 
1878         if (operation_extracted == FLB_TRUE) {
1879             entry_size += 1;
1880         }
1881 
1882         /* Extract sourceLocation */
1883         source_location_file = flb_sds_create("");
1884         source_location_line = 0;
1885         source_location_function = flb_sds_create("");
1886         source_location_extra_size = 0;
1887         source_location_extracted = extract_source_location(&source_location_file,
1888                                                             &source_location_line,
1889                                                             &source_location_function,
1890                                                             obj,
1891                                                             &source_location_extra_size);
1892 
1893         if (source_location_extracted == FLB_TRUE) {
1894             entry_size += 1;
1895         }
1896 
1897         /* Extract httpRequest */
1898         init_http_request(&http_request);
1899         http_request_extra_size = 0;
1900         http_request_extracted = extract_http_request(&http_request, obj,
1901                                                       &http_request_extra_size);
1902         if (http_request_extracted == FLB_TRUE) {
1903             entry_size += 1;
1904         }
1905 
1906         /* Extract labels */
1907         labels_ptr = parse_labels(ctx, obj);
1908         if (labels_ptr != NULL) {
1909             if (labels_ptr->type != MSGPACK_OBJECT_MAP) {
1910                 flb_plg_error(ctx->ins, "the type of labels should be map");
1911                 flb_sds_destroy(operation_id);
1912                 flb_sds_destroy(operation_producer);
1913                 msgpack_unpacked_destroy(&result);
1914                 msgpack_sbuffer_destroy(&mp_sbuf);
1915                 return -1;
1916             }
1917             entry_size += 1;
1918         }
1919 
1920         msgpack_pack_map(&mp_pck, entry_size);
1921 
1922         /* Add severity into the log entry */
1923         if (severity_extracted == FLB_TRUE) {
1924             msgpack_pack_str(&mp_pck, 8);
1925             msgpack_pack_str_body(&mp_pck, "severity", 8);
1926             msgpack_pack_int(&mp_pck, severity);
1927         }
1928 
1929         /* Add trace into the log entry */
1930         if (trace_extracted == FLB_TRUE) {
1931             msgpack_pack_str(&mp_pck, 5);
1932             msgpack_pack_str_body(&mp_pck, "trace", 5);
1933 
1934             if (ctx->autoformat_stackdriver_trace) {
1935                 len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1,
1936                     "projects/%s/traces/%s", ctx->project_id, trace);
1937                 new_trace = stackdriver_trace;
1938             }
1939             else {
1940                 len = flb_sds_len(trace);
1941                 new_trace = trace;
1942             }
1943 
1944             msgpack_pack_str(&mp_pck, len);
1945             msgpack_pack_str_body(&mp_pck, new_trace, len);
1946             flb_sds_destroy(trace);
1947         }
1948 
1949         /* Add insertId field into the log entry */
1950         if (insert_id_extracted == FLB_TRUE) {
1951             msgpack_pack_str(&mp_pck, 8);
1952             msgpack_pack_str_body(&mp_pck, "insertId", 8);
1953             msgpack_pack_object(&mp_pck, insert_id_obj);
1954         }
1955 
1956         /* Add operation field into the log entry */
1957         if (operation_extracted == FLB_TRUE) {
1958             add_operation_field(&operation_id, &operation_producer,
1959                                 &operation_first, &operation_last, &mp_pck);
1960         }
1961 
1962         /* Add sourceLocation field into the log entry */
1963         if (source_location_extracted == FLB_TRUE) {
1964             add_source_location_field(&source_location_file, source_location_line,
1965                                       &source_location_function, &mp_pck);
1966         }
1967 
1968         /* Add httpRequest field into the log entry */
1969         if (http_request_extracted == FLB_TRUE) {
1970             add_http_request_field(&http_request, &mp_pck);
1971         }
1972 
1973         /* labels */
1974         if (labels_ptr != NULL) {
1975             msgpack_pack_str(&mp_pck, 6);
1976             msgpack_pack_str_body(&mp_pck, "labels", 6);
1977             msgpack_pack_object(&mp_pck, *labels_ptr);
1978         }
1979 
1980         /* Clean up id and producer if operation extracted */
1981         flb_sds_destroy(operation_id);
1982         flb_sds_destroy(operation_producer);
1983         flb_sds_destroy(source_location_file);
1984         flb_sds_destroy(source_location_function);
1985         destroy_http_request(&http_request);
1986 
1987         /* jsonPayload */
1988         msgpack_pack_str(&mp_pck, 11);
1989         msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
1990         pack_json_payload(insert_id_extracted,
1991                           operation_extracted, operation_extra_size,
1992                           source_location_extracted,
1993                           source_location_extra_size,
1994                           http_request_extracted,
1995                           http_request_extra_size,
1996                           tms_status,
1997                           &mp_pck, obj, ctx);
1998 
1999         /* avoid modifying the original tag */
2000         newtag = tag;
2001         stream_key = flb_sds_create("stream");
2002         if (ctx->is_k8s_resource_type
2003             && get_string(&stream, obj, stream_key) == 0) {
2004             if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) {
2005                 newtag = "stdout";
2006             }
2007             else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) {
2008                 newtag = "stderr";
2009             }
2010         }
2011 
2012         if (log_name_extracted == FLB_FALSE) {
2013             new_log_name = newtag;
2014         }
2015         else {
2016             new_log_name = log_name;
2017         }
2018 
2019         /* logName */
2020         len = snprintf(path, sizeof(path) - 1,
2021                        "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name);
2022 
2023         if (log_name_extracted == FLB_TRUE) {
2024             flb_sds_destroy(log_name);
2025         }
2026 
2027         msgpack_pack_str(&mp_pck, 7);
2028         msgpack_pack_str_body(&mp_pck, "logName", 7);
2029         msgpack_pack_str(&mp_pck, len);
2030         msgpack_pack_str_body(&mp_pck, path, len);
2031         flb_sds_destroy(stream_key);
2032         flb_sds_destroy(stream);
2033 
2034         /* timestamp */
2035         msgpack_pack_str(&mp_pck, 9);
2036         msgpack_pack_str_body(&mp_pck, "timestamp", 9);
2037 
2038         /* Format the time */
2039         /*
2040          * If format is timestamp_object or timestamp_duo_fields,
2041          * tms has been updated.
2042          *
2043          * If timestamp is not presen,
2044          * use the default tms(current time).
2045          */
2046 
2047         gmtime_r(&tms.tm.tv_sec, &tm);
2048         s = strftime(time_formatted, sizeof(time_formatted) - 1,
2049                         FLB_STD_TIME_FMT, &tm);
2050         len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
2051                         ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
2052         s += len;
2053 
2054         msgpack_pack_str(&mp_pck, s);
2055         msgpack_pack_str_body(&mp_pck, time_formatted, s);
2056 
2057     }
2058 
2059     /* Convert from msgpack to JSON */
2060     out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
2061     msgpack_sbuffer_destroy(&mp_sbuf);
2062 
2063     if (!out_buf) {
2064         flb_plg_error(ctx->ins, "error formatting JSON payload");
2065         msgpack_unpacked_destroy(&result);
2066         return -1;
2067     }
2068 
2069     *out_data = out_buf;
2070     *out_size = flb_sds_len(out_buf);
2071 
2072     return 0;
2073 }
2074 
cb_stackdriver_flush(const void * data,size_t bytes,const char * tag,int tag_len,struct flb_input_instance * i_ins,void * out_context,struct flb_config * config)2075 static void cb_stackdriver_flush(const void *data, size_t bytes,
2076                                  const char *tag, int tag_len,
2077                                  struct flb_input_instance *i_ins,
2078                                  void *out_context,
2079                                  struct flb_config *config)
2080 {
2081     (void) i_ins;
2082     (void) config;
2083     int ret;
2084     int ret_code = FLB_RETRY;
2085     size_t b_sent;
2086     flb_sds_t token;
2087     flb_sds_t payload_buf;
2088     size_t payload_size;
2089     void *out_buf;
2090     size_t out_size;
2091     struct flb_stackdriver *ctx = out_context;
2092     struct flb_upstream_conn *u_conn;
2093     struct flb_http_client *c;
2094 #ifdef FLB_HAVE_METRICS
2095     char *name = (char *) flb_output_name(ctx->ins);
2096     uint64_t ts = cmt_time_now();
2097 #endif
2098 
2099     /* Get upstream connection */
2100     u_conn = flb_upstream_conn_get(ctx->u);
2101     if (!u_conn) {
2102 #ifdef FLB_HAVE_METRICS
2103         cmt_counter_inc(ctx->cmt_failed_requests,
2104                         ts, 1, (char *[]) {name});
2105 
2106         /* OLD api */
2107         flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2108 #endif
2109         FLB_OUTPUT_RETURN(FLB_RETRY);
2110     }
2111 
2112     /* Reformat msgpack to stackdriver JSON payload */
2113     ret = stackdriver_format(config, i_ins,
2114                              ctx, NULL,
2115                              tag, tag_len,
2116                              data, bytes,
2117                              &out_buf, &out_size);
2118     if (ret != 0) {
2119 #ifdef FLB_HAVE_METRICS
2120         cmt_counter_inc(ctx->cmt_failed_requests,
2121                         ts, 1, (char *[]) {name});
2122 
2123         /* OLD api */
2124         flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2125 #endif
2126         flb_upstream_conn_release(u_conn);
2127         FLB_OUTPUT_RETURN(FLB_RETRY);
2128     }
2129 
2130     payload_buf = (flb_sds_t) out_buf;
2131     payload_size = out_size;
2132 
2133     /* Get or renew Token */
2134     token = get_google_token(ctx);
2135     if (!token) {
2136         flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
2137         flb_upstream_conn_release(u_conn);
2138         flb_sds_destroy(payload_buf);
2139 #ifdef FLB_HAVE_METRICS
2140         cmt_counter_inc(ctx->cmt_failed_requests,
2141                         ts, 1, (char *[]) {name});
2142 
2143         /* OLD api */
2144         flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2145 #endif
2146         FLB_OUTPUT_RETURN(FLB_RETRY);
2147     }
2148 
2149     /* Compose HTTP Client request */
2150     c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI,
2151                         payload_buf, payload_size, NULL, 0, NULL, 0);
2152 
2153     flb_http_buffer_size(c, 4192);
2154 
2155     if (ctx->stackdriver_agent) {
2156         flb_http_add_header(c, "User-Agent", 10,
2157                             ctx->stackdriver_agent,
2158                             flb_sds_len(ctx->stackdriver_agent));
2159     }
2160     else {
2161         flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
2162     }
2163 
2164     flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
2165     flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
2166 
2167     /* Send HTTP request */
2168     ret = flb_http_do(c, &b_sent);
2169 
2170     /* validate response */
2171     if (ret != 0) {
2172         flb_plg_warn(ctx->ins, "http_do=%i", ret);
2173         ret_code = FLB_RETRY;
2174     }
2175     else {
2176         /* The request was issued successfully, validate the 'error' field */
2177         flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
2178         if (c->resp.status == 200) {
2179             ret_code = FLB_OK;
2180         }
2181         else if (c->resp.status >= 400 && c->resp.status < 500) {
2182             ret_code = FLB_ERROR;
2183             flb_plg_warn(ctx->ins, "error\n%s",
2184                 c->resp.payload);
2185         }
2186         else {
2187             if (c->resp.payload_size > 0) {
2188                 /* we got an error */
2189                 flb_plg_warn(ctx->ins, "error\n%s",
2190                              c->resp.payload);
2191             }
2192             else {
2193                 flb_plg_debug(ctx->ins, "response\n%s",
2194                               c->resp.payload);
2195             }
2196             ret_code = FLB_RETRY;
2197         }
2198     }
2199 
2200     /* Update specific stackdriver metrics */
2201 #ifdef FLB_HAVE_METRICS
2202     if (ret_code == FLB_OK) {
2203         cmt_counter_inc(ctx->cmt_successful_requests,
2204                         ts, 1, (char *[]) {name});
2205 
2206         /* OLD api */
2207         flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);
2208     }
2209     else {
2210         cmt_counter_inc(ctx->cmt_failed_requests,
2211                         ts, 1, (char *[]) {name});
2212 
2213         /* OLD api */
2214         flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
2215     }
2216 #endif
2217 
2218     /* Cleanup */
2219     flb_sds_destroy(payload_buf);
2220     flb_sds_destroy(token);
2221     flb_http_client_destroy(c);
2222     flb_upstream_conn_release(u_conn);
2223 
2224     /* Done */
2225     FLB_OUTPUT_RETURN(ret_code);
2226 }
2227 
cb_stackdriver_exit(void * data,struct flb_config * config)2228 static int cb_stackdriver_exit(void *data, struct flb_config *config)
2229 {
2230     struct flb_stackdriver *ctx = data;
2231 
2232     if (!ctx) {
2233         return -1;
2234     }
2235 
2236     flb_stackdriver_conf_destroy(ctx);
2237     return 0;
2238 }
2239 
2240 struct flb_output_plugin out_stackdriver_plugin = {
2241     .name         = "stackdriver",
2242     .description  = "Send events to Google Stackdriver Logging",
2243     .cb_init      = cb_stackdriver_init,
2244     .cb_flush     = cb_stackdriver_flush,
2245     .cb_exit      = cb_stackdriver_exit,
2246 
2247     /* Test */
2248     .test_formatter.callback = stackdriver_format,
2249 
2250     /* Plugin flags */
2251     .flags          = FLB_OUTPUT_NET | FLB_IO_TLS,
2252 };
2253