1package local
2
3import (
4	"context"
5	"flag"
6	"fmt"
7	"io"
8	"os"
9	"path/filepath"
10	"time"
11
12	"github.com/go-kit/log/level"
13	"github.com/grafana/dskit/runutil"
14
15	"github.com/cortexproject/cortex/pkg/chunk"
16	"github.com/cortexproject/cortex/pkg/chunk/util"
17	util_log "github.com/cortexproject/cortex/pkg/util/log"
18)
19
20// FSConfig is the config for a FSObjectClient.
21type FSConfig struct {
22	Directory string `yaml:"directory"`
23}
24
25// RegisterFlags registers flags.
26func (cfg *FSConfig) RegisterFlags(f *flag.FlagSet) {
27	cfg.RegisterFlagsWithPrefix("", f)
28}
29
30// RegisterFlags registers flags with prefix.
31func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
32	f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.")
33}
34
35// FSObjectClient holds config for filesystem as object store
36type FSObjectClient struct {
37	cfg           FSConfig
38	pathSeparator string
39}
40
41// NewFSObjectClient makes a chunk.Client which stores chunks as files in the local filesystem.
42func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
43	// filepath.Clean cleans up the path by removing unwanted duplicate slashes, dots etc.
44	// This is needed because DeleteObject works on paths which are already cleaned up and it
45	// checks whether it is about to delete the configured directory when it becomes empty
46	cfg.Directory = filepath.Clean(cfg.Directory)
47	if err := util.EnsureDirectory(cfg.Directory); err != nil {
48		return nil, err
49	}
50
51	return &FSObjectClient{
52		cfg:           cfg,
53		pathSeparator: string(os.PathSeparator),
54	}, nil
55}
56
57// Stop implements ObjectClient
58func (FSObjectClient) Stop() {}
59
60// GetObject from the store
61func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, error) {
62	fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)))
63	if err != nil && os.IsNotExist(err) {
64		return nil, chunk.ErrStorageObjectNotFound
65	}
66
67	return fl, err
68}
69
70// PutObject into the store
71func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
72	fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
73	err := util.EnsureDirectory(filepath.Dir(fullPath))
74	if err != nil {
75		return err
76	}
77
78	fl, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
79	if err != nil {
80		return err
81	}
82
83	defer runutil.CloseWithLogOnErr(util_log.Logger, fl, "fullPath: %s", fullPath)
84
85	_, err = io.Copy(fl, object)
86	if err != nil {
87		return err
88	}
89
90	err = fl.Sync()
91	if err != nil {
92		return err
93	}
94
95	return fl.Close()
96}
97
98// List implements chunk.ObjectClient.
99// FSObjectClient assumes that prefix is a directory, and only supports "" and "/" delimiters.
100func (f *FSObjectClient) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
101	if delimiter != "" && delimiter != "/" {
102		return nil, nil, fmt.Errorf("unsupported delimiter: %q", delimiter)
103	}
104
105	folderPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(prefix))
106
107	info, err := os.Stat(folderPath)
108	if err != nil {
109		if os.IsNotExist(err) {
110			return nil, nil, nil
111		}
112		return nil, nil, err
113	}
114	if !info.IsDir() {
115		// When listing single file, return this file only.
116		return []chunk.StorageObject{{Key: info.Name(), ModifiedAt: info.ModTime()}}, nil, nil
117	}
118
119	var storageObjects []chunk.StorageObject
120	var commonPrefixes []chunk.StorageCommonPrefix
121
122	err = filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
123		if err != nil {
124			return err
125		}
126
127		// Ignore starting folder itself.
128		if path == folderPath {
129			return nil
130		}
131
132		relPath, err := filepath.Rel(f.cfg.Directory, path)
133		if err != nil {
134			return err
135		}
136
137		relPath = filepath.ToSlash(relPath)
138
139		if info.IsDir() {
140			if delimiter == "" {
141				// Go into directory
142				return nil
143			}
144
145			empty, err := isDirEmpty(path)
146			if err != nil {
147				return err
148			}
149
150			if !empty {
151				commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(relPath+delimiter))
152			}
153			return filepath.SkipDir
154		}
155
156		storageObjects = append(storageObjects, chunk.StorageObject{Key: relPath, ModifiedAt: info.ModTime()})
157		return nil
158	})
159
160	return storageObjects, commonPrefixes, err
161}
162
163func (f *FSObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
164	// inspired from https://github.com/thanos-io/thanos/blob/55cb8ca38b3539381dc6a781e637df15c694e50a/pkg/objstore/filesystem/filesystem.go#L195
165	file := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
166
167	for file != f.cfg.Directory {
168		if err := os.Remove(file); err != nil {
169			return err
170		}
171
172		file = filepath.Dir(file)
173		empty, err := isDirEmpty(file)
174		if err != nil {
175			return err
176		}
177
178		if !empty {
179			break
180		}
181	}
182
183	return nil
184}
185
186// DeleteChunksBefore implements BucketClient
187func (f *FSObjectClient) DeleteChunksBefore(ctx context.Context, ts time.Time) error {
188	return filepath.Walk(f.cfg.Directory, func(path string, info os.FileInfo, err error) error {
189		if !info.IsDir() && info.ModTime().Before(ts) {
190			level.Info(util_log.Logger).Log("msg", "file has exceeded the retention period, removing it", "filepath", info.Name())
191			if err := os.Remove(path); err != nil {
192				return err
193			}
194		}
195		return nil
196	})
197}
198
199// copied from https://github.com/thanos-io/thanos/blob/55cb8ca38b3539381dc6a781e637df15c694e50a/pkg/objstore/filesystem/filesystem.go#L181
200func isDirEmpty(name string) (ok bool, err error) {
201	f, err := os.Open(name)
202	if err != nil {
203		return false, err
204	}
205	defer runutil.CloseWithErrCapture(&err, f, "dir open")
206
207	if _, err = f.Readdir(1); err == io.EOF {
208		return true, nil
209	}
210	return false, err
211}
212