1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 #include <errno.h>
3 #include <string.h>
4 #include <fluent-bit/flb_mem.h>
5 #include <fluent-bit/flb_avro.h>
6 #include <fluent-bit/flb_log.h>
7 #include <fluent-bit/flb_pack.h>
8 #include <msgpack.h>
9 
10 #include "flb_tests_internal.h"
11 
/*
 * AVRO iteration tests.
 *
 * Fixture paths: JSON documents under FLB_TESTS_DATA_PATH. The tests hand
 * these paths to test_init(), which reads the file and packs it to msgpack.
 */
#define AVRO_SINGLE_MAP1 FLB_TESTS_DATA_PATH "/data/avro/json_single_map_001.json"
#define AVRO_MULTILINE_JSON FLB_TESTS_DATA_PATH "/data/avro/live-sample.json"

/*
 * Avro record schema used with the AVRO_SINGLE_MAP1 fixture:
 *   key001: int, key002: float, key003: string,
 *   key004: array of maps whose values are int.
 * The backslash-newline continuations are inside the string literal, so the
 * leading spaces of each continued line become part of the JSON text (plain
 * JSON whitespace, ignored by the schema parser).
 */
const char  JSON_SINGLE_MAP_001_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"Map001\",\
  \"fields\":[\
     {\"name\": \"key001\", \"type\": \"int\"},\
     {\"name\": \"key002\", \"type\": \"float\"},\
     {\"name\": \"key003\", \"type\": \"string\"},\
     {\"name\": \"key004\", \"type\":\
        {\"type\": \"array\", \"items\":\
             {\"type\": \"map\",\"values\": \"int\"}}}]}";
