1package scanner 2 3import ( 4 "context" 5 "encoding/json" 6 "flag" 7 "io" 8 "io/ioutil" 9 "os" 10 "path" 11 "path/filepath" 12 "regexp" 13 "sort" 14 "strconv" 15 "strings" 16 "time" 17 18 "github.com/go-kit/log" 19 "github.com/go-kit/log/level" 20 "github.com/grafana/dskit/backoff" 21 "github.com/grafana/dskit/flagext" 22 "github.com/grafana/dskit/services" 23 "github.com/pkg/errors" 24 "github.com/prometheus/client_golang/prometheus" 25 "github.com/prometheus/client_golang/prometheus/promauto" 26 "github.com/thanos-io/thanos/pkg/objstore" 27 "golang.org/x/sync/errgroup" 28 29 "github.com/cortexproject/cortex/pkg/chunk" 30 "github.com/cortexproject/cortex/pkg/chunk/aws" 31 "github.com/cortexproject/cortex/pkg/chunk/storage" 32 "github.com/cortexproject/cortex/tools/blocksconvert" 33) 34 35type Config struct { 36 TableNames string 37 TablesLimit int 38 39 PeriodStart flagext.DayValue 40 PeriodEnd flagext.DayValue 41 42 OutputDirectory string 43 Concurrency int 44 45 VerifyPlans bool 46 UploadFiles bool 47 KeepFiles bool 48 49 AllowedUsers string 50 IgnoredUserPattern string 51} 52 53func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 54 f.StringVar(&cfg.TableNames, "scanner.tables", "", "Comma-separated tables to generate plan files from. If not used, all tables found via schema and scanning of Index store will be used.") 55 f.StringVar(&cfg.OutputDirectory, "scanner.output-dir", "", "Local directory used for storing temporary plan files (will be created if missing).") 56 f.IntVar(&cfg.Concurrency, "scanner.concurrency", 16, "Number of concurrent index processors, and plan uploads.") 57 f.BoolVar(&cfg.UploadFiles, "scanner.upload", true, "Upload plan files.") 58 f.BoolVar(&cfg.KeepFiles, "scanner.keep-files", false, "Keep plan files locally after uploading.") 59 f.IntVar(&cfg.TablesLimit, "scanner.tables-limit", 0, "Number of tables to convert. 0 = all.") 60 f.StringVar(&cfg.AllowedUsers, "scanner.allowed-users", "", "Allowed users that can be converted, comma-separated. If set, only these users have plan files generated.") 61 f.StringVar(&cfg.IgnoredUserPattern, "scanner.ignore-users-regex", "", "If set and user ID matches this regex pattern, it will be ignored. Checked after applying -scanner.allowed-users, if set.") 62 f.BoolVar(&cfg.VerifyPlans, "scanner.verify-plans", true, "Verify plans before uploading to bucket. Enabled by default for extra check. Requires extra memory for large plans.") 63 f.Var(&cfg.PeriodStart, "scanner.scan-period-start", "If specified, this is lower end of time period to scan. Specified date is included in the range. (format: \"2006-01-02\")") 64 f.Var(&cfg.PeriodEnd, "scanner.scan-period-end", "If specified, this is upper end of time period to scan. Specified date is not included in the range. (format: \"2006-01-02\")") 65} 66 67type Scanner struct { 68 services.Service 69 70 cfg Config 71 storageCfg storage.Config 72 73 bucketPrefix string 74 bucket objstore.Bucket 75 76 logger log.Logger 77 reg prometheus.Registerer 78 79 series prometheus.Counter 80 openFiles prometheus.Gauge 81 indexEntries *prometheus.CounterVec 82 indexReaderRowsRead prometheus.Counter 83 indexReaderParsedIndexEntries prometheus.Counter 84 ignoredEntries prometheus.Counter 85 foundTables prometheus.Counter 86 processedTables prometheus.Counter 87 currentTableRanges prometheus.Gauge 88 currentTableScannedRanges prometheus.Gauge 89 90 schema chunk.SchemaConfig 91 ignoredUsers *regexp.Regexp 92 allowedUsers blocksconvert.AllowedUsers 93} 94 95func NewScanner(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg prometheus.Registerer) (*Scanner, error) { 96 err := scfg.SchemaConfig.Load() 97 if err != nil { 98 return nil, errors.Wrap(err, "no table name provided, and schema failed to load") 99 } 100 101 if cfg.OutputDirectory == "" { 102 return nil, errors.Errorf("no output directory") 103 } 104 105 var bucketClient objstore.Bucket 106 if cfg.UploadFiles { 107 var err error 108 bucketClient, err = scfg.GetBucket(l, reg) 109 if err != nil { 110 return nil, err 111 } 112 } 113 114 var users = blocksconvert.AllowAllUsers 115 if cfg.AllowedUsers != "" { 116 users = blocksconvert.ParseAllowedUsers(cfg.AllowedUsers) 117 } 118 119 var ignoredUserRegex *regexp.Regexp = nil 120 if cfg.IgnoredUserPattern != "" { 121 re, err := regexp.Compile(cfg.IgnoredUserPattern) 122 if err != nil { 123 return nil, errors.Wrap(err, "failed to compile ignored user regex") 124 } 125 ignoredUserRegex = re 126 } 127 128 if err := os.MkdirAll(cfg.OutputDirectory, os.FileMode(0700)); err != nil { 129 return nil, errors.Wrapf(err, "failed to create new output directory %s", cfg.OutputDirectory) 130 } 131 132 s := &Scanner{ 133 cfg: cfg, 134 schema: scfg.SchemaConfig, 135 storageCfg: scfg.StorageConfig, 136 logger: l, 137 bucket: bucketClient, 138 bucketPrefix: scfg.BucketPrefix, 139 reg: reg, 140 allowedUsers: users, 141 ignoredUsers: ignoredUserRegex, 142 143 indexReaderRowsRead: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 144 Name: "cortex_blocksconvert_bigtable_read_rows_total", 145 Help: "Number of rows read from BigTable", 146 }), 147 indexReaderParsedIndexEntries: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 148 Name: "cortex_blocksconvert_bigtable_parsed_index_entries_total", 149 Help: "Number of parsed index entries", 150 }), 151 currentTableRanges: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ 152 Name: "cortex_blocksconvert_scanner_bigtable_ranges_in_current_table", 153 Help: "Number of ranges to scan from current table.", 154 }), 155 currentTableScannedRanges: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ 156 Name: "cortex_blocksconvert_scanner_bigtable_scanned_ranges_from_current_table", 157 Help: "Number of scanned ranges from current table. Resets to 0 every time a table is getting scanned or its scan has completed.", 158 }), 159 160 series: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 161 Name: "cortex_blocksconvert_scanner_series_written_total", 162 Help: "Number of series written to the plan files", 163 }), 164 165 openFiles: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ 166 Name: "cortex_blocksconvert_scanner_open_files", 167 Help: "Number of series written to the plan files", 168 }), 169 170 indexEntries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ 171 Name: "cortex_blocksconvert_scanner_scanned_index_entries_total", 172 Help: "Number of various index entries scanned", 173 }, []string{"type"}), 174 ignoredEntries: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 175 Name: "cortex_blocksconvert_scanner_ignored_index_entries_total", 176 Help: "Number of ignored index entries because of ignoring users.", 177 }), 178 179 foundTables: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 180 Name: "cortex_blocksconvert_scanner_found_tables_total", 181 Help: "Number of tables found for processing.", 182 }), 183 processedTables: promauto.With(reg).NewCounter(prometheus.CounterOpts{ 184 Name: "cortex_blocksconvert_scanner_processed_tables_total", 185 Help: "Number of processed tables so far.", 186 }), 187 } 188 189 s.Service = services.NewBasicService(nil, s.running, nil) 190 return s, nil 191} 192 193func (s *Scanner) running(ctx context.Context) error { 194 allTables := []tableToProcess{} 195 196 for ix, c := range s.schema.Configs { 197 if c.Schema != "v9" && c.Schema != "v10" && c.Schema != "v11" { 198 level.Warn(s.logger).Log("msg", "skipping unsupported schema version", "version", c.Schema, "schemaFrom", c.From.String()) 199 continue 200 } 201 202 if c.IndexTables.Period%(24*time.Hour) != 0 { 203 level.Warn(s.logger).Log("msg", "skipping invalid index table period", "period", c.IndexTables.Period, "schemaFrom", c.From.String()) 204 continue 205 } 206 207 var reader chunk.IndexReader 208 switch c.IndexType { 209 case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed": 210 bigTable := s.storageCfg.GCPStorageConfig 211 212 if bigTable.Project == "" || bigTable.Instance == "" { 213 level.Error(s.logger).Log("msg", "cannot scan BigTable, missing configuration", "schemaFrom", c.From.String()) 214 continue 215 } 216 217 reader = newBigtableIndexReader(bigTable.Project, bigTable.Instance, s.logger, s.indexReaderRowsRead, s.indexReaderParsedIndexEntries, s.currentTableRanges, s.currentTableScannedRanges) 218 case "aws-dynamo": 219 cfg := s.storageCfg.AWSStorageConfig 220 221 if cfg.DynamoDB.URL == nil { 222 level.Error(s.logger).Log("msg", "cannot scan DynamoDB, missing configuration", "schemaFrom", c.From.String()) 223 continue 224 } 225 226 var err error 227 reader, err = aws.NewDynamoDBIndexReader(cfg.DynamoDBConfig, s.schema, s.reg, s.logger, s.indexReaderRowsRead) 228 if err != nil { 229 level.Error(s.logger).Log("msg", "cannot scan DynamoDB", "err", err) 230 } 231 case "cassandra": 232 cass := s.storageCfg.CassandraStorageConfig 233 234 reader = newCassandraIndexReader(cass, s.schema, s.logger, s.indexReaderRowsRead, s.indexReaderParsedIndexEntries, s.currentTableRanges, s.currentTableScannedRanges) 235 default: 236 level.Warn(s.logger).Log("msg", "unsupported index type", "type", c.IndexType, "schemaFrom", c.From.String()) 237 continue 238 } 239 240 toTimestamp := time.Now().Add(24 * time.Hour).Truncate(24 * time.Hour).Unix() 241 if ix < len(s.schema.Configs)-1 { 242 toTimestamp = s.schema.Configs[ix+1].From.Unix() 243 } 244 245 level.Info(s.logger).Log("msg", "scanning for schema tables", "schemaFrom", c.From.String(), "prefix", c.IndexTables.Prefix, "period", c.IndexTables.Period) 246 tables, err := s.findTablesToProcess(ctx, reader, c.From.Unix(), toTimestamp, c.IndexTables) 247 if err != nil { 248 return errors.Wrapf(err, "finding tables for schema %s", c.From.String()) 249 } 250 251 level.Info(s.logger).Log("msg", "found tables", "count", len(tables)) 252 allTables = append(allTables, tables...) 253 } 254 255 level.Info(s.logger).Log("msg", "total found tables", "count", len(allTables)) 256 257 if s.cfg.TableNames != "" { 258 // Find tables from parameter. 259 tableNames := map[string]bool{} 260 for _, t := range strings.Split(s.cfg.TableNames, ",") { 261 tableNames[strings.TrimSpace(t)] = true 262 } 263 264 for ix := 0; ix < len(allTables); { 265 t := allTables[ix] 266 if !tableNames[t.table] { 267 // remove table. 268 allTables = append(allTables[:ix], allTables[ix+1:]...) 269 continue 270 } 271 ix++ 272 } 273 274 level.Error(s.logger).Log("msg", "applied tables filter", "selected", len(allTables)) 275 } 276 277 // Recent tables go first. 278 sort.Slice(allTables, func(i, j int) bool { 279 return allTables[i].start.After(allTables[j].start) 280 }) 281 282 for ix := 0; ix < len(allTables); { 283 t := allTables[ix] 284 if s.cfg.PeriodStart.IsSet() && !t.end.IsZero() && t.end.Unix() <= s.cfg.PeriodStart.Unix() { 285 level.Info(s.logger).Log("msg", "table ends before period-start, ignoring", "table", t.table, "table_start", t.start.String(), "table_end", t.end.String(), "period_start", s.cfg.PeriodStart.String()) 286 allTables = append(allTables[:ix], allTables[ix+1:]...) 287 continue 288 } 289 if s.cfg.PeriodEnd.IsSet() && t.start.Unix() >= s.cfg.PeriodEnd.Unix() { 290 level.Info(s.logger).Log("msg", "table starts after period-end, ignoring", "table", t.table, "table_start", t.start.String(), "table_end", t.end.String(), "period_end", s.cfg.PeriodEnd.String()) 291 allTables = append(allTables[:ix], allTables[ix+1:]...) 292 continue 293 } 294 ix++ 295 } 296 297 if s.cfg.TablesLimit > 0 && len(allTables) > s.cfg.TablesLimit { 298 level.Info(s.logger).Log("msg", "applied tables limit", "limit", s.cfg.TablesLimit) 299 allTables = allTables[:s.cfg.TablesLimit] 300 } 301 302 s.foundTables.Add(float64(len(allTables))) 303 304 for _, t := range allTables { 305 if err := s.processTable(ctx, t.table, t.reader); err != nil { 306 return errors.Wrapf(err, "failed to process table %s", t.table) 307 } 308 s.processedTables.Inc() 309 } 310 311 // All good, just wait until context is done, to avoid restarts. 312 level.Info(s.logger).Log("msg", "finished") 313 <-ctx.Done() 314 return nil 315} 316 317type tableToProcess struct { 318 table string 319 reader chunk.IndexReader 320 start time.Time 321 end time.Time // Will not be set for non-periodic tables. Exclusive. 322} 323 324func (s *Scanner) findTablesToProcess(ctx context.Context, indexReader chunk.IndexReader, fromUnixTimestamp, toUnixTimestamp int64, tablesConfig chunk.PeriodicTableConfig) ([]tableToProcess, error) { 325 tables, err := indexReader.IndexTableNames(ctx) 326 if err != nil { 327 return nil, err 328 } 329 330 var result []tableToProcess 331 332 for _, t := range tables { 333 if !strings.HasPrefix(t, tablesConfig.Prefix) { 334 continue 335 } 336 337 var tp tableToProcess 338 if tablesConfig.Period == 0 { 339 tp = tableToProcess{ 340 table: t, 341 reader: indexReader, 342 start: time.Unix(fromUnixTimestamp, 0), 343 } 344 } else { 345 p, err := strconv.ParseInt(t[len(tablesConfig.Prefix):], 10, 64) 346 if err != nil { 347 level.Warn(s.logger).Log("msg", "failed to parse period index of table", "table", t) 348 continue 349 } 350 351 start := time.Unix(p*int64(tablesConfig.Period/time.Second), 0) 352 tp = tableToProcess{ 353 table: t, 354 reader: indexReader, 355 start: start, 356 end: start.Add(tablesConfig.Period), 357 } 358 } 359 360 if fromUnixTimestamp <= tp.start.Unix() && tp.start.Unix() < toUnixTimestamp { 361 result = append(result, tp) 362 } 363 } 364 365 return result, nil 366} 367 368func (s *Scanner) processTable(ctx context.Context, table string, indexReader chunk.IndexReader) error { 369 tableLog := log.With(s.logger, "table", table) 370 371 tableProcessedFile := filepath.Join(s.cfg.OutputDirectory, table+".processed") 372 373 if shouldSkipOperationBecauseFileExists(tableProcessedFile) { 374 level.Info(tableLog).Log("msg", "skipping table because it was already scanned") 375 return nil 376 } 377 378 dir := filepath.Join(s.cfg.OutputDirectory, table) 379 level.Info(tableLog).Log("msg", "scanning table", "output", dir) 380 381 ignoredUsers, err := scanSingleTable(ctx, indexReader, table, dir, s.cfg.Concurrency, s.allowedUsers, s.ignoredUsers, s.openFiles, s.series, s.indexEntries, s.ignoredEntries) 382 if err != nil { 383 return errors.Wrapf(err, "failed to scan table %s and generate plan files", table) 384 } 385 386 tableLog.Log("msg", "ignored users", "count", len(ignoredUsers), "users", strings.Join(ignoredUsers, ",")) 387 388 if s.cfg.VerifyPlans { 389 err = verifyPlanFiles(ctx, dir, tableLog) 390 if err != nil { 391 return errors.Wrap(err, "failed to verify plans") 392 } 393 } 394 395 if s.bucket != nil { 396 level.Info(tableLog).Log("msg", "uploading generated plan files for table", "source", dir) 397 398 err := uploadPlansConcurrently(ctx, tableLog, dir, s.bucket, s.bucketPrefix, s.cfg.Concurrency) 399 if err != nil { 400 return errors.Wrapf(err, "failed to upload plan files for table %s to bucket", table) 401 } 402 403 level.Info(tableLog).Log("msg", "uploaded generated files for table") 404 if !s.cfg.KeepFiles { 405 if err := os.RemoveAll(dir); err != nil { 406 return errors.Wrapf(err, "failed to delete uploaded plan files for table %s", table) 407 } 408 } 409 } 410 411 err = ioutil.WriteFile(tableProcessedFile, []byte("Finished on "+time.Now().String()+"\n"), 0600) 412 if err != nil { 413 return errors.Wrapf(err, "failed to create file %s", tableProcessedFile) 414 } 415 416 level.Info(tableLog).Log("msg", "done processing table") 417 return nil 418} 419 420func uploadPlansConcurrently(ctx context.Context, log log.Logger, dir string, bucket objstore.Bucket, bucketPrefix string, concurrency int) error { 421 df, err := os.Stat(dir) 422 if err != nil { 423 return errors.Wrap(err, "stat dir") 424 } 425 if !df.IsDir() { 426 return errors.Errorf("%s is not a directory", dir) 427 } 428 429 // Path relative to dir, and only use Slash as separator. BucketPrefix is prepended to it when uploading. 430 paths := make(chan string) 431 432 g, ctx := errgroup.WithContext(ctx) 433 for i := 0; i < concurrency; i++ { 434 g.Go(func() error { 435 for p := range paths { 436 src := filepath.Join(dir, filepath.FromSlash(p)) 437 dst := path.Join(bucketPrefix, p) 438 439 boff := backoff.New(ctx, backoff.Config{ 440 MinBackoff: 1 * time.Second, 441 MaxBackoff: 5 * time.Second, 442 MaxRetries: 5, 443 }) 444 445 for boff.Ongoing() { 446 err := objstore.UploadFile(ctx, log, bucket, src, dst) 447 448 if err == nil { 449 break 450 } 451 452 level.Warn(log).Log("msg", "failed to upload block", "err", err) 453 boff.Wait() 454 } 455 456 if boff.Err() != nil { 457 return boff.Err() 458 } 459 } 460 return nil 461 }) 462 } 463 464 g.Go(func() error { 465 defer close(paths) 466 467 return filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error { 468 if err != nil { 469 return err 470 } 471 if ctx.Err() != nil { 472 return ctx.Err() 473 } 474 475 if fi.IsDir() { 476 return nil 477 } 478 479 relPath, err := filepath.Rel(dir, path) 480 if err != nil { 481 return err 482 } 483 484 relPath = filepath.ToSlash(relPath) 485 486 select { 487 case paths <- relPath: 488 return nil 489 case <-ctx.Done(): 490 return ctx.Err() 491 } 492 }) 493 }) 494 495 return g.Wait() 496} 497 498func shouldSkipOperationBecauseFileExists(file string) bool { 499 // If file exists, we should skip the operation. 500 _, err := os.Stat(file) 501 // Any error (including ErrNotExists) indicates operation should continue. 502 return err == nil 503} 504 505func scanSingleTable( 506 ctx context.Context, 507 indexReader chunk.IndexReader, 508 tableName string, 509 outDir string, 510 concurrency int, 511 allowed blocksconvert.AllowedUsers, 512 ignored *regexp.Regexp, 513 openFiles prometheus.Gauge, 514 series prometheus.Counter, 515 indexEntries *prometheus.CounterVec, 516 ignoredEntries prometheus.Counter, 517) ([]string, error) { 518 err := os.RemoveAll(outDir) 519 if err != nil { 520 return nil, errors.Wrapf(err, "failed to delete directory %s", outDir) 521 } 522 523 err = os.MkdirAll(outDir, os.FileMode(0700)) 524 if err != nil { 525 return nil, errors.Wrapf(err, "failed to prepare directory %s", outDir) 526 } 527 528 files := newOpenFiles(openFiles) 529 result := func(dir string, file string, entry blocksconvert.PlanEntry, header func() blocksconvert.PlanEntry) error { 530 return files.appendJSONEntryToFile(dir, file, entry, func() interface{} { 531 return header() 532 }) 533 } 534 535 var ps []chunk.IndexEntryProcessor 536 537 for i := 0; i < concurrency; i++ { 538 ps = append(ps, newProcessor(outDir, result, allowed, ignored, series, indexEntries, ignoredEntries)) 539 } 540 541 err = indexReader.ReadIndexEntries(ctx, tableName, ps) 542 if err != nil { 543 return nil, err 544 } 545 546 ignoredUsersMap := map[string]struct{}{} 547 for _, p := range ps { 548 for u := range p.(*processor).ignoredUsers { 549 ignoredUsersMap[u] = struct{}{} 550 } 551 } 552 553 var ignoredUsers []string 554 for u := range ignoredUsersMap { 555 ignoredUsers = append(ignoredUsers, u) 556 } 557 558 err = files.closeAllFiles(func() interface{} { 559 return blocksconvert.PlanEntry{Complete: true} 560 }) 561 return ignoredUsers, errors.Wrap(err, "closing files") 562} 563 564func verifyPlanFiles(ctx context.Context, dir string, logger log.Logger) error { 565 return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { 566 if err != nil { 567 return err 568 } 569 if err := ctx.Err(); err != nil { 570 return err 571 } 572 573 if info.IsDir() { 574 return nil 575 } 576 577 ok, _ := blocksconvert.IsPlanFilename(info.Name()) 578 if !ok { 579 return nil 580 } 581 582 r, err := os.Open(path) 583 if err != nil { 584 return errors.Wrapf(err, "failed to open %s", path) 585 } 586 defer func() { 587 _ = r.Close() 588 }() 589 590 pr, err := blocksconvert.PreparePlanFileReader(info.Name(), r) 591 if err != nil { 592 return errors.Wrapf(err, "failed to prepare plan file for reading: %s", path) 593 } 594 595 level.Info(logger).Log("msg", "verifying plan", "path", path) 596 return errors.Wrapf(verifyPlanFile(pr), "plan file: %s", path) 597 }) 598} 599 600func verifyPlanFile(r io.Reader) error { 601 dec := json.NewDecoder(r) 602 603 entry := blocksconvert.PlanEntry{} 604 if err := dec.Decode(&entry); err != nil { 605 return errors.Wrap(err, "failed to parse plan file header") 606 } 607 if entry.User == "" || entry.DayIndex == 0 { 608 return errors.New("failed to read plan file header: no user or day index found") 609 } 610 611 series := map[string]struct{}{} 612 613 var err error 614 footerFound := false 615 for err = dec.Decode(&entry); err == nil; err = dec.Decode(&entry) { 616 if entry.Complete { 617 footerFound = true 618 entry.Reset() 619 continue 620 } 621 622 if footerFound { 623 return errors.New("plan entries found after plan footer") 624 } 625 626 if entry.SeriesID == "" { 627 return errors.Errorf("plan contains entry without seriesID") 628 } 629 630 if len(entry.Chunks) == 0 { 631 return errors.Errorf("entry for seriesID %s has no chunks", entry.SeriesID) 632 } 633 634 if _, found := series[entry.SeriesID]; found { 635 return errors.Errorf("multiple entries for series %s found in plan", entry.SeriesID) 636 } 637 series[entry.SeriesID] = struct{}{} 638 639 entry.Reset() 640 } 641 642 if err == io.EOF { 643 if !footerFound { 644 return errors.New("no footer found in the plan") 645 } 646 err = nil 647 } 648 return err 649} 650