1 #include "lib/mlrutil.h"
2 #include "lib/mlrregex.h"
3 #include "lib/string_builder.h"
4 #include "containers/lhmss.h"
5 #include "containers/sllv.h"
6 #include "containers/lhmslv.h"
7 #include "containers/mixutil.h"
8 #include "mapping/mappers.h"
9 #include "cli/argparse.h"
10
// ================================================================
// Example of the two layouts this verb converts between. Wide-to-long
// (e.g. -i X,Y,Z -o item,price) turns WIDE into LONG up to record order: it
// emits one record per pivoted field for each input record in turn, so its
// output is grouped by time rather than by item as listed below. Long-to-wide
// (e.g. -s item,price) turns LONG back into WIDE. The leading integers are
// row numbers, not fields.
//
// WIDE:
// time X Y Z
// 1 2009-01-01 0.65473572 2.45206093 -1.46570942
// 2 2009-01-02 -0.89248112 0.21547134 -2.05357735
// 3 2009-01-03 0.98012375 1.31792866 4.64248357
// 4 2009-01-04 0.35397376 3.37656453 -0.25237774
// 5 2009-01-05 2.19357813 1.34775108 0.09719105

// LONG:
// time item price
// 1 2009-01-01 X 0.65473572
// 2 2009-01-02 X -0.89248112
// 3 2009-01-03 X 0.98012375
// 4 2009-01-04 X 0.35397376
// 5 2009-01-05 X 2.19357813
// 6 2009-01-01 Y 2.45206093
// 7 2009-01-02 Y 0.21547134
// 8 2009-01-03 Y 1.31792866
// 9 2009-01-04 Y 3.37656453
// 10 2009-01-05 Y 1.34775108
// 11 2009-01-01 Z -1.46570942
// 12 2009-01-02 Z -2.05357735
// 13 2009-01-03 Z 4.64248357
// 14 2009-01-04 Z -0.25237774
// 15 2009-01-05 Z 0.09719105

