package tsdb

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/gogo/protobuf/proto"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/file"
	"github.com/influxdata/influxdb/pkg/limiter"
	"github.com/influxdata/influxdb/pkg/slices"
	"github.com/influxdata/influxdb/query"
	internal "github.com/influxdata/influxdb/tsdb/internal"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
)

// Names of the statistics published by a shard (see Shard.Statistics).
const (
	statWriteReq           = "writeReq"
	statWriteReqOK         = "writeReqOk"
	statWriteReqErr        = "writeReqErr"
	statSeriesCreate       = "seriesCreate"
	statFieldsCreate       = "fieldsCreate"
	statWritePointsErr     = "writePointsErr"
	statWritePointsDropped = "writePointsDropped"
	statWritePointsOK      = "writePointsOk"
	statWriteBytes         = "writeBytes"
	statDiskBytes          = "diskBytes"
)

var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrEngineClosed is returned when a caller attempts indirectly to
	// access the shard's underlying engine.
	ErrEngineClosed = errors.New("engine is closed")

	// ErrShardDisabled is returned when the shard is not available for
	// queries or writes.
	ErrShardDisabled = errors.New("shard is disabled")

	// ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by
	// the file's magic number.
	ErrUnknownFieldsFormat = errors.New("unknown field index format")

	// ErrUnknownFieldType is returned when the type of a field cannot be determined.
	ErrUnknownFieldType = errors.New("unknown field type")

	// ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is
	// attempted on a hot shard.
	ErrShardNotIdle = errors.New("shard not idle")

	// fieldsIndexMagicNumber is the file magic number for the fields index file.
	fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)

var (
	// Static objects to prevent small allocs.
	timeBytes = []byte("time")
)

// A ShardError implements the error interface, and contains extra
// context about the shard that generated the error.
type ShardError struct {
	id  uint64
	Err error
}

// NewShardError returns a new ShardError wrapping err and annotated with the
// shard ID. A nil err yields a nil return, so callers may wrap unconditionally.
func NewShardError(id uint64, err error) error {
	if err == nil {
		return nil
	}
	return ShardError{id: id, Err: err}
}

// Error returns the string representation of the error, to satisfy the error interface.
func (e ShardError) Error() string {
	return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
}

// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
	Reason  string // explanation of the first failure observed
	Dropped int    // number of points that were not written

	// A sorted slice of series keys that were dropped.
	DroppedKeys [][]byte
}

// Error implements the error interface.
func (e PartialWriteError) Error() string {
	return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
}

// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
	path    string // on-disk location of the shard's data
	walPath string // on-disk location of the shard's WAL
	id      uint64

	database        string
	retentionPolicy string

	sfile   *SeriesFile
	options EngineOptions

	mu      sync.RWMutex
	_engine Engine // access via engine()/engineNoLock(); nil when the shard is closed
	index   Index
	enabled bool

	// expvar-based stats.
	stats       *ShardStatistics
	defaultTags models.StatisticTags

	baseLogger *zap.Logger
	logger     *zap.Logger

	// EnableOnOpen, when true, enables writes, queries and compactions
	// automatically at the end of Open.
	EnableOnOpen bool
}

// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
	db, rp := decodeStorePath(path)
	logger := zap.NewNop()
	if opt.FieldValidator == nil {
		opt.FieldValidator = defaultFieldValidator{}
	}

	s := &Shard{
		id:      id,
		path:    path,
		walPath: walPath,
		sfile:   sfile,
		options: opt,

		stats: &ShardStatistics{},
		defaultTags: models.StatisticTags{
			"path":            path,
			"walPath":         walPath,
			"id":              fmt.Sprintf("%d", id),
			"database":        db,
			"retentionPolicy": rp,
			"engine":          opt.EngineVersion,
		},

		database:        db,
		retentionPolicy: rp,

		logger:       logger,
		baseLogger:   logger,
		EnableOnOpen: true,
	}
	return s
}

// WithLogger sets the logger on the shard. It must be called before Open.
func (s *Shard) WithLogger(log *zap.Logger) {
	s.baseLogger = log
	// If the shard is already open, propagate the logger to the engine and
	// index as well; otherwise only the base logger is updated.
	engine, err := s.engine()
	if err == nil {
		engine.WithLogger(s.baseLogger)
		s.index.WithLogger(s.baseLogger)
	}
	s.logger = s.baseLogger.With(zap.String("service", "shard"))
}

// SetEnabled enables the shard for queries and write.  When disabled, all
// writes and queries return an error and compactions are stopped for the shard.
201func (s *Shard) SetEnabled(enabled bool) { 202 s.mu.Lock() 203 // Prevent writes and queries 204 s.enabled = enabled 205 if s._engine != nil { 206 // Disable background compactions and snapshotting 207 s._engine.SetEnabled(enabled) 208 } 209 s.mu.Unlock() 210} 211 212// ScheduleFullCompaction forces a full compaction to be schedule on the shard. 213func (s *Shard) ScheduleFullCompaction() error { 214 engine, err := s.engine() 215 if err != nil { 216 return err 217 } 218 return engine.ScheduleFullCompaction() 219} 220 221// ID returns the shards ID. 222func (s *Shard) ID() uint64 { 223 return s.id 224} 225 226// Database returns the database of the shard. 227func (s *Shard) Database() string { 228 return s.database 229} 230 231// RetentionPolicy returns the retention policy of the shard. 232func (s *Shard) RetentionPolicy() string { 233 return s.retentionPolicy 234} 235 236// ShardStatistics maintains statistics for a shard. 237type ShardStatistics struct { 238 WriteReq int64 239 WriteReqOK int64 240 WriteReqErr int64 241 FieldsCreated int64 242 WritePointsErr int64 243 WritePointsDropped int64 244 WritePointsOK int64 245 BytesWritten int64 246 DiskBytes int64 247} 248 249// Statistics returns statistics for periodic monitoring. 
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
	engine, err := s.engine()
	if err != nil {
		// Closed/disabled shards report no statistics.
		return nil
	}

	// Refresh our disk size stat.
	if _, err := s.DiskSize(); err != nil {
		return nil
	}
	seriesN := engine.SeriesN()

	tags = s.defaultTags.Merge(tags)
	statistics := []models.Statistic{{
		Name: "shard",
		Tags: tags,
		Values: map[string]interface{}{
			statWriteReq:           atomic.LoadInt64(&s.stats.WriteReq),
			statWriteReqOK:         atomic.LoadInt64(&s.stats.WriteReqOK),
			statWriteReqErr:        atomic.LoadInt64(&s.stats.WriteReqErr),
			statSeriesCreate:       seriesN,
			statFieldsCreate:       atomic.LoadInt64(&s.stats.FieldsCreated),
			statWritePointsErr:     atomic.LoadInt64(&s.stats.WritePointsErr),
			statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped),
			statWritePointsOK:      atomic.LoadInt64(&s.stats.WritePointsOK),
			statWriteBytes:         atomic.LoadInt64(&s.stats.BytesWritten),
			statDiskBytes:          atomic.LoadInt64(&s.stats.DiskBytes),
		},
	}}

	// Add the index and engine statistics.
	statistics = append(statistics, engine.Statistics(tags)...)
	return statistics
}

// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.path }

// Open initializes and opens the shard's store: first the index, then the
// engine (with compactions disabled while metadata loads). On any failure the
// partially opened state is torn down and a ShardError is returned.
func (s *Shard) Open() error {
	if err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Return if the shard is already open.
		if s._engine != nil {
			return nil
		}

		seriesIDSet := NewSeriesIDSet()

		// Initialize underlying index.
		ipath := filepath.Join(s.path, "index")
		idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
		if err != nil {
			return err
		}

		// Open index.
		if err := idx.Open(); err != nil {
			return err
		}
		s.index = idx
		idx.WithLogger(s.baseLogger)

		// Initialize underlying engine.
		e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
		if err != nil {
			return err
		}

		// Set log output on the engine.
		e.WithLogger(s.baseLogger)

		// Disable compactions while loading the index.
		e.SetEnabled(false)

		// Open engine.
		if err := e.Open(); err != nil {
			return err
		}

		// Load metadata index for the inmem index only.
		if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
			return err
		}
		s._engine = e

		return nil
	}(); err != nil {
		s.close()
		return NewShardError(s.id, err)
	}

	if s.EnableOnOpen {
		// enable writes, queries and compactions
		s.SetEnabled(true)
	}

	return nil
}

// Close shuts down the shard's store.
func (s *Shard) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.close()
}

// close closes the shard's engine and index and clears the references to them.
// Callers must hold s.mu.
func (s *Shard) close() error {
	if s._engine == nil {
		return nil
	}

	err := s._engine.Close()
	if err == nil {
		s._engine = nil
	}

	// NOTE(review): an index Close error is swallowed here; only the engine's
	// Close error is propagated to the caller.
	if e := s.index.Close(); e == nil {
		s.index = nil
	}
	return err
}

// IndexType returns the index version being used for this shard.
//
// IndexType returns the empty string if it is called before the shard is opened,
// since it is only at that point that the underlying index type is known.
func (s *Shard) IndexType() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s._engine == nil || s.index == nil { // Shard not open yet.
		return ""
	}
	return s.index.Type()
}

// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrEngineClosed or ErrShardDisabled.
// Callers must hold at least a read lock on s.mu.
func (s *Shard) ready() error {
	var err error
	if s._engine == nil {
		err = ErrEngineClosed
	} else if !s.enabled {
		err = ErrShardDisabled
	}
	return err
}

// LastModified returns the time when this shard was last modified.
// The zero time is returned if the engine is unavailable.
func (s *Shard) LastModified() time.Time {
	engine, err := s.engine()
	if err != nil {
		return time.Time{}
	}
	return engine.LastModified()
}

// Index returns a reference to the underlying index. It returns an error if
// the index is nil.
func (s *Shard) Index() (Index, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s.index, nil
}

// seriesFile returns the shard's series file, or an error if the shard is not
// ready for queries or writes.
func (s *Shard) seriesFile() (*SeriesFile, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s.sfile, nil
}

// IsIdle return true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
	engine, err := s.engine()
	if err != nil {
		// A closed or disabled shard is treated as idle.
		return true
	}
	return engine.IsIdle()
}

// Free releases engine resources after disabling background compactions.
func (s *Shard) Free() error {
	engine, err := s.engine()
	if err != nil {
		return err
	}

	// Disable compactions to stop background goroutines.
	s.SetCompactionsEnabled(false)

	return engine.Free()
}

// SetCompactionsEnabled enables or disable shard background compactions.
// It is a no-op if the engine is unavailable.
func (s *Shard) SetCompactionsEnabled(enabled bool) {
	engine, err := s.engine()
	if err != nil {
		return
	}
	engine.SetCompactionsEnabled(enabled)
}

// DiskSize returns the size on disk of this shard.
func (s *Shard) DiskSize() (int64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	// We don't use engine() because we still want to report the shard's disk
	// size even if the shard has been disabled.
	if s._engine == nil {
		return 0, ErrEngineClosed
	}
	size := s._engine.DiskSize()
	atomic.StoreInt64(&s.stats.DiskBytes, size)
	return size, nil
}

// FieldCreate holds information for a field to create on a measurement.
type FieldCreate struct {
	Measurement []byte
	Field       *Field
}

// WritePoints will write the raw data points and any new metadata to the index in the shard.
// On a partial write the remaining points are still written and a
// PartialWriteError describing the dropped points is returned.
func (s *Shard) WritePoints(points []models.Point) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	engine, err := s.engineNoLock()
	if err != nil {
		return err
	}

	var writeError error
	atomic.AddInt64(&s.stats.WriteReq, 1)

	points, fieldsToCreate, err := s.validateSeriesAndFields(points)
	if err != nil {
		if _, ok := err.(PartialWriteError); !ok {
			return err
		}
		// There was a partial write (points dropped), hold onto the error to return
		// to the caller, but continue on writing the remaining points.
		writeError = err
	}
	atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))

	// add any new fields and keep track of what needs to be saved
	if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
		return err
	}

	// Write to the engine.
	if err := engine.WritePoints(points); err != nil {
		atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
		atomic.AddInt64(&s.stats.WriteReqErr, 1)
		return fmt.Errorf("engine: %s", err)
	}
	atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
	atomic.AddInt64(&s.stats.WriteReqOK, 1)

	return writeError
}

// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed.
523func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) { 524 var ( 525 fieldsToCreate []*FieldCreate 526 err error 527 dropped int 528 reason string // only first error reason is set unless returned from CreateSeriesListIfNotExists 529 ) 530 531 // Create all series against the index in bulk. 532 keys := make([][]byte, len(points)) 533 names := make([][]byte, len(points)) 534 tagsSlice := make([]models.Tags, len(points)) 535 536 var j int 537 for i, p := range points { 538 tags := p.Tags() 539 540 // Drop any series w/ a "time" tag, these are illegal 541 if v := tags.Get(timeBytes); v != nil { 542 dropped++ 543 if reason == "" { 544 reason = fmt.Sprintf( 545 "invalid tag key: input tag \"%s\" on measurement \"%s\" is invalid", 546 "time", string(p.Name())) 547 } 548 continue 549 } 550 551 keys[j] = p.Key() 552 names[j] = p.Name() 553 tagsSlice[j] = tags 554 points[j] = points[i] 555 j++ 556 } 557 points, keys, names, tagsSlice = points[:j], keys[:j], names[:j], tagsSlice[:j] 558 559 engine, err := s.engineNoLock() 560 if err != nil { 561 return nil, nil, err 562 } 563 564 // Add new series. Check for partial writes. 565 var droppedKeys [][]byte 566 if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil { 567 switch err := err.(type) { 568 // TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer? 569 // Maybe we can just change it to be consistent if we change it also in all 570 // the places that construct it. 571 case *PartialWriteError: 572 reason = err.Reason 573 dropped += err.Dropped 574 droppedKeys = err.DroppedKeys 575 atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) 576 default: 577 return nil, nil, err 578 } 579 } 580 581 // Create a MeasurementFields cache. 582 mfCache := make(map[string]*MeasurementFields, 16) 583 j = 0 584 for i, p := range points { 585 // Skip any points with only invalid fields. 
586 iter := p.FieldIterator() 587 validField := false 588 for iter.Next() { 589 if bytes.Equal(iter.FieldKey(), timeBytes) { 590 continue 591 } 592 validField = true 593 break 594 } 595 if !validField { 596 if reason == "" { 597 reason = fmt.Sprintf( 598 "invalid field name: input field \"%s\" on measurement \"%s\" is invalid", 599 "time", string(p.Name())) 600 } 601 dropped++ 602 continue 603 } 604 605 // Skip any points whos keys have been dropped. Dropped has already been incremented for them. 606 if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) { 607 continue 608 } 609 610 // Grab the MeasurementFields checking the local cache to avoid lock contention. 611 name := p.Name() 612 mf := mfCache[string(name)] 613 if mf == nil { 614 mf = engine.MeasurementFields(name).Clone() 615 mfCache[string(name)] = mf 616 } 617 618 // Check with the field validator. 619 if err := s.options.FieldValidator.Validate(mf, p); err != nil { 620 switch err := err.(type) { 621 case PartialWriteError: 622 if reason == "" { 623 reason = err.Reason 624 } 625 dropped += err.Dropped 626 atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) 627 default: 628 return nil, nil, err 629 } 630 continue 631 } 632 633 points[j] = points[i] 634 j++ 635 636 // Create any fields that are missing. 637 iter.Reset() 638 for iter.Next() { 639 fieldKey := iter.FieldKey() 640 641 // Skip fields named "time". They are illegal. 
642 if bytes.Equal(fieldKey, timeBytes) { 643 continue 644 } 645 646 if mf.FieldBytes(fieldKey) != nil { 647 continue 648 } 649 650 dataType := dataTypeFromModelsFieldType(iter.Type()) 651 if dataType == influxql.Unknown { 652 continue 653 } 654 655 fieldsToCreate = append(fieldsToCreate, &FieldCreate{ 656 Measurement: name, 657 Field: &Field{ 658 Name: string(fieldKey), 659 Type: dataType, 660 }, 661 }) 662 } 663 } 664 665 if dropped > 0 { 666 err = PartialWriteError{Reason: reason, Dropped: dropped} 667 } 668 669 return points[:j], fieldsToCreate, err 670} 671 672func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error { 673 if len(fieldsToCreate) == 0 { 674 return nil 675 } 676 677 engine, err := s.engineNoLock() 678 if err != nil { 679 return err 680 } 681 682 // add fields 683 for _, f := range fieldsToCreate { 684 mf := engine.MeasurementFields(f.Measurement) 685 if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { 686 return err 687 } 688 689 s.index.SetFieldName(f.Measurement, f.Field.Name) 690 } 691 692 if len(fieldsToCreate) > 0 { 693 return engine.MeasurementFieldSet().Save() 694 } 695 696 return nil 697} 698 699// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) 700func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error { 701 engine, err := s.engine() 702 if err != nil { 703 return err 704 } 705 return engine.DeleteSeriesRange(itr, min, max) 706} 707 708// DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive) 709// for which predicate() returns true. If predicate() is nil, then all values in range are deleted. 
710func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error { 711 engine, err := s.engine() 712 if err != nil { 713 return err 714 } 715 return engine.DeleteSeriesRangeWithPredicate(itr, predicate) 716} 717 718// DeleteMeasurement deletes a measurement and all underlying series. 719func (s *Shard) DeleteMeasurement(name []byte) error { 720 engine, err := s.engine() 721 if err != nil { 722 return err 723 } 724 return engine.DeleteMeasurement(name) 725} 726 727// SeriesN returns the unique number of series in the shard. 728func (s *Shard) SeriesN() int64 { 729 engine, err := s.engine() 730 if err != nil { 731 return 0 732 } 733 return engine.SeriesN() 734} 735 736// SeriesSketches returns the measurement sketches for the shard. 737func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { 738 engine, err := s.engine() 739 if err != nil { 740 return nil, nil, err 741 } 742 return engine.SeriesSketches() 743} 744 745// MeasurementsSketches returns the measurement sketches for the shard. 746func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { 747 engine, err := s.engine() 748 if err != nil { 749 return nil, nil, err 750 } 751 return engine.MeasurementsSketches() 752} 753 754// MeasurementNamesByRegex returns names of measurements matching the regular expression. 755func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { 756 engine, err := s.engine() 757 if err != nil { 758 return nil, err 759 } 760 return engine.MeasurementNamesByRegex(re) 761} 762 763// MeasurementTagKeysByExpr returns all the tag keys for the provided expression. 
764func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { 765 engine, err := s.engine() 766 if err != nil { 767 return nil, err 768 } 769 return engine.MeasurementTagKeysByExpr(name, expr) 770} 771 772// MeasurementTagKeyValuesByExpr returns all the tag keys values for the 773// provided expression. 774func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { 775 index, err := s.Index() 776 if err != nil { 777 return nil, err 778 } 779 indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} 780 return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted) 781} 782 783// MeasurementFields returns fields for a measurement. 784// TODO(edd): This method is currently only being called from tests; do we 785// really need it? 786func (s *Shard) MeasurementFields(name []byte) *MeasurementFields { 787 engine, err := s.engine() 788 if err != nil { 789 return nil 790 } 791 return engine.MeasurementFields(name) 792} 793 794// MeasurementExists returns true if the shard contains name. 795// TODO(edd): This method is currently only being called from tests; do we 796// really need it? 797func (s *Shard) MeasurementExists(name []byte) (bool, error) { 798 engine, err := s.engine() 799 if err != nil { 800 return false, err 801 } 802 return engine.MeasurementExists(name) 803} 804 805// WriteTo writes the shard's data to w. 806func (s *Shard) WriteTo(w io.Writer) (int64, error) { 807 engine, err := s.engine() 808 if err != nil { 809 return 0, err 810 } 811 n, err := engine.WriteTo(w) 812 atomic.AddInt64(&s.stats.BytesWritten, int64(n)) 813 return n, err 814} 815 816// CreateIterator returns an iterator for the data in the shard. 
func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, err
	}
	// System iterators are served by the shard itself rather than the engine.
	switch m.SystemIterator {
	case "_fieldKeys":
		return NewFieldKeysIterator(s, opt)
	case "_series":
		// TODO(benbjohnson): Move up to the Shards.CreateIterator().
		index, err := s.Index()
		if err != nil {
			return nil, err
		}
		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}

		itr, err := NewSeriesPointIterator(indexSet, opt)
		if err != nil {
			return nil, err
		}

		return query.NewInterruptIterator(itr, opt.InterruptCh), nil
	case "_tagKeys":
		return NewTagKeysIterator(s, opt)
	}
	return engine.CreateIterator(ctx, m.Name, opt)
}

