1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_output_plugin.h>
22 #include <fluent-bit/flb_unescape.h>
23 #include <fluent-bit/flb_jsmn.h>
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <fluent-bit/flb_utils.h>
29
30 #include "bigquery.h"
31 #include "bigquery_conf.h"
32
33
key_cmp(char * str,int len,char * cmp)34 static inline int key_cmp(char *str, int len, char *cmp) {
35
36 if (strlen(cmp) != len) {
37 return -1;
38 }
39
40 return strncasecmp(str, cmp, len);
41 }
42
flb_bigquery_read_credentials_file(struct flb_bigquery * ctx,char * creds,struct flb_bigquery_oauth_credentials * ctx_creds)43 static int flb_bigquery_read_credentials_file(struct flb_bigquery *ctx,
44 char *creds,
45 struct flb_bigquery_oauth_credentials *ctx_creds)
46 {
47 int i;
48 int ret;
49 int len;
50 int key_len;
51 int val_len;
52 int tok_size = 32;
53 char *buf;
54 char *key;
55 char *val;
56 flb_sds_t tmp;
57 struct stat st;
58 jsmn_parser parser;
59 jsmntok_t *t;
60 jsmntok_t *tokens;
61
62 /* Validate credentials path */
63 ret = stat(creds, &st);
64 if (ret == -1) {
65 flb_errno();
66 flb_plg_error(ctx->ins, "cannot open credentials file: %s",
67 creds);
68 return -1;
69 }
70
71 if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) {
72 flb_plg_error(ctx->ins, "credentials file "
73 "is not a valid file: %s", creds);
74 return -1;
75 }
76
77 /* Read file content */
78 buf = mk_file_to_buffer(creds);
79 if (!buf) {
80 flb_plg_error(ctx->ins, "error reading credentials file: %s",
81 creds);
82 return -1;
83 }
84
85 /* Parse content */
86 jsmn_init(&parser);
87 tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
88 if (!tokens) {
89 flb_errno();
90 flb_free(buf);
91 return -1;
92 }
93
94 ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size);
95 if (ret <= 0) {
96 flb_plg_error(ctx->ins, "invalid JSON credentials file: %s",
97 creds);
98 flb_free(buf);
99 flb_free(tokens);
100 return -1;
101 }
102
103 t = &tokens[0];
104 if (t->type != JSMN_OBJECT) {
105 flb_plg_error(ctx->ins, "invalid JSON map on file: %s",
106 creds);
107 flb_free(buf);
108 flb_free(tokens);
109 return -1;
110 }
111
112 /* Parse JSON tokens */
113 for (i = 1; i < ret; i++) {
114 t = &tokens[i];
115 if (t->type != JSMN_STRING) {
116 continue;
117 }
118
119 if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){
120 break;
121 }
122
123 /* Key */
124 key = buf + t->start;
125 key_len = (t->end - t->start);
126
127 /* Value */
128 i++;
129 t = &tokens[i];
130 val = buf + t->start;
131 val_len = (t->end - t->start);
132
133 if (key_cmp(key, key_len, "type") == 0) {
134 ctx_creds->type = flb_sds_create_len(val, val_len);
135 }
136 else if (key_cmp(key, key_len, "project_id") == 0) {
137 ctx_creds->project_id = flb_sds_create_len(val, val_len);
138 }
139 else if (key_cmp(key, key_len, "private_key_id") == 0) {
140 ctx_creds->private_key_id = flb_sds_create_len(val, val_len);
141 }
142 else if (key_cmp(key, key_len, "private_key") == 0) {
143 tmp = flb_sds_create_len(val, val_len);
144 if (tmp) {
145 /* Unescape private key */
146 len = flb_sds_len(tmp);
147 ctx_creds->private_key = flb_sds_create_size(len);
148 flb_unescape_string(tmp, len,
149 &ctx_creds->private_key);
150 flb_sds_destroy(tmp);
151 }
152 }
153 else if (key_cmp(key, key_len, "client_email") == 0) {
154 ctx_creds->client_email = flb_sds_create_len(val, val_len);
155 }
156 else if (key_cmp(key, key_len, "client_id") == 0) {
157 ctx_creds->client_id = flb_sds_create_len(val, val_len);
158 }
159 else if (key_cmp(key, key_len, "auth_uri") == 0) {
160 ctx_creds->auth_uri = flb_sds_create_len(val, val_len);
161 }
162 else if (key_cmp(key, key_len, "token_uri") == 0) {
163 ctx_creds->token_uri = flb_sds_create_len(val, val_len);
164 }
165 }
166
167 flb_free(buf);
168 flb_free(tokens);
169
170 return 0;
171 }
172
173
flb_bigquery_conf_create(struct flb_output_instance * ins,struct flb_config * config)174 struct flb_bigquery *flb_bigquery_conf_create(struct flb_output_instance *ins,
175 struct flb_config *config)
176 {
177 int ret;
178 const char *tmp;
179 struct flb_bigquery *ctx;
180 struct flb_bigquery_oauth_credentials *creds;
181
182 /* Allocate config context */
183 ctx = flb_calloc(1, sizeof(struct flb_bigquery));
184 if (!ctx) {
185 flb_errno();
186 return NULL;
187 }
188 ctx->ins = ins;
189 ctx->config = config;
190
191 /* Lookup credentials file */
192 creds = flb_calloc(1, sizeof(struct flb_bigquery_oauth_credentials));
193 if (!creds) {
194 flb_errno();
195 flb_free(ctx);
196 return NULL;
197 }
198 ctx->oauth_credentials = creds;
199
200 tmp = flb_output_get_property("google_service_credentials", ins);
201 if (tmp) {
202 ctx->credentials_file = flb_sds_create(tmp);
203 }
204 else {
205 tmp = getenv("GOOGLE_SERVICE_CREDENTIALS");
206 if (tmp) {
207 ctx->credentials_file = flb_sds_create(tmp);
208 }
209 }
210
211 if (ctx->credentials_file) {
212 ret = flb_bigquery_read_credentials_file(ctx,
213 ctx->credentials_file,
214 ctx->oauth_credentials);
215 if (ret != 0) {
216 flb_bigquery_conf_destroy(ctx);
217 return NULL;
218 }
219 }
220 else {
221 /*
222 * If no credentials file has been defined, do manual lookup of the
223 * client email and the private key.
224 */
225
226 /* Service Account Email */
227 tmp = flb_output_get_property("service_account_email", ins);
228 if (tmp) {
229 creds->client_email = flb_sds_create(tmp);
230 }
231 else {
232 tmp = getenv("SERVICE_ACCOUNT_EMAIL");
233 if (tmp) {
234 creds->client_email = flb_sds_create(tmp);
235 }
236 }
237
238 /* Service Account Secret */
239 tmp = flb_output_get_property("service_account_secret", ins);
240 if (tmp) {
241 creds->private_key = flb_sds_create(tmp);
242 }
243 else {
244 tmp = getenv("SERVICE_ACCOUNT_SECRET");
245 if (tmp) {
246 creds->private_key = flb_sds_create(tmp);
247 }
248 }
249
250 if (!creds->client_email) {
251 flb_plg_error(ctx->ins, "client_email is not defined");
252 flb_bigquery_conf_destroy(ctx);
253 return NULL;
254 }
255
256 if (!creds->private_key) {
257 flb_plg_error(ctx->ins, "private_key is not defined");
258 flb_bigquery_conf_destroy(ctx);
259 return NULL;
260 }
261 }
262
263 /* config: 'project_id' */
264 tmp = flb_output_get_property("project_id", ins);
265 if (tmp) {
266 ctx->project_id = flb_sds_create(tmp);
267 }
268 else {
269 if (creds->project_id) {
270 ctx->project_id = flb_sds_create(creds->project_id);
271 if (!ctx->project_id) {
272 flb_plg_error(ctx->ins,
273 "failed extracting 'project_id' from credentials.");
274 flb_bigquery_conf_destroy(ctx);
275 return NULL;
276 }
277 }
278 else {
279 flb_plg_error(ctx->ins,
280 "no 'project_id' configured or present in credentials.");
281 flb_bigquery_conf_destroy(ctx);
282 return NULL;
283 }
284 }
285
286 /* config: 'dataset_id' */
287 tmp = flb_output_get_property("dataset_id", ins);
288 if (tmp) {
289 ctx->dataset_id = flb_sds_create(tmp);
290 }
291 else {
292 flb_plg_error(ctx->ins, "property 'dataset_id' is not defined");
293 flb_bigquery_conf_destroy(ctx);
294 return NULL;
295 }
296
297 /* config: 'table_id' */
298 tmp = flb_output_get_property("table_id", ins);
299 if (tmp) {
300 ctx->table_id = flb_sds_create(tmp);
301 }
302 else {
303 flb_plg_error(ctx->ins, "property 'table_id' is not defined");
304 flb_bigquery_conf_destroy(ctx);
305 return NULL;
306 }
307
308 /* config: 'skip_invalid_rows' */
309 tmp = flb_output_get_property("skip_invalid_rows", ins);
310 if (tmp && flb_utils_bool(tmp)) {
311 ctx->skip_invalid_rows = FLB_TRUE;
312 }
313 else {
314 ctx->skip_invalid_rows = FLB_FALSE;
315 }
316
317 /* config: 'ignore_unknown_values' */
318 tmp = flb_output_get_property("ignore_unknown_values", ins);
319 if (tmp && flb_utils_bool(tmp)) {
320 ctx->ignore_unknown_values = FLB_TRUE;
321 }
322 else {
323 ctx->ignore_unknown_values = FLB_FALSE;
324 }
325
326 /* Create the target URI */
327 ctx->uri = flb_sds_create_size(sizeof(FLB_BIGQUERY_RESOURCE_TEMPLATE)-6 +
328 flb_sds_len(ctx->project_id) +
329 flb_sds_len(ctx->dataset_id) +
330 flb_sds_len(ctx->table_id));
331 if (!ctx->uri) {
332 flb_errno();
333 flb_bigquery_conf_destroy(ctx);
334 return NULL;
335 }
336 ctx->uri = flb_sds_printf(&ctx->uri, FLB_BIGQUERY_RESOURCE_TEMPLATE,
337 ctx->project_id, ctx->dataset_id, ctx->table_id);
338
339 flb_plg_info(ctx->ins, "project='%s' dataset='%s' table='%s'",
340 ctx->project_id, ctx->dataset_id, ctx->table_id);
341
342 return ctx;
343 }
344
345
flb_bigquery_oauth_credentials_destroy(struct flb_bigquery_oauth_credentials * creds)346 int flb_bigquery_oauth_credentials_destroy(struct flb_bigquery_oauth_credentials *creds)
347 {
348 if (!creds) {
349 return -1;
350 }
351 flb_sds_destroy(creds->type);
352 flb_sds_destroy(creds->project_id);
353 flb_sds_destroy(creds->private_key_id);
354 flb_sds_destroy(creds->private_key);
355 flb_sds_destroy(creds->client_email);
356 flb_sds_destroy(creds->client_id);
357 flb_sds_destroy(creds->auth_uri);
358 flb_sds_destroy(creds->token_uri);
359
360 flb_free(creds);
361
362 return 0;
363 }
364
flb_bigquery_conf_destroy(struct flb_bigquery * ctx)365 int flb_bigquery_conf_destroy(struct flb_bigquery *ctx)
366 {
367 if (!ctx) {
368 return -1;
369 }
370
371 flb_sds_destroy(ctx->credentials_file);
372
373 flb_bigquery_oauth_credentials_destroy(ctx->oauth_credentials);
374
375 flb_sds_destroy(ctx->project_id);
376 flb_sds_destroy(ctx->dataset_id);
377 flb_sds_destroy(ctx->table_id);
378 flb_sds_destroy(ctx->uri);
379
380 if (ctx->o) {
381 flb_oauth2_destroy(ctx->o);
382 }
383
384 flb_free(ctx);
385 return 0;
386 }
387