1package coordinator 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "sort" 10 "strconv" 11 "strings" 12 "time" 13 14 "github.com/influxdata/influxdb" 15 "github.com/influxdata/influxdb/models" 16 "github.com/influxdata/influxdb/monitor" 17 "github.com/influxdata/influxdb/pkg/tracing" 18 "github.com/influxdata/influxdb/pkg/tracing/fields" 19 "github.com/influxdata/influxdb/query" 20 "github.com/influxdata/influxdb/services/meta" 21 "github.com/influxdata/influxdb/tsdb" 22 "github.com/influxdata/influxql" 23) 24 25// ErrDatabaseNameRequired is returned when executing statements that require a database, 26// when a database has not been provided. 27var ErrDatabaseNameRequired = errors.New("database name required") 28 29type pointsWriter interface { 30 WritePointsInto(*IntoWriteRequest) error 31} 32 33// StatementExecutor executes a statement in the query. 34type StatementExecutor struct { 35 MetaClient MetaClient 36 37 // TaskManager holds the StatementExecutor that handles task-related commands. 38 TaskManager query.StatementExecutor 39 40 // TSDB storage for local node. 41 TSDBStore TSDBStore 42 43 // ShardMapper for mapping shards when executing a SELECT statement. 44 ShardMapper query.ShardMapper 45 46 // Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS. 47 Monitor *monitor.Monitor 48 49 // Used for rewriting points back into system for SELECT INTO statements. 50 PointsWriter pointsWriter 51 52 // Select statement limits 53 MaxSelectPointN int 54 MaxSelectSeriesN int 55 MaxSelectBucketsN int 56} 57 58// ExecuteStatement executes the given statement with the given execution context. 59func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { 60 // Select statements are handled separately so that they can be streamed. 
61 if stmt, ok := stmt.(*influxql.SelectStatement); ok { 62 return e.executeSelectStatement(stmt, ctx) 63 } 64 65 var rows models.Rows 66 var messages []*query.Message 67 var err error 68 switch stmt := stmt.(type) { 69 case *influxql.AlterRetentionPolicyStatement: 70 if ctx.ReadOnly { 71 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 72 } 73 err = e.executeAlterRetentionPolicyStatement(stmt) 74 case *influxql.CreateContinuousQueryStatement: 75 if ctx.ReadOnly { 76 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 77 } 78 err = e.executeCreateContinuousQueryStatement(stmt) 79 case *influxql.CreateDatabaseStatement: 80 if ctx.ReadOnly { 81 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 82 } 83 err = e.executeCreateDatabaseStatement(stmt) 84 case *influxql.CreateRetentionPolicyStatement: 85 if ctx.ReadOnly { 86 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 87 } 88 err = e.executeCreateRetentionPolicyStatement(stmt) 89 case *influxql.CreateSubscriptionStatement: 90 if ctx.ReadOnly { 91 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 92 } 93 err = e.executeCreateSubscriptionStatement(stmt) 94 case *influxql.CreateUserStatement: 95 if ctx.ReadOnly { 96 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 97 } 98 err = e.executeCreateUserStatement(stmt) 99 case *influxql.DeleteSeriesStatement: 100 err = e.executeDeleteSeriesStatement(stmt, ctx.Database) 101 case *influxql.DropContinuousQueryStatement: 102 if ctx.ReadOnly { 103 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 104 } 105 err = e.executeDropContinuousQueryStatement(stmt) 106 case *influxql.DropDatabaseStatement: 107 if ctx.ReadOnly { 108 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 109 } 110 err = e.executeDropDatabaseStatement(stmt) 111 case *influxql.DropMeasurementStatement: 112 if ctx.ReadOnly { 113 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 
114 } 115 err = e.executeDropMeasurementStatement(stmt, ctx.Database) 116 case *influxql.DropSeriesStatement: 117 if ctx.ReadOnly { 118 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 119 } 120 err = e.executeDropSeriesStatement(stmt, ctx.Database) 121 case *influxql.DropRetentionPolicyStatement: 122 if ctx.ReadOnly { 123 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 124 } 125 err = e.executeDropRetentionPolicyStatement(stmt) 126 case *influxql.DropShardStatement: 127 if ctx.ReadOnly { 128 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 129 } 130 err = e.executeDropShardStatement(stmt) 131 case *influxql.DropSubscriptionStatement: 132 if ctx.ReadOnly { 133 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 134 } 135 err = e.executeDropSubscriptionStatement(stmt) 136 case *influxql.DropUserStatement: 137 if ctx.ReadOnly { 138 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 139 } 140 err = e.executeDropUserStatement(stmt) 141 case *influxql.ExplainStatement: 142 if stmt.Analyze { 143 rows, err = e.executeExplainAnalyzeStatement(stmt, ctx) 144 } else { 145 rows, err = e.executeExplainStatement(stmt, ctx) 146 } 147 case *influxql.GrantStatement: 148 if ctx.ReadOnly { 149 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 150 } 151 err = e.executeGrantStatement(stmt) 152 case *influxql.GrantAdminStatement: 153 if ctx.ReadOnly { 154 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 155 } 156 err = e.executeGrantAdminStatement(stmt) 157 case *influxql.RevokeStatement: 158 if ctx.ReadOnly { 159 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 160 } 161 err = e.executeRevokeStatement(stmt) 162 case *influxql.RevokeAdminStatement: 163 if ctx.ReadOnly { 164 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 165 } 166 err = e.executeRevokeAdminStatement(stmt) 167 case *influxql.ShowContinuousQueriesStatement: 168 rows, err = 
e.executeShowContinuousQueriesStatement(stmt) 169 case *influxql.ShowDatabasesStatement: 170 rows, err = e.executeShowDatabasesStatement(stmt, ctx) 171 case *influxql.ShowDiagnosticsStatement: 172 rows, err = e.executeShowDiagnosticsStatement(stmt) 173 case *influxql.ShowGrantsForUserStatement: 174 rows, err = e.executeShowGrantsForUserStatement(stmt) 175 case *influxql.ShowMeasurementsStatement: 176 return e.executeShowMeasurementsStatement(stmt, ctx) 177 case *influxql.ShowMeasurementCardinalityStatement: 178 rows, err = e.executeShowMeasurementCardinalityStatement(stmt) 179 case *influxql.ShowRetentionPoliciesStatement: 180 rows, err = e.executeShowRetentionPoliciesStatement(stmt) 181 case *influxql.ShowSeriesCardinalityStatement: 182 rows, err = e.executeShowSeriesCardinalityStatement(stmt) 183 case *influxql.ShowShardsStatement: 184 rows, err = e.executeShowShardsStatement(stmt) 185 case *influxql.ShowShardGroupsStatement: 186 rows, err = e.executeShowShardGroupsStatement(stmt) 187 case *influxql.ShowStatsStatement: 188 rows, err = e.executeShowStatsStatement(stmt) 189 case *influxql.ShowSubscriptionsStatement: 190 rows, err = e.executeShowSubscriptionsStatement(stmt) 191 case *influxql.ShowTagKeysStatement: 192 return e.executeShowTagKeys(stmt, ctx) 193 case *influxql.ShowTagValuesStatement: 194 return e.executeShowTagValues(stmt, ctx) 195 case *influxql.ShowUsersStatement: 196 rows, err = e.executeShowUsersStatement(stmt) 197 case *influxql.SetPasswordUserStatement: 198 if ctx.ReadOnly { 199 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 200 } 201 err = e.executeSetPasswordUserStatement(stmt) 202 case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement: 203 // Send query related statements to the task manager. 
204 return e.TaskManager.ExecuteStatement(stmt, ctx) 205 default: 206 return query.ErrInvalidQuery 207 } 208 209 if err != nil { 210 return err 211 } 212 213 return ctx.Send(&query.Result{ 214 Series: rows, 215 Messages: messages, 216 }) 217} 218 219func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error { 220 rpu := &meta.RetentionPolicyUpdate{ 221 Duration: stmt.Duration, 222 ReplicaN: stmt.Replication, 223 ShardGroupDuration: stmt.ShardGroupDuration, 224 } 225 226 // Update the retention policy. 227 return e.MetaClient.UpdateRetentionPolicy(stmt.Database, stmt.Name, rpu, stmt.Default) 228} 229 230func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) error { 231 // Verify that retention policies exist. 232 var err error 233 verifyRPFn := func(n influxql.Node) { 234 if err != nil { 235 return 236 } 237 switch m := n.(type) { 238 case *influxql.Measurement: 239 var rp *meta.RetentionPolicyInfo 240 if rp, err = e.MetaClient.RetentionPolicy(m.Database, m.RetentionPolicy); err != nil { 241 return 242 } else if rp == nil { 243 err = fmt.Errorf("%s: %s.%s", meta.ErrRetentionPolicyNotFound, m.Database, m.RetentionPolicy) 244 } 245 default: 246 return 247 } 248 } 249 250 influxql.WalkFunc(q, verifyRPFn) 251 252 if err != nil { 253 return err 254 } 255 256 return e.MetaClient.CreateContinuousQuery(q.Database, q.Name, q.String()) 257} 258 259func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error { 260 if !meta.ValidName(stmt.Name) { 261 // TODO This should probably be in `(*meta.Data).CreateDatabase` 262 // but can't go there until 1.1 is used everywhere 263 return meta.ErrInvalidName 264 } 265 266 if !stmt.RetentionPolicyCreate { 267 _, err := e.MetaClient.CreateDatabase(stmt.Name) 268 return err 269 } 270 271 // If we're doing, for example, CREATE DATABASE "db" WITH DURATION 1d then 272 // the name will not 
yet be set. We only need to validate non-empty 273 // retention policy names, such as in the statement: 274 // CREATE DATABASE "db" WITH DURATION 1d NAME "xyz" 275 if stmt.RetentionPolicyName != "" && !meta.ValidName(stmt.RetentionPolicyName) { 276 return meta.ErrInvalidName 277 } 278 279 spec := meta.RetentionPolicySpec{ 280 Name: stmt.RetentionPolicyName, 281 Duration: stmt.RetentionPolicyDuration, 282 ReplicaN: stmt.RetentionPolicyReplication, 283 ShardGroupDuration: stmt.RetentionPolicyShardGroupDuration, 284 } 285 _, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, &spec) 286 return err 287} 288 289func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error { 290 if !meta.ValidName(stmt.Name) { 291 // TODO This should probably be in `(*meta.Data).CreateRetentionPolicy` 292 // but can't go there until 1.1 is used everywhere 293 return meta.ErrInvalidName 294 } 295 296 spec := meta.RetentionPolicySpec{ 297 Name: stmt.Name, 298 Duration: &stmt.Duration, 299 ReplicaN: &stmt.Replication, 300 ShardGroupDuration: stmt.ShardGroupDuration, 301 } 302 303 // Create new retention policy. 304 _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default) 305 return err 306} 307 308func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) error { 309 return e.MetaClient.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations) 310} 311 312func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) error { 313 _, err := e.MetaClient.CreateUser(q.Name, q.Password, q.Admin) 314 return err 315} 316 317func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSeriesStatement, database string) error { 318 if dbi := e.MetaClient.Database(database); dbi == nil { 319 return query.ErrDatabaseNotFound(database) 320 } 321 322 // Convert "now()" to current time. 
323 stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()}) 324 325 // Locally delete the series. 326 return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition) 327} 328 329func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error { 330 return e.MetaClient.DropContinuousQuery(q.Database, q.Name) 331} 332 333// executeDropDatabaseStatement drops a database from the cluster. 334// It does not return an error if the database was not found on any of 335// the nodes, or in the Meta store. 336func (e *StatementExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error { 337 if e.MetaClient.Database(stmt.Name) == nil { 338 return nil 339 } 340 341 // Locally delete the datababse. 342 if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil { 343 return err 344 } 345 346 // Remove the database from the Meta Store. 347 return e.MetaClient.DropDatabase(stmt.Name) 348} 349 350func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error { 351 if dbi := e.MetaClient.Database(database); dbi == nil { 352 return query.ErrDatabaseNotFound(database) 353 } 354 355 // Locally drop the measurement 356 return e.TSDBStore.DeleteMeasurement(database, stmt.Name) 357} 358 359func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error { 360 if dbi := e.MetaClient.Database(database); dbi == nil { 361 return query.ErrDatabaseNotFound(database) 362 } 363 364 // Check for time in WHERE clause (not supported). 365 if influxql.HasTimeExpr(stmt.Condition) { 366 return errors.New("DROP SERIES doesn't support time in WHERE clause") 367 } 368 369 // Locally drop the series. 
370 return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition) 371} 372 373func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error { 374 // Locally delete the shard. 375 if err := e.TSDBStore.DeleteShard(stmt.ID); err != nil { 376 return err 377 } 378 379 // Remove the shard reference from the Meta Store. 380 return e.MetaClient.DropShard(stmt.ID) 381} 382 383func (e *StatementExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error { 384 dbi := e.MetaClient.Database(stmt.Database) 385 if dbi == nil { 386 return nil 387 } 388 389 if dbi.RetentionPolicy(stmt.Name) == nil { 390 return nil 391 } 392 393 // Locally drop the retention policy. 394 if err := e.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name); err != nil { 395 return err 396 } 397 398 return e.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name) 399} 400 401func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) error { 402 return e.MetaClient.DropSubscription(q.Database, q.RetentionPolicy, q.Name) 403} 404 405func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) error { 406 return e.MetaClient.DropUser(q.Name) 407} 408 409func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) { 410 opt := query.SelectOptions{ 411 NodeID: ctx.ExecutionOptions.NodeID, 412 MaxSeriesN: e.MaxSelectSeriesN, 413 MaxBucketsN: e.MaxSelectBucketsN, 414 Authorizer: ctx.Authorizer, 415 } 416 417 // Prepare the query for execution, but do not actually execute it. 418 // This should perform any needed substitutions. 
419 p, err := query.Prepare(q.Statement, e.ShardMapper, opt) 420 if err != nil { 421 return nil, err 422 } 423 defer p.Close() 424 425 plan, err := p.Explain() 426 if err != nil { 427 return nil, err 428 } 429 plan = strings.TrimSpace(plan) 430 431 row := &models.Row{ 432 Columns: []string{"QUERY PLAN"}, 433 } 434 for _, s := range strings.Split(plan, "\n") { 435 row.Values = append(row.Values, []interface{}{s}) 436 } 437 return models.Rows{row}, nil 438} 439 440func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) { 441 stmt := q.Statement 442 t, span := tracing.NewTrace("select") 443 ctx := tracing.NewContextWithTrace(ectx, t) 444 ctx = tracing.NewContextWithSpan(ctx, span) 445 var aux query.Iterators 446 ctx = query.NewContextWithIterators(ctx, &aux) 447 start := time.Now() 448 449 cur, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions) 450 if err != nil { 451 return nil, err 452 } 453 454 iterTime := time.Since(start) 455 456 // Generate a row emitter from the iterator set. 457 em := query.NewEmitter(cur, ectx.ChunkSize) 458 459 // Emit rows to the results channel. 460 var writeN int64 461 for { 462 var row *models.Row 463 row, _, err = em.Emit() 464 if err != nil { 465 goto CLEANUP 466 } else if row == nil { 467 // Check if the query was interrupted while emitting. 
468 select { 469 case <-ectx.Done(): 470 err = ectx.Err() 471 goto CLEANUP 472 default: 473 } 474 break 475 } 476 477 writeN += int64(len(row.Values)) 478 } 479 480CLEANUP: 481 em.Close() 482 if err != nil { 483 return nil, err 484 } 485 486 // close auxiliary iterators deterministically to finalize any captured measurements 487 aux.Close() 488 489 totalTime := time.Since(start) 490 span.MergeFields( 491 fields.Duration("total_time", totalTime), 492 fields.Duration("planning_time", iterTime), 493 fields.Duration("execution_time", totalTime-iterTime), 494 ) 495 span.Finish() 496 497 row := &models.Row{ 498 Columns: []string{"EXPLAIN ANALYZE"}, 499 } 500 for _, s := range strings.Split(t.Tree().String(), "\n") { 501 row.Values = append(row.Values, []interface{}{s}) 502 } 503 504 return models.Rows{row}, nil 505} 506 507func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error { 508 return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege) 509} 510 511func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) error { 512 return e.MetaClient.SetAdminPrivilege(stmt.User, true) 513} 514 515func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error { 516 priv := influxql.NoPrivileges 517 518 // Revoking all privileges means there's no need to look at existing user privileges. 519 if stmt.Privilege != influxql.AllPrivileges { 520 p, err := e.MetaClient.UserPrivilege(stmt.User, stmt.On) 521 if err != nil { 522 return err 523 } 524 // Bit clear (AND NOT) the user's privilege with the revoked privilege. 
525 priv = *p &^ stmt.Privilege 526 } 527 528 return e.MetaClient.SetPrivilege(stmt.User, stmt.On, priv) 529} 530 531func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) error { 532 return e.MetaClient.SetAdminPrivilege(stmt.User, false) 533} 534 535func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) error { 536 return e.MetaClient.UpdateUser(q.Name, q.Password) 537} 538 539func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error { 540 cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions) 541 if err != nil { 542 return err 543 } 544 545 // Generate a row emitter from the iterator set. 546 em := query.NewEmitter(cur, ctx.ChunkSize) 547 defer em.Close() 548 549 // Emit rows to the results channel. 550 var writeN int64 551 var emitted bool 552 553 var pointsWriter *BufferedPointsWriter 554 if stmt.Target != nil { 555 pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000) 556 } 557 558 for { 559 row, partial, err := em.Emit() 560 if err != nil { 561 return err 562 } else if row == nil { 563 // Check if the query was interrupted while emitting. 564 select { 565 case <-ctx.Done(): 566 return ctx.Err() 567 default: 568 } 569 break 570 } 571 572 // Write points back into system for INTO statements. 573 if stmt.Target != nil { 574 if err := e.writeInto(pointsWriter, stmt, row); err != nil { 575 return err 576 } 577 writeN += int64(len(row.Values)) 578 continue 579 } 580 581 result := &query.Result{ 582 Series: []*models.Row{row}, 583 Partial: partial, 584 } 585 586 // Send results or exit if closing. 587 if err := ctx.Send(result); err != nil { 588 return err 589 } 590 591 emitted = true 592 } 593 594 // Flush remaining points and emit write count if an INTO statement. 
595 if stmt.Target != nil { 596 if err := pointsWriter.Flush(); err != nil { 597 return err 598 } 599 600 var messages []*query.Message 601 if ctx.ReadOnly { 602 messages = append(messages, query.ReadOnlyWarning(stmt.String())) 603 } 604 605 return ctx.Send(&query.Result{ 606 Messages: messages, 607 Series: []*models.Row{{ 608 Name: "result", 609 Columns: []string{"time", "written"}, 610 Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}}, 611 }}, 612 }) 613 } 614 615 // Always emit at least one result. 616 if !emitted { 617 return ctx.Send(&query.Result{ 618 Series: make([]*models.Row, 0), 619 }) 620 } 621 622 return nil 623} 624 625func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) (query.Cursor, error) { 626 sopt := query.SelectOptions{ 627 NodeID: opt.NodeID, 628 MaxSeriesN: e.MaxSelectSeriesN, 629 MaxPointN: e.MaxSelectPointN, 630 MaxBucketsN: e.MaxSelectBucketsN, 631 Authorizer: opt.Authorizer, 632 } 633 634 // Create a set of iterators from a selection. 
635 cur, err := query.Select(ctx, stmt, e.ShardMapper, sopt) 636 if err != nil { 637 return nil, err 638 } 639 return cur, nil 640} 641 642func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) { 643 dis := e.MetaClient.Databases() 644 645 rows := []*models.Row{} 646 for _, di := range dis { 647 row := &models.Row{Columns: []string{"name", "query"}, Name: di.Name} 648 for _, cqi := range di.ContinuousQueries { 649 row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query}) 650 } 651 rows = append(rows, row) 652 } 653 return rows, nil 654} 655 656func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) { 657 dis := e.MetaClient.Databases() 658 a := ctx.ExecutionOptions.Authorizer 659 660 row := &models.Row{Name: "databases", Columns: []string{"name"}} 661 for _, di := range dis { 662 // Only include databases that the user is authorized to read or write. 663 if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) { 664 row.Values = append(row.Values, []interface{}{di.Name}) 665 } 666 } 667 return []*models.Row{row}, nil 668} 669 670func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) { 671 diags, err := e.Monitor.Diagnostics() 672 if err != nil { 673 return nil, err 674 } 675 676 // Get a sorted list of diagnostics keys. 
677 sortedKeys := make([]string, 0, len(diags)) 678 for k := range diags { 679 sortedKeys = append(sortedKeys, k) 680 } 681 sort.Strings(sortedKeys) 682 683 rows := make([]*models.Row, 0, len(diags)) 684 for _, k := range sortedKeys { 685 if stmt.Module != "" && k != stmt.Module { 686 continue 687 } 688 689 row := &models.Row{Name: k} 690 691 row.Columns = diags[k].Columns 692 row.Values = diags[k].Rows 693 rows = append(rows, row) 694 } 695 return rows, nil 696} 697 698func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) { 699 priv, err := e.MetaClient.UserPrivileges(q.Name) 700 if err != nil { 701 return nil, err 702 } 703 704 row := &models.Row{Columns: []string{"database", "privilege"}} 705 for d, p := range priv { 706 row.Values = append(row.Values, []interface{}{d, p.String()}) 707 } 708 return []*models.Row{row}, nil 709} 710 711func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMeasurementsStatement, ctx *query.ExecutionContext) error { 712 if q.Database == "" { 713 return ErrDatabaseNameRequired 714 } 715 716 names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) 717 if err != nil || len(names) == 0 { 718 return ctx.Send(&query.Result{ 719 Err: err, 720 }) 721 } 722 723 if q.Offset > 0 { 724 if q.Offset >= len(names) { 725 names = nil 726 } else { 727 names = names[q.Offset:] 728 } 729 } 730 731 if q.Limit > 0 { 732 if q.Limit < len(names) { 733 names = names[:q.Limit] 734 } 735 } 736 737 values := make([][]interface{}, len(names)) 738 for i, name := range names { 739 values[i] = []interface{}{string(name)} 740 } 741 742 if len(values) == 0 { 743 return ctx.Send(&query.Result{}) 744 } 745 746 return ctx.Send(&query.Result{ 747 Series: []*models.Row{{ 748 Name: "measurements", 749 Columns: []string{"name"}, 750 Values: values, 751 }}, 752 }) 753} 754 755func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt 
*influxql.ShowMeasurementCardinalityStatement) (models.Rows, error) { 756 if stmt.Database == "" { 757 return nil, ErrDatabaseNameRequired 758 } 759 760 n, err := e.TSDBStore.MeasurementsCardinality(stmt.Database) 761 if err != nil { 762 return nil, err 763 } 764 765 return []*models.Row{&models.Row{ 766 Columns: []string{"cardinality estimation"}, 767 Values: [][]interface{}{{n}}, 768 }}, nil 769} 770 771func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) { 772 if q.Database == "" { 773 return nil, ErrDatabaseNameRequired 774 } 775 776 di := e.MetaClient.Database(q.Database) 777 if di == nil { 778 return nil, influxdb.ErrDatabaseNotFound(q.Database) 779 } 780 781 row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}} 782 for _, rpi := range di.RetentionPolicies { 783 row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name}) 784 } 785 return []*models.Row{row}, nil 786} 787 788func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) { 789 dis := e.MetaClient.Databases() 790 791 rows := []*models.Row{} 792 for _, di := range dis { 793 row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name} 794 for _, rpi := range di.RetentionPolicies { 795 for _, sgi := range rpi.ShardGroups { 796 // Shards associated with deleted shard groups are effectively deleted. 797 // Don't list them. 
798 if sgi.Deleted() { 799 continue 800 } 801 802 for _, si := range sgi.Shards { 803 ownerIDs := make([]uint64, len(si.Owners)) 804 for i, owner := range si.Owners { 805 ownerIDs[i] = owner.NodeID 806 } 807 808 row.Values = append(row.Values, []interface{}{ 809 si.ID, 810 di.Name, 811 rpi.Name, 812 sgi.ID, 813 sgi.StartTime.UTC().Format(time.RFC3339), 814 sgi.EndTime.UTC().Format(time.RFC3339), 815 sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339), 816 joinUint64(ownerIDs), 817 }) 818 } 819 } 820 } 821 rows = append(rows, row) 822 } 823 return rows, nil 824} 825 826func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement) (models.Rows, error) { 827 if stmt.Database == "" { 828 return nil, ErrDatabaseNameRequired 829 } 830 831 n, err := e.TSDBStore.SeriesCardinality(stmt.Database) 832 if err != nil { 833 return nil, err 834 } 835 836 return []*models.Row{&models.Row{ 837 Columns: []string{"cardinality estimation"}, 838 Values: [][]interface{}{{n}}, 839 }}, nil 840} 841 842func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) { 843 dis := e.MetaClient.Databases() 844 845 row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"} 846 for _, di := range dis { 847 for _, rpi := range di.RetentionPolicies { 848 for _, sgi := range rpi.ShardGroups { 849 // Shards associated with deleted shard groups are effectively deleted. 850 // Don't list them. 
851 if sgi.Deleted() { 852 continue 853 } 854 855 row.Values = append(row.Values, []interface{}{ 856 sgi.ID, 857 di.Name, 858 rpi.Name, 859 sgi.StartTime.UTC().Format(time.RFC3339), 860 sgi.EndTime.UTC().Format(time.RFC3339), 861 sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339), 862 }) 863 } 864 } 865 } 866 867 return []*models.Row{row}, nil 868} 869 870func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) { 871 var rows []*models.Row 872 873 if _, ok := e.TSDBStore.(*tsdb.Store); stmt.Module == "indexes" && ok { 874 // The cost of collecting indexes metrics grows with the size of the indexes, so only collect this 875 // stat when explicitly requested. 876 b := e.TSDBStore.(*tsdb.Store).IndexBytes() 877 row := &models.Row{ 878 Name: "indexes", 879 Columns: []string{"memoryBytes"}, 880 Values: [][]interface{}{{b}}, 881 } 882 rows = append(rows, row) 883 884 } else { 885 stats, err := e.Monitor.Statistics(nil) 886 if err != nil { 887 return nil, err 888 } 889 890 for _, stat := range stats { 891 if stmt.Module != "" && stat.Name != stmt.Module { 892 continue 893 } 894 row := &models.Row{Name: stat.Name, Tags: stat.Tags} 895 896 values := make([]interface{}, 0, len(stat.Values)) 897 for _, k := range stat.ValueNames() { 898 row.Columns = append(row.Columns, k) 899 values = append(values, stat.Values[k]) 900 } 901 row.Values = [][]interface{}{values} 902 rows = append(rows, row) 903 } 904 } 905 return rows, nil 906} 907 908func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) { 909 dis := e.MetaClient.Databases() 910 911 rows := []*models.Row{} 912 for _, di := range dis { 913 row := &models.Row{Columns: []string{"retention_policy", "name", "mode", "destinations"}, Name: di.Name} 914 for _, rpi := range di.RetentionPolicies { 915 for _, si := range rpi.Subscriptions { 916 row.Values = append(row.Values, []interface{}{rpi.Name, si.Name, 
si.Mode, si.Destinations})
			}
		}
		if len(row.Values) > 0 {
			rows = append(rows, row)
		}
	}
	return rows, nil
}