// CreateSeriesCursor returns a cursor over the shard's series matching cond.
func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
	index, err := s.Index()
	if err != nil {
		return nil, err
	}
	return newSeriesCursor(req, IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}, cond)
}

// CreateCursorIterator returns a cursor iterator backed by the shard's engine.
func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, err
	}
	return engine.CreateCursorIterator(ctx)
}

// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	engine, err := s.engine()
	if err != nil {
		return nil, nil, err
	}

	fields = make(map[string]influxql.DataType)
	dimensions = make(map[string]struct{})

	index, err := s.Index()
	if err != nil {
		return nil, nil, err
	}
	for _, name := range measurements {
		// Handle system sources.
		if strings.HasPrefix(name, "_") {
			var keys []string
			switch name {
			case "_fieldKeys":
				keys = []string{"fieldKey", "fieldType"}
			case "_series":
				keys = []string{"key"}
			case "_tagKeys":
				keys = []string{"tagKey"}
			}

			if len(keys) > 0 {
				for _, k := range keys {
					// Only widen the type; never downgrade an existing entry.
					if fields[k].LessThan(influxql.String) {
						fields[k] = influxql.String
					}
				}
				continue
			}
			// Unknown system source so default to looking for a measurement.
		}

		// Retrieve measurement.
		if exists, err := engine.MeasurementExists([]byte(name)); err != nil {
			return nil, nil, err
		} else if !exists {
			continue
		}

		// Append fields and dimensions.
		mf := engine.MeasurementFields([]byte(name))
		if mf != nil {
			for k, typ := range mf.FieldSet() {
				if fields[k].LessThan(typ) {
					fields[k] = typ
				}
			}
		}

		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
		if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
			dimensions[string(key)] = struct{}{}
			return nil
		}); err != nil {
			return nil, nil, err
		}
	}

	return fields, dimensions, nil
}

// mapType returns the data type for the field within the measurement.
func (s *Shard) mapType(measurement, field string) (influxql.DataType, error) {
	engine, err := s.engineNoLock()
	if err != nil {
		return 0, err
	}

	// Meta fields are always strings.
	switch field {
	case "_name", "_tagKey", "_tagValue", "_seriesKey":
		return influxql.String, nil
	}

	// Process system measurements.
	switch measurement {
	case "_fieldKeys":
		if field == "fieldKey" || field == "fieldType" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_series":
		if field == "key" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_tagKeys":
		if field == "tagKey" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	}
	// Unknown system source so default to looking for a measurement.

	if exists, _ := engine.MeasurementExists([]byte(measurement)); !exists {
		return influxql.Unknown, nil
	}

	// Prefer the field's declared type; fall back to Tag when the name is a
	// tag key on the measurement.
	mf := engine.MeasurementFields([]byte(measurement))
	if mf != nil {
		f := mf.Field(field)
		if f != nil {
			return f.Type, nil
		}
	}

	if exists, _ := engine.HasTagKey([]byte(measurement), []byte(field)); exists {
		return influxql.Tag, nil
	}

	return influxql.Unknown, nil
}

// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Shard) expandSources(sources influxql.Sources) (influxql.Sources, error) {
	engine, err := s.engineNoLock()
	if err != nil {
		return nil, err
	}

	// Use a map as a set to prevent duplicates.
	set := map[string]influxql.Source{}

	// Iterate all sources, expanding regexes when they're found.
	for _, source := range sources {
		switch src := source.(type) {
		case *influxql.Measurement:
			// Add non-regex measurements directly to the set.
			if src.Regex == nil {
				set[src.String()] = src
				continue
			}

			// Loop over matching measurements.
			names, err := engine.MeasurementNamesByRegex(src.Regex.Val)
			if err != nil {
				return nil, err
			}

			for _, name := range names {
				other := &influxql.Measurement{
					Database:        src.Database,
					RetentionPolicy: src.RetentionPolicy,
					Name:            string(name),
				}
				set[other.String()] = other
			}

		default:
			return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
		}
	}

	// Convert set to sorted slice.
	names := make([]string, 0, len(set))
	for name := range set {
		names = append(names, name)
	}
	sort.Strings(names)

	// Convert set to a list of Sources.
	expanded := make(influxql.Sources, 0, len(set))
	for _, name := range names {
		expanded = append(expanded, set[name])
	}

	return expanded, nil
}

// Backup backs up the shard by creating a tar archive of all TSM files that
// have been modified since the provided time. See Engine.Backup for more details.
func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error {
	engine, err := s.engine()
	if err != nil {
		return err
	}
	return engine.Backup(w, basePath, since)
}

// Export writes the shard's data between start and end to w.
// See Engine.Export for more details.
func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
	engine, err := s.engine()
	if err != nil {
		return err
	}
	return engine.Export(w, basePath, start, end)
}

// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
func (s *Shard) Restore(r io.Reader, basePath string) error {
	if err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Special case - we can still restore to a disabled shard, so we should
		// only check if the engine is closed and not care if the shard is
		// disabled.
		if s._engine == nil {
			return ErrEngineClosed
		}

		// Restore to engine.
		return s._engine.Restore(r, basePath)
	}(); err != nil {
		return err
	}

	// Close shard.
	if err := s.Close(); err != nil {
		return err
	}

	// Reopen engine.
	return s.Open()
}

// Import imports data to the underlying engine for the shard. r should
// be a reader from a backup created by Backup.
func (s *Shard) Import(r io.Reader, basePath string) error {
	// Special case - we can still import to a disabled shard, so we should
	// only check if the engine is closed and not care if the shard is
	// disabled.
	s.mu.Lock()
	defer s.mu.Unlock()
	if s._engine == nil {
		return ErrEngineClosed
	}

	// Import to engine.
	return s._engine.Import(r, basePath)
}

// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files.
func (s *Shard) CreateSnapshot() (string, error) {
	engine, err := s.engine()
	if err != nil {
		return "", err
	}
	return engine.CreateSnapshot()
}

// ForEachMeasurementName iterates over each measurement in the shard.
func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error {
	engine, err := s.engine()
	if err != nil {
		return err
	}
	return engine.ForEachMeasurementName(fn)
}

// TagKeyCardinality returns the cardinality of the given tag key on the named
// measurement, or 0 when the engine is unavailable.
func (s *Shard) TagKeyCardinality(name, key []byte) int {
	engine, err := s.engine()
	if err != nil {
		return 0
	}
	return engine.TagKeyCardinality(name, key)
}

// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, int64, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, 0, err
	}

	// Make sure the shard is idle/cold. (No use creating a digest of a
	// hot shard that is rapidly changing.)
	if !engine.IsIdle() {
		return nil, 0, ErrShardNotIdle
	}

	return engine.Digest()
}

// engine safely (under an RLock) returns a reference to the shard's Engine, or
// an error if the Engine is closed, or the shard is currently disabled.
//
// The shard's Engine should always be accessed via a call to engine(), rather
// than directly referencing Shard.engine.
//
// If a caller needs an Engine reference but is already under a lock, then they
// should use engineNoLock().
func (s *Shard) engine() (Engine, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.engineNoLock()
}

// engineNoLock is similar to calling engine(), but the caller must guarantee
// that they already hold an appropriate lock.
func (s *Shard) engineNoLock() (Engine, error) {
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s._engine, nil
}

// ShardGroup represents a collection of shards that can be queried as a unit.
type ShardGroup interface {
	MeasurementsByRegex(re *regexp.Regexp) []string
	FieldKeysByMeasurement(name []byte) []string
	FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
	MapType(measurement, field string) influxql.DataType
	CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
	ExpandSources(sources influxql.Sources) (influxql.Sources, error)
}

// Shards represents a sortable list of shards.
type Shards []*Shard

// Len implements sort.Interface.
func (a Shards) Len() int { return len(a) }

// Less implements sort.Interface.
func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id }

