package tsdb

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"regexp"
	"sort"
	"sync"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/slices"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
)

// Available index types.
const (
	InmemIndexName = "inmem"
	TSI1IndexName  = "tsi1"
)

// ErrIndexClosing can be returned to from an Index method if the index is currently closing.
var ErrIndexClosing = errors.New("index is closing")

// Index abstracts the series/measurement index implementation used by a shard
// (either the in-memory index or the TSI1 disk-based index).
type Index interface {
	Open() error
	Close() error
	WithLogger(*zap.Logger)

	// Database returns the name of the database the index belongs to.
	Database() string
	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	DropMeasurement(name []byte) error
	ForEachMeasurementName(fn func(name []byte) error) error

	InitializeSeries(keys, names [][]byte, tags []models.Tags) error
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
	DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

	// Used to clean up series in inmem index that were dropped with a shard.
	DropSeriesGlobal(key []byte) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesIDSet() *SeriesIDSet

	HasTagKey(name, key []byte) (bool, error)
	HasTagValue(name, key, value []byte) (bool, error)

	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

	TagKeyCardinality(name, key []byte) int

	// InfluxQL system iterators
	MeasurementIterator() (MeasurementIterator, error)
	TagKeyIterator(name []byte) (TagKeyIterator, error)
	TagValueIterator(name, key []byte) (TagValueIterator, error)
	MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
	TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
	TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

	// Sets a shared fieldset from the engine.
	FieldSet() *MeasurementFieldSet
	SetFieldSet(fs *MeasurementFieldSet)

	// Size of the index on disk, if applicable.
	DiskSizeBytes() int64

	// Bytes estimates the memory footprint of this Index, in bytes.
	Bytes() int

	// To be removed w/ tsi1.
	SetFieldName(measurement []byte, name string)

	Type() string
	// Returns a unique reference ID to the index instance.
	// For inmem, returns a reference to the backing Index, not ShardIndex.
	UniqueReferenceID() uintptr

	Rebuild()
}

// SeriesElem represents a generic series element.
type SeriesElem interface {
	Name() []byte
	Tags() models.Tags
	Deleted() bool

	// InfluxQL expression associated with series during filtering.
	Expr() influxql.Expr
}

// SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface {
	Close() error
	Next() (SeriesElem, error)
}

// NewSeriesIteratorAdapter returns an adapter for converting series ids to series.
func NewSeriesIteratorAdapter(sfile *SeriesFile, itr SeriesIDIterator) SeriesIterator {
	return &seriesIteratorAdapter{
		sfile: sfile,
		itr:   itr,
	}
}

// seriesIteratorAdapter wraps a SeriesIDIterator and resolves each series id
// to its full key (name + tags) via the series file.
type seriesIteratorAdapter struct {
	sfile *SeriesFile
	itr   SeriesIDIterator
}

func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() }

// Next returns the next series element, skipping ids whose key is no longer
// present in the series file. Returns nil at end of iteration.
func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
	for {
		elem, err := itr.itr.Next()
		if err != nil {
			return nil, err
		} else if elem.SeriesID == 0 {
			// A zero series id marks end-of-iteration.
			return nil, nil
		}

		// Skip if this key has been tombstoned.
		key := itr.sfile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			continue
		}

		name, tags := ParseSeriesKey(key)
		deleted := itr.sfile.IsDeleted(elem.SeriesID)
		return &seriesElemAdapter{
			name:    name,
			tags:    tags,
			deleted: deleted,
			expr:    elem.Expr,
		}, nil
	}
}

// seriesElemAdapter is a concrete SeriesElem backed by plain fields.
type seriesElemAdapter struct {
	name    []byte
	tags    models.Tags
	deleted bool
	expr    influxql.Expr
}

func (e *seriesElemAdapter) Name() []byte        { return e.name }
func (e *seriesElemAdapter) Tags() models.Tags   { return e.tags }
func (e *seriesElemAdapter) Deleted() bool       { return e.deleted }
func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }

// SeriesIDElem represents a single series and optional expression.
type SeriesIDElem struct {
	SeriesID uint64
	Expr     influxql.Expr
}

// SeriesIDElems represents a list of series id elements.
type SeriesIDElems []SeriesIDElem

func (a SeriesIDElems) Len() int           { return len(a) }
func (a SeriesIDElems) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID < a[j].SeriesID }

// SeriesIDIterator represents a iterator over a list of series ids.
type SeriesIDIterator interface {
	Next() (SeriesIDElem, error)
	Close() error
}

// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
type SeriesIDSetIterator interface {
	SeriesIDIterator
	SeriesIDSet() *SeriesIDSet
}

// seriesIDSetIterator iterates a SeriesIDSet, optionally closing an attached
// resource when the iterator itself is closed.
type seriesIDSetIterator struct {
	ss     *SeriesIDSet
	itr    SeriesIDSetIterable
	closer io.Closer
}

// NewSeriesIDSetIterator returns an iterator over ss, or nil if the set is
// nil or has no backing bitmap.
func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
	if ss == nil || ss.bitmap == nil {
		return nil
	}
	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()}
}

// NewSeriesIDSetIteratorWithCloser is like NewSeriesIDSetIterator, but also
// invokes closer when the returned iterator is closed.
func NewSeriesIDSetIteratorWithCloser(ss *SeriesIDSet, closer io.Closer) SeriesIDSetIterator {
	if ss == nil || ss.bitmap == nil {
		return nil
	}
	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator(), closer: closer}
}

// Next returns the next id in the set, or a zero elem at end of iteration.
func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
	if !itr.itr.HasNext() {
		return SeriesIDElem{}, nil
	}
	return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil
}

func (itr *seriesIDSetIterator) Close() error {
	if itr.closer != nil {
		return itr.closer.Close()
	}
	return nil
}

func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss }

type SeriesIDSetIterators []SeriesIDSetIterator

// Close closes every iterator, returning the first error encountered.
func (a SeriesIDSetIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs
// can be type casted. Otherwise returns nil.
236func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator { 237 if len(itrs) == 0 { 238 return nil 239 } 240 241 a := make([]SeriesIDSetIterator, len(itrs)) 242 for i := range itrs { 243 if itr, ok := itrs[i].(SeriesIDSetIterator); ok { 244 a[i] = itr 245 } else { 246 return nil 247 } 248 } 249 return a 250} 251 252// ReadAllSeriesIDIterator returns all ids from the iterator. 253func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) { 254 if itr == nil { 255 return nil, nil 256 } 257 258 var a []uint64 259 for { 260 e, err := itr.Next() 261 if err != nil { 262 return nil, err 263 } else if e.SeriesID == 0 { 264 break 265 } 266 a = append(a, e.SeriesID) 267 } 268 return a, nil 269} 270 271// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice. 272func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator { 273 return &SeriesIDSliceIterator{ids: ids} 274} 275 276// SeriesIDSliceIterator iterates over a slice of series ids. 277type SeriesIDSliceIterator struct { 278 ids []uint64 279} 280 281// Next returns the next series id in the slice. 282func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) { 283 if len(itr.ids) == 0 { 284 return SeriesIDElem{}, nil 285 } 286 id := itr.ids[0] 287 itr.ids = itr.ids[1:] 288 return SeriesIDElem{SeriesID: id}, nil 289} 290 291func (itr *SeriesIDSliceIterator) Close() error { return nil } 292 293// SeriesIDSet returns a set of all remaining ids. 
// SeriesIDSet returns a set of all remaining ids in the slice.
func (itr *SeriesIDSliceIterator) SeriesIDSet() *SeriesIDSet {
	s := NewSeriesIDSet()
	for _, id := range itr.ids {
		s.AddNoLock(id)
	}
	return s
}

type SeriesIDIterators []SeriesIDIterator

// Close closes every iterator, returning the first error encountered.
func (a SeriesIDIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// filterNonNil returns a copy of a with all nil iterators removed.
func (a SeriesIDIterators) filterNonNil() []SeriesIDIterator {
	other := make([]SeriesIDIterator, 0, len(a))
	for _, itr := range a {
		if itr == nil {
			continue
		}
		other = append(other, itr)
	}
	return other
}

// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator.
type seriesQueryAdapterIterator struct {
	once     sync.Once
	sfile    *SeriesFile
	itr      SeriesIDIterator
	fieldset *MeasurementFieldSet
	opt      query.IteratorOptions

	point query.FloatPoint // reusable point
}

// NewSeriesQueryAdapterIterator returns a new instance of SeriesQueryAdapterIterator.
func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator {
	return &seriesQueryAdapterIterator{
		sfile:    sfile,
		itr:      itr,
		fieldset: fieldset,
		point: query.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}
}

// Stats returns stats about the points processed.
func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. Safe to call multiple times; only the first
// call closes the underlying iterator.
func (itr *seriesQueryAdapterIterator) Close() error {
	itr.once.Do(func() {
		itr.itr.Close()
	})
	return nil
}

// Next emits the next point in the iterator. The returned point is reused
// between calls.
func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
	for {
		// Read next series element.
		e, err := itr.itr.Next()
		if err != nil {
			return nil, err
		} else if e.SeriesID == 0 {
			return nil, nil
		}

		// Skip if key has been tombstoned.
		seriesKey := itr.sfile.SeriesKey(e.SeriesID)
		if len(seriesKey) == 0 {
			continue
		}

		// Convert to a key.
		name, tags := ParseSeriesKey(seriesKey)
		key := string(models.MakeKey(name, tags))

		// Write auxiliary fields.
		for i, f := range itr.opt.Aux {
			switch f.Val {
			case "key":
				itr.point.Aux[i] = key
			}
		}
		return &itr.point, nil
	}
}

// filterUndeletedSeriesIDIterator returns all series which are not deleted.
type filterUndeletedSeriesIDIterator struct {
	sfile *SeriesFile
	itr   SeriesIDIterator
}

// FilterUndeletedSeriesIDIterator returns an iterator which filters all deleted series.
func FilterUndeletedSeriesIDIterator(sfile *SeriesFile, itr SeriesIDIterator) SeriesIDIterator {
	if itr == nil {
		return nil
	}
	return &filterUndeletedSeriesIDIterator{sfile: sfile, itr: itr}
}

func (itr *filterUndeletedSeriesIDIterator) Close() error {
	return itr.itr.Close()
}

// Next returns the next series id that is not marked deleted in the series file.
func (itr *filterUndeletedSeriesIDIterator) Next() (SeriesIDElem, error) {
	for {
		e, err := itr.itr.Next()
		if err != nil {
			return SeriesIDElem{}, err
		} else if e.SeriesID == 0 {
			return SeriesIDElem{}, nil
		} else if itr.sfile.IsDeleted(e.SeriesID) {
			continue
		}
		return e, nil
	}
}

// seriesIDExprIterator is an iterator that attaches an associated expression.
type seriesIDExprIterator struct {
	itr  SeriesIDIterator
	expr influxql.Expr
}

// newSeriesIDExprIterator returns a new instance of seriesIDExprIterator.
func newSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator {
	if itr == nil {
		return nil
	}

	return &seriesIDExprIterator{
		itr:  itr,
		expr: expr,
	}
}

func (itr *seriesIDExprIterator) Close() error {
	return itr.itr.Close()
}

// Next returns the next element in the iterator with the fixed expression attached.
func (itr *seriesIDExprIterator) Next() (SeriesIDElem, error) {
	elem, err := itr.itr.Next()
	if err != nil {
		return SeriesIDElem{}, err
	} else if elem.SeriesID == 0 {
		return SeriesIDElem{}, nil
	}
	elem.Expr = itr.expr
	return elem, nil
}

// MergeSeriesIDIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator {
	if n := len(itrs); n == 0 {
		return nil
	} else if n == 1 {
		return itrs[0]
	}
	itrs = SeriesIDIterators(itrs).filterNonNil()

	// Merge as series id sets, if available. This is a fast path that avoids
	// element-by-element merging.
	if a := NewSeriesIDSetIterators(itrs); a != nil {
		sets := make([]*SeriesIDSet, len(a))
		for i := range a {
			sets[i] = a[i].SeriesIDSet()
		}

		ss := NewSeriesIDSet()
		ss.Merge(sets...)

		// Attach underlying iterators as the closer
		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
	}

	return &seriesIDMergeIterator{
		buf:  make([]SeriesIDElem, len(itrs)),
		itrs: itrs,
	}
}