// executeShowTagKeys sends one result per measurement containing the tag keys
// that match the statement's condition.
//
// The statement's OFFSET and LIMIT are applied to each measurement's key list
// independently. Measurements left with no keys are skipped. If nothing was
// sent at all, a single empty result is sent so that at least one result is
// always emitted. An error from the store is delivered to the client as a
// result rather than returned.
func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, ctx *query.ExecutionContext) error {
	if q.Database == "" {
		return ErrDatabaseNameRequired
	}

	// Determine shard set based on database and time range.
	// SHOW TAG KEYS searches the shards of every retention policy in the database.
	di := e.MetaClient.Database(q.Database)
	if di == nil {
		return fmt.Errorf("database not found: %s", q.Database)
	}

	// Determine appropriate time range. If one or fewer time boundaries provided
	// then min/max possible time should be used instead.
	valuer := &influxql.NowValuer{Now: time.Now()}
	cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
	if err != nil {
		return err
	}

	// Get all shards for all retention policies.
	var allGroups []meta.ShardGroupInfo
	for _, rpi := range di.RetentionPolicies {
		sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
		if err != nil {
			return err
		}
		allGroups = append(allGroups, sgis...)
	}

	var shardIDs []uint64
	for _, sgi := range allGroups {
		for _, si := range sgi.Shards {
			shardIDs = append(shardIDs, si.ID)
		}
	}

	tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
	if err != nil {
		// Report the store error to the client as part of the result stream.
		return ctx.Send(&query.Result{
			Err: err,
		})
	}

	emitted := false
	for _, m := range tagKeys {
		keys := m.Keys

		// Apply OFFSET, then LIMIT, to this measurement's keys only.
		if q.Offset > 0 {
			if q.Offset >= len(keys) {
				keys = nil
			} else {
				keys = keys[q.Offset:]
			}
		}
		if q.Limit > 0 && q.Limit < len(keys) {
			keys = keys[:q.Limit]
		}

		if len(keys) == 0 {
			continue
		}

		row := &models.Row{
			Name:    m.Measurement,
			Columns: []string{"tagKey"},
			Values:  make([][]interface{}, len(keys)),
		}
		for i, key := range keys {
			row.Values[i] = []interface{}{key}
		}

		if err := ctx.Send(&query.Result{
			Series: []*models.Row{row},
		}); err != nil {
			return err
		}
		emitted = true
	}

	// Ensure at least one result is emitted.
	if !emitted {
		return ctx.Send(&query.Result{})
	}
	return nil
}

