package tsi1

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"sort"
	"unsafe"

	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/estimator/hll"
	"github.com/influxdata/influxdb/pkg/rhh"
	"github.com/influxdata/influxdb/tsdb"
)

// MeasurementBlockVersion is the version of the measurement block.
// It is stored in the last two bytes of the block trailer.
const MeasurementBlockVersion = 1

// Measurement flag constants.
const (
	// MeasurementTombstoneFlag is set in an element's flag byte when the
	// measurement has been deleted.
	MeasurementTombstoneFlag = 0x01
)

// Measurement field size constants.
const (
	// 1 byte offset for the block to ensure non-zero offsets.
	// A zero entry in the hash index marks an empty slot, so no element may
	// start at offset 0 of the data section.
	MeasurementFillSize = 1

	// Measurement trailer fields: the total size in bytes of the fixed-size
	// trailer at the end of every measurement block.
	MeasurementTrailerSize = 0 +
		2 + // version
		8 + 8 + // data offset/size
		8 + 8 + // hash index offset/size
		8 + 8 + // measurement sketch offset/size
		8 + 8 // tombstone measurement sketch offset/size

	// Measurement key block fields.
	// MeasurementNSize is the width of the slot count at the start of the
	// hash index; MeasurementOffsetSize is the width of each slot's offset.
	MeasurementNSize      = 8
	MeasurementOffsetSize = 8

	// SeriesIDSize is the width in bytes of a fixed-width (uint64) series ID.
	SeriesIDSize = 8
)

// Measurement errors.
var (
	ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version")
	ErrMeasurementBlockSizeMismatch       = errors.New("measurement block size mismatch")
)

// MeasurementBlock represents a collection of all measurements in an index.
//
// Its slices reference the buffer passed to UnmarshalBinary rather than
// copies of it.
type MeasurementBlock struct {
	data     []byte // data section: padding byte, then encoded elements
	hashData []byte // hash index: slot count, then one element offset per slot

	// Measurement sketch and tombstone sketch for cardinality estimation.
	sketchData, tSketchData []byte

	version int // block version
}

// bytes estimates the memory footprint of this MeasurementBlock, in bytes.
// Only the struct itself is counted.
func (blk *MeasurementBlock) bytes() int {
	var b int
	// Do not count contents of blk.data, blk.hashData or the sketch slices
	// because they reference into an external []byte.
	b += int(unsafe.Sizeof(*blk))
	return b
}

