1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_avro.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_pack.h>
#include <msgpack.h>

#include "flb_tests_internal.h"
11
/* AVRO iteration tests */

/* JSON fixture paths, built by prefixing FLB_TESTS_DATA_PATH */
#define AVRO_SINGLE_MAP1 FLB_TESTS_DATA_PATH "/data/avro/json_single_map_001.json"
#define AVRO_MULTILINE_JSON FLB_TESTS_DATA_PATH "/data/avro/live-sample.json"

/*
 * Avro schema used with AVRO_SINGLE_MAP1: record "Map001" with
 *   key001: int, key002: float, key003: string,
 *   key004: array of maps with int values.
 *
 * The literal is continued with backslash-newline, so any leading whitespace
 * on a continuation line becomes part of the schema string.
 */
const char JSON_SINGLE_MAP_001_SCHEMA[] =
"{\"type\":\"record\",\
\"name\":\"Map001\",\
\"fields\":[\
{\"name\": \"key001\", \"type\": \"int\"},\
{\"name\": \"key002\", \"type\": \"float\"},\
{\"name\": \"key003\", \"type\": \"string\"},\
{\"name\": \"key004\", \"type\":\
{\"type\": \"array\", \"items\":\
{\"type\": \"map\",\"values\": \"int\"}}}]}";
26
test_init(avro_value_t * aobject,avro_schema_t * aschema,const char * json_schema,const char * json_data)27 msgpack_unpacked test_init(avro_value_t *aobject, avro_schema_t *aschema, const char *json_schema, const char *json_data) {
28 char *out_buf;
29 size_t out_size;
30 int root_type;
31
32 avro_value_iface_t *aclass = flb_avro_init(aobject, (char *)json_schema, strlen(json_schema), aschema);
33 TEST_CHECK(aclass != NULL);
34
35 char *data = mk_file_to_buffer(json_data);
36 TEST_CHECK(data != NULL);
37
38 size_t len = strlen(data);
39
40 TEST_CHECK(flb_pack_json(data, len, &out_buf, &out_size, &root_type) == 0);
41
42 msgpack_unpacked msg;
43 msgpack_unpacked_init(&msg);
44 TEST_CHECK(msgpack_unpack_next(&msg, out_buf, out_size, NULL) == MSGPACK_UNPACK_SUCCESS);
45
46 avro_value_iface_decref(aclass);
47 flb_free(data);
48 flb_free(out_buf);
49
50 return msg;
51 }
52 /* Unpack msgpack per avro schema */
test_unpack_to_avro()53 void test_unpack_to_avro()
54 {
55 avro_value_t aobject;
56 avro_schema_t aschema;
57
58 msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA, AVRO_SINGLE_MAP1);
59
60 msgpack_object_print(stderr, mp.data);
61 flb_msgpack_to_avro(&aobject, &mp.data);
62
63 avro_value_t test_value;
64 TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
65
66 int val001 = 0;
67 avro_value_get_int(&test_value, &val001);
68 TEST_CHECK(val001 == 123456789);
69
70 TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
71
72 float val002 = 0.0f;
73 // for some reason its rounding to this value
74 float val002_actual = 0.999888f;
75 avro_value_get_float(&test_value, &val002);
76 char str1[80];
77 char str2[80];
78 sprintf(str1, "%f", val002);
79 sprintf(str2, "%f", val002_actual);
80 flb_info("val002:%s:\n", str1);
81 flb_info("val002_actual:%s:\n", str2);
82 TEST_CHECK((strcmp(str1, str2) == 0));
83
84 TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
85 const char *val003 = NULL;
86 size_t val003_size = 0;
87 avro_value_get_string(&test_value, &val003, &val003_size);
88 flb_info("val003_size:%zu:\n", val003_size);
89 TEST_CHECK(val003[val003_size] == '\0');
90
91 TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
92 // avro_value_get_by_name returns ths string length plus the NUL
93 TEST_CHECK(val003_size == 12);
94
95 TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
96
97 size_t asize = 0;
98 avro_value_get_size(&test_value, &asize);
99 flb_info("asize:%zu:\n", asize);
100
101 TEST_CHECK(asize == 2);
102
103 // check the first map
104 avro_value_t k8sRecord;
105 TEST_CHECK(avro_value_get_by_index(&test_value, 0, &k8sRecord, NULL) == 0);
106
107 size_t msize = 0;
108 avro_value_get_size(&k8sRecord, &msize);
109 flb_info("msize:%zu:\n", msize);
110
111 TEST_CHECK(msize == 2);
112
113 avro_value_t obj_test;
114 const char *actual_key = NULL;
115 int actual = 0;
116
117 // check the first item in the map
118 TEST_CHECK(avro_value_get_by_index(&k8sRecord, 0, &obj_test, &actual_key) == 0);
119 flb_info("actual_key:%s:\n", actual_key);
120
121 TEST_CHECK(strcmp(actual_key, "a") == 0);
122 TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
123 TEST_CHECK(actual == 1);
124
125 // check the second item in the map
126 TEST_CHECK(avro_value_get_by_index(&k8sRecord, 1, &obj_test, &actual_key) == 0);
127 flb_info("actual_key:%s:\n", actual_key);
128
129 TEST_CHECK(strcmp(actual_key, "b") == 0);
130 TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
131 TEST_CHECK(actual == 2);
132
133 // check the second map
134 TEST_CHECK(avro_value_get_by_index(&test_value, 1, &k8sRecord, NULL) == 0);
135
136 avro_value_get_size(&k8sRecord, &msize);
137 flb_info("msize:%zu:\n", msize);
138
139 TEST_CHECK(msize == 2);
140
141 avro_value_decref(&aobject);
142 avro_schema_decref(aschema);
143 msgpack_unpacked_destroy(&mp);
144 }
145
test_parse_reordered_schema()146 void test_parse_reordered_schema()
147 {
148 // test same schema but different order of fields
149 const char *ts1 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
150 const char *ts2 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
151 const char *ts3 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"newnovalue\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
152
153 const char *schemas[] = {ts1, ts2, ts3, ts2, ts1, NULL};
154
155 int i=0;
156 for (i=0; schemas[i] != NULL ; i++) {
157
158 avro_value_t aobject = {0};
159 avro_schema_t aschema = {0};
160
161 msgpack_unpacked msg = test_init(&aobject, &aschema, schemas[i], AVRO_MULTILINE_JSON);
162
163 msgpack_object_print(stderr, msg.data);
164
165 flb_msgpack_to_avro(&aobject, &msg.data);
166
167 avro_value_t log0;
168 TEST_CHECK(avro_value_get_by_name(&aobject, "log", &log0, NULL) == 0);
169
170 size_t size1 = 0;
171 const char *log_line = NULL;
172 TEST_CHECK(avro_value_get_string(&log0, &log_line, &size1) == 0);
173 char *pre = "2020-08-21T15:49:48.154291375Z";
174 TEST_CHECK((strncmp(pre, log_line, strlen(pre)) == 0));
175 flb_info("log_line len:%zu:\n", strlen(log_line));
176
177 avro_value_t kubernetes0;
178 TEST_CHECK(avro_value_get_by_name(&aobject, "kubernetes", &kubernetes0, NULL) == 0);
179
180 avro_value_get_size(&kubernetes0, &size1);
181 flb_info("asize:%zu:\n", size1);
182 TEST_CHECK(size1 == 10);
183
184 avro_value_t pn;
185 TEST_CHECK(avro_value_get_by_name(&kubernetes0, "pod_name", &pn, NULL) == 0);
186
187 const char *pod_name = NULL;
188 size_t pod_name_size = 0;
189 TEST_CHECK(avro_value_get_string(&pn, &pod_name, &pod_name_size) == 0);
190 TEST_CHECK(strcmp(pod_name, "rrrr-bert-completion-tb1-6786c9c8-wj25m") == 0);
191 TEST_CHECK(pod_name[pod_name_size] == '\0');
192 TEST_CHECK(strlen(pod_name) == (pod_name_size-1));
193
194 avro_value_t nn;
195 TEST_CHECK(avro_value_get_by_name(&kubernetes0, "namespace_name", &nn, NULL) == 0);
196
197 const char *namespace_name = NULL;
198 size_t namespace_name_size = 0;
199 TEST_CHECK(avro_value_get_string(&nn, &namespace_name, &namespace_name_size) == 0);
200 TEST_CHECK(strcmp(namespace_name, "k8s-fgg") == 0);
201
202 avro_value_t mapX;
203 TEST_CHECK(avro_value_get_by_name(&kubernetes0, "annotations", &mapX, NULL) == 0);
204
205 avro_value_get_size(&mapX, &size1);
206 flb_info("asize:%zu:\n", size1);
207
208 TEST_CHECK(size1 == 5);
209
210 // check the first item in the map
211 avro_value_t doas;
212 TEST_CHECK(avro_value_get_by_name(&mapX, "doAs", &doas, NULL) == 0);
213 const char *doaser = NULL;
214 size_t doaser_size;
215 TEST_CHECK(avro_value_get_string(&doas, &doaser, &doaser_size) == 0);
216 TEST_CHECK((strcmp(doaser, "weeb") == 0));
217
218 // check the second item in the map
219 avro_value_t iddecorator;
220 TEST_CHECK(avro_value_get_by_name(&mapX, "iddecorator.dkdk.username", &iddecorator, NULL) == 0);
221 const char *idder = NULL;
222 size_t idder_size;
223 TEST_CHECK(avro_value_get_string(&iddecorator, &idder, &idder_size) == 0);
224 TEST_CHECK((strcmp(idder, "rrrr") == 0));
225
226 avro_schema_decref(aschema);
227 msgpack_unpacked_destroy(&msg);
228 avro_value_decref(&aobject);
229 }
230
231 }
232
233 // int msgpack2avro(avro_value_t *val, msgpack_object *o)
234 // get a schema for a type like this:
235 // http://avro.apache.org/docs/current/api/c/index.html#_examples
236 // ../lib/msgpack-3.2.0/include/msgpack/pack.h
237 // static int msgpack_pack_nil(msgpack_packer* pk);
test_msgpack2avro()238 void test_msgpack2avro()
239 {
240 avro_value_t aobject;
241 avro_schema_t schema = avro_schema_null();
242 avro_value_iface_t *aclass = avro_generic_class_from_schema(schema);
243 avro_generic_value_new(aclass, &aobject);
244
245 msgpack_sbuffer sbuf;
246 msgpack_packer pk;
247 msgpack_zone mempool;
248 msgpack_object deserialized;
249
250 /* msgpack::sbuffer is a simple buffer implementation. */
251 msgpack_sbuffer_init(&sbuf);
252
253 /* serialize values into the buffer using msgpack_sbuffer_write callback function. */
254 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
255
256 msgpack_pack_nil(&pk);
257
258 /* deserialize the buffer into msgpack_object instance. */
259 /* deserialized object is valid during the msgpack_zone instance alive. */
260 msgpack_zone_init(&mempool, 2048);
261 msgpack_unpack(sbuf.data, sbuf.size, NULL, &mempool, &deserialized);
262
263 TEST_CHECK((msgpack2avro(&aobject, &deserialized) == FLB_TRUE));
264
265 msgpack_zone_destroy(&mempool);
266 msgpack_sbuffer_destroy(&sbuf);
267 }
/*
 * Variant of the "Map001" schema with an extra "status" field inserted before
 * key004. "status" is a union of null and string, with default null.
 * Used with AVRO_SINGLE_MAP1 by the union tests.
 *
 * Backslash-newline continues the literal, so any leading whitespace on a
 * continuation line is part of the schema string.
 */