// executeShowTagValues sends one result per measurement containing the
// key/value tag pairs that match the statement's condition.
//
// The statement's OFFSET and LIMIT are applied to each measurement's value
// list independently. Measurements left with no values are skipped. If nothing
// was sent at all, a single empty result is sent so that at least one result
// is always emitted. An error from the store is delivered to the client as a
// result rather than returned.
func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *query.ExecutionContext) error {
	if q.Database == "" {
		return ErrDatabaseNameRequired
	}

	// Determine shard set based on database and time range.
	// SHOW TAG VALUES searches the shards of every retention policy in the database.
	di := e.MetaClient.Database(q.Database)
	if di == nil {
		return fmt.Errorf("database not found: %s", q.Database)
	}

	// Determine appropriate time range. If one or fewer time boundaries provided
	// then min/max possible time should be used instead.
	valuer := &influxql.NowValuer{Now: time.Now()}
	cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
	if err != nil {
		return err
	}

	// Get all shards for all retention policies.
	var allGroups []meta.ShardGroupInfo
	for _, rpi := range di.RetentionPolicies {
		sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
		if err != nil {
			return err
		}
		allGroups = append(allGroups, sgis...)
	}

	var shardIDs []uint64
	for _, sgi := range allGroups {
		for _, si := range sgi.Shards {
			shardIDs = append(shardIDs, si.ID)
		}
	}

	tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond)
	if err != nil {
		// Report the store error to the client as part of the result stream.
		return ctx.Send(&query.Result{Err: err})
	}

	emitted := false
	for _, m := range tagValues {
		values := m.Values

		// Apply OFFSET, then LIMIT, to this measurement's values only.
		if q.Offset > 0 {
			if q.Offset >= len(values) {
				values = nil
			} else {
				values = values[q.Offset:]
			}
		}

		if q.Limit > 0 {
			if q.Limit < len(values) {
				values = values[:q.Limit]
			}
		}

		if len(values) == 0 {
			continue
		}

		row := &models.Row{
			Name:    m.Measurement,
			Columns: []string{"key", "value"},
			Values:  make([][]interface{}, len(values)),
		}
		for i, v := range values {
			row.Values[i] = []interface{}{v.Key, v.Value}
		}

		if err := ctx.Send(&query.Result{
			Series: []*models.Row{row},
		}); err != nil {
			return err
		}
		emitted = true
	}

	// Ensure at least one result is emitted.
	if !emitted {
		return ctx.Send(&query.Result{})
	}
	return nil
}

