1 #include <stdio.h>
2 #include <stdlib.h>
3 #include "lib/mlrutil.h"
4 #include "lib/mlr_globals.h"
5 #include "lib/context.h"
6 #include "containers/mixutil.h"
7 #include "containers/join_bucket_keeper.h"
8 #include "input/lrec_readers.h"
9 
10 // ================================================================
11 // JOIN_BUCKET_KEEPER
12 //
13 // This data structure supports Miller's sorted (double-streaming) join.  It is
14 // perhaps best explained by first comparing with the unsorted (half-streaming)
15 // case.
16 //
17 // In both cases, we have left and right join keys. Suppose the left file has
18 // data with field name "L" to be joined with right-file(s) data with field
19 // name "R". For the unsorted case (see mapper_join.c) the entire left file is
20 // first loaded into buckets of record-lists, one for each distinct value of L.
21 // E.g. given the following:
22 //
23 //   +-----+-----+
24 //   |  L  |  R  |
25 //   + --- + --- +
26 //   |  a  |  a  |
27 //   |  c  |  b  |
28 //   |  a  |  f  |
29 //   |  b  |     |
30 //   |  c  |     |
31 //   |  d  |     |
32 //   |  a  |     |
33 //   +-----+-----+
34 //
35 // the left file is bucketed as
36 //
37 //   +-----+     +-----+     +-----+     +-----+
38 //   |  L  |     |  L  |     |  L  |     |  L  |
39 //   + --- +     + --- +     + --- +     + --- +
40 //   |  a  |     |  c  |     |  b  |     |  d  |
//   |  a  |     |  c  |     +-----+     +-----+
//   |  a  |     +-----+
//   +-----+
44 //
45 // Then the right file is processed one record at a time (hence
46 // "half-streaming"). The pairings are easy:
47 // * the right record with R=a is paired with the L=a bucket,
48 // * the right record with R=b is paired with the L=b bucket,
49 // * the right record with R=f is unpaired, and
50 // * the left records with L=c and L=d are unpaired.
51 //
52 // ----------------------------------------------------------------
53 // Now for the sorted (doubly-streaming) case. Here we require that the left
54 // and right files be already sorted (lexically ascending) by the join fields.
55 // Then the example inputs look like this:
56 //
57 //   +-----+-----+
58 //   |  L  |  R  |
59 //   + --- + --- +
60 //   |  a  |  a  |
61 //   |  a  |  b  |
62 //   |  a  |  f  |
63 //   |  b  |     |
64 //   |  c  |     |
65 //   |  c  |     |
66 //   |  d  |     |
67 //   +-----+-----+
68 //
69 // The right file is still read one record at a time. It's the job of this
70 // join_bucket_keeper class to keep track of the left-file buckets, one bucket
71 // at a time.  This includes all records with same values for the join
72 // field(s), e.g. the three L=a records, as well as a "peek" record which is
73 // either the next record with a different join value (e.g. the L=b record), or
74 // an end-of-file indicator.
75 //
76 // If a right-file record has join field matching the current left-file bucket,
77 // then it's paired with all records in that bucket. Otherwise the
78 // join_bucket_keeper needs to either stay with the current bucket or advance
// to the next one, depending on how the current right-file record's
// join-field values compare lexically with the left-file bucket's
// join-field values.
82 //
83 // Examples:
84 //
85 // +-----------+-----------+-----------+-----------+-----------+-----------+
86 // |  L    R   |   L   R   |   L   R   |   L   R   |   L   R   |   L   R   |
87 // + ---  ---  + ---  ---  + ---  ---  + ---  ---  + ---  ---  + ---  ---  +
88 // |       a   |       a   |   e       |       a   |   e   e   |   e   e   |
89 // |       b   |   e       |   e       |   e   e   |   e       |   e   e   |
90 // |   e       |   e       |   e       |   e       |   e       |   e       |
91 // |   e       |   e       |       f   |   e       |       f   |   g   g   |
92 // |   e       |       f   |   g       |   g       |   g       |   g       |
93 // |   g       |   g       |   g       |   g       |   g       |           |
94 // |   g       |   g       |       h   |           |           |           |
95 // +-----------+-----------+-----------+-----------+-----------+-----------+
96 //
97 // In all these examples, the join_bucket_keeper goes through these steps:
98 // * bucket is empty, peek rec has L=e
99 // * bucket is L=e records, peek rec has L=g
100 // * bucket is L=g records, peek rec is null (due to EOF)
101 // * bucket is empty, peek rec is null (due to EOF)
102 //
103 // Example 1:
104 // * left-bucket is empty and left-peek has L=e
105 // * right record has R=a; join_bucket_keeper does not advance
106 // * right record has R=b; join_bucket_keeper does not advance
107 // * right end of file; all left records are unpaired.
108 //
109 // Example 2:
110 // * left-bucket is empty and left-peek has L=e
111 // * right record has R=a; join_bucket_keeper does not advance
112 // * right record has R=f; left records with L=e are unpaired.
113 // * etc.
114 //
115 // ================================================================
116 
117 // ----------------------------------------------------------------
// Left-stream states of the keeper, as computed by
// join_bucket_keeper_get_state from the bucket's join values, the peek
// record, and the left-EOF flag.
#define LEFT_STATE_0_PREFILL     0 // no bucket join values, left EOF not seen
#define LEFT_STATE_1_FULL        1 // bucket join values present, peek record present
#define LEFT_STATE_2_LAST_BUCKET 2 // bucket join values present, no peek record
#define LEFT_STATE_3_EOF         3 // no bucket join values, left EOF seen
122 
123 // ----------------------------------------------------------------
124 // (0) pre-fill:    Lv == null, peek == null, leof = false
125 // (1) midstream:   Lv != null, peek != null, leof = false
126 // (2) last bucket: Lv != null, peek == null, leof = true
127 // (3) leof:        Lv == null, peek == null, leof = true
128 // ----------------------------------------------------------------
129 
// Private methods

