1package binary_test 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "testing" 8 "time" 9 10 "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary" 11 "github.com/influxdata/influxdb/models" 12 "github.com/influxdata/influxdb/tsdb" 13 "github.com/influxdata/influxql" 14) 15 16func TestReader_OneBucketOneIntegerSeries(t *testing.T) { 17 var buf bytes.Buffer 18 ts := []int64{0, 1, 2} 19 ints := []int64{10, 11, 12} 20 vs := make([]interface{}, len(ints)) 21 for i, v := range ints { 22 vs[i] = v 23 } 24 s := &oneSeriesData{ 25 db: "database", 26 rp: "default", 27 sd: time.Hour * 24, 28 start: int64(0), 29 end: int64(time.Hour * 24), 30 seriesName: []byte("series"), 31 seriesField: []byte("field"), 32 seriesTags: models.NewTags(map[string]string{"k": "v"}), 33 fieldType: binary.IntegerFieldType, 34 ts: ts, 35 vs: vs, 36 } 37 38 w := binary.NewWriter(&buf, s.db, s.rp, s.sd) 39 bw, _ := w.NewBucket(s.start, s.end) 40 bw.BeginSeries(s.seriesName, s.seriesField, influxql.Integer, s.seriesTags) 41 bw.WriteIntegerCursor(&intCursor{1, s.ts, ints}) 42 bw.EndSeries() 43 bw.Close() 44 w.Close() 45 46 verifySingleSeries(t, buf, s) 47} 48 49func TestReader_OneBucketOneFloatSeries(t *testing.T) { 50 var buf bytes.Buffer 51 ts := []int64{0, 1, 2} 52 floats := []float64{0.1, 11.1, 1200.0} 53 vs := make([]interface{}, len(floats)) 54 for i, v := range floats { 55 vs[i] = v 56 } 57 s := &oneSeriesData{ 58 db: "database", 59 rp: "default", 60 sd: time.Hour * 24, 61 start: int64(0), 62 end: int64(time.Hour * 24), 63 seriesName: []byte("series"), 64 seriesField: []byte("field"), 65 seriesTags: models.NewTags(map[string]string{"k": "v"}), 66 fieldType: binary.FloatFieldType, 67 ts: ts, 68 vs: vs, 69 } 70 71 w := binary.NewWriter(&buf, s.db, s.rp, s.sd) 72 bw, _ := w.NewBucket(s.start, s.end) 73 bw.BeginSeries(s.seriesName, s.seriesField, influxql.Float, s.seriesTags) 74 bw.WriteFloatCursor(&floatCursor{1, s.ts, floats}) 75 bw.EndSeries() 76 bw.Close() 77 w.Close() 78 79 verifySingleSeries(t, buf, s) 80} 81 82func TestReader_OneBucketOneUnsignedSeries(t *testing.T) { 83 var buf bytes.Buffer 84 ts := []int64{0, 1, 2} 85 uints := []uint64{0, 1, math.MaxUint64} 86 vs := make([]interface{}, len(uints)) 87 for i, v := range uints { 88 vs[i] = v 89 } 90 s := &oneSeriesData{ 91 db: "database", 92 rp: "default", 93 sd: time.Hour * 24, 94 start: int64(0), 95 end: int64(time.Hour * 24), 96 seriesName: []byte("series"), 97 seriesField: []byte("field"), 98 seriesTags: models.NewTags(map[string]string{"k": "v"}), 99 fieldType: binary.UnsignedFieldType, 100 ts: ts, 101 vs: vs, 102 } 103 104 w := binary.NewWriter(&buf, s.db, s.rp, s.sd) 105 bw, _ := w.NewBucket(s.start, s.end) 106 bw.BeginSeries(s.seriesName, s.seriesField, influxql.Unsigned, s.seriesTags) 107 bw.WriteUnsignedCursor(&unsignedCursor{1, s.ts, uints}) 108 bw.EndSeries() 109 bw.Close() 110 w.Close() 111 112 verifySingleSeries(t, buf, s) 113} 114 115func TestReader_OneBucketOneBooleanSeries(t *testing.T) { 116 var buf bytes.Buffer 117 ts := []int64{0, 1, 2} 118 bools := []bool{true, true, false} 119 vs := make([]interface{}, len(bools)) 120 for i, v := range bools { 121 vs[i] = v 122 } 123 s := &oneSeriesData{ 124 db: "database", 125 rp: "default", 126 sd: time.Hour * 24, 127 start: int64(0), 128 end: int64(time.Hour * 24), 129 seriesName: []byte("series"), 130 seriesField: []byte("field"), 131 seriesTags: models.NewTags(map[string]string{"k": "v"}), 132 fieldType: binary.BooleanFieldType, 133 ts: ts, 134 vs: vs, 135 } 136 137 w := binary.NewWriter(&buf, s.db, s.rp, s.sd) 138 bw, _ := w.NewBucket(s.start, s.end) 139 bw.BeginSeries(s.seriesName, s.seriesField, influxql.Boolean, s.seriesTags) 140 bw.WriteBooleanCursor(&booleanCursor{1, s.ts, bools}) 141 bw.EndSeries() 142 bw.Close() 143 w.Close() 144 145 verifySingleSeries(t, buf, s) 146} 147 148func TestReader_OneBucketOneStringSeries(t *testing.T) { 149 var buf bytes.Buffer 150 ts := []int64{0, 1, 2} 151 strings := []string{"", "a", "a《 》"} 152 vs := make([]interface{}, len(strings)) 153 for i, v := range strings { 154 vs[i] = v 155 } 156 s := &oneSeriesData{ 157 db: "database", 158 rp: "default", 159 sd: time.Hour * 24, 160 start: int64(0), 161 end: int64(time.Hour * 24), 162 seriesName: []byte("series"), 163 seriesField: []byte("field"), 164 seriesTags: models.NewTags(map[string]string{"k": "v"}), 165 fieldType: binary.StringFieldType, 166 ts: ts, 167 vs: vs, 168 } 169 170 w := binary.NewWriter(&buf, s.db, s.rp, s.sd) 171 bw, _ := w.NewBucket(s.start, s.end) 172 bw.BeginSeries(s.seriesName, s.seriesField, influxql.String, s.seriesTags) 173 bw.WriteStringCursor(&stringCursor{1, s.ts, strings}) 174 bw.EndSeries() 175 bw.Close() 176 w.Close() 177 178 verifySingleSeries(t, buf, s) 179} 180 181type oneSeriesData struct { 182 db string 183 rp string 184 sd time.Duration 185 start int64 186 end int64 187 seriesName []byte 188 seriesField []byte 189 seriesTags models.Tags 190 fieldType binary.FieldType 191 ts []int64 192 vs []interface{} 193} 194 195func verifySingleSeries(t *testing.T, buf bytes.Buffer, s *oneSeriesData) { 196 t.Helper() 197 r := binary.NewReader(&buf) 198 h, err := r.ReadHeader() 199 assertNoError(t, err) 200 assertEqual(t, h, &binary.Header{Database: s.db, RetentionPolicy: s.rp, ShardDuration: s.sd}) 201 202 bh, err := r.NextBucket() 203 assertNoError(t, err) 204 assertEqual(t, bh, &binary.BucketHeader{Start: s.start, End: s.end}) 205 206 sh, err := r.NextSeries() 207 assertNoError(t, err) 208 209 seriesKey := make([]byte, 0) 210 seriesKey = models.AppendMakeKey(seriesKey[:0], s.seriesName, s.seriesTags) 211 assertEqual(t, sh, &binary.SeriesHeader{FieldType: s.fieldType, SeriesKey: seriesKey, Field: s.seriesField}) 212 213 for i := 0; i < len(s.ts); i++ { 214 next, err := r.Points().Next() 215 assertNoError(t, err) 216 assertEqual(t, next, true) 217 values := r.Points().Values() 218 assertEqual(t, len(values), 1) 219 assertEqual(t, values[0].UnixNano(), s.ts[i]) 220 assertEqual(t, values[0].Value(), s.vs[i]) 221 } 222 223 next, err := r.Points().Next() 224 assertNoError(t, err) 225 assertEqual(t, next, false) 226 227 sh, err = r.NextSeries() 228 assertNoError(t, err) 229 assertNil(t, sh) 230 231 bh, err = r.NextBucket() 232 assertNoError(t, err) 233 assertNil(t, bh) 234} 235 236func TestReader_OneBucketMixedSeries(t *testing.T) { 237 var buf bytes.Buffer 238 db := "db" 239 rp := "rp" 240 start := int64(0) 241 end := int64(time.Hour * 24) 242 seriesName := []byte("cpu") 243 seriesField := []byte("idle") 244 seriesTags1 := models.NewTags(map[string]string{"host": "host1", "region": "us-west-1"}) 245 seriesTags2 := models.NewTags(map[string]string{"host": "host2", "region": "us-west-1"}) 246 247 w := binary.NewWriter(&buf, db, rp, time.Hour*24) 248 bw, _ := w.NewBucket(start, end) 249 bw.BeginSeries(seriesName, seriesField, influxql.Integer, seriesTags1) 250 t1s := []int64{0, 1, 2} 251 v1s := []int64{10, 11, 12} 252 bw.WriteIntegerCursor(&intCursor{1, t1s, v1s}) 253 bw.EndSeries() 254 bw.BeginSeries(seriesName, seriesField, influxql.Integer, seriesTags2) 255 t2s := []int64{1, 2, 3} 256 v2s := []float64{7, 8, 9} 257 bw.WriteFloatCursor(&floatCursor{1, t2s, v2s}) 258 bw.EndSeries() 259 bw.Close() 260 w.Close() 261 262 r := binary.NewReader(&buf) 263 h, err := r.ReadHeader() 264 assertNoError(t, err) 265 assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24}) 266 267 bh, err := r.NextBucket() 268 assertNoError(t, err) 269 assertEqual(t, bh, &binary.BucketHeader{Start: start, End: end}) 270 271 sh, err := r.NextSeries() 272 assertNoError(t, err) 273 274 seriesKey := make([]byte, 0) 275 seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags1) 276 assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.IntegerFieldType, SeriesKey: seriesKey, Field: seriesField}) 277 278 for i := 0; i < len(t1s); i++ { 279 next, err := r.Points().Next() 280 assertNoError(t, err) 281 assertEqual(t, next, true) 282 values := r.Points().Values() 283 assertEqual(t, len(values), 1) 284 assertEqual(t, values[0].UnixNano(), t1s[i]) 285 assertEqual(t, values[0].Value(), v1s[i]) 286 } 287 288 next, err := r.Points().Next() 289 assertNoError(t, err) 290 assertEqual(t, next, false) 291 292 sh, err = r.NextSeries() 293 assertNoError(t, err) 294 295 seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags2) 296 assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.FloatFieldType, SeriesKey: seriesKey, Field: seriesField}) 297 298 for i := 0; i < len(t2s); i++ { 299 next, err := r.Points().Next() 300 assertNoError(t, err) 301 assertEqual(t, next, true) 302 values := r.Points().Values() 303 assertEqual(t, len(values), 1) 304 assertEqual(t, values[0].UnixNano(), t2s[i]) 305 assertEqual(t, values[0].Value(), v2s[i]) 306 } 307 308 next, err = r.Points().Next() 309 assertNoError(t, err) 310 assertEqual(t, next, false) 311 312 sh, err = r.NextSeries() 313 assertNoError(t, err) 314 assertNil(t, sh) 315 316 bh, err = r.NextBucket() 317 assertNoError(t, err) 318 assertNil(t, bh) 319} 320 321func TestReader_EmptyBucket(t *testing.T) { 322 var buf bytes.Buffer 323 db := "db" 324 rp := "default" 325 start := int64(0) 326 end := int64(time.Hour * 24) 327 328 w := binary.NewWriter(&buf, db, rp, time.Hour*24) 329 bw, _ := w.NewBucket(start, end) 330 bw.Close() 331 w.Close() 332 333 r := binary.NewReader(&buf) 334 h, err := r.ReadHeader() 335 assertNoError(t, err) 336 assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24}) 337 338 bh, err := r.NextBucket() 339 assertNoError(t, err) 340 assertEqual(t, bh, &binary.BucketHeader{Start: start, End: end}) 341 342 sh, err := r.NextSeries() 343 assertNoError(t, err) 344 assertNil(t, sh) 345 346 bh, err = r.NextBucket() 347 assertNoError(t, err) 348 assertNil(t, bh) 349} 350 351func TestReader_States(t *testing.T) { 352 var buf bytes.Buffer 353 r := binary.NewReader(&buf) 354 355 next, err := r.Points().Next() 356 assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 4, 1)) 357 assertEqual(t, next, false) 358 359 sh, err := r.NextSeries() 360 assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 3, 1)) 361 assertNil(t, sh) 362 363 bh, err := r.NextBucket() 364 assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 2, 1)) 365 assertNil(t, bh) 366} 367 368type floatCursor struct { 369 c int // number of values to return per call to Next 370 keys []int64 371 vals []float64 372} 373 374func (c *floatCursor) Close() {} 375func (c *floatCursor) Err() error { return nil } 376func (c *floatCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} } 377 378func (c *floatCursor) Next() *tsdb.FloatArray { 379 if c.c > len(c.keys) { 380 c.c = len(c.keys) 381 } 382 383 var a tsdb.FloatArray 384 a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c] 385 c.keys, c.vals = c.keys[c.c:], c.vals[c.c:] 386 return &a 387} 388 389type unsignedCursor struct { 390 c int // number of values to return per call to Next 391 keys []int64 392 vals []uint64 393} 394 395func (c *unsignedCursor) Close() {} 396func (c *unsignedCursor) Err() error { return nil } 397func (c *unsignedCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} } 398 399func (c *unsignedCursor) Next() *tsdb.UnsignedArray { 400 if c.c > len(c.keys) { 401 c.c = len(c.keys) 402 } 403 404 var a tsdb.UnsignedArray 405 a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c] 406 c.keys, c.vals = c.keys[c.c:], c.vals[c.c:] 407 return &a 408} 409 410type booleanCursor struct { 411 c int // number of values to return per call to Next 412 keys []int64 413 vals []bool 414} 415 416func (c *booleanCursor) Close() {} 417func (c *booleanCursor) Err() error { return nil } 418func (c *booleanCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} } 419 420func (c *booleanCursor) Next() *tsdb.BooleanArray { 421 if c.c > len(c.keys) { 422 c.c = len(c.keys) 423 } 424 425 var a tsdb.BooleanArray 426 a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c] 427 c.keys, c.vals = c.keys[c.c:], c.vals[c.c:] 428 return &a 429} 430 431type stringCursor struct { 432 c int // number of values to return per call to Next 433 keys []int64 434 vals []string 435} 436 437func (c *stringCursor) Close() {} 438func (c *stringCursor) Err() error { return nil } 439func (c *stringCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} } 440 441func (c *stringCursor) Next() *tsdb.StringArray { 442 if c.c > len(c.keys) { 443 c.c = len(c.keys) 444 } 445 446 var a tsdb.StringArray 447 a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c] 448 c.keys, c.vals = c.keys[c.c:], c.vals[c.c:] 449 return &a 450} 451 452func assertNil(t *testing.T, got interface{}) { 453 t.Helper() 454 if got == nil { 455 t.Fatalf("not nil: got:\n%s", got) 456 } 457} 458 459func assertError(t *testing.T, got error, exp error) { 460 t.Helper() 461 if got == nil { 462 t.Fatalf("did not receive expected error: %s", exp) 463 } else { 464 assertEqual(t, got.Error(), exp.Error()) 465 } 466} 467