1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_unescape.h>
23 #include <fluent-bit/flb_jsmn.h>
24 
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <fluent-bit/flb_utils.h>
29 
30 #include "bigquery.h"
31 #include "bigquery_conf.h"
32 
33 
/*
 * Case-insensitive comparison of a length-delimited key against a
 * NUL-terminated reference string.
 *
 * str: start of the key (does not need to be NUL-terminated at 'len').
 * len: number of bytes of 'str' that make up the key.
 * cmp: NUL-terminated string to compare against.
 *
 * Returns 0 when both have the same length and match ignoring case,
 * -1 when the lengths differ, otherwise the non-zero result of
 * strncasecmp().
 */
static inline int key_cmp(char *str, int len, char *cmp)
{
    size_t cmp_len = strlen(cmp);

    /* 'len' is converted to size_t for the length comparison */
    return (cmp_len == len) ? strncasecmp(str, cmp, len) : -1;
}
42 
flb_bigquery_read_credentials_file(struct flb_bigquery * ctx,char * creds,struct flb_bigquery_oauth_credentials * ctx_creds)43 static int flb_bigquery_read_credentials_file(struct flb_bigquery *ctx,
44                                               char *creds,
45                                               struct flb_bigquery_oauth_credentials *ctx_creds)
46 {
47     int i;
48     int ret;
49     int len;
50     int key_len;
51     int val_len;
52     int tok_size = 32;
53     char *buf;
54     char *key;
55     char *val;
56     flb_sds_t tmp;
57     struct stat st;
58     jsmn_parser parser;
59     jsmntok_t *t;
60     jsmntok_t *tokens;
61 
62     /* Validate credentials path */
63     ret = stat(creds, &st);
64     if (ret == -1) {
65         flb_errno();
66         flb_plg_error(ctx->ins, "cannot open credentials file: %s",
67                       creds);
68         return -1;
69     }
70 
71     if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) {
72         flb_plg_error(ctx->ins, "credentials file "
73                       "is not a valid file: %s", creds);
74         return -1;
75     }
76 
77     /* Read file content */
78     buf = mk_file_to_buffer(creds);
79     if (!buf) {
80         flb_plg_error(ctx->ins, "error reading credentials file: %s",
81                       creds);
82         return -1;
83     }
84 
85     /* Parse content */
86     jsmn_init(&parser);
87     tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
88     if (!tokens) {
89         flb_errno();
90         flb_free(buf);
91         return -1;
92     }
93 
94     ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size);
95     if (ret <= 0) {
96         flb_plg_error(ctx->ins, "invalid JSON credentials file: %s",
97                       creds);
98         flb_free(buf);
99         flb_free(tokens);
100         return -1;
101     }
102 
103     t = &tokens[0];
104     if (t->type != JSMN_OBJECT) {
105         flb_plg_error(ctx->ins, "invalid JSON map on file: %s",
106                       creds);
107         flb_free(buf);
108         flb_free(tokens);
109         return -1;
110     }
111 
112     /* Parse JSON tokens */
113     for (i = 1; i < ret; i++) {
114         t = &tokens[i];
115         if (t->type != JSMN_STRING) {
116             continue;
117         }
118 
119         if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){
120             break;
121         }
122 
123         /* Key */
124         key = buf + t->start;
125         key_len = (t->end - t->start);
126 
127         /* Value */
128         i++;
129         t = &tokens[i];
130         val = buf + t->start;
131         val_len = (t->end - t->start);
132 
133         if (key_cmp(key, key_len, "type") == 0) {
134             ctx_creds->type = flb_sds_create_len(val, val_len);
135         }
136         else if (key_cmp(key, key_len, "project_id") == 0) {
137             ctx_creds->project_id = flb_sds_create_len(val, val_len);
138         }
139         else if (key_cmp(key, key_len, "private_key_id") == 0) {
140             ctx_creds->private_key_id = flb_sds_create_len(val, val_len);
141         }
142         else if (key_cmp(key, key_len, "private_key") == 0) {
143             tmp = flb_sds_create_len(val, val_len);
144             if (tmp) {
145                 /* Unescape private key */
146                 len = flb_sds_len(tmp);
147                 ctx_creds->private_key = flb_sds_create_size(len);
148                 flb_unescape_string(tmp, len,
149                                     &ctx_creds->private_key);
150                 flb_sds_destroy(tmp);
151             }
152         }
153         else if (key_cmp(key, key_len, "client_email") == 0) {
154             ctx_creds->client_email = flb_sds_create_len(val, val_len);
155         }
156         else if (key_cmp(key, key_len, "client_id") == 0) {
157             ctx_creds->client_id = flb_sds_create_len(val, val_len);
158         }
159         else if (key_cmp(key, key_len, "auth_uri") == 0) {
160             ctx_creds->auth_uri = flb_sds_create_len(val, val_len);
161         }
162         else if (key_cmp(key, key_len, "token_uri") == 0) {
163             ctx_creds->token_uri = flb_sds_create_len(val, val_len);
164         }
165     }
166 
167     flb_free(buf);
168     flb_free(tokens);
169 
170     return 0;
171 }
172 
173 
flb_bigquery_conf_create(struct flb_output_instance * ins,struct flb_config * config)174 struct flb_bigquery *flb_bigquery_conf_create(struct flb_output_instance *ins,
175                                               struct flb_config *config)
176 {
177     int ret;
178     const char *tmp;
179     struct flb_bigquery *ctx;
180     struct flb_bigquery_oauth_credentials *creds;
181 
182     /* Allocate config context */
183     ctx = flb_calloc(1, sizeof(struct flb_bigquery));
184     if (!ctx) {
185         flb_errno();
186         return NULL;
187     }
188     ctx->ins = ins;
189     ctx->config = config;
190 
191     /* Lookup credentials file */
192     creds = flb_calloc(1, sizeof(struct flb_bigquery_oauth_credentials));
193     if (!creds) {
194         flb_errno();
195         flb_free(ctx);
196         return NULL;
197     }
198     ctx->oauth_credentials = creds;
199 
200     tmp = flb_output_get_property("google_service_credentials", ins);
201     if (tmp) {
202         ctx->credentials_file = flb_sds_create(tmp);
203     }
204     else {
205         tmp = getenv("GOOGLE_SERVICE_CREDENTIALS");
206         if (tmp) {
207             ctx->credentials_file = flb_sds_create(tmp);
208         }
209     }
210 
211     if (ctx->credentials_file) {
212         ret = flb_bigquery_read_credentials_file(ctx,
213                                                  ctx->credentials_file,
214                                                  ctx->oauth_credentials);
215         if (ret != 0) {
216             flb_bigquery_conf_destroy(ctx);
217             return NULL;
218         }
219     }
220     else {
221         /*
222          * If no credentials file has been defined, do manual lookup of the
223          * client email and the private key.
224          */
225 
226         /* Service Account Email */
227         tmp = flb_output_get_property("service_account_email", ins);
228         if (tmp) {
229             creds->client_email = flb_sds_create(tmp);
230         }
231         else {
232             tmp = getenv("SERVICE_ACCOUNT_EMAIL");
233             if (tmp) {
234                 creds->client_email = flb_sds_create(tmp);
235             }
236         }
237 
238         /* Service Account Secret */
239         tmp = flb_output_get_property("service_account_secret", ins);
240         if (tmp) {
241             creds->private_key = flb_sds_create(tmp);
242         }
243         else {
244             tmp = getenv("SERVICE_ACCOUNT_SECRET");
245             if (tmp) {
246                 creds->private_key = flb_sds_create(tmp);
247             }
248         }
249 
250         if (!creds->client_email) {
251             flb_plg_error(ctx->ins, "client_email is not defined");
252             flb_bigquery_conf_destroy(ctx);
253             return NULL;
254         }
255 
256         if (!creds->private_key) {
257             flb_plg_error(ctx->ins, "private_key is not defined");
258             flb_bigquery_conf_destroy(ctx);
259             return NULL;
260         }
261     }
262 
263     /* config: 'project_id' */
264     tmp = flb_output_get_property("project_id", ins);
265     if (tmp) {
266         ctx->project_id = flb_sds_create(tmp);
267     }
268     else {
269        if (creds->project_id) {
270             ctx->project_id = flb_sds_create(creds->project_id);
271             if (!ctx->project_id) {
272                 flb_plg_error(ctx->ins,
273                               "failed extracting 'project_id' from credentials.");
274                 flb_bigquery_conf_destroy(ctx);
275                 return NULL;
276             }
277         }
278         else {
279             flb_plg_error(ctx->ins,
280                           "no 'project_id' configured or present in credentials.");
281             flb_bigquery_conf_destroy(ctx);
282             return NULL;
283         }
284     }
285 
286     /* config: 'dataset_id' */
287     tmp = flb_output_get_property("dataset_id", ins);
288     if (tmp) {
289         ctx->dataset_id = flb_sds_create(tmp);
290     }
291     else {
292         flb_plg_error(ctx->ins, "property 'dataset_id' is not defined");
293         flb_bigquery_conf_destroy(ctx);
294         return NULL;
295     }
296 
297     /* config: 'table_id' */
298     tmp = flb_output_get_property("table_id", ins);
299     if (tmp) {
300         ctx->table_id = flb_sds_create(tmp);
301     }
302     else {
303         flb_plg_error(ctx->ins, "property 'table_id' is not defined");
304         flb_bigquery_conf_destroy(ctx);
305         return NULL;
306     }
307 
308     /* config: 'skip_invalid_rows' */
309     tmp = flb_output_get_property("skip_invalid_rows", ins);
310     if (tmp && flb_utils_bool(tmp)) {
311         ctx->skip_invalid_rows = FLB_TRUE;
312     }
313     else {
314         ctx->skip_invalid_rows = FLB_FALSE;
315     }
316 
317     /* config: 'ignore_unknown_values' */
318     tmp = flb_output_get_property("ignore_unknown_values", ins);
319     if (tmp && flb_utils_bool(tmp)) {
320         ctx->ignore_unknown_values = FLB_TRUE;
321     }
322     else {
323         ctx->ignore_unknown_values = FLB_FALSE;
324     }
325 
326     /* Create the target URI */
327     ctx->uri = flb_sds_create_size(sizeof(FLB_BIGQUERY_RESOURCE_TEMPLATE)-6 +
328                                    flb_sds_len(ctx->project_id) +
329                                    flb_sds_len(ctx->dataset_id) +
330                                    flb_sds_len(ctx->table_id));
331     if (!ctx->uri) {
332         flb_errno();
333         flb_bigquery_conf_destroy(ctx);
334         return NULL;
335     }
336     ctx->uri = flb_sds_printf(&ctx->uri, FLB_BIGQUERY_RESOURCE_TEMPLATE,
337                               ctx->project_id, ctx->dataset_id, ctx->table_id);
338 
339     flb_plg_info(ctx->ins, "project='%s' dataset='%s' table='%s'",
340                  ctx->project_id, ctx->dataset_id, ctx->table_id);
341 
342     return ctx;
343 }
344 
345 
flb_bigquery_oauth_credentials_destroy(struct flb_bigquery_oauth_credentials * creds)346 int flb_bigquery_oauth_credentials_destroy(struct flb_bigquery_oauth_credentials *creds)
347 {
348     if (!creds) {
349         return -1;
350     }
351     flb_sds_destroy(creds->type);
352     flb_sds_destroy(creds->project_id);
353     flb_sds_destroy(creds->private_key_id);
354     flb_sds_destroy(creds->private_key);
355     flb_sds_destroy(creds->client_email);
356     flb_sds_destroy(creds->client_id);
357     flb_sds_destroy(creds->auth_uri);
358     flb_sds_destroy(creds->token_uri);
359 
360     flb_free(creds);
361 
362     return 0;
363 }
364 
flb_bigquery_conf_destroy(struct flb_bigquery * ctx)365 int flb_bigquery_conf_destroy(struct flb_bigquery *ctx)
366 {
367     if (!ctx) {
368         return -1;
369     }
370 
371     flb_sds_destroy(ctx->credentials_file);
372 
373     flb_bigquery_oauth_credentials_destroy(ctx->oauth_credentials);
374 
375     flb_sds_destroy(ctx->project_id);
376     flb_sds_destroy(ctx->dataset_id);
377     flb_sds_destroy(ctx->table_id);
378     flb_sds_destroy(ctx->uri);
379 
380     if (ctx->o) {
381         flb_oauth2_destroy(ctx->o);
382     }
383 
384     flb_free(ctx);
385     return 0;
386 }
387