// executeShowUsersStatement returns a single row with the columns "user" and
// "admin", holding one value pair for every user known to the meta client.
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
	row := &models.Row{Columns: []string{"user", "admin"}}
	for _, ui := range e.MetaClient.Users() {
		row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
	}
	return []*models.Row{row}, nil
}

// BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries
// write their points to the destination in batches.
type BufferedPointsWriter struct {
	w               pointsWriter   // underlying writer that receives flushed batches
	buf             []models.Point // pending points; its capacity is the batch size
	database        string         // only requests for this database are accepted
	retentionPolicy string         // only requests for this retention policy are accepted
}

// NewBufferedPointsWriter returns a new BufferedPointsWriter that forwards
// points for database and retentionPolicy to w in batches of up to capacity
// points. With a capacity of 0 nothing can be buffered, and every write is
// passed straight through to w.
func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter {
	return &BufferedPointsWriter{
		w:               w,
		buf:             make([]models.Point, 0, capacity),
		database:        database,
		retentionPolicy: retentionPolicy,
	}
}

// WritePointsInto implements pointsWriter for BufferedPointsWriter.
//
// Points are copied into the buffer, and the buffer is flushed to the
// underlying writer every time it fills up. Points that do not fill the buffer
// stay buffered until the next call or an explicit Flush. An error is returned
// if req targets a different database or retention policy than the writer was
// created for, or if a flush fails.
func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error {
	// Make sure we're buffering points only for the expected destination.
	if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy {
		return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy)
	}

	// A zero-capacity buffer can never accept a point, so the copy loop below
	// would never advance. Write straight through to the underlying writer.
	if cap(w.buf) == 0 {
		if len(req.Points) == 0 {
			return nil
		}
		return w.w.WritePointsInto(req)
	}

	for i := 0; i < len(req.Points); {
		// Get the available space in the buffer.
		avail := cap(w.buf) - len(w.buf)

		// Calculate number of points to copy into the buffer.
		n := len(req.Points[i:])
		if n > avail {
			n = avail
		}

		// Copy points into buffer.
		w.buf = append(w.buf, req.Points[i:n+i]...)

		// Advance the index by number of points copied.
		i += n

		// If buffer is full, flush points to underlying writer.
		if len(w.buf) == cap(w.buf) {
			if err := w.Flush(); err != nil {
				return err
			}
		}
	}

	return nil
}