// Derives the LEFT_STATE_* value from the keeper's bucket, peek record, and
// left-EOF flag.
static int join_bucket_keeper_get_state(join_bucket_keeper_t* pkeeper);
// First read of the left stream: skips records lacking the join keys (sending
// them to the left-unpaired list) and fills the first bucket, if any.
static void join_bucket_keeper_initial_fill(join_bucket_keeper_t* pkeeper,
	sllv_t** pprecords_left_unpaired);
// Moves past the current bucket toward the given right-record join values.
static void join_bucket_keeper_advance_to(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired);
// Starts a new bucket from the peek record and reads ahead to the next
// record with different join values, or to left EOF.
static void join_bucket_keeper_fill(join_bucket_keeper_t* pkeeper, sllv_t** pprecords_left_unpaired);
// At right EOF: hands the remaining left records to the left-unpaired list.
static void join_bucket_keeper_drain(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired);
// Returns the name of a LEFT_STATE_* value, for debug printing.
static char* describe_state(int state);
140 
141 // ----------------------------------------------------------------
join_bucket_keeper_alloc(char * prepipe,char * left_file_name,cli_reader_opts_t * popts,slls_t * pleft_field_names)142 join_bucket_keeper_t* join_bucket_keeper_alloc(
143 	char* prepipe,
144 	char* left_file_name,
145 	cli_reader_opts_t* popts,
146 	slls_t* pleft_field_names
147 ) {
148 	lrec_reader_t* plrec_reader = lrec_reader_alloc(popts);
149 	return join_bucket_keeper_alloc_from_reader(plrec_reader, prepipe, left_file_name, pleft_field_names);
150 }
151 
152 // ----------------------------------------------------------------
join_bucket_keeper_alloc_from_reader(lrec_reader_t * plrec_reader,char * prepipe,char * left_file_name,slls_t * pleft_field_names)153 join_bucket_keeper_t* join_bucket_keeper_alloc_from_reader(
154 	lrec_reader_t* plrec_reader,
155 	char*          prepipe,
156 	char*          left_file_name,
157 	slls_t*        pleft_field_names)
158 {
159 	join_bucket_keeper_t* pkeeper = mlr_malloc_or_die(sizeof(join_bucket_keeper_t));
160 
161 	void* pvhandle = plrec_reader->popen_func(plrec_reader->pvstate, prepipe, left_file_name);
162 	plrec_reader->psof_func(plrec_reader->pvstate, pvhandle);
163 
164 	context_t* pctx = mlr_malloc_or_die(sizeof(context_t));
165 	context_init_from_first_file_name(pctx, left_file_name);
166 
167 	pkeeper->plrec_reader = plrec_reader;
168 	pkeeper->pvhandle = pvhandle;
169 	pkeeper->pctx = pctx;
170 
171 	pkeeper->pleft_field_names           = pleft_field_names;
172 
173 	pkeeper->pbucket                     = mlr_malloc_or_die(sizeof(join_bucket_t));
174 	pkeeper->pbucket->precords           = sllv_alloc();
175 	pkeeper->pbucket->pleft_field_values = NULL;
176 	pkeeper->pbucket->was_paired         = FALSE;
177 
178 	pkeeper->prec_peek                   = NULL;
179 	pkeeper->leof                        = FALSE;
180 	pkeeper->state                       = LEFT_STATE_0_PREFILL;
181 
182 	return pkeeper;
183 }
184 
185 // ----------------------------------------------------------------
// Releases everything the keeper owns: the bucket (its join values, any
// records still held in it, and the list itself), the left-file handle and
// reader, the peek record, the context, and the keeper struct.
//
// prepipe is passed through to the reader's close function. A NULL pkeeper
// is a no-op.
void join_bucket_keeper_free(join_bucket_keeper_t* pkeeper, char* prepipe) {
	if (pkeeper == NULL)
		return;

	slls_free(pkeeper->pbucket->pleft_field_values);

	// Records still sitting in the bucket belong to the keeper. The record
	// list is freed separately from its records elsewhere in this file, so
	// free each record first. The list pointer is NULL after a drain.
	if (pkeeper->pbucket->precords != NULL) {
		while (pkeeper->pbucket->precords->phead != NULL)
			lrec_free(sllv_pop(pkeeper->pbucket->precords));
		sllv_free(pkeeper->pbucket->precords);
	}
	free(pkeeper->pbucket);

	pkeeper->plrec_reader->pclose_func(pkeeper->plrec_reader->pvstate, pkeeper->pvhandle, prepipe);
	pkeeper->plrec_reader->pfree_func(pkeeper->plrec_reader);
	lrec_free(pkeeper->prec_peek);
	free(pkeeper->pctx);
	free(pkeeper);
}
198 
199 // ----------------------------------------------------------------
// Main entry point for the sorted join. Given the join-field values of the
// current right record (or NULL at right EOF), sets:
// * *pprecords_paired to the left bucket matching those values, or NULL if
//   there is no match. The list is the keeper's own bucket list.
// * *pprecords_left_unpaired to left records now known to be unpaired, or
//   NULL if there are none on this call.
// The keeper's state is recomputed before returning.
void join_bucket_keeper_emit(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	*pprecords_paired        = NULL;
	*pprecords_left_unpaired = NULL;

	// First call: load the first left bucket and its peek record.
	if (pkeeper->state == LEFT_STATE_0_PREFILL) {
		join_bucket_keeper_initial_fill(pkeeper, pprecords_left_unpaired);
		pkeeper->state = join_bucket_keeper_get_state(pkeeper);
	}

	if (pright_field_values == NULL) {
		// Right EOF: return the final left-unpaireds.
		join_bucket_keeper_drain(pkeeper, pright_field_values, pprecords_paired, pprecords_left_unpaired);
	} else {
		switch (pkeeper->state) {
		case LEFT_STATE_1_FULL:
		case LEFT_STATE_2_LAST_BUCKET: {
			int cmp = slls_compare_lexically(pkeeper->pbucket->pleft_field_values, pright_field_values);
			if (cmp == 0) {
				// The current bucket matches the right record.
				pkeeper->pbucket->was_paired = TRUE;
				*pprecords_paired = pkeeper->pbucket->precords;
			} else if (cmp < 0) {
				// Advance left until match or left EOF.
				join_bucket_keeper_advance_to(pkeeper, pright_field_values, pprecords_paired, pprecords_left_unpaired);
			}
			// Otherwise: no match and no need to advance left; return null lists.
			break;
		}
		case LEFT_STATE_3_EOF:
			// Left stream is exhausted: nothing to pair with.
			break;
		default:
			fprintf(stderr, "%s: internal coding error: failed transition from prefill state.\n",
				MLR_GLOBALS.bargv0);
			exit(1);
		}
	}

	pkeeper->state = join_bucket_keeper_get_state(pkeeper);
}
235 
236 // ----------------------------------------------------------------
// Computes the keeper's left-stream state:
// * with bucket join values: FULL if there is a peek record, else LAST_BUCKET;
// * without bucket join values: EOF if left EOF was seen, else PREFILL.
static int join_bucket_keeper_get_state(join_bucket_keeper_t* pkeeper) {
	int has_bucket = (pkeeper->pbucket->pleft_field_values != NULL);

	if (has_bucket)
		return (pkeeper->prec_peek != NULL) ? LEFT_STATE_1_FULL : LEFT_STATE_2_LAST_BUCKET;

	return pkeeper->leof ? LEFT_STATE_3_EOF : LEFT_STATE_0_PREFILL;
}
250 
// Reads from the left stream until the first record having all the join keys
// is found, then fills the first bucket starting from it. Records lacking
// the join keys are appended to *pprecords_left_unpaired (allocated on
// demand). If the left stream ends first, the left-EOF flag is set and no
// bucket is filled.
static void join_bucket_keeper_initial_fill(join_bucket_keeper_t* pkeeper,
	sllv_t** pprecords_left_unpaired)
{
	for (;;) {
		pkeeper->prec_peek = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
			pkeeper->pvhandle, pkeeper->pctx);

		if (pkeeper->prec_peek == NULL) {
			// Left EOF before any keyed record was seen.
			pkeeper->leof = TRUE;
			return;
		}

		if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names))
			break;

		// Records not having the join keys go straight to the left-unpaired list.
		if (*pprecords_left_unpaired == NULL)
			*pprecords_left_unpaired = sllv_alloc();
		sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
	}

	join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
}
277 
278 // Preconditions:
279 // * prec_peek != NULL
280 // * prec_peek has the join keys
join_bucket_keeper_fill(join_bucket_keeper_t * pkeeper,sllv_t ** pprecords_left_unpaired)281 static void join_bucket_keeper_fill(join_bucket_keeper_t* pkeeper, sllv_t** pprecords_left_unpaired) {
282 	slls_t* pleft_field_values = mlr_reference_selected_values_from_record(pkeeper->prec_peek,
283 		pkeeper->pleft_field_names);
284 	if (pleft_field_values == NULL) {
285 		fprintf(stderr, "%s: internal coding error: peek record should have had join keys.\n",
286 			MLR_GLOBALS.bargv0);
287 		exit(1);
288 	}
289 
290 	pkeeper->pbucket->pleft_field_values = slls_copy(pleft_field_values);
291 	slls_free(pleft_field_values);
292 	sllv_append(pkeeper->pbucket->precords, pkeeper->prec_peek);
293 	pkeeper->pbucket->was_paired = FALSE;
294 	pkeeper->prec_peek = NULL;
295 	while (TRUE) {
296 		// Skip over records not having the join keys. These go straight to the
297 		// left-unpaired list.
298 		pkeeper->prec_peek = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
299 			pkeeper->pvhandle, pkeeper->pctx);
300 		if (pkeeper->prec_peek == NULL) {
301 			pkeeper->leof = TRUE;
302 			break;
303 		}
304 
305 		if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names)) {
306 			int cmp = slls_lrec_compare_lexically(
307 				pkeeper->pbucket->pleft_field_values,
308 				pkeeper->prec_peek,
309 				pkeeper->pleft_field_names);
310 			if (cmp != 0) {
311 				break;
312 			}
313 			sllv_append(pkeeper->pbucket->precords, pkeeper->prec_peek);
314 
315 		} else {
316 			if (*pprecords_left_unpaired == NULL)
317 				*pprecords_left_unpaired = sllv_alloc();
318 			sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
319 		}
320 		pkeeper->prec_peek = NULL;
321 	}
322 }
323 
324 // Pre-conditions:
325 // * pkeeper->pleft_field_values < pright_field_values.
326 // * currently in state 1 or 2 so there is a bucket but there may or may not be a peek-record
327 // * current bucket was/wasn't paired on previous emits but is not paired on this emit.
328 // Actions:
329 // * if bucket was never paired, return it to the caller; else discard.
330 // * consume left input stream, feeding into unpaired, for as long as leftvals < rightvals && !eof.
331 // * if there is leftrec with vals == rightvals: parallel initial_fill.
332 //   else, mimic initial_fill.
333 
join_bucket_keeper_advance_to(join_bucket_keeper_t * pkeeper,slls_t * pright_field_values,sllv_t ** pprecords_paired,sllv_t ** pprecords_left_unpaired)334 static void join_bucket_keeper_advance_to(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
335 	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
336 {
337 	if (pkeeper->pbucket->was_paired) {
338 		while (pkeeper->pbucket->precords->phead)
339 			lrec_free(sllv_pop(pkeeper->pbucket->precords));
340 		sllv_free(pkeeper->pbucket->precords);
341 		pkeeper->pbucket->precords = NULL;
342 	} else {
343 		if (*pprecords_left_unpaired == NULL) {
344 			*pprecords_left_unpaired = pkeeper->pbucket->precords;
345 		} else {
346 			sllv_transfer(*pprecords_left_unpaired, pkeeper->pbucket->precords);
347 		}
348 	}
349 
350 	pkeeper->pbucket->precords = sllv_alloc();
351 	if (pkeeper->pbucket->pleft_field_values != NULL) {
352 		slls_free(pkeeper->pbucket->pleft_field_values);
353 		pkeeper->pbucket->pleft_field_values = NULL;
354 	}
355 	pkeeper->pbucket->was_paired = FALSE;
356 
357 	if (pkeeper->prec_peek == NULL) { // left EOF
358 		return;
359 	}
360 
361 	// Need a double condition here ... the peek record is either het or hom.
362 	// (Or, change that: -> ensure elsewhere the peek record is hom.)
363 	// The former is destined for lunp and shouldn't be lexcmped. The latter
364 	// should be.
365 
366 	int cmp = lrec_slls_compare_lexically(pkeeper->prec_peek, pkeeper->pleft_field_names, pright_field_values);
367 	if (cmp < 0) {
368 		// keep seeking & filling the bucket until = or >; this may or may not end up being a match.
369 
370 		if (*pprecords_left_unpaired == NULL)
371 			*pprecords_left_unpaired = sllv_alloc();
372 
373 		while (TRUE) {
374 			sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
375 			pkeeper->prec_peek = NULL;
376 
377 			while (TRUE) {
378 				// Skip over records not having the join keys. These go straight to the
379 				// left-unpaired list.
380 				pkeeper->prec_peek = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
381 					pkeeper->pvhandle, pkeeper->pctx);
382 				if (pkeeper->prec_peek == NULL)
383 					break;
384 				if (record_has_all_keys(pkeeper->prec_peek, pkeeper->pleft_field_names)) {
385 					break;
386 				} else {
387 					if (*pprecords_left_unpaired == NULL)
388 						*pprecords_left_unpaired = sllv_alloc();
389 					sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
390 				}
391 			}
392 
393 
394 			if (pkeeper->prec_peek == NULL) {
395 				pkeeper->leof = TRUE;
396 				break;
397 			}
398 
399 			cmp = lrec_slls_compare_lexically(pkeeper->prec_peek, pkeeper->pleft_field_names, pright_field_values);
400 			if (cmp >= 0)
401 				break;
402 		}
403 	}
404 
405 	if (cmp == 0) {
406 		join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
407 		pkeeper->pbucket->was_paired = TRUE;
408 		*pprecords_paired = pkeeper->pbucket->precords;
409 	} else if (cmp > 0) {
410 		join_bucket_keeper_fill(pkeeper, pprecords_left_unpaired);
411 	}
412 }
413 
// Called at right EOF. Collects every remaining left record into
// *pprecords_left_unpaired, which is guaranteed non-NULL on return:
// 1. the current bucket's records, unless the bucket was already paired (in
//    which case they are freed, as when advancing past a paired bucket);
// 2. the peek record, if any;
// 3. everything left unread in the left input stream.
// The bucket's record list pointer is NULL afterward.
//
// pright_field_values and pprecords_paired are not used here.
static void join_bucket_keeper_drain(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	// 1. Any records already in pkeeper->pbucket->precords (current bucket)
	if (pkeeper->pbucket->was_paired) {
		// Already emitted as paired, so the keeper still owns these records.
		// Free them and the list rather than dropping the pointer below.
		if (pkeeper->pbucket->precords != NULL) {
			while (pkeeper->pbucket->precords->phead)
				lrec_free(sllv_pop(pkeeper->pbucket->precords));
			sllv_free(pkeeper->pbucket->precords);
		}
	} else if (pkeeper->pbucket->precords != NULL) {
		if (*pprecords_left_unpaired == NULL) {
			// The whole list changes hands.
			*pprecords_left_unpaired = pkeeper->pbucket->precords;
		} else {
			sllv_transfer(*pprecords_left_unpaired, pkeeper->pbucket->precords);
			sllv_free(pkeeper->pbucket->precords);
		}
	}
	pkeeper->pbucket->precords = NULL;

	// The appends below need a list to append to.
	if (*pprecords_left_unpaired == NULL)
		*pprecords_left_unpaired = sllv_alloc();

	// 2. Peek-record, if any
	if (pkeeper->prec_peek != NULL) {
		sllv_append(*pprecords_left_unpaired, pkeeper->prec_peek);
		pkeeper->prec_peek = NULL;
	}

	// 3. Remainder of left input stream
	while (TRUE) {
		lrec_t* prec = pkeeper->plrec_reader->pprocess_func(pkeeper->plrec_reader->pvstate,
			pkeeper->pvhandle, pkeeper->pctx);
		if (prec == NULL)
			break;
		sllv_append(*pprecords_left_unpaired, prec);
	}
}
445 
446 // ----------------------------------------------------------------
// Debug dump of the keeper to stdout: its address, file handle, context,
// left field names, bucket, peek record, left-EOF flag, and state name.
void join_bucket_keeper_print(join_bucket_keeper_t* pkeeper) {
	// %p requires a void* argument. The address printed here is the keeper's,
	// so the line is labelled "pkeeper"; the bucket is printed further down.
	printf("pkeeper at %p:\n", (void*)pkeeper);
	printf("  pvhandle = %p\n", pkeeper->pvhandle);
	context_print(pkeeper->pctx, "  ");
	printf("  pleft_field_names = ");
	slls_print(pkeeper->pleft_field_names);
	printf("\n");
	join_bucket_print(pkeeper->pbucket, "  ");
	printf("  prec_peek = ");
	if (pkeeper->prec_peek == NULL) {
		printf("null\n");
	} else {
		lrec_print(pkeeper->prec_peek);
	}
	printf("  leof  = %d\n", pkeeper->leof);
	printf("  state = %s\n", describe_state(pkeeper->state));
}
464 
// Debug dump of the keeper plus the arguments of an emit call: the right
// join values and the paired and left-unpaired record lists.
void join_bucket_keeper_print_aux(join_bucket_keeper_t* pkeeper, slls_t* pright_field_values,
	sllv_t** pprecords_paired, sllv_t** pprecords_left_unpaired)
{
	sllv_t* ppaired        = *pprecords_paired;
	sllv_t* pleft_unpaired = *pprecords_left_unpaired;

	join_bucket_keeper_print(pkeeper);

	printf("  pright_field_values = ");
	slls_print(pright_field_values);
	printf("\n");

	printf("  precords_paired =\n");
	lrec_print_list_with_prefix(ppaired, "      ");
	printf("\n");

	printf("  precords_left_unpaired =\n");
	lrec_print_list_with_prefix(pleft_unpaired, "      ");
	printf("\n");
}
479 
// Debug dump of one bucket to stdout, each line prefixed with indent: its
// address, join values, records (or "(null)" if there is no record list),
// and paired flag.
void join_bucket_print(join_bucket_t* pbucket, char* indent) {
	// %p requires a void* argument.
	printf("%spbucket at %p:\n", indent, (void*)pbucket);
	printf("%s  pleft_field_values = ", indent);
	slls_print(pbucket->pleft_field_values);
	printf("\n");
	if (pbucket->precords == NULL) {
		printf("%s  precords:\n", indent);
		printf("%s    (null)\n", indent);
	} else {
		// Cast so the argument matches %llu whatever integer type the list
		// length is declared with.
		printf("%s  precords (length=%llu):\n", indent,
			(unsigned long long)pbucket->precords->length);
		lrec_print_list_with_prefix(pbucket->precords, "      ");
	}
	printf("%s  was_paired = %d\n", indent, pbucket->was_paired);
}
494 
describe_state(int state)495 static char* describe_state(int state) {
496 	switch (state) {
497 	case LEFT_STATE_0_PREFILL:     return "LEFT_STATE_0_PREFILL";
498 	case LEFT_STATE_1_FULL:        return "LEFT_STATE_1_FULL";
499 	case LEFT_STATE_2_LAST_BUCKET: return "LEFT_STATE_2_LAST_BUCKET";
500 	case LEFT_STATE_3_EOF:         return "LEFT_STATE_3_EOF";
501 	default:                       return "???";
502 	}
503 }
504