1package shipper 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "io/ioutil" 8 "os" 9 "path" 10 "sync" 11 "time" 12 13 util_log "github.com/cortexproject/cortex/pkg/util/log" 14 "github.com/cortexproject/cortex/pkg/util/spanlogger" 15 "github.com/go-kit/kit/log/level" 16 "github.com/prometheus/client_golang/prometheus" 17 "github.com/weaveworks/common/instrument" 18 "go.etcd.io/bbolt" 19 20 "github.com/grafana/loki/pkg/storage/chunk" 21 "github.com/grafana/loki/pkg/storage/chunk/local" 22 chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" 23 "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" 24 "github.com/grafana/loki/pkg/storage/stores/shipper/storage" 25 "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" 26 shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" 27) 28 29const ( 30 // ModeReadWrite is to allow both read and write 31 ModeReadWrite = iota 32 // ModeReadOnly is to allow only read operations 33 ModeReadOnly 34 // ModeWriteOnly is to allow only write operations 35 ModeWriteOnly 36 37 // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage 38 BoltDBShipperType = "boltdb-shipper" 39 40 // FilesystemObjectStoreType holds the periodic config type for the filesystem store 41 FilesystemObjectStoreType = "filesystem" 42 43 // UploadInterval defines interval for when we check if there are new index files to upload. 44 // It's also used to snapshot the currently written index tables so the snapshots can be used for reads. 45 UploadInterval = 1 * time.Minute 46) 47 48type boltDBIndexClient interface { 49 QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error 50 NewWriteBatch() chunk.WriteBatch 51 WriteToDB(ctx context.Context, db *bbolt.DB, writes local.TableWrites) error 52 Stop() 53} 54 55type Config struct { 56 ActiveIndexDirectory string `yaml:"active_index_directory"` 57 SharedStoreType string `yaml:"shared_store"` 58 SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` 59 CacheLocation string `yaml:"cache_location"` 60 CacheTTL time.Duration `yaml:"cache_ttl"` 61 ResyncInterval time.Duration `yaml:"resync_interval"` 62 QueryReadyNumDays int `yaml:"query_ready_num_days"` 63 IndexGatewayClientConfig IndexGatewayClientConfig `yaml:"index_gateway_client"` 64 IngesterName string `yaml:"-"` 65 Mode int `yaml:"-"` 66 IngesterDBRetainPeriod time.Duration `yaml:"-"` 67} 68 69// RegisterFlags registers flags. 70func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 71 cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix("boltdb.shipper.index-gateway-client", f) 72 73 f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage") 74 f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem") 75 f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it") 76 f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") 77 f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") 78 f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") 79 f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.") 80} 81 82func (cfg *Config) Validate() error { 83 return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) 84} 85 86type Shipper struct { 87 cfg Config 88 boltDBIndexClient boltDBIndexClient 89 uploadsManager *uploads.TableManager 90 downloadsManager *downloads.TableManager 91 92 metrics *metrics 93 stopOnce sync.Once 94} 95 96// NewShipper creates a shipper for syncing local objects with a store 97func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (chunk.IndexClient, error) { 98 shipper := Shipper{ 99 cfg: cfg, 100 metrics: newMetrics(registerer), 101 } 102 103 err := shipper.init(storageClient, registerer) 104 if err != nil { 105 return nil, err 106 } 107 108 level.Info(util_log.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode)) 109 110 return &shipper, nil 111} 112 113func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error { 114 // When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead. 115 // Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path. 116 boltdbIndexClientDir := s.cfg.ActiveIndexDirectory 117 if boltdbIndexClientDir == "" { 118 boltdbIndexClientDir = s.cfg.CacheLocation 119 } 120 121 var err error 122 s.boltDBIndexClient, err = local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: boltdbIndexClientDir}) 123 if err != nil { 124 return err 125 } 126 127 indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix) 128 129 if s.cfg.Mode != ModeReadOnly { 130 uploader, err := s.getUploaderName() 131 if err != nil { 132 return err 133 } 134 135 cfg := uploads.Config{ 136 Uploader: uploader, 137 IndexDir: s.cfg.ActiveIndexDirectory, 138 UploadInterval: UploadInterval, 139 DBRetainPeriod: s.cfg.IngesterDBRetainPeriod, 140 } 141 uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) 142 if err != nil { 143 return err 144 } 145 146 s.uploadsManager = uploadsManager 147 } 148 149 if s.cfg.Mode != ModeWriteOnly { 150 cfg := downloads.Config{ 151 CacheDir: s.cfg.CacheLocation, 152 SyncInterval: s.cfg.ResyncInterval, 153 CacheTTL: s.cfg.CacheTTL, 154 QueryReadyNumDays: s.cfg.QueryReadyNumDays, 155 } 156 downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) 157 if err != nil { 158 return err 159 } 160 161 s.downloadsManager = downloadsManager 162 } 163 164 return nil 165} 166 167// we would persist uploader name in <active-index-directory>/uploader/name file so that we use same name on subsequent restarts to 168// avoid uploading same files again with different name. If the filed does not exist we would create one with uploader name set to 169// ingester name and startup timestamp so that we randomise the name and do not override files from other ingesters. 170func (s *Shipper) getUploaderName() (string, error) { 171 uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().UnixNano()) 172 173 uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name") 174 if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil { 175 return "", err 176 } 177 178 _, err := os.Stat(uploaderFilePath) 179 if err != nil { 180 if !os.IsNotExist(err) { 181 return "", err 182 } 183 if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil { 184 return "", err 185 } 186 } else { 187 ub, err := ioutil.ReadFile(uploaderFilePath) 188 if err != nil { 189 return "", err 190 } 191 uploader = string(ub) 192 } 193 194 return uploader, nil 195} 196 197func (s *Shipper) Stop() { 198 s.stopOnce.Do(s.stop) 199} 200 201func (s *Shipper) stop() { 202 if s.uploadsManager != nil { 203 s.uploadsManager.Stop() 204 } 205 206 if s.downloadsManager != nil { 207 s.downloadsManager.Stop() 208 } 209 210 s.boltDBIndexClient.Stop() 211} 212 213func (s *Shipper) NewWriteBatch() chunk.WriteBatch { 214 return s.boltDBIndexClient.NewWriteBatch() 215} 216 217func (s *Shipper) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { 218 return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { 219 return s.uploadsManager.BatchWrite(ctx, batch) 220 }) 221} 222 223func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { 224 return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { 225 spanLogger := spanlogger.FromContext(ctx) 226 227 if s.uploadsManager != nil { 228 err := s.uploadsManager.QueryPages(ctx, queries, callback) 229 if err != nil { 230 return err 231 } 232 233 level.Debug(spanLogger).Log("queried", "uploads-manager") 234 } 235 236 if s.downloadsManager != nil { 237 err := s.downloadsManager.QueryPages(ctx, queries, callback) 238 if err != nil { 239 return err 240 } 241 242 level.Debug(spanLogger).Log("queried", "downloads-manager") 243 } 244 245 return nil 246 }) 247} 248