// Flush writes all buffered points to the underlying writer.
// It does nothing when the buffer is empty. When the underlying writer
// returns an error the buffered points are kept.
func (w *BufferedPointsWriter) Flush() error {
	if len(w.buf) == 0 {
		return nil
	}

	// NOTE(review): the buffer's backing array is reused after this call, which
	// assumes the underlying writer does not retain Points — confirm.
	if err := w.w.WritePointsInto(&IntoWriteRequest{
		Database:        w.database,
		RetentionPolicy: w.retentionPolicy,
		Points:          w.buf,
	}); err != nil {
		return err
	}

	// Clear the buffer.
	w.buf = w.buf[:0]

	return nil
}

// Len returns the number of points buffered.
func (w *BufferedPointsWriter) Len() int { return len(w.buf) }

// Cap returns the capacity (in points) of the buffer.
func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }

// writeInto converts a result row of a SELECT ... INTO statement into points
// and writes them to the statement's target database and retention policy
// through w. When the target has no measurement name, the name of the row is
// used. It returns errNoDatabaseInTarget if the target has no database.
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error {
	if stmt.Target.Measurement.Database == "" {
		return errNoDatabaseInTarget
	}

	// Converting rows back into points here may look roundabout, but the
	// executors (both aggregate and raw) are complex enough that making them
	// write to the database directly would be clumsy, and it would be very hard
	// to guarantee that the written results match what a normal query returns.
	name := stmt.Target.Measurement.Name
	if name == "" {
		name = row.Name
	}

	points, err := convertRowToPoints(name, row)
	if err != nil {
		return err
	}

	return w.WritePointsInto(&IntoWriteRequest{
		Database:        stmt.Target.Measurement.Database,
		RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
		Points:          points,
	})
}

