1// Copyright (C) 2021 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package ultest
5
6import (
7	"bytes"
8	"context"
9	"path/filepath"
10	"sort"
11	"strings"
12	"sync"
13	"time"
14
15	"github.com/zeebo/clingy"
16	"github.com/zeebo/errs"
17
18	"storj.io/storj/cmd/uplinkng/ulfs"
19	"storj.io/storj/cmd/uplinkng/ulloc"
20)
21
22//
23// ulfs.Filesystem
24//
25
26type testFilesystem struct {
27	stdin   string
28	created int64
29	files   map[ulloc.Location]memFileData
30	pending map[ulloc.Location][]*memWriteHandle
31	locals  map[string]bool // true means path is a directory
32	buckets map[string]struct{}
33
34	mu sync.Mutex
35}
36
37func newTestFilesystem() *testFilesystem {
38	return &testFilesystem{
39		files:   make(map[ulloc.Location]memFileData),
40		pending: make(map[ulloc.Location][]*memWriteHandle),
41		locals:  make(map[string]bool),
42		buckets: make(map[string]struct{}),
43	}
44}
45
// memFileData is the stored form of a committed file.
type memFileData struct {
	// contents is everything that was written to the handle that committed
	// the file.
	contents string

	// created is the creation counter of that handle; readers convert it
	// with time.Unix.
	created int64
}
50
51func (tfs *testFilesystem) ensureBucket(name string) {
52	tfs.buckets[name] = struct{}{}
53}
54
55func (tfs *testFilesystem) Files() (files []File) {
56	for loc, mf := range tfs.files {
57		files = append(files, File{
58			Loc:      loc.String(),
59			Contents: mf.contents,
60		})
61	}
62	sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) })
63	return files
64}
65
66func (tfs *testFilesystem) Pending() (files []File) {
67	for loc, mh := range tfs.pending {
68		for _, h := range mh {
69			files = append(files, File{
70				Loc:      loc.String(),
71				Contents: h.buf.String(),
72			})
73		}
74	}
75	sort.Slice(files, func(i, j int) bool { return files[i].less(files[j]) })
76	return files
77}
78
79func (tfs *testFilesystem) Close() error {
80	return nil
81}
82
83func (tfs *testFilesystem) Open(ctx clingy.Context, loc ulloc.Location, opts *ulfs.OpenOptions) (_ ulfs.ReadHandle, err error) {
84	tfs.mu.Lock()
85	defer tfs.mu.Unlock()
86
87	if loc.Std() {
88		return &byteReadHandle{Buffer: bytes.NewBufferString("-")}, nil
89	}
90
91	mf, ok := tfs.files[loc]
92	if !ok {
93		return nil, errs.New("file does not exist %q", loc)
94	}
95
96	if opts != nil {
97		return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents[opts.Offset:(opts.Offset + opts.Length)])}, nil
98	}
99
100	return &byteReadHandle{Buffer: bytes.NewBufferString(mf.contents)}, nil
101}
102
103func (tfs *testFilesystem) Create(ctx clingy.Context, loc ulloc.Location) (_ ulfs.WriteHandle, err error) {
104	tfs.mu.Lock()
105	defer tfs.mu.Unlock()
106
107	if loc.Std() {
108		return new(discardWriteHandle), nil
109	}
110
111	if bucket, _, ok := loc.RemoteParts(); ok {
112		if _, ok := tfs.buckets[bucket]; !ok {
113			return nil, errs.New("bucket %q does not exist", bucket)
114		}
115	}
116
117	if path, ok := loc.LocalParts(); ok {
118		if loc.Directoryish() || tfs.isLocalDir(ctx, loc) {
119			return nil, errs.New("unable to open file for writing: %q", loc)
120		}
121		dir := ulloc.CleanPath(filepath.Dir(path))
122		if err := tfs.mkdirAll(ctx, dir); err != nil {
123			return nil, err
124		}
125	}
126
127	tfs.created++
128	wh := &memWriteHandle{
129		buf: bytes.NewBuffer(nil),
130		loc: loc,
131		tfs: tfs,
132		cre: tfs.created,
133	}
134
135	if loc.Remote() {
136		tfs.pending[loc] = append(tfs.pending[loc], wh)
137	}
138
139	return wh, nil
140}
141
142func (tfs *testFilesystem) Move(ctx clingy.Context, source, dest ulloc.Location) error {
143	tfs.mu.Lock()
144	defer tfs.mu.Unlock()
145
146	mf, ok := tfs.files[source]
147	if !ok {
148		return errs.New("file does not exist %q", source)
149	}
150	delete(tfs.files, source)
151	tfs.files[dest] = mf
152	return nil
153}
154
155func (tfs *testFilesystem) Remove(ctx context.Context, loc ulloc.Location, opts *ulfs.RemoveOptions) error {
156	tfs.mu.Lock()
157	defer tfs.mu.Unlock()
158
159	if opts == nil || !opts.Pending {
160		delete(tfs.files, loc)
161	} else {
162		// TODO: Remove needs an API that understands that multiple pending files may exist
163		delete(tfs.pending, loc)
164	}
165	return nil
166}
167
168func (tfs *testFilesystem) List(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
169	tfs.mu.Lock()
170	defer tfs.mu.Unlock()
171
172	if opts != nil && opts.Pending {
173		return tfs.listPending(ctx, prefix, opts)
174	}
175
176	prefixDir := prefix.AsDirectoryish()
177
178	var infos []ulfs.ObjectInfo
179	for loc, mf := range tfs.files {
180		if loc.HasPrefix(prefixDir) || loc == prefix {
181			infos = append(infos, ulfs.ObjectInfo{
182				Loc:     loc,
183				Created: time.Unix(mf.created, 0),
184			})
185		}
186	}
187
188	sort.Sort(objectInfos(infos))
189
190	if opts == nil || !opts.Recursive {
191		infos = collapseObjectInfos(prefix, infos)
192	}
193
194	return &objectInfoIterator{infos: infos}, nil
195}
196
197func (tfs *testFilesystem) listPending(ctx context.Context, prefix ulloc.Location, opts *ulfs.ListOptions) (ulfs.ObjectIterator, error) {
198	if prefix.Local() {
199		return &objectInfoIterator{}, nil
200	}
201
202	prefixDir := prefix.AsDirectoryish()
203
204	var infos []ulfs.ObjectInfo
205	for loc, whs := range tfs.pending {
206		if loc.HasPrefix(prefixDir) || loc == prefix {
207			for _, wh := range whs {
208				infos = append(infos, ulfs.ObjectInfo{
209					Loc:     loc,
210					Created: time.Unix(wh.cre, 0),
211				})
212			}
213		}
214	}
215
216	sort.Sort(objectInfos(infos))
217
218	if opts == nil || !opts.Recursive {
219		infos = collapseObjectInfos(prefix, infos)
220	}
221
222	return &objectInfoIterator{infos: infos}, nil
223}
224
225func (tfs *testFilesystem) IsLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
226	tfs.mu.Lock()
227	defer tfs.mu.Unlock()
228
229	return tfs.isLocalDir(ctx, loc)
230}
231
232func (tfs *testFilesystem) isLocalDir(ctx context.Context, loc ulloc.Location) (local bool) {
233	path, ok := loc.LocalParts()
234	return ok && (ulloc.CleanPath(path) == "." || tfs.locals[path])
235}
236
237func (tfs *testFilesystem) Stat(ctx context.Context, loc ulloc.Location) (*ulfs.ObjectInfo, error) {
238	if loc.Std() {
239		return nil, errs.New("unable to stat loc %q", loc.Loc())
240	}
241
242	mf, ok := tfs.files[loc]
243	if !ok {
244		return nil, errs.New("file does not exist: %q", loc.Loc())
245	}
246
247	return &ulfs.ObjectInfo{
248		Loc:           loc,
249		Created:       time.Unix(mf.created, 0),
250		ContentLength: int64(len(mf.contents)),
251	}, nil
252}
253
254func (tfs *testFilesystem) mkdirAll(ctx context.Context, dir string) error {
255	i := 0
256	for i < len(dir) {
257		slash := strings.Index(dir[i:], "/")
258		if slash == -1 {
259			break
260		}
261		if err := tfs.mkdir(ctx, dir[:i+slash]); err != nil {
262			return err
263		}
264		i += slash + 1
265	}
266	if len(dir) > 0 {
267		return tfs.mkdir(ctx, dir)
268	}
269	return nil
270}
271
272func (tfs *testFilesystem) mkdir(ctx context.Context, dir string) error {
273	if isDir, ok := tfs.locals[dir]; ok && !isDir {
274		return errs.New("cannot create directory: %q is a file", dir)
275	}
276	tfs.locals[dir] = true
277	return nil
278}
279
280//
281// ulfs.ReadHandle
282//
283
// byteReadHandle is a read handle backed by an in-memory buffer. The buffer
// is embedded, so its methods are promoted onto the handle.
type byteReadHandle struct {
	*bytes.Buffer
}
287
288func (b *byteReadHandle) Close() error          { return nil }
289func (b *byteReadHandle) Info() ulfs.ObjectInfo { return ulfs.ObjectInfo{} }
290
291//
292// ulfs.WriteHandle
293//
294
295type memWriteHandle struct {
296	buf  *bytes.Buffer
297	loc  ulloc.Location
298	tfs  *testFilesystem
299	cre  int64
300	done bool
301}
302
303func (b *memWriteHandle) Write(p []byte) (int, error) {
304	return b.buf.Write(p)
305}
306
307func (b *memWriteHandle) Commit() error {
308	b.tfs.mu.Lock()
309	defer b.tfs.mu.Unlock()
310
311	if err := b.close(); err != nil {
312		return err
313	}
314
315	if path, ok := b.loc.LocalParts(); ok {
316		b.tfs.locals[path] = false
317	}
318
319	b.tfs.files[b.loc] = memFileData{
320		contents: b.buf.String(),
321		created:  b.cre,
322	}
323	return nil
324}
325
326func (b *memWriteHandle) Abort() error {
327	b.tfs.mu.Lock()
328	defer b.tfs.mu.Unlock()
329
330	if err := b.close(); err != nil {
331		return err
332	}
333
334	return nil
335}
336
337func (b *memWriteHandle) close() error {
338	if b.done {
339		return errs.New("already done")
340	}
341	b.done = true
342
343	handles := b.tfs.pending[b.loc]
344	for i, v := range handles {
345		if v == b {
346			handles = append(handles[:i], handles[i+1:]...)
347			break
348		}
349	}
350
351	if len(handles) > 0 {
352		b.tfs.pending[b.loc] = handles
353	} else {
354		delete(b.tfs.pending, b.loc)
355	}
356
357	return nil
358}
359
// discardWriteHandle is a write handle that throws away everything written
// to it.
type discardWriteHandle struct{}