const char JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION[] =
"{\"type\":\"record\",\
\"name\":\"Map001\",\
\"fields\":[\
{\"name\": \"key001\", \"type\": \"int\"},\
{\"name\": \"key002\", \"type\": \"float\"},\
{\"name\": \"key003\", \"type\": \"string\"},\
{ \
\"name\": \"status\", \
\"default\": null, \
\"type\": [\"null\", \"string\"] \
}, \
{\"name\": \"key004\", \"type\":\
{\"type\": \"array\", \"items\":\
{\"type\": \"map\",\"values\": \"int\"}}}]}";
test_union_type_sanity()283 void test_union_type_sanity()
284 {
285 avro_value_t aobject;
286 avro_schema_t aschema;
287
288 msgpack_unpacked msg = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);
289
290 msgpack_object_print(stderr, msg.data);
291 flb_msgpack_to_avro(&aobject, &msg.data);
292
293 size_t totalSize = 0;
294 avro_value_get_size(&aobject, &totalSize);
295 flb_info("totalSize:%zu:\n", totalSize);
296 // this is key001,2,3,4 and the status field which is the union type
297 TEST_CHECK(totalSize == 5);
298
299 avro_value_t test_value;
300 TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
301
302 int val001 = 0;
303 avro_value_get_int(&test_value, &val001);
304 TEST_CHECK(val001 == 123456789);
305
306 TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
307
308 float val002 = 0.0f;
309 // for some reason its rounding to this value
310 float val002_actual = 0.999888f;
311 avro_value_get_float(&test_value, &val002);
312 char str1[80];
313 char str2[80];
314 sprintf(str1, "%f", val002);
315 sprintf(str2, "%f", val002_actual);
316 flb_info("val002:%s:\n", str1);
317 flb_info("val002_actual:%s:\n", str2);
318 TEST_CHECK((strcmp(str1, str2) == 0));
319
320 TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
321 const char *val003 = NULL;
322 size_t val003_size = 0;
323 TEST_CHECK(avro_value_get_string(&test_value, &val003, &val003_size) == 0);
324 flb_info("val003_size:%zu:\n", val003_size);
325 TEST_CHECK(val003[val003_size] == '\0');
326
327 TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
328 // avro_value_get_by_name returns ths string length plus the NUL
329 TEST_CHECK(val003_size == 12);
330
331 TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
332
333 size_t asize = 0;
334 avro_value_get_size(&test_value, &asize);
335 flb_info("asize:%zu:\n", asize);
336
337 TEST_CHECK(asize == 2);
338
339 TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
340
341 avro_value_decref(&aobject);
342 avro_schema_decref(aschema);
343 msgpack_unpacked_destroy(&msg);
344 }
345
/*
 * Exercise the "status" union (["null", "string"]) after converting
 * AVRO_SINGLE_MAP1 with JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION.
 *
 * The test expects no branch to be selected after flb_msgpack_to_avro():
 * the discriminant is -1 and avro_value_get_current_branch() fails.
 * (NOTE(review): presumably because the fixture has no "status" key — confirm.)
 * It then selects branch 0 ("null"), sets it and reads it back.
 */