// seriesIDMergeIterator is an iterator that merges multiple iterators together.
type seriesIDMergeIterator struct {
	buf  []SeriesIDElem
	itrs []SeriesIDIterator
}

func (itr *seriesIDMergeIterator) Close() error {
	SeriesIDIterators(itr.itrs).Close()
	return nil
}

// Next returns the element with the next lowest name/tags across the iterators.
500func (itr *seriesIDMergeIterator) Next() (SeriesIDElem, error) { 501 // Find next lowest id amongst the buffers. 502 var elem SeriesIDElem 503 for i := range itr.buf { 504 buf := &itr.buf[i] 505 506 // Fill buffer. 507 if buf.SeriesID == 0 { 508 elem, err := itr.itrs[i].Next() 509 if err != nil { 510 return SeriesIDElem{}, nil 511 } else if elem.SeriesID == 0 { 512 continue 513 } 514 itr.buf[i] = elem 515 } 516 517 if elem.SeriesID == 0 || buf.SeriesID < elem.SeriesID { 518 elem = *buf 519 } 520 } 521 522 // Return EOF if no elements remaining. 523 if elem.SeriesID == 0 { 524 return SeriesIDElem{}, nil 525 } 526 527 // Clear matching buffers. 528 for i := range itr.buf { 529 if itr.buf[i].SeriesID == elem.SeriesID { 530 itr.buf[i].SeriesID = 0 531 } 532 } 533 return elem, nil 534} 535 536// IntersectSeriesIDIterators returns an iterator that only returns series which 537// occur in both iterators. If both series have associated expressions then 538// they are combined together. 539func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator { 540 if itr0 == nil || itr1 == nil { 541 if itr0 != nil { 542 itr0.Close() 543 } 544 if itr1 != nil { 545 itr1.Close() 546 } 547 return nil 548 } 549 550 // Create series id set, if available. 551 if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil { 552 return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().And(a[1].SeriesIDSet()), SeriesIDSetIterators(a)) 553 } 554 555 return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}} 556} 557 558// seriesIDIntersectIterator is an iterator that merges two iterators together. 
type seriesIDIntersectIterator struct {
	buf  [2]SeriesIDElem
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDIntersectIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}

// Next returns the next element which occurs in both iterators.
func (itr *seriesIDIntersectIterator) Next() (_ SeriesIDElem, err error) {
	for {
		// Fill buffers.
		if itr.buf[0].SeriesID == 0 {
			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}
		if itr.buf[1].SeriesID == 0 {
			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}

		// Exit if either buffer is still empty.
		if itr.buf[0].SeriesID == 0 || itr.buf[1].SeriesID == 0 {
			return SeriesIDElem{}, nil
		}

		// Skip if both series are not equal.
		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
			itr.buf[0].SeriesID = 0
			continue
		} else if a > b {
			itr.buf[1].SeriesID = 0
			continue
		}

		// Merge series together if equal.
		elem := itr.buf[0]

		// Attach expression: AND the two expressions together when both are
		// present, otherwise use whichever one exists.
		expr0 := itr.buf[0].Expr
		expr1 := itr.buf[1].Expr
		if expr0 == nil {
			elem.Expr = expr1
		} else if expr1 == nil {
			elem.Expr = expr0
		} else {
			elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
				Op:  influxql.AND,
				LHS: expr0,
				RHS: expr1,
			}, nil)
		}

		itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
		return elem, nil
	}
}

// UnionSeriesIDIterators returns an iterator that returns series from both
// both iterators. If both series have associated expressions then they are
// combined together.
func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
	// Return other iterator if either one is nil.
	if itr0 == nil {
		return itr1
	} else if itr1 == nil {
		return itr0
	}

	// Create series id set, if available.
	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
		ss := NewSeriesIDSet()
		ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet())
		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
	}

	return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

// seriesIDUnionIterator is an iterator that unions two iterators together.
type seriesIDUnionIterator struct {
	buf  [2]SeriesIDElem
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDUnionIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}

// Next returns the next element which occurs in both iterators.
func (itr *seriesIDUnionIterator) Next() (_ SeriesIDElem, err error) {
	// Fill buffers.
	if itr.buf[0].SeriesID == 0 {
		if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
			return SeriesIDElem{}, err
		}
	}
	if itr.buf[1].SeriesID == 0 {
		if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
			return SeriesIDElem{}, err
		}
	}

	// Return non-zero or lesser series.
	if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a == 0 && b == 0 {
		return SeriesIDElem{}, nil
	} else if b == 0 || (a != 0 && a < b) {
		elem := itr.buf[0]
		itr.buf[0].SeriesID = 0
		return elem, nil
	} else if a == 0 || (b != 0 && a > b) {
		elem := itr.buf[1]
		itr.buf[1].SeriesID = 0
		return elem, nil
	}

	// Attach element.
	elem := itr.buf[0]

	// Attach expression: OR the two expressions when both exist; otherwise
	// the expression is cleared (NOTE(review): presumably because a side
	// with no expression matches unconditionally — confirm against callers).
	expr0 := itr.buf[0].Expr
	expr1 := itr.buf[1].Expr
	if expr0 != nil && expr1 != nil {
		elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
			Op:  influxql.OR,
			LHS: expr0,
			RHS: expr1,
		}, nil)
	} else {
		elem.Expr = nil
	}

	itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
	return elem, nil
}

// DifferenceSeriesIDIterators returns an iterator that only returns series which
// occur the first iterator but not the second iterator.
func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
	if itr0 == nil && itr1 == nil {
		return nil
	} else if itr1 == nil {
		return itr0
	} else if itr0 == nil {
		itr1.Close()
		return nil
	}

	// Create series id set, if available.
	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
		return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
	}

	return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

// seriesIDDifferenceIterator is an iterator that merges two iterators together.
type seriesIDDifferenceIterator struct {
	buf  [2]SeriesIDElem
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDDifferenceIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}

// Next returns the next element which occurs only in the first iterator.
func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) {
	for {
		// Fill buffers.
		if itr.buf[0].SeriesID == 0 {
			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}
		if itr.buf[1].SeriesID == 0 {
			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}

		// Exit if first buffer is still empty.
		if itr.buf[0].SeriesID == 0 {
			return SeriesIDElem{}, nil
		} else if itr.buf[1].SeriesID == 0 {
			elem := itr.buf[0]
			itr.buf[0].SeriesID = 0
			return elem, nil
		}

		// Return first series if it's less.
		// If second series is less then skip it.
		// If both series are equal then skip both.
		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
			elem := itr.buf[0]
			itr.buf[0].SeriesID = 0
			return elem, nil
		} else if a > b {
			itr.buf[1].SeriesID = 0
			continue
		} else {
			itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
			continue
		}
	}
}

// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
type seriesPointIterator struct {
	once     sync.Once
	indexSet IndexSet
	mitr     MeasurementIterator
	keys     [][]byte
	opt      query.IteratorOptions

	point query.FloatPoint // reusable point
}

// newSeriesPointIterator returns a new instance of seriesPointIterator.
func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) {
	// Only equality operators are allowed.
	influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
		switch n := n.(type) {
		case *influxql.BinaryExpr:
			switch n.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
				influxql.OR, influxql.AND:
			default:
				err = errors.New("invalid tag comparison operator")
			}
		}
	})
	if err != nil {
		return nil, err
	}

	mitr, err := indexSet.MeasurementIterator()
	if err != nil {
		return nil, err
	}

	return &seriesPointIterator{
		indexSet: indexSet,
		mitr:     mitr,
		point: query.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}, nil
}

// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. Safe to call multiple times.
func (itr *seriesPointIterator) Close() (err error) {
	itr.once.Do(func() {
		if itr.mitr != nil {
			err = itr.mitr.Close()
		}
	})
	return err
}

// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
	for {
		// Read series keys for next measurement if no more keys remaining.
		// Exit if there are no measurements remaining.
		if len(itr.keys) == 0 {
			m, err := itr.mitr.Next()
			if err != nil {
				return nil, err
			} else if m == nil {
				return nil, nil
			}

			if err := itr.readSeriesKeys(m); err != nil {
				return nil, err
			}
			continue
		}

		name, tags := ParseSeriesKey(itr.keys[0])
		itr.keys = itr.keys[1:]

		// TODO(edd): It seems to me like this authorisation check should be
		// further down in the index. At this point we're going to be filtering
		// series that have already been materialised in the LogFiles and
		// IndexFiles.
		if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
			continue
		}

		// Convert to a key.
		key := string(models.MakeKey(name, tags))

		// Write auxiliary fields.
		for i, f := range itr.opt.Aux {
			switch f.Val {
			case "key":
				itr.point.Aux[i] = key
			}
		}

		return &itr.point, nil
	}
}

// readSeriesKeys loads all series keys for the given measurement into
// itr.keys, sorted, skipping tombstoned keys.
func (itr *seriesPointIterator) readSeriesKeys(name []byte) error {
	sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition)
	if err != nil {
		return err
	} else if sitr == nil {
		return nil
	}
	defer sitr.Close()

	// Slurp all series keys.
	itr.keys = itr.keys[:0]
	for i := 0; ; i++ {
		elem, err := sitr.Next()
		if err != nil {
			return err
		} else if elem.SeriesID == 0 {
			break
		}

		// Periodically check for interrupt.
		if i&0xFF == 0xFF {
			select {
			case <-itr.opt.InterruptCh:
				return itr.Close()
			default:
			}
		}

		key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			continue
		}
		itr.keys = append(itr.keys, key)
	}

	// Sort keys.
	sort.Sort(seriesKeys(itr.keys))
	return nil
}

// MeasurementIterator represents a iterator over a list of measurements.
type MeasurementIterator interface {
	Close() error
	Next() ([]byte, error)
}

type MeasurementIterators []MeasurementIterator

// Close closes every iterator, returning the first error encountered.
func (a MeasurementIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// measurementSliceIterator iterates over a slice of measurement names.
type measurementSliceIterator struct {
	names [][]byte
}

// NewMeasurementSliceIterator returns an iterator over a slice of in-memory measurement names.
func NewMeasurementSliceIterator(names [][]byte) *measurementSliceIterator {
	return &measurementSliceIterator{names: names}
}

func (itr *measurementSliceIterator) Close() (err error) { return nil }

// Next returns the next name in the slice, or nil at end of iteration.
func (itr *measurementSliceIterator) Next() (name []byte, err error) {
	if len(itr.names) == 0 {
		return nil, nil
	}
	name, itr.names = itr.names[0], itr.names[1:]
	return name, nil
}

// MergeMeasurementIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator {
	if len(itrs) == 0 {
		return nil
	} else if len(itrs) == 1 {
		return itrs[0]
	}

	return &measurementMergeIterator{
		buf:  make([][]byte, len(itrs)),
		itrs: itrs,
	}
}

// measurementMergeIterator merges multiple measurement iterators in
// lexicographic order, deduplicating names that appear in several sources.
type measurementMergeIterator struct {
	buf  [][]byte
	itrs []MeasurementIterator
}

// Close closes every underlying iterator, returning the first error.
func (itr *measurementMergeIterator) Close() (err error) {
	for i := range itr.itrs {
		if e := itr.itrs[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// Next returns the element with the next lowest name across the iterators.
//
// If multiple iterators contain the same name then the first is returned
// and the remaining ones are skipped.
func (itr *measurementMergeIterator) Next() (_ []byte, err error) {
	// Find next lowest name amongst the buffers.
	var name []byte
	for i, buf := range itr.buf {
		// Fill buffer if empty.
		if buf == nil {
			if buf, err = itr.itrs[i].Next(); err != nil {
				return nil, err
			} else if buf != nil {
				itr.buf[i] = buf
			} else {
				continue
			}
		}

		// Find next lowest name.
		if name == nil || bytes.Compare(itr.buf[i], name) == -1 {
			name = itr.buf[i]
		}
	}

	// Return nil if no elements remaining.
	if name == nil {
		return nil, nil
	}

	// Merge all elements together and clear buffers.
	for i, buf := range itr.buf {
		if buf == nil || !bytes.Equal(buf, name) {
			continue
		}
		itr.buf[i] = nil
	}
	return name, nil
}

// TagKeyIterator represents a iterator over a list of tag keys.
type TagKeyIterator interface {
	Close() error
	Next() ([]byte, error)
}

type TagKeyIterators []TagKeyIterator

// Close closes every iterator, returning the first error encountered.
func (a TagKeyIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// NewTagKeySliceIterator returns a TagKeyIterator that iterates over a slice.
func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator {
	return &tagKeySliceIterator{keys: keys}
}

// tagKeySliceIterator iterates over a slice of tag keys.
type tagKeySliceIterator struct {
	keys [][]byte
}

// Next returns the next tag key in the slice.
func (itr *tagKeySliceIterator) Next() ([]byte, error) {
	if len(itr.keys) == 0 {
		return nil, nil
	}
	key := itr.keys[0]
	itr.keys = itr.keys[1:]
	return key, nil
}

func (itr *tagKeySliceIterator) Close() error { return nil }

// MergeTagKeyIterators returns an iterator that merges a set of iterators.
func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
	if len(itrs) == 0 {
		return nil
	} else if len(itrs) == 1 {
		return itrs[0]
	}

	return &tagKeyMergeIterator{
		buf:  make([][]byte, len(itrs)),
		itrs: itrs,
	}
}

// tagKeyMergeIterator merges multiple tag-key iterators in lexicographic
// order, deduplicating keys that appear in several sources.
type tagKeyMergeIterator struct {
	buf  [][]byte
	itrs []TagKeyIterator
}

func (itr *tagKeyMergeIterator) Close() error {
	for i := range itr.itrs {
		itr.itrs[i].Close()
	}
	return nil
}

// Next returns the element with the next lowest key across the iterators.
//
// If multiple iterators contain the same key then the first is returned
// and the remaining ones are skipped.
func (itr *tagKeyMergeIterator) Next() (_ []byte, err error) {
	// Find next lowest key amongst the buffers.
	var key []byte
	for i, buf := range itr.buf {
		// Fill buffer.
		if buf == nil {
			if buf, err = itr.itrs[i].Next(); err != nil {
				return nil, err
			} else if buf != nil {
				itr.buf[i] = buf
			} else {
				continue
			}
		}

		// Find next lowest key.
		if key == nil || bytes.Compare(buf, key) == -1 {
			key = buf
		}
	}

	// Return nil if no elements remaining.
	if key == nil {
		return nil, nil
	}

	// Merge elements and clear buffers.
	for i, buf := range itr.buf {
		if buf == nil || !bytes.Equal(buf, key) {
			continue
		}
		itr.buf[i] = nil
	}
	return key, nil
}

// TagValueIterator represents a iterator over a list of tag values.
type TagValueIterator interface {
	Close() error
	Next() ([]byte, error)
}

// TagValueIterators represents a list of TagValueIterator.
type TagValueIterators []TagValueIterator

// Close closes all iterators in the list, returning the first error seen.
func (a TagValueIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// NewTagValueSliceIterator returns a TagValueIterator that iterates over a slice.
func NewTagValueSliceIterator(values [][]byte) *tagValueSliceIterator {
	return &tagValueSliceIterator{values: values}
}

// tagValueSliceIterator iterates over a slice of tag values.
type tagValueSliceIterator struct {
	values [][]byte
}

// Next returns the next tag value in the slice.
func (itr *tagValueSliceIterator) Next() ([]byte, error) {
	if len(itr.values) == 0 {
		return nil, nil
	}
	value := itr.values[0]
	itr.values = itr.values[1:]
	return value, nil
}

// Close is a no-op for slice-backed iterators.
func (itr *tagValueSliceIterator) Close() error { return nil }

// MergeTagValueIterators returns an iterator that merges a set of iterators.
func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
	if len(itrs) == 0 {
		return nil
	} else if len(itrs) == 1 {
		return itrs[0]
	}

	return &tagValueMergeIterator{
		buf:  make([][]byte, len(itrs)),
		itrs: itrs,
	}
}

// tagValueMergeIterator merges values from multiple TagValueIterators,
// emitting each distinct value once in ascending byte order.
type tagValueMergeIterator struct {
	buf  [][]byte // one look-ahead value per source iterator
	itrs []TagValueIterator
}

// Close closes all underlying iterators; errors from Close are discarded.
func (itr *tagValueMergeIterator) Close() error {
	for i := range itr.itrs {
		itr.itrs[i].Close()
	}
	return nil
}

// Next returns the element with the next lowest value across the iterators.
//
// If multiple iterators contain the same value then the first is returned
// and the remaining ones are skipped.
func (itr *tagValueMergeIterator) Next() (_ []byte, err error) {
	// Find next lowest value amongst the buffers.
	var value []byte
	for i, buf := range itr.buf {
		// Fill buffer.
		if buf == nil {
			if buf, err = itr.itrs[i].Next(); err != nil {
				return nil, err
			} else if buf != nil {
				itr.buf[i] = buf
			} else {
				// This source iterator is exhausted.
				continue
			}
		}

		// Find next lowest value.
		if value == nil || bytes.Compare(buf, value) == -1 {
			value = buf
		}
	}

	// Return nil if no elements remaining.
	if value == nil {
		return nil, nil
	}

	// Merge elements and clear buffers.
	for i, buf := range itr.buf {
		if buf == nil || !bytes.Equal(buf, value) {
			continue
		}
		itr.buf[i] = nil
	}
	return value, nil
}

// IndexSet represents a list of indexes, all belonging to one database.
type IndexSet struct {
	Indexes    []Index                // The set of indexes comprising this IndexSet.
	SeriesFile *SeriesFile            // The Series File associated with the db for this set.
	fieldSets  []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
}

// HasInmemIndex returns true if any in-memory index is in use.
func (is IndexSet) HasInmemIndex() bool {
	for _, idx := range is.Indexes {
		if idx.Type() == InmemIndexName {
			return true
		}
	}
	return false
}

// Database returns the database name of the first index.
func (is IndexSet) Database() string {
	if len(is.Indexes) == 0 {
		return ""
	}
	return is.Indexes[0].Database()
}

// HasField determines if any of the field sets on the set of indexes in the
// IndexSet have the provided field for the provided measurement.
func (is IndexSet) HasField(measurement []byte, field string) bool {
	if len(is.Indexes) == 0 {
		return false
	}

	if len(is.fieldSets) == 0 {
		// field sets may not have been initialised yet.
		// NOTE(review): the method has a value receiver, so this lazily-built
		// cache only lives for this call and is rebuilt next time — confirm
		// that is intended.
		is.fieldSets = make([]*MeasurementFieldSet, 0, len(is.Indexes))
		for _, idx := range is.Indexes {
			is.fieldSets = append(is.fieldSets, idx.FieldSet())
		}
	}

	for _, fs := range is.fieldSets {
		if fs.Fields(measurement).HasField(field) {
			return true
		}
	}
	return false
}

// DedupeInmemIndexes returns an index set which removes duplicate indexes.
// Useful because inmem indexes are shared by shards per database.
func (is IndexSet) DedupeInmemIndexes() IndexSet {
	other := IndexSet{
		Indexes:    make([]Index, 0, len(is.Indexes)),
		SeriesFile: is.SeriesFile,
		fieldSets:  make([]*MeasurementFieldSet, 0, len(is.Indexes)),
	}

	// Deduplicate by each index's unique reference ID.
	uniqueIndexes := make(map[uintptr]Index)
	for _, idx := range is.Indexes {
		uniqueIndexes[idx.UniqueReferenceID()] = idx
	}

	for _, idx := range uniqueIndexes {
		other.Indexes = append(other.Indexes, idx)
		other.fieldSets = append(other.fieldSets, idx.FieldSet())
	}

	return other
}

// MeasurementNamesByExpr returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	release := is.SeriesFile.Retain()
	defer release()

	// Return filtered list if expression exists.
	if expr != nil {
		names, err := is.measurementNamesByExpr(auth, expr)
		if err != nil {
			return nil, err
		}
		return slices.CopyChunkedByteSlices(names, 1000), nil
	}

	itr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}
	defer itr.Close()

	// Iterate over all measurements if no condition exists.
	var names [][]byte
	for {
		e, err := itr.Next()
		if err != nil {
			return nil, err
		} else if e == nil {
			break
		}

		// Determine if there exists at least one authorised series for the
		// measurement name.
		if is.measurementAuthorizedSeries(auth, e, nil) {
			names = append(names, e)
		}
	}
	return slices.CopyChunkedByteSlices(names, 1000), nil
}

// measurementNamesByExpr returns measurement names matching expr. It handles
// tag comparisons (EQ/NEQ/EQREGEX/NEQREGEX), AND/OR combinations of them, and
// parenthesised sub-expressions.
func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	if expr == nil {
		return nil, nil
	}

	switch e := expr.(type) {
	case *influxql.BinaryExpr:
		switch e.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			tag, ok := e.LHS.(*influxql.VarRef)
			if !ok {
				return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
			}

			// Retrieve value or regex expression from RHS.
			var value string
			var regex *regexp.Regexp
			if influxql.IsRegexOp(e.Op) {
				re, ok := e.RHS.(*influxql.RegexLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
				}
				regex = re.Val
			} else {
				s, ok := e.RHS.(*influxql.StringLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
				}
				value = s.Val
			}

			// Match on name, if specified.
			if tag.Val == "_name" {
				return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
			} else if influxql.IsSystemName(tag.Val) {
				return nil, nil
			}
			return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)

		case influxql.OR, influxql.AND:
			lhs, err := is.measurementNamesByExpr(auth, e.LHS)
			if err != nil {
				return nil, err
			}

			rhs, err := is.measurementNamesByExpr(auth, e.RHS)
			if err != nil {
				return nil, err
			}

			if e.Op == influxql.OR {
				return bytesutil.Union(lhs, rhs), nil
			}
			return bytesutil.Intersect(lhs, rhs), nil

		default:
			return nil, fmt.Errorf("invalid tag comparison operator")
		}

	case *influxql.ParenExpr:
		return is.measurementNamesByExpr(auth, e.Expr)
	default:
		return nil, fmt.Errorf("%#v", expr)
	}
}

// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
	itr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}
	defer itr.Close()

	var names [][]byte
	for {
		e, err := itr.Next()
		if err != nil {
			return nil, err
		} else if e == nil {
			break
		}

		var matched bool
		switch op {
		case influxql.EQ:
			matched = string(e) == val
		case influxql.NEQ:
			matched = string(e) != val
		case influxql.EQREGEX:
			matched = regex.Match(e)
		case influxql.NEQREGEX:
			matched = !regex.Match(e)
		}

		// Only include names with at least one authorized series.
		if matched && is.measurementAuthorizedSeries(auth, e, nil) {
			names = append(names, e)
		}
	}
	bytesutil.Sort(names)
	return names, nil
}

// MeasurementNamesByPredicate returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	release := is.SeriesFile.Retain()
	defer release()

	// Return filtered list if expression exists.
	if expr != nil {
		names, err := is.measurementNamesByPredicate(auth, expr)
		if err != nil {
			return nil, err
		}
		return slices.CopyChunkedByteSlices(names, 1000), nil
	}

	itr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}
	defer itr.Close()

	// Iterate over all measurements if no condition exists.
	var names [][]byte
	for {
		e, err := itr.Next()
		if err != nil {
			return nil, err
		} else if e == nil {
			break
		}

		// Determine if there exists at least one authorised series for the
		// measurement name.
		if is.measurementAuthorizedSeries(auth, e, nil) {
			names = append(names, e)
		}
	}
	return slices.CopyChunkedByteSlices(names, 1000), nil
}

// measurementNamesByPredicate is the predicate-flavoured counterpart of
// measurementNamesByExpr; it differs only in delegating tag comparisons to
// measurementNamesByTagPredicate.
func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	if expr == nil {
		return nil, nil
	}

	switch e := expr.(type) {
	case *influxql.BinaryExpr:
		switch e.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			tag, ok := e.LHS.(*influxql.VarRef)
			if !ok {
				return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
			}

			// Retrieve value or regex expression from RHS.
			var value string
			var regex *regexp.Regexp
			if influxql.IsRegexOp(e.Op) {
				re, ok := e.RHS.(*influxql.RegexLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
				}
				regex = re.Val
			} else {
				s, ok := e.RHS.(*influxql.StringLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
				}
				value = s.Val
			}

			// Match on name, if specified.
			if tag.Val == "_name" {
				return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
			} else if influxql.IsSystemName(tag.Val) {
				return nil, nil
			}
			return is.measurementNamesByTagPredicate(auth, e.Op, tag.Val, value, regex)

		case influxql.OR, influxql.AND:
			lhs, err := is.measurementNamesByPredicate(auth, e.LHS)
			if err != nil {
				return nil, err
			}

			rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
			if err != nil {
				return nil, err
			}

			if e.Op == influxql.OR {
				return bytesutil.Union(lhs, rhs), nil
			}
			return bytesutil.Intersect(lhs, rhs), nil

		default:
			return nil, fmt.Errorf("invalid tag comparison operator")
		}

	case *influxql.ParenExpr:
		return is.measurementNamesByPredicate(auth, e.Expr)
	default:
		return nil, fmt.Errorf("%#v", expr)
	}
}

// measurementNamesByTagFilter returns sorted measurement names that have at
// least one tag value for key matching (or, for negated operators, no value
// matching) val/regex, subject to authorization.
func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
	var names [][]byte

	mitr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if mitr == nil {
		return nil, nil
	}
	defer mitr.Close()

	// valEqual determines if the provided []byte is equal to the tag value
	// to be filtered on.
	valEqual := regex.Match
	if op == influxql.EQ || op == influxql.NEQ {
		vb := []byte(val)
		valEqual = func(b []byte) bool { return bytes.Equal(vb, b) }
	}

	var tagMatch bool
	var authorized bool
	for {
		me, err := mitr.Next()
		if err != nil {
			return nil, err
		} else if me == nil {
			break
		}
		// If the measurement doesn't have the tag key, then it won't be considered.
		if ok, err := is.hasTagKey(me, []byte(key)); err != nil {
			return nil, err
		} else if !ok {
			continue
		}
		tagMatch = false
		// Authorization must be explicitly granted when an authorizer is present.
		authorized = query.AuthorizerIsOpen(auth)

		vitr, err := is.tagValueIterator(me, []byte(key))
		if err != nil {
			return nil, err
		}

		if vitr != nil {
			// NOTE(review): vitr is closed both via this defer (at function
			// return) and explicitly below; deferred calls accumulate per
			// measurement — confirm Close is safe to call twice.
			defer vitr.Close()
			for {
				ve, err := vitr.Next()
				if err != nil {
					return nil, err
				} else if ve == nil {
					break
				}
				if !valEqual(ve) {
					continue
				}

				tagMatch = true
				if query.AuthorizerIsOpen(auth) {
					break
				}

				// When an authorizer is present, the measurement should be
				// included only if one of its series is authorized.
				sitr, err := is.tagValueSeriesIDIterator(me, []byte(key), ve)
				if err != nil {
					return nil, err
				} else if sitr == nil {
					continue
				}
				// NOTE(review): same double-close pattern as vitr above.
				defer sitr.Close()
				sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

				// Locate a series with this matching tag value that's authorized.
				for {
					se, err := sitr.Next()
					if err != nil {
						return nil, err
					}

					if se.SeriesID == 0 {
						break
					}

					name, tags := is.SeriesFile.Series(se.SeriesID)
					if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
						authorized = true
						break
					}
				}

				if err := sitr.Close(); err != nil {
					return nil, err
				}

				if tagMatch && authorized {
					// The measurement can definitely be included or rejected.
					break
				}
			}
			if err := vitr.Close(); err != nil {
				return nil, err
			}
		}

		// For negation operators, to determine if the measurement is authorized,
		// an authorized series belonging to the measurement must be located.
		// Then, the measurement can be added iff !tagMatch && authorized.
		if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch {
			authorized = is.measurementAuthorizedSeries(auth, me, nil)
		}

		// tags match | operation is EQ | measurement matches
		// --------------------------------------------------
		//     True   |       True      |      True
		//     True   |       False     |      False
		//     False  |       True      |      False
		//     False  |       False     |      True
		if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized {
			names = append(names, me)
			continue
		}
	}

	bytesutil.Sort(names)
	return names, nil
}

