1 #include <stdio.h>
2 #include <stdlib.h>
3 #include "lib/mlrutil.h"
4 #include "lib/mlr_globals.h"
5 #include "lib/context.h"
6 #include "containers/mixutil.h"
7 #include "containers/join_bucket_keeper.h"
8 #include "input/lrec_readers.h"
9
10 // ================================================================
11 // JOIN_BUCKET_KEEPER
12 //
13 // This data structure supports Miller's sorted (double-streaming) join. It is
14 // perhaps best explained by first comparing with the unsorted (half-streaming)
15 // case.
16 //
17 // In both cases, we have left and right join keys. Suppose the left file has
18 // data with field name "L" to be joined with right-file(s) data with field
19 // name "R". For the unsorted case (see mapper_join.c) the entire left file is
20 // first loaded into buckets of record-lists, one for each distinct value of L.
21 // E.g. given the following:
22 //
23 // +-----+-----+
24 // | L | R |
25 // + --- + --- +
26 // | a | a |
27 // | c | b |
28 // | a | f |
29 // | b | |
30 // | c | |
31 // | d | |
32 // | a | |
33 // +-----+-----+
34 //
35 // the left file is bucketed as
36 //
37 // +-----+ +-----+ +-----+ +-----+
38 // | L | | L | | L | | L |
39 // + --- + + --- + + --- + + --- +
40 // | a | | c | | b | | d |
41 // | a | | c | +-----+ +-----+
42 // | a | + --- +
43 // + --- +
44 //
45 // Then the right file is processed one record at a time (hence
46 // "half-streaming"). The pairings are easy:
47 // * the right record with R=a is paired with the L=a bucket,
48 // * the right record with R=b is paired with the L=b bucket,
49 // * the right record with R=f is unpaired, and
50 // * the left records with L=c and L=d are unpaired.
51 //
52 // ----------------------------------------------------------------
53 // Now for the sorted (doubly-streaming) case. Here we require that the left
54 // and right files be already sorted (lexically ascending) by the join fields.
55 // Then the example inputs look like this:
56 //
57 // +-----+-----+
58 // | L | R |
59 // + --- + --- +
60 // | a | a |
61 // | a | b |
62 // | a | f |
63 // | b | |
64 // | c | |
65 // | c | |
66 // | d | |
67 // +-----+-----+
68 //
69 // The right file is still read one record at a time. It's the job of this
70 // join_bucket_keeper class to keep track of the left-file buckets, one bucket
71 // at a time. This includes all records with same values for the join
72 // field(s), e.g. the three L=a records, as well as a "peek" record which is
73 // either the next record with a different join value (e.g. the L=b record), or
74 // an end-of-file indicator.
75 //
76 // If a right-file record has join field matching the current left-file bucket,
77 // then it's paired with all records in that bucket. Otherwise the
78 // join_bucket_keeper needs to either stay with the current bucket or advance
// to the next one, depending on how the current right-file record's
// join-field values compare lexically with the left-file bucket's
// join-field values.
82 //
83 // Examples:
84 //
85 // +-----------+-----------+-----------+-----------+-----------+-----------+
86 // | L R | L R | L R | L R | L R | L R |
87 // + --- --- + --- --- + --- --- + --- --- + --- --- + --- --- +
88 // | a | a | e | a | e e | e e |
89 // | b | e | e | e e | e | e e |
90 // | e | e | e | e | e | e |
91 // | e | e | f | e | f | g g |
92 // | e | f | g | g | g | g |
93 // | g | g | g | g | g | |
94 // | g | g | h | | | |
95 // +-----------+-----------+-----------+-----------+-----------+-----------+
96 //
97 // In all these examples, the join_bucket_keeper goes through these steps:
98 // * bucket is empty, peek rec has L=e
99 // * bucket is L=e records, peek rec has L=g
100 // * bucket is L=g records, peek rec is null (due to EOF)
101 // * bucket is empty, peek rec is null (due to EOF)
102 //
103 // Example 1:
104 // * left-bucket is empty and left-peek has L=e
105 // * right record has R=a; join_bucket_keeper does not advance
106 // * right record has R=b; join_bucket_keeper does not advance
107 // * right end of file; all left records are unpaired.
108 //
109 // Example 2:
110 // * left-bucket is empty and left-peek has L=e
111 // * right record has R=a; join_bucket_keeper does not advance
112 // * right record has R=f; left records with L=e are unpaired.
113 // * etc.
114 //
115 // ================================================================
116
117 // ----------------------------------------------------------------
118 #define LEFT_STATE_0_PREFILL 0
119 #define LEFT_STATE_1_FULL 1
120 #define LEFT_STATE_2_LAST_BUCKET 2
121 #define LEFT_STATE_3_EOF 3
122
123 // ----------------------------------------------------------------
124 // (0) pre-fill: Lv == null, peek == null, leof = false
125 // (1) midstream: Lv != null, peek != null, leof = false
126 // (2) last bucket: Lv != null, peek == null, leof = true
127 // (3) leof: Lv == null, peek == null, leof = true
128 // ----------------------------------------------------------------
129
130 // Private methods
131 static int join_bucket_keeper_get_state(join_bucket_keeper_t* pkeeper);
132 static void join_bucket_keeper_initial_fill(join_bucket_keeper_t* pkeeper,
133 sllv_t** pprecords_left_unpaired);
134 static void join_bucket_keeper_advance_to(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
135 sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired);
136 static void join_bucket_keeper_fill(join_bucket_keeper_t* pkeeper, sllv_t** pprecords_left_unpaired);
137 static void join_bucket_keeper_drain(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
138 sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired);
139 static char* describe_state(int state);
140
141 // ----------------------------------------------------------------
join_bucket_keeper_alloc(char * prepipe,char * left_file_name,cli_reader_opts_t * popts,slls_t * pleft_field_names)142 join_bucket_keeper_t* join_bucket_keeper_alloc(
143 char* prepipe,
144 char* left_file_name,
145 cli_reader_opts_t* popts,
146 slls_t* pleft_field_names
147 ) {
148 lrec_reader_t* plrec_reader = lrec_reader_alloc(popts);
149 return join_bucket_keeper_alloc_from_reader(plrec_reader, prepipe, left_file_name, pleft_field_names);
150 }
151
152 // ----------------------------------------------------------------
join_bucket_keeper_alloc_from_reader(lrec_reader_t * plrec_reader,char * prepipe,char * left_file_name,slls_t * pleft_field_names)153 join_bucket_keeper_t* join_bucket_keeper_alloc_from_reader(
154 lrec_reader_t* plrec_reader,
155 char* prepipe,
156 char* left_file_name,
157 slls_t* pleft_field_names)
158 {
159 join_bucket_keeper_t* pkeeper = mlr_malloc_or_die(sizeof(join_bucket_keeper_t));
160
161 void* pvhandle = plrec_reader->popen_func(plrec_reader->pvstate, prepipe, left_file_name);
162 plrec_reader->psof_func(plrec_reader->pvstate, pvhandle);
163
164 context_t* pctx = mlr_malloc_or_die(sizeof(context_t));
165 context_init_from_first_file_name(pctx, left_file_name);
166
167 pkeeper->plrec_reader = plrec_reader;
168 pkeeper->pvhandle = pvhandle;
169 pkeeper->pctx = pctx;
170
171 pkeeper->pleft_field_names = pleft_field_names;
172
173 pkeeper->pbucket = mlr_malloc_or_die(sizeof(join_bucket_t));
174 pkeeper->pbucket->precords = sllv_alloc();
175 pkeeper->pbucket->pleft_field_values = NULL;
176 pkeeper->pbucket->was_paired = FALSE;
177
178 pkeeper->prec_peek = NULL;
179 pkeeper->leof = FALSE;
180 pkeeper->state = LEFT_STATE_0_PREFILL;
181
182 return pkeeper;
183 }
184
185 // ----------------------------------------------------------------
// Releases a keeper and everything it owns: the bucket (its join-field
// values, its record list, and any records still held in that list), the
// peek record, the context, and the record reader, which is closed via its
// pclose_func (given prepipe) and then freed via its pfree_func.
// pleft_field_names is not freed here. A NULL pkeeper is a no-op.
void join_bucket_keeper_free(join_bucket_keeper_t* pkeeper, char* prepipe) {
	if (pkeeper == NULL)
		return;

	join_bucket_t* pbucket = pkeeper->pbucket;
	slls_free(pbucket->pleft_field_values);

	// The bucket's list may be NULL (drain nulls it out). When it is present,
	// the records it still holds belong to the keeper: free them one by one
	// before freeing the list itself, as advance_to does for a paired bucket.
	if (pbucket->precords != NULL) {
		while (pbucket->precords->phead)
			lrec_free(sllv_pop(pbucket->precords));
		sllv_free(pbucket->precords);
	}
	free(pbucket);

	lrec_reader_t* preader = pkeeper->plrec_reader;
	preader->pclose_func(preader->pvstate, pkeeper->pvhandle, prepipe);
	preader->pfree_func(preader);

	lrec_free(pkeeper->prec_peek);
	free(pkeeper->pctx);
	free(pkeeper);
}
198
199 // ----------------------------------------------------------------
// Main entry point: given the join-field values of the current right record
// (or NULL at right EOF), reports which left records pair with it and which
// left records have become unpaired.
//
// Outputs (both reset to NULL on entry):
// * *pprecords_paired: the current bucket's record list when the bucket's
//   join-field values equal pright_field_values; otherwise NULL. The list
//   pointer is the bucket's own list.
// * *pprecords_left_unpaired: left records found to be unpaired during this
//   call, or NULL if there are none.
//
// On the first call the bucket is filled from the left input. The keeper's
// state is recomputed before returning.
void join_bucket_keeper_emit(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	*pprecords_paired = NULL;
	*pprecords_left_unpaired = NULL;

	if (pkeeper->state == LEFT_STATE_0_PREFILL) {
		join_bucket_keeper_initial_fill(pkeeper, pprecords_left_unpaired);
		pkeeper->state = join_bucket_keeper_get_state(pkeeper);
	}

	if (pright_field_values == NULL) {
		// Right EOF: return the final left-unpaireds.
		join_bucket_keeper_drain(pkeeper, pright_field_values, pprecords_paired, pprecords_left_unpaired);
		pkeeper->state = join_bucket_keeper_get_state(pkeeper);
		return;
	}

	switch (pkeeper->state) {
	case LEFT_STATE_1_FULL:
	case LEFT_STATE_2_LAST_BUCKET: {
		join_bucket_t* pbucket = pkeeper->pbucket;
		int cmp = slls_compare_lexically(pbucket->pleft_field_values, pright_field_values);
		if (cmp == 0) {
			// Match: hand back the current bucket.
			pbucket->was_paired = TRUE;
			*pprecords_paired = pbucket->precords;
		} else if (cmp < 0) {
			// Left is behind right: advance left until match or left EOF.
			join_bucket_keeper_advance_to(pkeeper, pright_field_values,
				pprecords_paired, pprecords_left_unpaired);
		}
		// cmp > 0: no match and no need to advance left; return null lists.
		break;
	}

	case LEFT_STATE_3_EOF:
		// Left input exhausted: nothing can pair.
		break;

	default:
		fprintf(stderr, "%s: internal coding error: failed transition from prefill state.\n",
			MLR_GLOBALS.bargv0);
		exit(1);
	}

	pkeeper->state = join_bucket_keeper_get_state(pkeeper);
}
235
236 // ----------------------------------------------------------------
// Derives the keeper's state from its fields:
// * no bucket join-field values: EOF if leof is set, else pre-fill;
// * bucket join-field values present: last-bucket if there is no peek
//   record, else full.
static int join_bucket_keeper_get_state(join_bucket_keeper_t* pkeeper) {
	int has_bucket_values = (pkeeper->pbucket->pleft_field_values != NULL);

	if (!has_bucket_values)
		return pkeeper->leof ? LEFT_STATE_3_EOF : LEFT_STATE_0_PREFILL;

	return (pkeeper->prec_peek != NULL) ? LEFT_STATE_1_FULL : LEFT_STATE_2_LAST_BUCKET;
}
250
// Reads the left input up to the first record having all the join keys, then
// fills the first bucket from it. Records lacking the join keys are appended
// to *pprecords_left_unpaired, which is allocated on demand. If the left
// input ends before any keyed record is seen, leof is set and the bucket is
// left empty.
static void join_bucket_keeper_initial_fill(join_bucket_keeper_t* pkeeper,
	sllv_t** pprecords_left_unpaired)
{
	lrec_reader_t* preader = pkeeper->plrec_reader;

	for (;;) {
		pkeeper->prec_peek = preader->pprocess_func(preader->pvstate,
			pkeeper->pvhandle, pkeeper->pctx);

		if (pkeeper->prec_peek == NULL) {
			// Left EOF before any record with the join keys.
			pkeeper->leof = TRUE;
			return;
		}

		if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names))
			break;

		// Record lacks the join keys: it goes straight to the left-unpaired list.
		if (*pprecords_left_unpaired == NULL)
			*pprecords_left_unpaired = sllv_alloc();
		sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
	}

	join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
}
277
278 // Preconditions:
279 // * prec_peek != NULL
280 // * prec_peek has the join keys
join_bucket_keeper_fill(join_bucket_keeper_t * pkeeper,sllv_t ** pprecords_left_unpaired)281 static void join_bucket_keeper_fill(join_bucket_keeper_t* pkeeper, sllv_t** pprecords_left_unpaired) {
282 slls_t* pleft_field_values = mlr_reference_selected_values_from_record(pkeeper->prec_peek,
283 pkeeper->pleft_field_names);
284 if (pleft_field_values == NULL) {
285 fprintf(stderr, "%s: internal coding error: peek record should have had join keys.\n",
286 MLR_GLOBALS.bargv0);
287 exit(1);
288 }
289
290 pkeeper->pbucket->pleft_field_values = slls_copy(pleft_field_values);
291 slls_free(pleft_field_values);
292 sllv_append(pkeeper->pbucket->precords, pkeeper->prec_peek);
293 pkeeper->pbucket->was_paired = FALSE;
294 pkeeper->prec_peek = NULL;
295 while (TRUE) {
296 // Skip over records not having the join keys. These go straight to the
297 // left-unpaired list.
298 pkeeper->prec_peek = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
299 pkeeper->pvhandle, pkeeper->pctx);
300 if (pkeeper->prec_peek == NULL) {
301 pkeeper->leof = TRUE;
302 break;
303 }
304
305 if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names)) {
306 int cmp = slls_lrec_compare_lexically(
307 pkeeper->pbucket->pleft_field_values,
308 pkeeper->prec_peek,
309 pkeeper->pleft_field_names);
310 if (cmp != 0) {
311 break;
312 }
313 sllv_append(pkeeper->pbucket->precords, pkeeper->prec_peek);
314
315 } else {
316 if (*pprecords_left_unpaired == NULL)
317 *pprecords_left_unpaired = sllv_alloc();
318 sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
319 }
320 pkeeper->prec_peek = NULL;
321 }
322 }
323
324 // Pre-conditions:
325 // * pkeeper->pleft_field_values < pright_field_values.
326 // * currently in state 1 or 2 so there is a bucket but there may or may not be a peek-record
327 // * current bucket was/wasn't paired on previous emits but is not paired on this emit.
328 // Actions:
329 // * if bucket was never paired, return it to the caller; else discard.
330 // * consume left input stream, feeding into unpaired, for as long as leftvals < rightvals && !eof.
331 // * if there is leftrec with vals == rightvals: parallel initial_fill.
332 // else, mimic initial_fill.
333
join_bucket_keeper_advance_to(join_bucket_keeper_t * pkeeper,slls_t * pright_field_values,sllv_t ** pprecords_paired,sllv_t ** pprecords_left_unpaired)334 static void join_bucket_keeper_advance_to(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
335 sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
336 {
337 if (pkeeper->pbucket->was_paired) {
338 while (pkeeper->pbucket->precords->phead)
339 lrec_free(sllv_pop(pkeeper->pbucket->precords));
340 sllv_free(pkeeper->pbucket->precords);
341 pkeeper->pbucket->precords = NULL;
342 } else {
343 if (*pprecords_left_unpaired == NULL) {
344 *pprecords_left_unpaired = pkeeper->pbucket->precords;
345 } else {
346 sllv_transfer(*pprecords_left_unpaired, pkeeper->pbucket->precords);
347 }
348 }
349
350 pkeeper->pbucket->precords = sllv_alloc();
351 if (pkeeper->pbucket->pleft_field_values != NULL) {
352 slls_free(pkeeper->pbucket->pleft_field_values);
353 pkeeper->pbucket->pleft_field_values = NULL;
354 }
355 pkeeper->pbucket->was_paired = FALSE;
356
357 if (pkeeper->prec_peek == NULL) { // left EOF
358 return;
359 }
360
361 // Need a double condition here ... the peek record is either het or hom.
362 // (Or, change that: -> ensure elsewhere the peek record is hom.)
363 // The former is destined for lunp and shouldn't be lexcmped. The latter
364 // should be.
365
366 int cmp = lrec_slls_compare_lexically(pkeeper->prec_peek, pkeeper->pleft_field_names, pright_field_values);
367 if (cmp < 0) {
368 // keep seeking & filling the bucket until = or >; this may or may not end up being a match.
369
370 if (*pprecords_left_unpaired == NULL)
371 *pprecords_left_unpaired = sllv_alloc();
372
373 while (TRUE) {
374 sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
375 pkeeper->prec_peek = NULL;
376
377 while (TRUE) {
378 // Skip over records not having the join keys. These go straight to the
379 // left-unpaired list.
380 pkeeper->prec_peek = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
381 pkeeper->pvhandle, pkeeper->pctx);
382 if (pkeeper->prec_peek == NULL)
383 break;
384 if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names)) {
385 break;
386 } else {
387 if (*pprecords_left_unpaired == NULL)
388 *pprecords_left_unpaired = sllv_alloc();
389 sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
390 }
391 }
392
393
394 if (pkeeper->prec_peek == NULL) {
395 pkeeper->leof = TRUE;
396 break;
397 }
398
399 cmp = lrec_slls_compare_lexically(pkeeper->prec_peek, pkeeper->pleft_field_names, pright_field_values);
400 if (cmp >= 0)
401 break;
402 }
403 }
404
405 if (cmp == 0) {
406 join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
407 pkeeper->pbucket->was_paired = TRUE;
408 *pprecords_paired = pkeeper->pbucket->precords;
409 } else if (cmp > 0) {
410 join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
411 }
412 }
413
// Called at right EOF: collects every remaining left record into
// *pprecords_left_unpaired, which is non-NULL on return (possibly empty).
//
// The remaining records are, in order:
// 1. the current bucket's records, unless the bucket was already paired;
// 2. the peek record, if any;
// 3. everything still unread from the left input.
//
// Afterward the bucket has no record list (precords is NULL), there is no
// peek record, and leof is set. pright_field_values and pprecords_paired are
// not used.
static void join_bucket_keeper_drain(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	join_bucket_t* pbucket = pkeeper->pbucket;
	lrec_reader_t* preader = pkeeper->plrec_reader;

	// 1. Any records already in pkeeper->pbucket->precords (current bucket)
	if (pbucket->precords != NULL) {
		if (pbucket->was_paired) {
			// A paired bucket is not unpaired output. The keeper still owns
			// these records, so free them (as advance_to does) rather than
			// dropping the only pointer to them.
			while (pbucket->precords->phead)
				lrec_free(sllv_pop(pbucket->precords));
			sllv_free(pbucket->precords);
		} else if (*pprecords_left_unpaired == NULL) {
			// Hand the whole list over to the caller.
			*pprecords_left_unpaired = pbucket->precords;
		} else {
			sllv_transfer(*pprecords_left_unpaired, pbucket->precords);
			sllv_free(pbucket->precords);
		}
		pbucket->precords = NULL;
	}
	if (*pprecords_left_unpaired == NULL)
		*pprecords_left_unpaired = sllv_alloc();

	// 2. Peek-record, if any
	if (pkeeper->prec_peek != NULL) {
		sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
		pkeeper->prec_peek = NULL;
	}

	// 3. Remainder of left input stream
	for (;;) {
		lrec_t* prec = preader->pprocess_func(preader->pvstate,
			pkeeper->pvhandle, pkeeper->pctx);
		if (prec == NULL)
			break;
		sllv_append(*pprecords_left_unpaired, prec);
	}

	// The reader has returned NULL, so record left EOF as every other read
	// site in this file does.
	pkeeper->leof = TRUE;
}
445
446 // ----------------------------------------------------------------
// Debug dump of a keeper to stdout: its address, reader handle, context,
// left join-field names, bucket, peek record, leof flag, and state name.
void join_bucket_keeper_print(join_bucket_keeper_t* pkeeper) {
	// %p requires a void* argument; the address printed is the keeper's, so
	// label it as such.
	printf("pkeeper at %p:\n", (void*)pkeeper);
	printf(" pvhandle = %p\n", (void*)pkeeper->pvhandle);
	context_print(pkeeper->pctx, " ");

	printf(" pleft_field_names = ");
	slls_print(pkeeper->pleft_field_names);
	printf("\n");

	join_bucket_print(pkeeper->pbucket, " ");

	printf(" prec_peek = ");
	if (pkeeper->prec_peek != NULL) {
		lrec_print(pkeeper->prec_peek);
	} else {
		printf("null\n");
	}

	printf(" leof = %d\n", pkeeper->leof);
	printf(" state = %s\n", describe_state(pkeeper->state));
}
464
// Debug dump of a keeper together with the arguments of an emit call: the
// keeper itself, then the right join-field values, then the paired and
// left-unpaired record lists.
void join_bucket_keeper_print_aux(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	join_bucket_keeper_print(pkeeper);

	printf(" pright_field_values = ");
	slls_print(pright_field_values);
	printf("\n");

	sllv_t* ppaired = *pprecords_paired;
	printf(" precords_paired =\n");
	lrec_print_list_with_prefix(ppaired, " ");
	printf("\n");

	sllv_t* punpaired = *pprecords_left_unpaired;
	printf(" precords_left_unpaired =\n");
	lrec_print_list_with_prefix(punpaired, " ");
	printf("\n");
}
479
// Debug dump of a bucket to stdout, each line prefixed with indent: its
// address, join-field values, record list (or "(null)" when the bucket has no
// list), and was_paired flag.
void join_bucket_print(join_bucket_t* pbucket, char* indent) {
	// %p requires a void* argument.
	printf("%spbucket at %p:\n", indent, (void*)pbucket);

	printf("%s pleft_field_values = ", indent);
	slls_print(pbucket->pleft_field_values);
	printf("\n");

	if (pbucket->precords != NULL) {
		printf("%s precords (length=%llu):\n", indent, pbucket->precords->length);
		lrec_print_list_with_prefix(pbucket->precords, " ");
	} else {
		printf("%s precords:\n", indent);
		printf("%s (null)\n", indent);
	}

	printf("%s was_paired = %d\n", indent, pbucket->was_paired);
}
494
describe_state(int state)495 static char* describe_state(int state) {
496 switch (state) {
497 case LEFT_STATE_0_PREFILL: return "LEFT_STATE_0_PREFILL";
498 case LEFT_STATE_1_FULL: return "LEFT_STATE_1_FULL";
499 case LEFT_STATE_2_LAST_BUCKET: return "LEFT_STATE_2_LAST_BUCKET";
500 case LEFT_STATE_3_EOF: return "LEFT_STATE_3_EOF";
501 default: return "???";
502 }
503 }
504