void test_union_type_branches()
{
    avro_value_t aobject;
    avro_schema_t aschema;

    msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);

    flb_msgpack_to_avro(&aobject, &mp.data);

    avro_value_t test_value;
    TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
    TEST_CHECK(avro_value_get_type(&test_value) == AVRO_UNION);

    /* -1: no branch has been selected yet */
    int discriminant = 0;
    TEST_CHECK(avro_value_get_discriminant(&test_value, &discriminant) == 0);
    TEST_CHECK(discriminant == -1);

    /* with no branch selected, fetching the current branch must fail */
    avro_value_t branch;
    TEST_CHECK(avro_value_get_current_branch(&test_value, &branch) != 0);

    /* branch 0 is "null" in the union's declared order */
    TEST_CHECK(avro_value_set_branch(&test_value, 0, &branch) == 0);
    TEST_CHECK(avro_value_set_null(&branch) == 0);

    TEST_CHECK(avro_value_get_null(&branch) == 0);

    avro_value_decref(&aobject);
    avro_schema_decref(aschema);
    msgpack_unpacked_destroy(&mp);
}
/*
 * Test registry (TEST_LIST, presumably from flb_tests_internal.h): each entry
 * is { test name, test function }, and the table is terminated by { 0 }.
 * NOTE(review): test_msgpack2avro is defined in this file but not listed
 * here, so it never runs — confirm whether that is intentional.
 */
TEST_LIST = {
    /* Avro */
    { "msgpack_to_avro_basic", test_unpack_to_avro},
    { "test_parse_reordered_schema", test_parse_reordered_schema},
    { "test_union_type_sanity", test_union_type_sanity},
    { "test_union_type_branches", test_union_type_branches},
    { 0 }
};
383