1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package downsample
5
6import (
7	"encoding/binary"
8
9	"github.com/pkg/errors"
10	"github.com/prometheus/prometheus/tsdb/chunkenc"
11)
12
13// ChunkEncAggr is the top level encoding byte for the AggrChunk.
14// It picks the highest number possible to prevent future collisions with wrapped encodings.
15const ChunkEncAggr = chunkenc.Encoding(0xff)
16
17// AggrChunk is a chunk that is composed of a set of aggregates for the same underlying data.
18// Not all aggregates must be present.
19type AggrChunk []byte
20
21// EncodeAggrChunk encodes a new aggregate chunk from the array of chunks for each aggregate.
22// Each array entry corresponds to the respective AggrType number.
23func EncodeAggrChunk(chks [5]chunkenc.Chunk) *AggrChunk {
24	var b []byte
25	buf := [8]byte{}
26
27	for _, c := range chks {
28		// Unset aggregates are marked with a zero length entry.
29		if c == nil {
30			n := binary.PutUvarint(buf[:], 0)
31			b = append(b, buf[:n]...)
32			continue
33		}
34		l := len(c.Bytes())
35		n := binary.PutUvarint(buf[:], uint64(l))
36		b = append(b, buf[:n]...)
37		b = append(b, byte(c.Encoding()))
38		b = append(b, c.Bytes()...)
39	}
40	chk := AggrChunk(b)
41	return &chk
42}
43
44func (c AggrChunk) Bytes() []byte {
45	return []byte(c)
46}
47
48func (c AggrChunk) Appender() (chunkenc.Appender, error) {
49	return nil, errors.New("not implemented")
50}
51
52func (c AggrChunk) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
53	return chunkenc.NewNopIterator()
54}
55
56func (c AggrChunk) NumSamples() int {
57	x, err := c.Get(AggrCount)
58	if err != nil {
59		return 0
60	}
61	return x.NumSamples()
62}
63
64// ErrAggrNotExist is returned if a requested aggregation is not present in an AggrChunk.
65var ErrAggrNotExist = errors.New("aggregate does not exist")
66
67func (c AggrChunk) Encoding() chunkenc.Encoding {
68	return ChunkEncAggr
69}
70
71func (c AggrChunk) Compact() {}
72
73// Get returns the sub-chunk for the given aggregate type if it exists.
74func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
75	b := c[:]
76	var x []byte
77
78	for i := AggrType(0); i <= t; i++ {
79		l, n := binary.Uvarint(b)
80		if n < 1 || len(b[n:]) < int(l)+1 {
81			return nil, errors.New("invalid size")
82		}
83		b = b[n:]
84		// If length is set to zero explicitly, that means the aggregate is unset.
85		if l == 0 {
86			if i == t {
87				return nil, ErrAggrNotExist
88			}
89			continue
90		}
91		x = b[:int(l)+1]
92		b = b[int(l)+1:]
93	}
94	return chunkenc.FromData(chunkenc.Encoding(x[0]), x[1:])
95}
96
97// AggrType represents an aggregation type.
98type AggrType uint8
99
100// Valid aggregations.
101const (
102	AggrCount AggrType = iota
103	AggrSum
104	AggrMin
105	AggrMax
106	AggrCounter
107)
108
109func (t AggrType) String() string {
110	switch t {
111	case AggrCount:
112		return "count"
113	case AggrSum:
114		return "sum"
115	case AggrMin:
116		return "min"
117	case AggrMax:
118		return "max"
119	case AggrCounter:
120		return "counter"
121	}
122	return "<unknown>"
123}
124