1 #include "lib/mlrutil.h"
2 #include "lib/mlrregex.h"
3 #include "lib/string_builder.h"
4 #include "containers/lhmss.h"
5 #include "containers/sllv.h"
6 #include "containers/lhmslv.h"
7 #include "containers/mixutil.h"
8 #include "mapping/mappers.h"
9 #include "cli/argparse.h"
10 
11 // ================================================================
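// Example of the two layouts this verb converts between. Wide-to-long mode
// (-i or -r, together with -o) turns the first layout into the second;
// long-to-wide mode (-s) turns the second back into the first.
//
// WIDE: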
13 //          time           X          Y           Z
14 // 1  2009-01-01  0.65473572  2.4520609 -1.46570942
15 // 2  2009-01-02 -0.89248112  0.2154713 -2.05357735
16 // 3  2009-01-03  0.98012375  1.3179287  4.64248357
17 // 4  2009-01-04  0.35397376  3.3765645 -0.25237774
18 // 5  2009-01-05  2.19357813  1.3477511  0.09719105
19 
20 // LONG:
21 //          time  item       price
22 // 1  2009-01-01     X  0.65473572
23 // 2  2009-01-02     X -0.89248112
24 // 3  2009-01-03     X  0.98012375
25 // 4  2009-01-04     X  0.35397376
26 // 5  2009-01-05     X  2.19357813
27 // 6  2009-01-01     Y  2.45206093
28 // 7  2009-01-02     Y  0.21547134
29 // 8  2009-01-03     Y  1.31792866
30 // 9  2009-01-04     Y  3.37656453
31 // 10 2009-01-05     Y  1.34775108
32 // 11 2009-01-01     Z -1.46570942
33 // 12 2009-01-02     Z -2.05357735
34 // 13 2009-01-03     Z  4.64248357
35 // 14 2009-01-04     Z -0.25237774
36 // 15 2009-01-05     Z  0.09719105
37 
38 // ================================================================
39 typedef struct _mapper_reshape_state_t {
40 	ap_state_t* pargp;
41 
42 	// for wide-to-long:
43 	slls_t* input_field_names;
44 	sllv_t* input_field_regexes;
45 	char* output_key_field_name;
46 	char* output_value_field_name;
47 
48 	// for long-to-wide:
49 	char* split_out_key_field_name;
50 	char* split_out_value_field_name;
51 	lhmslv_t* other_keys_to_other_values_to_buckets;
52 } mapper_reshape_state_t;
53 
54 typedef struct _reshape_bucket_t {
55 	lrec_t* prepresentative;
56 	lhmss_t* pairs;
57 } reshape_bucket_t;
58 
59 static void      mapper_reshape_usage(FILE* o, char* argv0, char* verb);
60 static mapper_t* mapper_reshape_parse_cli(int* pargi, int argc, char** argv,
61 	cli_reader_opts_t* _, cli_writer_opts_t* __);
62 static mapper_t* mapper_reshape_alloc(
63 	ap_state_t* pargp,
64 	slls_t* input_field_names,
65 	slls_t* input_field_regex_strings,
66 	char*   output_key_field_name,
67 	char*   output_value_field_name,
68 	char*   split_out_key_field_name,
69 	char*   split_out_value_field_name);
70 static void      mapper_reshape_free(mapper_t* pmapper, context_t* _);
71 static sllv_t*   mapper_reshape_wide_to_long_no_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
72 static sllv_t*   mapper_reshape_wide_to_long_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
73 static sllv_t*   mapper_reshape_long_to_wide_process(lrec_t* pinrec, context_t* pctx, void* pvstate);
74 
75 static reshape_bucket_t* reshape_bucket_alloc(lrec_t* prepresentative);
76 static void reshape_bucket_free(reshape_bucket_t* pbucket);
77 
78 // ----------------------------------------------------------------
79 mapper_setup_t mapper_reshape_setup = {
80 	.verb = "reshape",
81 	.pusage_func = mapper_reshape_usage,
82 	.pparse_func = mapper_reshape_parse_cli,
83 	.ignores_input = FALSE,
84 };
85 
86 // ----------------------------------------------------------------
mapper_reshape_usage(FILE * o,char * argv0,char * verb)87 static void mapper_reshape_usage(FILE* o, char* argv0, char* verb) {
88 	fprintf(o, "Usage: %s %s [options]\n", argv0, verb);
89 	fprintf(o, "Wide-to-long options:\n");
90 	fprintf(o, "  -i {input field names}   -o {key-field name,value-field name}\n");
91 	fprintf(o, "  -r {input field regexes} -o {key-field name,value-field name}\n");
92 	fprintf(o, "  These pivot/reshape the input data such that the input fields are removed\n");
93 	fprintf(o, "  and separate records are emitted for each key/value pair.\n");
94 	fprintf(o, "  Note: this works with tail -f and produces output records for each input\n");
95 	fprintf(o, "  record seen.\n");
96 	fprintf(o, "Long-to-wide options:\n");
97 	fprintf(o, "  -s {key-field name,value-field name}\n");
98 	fprintf(o, "  These pivot/reshape the input data to undo the wide-to-long operation.\n");
99 	fprintf(o, "  Note: this does not work with tail -f; it produces output records only after\n");
100 	fprintf(o, "  all input records have been read.\n");
101 	fprintf(o, "\n");
102 	fprintf(o, "Examples:\n");
103 	fprintf(o, "\n");
104 	fprintf(o, "  Input file \"wide.txt\":\n");
105 	fprintf(o, "    time       X           Y\n");
106 	fprintf(o, "    2009-01-01 0.65473572  2.4520609\n");
107 	fprintf(o, "    2009-01-02 -0.89248112 0.2154713\n");
108 	fprintf(o, "    2009-01-03 0.98012375  1.3179287\n");
109 	fprintf(o, "\n");
110 	fprintf(o, "  %s --pprint %s -i X,Y -o item,value wide.txt\n", argv0, verb);
111 	fprintf(o, "    time       item value\n");
112 	fprintf(o, "    2009-01-01 X    0.65473572\n");
113 	fprintf(o, "    2009-01-01 Y    2.4520609\n");
114 	fprintf(o, "    2009-01-02 X    -0.89248112\n");
115 	fprintf(o, "    2009-01-02 Y    0.2154713\n");
116 	fprintf(o, "    2009-01-03 X    0.98012375\n");
117 	fprintf(o, "    2009-01-03 Y    1.3179287\n");
118 	fprintf(o, "\n");
119 	fprintf(o, "  %s --pprint %s -r '[A-Z]' -o item,value wide.txt\n", argv0, verb);
120 	fprintf(o, "    time       item value\n");
121 	fprintf(o, "    2009-01-01 X    0.65473572\n");
122 	fprintf(o, "    2009-01-01 Y    2.4520609\n");
123 	fprintf(o, "    2009-01-02 X    -0.89248112\n");
124 	fprintf(o, "    2009-01-02 Y    0.2154713\n");
125 	fprintf(o, "    2009-01-03 X    0.98012375\n");
126 	fprintf(o, "    2009-01-03 Y    1.3179287\n");
127 	fprintf(o, "\n");
128 	fprintf(o, "  Input file \"long.txt\":\n");
129 	fprintf(o, "    time       item value\n");
130 	fprintf(o, "    2009-01-01 X    0.65473572\n");
131 	fprintf(o, "    2009-01-01 Y    2.4520609\n");
132 	fprintf(o, "    2009-01-02 X    -0.89248112\n");
133 	fprintf(o, "    2009-01-02 Y    0.2154713\n");
134 	fprintf(o, "    2009-01-03 X    0.98012375\n");
135 	fprintf(o, "    2009-01-03 Y    1.3179287\n");
136 	fprintf(o, "\n");
137 	fprintf(o, "  %s --pprint %s -s item,value long.txt\n", argv0, verb);
138 	fprintf(o, "    time       X           Y\n");
139 	fprintf(o, "    2009-01-01 0.65473572  2.4520609\n");
140 	fprintf(o, "    2009-01-02 -0.89248112 0.2154713\n");
141 	fprintf(o, "    2009-01-03 0.98012375  1.3179287\n");
142 	fprintf(o, "See also %s nest.\n", argv0);
143 }
144 
mapper_reshape_parse_cli(int * pargi,int argc,char ** argv,cli_reader_opts_t * _,cli_writer_opts_t * __)145 static mapper_t* mapper_reshape_parse_cli(int* pargi, int argc, char** argv,
146 	cli_reader_opts_t* _, cli_writer_opts_t* __)
147 {
148 	slls_t* input_field_names         = NULL;
149 	slls_t* input_field_regex_strings = NULL;
150 	slls_t* output_field_names        = NULL;
151 	slls_t* split_out_field_names     = NULL;
152 
153 	char* verb = argv[(*pargi)++];
154 
155 	ap_state_t* pstate = ap_alloc();
156 	ap_define_string_list_flag(pstate, "-i", &input_field_names);
157 	ap_define_string_list_flag(pstate, "-r", &input_field_regex_strings);
158 	ap_define_string_list_flag(pstate, "-o", &output_field_names);
159 	ap_define_string_list_flag(pstate, "-s", &split_out_field_names);
160 
161 	if (!ap_parse(pstate, verb, pargi, argc, argv)) {
162 		mapper_reshape_usage(stderr, argv[0], verb);
163 		return NULL;
164 	}
165 
166 	char* output_key_field_name      = NULL;
167 	char* output_value_field_name    = NULL;
168 	char* split_out_key_field_name   = NULL;
169 	char* split_out_value_field_name = NULL;
170 
171 	if (split_out_field_names == NULL) {
172 		// wide to long
173 		if (input_field_names == NULL && input_field_regex_strings == NULL) {
174 			mapper_reshape_usage(stderr, argv[0], verb);
175 			return NULL;
176 		}
177 
178 		if (output_field_names == NULL) {
179 			mapper_reshape_usage(stderr, argv[0], verb);
180 			return NULL;
181 		}
182 		if (output_field_names->length != 2) {
183 			mapper_reshape_usage(stderr, argv[0], verb);
184 			return NULL;
185 		}
186 		output_key_field_name   = mlr_strdup_or_die(output_field_names->phead->value);
187 		output_value_field_name = mlr_strdup_or_die(output_field_names->phead->pnext->value);
188 		slls_free(output_field_names);
189 
190 	} else {
191 		// long to wide
192 		if (split_out_field_names->length != 2) {
193 			mapper_reshape_usage(stderr, argv[0], verb);
194 			return NULL;
195 		}
196 		split_out_key_field_name   = mlr_strdup_or_die(split_out_field_names->phead->value);
197 		split_out_value_field_name = mlr_strdup_or_die(split_out_field_names->phead->pnext->value);
198 		slls_free(split_out_field_names);
199 	}
200 
201 	return mapper_reshape_alloc(pstate, input_field_names, input_field_regex_strings,
202 		output_key_field_name, output_value_field_name,
203 		split_out_key_field_name, split_out_value_field_name);
204 }
205 
206 // ----------------------------------------------------------------
mapper_reshape_alloc(ap_state_t * pargp,slls_t * input_field_names,slls_t * input_field_regex_strings,char * output_key_field_name,char * output_value_field_name,char * split_out_key_field_name,char * split_out_value_field_name)207 static mapper_t* mapper_reshape_alloc(
208 	ap_state_t* pargp,
209 	slls_t* input_field_names,
210 	slls_t* input_field_regex_strings,
211 	char*   output_key_field_name,
212 	char*   output_value_field_name,
213 	char*   split_out_key_field_name,
214 	char*   split_out_value_field_name)
215 {
216 	mapper_t* pmapper = mlr_malloc_or_die(sizeof(mapper_t));
217 
218 	mapper_reshape_state_t* pstate = mlr_malloc_or_die(sizeof(mapper_reshape_state_t));
219 
220 	pstate->pargp                      = pargp;
221 	pstate->input_field_names          = input_field_names;
222 	pstate->output_key_field_name      = output_key_field_name;
223 	pstate->output_value_field_name    = output_value_field_name;
224 	pstate->split_out_key_field_name   = split_out_key_field_name;
225 	pstate->split_out_value_field_name = split_out_value_field_name;
226 
227 	if (input_field_regex_strings == NULL) {
228 		pstate->input_field_regexes = NULL;
229 	} else {
230 		pstate->input_field_regexes = sllv_alloc();
231 		for (sllse_t* pe = input_field_regex_strings->phead; pe != NULL; pe = pe->pnext) {
232 			regex_t* pregex = mlr_malloc_or_die(sizeof(regex_t));
233 			regcomp_or_die(pregex, pe->value, 0);
234 			sllv_append(pstate->input_field_regexes, pregex);
235 		}
236 		slls_free(input_field_regex_strings);
237 	}
238 
239 	if (split_out_key_field_name == NULL) {
240 		if (pstate->input_field_regexes == NULL)
241 			pmapper->pprocess_func = mapper_reshape_wide_to_long_no_regex_process;
242 		else
243 			pmapper->pprocess_func = mapper_reshape_wide_to_long_regex_process;
244 		pstate->other_keys_to_other_values_to_buckets = NULL;
245 	} else {
246 		pmapper->pprocess_func = mapper_reshape_long_to_wide_process;
247 		pstate->other_keys_to_other_values_to_buckets = lhmslv_alloc();
248 	}
249 
250 	pmapper->pfree_func = mapper_reshape_free;
251 
252 	pmapper->pvstate = (void*)pstate;
253 	return pmapper;
254 }
255 
// Releases the mapper and everything its state owns: option strings, compiled
// regexes, all long-to-wide buckets (with their nested maps), and the
// argument-parser state.
static void mapper_reshape_free(mapper_t* pmapper, context_t* _) {
	mapper_reshape_state_t* pstate = pmapper->pvstate;

	// Option strings; either mode's fields may be NULL.
	slls_free(pstate->input_field_names);
	free(pstate->output_key_field_name);
	free(pstate->output_value_field_name);
	free(pstate->split_out_key_field_name);
	free(pstate->split_out_value_field_name);

	sllv_t* pregexes = pstate->input_field_regexes;
	if (pregexes != NULL) {
		for (sllve_t* pnode = pregexes->phead; pnode != NULL; pnode = pnode->pnext) {
			regfree(pnode->pvvalue);
			free(pnode->pvvalue);
		}
		sllv_free(pregexes);
	}

	lhmslv_t* pbuckets_by_keys = pstate->other_keys_to_other_values_to_buckets;
	if (pbuckets_by_keys != NULL) {
		for (lhmslve_t* pouter = pbuckets_by_keys->phead; pouter != NULL; pouter = pouter->pnext) {
			lhmslv_t* pbuckets_by_values = pouter->pvvalue;
			for (lhmslve_t* pinner = pbuckets_by_values->phead; pinner != NULL; pinner = pinner->pnext)
				reshape_bucket_free(pinner->pvvalue);
			lhmslv_free(pbuckets_by_values);
		}
		lhmslv_free(pbuckets_by_keys);
	}

	ap_free(pstate->pargp);
	free(pstate);
	free(pmapper);
}
292 
293 // ----------------------------------------------------------------
mapper_reshape_wide_to_long_no_regex_process(lrec_t * pinrec,context_t * pctx,void * pvstate)294 static sllv_t* mapper_reshape_wide_to_long_no_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
295 	if (pinrec == NULL) // End of input stream
296 		return sllv_single(NULL);
297 	mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
298 
299 	sllv_t* poutrecs = sllv_alloc();
300 	lhmss_t* pairs = lhmss_alloc();
301 	char* pfree_flags = NULL;
302 	for (sllse_t* pe = pstate->input_field_names->phead; pe != NULL; pe = pe->pnext) {
303 		char* key = pe->value;
304 		char* value = lrec_get_pff(pinrec, key, &pfree_flags);
305 		if (value != NULL) {
306 			// Ownership-transfer of the about-to-be-freed key-value pairs from lrec to lhmss
307 			lhmss_put(pairs, key, value, *pfree_flags);
308 			*pfree_flags = NO_FREE;
309 		}
310 	}
311 
312 	// Unset the lrec keys after iterating over them, rather than during
313 	for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext)
314 		lrec_remove(pinrec, pf->key);
315 
316 	if (pairs->num_occupied == 0) {
317 		sllv_append(poutrecs, pinrec);
318 	} else {
319 		for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext) {
320 			lrec_t* poutrec = lrec_copy(pinrec);
321 			lrec_put(poutrec, pstate->output_key_field_name, mlr_strdup_or_die(pf->key), FREE_ENTRY_VALUE);
322 			lrec_put(poutrec, pstate->output_value_field_name, mlr_strdup_or_die(pf->value), FREE_ENTRY_VALUE);
323 			sllv_append(poutrecs, poutrec);
324 		}
325 		lrec_free(pinrec);
326 	}
327 
328 	lhmss_free(pairs);
329 	return poutrecs;
330 }
331 
332 // ----------------------------------------------------------------
mapper_reshape_wide_to_long_regex_process(lrec_t * pinrec,context_t * pctx,void * pvstate)333 static sllv_t* mapper_reshape_wide_to_long_regex_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
334 	if (pinrec == NULL) // End of input stream
335 		return sllv_single(NULL);
336 
337 	mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
338 
339 	sllv_t* poutrecs = sllv_alloc();
340 	lhmss_t* pairs = lhmss_alloc();
341 
342 	for (lrece_t* pe = pinrec->phead; pe != NULL; pe = pe->pnext) {
343 		for (sllve_t* pf = pstate->input_field_regexes->phead; pf != NULL; pf = pf->pnext) {
344 			regex_t* pregex = pf->pvvalue;
345 			if (regmatch_or_die(pregex, pe->key, 0, NULL)) {
346 				// Ownership-transfer of the about-to-be-freed key-value pairs from lrec to lhmss
347 				lhmss_put(pairs, pe->key, pe->value, pe->free_flags);
348 				pe->free_flags = NO_FREE;
349 				break;
350 			}
351 		}
352 	}
353 
354 	// Unset the lrec keys after iterating over them, rather than during
355 	for (lhmsse_t* pg = pairs->phead; pg != NULL; pg = pg->pnext)
356 		lrec_remove(pinrec, pg->key);
357 
358 	if (pairs->num_occupied == 0) {
359 		sllv_append(poutrecs, pinrec);
360 	} else {
361 		for (lhmsse_t* pf = pairs->phead; pf != NULL; pf = pf->pnext) {
362 			lrec_t* poutrec = lrec_copy(pinrec);
363 			lrec_put(poutrec, pstate->output_key_field_name, mlr_strdup_or_die(pf->key), FREE_ENTRY_VALUE);
364 			lrec_put(poutrec, pstate->output_value_field_name, mlr_strdup_or_die(pf->value), FREE_ENTRY_VALUE);
365 			sllv_append(poutrecs, poutrec);
366 		}
367 		lrec_free(pinrec);
368 	}
369 
370 	lhmss_free(pairs);
371 
372 	return poutrecs;
373 }
374 
375 // ----------------------------------------------------------------
mapper_reshape_long_to_wide_process(lrec_t * pinrec,context_t * pctx,void * pvstate)376 static sllv_t* mapper_reshape_long_to_wide_process(lrec_t* pinrec, context_t* pctx, void* pvstate) {
377 	mapper_reshape_state_t* pstate = (mapper_reshape_state_t*)pvstate;
378 
379 	if (pinrec != NULL) { // Not end of input stream
380 		char* split_out_key_field_value   = lrec_get(pinrec, pstate->split_out_key_field_name);
381 		char* split_out_value_field_value = lrec_get(pinrec, pstate-> split_out_value_field_name);
382 		if (split_out_key_field_value == NULL || split_out_value_field_value == NULL)
383 			return sllv_single(pinrec);
384 		split_out_key_field_value   = mlr_strdup_or_die(split_out_key_field_value);
385 		split_out_value_field_value = mlr_strdup_or_die(split_out_value_field_value);
386 		lrec_remove(pinrec, pstate->split_out_key_field_name);
387 		lrec_remove(pinrec, pstate->split_out_value_field_name);
388 
389 		slls_t* other_keys = mlr_reference_keys_from_record(pinrec);
390 		lhmslv_t* other_values_to_buckets = lhmslv_get(pstate->other_keys_to_other_values_to_buckets, other_keys);
391 		if (other_values_to_buckets == NULL) {
392 			other_values_to_buckets = lhmslv_alloc();
393 			lhmslv_put(pstate->other_keys_to_other_values_to_buckets,
394 				slls_copy(other_keys), other_values_to_buckets, FREE_ENTRY_KEY);
395 		}
396 
397 		slls_t* other_values = mlr_reference_values_from_record(pinrec);
398 		reshape_bucket_t* pbucket = lhmslv_get(other_values_to_buckets, other_values);
399 		if (pbucket == NULL) {
400 			pbucket = reshape_bucket_alloc(pinrec);
401 			lhmslv_put(other_values_to_buckets, slls_copy(other_values), pbucket, FREE_ENTRY_KEY);
402 		} else {
403 			lrec_free(pinrec);
404 		}
405 		lhmss_put(pbucket->pairs, split_out_key_field_value, split_out_value_field_value,
406 			FREE_ENTRY_KEY|FREE_ENTRY_VALUE);
407 
408 		slls_free(other_values);
409 		slls_free(other_keys);
410 
411 		return NULL;
412 
413 	} else { // end of input stream
414 		sllv_t* poutrecs = sllv_alloc();
415 
416 		for (lhmslve_t* pe = pstate->other_keys_to_other_values_to_buckets->phead; pe != NULL; pe = pe->pnext) {
417 			lhmslv_t* other_values_to_buckets = pe->pvvalue;
418 			for (lhmslve_t* pf = other_values_to_buckets->phead; pf != NULL; pf = pf->pnext) {
419 				reshape_bucket_t* pbucket = pf->pvvalue;
420 				lrec_t* poutrec = pbucket->prepresentative;
421 				pbucket->prepresentative = NULL; // ownership transfer
422 				for (lhmsse_t* pg = pbucket->pairs->phead; pg != NULL; pg = pg->pnext) {
423 					// Strings in these lrecs are backed by our multi-level hashmaps which aren't freed by our free
424 					// method until shutdown time (in particular, after all outrecs are emitted).
425 					lrec_put(poutrec, pg->key, pg->value, NO_FREE);
426 				}
427 				sllv_append(poutrecs, poutrec);
428 			}
429 		}
430 
431 		sllv_append(poutrecs, NULL);
432 		return poutrecs;
433 	}
434 }
435 
436 // ----------------------------------------------------------------
// Creates a bucket that takes ownership of `prepresentative` and starts with
// an empty pairs map.
static reshape_bucket_t* reshape_bucket_alloc(lrec_t* prepresentative) {
	reshape_bucket_t* pbucket = mlr_malloc_or_die(sizeof(*pbucket));
	*pbucket = (reshape_bucket_t) {
		.prepresentative = prepresentative,
		.pairs           = lhmss_alloc(),
	};
	return pbucket;
}
// Frees the bucket, its representative record (if still held), and its pairs map.
static void reshape_bucket_free(reshape_bucket_t* pbucket) {
	lhmss_t* ppairs = pbucket->pairs;
	lrec_free(pbucket->prepresentative);
	lhmss_free(ppairs);
	free(pbucket);
}
448