26 
test_init(avro_value_t * aobject,avro_schema_t * aschema,const char * json_schema,const char * json_data)27 msgpack_unpacked test_init(avro_value_t *aobject, avro_schema_t *aschema, const char *json_schema, const char *json_data) {
28     char *out_buf;
29     size_t out_size;
30     int root_type;
31 
32     avro_value_iface_t  *aclass = flb_avro_init(aobject, (char *)json_schema, strlen(json_schema), aschema);
33     TEST_CHECK(aclass != NULL);
34 
35     char *data = mk_file_to_buffer(json_data);
36     TEST_CHECK(data != NULL);
37 
38     size_t len = strlen(data);
39 
40     TEST_CHECK(flb_pack_json(data, len, &out_buf, &out_size, &root_type) == 0);
41 
42     msgpack_unpacked msg;
43     msgpack_unpacked_init(&msg);
44     TEST_CHECK(msgpack_unpack_next(&msg, out_buf, out_size, NULL) == MSGPACK_UNPACK_SUCCESS);
45 
46     avro_value_iface_decref(aclass);
47     flb_free(data);
48     flb_free(out_buf);
49 
50     return msg;
51 }
52 /* Unpack msgpack per avro schema */
test_unpack_to_avro()53 void test_unpack_to_avro()
54 {
55     avro_value_t  aobject;
56     avro_schema_t aschema;
57 
58     msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA, AVRO_SINGLE_MAP1);
59 
60     msgpack_object_print(stderr, mp.data);
61     flb_msgpack_to_avro(&aobject, &mp.data);
62 
63     avro_value_t test_value;
64     TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
65 
66     int val001 = 0;
67     avro_value_get_int(&test_value, &val001);
68     TEST_CHECK(val001 == 123456789);
69 
70     TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
71 
72     float val002 = 0.0f;
73     // for some reason its rounding to this value
74     float val002_actual = 0.999888f;
75     avro_value_get_float(&test_value, &val002);
76     char str1[80];
77     char str2[80];
78     sprintf(str1, "%f", val002);
79     sprintf(str2, "%f", val002_actual);
80     flb_info("val002:%s:\n", str1);
81     flb_info("val002_actual:%s:\n", str2);
82     TEST_CHECK((strcmp(str1, str2) == 0));
83 
84     TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
85     const char *val003 = NULL;
86     size_t val003_size = 0;
87     avro_value_get_string(&test_value, &val003, &val003_size);
88     flb_info("val003_size:%zu:\n", val003_size);
89     TEST_CHECK(val003[val003_size] == '\0');
90 
91     TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
92     // avro_value_get_by_name returns ths string length plus the NUL
93     TEST_CHECK(val003_size == 12);
94 
95     TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
96 
97     size_t asize = 0;
98     avro_value_get_size(&test_value, &asize);
99     flb_info("asize:%zu:\n", asize);
100 
101     TEST_CHECK(asize == 2);
102 
103     // check the first map
104     avro_value_t k8sRecord;
105     TEST_CHECK(avro_value_get_by_index(&test_value, 0, &k8sRecord, NULL) == 0);
106 
107     size_t msize = 0;
108     avro_value_get_size(&k8sRecord, &msize);
109     flb_info("msize:%zu:\n", msize);
110 
111     TEST_CHECK(msize == 2);
112 
113     avro_value_t obj_test;
114     const char  *actual_key = NULL;
115     int  actual = 0;
116 
117     // check the first item in the map
118     TEST_CHECK(avro_value_get_by_index(&k8sRecord, 0, &obj_test, &actual_key) == 0);
119     flb_info("actual_key:%s:\n", actual_key);
120 
121     TEST_CHECK(strcmp(actual_key, "a") == 0);
122     TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
123     TEST_CHECK(actual == 1);
124 
125     // check the second item in the map
126     TEST_CHECK(avro_value_get_by_index(&k8sRecord, 1, &obj_test, &actual_key) == 0);
127     flb_info("actual_key:%s:\n", actual_key);
128 
129     TEST_CHECK(strcmp(actual_key, "b") == 0);
130     TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
131     TEST_CHECK(actual == 2);
132 
133     // check the second map
134     TEST_CHECK(avro_value_get_by_index(&test_value, 1, &k8sRecord, NULL) == 0);
135 
136     avro_value_get_size(&k8sRecord, &msize);
137     flb_info("msize:%zu:\n", msize);
138 
139     TEST_CHECK(msize == 2);
140 
141     avro_value_decref(&aobject);
142     avro_schema_decref(aschema);
143     msgpack_unpacked_destroy(&mp);
144 }
145 
test_parse_reordered_schema()146 void test_parse_reordered_schema()
147 {
148     // test same schema but different order of fields
149     const char *ts1 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
150     const char *ts2 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
151     const char *ts3 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"newnovalue\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
152 
153     const char *schemas[] = {ts1, ts2, ts3, ts2, ts1, NULL};
154 
155     int i=0;
156     for (i=0; schemas[i] != NULL ; i++) {
157 
158         avro_value_t  aobject = {0};
159         avro_schema_t aschema = {0};
160 
161         msgpack_unpacked msg = test_init(&aobject, &aschema, schemas[i], AVRO_MULTILINE_JSON);
162 
163         msgpack_object_print(stderr, msg.data);
164 
165         flb_msgpack_to_avro(&aobject, &msg.data);
166 
167         avro_value_t log0;
168         TEST_CHECK(avro_value_get_by_name(&aobject, "log", &log0, NULL) == 0);
169 
170         size_t size1 = 0;
171         const char  *log_line = NULL;
172         TEST_CHECK(avro_value_get_string(&log0, &log_line, &size1) == 0);
173         char *pre = "2020-08-21T15:49:48.154291375Z";
174         TEST_CHECK((strncmp(pre, log_line, strlen(pre)) == 0));
175         flb_info("log_line len:%zu:\n", strlen(log_line));
176 
177         avro_value_t kubernetes0;
178         TEST_CHECK(avro_value_get_by_name(&aobject, "kubernetes", &kubernetes0, NULL) == 0);
179 
180         avro_value_get_size(&kubernetes0, &size1);
181         flb_info("asize:%zu:\n", size1);
182         TEST_CHECK(size1 == 10);
183 
184         avro_value_t pn;
185         TEST_CHECK(avro_value_get_by_name(&kubernetes0, "pod_name", &pn, NULL) == 0);
186 
187         const char *pod_name = NULL;
188         size_t pod_name_size = 0;
189         TEST_CHECK(avro_value_get_string(&pn, &pod_name, &pod_name_size) == 0);
190         TEST_CHECK(strcmp(pod_name, "rrrr-bert-completion-tb1-6786c9c8-wj25m") == 0);
191         TEST_CHECK(pod_name[pod_name_size] == '\0');
192         TEST_CHECK(strlen(pod_name) == (pod_name_size-1));
193 
194         avro_value_t nn;
195         TEST_CHECK(avro_value_get_by_name(&kubernetes0, "namespace_name", &nn, NULL) == 0);
196 
197         const char *namespace_name = NULL;
198         size_t namespace_name_size = 0;
199         TEST_CHECK(avro_value_get_string(&nn, &namespace_name, &namespace_name_size) == 0);
200         TEST_CHECK(strcmp(namespace_name, "k8s-fgg") == 0);
201 
202         avro_value_t mapX;
203         TEST_CHECK(avro_value_get_by_name(&kubernetes0, "annotations", &mapX, NULL) == 0);
204 
205         avro_value_get_size(&mapX, &size1);
206         flb_info("asize:%zu:\n", size1);
207 
208         TEST_CHECK(size1 == 5);
209 
210         // check the first item in the map
211         avro_value_t doas;
212         TEST_CHECK(avro_value_get_by_name(&mapX, "doAs", &doas, NULL) == 0);
213         const char *doaser = NULL;
214         size_t doaser_size;
215         TEST_CHECK(avro_value_get_string(&doas, &doaser, &doaser_size) == 0);
216         TEST_CHECK((strcmp(doaser, "weeb") == 0));
217 
218         // check the second item in the map
219         avro_value_t iddecorator;
220         TEST_CHECK(avro_value_get_by_name(&mapX, "iddecorator.dkdk.username", &iddecorator, NULL) == 0);
221         const char *idder = NULL;
222         size_t idder_size;
223         TEST_CHECK(avro_value_get_string(&iddecorator, &idder, &idder_size) == 0);
224         TEST_CHECK((strcmp(idder, "rrrr") == 0));
225 
226         avro_schema_decref(aschema);
227         msgpack_unpacked_destroy(&msg);
228         avro_value_decref(&aobject);
229     }
230 
231 }
232 
233 // int msgpack2avro(avro_value_t *val, msgpack_object *o)
234 // get a schema for a type like this:
235 // http://avro.apache.org/docs/current/api/c/index.html#_examples
236 // ../lib/msgpack-3.2.0/include/msgpack/pack.h
237 // static int msgpack_pack_nil(msgpack_packer* pk);
test_msgpack2avro()238 void test_msgpack2avro()
239 {
240     avro_value_t  aobject;
241     avro_schema_t schema = avro_schema_null();
242     avro_value_iface_t  *aclass = avro_generic_class_from_schema(schema);
243     avro_generic_value_new(aclass, &aobject);
244 
245     msgpack_sbuffer sbuf;
246     msgpack_packer pk;
247     msgpack_zone mempool;
248     msgpack_object deserialized;
249 
250     /* msgpack::sbuffer is a simple buffer implementation. */
251     msgpack_sbuffer_init(&sbuf);
252 
253     /* serialize values into the buffer using msgpack_sbuffer_write callback function. */
254     msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
255 
256     msgpack_pack_nil(&pk);
257 
258     /* deserialize the buffer into msgpack_object instance. */
259     /* deserialized object is valid during the msgpack_zone instance alive. */
260     msgpack_zone_init(&mempool, 2048);
261     msgpack_unpack(sbuf.data, sbuf.size, NULL, &mempool, &deserialized);
262 
263     TEST_CHECK((msgpack2avro(&aobject, &deserialized) == FLB_TRUE));
264 
265     msgpack_zone_destroy(&mempool);
266     msgpack_sbuffer_destroy(&sbuf);
267 }
/*
 * Variant of the single-map record schema with an extra "status" field whose
 * type is the union ["null", "string"] (default null). Branch 0 is "null" and
 * branch 1 is "string". Field order: key001 int, key002 float, key003 string,
 * status union, key004 array of int-valued maps.
 * Comments cannot go inside the literal: the backslash-newline continuations
 * put each line's leading spaces into the JSON text.
 */
