1package transformers
2
3import (
4	"container/list"
5	"errors"
6	"fmt"
7	"os"
8	"strings"
9
10	"miller/src/cliutil"
11	"miller/src/input"
12	"miller/src/lib"
13	"miller/src/transformers/utils"
14	"miller/src/transforming"
15	"miller/src/types"
16)
17
18// ----------------------------------------------------------------
19const verbNameJoin = "join"
20
21var JoinSetup = transforming.TransformerSetup{
22	Verb:         verbNameJoin,
23	UsageFunc:    transformerJoinUsage,
24	ParseCLIFunc: transformerJoinParseCLI,
25	IgnoresInput: false,
26}
27
28// ----------------------------------------------------------------
29// Most transformers have option-variables as individual locals within the
30// transformerXYZParseCLI function, which are passed as individual arguments to
31// the NewTransformerXYZ function. For join, things are a bit more complex
32// and we bag up the option-variables into this data structure.
33
34type tJoinOptions struct {
35	leftPrefix  string
36	rightPrefix string
37
38	outputJoinFieldNames []string
39	leftJoinFieldNames   []string
40	rightJoinFieldNames  []string
41
42	allowUnsortedInput   bool
43	emitPairables        bool
44	emitLeftUnpairables  bool
45	emitRightUnpairables bool
46
47	leftFileName string
48	prepipe      string
49
50	// These allow the joiner to have its own different format/delimiter for the left-file:
51	joinReaderOptions cliutil.TReaderOptions
52}
53
54func newJoinOptions() *tJoinOptions {
55	return &tJoinOptions{
56		leftPrefix:  "",
57		rightPrefix: "",
58
59		outputJoinFieldNames: nil,
60		leftJoinFieldNames:   nil,
61		rightJoinFieldNames:  nil,
62
63		allowUnsortedInput:   true,
64		emitPairables:        true,
65		emitLeftUnpairables:  false,
66		emitRightUnpairables: false,
67
68		leftFileName: "",
69		prepipe:      "",
70
71		// TODO
72		// readerOptions: readerOptions,
73	}
74}
75
76// ----------------------------------------------------------------
77func transformerJoinUsage(
78	o *os.File,
79	doExit bool,
80	exitCode int,
81) {
82	fmt.Fprintf(o, "Usage: %s %s [options]\n", lib.MlrExeName(), verbNameJoin)
83	fmt.Fprintf(o, "Joins records from specified left file name with records from all file names\n")
84	fmt.Fprintf(o, "at the end of the Miller argument list.\n")
85	fmt.Fprintf(o, "Functionality is essentially the same as the system \"join\" command, but for\n")
86	fmt.Fprintf(o, "record streams.\n")
87	fmt.Fprintf(o, "Options:\n")
88	fmt.Fprintf(o, "  -f {left file name}\n")
89	fmt.Fprintf(o, "  -j {a,b,c}   Comma-separated join-field names for output\n")
90	fmt.Fprintf(o, "  -l {a,b,c}   Comma-separated join-field names for left input file;\n")
91	fmt.Fprintf(o, "               defaults to -j values if omitted.\n")
92	fmt.Fprintf(o, "  -r {a,b,c}   Comma-separated join-field names for right input file(s);\n")
93	fmt.Fprintf(o, "               defaults to -j values if omitted.\n")
94	fmt.Fprintf(o, "  --lp {text}  Additional prefix for non-join output field names from\n")
95	fmt.Fprintf(o, "               the left file\n")
96	fmt.Fprintf(o, "  --rp {text}  Additional prefix for non-join output field names from\n")
97	fmt.Fprintf(o, "               the right file(s)\n")
98	fmt.Fprintf(o, "  --np         Do not emit paired records\n")
99	fmt.Fprintf(o, "  --ul         Emit unpaired records from the left file\n")
100	fmt.Fprintf(o, "  --ur         Emit unpaired records from the right file(s)\n")
101	fmt.Fprintf(o, "  -s|--sorted-input  Require sorted input: records must be sorted\n")
102	fmt.Fprintf(o, "               lexically by their join-field names, else not all records will\n")
103	fmt.Fprintf(o, "               be paired. The only likely use case for this is with a left\n")
104	fmt.Fprintf(o, "               file which is too big to fit into system memory otherwise.\n")
105	fmt.Fprintf(o, "  -u           Enable unsorted input. (This is the default even without -u.)\n")
106	fmt.Fprintf(o, "               In this case, the entire left file will be loaded into memory.\n")
107	fmt.Fprintf(o, "  --prepipe {command} As in main input options; see %s --help for details.\n",
108		lib.MlrExeName())
109	fmt.Fprintf(o, "               If you wish to use a prepipe command for the main input as well\n")
110	fmt.Fprintf(o, "               as here, it must be specified there as well as here.\n")
111	fmt.Fprintf(o, "-h|--help Show this message.\n")
112	fmt.Fprintf(o, "\n")
113	fmt.Fprintf(o, "File-format options default to those for the right file names on the Miller\n")
114	fmt.Fprintf(o, "argument list, but may be overridden for the left file as follows. Please see\n")
115	fmt.Fprintf(o, "the main \"%s --help\" for more information on syntax for these arguments.\n", lib.MlrExeName())
116	fmt.Fprintf(o, "  -i {one of csv,dkvp,nidx,pprint,xtab}\n")
117	fmt.Fprintf(o, "  --irs {record-separator character}\n")
118	fmt.Fprintf(o, "  --ifs {field-separator character}\n")
119	fmt.Fprintf(o, "  --ips {pair-separator character}\n")
120	fmt.Fprintf(o, "  --repifs\n")
121	fmt.Fprintf(o, "  --repips\n")
122	fmt.Fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n",
123		lib.MlrExeName())
124	fmt.Fprintf(o, "Please see https://miller.readthedocs.io/en/latest/reference-verbs.html#join for more information\n")
125	fmt.Fprintf(o, "including examples.\n")
126
127	if doExit {
128		os.Exit(exitCode)
129	}
130}
131
132// ----------------------------------------------------------------
133func transformerJoinParseCLI(
134	pargi *int,
135	argc int,
136	args []string,
137	mainReaderOptions *cliutil.TReaderOptions, // Options for the right-files
138	__ *cliutil.TWriterOptions,
139) transforming.IRecordTransformer {
140
141	// Skip the verb name from the current spot in the mlr command line
142	argi := *pargi
143	verb := args[argi]
144	argi++
145
146	// Parse local flags
147	opts := newJoinOptions()
148
149	if mainReaderOptions != nil { // for 'mlr --usage-all-verbs', it's nil
150		// TODO: make sure this is a full nested-struct copy.
151		opts.joinReaderOptions = *mainReaderOptions // struct copy
152	}
153
154	for argi < argc /* variable increment: 1 or 2 depending on flag */ {
155		opt := args[argi]
156		if !strings.HasPrefix(opt, "-") {
157			break // No more flag options to process
158		}
159		argi++
160
161		if opt == "-h" || opt == "--help" {
162			transformerSortUsage(os.Stdout, true, 0)
163
164		} else if opt == "--prepipe" {
165			opts.prepipe = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
166
167		} else if opt == "-f" {
168			opts.leftFileName = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
169
170		} else if opt == "-j" {
171			opts.outputJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
172
173		} else if opt == "-l" {
174			opts.leftJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
175
176		} else if opt == "-r" {
177			opts.rightJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
178
179		} else if opt == "--lp" {
180			opts.leftPrefix = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
181
182		} else if opt == "--rp" {
183			opts.rightPrefix = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
184
185		} else if opt == "--np" {
186			opts.emitPairables = false
187
188		} else if opt == "--ul" {
189			opts.emitLeftUnpairables = true
190
191		} else if opt == "--ur" {
192			opts.emitRightUnpairables = true
193
194		} else if opt == "-u" {
195			opts.allowUnsortedInput = true
196
197		} else if opt == "--sorted-input" || opt == "-s" {
198			opts.allowUnsortedInput = false
199
200		} else {
201			// This is inelegant. For error-proofing we advance argi already in our
202			// loop (so individual if-statements don't need to). However,
203			// ParseReaderOptions expects it unadvanced.
204			rargi := argi - 1
205			if cliutil.ParseReaderOptions(args, argc, &rargi, &opts.joinReaderOptions) {
206				// This lets mlr main and mlr join have different input formats.
207				// Nothing else to handle here.
208				argi = rargi
209			} else {
210				transformerJoinUsage(os.Stderr, true, 1)
211			}
212		}
213	}
214
215	if opts.leftFileName == "" {
216		fmt.Fprintf(os.Stderr, "%s %s: need left file name\n", lib.MlrExeName(), verb)
217		transformerSortUsage(os.Stderr, true, 1)
218		return nil
219	}
220
221	if !opts.emitPairables && !opts.emitLeftUnpairables && !opts.emitRightUnpairables {
222		fmt.Fprintf(os.Stderr, "%s %s: all emit flags are unset; no output is possible.\n",
223			lib.MlrExeName(), verb)
224		transformerSortUsage(os.Stderr, true, 1)
225		return nil
226	}
227
228	if opts.outputJoinFieldNames == nil {
229		fmt.Fprintf(os.Stderr, "%s %s: need output field names\n", lib.MlrExeName(), verb)
230		transformerSortUsage(os.Stderr, true, 1)
231		return nil
232	}
233
234	if opts.leftJoinFieldNames == nil {
235		opts.leftJoinFieldNames = opts.outputJoinFieldNames // array copy
236	}
237	if opts.rightJoinFieldNames == nil {
238		opts.rightJoinFieldNames = opts.outputJoinFieldNames // array copy
239	}
240
241	llen := len(opts.leftJoinFieldNames)
242	rlen := len(opts.rightJoinFieldNames)
243	olen := len(opts.outputJoinFieldNames)
244	if llen != rlen || llen != olen {
245		fmt.Fprintf(os.Stderr,
246			"%s %s: must have equal left,right,output field-name lists; got lengths %d,%d,%d.\n",
247			lib.MlrExeName(), verb, llen, rlen, olen)
248		os.Exit(1)
249	}
250
251	transformer, _ := NewTransformerJoin(opts)
252
253	*pargi = argi
254	return transformer
255}
256
257// ----------------------------------------------------------------
258type TransformerJoin struct {
259	opts *tJoinOptions
260
261	leftFieldNameSet  map[string]bool
262	rightFieldNameSet map[string]bool
263
264	// For unsorted/half-streaming input
265	ingested                         bool
266	leftBucketsByJoinFieldValues     *lib.OrderedMap
267	leftUnpairableRecordsAndContexts *list.List
268
269	// For sorted/doubly-streaming input
270	joinBucketKeeper *utils.JoinBucketKeeper
271
272	recordTransformerFunc transforming.RecordTransformerFunc
273}
274
275// ----------------------------------------------------------------
276func NewTransformerJoin(
277	opts *tJoinOptions,
278) (*TransformerJoin, error) {
279
280	this := &TransformerJoin{
281		opts: opts,
282
283		leftFieldNameSet:  lib.StringListToSet(opts.leftJoinFieldNames),
284		rightFieldNameSet: lib.StringListToSet(opts.rightJoinFieldNames),
285
286		ingested:                         false,
287		leftBucketsByJoinFieldValues:     nil,
288		leftUnpairableRecordsAndContexts: nil,
289		joinBucketKeeper:                 nil,
290	}
291
292	if opts.allowUnsortedInput {
293		// Half-streaming (default) case: ingest entire left file first.
294
295		this.leftUnpairableRecordsAndContexts = list.New()
296		this.leftBucketsByJoinFieldValues = lib.NewOrderedMap()
297		this.recordTransformerFunc = this.transformHalfStreaming
298
299	} else {
300		// Doubly-streaming (non-default) case: step left/right files forward.
301		// Requires both files be sorted on their join keys in order to not
302		// miss anything. This lets people do joins that would otherwise take
303		// too much RAM.
304
305		this.joinBucketKeeper = utils.NewJoinBucketKeeper(
306			//		opts.prepipe,
307			opts.leftFileName,
308			&opts.joinReaderOptions,
309			opts.leftJoinFieldNames,
310		)
311
312		this.recordTransformerFunc = this.transformDoublyStreaming
313	}
314
315	return this, nil
316}
317
318// ----------------------------------------------------------------
319func (this *TransformerJoin) Transform(
320	inrecAndContext *types.RecordAndContext,
321	outputChannel chan<- *types.RecordAndContext,
322) {
323	this.recordTransformerFunc(inrecAndContext, outputChannel)
324}
325
326// ----------------------------------------------------------------
327// This is for the half-streaming case. We ingest the entire left file,
328// matching each right record against those.
329func (this *TransformerJoin) transformHalfStreaming(
330	inrecAndContext *types.RecordAndContext,
331	outputChannel chan<- *types.RecordAndContext,
332) {
333	// This can't be done in the CLI-parser since it requires information which
334	// isn't known until after the CLI-parser is called.
335	//
336	// TODO: check if this is still true for the Go port, once everything else
337	// is done.
338	if !this.ingested { // First call
339		this.ingestLeftFile()
340		this.ingested = true
341	}
342
343	if !inrecAndContext.EndOfStream {
344		inrec := inrecAndContext.Record
345		groupingKey, hasAllJoinKeys := inrec.GetSelectedValuesJoined(
346			this.opts.rightJoinFieldNames,
347		)
348		if hasAllJoinKeys {
349			iLeftBucket := this.leftBucketsByJoinFieldValues.Get(groupingKey)
350			if iLeftBucket == nil {
351				if this.opts.emitRightUnpairables {
352					outputChannel <- inrecAndContext
353				}
354			} else {
355				leftBucket := iLeftBucket.(*utils.JoinBucket)
356				leftBucket.WasPaired = true
357				if this.opts.emitPairables {
358					this.formAndEmitPairs(
359						leftBucket.RecordsAndContexts,
360						inrecAndContext,
361						outputChannel,
362					)
363				}
364			}
365		} else if this.opts.emitRightUnpairables {
366			outputChannel <- inrecAndContext
367		}
368
369	} else { // end of record stream
370		if this.opts.emitLeftUnpairables {
371			this.emitLeftUnpairedBuckets(outputChannel)
372			this.emitLeftUnpairables(outputChannel)
373		}
374		outputChannel <- inrecAndContext // emit end-of-stream marker
375	}
376}
377
378// ----------------------------------------------------------------
379func (this *TransformerJoin) transformDoublyStreaming(
380	rightRecAndContext *types.RecordAndContext,
381	outputChannel chan<- *types.RecordAndContext,
382) {
383	keeper := this.joinBucketKeeper // keystroke-saver
384
385	if !rightRecAndContext.EndOfStream {
386		rightRec := rightRecAndContext.Record
387		isPaired := false
388
389		rightFieldValues, hasAllJoinKeys := rightRec.ReferenceSelectedValues(
390			this.opts.rightJoinFieldNames,
391		)
392		if hasAllJoinKeys {
393			isPaired = keeper.FindJoinBucket(rightFieldValues)
394		}
395		if this.opts.emitLeftUnpairables {
396			keeper.OutputAndReleaseLeftUnpaireds(outputChannel)
397		} else {
398			keeper.ReleaseLeftUnpaireds(outputChannel)
399		}
400
401		lefts := keeper.JoinBucket.RecordsAndContexts // keystroke-saver
402
403		if !isPaired && this.opts.emitRightUnpairables {
404			outputChannel <- rightRecAndContext
405		}
406
407		if isPaired && this.opts.emitPairables && lefts != nil {
408			this.formAndEmitPairs(lefts, rightRecAndContext, outputChannel)
409		}
410
411	} else { // end of record stream
412		keeper.FindJoinBucket(nil)
413
414		if this.opts.emitLeftUnpairables {
415			keeper.OutputAndReleaseLeftUnpaireds(outputChannel)
416		}
417
418		outputChannel <- rightRecAndContext // emit end-of-stream marker
419	}
420}
421
422// ----------------------------------------------------------------
// This is for the half-streaming case. We ingest the entire left file,
// bucketing its records by join-field values so that each right record can
// later be matched against them.
425//
426// Note: this logic is very similar to that in stream.go, which is what
427// processes the main/right files.
428
429func (this *TransformerJoin) ingestLeftFile() {
430	readerOpts := &this.opts.joinReaderOptions
431
432	// Instantiate the record-reader
433	recordReader := input.Create(readerOpts)
434	if recordReader == nil {
435		fmt.Fprintln(
436			os.Stderr,
437			errors.New("Input format not found: "+readerOpts.InputFileFormat),
438		)
439		os.Exit(1)
440	}
441
442	// Set the initial context for the left-file.
443	//
444	// Since Go is concurrent, the context struct needs to be duplicated and
445	// passed through the channels along with each record.
446	initialContext := types.NewContext(nil)
447	initialContext.UpdateForStartOfFile(this.opts.leftFileName)
448
449	// Set up channels for the record-reader.
450	inputChannel := make(chan *types.RecordAndContext, 10)
451	errorChannel := make(chan error, 1)
452
453	// Start the record reader.
454	// TODO: prepipe
455	leftFileNameArray := [1]string{this.opts.leftFileName}
456	go recordReader.Read(leftFileNameArray[:], *initialContext, inputChannel, errorChannel)
457
458	// Ingest parsed records and bucket them by their join-field values.  E.g.
459	// if the join-field is "id" then put all records with id=1 in one bucket,
460	// all those with id=2 in another bucket, etc. And any records lacking an
461	// "id" field go into the unpairable list.
462	done := false
463	for !done {
464		select {
465
466		case err := <-errorChannel:
467			fmt.Fprintln(os.Stderr, lib.MlrExeName(), ": ", err)
468			os.Exit(1)
469
470		case leftrecAndContext := <-inputChannel:
471			if leftrecAndContext.EndOfStream {
472				done = true
473				break // breaks the switch, not the for, in Golang
474			}
475			leftrec := leftrecAndContext.Record
476
477			groupingKey, leftFieldValues, ok := leftrec.GetSelectedValuesAndJoined(
478				this.opts.leftJoinFieldNames,
479			)
480			if ok {
481				iBucket := this.leftBucketsByJoinFieldValues.Get(groupingKey)
482				if iBucket == nil { // New key-field-value: new bucket and hash-map entry
483					bucket := utils.NewJoinBucket(leftFieldValues)
484					bucket.RecordsAndContexts.PushBack(leftrecAndContext)
485					this.leftBucketsByJoinFieldValues.Put(groupingKey, bucket)
486				} else { // Previously seen key-field-value: append record to bucket
487					bucket := iBucket.(*utils.JoinBucket)
488					bucket.RecordsAndContexts.PushBack(leftrecAndContext)
489				}
490			} else {
491				this.leftUnpairableRecordsAndContexts.PushBack(leftrecAndContext)
492			}
493		}
494	}
495}
496
497// ----------------------------------------------------------------
498// This helper method is used by the half-streaming/unsorted join, as well as
499// the doubly-streaming/sorted join.
500
501func (this *TransformerJoin) formAndEmitPairs(
502	leftRecordsAndContexts *list.List,
503	rightRecordAndContext *types.RecordAndContext,
504	outputChannel chan<- *types.RecordAndContext,
505) {
506	////fmt.Println("-- pairs start") // VERBOSE
507	// Loop over each to-be-paired-with record from the left file.
508	for pe := leftRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
509		////fmt.Println("-- pairs pe") // VERBOSE
510		leftRecordAndContext := pe.Value.(*types.RecordAndContext)
511		leftrec := leftRecordAndContext.Record
512		rightrec := rightRecordAndContext.Record
513
514		// Allocate a new output record which is the join of the left and right records.
515		outrec := types.NewMlrmapAsRecord()
516
517		// Add the joined-on fields to the new output record
518		n := len(this.opts.leftJoinFieldNames)
519		for i := 0; i < n; i++ {
520			// These arrays are already guaranteed same-length by CLI parser
521			leftJoinFieldName := this.opts.leftJoinFieldNames[i]
522			outputJoinFieldName := this.opts.outputJoinFieldNames[i]
523			value := leftrec.Get(leftJoinFieldName)
524			if value != nil {
525				outrec.PutCopy(outputJoinFieldName, value)
526			}
527		}
528
529		// Add the left-record fields not already added
530		for pl := leftrec.Head; pl != nil; pl = pl.Next {
531			_, ok := this.leftFieldNameSet[pl.Key]
532			if !ok {
533				key := this.opts.leftPrefix + pl.Key
534				outrec.PutCopy(key, pl.Value)
535			}
536		}
537
538		// Add the right-record fields not already added
539		for pr := rightrec.Head; pr != nil; pr = pr.Next {
540			_, ok := this.rightFieldNameSet[pr.Key]
541			if !ok {
542				key := this.opts.rightPrefix + pr.Key
543				outrec.PutCopy(key, pr.Value)
544			}
545		}
546		////fmt.Println("-- pairs outrec") // VERBOSE
547		////outrec.Print() // VERBOSE
548
549		// Clone the right record's context (NR, FILENAME, etc) to use for the new output record
550		context := rightRecordAndContext.Context // struct copy
551		outrecAndContext := types.NewRecordAndContext(outrec, &context)
552
553		// Emit the new joined record on the downstream channel
554		outputChannel <- outrecAndContext
555	}
556	////fmt.Println("-- pairs end") // VERBOSE
557}
558
559// ----------------------------------------------------------------
560// There are two kinds of left non-pair records: (a) those lacking the
561// specified join-keys -- can't possibly pair with anything on the right; (b)
562// those having the join-keys but not matching with a record on the right.
563//
564// Example: join on "id" field. Records lacking an "id" field are in the first
565// category.  Now suppose there's a left record with id=0, but there were three
// right-file records with id-field values 1,2,3. Then the id=0 left record is
// in the second category.
568
569func (this *TransformerJoin) emitLeftUnpairables(
570	outputChannel chan<- *types.RecordAndContext,
571) {
572	// Loop over each to-be-paired-with record from the left file.
573	for pe := this.leftUnpairableRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
574		leftRecordAndContext := pe.Value.(*types.RecordAndContext)
575		outputChannel <- leftRecordAndContext
576	}
577}
578
579func (this *TransformerJoin) emitLeftUnpairedBuckets(
580	outputChannel chan<- *types.RecordAndContext,
581) {
582	for pe := this.leftBucketsByJoinFieldValues.Head; pe != nil; pe = pe.Next {
583		bucket := pe.Value.(*utils.JoinBucket)
584		if !bucket.WasPaired {
585			for pf := bucket.RecordsAndContexts.Front(); pf != nil; pf = pf.Next() {
586				recordAndContext := pf.Value.(*types.RecordAndContext)
587				outputChannel <- recordAndContext
588			}
589		}
590	}
591}
592