1package tsi1 2 3import ( 4 "errors" 5 "fmt" 6 "io/ioutil" 7 "os" 8 "path/filepath" 9 "regexp" 10 "runtime" 11 "strconv" 12 "sync" 13 "sync/atomic" 14 "unsafe" 15 16 "github.com/cespare/xxhash" 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/pkg/estimator" 19 "github.com/influxdata/influxdb/pkg/estimator/hll" 20 "github.com/influxdata/influxdb/pkg/slices" 21 "github.com/influxdata/influxdb/tsdb" 22 "github.com/influxdata/influxql" 23 "go.uber.org/zap" 24) 25 26// IndexName is the name of the index. 27const IndexName = tsdb.TSI1IndexName 28 29// ErrCompactionInterrupted is returned if compactions are disabled or 30// an index is closed while a compaction is occurring. 31var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted") 32 33func init() { 34 if os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS") != "" { 35 i, err := strconv.Atoi(os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS")) 36 if err != nil { 37 panic(err) 38 } 39 DefaultPartitionN = uint64(i) 40 } 41 42 tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index { 43 idx := NewIndex(sfile, db, 44 WithPath(path), 45 WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)), 46 WithSeriesIDCacheSize(opt.Config.SeriesIDSetCacheSize), 47 ) 48 return idx 49 }) 50} 51 52// DefaultPartitionN determines how many shards the index will be partitioned into. 53// 54// NOTE: Currently, this must not be change once a database is created. Further, 55// it must also be a power of 2. 56// 57var DefaultPartitionN uint64 = 8 58 59// An IndexOption is a functional option for changing the configuration of 60// an Index. 61type IndexOption func(i *Index) 62 63// WithPath sets the root path of the Index 64var WithPath = func(path string) IndexOption { 65 return func(i *Index) { 66 i.path = path 67 } 68} 69 70// DisableCompactions disables compactions on the Index. 71var DisableCompactions = func() IndexOption { 72 return func(i *Index) { 73 i.disableCompactions = true 74 } 75} 76 77// WithLogger sets the logger for the Index. 78var WithLogger = func(l zap.Logger) IndexOption { 79 return func(i *Index) { 80 i.logger = l.With(zap.String("index", "tsi")) 81 } 82} 83 84// WithMaximumLogFileSize sets the maximum size of LogFiles before they're 85// compacted into IndexFiles. 86var WithMaximumLogFileSize = func(size int64) IndexOption { 87 return func(i *Index) { 88 i.maxLogFileSize = size 89 } 90} 91 92// DisableFsync disables flushing and syncing of underlying files. Primarily this 93// impacts the LogFiles. This option can be set when working with the index in 94// an offline manner, for cases where a hard failure can be overcome by re-running the tooling. 95var DisableFsync = func() IndexOption { 96 return func(i *Index) { 97 i.disableFsync = true 98 } 99} 100 101// WithLogFileBufferSize sets the size of the buffer used within LogFiles. 102// Typically appending an entry to a LogFile involves writing 11 or 12 bytes, so 103// depending on how many new series are being created within a batch, it may 104// be appropriate to set this. 105var WithLogFileBufferSize = func(sz int) IndexOption { 106 return func(i *Index) { 107 if sz > 1<<17 { // 128K 108 sz = 1 << 17 109 } else if sz < 1<<12 { 110 sz = 1 << 12 // 4K (runtime default) 111 } 112 i.logfileBufferSize = sz 113 } 114} 115 116// WithSeriesIDCacheSize sets the size of the series id set cache. 117// If set to 0, then the cache is disabled. 118var WithSeriesIDCacheSize = func(sz int) IndexOption { 119 return func(i *Index) { 120 i.tagValueCacheSize = sz 121 } 122} 123 124// Index represents a collection of layered index files and WAL. 125type Index struct { 126 mu sync.RWMutex 127 partitions []*Partition 128 opened bool 129 130 tagValueCache *TagValueSeriesIDCache 131 tagValueCacheSize int 132 133 // The following may be set when initializing an Index. 134 path string // Root directory of the index partitions. 135 disableCompactions bool // Initially disables compactions on the index. 136 maxLogFileSize int64 // Maximum size of a LogFile before it's compacted. 137 logfileBufferSize int // The size of the buffer used by the LogFile. 138 disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline. 139 logger *zap.Logger // Index's logger. 140 141 // The following must be set when initializing an Index. 142 sfile *tsdb.SeriesFile // series lookup file 143 database string // Name of database. 144 145 // Cached sketches. 146 mSketch, mTSketch estimator.Sketch // Measurement sketches 147 sSketch, sTSketch estimator.Sketch // Series sketches 148 149 // Index's version. 150 version int 151 152 // Number of partitions used by the index. 153 PartitionN uint64 154} 155 156func (i *Index) UniqueReferenceID() uintptr { 157 return uintptr(unsafe.Pointer(i)) 158} 159 160// NewIndex returns a new instance of Index. 161func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index { 162 idx := &Index{ 163 tagValueCacheSize: tsdb.DefaultSeriesIDSetCacheSize, 164 maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize, 165 logger: zap.NewNop(), 166 version: Version, 167 sfile: sfile, 168 database: database, 169 mSketch: hll.NewDefaultPlus(), 170 mTSketch: hll.NewDefaultPlus(), 171 sSketch: hll.NewDefaultPlus(), 172 sTSketch: hll.NewDefaultPlus(), 173 PartitionN: DefaultPartitionN, 174 } 175 176 for _, option := range options { 177 option(idx) 178 } 179 180 idx.tagValueCache = NewTagValueSeriesIDCache(idx.tagValueCacheSize) 181 return idx 182} 183 184// Bytes estimates the memory footprint of this Index, in bytes. 185func (i *Index) Bytes() int { 186 var b int 187 i.mu.RLock() 188 b += 24 // mu RWMutex is 24 bytes 189 b += int(unsafe.Sizeof(i.partitions)) 190 for _, p := range i.partitions { 191 b += int(unsafe.Sizeof(p)) + p.bytes() 192 } 193 b += int(unsafe.Sizeof(i.opened)) 194 b += int(unsafe.Sizeof(i.path)) + len(i.path) 195 b += int(unsafe.Sizeof(i.disableCompactions)) 196 b += int(unsafe.Sizeof(i.maxLogFileSize)) 197 b += int(unsafe.Sizeof(i.logger)) 198 b += int(unsafe.Sizeof(i.sfile)) 199 // Do not count SeriesFile because it belongs to the code that constructed this Index. 200 b += int(unsafe.Sizeof(i.mSketch)) + i.mSketch.Bytes() 201 b += int(unsafe.Sizeof(i.mTSketch)) + i.mTSketch.Bytes() 202 b += int(unsafe.Sizeof(i.sSketch)) + i.sSketch.Bytes() 203 b += int(unsafe.Sizeof(i.sTSketch)) + i.sTSketch.Bytes() 204 b += int(unsafe.Sizeof(i.database)) + len(i.database) 205 b += int(unsafe.Sizeof(i.version)) 206 b += int(unsafe.Sizeof(i.PartitionN)) 207 i.mu.RUnlock() 208 return b 209} 210 211// Database returns the name of the database the index was initialized with. 212func (i *Index) Database() string { 213 return i.database 214} 215 216// WithLogger sets the logger on the index after it's been created. 217// 218// It's not safe to call WithLogger after the index has been opened, or before 219// it has been closed. 220func (i *Index) WithLogger(l *zap.Logger) { 221 i.mu.Lock() 222 defer i.mu.Unlock() 223 i.logger = l.With(zap.String("index", "tsi")) 224} 225 226// Type returns the type of Index this is. 227func (i *Index) Type() string { return IndexName } 228 229// SeriesFile returns the series file attached to the index. 230func (i *Index) SeriesFile() *tsdb.SeriesFile { return i.sfile } 231 232// SeriesIDSet returns the set of series ids associated with series in this 233// index. Any series IDs for series no longer present in the index are filtered out. 234func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet { 235 seriesIDSet := tsdb.NewSeriesIDSet() 236 others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN) 237 for _, p := range i.partitions { 238 others = append(others, p.seriesIDSet) 239 } 240 seriesIDSet.Merge(others...) 241 return seriesIDSet 242} 243 244// Open opens the index. 245func (i *Index) Open() error { 246 i.mu.Lock() 247 defer i.mu.Unlock() 248 249 if i.opened { 250 return errors.New("index already open") 251 } 252 253 // Ensure root exists. 254 if err := os.MkdirAll(i.path, 0777); err != nil { 255 return err 256 } 257 258 // Initialize index partitions. 259 i.partitions = make([]*Partition, i.PartitionN) 260 for j := 0; j < len(i.partitions); j++ { 261 p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j))) 262 p.MaxLogFileSize = i.maxLogFileSize 263 p.nosync = i.disableFsync 264 p.logbufferSize = i.logfileBufferSize 265 p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) 266 i.partitions[j] = p 267 } 268 269 // Open all the Partitions in parallel. 270 partitionN := len(i.partitions) 271 n := i.availableThreads() 272 273 // Store results. 274 errC := make(chan error, partitionN) 275 276 // Run fn on each partition using a fixed number of goroutines. 277 var pidx uint32 // Index of maximum Partition being worked on. 278 for k := 0; k < n; k++ { 279 go func(k int) { 280 for { 281 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 282 if idx >= partitionN { 283 return // No more work. 284 } 285 err := i.partitions[idx].Open() 286 errC <- err 287 } 288 }(k) 289 } 290 291 // Check for error 292 for i := 0; i < partitionN; i++ { 293 if err := <-errC; err != nil { 294 return err 295 } 296 } 297 298 // Refresh cached sketches. 299 if err := i.updateSeriesSketches(); err != nil { 300 return err 301 } else if err := i.updateMeasurementSketches(); err != nil { 302 return err 303 } 304 305 // Mark opened. 306 i.opened = true 307 i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN)) 308 return nil 309} 310 311// Compact requests a compaction of partitions. 312func (i *Index) Compact() { 313 i.mu.Lock() 314 defer i.mu.Unlock() 315 for _, p := range i.partitions { 316 p.Compact() 317 } 318} 319 320func (i *Index) EnableCompactions() { 321 for _, p := range i.partitions { 322 p.EnableCompactions() 323 } 324} 325 326func (i *Index) DisableCompactions() { 327 for _, p := range i.partitions { 328 p.DisableCompactions() 329 } 330} 331 332// Wait blocks until all outstanding compactions have completed. 333func (i *Index) Wait() { 334 for _, p := range i.partitions { 335 p.Wait() 336 } 337} 338 339// Close closes the index. 340func (i *Index) Close() error { 341 // Lock index and close partitions. 342 i.mu.Lock() 343 defer i.mu.Unlock() 344 345 for _, p := range i.partitions { 346 if err := p.Close(); err != nil { 347 return err 348 } 349 } 350 351 // Mark index as closed. 352 i.opened = false 353 return nil 354} 355 356// Path returns the path the index was opened with. 357func (i *Index) Path() string { return i.path } 358 359// PartitionAt returns the partition by index. 360func (i *Index) PartitionAt(index int) *Partition { 361 return i.partitions[index] 362} 363 364// partition returns the appropriate Partition for a provided series key. 365func (i *Index) partition(key []byte) *Partition { 366 return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))] 367} 368 369// partitionIdx returns the index of the partition that key belongs in. 370func (i *Index) partitionIdx(key []byte) int { 371 return int(xxhash.Sum64(key) & (i.PartitionN - 1)) 372} 373 374// availableThreads returns the minimum of GOMAXPROCS and the number of 375// partitions in the Index. 376func (i *Index) availableThreads() int { 377 n := runtime.GOMAXPROCS(0) 378 if len(i.partitions) < n { 379 return len(i.partitions) 380 } 381 return n 382} 383 384// updateMeasurementSketches rebuilds the cached measurement sketches. 385func (i *Index) updateMeasurementSketches() error { 386 for j := 0; j < int(i.PartitionN); j++ { 387 if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil { 388 return err 389 } else if i.mSketch.Merge(s); err != nil { 390 return err 391 } else if i.mTSketch.Merge(t); err != nil { 392 return err 393 } 394 } 395 return nil 396} 397 398// updateSeriesSketches rebuilds the cached series sketches. 399func (i *Index) updateSeriesSketches() error { 400 for j := 0; j < int(i.PartitionN); j++ { 401 if s, t, err := i.partitions[j].SeriesSketches(); err != nil { 402 return err 403 } else if i.sSketch.Merge(s); err != nil { 404 return err 405 } else if i.sTSketch.Merge(t); err != nil { 406 return err 407 } 408 } 409 return nil 410} 411 412// SetFieldSet sets a shared field set from the engine. 413func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { 414 for _, p := range i.partitions { 415 p.SetFieldSet(fs) 416 } 417} 418 419// FieldSet returns the assigned fieldset. 420func (i *Index) FieldSet() *tsdb.MeasurementFieldSet { 421 if len(i.partitions) == 0 { 422 return nil 423 } 424 return i.partitions[0].FieldSet() 425} 426 427// ForEachMeasurementName iterates over all measurement names in the index, 428// applying fn. It returns the first error encountered, if any. 429// 430// ForEachMeasurementName does not call fn on each partition concurrently so the 431// call may provide a non-goroutine safe fn. 432func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { 433 itr, err := i.MeasurementIterator() 434 if err != nil { 435 return err 436 } else if itr == nil { 437 return nil 438 } 439 defer itr.Close() 440 441 // Iterate over all measurements. 442 for { 443 e, err := itr.Next() 444 if err != nil { 445 return err 446 } else if e == nil { 447 break 448 } 449 450 if err := fn(e); err != nil { 451 return err 452 } 453 } 454 return nil 455} 456 457// MeasurementExists returns true if a measurement exists. 458func (i *Index) MeasurementExists(name []byte) (bool, error) { 459 n := i.availableThreads() 460 461 // Store errors 462 var found uint32 // Use this to signal we found the measurement. 463 errC := make(chan error, i.PartitionN) 464 465 // Check each partition for the measurement concurrently. 466 var pidx uint32 // Index of maximum Partition being worked on. 467 for k := 0; k < n; k++ { 468 go func() { 469 for { 470 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 471 if idx >= len(i.partitions) { 472 return // No more work. 473 } 474 475 // Check if the measurement has been found. If it has don't 476 // need to check this partition and can just move on. 477 if atomic.LoadUint32(&found) == 1 { 478 errC <- nil 479 continue 480 } 481 482 b, err := i.partitions[idx].MeasurementExists(name) 483 if b { 484 atomic.StoreUint32(&found, 1) 485 } 486 errC <- err 487 } 488 }() 489 } 490 491 // Check for error 492 for i := 0; i < cap(errC); i++ { 493 if err := <-errC; err != nil { 494 return false, err 495 } 496 } 497 498 // Check if we found the measurement. 499 return atomic.LoadUint32(&found) == 1, nil 500} 501 502// MeasurementHasSeries returns true if a measurement has non-tombstoned series. 503func (i *Index) MeasurementHasSeries(name []byte) (bool, error) { 504 for _, p := range i.partitions { 505 if v, err := p.MeasurementHasSeries(name); err != nil { 506 return false, err 507 } else if v { 508 return true, nil 509 } 510 } 511 return false, nil 512} 513 514// fetchByteValues is a helper for gathering values from each partition in the index, 515// based on some criteria. 516// 517// fn is a function that works on partition idx and calls into some method on 518// the partition that returns some ordered values. 519func (i *Index) fetchByteValues(fn func(idx int) ([][]byte, error)) ([][]byte, error) { 520 n := i.availableThreads() 521 522 // Store results. 523 names := make([][][]byte, i.PartitionN) 524 errC := make(chan error, i.PartitionN) 525 526 var pidx uint32 // Index of maximum Partition being worked on. 527 for k := 0; k < n; k++ { 528 go func() { 529 for { 530 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 531 if idx >= len(i.partitions) { 532 return // No more work. 533 } 534 535 pnames, err := fn(idx) 536 537 // This is safe since there are no readers on names until all 538 // the writers are done. 539 names[idx] = pnames 540 errC <- err 541 } 542 }() 543 } 544 545 // Check for error 546 for i := 0; i < cap(errC); i++ { 547 if err := <-errC; err != nil { 548 return nil, err 549 } 550 } 551 552 // It's now safe to read from names. 553 return slices.MergeSortedBytes(names...), nil 554} 555 556// MeasurementIterator returns an iterator over all measurements. 557func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) { 558 itrs := make([]tsdb.MeasurementIterator, 0, len(i.partitions)) 559 for _, p := range i.partitions { 560 itr, err := p.MeasurementIterator() 561 if err != nil { 562 tsdb.MeasurementIterators(itrs).Close() 563 return nil, err 564 } else if itr != nil { 565 itrs = append(itrs, itr) 566 } 567 } 568 return tsdb.MergeMeasurementIterators(itrs...), nil 569} 570 571// MeasurementSeriesIDIterator returns an iterator over all series in a measurement. 572func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) { 573 itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 574 for _, p := range i.partitions { 575 itr, err := p.MeasurementSeriesIDIterator(name) 576 if err != nil { 577 tsdb.SeriesIDIterators(itrs).Close() 578 return nil, err 579 } else if itr != nil { 580 itrs = append(itrs, itr) 581 } 582 } 583 return tsdb.MergeSeriesIDIterators(itrs...), nil 584} 585 586// MeasurementNamesByRegex returns measurement names for the provided regex. 587func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { 588 return i.fetchByteValues(func(idx int) ([][]byte, error) { 589 return i.partitions[idx].MeasurementNamesByRegex(re) 590 }) 591} 592 593// DropMeasurement deletes a measurement from the index. It returns the first 594// error encountered, if any. 595func (i *Index) DropMeasurement(name []byte) error { 596 n := i.availableThreads() 597 598 // Store results. 599 errC := make(chan error, i.PartitionN) 600 601 var pidx uint32 // Index of maximum Partition being worked on. 602 for k := 0; k < n; k++ { 603 go func() { 604 for { 605 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 606 if idx >= len(i.partitions) { 607 return // No more work. 608 } 609 errC <- i.partitions[idx].DropMeasurement(name) 610 } 611 }() 612 } 613 614 // Check for error 615 for i := 0; i < cap(errC); i++ { 616 if err := <-errC; err != nil { 617 return err 618 } 619 } 620 621 // Update sketches under lock. 622 i.mu.Lock() 623 defer i.mu.Unlock() 624 625 i.mTSketch.Add(name) 626 if err := i.updateSeriesSketches(); err != nil { 627 return err 628 } 629 630 return nil 631} 632 633// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. 634func (i *Index) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) error { 635 // All slices must be of equal length. 636 if len(names) != len(tagsSlice) { 637 return errors.New("names/tags length mismatch in index") 638 } 639 640 // We need to move different series into collections for each partition 641 // to process. 642 pNames := make([][][]byte, i.PartitionN) 643 pTags := make([][]models.Tags, i.PartitionN) 644 645 // Determine partition for series using each series key. 646 for ki, key := range keys { 647 pidx := i.partitionIdx(key) 648 pNames[pidx] = append(pNames[pidx], names[ki]) 649 pTags[pidx] = append(pTags[pidx], tagsSlice[ki]) 650 } 651 652 // Process each subset of series on each partition. 653 n := i.availableThreads() 654 655 // Store errors. 656 errC := make(chan error, i.PartitionN) 657 658 var pidx uint32 // Index of maximum Partition being worked on. 659 for k := 0; k < n; k++ { 660 go func() { 661 for { 662 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 663 if idx >= len(i.partitions) { 664 return // No more work. 665 } 666 667 ids, err := i.partitions[idx].createSeriesListIfNotExists(pNames[idx], pTags[idx], tracker) 668 669 var updateCache bool 670 for _, id := range ids { 671 if id != 0 { 672 updateCache = true 673 break 674 } 675 } 676 677 if !updateCache { 678 errC <- err 679 continue 680 } 681 682 // Some cached bitset results may need to be updated. 683 i.tagValueCache.RLock() 684 for j, id := range ids { 685 if id == 0 { 686 continue 687 } 688 689 name := pNames[idx][j] 690 tags := pTags[idx][j] 691 if i.tagValueCache.measurementContainsSets(name) { 692 for _, pair := range tags { 693 // TODO(edd): It's not clear to me yet whether it will be better to take a lock 694 // on every series id set, or whether to gather them all up under the cache rlock 695 // and then take the cache lock and update them all at once (without invoking a lock 696 // on each series id set). 697 // 698 // Taking the cache lock will block all queries, but is one lock. Taking each series set 699 // lock might be many lock/unlocks but will only block a query that needs that particular set. 700 // 701 // Need to think on it, but I think taking a lock on each series id set is the way to go. 702 // 703 // One other option here is to take a lock on the series id set when we first encounter it 704 // and then keep it locked until we're done with all the ids. 705 // 706 // Note: this will only add `id` to the set if it exists. 707 i.tagValueCache.addToSet(name, pair.Key, pair.Value, id) // Takes a lock on the series id set 708 } 709 } 710 } 711 i.tagValueCache.RUnlock() 712 713 errC <- err 714 } 715 }() 716 } 717 718 // Check for error 719 for i := 0; i < cap(errC); i++ { 720 if err := <-errC; err != nil { 721 return err 722 } 723 } 724 725 // Update sketches under lock. 726 i.mu.Lock() 727 defer i.mu.Unlock() 728 729 for _, key := range keys { 730 i.sSketch.Add(key) 731 } 732 for _, name := range names { 733 i.mSketch.Add(name) 734 } 735 736 return nil 737} 738 739// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted. 740func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, tracker tsdb.StatsTracker) error { 741 ids, err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags}, tracker) 742 if err != nil { 743 return err 744 } 745 746 i.mu.Lock() 747 i.sSketch.Add(key) 748 i.mSketch.Add(name) 749 i.mu.Unlock() 750 751 if ids[0] == 0 { 752 return nil // No new series, nothing further to update. 753 } 754 755 // If there are cached sets for any of the tag pairs, they will need to be 756 // updated with the series id. 757 i.tagValueCache.RLock() 758 if i.tagValueCache.measurementContainsSets(name) { 759 for _, pair := range tags { 760 // TODO(edd): It's not clear to me yet whether it will be better to take a lock 761 // on every series id set, or whether to gather them all up under the cache rlock 762 // and then take the cache lock and update them all at once (without invoking a lock 763 // on each series id set). 764 // 765 // Taking the cache lock will block all queries, but is one lock. Taking each series set 766 // lock might be many lock/unlocks but will only block a query that needs that particular set. 767 // 768 // Need to think on it, but I think taking a lock on each series id set is the way to go. 769 // 770 // Note this will only add `id` to the set if it exists. 771 i.tagValueCache.addToSet(name, pair.Key, pair.Value, ids[0]) // Takes a lock on the series id set 772 } 773 } 774 i.tagValueCache.RUnlock() 775 return nil 776} 777 778// InitializeSeries is a no-op. This only applies to the in-memory index. 779func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error { 780 return nil 781} 782 783// DropSeries drops the provided series from the index. If cascade is true 784// and this is the last series to the measurement, the measurment will also be dropped. 785func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error { 786 // Remove from partition. 787 if err := i.partition(key).DropSeries(seriesID); err != nil { 788 return err 789 } 790 791 // Add sketch tombstone. 792 i.mu.Lock() 793 i.sTSketch.Add(key) 794 i.mu.Unlock() 795 796 if !cascade { 797 return nil 798 } 799 800 // Extract measurement name & tags. 801 name, tags := models.ParseKeyBytes(key) 802 803 // If there are cached sets for any of the tag pairs, they will need to be 804 // updated with the series id. 805 i.tagValueCache.RLock() 806 if i.tagValueCache.measurementContainsSets(name) { 807 for _, pair := range tags { 808 i.tagValueCache.delete(name, pair.Key, pair.Value, seriesID) // Takes a lock on the series id set 809 } 810 } 811 i.tagValueCache.RUnlock() 812 813 // Check if that was the last series for the measurement in the entire index. 814 if ok, err := i.MeasurementHasSeries(name); err != nil { 815 return err 816 } else if ok { 817 return nil 818 } 819 820 // If no more series exist in the measurement then delete the measurement. 821 if err := i.DropMeasurement(name); err != nil { 822 return err 823 } 824 return nil 825} 826 827// DropSeries drops the provided series from the index. If cascade is true 828// and this is the last series to the measurement, the measurment will also be dropped. 829func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error { 830 // All slices must be of equal length. 831 if len(seriesIDs) != len(keys) { 832 return errors.New("seriesIDs/keys length mismatch in index") 833 } 834 835 // We need to move different series into collections for each partition 836 // to process. 837 pSeriesIDs := make([][]uint64, i.PartitionN) 838 pKeys := make([][][]byte, i.PartitionN) 839 840 for idx, key := range keys { 841 pidx := i.partitionIdx(key) 842 pSeriesIDs[pidx] = append(pSeriesIDs[pidx], seriesIDs[idx]) 843 pKeys[pidx] = append(pKeys[pidx], key) 844 } 845 846 // Process each subset of series on each partition. 847 n := i.availableThreads() 848 849 // Store errors. 850 errC := make(chan error, i.PartitionN) 851 852 var pidx uint32 // Index of maximum Partition being worked on. 853 for k := 0; k < n; k++ { 854 go func() { 855 for { 856 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 857 if idx >= len(i.partitions) { 858 return // No more work. 859 } 860 861 // Drop from partition. 862 err := i.partitions[idx].DropSeriesList(pSeriesIDs[idx]) 863 errC <- err 864 } 865 }() 866 } 867 868 // Check for error 869 for i := 0; i < cap(errC); i++ { 870 if err := <-errC; err != nil { 871 return err 872 } 873 } 874 875 // Add sketch tombstone. 876 i.mu.Lock() 877 for _, key := range keys { 878 i.sTSketch.Add(key) 879 } 880 i.mu.Unlock() 881 882 return nil 883} 884 885// DropSeriesGlobal is a no-op on the tsi1 index. 886func (i *Index) DropSeriesGlobal(key []byte) error { return nil } 887 888// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more 889// series for the measurment. 890func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { 891 // Check if that was the last series for the measurement in the entire index. 892 if ok, err := i.MeasurementHasSeries(name); err != nil { 893 return false, err 894 } else if ok { 895 return false, nil 896 } 897 898 // If no more series exist in the measurement then delete the measurement. 899 return true, i.DropMeasurement(name) 900} 901 902// MeasurementsSketches returns the two measurement sketches for the index. 903func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { 904 i.mu.RLock() 905 defer i.mu.RUnlock() 906 return i.mSketch.Clone(), i.mTSketch.Clone(), nil 907} 908 909// SeriesSketches returns the two series sketches for the index. 910func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { 911 i.mu.RLock() 912 defer i.mu.RUnlock() 913 return i.sSketch.Clone(), i.sTSketch.Clone(), nil 914} 915 916// Since indexes are not shared across shards, the count returned by SeriesN 917// cannot be combined with other shard's results. If you need to count series 918// across indexes then use either the database-wide series file, or merge the 919// index-level bitsets or sketches. 920func (i *Index) SeriesN() int64 { 921 return int64(i.SeriesIDSet().Cardinality()) 922} 923 924// HasTagKey returns true if tag key exists. It returns the first error 925// encountered if any. 926func (i *Index) HasTagKey(name, key []byte) (bool, error) { 927 n := i.availableThreads() 928 929 // Store errors 930 var found uint32 // Use this to signal we found the tag key. 931 errC := make(chan error, i.PartitionN) 932 933 // Check each partition for the tag key concurrently. 934 var pidx uint32 // Index of maximum Partition being worked on. 935 for k := 0; k < n; k++ { 936 go func() { 937 for { 938 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 939 if idx >= len(i.partitions) { 940 return // No more work. 941 } 942 943 // Check if the tag key has already been found. If it has, we 944 // don't need to check this partition and can just move on. 945 if atomic.LoadUint32(&found) == 1 { 946 errC <- nil 947 continue 948 } 949 950 b, err := i.partitions[idx].HasTagKey(name, key) 951 if b { 952 atomic.StoreUint32(&found, 1) 953 } 954 errC <- err 955 } 956 }() 957 } 958 959 // Check for error 960 for i := 0; i < cap(errC); i++ { 961 if err := <-errC; err != nil { 962 return false, err 963 } 964 } 965 966 // Check if we found the tag key. 967 return atomic.LoadUint32(&found) == 1, nil 968} 969 970// HasTagValue returns true if tag value exists. 971func (i *Index) HasTagValue(name, key, value []byte) (bool, error) { 972 n := i.availableThreads() 973 974 // Store errors 975 var found uint32 // Use this to signal we found the tag key. 976 errC := make(chan error, i.PartitionN) 977 978 // Check each partition for the tag key concurrently. 979 var pidx uint32 // Index of maximum Partition being worked on. 980 for k := 0; k < n; k++ { 981 go func() { 982 for { 983 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check 984 if idx >= len(i.partitions) { 985 return // No more work. 986 } 987 988 // Check if the tag key has already been found. If it has, we 989 // don't need to check this partition and can just move on. 990 if atomic.LoadUint32(&found) == 1 { 991 errC <- nil 992 continue 993 } 994 995 b, err := i.partitions[idx].HasTagValue(name, key, value) 996 if b { 997 atomic.StoreUint32(&found, 1) 998 } 999 errC <- err 1000 } 1001 }() 1002 } 1003 1004 // Check for error 1005 for i := 0; i < cap(errC); i++ { 1006 if err := <-errC; err != nil { 1007 return false, err 1008 } 1009 } 1010 1011 // Check if we found the tag key. 1012 return atomic.LoadUint32(&found) == 1, nil 1013} 1014 1015// TagKeyIterator returns an iterator for all keys across a single measurement. 1016func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) { 1017 a := make([]tsdb.TagKeyIterator, 0, len(i.partitions)) 1018 for _, p := range i.partitions { 1019 itr := p.TagKeyIterator(name) 1020 if itr != nil { 1021 a = append(a, itr) 1022 } 1023 } 1024 return tsdb.MergeTagKeyIterators(a...), nil 1025} 1026 1027// TagValueIterator returns an iterator for all values across a single key. 1028func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) { 1029 a := make([]tsdb.TagValueIterator, 0, len(i.partitions)) 1030 for _, p := range i.partitions { 1031 itr := p.TagValueIterator(name, key) 1032 if itr != nil { 1033 a = append(a, itr) 1034 } 1035 } 1036 return tsdb.MergeTagValueIterators(a...), nil 1037} 1038 1039// TagKeySeriesIDIterator returns a series iterator for all values across a single key. 1040func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { 1041 a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 1042 for _, p := range i.partitions { 1043 itr, err := p.TagKeySeriesIDIterator(name, key) 1044 if err != nil { 1045 return nil, err 1046 } else if itr != nil { 1047 a = append(a, itr) 1048 } 1049 } 1050 return tsdb.MergeSeriesIDIterators(a...), nil 1051} 1052 1053// TagValueSeriesIDIterator returns a series iterator for a single tag value. 1054func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { 1055 // Check series ID set cache... 1056 if i.tagValueCacheSize > 0 { 1057 if ss := i.tagValueCache.Get(name, key, value); ss != nil { 1058 // Return a clone because the set is mutable. 1059 return tsdb.NewSeriesIDSetIterator(ss.Clone()), nil 1060 } 1061 } 1062 1063 a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) 1064 for _, p := range i.partitions { 1065 itr, err := p.TagValueSeriesIDIterator(name, key, value) 1066 if err != nil { 1067 return nil, err 1068 } else if itr != nil { 1069 a = append(a, itr) 1070 } 1071 } 1072 1073 itr := tsdb.MergeSeriesIDIterators(a...) 1074 if i.tagValueCacheSize == 0 { 1075 return itr, nil 1076 } 1077 1078 // Check if the iterator contains only series id sets. Cache them... 1079 if ssitr, ok := itr.(tsdb.SeriesIDSetIterator); ok { 1080 ss := ssitr.SeriesIDSet() 1081 i.tagValueCache.Put(name, key, value, ss) 1082 } 1083 return itr, nil 1084} 1085 1086// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. 1087func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { 1088 n := i.availableThreads() 1089 1090 // Store results. 1091 keys := make([]map[string]struct{}, i.PartitionN) 1092 errC := make(chan error, i.PartitionN) 1093 1094 var pidx uint32 // Index of maximum Partition being worked on. 1095 for k := 0; k < n; k++ { 1096 go func() { 1097 for { 1098 idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. 1099 if idx >= len(i.partitions) { 1100 return // No more work. 1101 } 1102 1103 // This is safe since there are no readers on keys until all 1104 // the writers are done. 1105 tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr) 1106 keys[idx] = tagKeys 1107 errC <- err 1108 } 1109 }() 1110 } 1111 1112 // Check for error 1113 for i := 0; i < cap(errC); i++ { 1114 if err := <-errC; err != nil { 1115 return nil, err 1116 } 1117 } 1118 1119 // Merge into single map. 1120 result := keys[0] 1121 for k := 1; k < len(i.partitions); k++ { 1122 for k := range keys[k] { 1123 result[k] = struct{}{} 1124 } 1125 } 1126 return result, nil 1127} 1128 1129// DiskSizeBytes returns the size of the index on disk. 1130func (i *Index) DiskSizeBytes() int64 { 1131 fs, err := i.RetainFileSet() 1132 if err != nil { 1133 i.logger.Warn("Index is closing down") 1134 return 0 1135 } 1136 defer fs.Release() 1137 1138 var manifestSize int64 1139 // Get MANIFEST sizes from each partition. 1140 for _, p := range i.partitions { 1141 manifestSize += p.manifestSize 1142 } 1143 return fs.Size() + manifestSize 1144} 1145 1146// TagKeyCardinality always returns zero. 1147// It is not possible to determine cardinality of tags across index files, and 1148// thus it cannot be done across partitions. 1149func (i *Index) TagKeyCardinality(name, key []byte) int { 1150 return 0 1151} 1152 1153// RetainFileSet returns the set of all files across all partitions. 1154// This is only needed when all files need to be retained for an operation. 1155func (i *Index) RetainFileSet() (*FileSet, error) { 1156 i.mu.RLock() 1157 defer i.mu.RUnlock() 1158 1159 fs, _ := NewFileSet(nil, i.sfile, nil) 1160 for _, p := range i.partitions { 1161 pfs, err := p.RetainFileSet() 1162 if err != nil { 1163 fs.Close() 1164 return nil, err 1165 } 1166 fs.files = append(fs.files, pfs.files...) 1167 } 1168 return fs, nil 1169} 1170 1171// SetFieldName is a no-op on this index. 1172func (i *Index) SetFieldName(measurement []byte, name string) {} 1173 1174// Rebuild rebuilds an index. It's a no-op for this index. 1175func (i *Index) Rebuild() {} 1176 1177// IsIndexDir returns true if directory contains at least one partition directory. 1178func IsIndexDir(path string) (bool, error) { 1179 fis, err := ioutil.ReadDir(path) 1180 if err != nil { 1181 return false, err 1182 } 1183 for _, fi := range fis { 1184 if !fi.IsDir() { 1185 continue 1186 } else if ok, err := IsPartitionDir(filepath.Join(path, fi.Name())); err != nil { 1187 return false, err 1188 } else if ok { 1189 return true, nil 1190 } 1191 } 1192 return false, nil 1193} 1194