// measurementNamesByTagPredicate returns sorted measurement names for which
// the per-operator check (derived from op/val/regex) reports a match, subject
// to authorization.
func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
	var names [][]byte

	mitr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if mitr == nil {
		return nil, nil
	}
	defer mitr.Close()

	var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
	switch op {
	case influxql.EQ:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			return is.measurementHasTagValue(auth, me, []byte(key), []byte(val))
		}
	case influxql.NEQ:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			// If there is an authorized series in this measurement and that series
			// does not contain the tag key/value.
			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
				return tags.GetString(key) == val
			})
			return ok, nil
		}
	case influxql.EQREGEX:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			return is.measurementHasTagValueRegex(auth, me, []byte(key), regex)
		}
	case influxql.NEQREGEX:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			// If there is an authorized series in this measurement and that series
			// does not contain the tag key/value.
			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
				return regex.MatchString(tags.GetString(key))
			})
			return ok, nil
		}
	default:
		return nil, fmt.Errorf("unsupported operand: %s", op)
	}

	for {
		me, err := mitr.Next()
		if err != nil {
			return nil, err
		} else if me == nil {
			break
		}

		ok, err := checkMeasurement(auth, me)
		if err != nil {
			return nil, err
		} else if ok {
			names = append(names, me)
		}
	}

	bytesutil.Sort(names)
	return names, nil
}

// measurementAuthorizedSeries determines if the measurement contains a series
// that is authorized to be read. If exclude is non-nil, authorized series for
// which it returns true are skipped.
func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool {
	// Fast path: an open authorizer with no exclusion admits everything.
	if query.AuthorizerIsOpen(auth) && exclude == nil {
		return true
	}

	if auth == nil {
		auth = query.OpenAuthorizer
	}

	sitr, err := is.measurementSeriesIDIterator(name)
	if err != nil || sitr == nil {
		return false
	}
	defer sitr.Close()
	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

	for {
		series, err := sitr.Next()
		if err != nil {
			return false
		}

		if series.SeriesID == 0 {
			return false // End of iterator
		}

		name, tags := is.SeriesFile.Series(series.SeriesID)
		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
			// Skip excluded series and keep scanning for another
			// authorized one.
			if exclude != nil && exclude(tags) {
				continue
			}
			return true
		}
	}
}

// measurementHasTagValue reports whether measurement me has an authorized
// series carrying the exact tag key/value pair. An empty value matches series
// that lack the key entirely.
func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) {
	if len(value) == 0 {
		return is.measurementHasEmptyTagValue(auth, me, key)
	}

	hasTagValue, err := is.HasTagValue(me, key, value)
	if err != nil || !hasTagValue {
		return false, err
	}

	// If the authorizer is open, return true.
	if query.AuthorizerIsOpen(auth) {
		return true, nil
	}

	// When an authorizer is present, the measurement should be
	// included only if one of its series is authorized.
	sitr, err := is.tagValueSeriesIDIterator(me, key, value)
	if err != nil || sitr == nil {
		return false, err
	}
	// NOTE(review): the deferred Close is bound to the unwrapped iterator;
	// the filter wrapper assigned below is never closed directly — confirm
	// the wrapper holds no resources of its own.
	defer sitr.Close()
	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

	// Locate a series with this matching tag value that's authorized.
	for {
		se, err := sitr.Next()
		if err != nil || se.SeriesID == 0 {
			return false, err
		}

		name, tags := is.SeriesFile.Series(se.SeriesID)
		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
			return true, nil
		}
	}
}

// measurementHasEmptyTagValue reports whether measurement me has an authorized
// series without the tag key (i.e. an implicitly empty value for key).
func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) {
	// Any series that does not have a tag key
	// has an empty tag value for that key.
	// Iterate through all of the series to find one
	// series that does not have the tag key.
	sitr, err := is.measurementSeriesIDIterator(me)
	if err != nil || sitr == nil {
		return false, err
	}
	defer sitr.Close()
	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

	for {
		series, err := sitr.Next()
		if err != nil || series.SeriesID == 0 {
			return false, err
		}

		name, tags := is.SeriesFile.Series(series.SeriesID)
		if len(tags.Get(key)) > 0 {
			// The tag key exists in this series. We need
			// at least one series that does not have the tag
			// key.
			continue
		}

		// Verify that we can see this series.
		if query.AuthorizerIsOpen(auth) {
			return true, nil
		} else if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
			return true, nil
		}
	}
}

// measurementHasTagValueRegex reports whether measurement me has an authorized
// series whose value for key matches the regex. A regex that matches the
// empty string is first checked against series lacking the key entirely.
func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
	// If the regex matches the empty string, do a special check to see
	// if we have an empty tag value.
	if matchEmpty := value.MatchString(""); matchEmpty {
		if ok, err := is.measurementHasEmptyTagValue(auth, me, key); err != nil {
			return false, err
		} else if ok {
			return true, nil
		}
	}

	// Iterate over the tag values and find one that matches the value.
	vitr, err := is.tagValueIterator(me, key)
	if err != nil || vitr == nil {
		return false, err
	}
	defer vitr.Close()

	for {
		ve, err := vitr.Next()
		if err != nil || ve == nil {
			return false, err
		}

		if !value.Match(ve) {
			// The regex does not match this tag value.
			continue
		}

		// If the authorizer is open, then we have found a suitable tag value.
		if query.AuthorizerIsOpen(auth) {
			return true, nil
		}

		// When an authorizer is present, the measurement should only be included
		// if one of the series is authorized.
		// The closure scopes the deferred sitr.Close to a single tag value.
		if authorized, err := func() (bool, error) {
			sitr, err := is.tagValueSeriesIDIterator(me, key, ve)
			if err != nil || sitr == nil {
				return false, err
			}
			defer sitr.Close()
			sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

			// Locate an authorized series.
			for {
				se, err := sitr.Next()
				if err != nil || se.SeriesID == 0 {
					return false, err
				}

				name, tags := is.SeriesFile.Series(se.SeriesID)
				if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
					return true, nil
				}
			}
		}(); err != nil {
			return false, err
		} else if authorized {
			return true, nil
		}
	}
}

// HasTagKey returns true if the tag key exists in any index for the provided
// measurement.
func (is IndexSet) HasTagKey(name, key []byte) (bool, error) {
	return is.hasTagKey(name, key)
}