var errNoDatabaseInTarget = errors.New("no database in target")

// convertRowToPoints will convert a query result Row into Points that can be written back in.
//
// The column named "time" supplies each point's timestamp; every other column
// becomes a field, with nil values left out. Rows that models.NewPoint rejects
// are dropped silently. An error is returned if the row has no "time" column
// or if a time value is not a time.Time.
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
	// figure out which parts of the result are the time and which are the fields
	timeIndex := -1
	fieldIndexes := make(map[string]int)
	for i, c := range row.Columns {
		if c == "time" {
			timeIndex = i
		} else {
			fieldIndexes[c] = i
		}
	}

	if timeIndex == -1 {
		return nil, errors.New("error finding time index in result")
	}

	points := make([]models.Point, 0, len(row.Values))
	for _, v := range row.Values {
		// Report a malformed time value as an error instead of panicking on the
		// type assertion.
		ts, ok := v[timeIndex].(time.Time)
		if !ok {
			return nil, fmt.Errorf("invalid time value in result: %v", v[timeIndex])
		}

		vals := make(map[string]interface{}, len(fieldIndexes))
		for fieldName, fieldIndex := range fieldIndexes {
			if val := v[fieldIndex]; val != nil {
				vals[fieldName] = val
			}
		}

		p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, ts)
		if err != nil {
			// Drop points that can't be stored
			continue
		}

		points = append(points, p)
	}

	return points, nil
}

