1package transformers 2 3import ( 4 "container/list" 5 "errors" 6 "fmt" 7 "os" 8 "strings" 9 10 "miller/src/cliutil" 11 "miller/src/input" 12 "miller/src/lib" 13 "miller/src/transformers/utils" 14 "miller/src/transforming" 15 "miller/src/types" 16) 17 18// ---------------------------------------------------------------- 19const verbNameJoin = "join" 20 21var JoinSetup = transforming.TransformerSetup{ 22 Verb: verbNameJoin, 23 UsageFunc: transformerJoinUsage, 24 ParseCLIFunc: transformerJoinParseCLI, 25 IgnoresInput: false, 26} 27 28// ---------------------------------------------------------------- 29// Most transformers have option-variables as individual locals within the 30// transformerXYZParseCLI function, which are passed as individual arguments to 31// the NewTransformerXYZ function. For join, things are a bit more complex 32// and we bag up the option-variables into this data structure. 33 34type tJoinOptions struct { 35 leftPrefix string 36 rightPrefix string 37 38 outputJoinFieldNames []string 39 leftJoinFieldNames []string 40 rightJoinFieldNames []string 41 42 allowUnsortedInput bool 43 emitPairables bool 44 emitLeftUnpairables bool 45 emitRightUnpairables bool 46 47 leftFileName string 48 prepipe string 49 50 // These allow the joiner to have its own different format/delimiter for the left-file: 51 joinReaderOptions cliutil.TReaderOptions 52} 53 54func newJoinOptions() *tJoinOptions { 55 return &tJoinOptions{ 56 leftPrefix: "", 57 rightPrefix: "", 58 59 outputJoinFieldNames: nil, 60 leftJoinFieldNames: nil, 61 rightJoinFieldNames: nil, 62 63 allowUnsortedInput: true, 64 emitPairables: true, 65 emitLeftUnpairables: false, 66 emitRightUnpairables: false, 67 68 leftFileName: "", 69 prepipe: "", 70 71 // TODO 72 // readerOptions: readerOptions, 73 } 74} 75 76// ---------------------------------------------------------------- 77func transformerJoinUsage( 78 o *os.File, 79 doExit bool, 80 exitCode int, 81) { 82 fmt.Fprintf(o, "Usage: %s %s [options]\n", lib.MlrExeName(), verbNameJoin) 83 fmt.Fprintf(o, "Joins records from specified left file name with records from all file names\n") 84 fmt.Fprintf(o, "at the end of the Miller argument list.\n") 85 fmt.Fprintf(o, "Functionality is essentially the same as the system \"join\" command, but for\n") 86 fmt.Fprintf(o, "record streams.\n") 87 fmt.Fprintf(o, "Options:\n") 88 fmt.Fprintf(o, " -f {left file name}\n") 89 fmt.Fprintf(o, " -j {a,b,c} Comma-separated join-field names for output\n") 90 fmt.Fprintf(o, " -l {a,b,c} Comma-separated join-field names for left input file;\n") 91 fmt.Fprintf(o, " defaults to -j values if omitted.\n") 92 fmt.Fprintf(o, " -r {a,b,c} Comma-separated join-field names for right input file(s);\n") 93 fmt.Fprintf(o, " defaults to -j values if omitted.\n") 94 fmt.Fprintf(o, " --lp {text} Additional prefix for non-join output field names from\n") 95 fmt.Fprintf(o, " the left file\n") 96 fmt.Fprintf(o, " --rp {text} Additional prefix for non-join output field names from\n") 97 fmt.Fprintf(o, " the right file(s)\n") 98 fmt.Fprintf(o, " --np Do not emit paired records\n") 99 fmt.Fprintf(o, " --ul Emit unpaired records from the left file\n") 100 fmt.Fprintf(o, " --ur Emit unpaired records from the right file(s)\n") 101 fmt.Fprintf(o, " -s|--sorted-input Require sorted input: records must be sorted\n") 102 fmt.Fprintf(o, " lexically by their join-field names, else not all records will\n") 103 fmt.Fprintf(o, " be paired. The only likely use case for this is with a left\n") 104 fmt.Fprintf(o, " file which is too big to fit into system memory otherwise.\n") 105 fmt.Fprintf(o, " -u Enable unsorted input. (This is the default even without -u.)\n") 106 fmt.Fprintf(o, " In this case, the entire left file will be loaded into memory.\n") 107 fmt.Fprintf(o, " --prepipe {command} As in main input options; see %s --help for details.\n", 108 lib.MlrExeName()) 109 fmt.Fprintf(o, " If you wish to use a prepipe command for the main input as well\n") 110 fmt.Fprintf(o, " as here, it must be specified there as well as here.\n") 111 fmt.Fprintf(o, "-h|--help Show this message.\n") 112 fmt.Fprintf(o, "\n") 113 fmt.Fprintf(o, "File-format options default to those for the right file names on the Miller\n") 114 fmt.Fprintf(o, "argument list, but may be overridden for the left file as follows. Please see\n") 115 fmt.Fprintf(o, "the main \"%s --help\" for more information on syntax for these arguments.\n", lib.MlrExeName()) 116 fmt.Fprintf(o, " -i {one of csv,dkvp,nidx,pprint,xtab}\n") 117 fmt.Fprintf(o, " --irs {record-separator character}\n") 118 fmt.Fprintf(o, " --ifs {field-separator character}\n") 119 fmt.Fprintf(o, " --ips {pair-separator character}\n") 120 fmt.Fprintf(o, " --repifs\n") 121 fmt.Fprintf(o, " --repips\n") 122 fmt.Fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n", 123 lib.MlrExeName()) 124 fmt.Fprintf(o, "Please see https://miller.readthedocs.io/en/latest/reference-verbs.html#join for more information\n") 125 fmt.Fprintf(o, "including examples.\n") 126 127 if doExit { 128 os.Exit(exitCode) 129 } 130} 131 132// ---------------------------------------------------------------- 133func transformerJoinParseCLI( 134 pargi *int, 135 argc int, 136 args []string, 137 mainReaderOptions *cliutil.TReaderOptions, // Options for the right-files 138 __ *cliutil.TWriterOptions, 139) transforming.IRecordTransformer { 140 141 // Skip the verb name from the current spot in the mlr command line 142 argi := *pargi 143 verb := args[argi] 144 argi++ 145 146 // Parse local flags 147 opts := newJoinOptions() 148 149 if mainReaderOptions != nil { // for 'mlr --usage-all-verbs', it's nil 150 // TODO: make sure this is a full nested-struct copy. 151 opts.joinReaderOptions = *mainReaderOptions // struct copy 152 } 153 154 for argi < argc /* variable increment: 1 or 2 depending on flag */ { 155 opt := args[argi] 156 if !strings.HasPrefix(opt, "-") { 157 break // No more flag options to process 158 } 159 argi++ 160 161 if opt == "-h" || opt == "--help" { 162 transformerSortUsage(os.Stdout, true, 0) 163 164 } else if opt == "--prepipe" { 165 opts.prepipe = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc) 166 167 } else if opt == "-f" { 168 opts.leftFileName = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc) 169 170 } else if opt == "-j" { 171 opts.outputJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc) 172 173 } else if opt == "-l" { 174 opts.leftJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc) 175 176 } else if opt == "-r" { 177 opts.rightJoinFieldNames = cliutil.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc) 178 179 } else if opt == "--lp" { 180 opts.leftPrefix = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc) 181 182 } else if opt == "--rp" { 183 opts.rightPrefix = cliutil.VerbGetStringArgOrDie(verb, opt, args, &argi, argc) 184 185 } else if opt == "--np" { 186 opts.emitPairables = false 187 188 } else if opt == "--ul" { 189 opts.emitLeftUnpairables = true 190 191 } else if opt == "--ur" { 192 opts.emitRightUnpairables = true 193 194 } else if opt == "-u" { 195 opts.allowUnsortedInput = true 196 197 } else if opt == "--sorted-input" || opt == "-s" { 198 opts.allowUnsortedInput = false 199 200 } else { 201 // This is inelegant. For error-proofing we advance argi already in our 202 // loop (so individual if-statements don't need to). However, 203 // ParseReaderOptions expects it unadvanced. 204 rargi := argi - 1 205 if cliutil.ParseReaderOptions(args, argc, &rargi, &opts.joinReaderOptions) { 206 // This lets mlr main and mlr join have different input formats. 207 // Nothing else to handle here. 208 argi = rargi 209 } else { 210 transformerJoinUsage(os.Stderr, true, 1) 211 } 212 } 213 } 214 215 if opts.leftFileName == "" { 216 fmt.Fprintf(os.Stderr, "%s %s: need left file name\n", lib.MlrExeName(), verb) 217 transformerSortUsage(os.Stderr, true, 1) 218 return nil 219 } 220 221 if !opts.emitPairables && !opts.emitLeftUnpairables && !opts.emitRightUnpairables { 222 fmt.Fprintf(os.Stderr, "%s %s: all emit flags are unset; no output is possible.\n", 223 lib.MlrExeName(), verb) 224 transformerSortUsage(os.Stderr, true, 1) 225 return nil 226 } 227 228 if opts.outputJoinFieldNames == nil { 229 fmt.Fprintf(os.Stderr, "%s %s: need output field names\n", lib.MlrExeName(), verb) 230 transformerSortUsage(os.Stderr, true, 1) 231 return nil 232 } 233 234 if opts.leftJoinFieldNames == nil { 235 opts.leftJoinFieldNames = opts.outputJoinFieldNames // array copy 236 } 237 if opts.rightJoinFieldNames == nil { 238 opts.rightJoinFieldNames = opts.outputJoinFieldNames // array copy 239 } 240 241 llen := len(opts.leftJoinFieldNames) 242 rlen := len(opts.rightJoinFieldNames) 243 olen := len(opts.outputJoinFieldNames) 244 if llen != rlen || llen != olen { 245 fmt.Fprintf(os.Stderr, 246 "%s %s: must have equal left,right,output field-name lists; got lengths %d,%d,%d.\n", 247 lib.MlrExeName(), verb, llen, rlen, olen) 248 os.Exit(1) 249 } 250 251 transformer, _ := NewTransformerJoin(opts) 252 253 *pargi = argi 254 return transformer 255} 256 257// ---------------------------------------------------------------- 258type TransformerJoin struct { 259 opts *tJoinOptions 260 261 leftFieldNameSet map[string]bool 262 rightFieldNameSet map[string]bool 263 264 // For unsorted/half-streaming input 265 ingested bool 266 leftBucketsByJoinFieldValues *lib.OrderedMap 267 leftUnpairableRecordsAndContexts *list.List 268 269 // For sorted/doubly-streaming input 270 joinBucketKeeper *utils.JoinBucketKeeper 271 272 recordTransformerFunc transforming.RecordTransformerFunc 273} 274 275// ---------------------------------------------------------------- 276func NewTransformerJoin( 277 opts *tJoinOptions, 278) (*TransformerJoin, error) { 279 280 this := &TransformerJoin{ 281 opts: opts, 282 283 leftFieldNameSet: lib.StringListToSet(opts.leftJoinFieldNames), 284 rightFieldNameSet: lib.StringListToSet(opts.rightJoinFieldNames), 285 286 ingested: false, 287 leftBucketsByJoinFieldValues: nil, 288 leftUnpairableRecordsAndContexts: nil, 289 joinBucketKeeper: nil, 290 } 291 292 if opts.allowUnsortedInput { 293 // Half-streaming (default) case: ingest entire left file first. 294 295 this.leftUnpairableRecordsAndContexts = list.New() 296 this.leftBucketsByJoinFieldValues = lib.NewOrderedMap() 297 this.recordTransformerFunc = this.transformHalfStreaming 298 299 } else { 300 // Doubly-streaming (non-default) case: step left/right files forward. 301 // Requires both files be sorted on their join keys in order to not 302 // miss anything. This lets people do joins that would otherwise take 303 // too much RAM. 304 305 this.joinBucketKeeper = utils.NewJoinBucketKeeper( 306 // opts.prepipe, 307 opts.leftFileName, 308 &opts.joinReaderOptions, 309 opts.leftJoinFieldNames, 310 ) 311 312 this.recordTransformerFunc = this.transformDoublyStreaming 313 } 314 315 return this, nil 316} 317 318// ---------------------------------------------------------------- 319func (this *TransformerJoin) Transform( 320 inrecAndContext *types.RecordAndContext, 321 outputChannel chan<- *types.RecordAndContext, 322) { 323 this.recordTransformerFunc(inrecAndContext, outputChannel) 324} 325 326// ---------------------------------------------------------------- 327// This is for the half-streaming case. We ingest the entire left file, 328// matching each right record against those. 329func (this *TransformerJoin) transformHalfStreaming( 330 inrecAndContext *types.RecordAndContext, 331 outputChannel chan<- *types.RecordAndContext, 332) { 333 // This can't be done in the CLI-parser since it requires information which 334 // isn't known until after the CLI-parser is called. 335 // 336 // TODO: check if this is still true for the Go port, once everything else 337 // is done. 338 if !this.ingested { // First call 339 this.ingestLeftFile() 340 this.ingested = true 341 } 342 343 if !inrecAndContext.EndOfStream { 344 inrec := inrecAndContext.Record 345 groupingKey, hasAllJoinKeys := inrec.GetSelectedValuesJoined( 346 this.opts.rightJoinFieldNames, 347 ) 348 if hasAllJoinKeys { 349 iLeftBucket := this.leftBucketsByJoinFieldValues.Get(groupingKey) 350 if iLeftBucket == nil { 351 if this.opts.emitRightUnpairables { 352 outputChannel <- inrecAndContext 353 } 354 } else { 355 leftBucket := iLeftBucket.(*utils.JoinBucket) 356 leftBucket.WasPaired = true 357 if this.opts.emitPairables { 358 this.formAndEmitPairs( 359 leftBucket.RecordsAndContexts, 360 inrecAndContext, 361 outputChannel, 362 ) 363 } 364 } 365 } else if this.opts.emitRightUnpairables { 366 outputChannel <- inrecAndContext 367 } 368 369 } else { // end of record stream 370 if this.opts.emitLeftUnpairables { 371 this.emitLeftUnpairedBuckets(outputChannel) 372 this.emitLeftUnpairables(outputChannel) 373 } 374 outputChannel <- inrecAndContext // emit end-of-stream marker 375 } 376} 377 378// ---------------------------------------------------------------- 379func (this *TransformerJoin) transformDoublyStreaming( 380 rightRecAndContext *types.RecordAndContext, 381 outputChannel chan<- *types.RecordAndContext, 382) { 383 keeper := this.joinBucketKeeper // keystroke-saver 384 385 if !rightRecAndContext.EndOfStream { 386 rightRec := rightRecAndContext.Record 387 isPaired := false 388 389 rightFieldValues, hasAllJoinKeys := rightRec.ReferenceSelectedValues( 390 this.opts.rightJoinFieldNames, 391 ) 392 if hasAllJoinKeys { 393 isPaired = keeper.FindJoinBucket(rightFieldValues) 394 } 395 if this.opts.emitLeftUnpairables { 396 keeper.OutputAndReleaseLeftUnpaireds(outputChannel) 397 } else { 398 keeper.ReleaseLeftUnpaireds(outputChannel) 399 } 400 401 lefts := keeper.JoinBucket.RecordsAndContexts // keystroke-saver 402 403 if !isPaired && this.opts.emitRightUnpairables { 404 outputChannel <- rightRecAndContext 405 } 406 407 if isPaired && this.opts.emitPairables && lefts != nil { 408 this.formAndEmitPairs(lefts, rightRecAndContext, outputChannel) 409 } 410 411 } else { // end of record stream 412 keeper.FindJoinBucket(nil) 413 414 if this.opts.emitLeftUnpairables { 415 keeper.OutputAndReleaseLeftUnpaireds(outputChannel) 416 } 417 418 outputChannel <- rightRecAndContext // emit end-of-stream marker 419 } 420} 421 422// ---------------------------------------------------------------- 423// This is for the half-streaming case. We ingest the entire left file, 424// matching each right record against those. 425// 426// Note: this logic is very similar to that in stream.go, which is what 427// processes the main/right files. 428 429func (this *TransformerJoin) ingestLeftFile() { 430 readerOpts := &this.opts.joinReaderOptions 431 432 // Instantiate the record-reader 433 recordReader := input.Create(readerOpts) 434 if recordReader == nil { 435 fmt.Fprintln( 436 os.Stderr, 437 errors.New("Input format not found: "+readerOpts.InputFileFormat), 438 ) 439 os.Exit(1) 440 } 441 442 // Set the initial context for the left-file. 443 // 444 // Since Go is concurrent, the context struct needs to be duplicated and 445 // passed through the channels along with each record. 446 initialContext := types.NewContext(nil) 447 initialContext.UpdateForStartOfFile(this.opts.leftFileName) 448 449 // Set up channels for the record-reader. 450 inputChannel := make(chan *types.RecordAndContext, 10) 451 errorChannel := make(chan error, 1) 452 453 // Start the record reader. 454 // TODO: prepipe 455 leftFileNameArray := [1]string{this.opts.leftFileName} 456 go recordReader.Read(leftFileNameArray[:], *initialContext, inputChannel, errorChannel) 457 458 // Ingest parsed records and bucket them by their join-field values. E.g. 459 // if the join-field is "id" then put all records with id=1 in one bucket, 460 // all those with id=2 in another bucket, etc. And any records lacking an 461 // "id" field go into the unpairable list. 462 done := false 463 for !done { 464 select { 465 466 case err := <-errorChannel: 467 fmt.Fprintln(os.Stderr, lib.MlrExeName(), ": ", err) 468 os.Exit(1) 469 470 case leftrecAndContext := <-inputChannel: 471 if leftrecAndContext.EndOfStream { 472 done = true 473 break // breaks the switch, not the for, in Golang 474 } 475 leftrec := leftrecAndContext.Record 476 477 groupingKey, leftFieldValues, ok := leftrec.GetSelectedValuesAndJoined( 478 this.opts.leftJoinFieldNames, 479 ) 480 if ok { 481 iBucket := this.leftBucketsByJoinFieldValues.Get(groupingKey) 482 if iBucket == nil { // New key-field-value: new bucket and hash-map entry 483 bucket := utils.NewJoinBucket(leftFieldValues) 484 bucket.RecordsAndContexts.PushBack(leftrecAndContext) 485 this.leftBucketsByJoinFieldValues.Put(groupingKey, bucket) 486 } else { // Previously seen key-field-value: append record to bucket 487 bucket := iBucket.(*utils.JoinBucket) 488 bucket.RecordsAndContexts.PushBack(leftrecAndContext) 489 } 490 } else { 491 this.leftUnpairableRecordsAndContexts.PushBack(leftrecAndContext) 492 } 493 } 494 } 495} 496 497// ---------------------------------------------------------------- 498// This helper method is used by the half-streaming/unsorted join, as well as 499// the doubly-streaming/sorted join. 500 501func (this *TransformerJoin) formAndEmitPairs( 502 leftRecordsAndContexts *list.List, 503 rightRecordAndContext *types.RecordAndContext, 504 outputChannel chan<- *types.RecordAndContext, 505) { 506 ////fmt.Println("-- pairs start") // VERBOSE 507 // Loop over each to-be-paired-with record from the left file. 508 for pe := leftRecordsAndContexts.Front(); pe != nil; pe = pe.Next() { 509 ////fmt.Println("-- pairs pe") // VERBOSE 510 leftRecordAndContext := pe.Value.(*types.RecordAndContext) 511 leftrec := leftRecordAndContext.Record 512 rightrec := rightRecordAndContext.Record 513 514 // Allocate a new output record which is the join of the left and right records. 515 outrec := types.NewMlrmapAsRecord() 516 517 // Add the joined-on fields to the new output record 518 n := len(this.opts.leftJoinFieldNames) 519 for i := 0; i < n; i++ { 520 // These arrays are already guaranteed same-length by CLI parser 521 leftJoinFieldName := this.opts.leftJoinFieldNames[i] 522 outputJoinFieldName := this.opts.outputJoinFieldNames[i] 523 value := leftrec.Get(leftJoinFieldName) 524 if value != nil { 525 outrec.PutCopy(outputJoinFieldName, value) 526 } 527 } 528 529 // Add the left-record fields not already added 530 for pl := leftrec.Head; pl != nil; pl = pl.Next { 531 _, ok := this.leftFieldNameSet[pl.Key] 532 if !ok { 533 key := this.opts.leftPrefix + pl.Key 534 outrec.PutCopy(key, pl.Value) 535 } 536 } 537 538 // Add the right-record fields not already added 539 for pr := rightrec.Head; pr != nil; pr = pr.Next { 540 _, ok := this.rightFieldNameSet[pr.Key] 541 if !ok { 542 key := this.opts.rightPrefix + pr.Key 543 outrec.PutCopy(key, pr.Value) 544 } 545 } 546 ////fmt.Println("-- pairs outrec") // VERBOSE 547 ////outrec.Print() // VERBOSE 548 549 // Clone the right record's context (NR, FILENAME, etc) to use for the new output record 550 context := rightRecordAndContext.Context // struct copy 551 outrecAndContext := types.NewRecordAndContext(outrec, &context) 552 553 // Emit the new joined record on the downstream channel 554 outputChannel <- outrecAndContext 555 } 556 ////fmt.Println("-- pairs end") // VERBOSE 557} 558 559// ---------------------------------------------------------------- 560// There are two kinds of left non-pair records: (a) those lacking the 561// specified join-keys -- can't possibly pair with anything on the right; (b) 562// those having the join-keys but not matching with a record on the right. 563// 564// Example: join on "id" field. Records lacking an "id" field are in the first 565// category. Now suppose there's a left record with id=0, but there were three 566// right-file records with id-field values 1,2,3. Then the id=0 left records is 567// in the second category. 568 569func (this *TransformerJoin) emitLeftUnpairables( 570 outputChannel chan<- *types.RecordAndContext, 571) { 572 // Loop over each to-be-paired-with record from the left file. 573 for pe := this.leftUnpairableRecordsAndContexts.Front(); pe != nil; pe = pe.Next() { 574 leftRecordAndContext := pe.Value.(*types.RecordAndContext) 575 outputChannel <- leftRecordAndContext 576 } 577} 578 579func (this *TransformerJoin) emitLeftUnpairedBuckets( 580 outputChannel chan<- *types.RecordAndContext, 581) { 582 for pe := this.leftBucketsByJoinFieldValues.Head; pe != nil; pe = pe.Next { 583 bucket := pe.Value.(*utils.JoinBucket) 584 if !bucket.WasPaired { 585 for pf := bucket.RecordsAndContexts.Front(); pf != nil; pf = pf.Next() { 586 recordAndContext := pf.Value.(*types.RecordAndContext) 587 outputChannel <- recordAndContext 588 } 589 } 590 } 591} 592