// This file was taken from Prometheus (https://github.com/prometheus/prometheus).
// The original license header is included below:
//
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
	"encoding/binary"
	"fmt"
	"io"
	"math"

	"github.com/prometheus/common/model"
)

// The 37-byte header of a double-delta-encoded chunk looks like:
//
// - used buf bytes:           2 bytes
// - time double-delta bytes:  1 byte
// - value double-delta bytes: 1 byte
// - is integer:               1 byte
// - base time:                8 bytes
// - base value:               8 bytes
// - base time delta:          8 bytes
// - base value delta:         8 bytes
const (
	// Size of the complete header listed above, in bytes.
	doubleDeltaHeaderBytes    = 37
	doubleDeltaHeaderMinBytes = 21 // header isn't full for chunk w/ one sample

	// Byte offsets of the individual header fields, in the order listed above.
	doubleDeltaHeaderBufLenOffset         = 0
	doubleDeltaHeaderTimeBytesOffset      = 2
	doubleDeltaHeaderValueBytesOffset     = 3
	doubleDeltaHeaderIsIntOffset          = 4
	doubleDeltaHeaderBaseTimeOffset       = 5
	doubleDeltaHeaderBaseValueOffset      = 13
	doubleDeltaHeaderBaseTimeDeltaOffset  = 21
	doubleDeltaHeaderBaseValueDeltaOffset = 29
)

// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with
// a double-delta encoding of various types (int, float) and bit widths. A base
// value and timestamp and a base delta for each is saved in the header. The
// payload consists of double-deltas, i.e.
deviations from the values and 56// timestamps calculated by applying the base value and time and the base deltas. 57// However, once 8 bytes would be needed to encode a double-delta value, a 58// fall-back to the absolute numbers happens (so that timestamps are saved 59// directly as int64 and values as float64). 60// doubleDeltaEncodedChunk implements the chunk interface. 61type doubleDeltaEncodedChunk []byte 62 63// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk. 64func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk { 65 if tb < 1 { 66 panic("need at least 1 time delta byte") 67 } 68 if length < doubleDeltaHeaderBytes+16 { 69 panic(fmt.Errorf( 70 "chunk length %d bytes is insufficient, need at least %d", 71 length, doubleDeltaHeaderBytes+16, 72 )) 73 } 74 c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length) 75 76 c[doubleDeltaHeaderTimeBytesOffset] = byte(tb) 77 c[doubleDeltaHeaderValueBytesOffset] = byte(vb) 78 if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes. 79 c[doubleDeltaHeaderIsIntOffset] = 1 80 } else { 81 c[doubleDeltaHeaderIsIntOffset] = 0 82 } 83 return &c 84} 85 86// Add implements chunk. 87func (c *doubleDeltaEncodedChunk) Add(s model.SamplePair) (Chunk, error) { 88 // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. 89 if c.Len() == 0 { 90 c.addFirstSample(s) 91 return nil, nil 92 } 93 94 tb := c.timeBytes() 95 vb := c.valueBytes() 96 97 if c.Len() == 1 { 98 err := c.addSecondSample(s, tb, vb) 99 return nil, err 100 } 101 102 remainingBytes := cap(*c) - len(*c) 103 sampleSize := c.sampleSize() 104 105 // Do we generally have space for another sample in this chunk? If not, 106 // overflow into a new one. 
107 if remainingBytes < sampleSize { 108 return addToOverflowChunk(s) 109 } 110 111 projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta() 112 ddt := s.Timestamp - projectedTime 113 114 projectedValue := c.baseValue() + model.SampleValue(c.Len())*c.baseValueDelta() 115 ddv := s.Value - projectedValue 116 117 ntb, nvb, nInt := tb, vb, c.isInt() 118 // If the new sample is incompatible with the current encoding, reencode the 119 // existing chunk data into new chunk(s). 120 if c.isInt() && !isInt64(ddv) { 121 // int->float. 122 nvb = d4 123 nInt = false 124 } else if !c.isInt() && vb == d4 && projectedValue+model.SampleValue(float32(ddv)) != s.Value { 125 // float32->float64. 126 nvb = d8 127 } else { 128 if tb < d8 { 129 // Maybe more bytes for timestamp. 130 ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt)) 131 } 132 if c.isInt() && vb < d8 { 133 // Maybe more bytes for sample value. 134 nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv)) 135 } 136 } 137 if tb != ntb || vb != nvb || c.isInt() != nInt { 138 if len(*c)*2 < cap(*c) { 139 result, err := transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(*c)), c, s) 140 if err != nil { 141 return nil, err 142 } 143 // We cannot handle >2 chunks returned as we can only return 1 chunk. 144 // Ideally there wont be >2 chunks, but if it happens to be >2, 145 // we fall through to perfom `addToOverflowChunk` instead. 146 if len(result) == 1 { 147 // Replace the current chunk with the new bigger chunk. 148 c0 := result[0].(*doubleDeltaEncodedChunk) 149 *c = *c0 150 return nil, nil 151 } else if len(result) == 2 { 152 // Replace the current chunk with the new bigger chunk 153 // and return the additional chunk. 154 c0 := result[0].(*doubleDeltaEncodedChunk) 155 c1 := result[1].(*doubleDeltaEncodedChunk) 156 *c = *c0 157 return c1, nil 158 } 159 } 160 161 // Chunk is already half full. Better create a new one and save the transcoding efforts. 
162 // We also perform this if `transcodeAndAdd` resulted in >2 chunks. 163 return addToOverflowChunk(s) 164 } 165 166 offset := len(*c) 167 (*c) = (*c)[:offset+sampleSize] 168 169 switch tb { 170 case d1: 171 (*c)[offset] = byte(ddt) 172 case d2: 173 binary.LittleEndian.PutUint16((*c)[offset:], uint16(ddt)) 174 case d4: 175 binary.LittleEndian.PutUint32((*c)[offset:], uint32(ddt)) 176 case d8: 177 // Store the absolute value (no delta) in case of d8. 178 binary.LittleEndian.PutUint64((*c)[offset:], uint64(s.Timestamp)) 179 default: 180 return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) 181 } 182 183 offset += int(tb) 184 185 if c.isInt() { 186 switch vb { 187 case d0: 188 // No-op. Constant delta is stored as base value. 189 case d1: 190 (*c)[offset] = byte(int8(ddv)) 191 case d2: 192 binary.LittleEndian.PutUint16((*c)[offset:], uint16(int16(ddv))) 193 case d4: 194 binary.LittleEndian.PutUint32((*c)[offset:], uint32(int32(ddv))) 195 // d8 must not happen. Those samples are encoded as float64. 196 default: 197 return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) 198 } 199 } else { 200 switch vb { 201 case d4: 202 binary.LittleEndian.PutUint32((*c)[offset:], math.Float32bits(float32(ddv))) 203 case d8: 204 // Store the absolute value (no delta) in case of d8. 205 binary.LittleEndian.PutUint64((*c)[offset:], math.Float64bits(float64(s.Value))) 206 default: 207 return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) 208 } 209 } 210 return nil, nil 211} 212 213// FirstTime implements chunk. 214func (c doubleDeltaEncodedChunk) FirstTime() model.Time { 215 return c.baseTime() 216} 217 218// NewIterator implements chunk. 
219func (c *doubleDeltaEncodedChunk) NewIterator(_ Iterator) Iterator { 220 return newIndexAccessingChunkIterator(c.Len(), &doubleDeltaEncodedIndexAccessor{ 221 c: *c, 222 baseT: c.baseTime(), 223 baseΔT: c.baseTimeDelta(), 224 baseV: c.baseValue(), 225 baseΔV: c.baseValueDelta(), 226 tBytes: c.timeBytes(), 227 vBytes: c.valueBytes(), 228 isInt: c.isInt(), 229 }) 230} 231 232func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk { 233 return c 234} 235 236func (c *doubleDeltaEncodedChunk) Rebound(start, end model.Time) (Chunk, error) { 237 return reboundChunk(c, start, end) 238} 239 240// Marshal implements chunk. 241func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error { 242 if len(c) > math.MaxUint16 { 243 panic("chunk buffer length would overflow a 16 bit uint") 244 } 245 binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) 246 247 n, err := w.Write(c[:cap(c)]) 248 if err != nil { 249 return err 250 } 251 if n != cap(c) { 252 return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n) 253 } 254 return nil 255} 256 257// MarshalToBuf implements chunk. 258func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error { 259 if len(c) > math.MaxUint16 { 260 panic("chunk buffer length would overflow a 16 bit uint") 261 } 262 binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) 263 264 n := copy(buf, c) 265 if n != len(c) { 266 return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) 267 } 268 return nil 269} 270 271// UnmarshalFromBuf implements chunk. 272func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { 273 (*c) = (*c)[:cap((*c))] 274 copy((*c), buf) 275 return c.setLen() 276} 277 278// setLen sets the length of the underlying slice and performs some sanity checks. 
279func (c *doubleDeltaEncodedChunk) setLen() error { 280 l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) 281 if int(l) > cap((*c)) { 282 return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l) 283 } 284 if int(l) < doubleDeltaHeaderMinBytes { 285 return fmt.Errorf("doubledelta chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes) 286 } 287 switch c.timeBytes() { 288 case d1, d2, d4, d8: 289 // Pass. 290 default: 291 return fmt.Errorf("invalid number of time bytes in doubledelta chunk: %d", c.timeBytes()) 292 } 293 switch c.valueBytes() { 294 case d0, d1, d2, d4, d8: 295 // Pass. 296 default: 297 return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes()) 298 } 299 (*c) = (*c)[:l] 300 return nil 301} 302 303// Encoding implements chunk. 304func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta } 305 306// Utilization implements chunk. 307func (c doubleDeltaEncodedChunk) Utilization() float64 { 308 return float64(len(c)-doubleDeltaHeaderIsIntOffset-1) / float64(cap(c)) 309} 310 311func (c doubleDeltaEncodedChunk) baseTime() model.Time { 312 return model.Time( 313 binary.LittleEndian.Uint64( 314 c[doubleDeltaHeaderBaseTimeOffset:], 315 ), 316 ) 317} 318 319func (c doubleDeltaEncodedChunk) baseValue() model.SampleValue { 320 return model.SampleValue( 321 math.Float64frombits( 322 binary.LittleEndian.Uint64( 323 c[doubleDeltaHeaderBaseValueOffset:], 324 ), 325 ), 326 ) 327} 328 329func (c doubleDeltaEncodedChunk) baseTimeDelta() model.Time { 330 if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 { 331 return 0 332 } 333 return model.Time( 334 binary.LittleEndian.Uint64( 335 c[doubleDeltaHeaderBaseTimeDeltaOffset:], 336 ), 337 ) 338} 339 340func (c doubleDeltaEncodedChunk) baseValueDelta() model.SampleValue { 341 if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 { 342 return 0 343 } 344 return model.SampleValue( 345 math.Float64frombits( 346 
binary.LittleEndian.Uint64( 347 c[doubleDeltaHeaderBaseValueDeltaOffset:], 348 ), 349 ), 350 ) 351} 352 353func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes { 354 return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset]) 355} 356 357func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes { 358 return deltaBytes(c[doubleDeltaHeaderValueBytesOffset]) 359} 360 361func (c doubleDeltaEncodedChunk) sampleSize() int { 362 return int(c.timeBytes() + c.valueBytes()) 363} 364 365// Len implements Chunk. Runs in constant time. 366func (c doubleDeltaEncodedChunk) Len() int { 367 if len(c) <= doubleDeltaHeaderIsIntOffset+1 { 368 return 0 369 } 370 if len(c) <= doubleDeltaHeaderBaseValueOffset+8 { 371 return 1 372 } 373 return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2 374} 375 376func (c doubleDeltaEncodedChunk) Size() int { 377 return len(c) 378} 379 380func (c doubleDeltaEncodedChunk) isInt() bool { 381 return c[doubleDeltaHeaderIsIntOffset] == 1 382} 383 384// addFirstSample is a helper method only used by c.add(). It adds timestamp and 385// value as base time and value. 386func (c *doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) { 387 (*c) = (*c)[:doubleDeltaHeaderBaseValueOffset+8] 388 binary.LittleEndian.PutUint64( 389 (*c)[doubleDeltaHeaderBaseTimeOffset:], 390 uint64(s.Timestamp), 391 ) 392 binary.LittleEndian.PutUint64( 393 (*c)[doubleDeltaHeaderBaseValueOffset:], 394 math.Float64bits(float64(s.Value)), 395 ) 396} 397 398// addSecondSample is a helper method only used by c.add(). It calculates the 399// base delta from the provided sample and adds it to the chunk. 
400func (c *doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) error { 401 baseTimeDelta := s.Timestamp - c.baseTime() 402 if baseTimeDelta < 0 { 403 return fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) 404 } 405 (*c) = (*c)[:doubleDeltaHeaderBytes] 406 if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { 407 // If already the base delta needs d8 (or we are at d8 408 // already, anyway), we better encode this timestamp 409 // directly rather than as a delta and switch everything 410 // to d8. 411 (*c)[doubleDeltaHeaderTimeBytesOffset] = byte(d8) 412 binary.LittleEndian.PutUint64( 413 (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], 414 uint64(s.Timestamp), 415 ) 416 } else { 417 binary.LittleEndian.PutUint64( 418 (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], 419 uint64(baseTimeDelta), 420 ) 421 } 422 baseValue := c.baseValue() 423 baseValueDelta := s.Value - baseValue 424 if vb >= d8 || baseValue+baseValueDelta != s.Value { 425 // If we can't reproduce the original sample value (or 426 // if we are at d8 already, anyway), we better encode 427 // this value directly rather than as a delta and switch 428 // everything to d8. 429 (*c)[doubleDeltaHeaderValueBytesOffset] = byte(d8) 430 (*c)[doubleDeltaHeaderIsIntOffset] = 0 431 binary.LittleEndian.PutUint64( 432 (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], 433 math.Float64bits(float64(s.Value)), 434 ) 435 } else { 436 binary.LittleEndian.PutUint64( 437 (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], 438 math.Float64bits(float64(baseValueDelta)), 439 ) 440 } 441 return nil 442} 443 444// doubleDeltaEncodedIndexAccessor implements indexAccessor. 
445type doubleDeltaEncodedIndexAccessor struct { 446 c doubleDeltaEncodedChunk 447 baseT, baseΔT model.Time 448 baseV, baseΔV model.SampleValue 449 tBytes, vBytes deltaBytes 450 isInt bool 451 lastErr error 452} 453 454func (acc *doubleDeltaEncodedIndexAccessor) err() error { 455 return acc.lastErr 456} 457 458func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { 459 if idx == 0 { 460 return acc.baseT 461 } 462 if idx == 1 { 463 // If time bytes are at d8, the time is saved directly rather 464 // than as a difference. 465 if acc.tBytes == d8 { 466 return acc.baseΔT 467 } 468 return acc.baseT + acc.baseΔT 469 } 470 471 offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) 472 473 switch acc.tBytes { 474 case d1: 475 return acc.baseT + 476 model.Time(idx)*acc.baseΔT + 477 model.Time(int8(acc.c[offset])) 478 case d2: 479 return acc.baseT + 480 model.Time(idx)*acc.baseΔT + 481 model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) 482 case d4: 483 return acc.baseT + 484 model.Time(idx)*acc.baseΔT + 485 model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) 486 case d8: 487 // Take absolute value for d8. 488 return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) 489 default: 490 acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) 491 return model.Earliest 492 } 493} 494 495func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { 496 if idx == 0 { 497 return acc.baseV 498 } 499 if idx == 1 { 500 // If value bytes are at d8, the value is saved directly rather 501 // than as a difference. 
502 if acc.vBytes == d8 { 503 return acc.baseΔV 504 } 505 return acc.baseV + acc.baseΔV 506 } 507 508 offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) 509 510 if acc.isInt { 511 switch acc.vBytes { 512 case d0: 513 return acc.baseV + 514 model.SampleValue(idx)*acc.baseΔV 515 case d1: 516 return acc.baseV + 517 model.SampleValue(idx)*acc.baseΔV + 518 model.SampleValue(int8(acc.c[offset])) 519 case d2: 520 return acc.baseV + 521 model.SampleValue(idx)*acc.baseΔV + 522 model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) 523 case d4: 524 return acc.baseV + 525 model.SampleValue(idx)*acc.baseΔV + 526 model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) 527 // No d8 for ints. 528 default: 529 acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) 530 return 0 531 } 532 } else { 533 switch acc.vBytes { 534 case d4: 535 return acc.baseV + 536 model.SampleValue(idx)*acc.baseΔV + 537 model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) 538 case d8: 539 // Take absolute value for d8. 540 return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) 541 default: 542 acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) 543 return 0 544 } 545 } 546} 547