// Swap implements sort.Interface.
1185func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1186 1187// MeasurementsByRegex returns the unique set of measurements matching the 1188// provided regex, for all the shards. 1189func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string { 1190 var m map[string]struct{} 1191 for _, sh := range a { 1192 names, err := sh.MeasurementNamesByRegex(re) 1193 if err != nil { 1194 continue // Skip this shard's results—previous behaviour. 1195 } 1196 1197 if m == nil { 1198 m = make(map[string]struct{}, len(names)) 1199 } 1200 1201 for _, name := range names { 1202 m[string(name)] = struct{}{} 1203 } 1204 } 1205 1206 if len(m) == 0 { 1207 return nil 1208 } 1209 1210 names := make([]string, 0, len(m)) 1211 for key := range m { 1212 names = append(names, key) 1213 } 1214 sort.Strings(names) 1215 return names 1216} 1217 1218// FieldKeysByMeasurement returns a de-duplicated, sorted, set of field keys for 1219// the provided measurement name. 1220func (a Shards) FieldKeysByMeasurement(name []byte) []string { 1221 if len(a) == 1 { 1222 mf := a[0].MeasurementFields(name) 1223 if mf == nil { 1224 return nil 1225 } 1226 return mf.FieldKeys() 1227 } 1228 1229 all := make([][]string, 0, len(a)) 1230 for _, shard := range a { 1231 mf := shard.MeasurementFields(name) 1232 if mf == nil { 1233 continue 1234 } 1235 all = append(all, mf.FieldKeys()) 1236 } 1237 return slices.MergeSortedStrings(all...) 
1238} 1239 1240func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 1241 fields = make(map[string]influxql.DataType) 1242 dimensions = make(map[string]struct{}) 1243 1244 for _, sh := range a { 1245 f, d, err := sh.FieldDimensions(measurements) 1246 if err != nil { 1247 return nil, nil, err 1248 } 1249 for k, typ := range f { 1250 if fields[k].LessThan(typ) { 1251 fields[k] = typ 1252 } 1253 } 1254 for k := range d { 1255 dimensions[k] = struct{}{} 1256 } 1257 } 1258 return 1259} 1260 1261func (a Shards) MapType(measurement, field string) influxql.DataType { 1262 var typ influxql.DataType 1263 for _, sh := range a { 1264 sh.mu.RLock() 1265 if t, err := sh.mapType(measurement, field); err == nil && typ.LessThan(t) { 1266 typ = t 1267 } 1268 sh.mu.RUnlock() 1269 } 1270 return typ 1271} 1272 1273func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { 1274 typmap := query.CallTypeMapper{} 1275 return typmap.CallType(name, args) 1276} 1277 1278func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { 1279 switch measurement.SystemIterator { 1280 case "_series": 1281 return a.createSeriesIterator(ctx, opt) 1282 } 1283 1284 itrs := make([]query.Iterator, 0, len(a)) 1285 for _, sh := range a { 1286 itr, err := sh.CreateIterator(ctx, measurement, opt) 1287 if err != nil { 1288 query.Iterators(itrs).Close() 1289 return nil, err 1290 } else if itr == nil { 1291 continue 1292 } 1293 itrs = append(itrs, itr) 1294 1295 select { 1296 case <-opt.InterruptCh: 1297 query.Iterators(itrs).Close() 1298 return nil, query.ErrQueryInterrupted 1299 default: 1300 } 1301 1302 // Enforce series limit at creation time. 
1303 if opt.MaxSeriesN > 0 { 1304 stats := itr.Stats() 1305 if stats.SeriesN > opt.MaxSeriesN { 1306 query.Iterators(itrs).Close() 1307 return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, opt.MaxSeriesN) 1308 } 1309 } 1310 } 1311 return query.Iterators(itrs).Merge(opt) 1312} 1313 1314func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOptions) (_ query.Iterator, err error) { 1315 var ( 1316 idxs = make([]Index, 0, len(a)) 1317 sfile *SeriesFile 1318 ) 1319 for _, sh := range a { 1320 var idx Index 1321 if idx, err = sh.Index(); err == nil { 1322 idxs = append(idxs, idx) 1323 } 1324 if sfile == nil { 1325 sfile, _ = sh.seriesFile() 1326 } 1327 } 1328 1329 if sfile == nil { 1330 return nil, nil 1331 } 1332 1333 return NewSeriesPointIterator(IndexSet{Indexes: idxs, SeriesFile: sfile}, opt) 1334} 1335 1336func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { 1337 var costs query.IteratorCost 1338 var costerr error 1339 var mu sync.RWMutex 1340 1341 setErr := func(err error) { 1342 mu.Lock() 1343 defer mu.Unlock() 1344 if costerr == nil { 1345 costerr = err 1346 } 1347 } 1348 1349 limit := limiter.NewFixed(runtime.GOMAXPROCS(0)) 1350 var wg sync.WaitGroup 1351 for _, sh := range a { 1352 limit.Take() 1353 wg.Add(1) 1354 1355 mu.RLock() 1356 if costerr != nil { 1357 mu.RUnlock() 1358 break 1359 } 1360 mu.RUnlock() 1361 1362 go func(sh *Shard) { 1363 defer limit.Release() 1364 defer wg.Done() 1365 1366 engine, err := sh.engine() 1367 if err != nil { 1368 setErr(err) 1369 return 1370 } 1371 1372 cost, err := engine.IteratorCost(measurement, opt) 1373 if err != nil { 1374 setErr(err) 1375 return 1376 } 1377 1378 mu.Lock() 1379 costs = costs.Combine(cost) 1380 mu.Unlock() 1381 }(sh) 1382 } 1383 wg.Wait() 1384 return costs, costerr 1385} 1386 1387func (a Shards) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (_ SeriesCursor, err 
error) { 1388 var ( 1389 idxs []Index 1390 sfile *SeriesFile 1391 ) 1392 for _, sh := range a { 1393 var idx Index 1394 if idx, err = sh.Index(); err == nil { 1395 idxs = append(idxs, idx) 1396 } 1397 if sfile == nil { 1398 sfile, _ = sh.seriesFile() 1399 } 1400 } 1401 1402 if sfile == nil { 1403 return nil, errors.New("CreateSeriesCursor: no series file") 1404 } 1405 1406 return newSeriesCursor(req, IndexSet{Indexes: idxs, SeriesFile: sfile}, cond) 1407} 1408 1409func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { 1410 // Use a map as a set to prevent duplicates. 1411 set := map[string]influxql.Source{} 1412 1413 // Iterate through every shard and expand the sources. 1414 for _, sh := range a { 1415 sh.mu.RLock() 1416 expanded, err := sh.expandSources(sources) 1417 sh.mu.RUnlock() 1418 if err != nil { 1419 return nil, err 1420 } 1421 1422 for _, src := range expanded { 1423 switch src := src.(type) { 1424 case *influxql.Measurement: 1425 set[src.String()] = src 1426 default: 1427 return nil, fmt.Errorf("Store.ExpandSources: unsupported source type: %T", src) 1428 } 1429 } 1430 } 1431 1432 // Convert set to sorted slice. 1433 names := make([]string, 0, len(set)) 1434 for name := range set { 1435 names = append(names, name) 1436 } 1437 sort.Strings(names) 1438 1439 // Convert set to a list of Sources. 1440 sorted := make([]influxql.Source, 0, len(set)) 1441 for _, name := range names { 1442 sorted = append(sorted, set[name]) 1443 } 1444 return sorted, nil 1445} 1446 1447// MeasurementFields holds the fields of a measurement and their codec. 1448type MeasurementFields struct { 1449 mu sync.RWMutex 1450 1451 fields map[string]*Field 1452} 1453 1454// NewMeasurementFields returns an initialised *MeasurementFields value. 
func NewMeasurementFields() *MeasurementFields {
	return &MeasurementFields{fields: make(map[string]*Field)}
}

// FieldKeys returns a sorted slice of the names of all fields on the
// measurement.
func (m *MeasurementFields) FieldKeys() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	a := make([]string, 0, len(m.fields))
	for key := range m.fields {
		a = append(a, key)
	}
	sort.Strings(a)
	return a
}

// bytes estimates the memory footprint of this MeasurementFields, in bytes.
func (m *MeasurementFields) bytes() int {
	var b int
	m.mu.RLock()
	b += 24 // mu RWMutex is 24 bytes
	b += int(unsafe.Sizeof(m.fields))
	for k, v := range m.fields {
		b += int(unsafe.Sizeof(k)) + len(k)
		b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name)
	}
	m.mu.RUnlock()
	return b
}

// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns ErrFieldTypeConflict if a field with the same name already exists
// with a different type; creating a field that already exists with the same
// type is a no-op.
//
// NOTE(review): despite the historical contract ("error if 255 fields have
// already been created"), no ErrFieldOverflow is returned here — the ID is
// truncated by the uint8 conversion and silently wraps past 255 fields.
// Confirm whether any caller still depends on field IDs being unique.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
	// Fast path: check for an existing field under the read lock.
	m.mu.RLock()

	// Ignore if the field already exists.
	if f := m.fields[string(name)]; f != nil {
		if f.Type != typ {
			m.mu.RUnlock()
			return ErrFieldTypeConflict
		}
		m.mu.RUnlock()
		return nil
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check field and type under write lock (another writer may have
	// created it between the RUnlock above and the Lock).
	if f := m.fields[string(name)]; f != nil {
		if f.Type != typ {
			return ErrFieldTypeConflict
		}
		return nil
	}

	// Create and append a new field.
	f := &Field{
		ID:   uint8(len(m.fields) + 1),
		Name: string(name),
		Type: typ,
	}
	m.fields[string(name)] = f

	return nil
}

// FieldN returns the number of fields on the measurement.
func (m *MeasurementFields) FieldN() int {
	m.mu.RLock()
	n := len(m.fields)
	m.mu.RUnlock()
	return n
}

// Field returns the field for name, or nil if there is no field for name.
func (m *MeasurementFields) Field(name string) *Field {
	m.mu.RLock()
	f := m.fields[name]
	m.mu.RUnlock()
	return f
}

// HasField reports whether a field named name exists on the measurement.
// It is safe to call on a nil receiver.
func (m *MeasurementFields) HasField(name string) bool {
	if m == nil {
		return false
	}
	m.mu.RLock()
	f := m.fields[name]
	m.mu.RUnlock()
	return f != nil
}

// FieldBytes returns the field for name, or nil if there is no field for name.
// FieldBytes should be preferred to Field when the caller has a []byte, because
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
	m.mu.RLock()
	f := m.fields[string(name)]
	m.mu.RUnlock()
	return f
}

// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Build a fresh map so the caller cannot mutate internal state.
	fields := make(map[string]influxql.DataType)
	for name, f := range m.fields {
		fields[name] = f.Type
	}
	return fields
}

// ForEachField invokes fn for every field on the measurement, stopping early
// if fn returns false. The read lock is held for the duration of the
// iteration.
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for name, f := range m.fields {
		if !fn(name, f.Type) {
			return
		}
	}
}

// Clone returns a copy of the MeasurementFields. The copy is shallow: the
// map is duplicated but the *Field values are shared with the original.
func (m *MeasurementFields) Clone() *MeasurementFields {
	m.mu.RLock()
	defer m.mu.RUnlock()
	fields := make(map[string]*Field, len(m.fields))
	for key, field := range m.fields {
		fields[key] = field
	}
	return &MeasurementFields{
		fields: fields,
	}
}

// MeasurementFieldSet represents a collection of fields by measurement.
// This is safe for concurrent use.
type MeasurementFieldSet struct {
	mu     sync.RWMutex
	fields map[string]*MeasurementFields

	// path is the location to persist field sets
	path string
}

// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet,
// loaded from the fields index file at path if one exists.
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
	fs := &MeasurementFieldSet{
		fields: make(map[string]*MeasurementFields),
		path:   path,
	}

	// If there is a load error, return the error and an empty set so
	// it can be rebuilt manually.
	return fs, fs.load()
}

// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
func (fs *MeasurementFieldSet) Bytes() int {
	var b int
	fs.mu.RLock()
	b += 24 // mu RWMutex is 24 bytes
	for k, v := range fs.fields {
		b += int(unsafe.Sizeof(k)) + len(k)
		b += int(unsafe.Sizeof(v)) + v.bytes()
	}
	b += int(unsafe.Sizeof(fs.fields))
	b += int(unsafe.Sizeof(fs.path)) + len(fs.path)
	fs.mu.RUnlock()
	return b
}

// Fields returns fields for a measurement by name.
func (fs *MeasurementFieldSet) Fields(name []byte) *MeasurementFields {
	fs.mu.RLock()
	mf := fs.fields[string(name)]
	fs.mu.RUnlock()
	return mf
}

// FieldsByString returns fields for a measurement by name.
func (fs *MeasurementFieldSet) FieldsByString(name string) *MeasurementFields {
	fs.mu.RLock()
	mf := fs.fields[name]
	fs.mu.RUnlock()
	return mf
}

// CreateFieldsIfNotExists returns fields for a measurement by name, creating
// an empty MeasurementFields entry if the measurement is unknown.
func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *MeasurementFields {
	// Fast path: look up under the read lock.
	fs.mu.RLock()
	mf := fs.fields[string(name)]
	fs.mu.RUnlock()

	if mf != nil {
		return mf
	}

	// Slow path: re-check under the write lock before creating, since
	// another writer may have created the entry in the meantime.
	fs.mu.Lock()
	mf = fs.fields[string(name)]
	if mf == nil {
		mf = NewMeasurementFields()
		fs.fields[string(name)] = mf
	}
	fs.mu.Unlock()
	return mf
}

// Delete removes a field set for a measurement.
func (fs *MeasurementFieldSet) Delete(name string) {
	fs.mu.Lock()
	delete(fs.fields, name)
	fs.mu.Unlock()
}