// Version returns the encoding version parsed from the data.
// Only valid after UnmarshalBinary() has been successfully invoked.
72func (blk *MeasurementBlock) Version() int { return blk.version } 73 74// Elem returns an element for a measurement. 75func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) { 76 n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize])) 77 hash := rhh.HashKey(name) 78 pos := hash % n 79 80 // Track current distance 81 var d int64 82 for { 83 // Find offset of measurement. 84 offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):]) 85 if offset == 0 { 86 return MeasurementBlockElem{}, false 87 } 88 89 // Evaluate name if offset is not empty. 90 if offset > 0 { 91 // Parse into element. 92 var e MeasurementBlockElem 93 e.UnmarshalBinary(blk.data[offset:]) 94 95 // Return if name match. 96 if bytes.Equal(e.name, name) { 97 return e, true 98 } 99 100 // Check if we've exceeded the probe distance. 101 if d > rhh.Dist(rhh.HashKey(e.name), pos, n) { 102 return MeasurementBlockElem{}, false 103 } 104 } 105 106 // Move position forward. 107 pos = (pos + 1) % n 108 d++ 109 110 if d > n { 111 return MeasurementBlockElem{}, false 112 } 113 } 114} 115 116// UnmarshalBinary unpacks data into the block. Block is not copied so data 117// should be retained and unchanged after being passed into this function. 118func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error { 119 // Read trailer. 120 t, err := ReadMeasurementBlockTrailer(data) 121 if err != nil { 122 return err 123 } 124 125 // Save data section. 126 blk.data = data[t.Data.Offset:] 127 blk.data = blk.data[:t.Data.Size] 128 129 // Save hash index block. 130 blk.hashData = data[t.HashIndex.Offset:] 131 blk.hashData = blk.hashData[:t.HashIndex.Size] 132 133 // Initialise sketch data. 134 blk.sketchData = data[t.Sketch.Offset:][:t.Sketch.Size] 135 blk.tSketchData = data[t.TSketch.Offset:][:t.TSketch.Size] 136 137 return nil 138} 139 140// Iterator returns an iterator over all measurements. 
141func (blk *MeasurementBlock) Iterator() MeasurementIterator { 142 return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]} 143} 144 145// SeriesIDIterator returns an iterator for all series ids in a measurement. 146func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator { 147 // Find measurement element. 148 e, ok := blk.Elem(name) 149 if !ok { 150 return &rawSeriesIDIterator{} 151 } 152 return &rawSeriesIDIterator{n: e.series.n, data: e.series.data} 153} 154 155// Sketches returns existence and tombstone measurement sketches. 156func (blk *MeasurementBlock) Sketches() (sketch, tSketch estimator.Sketch, err error) { 157 sketch = hll.NewDefaultPlus() 158 if err := sketch.UnmarshalBinary(blk.sketchData); err != nil { 159 return nil, nil, err 160 } 161 162 tSketch = hll.NewDefaultPlus() 163 if err := tSketch.UnmarshalBinary(blk.tSketchData); err != nil { 164 return nil, nil, err 165 } 166 return sketch, tSketch, nil 167} 168 169// blockMeasurementIterator iterates over a list measurements in a block. 170type blockMeasurementIterator struct { 171 elem MeasurementBlockElem 172 data []byte 173} 174 175// Next returns the next measurement. Returns nil when iterator is complete. 176func (itr *blockMeasurementIterator) Next() MeasurementElem { 177 // Return nil when we run out of data. 178 if len(itr.data) == 0 { 179 return nil 180 } 181 182 // Unmarshal the element at the current position. 183 itr.elem.UnmarshalBinary(itr.data) 184 185 // Move the data forward past the record. 186 itr.data = itr.data[itr.elem.size:] 187 188 return &itr.elem 189} 190 191// rawSeriesIterator iterates over a list of raw series data. 192type rawSeriesIDIterator struct { 193 prev uint64 194 n uint64 195 data []byte 196} 197 198func (itr *rawSeriesIDIterator) Close() error { return nil } 199 200// Next returns the next decoded series. 
201func (itr *rawSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) { 202 if len(itr.data) == 0 { 203 return tsdb.SeriesIDElem{}, nil 204 } 205 206 delta, n, err := uvarint(itr.data) 207 if err != nil { 208 return tsdb.SeriesIDElem{}, err 209 } 210 itr.data = itr.data[n:] 211 212 seriesID := itr.prev + uint64(delta) 213 itr.prev = seriesID 214 return tsdb.SeriesIDElem{SeriesID: seriesID}, nil 215} 216 217func (itr *rawSeriesIDIterator) SeriesIDSet() *tsdb.SeriesIDSet { 218 ss := tsdb.NewSeriesIDSet() 219 for data, prev := itr.data, uint64(0); len(data) > 0; { 220 delta, n, err := uvarint(data) 221 if err != nil { 222 break 223 } 224 data = data[n:] 225 226 seriesID := prev + uint64(delta) 227 prev = seriesID 228 ss.AddNoLock(seriesID) 229 } 230 return ss 231} 232 233// MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock. 234type MeasurementBlockTrailer struct { 235 Version int // Encoding version 236 237 // Offset & size of data section. 238 Data struct { 239 Offset int64 240 Size int64 241 } 242 243 // Offset & size of hash map section. 244 HashIndex struct { 245 Offset int64 246 Size int64 247 } 248 249 // Offset and size of cardinality sketch for measurements. 250 Sketch struct { 251 Offset int64 252 Size int64 253 } 254 255 // Offset and size of cardinality sketch for tombstoned measurements. 256 TSketch struct { 257 Offset int64 258 Size int64 259 } 260} 261 262// ReadMeasurementBlockTrailer returns the block trailer from data. 263func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) { 264 var t MeasurementBlockTrailer 265 266 // Read version (which is located in the last two bytes of the trailer). 267 t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:])) 268 if t.Version != MeasurementBlockVersion { 269 return t, ErrUnsupportedIndexFileVersion 270 } 271 272 // Slice trailer data. 273 buf := data[len(data)-MeasurementTrailerSize:] 274 275 // Read data section info. 
276 t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 277 t.Data.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 278 279 // Read measurement block info. 280 t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 281 t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 282 283 // Read measurement sketch info. 284 t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 285 t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 286 287 // Read tombstone measurement sketch info. 288 t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] 289 t.TSketch.Size = int64(binary.BigEndian.Uint64(buf[0:8])) 290 291 return t, nil 292} 293 294// WriteTo writes the trailer to w. 295func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { 296 // Write data section info. 297 if err := writeUint64To(w, uint64(t.Data.Offset), &n); err != nil { 298 return n, err 299 } else if err := writeUint64To(w, uint64(t.Data.Size), &n); err != nil { 300 return n, err 301 } 302 303 // Write hash index section info. 304 if err := writeUint64To(w, uint64(t.HashIndex.Offset), &n); err != nil { 305 return n, err 306 } else if err := writeUint64To(w, uint64(t.HashIndex.Size), &n); err != nil { 307 return n, err 308 } 309 310 // Write measurement sketch info. 311 if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { 312 return n, err 313 } else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil { 314 return n, err 315 } 316 317 // Write tombstone measurement sketch info. 318 if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil { 319 return n, err 320 } else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil { 321 return n, err 322 } 323 324 // Write measurement block version. 
325 if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil { 326 return n, err 327 } 328 329 return n, nil 330} 331 332// MeasurementBlockElem represents an internal measurement element. 333type MeasurementBlockElem struct { 334 flag byte // flag 335 name []byte // measurement name 336 337 tagBlock struct { 338 offset int64 339 size int64 340 } 341 342 series struct { 343 n uint64 // series count 344 data []byte // serialized series data 345 } 346 347 // size in bytes, set after unmarshaling. 348 size int 349} 350 351// Name returns the measurement name. 352func (e *MeasurementBlockElem) Name() []byte { return e.name } 353 354// Deleted returns true if the tombstone flag is set. 355func (e *MeasurementBlockElem) Deleted() bool { 356 return (e.flag & MeasurementTombstoneFlag) != 0 357} 358 359// TagBlockOffset returns the offset of the measurement's tag block. 360func (e *MeasurementBlockElem) TagBlockOffset() int64 { return e.tagBlock.offset } 361 362// TagBlockSize returns the size of the measurement's tag block. 363func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size } 364 365// SeriesData returns the raw series data. 366func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data } 367 368// SeriesN returns the number of series associated with the measurement. 369func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n } 370 371// SeriesID returns series ID at an index. 372func (e *MeasurementBlockElem) SeriesID(i int) uint64 { 373 return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) 374} 375 376func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 } 377 378// SeriesIDs returns a list of decoded series ids. 379// 380// NOTE: This should be used for testing and diagnostics purposes only. 381// It requires loading the entire list of series in-memory. 
382func (e *MeasurementBlockElem) SeriesIDs() []uint64 { 383 a := make([]uint64, 0, e.series.n) 384 e.ForEachSeriesID(func(id uint64) error { 385 a = append(a, id) 386 return nil 387 }) 388 return a 389} 390 391func (e *MeasurementBlockElem) ForEachSeriesID(fn func(uint64) error) error { 392 var prev uint64 393 for data := e.series.data; len(data) > 0; { 394 delta, n, err := uvarint(data) 395 if err != nil { 396 return err 397 } 398 data = data[n:] 399 400 seriesID := prev + uint64(delta) 401 if err = fn(seriesID); err != nil { 402 return err 403 } 404 prev = seriesID 405 } 406 return nil 407} 408 409// Size returns the size of the element. 410func (e *MeasurementBlockElem) Size() int { return e.size } 411 412// UnmarshalBinary unmarshals data into e. 413func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { 414 start := len(data) 415 416 // Parse flag data. 417 e.flag, data = data[0], data[1:] 418 419 // Parse tag block offset. 420 e.tagBlock.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:] 421 e.tagBlock.size, data = int64(binary.BigEndian.Uint64(data)), data[8:] 422 423 // Parse name. 424 sz, n, err := uvarint(data) 425 if err != nil { 426 return err 427 } 428 e.name, data = data[n:n+int(sz)], data[n+int(sz):] 429 430 // Parse series data. 431 v, n, err := uvarint(data) 432 if err != nil { 433 return err 434 } 435 e.series.n, data = uint64(v), data[n:] 436 sz, n, err = uvarint(data) 437 if err != nil { 438 return err 439 } 440 data = data[n:] 441 e.series.data, data = data[:sz], data[sz:] 442 443 // Save length of elem. 444 e.size = start - len(data) 445 446 return nil 447} 448 449// MeasurementBlockWriter writes a measurement block. 450type MeasurementBlockWriter struct { 451 buf bytes.Buffer 452 mms map[string]measurement 453 454 // Measurement sketch and tombstoned measurement sketch. 455 sketch, tSketch estimator.Sketch 456} 457 458// NewMeasurementBlockWriter returns a new MeasurementBlockWriter. 
459func NewMeasurementBlockWriter() *MeasurementBlockWriter { 460 return &MeasurementBlockWriter{ 461 mms: make(map[string]measurement), 462 sketch: hll.NewDefaultPlus(), 463 tSketch: hll.NewDefaultPlus(), 464 } 465} 466 467// Add adds a measurement with series and tag set offset/size. 468func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint64) { 469 mm := mw.mms[string(name)] 470 mm.deleted = deleted 471 mm.tagBlock.offset = offset 472 mm.tagBlock.size = size 473 mm.seriesIDs = seriesIDs 474 mw.mms[string(name)] = mm 475 476 if deleted { 477 mw.tSketch.Add(name) 478 } else { 479 mw.sketch.Add(name) 480 } 481} 482 483// WriteTo encodes the measurements to w. 484func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) { 485 var t MeasurementBlockTrailer 486 487 // The sketches must be set before calling WriteTo. 488 if mw.sketch == nil { 489 return 0, errors.New("measurement sketch not set") 490 } else if mw.tSketch == nil { 491 return 0, errors.New("measurement tombstone sketch not set") 492 } 493 494 // Sort names. 495 names := make([]string, 0, len(mw.mms)) 496 for name := range mw.mms { 497 names = append(names, name) 498 } 499 sort.Strings(names) 500 501 // Begin data section. 502 t.Data.Offset = n 503 504 // Write padding byte so no offsets are zero. 505 if err := writeUint8To(w, 0, &n); err != nil { 506 return n, err 507 } 508 509 // Encode key list. 510 for _, name := range names { 511 // Retrieve measurement and save offset. 
512 mm := mw.mms[name] 513 mm.offset = n 514 mw.mms[name] = mm 515 516 // Write measurement 517 if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil { 518 return n, err 519 } 520 } 521 t.Data.Size = n - t.Data.Offset 522 523 // Build key hash map 524 m := rhh.NewHashMap(rhh.Options{ 525 Capacity: int64(len(names)), 526 LoadFactor: LoadFactor, 527 }) 528 for name := range mw.mms { 529 mm := mw.mms[name] 530 m.Put([]byte(name), &mm) 531 } 532 533 t.HashIndex.Offset = n 534 535 // Encode hash map length. 536 if err := writeUint64To(w, uint64(m.Cap()), &n); err != nil { 537 return n, err 538 } 539 540 // Encode hash map offset entries. 541 for i := int64(0); i < m.Cap(); i++ { 542 _, v := m.Elem(i) 543 544 var offset int64 545 if mm, ok := v.(*measurement); ok { 546 offset = mm.offset 547 } 548 549 if err := writeUint64To(w, uint64(offset), &n); err != nil { 550 return n, err 551 } 552 } 553 t.HashIndex.Size = n - t.HashIndex.Offset 554 555 // Write the sketches out. 556 t.Sketch.Offset = n 557 if err := writeSketchTo(w, mw.sketch, &n); err != nil { 558 return n, err 559 } 560 t.Sketch.Size = n - t.Sketch.Offset 561 562 t.TSketch.Offset = n 563 if err := writeSketchTo(w, mw.tSketch, &n); err != nil { 564 return n, err 565 } 566 t.TSketch.Size = n - t.TSketch.Offset 567 568 // Write trailer. 569 nn, err := t.WriteTo(w) 570 n += nn 571 return n, err 572} 573 574// writeMeasurementTo encodes a single measurement entry into w. 575func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, mm *measurement, n *int64) error { 576 // Write flag & tag block offset. 577 if err := writeUint8To(w, mm.flag(), n); err != nil { 578 return err 579 } 580 if err := writeUint64To(w, uint64(mm.tagBlock.offset), n); err != nil { 581 return err 582 } else if err := writeUint64To(w, uint64(mm.tagBlock.size), n); err != nil { 583 return err 584 } 585 586 // Write measurement name. 
587 if err := writeUvarintTo(w, uint64(len(name)), n); err != nil { 588 return err 589 } 590 if err := writeTo(w, name, n); err != nil { 591 return err 592 } 593 594 // Write series data to buffer. 595 mw.buf.Reset() 596 var prev uint64 597 for _, seriesID := range mm.seriesIDs { 598 delta := seriesID - prev 599 600 var buf [binary.MaxVarintLen32]byte 601 i := binary.PutUvarint(buf[:], uint64(delta)) 602 if _, err := mw.buf.Write(buf[:i]); err != nil { 603 return err 604 } 605 606 prev = seriesID 607 } 608 609 // Write series count. 610 if err := writeUvarintTo(w, uint64(len(mm.seriesIDs)), n); err != nil { 611 return err 612 } 613 614 // Write data size & buffer. 615 if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil { 616 return err 617 } 618 nn, err := mw.buf.WriteTo(w) 619 *n += nn 620 return err 621} 622 623// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes 624// written via n. 625func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error { 626 data, err := s.MarshalBinary() 627 if err != nil { 628 return err 629 } 630 631 nn, err := w.Write(data) 632 *n += int64(nn) 633 return err 634} 635 636type measurement struct { 637 deleted bool 638 tagBlock struct { 639 offset int64 640 size int64 641 } 642 seriesIDs []uint64 643 offset int64 644} 645 646func (mm measurement) flag() byte { 647 var flag byte 648 if mm.deleted { 649 flag |= MeasurementTombstoneFlag 650 } 651 return flag 652} 653