1package shipper
2
3import (
4	"context"
5	"flag"
6	"fmt"
7	"io/ioutil"
8	"os"
9	"path"
10	"sync"
11	"time"
12
13	util_log "github.com/cortexproject/cortex/pkg/util/log"
14	"github.com/cortexproject/cortex/pkg/util/spanlogger"
15	"github.com/go-kit/kit/log/level"
16	"github.com/prometheus/client_golang/prometheus"
17	"github.com/weaveworks/common/instrument"
18	"go.etcd.io/bbolt"
19
20	"github.com/grafana/loki/pkg/storage/chunk"
21	"github.com/grafana/loki/pkg/storage/chunk/local"
22	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
23	"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
24	"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
25	"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
26	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
27)
28
// Operating modes of the shipper. Shipper.init creates the uploads manager unless the mode is
// ModeReadOnly and the downloads manager unless the mode is ModeWriteOnly.
const (
	// ModeReadWrite enables both the read path and the write path.
	ModeReadWrite = iota
	// ModeReadOnly enables only the read path.
	ModeReadOnly
	// ModeWriteOnly enables only the write path.
	ModeWriteOnly
)

// BoltDBShipperType is the index type name for boltdb used together with the shipper, which keeps
// flushing the boltdb files to a shared storage.
const BoltDBShipperType = "boltdb-shipper"

// FilesystemObjectStoreType is the periodic config type name of the filesystem store.
const FilesystemObjectStoreType = "filesystem"