// DeleteWithLock executes fn and removes a field set from a measurement under lock.
1676func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error { 1677 fs.mu.Lock() 1678 defer fs.mu.Unlock() 1679 1680 if err := fn(); err != nil { 1681 return err 1682 } 1683 1684 delete(fs.fields, name) 1685 return nil 1686} 1687 1688func (fs *MeasurementFieldSet) IsEmpty() bool { 1689 fs.mu.RLock() 1690 defer fs.mu.RUnlock() 1691 return len(fs.fields) == 0 1692} 1693 1694func (fs *MeasurementFieldSet) Save() error { 1695 fs.mu.Lock() 1696 defer fs.mu.Unlock() 1697 1698 return fs.saveNoLock() 1699} 1700 1701func (fs *MeasurementFieldSet) saveNoLock() error { 1702 // No fields left, remove the fields index file 1703 if len(fs.fields) == 0 { 1704 return os.RemoveAll(fs.path) 1705 } 1706 1707 // Write the new index to a temp file and rename when it's sync'd 1708 path := fs.path + ".tmp" 1709 fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) 1710 if err != nil { 1711 return err 1712 } 1713 defer os.RemoveAll(path) 1714 1715 if _, err := fd.Write(fieldsIndexMagicNumber); err != nil { 1716 return err 1717 } 1718 1719 pb := internal.MeasurementFieldSet{ 1720 Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)), 1721 } 1722 for name, mf := range fs.fields { 1723 fs := &internal.MeasurementFields{ 1724 Name: name, 1725 Fields: make([]*internal.Field, 0, mf.FieldN()), 1726 } 1727 1728 mf.ForEachField(func(field string, typ influxql.DataType) bool { 1729 fs.Fields = append(fs.Fields, &internal.Field{Name: field, Type: int32(typ)}) 1730 return true 1731 }) 1732 1733 pb.Measurements = append(pb.Measurements, fs) 1734 } 1735 1736 b, err := proto.Marshal(&pb) 1737 if err != nil { 1738 return err 1739 } 1740 1741 if _, err := fd.Write(b); err != nil { 1742 return err 1743 } 1744 1745 if err = fd.Sync(); err != nil { 1746 return err 1747 } 1748 1749 //close file handle before renaming to support Windows 1750 if err = fd.Close(); err != nil { 1751 return err 1752 } 1753 1754 if err := file.RenameFile(path, 
fs.path); err != nil { 1755 return err 1756 } 1757 1758 return file.SyncDir(filepath.Dir(fs.path)) 1759} 1760 1761func (fs *MeasurementFieldSet) load() error { 1762 fs.mu.Lock() 1763 defer fs.mu.Unlock() 1764 1765 fd, err := os.Open(fs.path) 1766 if os.IsNotExist(err) { 1767 return nil 1768 } else if err != nil { 1769 return err 1770 } 1771 defer fd.Close() 1772 1773 var magic [4]byte 1774 if _, err := fd.Read(magic[:]); err != nil { 1775 return err 1776 } 1777 1778 if !bytes.Equal(magic[:], fieldsIndexMagicNumber) { 1779 return ErrUnknownFieldsFormat 1780 } 1781 1782 var pb internal.MeasurementFieldSet 1783 b, err := ioutil.ReadAll(fd) 1784 if err != nil { 1785 return err 1786 } 1787 1788 if err := proto.Unmarshal(b, &pb); err != nil { 1789 return err 1790 } 1791 1792 fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) 1793 for _, measurement := range pb.GetMeasurements() { 1794 set := &MeasurementFields{ 1795 fields: make(map[string]*Field, len(measurement.GetFields())), 1796 } 1797 for _, field := range measurement.GetFields() { 1798 set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())} 1799 } 1800 fs.fields[measurement.GetName()] = set 1801 } 1802 return nil 1803} 1804 1805// Field represents a series field. 1806type Field struct { 1807 ID uint8 `json:"id,omitempty"` 1808 Name string `json:"name,omitempty"` 1809 Type influxql.DataType `json:"type,omitempty"` 1810} 1811 1812// NewFieldKeysIterator returns an iterator that can be iterated over to 1813// retrieve field keys. 1814func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { 1815 itr := &fieldKeysIterator{shard: sh} 1816 1817 index, err := sh.Index() 1818 if err != nil { 1819 return nil, err 1820 } 1821 1822 // Retrieve measurements from shard. Filter if condition specified. 1823 // 1824 // FGA is currently not supported when retrieving field keys. 
1825 indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} 1826 names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition) 1827 if err != nil { 1828 return nil, err 1829 } 1830 itr.names = names 1831 1832 return itr, nil 1833} 1834 1835// fieldKeysIterator iterates over measurements and gets field keys from each measurement. 1836type fieldKeysIterator struct { 1837 shard *Shard 1838 names [][]byte // remaining measurement names 1839 buf struct { 1840 name []byte // current measurement name 1841 fields []Field // current measurement's fields 1842 } 1843} 1844 1845// Stats returns stats about the points processed. 1846func (itr *fieldKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } 1847 1848// Close closes the iterator. 1849func (itr *fieldKeysIterator) Close() error { return nil } 1850 1851// Next emits the next tag key name. 1852func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) { 1853 for { 1854 // If there are no more keys then move to the next measurements. 1855 if len(itr.buf.fields) == 0 { 1856 if len(itr.names) == 0 { 1857 return nil, nil 1858 } 1859 1860 itr.buf.name = itr.names[0] 1861 mf := itr.shard.MeasurementFields(itr.buf.name) 1862 if mf != nil { 1863 fset := mf.FieldSet() 1864 if len(fset) == 0 { 1865 itr.names = itr.names[1:] 1866 continue 1867 } 1868 1869 keys := make([]string, 0, len(fset)) 1870 for k := range fset { 1871 keys = append(keys, k) 1872 } 1873 sort.Strings(keys) 1874 1875 itr.buf.fields = make([]Field, len(keys)) 1876 for i, name := range keys { 1877 itr.buf.fields[i] = Field{Name: name, Type: fset[name]} 1878 } 1879 } 1880 itr.names = itr.names[1:] 1881 continue 1882 } 1883 1884 // Return next key. 
1885 field := itr.buf.fields[0] 1886 p := &query.FloatPoint{ 1887 Name: string(itr.buf.name), 1888 Aux: []interface{}{field.Name, field.Type.String()}, 1889 } 1890 itr.buf.fields = itr.buf.fields[1:] 1891 1892 return p, nil 1893 } 1894} 1895 1896// NewTagKeysIterator returns a new instance of TagKeysIterator. 1897func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { 1898 fn := func(name []byte) ([][]byte, error) { 1899 index, err := sh.Index() 1900 if err != nil { 1901 return nil, err 1902 } 1903 1904 indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} 1905 var keys [][]byte 1906 if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error { 1907 keys = append(keys, key) 1908 return nil 1909 }); err != nil { 1910 return nil, err 1911 } 1912 return keys, nil 1913 } 1914 return newMeasurementKeysIterator(sh, fn, opt) 1915} 1916 1917// measurementKeyFunc is the function called by measurementKeysIterator. 1918type measurementKeyFunc func(name []byte) ([][]byte, error) 1919 1920func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) { 1921 index, err := sh.Index() 1922 if err != nil { 1923 return nil, err 1924 } 1925 1926 indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} 1927 itr := &measurementKeysIterator{fn: fn} 1928 names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition) 1929 if err != nil { 1930 return nil, err 1931 } 1932 itr.names = names 1933 1934 return itr, nil 1935} 1936 1937// measurementKeysIterator iterates over measurements and gets keys from each measurement. 1938type measurementKeysIterator struct { 1939 names [][]byte // remaining measurement names 1940 buf struct { 1941 name []byte // current measurement name 1942 keys [][]byte // current measurement's keys 1943 } 1944 fn measurementKeyFunc 1945} 1946 1947// Stats returns stats about the points processed. 
1948func (itr *measurementKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } 1949 1950// Close closes the iterator. 1951func (itr *measurementKeysIterator) Close() error { return nil } 1952 1953// Next emits the next tag key name. 1954func (itr *measurementKeysIterator) Next() (*query.FloatPoint, error) { 1955 for { 1956 // If there are no more keys then move to the next measurements. 1957 if len(itr.buf.keys) == 0 { 1958 if len(itr.names) == 0 { 1959 return nil, nil 1960 } 1961 1962 itr.buf.name, itr.names = itr.names[0], itr.names[1:] 1963 1964 keys, err := itr.fn(itr.buf.name) 1965 if err != nil { 1966 return nil, err 1967 } 1968 itr.buf.keys = keys 1969 continue 1970 } 1971 1972 // Return next key. 1973 p := &query.FloatPoint{ 1974 Name: string(itr.buf.name), 1975 Aux: []interface{}{string(itr.buf.keys[0])}, 1976 } 1977 itr.buf.keys = itr.buf.keys[1:] 1978 1979 return p, nil 1980 } 1981} 1982 1983// LimitError represents an error caused by a configurable limit. 1984type LimitError struct { 1985 Reason string 1986} 1987 1988func (e *LimitError) Error() string { return e.Reason } 1989