1package local
2
3import (
4	"bytes"
5	"context"
6	"io/ioutil"
7	"os"
8	"path"
9	"path/filepath"
10	"strings"
11	"testing"
12	"time"
13
14	"github.com/stretchr/testify/require"
15
16	"github.com/grafana/loki/pkg/storage/chunk/util"
17)
18
19func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
20	deleteFilesOlderThan := 10 * time.Minute
21
22	fsChunksDir, err := ioutil.TempDir(os.TempDir(), "fs-chunks")
23	require.NoError(t, err)
24
25	bucketClient, err := NewFSObjectClient(FSConfig{
26		Directory: fsChunksDir,
27	})
28	require.NoError(t, err)
29
30	defer func() {
31		require.NoError(t, os.RemoveAll(fsChunksDir))
32	}()
33
34	file1 := "file1"
35	file2 := "file2"
36
37	// Creating dummy files
38	require.NoError(t, os.Chdir(fsChunksDir))
39
40	f, err := os.Create(file1)
41	require.NoError(t, err)
42	require.NoError(t, f.Close())
43
44	f, err = os.Create(file2)
45	require.NoError(t, err)
46	require.NoError(t, f.Close())
47
48	// Verify whether all files are created
49	files, _ := ioutil.ReadDir(".")
50	require.Equal(t, 2, len(files), "Number of files should be 2")
51
52	// No files should be deleted, since all of them are not much older
53	require.NoError(t, bucketClient.DeleteChunksBefore(context.Background(), time.Now().Add(-deleteFilesOlderThan)))
54	files, _ = ioutil.ReadDir(".")
55	require.Equal(t, 2, len(files), "Number of files should be 2")
56
57	// Changing mtime of file1 to make it look older
58	require.NoError(t, os.Chtimes(file1, time.Now().Add(-deleteFilesOlderThan), time.Now().Add(-deleteFilesOlderThan)))
59	require.NoError(t, bucketClient.DeleteChunksBefore(context.Background(), time.Now().Add(-deleteFilesOlderThan)))
60
61	// Verifying whether older file got deleted
62	files, _ = ioutil.ReadDir(".")
63	require.Equal(t, 1, len(files), "Number of files should be 1 after enforcing retention")
64}
65
66func TestFSObjectClient_List(t *testing.T) {
67	fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-objects")
68	require.NoError(t, err)
69
70	bucketClient, err := NewFSObjectClient(FSConfig{
71		Directory: fsObjectsDir,
72	})
73	require.NoError(t, err)
74
75	defer func() {
76		require.NoError(t, os.RemoveAll(fsObjectsDir))
77	}()
78
79	allFiles := []string{
80		"outer-file1",
81		"outer-file2",
82		"folder1/file1",
83		"folder1/file2",
84		"folder2/file3",
85		"folder2/file4",
86		"folder2/file5",
87		"deeply/nested/folder/a",
88		"deeply/nested/folder/b",
89		"deeply/nested/folder/c",
90	}
91
92	topLevelFolders := map[string]bool{}
93	topLevelFiles := map[string]bool{}
94	filesInTopLevelFolders := map[string]map[string]bool{}
95
96	for _, f := range allFiles {
97		require.NoError(t, bucketClient.PutObject(context.Background(), f, bytes.NewReader([]byte(f))))
98
99		s := strings.Split(f, "/")
100		if len(s) > 1 {
101			topLevelFolders[s[0]] = true
102		} else {
103			topLevelFiles[s[0]] = true
104		}
105
106		if len(s) == 2 {
107			if filesInTopLevelFolders[s[0]] == nil {
108				filesInTopLevelFolders[s[0]] = map[string]bool{}
109			}
110			filesInTopLevelFolders[s[0]][s[1]] = true
111		}
112	}
113
114	// create an empty directory which should get excluded from the list
115	require.NoError(t, util.EnsureDirectory(filepath.Join(fsObjectsDir, "empty-folder")))
116
117	storageObjects, commonPrefixes, err := bucketClient.List(context.Background(), "", "/")
118	require.NoError(t, err)
119
120	require.Len(t, storageObjects, len(topLevelFiles))
121	for _, so := range storageObjects {
122		require.True(t, topLevelFiles[so.Key])
123	}
124
125	require.Len(t, commonPrefixes, len(topLevelFolders))
126	for _, commonPrefix := range commonPrefixes {
127		require.True(t, topLevelFolders[string(commonPrefix)[:len(commonPrefix)-1]]) // 1 to remove "/" separator.
128	}
129
130	for folder, files := range filesInTopLevelFolders {
131		storageObjects, commonPrefixes, err := bucketClient.List(context.Background(), folder, "/")
132		require.NoError(t, err)
133
134		require.Len(t, storageObjects, len(files))
135		for _, so := range storageObjects {
136			require.True(t, strings.HasPrefix(so.Key, folder+"/"))
137			require.True(t, files[path.Base(so.Key)])
138		}
139
140		require.Len(t, commonPrefixes, 0)
141	}
142
143	// List everything from the top, recursively.
144	storageObjects, commonPrefixes, err = bucketClient.List(context.Background(), "", "")
145	require.NoError(t, err)
146
147	// Since delimiter is empty, there are no commonPrefixes.
148	require.Empty(t, commonPrefixes)
149
150	var storageObjectPaths []string
151	for _, so := range storageObjects {
152		storageObjectPaths = append(storageObjectPaths, so.Key)
153	}
154	require.ElementsMatch(t, allFiles, storageObjectPaths)
155
156	storageObjects, commonPrefixes, err = bucketClient.List(context.Background(), "doesnt_exist", "")
157	require.NoError(t, err)
158	require.Empty(t, storageObjects)
159	require.Empty(t, commonPrefixes)
160
161	storageObjects, commonPrefixes, err = bucketClient.List(context.Background(), "outer-file1", "")
162	require.NoError(t, err)
163	require.Len(t, storageObjects, 1)
164	require.Equal(t, "outer-file1", storageObjects[0].Key)
165	require.Empty(t, commonPrefixes)
166}
167
168func TestFSObjectClient_DeleteObject(t *testing.T) {
169	fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-delete-object")
170	require.NoError(t, err)
171
172	bucketClient, err := NewFSObjectClient(FSConfig{
173		Directory: fsObjectsDir,
174	})
175	require.NoError(t, err)
176
177	defer func() {
178		require.NoError(t, os.RemoveAll(fsObjectsDir))
179	}()
180
181	foldersWithFiles := make(map[string][]string)
182	foldersWithFiles["folder1"] = []string{"file1", "file2"}
183
184	for folder, files := range foldersWithFiles {
185		for _, filename := range files {
186			err := bucketClient.PutObject(context.Background(), path.Join(folder, filename), bytes.NewReader([]byte(filename)))
187			require.NoError(t, err)
188		}
189	}
190
191	// let us check if we have right folders created
192	_, commonPrefixes, err := bucketClient.List(context.Background(), "", "/")
193	require.NoError(t, err)
194	require.Len(t, commonPrefixes, len(foldersWithFiles))
195
196	// let us delete file1 from folder1 and check that file1 is gone but folder1 with file2 is still there
197	require.NoError(t, bucketClient.DeleteObject(context.Background(), path.Join("folder1", "file1")))
198	_, err = os.Stat(filepath.Join(fsObjectsDir, filepath.Join("folder1", "file1")))
199	require.True(t, os.IsNotExist(err))
200
201	_, err = os.Stat(filepath.Join(fsObjectsDir, filepath.Join("folder1", "file2")))
202	require.NoError(t, err)
203
204	// let us delete second file as well and check that folder1 also got removed
205	require.NoError(t, bucketClient.DeleteObject(context.Background(), path.Join("folder1", "file2")))
206	_, err = os.Stat(filepath.Join(fsObjectsDir, "folder1"))
207	require.True(t, os.IsNotExist(err))
208
209	_, err = os.Stat(fsObjectsDir)
210	require.NoError(t, err)
211
212	// let us see ensure folder2 is still there will all the files:
213	/*files, commonPrefixes, err := bucketClient.List(context.Background(), "folder2/")
214	require.NoError(t, err)
215	require.Len(t, commonPrefixes, 0)
216	require.Len(t, files, len(foldersWithFiles["folder2/"]))*/
217}
218