1package tsi1 2 3import ( 4 "errors" 5 "fmt" 6 "io/ioutil" 7 "os" 8 "path/filepath" 9 "regexp" 10 "runtime" 11 "strconv" 12 "sync" 13 "sync/atomic" 14 "unsafe" 15 16 "github.com/cespare/xxhash" 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/pkg/estimator" 19 "github.com/influxdata/influxdb/pkg/estimator/hll" 20 "github.com/influxdata/influxdb/pkg/slices" 21 "github.com/influxdata/influxdb/tsdb" 22 "github.com/influxdata/influxql" 23 "go.uber.org/zap" 24) 25 26// IndexName is the name of the index. 27const IndexName = tsdb.TSI1IndexName 28 29// ErrCompactionInterrupted is returned if compactions are disabled or 30// an index is closed while a compaction is occurring. 31var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted") 32 33func init() { 34 // FIXME(edd): Remove this. 35 if os.Getenv("TSI_PARTITIONS") != "" { 36 i, err := strconv.Atoi(os.Getenv("TSI_PARTITIONS")) 37 if err != nil { 38 panic(err) 39 } 40 DefaultPartitionN = uint64(i) 41 } 42 43 tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index { 44 idx := NewIndex(sfile, db, WithPath(path), WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize))) 45 return idx 46 }) 47} 48 49// DefaultPartitionN determines how many shards the index will be partitioned into. 50// 51// NOTE: Currently, this must not be change once a database is created. Further, 52// it must also be a power of 2. 53// 54var DefaultPartitionN uint64 = 8 55 56// An IndexOption is a functional option for changing the configuration of 57// an Index. 58type IndexOption func(i *Index) 59 60// WithPath sets the root path of the Index 61var WithPath = func(path string) IndexOption { 62 return func(i *Index) { 63 i.path = path 64 } 65} 66 67// DisableCompactions disables compactions on the Index. 68var DisableCompactions = func() IndexOption { 69 return func(i *Index) { 70 i.disableCompactions = true 71 } 72} 73 74// WithLogger sets the logger for the Index. 75var WithLogger = func(l zap.Logger) IndexOption { 76 return func(i *Index) { 77 i.logger = l.With(zap.String("index", "tsi")) 78 } 79} 80 81// WithMaximumLogFileSize sets the maximum size of LogFiles before they're 82// compacted into IndexFiles. 83var WithMaximumLogFileSize = func(size int64) IndexOption { 84 return func(i *Index) { 85 i.maxLogFileSize = size 86 } 87} 88 89// DisableFsync disables flushing and syncing of underlying files. Primarily this 90// impacts the LogFiles. This option can be set when working with the index in 91// an offline manner, for cases where a hard failure can be overcome by re-running the tooling. 92var DisableFsync = func() IndexOption { 93 return func(i *Index) { 94 i.disableFsync = true 95 } 96} 97 98// WithLogFileBufferSize sets the size of the buffer used within LogFiles. 99// Typically appending an entry to a LogFile involves writing 11 or 12 bytes, so 100// depending on how many new series are being created within a batch, it may 101// be appropriate to set this. 102var WithLogFileBufferSize = func(sz int) IndexOption { 103 return func(i *Index) { 104 if sz > 1<<17 { // 128K 105 sz = 1 << 17 106 } else if sz < 1<<12 { 107 sz = 1 << 12 // 4K (runtime default) 108 } 109 i.logfileBufferSize = sz 110 } 111} 112 113// Index represents a collection of layered index files and WAL. 114type Index struct { 115 mu sync.RWMutex 116 partitions []*Partition 117 opened bool 118 119 // The following may be set when initializing an Index. 120 path string // Root directory of the index partitions. 121 disableCompactions bool // Initially disables compactions on the index. 122 maxLogFileSize int64 // Maximum size of a LogFile before it's compacted. 123 logfileBufferSize int // The size of the buffer used by the LogFile. 124 disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline. 125 logger *zap.Logger // Index's logger. 126 127 // The following must be set when initializing an Index. 128 sfile *tsdb.SeriesFile // series lookup file 129 database string // Name of database. 130 131 // Cached sketches. 132 mSketch, mTSketch estimator.Sketch // Measurement sketches 133 sSketch, sTSketch estimator.Sketch // Series sketches 134 135 // Index's version. 136 version int 137 138 // Number of partitions used by the index. 139 PartitionN uint64 140} 141 142func (i *Index) UniqueReferenceID() uintptr { 143 return uintptr(unsafe.Pointer(i)) 144} 145 146// NewIndex returns a new instance of Index. 147func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index { 148 idx := &Index{ 149 maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize, 150 logger: zap.NewNop(), 151 version: Version, 152 sfile: sfile, 153 database: database, 154 mSketch: hll.NewDefaultPlus(), 155 mTSketch: hll.NewDefaultPlus(), 156 sSketch: hll.NewDefaultPlus(), 157 sTSketch: hll.NewDefaultPlus(), 158 PartitionN: DefaultPartitionN, 159 } 160 161 for _, option := range options { 162 option(idx) 163 } 164 165 return idx 166} 167 168// Bytes estimates the memory footprint of this Index, in bytes. 169func (i *Index) Bytes() int { 170 var b int 171 i.mu.RLock() 172 b += 24 // mu RWMutex is 24 bytes 173 b += int(unsafe.Sizeof(i.partitions)) 174 for _, p := range i.partitions { 175 b += int(unsafe.Sizeof(p)) + p.bytes() 176 } 177 b += int(unsafe.Sizeof(i.opened)) 178 b += int(unsafe.Sizeof(i.path)) + len(i.path) 179 b += int(unsafe.Sizeof(i.disableCompactions)) 180 b += int(unsafe.Sizeof(i.maxLogFileSize)) 181 b += int(unsafe.Sizeof(i.logger)) 182 b += int(unsafe.Sizeof(i.sfile)) 183 // Do not count SeriesFile because it belongs to the code that constructed this Index. 184 b += int(unsafe.Sizeof(i.mSketch)) + i.mSketch.Bytes() 185 b += int(unsafe.Sizeof(i.mTSketch)) + i.mTSketch.Bytes() 186 b += int(unsafe.Sizeof(i.sSketch)) + i.sSketch.Bytes() 187 b += int(unsafe.Sizeof(i.sTSketch)) + i.sTSketch.Bytes() 188 b += int(unsafe.Sizeof(i.database)) + len(i.database) 189 b += int(unsafe.Sizeof(i.version)) 190 b += int(unsafe.Sizeof(i.PartitionN)) 191 i.mu.RUnlock() 192 return b 193} 194 195// Database returns the name of the database the index was initialized with. 196func (i *Index) Database() string { 197 return i.database 198} 199 200// WithLogger sets the logger on the index after it's been created. 201// 202// It's not safe to call WithLogger after the index has been opened, or before 203// it has been closed. 204func (i *Index) WithLogger(l *zap.Logger) { 205 i.mu.Lock() 206 defer i.mu.Unlock() 207 i.logger = l.With(zap.String("index", "tsi")) 208} 209 210// Type returns the type of Index this is. 211func (i *Index) Type() string { return IndexName } 212 213// SeriesFile returns the series file attached to the index. 214func (i *Index) SeriesFile() *tsdb.SeriesFile { return i.sfile } 215 216// SeriesIDSet returns the set of series ids associated with series in this 217// index. Any series IDs for series no longer present in the index are filtered out. 218func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet { 219 seriesIDSet := tsdb.NewSeriesIDSet() 220 others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN) 221 for _, p := range i.partitions { 222 others = append(others, p.seriesIDSet) 223 } 224 seriesIDSet.Merge(others...) 225 return seriesIDSet 226} 227 228// Open opens the index. 229func (i *Index) Open() error { 230 i.mu.Lock() 231 defer i.mu.Unlock() 232 233 if i.opened { 234 return errors.New("index already open") 235 } 236 237 // Ensure root exists. 238 if err := os.MkdirAll(i.path, 0777); err != nil { 239 return err 240 } 241 242 // Initialize index partitions. 243 i.partitions = make([]*Partition, i.PartitionN) 244 for j := 0; j < len(i.partitions); j++ { 245 p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j))) 246 p.MaxLogFileSize = i.maxLogFileSize 247 p.nosync = i.disableFsync 248 p.logbufferSize = i.logfileBufferSize 249 p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) 250 i.partitions[j] = p 251 } 252 253 // Open all the Partitions in parallel. 254 partitionN := len(i.partitions) 255 n := i.availableThreads() 256 257 // Store results. 258 errC := make(chan error, partitionN) 259 260 // Run fn on each partition using a fixed number of goroutines. 261 var pidx uint32 // Index of maximum Partition being worked on. 262 for k := 0; k < n; k++ { 263 go func(k int) { 264 for { 265 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 266 if idx >= partitionN { 267 return // No more work. 268 } 269 err := i.partitions[idx].Open() 270 errC <- err 271 } 272 }(k) 273 } 274 275 // Check for error 276 for i := 0; i < partitionN; i++ { 277 if err := <-errC; err != nil { 278 return err 279 } 280 } 281 282 // Refresh cached sketches. 283 if err := i.updateSeriesSketches(); err != nil { 284 return err 285 } else if err := i.updateMeasurementSketches(); err != nil { 286 return err 287 } 288 289 // Mark opened. 290 i.opened = true 291 i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN)) 292 return nil 293} 294 295// Compact requests a compaction of partitions. 296func (i *Index) Compact() { 297 i.mu.Lock() 298 defer i.mu.Unlock() 299 for _, p := range i.partitions { 300 p.Compact() 301 } 302} 303 304func (i *Index) EnableCompactions() { 305 for _, p := range i.partitions { 306 p.EnableCompactions() 307 } 308} 309 310func (i *Index) DisableCompactions() { 311 for _, p := range i.partitions { 312 p.DisableCompactions() 313 } 314} 315 316// Wait blocks until all outstanding compactions have completed. 317func (i *Index) Wait() { 318 for _, p := range i.partitions { 319 p.Wait() 320 } 321} 322 323// Close closes the index. 324func (i *Index) Close() error { 325 // Lock index and close partitions. 326 i.mu.Lock() 327 defer i.mu.Unlock() 328 329 for _, p := range i.partitions { 330 if err := p.Close(); err != nil { 331 return err 332 } 333 } 334 335 // Mark index as closed. 336 i.opened = false 337 return nil 338} 339 340// Path returns the path the index was opened with. 341func (i *Index) Path() string { return i.path } 342 343// PartitionAt returns the partition by index. 344func (i *Index) PartitionAt(index int) *Partition { 345 return i.partitions[index] 346} 347 348// partition returns the appropriate Partition for a provided series key. 349func (i *Index) partition(key []byte) *Partition { 350 return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))] 351} 352 353// partitionIdx returns the index of the partition that key belongs in. 354func (i *Index) partitionIdx(key []byte) int { 355 return int(xxhash.Sum64(key) & (i.PartitionN - 1)) 356} 357 358// availableThreads returns the minimum of GOMAXPROCS and the number of 359// partitions in the Index. 360func (i *Index) availableThreads() int { 361 n := runtime.GOMAXPROCS(0) 362 if len(i.partitions) < n { 363 return len(i.partitions) 364 } 365 return n 366} 367 368// updateMeasurementSketches rebuilds the cached measurement sketches. 369func (i *Index) updateMeasurementSketches() error { 370 i.mSketch, i.mTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() 371 for j := 0; j < int(i.PartitionN); j++ { 372 if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil { 373 return err 374 } else if i.mSketch.Merge(s); err != nil { 375 return err 376 } else if i.mTSketch.Merge(t); err != nil { 377 return err 378 } 379 } 380 return nil 381} 382 383// updateSeriesSketches rebuilds the cached series sketches. 384func (i *Index) updateSeriesSketches() error { 385 i.sSketch, i.sTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() 386 for j := 0; j < int(i.PartitionN); j++ { 387 if s, t, err := i.partitions[j].SeriesSketches(); err != nil { 388 return err 389 } else if i.sSketch.Merge(s); err != nil { 390 return err 391 } else if i.sTSketch.Merge(t); err != nil { 392 return err 393 } 394 } 395 return nil 396} 397 398// SetFieldSet sets a shared field set from the engine. 399func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { 400 for _, p := range i.partitions { 401 p.SetFieldSet(fs) 402 } 403} 404 405// FieldSet returns the assigned fieldset. 406func (i *Index) FieldSet() *tsdb.MeasurementFieldSet { 407 if len(i.partitions) == 0 { 408 return nil 409 } 410 return i.partitions[0].FieldSet() 411} 412 413// ForEachMeasurementName iterates over all measurement names in the index, 414// applying fn. It returns the first error encountered, if any. 415// 416// ForEachMeasurementName does not call fn on each partition concurrently so the 417// call may provide a non-goroutine safe fn. 418func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { 419 itr, err := i.MeasurementIterator() 420 if err != nil { 421 return err 422 } else if itr == nil { 423 return nil 424 } 425 defer itr.Close() 426 427 // Iterate over all measurements. 428 for { 429 e, err := itr.Next() 430 if err != nil { 431 return err 432 } else if e == nil { 433 break 434 } 435 436 if err := fn(e); err != nil { 437 return err 438 } 439 } 440 return nil 441} 442 443// MeasurementExists returns true if a measurement exists. 444func (i *Index) MeasurementExists(name []byte) (bool, error) { 445 n := i.availableThreads() 446 447 // Store errors 448 var found uint32 // Use this to signal we found the measurement. 449 errC := make(chan error, i.PartitionN) 450 451 // Check each partition for the measurement concurrently. 452 var pidx uint32 // Index of maximum Partition being worked on. 453 for k := 0; k < n; k++ { 454 go func() { 455 for { 456 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 457 if idx >= len(i.partitions) { 458 return // No more work. 459 } 460 461 // Check if the measurement has been found. If it has don't 462 // need to check this partition and can just move on. 463 if atomic.LoadUint32(&found) == 1 { 464 errC <- nil 465 continue 466 } 467 468 b, err := i.partitions[idx].MeasurementExists(name) 469 if b { 470 atomic.StoreUint32(&found, 1) 471 } 472 errC <- err 473 } 474 }() 475 } 476 477 // Check for error 478 for i := 0; i < cap(errC); i++ { 479 if err := <-errC; err != nil { 480 return false, err 481 } 482 } 483 484 // Check if we found the measurement. 485 return atomic.LoadUint32(&found) == 1, nil 486} 487 488// MeasurementHasSeries returns true if a measurement has non-tombstoned series. 489func (i *Index) MeasurementHasSeries(name []byte) (bool, error) { 490 for _, p := range i.partitions { 491 if v, err := p.MeasurementHasSeries(name); err != nil { 492 return false, err 493 } else if v { 494 return true, nil 495 } 496 } 497 return false, nil 498} 499 500// fetchByteValues is a helper for gathering values from each partition in the index, 501// based on some criteria. 502// 503// fn is a function that works on partition idx and calls into some method on 504// the partition that returns some ordered values. 505func (i *Index) fetchByteValues(fn func(idx int) ([][]byte, error)) ([][]byte, error) { 506 n := i.availableThreads() 507 508 // Store results. 509 names := make([][][]byte, i.PartitionN) 510 errC := make(chan error, i.PartitionN) 511 512 var pidx uint32 // Index of maximum Partition being worked on. 513 for k := 0; k < n; k++ { 514 go func() { 515 for { 516 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 517 if idx >= len(i.partitions) { 518 return // No more work. 519 } 520 521 pnames, err := fn(idx) 522 523 // This is safe since there are no readers on names until all 524 // the writers are done. 525 names[idx] = pnames 526 errC <- err 527 } 528 }() 529 } 530 531 // Check for error 532 for i := 0; i < cap(errC); i++ { 533 if err := <-errC; err != nil { 534 return nil, err 535 } 536 } 537 538 // It's now safe to read from names. 539 return slices.MergeSortedBytes(names[:]...), nil 540} 541 542// MeasurementIterator returns an iterator over all measurements. 543func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) { 544 itrs := make([]tsdb.MeasurementIterator, 0, len(i.partitions)) 545 for _, p := range i.partitions { 546 itr, err := p.MeasurementIterator() 547 if err != nil { 548 tsdb.MeasurementIterators(itrs).Close() 549 return nil, err 550 } else if itr != nil { 551 itrs = append(itrs, itr) 552 } 553 } 554 return tsdb.MergeMeasurementIterators(itrs...), nil 555} 556 557// MeasurementSeriesIDIterator returns an iterator over all series in a measurement. 558func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) { 559 itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 560 for _, p := range i.partitions { 561 itr, err := p.MeasurementSeriesIDIterator(name) 562 if err != nil { 563 tsdb.SeriesIDIterators(itrs).Close() 564 return nil, err 565 } else if itr != nil { 566 itrs = append(itrs, itr) 567 } 568 } 569 return tsdb.MergeSeriesIDIterators(itrs...), nil 570} 571 572// MeasurementNamesByRegex returns measurement names for the provided regex. 573func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { 574 return i.fetchByteValues(func(idx int) ([][]byte, error) { 575 return i.partitions[idx].MeasurementNamesByRegex(re) 576 }) 577} 578 579// DropMeasurement deletes a measurement from the index. It returns the first 580// error encountered, if any. 581func (i *Index) DropMeasurement(name []byte) error { 582 n := i.availableThreads() 583 584 // Store results. 585 errC := make(chan error, i.PartitionN) 586 587 var pidx uint32 // Index of maximum Partition being worked on. 588 for k := 0; k < n; k++ { 589 go func() { 590 for { 591 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 592 if idx >= len(i.partitions) { 593 return // No more work. 594 } 595 errC <- i.partitions[idx].DropMeasurement(name) 596 } 597 }() 598 } 599 600 // Check for error 601 for i := 0; i < cap(errC); i++ { 602 if err := <-errC; err != nil { 603 return err 604 } 605 } 606 607 // Update sketches. 608 i.mTSketch.Add(name) 609 if err := i.updateSeriesSketches(); err != nil { 610 return err 611 } 612 613 return nil 614} 615 616// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. 617func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error { 618 // All slices must be of equal length. 619 if len(names) != len(tagsSlice) { 620 return errors.New("names/tags length mismatch in index") 621 } 622 623 // We need to move different series into collections for each partition 624 // to process. 625 pNames := make([][][]byte, i.PartitionN) 626 pTags := make([][]models.Tags, i.PartitionN) 627 628 // Determine partition for series using each series key. 629 for ki, key := range keys { 630 pidx := i.partitionIdx(key) 631 pNames[pidx] = append(pNames[pidx], names[ki]) 632 pTags[pidx] = append(pTags[pidx], tagsSlice[ki]) 633 } 634 635 // Process each subset of series on each partition. 636 n := i.availableThreads() 637 638 // Store errors. 639 errC := make(chan error, i.PartitionN) 640 641 var pidx uint32 // Index of maximum Partition being worked on. 642 for k := 0; k < n; k++ { 643 go func() { 644 for { 645 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 646 if idx >= len(i.partitions) { 647 return // No more work. 648 } 649 errC <- i.partitions[idx].createSeriesListIfNotExists(pNames[idx], pTags[idx]) 650 } 651 }() 652 } 653 654 // Check for error 655 for i := 0; i < cap(errC); i++ { 656 if err := <-errC; err != nil { 657 return err 658 } 659 } 660 661 // Update sketches. 662 for _, key := range keys { 663 i.sSketch.Add(key) 664 } 665 for _, name := range names { 666 i.mSketch.Add(name) 667 } 668 669 return nil 670} 671 672// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted. 673func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { 674 if err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags}); err != nil { 675 return err 676 } 677 i.sSketch.Add(key) 678 i.mSketch.Add(name) 679 return nil 680} 681 682// InitializeSeries is a no-op. This only applies to the in-memory index. 683func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error { 684 return nil 685} 686 687// DropSeries drops the provided series from the index. If cascade is true 688// and this is the last series to the measurement, the measurment will also be dropped. 689func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error { 690 // Remove from partition. 691 if err := i.partition(key).DropSeries(seriesID); err != nil { 692 return err 693 } 694 695 // Add sketch tombstone. 696 i.sTSketch.Add(key) 697 698 if !cascade { 699 return nil 700 } 701 702 // Extract measurement name. 703 name, _ := models.ParseKeyBytes(key) 704 705 // Check if that was the last series for the measurement in the entire index. 706 if ok, err := i.MeasurementHasSeries(name); err != nil { 707 return err 708 } else if ok { 709 return nil 710 } 711 712 // If no more series exist in the measurement then delete the measurement. 713 if err := i.DropMeasurement(name); err != nil { 714 return err 715 } 716 return nil 717} 718 719// DropSeriesGlobal is a no-op on the tsi1 index. 720func (i *Index) DropSeriesGlobal(key []byte) error { return nil } 721 722// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more 723// series for the measurment. 724func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error { 725 // Check if that was the last series for the measurement in the entire index. 726 if ok, err := i.MeasurementHasSeries(name); err != nil { 727 return err 728 } else if ok { 729 return nil 730 } 731 732 // If no more series exist in the measurement then delete the measurement. 733 return i.DropMeasurement(name) 734} 735 736// MeasurementsSketches returns the two measurement sketches for the index. 737func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { 738 return i.mSketch, i.mTSketch, nil 739} 740 741// SeriesSketches returns the two series sketches for the index. 742func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { 743 return i.sSketch, i.sTSketch, nil 744} 745 746// Since indexes are not shared across shards, the count returned by SeriesN 747// cannot be combined with other shard's results. If you need to count series 748// across indexes then use either the database-wide series file, or merge the 749// index-level bitsets or sketches. 750func (i *Index) SeriesN() int64 { 751 return int64(i.SeriesIDSet().Cardinality()) 752} 753 754// HasTagKey returns true if tag key exists. It returns the first error 755// encountered if any. 756func (i *Index) HasTagKey(name, key []byte) (bool, error) { 757 n := i.availableThreads() 758 759 // Store errors 760 var found uint32 // Use this to signal we found the tag key. 761 errC := make(chan error, i.PartitionN) 762 763 // Check each partition for the tag key concurrently. 764 var pidx uint32 // Index of maximum Partition being worked on. 765 for k := 0; k < n; k++ { 766 go func() { 767 for { 768 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 769 if idx >= len(i.partitions) { 770 return // No more work. 771 } 772 773 // Check if the tag key has already been found. If it has, we 774 // don't need to check this partition and can just move on. 775 if atomic.LoadUint32(&found) == 1 { 776 errC <- nil 777 continue 778 } 779 780 b, err := i.partitions[idx].HasTagKey(name, key) 781 if b { 782 atomic.StoreUint32(&found, 1) 783 } 784 errC <- err 785 } 786 }() 787 } 788 789 // Check for error 790 for i := 0; i < cap(errC); i++ { 791 if err := <-errC; err != nil { 792 return false, err 793 } 794 } 795 796 // Check if we found the tag key. 797 return atomic.LoadUint32(&found) == 1, nil 798} 799 800// HasTagValue returns true if tag value exists. 801func (i *Index) HasTagValue(name, key, value []byte) (bool, error) { 802 n := i.availableThreads() 803 804 // Store errors 805 var found uint32 // Use this to signal we found the tag key. 806 errC := make(chan error, i.PartitionN) 807 808 // Check each partition for the tag key concurrently. 809 var pidx uint32 // Index of maximum Partition being worked on. 810 for k := 0; k < n; k++ { 811 go func() { 812 for { 813 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 814 if idx >= len(i.partitions) { 815 return // No more work. 816 } 817 818 // Check if the tag key has already been found. If it has, we 819 // don't need to check this partition and can just move on. 820 if atomic.LoadUint32(&found) == 1 { 821 errC <- nil 822 continue 823 } 824 825 b, err := i.partitions[idx].HasTagValue(name, key, value) 826 if b { 827 atomic.StoreUint32(&found, 1) 828 } 829 errC <- err 830 } 831 }() 832 } 833 834 // Check for error 835 for i := 0; i < cap(errC); i++ { 836 if err := <-errC; err != nil { 837 return false, err 838 } 839 } 840 841 // Check if we found the tag key. 842 return atomic.LoadUint32(&found) == 1, nil 843} 844 845// TagKeyIterator returns an iterator for all keys across a single measurement. 846func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) { 847 a := make([]tsdb.TagKeyIterator, 0, len(i.partitions)) 848 for _, p := range i.partitions { 849 itr := p.TagKeyIterator(name) 850 if itr != nil { 851 a = append(a, itr) 852 } 853 } 854 return tsdb.MergeTagKeyIterators(a...), nil 855} 856 857// TagValueIterator returns an iterator for all values across a single key. 858func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) { 859 a := make([]tsdb.TagValueIterator, 0, len(i.partitions)) 860 for _, p := range i.partitions { 861 itr := p.TagValueIterator(name, key) 862 if itr != nil { 863 a = append(a, itr) 864 } 865 } 866 return tsdb.MergeTagValueIterators(a...), nil 867} 868 869// TagKeySeriesIDIterator returns a series iterator for all values across a single key. 870func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { 871 a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 872 for _, p := range i.partitions { 873 itr := p.TagKeySeriesIDIterator(name, key) 874 if itr != nil { 875 a = append(a, itr) 876 } 877 } 878 return tsdb.MergeSeriesIDIterators(a...), nil 879} 880 881// TagValueSeriesIDIterator returns a series iterator for a single tag value. 882func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { 883 a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 884 for _, p := range i.partitions { 885 itr := p.TagValueSeriesIDIterator(name, key, value) 886 if itr != nil { 887 a = append(a, itr) 888 } 889 } 890 return tsdb.MergeSeriesIDIterators(a...), nil 891} 892 893// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. 894func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { 895 n := i.availableThreads() 896 897 // Store results. 898 keys := make([]map[string]struct{}, i.PartitionN) 899 errC := make(chan error, i.PartitionN) 900 901 var pidx uint32 // Index of maximum Partition being worked on. 902 for k := 0; k < n; k++ { 903 go func() { 904 for { 905 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 906 if idx >= len(i.partitions) { 907 return // No more work. 908 } 909 910 // This is safe since there are no readers on keys until all 911 // the writers are done. 912 tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr) 913 keys[idx] = tagKeys 914 errC <- err 915 } 916 }() 917 } 918 919 // Check for error 920 for i := 0; i < cap(errC); i++ { 921 if err := <-errC; err != nil { 922 return nil, err 923 } 924 } 925 926 // Merge into single map. 927 result := keys[0] 928 for k := 1; k < len(i.partitions); k++ { 929 for k := range keys[k] { 930 result[k] = struct{}{} 931 } 932 } 933 return result, nil 934} 935 936// DiskSizeBytes returns the size of the index on disk. 937func (i *Index) DiskSizeBytes() int64 { 938 fs, err := i.RetainFileSet() 939 if err != nil { 940 i.logger.Warn("Index is closing down") 941 return 0 942 } 943 defer fs.Release() 944 945 var manifestSize int64 946 // Get MANIFEST sizes from each partition. 947 for _, p := range i.partitions { 948 manifestSize += p.manifestSize 949 } 950 return fs.Size() + manifestSize 951} 952 953// TagKeyCardinality always returns zero. 954// It is not possible to determine cardinality of tags across index files, and 955// thus it cannot be done across partitions. 956func (i *Index) TagKeyCardinality(name, key []byte) int { 957 return 0 958} 959 960// RetainFileSet returns the set of all files across all partitions. 961// This is only needed when all files need to be retained for an operation. 962func (i *Index) RetainFileSet() (*FileSet, error) { 963 i.mu.RLock() 964 defer i.mu.RUnlock() 965 966 fs, _ := NewFileSet(nil, i.sfile, nil) 967 for _, p := range i.partitions { 968 pfs, err := p.RetainFileSet() 969 if err != nil { 970 fs.Close() 971 return nil, err 972 } 973 fs.files = append(fs.files, pfs.files...) 974 } 975 return fs, nil 976} 977 978// SetFieldName is a no-op on this index. 979func (i *Index) SetFieldName(measurement []byte, name string) {} 980 981// Rebuild rebuilds an index. It's a no-op for this index. 982func (i *Index) Rebuild() {} 983 984// IsIndexDir returns true if directory contains at least one partition directory. 985func IsIndexDir(path string) (bool, error) { 986 fis, err := ioutil.ReadDir(path) 987 if err != nil { 988 return false, err 989 } 990 for _, fi := range fis { 991 if !fi.IsDir() { 992 continue 993 } else if ok, err := IsPartitionDir(filepath.Join(path, fi.Name())); err != nil { 994 return false, err 995 } else if ok { 996 return true, nil 997 } 998 } 999 return false, nil 1000} 1001