1// Copyright 2018 The Prometheus Authors 2 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package tsdb 16 17import ( 18 "math" 19 "sort" 20 21 "github.com/pkg/errors" 22 "github.com/prometheus/tsdb/encoding" 23 "github.com/prometheus/tsdb/labels" 24) 25 26// RecordType represents the data type of a record. 27type RecordType uint8 28 29const ( 30 // RecordInvalid is returned for unrecognised WAL record types. 31 RecordInvalid RecordType = 255 32 // RecordSeries is used to match WAL records of type Series. 33 RecordSeries RecordType = 1 34 // RecordSamples is used to match WAL records of type Samples. 35 RecordSamples RecordType = 2 36 // RecordTombstones is used to match WAL records of type Tombstones. 37 RecordTombstones RecordType = 3 38) 39 40// RecordDecoder decodes series, sample, and tombstone records. 41// The zero value is ready to use. 42type RecordDecoder struct { 43} 44 45// Type returns the type of the record. 46// Return RecordInvalid if no valid record type is found. 47func (d *RecordDecoder) Type(rec []byte) RecordType { 48 if len(rec) < 1 { 49 return RecordInvalid 50 } 51 switch t := RecordType(rec[0]); t { 52 case RecordSeries, RecordSamples, RecordTombstones: 53 return t 54 } 55 return RecordInvalid 56} 57 58// Series appends series in rec to the given slice. 59func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { 60 dec := encoding.Decbuf{B: rec} 61 62 if RecordType(dec.Byte()) != RecordSeries { 63 return nil, errors.New("invalid record type") 64 } 65 for len(dec.B) > 0 && dec.Err() == nil { 66 ref := dec.Be64() 67 68 lset := make(labels.Labels, dec.Uvarint()) 69 70 for i := range lset { 71 lset[i].Name = dec.UvarintStr() 72 lset[i].Value = dec.UvarintStr() 73 } 74 sort.Sort(lset) 75 76 series = append(series, RefSeries{ 77 Ref: ref, 78 Labels: lset, 79 }) 80 } 81 if dec.Err() != nil { 82 return nil, dec.Err() 83 } 84 if len(dec.B) > 0 { 85 return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) 86 } 87 return series, nil 88} 89 90// Samples appends samples in rec to the given slice. 91func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { 92 dec := encoding.Decbuf{B: rec} 93 94 if RecordType(dec.Byte()) != RecordSamples { 95 return nil, errors.New("invalid record type") 96 } 97 if dec.Len() == 0 { 98 return samples, nil 99 } 100 var ( 101 baseRef = dec.Be64() 102 baseTime = dec.Be64int64() 103 ) 104 for len(dec.B) > 0 && dec.Err() == nil { 105 dref := dec.Varint64() 106 dtime := dec.Varint64() 107 val := dec.Be64() 108 109 samples = append(samples, RefSample{ 110 Ref: uint64(int64(baseRef) + dref), 111 T: baseTime + dtime, 112 V: math.Float64frombits(val), 113 }) 114 } 115 116 if dec.Err() != nil { 117 return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples)) 118 } 119 if len(dec.B) > 0 { 120 return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) 121 } 122 return samples, nil 123} 124 125// Tombstones appends tombstones in rec to the given slice. 126func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { 127 dec := encoding.Decbuf{B: rec} 128 129 if RecordType(dec.Byte()) != RecordTombstones { 130 return nil, errors.New("invalid record type") 131 } 132 for dec.Len() > 0 && dec.Err() == nil { 133 tstones = append(tstones, Stone{ 134 ref: dec.Be64(), 135 intervals: Intervals{ 136 {Mint: dec.Varint64(), Maxt: dec.Varint64()}, 137 }, 138 }) 139 } 140 if dec.Err() != nil { 141 return nil, dec.Err() 142 } 143 if len(dec.B) > 0 { 144 return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) 145 } 146 return tstones, nil 147} 148 149// RecordEncoder encodes series, sample, and tombstones records. 150// The zero value is ready to use. 151type RecordEncoder struct { 152} 153 154// Series appends the encoded series to b and returns the resulting slice. 155func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { 156 buf := encoding.Encbuf{B: b} 157 buf.PutByte(byte(RecordSeries)) 158 159 for _, s := range series { 160 buf.PutBE64(s.Ref) 161 buf.PutUvarint(len(s.Labels)) 162 163 for _, l := range s.Labels { 164 buf.PutUvarintStr(l.Name) 165 buf.PutUvarintStr(l.Value) 166 } 167 } 168 return buf.Get() 169} 170 171// Samples appends the encoded samples to b and returns the resulting slice. 172func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { 173 buf := encoding.Encbuf{B: b} 174 buf.PutByte(byte(RecordSamples)) 175 176 if len(samples) == 0 { 177 return buf.Get() 178 } 179 180 // Store base timestamp and base reference number of first sample. 181 // All samples encode their timestamp and ref as delta to those. 182 first := samples[0] 183 184 buf.PutBE64(first.Ref) 185 buf.PutBE64int64(first.T) 186 187 for _, s := range samples { 188 buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) 189 buf.PutVarint64(s.T - first.T) 190 buf.PutBE64(math.Float64bits(s.V)) 191 } 192 return buf.Get() 193} 194 195// Tombstones appends the encoded tombstones to b and returns the resulting slice. 196func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { 197 buf := encoding.Encbuf{B: b} 198 buf.PutByte(byte(RecordTombstones)) 199 200 for _, s := range tstones { 201 for _, iv := range s.intervals { 202 buf.PutBE64(s.ref) 203 buf.PutVarint64(iv.Mint) 204 buf.PutVarint64(iv.Maxt) 205 } 206 } 207 return buf.Get() 208} 209