// hasTagKey returns true if the tag key exists in any index for the provided
// measurement, and guarantees to never take a lock on the series file.
1945func (is IndexSet) hasTagKey(name, key []byte) (bool, error) { 1946 for _, idx := range is.Indexes { 1947 if ok, err := idx.HasTagKey(name, key); err != nil { 1948 return false, err 1949 } else if ok { 1950 return true, nil 1951 } 1952 } 1953 return false, nil 1954} 1955 1956// HasTagValue returns true if the tag value exists in any index for the provided 1957// measurement and tag key. 1958func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) { 1959 for _, idx := range is.Indexes { 1960 if ok, err := idx.HasTagValue(name, key, value); err != nil { 1961 return false, err 1962 } else if ok { 1963 return true, nil 1964 } 1965 } 1966 return false, nil 1967} 1968 1969// MeasurementIterator returns an iterator over all measurements in the index. 1970func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) { 1971 return is.measurementIterator() 1972} 1973 1974// measurementIterator returns an iterator over all measurements in the index. 1975// It guarantees to never take any locks on the underlying series file. 1976func (is IndexSet) measurementIterator() (MeasurementIterator, error) { 1977 a := make([]MeasurementIterator, 0, len(is.Indexes)) 1978 for _, idx := range is.Indexes { 1979 itr, err := idx.MeasurementIterator() 1980 if err != nil { 1981 MeasurementIterators(a).Close() 1982 return nil, err 1983 } else if itr != nil { 1984 a = append(a, itr) 1985 } 1986 } 1987 return MergeMeasurementIterators(a...), nil 1988} 1989 1990// TagKeyIterator returns a key iterator for a measurement. 1991func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) { 1992 return is.tagKeyIterator(name) 1993} 1994 1995// tagKeyIterator returns a key iterator for a measurement. It guarantees to never 1996// take any locks on the underlying series file. 
1997func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) { 1998 a := make([]TagKeyIterator, 0, len(is.Indexes)) 1999 for _, idx := range is.Indexes { 2000 itr, err := idx.TagKeyIterator(name) 2001 if err != nil { 2002 TagKeyIterators(a).Close() 2003 return nil, err 2004 } else if itr != nil { 2005 a = append(a, itr) 2006 } 2007 } 2008 return MergeTagKeyIterators(a...), nil 2009} 2010 2011// TagValueIterator returns a value iterator for a tag key. 2012func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) { 2013 return is.tagValueIterator(name, key) 2014} 2015 2016// tagValueIterator returns a value iterator for a tag key. It guarantees to never 2017// take any locks on the underlying series file. 2018func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) { 2019 a := make([]TagValueIterator, 0, len(is.Indexes)) 2020 for _, idx := range is.Indexes { 2021 itr, err := idx.TagValueIterator(name, key) 2022 if err != nil { 2023 TagValueIterators(a).Close() 2024 return nil, err 2025 } else if itr != nil { 2026 a = append(a, itr) 2027 } 2028 } 2029 return MergeTagValueIterators(a...), nil 2030} 2031 2032// TagKeyHasAuthorizedSeries determines if there exists an authorized series for 2033// the provided measurement name and tag key. 
2034func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) { 2035 if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) { 2036 return true, nil 2037 } 2038 2039 release := is.SeriesFile.Retain() 2040 defer release() 2041 2042 itr, err := is.tagKeySeriesIDIterator(name, tagKey) 2043 if err != nil { 2044 return false, err 2045 } else if itr == nil { 2046 return false, nil 2047 } 2048 defer itr.Close() 2049 itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) 2050 2051 for { 2052 e, err := itr.Next() 2053 if err != nil { 2054 return false, err 2055 } 2056 2057 if e.SeriesID == 0 { 2058 return false, nil 2059 } 2060 2061 if query.AuthorizerIsOpen(auth) { 2062 return true, nil 2063 } 2064 2065 name, tags := is.SeriesFile.Series(e.SeriesID) 2066 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 2067 return true, nil 2068 } 2069 } 2070} 2071 2072// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series 2073// for the provided measurement. 2074func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { 2075 release := is.SeriesFile.Retain() 2076 defer release() 2077 2078 itr, err := is.measurementSeriesIDIterator(name) 2079 if err != nil { 2080 return nil, err 2081 } 2082 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2083} 2084 2085// measurementSeriesIDIterator does not provide any locking on the Series file. 2086// 2087// See MeasurementSeriesIDIterator for more details. 
2088func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { 2089 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 2090 for _, idx := range is.Indexes { 2091 itr, err := idx.MeasurementSeriesIDIterator(name) 2092 if err != nil { 2093 SeriesIDIterators(a).Close() 2094 return nil, err 2095 } else if itr != nil { 2096 a = append(a, itr) 2097 } 2098 } 2099 return MergeSeriesIDIterators(a...), nil 2100} 2101 2102// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies 2103// the provided function. 2104func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { 2105 release := is.SeriesFile.Retain() 2106 defer release() 2107 2108 itr, err := is.tagKeyIterator(name) 2109 if err != nil { 2110 return err 2111 } else if itr == nil { 2112 return nil 2113 } 2114 defer itr.Close() 2115 2116 for { 2117 key, err := itr.Next() 2118 if err != nil { 2119 return err 2120 } else if key == nil { 2121 return nil 2122 } 2123 2124 if err := fn(key); err != nil { 2125 return err 2126 } 2127 } 2128} 2129 2130// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. 2131func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { 2132 release := is.SeriesFile.Retain() 2133 defer release() 2134 2135 keys := make(map[string]struct{}) 2136 for _, idx := range is.Indexes { 2137 m, err := idx.MeasurementTagKeysByExpr(name, expr) 2138 if err != nil { 2139 return nil, err 2140 } 2141 for k := range m { 2142 keys[k] = struct{}{} 2143 } 2144 } 2145 return keys, nil 2146} 2147 2148// TagKeySeriesIDIterator returns a series iterator for all values across a single key. 
2149func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { 2150 release := is.SeriesFile.Retain() 2151 defer release() 2152 2153 itr, err := is.tagKeySeriesIDIterator(name, key) 2154 if err != nil { 2155 return nil, err 2156 } 2157 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2158} 2159 2160// tagKeySeriesIDIterator returns a series iterator for all values across a 2161// single key. 2162// 2163// It guarantees to never take any locks on the series file. 2164func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { 2165 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 2166 for _, idx := range is.Indexes { 2167 itr, err := idx.TagKeySeriesIDIterator(name, key) 2168 if err != nil { 2169 SeriesIDIterators(a).Close() 2170 return nil, err 2171 } else if itr != nil { 2172 a = append(a, itr) 2173 } 2174 } 2175 return MergeSeriesIDIterators(a...), nil 2176} 2177 2178// TagValueSeriesIDIterator returns a series iterator for a single tag value. 2179func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { 2180 release := is.SeriesFile.Retain() 2181 defer release() 2182 2183 itr, err := is.tagValueSeriesIDIterator(name, key, value) 2184 if err != nil { 2185 return nil, err 2186 } 2187 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2188} 2189 2190// tagValueSeriesIDIterator does not provide any locking on the Series File. 2191// 2192// See TagValueSeriesIDIterator for more details. 
2193func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { 2194 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 2195 for _, idx := range is.Indexes { 2196 itr, err := idx.TagValueSeriesIDIterator(name, key, value) 2197 if err != nil { 2198 SeriesIDIterators(a).Close() 2199 return nil, err 2200 } else if itr != nil { 2201 a = append(a, itr) 2202 } 2203 } 2204 return MergeSeriesIDIterators(a...), nil 2205} 2206 2207// MeasurementSeriesByExprIterator returns a series iterator for a measurement 2208// that is filtered by expr. If expr only contains time expressions then this 2209// call is equivalent to MeasurementSeriesIDIterator(). 2210func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 2211 release := is.SeriesFile.Retain() 2212 defer release() 2213 return is.measurementSeriesByExprIterator(name, expr) 2214} 2215 2216// measurementSeriesByExprIterator returns a series iterator for a measurement 2217// that is filtered by expr. See MeasurementSeriesByExprIterator for more details. 2218// 2219// measurementSeriesByExprIterator guarantees to never take any locks on the 2220// series file. 2221func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 2222 // Return all series for the measurement if there are no tag expressions. 2223 if expr == nil { 2224 itr, err := is.measurementSeriesIDIterator(name) 2225 if err != nil { 2226 return nil, err 2227 } 2228 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2229 } 2230 2231 itr, err := is.seriesByExprIterator(name, expr) 2232 if err != nil { 2233 return nil, err 2234 } 2235 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2236} 2237 2238// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. 
2239func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { 2240 release := is.SeriesFile.Retain() 2241 defer release() 2242 2243 // Create iterator for all matching series. 2244 itr, err := is.measurementSeriesByExprIterator(name, expr) 2245 if err != nil { 2246 return nil, err 2247 } else if itr == nil { 2248 return nil, nil 2249 } 2250 defer itr.Close() 2251 2252 // measurementSeriesByExprIterator filters deleted series; no need to do so here. 2253 2254 // Iterate over all series and generate keys. 2255 var keys [][]byte 2256 for { 2257 e, err := itr.Next() 2258 if err != nil { 2259 return nil, err 2260 } else if e.SeriesID == 0 { 2261 break 2262 } 2263 2264 // Check for unsupported field filters. 2265 // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). 2266 if e.Expr != nil { 2267 if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val { 2268 return nil, errors.New("fields not supported in WHERE clause during deletion") 2269 } 2270 } 2271 2272 seriesKey := is.SeriesFile.SeriesKey(e.SeriesID) 2273 if len(seriesKey) == 0 { 2274 continue 2275 } 2276 2277 name, tags := ParseSeriesKey(seriesKey) 2278 keys = append(keys, models.MakeKey(name, tags)) 2279 } 2280 2281 bytesutil.Sort(keys) 2282 2283 return keys, nil 2284} 2285 2286func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 2287 switch expr := expr.(type) { 2288 case *influxql.BinaryExpr: 2289 switch expr.Op { 2290 case influxql.AND, influxql.OR: 2291 // Get the series IDs and filter expressions for the LHS. 2292 litr, err := is.seriesByExprIterator(name, expr.LHS) 2293 if err != nil { 2294 return nil, err 2295 } 2296 2297 // Get the series IDs and filter expressions for the RHS. 2298 ritr, err := is.seriesByExprIterator(name, expr.RHS) 2299 if err != nil { 2300 if litr != nil { 2301 litr.Close() 2302 } 2303 return nil, err 2304 } 2305 2306 // Intersect iterators if expression is "AND". 
2307 if expr.Op == influxql.AND { 2308 return IntersectSeriesIDIterators(litr, ritr), nil 2309 } 2310 2311 // Union iterators if expression is "OR". 2312 return UnionSeriesIDIterators(litr, ritr), nil 2313 2314 default: 2315 return is.seriesByBinaryExprIterator(name, expr) 2316 } 2317 2318 case *influxql.ParenExpr: 2319 return is.seriesByExprIterator(name, expr.Expr) 2320 2321 case *influxql.BooleanLiteral: 2322 if expr.Val { 2323 return is.measurementSeriesIDIterator(name) 2324 } 2325 return nil, nil 2326 2327 default: 2328 return nil, nil 2329 } 2330} 2331 2332// seriesByBinaryExprIterator returns a series iterator and a filtering expression. 2333func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) { 2334 // If this binary expression has another binary expression, then this 2335 // is some expression math and we should just pass it to the underlying query. 2336 if _, ok := n.LHS.(*influxql.BinaryExpr); ok { 2337 itr, err := is.measurementSeriesIDIterator(name) 2338 if err != nil { 2339 return nil, err 2340 } 2341 return newSeriesIDExprIterator(itr, n), nil 2342 } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { 2343 itr, err := is.measurementSeriesIDIterator(name) 2344 if err != nil { 2345 return nil, err 2346 } 2347 return newSeriesIDExprIterator(itr, n), nil 2348 } 2349 2350 // Retrieve the variable reference from the correct side of the expression. 2351 key, ok := n.LHS.(*influxql.VarRef) 2352 value := n.RHS 2353 if !ok { 2354 key, ok = n.RHS.(*influxql.VarRef) 2355 if !ok { 2356 // This is an expression we do not know how to evaluate. Let the 2357 // query engine take care of this. 2358 itr, err := is.measurementSeriesIDIterator(name) 2359 if err != nil { 2360 return nil, err 2361 } 2362 return newSeriesIDExprIterator(itr, n), nil 2363 } 2364 value = n.LHS 2365 } 2366 2367 // For fields, return all series from this measurement. 
2368 if key.Val != "_name" && ((key.Type == influxql.Unknown && is.HasField(name, key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) { 2369 itr, err := is.measurementSeriesIDIterator(name) 2370 if err != nil { 2371 return nil, err 2372 } 2373 return newSeriesIDExprIterator(itr, n), nil 2374 } else if value, ok := value.(*influxql.VarRef); ok { 2375 // Check if the RHS is a variable and if it is a field. 2376 if value.Val != "_name" && ((value.Type == influxql.Unknown && is.HasField(name, value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { 2377 itr, err := is.measurementSeriesIDIterator(name) 2378 if err != nil { 2379 return nil, err 2380 } 2381 return newSeriesIDExprIterator(itr, n), nil 2382 } 2383 } 2384 2385 // Create iterator based on value type. 2386 switch value := value.(type) { 2387 case *influxql.StringLiteral: 2388 return is.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op) 2389 case *influxql.RegexLiteral: 2390 return is.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op) 2391 case *influxql.VarRef: 2392 return is.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op) 2393 default: 2394 // We do not know how to evaluate this expression so pass it 2395 // on to the query engine. 2396 itr, err := is.measurementSeriesIDIterator(name) 2397 if err != nil { 2398 return nil, err 2399 } 2400 return newSeriesIDExprIterator(itr, n), nil 2401 } 2402} 2403 2404func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) { 2405 // Special handling for "_name" to match measurement name. 
2406 if bytes.Equal(key, []byte("_name")) { 2407 if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { 2408 return is.measurementSeriesIDIterator(name) 2409 } 2410 return nil, nil 2411 } 2412 2413 if op == influxql.EQ { 2414 // Match a specific value. 2415 if len(value) != 0 { 2416 return is.tagValueSeriesIDIterator(name, key, value) 2417 } 2418 2419 mitr, err := is.measurementSeriesIDIterator(name) 2420 if err != nil { 2421 return nil, err 2422 } 2423 2424 kitr, err := is.tagKeySeriesIDIterator(name, key) 2425 if err != nil { 2426 if mitr != nil { 2427 mitr.Close() 2428 } 2429 return nil, err 2430 } 2431 2432 // Return all measurement series that have no values from this tag key. 2433 return DifferenceSeriesIDIterators(mitr, kitr), nil 2434 } 2435 2436 // Return all measurement series without this tag value. 2437 if len(value) != 0 { 2438 mitr, err := is.measurementSeriesIDIterator(name) 2439 if err != nil { 2440 return nil, err 2441 } 2442 2443 vitr, err := is.tagValueSeriesIDIterator(name, key, value) 2444 if err != nil { 2445 if mitr != nil { 2446 mitr.Close() 2447 } 2448 return nil, err 2449 } 2450 2451 return DifferenceSeriesIDIterators(mitr, vitr), nil 2452 } 2453 2454 // Return all series across all values of this tag key. 2455 return is.tagKeySeriesIDIterator(name, key) 2456} 2457 2458func (is IndexSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIDIterator, error) { 2459 // Special handling for "_name" to match measurement name. 
2460 if bytes.Equal(key, []byte("_name")) { 2461 match := value.Match(name) 2462 if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) { 2463 mitr, err := is.measurementSeriesIDIterator(name) 2464 if err != nil { 2465 return nil, err 2466 } 2467 return newSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil 2468 } 2469 return nil, nil 2470 } 2471 return is.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX) 2472} 2473 2474func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIDIterator, error) { 2475 itr0, err := is.tagKeySeriesIDIterator(name, key) 2476 if err != nil { 2477 return nil, err 2478 } 2479 2480 itr1, err := is.tagKeySeriesIDIterator(name, []byte(value.Val)) 2481 if err != nil { 2482 if itr0 != nil { 2483 itr0.Close() 2484 } 2485 return nil, err 2486 } 2487 2488 if op == influxql.EQ { 2489 return IntersectSeriesIDIterators(itr0, itr1), nil 2490 } 2491 return DifferenceSeriesIDIterators(itr0, itr1), nil 2492} 2493 2494// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. 2495// If matches is false, returns iterators which do not match value. 2496func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { 2497 release := is.SeriesFile.Retain() 2498 defer release() 2499 itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches) 2500 if err != nil { 2501 return nil, err 2502 } 2503 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2504} 2505 2506// matchTagValueSeriesIDIterator returns a series iterator for tags which match 2507// value. See MatchTagValueSeriesIDIterator for more details. 2508// 2509// It guarantees to never take any locks on the underlying series file. 
func (is IndexSet) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
	// Whether the regex matches the empty string determines how series that
	// lack this tag key entirely must be treated, so dispatch on that first.
	matchEmpty := value.MatchString("")
	if matches {
		if matchEmpty {
			return is.matchTagValueEqualEmptySeriesIDIterator(name, key, value)
		}
		return is.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value)
	}

	if matchEmpty {
		return is.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value)
	}
	return is.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value)
}