// Write stores nothing and reports all of p as written.
func (discardWriteHandle) Write(p []byte) (int, error) {
	written := len(p)
	return written, nil
}

// Commit does nothing and always returns nil.
func (discardWriteHandle) Commit() error {
	return nil
}

// Abort does nothing and always returns nil.
func (discardWriteHandle) Abort() error {
	return nil
}
365
366//
367// ulfs.ObjectIterator
368//
369
370type objectInfoIterator struct {
371	infos   []ulfs.ObjectInfo
372	current ulfs.ObjectInfo
373}
374
375func (li *objectInfoIterator) Next() bool {
376	if len(li.infos) == 0 {
377		return false
378	}
379	li.current, li.infos = li.infos[0], li.infos[1:]
380	return true
381}
382
383func (li *objectInfoIterator) Err() error {
384	return nil
385}
386
387func (li *objectInfoIterator) Item() ulfs.ObjectInfo {
388	return li.current
389}
390
391type objectInfos []ulfs.ObjectInfo
392
393func (ois objectInfos) Len() int               { return len(ois) }
394func (ois objectInfos) Swap(i int, j int)      { ois[i], ois[j] = ois[j], ois[i] }
395func (ois objectInfos) Less(i int, j int) bool { return ois[i].Loc.Less(ois[j].Loc) }
396
397func collapseObjectInfos(prefix ulloc.Location, infos []ulfs.ObjectInfo) []ulfs.ObjectInfo {
398	collapsing := false
399	current := ""
400	j := 0
401
402	for _, oi := range infos {
403		first, ok := oi.Loc.ListKeyName(prefix)
404		if ok {
405			if collapsing && first == current {
406				continue
407			}
408
409			collapsing = true
410			current = first
411
412			oi.IsPrefix = true
413		}
414
415		if bucket, _, ok := oi.Loc.RemoteParts(); ok {
416			oi.Loc = ulloc.NewRemote(bucket, first)
417		} else if _, ok := oi.Loc.LocalParts(); ok {
418			oi.Loc = ulloc.NewLocal(first)
419		} else {
420			panic("invalid object returned from list")
421		}
422
423		infos[j] = oi
424		j++
425	}
426
427	return infos[:j]
428}
429