1package reads 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 8 "github.com/gogo/protobuf/types" 9 "github.com/influxdata/flux" 10 "github.com/influxdata/flux/execute" 11 "github.com/influxdata/flux/memory" 12 "github.com/influxdata/flux/values" 13 "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb" 14 "github.com/influxdata/influxdb/models" 15 "github.com/influxdata/influxdb/storage/reads/datatypes" 16 "github.com/influxdata/influxdb/tsdb/cursors" 17) 18 19type storageTable interface { 20 flux.Table 21 Close() 22 Cancel() 23 Statistics() cursors.CursorStats 24} 25 26type storeReader struct { 27 s Store 28} 29 30func NewReader(s Store) influxdb.Reader { 31 return &storeReader{s: s} 32} 33 34func (r *storeReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { 35 return &filterIterator{ 36 ctx: ctx, 37 s: r.s, 38 spec: spec, 39 cache: newTagsCache(0), 40 alloc: alloc, 41 }, nil 42} 43 44func (r *storeReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { 45 return &groupIterator{ 46 ctx: ctx, 47 s: r.s, 48 spec: spec, 49 cache: newTagsCache(0), 50 alloc: alloc, 51 }, nil 52} 53 54func (r *storeReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { 55 var predicate *datatypes.Predicate 56 if spec.Predicate != nil { 57 p, err := toStoragePredicate(spec.Predicate) 58 if err != nil { 59 return nil, err 60 } 61 predicate = p 62 } 63 64 return &tagKeysIterator{ 65 ctx: ctx, 66 bounds: spec.Bounds, 67 s: r.s, 68 readSpec: spec, 69 predicate: predicate, 70 alloc: alloc, 71 }, nil 72} 73 74func (r *storeReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { 75 var predicate *datatypes.Predicate 76 if spec.Predicate != nil { 77 p, err := toStoragePredicate(spec.Predicate) 78 if err != nil { 79 return nil, err 80 } 81 predicate = p 82 } 83 84 return &tagValuesIterator{ 85 ctx: ctx, 86 bounds: spec.Bounds, 87 s: r.s, 88 readSpec: spec, 89 predicate: predicate, 90 alloc: alloc, 91 }, nil 92} 93 94func (r *storeReader) Close() {} 95 96type filterIterator struct { 97 ctx context.Context 98 s Store 99 spec influxdb.ReadFilterSpec 100 stats cursors.CursorStats 101 cache *tagsCache 102 alloc *memory.Allocator 103} 104 105func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats } 106 107func (fi *filterIterator) Do(f func(flux.Table) error) error { 108 src := fi.s.GetSource( 109 fi.spec.Database, 110 fi.spec.RetentionPolicy, 111 ) 112 113 // Setup read request 114 any, err := types.MarshalAny(src) 115 if err != nil { 116 return err 117 } 118 119 var predicate *datatypes.Predicate 120 if fi.spec.Predicate != nil { 121 p, err := toStoragePredicate(fi.spec.Predicate) 122 if err != nil { 123 return err 124 } 125 predicate = p 126 } 127 128 var req datatypes.ReadFilterRequest 129 req.ReadSource = any 130 req.Predicate = predicate 131 req.Range.Start = int64(fi.spec.Bounds.Start) 132 req.Range.End = int64(fi.spec.Bounds.Stop) 133 134 rs, err := fi.s.ReadFilter(fi.ctx, &req) 135 if err != nil { 136 return err 137 } 138 139 if rs == nil { 140 return nil 141 } 142 143 return fi.handleRead(f, rs) 144} 145 146func (fi *filterIterator) handleRead(f func(flux.Table) error, rs ResultSet) error { 147 // these resources must be closed if not nil on return 148 var ( 149 cur cursors.Cursor 150 table storageTable 151 ) 152 153 defer func() { 154 if table != nil { 155 table.Close() 156 } 157 if cur != nil { 158 cur.Close() 159 } 160 rs.Close() 161 fi.cache.Release() 162 }() 163 164READ: 165 for rs.Next() { 166 cur = rs.Cursor() 167 if cur == nil { 168 // no data for series key + field combination 169 continue 170 } 171 172 bnds := fi.spec.Bounds 173 key := defaultGroupKeyForSeries(rs.Tags(), bnds) 174 done := make(chan struct{}) 175 switch typedCur := cur.(type) { 176 case cursors.IntegerArrayCursor: 177 cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt) 178 table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) 179 case cursors.FloatArrayCursor: 180 cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat) 181 table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) 182 case cursors.UnsignedArrayCursor: 183 cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt) 184 table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) 185 case cursors.BooleanArrayCursor: 186 cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool) 187 table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) 188 case cursors.StringArrayCursor: 189 cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) 190 table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) 191 default: 192 panic(fmt.Sprintf("unreachable: %T", typedCur)) 193 } 194 195 cur = nil 196 197 if !table.Empty() { 198 if err := f(table); err != nil { 199 table.Close() 200 table = nil 201 return err 202 } 203 select { 204 case <-done: 205 case <-fi.ctx.Done(): 206 table.Cancel() 207 break READ 208 } 209 } 210 211 stats := table.Statistics() 212 fi.stats.ScannedValues += stats.ScannedValues 213 fi.stats.ScannedBytes += stats.ScannedBytes 214 table.Close() 215 table = nil 216 } 217 return rs.Err() 218} 219 220type groupIterator struct { 221 ctx context.Context 222 s Store 223 spec influxdb.ReadGroupSpec 224 stats cursors.CursorStats 225 cache *tagsCache 226 alloc *memory.Allocator 227} 228 229func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats } 230 231func (gi *groupIterator) Do(f func(flux.Table) error) error { 232 src := gi.s.GetSource( 233 gi.spec.Database, 234 gi.spec.RetentionPolicy, 235 ) 236 237 // Setup read request 238 any, err := types.MarshalAny(src) 239 if err != nil { 240 return err 241 } 242 243 var predicate *datatypes.Predicate 244 if gi.spec.Predicate != nil { 245 p, err := toStoragePredicate(gi.spec.Predicate) 246 if err != nil { 247 return err 248 } 249 predicate = p 250 } 251 252 var req datatypes.ReadGroupRequest 253 req.ReadSource = any 254 req.Predicate = predicate 255 req.Range.Start = int64(gi.spec.Bounds.Start) 256 req.Range.End = int64(gi.spec.Bounds.Stop) 257 258 req.Group = convertGroupMode(gi.spec.GroupMode) 259 req.GroupKeys = gi.spec.GroupKeys 260 261 if agg, err := determineAggregateMethod(gi.spec.AggregateMethod); err != nil { 262 return err 263 } else if agg != datatypes.AggregateTypeNone { 264 req.Aggregate = &datatypes.Aggregate{Type: agg} 265 } 266 267 rs, err := gi.s.ReadGroup(gi.ctx, &req) 268 if err != nil { 269 return err 270 } 271 272 if rs == nil { 273 return nil 274 } 275 return gi.handleRead(f, rs) 276} 277 278func (gi *groupIterator) handleRead(f func(flux.Table) error, rs GroupResultSet) error { 279 // these resources must be closed if not nil on return 280 var ( 281 gc GroupCursor 282 cur cursors.Cursor 283 table storageTable 284 ) 285 286 defer func() { 287 if table != nil { 288 table.Close() 289 } 290 if cur != nil { 291 cur.Close() 292 } 293 if gc != nil { 294 gc.Close() 295 } 296 rs.Close() 297 gi.cache.Release() 298 }() 299 300 gc = rs.Next() 301READ: 302 for gc != nil { 303 for gc.Next() { 304 cur = gc.Cursor() 305 if cur != nil { 306 break 307 } 308 } 309 310 if cur == nil { 311 gc.Close() 312 gc = rs.Next() 313 continue 314 } 315 316 bnds := gi.spec.Bounds 317 key := groupKeyForGroup(gc.PartitionKeyVals(), &gi.spec, bnds) 318 done := make(chan struct{}) 319 switch typedCur := cur.(type) { 320 case cursors.IntegerArrayCursor: 321 cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt) 322 table = newIntegerGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) 323 case cursors.FloatArrayCursor: 324 cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat) 325 table = newFloatGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) 326 case cursors.UnsignedArrayCursor: 327 cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt) 328 table = newUnsignedGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) 329 case cursors.BooleanArrayCursor: 330 cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool) 331 table = newBooleanGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) 332 case cursors.StringArrayCursor: 333 cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString) 334 table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) 335 default: 336 panic(fmt.Sprintf("unreachable: %T", typedCur)) 337 } 338 339 // table owns these resources and is responsible for closing them 340 cur = nil 341 gc = nil 342 343 if err := f(table); err != nil { 344 table.Close() 345 table = nil 346 return err 347 } 348 select { 349 case <-done: 350 case <-gi.ctx.Done(): 351 table.Cancel() 352 break READ 353 } 354 355 stats := table.Statistics() 356 gi.stats.ScannedValues += stats.ScannedValues 357 gi.stats.ScannedBytes += stats.ScannedBytes 358 table.Close() 359 table = nil 360 361 gc = rs.Next() 362 } 363 return rs.Err() 364} 365 366func determineAggregateMethod(agg string) (datatypes.Aggregate_AggregateType, error) { 367 if agg == "" { 368 return datatypes.AggregateTypeNone, nil 369 } 370 371 if t, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(agg)]; ok { 372 return datatypes.Aggregate_AggregateType(t), nil 373 } 374 return 0, fmt.Errorf("unknown aggregate type %q", agg) 375} 376 377func convertGroupMode(m influxdb.GroupMode) datatypes.ReadGroupRequest_Group { 378 switch m { 379 case influxdb.GroupModeNone: 380 return datatypes.GroupNone 381 case influxdb.GroupModeBy: 382 return datatypes.GroupBy 383 } 384 panic(fmt.Sprint("invalid group mode: ", m)) 385} 386 387const ( 388 startColIdx = 0 389 stopColIdx = 1 390 timeColIdx = 2 391 valueColIdx = 3 392) 393 394func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) { 395 cols := make([]flux.ColMeta, 4+len(tags)) 396 defs := make([][]byte, 4+len(tags)) 397 cols[startColIdx] = flux.ColMeta{ 398 Label: execute.DefaultStartColLabel, 399 Type: flux.TTime, 400 } 401 cols[stopColIdx] = flux.ColMeta{ 402 Label: execute.DefaultStopColLabel, 403 Type: flux.TTime, 404 } 405 cols[timeColIdx] = flux.ColMeta{ 406 Label: execute.DefaultTimeColLabel, 407 Type: flux.TTime, 408 } 409 cols[valueColIdx] = flux.ColMeta{ 410 Label: execute.DefaultValueColLabel, 411 Type: typ, 412 } 413 for j, tag := range tags { 414 cols[4+j] = flux.ColMeta{ 415 Label: string(tag.Key), 416 Type: flux.TString, 417 } 418 defs[4+j] = []byte("") 419 } 420 return cols, defs 421} 422 423func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupKey { 424 cols := make([]flux.ColMeta, 2, len(tags)+2) 425 vs := make([]values.Value, 2, len(tags)+2) 426 cols[0] = flux.ColMeta{ 427 Label: execute.DefaultStartColLabel, 428 Type: flux.TTime, 429 } 430 vs[0] = values.NewTime(bnds.Start) 431 cols[1] = flux.ColMeta{ 432 Label: execute.DefaultStopColLabel, 433 Type: flux.TTime, 434 } 435 vs[1] = values.NewTime(bnds.Stop) 436 for i := range tags { 437 cols = append(cols, flux.ColMeta{ 438 Label: string(tags[i].Key), 439 Type: flux.TString, 440 }) 441 vs = append(vs, values.NewString(string(tags[i].Value))) 442 } 443 return execute.NewGroupKey(cols, vs) 444} 445 446func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType) ([]flux.ColMeta, [][]byte) { 447 cols := make([]flux.ColMeta, 4+len(tagKeys)) 448 defs := make([][]byte, 4+len(tagKeys)) 449 cols[startColIdx] = flux.ColMeta{ 450 Label: execute.DefaultStartColLabel, 451 Type: flux.TTime, 452 } 453 cols[stopColIdx] = flux.ColMeta{ 454 Label: execute.DefaultStopColLabel, 455 Type: flux.TTime, 456 } 457 cols[timeColIdx] = flux.ColMeta{ 458 Label: execute.DefaultTimeColLabel, 459 Type: flux.TTime, 460 } 461 cols[valueColIdx] = flux.ColMeta{ 462 Label: execute.DefaultValueColLabel, 463 Type: typ, 464 } 465 for j, tag := range tagKeys { 466 cols[4+j] = flux.ColMeta{ 467 Label: string(tag), 468 Type: flux.TString, 469 } 470 defs[4+j] = []byte("") 471 472 } 473 return cols, defs 474} 475 476func groupKeyForGroup(kv [][]byte, spec *influxdb.ReadGroupSpec, bnds execute.Bounds) flux.GroupKey { 477 cols := make([]flux.ColMeta, 2, len(spec.GroupKeys)+2) 478 vs := make([]values.Value, 2, len(spec.GroupKeys)+2) 479 cols[0] = flux.ColMeta{ 480 Label: execute.DefaultStartColLabel, 481 Type: flux.TTime, 482 } 483 vs[0] = values.NewTime(bnds.Start) 484 cols[1] = flux.ColMeta{ 485 Label: execute.DefaultStopColLabel, 486 Type: flux.TTime, 487 } 488 vs[1] = values.NewTime(bnds.Stop) 489 for i := range spec.GroupKeys { 490 if spec.GroupKeys[i] == execute.DefaultStartColLabel || spec.GroupKeys[i] == execute.DefaultStopColLabel { 491 continue 492 } 493 cols = append(cols, flux.ColMeta{ 494 Label: spec.GroupKeys[i], 495 Type: flux.TString, 496 }) 497 vs = append(vs, values.NewString(string(kv[i]))) 498 } 499 return execute.NewGroupKey(cols, vs) 500} 501 502type tagKeysIterator struct { 503 ctx context.Context 504 bounds execute.Bounds 505 s Store 506 readSpec influxdb.ReadTagKeysSpec 507 predicate *datatypes.Predicate 508 alloc *memory.Allocator 509} 510 511func (ti *tagKeysIterator) Do(f func(flux.Table) error) error { 512 src := ti.s.GetSource( 513 ti.readSpec.Database, 514 ti.readSpec.RetentionPolicy, 515 ) 516 517 var req datatypes.TagKeysRequest 518 if any, err := types.MarshalAny(src); err != nil { 519 return err 520 } else { 521 req.TagsSource = any 522 } 523 req.Predicate = ti.predicate 524 req.Range.Start = int64(ti.bounds.Start) 525 req.Range.End = int64(ti.bounds.Stop) 526 527 rs, err := ti.s.TagKeys(ti.ctx, &req) 528 if err != nil { 529 return err 530 } 531 return ti.handleRead(f, rs) 532} 533 534func (ti *tagKeysIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error { 535 key := execute.NewGroupKey(nil, nil) 536 builder := execute.NewColListTableBuilder(key, ti.alloc) 537 valueIdx, err := builder.AddCol(flux.ColMeta{ 538 Label: execute.DefaultValueColLabel, 539 Type: flux.TString, 540 }) 541 if err != nil { 542 return err 543 } 544 defer builder.ClearData() 545 546 // Add the _start and _stop columns that come from storage. 547 if err := builder.AppendString(valueIdx, "_start"); err != nil { 548 return err 549 } 550 if err := builder.AppendString(valueIdx, "_stop"); err != nil { 551 return err 552 } 553 554 for rs.Next() { 555 v := rs.Value() 556 switch v { 557 case models.MeasurementTagKey: 558 v = "_measurement" 559 case models.FieldKeyTagKey: 560 v = "_field" 561 } 562 563 if err := builder.AppendString(valueIdx, v); err != nil { 564 return err 565 } 566 } 567 568 // Construct the table and add to the reference count 569 // so we can free the table later. 570 tbl, err := builder.Table() 571 if err != nil { 572 return err 573 } 574 575 // Release the references to the arrays held by the builder. 576 builder.ClearData() 577 return f(tbl) 578} 579 580func (ti *tagKeysIterator) Statistics() cursors.CursorStats { 581 return cursors.CursorStats{} 582} 583 584type tagValuesIterator struct { 585 ctx context.Context 586 bounds execute.Bounds 587 s Store 588 readSpec influxdb.ReadTagValuesSpec 589 predicate *datatypes.Predicate 590 alloc *memory.Allocator 591} 592 593func (ti *tagValuesIterator) Do(f func(flux.Table) error) error { 594 src := ti.s.GetSource( 595 ti.readSpec.Database, 596 ti.readSpec.RetentionPolicy, 597 ) 598 599 var req datatypes.TagValuesRequest 600 if any, err := types.MarshalAny(src); err != nil { 601 return err 602 } else { 603 req.TagsSource = any 604 } 605 switch ti.readSpec.TagKey { 606 case "_measurement": 607 req.TagKey = models.MeasurementTagKey 608 case "_field": 609 req.TagKey = models.FieldKeyTagKey 610 default: 611 req.TagKey = ti.readSpec.TagKey 612 } 613 req.Predicate = ti.predicate 614 req.Range.Start = int64(ti.bounds.Start) 615 req.Range.End = int64(ti.bounds.Stop) 616 617 rs, err := ti.s.TagValues(ti.ctx, &req) 618 if err != nil { 619 return err 620 } 621 return ti.handleRead(f, rs) 622} 623 624func (ti *tagValuesIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error { 625 key := execute.NewGroupKey(nil, nil) 626 builder := execute.NewColListTableBuilder(key, ti.alloc) 627 valueIdx, err := builder.AddCol(flux.ColMeta{ 628 Label: execute.DefaultValueColLabel, 629 Type: flux.TString, 630 }) 631 if err != nil { 632 return err 633 } 634 defer builder.ClearData() 635 636 for rs.Next() { 637 if err := builder.AppendString(valueIdx, rs.Value()); err != nil { 638 return err 639 } 640 } 641 642 // Construct the table and add to the reference count 643 // so we can free the table later. 644 tbl, err := builder.Table() 645 if err != nil { 646 return err 647 } 648 649 // Release the references to the arrays held by the builder. 650 builder.ClearData() 651 return f(tbl) 652} 653 654func (ti *tagValuesIterator) Statistics() cursors.CursorStats { 655 return cursors.CursorStats{} 656} 657