1package binary 2 3import ( 4 "bytes" 5 "errors" 6 "fmt" 7 "io" 8 9 "github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv" 10 "github.com/influxdata/influxdb/tsdb" 11 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 12) 13 14type Reader struct { 15 r io.Reader 16 pr *PointsReader 17 state *readerState 18 stats *readerStats 19} 20 21type readerStats struct { 22 series int 23 counts [8]struct { 24 series, values int 25 } 26} 27type readerState byte 28 29const ( 30 readHeader readerState = iota + 1 31 readBucket 32 readSeries 33 readPoints 34 done 35) 36 37func NewReader(reader io.Reader) *Reader { 38 state := readHeader 39 var stats readerStats 40 r := &Reader{r: reader, state: &state, stats: &stats, 41 pr: &PointsReader{r: reader, values: make(tsm1.Values, tsdb.DefaultMaxPointsPerBlock), state: &state, stats: &stats}} 42 return r 43} 44 45func (r *Reader) ReadHeader() (*Header, error) { 46 if *r.state != readHeader { 47 return nil, fmt.Errorf("expected reader in state %v, was in state %v\n", readHeader, *r.state) 48 } 49 50 var magic [len(Magic)]byte 51 n, err := r.r.Read(magic[:]) 52 if err != nil { 53 return nil, err 54 } 55 56 if n < len(Magic) || !bytes.Equal(magic[:], Magic[:]) { 57 return nil, errors.New("IFLXDUMP header not present") 58 } 59 60 t, lv, err := tlv.ReadTLV(r.r) 61 if err != nil { 62 return nil, err 63 } 64 if t != byte(HeaderType) { 65 return nil, fmt.Errorf("expected header type, got %v", t) 66 } 67 h := &Header{} 68 err = h.Unmarshal(lv) 69 *r.state = readBucket 70 71 return h, err 72} 73 74func (r *Reader) Close() error { 75 return nil 76} 77 78func (r *Reader) NextBucket() (*BucketHeader, error) { 79 if *r.state != readBucket { 80 return nil, fmt.Errorf("expected reader in state %v, was in state %v", readBucket, *r.state) 81 } 82 83 t, lv, err := tlv.ReadTLV(r.r) 84 if err != nil { 85 if err == io.EOF { 86 *r.state = done 87 return nil, nil 88 } 89 return nil, err 90 } 91 if t != byte(BucketHeaderType) { 92 return nil, fmt.Errorf("expected bucket header type, got %v", t) 93 } 94 95 bh := &BucketHeader{} 96 err = bh.Unmarshal(lv) 97 if err != nil { 98 return nil, err 99 } 100 *r.state = readSeries 101 102 return bh, nil 103} 104 105func (r *Reader) NextSeries() (*SeriesHeader, error) { 106 if *r.state != readSeries { 107 return nil, fmt.Errorf("expected reader in state %v, was in state %v", readSeries, *r.state) 108 } 109 110 t, lv, err := tlv.ReadTLV(r.r) 111 if err != nil { 112 return nil, err 113 } 114 if t == byte(BucketFooterType) { 115 *r.state = readBucket 116 return nil, nil 117 } 118 if t != byte(SeriesHeaderType) { 119 return nil, fmt.Errorf("expected series header type, got %v", t) 120 } 121 sh := &SeriesHeader{} 122 err = sh.Unmarshal(lv) 123 if err != nil { 124 return nil, err 125 } 126 r.stats.series++ 127 r.stats.counts[sh.FieldType&7].series++ 128 129 var pointsType MessageType 130 switch sh.FieldType { 131 case FloatFieldType: 132 pointsType = FloatPointsType 133 case IntegerFieldType: 134 pointsType = IntegerPointsType 135 case UnsignedFieldType: 136 pointsType = UnsignedPointsType 137 case BooleanFieldType: 138 pointsType = BooleanPointsType 139 case StringFieldType: 140 pointsType = StringPointsType 141 default: 142 return nil, fmt.Errorf("unsupported series field type %v", sh.FieldType) 143 } 144 145 *r.state = readPoints 146 r.pr.Reset(pointsType) 147 return sh, nil 148} 149 150func (r *Reader) Points() *PointsReader { 151 return r.pr 152} 153 154type PointsReader struct { 155 pointsType MessageType 156 r io.Reader 157 values tsm1.Values 158 n int 159 state *readerState 160 stats *readerStats 161} 162 163func (pr *PointsReader) Reset(pointsType MessageType) { 164 pr.pointsType = pointsType 165 pr.n = 0 166} 167 168func (pr *PointsReader) Next() (bool, error) { 169 if *pr.state != readPoints { 170 return false, fmt.Errorf("expected reader in state %v, was in state %v", readPoints, *pr.state) 171 } 172 173 t, lv, err := tlv.ReadTLV(pr.r) 174 if err != nil { 175 return false, err 176 } 177 if t == byte(SeriesFooterType) { 178 *pr.state = readSeries 179 return false, nil 180 } 181 if t != byte(pr.pointsType) { 182 return false, fmt.Errorf("expected message type %v, got %v", pr.pointsType, t) 183 } 184 err = pr.marshalValues(lv) 185 if err != nil { 186 return false, err 187 } 188 189 return true, nil 190} 191 192func (pr *PointsReader) Values() tsm1.Values { 193 return pr.values[:pr.n] 194} 195 196func (pr *PointsReader) marshalValues(lv []byte) error { 197 switch pr.pointsType { 198 case FloatPointsType: 199 return pr.marshalFloats(lv) 200 case IntegerPointsType: 201 return pr.marshalIntegers(lv) 202 case UnsignedPointsType: 203 return pr.marshalUnsigned(lv) 204 case BooleanPointsType: 205 return pr.marshalBooleans(lv) 206 case StringPointsType: 207 return pr.marshalStrings(lv) 208 default: 209 return fmt.Errorf("unsupported points type %v", pr.pointsType) 210 } 211} 212 213func (pr *PointsReader) marshalFloats(lv []byte) error { 214 fp := &FloatPoints{} 215 err := fp.Unmarshal(lv) 216 if err != nil { 217 return err 218 } 219 for i, t := range fp.Timestamps { 220 pr.values[i] = tsm1.NewFloatValue(t, fp.Values[i]) 221 } 222 pr.stats.counts[0].values += len(fp.Timestamps) 223 pr.n = len(fp.Timestamps) 224 return nil 225} 226 227func (pr *PointsReader) marshalIntegers(lv []byte) error { 228 ip := &IntegerPoints{} 229 err := ip.Unmarshal(lv) 230 if err != nil { 231 return err 232 } 233 for i, t := range ip.Timestamps { 234 pr.values[i] = tsm1.NewIntegerValue(t, ip.Values[i]) 235 } 236 pr.stats.counts[1].values += len(ip.Timestamps) 237 pr.n = len(ip.Timestamps) 238 return nil 239} 240 241func (pr *PointsReader) marshalUnsigned(lv []byte) error { 242 up := &UnsignedPoints{} 243 err := up.Unmarshal(lv) 244 if err != nil { 245 return err 246 } 247 for i, t := range up.Timestamps { 248 pr.values[i] = tsm1.NewUnsignedValue(t, up.Values[i]) 249 } 250 pr.stats.counts[2].values += len(up.Timestamps) 251 pr.n = len(up.Timestamps) 252 return nil 253} 254 255func (pr *PointsReader) marshalBooleans(lv []byte) error { 256 bp := &BooleanPoints{} 257 err := bp.Unmarshal(lv) 258 if err != nil { 259 return err 260 } 261 for i, t := range bp.Timestamps { 262 pr.values[i] = tsm1.NewBooleanValue(t, bp.Values[i]) 263 } 264 pr.stats.counts[3].values += len(bp.Timestamps) 265 pr.n = len(bp.Timestamps) 266 return nil 267} 268 269func (pr *PointsReader) marshalStrings(lv []byte) error { 270 sp := &StringPoints{} 271 err := sp.Unmarshal(lv) 272 if err != nil { 273 return err 274 } 275 for i, t := range sp.Timestamps { 276 pr.values[i] = tsm1.NewStringValue(t, sp.Values[i]) 277 } 278 pr.stats.counts[4].values += len(sp.Timestamps) 279 pr.n = len(sp.Timestamps) 280 return nil 281} 282