1// Copyright 2018 The Prometheus Authors
2
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package tsdb
16
17import (
18	"math"
19	"sort"
20
21	"github.com/pkg/errors"
22	"github.com/prometheus/tsdb/encoding"
23	"github.com/prometheus/tsdb/labels"
24)
25
// RecordType identifies the kind of data stored in a WAL record.
// It is the value carried in the first byte of an encoded record.
type RecordType uint8

// Known record types, listed in ascending numeric order.
const (
	// RecordSeries marks a WAL record that holds series.
	RecordSeries RecordType = 1
	// RecordSamples marks a WAL record that holds samples.
	RecordSamples RecordType = 2
	// RecordTombstones marks a WAL record that holds tombstones.
	RecordTombstones RecordType = 3
	// RecordInvalid is reported for any record whose type is not recognised.
	RecordInvalid RecordType = 255
)
39
// RecordDecoder turns encoded series, sample, and tombstone records back
// into their in-memory form. It holds no state, so the zero value can be
// used directly.
type RecordDecoder struct{}
44
45// Type returns the type of the record.
46// Return RecordInvalid if no valid record type is found.
47func (d *RecordDecoder) Type(rec []byte) RecordType {
48	if len(rec) < 1 {
49		return RecordInvalid
50	}
51	switch t := RecordType(rec[0]); t {
52	case RecordSeries, RecordSamples, RecordTombstones:
53		return t
54	}
55	return RecordInvalid
56}
57
58// Series appends series in rec to the given slice.
59func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
60	dec := encoding.Decbuf{B: rec}
61
62	if RecordType(dec.Byte()) != RecordSeries {
63		return nil, errors.New("invalid record type")
64	}
65	for len(dec.B) > 0 && dec.Err() == nil {
66		ref := dec.Be64()
67
68		lset := make(labels.Labels, dec.Uvarint())
69
70		for i := range lset {
71			lset[i].Name = dec.UvarintStr()
72			lset[i].Value = dec.UvarintStr()
73		}
74		sort.Sort(lset)
75
76		series = append(series, RefSeries{
77			Ref:    ref,
78			Labels: lset,
79		})
80	}
81	if dec.Err() != nil {
82		return nil, dec.Err()
83	}
84	if len(dec.B) > 0 {
85		return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
86	}
87	return series, nil
88}
89
90// Samples appends samples in rec to the given slice.
91func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
92	dec := encoding.Decbuf{B: rec}
93
94	if RecordType(dec.Byte()) != RecordSamples {
95		return nil, errors.New("invalid record type")
96	}
97	if dec.Len() == 0 {
98		return samples, nil
99	}
100	var (
101		baseRef  = dec.Be64()
102		baseTime = dec.Be64int64()
103	)
104	for len(dec.B) > 0 && dec.Err() == nil {
105		dref := dec.Varint64()
106		dtime := dec.Varint64()
107		val := dec.Be64()
108
109		samples = append(samples, RefSample{
110			Ref: uint64(int64(baseRef) + dref),
111			T:   baseTime + dtime,
112			V:   math.Float64frombits(val),
113		})
114	}
115
116	if dec.Err() != nil {
117		return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples))
118	}
119	if len(dec.B) > 0 {
120		return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
121	}
122	return samples, nil
123}
124
125// Tombstones appends tombstones in rec to the given slice.
126func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
127	dec := encoding.Decbuf{B: rec}
128
129	if RecordType(dec.Byte()) != RecordTombstones {
130		return nil, errors.New("invalid record type")
131	}
132	for dec.Len() > 0 && dec.Err() == nil {
133		tstones = append(tstones, Stone{
134			ref: dec.Be64(),
135			intervals: Intervals{
136				{Mint: dec.Varint64(), Maxt: dec.Varint64()},
137			},
138		})
139	}
140	if dec.Err() != nil {
141		return nil, dec.Err()
142	}
143	if len(dec.B) > 0 {
144		return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
145	}
146	return tstones, nil
147}
148
// RecordEncoder serialises series, samples, and tombstones into records.
// It holds no state, so the zero value can be used directly.
type RecordEncoder struct{}
153
154// Series appends the encoded series to b and returns the resulting slice.
155func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
156	buf := encoding.Encbuf{B: b}
157	buf.PutByte(byte(RecordSeries))
158
159	for _, s := range series {
160		buf.PutBE64(s.Ref)
161		buf.PutUvarint(len(s.Labels))
162
163		for _, l := range s.Labels {
164			buf.PutUvarintStr(l.Name)
165			buf.PutUvarintStr(l.Value)
166		}
167	}
168	return buf.Get()
169}
170
171// Samples appends the encoded samples to b and returns the resulting slice.
172func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
173	buf := encoding.Encbuf{B: b}
174	buf.PutByte(byte(RecordSamples))
175
176	if len(samples) == 0 {
177		return buf.Get()
178	}
179
180	// Store base timestamp and base reference number of first sample.
181	// All samples encode their timestamp and ref as delta to those.
182	first := samples[0]
183
184	buf.PutBE64(first.Ref)
185	buf.PutBE64int64(first.T)
186
187	for _, s := range samples {
188		buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
189		buf.PutVarint64(s.T - first.T)
190		buf.PutBE64(math.Float64bits(s.V))
191	}
192	return buf.Get()
193}
194
195// Tombstones appends the encoded tombstones to b and returns the resulting slice.
196func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
197	buf := encoding.Encbuf{B: b}
198	buf.PutByte(byte(RecordTombstones))
199
200	for _, s := range tstones {
201		for _, iv := range s.intervals {
202			buf.PutBE64(s.ref)
203			buf.PutVarint64(iv.Mint)
204			buf.PutVarint64(iv.Maxt)
205		}
206	}
207	return buf.Get()
208}
209