package scanner

import (
	"context"
	"encoding/json"
	"flag"
	"io"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/thanos-io/thanos/pkg/objstore"
	"golang.org/x/sync/errgroup"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/cortexproject/cortex/pkg/chunk/aws"
	"github.com/cortexproject/cortex/pkg/chunk/storage"
	"github.com/cortexproject/cortex/tools/blocksconvert"
)

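// Config holds the scanner configuration, registered as command-line flags via RegisterFlags.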
type Config struct {
	TableNames  string
	TablesLimit int

	PeriodStart flagext.DayValue
	PeriodEnd   flagext.DayValue

	OutputDirectory string
	Concurrency     int

	VerifyPlans bool
	UploadFiles bool
	KeepFiles   bool

	AllowedUsers       string
	IgnoredUserPattern string
}

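// RegisterFlags registers the scanner command-line flags with the provided FlagSet.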
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.TableNames, "scanner.tables", "", "Comma-separated tables to generate plan files from. If not used, all tables found via schema and scanning of Index store will be used.")
	f.StringVar(&cfg.OutputDirectory, "scanner.output-dir", "", "Local directory used for storing temporary plan files (will be created if missing).")
	f.IntVar(&cfg.Concurrency, "scanner.concurrency", 16, "Number of concurrent index processors, and plan uploads.")
	f.BoolVar(&cfg.UploadFiles, "scanner.upload", true, "Upload plan files.")
	f.BoolVar(&cfg.KeepFiles, "scanner.keep-files", false, "Keep plan files locally after uploading.")
	f.IntVar(&cfg.TablesLimit, "scanner.tables-limit", 0, "Number of tables to convert. 0 = all.")
	f.StringVar(&cfg.AllowedUsers, "scanner.allowed-users", "", "Allowed users that can be converted, comma-separated. If set, only these users have plan files generated.")
	f.StringVar(&cfg.IgnoredUserPattern, "scanner.ignore-users-regex", "", "If set and user ID matches this regex pattern, it will be ignored. Checked after applying -scanner.allowed-users, if set.")
	f.BoolVar(&cfg.VerifyPlans, "scanner.verify-plans", true, "Verify plans before uploading to bucket. Enabled by default for extra check. Requires extra memory for large plans.")
	f.Var(&cfg.PeriodStart, "scanner.scan-period-start", "If specified, this is lower end of time period to scan. Specified date is included in the range. (format: \"2006-01-02\")")
	f.Var(&cfg.PeriodEnd, "scanner.scan-period-end", "If specified, this is upper end of time period to scan. Specified date is not included in the range. (format: \"2006-01-02\")")
}

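// Scanner is a service that scans chunk index tables (BigTable, DynamoDB or Cassandra)
// and writes plan files describing the series and chunks found in each table. Generated
// plans can optionally be verified and uploaded to the configured object store bucket.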
type Scanner struct {
	services.Service

	cfg        Config
	storageCfg storage.Config

	bucketPrefix string
	bucket       objstore.Bucket

	logger log.Logger
	reg    prometheus.Registerer

	series                        prometheus.Counter
	openFiles                     prometheus.Gauge
	indexEntries                  *prometheus.CounterVec
	indexReaderRowsRead           prometheus.Counter
	indexReaderParsedIndexEntries prometheus.Counter
	ignoredEntries                prometheus.Counter
	foundTables                   prometheus.Counter
	processedTables               prometheus.Counter
	currentTableRanges            prometheus.Gauge
	currentTableScannedRanges     prometheus.Gauge

	schema       chunk.SchemaConfig
	ignoredUsers *regexp.Regexp
	allowedUsers blocksconvert.AllowedUsers
}

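// NewScanner creates the scanner service. It loads the schema config, creates the bucket
// client when uploads are enabled, compiles the ignored-users regexp, ensures the output
// directory exists, and registers all scanner metrics.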
func NewScanner(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg prometheus.Registerer) (*Scanner, error) {
	err := scfg.SchemaConfig.Load()
	if err != nil {
		return nil, errors.Wrap(err, "no table name provided, and schema failed to load")
	}

	if cfg.OutputDirectory == "" {
		return nil, errors.Errorf("no output directory")
	}

	var bucketClient objstore.Bucket
	if cfg.UploadFiles {
		var err error
		bucketClient, err = scfg.GetBucket(l, reg)
		if err != nil {
			return nil, err
		}
	}

	var users = blocksconvert.AllowAllUsers
	if cfg.AllowedUsers != "" {
		users = blocksconvert.ParseAllowedUsers(cfg.AllowedUsers)
	}

	var ignoredUserRegex *regexp.Regexp
	if cfg.IgnoredUserPattern != "" {
		re, err := regexp.Compile(cfg.IgnoredUserPattern)
		if err != nil {
			return nil, errors.Wrap(err, "failed to compile ignored user regex")
		}
		ignoredUserRegex = re
	}

	if err := os.MkdirAll(cfg.OutputDirectory, os.FileMode(0700)); err != nil {
		return nil, errors.Wrapf(err, "failed to create new output directory %s", cfg.OutputDirectory)
	}

	s := &Scanner{
		cfg:          cfg,
		schema:       scfg.SchemaConfig,
		storageCfg:   scfg.StorageConfig,
		logger:       l,
		bucket:       bucketClient,
		bucketPrefix: scfg.BucketPrefix,
		reg:          reg,
		allowedUsers: users,
		ignoredUsers: ignoredUserRegex,

		indexReaderRowsRead: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_bigtable_read_rows_total",
			Help: "Number of rows read from BigTable",
		}),
		indexReaderParsedIndexEntries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_bigtable_parsed_index_entries_total",
			Help: "Number of parsed index entries",
		}),
		currentTableRanges: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_scanner_bigtable_ranges_in_current_table",
			Help: "Number of ranges to scan from current table.",
		}),
		currentTableScannedRanges: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_scanner_bigtable_scanned_ranges_from_current_table",
			Help: "Number of ranges scanned from the current table. Reset to 0 when scanning of a new table starts, and again when its scan completes.",
		}),

		series: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_scanner_series_written_total",
			Help: "Number of series written to the plan files",
		}),

		openFiles: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_scanner_open_files",
			Help: "Number of open plan files",
		}),

		indexEntries: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_scanner_scanned_index_entries_total",
			Help: "Number of various index entries scanned",
		}, []string{"type"}),
		ignoredEntries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_scanner_ignored_index_entries_total",
			Help: "Number of ignored index entries because of ignoring users.",
		}),

		foundTables: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_scanner_found_tables_total",
			Help: "Number of tables found for processing.",
		}),
		processedTables: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_blocksconvert_scanner_processed_tables_total",
			Help: "Number of processed tables so far.",
		}),
	}

	s.Service = services.NewBasicService(nil, s.running, nil)
	return s, nil
}

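// running discovers index tables for all supported schema versions, filters them by
// configured table names, scan period and tables limit, and then processes the remaining
// tables one by one, most recent table first.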
func (s *Scanner) running(ctx context.Context) error {
	allTables := []tableToProcess{}

	for ix, c := range s.schema.Configs {
		if c.Schema != "v9" && c.Schema != "v10" && c.Schema != "v11" {
			level.Warn(s.logger).Log("msg", "skipping unsupported schema version", "version", c.Schema, "schemaFrom", c.From.String())
			continue
		}

		if c.IndexTables.Period%(24*time.Hour) != 0 {
			level.Warn(s.logger).Log("msg", "skipping invalid index table period", "period", c.IndexTables.Period, "schemaFrom", c.From.String())
			continue
		}

		var reader chunk.IndexReader
		switch c.IndexType {
		case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed":
			bigTable := s.storageCfg.GCPStorageConfig

			if bigTable.Project == "" || bigTable.Instance == "" {
				level.Error(s.logger).Log("msg", "cannot scan BigTable, missing configuration", "schemaFrom", c.From.String())
				continue
			}

			reader = newBigtableIndexReader(bigTable.Project, bigTable.Instance, s.logger, s.indexReaderRowsRead, s.indexReaderParsedIndexEntries, s.currentTableRanges, s.currentTableScannedRanges)
		case "aws-dynamo":
			cfg := s.storageCfg.AWSStorageConfig

			if cfg.DynamoDB.URL == nil {
				level.Error(s.logger).Log("msg", "cannot scan DynamoDB, missing configuration", "schemaFrom", c.From.String())
				continue
			}

			var err error
			reader, err = aws.NewDynamoDBIndexReader(cfg.DynamoDBConfig, s.schema, s.reg, s.logger, s.indexReaderRowsRead)
			if err != nil {
				level.Error(s.logger).Log("msg", "cannot scan DynamoDB", "err", err)
				continue
			}
		case "cassandra":
			cass := s.storageCfg.CassandraStorageConfig

			reader = newCassandraIndexReader(cass, s.schema, s.logger, s.indexReaderRowsRead, s.indexReaderParsedIndexEntries, s.currentTableRanges, s.currentTableScannedRanges)
		default:
			level.Warn(s.logger).Log("msg", "unsupported index type", "type", c.IndexType, "schemaFrom", c.From.String())
			continue
		}

		toTimestamp := time.Now().Add(24 * time.Hour).Truncate(24 * time.Hour).Unix()
		if ix < len(s.schema.Configs)-1 {
			toTimestamp = s.schema.Configs[ix+1].From.Unix()
		}

		level.Info(s.logger).Log("msg", "scanning for schema tables", "schemaFrom", c.From.String(), "prefix", c.IndexTables.Prefix, "period", c.IndexTables.Period)
		tables, err := s.findTablesToProcess(ctx, reader, c.From.Unix(), toTimestamp, c.IndexTables)
		if err != nil {
			return errors.Wrapf(err, "finding tables for schema %s", c.From.String())
		}

		level.Info(s.logger).Log("msg", "found tables", "count", len(tables))
		allTables = append(allTables, tables...)
	}

	level.Info(s.logger).Log("msg", "total found tables", "count", len(allTables))

	if s.cfg.TableNames != "" {
		// Find tables from parameter.
		tableNames := map[string]bool{}
		for _, t := range strings.Split(s.cfg.TableNames, ",") {
			tableNames[strings.TrimSpace(t)] = true
		}

		for ix := 0; ix < len(allTables); {
			t := allTables[ix]
			if !tableNames[t.table] {
				// remove table.
				allTables = append(allTables[:ix], allTables[ix+1:]...)
				continue
			}
			ix++
		}

		level.Info(s.logger).Log("msg", "applied tables filter", "selected", len(allTables))
	}

	// Recent tables go first.
	sort.Slice(allTables, func(i, j int) bool {
		return allTables[i].start.After(allTables[j].start)
	})

	for ix := 0; ix < len(allTables); {
		t := allTables[ix]
		if s.cfg.PeriodStart.IsSet() && !t.end.IsZero() && t.end.Unix() <= s.cfg.PeriodStart.Unix() {
			level.Info(s.logger).Log("msg", "table ends before period-start, ignoring", "table", t.table, "table_start", t.start.String(), "table_end", t.end.String(), "period_start", s.cfg.PeriodStart.String())
			allTables = append(allTables[:ix], allTables[ix+1:]...)
			continue
		}
		if s.cfg.PeriodEnd.IsSet() && t.start.Unix() >= s.cfg.PeriodEnd.Unix() {
			level.Info(s.logger).Log("msg", "table starts after period-end, ignoring", "table", t.table, "table_start", t.start.String(), "table_end", t.end.String(), "period_end", s.cfg.PeriodEnd.String())
			allTables = append(allTables[:ix], allTables[ix+1:]...)
			continue
		}
		ix++
	}

	if s.cfg.TablesLimit > 0 && len(allTables) > s.cfg.TablesLimit {
		level.Info(s.logger).Log("msg", "applied tables limit", "limit", s.cfg.TablesLimit)
		allTables = allTables[:s.cfg.TablesLimit]
	}

	s.foundTables.Add(float64(len(allTables)))

	for _, t := range allTables {
		if err := s.processTable(ctx, t.table, t.reader); err != nil {
			return errors.Wrapf(err, "failed to process table %s", t.table)
		}
		s.processedTables.Inc()
	}

	// All good, just wait until context is done, to avoid restarts.
	level.Info(s.logger).Log("msg", "finished")
	<-ctx.Done()
	return nil
}

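// tableToProcess describes a single index table selected for scanning, together with the
// reader used to scan it.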
type tableToProcess struct {
	table  string
	reader chunk.IndexReader
	start  time.Time
	end    time.Time // Will not be set for non-periodic tables. Exclusive.
}

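// findTablesToProcess lists index tables via the reader and returns those matching the
// configured prefix whose start time falls into the [fromUnixTimestamp, toUnixTimestamp) range.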
func (s *Scanner) findTablesToProcess(ctx context.Context, indexReader chunk.IndexReader, fromUnixTimestamp, toUnixTimestamp int64, tablesConfig chunk.PeriodicTableConfig) ([]tableToProcess, error) {
	tables, err := indexReader.IndexTableNames(ctx)
	if err != nil {
		return nil, err
	}

	var result []tableToProcess

	for _, t := range tables {
		if !strings.HasPrefix(t, tablesConfig.Prefix) {
			continue
		}

		var tp tableToProcess
		if tablesConfig.Period == 0 {
			tp = tableToProcess{
				table:  t,
				reader: indexReader,
				start:  time.Unix(fromUnixTimestamp, 0),
			}
		} else {
			p, err := strconv.ParseInt(t[len(tablesConfig.Prefix):], 10, 64)
			if err != nil {
				level.Warn(s.logger).Log("msg", "failed to parse period index of table", "table", t)
				continue
			}

			start := time.Unix(p*int64(tablesConfig.Period/time.Second), 0)
			tp = tableToProcess{
				table:  t,
				reader: indexReader,
				start:  start,
				end:    start.Add(tablesConfig.Period),
			}
		}

		if fromUnixTimestamp <= tp.start.Unix() && tp.start.Unix() < toUnixTimestamp {
			result = append(result, tp)
		}
	}

	return result, nil
}

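// processTable scans a single index table into plan files, optionally verifies the generated
// plans, uploads them to the bucket (when uploads are enabled), and finally writes a
// ".processed" marker file so the table is skipped on restart.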
func (s *Scanner) processTable(ctx context.Context, table string, indexReader chunk.IndexReader) error {
	tableLog := log.With(s.logger, "table", table)

	tableProcessedFile := filepath.Join(s.cfg.OutputDirectory, table+".processed")

	if shouldSkipOperationBecauseFileExists(tableProcessedFile) {
		level.Info(tableLog).Log("msg", "skipping table because it was already scanned")
		return nil
	}

	dir := filepath.Join(s.cfg.OutputDirectory, table)
	level.Info(tableLog).Log("msg", "scanning table", "output", dir)

	ignoredUsers, err := scanSingleTable(ctx, indexReader, table, dir, s.cfg.Concurrency, s.allowedUsers, s.ignoredUsers, s.openFiles, s.series, s.indexEntries, s.ignoredEntries)
	if err != nil {
		return errors.Wrapf(err, "failed to scan table %s and generate plan files", table)
	}

	tableLog.Log("msg", "ignored users", "count", len(ignoredUsers), "users", strings.Join(ignoredUsers, ","))

	if s.cfg.VerifyPlans {
		err = verifyPlanFiles(ctx, dir, tableLog)
		if err != nil {
			return errors.Wrap(err, "failed to verify plans")
		}
	}

	if s.bucket != nil {
		level.Info(tableLog).Log("msg", "uploading generated plan files for table", "source", dir)

		err := uploadPlansConcurrently(ctx, tableLog, dir, s.bucket, s.bucketPrefix, s.cfg.Concurrency)
		if err != nil {
			return errors.Wrapf(err, "failed to upload plan files for table %s to bucket", table)
		}

		level.Info(tableLog).Log("msg", "uploaded generated files for table")
		if !s.cfg.KeepFiles {
			if err := os.RemoveAll(dir); err != nil {
				return errors.Wrapf(err, "failed to delete uploaded plan files for table %s", table)
			}
		}
	}

	err = ioutil.WriteFile(tableProcessedFile, []byte("Finished on "+time.Now().String()+"\n"), 0600)
	if err != nil {
		return errors.Wrapf(err, "failed to create file %s", tableProcessedFile)
	}

	level.Info(tableLog).Log("msg", "done processing table")
	return nil
}

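// uploadPlansConcurrently walks dir and uploads every file found in it to the bucket under
// bucketPrefix, using the given number of concurrent workers. Each upload is retried with
// backoff before the whole operation is failed.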
func uploadPlansConcurrently(ctx context.Context, log log.Logger, dir string, bucket objstore.Bucket, bucketPrefix string, concurrency int) error {
	df, err := os.Stat(dir)
	if err != nil {
		return errors.Wrap(err, "stat dir")
	}
	if !df.IsDir() {
		return errors.Errorf("%s is not a directory", dir)
	}

	// Paths are relative to dir and use forward slash as separator. bucketPrefix is prepended when uploading.
	paths := make(chan string)

	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < concurrency; i++ {
		g.Go(func() error {
			for p := range paths {
				src := filepath.Join(dir, filepath.FromSlash(p))
				dst := path.Join(bucketPrefix, p)

				boff := backoff.New(ctx, backoff.Config{
					MinBackoff: 1 * time.Second,
					MaxBackoff: 5 * time.Second,
					MaxRetries: 5,
				})

				for boff.Ongoing() {
					err := objstore.UploadFile(ctx, log, bucket, src, dst)

					if err == nil {
						break
					}

					level.Warn(log).Log("msg", "failed to upload plan file", "err", err)
					boff.Wait()
				}

				if boff.Err() != nil {
					return boff.Err()
				}
			}
			return nil
		})
	}

	g.Go(func() error {
		defer close(paths)

		return filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}

			if fi.IsDir() {
				return nil
			}

			relPath, err := filepath.Rel(dir, path)
			if err != nil {
				return err
			}

			relPath = filepath.ToSlash(relPath)

			select {
			case paths <- relPath:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	})

	return g.Wait()
}

func shouldSkipOperationBecauseFileExists(file string) bool {
	// If file exists, we should skip the operation.
	_, err := os.Stat(file)
	// Any error (including os.ErrNotExist) means the operation should continue.
	return err == nil
}

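// scanSingleTable reads all index entries from a single table using the given number of
// concurrent processors, appending plan entries to files in outDir. It returns the list of
// users that were ignored while scanning.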
func scanSingleTable(
	ctx context.Context,
	indexReader chunk.IndexReader,
	tableName string,
	outDir string,
	concurrency int,
	allowed blocksconvert.AllowedUsers,
	ignored *regexp.Regexp,
	openFiles prometheus.Gauge,
	series prometheus.Counter,
	indexEntries *prometheus.CounterVec,
	ignoredEntries prometheus.Counter,
) ([]string, error) {
	err := os.RemoveAll(outDir)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to delete directory %s", outDir)
	}

	err = os.MkdirAll(outDir, os.FileMode(0700))
	if err != nil {
		return nil, errors.Wrapf(err, "failed to prepare directory %s", outDir)
	}

	files := newOpenFiles(openFiles)
	result := func(dir string, file string, entry blocksconvert.PlanEntry, header func() blocksconvert.PlanEntry) error {
		return files.appendJSONEntryToFile(dir, file, entry, func() interface{} {
			return header()
		})
	}

	var ps []chunk.IndexEntryProcessor

	for i := 0; i < concurrency; i++ {
		ps = append(ps, newProcessor(outDir, result, allowed, ignored, series, indexEntries, ignoredEntries))
	}

	err = indexReader.ReadIndexEntries(ctx, tableName, ps)
	if err != nil {
		return nil, err
	}

	ignoredUsersMap := map[string]struct{}{}
	for _, p := range ps {
		for u := range p.(*processor).ignoredUsers {
			ignoredUsersMap[u] = struct{}{}
		}
	}

	var ignoredUsers []string
	for u := range ignoredUsersMap {
		ignoredUsers = append(ignoredUsers, u)
	}

	err = files.closeAllFiles(func() interface{} {
		return blocksconvert.PlanEntry{Complete: true}
	})
	return ignoredUsers, errors.Wrap(err, "closing files")
}

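// verifyPlanFiles walks dir and verifies every plan file found in it.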
func verifyPlanFiles(ctx context.Context, dir string, logger log.Logger) error {
	return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}

		if info.IsDir() {
			return nil
		}

		ok, _ := blocksconvert.IsPlanFilename(info.Name())
		if !ok {
			return nil
		}

		r, err := os.Open(path)
		if err != nil {
			return errors.Wrapf(err, "failed to open %s", path)
		}
		defer func() {
			_ = r.Close()
		}()

		pr, err := blocksconvert.PreparePlanFileReader(info.Name(), r)
		if err != nil {
			return errors.Wrapf(err, "failed to prepare plan file for reading: %s", path)
		}

		level.Info(logger).Log("msg", "verifying plan", "path", path)
		return errors.Wrapf(verifyPlanFile(pr), "plan file: %s", path)
	})
}

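// verifyPlanFile checks that a plan file consists of a header (with user and day index),
// followed by unique series entries that each carry at least one chunk, and that it is
// terminated by a footer entry with Complete set to true.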
func verifyPlanFile(r io.Reader) error {
	dec := json.NewDecoder(r)

	entry := blocksconvert.PlanEntry{}
	if err := dec.Decode(&entry); err != nil {
		return errors.Wrap(err, "failed to parse plan file header")
	}
	if entry.User == "" || entry.DayIndex == 0 {
		return errors.New("failed to read plan file header: no user or day index found")
	}

	series := map[string]struct{}{}

	var err error
	footerFound := false
	for err = dec.Decode(&entry); err == nil; err = dec.Decode(&entry) {
		if entry.Complete {
			footerFound = true
			entry.Reset()
			continue
		}

		if footerFound {
			return errors.New("plan entries found after plan footer")
		}

		if entry.SeriesID == "" {
			return errors.Errorf("plan contains entry without seriesID")
		}

		if len(entry.Chunks) == 0 {
			return errors.Errorf("entry for seriesID %s has no chunks", entry.SeriesID)
		}

		if _, found := series[entry.SeriesID]; found {
			return errors.Errorf("multiple entries for series %s found in plan", entry.SeriesID)
		}
		series[entry.SeriesID] = struct{}{}

		entry.Reset()
	}

	if err == io.EOF {
		if !footerFound {
			return errors.New("no footer found in the plan")
		}
		err = nil
	}
	return err
}