const char  JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION[] =
"{\"type\":\"record\",\
  \"name\":\"Map001\",\
  \"fields\":[\
     {\"name\": \"key001\", \"type\": \"int\"},\
     {\"name\": \"key002\", \"type\": \"float\"},\
     {\"name\": \"key003\", \"type\": \"string\"},\
              { \
                \"name\": \"status\", \
                \"default\": null, \
                \"type\": [\"null\", \"string\"] \
              }, \
     {\"name\": \"key004\", \"type\":\
        {\"type\": \"array\", \"items\":\
             {\"type\": \"map\",\"values\": \"int\"}}}]}";
test_union_type_sanity()283 void test_union_type_sanity()
284 {
285     avro_value_t  aobject;
286     avro_schema_t aschema;
287 
288     msgpack_unpacked msg = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);
289 
290     msgpack_object_print(stderr, msg.data);
291     flb_msgpack_to_avro(&aobject, &msg.data);
292 
293     size_t totalSize = 0;
294     avro_value_get_size(&aobject, &totalSize);
295     flb_info("totalSize:%zu:\n", totalSize);
296     // this is key001,2,3,4 and the status field which is the union type
297     TEST_CHECK(totalSize == 5);
298 
299     avro_value_t test_value;
300     TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
301 
302     int val001 = 0;
303     avro_value_get_int(&test_value, &val001);
304     TEST_CHECK(val001 == 123456789);
305 
306     TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
307 
308     float val002 = 0.0f;
309     // for some reason its rounding to this value
310     float val002_actual = 0.999888f;
311     avro_value_get_float(&test_value, &val002);
312     char str1[80];
313     char str2[80];
314     sprintf(str1, "%f", val002);
315     sprintf(str2, "%f", val002_actual);
316     flb_info("val002:%s:\n", str1);
317     flb_info("val002_actual:%s:\n", str2);
318     TEST_CHECK((strcmp(str1, str2) == 0));
319 
320     TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
321     const char *val003 = NULL;
322     size_t val003_size = 0;
323     TEST_CHECK(avro_value_get_string(&test_value, &val003, &val003_size) == 0);
324     flb_info("val003_size:%zu:\n", val003_size);
325     TEST_CHECK(val003[val003_size] == '\0');
326 
327     TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
328     // avro_value_get_by_name returns ths string length plus the NUL
329     TEST_CHECK(val003_size == 12);
330 
331     TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
332 
333     size_t asize = 0;
334     avro_value_get_size(&test_value, &asize);
335     flb_info("asize:%zu:\n", asize);
336 
337     TEST_CHECK(asize == 2);
338 
339     TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
340 
341     avro_value_decref(&aobject);
342     avro_schema_decref(aschema);
343     msgpack_unpacked_destroy(&msg);
344 }
345 
test_union_type_branches()346 void test_union_type_branches()
347 {
348     avro_value_t  aobject;
349     avro_schema_t aschema;
350 
351     msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);
352 
353     flb_msgpack_to_avro(&aobject, &mp.data);
354 
355     avro_value_t test_value;
356     TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
357     TEST_CHECK(avro_value_get_type(&test_value) == AVRO_UNION);
358 
359     int discriminant = 0;
360     TEST_CHECK(avro_value_get_discriminant(&test_value, &discriminant) == 0);
361     TEST_CHECK(discriminant == -1);
362 
363     avro_value_t  branch;
364     TEST_CHECK(avro_value_get_current_branch(&test_value, &branch) != 0);
365 
366     TEST_CHECK(avro_value_set_branch(&test_value, 0, &branch) == 0);
367     TEST_CHECK(avro_value_set_null(&branch) == 0);
368 
369     TEST_CHECK(avro_value_get_null(&branch) == 0);
370 
371     avro_value_decref(&aobject);
372     avro_schema_decref(aschema);
373     msgpack_unpacked_destroy(&mp);
374 }
375 TEST_LIST = {
376     /* Avro */
377     { "msgpack_to_avro_basic", test_unpack_to_avro},
378     { "test_parse_reordered_schema", test_parse_reordered_schema},
379     { "test_union_type_sanity", test_union_type_sanity},
380     { "test_union_type_branches", test_union_type_branches},
381     { 0 }
382 };
383