// NormalizeStatement adds a default database and policy to the measurements in statement.
// Parameter defaultRetentionPolicy can be "".
1264func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error) { 1265 influxql.WalkFunc(stmt, func(node influxql.Node) { 1266 if err != nil { 1267 return 1268 } 1269 switch node := node.(type) { 1270 case *influxql.ShowRetentionPoliciesStatement: 1271 if node.Database == "" { 1272 node.Database = defaultDatabase 1273 } 1274 case *influxql.ShowMeasurementsStatement: 1275 if node.Database == "" { 1276 node.Database = defaultDatabase 1277 } 1278 case *influxql.ShowTagKeysStatement: 1279 if node.Database == "" { 1280 node.Database = defaultDatabase 1281 } 1282 case *influxql.ShowTagValuesStatement: 1283 if node.Database == "" { 1284 node.Database = defaultDatabase 1285 } 1286 case *influxql.ShowMeasurementCardinalityStatement: 1287 if node.Database == "" { 1288 node.Database = defaultDatabase 1289 } 1290 case *influxql.ShowSeriesCardinalityStatement: 1291 if node.Database == "" { 1292 node.Database = defaultDatabase 1293 } 1294 case *influxql.Measurement: 1295 switch stmt.(type) { 1296 case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement: 1297 // DB and RP not supported by these statements so don't rewrite into invalid 1298 // statements 1299 default: 1300 err = e.normalizeMeasurement(node, defaultDatabase, defaultRetentionPolicy) 1301 } 1302 } 1303 }) 1304 return 1305} 1306 1307func (e *StatementExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase, defaultRetentionPolicy string) error { 1308 // Targets (measurements in an INTO clause) can have blank names, which means it will be 1309 // the same as the measurement name it came from in the FROM clause. 1310 if !m.IsTarget && m.Name == "" && m.SystemIterator == "" && m.Regex == nil { 1311 return errors.New("invalid measurement") 1312 } 1313 1314 // Measurement does not have an explicit database? Insert default. 
1315 if m.Database == "" { 1316 m.Database = defaultDatabase 1317 } 1318 1319 // The database must now be specified by this point. 1320 if m.Database == "" { 1321 return ErrDatabaseNameRequired 1322 } 1323 1324 // Find database. 1325 di := e.MetaClient.Database(m.Database) 1326 if di == nil { 1327 return influxdb.ErrDatabaseNotFound(m.Database) 1328 } 1329 1330 // If no retention policy was specified, use the default. 1331 if m.RetentionPolicy == "" { 1332 if defaultRetentionPolicy != "" { 1333 m.RetentionPolicy = defaultRetentionPolicy 1334 } else if di.DefaultRetentionPolicy != "" { 1335 m.RetentionPolicy = di.DefaultRetentionPolicy 1336 } else { 1337 return fmt.Errorf("default retention policy not set for: %s", di.Name) 1338 } 1339 } 1340 return nil 1341} 1342 1343// IntoWriteRequest is a partial copy of cluster.WriteRequest 1344type IntoWriteRequest struct { 1345 Database string 1346 RetentionPolicy string 1347 Points []models.Point 1348} 1349 1350// TSDBStore is an interface for accessing the time series data store. 
1351type TSDBStore interface { 1352 CreateShard(database, policy string, shardID uint64, enabled bool) error 1353 WriteToShard(shardID uint64, points []models.Point) error 1354 1355 RestoreShard(id uint64, r io.Reader) error 1356 BackupShard(id uint64, since time.Time, w io.Writer) error 1357 1358 DeleteDatabase(name string) error 1359 DeleteMeasurement(database, name string) error 1360 DeleteRetentionPolicy(database, name string) error 1361 DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error 1362 DeleteShard(id uint64) error 1363 1364 MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) 1365 TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) 1366 TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) 1367 1368 SeriesCardinality(database string) (int64, error) 1369 MeasurementsCardinality(database string) (int64, error) 1370} 1371 1372var _ TSDBStore = LocalTSDBStore{} 1373 1374// LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator 1375// to satisfy the TSDBStore interface. 1376type LocalTSDBStore struct { 1377 *tsdb.Store 1378} 1379 1380// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard. 1381type ShardIteratorCreator interface { 1382 ShardIteratorCreator(id uint64) query.IteratorCreator 1383} 1384 1385// joinUint64 returns a comma-delimited string of uint64 numbers. 1386func joinUint64(a []uint64) string { 1387 var buf bytes.Buffer 1388 for i, x := range a { 1389 buf.WriteString(strconv.FormatUint(x, 10)) 1390 if i < len(a)-1 { 1391 buf.WriteRune(',') 1392 } 1393 } 1394 return buf.String() 1395} 1396