// matchTagValueEqualEmptySeriesIDIterator handles `=~ /re/` when the regex
// matches the empty string: series missing this tag key match too, so the
// result is all measurement series minus those whose value does NOT match.
func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
	vitr, err := is.tagValueIterator(name, key)
	if err != nil {
		return nil, err
	} else if vitr == nil {
		// No values exist for this key: every series in the measurement matches.
		return is.measurementSeriesIDIterator(name)
	}
	defer vitr.Close()

	var itrs []SeriesIDIterator
	// Collect iterators for all values that do NOT match; wrapped in a
	// closure so any error can close the accumulated iterators in one place.
	if err := func() error {
		for {
			e, err := vitr.Next()
			if err != nil {
				return err
			} else if e == nil {
				break
			}

			if !value.Match(e) {
				itr, err := is.tagValueSeriesIDIterator(name, key, e)
				if err != nil {
					return err
				} else if itr != nil {
					itrs = append(itrs, itr)
				}
			}
		}
		return nil
	}(); err != nil {
		SeriesIDIterators(itrs).Close()
		return nil, err
	}

	mitr, err := is.measurementSeriesIDIterator(name)
	if err != nil {
		SeriesIDIterators(itrs).Close()
		return nil, err
	}

	// All measurement series minus series with a non-matching value.
	return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}

// matchTagValueEqualNotEmptySeriesIDIterator handles `=~ /re/` when the regex
// does not match the empty string: only series carrying a matching value qualify.
func (is IndexSet) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
	vitr, err := is.tagValueIterator(name, key)
	if err != nil {
		return nil, err
	} else if vitr == nil {
		// No values for this key: nothing can match.
		return nil, nil
	}
	defer vitr.Close()

	var itrs []SeriesIDIterator
	for {
		e, err := vitr.Next()
		if err != nil {
			SeriesIDIterators(itrs).Close()
			return nil, err
		} else if e == nil {
			break
		}

		if value.Match(e) {
			itr, err := is.tagValueSeriesIDIterator(name, key, e)
			if err != nil {
				SeriesIDIterators(itrs).Close()
				return nil, err
			} else if itr != nil {
				itrs = append(itrs, itr)
			}
		}
	}
	return MergeSeriesIDIterators(itrs...), nil
}

// matchTagValueNotEqualEmptySeriesIDIterator handles `!~ /re/` when the regex
// matches the empty string: only series with a non-matching value qualify
// (series without the key would "match empty" and are therefore excluded).
func (is IndexSet) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
	vitr, err := is.tagValueIterator(name, key)
	if err != nil {
		return nil, err
	} else if vitr == nil {
		return nil, nil
	}
	defer vitr.Close()

	var itrs []SeriesIDIterator
	for {
		e, err := vitr.Next()
		if err != nil {
			SeriesIDIterators(itrs).Close()
			return nil, err
		} else if e == nil {
			break
		}

		if !value.Match(e) {
			itr, err := is.tagValueSeriesIDIterator(name, key, e)
			if err != nil {
				SeriesIDIterators(itrs).Close()
				return nil, err
			} else if itr != nil {
				itrs = append(itrs, itr)
			}
		}
	}
	return MergeSeriesIDIterators(itrs...), nil
}

// matchTagValueNotEqualNotEmptySeriesIDIterator handles `!~ /re/` when the
// regex does not match the empty string: series missing the key qualify, so
// the result is all measurement series minus those with a matching value.
func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
	vitr, err := is.tagValueIterator(name, key)
	if err != nil {
		return nil, err
	} else if vitr == nil {
		// No values for this key: every series in the measurement matches.
		return is.measurementSeriesIDIterator(name)
	}
	defer vitr.Close()

	var itrs []SeriesIDIterator
	for {
		e, err := vitr.Next()
		if err != nil {
			SeriesIDIterators(itrs).Close()
			return nil, err
		} else if e == nil {
			break
		}
		if value.Match(e) {
			itr, err := is.tagValueSeriesIDIterator(name, key, e)
			if err != nil {
				SeriesIDIterators(itrs).Close()
				return nil, err
			} else if itr != nil {
				itrs = append(itrs, itr)
			}
		}
	}

	mitr, err := is.measurementSeriesIDIterator(name)
	if err != nil {
		SeriesIDIterators(itrs).Close()
		return nil, err
	}
	return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}

// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
//
// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the
// position of the tag key in the keys argument.
//
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
	// NOTE(review): fieldset is unused in this body — confirm whether callers
	// rely on the signature before removing it.
	release := is.SeriesFile.Retain()
	defer release()
	return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
}

// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
// TagValuesByKeyAndExpr for more details.
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
2687func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { 2688 database := is.Database() 2689 2690 valueExpr := influxql.CloneExpr(expr) 2691 valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr { 2692 switch e := e.(type) { 2693 case *influxql.BinaryExpr: 2694 switch e.Op { 2695 case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: 2696 tag, ok := e.LHS.(*influxql.VarRef) 2697 if !ok || tag.Val != "value" { 2698 return nil 2699 } 2700 } 2701 } 2702 return e 2703 }), nil) 2704 2705 itr, err := is.seriesByExprIterator(name, expr) 2706 if err != nil { 2707 return nil, err 2708 } else if itr == nil { 2709 return nil, nil 2710 } 2711 itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) 2712 defer itr.Close() 2713 2714 keyIdxs := make(map[string]int, len(keys)) 2715 for ki, key := range keys { 2716 keyIdxs[key] = ki 2717 2718 // Check that keys are in order. 2719 if ki > 0 && key < keys[ki-1] { 2720 return nil, fmt.Errorf("keys %v are not in ascending order", keys) 2721 } 2722 } 2723 2724 resultSet := make([]map[string]struct{}, len(keys)) 2725 for i := 0; i < len(resultSet); i++ { 2726 resultSet[i] = make(map[string]struct{}) 2727 } 2728 2729 // Iterate all series to collect tag values. 
2730 for { 2731 e, err := itr.Next() 2732 if err != nil { 2733 return nil, err 2734 } else if e.SeriesID == 0 { 2735 break 2736 } 2737 2738 buf := is.SeriesFile.SeriesKey(e.SeriesID) 2739 if len(buf) == 0 { 2740 continue 2741 } 2742 2743 if auth != nil { 2744 name, tags := ParseSeriesKey(buf) 2745 if !auth.AuthorizeSeriesRead(database, name, tags) { 2746 continue 2747 } 2748 } 2749 2750 _, buf = ReadSeriesKeyLen(buf) 2751 _, buf = ReadSeriesKeyMeasurement(buf) 2752 tagN, buf := ReadSeriesKeyTagN(buf) 2753 for i := 0; i < tagN; i++ { 2754 var key, value []byte 2755 key, value, buf = ReadSeriesKeyTag(buf) 2756 if valueExpr != nil { 2757 if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) { 2758 continue 2759 } 2760 } 2761 2762 if idx, ok := keyIdxs[string(key)]; ok { 2763 resultSet[idx][string(value)] = struct{}{} 2764 } else if string(key) > keys[len(keys)-1] { 2765 // The tag key is > the largest key we're interested in. 2766 break 2767 } 2768 } 2769 } 2770 return resultSet, nil 2771} 2772 2773// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. 2774func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { 2775 if len(keys) == 0 { 2776 return nil, nil 2777 } 2778 2779 results := make([][]string, len(keys)) 2780 // If the keys are not sorted, then sort them. 2781 if !keysSorted { 2782 sort.Strings(keys) 2783 } 2784 2785 release := is.SeriesFile.Retain() 2786 defer release() 2787 2788 // No expression means that the values shouldn't be filtered; so fetch them 2789 // all. 2790 if expr == nil { 2791 for ki, key := range keys { 2792 vitr, err := is.tagValueIterator(name, []byte(key)) 2793 if err != nil { 2794 return nil, err 2795 } else if vitr == nil { 2796 break 2797 } 2798 defer vitr.Close() 2799 2800 // If no authorizer present then return all values. 
2801 if query.AuthorizerIsOpen(auth) { 2802 for { 2803 val, err := vitr.Next() 2804 if err != nil { 2805 return nil, err 2806 } else if val == nil { 2807 break 2808 } 2809 results[ki] = append(results[ki], string(val)) 2810 } 2811 continue 2812 } 2813 2814 // Authorization is present — check all series with matching tag values 2815 // and measurements for the presence of an authorized series. 2816 for { 2817 val, err := vitr.Next() 2818 if err != nil { 2819 return nil, err 2820 } else if val == nil { 2821 break 2822 } 2823 2824 sitr, err := is.tagValueSeriesIDIterator(name, []byte(key), val) 2825 if err != nil { 2826 return nil, err 2827 } else if sitr == nil { 2828 continue 2829 } 2830 defer sitr.Close() 2831 sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) 2832 2833 for { 2834 se, err := sitr.Next() 2835 if err != nil { 2836 return nil, err 2837 } 2838 2839 if se.SeriesID == 0 { 2840 break 2841 } 2842 2843 name, tags := is.SeriesFile.Series(se.SeriesID) 2844 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 2845 results[ki] = append(results[ki], string(val)) 2846 break 2847 } 2848 } 2849 if err := sitr.Close(); err != nil { 2850 return nil, err 2851 } 2852 } 2853 } 2854 return results, nil 2855 } 2856 2857 // This is the case where we have filtered series by some WHERE condition. 2858 // We only care about the tag values for the keys given the 2859 // filtered set of series ids. 2860 resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr) 2861 if err != nil { 2862 return nil, err 2863 } 2864 2865 // Convert result sets into []string 2866 for i, s := range resultSet { 2867 values := make([]string, 0, len(s)) 2868 for v := range s { 2869 values = append(values, v) 2870 } 2871 sort.Strings(values) 2872 results[i] = values 2873 } 2874 return results, nil 2875} 2876 2877// TagSets returns an ordered list of tag sets for a measurement by dimension 2878// and filtered by an optional conditional expression. 
func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
	release := is.SeriesFile.Retain()
	defer release()

	itr, err := is.measurementSeriesByExprIterator(name, opt.Condition)
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}
	defer itr.Close()
	// measurementSeriesByExprIterator filters deleted series IDs; no need to
	// do so here.

	// Sorted copy of the requested GROUP BY dimensions.
	var dims []string
	if len(opt.Dimensions) > 0 {
		dims = make([]string, len(opt.Dimensions))
		copy(dims, opt.Dimensions)
		sort.Strings(dims)
	}

	// For every series, get the tag values for the requested tag keys i.e.
	// dimensions. This is the TagSet for that series. Series with the same
	// TagSet are then grouped together, because for the purpose of GROUP BY
	// they are part of the same composite series.
	tagSets := make(map[string]*query.TagSet, 64)
	var (
		seriesN, maxSeriesN int
		db                  = is.Database()
	)

	// MaxSeriesN == 0 means no limit; use the max int value as a sentinel.
	if opt.MaxSeriesN > 0 {
		maxSeriesN = opt.MaxSeriesN
	} else {
		maxSeriesN = int(^uint(0) >> 1)
	}

	// The tag sets require a string for each series key in the set, The series
	// file formatted keys need to be parsed into models format. Since they will
	// end up as strings we can re-use an intermediate buffer for this process.
	var keyBuf []byte
	var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
	for {
		se, err := itr.Next()
		if err != nil {
			return nil, err
		} else if se.SeriesID == 0 {
			break
		}

		// Skip if the series has been tombstoned.
		key := sfile.SeriesKey(se.SeriesID)
		if len(key) == 0 {
			continue
		}

		if seriesN&0x3fff == 0x3fff {
			// check every 16384 series if the query has been canceled
			select {
			case <-opt.InterruptCh:
				return nil, query.ErrQueryInterrupted
			default:
			}
		}

		if seriesN > maxSeriesN {
			return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
		}

		// NOTE - must not escape this loop iteration.
		_, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
		if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
			continue
		}

		// Composite key built from the GROUP BY dimension values only.
		var tagsAsKey []byte
		if len(dims) > 0 {
			tagsAsKey = MakeTagsKey(dims, tagsBuf)
		}

		tagSet, ok := tagSets[string(tagsAsKey)]
		if !ok {
			// This TagSet is new, create a new entry for it.
			tagSet = &query.TagSet{
				Tags: nil,
				Key:  tagsAsKey,
			}
		}

		// Associate the series and filter with the Tagset.
		keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
		tagSet.AddFilter(string(keyBuf), se.Expr)
		keyBuf = keyBuf[:0]

		// Ensure it's back in the map.
		tagSets[string(tagsAsKey)] = tagSet
		seriesN++
	}

	// Sort the series in each tag set.
	for _, t := range tagSets {
		sort.Sort(t)
	}

	// The TagSets have been created, as a map of TagSets. Just send
	// the values back as a slice, sorting for consistency.
	sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
	for _, v := range tagSets {
		sortedTagsSets = append(sortedTagsSets, v)
	}
	sort.Sort(byTagKey(sortedTagsSets))

	return sortedTagsSets, nil
}

// IndexFormat represents the format for an index.
type IndexFormat int

const (
	// InMemFormat is the format used by the original in-memory shared index.
	InMemFormat IndexFormat = 1

	// TSI1Format is the format used by the tsi1 index.
	TSI1Format IndexFormat = 2
)

// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index

// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)

// RegisterIndex registers a storage index initializer by name.
// It panics if an index with the same name has already been registered.
func RegisterIndex(name string, fn NewIndexFunc) {
	if _, ok := newIndexFuncs[name]; ok {
		panic("index already registered: " + name)
	}
	newIndexFuncs[name] = fn
}

// RegisteredIndexes returns the slice of currently registered indexes,
// sorted by name.
func RegisteredIndexes() []string {
	a := make([]string, 0, len(newIndexFuncs))
	for k := range newIndexFuncs {
		a = append(a, k)
	}
	sort.Strings(a)
	return a
}

// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {
	format := options.IndexVersion

	// Use default format unless existing directory exists.
	_, err := os.Stat(path)
	if os.IsNotExist(err) {
		// nop, use default
	} else if err != nil {
		return nil, err
	} else if err == nil {
		// An index directory already exists on disk; treat it as tsi1.
		format = TSI1IndexName
	}

	// Lookup index by format.
	fn := newIndexFuncs[format]
	if fn == nil {
		return nil, fmt.Errorf("invalid index format: %q", format)
	}
	return fn(id, database, path, seriesIDSet, sfile, options), nil
}

// MustOpenIndex builds and opens an index; it panics on any error.
func MustOpenIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index {
	idx, err := NewIndex(id, database, path, seriesIDSet, sfile, options)
	if err != nil {
		panic(err)
	} else if err := idx.Open(); err != nil {
		panic(err)
	}
	return idx
}

// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
	if !condition {
		panic(fmt.Sprintf("assert failed: "+msg, v...))
	}
}

// byTagKey sorts TagSets lexicographically by their composite tag key.
type byTagKey []*query.TagSet

func (t byTagKey) Len() int           { return len(t) }
func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
func (t byTagKey) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }