package tsdb

import (
	"bytes"
	"errors"
	"fmt"
	"os"
	"regexp"
	"sort"
	"sync"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/slices"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
)

// Available index types.
const (
	InmemIndexName = "inmem"
	TSI1IndexName  = "tsi1"
)

// ErrIndexClosing can be returned from an Index method if the index is currently closing.
var ErrIndexClosing = errors.New("index is closing")

// Index is the set of operations an index implementation provides: lifecycle
// management, measurement and series bookkeeping, cardinality sketches, tag
// lookups and the iterators used by the InfluxQL system queries.
//
// NOTE(review): presumably implemented by the index types named by
// InmemIndexName and TSI1IndexName; confirm against the implementations.
type Index interface {
	Open() error
	Close() error
	WithLogger(*zap.Logger)

	Database() string
	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	DropMeasurement(name []byte) error
	ForEachMeasurementName(fn func(name []byte) error) error

	InitializeSeries(keys, names [][]byte, tags []models.Tags) error
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

	// Used to clean up series in inmem index that were dropped with a shard.
	DropSeriesGlobal(key []byte) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesIDSet() *SeriesIDSet

	HasTagKey(name, key []byte) (bool, error)
	HasTagValue(name, key, value []byte) (bool, error)

	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

	TagKeyCardinality(name, key []byte) int

	// InfluxQL system iterators
	MeasurementIterator() (MeasurementIterator, error)
	TagKeyIterator(name []byte) (TagKeyIterator, error)
	TagValueIterator(name, key []byte) (TagValueIterator, error)
	MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
	TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
	TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

	// Sets a shared fieldset from the engine.
	FieldSet() *MeasurementFieldSet
	SetFieldSet(fs *MeasurementFieldSet)

	// Size of the index on disk, if applicable.
	DiskSizeBytes() int64

	// Bytes estimates the memory footprint of this Index, in bytes.
	Bytes() int

	// To be removed w/ tsi1.
	SetFieldName(measurement []byte, name string)

	Type() string
	// Returns a unique reference ID to the index instance.
	// For inmem, returns a reference to the backing Index, not ShardIndex.
	UniqueReferenceID() uintptr

	Rebuild()
}

// SeriesElem represents a generic series element.
type SeriesElem interface {
	Name() []byte
	Tags() models.Tags
	Deleted() bool

	// InfluxQL expression associated with series during filtering.
	Expr() influxql.Expr
}

// SeriesIterator represents an iterator over a list of series.
type SeriesIterator interface {
	Close() error
	Next() (SeriesElem, error)
}

// NewSeriesIteratorAdapter returns an adapter for converting series ids to series.
108func NewSeriesIteratorAdapter(sfile *SeriesFile, itr SeriesIDIterator) SeriesIterator { 109 return &seriesIteratorAdapter{ 110 sfile: sfile, 111 itr: itr, 112 } 113} 114 115type seriesIteratorAdapter struct { 116 sfile *SeriesFile 117 itr SeriesIDIterator 118} 119 120func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() } 121 122func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) { 123 for { 124 elem, err := itr.itr.Next() 125 if err != nil { 126 return nil, err 127 } else if elem.SeriesID == 0 { 128 return nil, nil 129 } 130 131 // Skip if this key has been tombstoned. 132 key := itr.sfile.SeriesKey(elem.SeriesID) 133 if len(key) == 0 { 134 continue 135 } 136 137 name, tags := ParseSeriesKey(key) 138 deleted := itr.sfile.IsDeleted(elem.SeriesID) 139 return &seriesElemAdapter{ 140 name: name, 141 tags: tags, 142 deleted: deleted, 143 expr: elem.Expr, 144 }, nil 145 } 146} 147 148type seriesElemAdapter struct { 149 name []byte 150 tags models.Tags 151 deleted bool 152 expr influxql.Expr 153} 154 155func (e *seriesElemAdapter) Name() []byte { return e.name } 156func (e *seriesElemAdapter) Tags() models.Tags { return e.tags } 157func (e *seriesElemAdapter) Deleted() bool { return e.deleted } 158func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr } 159 160// SeriesIDElem represents a single series and optional expression. 161type SeriesIDElem struct { 162 SeriesID uint64 163 Expr influxql.Expr 164} 165 166// SeriesIDElems represents a list of series id elements. 167type SeriesIDElems []SeriesIDElem 168 169func (a SeriesIDElems) Len() int { return len(a) } 170func (a SeriesIDElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 171func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID < a[j].SeriesID } 172 173// SeriesIDIterator represents a iterator over a list of series ids. 
174type SeriesIDIterator interface { 175 Next() (SeriesIDElem, error) 176 Close() error 177} 178 179// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet. 180type SeriesIDSetIterator interface { 181 SeriesIDIterator 182 SeriesIDSet() *SeriesIDSet 183} 184 185type seriesIDSetIterator struct { 186 ss *SeriesIDSet 187 itr SeriesIDSetIterable 188} 189 190func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator { 191 if ss == nil || ss.bitmap == nil { 192 return nil 193 } 194 return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()} 195} 196 197func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) { 198 if !itr.itr.HasNext() { 199 return SeriesIDElem{}, nil 200 } 201 return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil 202} 203 204func (itr *seriesIDSetIterator) Close() error { return nil } 205 206func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss } 207 208// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs 209// can be type casted. Otherwise returns nil. 210func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator { 211 if len(itrs) == 0 { 212 return nil 213 } 214 215 a := make([]SeriesIDSetIterator, len(itrs)) 216 for i := range itrs { 217 if itr, ok := itrs[i].(SeriesIDSetIterator); ok { 218 a[i] = itr 219 } else { 220 return nil 221 } 222 } 223 return a 224} 225 226// ReadAllSeriesIDIterator returns all ids from the iterator. 227func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) { 228 if itr == nil { 229 return nil, nil 230 } 231 232 var a []uint64 233 for { 234 e, err := itr.Next() 235 if err != nil { 236 return nil, err 237 } else if e.SeriesID == 0 { 238 break 239 } 240 a = append(a, e.SeriesID) 241 } 242 return a, nil 243} 244 245// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice. 
246func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator { 247 return &SeriesIDSliceIterator{ids: ids} 248} 249 250// SeriesIDSliceIterator iterates over a slice of series ids. 251type SeriesIDSliceIterator struct { 252 ids []uint64 253} 254 255// Next returns the next series id in the slice. 256func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) { 257 if len(itr.ids) == 0 { 258 return SeriesIDElem{}, nil 259 } 260 id := itr.ids[0] 261 itr.ids = itr.ids[1:] 262 return SeriesIDElem{SeriesID: id}, nil 263} 264 265func (itr *SeriesIDSliceIterator) Close() error { return nil } 266 267// SeriesIDSet returns a set of all remaining ids. 268func (itr *SeriesIDSliceIterator) SeriesIDSet() *SeriesIDSet { 269 s := NewSeriesIDSet() 270 for _, id := range itr.ids { 271 s.AddNoLock(id) 272 } 273 return s 274} 275 276type SeriesIDIterators []SeriesIDIterator 277 278func (a SeriesIDIterators) Close() (err error) { 279 for i := range a { 280 if e := a[i].Close(); e != nil && err == nil { 281 err = e 282 } 283 } 284 return err 285} 286 287func (a SeriesIDIterators) filterNonNil() []SeriesIDIterator { 288 other := make([]SeriesIDIterator, 0, len(a)) 289 for _, itr := range a { 290 if itr == nil { 291 continue 292 } 293 other = append(other, itr) 294 } 295 return other 296} 297 298// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator. 299type seriesQueryAdapterIterator struct { 300 once sync.Once 301 sfile *SeriesFile 302 itr SeriesIDIterator 303 fieldset *MeasurementFieldSet 304 opt query.IteratorOptions 305 306 point query.FloatPoint // reusable point 307} 308 309// NewSeriesQueryAdapterIterator returns a new instance of SeriesQueryAdapterIterator. 
310func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator { 311 return &seriesQueryAdapterIterator{ 312 sfile: sfile, 313 itr: itr, 314 fieldset: fieldset, 315 point: query.FloatPoint{ 316 Aux: make([]interface{}, len(opt.Aux)), 317 }, 318 opt: opt, 319 } 320} 321 322// Stats returns stats about the points processed. 323func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} } 324 325// Close closes the iterator. 326func (itr *seriesQueryAdapterIterator) Close() error { 327 itr.once.Do(func() { 328 itr.itr.Close() 329 }) 330 return nil 331} 332 333// Next emits the next point in the iterator. 334func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) { 335 for { 336 // Read next series element. 337 e, err := itr.itr.Next() 338 if err != nil { 339 return nil, err 340 } else if e.SeriesID == 0 { 341 return nil, nil 342 } 343 344 // Skip if key has been tombstoned. 345 seriesKey := itr.sfile.SeriesKey(e.SeriesID) 346 if len(seriesKey) == 0 { 347 continue 348 } 349 350 // Convert to a key. 351 name, tags := ParseSeriesKey(seriesKey) 352 key := string(models.MakeKey(name, tags)) 353 354 // Write auxiliary fields. 355 for i, f := range itr.opt.Aux { 356 switch f.Val { 357 case "key": 358 itr.point.Aux[i] = key 359 } 360 } 361 return &itr.point, nil 362 } 363} 364 365// filterUndeletedSeriesIDIterator returns all series which are not deleted. 366type filterUndeletedSeriesIDIterator struct { 367 sfile *SeriesFile 368 itr SeriesIDIterator 369} 370 371// FilterUndeletedSeriesIDIterator returns an iterator which filters all deleted series. 
372func FilterUndeletedSeriesIDIterator(sfile *SeriesFile, itr SeriesIDIterator) SeriesIDIterator { 373 if itr == nil { 374 return nil 375 } 376 return &filterUndeletedSeriesIDIterator{sfile: sfile, itr: itr} 377} 378 379func (itr *filterUndeletedSeriesIDIterator) Close() error { 380 return itr.itr.Close() 381} 382 383func (itr *filterUndeletedSeriesIDIterator) Next() (SeriesIDElem, error) { 384 for { 385 e, err := itr.itr.Next() 386 if err != nil { 387 return SeriesIDElem{}, err 388 } else if e.SeriesID == 0 { 389 return SeriesIDElem{}, nil 390 } else if itr.sfile.IsDeleted(e.SeriesID) { 391 continue 392 } 393 return e, nil 394 } 395} 396 397// seriesIDExprIterator is an iterator that attaches an associated expression. 398type seriesIDExprIterator struct { 399 itr SeriesIDIterator 400 expr influxql.Expr 401} 402 403// newSeriesIDExprIterator returns a new instance of seriesIDExprIterator. 404func newSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator { 405 if itr == nil { 406 return nil 407 } 408 409 return &seriesIDExprIterator{ 410 itr: itr, 411 expr: expr, 412 } 413} 414 415func (itr *seriesIDExprIterator) Close() error { 416 return itr.itr.Close() 417} 418 419// Next returns the next element in the iterator. 420func (itr *seriesIDExprIterator) Next() (SeriesIDElem, error) { 421 elem, err := itr.itr.Next() 422 if err != nil { 423 return SeriesIDElem{}, err 424 } else if elem.SeriesID == 0 { 425 return SeriesIDElem{}, nil 426 } 427 elem.Expr = itr.expr 428 return elem, nil 429} 430 431// MergeSeriesIDIterators returns an iterator that merges a set of iterators. 432// Iterators that are first in the list take precedence and a deletion by those 433// early iterators will invalidate elements by later iterators. 
434func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator { 435 if n := len(itrs); n == 0 { 436 return nil 437 } else if n == 1 { 438 return itrs[0] 439 } 440 itrs = SeriesIDIterators(itrs).filterNonNil() 441 442 // Merge as series id sets, if available. 443 if a := NewSeriesIDSetIterators(itrs); a != nil { 444 sets := make([]*SeriesIDSet, len(a)) 445 for i := range a { 446 sets[i] = a[i].SeriesIDSet() 447 } 448 449 ss := NewSeriesIDSet() 450 ss.Merge(sets...) 451 SeriesIDIterators(itrs).Close() 452 return NewSeriesIDSetIterator(ss) 453 } 454 455 return &seriesIDMergeIterator{ 456 buf: make([]SeriesIDElem, len(itrs)), 457 itrs: itrs, 458 } 459} 460 461// seriesIDMergeIterator is an iterator that merges multiple iterators together. 462type seriesIDMergeIterator struct { 463 buf []SeriesIDElem 464 itrs []SeriesIDIterator 465} 466 467func (itr *seriesIDMergeIterator) Close() error { 468 SeriesIDIterators(itr.itrs).Close() 469 return nil 470} 471 472// Next returns the element with the next lowest name/tags across the iterators. 473func (itr *seriesIDMergeIterator) Next() (SeriesIDElem, error) { 474 // Find next lowest id amongst the buffers. 475 var elem SeriesIDElem 476 for i := range itr.buf { 477 buf := &itr.buf[i] 478 479 // Fill buffer. 480 if buf.SeriesID == 0 { 481 elem, err := itr.itrs[i].Next() 482 if err != nil { 483 return SeriesIDElem{}, nil 484 } else if elem.SeriesID == 0 { 485 continue 486 } 487 itr.buf[i] = elem 488 } 489 490 if elem.SeriesID == 0 || buf.SeriesID < elem.SeriesID { 491 elem = *buf 492 } 493 } 494 495 // Return EOF if no elements remaining. 496 if elem.SeriesID == 0 { 497 return SeriesIDElem{}, nil 498 } 499 500 // Clear matching buffers. 501 for i := range itr.buf { 502 if itr.buf[i].SeriesID == elem.SeriesID { 503 itr.buf[i].SeriesID = 0 504 } 505 } 506 return elem, nil 507} 508 509// IntersectSeriesIDIterators returns an iterator that only returns series which 510// occur in both iterators. 
If both series have associated expressions then 511// they are combined together. 512func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator { 513 if itr0 == nil || itr1 == nil { 514 if itr0 != nil { 515 itr0.Close() 516 } 517 if itr1 != nil { 518 itr1.Close() 519 } 520 return nil 521 } 522 523 // Create series id set, if available. 524 if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil { 525 itr0.Close() 526 itr1.Close() 527 return NewSeriesIDSetIterator(a[0].SeriesIDSet().And(a[1].SeriesIDSet())) 528 } 529 530 return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}} 531} 532 533// seriesIDIntersectIterator is an iterator that merges two iterators together. 534type seriesIDIntersectIterator struct { 535 buf [2]SeriesIDElem 536 itrs [2]SeriesIDIterator 537} 538 539func (itr *seriesIDIntersectIterator) Close() (err error) { 540 if e := itr.itrs[0].Close(); e != nil && err == nil { 541 err = e 542 } 543 if e := itr.itrs[1].Close(); e != nil && err == nil { 544 err = e 545 } 546 return err 547} 548 549// Next returns the next element which occurs in both iterators. 550func (itr *seriesIDIntersectIterator) Next() (_ SeriesIDElem, err error) { 551 for { 552 // Fill buffers. 553 if itr.buf[0].SeriesID == 0 { 554 if itr.buf[0], err = itr.itrs[0].Next(); err != nil { 555 return SeriesIDElem{}, err 556 } 557 } 558 if itr.buf[1].SeriesID == 0 { 559 if itr.buf[1], err = itr.itrs[1].Next(); err != nil { 560 return SeriesIDElem{}, err 561 } 562 } 563 564 // Exit if either buffer is still empty. 565 if itr.buf[0].SeriesID == 0 || itr.buf[1].SeriesID == 0 { 566 return SeriesIDElem{}, nil 567 } 568 569 // Skip if both series are not equal. 570 if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b { 571 itr.buf[0].SeriesID = 0 572 continue 573 } else if a > b { 574 itr.buf[1].SeriesID = 0 575 continue 576 } 577 578 // Merge series together if equal. 579 elem := itr.buf[0] 580 581 // Attach expression. 
582 expr0 := itr.buf[0].Expr 583 expr1 := itr.buf[1].Expr 584 if expr0 == nil { 585 elem.Expr = expr1 586 } else if expr1 == nil { 587 elem.Expr = expr0 588 } else { 589 elem.Expr = influxql.Reduce(&influxql.BinaryExpr{ 590 Op: influxql.AND, 591 LHS: expr0, 592 RHS: expr1, 593 }, nil) 594 } 595 596 itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0 597 return elem, nil 598 } 599} 600 601// UnionSeriesIDIterators returns an iterator that returns series from both 602// both iterators. If both series have associated expressions then they are 603// combined together. 604func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator { 605 // Return other iterator if either one is nil. 606 if itr0 == nil { 607 return itr1 608 } else if itr1 == nil { 609 return itr0 610 } 611 612 // Create series id set, if available. 613 if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil { 614 itr0.Close() 615 itr1.Close() 616 ss := NewSeriesIDSet() 617 ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet()) 618 return NewSeriesIDSetIterator(ss) 619 } 620 621 return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}} 622} 623 624// seriesIDUnionIterator is an iterator that unions two iterators together. 625type seriesIDUnionIterator struct { 626 buf [2]SeriesIDElem 627 itrs [2]SeriesIDIterator 628} 629 630func (itr *seriesIDUnionIterator) Close() (err error) { 631 if e := itr.itrs[0].Close(); e != nil && err == nil { 632 err = e 633 } 634 if e := itr.itrs[1].Close(); e != nil && err == nil { 635 err = e 636 } 637 return err 638} 639 640// Next returns the next element which occurs in both iterators. 641func (itr *seriesIDUnionIterator) Next() (_ SeriesIDElem, err error) { 642 // Fill buffers. 
643 if itr.buf[0].SeriesID == 0 { 644 if itr.buf[0], err = itr.itrs[0].Next(); err != nil { 645 return SeriesIDElem{}, err 646 } 647 } 648 if itr.buf[1].SeriesID == 0 { 649 if itr.buf[1], err = itr.itrs[1].Next(); err != nil { 650 return SeriesIDElem{}, err 651 } 652 } 653 654 // Return non-zero or lesser series. 655 if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a == 0 && b == 0 { 656 return SeriesIDElem{}, nil 657 } else if b == 0 || (a != 0 && a < b) { 658 elem := itr.buf[0] 659 itr.buf[0].SeriesID = 0 660 return elem, nil 661 } else if a == 0 || (b != 0 && a > b) { 662 elem := itr.buf[1] 663 itr.buf[1].SeriesID = 0 664 return elem, nil 665 } 666 667 // Attach element. 668 elem := itr.buf[0] 669 670 // Attach expression. 671 expr0 := itr.buf[0].Expr 672 expr1 := itr.buf[1].Expr 673 if expr0 != nil && expr1 != nil { 674 elem.Expr = influxql.Reduce(&influxql.BinaryExpr{ 675 Op: influxql.OR, 676 LHS: expr0, 677 RHS: expr1, 678 }, nil) 679 } else { 680 elem.Expr = nil 681 } 682 683 itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0 684 return elem, nil 685} 686 687// DifferenceSeriesIDIterators returns an iterator that only returns series which 688// occur the first iterator but not the second iterator. 689func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator { 690 if itr0 == nil && itr1 == nil { 691 return nil 692 } else if itr1 == nil { 693 return itr0 694 } else if itr0 == nil { 695 itr1.Close() 696 return nil 697 } 698 699 // Create series id set, if available. 700 if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil { 701 itr0.Close() 702 itr1.Close() 703 return NewSeriesIDSetIterator(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet())) 704 } 705 706 return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}} 707} 708 709// seriesIDDifferenceIterator is an iterator that merges two iterators together. 
710type seriesIDDifferenceIterator struct { 711 buf [2]SeriesIDElem 712 itrs [2]SeriesIDIterator 713} 714 715func (itr *seriesIDDifferenceIterator) Close() (err error) { 716 if e := itr.itrs[0].Close(); e != nil && err == nil { 717 err = e 718 } 719 if e := itr.itrs[1].Close(); e != nil && err == nil { 720 err = e 721 } 722 return err 723} 724 725// Next returns the next element which occurs only in the first iterator. 726func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) { 727 for { 728 // Fill buffers. 729 if itr.buf[0].SeriesID == 0 { 730 if itr.buf[0], err = itr.itrs[0].Next(); err != nil { 731 return SeriesIDElem{}, err 732 } 733 } 734 if itr.buf[1].SeriesID == 0 { 735 if itr.buf[1], err = itr.itrs[1].Next(); err != nil { 736 return SeriesIDElem{}, err 737 } 738 } 739 740 // Exit if first buffer is still empty. 741 if itr.buf[0].SeriesID == 0 { 742 return SeriesIDElem{}, nil 743 } else if itr.buf[1].SeriesID == 0 { 744 elem := itr.buf[0] 745 itr.buf[0].SeriesID = 0 746 return elem, nil 747 } 748 749 // Return first series if it's less. 750 // If second series is less then skip it. 751 // If both series are equal then skip both. 752 if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b { 753 elem := itr.buf[0] 754 itr.buf[0].SeriesID = 0 755 return elem, nil 756 } else if a > b { 757 itr.buf[1].SeriesID = 0 758 continue 759 } else { 760 itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0 761 continue 762 } 763 } 764} 765 766// seriesPointIterator adapts SeriesIterator to an influxql.Iterator. 767type seriesPointIterator struct { 768 once sync.Once 769 indexSet IndexSet 770 mitr MeasurementIterator 771 keys [][]byte 772 opt query.IteratorOptions 773 774 point query.FloatPoint // reusable point 775} 776 777// newSeriesPointIterator returns a new instance of seriesPointIterator. 778func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) { 779 // Only equality operators are allowed. 
780 influxql.WalkFunc(opt.Condition, func(n influxql.Node) { 781 switch n := n.(type) { 782 case *influxql.BinaryExpr: 783 switch n.Op { 784 case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, 785 influxql.OR, influxql.AND: 786 default: 787 err = errors.New("invalid tag comparison operator") 788 } 789 } 790 }) 791 if err != nil { 792 return nil, err 793 } 794 795 mitr, err := indexSet.MeasurementIterator() 796 if err != nil { 797 return nil, err 798 } 799 800 return &seriesPointIterator{ 801 indexSet: indexSet, 802 mitr: mitr, 803 point: query.FloatPoint{ 804 Aux: make([]interface{}, len(opt.Aux)), 805 }, 806 opt: opt, 807 }, nil 808} 809 810// Stats returns stats about the points processed. 811func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} } 812 813// Close closes the iterator. 814func (itr *seriesPointIterator) Close() (err error) { 815 itr.once.Do(func() { 816 if itr.mitr != nil { 817 err = itr.mitr.Close() 818 } 819 }) 820 return err 821} 822 823// Next emits the next point in the iterator. 824func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { 825 for { 826 // Read series keys for next measurement if no more keys remaining. 827 // Exit if there are no measurements remaining. 828 if len(itr.keys) == 0 { 829 m, err := itr.mitr.Next() 830 if err != nil { 831 return nil, err 832 } else if m == nil { 833 return nil, nil 834 } 835 836 if err := itr.readSeriesKeys(m); err != nil { 837 return nil, err 838 } 839 continue 840 } 841 842 name, tags := ParseSeriesKey(itr.keys[0]) 843 itr.keys = itr.keys[1:] 844 845 // TODO(edd): It seems to me like this authorisation check should be 846 // further down in the index. At this point we're going to be filtering 847 // series that have already been materialised in the LogFiles and 848 // IndexFiles. 
849 if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) { 850 continue 851 } 852 853 // Convert to a key. 854 key := string(models.MakeKey(name, tags)) 855 856 // Write auxiliary fields. 857 for i, f := range itr.opt.Aux { 858 switch f.Val { 859 case "key": 860 itr.point.Aux[i] = key 861 } 862 } 863 864 return &itr.point, nil 865 } 866} 867 868func (itr *seriesPointIterator) readSeriesKeys(name []byte) error { 869 sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition) 870 if err != nil { 871 return err 872 } else if sitr == nil { 873 return nil 874 } 875 defer sitr.Close() 876 877 // Slurp all series keys. 878 itr.keys = itr.keys[:0] 879 for i := 0; ; i++ { 880 elem, err := sitr.Next() 881 if err != nil { 882 return err 883 } else if elem.SeriesID == 0 { 884 break 885 } 886 887 // Periodically check for interrupt. 888 if i&0xFF == 0xFF { 889 select { 890 case <-itr.opt.InterruptCh: 891 return itr.Close() 892 default: 893 } 894 } 895 896 key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID) 897 if len(key) == 0 { 898 continue 899 } 900 itr.keys = append(itr.keys, key) 901 } 902 903 // Sort keys. 904 sort.Sort(seriesKeys(itr.keys)) 905 return nil 906} 907 908// MeasurementIterator represents a iterator over a list of measurements. 909type MeasurementIterator interface { 910 Close() error 911 Next() ([]byte, error) 912} 913 914type MeasurementIterators []MeasurementIterator 915 916func (a MeasurementIterators) Close() (err error) { 917 for i := range a { 918 if e := a[i].Close(); e != nil && err == nil { 919 err = e 920 } 921 } 922 return err 923} 924 925type measurementSliceIterator struct { 926 names [][]byte 927} 928 929// NewMeasurementSliceIterator returns an iterator over a slice of in-memory measurement names. 
930func NewMeasurementSliceIterator(names [][]byte) *measurementSliceIterator { 931 return &measurementSliceIterator{names: names} 932} 933 934func (itr *measurementSliceIterator) Close() (err error) { return nil } 935 936func (itr *measurementSliceIterator) Next() (name []byte, err error) { 937 if len(itr.names) == 0 { 938 return nil, nil 939 } 940 name, itr.names = itr.names[0], itr.names[1:] 941 return name, nil 942} 943 944// MergeMeasurementIterators returns an iterator that merges a set of iterators. 945// Iterators that are first in the list take precedence and a deletion by those 946// early iterators will invalidate elements by later iterators. 947func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator { 948 if len(itrs) == 0 { 949 return nil 950 } else if len(itrs) == 1 { 951 return itrs[0] 952 } 953 954 return &measurementMergeIterator{ 955 buf: make([][]byte, len(itrs)), 956 itrs: itrs, 957 } 958} 959 960type measurementMergeIterator struct { 961 buf [][]byte 962 itrs []MeasurementIterator 963} 964 965func (itr *measurementMergeIterator) Close() (err error) { 966 for i := range itr.itrs { 967 if e := itr.itrs[i].Close(); e != nil && err == nil { 968 err = e 969 } 970 } 971 return err 972} 973 974// Next returns the element with the next lowest name across the iterators. 975// 976// If multiple iterators contain the same name then the first is returned 977// and the remaining ones are skipped. 978func (itr *measurementMergeIterator) Next() (_ []byte, err error) { 979 // Find next lowest name amongst the buffers. 980 var name []byte 981 for i, buf := range itr.buf { 982 // Fill buffer if empty. 983 if buf == nil { 984 if buf, err = itr.itrs[i].Next(); err != nil { 985 return nil, err 986 } else if buf != nil { 987 itr.buf[i] = buf 988 } else { 989 continue 990 } 991 } 992 993 // Find next lowest name. 
994 if name == nil || bytes.Compare(itr.buf[i], name) == -1 { 995 name = itr.buf[i] 996 } 997 } 998 999 // Return nil if no elements remaining. 1000 if name == nil { 1001 return nil, nil 1002 } 1003 1004 // Merge all elements together and clear buffers. 1005 for i, buf := range itr.buf { 1006 if buf == nil || !bytes.Equal(buf, name) { 1007 continue 1008 } 1009 itr.buf[i] = nil 1010 } 1011 return name, nil 1012} 1013 1014// TagKeyIterator represents a iterator over a list of tag keys. 1015type TagKeyIterator interface { 1016 Close() error 1017 Next() ([]byte, error) 1018} 1019 1020type TagKeyIterators []TagKeyIterator 1021 1022func (a TagKeyIterators) Close() (err error) { 1023 for i := range a { 1024 if e := a[i].Close(); e != nil && err == nil { 1025 err = e 1026 } 1027 } 1028 return err 1029} 1030 1031// NewTagKeySliceIterator returns a TagKeyIterator that iterates over a slice. 1032func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator { 1033 return &tagKeySliceIterator{keys: keys} 1034} 1035 1036// tagKeySliceIterator iterates over a slice of tag keys. 1037type tagKeySliceIterator struct { 1038 keys [][]byte 1039} 1040 1041// Next returns the next tag key in the slice. 1042func (itr *tagKeySliceIterator) Next() ([]byte, error) { 1043 if len(itr.keys) == 0 { 1044 return nil, nil 1045 } 1046 key := itr.keys[0] 1047 itr.keys = itr.keys[1:] 1048 return key, nil 1049} 1050 1051func (itr *tagKeySliceIterator) Close() error { return nil } 1052 1053// MergeTagKeyIterators returns an iterator that merges a set of iterators. 
1054func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator { 1055 if len(itrs) == 0 { 1056 return nil 1057 } else if len(itrs) == 1 { 1058 return itrs[0] 1059 } 1060 1061 return &tagKeyMergeIterator{ 1062 buf: make([][]byte, len(itrs)), 1063 itrs: itrs, 1064 } 1065} 1066 1067type tagKeyMergeIterator struct { 1068 buf [][]byte 1069 itrs []TagKeyIterator 1070} 1071 1072func (itr *tagKeyMergeIterator) Close() error { 1073 for i := range itr.itrs { 1074 itr.itrs[i].Close() 1075 } 1076 return nil 1077} 1078 1079// Next returns the element with the next lowest key across the iterators. 1080// 1081// If multiple iterators contain the same key then the first is returned 1082// and the remaining ones are skipped. 1083func (itr *tagKeyMergeIterator) Next() (_ []byte, err error) { 1084 // Find next lowest key amongst the buffers. 1085 var key []byte 1086 for i, buf := range itr.buf { 1087 // Fill buffer. 1088 if buf == nil { 1089 if buf, err = itr.itrs[i].Next(); err != nil { 1090 return nil, err 1091 } else if buf != nil { 1092 itr.buf[i] = buf 1093 } else { 1094 continue 1095 } 1096 } 1097 1098 // Find next lowest key. 1099 if key == nil || bytes.Compare(buf, key) == -1 { 1100 key = buf 1101 } 1102 } 1103 1104 // Return nil if no elements remaining. 1105 if key == nil { 1106 return nil, nil 1107 } 1108 1109 // Merge elements and clear buffers. 1110 for i, buf := range itr.buf { 1111 if buf == nil || !bytes.Equal(buf, key) { 1112 continue 1113 } 1114 itr.buf[i] = nil 1115 } 1116 return key, nil 1117} 1118 1119// TagValueIterator represents a iterator over a list of tag values. 
1120type TagValueIterator interface { 1121 Close() error 1122 Next() ([]byte, error) 1123} 1124 1125type TagValueIterators []TagValueIterator 1126 1127func (a TagValueIterators) Close() (err error) { 1128 for i := range a { 1129 if e := a[i].Close(); e != nil && err == nil { 1130 err = e 1131 } 1132 } 1133 return err 1134} 1135 1136// NewTagValueSliceIterator returns a TagValueIterator that iterates over a slice. 1137func NewTagValueSliceIterator(values [][]byte) *tagValueSliceIterator { 1138 return &tagValueSliceIterator{values: values} 1139} 1140 1141// tagValueSliceIterator iterates over a slice of tag values. 1142type tagValueSliceIterator struct { 1143 values [][]byte 1144} 1145 1146// Next returns the next tag value in the slice. 1147func (itr *tagValueSliceIterator) Next() ([]byte, error) { 1148 if len(itr.values) == 0 { 1149 return nil, nil 1150 } 1151 value := itr.values[0] 1152 itr.values = itr.values[1:] 1153 return value, nil 1154} 1155 1156func (itr *tagValueSliceIterator) Close() error { return nil } 1157 1158// MergeTagValueIterators returns an iterator that merges a set of iterators. 1159func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator { 1160 if len(itrs) == 0 { 1161 return nil 1162 } else if len(itrs) == 1 { 1163 return itrs[0] 1164 } 1165 1166 return &tagValueMergeIterator{ 1167 buf: make([][]byte, len(itrs)), 1168 itrs: itrs, 1169 } 1170} 1171 1172type tagValueMergeIterator struct { 1173 buf [][]byte 1174 itrs []TagValueIterator 1175} 1176 1177func (itr *tagValueMergeIterator) Close() error { 1178 for i := range itr.itrs { 1179 itr.itrs[i].Close() 1180 } 1181 return nil 1182} 1183 1184// Next returns the element with the next lowest value across the iterators. 1185// 1186// If multiple iterators contain the same value then the first is returned 1187// and the remaining ones are skipped. 1188func (itr *tagValueMergeIterator) Next() (_ []byte, err error) { 1189 // Find next lowest value amongst the buffers. 
1190 var value []byte 1191 for i, buf := range itr.buf { 1192 // Fill buffer. 1193 if buf == nil { 1194 if buf, err = itr.itrs[i].Next(); err != nil { 1195 return nil, err 1196 } else if buf != nil { 1197 itr.buf[i] = buf 1198 } else { 1199 continue 1200 } 1201 } 1202 1203 // Find next lowest value. 1204 if value == nil || bytes.Compare(buf, value) == -1 { 1205 value = buf 1206 } 1207 } 1208 1209 // Return nil if no elements remaining. 1210 if value == nil { 1211 return nil, nil 1212 } 1213 1214 // Merge elements and clear buffers. 1215 for i, buf := range itr.buf { 1216 if buf == nil || !bytes.Equal(buf, value) { 1217 continue 1218 } 1219 itr.buf[i] = nil 1220 } 1221 return value, nil 1222} 1223 1224// IndexSet represents a list of indexes, all belonging to one database. 1225type IndexSet struct { 1226 Indexes []Index // The set of indexes comprising this IndexSet. 1227 SeriesFile *SeriesFile // The Series File associated with the db for this set. 1228 fieldSets []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB. 1229} 1230 1231// HasInmemIndex returns true if any in-memory index is in use. 1232func (is IndexSet) HasInmemIndex() bool { 1233 for _, idx := range is.Indexes { 1234 if idx.Type() == InmemIndexName { 1235 return true 1236 } 1237 } 1238 return false 1239} 1240 1241// Database returns the database name of the first index. 1242func (is IndexSet) Database() string { 1243 if len(is.Indexes) == 0 { 1244 return "" 1245 } 1246 return is.Indexes[0].Database() 1247} 1248 1249// HasField determines if any of the field sets on the set of indexes in the 1250// IndexSet have the provided field for the provided measurement. 1251func (is IndexSet) HasField(measurement []byte, field string) bool { 1252 if len(is.Indexes) == 0 { 1253 return false 1254 } 1255 1256 if len(is.fieldSets) == 0 { 1257 // field sets may not have been initialised yet. 
1258 is.fieldSets = make([]*MeasurementFieldSet, 0, len(is.Indexes)) 1259 for _, idx := range is.Indexes { 1260 is.fieldSets = append(is.fieldSets, idx.FieldSet()) 1261 } 1262 } 1263 1264 for _, fs := range is.fieldSets { 1265 if fs.Fields(measurement).HasField(field) { 1266 return true 1267 } 1268 } 1269 return false 1270} 1271 1272// DedupeInmemIndexes returns an index set which removes duplicate indexes. 1273// Useful because inmem indexes are shared by shards per database. 1274func (is IndexSet) DedupeInmemIndexes() IndexSet { 1275 other := IndexSet{ 1276 Indexes: make([]Index, 0, len(is.Indexes)), 1277 SeriesFile: is.SeriesFile, 1278 fieldSets: make([]*MeasurementFieldSet, 0, len(is.Indexes)), 1279 } 1280 1281 uniqueIndexes := make(map[uintptr]Index) 1282 for _, idx := range is.Indexes { 1283 uniqueIndexes[idx.UniqueReferenceID()] = idx 1284 } 1285 1286 for _, idx := range uniqueIndexes { 1287 other.Indexes = append(other.Indexes, idx) 1288 other.fieldSets = append(other.fieldSets, idx.FieldSet()) 1289 } 1290 1291 return other 1292} 1293 1294// MeasurementNamesByExpr returns a slice of measurement names matching the 1295// provided condition. If no condition is provided then all names are returned. 1296func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { 1297 release := is.SeriesFile.Retain() 1298 defer release() 1299 1300 // Return filtered list if expression exists. 1301 if expr != nil { 1302 names, err := is.measurementNamesByExpr(auth, expr) 1303 if err != nil { 1304 return nil, err 1305 } 1306 return slices.CopyChunkedByteSlices(names, 1000), nil 1307 } 1308 1309 itr, err := is.measurementIterator() 1310 if err != nil { 1311 return nil, err 1312 } else if itr == nil { 1313 return nil, nil 1314 } 1315 defer itr.Close() 1316 1317 // Iterate over all measurements if no condition exists. 
1318 var names [][]byte 1319 for { 1320 e, err := itr.Next() 1321 if err != nil { 1322 return nil, err 1323 } else if e == nil { 1324 break 1325 } 1326 1327 // Determine if there exists at least one authorised series for the 1328 // measurement name. 1329 if is.measurementAuthorizedSeries(auth, e) { 1330 names = append(names, e) 1331 } 1332 } 1333 return slices.CopyChunkedByteSlices(names, 1000), nil 1334} 1335 1336func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { 1337 if expr == nil { 1338 return nil, nil 1339 } 1340 1341 switch e := expr.(type) { 1342 case *influxql.BinaryExpr: 1343 switch e.Op { 1344 case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: 1345 tag, ok := e.LHS.(*influxql.VarRef) 1346 if !ok { 1347 return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) 1348 } 1349 1350 // Retrieve value or regex expression from RHS. 1351 var value string 1352 var regex *regexp.Regexp 1353 if influxql.IsRegexOp(e.Op) { 1354 re, ok := e.RHS.(*influxql.RegexLiteral) 1355 if !ok { 1356 return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) 1357 } 1358 regex = re.Val 1359 } else { 1360 s, ok := e.RHS.(*influxql.StringLiteral) 1361 if !ok { 1362 return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) 1363 } 1364 value = s.Val 1365 } 1366 1367 // Match on name, if specified. 
1368 if tag.Val == "_name" { 1369 return is.measurementNamesByNameFilter(auth, e.Op, value, regex) 1370 } else if influxql.IsSystemName(tag.Val) { 1371 return nil, nil 1372 } 1373 return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex) 1374 1375 case influxql.OR, influxql.AND: 1376 lhs, err := is.measurementNamesByExpr(auth, e.LHS) 1377 if err != nil { 1378 return nil, err 1379 } 1380 1381 rhs, err := is.measurementNamesByExpr(auth, e.RHS) 1382 if err != nil { 1383 return nil, err 1384 } 1385 1386 if e.Op == influxql.OR { 1387 return bytesutil.Union(lhs, rhs), nil 1388 } 1389 return bytesutil.Intersect(lhs, rhs), nil 1390 1391 default: 1392 return nil, fmt.Errorf("invalid tag comparison operator") 1393 } 1394 1395 case *influxql.ParenExpr: 1396 return is.measurementNamesByExpr(auth, e.Expr) 1397 default: 1398 return nil, fmt.Errorf("%#v", expr) 1399 } 1400} 1401 1402// measurementNamesByNameFilter returns matching measurement names in sorted order. 1403func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { 1404 itr, err := is.measurementIterator() 1405 if err != nil { 1406 return nil, err 1407 } else if itr == nil { 1408 return nil, nil 1409 } 1410 defer itr.Close() 1411 1412 var names [][]byte 1413 for { 1414 e, err := itr.Next() 1415 if err != nil { 1416 return nil, err 1417 } else if e == nil { 1418 break 1419 } 1420 1421 var matched bool 1422 switch op { 1423 case influxql.EQ: 1424 matched = string(e) == val 1425 case influxql.NEQ: 1426 matched = string(e) != val 1427 case influxql.EQREGEX: 1428 matched = regex.Match(e) 1429 case influxql.NEQREGEX: 1430 matched = !regex.Match(e) 1431 } 1432 1433 if matched && is.measurementAuthorizedSeries(auth, e) { 1434 names = append(names, e) 1435 } 1436 } 1437 bytesutil.Sort(names) 1438 return names, nil 1439} 1440 1441func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val 
string, regex *regexp.Regexp) ([][]byte, error) { 1442 var names [][]byte 1443 1444 mitr, err := is.measurementIterator() 1445 if err != nil { 1446 return nil, err 1447 } else if mitr == nil { 1448 return nil, nil 1449 } 1450 defer mitr.Close() 1451 1452 // valEqual determines if the provided []byte is equal to the tag value 1453 // to be filtered on. 1454 valEqual := regex.Match 1455 if op == influxql.EQ || op == influxql.NEQ { 1456 vb := []byte(val) 1457 valEqual = func(b []byte) bool { return bytes.Equal(vb, b) } 1458 } 1459 1460 var tagMatch bool 1461 var authorized bool 1462 for { 1463 me, err := mitr.Next() 1464 if err != nil { 1465 return nil, err 1466 } else if me == nil { 1467 break 1468 } 1469 // If the measurement doesn't have the tag key, then it won't be considered. 1470 if ok, err := is.hasTagKey(me, []byte(key)); err != nil { 1471 return nil, err 1472 } else if !ok { 1473 continue 1474 } 1475 tagMatch = false 1476 // Authorization must be explicitly granted when an authorizer is present. 1477 authorized = query.AuthorizerIsOpen(auth) 1478 1479 vitr, err := is.tagValueIterator(me, []byte(key)) 1480 if err != nil { 1481 return nil, err 1482 } 1483 1484 if vitr != nil { 1485 defer vitr.Close() 1486 for { 1487 ve, err := vitr.Next() 1488 if err != nil { 1489 return nil, err 1490 } else if ve == nil { 1491 break 1492 } 1493 if !valEqual(ve) { 1494 continue 1495 } 1496 1497 tagMatch = true 1498 if query.AuthorizerIsOpen(auth) { 1499 break 1500 } 1501 1502 // When an authorizer is present, the measurement should be 1503 // included only if one of it's series is authorized. 1504 sitr, err := is.tagValueSeriesIDIterator(me, []byte(key), ve) 1505 if err != nil { 1506 return nil, err 1507 } else if sitr == nil { 1508 continue 1509 } 1510 defer sitr.Close() 1511 sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) 1512 1513 // Locate a series with this matching tag value that's authorized. 
1514 for { 1515 se, err := sitr.Next() 1516 if err != nil { 1517 return nil, err 1518 } 1519 1520 if se.SeriesID == 0 { 1521 break 1522 } 1523 1524 name, tags := is.SeriesFile.Series(se.SeriesID) 1525 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 1526 authorized = true 1527 break 1528 } 1529 } 1530 1531 if err := sitr.Close(); err != nil { 1532 return nil, err 1533 } 1534 1535 if tagMatch && authorized { 1536 // The measurement can definitely be included or rejected. 1537 break 1538 } 1539 } 1540 if err := vitr.Close(); err != nil { 1541 return nil, err 1542 } 1543 } 1544 1545 // For negation operators, to determine if the measurement is authorized, 1546 // an authorized series belonging to the measurement must be located. 1547 // Then, the measurement can be added iff !tagMatch && authorized. 1548 if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch { 1549 authorized = is.measurementAuthorizedSeries(auth, me) 1550 } 1551 1552 // tags match | operation is EQ | measurement matches 1553 // -------------------------------------------------- 1554 // True | True | True 1555 // True | False | False 1556 // False | True | False 1557 // False | False | True 1558 if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized { 1559 names = append(names, me) 1560 continue 1561 } 1562 } 1563 1564 bytesutil.Sort(names) 1565 return names, nil 1566} 1567 1568// measurementAuthorizedSeries determines if the measurement contains a series 1569// that is authorized to be read. 
1570func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte) bool { 1571 if query.AuthorizerIsOpen(auth) { 1572 return true 1573 } 1574 1575 sitr, err := is.measurementSeriesIDIterator(name) 1576 if err != nil || sitr == nil { 1577 return false 1578 } 1579 defer sitr.Close() 1580 sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) 1581 1582 for { 1583 series, err := sitr.Next() 1584 if err != nil { 1585 return false 1586 } 1587 1588 if series.SeriesID == 0 { 1589 return false // End of iterator 1590 } 1591 1592 name, tags := is.SeriesFile.Series(series.SeriesID) 1593 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 1594 return true 1595 } 1596 } 1597} 1598 1599// HasTagKey returns true if the tag key exists in any index for the provided 1600// measurement. 1601func (is IndexSet) HasTagKey(name, key []byte) (bool, error) { 1602 return is.hasTagKey(name, key) 1603} 1604 1605// hasTagKey returns true if the tag key exists in any index for the provided 1606// measurement, and guarantees to never take a lock on the series file. 1607func (is IndexSet) hasTagKey(name, key []byte) (bool, error) { 1608 for _, idx := range is.Indexes { 1609 if ok, err := idx.HasTagKey(name, key); err != nil { 1610 return false, err 1611 } else if ok { 1612 return true, nil 1613 } 1614 } 1615 return false, nil 1616} 1617 1618// HasTagValue returns true if the tag value exists in any index for the provided 1619// measurement and tag key. 1620func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) { 1621 for _, idx := range is.Indexes { 1622 if ok, err := idx.HasTagValue(name, key, value); err != nil { 1623 return false, err 1624 } else if ok { 1625 return true, nil 1626 } 1627 } 1628 return false, nil 1629} 1630 1631// MeasurementIterator returns an iterator over all measurements in the index. 
1632func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) { 1633 return is.measurementIterator() 1634} 1635 1636// measurementIterator returns an iterator over all measurements in the index. 1637// It guarantees to never take any locks on the underlying series file. 1638func (is IndexSet) measurementIterator() (MeasurementIterator, error) { 1639 a := make([]MeasurementIterator, 0, len(is.Indexes)) 1640 for _, idx := range is.Indexes { 1641 itr, err := idx.MeasurementIterator() 1642 if err != nil { 1643 MeasurementIterators(a).Close() 1644 return nil, err 1645 } else if itr != nil { 1646 a = append(a, itr) 1647 } 1648 } 1649 return MergeMeasurementIterators(a...), nil 1650} 1651 1652// TagKeyIterator returns a key iterator for a measurement. 1653func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) { 1654 return is.tagKeyIterator(name) 1655} 1656 1657// tagKeyIterator returns a key iterator for a measurement. It guarantees to never 1658// take any locks on the underlying series file. 1659func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) { 1660 a := make([]TagKeyIterator, 0, len(is.Indexes)) 1661 for _, idx := range is.Indexes { 1662 itr, err := idx.TagKeyIterator(name) 1663 if err != nil { 1664 TagKeyIterators(a).Close() 1665 return nil, err 1666 } else if itr != nil { 1667 a = append(a, itr) 1668 } 1669 } 1670 return MergeTagKeyIterators(a...), nil 1671} 1672 1673// TagValueIterator returns a value iterator for a tag key. 1674func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) { 1675 return is.tagValueIterator(name, key) 1676} 1677 1678// tagValueIterator returns a value iterator for a tag key. It guarantees to never 1679// take any locks on the underlying series file. 
1680func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) { 1681 a := make([]TagValueIterator, 0, len(is.Indexes)) 1682 for _, idx := range is.Indexes { 1683 itr, err := idx.TagValueIterator(name, key) 1684 if err != nil { 1685 TagValueIterators(a).Close() 1686 return nil, err 1687 } else if itr != nil { 1688 a = append(a, itr) 1689 } 1690 } 1691 return MergeTagValueIterators(a...), nil 1692} 1693 1694// TagKeyHasAuthorizedSeries determines if there exists an authorized series for 1695// the provided measurement name and tag key. 1696func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) { 1697 if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) { 1698 return true, nil 1699 } 1700 1701 release := is.SeriesFile.Retain() 1702 defer release() 1703 1704 itr, err := is.tagKeySeriesIDIterator(name, tagKey) 1705 if err != nil { 1706 return false, err 1707 } else if itr == nil { 1708 return false, nil 1709 } 1710 defer itr.Close() 1711 itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) 1712 1713 for { 1714 e, err := itr.Next() 1715 if err != nil { 1716 return false, err 1717 } 1718 1719 if e.SeriesID == 0 { 1720 return false, nil 1721 } 1722 1723 if query.AuthorizerIsOpen(auth) { 1724 return true, nil 1725 } 1726 1727 name, tags := is.SeriesFile.Series(e.SeriesID) 1728 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 1729 return true, nil 1730 } 1731 } 1732} 1733 1734// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series 1735// for the provided measurement. 
1736func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { 1737 release := is.SeriesFile.Retain() 1738 defer release() 1739 1740 itr, err := is.measurementSeriesIDIterator(name) 1741 if err != nil { 1742 return nil, err 1743 } 1744 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 1745} 1746 1747// measurementSeriesIDIterator does not provide any locking on the Series file. 1748// 1749// See MeasurementSeriesIDIterator for more details. 1750func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { 1751 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 1752 for _, idx := range is.Indexes { 1753 itr, err := idx.MeasurementSeriesIDIterator(name) 1754 if err != nil { 1755 SeriesIDIterators(a).Close() 1756 return nil, err 1757 } else if itr != nil { 1758 a = append(a, itr) 1759 } 1760 } 1761 return MergeSeriesIDIterators(a...), nil 1762} 1763 1764// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies 1765// the provided function. 1766func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { 1767 release := is.SeriesFile.Retain() 1768 defer release() 1769 1770 itr, err := is.tagKeyIterator(name) 1771 if err != nil { 1772 return err 1773 } else if itr == nil { 1774 return nil 1775 } 1776 defer itr.Close() 1777 1778 for { 1779 key, err := itr.Next() 1780 if err != nil { 1781 return err 1782 } else if key == nil { 1783 return nil 1784 } 1785 1786 if err := fn(key); err != nil { 1787 return err 1788 } 1789 } 1790} 1791 1792// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. 
1793func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { 1794 release := is.SeriesFile.Retain() 1795 defer release() 1796 1797 keys := make(map[string]struct{}) 1798 for _, idx := range is.Indexes { 1799 m, err := idx.MeasurementTagKeysByExpr(name, expr) 1800 if err != nil { 1801 return nil, err 1802 } 1803 for k := range m { 1804 keys[k] = struct{}{} 1805 } 1806 } 1807 return keys, nil 1808} 1809 1810// TagKeySeriesIDIterator returns a series iterator for all values across a single key. 1811func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { 1812 release := is.SeriesFile.Retain() 1813 defer release() 1814 1815 itr, err := is.tagKeySeriesIDIterator(name, key) 1816 if err != nil { 1817 return nil, err 1818 } 1819 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 1820} 1821 1822// tagKeySeriesIDIterator returns a series iterator for all values across a 1823// single key. 1824// 1825// It guarantees to never take any locks on the series file. 1826func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { 1827 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 1828 for _, idx := range is.Indexes { 1829 itr, err := idx.TagKeySeriesIDIterator(name, key) 1830 if err != nil { 1831 SeriesIDIterators(a).Close() 1832 return nil, err 1833 } else if itr != nil { 1834 a = append(a, itr) 1835 } 1836 } 1837 return MergeSeriesIDIterators(a...), nil 1838} 1839 1840// TagValueSeriesIDIterator returns a series iterator for a single tag value. 
1841func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { 1842 release := is.SeriesFile.Retain() 1843 defer release() 1844 1845 itr, err := is.tagValueSeriesIDIterator(name, key, value) 1846 if err != nil { 1847 return nil, err 1848 } 1849 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 1850} 1851 1852// tagValueSeriesIDIterator does not provide any locking on the Series File. 1853// 1854// See TagValueSeriesIDIterator for more details. 1855func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { 1856 a := make([]SeriesIDIterator, 0, len(is.Indexes)) 1857 for _, idx := range is.Indexes { 1858 itr, err := idx.TagValueSeriesIDIterator(name, key, value) 1859 if err != nil { 1860 SeriesIDIterators(a).Close() 1861 return nil, err 1862 } else if itr != nil { 1863 a = append(a, itr) 1864 } 1865 } 1866 return MergeSeriesIDIterators(a...), nil 1867} 1868 1869// MeasurementSeriesByExprIterator returns a series iterator for a measurement 1870// that is filtered by expr. If expr only contains time expressions then this 1871// call is equivalent to MeasurementSeriesIDIterator(). 1872func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 1873 release := is.SeriesFile.Retain() 1874 defer release() 1875 return is.measurementSeriesByExprIterator(name, expr) 1876} 1877 1878// measurementSeriesByExprIterator returns a series iterator for a measurement 1879// that is filtered by expr. See MeasurementSeriesByExprIterator for more details. 1880// 1881// measurementSeriesByExprIterator guarantees to never take any locks on the 1882// series file. 1883func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 1884 // Return all series for the measurement if there are no tag expressions. 
1885 if expr == nil { 1886 itr, err := is.measurementSeriesIDIterator(name) 1887 if err != nil { 1888 return nil, err 1889 } 1890 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 1891 } 1892 1893 itr, err := is.seriesByExprIterator(name, expr) 1894 if err != nil { 1895 return nil, err 1896 } 1897 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 1898} 1899 1900// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. 1901func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { 1902 release := is.SeriesFile.Retain() 1903 defer release() 1904 1905 // Create iterator for all matching series. 1906 itr, err := is.measurementSeriesByExprIterator(name, expr) 1907 if err != nil { 1908 return nil, err 1909 } else if itr == nil { 1910 return nil, nil 1911 } 1912 defer itr.Close() 1913 1914 // measurementSeriesByExprIterator filters deleted series; no need to do so here. 1915 1916 // Iterate over all series and generate keys. 1917 var keys [][]byte 1918 for { 1919 e, err := itr.Next() 1920 if err != nil { 1921 return nil, err 1922 } else if e.SeriesID == 0 { 1923 break 1924 } 1925 1926 // Check for unsupported field filters. 1927 // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). 
1928 if e.Expr != nil { 1929 if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val { 1930 return nil, errors.New("fields not supported in WHERE clause during deletion") 1931 } 1932 } 1933 1934 seriesKey := is.SeriesFile.SeriesKey(e.SeriesID) 1935 if len(seriesKey) == 0 { 1936 continue 1937 } 1938 1939 name, tags := ParseSeriesKey(seriesKey) 1940 keys = append(keys, models.MakeKey(name, tags)) 1941 } 1942 1943 bytesutil.Sort(keys) 1944 1945 return keys, nil 1946} 1947 1948func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { 1949 switch expr := expr.(type) { 1950 case *influxql.BinaryExpr: 1951 switch expr.Op { 1952 case influxql.AND, influxql.OR: 1953 // Get the series IDs and filter expressions for the LHS. 1954 litr, err := is.seriesByExprIterator(name, expr.LHS) 1955 if err != nil { 1956 return nil, err 1957 } 1958 1959 // Get the series IDs and filter expressions for the RHS. 1960 ritr, err := is.seriesByExprIterator(name, expr.RHS) 1961 if err != nil { 1962 if litr != nil { 1963 litr.Close() 1964 } 1965 return nil, err 1966 } 1967 1968 // Intersect iterators if expression is "AND". 1969 if expr.Op == influxql.AND { 1970 return IntersectSeriesIDIterators(litr, ritr), nil 1971 } 1972 1973 // Union iterators if expression is "OR". 1974 return UnionSeriesIDIterators(litr, ritr), nil 1975 1976 default: 1977 return is.seriesByBinaryExprIterator(name, expr) 1978 } 1979 1980 case *influxql.ParenExpr: 1981 return is.seriesByExprIterator(name, expr.Expr) 1982 1983 case *influxql.BooleanLiteral: 1984 if expr.Val { 1985 return is.measurementSeriesIDIterator(name) 1986 } 1987 return nil, nil 1988 1989 default: 1990 return nil, nil 1991 } 1992} 1993 1994// seriesByBinaryExprIterator returns a series iterator and a filtering expression. 
1995func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) { 1996 // If this binary expression has another binary expression, then this 1997 // is some expression math and we should just pass it to the underlying query. 1998 if _, ok := n.LHS.(*influxql.BinaryExpr); ok { 1999 itr, err := is.measurementSeriesIDIterator(name) 2000 if err != nil { 2001 return nil, err 2002 } 2003 return newSeriesIDExprIterator(itr, n), nil 2004 } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok { 2005 itr, err := is.measurementSeriesIDIterator(name) 2006 if err != nil { 2007 return nil, err 2008 } 2009 return newSeriesIDExprIterator(itr, n), nil 2010 } 2011 2012 // Retrieve the variable reference from the correct side of the expression. 2013 key, ok := n.LHS.(*influxql.VarRef) 2014 value := n.RHS 2015 if !ok { 2016 key, ok = n.RHS.(*influxql.VarRef) 2017 if !ok { 2018 // This is an expression we do not know how to evaluate. Let the 2019 // query engine take care of this. 2020 itr, err := is.measurementSeriesIDIterator(name) 2021 if err != nil { 2022 return nil, err 2023 } 2024 return newSeriesIDExprIterator(itr, n), nil 2025 } 2026 value = n.LHS 2027 } 2028 2029 // For fields, return all series from this measurement. 2030 if key.Val != "_name" && ((key.Type == influxql.Unknown && is.HasField(name, key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) { 2031 itr, err := is.measurementSeriesIDIterator(name) 2032 if err != nil { 2033 return nil, err 2034 } 2035 return newSeriesIDExprIterator(itr, n), nil 2036 } else if value, ok := value.(*influxql.VarRef); ok { 2037 // Check if the RHS is a variable and if it is a field. 
2038 if value.Val != "_name" && ((value.Type == influxql.Unknown && is.HasField(name, value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) { 2039 itr, err := is.measurementSeriesIDIterator(name) 2040 if err != nil { 2041 return nil, err 2042 } 2043 return newSeriesIDExprIterator(itr, n), nil 2044 } 2045 } 2046 2047 // Create iterator based on value type. 2048 switch value := value.(type) { 2049 case *influxql.StringLiteral: 2050 return is.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op) 2051 case *influxql.RegexLiteral: 2052 return is.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op) 2053 case *influxql.VarRef: 2054 return is.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op) 2055 default: 2056 // We do not know how to evaluate this expression so pass it 2057 // on to the query engine. 2058 itr, err := is.measurementSeriesIDIterator(name) 2059 if err != nil { 2060 return nil, err 2061 } 2062 return newSeriesIDExprIterator(itr, n), nil 2063 } 2064} 2065 2066func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) { 2067 // Special handling for "_name" to match measurement name. 2068 if bytes.Equal(key, []byte("_name")) { 2069 if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { 2070 return is.measurementSeriesIDIterator(name) 2071 } 2072 return nil, nil 2073 } 2074 2075 if op == influxql.EQ { 2076 // Match a specific value. 
2077 if len(value) != 0 { 2078 return is.tagValueSeriesIDIterator(name, key, value) 2079 } 2080 2081 mitr, err := is.measurementSeriesIDIterator(name) 2082 if err != nil { 2083 return nil, err 2084 } 2085 2086 kitr, err := is.tagKeySeriesIDIterator(name, key) 2087 if err != nil { 2088 if mitr != nil { 2089 mitr.Close() 2090 } 2091 return nil, err 2092 } 2093 2094 // Return all measurement series that have no values from this tag key. 2095 return DifferenceSeriesIDIterators(mitr, kitr), nil 2096 } 2097 2098 // Return all measurement series without this tag value. 2099 if len(value) != 0 { 2100 mitr, err := is.measurementSeriesIDIterator(name) 2101 if err != nil { 2102 return nil, err 2103 } 2104 2105 vitr, err := is.tagValueSeriesIDIterator(name, key, value) 2106 if err != nil { 2107 if mitr != nil { 2108 mitr.Close() 2109 } 2110 return nil, err 2111 } 2112 2113 return DifferenceSeriesIDIterators(mitr, vitr), nil 2114 } 2115 2116 // Return all series across all values of this tag key. 2117 return is.tagKeySeriesIDIterator(name, key) 2118} 2119 2120func (is IndexSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIDIterator, error) { 2121 // Special handling for "_name" to match measurement name. 
2122 if bytes.Equal(key, []byte("_name")) { 2123 match := value.Match(name) 2124 if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) { 2125 mitr, err := is.measurementSeriesIDIterator(name) 2126 if err != nil { 2127 return nil, err 2128 } 2129 return newSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil 2130 } 2131 return nil, nil 2132 } 2133 return is.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX) 2134} 2135 2136func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIDIterator, error) { 2137 itr0, err := is.tagKeySeriesIDIterator(name, key) 2138 if err != nil { 2139 return nil, err 2140 } 2141 2142 itr1, err := is.tagKeySeriesIDIterator(name, []byte(value.Val)) 2143 if err != nil { 2144 if itr0 != nil { 2145 itr0.Close() 2146 } 2147 return nil, err 2148 } 2149 2150 if op == influxql.EQ { 2151 return IntersectSeriesIDIterators(itr0, itr1), nil 2152 } 2153 return DifferenceSeriesIDIterators(itr0, itr1), nil 2154} 2155 2156// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. 2157// If matches is false, returns iterators which do not match value. 2158func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { 2159 release := is.SeriesFile.Retain() 2160 defer release() 2161 itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches) 2162 if err != nil { 2163 return nil, err 2164 } 2165 return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil 2166} 2167 2168// matchTagValueSeriesIDIterator returns a series iterator for tags which match 2169// value. See MatchTagValueSeriesIDIterator for more details. 2170// 2171// It guarantees to never take any locks on the underlying series file. 
2172func (is IndexSet) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { 2173 matchEmpty := value.MatchString("") 2174 if matches { 2175 if matchEmpty { 2176 return is.matchTagValueEqualEmptySeriesIDIterator(name, key, value) 2177 } 2178 return is.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value) 2179 } 2180 2181 if matchEmpty { 2182 return is.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value) 2183 } 2184 return is.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value) 2185} 2186 2187func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { 2188 vitr, err := is.tagValueIterator(name, key) 2189 if err != nil { 2190 return nil, err 2191 } else if vitr == nil { 2192 return is.measurementSeriesIDIterator(name) 2193 } 2194 defer vitr.Close() 2195 2196 var itrs []SeriesIDIterator 2197 if err := func() error { 2198 for { 2199 e, err := vitr.Next() 2200 if err != nil { 2201 return err 2202 } else if e == nil { 2203 break 2204 } 2205 2206 if !value.Match(e) { 2207 itr, err := is.tagValueSeriesIDIterator(name, key, e) 2208 if err != nil { 2209 return err 2210 } else if itr != nil { 2211 itrs = append(itrs, itr) 2212 } 2213 } 2214 } 2215 return nil 2216 }(); err != nil { 2217 SeriesIDIterators(itrs).Close() 2218 return nil, err 2219 } 2220 2221 mitr, err := is.measurementSeriesIDIterator(name) 2222 if err != nil { 2223 SeriesIDIterators(itrs).Close() 2224 return nil, err 2225 } 2226 2227 return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil 2228} 2229 2230func (is IndexSet) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { 2231 vitr, err := is.tagValueIterator(name, key) 2232 if err != nil { 2233 return nil, err 2234 } else if vitr == nil { 2235 return nil, nil 2236 } 2237 defer vitr.Close() 2238 2239 var itrs []SeriesIDIterator 2240 
for { 2241 e, err := vitr.Next() 2242 if err != nil { 2243 SeriesIDIterators(itrs).Close() 2244 return nil, err 2245 } else if e == nil { 2246 break 2247 } 2248 2249 if value.Match(e) { 2250 itr, err := is.tagValueSeriesIDIterator(name, key, e) 2251 if err != nil { 2252 SeriesIDIterators(itrs).Close() 2253 return nil, err 2254 } else if itr != nil { 2255 itrs = append(itrs, itr) 2256 } 2257 } 2258 } 2259 return MergeSeriesIDIterators(itrs...), nil 2260} 2261 2262func (is IndexSet) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { 2263 vitr, err := is.tagValueIterator(name, key) 2264 if err != nil { 2265 return nil, err 2266 } else if vitr == nil { 2267 return nil, nil 2268 } 2269 defer vitr.Close() 2270 2271 var itrs []SeriesIDIterator 2272 for { 2273 e, err := vitr.Next() 2274 if err != nil { 2275 SeriesIDIterators(itrs).Close() 2276 return nil, err 2277 } else if e == nil { 2278 break 2279 } 2280 2281 if !value.Match(e) { 2282 itr, err := is.tagValueSeriesIDIterator(name, key, e) 2283 if err != nil { 2284 SeriesIDIterators(itrs).Close() 2285 return nil, err 2286 } else if itr != nil { 2287 itrs = append(itrs, itr) 2288 } 2289 } 2290 } 2291 return MergeSeriesIDIterators(itrs...), nil 2292} 2293 2294func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) { 2295 vitr, err := is.tagValueIterator(name, key) 2296 if err != nil { 2297 return nil, err 2298 } else if vitr == nil { 2299 return is.measurementSeriesIDIterator(name) 2300 } 2301 defer vitr.Close() 2302 2303 var itrs []SeriesIDIterator 2304 for { 2305 e, err := vitr.Next() 2306 if err != nil { 2307 SeriesIDIterators(itrs).Close() 2308 return nil, err 2309 } else if e == nil { 2310 break 2311 } 2312 if value.Match(e) { 2313 itr, err := is.tagValueSeriesIDIterator(name, key, e) 2314 if err != nil { 2315 SeriesIDIterators(itrs).Close() 2316 return nil, err 2317 } else if itr != 
nil { 2318 itrs = append(itrs, itr) 2319 } 2320 } 2321 } 2322 2323 mitr, err := is.measurementSeriesIDIterator(name) 2324 if err != nil { 2325 SeriesIDIterators(itrs).Close() 2326 return nil, err 2327 } 2328 return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil 2329} 2330 2331// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys. 2332// 2333// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the 2334// position of the tag key in the keys argument. 2335// 2336// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending 2337// lexicographic order. 2338func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) { 2339 release := is.SeriesFile.Retain() 2340 defer release() 2341 return is.tagValuesByKeyAndExpr(auth, name, keys, expr) 2342} 2343 2344// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See 2345// TagValuesByKeyAndExpr for more details. 2346// 2347// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying 2348// series file. 
2349func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { 2350 database := is.Database() 2351 2352 valueExpr := influxql.CloneExpr(expr) 2353 valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr { 2354 switch e := e.(type) { 2355 case *influxql.BinaryExpr: 2356 switch e.Op { 2357 case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: 2358 tag, ok := e.LHS.(*influxql.VarRef) 2359 if !ok || tag.Val != "value" { 2360 return nil 2361 } 2362 } 2363 } 2364 return e 2365 }), nil) 2366 2367 itr, err := is.seriesByExprIterator(name, expr) 2368 if err != nil { 2369 return nil, err 2370 } else if itr == nil { 2371 return nil, nil 2372 } 2373 itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) 2374 defer itr.Close() 2375 2376 keyIdxs := make(map[string]int, len(keys)) 2377 for ki, key := range keys { 2378 keyIdxs[key] = ki 2379 2380 // Check that keys are in order. 2381 if ki > 0 && key < keys[ki-1] { 2382 return nil, fmt.Errorf("keys %v are not in ascending order", keys) 2383 } 2384 } 2385 2386 resultSet := make([]map[string]struct{}, len(keys)) 2387 for i := 0; i < len(resultSet); i++ { 2388 resultSet[i] = make(map[string]struct{}) 2389 } 2390 2391 // Iterate all series to collect tag values. 
2392 for { 2393 e, err := itr.Next() 2394 if err != nil { 2395 return nil, err 2396 } else if e.SeriesID == 0 { 2397 break 2398 } 2399 2400 buf := is.SeriesFile.SeriesKey(e.SeriesID) 2401 if len(buf) == 0 { 2402 continue 2403 } 2404 2405 if auth != nil { 2406 name, tags := ParseSeriesKey(buf) 2407 if !auth.AuthorizeSeriesRead(database, name, tags) { 2408 continue 2409 } 2410 } 2411 2412 _, buf = ReadSeriesKeyLen(buf) 2413 _, buf = ReadSeriesKeyMeasurement(buf) 2414 tagN, buf := ReadSeriesKeyTagN(buf) 2415 for i := 0; i < tagN; i++ { 2416 var key, value []byte 2417 key, value, buf = ReadSeriesKeyTag(buf) 2418 if valueExpr != nil { 2419 if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) { 2420 continue 2421 } 2422 } 2423 2424 if idx, ok := keyIdxs[string(key)]; ok { 2425 resultSet[idx][string(value)] = struct{}{} 2426 } else if string(key) > keys[len(keys)-1] { 2427 // The tag key is > the largest key we're interested in. 2428 break 2429 } 2430 } 2431 } 2432 return resultSet, nil 2433} 2434 2435// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. 2436func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { 2437 if len(keys) == 0 { 2438 return nil, nil 2439 } 2440 2441 results := make([][]string, len(keys)) 2442 // If the keys are not sorted, then sort them. 2443 if !keysSorted { 2444 sort.Strings(keys) 2445 } 2446 2447 release := is.SeriesFile.Retain() 2448 defer release() 2449 2450 // No expression means that the values shouldn't be filtered; so fetch them 2451 // all. 2452 if expr == nil { 2453 for ki, key := range keys { 2454 vitr, err := is.tagValueIterator(name, []byte(key)) 2455 if err != nil { 2456 return nil, err 2457 } else if vitr == nil { 2458 break 2459 } 2460 defer vitr.Close() 2461 2462 // If no authorizer present then return all values. 
2463 if query.AuthorizerIsOpen(auth) { 2464 for { 2465 val, err := vitr.Next() 2466 if err != nil { 2467 return nil, err 2468 } else if val == nil { 2469 break 2470 } 2471 results[ki] = append(results[ki], string(val)) 2472 } 2473 continue 2474 } 2475 2476 // Authorization is present — check all series with matching tag values 2477 // and measurements for the presence of an authorized series. 2478 for { 2479 val, err := vitr.Next() 2480 if err != nil { 2481 return nil, err 2482 } else if val == nil { 2483 break 2484 } 2485 2486 sitr, err := is.tagValueSeriesIDIterator(name, []byte(key), val) 2487 if err != nil { 2488 return nil, err 2489 } else if sitr == nil { 2490 continue 2491 } 2492 defer sitr.Close() 2493 sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) 2494 2495 for { 2496 se, err := sitr.Next() 2497 if err != nil { 2498 return nil, err 2499 } 2500 2501 if se.SeriesID == 0 { 2502 break 2503 } 2504 2505 name, tags := is.SeriesFile.Series(se.SeriesID) 2506 if auth.AuthorizeSeriesRead(is.Database(), name, tags) { 2507 results[ki] = append(results[ki], string(val)) 2508 break 2509 } 2510 } 2511 if err := sitr.Close(); err != nil { 2512 return nil, err 2513 } 2514 } 2515 } 2516 return results, nil 2517 } 2518 2519 // This is the case where we have filtered series by some WHERE condition. 2520 // We only care about the tag values for the keys given the 2521 // filtered set of series ids. 2522 resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr) 2523 if err != nil { 2524 return nil, err 2525 } 2526 2527 // Convert result sets into []string 2528 for i, s := range resultSet { 2529 values := make([]string, 0, len(s)) 2530 for v := range s { 2531 values = append(values, v) 2532 } 2533 sort.Strings(values) 2534 results[i] = values 2535 } 2536 return results, nil 2537} 2538 2539// TagSets returns an ordered list of tag sets for a measurement by dimension 2540// and filtered by an optional conditional expression. 
2541func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { 2542 release := is.SeriesFile.Retain() 2543 defer release() 2544 2545 itr, err := is.measurementSeriesByExprIterator(name, opt.Condition) 2546 if err != nil { 2547 return nil, err 2548 } else if itr == nil { 2549 return nil, nil 2550 } 2551 defer itr.Close() 2552 // measurementSeriesByExprIterator filters deleted series IDs; no need to 2553 // do so here. 2554 2555 var dims []string 2556 if len(opt.Dimensions) > 0 { 2557 dims = make([]string, len(opt.Dimensions)) 2558 copy(dims, opt.Dimensions) 2559 sort.Strings(dims) 2560 } 2561 2562 // For every series, get the tag values for the requested tag keys i.e. 2563 // dimensions. This is the TagSet for that series. Series with the same 2564 // TagSet are then grouped together, because for the purpose of GROUP BY 2565 // they are part of the same composite series. 2566 tagSets := make(map[string]*query.TagSet, 64) 2567 var ( 2568 seriesN, maxSeriesN int 2569 db = is.Database() 2570 ) 2571 2572 if opt.MaxSeriesN > 0 { 2573 maxSeriesN = opt.MaxSeriesN 2574 } else { 2575 maxSeriesN = int(^uint(0) >> 1) 2576 } 2577 2578 // The tag sets require a string for each series key in the set, The series 2579 // file formatted keys need to be parsed into models format. Since they will 2580 // end up as strings we can re-use an intermediate buffer for this process. 2581 var keyBuf []byte 2582 var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration. 2583 for { 2584 se, err := itr.Next() 2585 if err != nil { 2586 return nil, err 2587 } else if se.SeriesID == 0 { 2588 break 2589 } 2590 2591 // Skip if the series has been tombstoned. 
2592 key := sfile.SeriesKey(se.SeriesID) 2593 if len(key) == 0 { 2594 continue 2595 } 2596 2597 if seriesN&0x3fff == 0x3fff { 2598 // check every 16384 series if the query has been canceled 2599 select { 2600 case <-opt.InterruptCh: 2601 return nil, query.ErrQueryInterrupted 2602 default: 2603 } 2604 } 2605 2606 if seriesN > maxSeriesN { 2607 return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) 2608 } 2609 2610 // NOTE - must not escape this loop iteration. 2611 _, tagsBuf = ParseSeriesKeyInto(key, tagsBuf) 2612 if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) { 2613 continue 2614 } 2615 2616 var tagsAsKey []byte 2617 if len(dims) > 0 { 2618 tagsAsKey = MakeTagsKey(dims, tagsBuf) 2619 } 2620 2621 tagSet, ok := tagSets[string(tagsAsKey)] 2622 if !ok { 2623 // This TagSet is new, create a new entry for it. 2624 tagSet = &query.TagSet{ 2625 Tags: nil, 2626 Key: tagsAsKey, 2627 } 2628 } 2629 2630 // Associate the series and filter with the Tagset. 2631 keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf) 2632 tagSet.AddFilter(string(keyBuf), se.Expr) 2633 keyBuf = keyBuf[:0] 2634 2635 // Ensure it's back in the map. 2636 tagSets[string(tagsAsKey)] = tagSet 2637 seriesN++ 2638 } 2639 2640 // Sort the series in each tag set. 2641 for _, t := range tagSets { 2642 sort.Sort(t) 2643 } 2644 2645 // The TagSets have been created, as a map of TagSets. Just send 2646 // the values back as a slice, sorting for consistency. 2647 sortedTagsSets := make([]*query.TagSet, 0, len(tagSets)) 2648 for _, v := range tagSets { 2649 sortedTagsSets = append(sortedTagsSets, v) 2650 } 2651 sort.Sort(byTagKey(sortedTagsSets)) 2652 2653 return sortedTagsSets, nil 2654} 2655 2656// IndexFormat represents the format for an index. 2657type IndexFormat int 2658 2659const ( 2660 // InMemFormat is the format used by the original in-memory shared index. 
2661 InMemFormat IndexFormat = 1 2662 2663 // TSI1Format is the format used by the tsi1 index. 2664 TSI1Format IndexFormat = 2 2665) 2666 2667// NewIndexFunc creates a new index. 2668type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index 2669 2670// newIndexFuncs is a lookup of index constructors by name. 2671var newIndexFuncs = make(map[string]NewIndexFunc) 2672 2673// RegisterIndex registers a storage index initializer by name. 2674func RegisterIndex(name string, fn NewIndexFunc) { 2675 if _, ok := newIndexFuncs[name]; ok { 2676 panic("index already registered: " + name) 2677 } 2678 newIndexFuncs[name] = fn 2679} 2680 2681// RegisteredIndexes returns the slice of currently registered indexes. 2682func RegisteredIndexes() []string { 2683 a := make([]string, 0, len(newIndexFuncs)) 2684 for k := range newIndexFuncs { 2685 a = append(a, k) 2686 } 2687 sort.Strings(a) 2688 return a 2689} 2690 2691// NewIndex returns an instance of an index based on its format. 2692// If the path does not exist then the DefaultFormat is used. 2693func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) { 2694 format := options.IndexVersion 2695 2696 // Use default format unless existing directory exists. 2697 _, err := os.Stat(path) 2698 if os.IsNotExist(err) { 2699 // nop, use default 2700 } else if err != nil { 2701 return nil, err 2702 } else if err == nil { 2703 format = TSI1IndexName 2704 } 2705 2706 // Lookup index by format. 
2707 fn := newIndexFuncs[format] 2708 if fn == nil { 2709 return nil, fmt.Errorf("invalid index format: %q", format) 2710 } 2711 return fn(id, database, path, seriesIDSet, sfile, options), nil 2712} 2713 2714func MustOpenIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index { 2715 idx, err := NewIndex(id, database, path, seriesIDSet, sfile, options) 2716 if err != nil { 2717 panic(err) 2718 } else if err := idx.Open(); err != nil { 2719 panic(err) 2720 } 2721 return idx 2722} 2723 2724// assert will panic with a given formatted message if the given condition is false. 2725func assert(condition bool, msg string, v ...interface{}) { 2726 if !condition { 2727 panic(fmt.Sprintf("assert failed: "+msg, v...)) 2728 } 2729} 2730 2731type byTagKey []*query.TagSet 2732 2733func (t byTagKey) Len() int { return len(t) } 2734func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } 2735func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } 2736