// UploadInterval is how often we check whether there are new index files to upload.
// The same interval is used for snapshotting the index tables currently being written so that the
// snapshots can serve reads.
const UploadInterval = time.Minute
47
48type boltDBIndexClient interface {
49	QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
50	NewWriteBatch() chunk.WriteBatch
51	WriteToDB(ctx context.Context, db *bbolt.DB, writes local.TableWrites) error
52	Stop()
53}
54
// Config holds the settings of the boltdb shipper.
//
// Every field that has a yaml key is also registered as a command line flag by RegisterFlags; the
// help texts there describe what each of them is for.
//
// The fields tagged `yaml:"-"` have no yaml key and no flag:
//   - IngesterName is the prefix of the uploader name generated by getUploaderName.
//   - Mode is compared against ModeReadOnly and ModeWriteOnly in Shipper.init to decide whether
//     the uploads manager, the downloads manager or both get created.
//   - IngesterDBRetainPeriod is handed to the uploads manager as its DBRetainPeriod.
type Config struct {
	ActiveIndexDirectory     string                   `yaml:"active_index_directory"`
	SharedStoreType          string                   `yaml:"shared_store"`
	SharedStoreKeyPrefix     string                   `yaml:"shared_store_key_prefix"`
	CacheLocation            string                   `yaml:"cache_location"`
	CacheTTL                 time.Duration            `yaml:"cache_ttl"`
	ResyncInterval           time.Duration            `yaml:"resync_interval"`
	QueryReadyNumDays        int                      `yaml:"query_ready_num_days"`
	IndexGatewayClientConfig IndexGatewayClientConfig `yaml:"index_gateway_client"`
	IngesterName             string                   `yaml:"-"`
	Mode                     int                      `yaml:"-"`
	IngesterDBRetainPeriod   time.Duration            `yaml:"-"`
}
68
69// RegisterFlags registers flags.
70func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
71	cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix("boltdb.shipper.index-gateway-client", f)
72
73	f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
74	f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
75	f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it")
76	f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
77	f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
78	f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
79	f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.")
80}
81
82func (cfg *Config) Validate() error {
83	return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
84}
85
// Shipper is the index client returned by NewShipper. It writes through the uploads manager and
// reads through both the uploads manager and the downloads manager.
//
// uploadsManager stays nil when cfg.Mode is ModeReadOnly and downloadsManager stays nil when
// cfg.Mode is ModeWriteOnly (see init). stopOnce makes Stop run stop at most once; since it is a
// sync.Once, a Shipper must be used through a pointer and not copied.
type Shipper struct {
	cfg               Config
	boltDBIndexClient boltDBIndexClient
	uploadsManager    *uploads.TableManager
	downloadsManager  *downloads.TableManager

	metrics  *metrics
	stopOnce sync.Once
}
95
96// NewShipper creates a shipper for syncing local objects with a store
97func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (chunk.IndexClient, error) {
98	shipper := Shipper{
99		cfg:     cfg,
100		metrics: newMetrics(registerer),
101	}
102
103	err := shipper.init(storageClient, registerer)
104	if err != nil {
105		return nil, err
106	}
107
108	level.Info(util_log.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode))
109
110	return &shipper, nil
111}
112
113func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error {
114	// When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead.
115	// Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path.
116	boltdbIndexClientDir := s.cfg.ActiveIndexDirectory
117	if boltdbIndexClientDir == "" {
118		boltdbIndexClientDir = s.cfg.CacheLocation
119	}
120
121	var err error
122	s.boltDBIndexClient, err = local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: boltdbIndexClientDir})
123	if err != nil {
124		return err
125	}
126
127	indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix)
128
129	if s.cfg.Mode != ModeReadOnly {
130		uploader, err := s.getUploaderName()
131		if err != nil {
132			return err
133		}
134
135		cfg := uploads.Config{
136			Uploader:       uploader,
137			IndexDir:       s.cfg.ActiveIndexDirectory,
138			UploadInterval: UploadInterval,
139			DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
140		}
141		uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer)
142		if err != nil {
143			return err
144		}
145
146		s.uploadsManager = uploadsManager
147	}
148
149	if s.cfg.Mode != ModeWriteOnly {
150		cfg := downloads.Config{
151			CacheDir:          s.cfg.CacheLocation,
152			SyncInterval:      s.cfg.ResyncInterval,
153			CacheTTL:          s.cfg.CacheTTL,
154			QueryReadyNumDays: s.cfg.QueryReadyNumDays,
155		}
156		downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer)
157		if err != nil {
158			return err
159		}
160
161		s.downloadsManager = downloadsManager
162	}
163
164	return nil
165}
166
167// we would persist uploader name in <active-index-directory>/uploader/name file so that we use same name on subsequent restarts to
168// avoid uploading same files again with different name. If the filed does not exist we would create one with uploader name set to
169// ingester name and startup timestamp so that we randomise the name and do not override files from other ingesters.
170func (s *Shipper) getUploaderName() (string, error) {
171	uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().UnixNano())
172
173	uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name")
174	if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil {
175		return "", err
176	}
177
178	_, err := os.Stat(uploaderFilePath)
179	if err != nil {
180		if !os.IsNotExist(err) {
181			return "", err
182		}
183		if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil {
184			return "", err
185		}
186	} else {
187		ub, err := ioutil.ReadFile(uploaderFilePath)
188		if err != nil {
189			return "", err
190		}
191		uploader = string(ub)
192	}
193
194	return uploader, nil
195}
196
197func (s *Shipper) Stop() {
198	s.stopOnce.Do(s.stop)
199}
200
201func (s *Shipper) stop() {
202	if s.uploadsManager != nil {
203		s.uploadsManager.Stop()
204	}
205
206	if s.downloadsManager != nil {
207		s.downloadsManager.Stop()
208	}
209
210	s.boltDBIndexClient.Stop()
211}
212
213func (s *Shipper) NewWriteBatch() chunk.WriteBatch {
214	return s.boltDBIndexClient.NewWriteBatch()
215}
216
217func (s *Shipper) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
218	return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
219		return s.uploadsManager.BatchWrite(ctx, batch)
220	})
221}
222
223func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
224	return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error {
225		spanLogger := spanlogger.FromContext(ctx)
226
227		if s.uploadsManager != nil {
228			err := s.uploadsManager.QueryPages(ctx, queries, callback)
229			if err != nil {
230				return err
231			}
232
233			level.Debug(spanLogger).Log("queried", "uploads-manager")
234		}
235
236		if s.downloadsManager != nil {
237			err := s.downloadsManager.QueryPages(ctx, queries, callback)
238			if err != nil {
239				return err
240			}
241
242			level.Debug(spanLogger).Log("queried", "downloads-manager")
243		}
244
245		return nil
246	})
247}
248