38 // ================================================================
// Per-instance state for the reshape verb. Which field group below is
// populated depends on the flags given: -s selects long-to-wide, otherwise
// the verb runs wide-to-long.
typedef struct _mapper_reshape_state_t {
	// Argument-parser state; released in mapper_reshape_free.
	ap_state_t* pargp;

	// for wide-to-long:
	slls_t* input_field_names;     // names from -i; NULL when -i was not given
	sllv_t* input_field_regexes;   // compiled regex_t* from -r; NULL when -r was not given
	char* output_key_field_name;   // first name from -o
	char* output_value_field_name; // second name from -o

	// for long-to-wide:
	char* split_out_key_field_name;   // first name from -s; NULL in wide-to-long mode
	char* split_out_value_field_name; // second name from -s; NULL in wide-to-long mode
	// Two-level map: remaining field names (slls_t*) -> remaining field
	// values (slls_t*) -> reshape_bucket_t*. NULL in wide-to-long mode.
	lhmslv_t* other_keys_to_other_values_to_buckets;
} mapper_reshape_state_t;
53
// One long-to-wide output record under construction.
typedef struct _reshape_bucket_t {
	// First input record seen for this combination of remaining field names
	// and values, with the -s key and value fields already removed. Owned by
	// the bucket until end of stream, when it is moved to the output list and
	// this pointer is set to NULL.
	lrec_t* prepresentative;
	// Split-out key -> split-out value pairs to be appended as new fields;
	// both strings of each pair are owned by this map.
	lhmss_t* pairs;
} reshape_bucket_t;
58
59 static void mapper_reshape_usage(FILE* o, char* argv0, char* verb);
60 static mapper_t* mapper_reshape_parse_cli(int* pargi, int argc, char** argv,
61 cli_reader_opts_t* _, cli_writer_opts_t* __);
62 static mapper_t* mapper_reshape_alloc(
63 ap_state_t* pargp,
64 slls_t* input_field_names,
65 slls_t* input_field_regex_strings,
66 char* output_key_field_name,
67 char* output_value_field_name,
68 char* split_out_key_field_name,
69 char* split_out_value_field_name);
70 static void mapper_reshape_free(mapper_t* pmapper, context_t* _);
71 static sllv_t* mapper_reshape_wide_to_long_no_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
72 static sllv_t* mapper_reshape_wide_to_long_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
73 static sllv_t* mapper_reshape_long_to_wide_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
74
75 static reshape_bucket_t* reshape_bucket_alloc(lrec_t* prepresentative);
76 static void reshape_bucket_free(reshape_bucket_t* pbucket);
77
78 // ----------------------------------------------------------------
79 mapper_setup_t mapper_reshape_setup = {
80 .verb = "reshape",
81 .pusage_func = mapper_reshape_usage,
82 .pparse_func = mapper_reshape_parse_cli,
83 .ignores_input = FALSE,
84 };
85
86 // ----------------------------------------------------------------
mapper_reshape_usage(FILE * o,char * argv0,char * verb)87 static void mapper_reshape_usage(FILE* o, char* argv0, char* verb) {
88 fprintf(o, "Usage: %s %s [options]\n", argv0, verb);
89 fprintf(o, "Wide-to-long options:\n");
90 fprintf(o, " -i {input field names} -o {key-field name,value-field name}\n");
91 fprintf(o, " -r {input field regexes} -o {key-field name,value-field name}\n");
92 fprintf(o, " These pivot/reshape the input data such that the input fields are removed\n");
93 fprintf(o, " and separate records are emitted for each key/value pair.\n");
94 fprintf(o, " Note: this works with tail -f and produces output records for each input\n");
95 fprintf(o, " record seen.\n");
96 fprintf(o, "Long-to-wide options:\n");
97 fprintf(o, " -s {key-field name,value-field name}\n");
98 fprintf(o, " These pivot/reshape the input data to undo the wide-to-long operation.\n");
99 fprintf(o, " Note: this does not work with tail -f; it produces output records only after\n");
100 fprintf(o, " all input records have been read.\n");
101 fprintf(o, "\n");
102 fprintf(o, "Examples:\n");
103 fprintf(o, "\n");
104 fprintf(o, " Input file \"wide.txt\":\n");
105 fprintf(o, " time X Y\n");
106 fprintf(o, " 2009-01-01 0.65473572 2.4520609\n");
107 fprintf(o, " 2009-01-02 -0.89248112 0.2154713\n");
108 fprintf(o, " 2009-01-03 0.98012375 1.3179287\n");
109 fprintf(o, "\n");
110 fprintf(o, " %s --pprint %s -i X,Y -o item,value wide.txt\n", argv0, verb);
111 fprintf(o, " time item value\n");
112 fprintf(o, " 2009-01-01 X 0.65473572\n");
113 fprintf(o, " 2009-01-01 Y 2.4520609\n");
114 fprintf(o, " 2009-01-02 X -0.89248112\n");
115 fprintf(o, " 2009-01-02 Y 0.2154713\n");
116 fprintf(o, " 2009-01-03 X 0.98012375\n");
117 fprintf(o, " 2009-01-03 Y 1.3179287\n");
118 fprintf(o, "\n");
119 fprintf(o, " %s --pprint %s -r '[A-Z]' -o item,value wide.txt\n", argv0, verb);
120 fprintf(o, " time item value\n");
121 fprintf(o, " 2009-01-01 X 0.65473572\n");
122 fprintf(o, " 2009-01-01 Y 2.4520609\n");
123 fprintf(o, " 2009-01-02 X -0.89248112\n");
124 fprintf(o, " 2009-01-02 Y 0.2154713\n");
125 fprintf(o, " 2009-01-03 X 0.98012375\n");
126 fprintf(o, " 2009-01-03 Y 1.3179287\n");
127 fprintf(o, "\n");
128 fprintf(o, " Input file \"long.txt\":\n");
129 fprintf(o, " time item value\n");
130 fprintf(o, " 2009-01-01 X 0.65473572\n");
131 fprintf(o, " 2009-01-01 Y 2.4520609\n");
132 fprintf(o, " 2009-01-02 X -0.89248112\n");
133 fprintf(o, " 2009-01-02 Y 0.2154713\n");
134 fprintf(o, " 2009-01-03 X 0.98012375\n");
135 fprintf(o, " 2009-01-03 Y 1.3179287\n");
136 fprintf(o, "\n");
137 fprintf(o, " %s --pprint %s -s item,value long.txt\n", argv0, verb);
138 fprintf(o, " time X Y\n");
139 fprintf(o, " 2009-01-01 0.65473572 2.4520609\n");
140 fprintf(o, " 2009-01-02 -0.89248112 0.2154713\n");
141 fprintf(o, " 2009-01-03 0.98012375 1.3179287\n");
142 fprintf(o, "See also %s nest.\n", argv0);
143 }
144
mapper_reshape_parse_cli(int * pargi,int argc,char ** argv,cli_reader_opts_t * _,cli_writer_opts_t * __)145 static mapper_t* mapper_reshape_parse_cli(int* pargi, int argc, char** argv,
146 cli_reader_opts_t* _, cli_writer_opts_t* __)
147 {
148 slls_t* input_field_names = NULL;
149 slls_t* input_field_regex_strings = NULL;
150 slls_t* output_field_names = NULL;
151 slls_t* split_out_field_names = NULL;
152
153 char* verb = argv[(*pargi)++];
154
155 ap_state_t* pstate = ap_alloc();
156 ap_define_string_list_flag(pstate, "-i", &input_field_names);
157 ap_define_string_list_flag(pstate, "-r", &input_field_regex_strings);
158 ap_define_string_list_flag(pstate, "-o", &output_field_names);
159 ap_define_string_list_flag(pstate, "-s", &split_out_field_names);
160
161 if (!ap_parse(pstate, verb, pargi, argc, argv)) {
162 mapper_reshape_usage(stderr, argv[0], verb);
163 return NULL;
164 }
165
166 char* output_key_field_name = NULL;
167 char* output_value_field_name = NULL;
168 char* split_out_key_field_name = NULL;
169 char* split_out_value_field_name = NULL;
170
171 if (split_out_field_names == NULL) {
172 // wide to long
173 if (input_field_names == NULL && input_field_regex_strings == NULL) {
174 mapper_reshape_usage(stderr, argv[0], verb);
175 return NULL;
176 }
177
178 if (output_field_names == NULL) {
179 mapper_reshape_usage(stderr, argv[0], verb);
180 return NULL;
181 }
182 if (output_field_names->length != 2) {
183 mapper_reshape_usage(stderr, argv[0], verb);
184 return NULL;
185 }
186 output_key_field_name = mlr_strdup_or_die(output_field_names->phead->value);
187 output_value_field_name = mlr_strdup_or_die(output_field_names->phead->pnext->value);
188 slls_free(output_field_names);
189
190 } else {
191 // long to wide
192 if (split_out_field_names->length != 2) {
193 mapper_reshape_usage(stderr, argv[0], verb);
194 return NULL;
195 }
196 split_out_key_field_name = mlr_strdup_or_die(split_out_field_names->phead->value);
197 split_out_value_field_name = mlr_strdup_or_die(split_out_field_names->phead->pnext->value);
198 slls_free(split_out_field_names);
199 }
200
201 return mapper_reshape_alloc(pstate, input_field_names, input_field_regex_strings,
202 output_key_field_name, output_value_field_name,
203 split_out_key_field_name, split_out_value_field_name);
204 }
205
206 // ----------------------------------------------------------------
mapper_reshape_alloc(ap_state_t * pargp,slls_t * input_field_names,slls_t * input_field_regex_strings,char * output_key_field_name,char * output_value_field_name,char * split_out_key_field_name,char * split_out_value_field_name)207 static mapper_t* mapper_reshape_alloc(
208 ap_state_t* pargp,
209 slls_t* input_field_names,
210 slls_t* input_field_regex_strings,
211 char* output_key_field_name,
212 char* output_value_field_name,
213 char* split_out_key_field_name,
214 char* split_out_value_field_name)
215 {
216 mapper_t* pmapper = mlr_malloc_or_die(sizeof(mapper_t));
217
218 mapper_reshape_state_t* pstate = mlr_malloc_or_die(sizeof(mapper_reshape_state_t));
219
220 pstate->pargp = pargp;
221 pstate->input_field_names = input_field_names;
222 pstate->output_key_field_name = output_key_field_name;
223 pstate->output_value_field_name = output_value_field_name;
224 pstate->split_out_key_field_name = split_out_key_field_name;
225 pstate->split_out_value_field_name = split_out_value_field_name;
226
227 if (input_field_regex_strings == NULL) {
228 pstate->input_field_regexes = NULL;
229 } else {
230 pstate->input_field_regexes = sllv_alloc();
231 for (sllse_t* pe = input_field_regex_strings->phead; pe != NULL; pe = pe->pnext) {
232 regex_t* pregex = mlr_malloc_or_die(sizeof(regex_t));
233 regcomp_or_die(pregex, pe->value, 0);
234 sllv_append(pstate->input_field_regexes, pregex);
235 }
236 slls_free(input_field_regex_strings);
237 }
238
239 if (split_out_key_field_name == NULL) {
240 if (pstate->input_field_regexes == NULL)
241 pmapper->pprocess_func = mapper_reshape_wide_to_long_no_regex_process;
242 else
243 pmapper->pprocess_func = mapper_reshape_wide_to_long_regex_process;
244 pstate->other_keys_to_other_values_to_buckets = NULL;
245 } else {
246 pmapper->pprocess_func = mapper_reshape_long_to_wide_process;
247 pstate->other_keys_to_other_values_to_buckets = lhmslv_alloc();
248 }
249
250 pmapper->pfree_func = mapper_reshape_free;
251
252 pmapper->pvstate = (void*)pstate;
253 return pmapper;
254 }
255
// Releases everything the reshape mapper owns:
// - the -i list and the -o/-s name strings;
// - the compiled -r regexes;
// - any long-to-wide buckets (and their representative records) still held;
// - the argument-parser state;
// - and finally the mapper itself.
static void mapper_reshape_free(mapper_t* pmapper, context_t* _) {
	mapper_reshape_state_t* pstate = pmapper->pvstate;

	slls_free(pstate->input_field_names);
	free(pstate->output_key_field_name);
	free(pstate->output_value_field_name);
	free(pstate->split_out_key_field_name);
	free(pstate->split_out_value_field_name);

	sllv_t* pregexes = pstate->input_field_regexes;
	if (pregexes != NULL) {
		for (sllve_t* pnode = pregexes->phead; pnode != NULL; pnode = pnode->pnext) {
			regex_t* pcompiled = pnode->pvvalue;
			regfree(pcompiled);
			free(pcompiled);
		}
		sllv_free(pregexes);
	}

	lhmslv_t* pby_keys = pstate->other_keys_to_other_values_to_buckets;
	if (pby_keys != NULL) {
		for (lhmslve_t* pkeys_entry = pby_keys->phead; pkeys_entry != NULL; pkeys_entry = pkeys_entry->pnext) {
			lhmslv_t* pby_values = pkeys_entry->pvvalue;
			for (lhmslve_t* pvalues_entry = pby_values->phead; pvalues_entry != NULL; pvalues_entry = pvalues_entry->pnext)
				reshape_bucket_free(pvalues_entry->pvvalue);
			lhmslv_free(pby_values);
		}
		lhmslv_free(pby_keys);
	}

	ap_free(pstate->pargp);
	free(pstate);
	free(pmapper);
}
292
293 // ----------------------------------------------------------------
mapper_reshape_wide_to_long_no_regex_process(lrec_t * pinrec,context_t * pctx,void * pvstate)294 static sllv_t* mapper_reshape_wide_to_long_no_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
295 if (pinrec == NULL) // End of input stream
296 return sllv_single(NULL);
297 mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
298
299 sllv_t* poutrecs = sllv_alloc();
300 lhmss_t* pairs = lhmss_alloc();
301 char* pfree_flags = NULL;
302 for (sllse_t* pe = pstate->input_field_names->phead; pe != NULL; pe = pe->pnext) {
303 char* key = pe->value;
304 char* value = lrec_get_pff(pinrec, key, &pfree_flags);
305 if (value != NULL) {
306 // Ownership-transfer of the about-to-be-freed key-value pairs from lrec to lhmss
307 lhmss_put(pairs, key, value, *pfree_flags);
308 *pfree_flags = NO_FREE;
309 }
310 }
311
312 // Unset the lrec keys after iterating over them, rather than during
313 for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext)
314 lrec_remove(pinrec, pf->key);
315
316 if (pairs->num_occupied == 0) {
317 sllv_append(poutrecs, pinrec);
318 } else {
319 for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext) {
320 lrec_t* poutrec = lrec_copy(pinrec);
321 lrec_put(poutrec, pstate->output_key_field_name, mlr_strdup_or_die(pf->key), FREE_ENTRY_VALUE);
322 lrec_put(poutrec, pstate->output_value_field_name, mlr_strdup_or_die(pf->value), FREE_ENTRY_VALUE);
323 sllv_append(poutrecs, poutrec);
324 }
325 lrec_free(pinrec);
326 }
327
328 lhmss_free(pairs);
329 return poutrecs;
330 }
331
332 // ----------------------------------------------------------------
mapper_reshape_wide_to_long_regex_process(lrec_t * pinrec,context_t * pctx,void * pvstate)333 static sllv_t* mapper_reshape_wide_to_long_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
334 if (pinrec == NULL) // End of input stream
335 return sllv_single(NULL);
336
337 mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
338
339 sllv_t* poutrecs = sllv_alloc();
340 lhmss_t* pairs = lhmss_alloc();
341
342 for (lrece_t* pe = pinrec->phead; pe != NULL; pe = pe->pnext) {
343 for (sllve_t* pf = pstate->input_field_regexes->phead; pf != NULL; pf = pf->pnext) {
344 regex_t* pregex = pf->pvvalue;
345 if (regmatch_or_die(pregex, pe->key, 0, NULL)) {
346 // Ownership-transfer of the about-to-be-freed key-value pairs from lrec to lhmss
347 lhmss_put(pairs, pe->key, pe->value, pe->free_flags);
348 pe->free_flags = NO_FREE;
349 break;
350 }
351 }
352 }
353
354 // Unset the lrec keys after iterating over them, rather than during
355 for (lhmsse_t* pg = pairs->phead; pg != NULL; pg = pg->pnext)
356 lrec_remove(pinrec, pg->key);
357
358 if (pairs->num_occupied == 0) {
359 sllv_append(poutrecs, pinrec);
360 } else {
361 for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext) {
362 lrec_t* poutrec = lrec_copy(pinrec);
363 lrec_put(poutrec, pstate->output_key_field_name, mlr_strdup_or_die(pf->key), FREE_ENTRY_VALUE);
364 lrec_put(poutrec, pstate->output_value_field_name, mlr_strdup_or_die(pf->value), FREE_ENTRY_VALUE);
365 sllv_append(poutrecs, poutrec);
366 }
367 lrec_free(pinrec);
368 }
369
370 lhmss_free(pairs);
371
372 return poutrecs;
373 }
374
375 // ----------------------------------------------------------------
mapper_reshape_long_to_wide_process(lrec_t * pinrec,context_t * pctx,void * pvstate)376 static sllv_t* mapper_reshape_long_to_wide_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
377 mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
378
379 if (pinrec != NULL) { // Not end of input stream
380 char* split_out_key_field_value = lrec_get(pinrec, pstate->split_out_key_field_name);
381 char* split_out_value_field_value = lrec_get(pinrec, pstate-> split_out_value_field_name);
382 if (split_out_key_field_value == NULL || split_out_value_field_value == NULL)
383 return sllv_single(pinrec);
384 split_out_key_field_value = mlr_strdup_or_die(split_out_key_field_value);
385 split_out_value_field_value = mlr_strdup_or_die(split_out_value_field_value);
386 lrec_remove(pinrec, pstate->split_out_key_field_name);
387 lrec_remove(pinrec, pstate->split_out_value_field_name);
388
389 slls_t* other_keys = mlr_reference_keys_from_record(pinrec);
390 lhmslv_t* other_values_to_buckets = lhmslv_get(pstate->other_keys_to_other_values_to_buckets, other_keys);
391 if (other_values_to_buckets == NULL) {
392 other_values_to_buckets = lhmslv_alloc();
393 lhmslv_put(pstate->other_keys_to_other_values_to_buckets,
394 slls_copy(other_keys), other_values_to_buckets, FREE_ENTRY_KEY);
395 }
396
397 slls_t* other_values = mlr_reference_values_from_record(pinrec);
398 reshape_bucket_t* pbucket = lhmslv_get(other_values_to_buckets, other_values);
399 if (pbucket == NULL) {
400 pbucket = reshape_bucket_alloc(pinrec);
401 lhmslv_put(other_values_to_buckets, slls_copy(other_values), pbucket, FREE_ENTRY_KEY);
402 } else {
403 lrec_free(pinrec);
404 }
405 lhmss_put(pbucket->pairs, split_out_key_field_value, split_out_value_field_value,
406 FREE_ENTRY_KEY|FREE_ENTRY_VALUE);
407
408 slls_free(other_values);
409 slls_free(other_keys);
410
411 return NULL;
412
413 } else { // end of input stream
414 sllv_t* poutrecs = sllv_alloc();
415
416 for (lhmslve_t* pe = pstate->other_keys_to_other_values_to_buckets->phead; pe != NULL; pe = pe->pnext) {
417 lhmslv_t* other_values_to_buckets = pe->pvvalue;
418 for (lhmslve_t* pf = other_values_to_buckets->phead; pf != NULL; pf = pf->pnext) {
419 reshape_bucket_t* pbucket = pf->pvvalue;
420 lrec_t* poutrec = pbucket->prepresentative;
421 pbucket->prepresentative = NULL; // ownership transfer
422 for (lhmsse_t* pg = pbucket->pairs->phead; pg != NULL; pg = pg->pnext) {
423 // Strings in these lrecs are backed by our multi-level hashmaps which aren't freed by our free
424 // method until shutdown time (in particular, after all outrecs are emitted).
425 lrec_put(poutrec, pg->key, pg->value, NO_FREE);
426 }
427 sllv_append(poutrecs, poutrec);
428 }
429 }
430
431 sllv_append(poutrecs, NULL);
432 return poutrecs;
433 }
434 }
435
436 // ----------------------------------------------------------------
reshape_bucket_alloc(lrec_t * prepresentative)437 static reshape_bucket_t* reshape_bucket_alloc(lrec_t* prepresentative) {
438 reshape_bucket_t* pbucket = mlr_malloc_or_die(sizeof(reshape_bucket_t));
439 pbucket->prepresentative = prepresentative;
440 pbucket->pairs = lhmss_alloc();
441 return pbucket;
442 }
// Frees a bucket along with its representative record and its pair map.
static void reshape_bucket_free(reshape_bucket_t* pbucket) {
	lrec_t*  prec   = pbucket->prepresentative;
	lhmss_t* ppairs = pbucket->pairs;

	lrec_free(prec); // NULL here once the record was handed off at end of stream
	lhmss_free(ppairs);
	free(pbucket);
}
448