1 #include "lib/mlr_globals.h"
2 #include "lib/mlrutil.h"
3 #include "lib/mlrregex.h"
4 #include "lib/string_builder.h"
5 #include "containers/lhmss.h"
6 #include "containers/sllv.h"
7 #include "containers/lhmslv.h"
8 #include "containers/mixutil.h"
9 #include "mapping/mappers.h"
10 #include "cli/argparse.h"
11 
12 // ================================================================
13 #define SB_ALLOC_LENGTH 128
14 
15 typedef struct _mapper_nest_state_t {
16 	ap_state_t* pargp;
17 
18 	char* field_name;
19 	char* nested_fs;
20 	char* nested_ps;
21 	int   nested_ps_length;
22 
23 	lhmslv_t* other_keys_to_other_values_to_buckets;
24 	string_builder_t* psb;
25 	regex_t regex;
26 } mapper_nest_state_t;
27 
28 typedef struct _nest_bucket_t {
29 	lrec_t*  prepresentative;
30 	sllv_t* pairs;
31 } nest_bucket_t;
32 
33 static void      mapper_nest_usage(FILE* o, char* argv0, char* verb);
34 static mapper_t* mapper_nest_parse_cli(int* pargi, int argc, char** argv,
35 	cli_reader_opts_t* _, cli_writer_opts_t* __);
36 static mapper_t* mapper_nest_alloc(ap_state_t* pargp, char* argv0,
37 	char* field_name, char* nested_fs, char* nested_ps,
38 	int do_explode, int do_pairs, int do_across_fields);
39 static void    mapper_nest_free(mapper_t* pmapper, context_t* _);
40 
41 static sllv_t* mapper_nest_explode_values_across_fields  (lrec_t* pinrec, context_t* pctx, void* pvstate);
42 static sllv_t* mapper_nest_implode_values_across_fields  (lrec_t* pinrec, context_t* pctx, void* pvstate);
43 static sllv_t* mapper_nest_explode_values_across_records (lrec_t* pinrec, context_t* pctx, void* pvstate);
44 static sllv_t* mapper_nest_implode_values_across_records (lrec_t* pinrec, context_t* pctx, void* pvstate);
45 static sllv_t* mapper_nest_explode_pairs_across_fields   (lrec_t* pinrec, context_t* pctx, void* pvstate);
46 static sllv_t* mapper_nest_explode_pairs_across_records  (lrec_t* pinrec, context_t* pctx, void* pvstate);
47 
48 static nest_bucket_t* nest_bucket_alloc(lrec_t* prepresentative);
49 static void nest_bucket_free(nest_bucket_t* pbucket);
50 
51 // ----------------------------------------------------------------
52 mapper_setup_t mapper_nest_setup = {
53 	.verb = "nest",
54 	.pusage_func = mapper_nest_usage,
55 	.pparse_func = mapper_nest_parse_cli,
56 	.ignores_input = FALSE,
57 };
58 
59 // ----------------------------------------------------------------
mapper_nest_usage(FILE * o,char * argv0,char * verb)60 static void mapper_nest_usage(FILE* o, char* argv0, char* verb) {
61 	fprintf(o, "Usage: %s %s [options]\n", argv0, verb);
62 	fprintf(o, "Explodes specified field values into separate fields/records, or reverses this.\n");
63 	fprintf(o, "Options:\n");
64 	fprintf(o, "  --explode,--implode   One is required.\n");
65 	fprintf(o, "  --values,--pairs      One is required.\n");
66 	fprintf(o, "  --across-records,--across-fields One is required.\n");
67 	fprintf(o, "  -f {field name}       Required.\n");
68 	fprintf(o, "  --nested-fs {string}  Defaults to \";\". Field separator for nested values.\n");
69 	fprintf(o, "  --nested-ps {string}  Defaults to \":\". Pair separator for nested key-value pairs.\n");
70 	fprintf(o, "  --evar {string}       Shorthand for --explode --values ---across-records --nested-fs {string}\n");
71 	fprintf(o, "  --ivar {string}       Shorthand for --implode --values ---across-records --nested-fs {string}\n");
72 	fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n",
73 		argv0);
74 
75 	fprintf(o, "\n");
76 	fprintf(o, "Examples:\n");
77 
78 	fprintf(o, "\n");
79 	fprintf(o, "  %s %s --explode --values --across-records -f x\n", argv0, verb);
80 	fprintf(o, "  with input record \"x=a;b;c,y=d\" produces output records\n");
81 	fprintf(o, "    \"x=a,y=d\"\n");
82 	fprintf(o, "    \"x=b,y=d\"\n");
83 	fprintf(o, "    \"x=c,y=d\"\n");
84 	fprintf(o, "  Use --implode to do the reverse.\n");
85 
86 	fprintf(o, "\n");
87 	fprintf(o, "  %s %s --explode --values --across-fields -f x\n", argv0, verb);
88 	fprintf(o, "  with input record \"x=a;b;c,y=d\" produces output records\n");
89 	fprintf(o, "    \"x_1=a,x_2=b,x_3=c,y=d\"\n");
90 	fprintf(o, "  Use --implode to do the reverse.\n");
91 
92 	fprintf(o, "\n");
93 	fprintf(o, "  %s %s --explode --pairs --across-records -f x\n", argv0, verb);
94 	fprintf(o, "  with input record \"x=a:1;b:2;c:3,y=d\" produces output records\n");
95 	fprintf(o, "    \"a=1,y=d\"\n");
96 	fprintf(o, "    \"b=2,y=d\"\n");
97 	fprintf(o, "    \"c=3,y=d\"\n");
98 
99 	fprintf(o, "\n");
100 	fprintf(o, "  %s %s --explode --pairs --across-fields -f x\n", argv0, verb);
101 	fprintf(o, "  with input record \"x=a:1;b:2;c:3,y=d\" produces output records\n");
102 	fprintf(o, "    \"a=1,b=2,c=3,y=d\"\n");
103 
104 	fprintf(o, "\n");
105 	fprintf(o, "Notes:\n");
106 	fprintf(o, "* With --pairs, --implode doesn't make sense since the original field name has\n");
107 	fprintf(o, "  been lost.\n");
108 	fprintf(o, "* The combination \"--implode --values --across-records\" is non-streaming:\n");
109 	fprintf(o, "  no output records are produced until all input records have been read. In\n");
110 	fprintf(o, "  particular, this means it won't work in tail -f contexts. But all other flag\n");
111 	fprintf(o, "  combinations result in streaming (tail -f friendly) data processing.\n");
112 	fprintf(o, "* It's up to you to ensure that the nested-fs is distinct from your data's IFS:\n");
113 	fprintf(o, "  e.g. by default the former is semicolon and the latter is comma.\n");
114 	fprintf(o, "See also %s reshape.\n", argv0);
115 }
116 
mapper_nest_parse_cli(int * pargi,int argc,char ** argv,cli_reader_opts_t * _,cli_writer_opts_t * __)117 static mapper_t* mapper_nest_parse_cli(int* pargi, int argc, char** argv,
118 	cli_reader_opts_t* _, cli_writer_opts_t* __)
119 {
120 	char* field_name = NULL;
121 	char* nested_fs = ";";
122 	char* nested_ps = ":";
123 	char* vfs = NULL;
124 	char* ivfs = NULL;
125 	int   do_explode       = NEITHER_TRUE_NOR_FALSE;
126 	int   do_pairs         = NEITHER_TRUE_NOR_FALSE;
127 	int   do_across_fields = NEITHER_TRUE_NOR_FALSE;
128 
129 	char* verb = argv[(*pargi)++];
130 
131 	ap_state_t* pstate = ap_alloc();
132 	ap_define_string_flag(pstate, "-f",               &field_name);
133 	ap_define_string_flag(pstate, "--nested-fs",      &nested_fs);
134 	ap_define_string_flag(pstate, "--nested-ps",      &nested_ps);
135 	ap_define_true_flag(pstate,   "--explode",        &do_explode);
136 	ap_define_false_flag(pstate,  "--implode",        &do_explode);
137 	ap_define_true_flag(pstate,   "--pairs",          &do_pairs);
138 	ap_define_false_flag(pstate,  "--values",         &do_pairs);
139 	ap_define_true_flag(pstate,   "--across-fields",  &do_across_fields);
140 	ap_define_false_flag(pstate,  "--across-records", &do_across_fields);
141 	ap_define_string_flag(pstate, "--evar",           &vfs);
142 	ap_define_string_flag(pstate, "--ivar",           &ivfs);
143 
144 	if (!ap_parse(pstate, verb, pargi, argc, argv)) {
145 		mapper_nest_usage(stderr, argv[0], verb);
146 		return NULL;
147 	}
148 
149 	if (vfs != NULL) {
150 		do_explode = TRUE;
151 		do_pairs = FALSE;
152 		do_across_fields = FALSE;
153 		nested_fs = vfs;
154 	}
155 
156 	if (ivfs != NULL) {
157 		do_explode = FALSE;
158 		do_pairs = FALSE;
159 		do_across_fields = FALSE;
160 		nested_fs = ivfs;
161 	}
162 
163 	if (field_name == NULL) {
164 		mapper_nest_usage(stderr, argv[0], verb);
165 		return NULL;
166 	}
167 	if (do_explode == NEITHER_TRUE_NOR_FALSE) {
168 		mapper_nest_usage(stderr, argv[0], verb);
169 		return NULL;
170 	}
171 	if (do_pairs == NEITHER_TRUE_NOR_FALSE) {
172 		mapper_nest_usage(stderr, argv[0], verb);
173 		return NULL;
174 	}
175 	if (do_across_fields == NEITHER_TRUE_NOR_FALSE) {
176 		mapper_nest_usage(stderr, argv[0], verb);
177 		return NULL;
178 	}
179 	if (do_pairs == TRUE && do_explode == FALSE) {
180 		mapper_nest_usage(stderr, argv[0], verb);
181 		return NULL;
182 	}
183 
184 	return mapper_nest_alloc(pstate, argv[0], field_name, nested_fs, nested_ps, do_explode, do_pairs, do_across_fields);
185 }
186 
187 // ----------------------------------------------------------------
mapper_nest_alloc(ap_state_t * pargp,char * argv0,char * field_name,char * nested_fs,char * nested_ps,int do_explode,int do_pairs,int do_across_fields)188 static mapper_t* mapper_nest_alloc(ap_state_t* pargp, char* argv0,
189 	char* field_name, char* nested_fs, char* nested_ps,
190 	int do_explode, int do_pairs, int do_across_fields)
191 {
192 	mapper_t* pmapper = mlr_malloc_or_die(sizeof(mapper_t));
193 
194 	mapper_nest_state_t* pstate = mlr_malloc_or_die(sizeof(mapper_nest_state_t));
195 
196 	pstate->pargp      = pargp;
197 	pstate->field_name = field_name;
198 	pstate->nested_fs  = cli_sep_from_arg(nested_fs);
199 	pstate->nested_ps  = cli_sep_from_arg(nested_ps);
200 	pstate->nested_ps_length = strlen(pstate->nested_ps);
201 
202 	if (do_explode) {
203 		if (do_pairs) {
204 			pmapper->pprocess_func = do_across_fields
205 				? mapper_nest_explode_pairs_across_fields
206 				: mapper_nest_explode_pairs_across_records;
207 		} else {
208 			pmapper->pprocess_func = do_across_fields
209 				? mapper_nest_explode_values_across_fields
210 				: mapper_nest_explode_values_across_records;
211 		}
212 	} else {
213 		if (do_pairs) {
214 			// Should have been caught in CLI-parser.
215 			MLR_INTERNAL_CODING_ERROR();
216 		} else {
217 			pmapper->pprocess_func = do_across_fields
218 				? mapper_nest_implode_values_across_fields
219 				: mapper_nest_implode_values_across_records;
220 		}
221 	}
222 	pstate->other_keys_to_other_values_to_buckets = lhmslv_alloc();
223 	pstate->psb = sb_alloc(SB_ALLOC_LENGTH);
224 	char* pattern = mlr_malloc_or_die(strlen(field_name) + 12);
225 	sprintf(pattern, "^%s_[0-9]+$", field_name);
226 	regcomp_or_die(&pstate->regex, pattern, REG_NOSUB);
227 	free(pattern);
228 
229 	pmapper->pfree_func = mapper_nest_free;
230 
231 	pmapper->pvstate = (void*)pstate;
232 	return pmapper;
233 }
234 
mapper_nest_free(mapper_t * pmapper,context_t * _)235 static void mapper_nest_free(mapper_t* pmapper, context_t* _) {
236 	mapper_nest_state_t* pstate = pmapper->pvstate;
237 
238 	if (pstate->other_keys_to_other_values_to_buckets != NULL) {
239 		for (lhmslve_t* pe = pstate->other_keys_to_other_values_to_buckets->phead; pe != NULL; pe = pe->pnext) {
240 			lhmslv_t* other_values_to_buckets = pe->pvvalue;
241 			for (lhmslve_t* pf = other_values_to_buckets->phead; pf != NULL; pf = pf->pnext) {
242 				nest_bucket_t* pbucket = pf->pvvalue;
243 				nest_bucket_free(pbucket);
244 			}
245 			lhmslv_free(other_values_to_buckets);
246 		}
247 		lhmslv_free(pstate->other_keys_to_other_values_to_buckets);
248 	}
249 
250 	sb_free(pstate->psb);
251 	free(pstate->nested_fs);
252 	free(pstate->nested_ps);
253 	regfree(&pstate->regex);
254 	ap_free(pstate->pargp);
255 	free(pstate);
256 	free(pmapper);
257 }
258 
259 // ================================================================
mapper_nest_explode_values_across_fields(lrec_t * pinrec,context_t * pctx,void * pvstate)260 static sllv_t* mapper_nest_explode_values_across_fields(lrec_t* pinrec, context_t* pctx, void* pvstate) {
261 	if (pinrec == NULL) // End of input stream
262 		return sllv_single(NULL);
263 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
264 
265 	lrece_t* pentry = NULL;
266 	char* field_value = lrec_get_ext(pinrec, pstate->field_name, &pentry);
267 	if (field_value == NULL) {
268 		return sllv_single(pinrec);
269 	}
270 	lrece_t* porig = pentry;
271 
272 	char* sep = pstate->nested_fs;
273 	int seplen = strlen(sep);
274 	int i = 1;
275 	char* walker = field_value;
276 	char* piece = NULL;
277 	while ((piece = mlr_strmsep(&walker, sep, seplen)) != NULL) {
278 		char  istring_free_flags;
279 		char* istring = low_int_to_string(i, &istring_free_flags);
280 		char* new_key = mlr_paste_3_strings(pstate->field_name, "_", istring);
281 		if (istring_free_flags & FREE_ENTRY_KEY)
282 			free(istring);
283 		pentry = lrec_put_after(pinrec, pentry, new_key, mlr_strdup_or_die(piece), FREE_ENTRY_KEY|FREE_ENTRY_VALUE);
284 		i++;
285 	}
286 	lrec_unlink_and_free(pinrec, porig);
287 	return sllv_single(pinrec);;
288 }
289 
290 // ----------------------------------------------------------------
mapper_nest_implode_values_across_fields(lrec_t * pinrec,context_t * pctx,void * pvstate)291 static sllv_t* mapper_nest_implode_values_across_fields(lrec_t* pinrec, context_t* pctx, void* pvstate) {
292 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
293 
294 	if (pinrec == NULL) // end of input stream
295 		return sllv_single(NULL);
296 
297 	lrece_t* pprev = NULL;
298 	int field_count = 0;
299 	for (lrece_t* pe = pinrec->phead; pe != NULL; /* increment in loop */) {
300 		if (regmatch_or_die(&pstate->regex, pe->key, 0, NULL)) {
301 			if (field_count > 0)
302 				sb_append_string(pstate->psb, pstate->nested_fs);
303 			sb_append_string(pstate->psb, pe->value);
304 			field_count++;
305 
306 			// Keep the location so we can implode in-place.
307 			if (pprev == NULL)
308 				pprev = pe->pprev;
309 			lrece_t* pnext = pe->pnext;
310 			lrec_unlink_and_free(pinrec, pe);
311 			pe = pnext;
312 
313 		} else {
314 			pe = pe->pnext;
315 		}
316 	}
317 
318 	if (field_count > 0) {
319 		if (pprev == NULL) // No record before the unlinked one, i.e. list-head.
320 			lrec_prepend(pinrec, pstate->field_name, sb_finish(pstate->psb), FREE_ENTRY_VALUE);
321 		else
322 			lrec_put_after(pinrec, pprev, pstate->field_name, sb_finish(pstate->psb), FREE_ENTRY_VALUE);
323 	}
324 
325 	return sllv_single(pinrec);
326 }
327 
328 // ----------------------------------------------------------------
mapper_nest_explode_values_across_records(lrec_t * pinrec,context_t * pctx,void * pvstate)329 static sllv_t* mapper_nest_explode_values_across_records(lrec_t* pinrec, context_t* pctx, void* pvstate) {
330 	if (pinrec == NULL) // End of input stream
331 		return sllv_single(NULL);
332 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
333 
334 	char* field_value = lrec_get(pinrec, pstate->field_name);
335 	if (field_value == NULL) {
336 		return sllv_single(pinrec);
337 	}
338 
339 	sllv_t* poutrecs = sllv_alloc();
340 	char* sep = pstate->nested_fs;
341 	int seplen = strlen(sep);
342 	char* walker = field_value;
343 	char* piece = NULL;
344 	while ((piece = mlr_strmsep(&walker, sep, seplen)) != NULL) {
345 		lrec_t* poutrec = lrec_copy(pinrec);
346 		lrec_put(poutrec, pstate->field_name, mlr_strdup_or_die(piece), FREE_ENTRY_VALUE);
347 		sllv_append(poutrecs, poutrec);
348 	}
349 	lrec_free(pinrec);
350 	return poutrecs;
351 }
352 
353 // ----------------------------------------------------------------
mapper_nest_implode_values_across_records(lrec_t * pinrec,context_t * pctx,void * pvstate)354 static sllv_t* mapper_nest_implode_values_across_records(lrec_t* pinrec, context_t* pctx, void* pvstate) {
355 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
356 
357 	if (pinrec != NULL) { // Not end of input stream
358 		lrece_t* px = NULL;
359 		char* field_value = lrec_get_ext(pinrec, pstate->field_name, &px);
360 		if (field_value == NULL)
361 			return sllv_single(pinrec);
362 		char* field_value_copy = mlr_strdup_or_die(field_value);
363 
364 		// Don't lrec_remove pstate->field_name so we can implode in-place at the end.
365 		slls_t* other_keys = mlr_reference_keys_from_record_except(pinrec, px);
366 		lhmslv_t* other_values_to_buckets = lhmslv_get(pstate->other_keys_to_other_values_to_buckets, other_keys);
367 		if (other_values_to_buckets == NULL) {
368 			other_values_to_buckets = lhmslv_alloc();
369 			lhmslv_put(pstate->other_keys_to_other_values_to_buckets,
370 				slls_copy(other_keys), other_values_to_buckets, FREE_ENTRY_KEY);
371 		}
372 
373 		slls_t* other_values = mlr_reference_values_from_record_except(pinrec, px);
374 		nest_bucket_t* pbucket = lhmslv_get(other_values_to_buckets, other_values);
375 		if (pbucket == NULL) {
376 			pbucket = nest_bucket_alloc(pinrec);
377 			lhmslv_put(other_values_to_buckets, slls_copy(other_values), pbucket, FREE_ENTRY_KEY);
378 		} else {
379 			lrec_free(pinrec);
380 		}
381 		lrec_t* pair = lrec_unbacked_alloc();
382 		lrec_put(pair, pstate->field_name, field_value_copy, FREE_ENTRY_VALUE);
383 		sllv_append(pbucket->pairs, pair);
384 
385 		slls_free(other_values);
386 		slls_free(other_keys);
387 
388 		return NULL;
389 
390 	} else { // end of input stream
391 		sllv_t* poutrecs = sllv_alloc();
392 
393 		for (lhmslve_t* pe = pstate->other_keys_to_other_values_to_buckets->phead; pe != NULL; pe = pe->pnext) {
394 			lhmslv_t* other_values_to_buckets = pe->pvvalue;
395 			for (lhmslve_t* pf = other_values_to_buckets->phead; pf != NULL; pf = pf->pnext) {
396 				nest_bucket_t* pbucket = pf->pvvalue;
397 				lrec_t* poutrec = pbucket->prepresentative;
398 				pbucket->prepresentative = NULL; // ownership transfer
399 				for (sllve_t* pg = pbucket->pairs->phead; pg != NULL; pg = pg->pnext) {
400 					lrec_t* pr = pg->pvvalue;
401 					sb_append_string(pstate->psb, pr->phead->value);
402 					if (pg->pnext != NULL)
403 						sb_append_string(pstate->psb, pstate->nested_fs);
404 				}
405 				// pstate->field_name was already present so we'll overwrite it in-place here.
406 				lrec_put(poutrec, pstate->field_name, sb_finish(pstate->psb), FREE_ENTRY_VALUE);
407 				sllv_append(poutrecs, poutrec);
408 			}
409 		}
410 
411 		sllv_append(poutrecs, NULL);
412 		return poutrecs;
413 	}
414 }
415 
416 // ----------------------------------------------------------------
mapper_nest_explode_pairs_across_fields(lrec_t * pinrec,context_t * pctx,void * pvstate)417 static sllv_t* mapper_nest_explode_pairs_across_fields(lrec_t* pinrec, context_t* pctx, void* pvstate) {
418 	if (pinrec == NULL) // End of input stream
419 		return sllv_single(NULL);
420 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
421 
422 	lrece_t* pentry = NULL;
423 	char* field_value = lrec_get_ext(pinrec, pstate->field_name, &pentry);
424 	if (field_value == NULL) {
425 		return sllv_single(pinrec);
426 	}
427 	lrece_t* porig = pentry;
428 
429 	char* sep = pstate->nested_fs;
430 	int seplen = strlen(sep);
431 	char* walker = field_value;
432 	char* piece = NULL;
433 	while ((piece = mlr_strmsep(&walker, sep, seplen)) != NULL) {
434 		char* found_sep = strstr(piece, pstate->nested_ps);
435 		if (found_sep != NULL) { // there is a pair
436 			*found_sep = 0;
437 			pentry = lrec_put_after(pinrec, pentry,
438 				mlr_strdup_or_die(piece), mlr_strdup_or_die(found_sep + pstate->nested_ps_length),
439 				FREE_ENTRY_KEY | FREE_ENTRY_VALUE);
440 		} else { // there is not a pair
441 			pentry = lrec_put_after(pinrec, pentry,
442 				pstate->field_name, mlr_strdup_or_die(piece), FREE_ENTRY_VALUE);
443 		}
444 	}
445 	lrec_unlink_and_free(pinrec, porig);
446 
447 	return sllv_single(pinrec);
448 }
449 
450 // ----------------------------------------------------------------
mapper_nest_explode_pairs_across_records(lrec_t * pinrec,context_t * pctx,void * pvstate)451 static sllv_t* mapper_nest_explode_pairs_across_records(lrec_t* pinrec, context_t* pctx, void* pvstate) {
452 	if (pinrec == NULL) // End of input stream
453 		return sllv_single(NULL);
454 	mapper_nest_state_t* pstate = (mapper_nest_state_t*)pvstate;
455 
456 	char* field_value = lrec_get(pinrec, pstate->field_name);
457 	if (field_value == NULL) {
458 		return sllv_single(pinrec);
459 	}
460 
461 	sllv_t* poutrecs = sllv_alloc();
462 	char* sep = pstate->nested_fs;
463 	int seplen = strlen(sep);
464 	char* walker = field_value;
465 	char* piece = NULL;
466 	while ((piece = mlr_strmsep(&walker, sep, seplen)) != NULL) {
467 		char* found_sep = strstr(piece, pstate->nested_ps);
468 		lrec_t* poutrec = lrec_copy(pinrec);
469 		lrece_t* pe = NULL;
470 		(void)lrec_get_ext(poutrec, pstate->field_name, &pe);
471 		// Put the new field where the old one was -- unless there's already a field with the new
472 		// name in which case replace its value.
473 		if (found_sep != NULL) { // there is a pair
474 			*found_sep = 0;
475 			lrec_put_after(poutrec, pe, mlr_strdup_or_die(piece),
476 				mlr_strdup_or_die(found_sep + pstate->nested_ps_length),
477 				FREE_ENTRY_KEY | FREE_ENTRY_VALUE);
478 		} else { // there is not a pair
479 			lrec_put_after(poutrec, pe, pstate->field_name, mlr_strdup_or_die(piece), FREE_ENTRY_VALUE);
480 		}
481 		lrec_unlink_and_free(poutrec, pe);
482 		sllv_append(poutrecs, poutrec);
483 	}
484 
485 	lrec_free(pinrec);
486 	return poutrecs;
487 }
488 
489 // ----------------------------------------------------------------
nest_bucket_alloc(lrec_t * prepresentative)490 static nest_bucket_t* nest_bucket_alloc(lrec_t* prepresentative) {
491 	nest_bucket_t* pbucket = mlr_malloc_or_die(sizeof(nest_bucket_t));
492 	pbucket->prepresentative = prepresentative;
493 	pbucket->pairs = sllv_alloc();
494 	return pbucket;
495 }
nest_bucket_free(nest_bucket_t * pbucket)496 static void nest_bucket_free(nest_bucket_t* pbucket) {
497 	lrec_free(pbucket->prepresentative);
498 	for (sllve_t* pe = pbucket->pairs->phead; pe != NULL; pe = pe->pnext) {
499 		lrec_t* pair = pe->pvvalue;
500 		lrec_free(pair);
501 	}
502 	sllv_free(pbucket->pairs);
503 	free(pbucket);
504 }
505