1// This file was taken from Prometheus (https://github.com/prometheus/prometheus).
2// The original license header is included below:
3//
4// Copyright 2014 The Prometheus Authors
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package encoding
18
19import (
20	"encoding/binary"
21	"fmt"
22	"io"
23	"math"
24
25	"github.com/prometheus/common/model"
26)
27
// Layout of the 37-byte header at the start of every double-delta-encoded
// chunk (multi-byte fields are little-endian):
//
//	offset  size  field
//	0       2     number of buffer bytes in use
//	2       1     time double-delta width in bytes
//	3       1     value double-delta width in bytes
//	4       1     integer flag (1 = value double-deltas are integers)
//	5       8     base time
//	13      8     base value
//	21      8     base time delta (absolute 2nd timestamp if the time width is 8)
//	29      8     base value delta (absolute 2nd value if the value width is 8)
//
// A chunk holding only one sample ends right after the base value, so only the
// first doubleDeltaHeaderMinBytes bytes of the header are present then.
const (
	doubleDeltaHeaderBufLenOffset         = 0
	doubleDeltaHeaderTimeBytesOffset      = doubleDeltaHeaderBufLenOffset + 2
	doubleDeltaHeaderValueBytesOffset     = doubleDeltaHeaderTimeBytesOffset + 1
	doubleDeltaHeaderIsIntOffset          = doubleDeltaHeaderValueBytesOffset + 1
	doubleDeltaHeaderBaseTimeOffset       = doubleDeltaHeaderIsIntOffset + 1
	doubleDeltaHeaderBaseValueOffset      = doubleDeltaHeaderBaseTimeOffset + 8
	doubleDeltaHeaderBaseTimeDeltaOffset  = doubleDeltaHeaderBaseValueOffset + 8
	doubleDeltaHeaderBaseValueDeltaOffset = doubleDeltaHeaderBaseTimeDeltaOffset + 8

	// doubleDeltaHeaderBytes is the size of the complete header.
	doubleDeltaHeaderBytes = doubleDeltaHeaderBaseValueDeltaOffset + 8
	// doubleDeltaHeaderMinBytes is the header size of a chunk with exactly one
	// sample: the base deltas have not been written yet.
	doubleDeltaHeaderMinBytes = doubleDeltaHeaderBaseTimeDeltaOffset
)
51
52// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with
53// a double-delta encoding of various types (int, float) and bit widths. A base
54// value and timestamp and a base delta for each is saved in the header. The
55// payload consists of double-deltas, i.e. deviations from the values and
56// timestamps calculated by applying the base value and time and the base deltas.
57// However, once 8 bytes would be needed to encode a double-delta value, a
58// fall-back to the absolute numbers happens (so that timestamps are saved
59// directly as int64 and values as float64).
60// doubleDeltaEncodedChunk implements the chunk interface.
61type doubleDeltaEncodedChunk []byte
62
63// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk.
64func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk {
65	if tb < 1 {
66		panic("need at least 1 time delta byte")
67	}
68	if length < doubleDeltaHeaderBytes+16 {
69		panic(fmt.Errorf(
70			"chunk length %d bytes is insufficient, need at least %d",
71			length, doubleDeltaHeaderBytes+16,
72		))
73	}
74	c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length)
75
76	c[doubleDeltaHeaderTimeBytesOffset] = byte(tb)
77	c[doubleDeltaHeaderValueBytesOffset] = byte(vb)
78	if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes.
79		c[doubleDeltaHeaderIsIntOffset] = 1
80	} else {
81		c[doubleDeltaHeaderIsIntOffset] = 0
82	}
83	return &c
84}
85
86// Add implements chunk.
87func (c *doubleDeltaEncodedChunk) Add(s model.SamplePair) (Chunk, error) {
88	// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
89	if c.Len() == 0 {
90		c.addFirstSample(s)
91		return nil, nil
92	}
93
94	tb := c.timeBytes()
95	vb := c.valueBytes()
96
97	if c.Len() == 1 {
98		err := c.addSecondSample(s, tb, vb)
99		return nil, err
100	}
101
102	remainingBytes := cap(*c) - len(*c)
103	sampleSize := c.sampleSize()
104
105	// Do we generally have space for another sample in this chunk? If not,
106	// overflow into a new one.
107	if remainingBytes < sampleSize {
108		return addToOverflowChunk(s)
109	}
110
111	projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta()
112	ddt := s.Timestamp - projectedTime
113
114	projectedValue := c.baseValue() + model.SampleValue(c.Len())*c.baseValueDelta()
115	ddv := s.Value - projectedValue
116
117	ntb, nvb, nInt := tb, vb, c.isInt()
118	// If the new sample is incompatible with the current encoding, reencode the
119	// existing chunk data into new chunk(s).
120	if c.isInt() && !isInt64(ddv) {
121		// int->float.
122		nvb = d4
123		nInt = false
124	} else if !c.isInt() && vb == d4 && projectedValue+model.SampleValue(float32(ddv)) != s.Value {
125		// float32->float64.
126		nvb = d8
127	} else {
128		if tb < d8 {
129			// Maybe more bytes for timestamp.
130			ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt))
131		}
132		if c.isInt() && vb < d8 {
133			// Maybe more bytes for sample value.
134			nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv))
135		}
136	}
137	if tb != ntb || vb != nvb || c.isInt() != nInt {
138		if len(*c)*2 < cap(*c) {
139			result, err := transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(*c)), c, s)
140			if err != nil {
141				return nil, err
142			}
143			// We cannot handle >2 chunks returned as we can only return 1 chunk.
144			// Ideally there wont be >2 chunks, but if it happens to be >2,
145			// we fall through to perfom `addToOverflowChunk` instead.
146			if len(result) == 1 {
147				// Replace the current chunk with the new bigger chunk.
148				c0 := result[0].(*doubleDeltaEncodedChunk)
149				*c = *c0
150				return nil, nil
151			} else if len(result) == 2 {
152				// Replace the current chunk with the new bigger chunk
153				// and return the additional chunk.
154				c0 := result[0].(*doubleDeltaEncodedChunk)
155				c1 := result[1].(*doubleDeltaEncodedChunk)
156				*c = *c0
157				return c1, nil
158			}
159		}
160
161		// Chunk is already half full. Better create a new one and save the transcoding efforts.
162		// We also perform this if `transcodeAndAdd` resulted in >2 chunks.
163		return addToOverflowChunk(s)
164	}
165
166	offset := len(*c)
167	(*c) = (*c)[:offset+sampleSize]
168
169	switch tb {
170	case d1:
171		(*c)[offset] = byte(ddt)
172	case d2:
173		binary.LittleEndian.PutUint16((*c)[offset:], uint16(ddt))
174	case d4:
175		binary.LittleEndian.PutUint32((*c)[offset:], uint32(ddt))
176	case d8:
177		// Store the absolute value (no delta) in case of d8.
178		binary.LittleEndian.PutUint64((*c)[offset:], uint64(s.Timestamp))
179	default:
180		return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
181	}
182
183	offset += int(tb)
184
185	if c.isInt() {
186		switch vb {
187		case d0:
188			// No-op. Constant delta is stored as base value.
189		case d1:
190			(*c)[offset] = byte(int8(ddv))
191		case d2:
192			binary.LittleEndian.PutUint16((*c)[offset:], uint16(int16(ddv)))
193		case d4:
194			binary.LittleEndian.PutUint32((*c)[offset:], uint32(int32(ddv)))
195		// d8 must not happen. Those samples are encoded as float64.
196		default:
197			return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
198		}
199	} else {
200		switch vb {
201		case d4:
202			binary.LittleEndian.PutUint32((*c)[offset:], math.Float32bits(float32(ddv)))
203		case d8:
204			// Store the absolute value (no delta) in case of d8.
205			binary.LittleEndian.PutUint64((*c)[offset:], math.Float64bits(float64(s.Value)))
206		default:
207			return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
208		}
209	}
210	return nil, nil
211}
212
213// FirstTime implements chunk.
214func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
215	return c.baseTime()
216}
217
218// NewIterator implements chunk.
219func (c *doubleDeltaEncodedChunk) NewIterator(_ Iterator) Iterator {
220	return newIndexAccessingChunkIterator(c.Len(), &doubleDeltaEncodedIndexAccessor{
221		c:      *c,
222		baseT:  c.baseTime(),
223		baseΔT: c.baseTimeDelta(),
224		baseV:  c.baseValue(),
225		baseΔV: c.baseValueDelta(),
226		tBytes: c.timeBytes(),
227		vBytes: c.valueBytes(),
228		isInt:  c.isInt(),
229	})
230}
231
232func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk {
233	return c
234}
235
236func (c *doubleDeltaEncodedChunk) Rebound(start, end model.Time) (Chunk, error) {
237	return reboundChunk(c, start, end)
238}
239
240// Marshal implements chunk.
241func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
242	if len(c) > math.MaxUint16 {
243		panic("chunk buffer length would overflow a 16 bit uint")
244	}
245	binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
246
247	n, err := w.Write(c[:cap(c)])
248	if err != nil {
249		return err
250	}
251	if n != cap(c) {
252		return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
253	}
254	return nil
255}
256
257// MarshalToBuf implements chunk.
258func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
259	if len(c) > math.MaxUint16 {
260		panic("chunk buffer length would overflow a 16 bit uint")
261	}
262	binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
263
264	n := copy(buf, c)
265	if n != len(c) {
266		return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
267	}
268	return nil
269}
270
271// UnmarshalFromBuf implements chunk.
272func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
273	(*c) = (*c)[:cap((*c))]
274	copy((*c), buf)
275	return c.setLen()
276}
277
278// setLen sets the length of the underlying slice and performs some sanity checks.
279func (c *doubleDeltaEncodedChunk) setLen() error {
280	l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
281	if int(l) > cap((*c)) {
282		return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l)
283	}
284	if int(l) < doubleDeltaHeaderMinBytes {
285		return fmt.Errorf("doubledelta chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
286	}
287	switch c.timeBytes() {
288	case d1, d2, d4, d8:
289		// Pass.
290	default:
291		return fmt.Errorf("invalid number of time bytes in doubledelta chunk: %d", c.timeBytes())
292	}
293	switch c.valueBytes() {
294	case d0, d1, d2, d4, d8:
295		// Pass.
296	default:
297		return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes())
298	}
299	(*c) = (*c)[:l]
300	return nil
301}
302
303// Encoding implements chunk.
304func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta }
305
306// Utilization implements chunk.
307func (c doubleDeltaEncodedChunk) Utilization() float64 {
308	return float64(len(c)-doubleDeltaHeaderIsIntOffset-1) / float64(cap(c))
309}
310
311func (c doubleDeltaEncodedChunk) baseTime() model.Time {
312	return model.Time(
313		binary.LittleEndian.Uint64(
314			c[doubleDeltaHeaderBaseTimeOffset:],
315		),
316	)
317}
318
319func (c doubleDeltaEncodedChunk) baseValue() model.SampleValue {
320	return model.SampleValue(
321		math.Float64frombits(
322			binary.LittleEndian.Uint64(
323				c[doubleDeltaHeaderBaseValueOffset:],
324			),
325		),
326	)
327}
328
329func (c doubleDeltaEncodedChunk) baseTimeDelta() model.Time {
330	if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 {
331		return 0
332	}
333	return model.Time(
334		binary.LittleEndian.Uint64(
335			c[doubleDeltaHeaderBaseTimeDeltaOffset:],
336		),
337	)
338}
339
340func (c doubleDeltaEncodedChunk) baseValueDelta() model.SampleValue {
341	if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 {
342		return 0
343	}
344	return model.SampleValue(
345		math.Float64frombits(
346			binary.LittleEndian.Uint64(
347				c[doubleDeltaHeaderBaseValueDeltaOffset:],
348			),
349		),
350	)
351}
352
353func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes {
354	return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset])
355}
356
357func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes {
358	return deltaBytes(c[doubleDeltaHeaderValueBytesOffset])
359}
360
361func (c doubleDeltaEncodedChunk) sampleSize() int {
362	return int(c.timeBytes() + c.valueBytes())
363}
364
365// Len implements Chunk. Runs in constant time.
366func (c doubleDeltaEncodedChunk) Len() int {
367	if len(c) <= doubleDeltaHeaderIsIntOffset+1 {
368		return 0
369	}
370	if len(c) <= doubleDeltaHeaderBaseValueOffset+8 {
371		return 1
372	}
373	return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2
374}
375
376func (c doubleDeltaEncodedChunk) Size() int {
377	return len(c)
378}
379
380func (c doubleDeltaEncodedChunk) isInt() bool {
381	return c[doubleDeltaHeaderIsIntOffset] == 1
382}
383
384// addFirstSample is a helper method only used by c.add(). It adds timestamp and
385// value as base time and value.
386func (c *doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) {
387	(*c) = (*c)[:doubleDeltaHeaderBaseValueOffset+8]
388	binary.LittleEndian.PutUint64(
389		(*c)[doubleDeltaHeaderBaseTimeOffset:],
390		uint64(s.Timestamp),
391	)
392	binary.LittleEndian.PutUint64(
393		(*c)[doubleDeltaHeaderBaseValueOffset:],
394		math.Float64bits(float64(s.Value)),
395	)
396}
397
398// addSecondSample is a helper method only used by c.add(). It calculates the
399// base delta from the provided sample and adds it to the chunk.
400func (c *doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) error {
401	baseTimeDelta := s.Timestamp - c.baseTime()
402	if baseTimeDelta < 0 {
403		return fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
404	}
405	(*c) = (*c)[:doubleDeltaHeaderBytes]
406	if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
407		// If already the base delta needs d8 (or we are at d8
408		// already, anyway), we better encode this timestamp
409		// directly rather than as a delta and switch everything
410		// to d8.
411		(*c)[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
412		binary.LittleEndian.PutUint64(
413			(*c)[doubleDeltaHeaderBaseTimeDeltaOffset:],
414			uint64(s.Timestamp),
415		)
416	} else {
417		binary.LittleEndian.PutUint64(
418			(*c)[doubleDeltaHeaderBaseTimeDeltaOffset:],
419			uint64(baseTimeDelta),
420		)
421	}
422	baseValue := c.baseValue()
423	baseValueDelta := s.Value - baseValue
424	if vb >= d8 || baseValue+baseValueDelta != s.Value {
425		// If we can't reproduce the original sample value (or
426		// if we are at d8 already, anyway), we better encode
427		// this value directly rather than as a delta and switch
428		// everything to d8.
429		(*c)[doubleDeltaHeaderValueBytesOffset] = byte(d8)
430		(*c)[doubleDeltaHeaderIsIntOffset] = 0
431		binary.LittleEndian.PutUint64(
432			(*c)[doubleDeltaHeaderBaseValueDeltaOffset:],
433			math.Float64bits(float64(s.Value)),
434		)
435	} else {
436		binary.LittleEndian.PutUint64(
437			(*c)[doubleDeltaHeaderBaseValueDeltaOffset:],
438			math.Float64bits(float64(baseValueDelta)),
439		)
440	}
441	return nil
442}
443
444// doubleDeltaEncodedIndexAccessor implements indexAccessor.
445type doubleDeltaEncodedIndexAccessor struct {
446	c              doubleDeltaEncodedChunk
447	baseT, baseΔT  model.Time
448	baseV, baseΔV  model.SampleValue
449	tBytes, vBytes deltaBytes
450	isInt          bool
451	lastErr        error
452}
453
454func (acc *doubleDeltaEncodedIndexAccessor) err() error {
455	return acc.lastErr
456}
457
458func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
459	if idx == 0 {
460		return acc.baseT
461	}
462	if idx == 1 {
463		// If time bytes are at d8, the time is saved directly rather
464		// than as a difference.
465		if acc.tBytes == d8 {
466			return acc.baseΔT
467		}
468		return acc.baseT + acc.baseΔT
469	}
470
471	offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)
472
473	switch acc.tBytes {
474	case d1:
475		return acc.baseT +
476			model.Time(idx)*acc.baseΔT +
477			model.Time(int8(acc.c[offset]))
478	case d2:
479		return acc.baseT +
480			model.Time(idx)*acc.baseΔT +
481			model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
482	case d4:
483		return acc.baseT +
484			model.Time(idx)*acc.baseΔT +
485			model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
486	case d8:
487		// Take absolute value for d8.
488		return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
489	default:
490		acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
491		return model.Earliest
492	}
493}
494
495func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
496	if idx == 0 {
497		return acc.baseV
498	}
499	if idx == 1 {
500		// If value bytes are at d8, the value is saved directly rather
501		// than as a difference.
502		if acc.vBytes == d8 {
503			return acc.baseΔV
504		}
505		return acc.baseV + acc.baseΔV
506	}
507
508	offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
509
510	if acc.isInt {
511		switch acc.vBytes {
512		case d0:
513			return acc.baseV +
514				model.SampleValue(idx)*acc.baseΔV
515		case d1:
516			return acc.baseV +
517				model.SampleValue(idx)*acc.baseΔV +
518				model.SampleValue(int8(acc.c[offset]))
519		case d2:
520			return acc.baseV +
521				model.SampleValue(idx)*acc.baseΔV +
522				model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
523		case d4:
524			return acc.baseV +
525				model.SampleValue(idx)*acc.baseΔV +
526				model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
527		// No d8 for ints.
528		default:
529			acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
530			return 0
531		}
532	} else {
533		switch acc.vBytes {
534		case d4:
535			return acc.baseV +
536				model.SampleValue(idx)*acc.baseΔV +
537				model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
538		case d8:
539			// Take absolute value for d8.
540			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
541		default:
542			acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
543			return 0
544		}
545	}
546}
547