1package tsi1 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "errors" 7 "io" 8 "sort" 9 "unsafe" 10 11 "github.com/influxdata/influxdb/pkg/estimator" 12 "github.com/influxdata/influxdb/pkg/estimator/hll" 13 "github.com/influxdata/influxdb/pkg/rhh" 14 "github.com/influxdata/influxdb/tsdb" 15) 16 17// MeasurementBlockVersion is the version of the measurement block. 18const MeasurementBlockVersion = 1 19 20// Measurement flag constants. 21const ( 22 MeasurementTombstoneFlag = 0x01 23 MeasurementSeriesIDSetFlag = 0x02 24) 25 26// Measurement field size constants. 27const ( 28 // 1 byte offset for the block to ensure non-zero offsets. 29 MeasurementFillSize = 1 30 31 // Measurement trailer fields 32 MeasurementTrailerSize = 0 + 33 2 + // version 34 8 + 8 + // data offset/size 35 8 + 8 + // hash index offset/size 36 8 + 8 + // measurement sketch offset/size 37 8 + 8 // tombstone measurement sketch offset/size 38 39 // Measurement key block fields. 40 MeasurementNSize = 8 41 MeasurementOffsetSize = 8 42 43 SeriesIDSize = 8 44) 45 46// Measurement errors. 47var ( 48 ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version") 49 ErrMeasurementBlockSizeMismatch = errors.New("measurement block size mismatch") 50) 51 52// MeasurementBlock represents a collection of all measurements in an index. 53type MeasurementBlock struct { 54 data []byte 55 hashData []byte 56 57 // Measurement sketch and tombstone sketch for cardinality estimation. 58 sketchData, tSketchData []byte 59 60 version int // block version 61} 62 63// bytes estimates the memory footprint of this MeasurementBlock, in bytes. 64func (blk *MeasurementBlock) bytes() int { 65 var b int 66 // Do not count contents of blk.data or blk.hashData because they reference into an external []byte 67 b += int(unsafe.Sizeof(*blk)) 68 return b 69} 70 71// Version returns the encoding version parsed from the data. 72// Only valid after UnmarshalBinary() has been successfully invoked. 73func (blk *MeasurementBlock) Version() int { return blk.version } 74 75// Elem returns an element for a measurement. 76func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) { 77 n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize])) 78 hash := rhh.HashKey(name) 79 pos := hash % n 80 81 // Track current distance 82 var d int64 83 for { 84 // Find offset of measurement. 85 offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):]) 86 if offset == 0 { 87 return MeasurementBlockElem{}, false 88 } 89 90 // Evaluate name if offset is not empty. 91 if offset > 0 { 92 // Parse into element. 93 var e MeasurementBlockElem 94 e.UnmarshalBinary(blk.data[offset:]) 95 96 // Return if name match. 97 if bytes.Equal(e.name, name) { 98 return e, true 99 } 100 101 // Check if we've exceeded the probe distance. 102 if d > rhh.Dist(rhh.HashKey(e.name), pos, n) { 103 return MeasurementBlockElem{}, false 104 } 105 } 106 107 // Move position forward. 108 pos = (pos + 1) % n 109 d++ 110 111 if d > n { 112 return MeasurementBlockElem{}, false 113 } 114 } 115} 116 117// UnmarshalBinary unpacks data into the block. Block is not copied so data 118// should be retained and unchanged after being passed into this function. 119func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error { 120 // Read trailer. 121 t, err := ReadMeasurementBlockTrailer(data) 122 if err != nil { 123 return err 124 } 125 126 // Save data section. 127 blk.data = data[t.Data.Offset:] 128 blk.data = blk.data[:t.Data.Size] 129 130 // Save hash index block. 131 blk.hashData = data[t.HashIndex.Offset:] 132 blk.hashData = blk.hashData[:t.HashIndex.Size] 133 134 // Initialise sketch data. 135 blk.sketchData = data[t.Sketch.Offset:][:t.Sketch.Size] 136 blk.tSketchData = data[t.TSketch.Offset:][:t.TSketch.Size] 137 138 return nil 139} 140 141// Iterator returns an iterator over all measurements. 142func (blk *MeasurementBlock) Iterator() MeasurementIterator { 143 return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]} 144} 145 146// SeriesIDIterator returns an iterator for all series ids in a measurement. 147func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator { 148 // Find measurement element. 149 e, ok := blk.Elem(name) 150 if !ok { 151 return &rawSeriesIDIterator{} 152 } 153 if e.seriesIDSet != nil { 154 return tsdb.NewSeriesIDSetIterator(e.seriesIDSet) 155 } 156 return &rawSeriesIDIterator{n: e.series.n, data: e.series.data} 157} 158 159// Sketches returns existence and tombstone measurement sketches. 160func (blk *MeasurementBlock) Sketches() (sketch, tSketch estimator.Sketch, err error) { 161 sketch = hll.NewDefaultPlus() 162 if err := sketch.UnmarshalBinary(blk.sketchData); err != nil { 163 return nil, nil, err 164 } 165 166 tSketch = hll.NewDefaultPlus() 167 if err := tSketch.UnmarshalBinary(blk.tSketchData); err != nil { 168 return nil, nil, err 169 } 170 return sketch, tSketch, nil 171} 172 173// blockMeasurementIterator iterates over a list measurements in a block. 174type blockMeasurementIterator struct { 175 elem MeasurementBlockElem 176 data []byte 177} 178 179// Next returns the next measurement. Returns nil when iterator is complete. 180func (itr *blockMeasurementIterator) Next() MeasurementElem { 181 // Return nil when we run out of data. 182 if len(itr.data) == 0 { 183 return nil 184 } 185 186 // Unmarshal the element at the current position. 187 itr.elem.UnmarshalBinary(itr.data) 188 189 // Move the data forward past the record. 190 itr.data = itr.data[itr.elem.size:] 191 192 return &itr.elem 193} 194 195// rawSeriesIterator iterates over a list of raw series data. 196type rawSeriesIDIterator struct { 197 prev uint64 198 n uint64 199 data []byte 200} 201 202func (itr *rawSeriesIDIterator) Close() error { return nil } 203 204// Next returns the next decoded series. 205func (itr *rawSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) { 206 if len(itr.data) == 0 { 207 return tsdb.SeriesIDElem{}, nil 208 } 209 210 delta, n, err := uvarint(itr.data) 211 if err != nil { 212 return tsdb.SeriesIDElem{}, err 213 } 214 itr.data = itr.data[n:] 215 216 seriesID := itr.prev + uint64(delta) 217 itr.prev = seriesID 218 return tsdb.SeriesIDElem{SeriesID: seriesID}, nil 219} 220 221func (itr *rawSeriesIDIterator) SeriesIDSet() *tsdb.SeriesIDSet { 222 ss := tsdb.NewSeriesIDSet() 223 for data, prev := itr.data, uint64(0); len(data) > 0; { 224 delta, n, err := uvarint(data) 225 if err != nil { 226 break 227 } 228 data = data[n:] 229 230 seriesID := prev + uint64(delta) 231 prev = seriesID 232 ss.AddNoLock(seriesID) 233 } 234 return ss 235} 236 237// MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock. 238type MeasurementBlockTrailer struct { 239 Version int // Encoding version 240 241 // Offset & size of data section. 242 Data struct { 243 Offset int64 244 Size int64 245 } 246 247 // Offset & size of hash map section. 248 HashIndex struct { 249 Offset int64 250 Size int64 251 } 252 253 // Offset and size of cardinality sketch for measurements. 254 Sketch struct { 255 Offset int64 256 Size int64 257 } 258 259 // Offset and size of cardinality sketch for tombstoned measurements. 260 TSketch struct { 261 Offset int64 262 Size int64 263 } 264} 265 266// ReadMeasurementBlockTrailer returns the block trailer from data. 267func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) { 268 var t MeasurementBlockTrailer 269 270 // Read version (which is located in the last two bytes of the trailer). 271 t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:])) 272 if t.Version != MeasurementBlockVersion { 273 return t, ErrUnsupportedIndexFileVersion 274 } 275 276 // Slice trailer data. 277 buf := data[len(data)-MeasurementTrailerSize:] 278 279 // Read data section info. 280 t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 281 t.Data.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 282 283 // Read measurement block info. 284 t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 285 t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 286 287 // Read measurement sketch info. 288 t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 289 t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 290 291 // Read tombstone measurement sketch info. 292 t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 293 t.TSketch.Size = int64(binary.BigEndian.Uint64(buf[0:8])) 294 295 return t, nil 296} 297 298// WriteTo writes the trailer to w. 299func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { 300 // Write data section info. 301 if err := writeUint64To(w, uint64(t.Data.Offset), &n); err != nil { 302 return n, err 303 } else if err := writeUint64To(w, uint64(t.Data.Size), &n); err != nil { 304 return n, err 305 } 306 307 // Write hash index section info. 308 if err := writeUint64To(w, uint64(t.HashIndex.Offset), &n); err != nil { 309 return n, err 310 } else if err := writeUint64To(w, uint64(t.HashIndex.Size), &n); err != nil { 311 return n, err 312 } 313 314 // Write measurement sketch info. 315 if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { 316 return n, err 317 } else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil { 318 return n, err 319 } 320 321 // Write tombstone measurement sketch info. 322 if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil { 323 return n, err 324 } else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil { 325 return n, err 326 } 327 328 // Write measurement block version. 329 if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil { 330 return n, err 331 } 332 333 return n, nil 334} 335 336// MeasurementBlockElem represents an internal measurement element. 337type MeasurementBlockElem struct { 338 flag byte // flag 339 name []byte // measurement name 340 341 tagBlock struct { 342 offset int64 343 size int64 344 } 345 346 series struct { 347 n uint64 // series count 348 data []byte // serialized series data 349 } 350 351 seriesIDSet *tsdb.SeriesIDSet 352 353 // size in bytes, set after unmarshaling. 354 size int 355} 356 357// Name returns the measurement name. 358func (e *MeasurementBlockElem) Name() []byte { return e.name } 359 360// Deleted returns true if the tombstone flag is set. 361func (e *MeasurementBlockElem) Deleted() bool { 362 return (e.flag & MeasurementTombstoneFlag) != 0 363} 364 365// TagBlockOffset returns the offset of the measurement's tag block. 366func (e *MeasurementBlockElem) TagBlockOffset() int64 { return e.tagBlock.offset } 367 368// TagBlockSize returns the size of the measurement's tag block. 369func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size } 370 371// SeriesData returns the raw series data. 372func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data } 373 374// SeriesN returns the number of series associated with the measurement. 375func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n } 376 377// SeriesID returns series ID at an index. 378func (e *MeasurementBlockElem) SeriesID(i int) uint64 { 379 return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) 380} 381 382func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 } 383 384// SeriesIDs returns a list of decoded series ids. 385// 386// NOTE: This should be used for testing and diagnostics purposes only. 387// It requires loading the entire list of series in-memory. 388func (e *MeasurementBlockElem) SeriesIDs() []uint64 { 389 a := make([]uint64, 0, e.series.n) 390 e.ForEachSeriesID(func(id uint64) error { 391 a = append(a, id) 392 return nil 393 }) 394 return a 395} 396 397func (e *MeasurementBlockElem) ForEachSeriesID(fn func(uint64) error) error { 398 // Read from roaring, if available. 399 if e.seriesIDSet != nil { 400 itr := e.seriesIDSet.Iterator() 401 for itr.HasNext() { 402 if err := fn(uint64(itr.Next())); err != nil { 403 return err 404 } 405 } 406 } 407 408 // Read from uvarint encoded data, if available. 409 var prev uint64 410 for data := e.series.data; len(data) > 0; { 411 delta, n, err := uvarint(data) 412 if err != nil { 413 return err 414 } 415 data = data[n:] 416 417 seriesID := prev + uint64(delta) 418 if err = fn(seriesID); err != nil { 419 return err 420 } 421 prev = seriesID 422 } 423 return nil 424} 425 426// Size returns the size of the element. 427func (e *MeasurementBlockElem) Size() int { return e.size } 428 429// UnmarshalBinary unmarshals data into e. 430func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { 431 start := len(data) 432 433 // Parse flag data. 434 e.flag, data = data[0], data[1:] 435 436 // Parse tag block offset. 437 e.tagBlock.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:] 438 e.tagBlock.size, data = int64(binary.BigEndian.Uint64(data)), data[8:] 439 440 // Parse name. 441 sz, n, err := uvarint(data) 442 if err != nil { 443 return err 444 } 445 e.name, data = data[n:n+int(sz)], data[n+int(sz):] 446 447 // Parse series count. 448 v, n, err := uvarint(data) 449 if err != nil { 450 return err 451 } 452 e.series.n, data = uint64(v), data[n:] 453 454 // Parse series data size. 455 sz, n, err = uvarint(data) 456 if err != nil { 457 return err 458 } 459 data = data[n:] 460 461 // Parse series data (original uvarint encoded or roaring bitmap). 462 if e.flag&MeasurementSeriesIDSetFlag == 0 { 463 e.series.data, data = data[:sz], data[sz:] 464 } else { 465 // data = memalign(data) 466 e.seriesIDSet = tsdb.NewSeriesIDSet() 467 if err = e.seriesIDSet.UnmarshalBinaryUnsafe(data[:sz]); err != nil { 468 return err 469 } 470 data = data[sz:] 471 } 472 473 // Save length of elem. 474 e.size = start - len(data) 475 476 return nil 477} 478 479// MeasurementBlockWriter writes a measurement block. 480type MeasurementBlockWriter struct { 481 buf bytes.Buffer 482 mms map[string]measurement 483 484 // Measurement sketch and tombstoned measurement sketch. 485 sketch, tSketch estimator.Sketch 486} 487 488// NewMeasurementBlockWriter returns a new MeasurementBlockWriter. 489func NewMeasurementBlockWriter() *MeasurementBlockWriter { 490 return &MeasurementBlockWriter{ 491 mms: make(map[string]measurement), 492 sketch: hll.NewDefaultPlus(), 493 tSketch: hll.NewDefaultPlus(), 494 } 495} 496 497// Add adds a measurement with series and tag set offset/size. 498func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint64) { 499 mm := mw.mms[string(name)] 500 mm.deleted = deleted 501 mm.tagBlock.offset = offset 502 mm.tagBlock.size = size 503 504 if mm.seriesIDSet == nil { 505 mm.seriesIDSet = tsdb.NewSeriesIDSet() 506 } 507 for _, seriesID := range seriesIDs { 508 mm.seriesIDSet.AddNoLock(seriesID) 509 } 510 511 mw.mms[string(name)] = mm 512 513 if deleted { 514 mw.tSketch.Add(name) 515 } else { 516 mw.sketch.Add(name) 517 } 518} 519 520// WriteTo encodes the measurements to w. 521func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) { 522 var t MeasurementBlockTrailer 523 524 // The sketches must be set before calling WriteTo. 525 if mw.sketch == nil { 526 return 0, errors.New("measurement sketch not set") 527 } else if mw.tSketch == nil { 528 return 0, errors.New("measurement tombstone sketch not set") 529 } 530 531 // Sort names. 532 names := make([]string, 0, len(mw.mms)) 533 for name := range mw.mms { 534 names = append(names, name) 535 } 536 sort.Strings(names) 537 538 // Begin data section. 539 t.Data.Offset = n 540 541 // Write padding byte so no offsets are zero. 542 if err := writeUint8To(w, 0, &n); err != nil { 543 return n, err 544 } 545 546 // Encode key list. 547 for _, name := range names { 548 // Retrieve measurement and save offset. 549 mm := mw.mms[name] 550 mm.offset = n 551 mw.mms[name] = mm 552 553 // Write measurement 554 if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil { 555 return n, err 556 } 557 } 558 t.Data.Size = n - t.Data.Offset 559 560 // Build key hash map 561 m := rhh.NewHashMap(rhh.Options{ 562 Capacity: int64(len(names)), 563 LoadFactor: LoadFactor, 564 }) 565 for name := range mw.mms { 566 mm := mw.mms[name] 567 m.Put([]byte(name), &mm) 568 } 569 570 t.HashIndex.Offset = n 571 572 // Encode hash map length. 573 if err := writeUint64To(w, uint64(m.Cap()), &n); err != nil { 574 return n, err 575 } 576 577 // Encode hash map offset entries. 578 for i := int64(0); i < m.Cap(); i++ { 579 _, v := m.Elem(i) 580 581 var offset int64 582 if mm, ok := v.(*measurement); ok { 583 offset = mm.offset 584 } 585 586 if err := writeUint64To(w, uint64(offset), &n); err != nil { 587 return n, err 588 } 589 } 590 t.HashIndex.Size = n - t.HashIndex.Offset 591 592 // Write the sketches out. 593 t.Sketch.Offset = n 594 if err := writeSketchTo(w, mw.sketch, &n); err != nil { 595 return n, err 596 } 597 t.Sketch.Size = n - t.Sketch.Offset 598 599 t.TSketch.Offset = n 600 if err := writeSketchTo(w, mw.tSketch, &n); err != nil { 601 return n, err 602 } 603 t.TSketch.Size = n - t.TSketch.Offset 604 605 // Write trailer. 606 nn, err := t.WriteTo(w) 607 n += nn 608 return n, err 609} 610 611// writeMeasurementTo encodes a single measurement entry into w. 612func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, mm *measurement, n *int64) error { 613 // Write flag & tag block offset. 614 if err := writeUint8To(w, mm.flag(), n); err != nil { 615 return err 616 } 617 if err := writeUint64To(w, uint64(mm.tagBlock.offset), n); err != nil { 618 return err 619 } else if err := writeUint64To(w, uint64(mm.tagBlock.size), n); err != nil { 620 return err 621 } 622 623 // Write measurement name. 624 if err := writeUvarintTo(w, uint64(len(name)), n); err != nil { 625 return err 626 } 627 if err := writeTo(w, name, n); err != nil { 628 return err 629 } 630 631 // Write series data to buffer. 632 mw.buf.Reset() 633 if _, err := mm.seriesIDSet.WriteTo(&mw.buf); err != nil { 634 return err 635 } 636 637 // Write series count. 638 if err := writeUvarintTo(w, mm.seriesIDSet.Cardinality(), n); err != nil { 639 return err 640 } 641 642 // Write data size & buffer. 643 if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil { 644 return err 645 } 646 647 // Word align bitmap data. 648 // if offset := (*n) % 8; offset != 0 { 649 // if err := writeTo(w, make([]byte, 8-offset), n); err != nil { 650 // return err 651 // } 652 // } 653 654 nn, err := mw.buf.WriteTo(w) 655 *n += nn 656 return err 657} 658 659// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes 660// written via n. 661func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error { 662 data, err := s.MarshalBinary() 663 if err != nil { 664 return err 665 } 666 667 nn, err := w.Write(data) 668 *n += int64(nn) 669 return err 670} 671 672type measurement struct { 673 deleted bool 674 tagBlock struct { 675 offset int64 676 size int64 677 } 678 seriesIDSet *tsdb.SeriesIDSet 679 offset int64 680} 681 682func (mm measurement) flag() byte { 683 flag := byte(MeasurementSeriesIDSetFlag) 684 if mm.deleted { 685 flag |= MeasurementTombstoneFlag 686 } 687 return flag 688} 689