1//go:build !nos3
2// +build !nos3
3
4package vfs
5
6import (
7	"context"
8	"fmt"
9	"mime"
10	"net/url"
11	"os"
12	"path"
13	"path/filepath"
14	"strings"
15	"time"
16
17	"github.com/aws/aws-sdk-go/aws"
18	"github.com/aws/aws-sdk-go/aws/awserr"
19	"github.com/aws/aws-sdk-go/aws/credentials"
20	"github.com/aws/aws-sdk-go/aws/request"
21	"github.com/aws/aws-sdk-go/aws/session"
22	"github.com/aws/aws-sdk-go/service/s3"
23	"github.com/aws/aws-sdk-go/service/s3/s3manager"
24	"github.com/eikenb/pipeat"
25	"github.com/pkg/sftp"
26
27	"github.com/drakkan/sftpgo/v2/logger"
28	"github.com/drakkan/sftpgo/v2/metric"
29	"github.com/drakkan/sftpgo/v2/util"
30	"github.com/drakkan/sftpgo/v2/version"
31)
32
// s3DirMimeType is the content type set on the keys that emulate directories:
// using this mime type for directories improves compatibility with s3fs-fuse
const s3DirMimeType = "application/x-directory"
35
// S3Fs is a Fs implementation for AWS S3 compatible object storages.
// Uploads and downloads are streamed through pipes created inside
// localTempDir. ctxTimeout bounds the single API calls (head, list, copy,
// delete) while ctxLongTimeout is used for the full bucket scan
type S3Fs struct {
	connectionID string
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath      string
	config         *S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration
	ctxLongTimeout time.Duration
}
47
// init registers the "+s3" feature within the version package, this file is
// excluded from the build if the "nos3" build tag is set
func init() {
	version.AddFeature("+s3")
}
51
52// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
53// object storage
54func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
55	if localTempDir == "" {
56		if tempPath != "" {
57			localTempDir = tempPath
58		} else {
59			localTempDir = filepath.Clean(os.TempDir())
60		}
61	}
62	fs := &S3Fs{
63		connectionID:   connectionID,
64		localTempDir:   localTempDir,
65		mountPath:      mountPath,
66		config:         &config,
67		ctxTimeout:     30 * time.Second,
68		ctxLongTimeout: 300 * time.Second,
69	}
70	if err := fs.config.Validate(); err != nil {
71		return fs, err
72	}
73	awsConfig := aws.NewConfig()
74
75	if fs.config.Region != "" {
76		awsConfig.WithRegion(fs.config.Region)
77	}
78
79	if !fs.config.AccessSecret.IsEmpty() {
80		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
81			return fs, err
82		}
83		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
84	}
85
86	if fs.config.Endpoint != "" {
87		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
88	}
89	if fs.config.ForcePathStyle {
90		awsConfig.S3ForcePathStyle = aws.Bool(true)
91	}
92	if fs.config.UploadPartSize == 0 {
93		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
94	} else {
95		fs.config.UploadPartSize *= 1024 * 1024
96	}
97	if fs.config.UploadConcurrency == 0 {
98		fs.config.UploadConcurrency = s3manager.DefaultUploadConcurrency
99	}
100	if fs.config.DownloadPartSize == 0 {
101		fs.config.DownloadPartSize = s3manager.DefaultDownloadPartSize
102	} else {
103		fs.config.DownloadPartSize *= 1024 * 1024
104	}
105	if fs.config.DownloadConcurrency == 0 {
106		fs.config.DownloadConcurrency = s3manager.DefaultDownloadConcurrency
107	}
108
109	sessOpts := session.Options{
110		Config:            *awsConfig,
111		SharedConfigState: session.SharedConfigEnable,
112	}
113	sess, err := session.NewSessionWithOptions(sessOpts)
114	if err != nil {
115		return fs, err
116	}
117	fs.svc = s3.New(sess)
118	return fs, nil
119}
120
// Name returns the name for the Fs implementation, it includes the
// configured bucket
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
}
125
// ConnectionID returns the connection ID associated to this Fs implementation,
// this is the value passed to NewS3Fs
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}
130
131// Stat returns a FileInfo describing the named file
132func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
133	var result *FileInfo
134	if name == "/" || name == "." {
135		err := fs.checkIfBucketExists()
136		if err != nil {
137			return result, err
138		}
139		return NewFileInfo(name, true, 0, time.Now(), false), nil
140	}
141	if "/"+fs.config.KeyPrefix == name+"/" {
142		return NewFileInfo(name, true, 0, time.Now(), false), nil
143	}
144	obj, err := fs.headObject(name)
145	if err == nil {
146		// a "dir" has a trailing "/" so we cannot have a directory here
147		objSize := *obj.ContentLength
148		objectModTime := *obj.LastModified
149		return NewFileInfo(name, false, objSize, objectModTime, false), nil
150	}
151	if !fs.IsNotExist(err) {
152		return result, err
153	}
154	// now check if this is a prefix (virtual directory)
155	hasContents, err := fs.hasContents(name)
156	if err == nil && hasContents {
157		return NewFileInfo(name, true, 0, time.Now(), false), nil
158	} else if err != nil {
159		return nil, err
160	}
161	// the requested file may still be a directory as a zero bytes key
162	// with a trailing forward slash (created using mkdir).
163	// S3 doesn't return content type when listing objects, so we have
164	// create "dirs" adding a trailing "/" to the key
165	return fs.getStatForDir(name)
166}
167
168func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
169	var result *FileInfo
170	obj, err := fs.headObject(name + "/")
171	if err != nil {
172		return result, err
173	}
174	objSize := *obj.ContentLength
175	objectModTime := *obj.LastModified
176	return NewFileInfo(name, true, objSize, objectModTime, false), nil
177}
178
// Lstat returns a FileInfo describing the named file.
// Symbolic links are not supported for S3 so this is the same as Stat
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}
183
184// Open opens the named file for reading
185func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
186	r, w, err := pipeat.PipeInDir(fs.localTempDir)
187	if err != nil {
188		return nil, nil, nil, err
189	}
190	ctx, cancelFn := context.WithCancel(context.Background())
191	downloader := s3manager.NewDownloaderWithClient(fs.svc)
192	if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
193		downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
194			chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
195			r.SetContext(chunkCtx)
196
197			go func() {
198				<-ctx.Done()
199				cancel()
200			}()
201		})
202	}
203	var streamRange *string
204	if offset > 0 {
205		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
206	}
207
208	go func() {
209		defer cancelFn()
210		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
211			Bucket: aws.String(fs.config.Bucket),
212			Key:    aws.String(name),
213			Range:  streamRange,
214		}, func(d *s3manager.Downloader) {
215			d.Concurrency = fs.config.DownloadConcurrency
216			d.PartSize = fs.config.DownloadPartSize
217		})
218		w.CloseWithError(err) //nolint:errcheck
219		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
220		metric.S3TransferCompleted(n, 1, err)
221	}()
222	return nil, r, cancelFn, nil
223}
224
225// Create creates or opens the named file for writing
226func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
227	r, w, err := pipeat.PipeInDir(fs.localTempDir)
228	if err != nil {
229		return nil, nil, nil, err
230	}
231	p := NewPipeWriter(w)
232	ctx, cancelFn := context.WithCancel(context.Background())
233	uploader := s3manager.NewUploaderWithClient(fs.svc)
234	go func() {
235		defer cancelFn()
236		key := name
237		var contentType string
238		if flag == -1 {
239			contentType = s3DirMimeType
240		} else {
241			contentType = mime.TypeByExtension(path.Ext(name))
242		}
243		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
244			Bucket:       aws.String(fs.config.Bucket),
245			Key:          aws.String(key),
246			Body:         r,
247			ACL:          util.NilIfEmpty(fs.config.ACL),
248			StorageClass: util.NilIfEmpty(fs.config.StorageClass),
249			ContentType:  util.NilIfEmpty(contentType),
250		}, func(u *s3manager.Uploader) {
251			u.Concurrency = fs.config.UploadConcurrency
252			u.PartSize = fs.config.UploadPartSize
253		})
254		r.CloseWithError(err) //nolint:errcheck
255		p.Done(err)
256		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, response: %v, readed bytes: %v, err: %+v",
257			name, fs.config.ACL, response, r.GetReadedBytes(), err)
258		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
259	}()
260	return nil, p, cancelFn, nil
261}
262
263// Rename renames (moves) source to target.
264// We don't support renaming non empty directories since we should
265// rename all the contents too and this could take long time: think
266// about directories with thousands of files, for each file we should
267// execute a CopyObject call.
268// TODO: rename does not work for files bigger than 5GB, implement
269// multipart copy or wait for this pull request to be merged:
270//
271// https://github.com/aws/aws-sdk-go/pull/2653
272//
273func (fs *S3Fs) Rename(source, target string) error {
274	if source == target {
275		return nil
276	}
277	fi, err := fs.Stat(source)
278	if err != nil {
279		return err
280	}
281	copySource := fs.Join(fs.config.Bucket, source)
282	if fi.IsDir() {
283		hasContents, err := fs.hasContents(source)
284		if err != nil {
285			return err
286		}
287		if hasContents {
288			return fmt.Errorf("cannot rename non empty directory: %#v", source)
289		}
290		if !strings.HasSuffix(copySource, "/") {
291			copySource += "/"
292		}
293		if !strings.HasSuffix(target, "/") {
294			target += "/"
295		}
296	}
297	var contentType string
298	if fi.IsDir() {
299		contentType = s3DirMimeType
300	} else {
301		contentType = mime.TypeByExtension(path.Ext(source))
302	}
303	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
304	defer cancelFn()
305	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
306		Bucket:       aws.String(fs.config.Bucket),
307		CopySource:   aws.String(pathEscape(copySource)),
308		Key:          aws.String(target),
309		StorageClass: util.NilIfEmpty(fs.config.StorageClass),
310		ACL:          util.NilIfEmpty(fs.config.ACL),
311		ContentType:  util.NilIfEmpty(contentType),
312	})
313	if err != nil {
314		metric.S3CopyObjectCompleted(err)
315		return err
316	}
317	err = fs.svc.WaitUntilObjectExistsWithContext(ctx, &s3.HeadObjectInput{
318		Bucket: aws.String(fs.config.Bucket),
319		Key:    aws.String(target),
320	})
321	metric.S3CopyObjectCompleted(err)
322	if err != nil {
323		return err
324	}
325	return fs.Remove(source, fi.IsDir())
326}
327
328// Remove removes the named file or (empty) directory.
329func (fs *S3Fs) Remove(name string, isDir bool) error {
330	if isDir {
331		hasContents, err := fs.hasContents(name)
332		if err != nil {
333			return err
334		}
335		if hasContents {
336			return fmt.Errorf("cannot remove non empty directory: %#v", name)
337		}
338		if !strings.HasSuffix(name, "/") {
339			name += "/"
340		}
341	}
342	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
343	defer cancelFn()
344	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
345		Bucket: aws.String(fs.config.Bucket),
346		Key:    aws.String(name),
347	})
348	metric.S3DeleteObjectCompleted(err)
349	return err
350}
351
352// Mkdir creates a new directory with the specified name and default permissions
353func (fs *S3Fs) Mkdir(name string) error {
354	_, err := fs.Stat(name)
355	if !fs.IsNotExist(err) {
356		return err
357	}
358	if !strings.HasSuffix(name, "/") {
359		name += "/"
360	}
361	_, w, _, err := fs.Create(name, -1)
362	if err != nil {
363		return err
364	}
365	return w.Close()
366}
367
// MkdirAll does nothing and always returns nil: we don't have real folders,
// so there are no parent directories to create
func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
	return nil
}
372
// Symlink creates source as a symbolic link to target.
// This operation is not supported for S3, ErrVfsUnsupported is always returned
func (*S3Fs) Symlink(source, target string) error {
	return ErrVfsUnsupported
}
377
// Readlink returns the destination of the named symbolic link.
// This operation is not supported for S3, an empty string and
// ErrVfsUnsupported are always returned
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}
382
// Chown changes the numeric uid and gid of the named file.
// This operation is not supported for S3, ErrVfsUnsupported is always returned
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}
387
// Chmod changes the mode of the named file to mode.
// This operation is not supported for S3, ErrVfsUnsupported is always returned
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}
392
// Chtimes changes the access and modification times of the named file.
// This operation is not supported for S3, ErrVfsUnsupported is always returned
func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
	return ErrVfsUnsupported
}
397
// Truncate changes the size of the named file.
// Truncate by path is not supported, so ErrVfsUnsupported is always
// returned, while truncating an opened file is handled inside base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}
404
405// ReadDir reads the directory named by dirname and returns
406// a list of directory entries.
407func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
408	var result []os.FileInfo
409	// dirname must be already cleaned
410	prefix := ""
411	if dirname != "/" && dirname != "." {
412		prefix = strings.TrimPrefix(dirname, "/")
413		if !strings.HasSuffix(prefix, "/") {
414			prefix += "/"
415		}
416	}
417
418	prefixes := make(map[string]bool)
419
420	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
421	defer cancelFn()
422	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
423		Bucket:    aws.String(fs.config.Bucket),
424		Prefix:    aws.String(prefix),
425		Delimiter: aws.String("/"),
426	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
427		for _, p := range page.CommonPrefixes {
428			// prefixes have a trailing slash
429			name, _ := fs.resolve(p.Prefix, prefix)
430			if name == "" {
431				continue
432			}
433			if _, ok := prefixes[name]; ok {
434				continue
435			}
436			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
437			prefixes[name] = true
438		}
439		for _, fileObject := range page.Contents {
440			objectSize := *fileObject.Size
441			objectModTime := *fileObject.LastModified
442			name, isDir := fs.resolve(fileObject.Key, prefix)
443			if name == "" {
444				continue
445			}
446			if isDir {
447				if _, ok := prefixes[name]; ok {
448					continue
449				}
450				prefixes[name] = true
451			}
452			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
453		}
454		return true
455	})
456	metric.S3ListObjectsCompleted(err)
457	return result, err
458}
459
// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3, so false is always returned
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}
465
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file, so false is always returned
func (*S3Fs) IsAtomicUploadSupported() bool {
	return false
}
472
473// IsNotExist returns a boolean indicating whether the error is known to
474// report that a file or directory does not exist
475func (*S3Fs) IsNotExist(err error) bool {
476	if err == nil {
477		return false
478	}
479	if aerr, ok := err.(awserr.Error); ok {
480		if aerr.Code() == s3.ErrCodeNoSuchKey {
481			return true
482		}
483		if aerr.Code() == s3.ErrCodeNoSuchBucket {
484			return true
485		}
486	}
487	if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
488		if multierr.Code() == s3.ErrCodeNoSuchKey {
489			return true
490		}
491		if multierr.Code() == s3.ErrCodeNoSuchBucket {
492			return true
493		}
494	}
495	return strings.Contains(err.Error(), "404")
496}
497
498// IsPermission returns a boolean indicating whether the error is known to
499// report that permission is denied.
500func (*S3Fs) IsPermission(err error) bool {
501	if err == nil {
502		return false
503	}
504	return strings.Contains(err.Error(), "403")
505}
506
507// IsNotSupported returns true if the error indicate an unsupported operation
508func (*S3Fs) IsNotSupported(err error) bool {
509	if err == nil {
510		return false
511	}
512	return err == ErrVfsUnsupported
513}
514
515// CheckRootPath creates the specified local root directory if it does not exists
516func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
517	// we need a local directory for temporary files
518	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
519	return osFs.CheckRootPath(username, uid, gid)
520}
521
522// ScanRootDirContents returns the number of files contained in the bucket,
523// and their size
524func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
525	numFiles := 0
526	size := int64(0)
527	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
528	defer cancelFn()
529	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
530		Bucket: aws.String(fs.config.Bucket),
531		Prefix: aws.String(fs.config.KeyPrefix),
532	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
533		for _, fileObject := range page.Contents {
534			isDir := strings.HasSuffix(*fileObject.Key, "/")
535			if isDir && *fileObject.Size == 0 {
536				continue
537			}
538			numFiles++
539			size += *fileObject.Size
540		}
541		return true
542	})
543	metric.S3ListObjectsCompleted(err)
544	return numFiles, size, err
545}
546
// GetDirSize returns the number of files and the size for a folder
// including any subfolders.
// This operation is not supported for S3, ErrVfsUnsupported is always returned
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
	return 0, 0, ErrVfsUnsupported
}
552
// GetAtomicUploadPath returns the path to use for an atomic upload.
// S3 uploads are already atomic, we never call this method for S3 and
// an empty string is always returned
func (*S3Fs) GetAtomicUploadPath(name string) string {
	return ""
}
558
559// GetRelativePath returns the path for a file relative to the user's home dir.
560// This is the path as seen by SFTPGo users
561func (fs *S3Fs) GetRelativePath(name string) string {
562	rel := path.Clean(name)
563	if rel == "." {
564		rel = ""
565	}
566	if !path.IsAbs(rel) {
567		return "/" + rel
568	}
569	if fs.config.KeyPrefix != "" {
570		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
571			rel = "/"
572		}
573		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
574	}
575	if fs.mountPath != "" {
576		rel = path.Join(fs.mountPath, rel)
577	}
578	return rel
579}
580
581// Walk walks the file tree rooted at root, calling walkFn for each file or
582// directory in the tree, including root. The result are unordered
583func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
584	prefix := ""
585	if root != "/" && root != "." {
586		prefix = strings.TrimPrefix(root, "/")
587		if !strings.HasSuffix(prefix, "/") {
588			prefix += "/"
589		}
590	}
591	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
592	defer cancelFn()
593	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
594		Bucket: aws.String(fs.config.Bucket),
595		Prefix: aws.String(prefix),
596	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
597		for _, fileObject := range page.Contents {
598			objectSize := *fileObject.Size
599			objectModTime := *fileObject.LastModified
600			isDir := strings.HasSuffix(*fileObject.Key, "/")
601			name := path.Clean(*fileObject.Key)
602			if name == "/" || name == "." {
603				continue
604			}
605			err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil)
606			if err != nil {
607				return false
608			}
609		}
610		return true
611	})
612	metric.S3ListObjectsCompleted(err)
613	walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
614
615	return err
616}
617
// Join joins any number of path elements into a single path.
// Keys always use the forward slash as separator so path.Join is used
func (*S3Fs) Join(elem ...string) string {
	return path.Join(elem...)
}
622
// HasVirtualFolders returns true if folders are emulated.
// This is always the case for S3
func (*S3Fs) HasVirtualFolders() bool {
	return true
}
627
628// ResolvePath returns the matching filesystem path for the specified virtual path
629func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) {
630	if fs.mountPath != "" {
631		virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
632	}
633	if !path.IsAbs(virtualPath) {
634		virtualPath = path.Clean("/" + virtualPath)
635	}
636	return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil
637}
638
639func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) {
640	result := strings.TrimPrefix(*name, prefix)
641	isDir := strings.HasSuffix(result, "/")
642	if isDir {
643		result = strings.TrimSuffix(result, "/")
644	}
645	if strings.Contains(result, "/") {
646		i := strings.Index(result, "/")
647		isDir = true
648		result = result[:i]
649	}
650	return result, isDir
651}
652
653func (fs *S3Fs) checkIfBucketExists() error {
654	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
655	defer cancelFn()
656	_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
657		Bucket: aws.String(fs.config.Bucket),
658	})
659	metric.S3HeadBucketCompleted(err)
660	return err
661}
662
663func (fs *S3Fs) hasContents(name string) (bool, error) {
664	prefix := ""
665	if name != "/" && name != "." {
666		prefix = strings.TrimPrefix(name, "/")
667		if !strings.HasSuffix(prefix, "/") {
668			prefix += "/"
669		}
670	}
671	maxResults := int64(2)
672	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
673	defer cancelFn()
674	results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
675		Bucket:  aws.String(fs.config.Bucket),
676		Prefix:  aws.String(prefix),
677		MaxKeys: &maxResults,
678	})
679	metric.S3ListObjectsCompleted(err)
680	if err != nil {
681		return false, err
682	}
683	// MinIO returns no contents while S3 returns 1 object
684	// with the key equal to the prefix for empty directories
685	for _, obj := range results.Contents {
686		name, _ := fs.resolve(obj.Key, prefix)
687		if name == "" || name == "/" {
688			continue
689		}
690		return true, nil
691	}
692	return false, nil
693}
694
695func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
696	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
697	defer cancelFn()
698	obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
699		Bucket: aws.String(fs.config.Bucket),
700		Key:    aws.String(name),
701	})
702	metric.S3HeadObjectCompleted(err)
703	return obj, err
704}
705
706// GetMimeType returns the content type
707func (fs *S3Fs) GetMimeType(name string) (string, error) {
708	obj, err := fs.headObject(name)
709	if err != nil {
710		return "", err
711	}
712	return *obj.ContentType, err
713}
714
// Close closes the fs.
// There is nothing to release for S3 so nil is always returned
func (*S3Fs) Close() error {
	return nil
}
719
// GetAvailableDiskSize returns the available size for the specified path.
// This information is not available for S3, ErrStorageSizeUnavailable is
// always returned
func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
	return nil, ErrStorageSizeUnavailable
}
724
// pathEscape escapes the given path so that it can be used as copy source.
// The path is encoded as the path component of a URL, so the "/" separators
// are preserved, and then each "+" is replaced with "%2B".
//
// Ideally we should simply use url.PathEscape:
//
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65
//
// but this causes issues with some vendors, see #483, the code below is copied from rclone
func pathEscape(in string) string {
	escaped := (&url.URL{Path: in}).String()
	// the URL path encoding leaves the plus sign as is
	return strings.ReplaceAll(escaped, "+", "%2B")
}
735