//go:build !nos3
// +build !nos3

package vfs

import (
	"context"
	"fmt"
	"mime"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/eikenb/pipeat"
	"github.com/pkg/sftp"

	"github.com/drakkan/sftpgo/v2/logger"
	"github.com/drakkan/sftpgo/v2/metric"
	"github.com/drakkan/sftpgo/v2/util"
	"github.com/drakkan/sftpgo/v2/version"
)

// using this mime type for directories improves compatibility with s3fs-fuse
const s3DirMimeType = "application/x-directory"

// S3Fs is a Fs implementation for AWS S3 compatible object storages
type S3Fs struct {
	connectionID string
	// local directory used for temporary files (e.g. the pipe backing files
	// created by pipeat in Open/Create)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath      string
	config         *S3FsConfig
	svc            *s3.S3
	ctxTimeout     time.Duration // deadline for single-request operations
	ctxLongTimeout time.Duration // deadline for long operations such as full-bucket scans
}

func init() {
	version.AddFeature("+s3")
}

// NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
// object storage
func NewS3Fs(connectionID, localTempDir, mountPath string, config S3FsConfig) (Fs, error) {
	if localTempDir == "" {
		// fall back to the globally configured temp path, then to the OS default
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &S3Fs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		mountPath:      mountPath,
		config:         &config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 300 * time.Second,
	}
	if err := fs.config.Validate(); err != nil {
		return fs, err
	}
	awsConfig := aws.NewConfig()

	if fs.config.Region != "" {
		awsConfig.WithRegion(fs.config.Region)
	}

	if !fs.config.AccessSecret.IsEmpty() {
		if err := fs.config.AccessSecret.TryDecrypt(); err != nil {
			return fs, err
		}
		awsConfig.Credentials = credentials.NewStaticCredentials(fs.config.AccessKey, fs.config.AccessSecret.GetPayload(), "")
	}

	if fs.config.Endpoint != "" {
		awsConfig.Endpoint = aws.String(fs.config.Endpoint)
	}
	if fs.config.ForcePathStyle {
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
	// part sizes are configured in MiB; 0 means "use the SDK default"
	if fs.config.UploadPartSize == 0 {
		fs.config.UploadPartSize = s3manager.DefaultUploadPartSize
	} else {
		fs.config.UploadPartSize *= 1024 * 1024
	}
	if fs.config.UploadConcurrency == 0 {
		fs.config.UploadConcurrency = s3manager.DefaultUploadConcurrency
	}
	if fs.config.DownloadPartSize == 0 {
		fs.config.DownloadPartSize = s3manager.DefaultDownloadPartSize
	} else {
		fs.config.DownloadPartSize *= 1024 * 1024
	}
	if fs.config.DownloadConcurrency == 0 {
		fs.config.DownloadConcurrency = s3manager.DefaultDownloadConcurrency
	}

	sessOpts := session.Options{
		Config:            *awsConfig,
		SharedConfigState: session.SharedConfigEnable,
	}
	sess, err := session.NewSessionWithOptions(sessOpts)
	if err != nil {
		return fs, err
	}
	fs.svc = s3.New(sess)
	return fs, nil
}

// Name returns the name for the Fs implementation
func (fs *S3Fs) Name() string {
	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
}

// ConnectionID returns the connection ID associated to this Fs implementation
func (fs *S3Fs) ConnectionID() string {
	return fs.connectionID
}

// Stat returns a FileInfo describing the named file.
// Lookup order: bucket root, configured key prefix, exact object key,
// virtual directory (common prefix), then a zero-byte "dir" key with a
// trailing slash.
func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
	var result *FileInfo
	if name == "/" || name == "." {
		err := fs.checkIfBucketExists()
		if err != nil {
			return result, err
		}
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	if "/"+fs.config.KeyPrefix == name+"/" {
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	}
	obj, err := fs.headObject(name)
	if err == nil {
		// a "dir" has a trailing "/" so we cannot have a directory here
		objSize := *obj.ContentLength
		objectModTime := *obj.LastModified
		return NewFileInfo(name, false, objSize, objectModTime, false), nil
	}
	if !fs.IsNotExist(err) {
		return result, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err == nil && hasContents {
		return NewFileInfo(name, true, 0, time.Now(), false), nil
	} else if err != nil {
		return nil, err
	}
	// the requested file may still be a directory as a zero bytes key
	// with a trailing forward slash (created using mkdir).
	// S3 doesn't return content type when listing objects, so we have
	// create "dirs" adding a trailing "/" to the key
	return fs.getStatForDir(name)
}

// getStatForDir returns the FileInfo for a directory stored as a zero-byte
// object whose key is name + "/"
func (fs *S3Fs) getStatForDir(name string) (os.FileInfo, error) {
	var result *FileInfo
	obj, err := fs.headObject(name + "/")
	if err != nil {
		return result, err
	}
	objSize := *obj.ContentLength
	objectModTime := *obj.LastModified
	return NewFileInfo(name, true, objSize, objectModTime, false), nil
}

// Lstat returns a FileInfo describing the named file.
// S3 has no symlinks, so this is identical to Stat
func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
	return fs.Stat(name)
}

// Open opens the named file for reading.
// The download runs in a background goroutine that streams the object into
// the write side of a pipe; the caller reads from the returned PipeReaderAt.
// The returned cancel func aborts the transfer.
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	downloader := s3manager.NewDownloaderWithClient(fs.svc)
	if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
		// enforce a per-part timeout on each ranged GET issued by the downloader
		downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
			chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
			r.SetContext(chunkCtx)

			// release the timeout's resources once the whole download ends
			go func() {
				<-ctx.Done()
				cancel()
			}()
		})
	}
	var streamRange *string
	if offset > 0 {
		streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
	}

	go func() {
		defer cancelFn()
		n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
			Bucket: aws.String(fs.config.Bucket),
			Key:    aws.String(name),
			Range:  streamRange,
		}, func(d *s3manager.Downloader) {
			d.Concurrency = fs.config.DownloadConcurrency
			d.PartSize = fs.config.DownloadPartSize
		})
		// propagate the download error (or EOF) to the reader side
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
		metric.S3TransferCompleted(n, 1, err)
	}()
	return nil, r, cancelFn, nil
}

// Create creates or opens the named file for writing.
// The upload runs in a background goroutine reading from the pipe the
// caller writes to. flag == -1 marks a directory placeholder upload and
// forces the s3fs-compatible directory mime type.
func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	p := NewPipeWriter(w)
	ctx, cancelFn := context.WithCancel(context.Background())
	uploader := s3manager.NewUploaderWithClient(fs.svc)
	go func() {
		defer cancelFn()
		key := name
		var contentType string
		if flag == -1 {
			contentType = s3DirMimeType
		} else {
			contentType = mime.TypeByExtension(path.Ext(name))
		}
		response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket:       aws.String(fs.config.Bucket),
			Key:          aws.String(key),
			Body:         r,
			ACL:          util.NilIfEmpty(fs.config.ACL),
			StorageClass: util.NilIfEmpty(fs.config.StorageClass),
			ContentType:  util.NilIfEmpty(contentType),
		}, func(u *s3manager.Uploader) {
			u.Concurrency = fs.config.UploadConcurrency
			u.PartSize = fs.config.UploadPartSize
		})
		r.CloseWithError(err) //nolint:errcheck
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, acl: %#v, response: %v, readed bytes: %v, err: %+v",
			name, fs.config.ACL, response, r.GetReadedBytes(), err)
		metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a CopyObject call.
// TODO: rename does not work for files bigger than 5GB, implement
// multipart copy or wait for this pull request to be merged:
//
// https://github.com/aws/aws-sdk-go/pull/2653
//
func (fs *S3Fs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	copySource := fs.Join(fs.config.Bucket, source)
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
		// directory keys carry a trailing slash on both sides of the copy
		if !strings.HasSuffix(copySource, "/") {
			copySource += "/"
		}
		if !strings.HasSuffix(target, "/") {
			target += "/"
		}
	}
	var contentType string
	if fi.IsDir() {
		contentType = s3DirMimeType
	} else {
		contentType = mime.TypeByExtension(path.Ext(source))
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	// S3 has no rename primitive: copy to the new key, wait until the copy
	// is visible, then delete the old key
	_, err = fs.svc.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:       aws.String(fs.config.Bucket),
		CopySource:   aws.String(pathEscape(copySource)),
		Key:          aws.String(target),
		StorageClass: util.NilIfEmpty(fs.config.StorageClass),
		ACL:          util.NilIfEmpty(fs.config.ACL),
		ContentType:  util.NilIfEmpty(contentType),
	})
	if err != nil {
		metric.S3CopyObjectCompleted(err)
		return err
	}
	err = fs.svc.WaitUntilObjectExistsWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(target),
	})
	metric.S3CopyObjectCompleted(err)
	if err != nil {
		return err
	}
	return fs.Remove(source, fi.IsDir())
}

// Remove removes the named file or (empty) directory.
func (fs *S3Fs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
		// directory placeholder keys end with "/"
		if !strings.HasSuffix(name, "/") {
			name += "/"
		}
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err := fs.svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(fs.config.Bucket),
		Key:    aws.String(name),
	})
	metric.S3DeleteObjectCompleted(err)
	return err
}

// Mkdir creates a new directory with the specified name and default permissions.
// Directories are emulated as zero-byte objects with a trailing "/" key.
// NOTE(review): if the path already exists Stat returns a nil error and we
// return that nil here, so mkdir on an existing path silently succeeds —
// confirm this is the intended behavior.
func (fs *S3Fs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	if !strings.HasSuffix(name, "/") {
		name += "/"
	}
	// flag -1 tells Create to use the directory mime type
	_, w, _, err := fs.Create(name, -1)
	if err != nil {
		return err
	}
	return w.Close()
}

// MkdirAll does nothing, we don't have folder
func (*S3Fs) MkdirAll(name string, uid int, gid int) error {
	return nil
}

// Symlink creates source as a symbolic link to target.
func (*S3Fs) Symlink(source, target string) error {
	// object storages have no symlink support
	return ErrVfsUnsupported
}

// Readlink returns the destination of the named symbolic link
func (*S3Fs) Readlink(name string) (string, error) {
	return "", ErrVfsUnsupported
}

// Chown changes the numeric uid and gid of the named file.
func (*S3Fs) Chown(name string, uid int, gid int) error {
	return ErrVfsUnsupported
}

// Chmod changes the mode of the named file to mode.
func (*S3Fs) Chmod(name string, mode os.FileMode) error {
	return ErrVfsUnsupported
}

// Chtimes changes the access and modification times of the named file.
func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
	return ErrVfsUnsupported
}

// Truncate changes the size of the named file.
// Truncate by path is not supported, while truncating an opened
// file is handled inside base transfer
func (*S3Fs) Truncate(name string, size int64) error {
	return ErrVfsUnsupported
}

// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
// It lists with a "/" delimiter: common prefixes become virtual
// directories, objects become files; the prefixes map deduplicates
// directory entries across pages.
func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
	var result []os.FileInfo
	// dirname must be already cleaned
	prefix := ""
	if dirname != "/" && dirname != "." {
		prefix = strings.TrimPrefix(dirname, "/")
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}

	// tracks directory names already emitted, to avoid duplicates
	prefixes := make(map[string]bool)

	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(fs.config.Bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, p := range page.CommonPrefixes {
			// prefixes have a trailing slash
			name, _ := fs.resolve(p.Prefix, prefix)
			if name == "" {
				continue
			}
			if _, ok := prefixes[name]; ok {
				continue
			}
			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
			prefixes[name] = true
		}
		for _, fileObject := range page.Contents {
			objectSize := *fileObject.Size
			objectModTime := *fileObject.LastModified
			name, isDir := fs.resolve(fileObject.Key, prefix)
			if name == "" {
				continue
			}
			if isDir {
				if _, ok := prefixes[name]; ok {
					continue
				}
				prefixes[name] = true
			}
			// a zero-byte key with a trailing slash is a directory placeholder
			result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false))
		}
		return true
	})
	metric.S3ListObjectsCompleted(err)
	return result, err
}

// IsUploadResumeSupported returns true if resuming uploads is supported.
// Resuming uploads is not supported on S3
func (*S3Fs) IsUploadResumeSupported() bool {
	return false
}

// IsAtomicUploadSupported returns true if atomic upload is supported.
467// S3 uploads are already atomic, we don't need to upload to a temporary 468// file 469func (*S3Fs) IsAtomicUploadSupported() bool { 470 return false 471} 472 473// IsNotExist returns a boolean indicating whether the error is known to 474// report that a file or directory does not exist 475func (*S3Fs) IsNotExist(err error) bool { 476 if err == nil { 477 return false 478 } 479 if aerr, ok := err.(awserr.Error); ok { 480 if aerr.Code() == s3.ErrCodeNoSuchKey { 481 return true 482 } 483 if aerr.Code() == s3.ErrCodeNoSuchBucket { 484 return true 485 } 486 } 487 if multierr, ok := err.(s3manager.MultiUploadFailure); ok { 488 if multierr.Code() == s3.ErrCodeNoSuchKey { 489 return true 490 } 491 if multierr.Code() == s3.ErrCodeNoSuchBucket { 492 return true 493 } 494 } 495 return strings.Contains(err.Error(), "404") 496} 497 498// IsPermission returns a boolean indicating whether the error is known to 499// report that permission is denied. 500func (*S3Fs) IsPermission(err error) bool { 501 if err == nil { 502 return false 503 } 504 return strings.Contains(err.Error(), "403") 505} 506 507// IsNotSupported returns true if the error indicate an unsupported operation 508func (*S3Fs) IsNotSupported(err error) bool { 509 if err == nil { 510 return false 511 } 512 return err == ErrVfsUnsupported 513} 514 515// CheckRootPath creates the specified local root directory if it does not exists 516func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool { 517 // we need a local directory for temporary files 518 osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "") 519 return osFs.CheckRootPath(username, uid, gid) 520} 521 522// ScanRootDirContents returns the number of files contained in the bucket, 523// and their size 524func (fs *S3Fs) ScanRootDirContents() (int, int64, error) { 525 numFiles := 0 526 size := int64(0) 527 ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) 528 defer cancelFn() 529 err := 
fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{ 530 Bucket: aws.String(fs.config.Bucket), 531 Prefix: aws.String(fs.config.KeyPrefix), 532 }, func(page *s3.ListObjectsV2Output, lastPage bool) bool { 533 for _, fileObject := range page.Contents { 534 isDir := strings.HasSuffix(*fileObject.Key, "/") 535 if isDir && *fileObject.Size == 0 { 536 continue 537 } 538 numFiles++ 539 size += *fileObject.Size 540 } 541 return true 542 }) 543 metric.S3ListObjectsCompleted(err) 544 return numFiles, size, err 545} 546 547// GetDirSize returns the number of files and the size for a folder 548// including any subfolders 549func (*S3Fs) GetDirSize(dirname string) (int, int64, error) { 550 return 0, 0, ErrVfsUnsupported 551} 552 553// GetAtomicUploadPath returns the path to use for an atomic upload. 554// S3 uploads are already atomic, we never call this method for S3 555func (*S3Fs) GetAtomicUploadPath(name string) string { 556 return "" 557} 558 559// GetRelativePath returns the path for a file relative to the user's home dir. 560// This is the path as seen by SFTPGo users 561func (fs *S3Fs) GetRelativePath(name string) string { 562 rel := path.Clean(name) 563 if rel == "." { 564 rel = "" 565 } 566 if !path.IsAbs(rel) { 567 return "/" + rel 568 } 569 if fs.config.KeyPrefix != "" { 570 if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) { 571 rel = "/" 572 } 573 rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix)) 574 } 575 if fs.mountPath != "" { 576 rel = path.Join(fs.mountPath, rel) 577 } 578 return rel 579} 580 581// Walk walks the file tree rooted at root, calling walkFn for each file or 582// directory in the tree, including root. The result are unordered 583func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error { 584 prefix := "" 585 if root != "/" && root != "." 
{ 586 prefix = strings.TrimPrefix(root, "/") 587 if !strings.HasSuffix(prefix, "/") { 588 prefix += "/" 589 } 590 } 591 ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) 592 defer cancelFn() 593 err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{ 594 Bucket: aws.String(fs.config.Bucket), 595 Prefix: aws.String(prefix), 596 }, func(page *s3.ListObjectsV2Output, lastPage bool) bool { 597 for _, fileObject := range page.Contents { 598 objectSize := *fileObject.Size 599 objectModTime := *fileObject.LastModified 600 isDir := strings.HasSuffix(*fileObject.Key, "/") 601 name := path.Clean(*fileObject.Key) 602 if name == "/" || name == "." { 603 continue 604 } 605 err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil) 606 if err != nil { 607 return false 608 } 609 } 610 return true 611 }) 612 metric.S3ListObjectsCompleted(err) 613 walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck 614 615 return err 616} 617 618// Join joins any number of path elements into a single path 619func (*S3Fs) Join(elem ...string) string { 620 return path.Join(elem...) 
621} 622 623// HasVirtualFolders returns true if folders are emulated 624func (*S3Fs) HasVirtualFolders() bool { 625 return true 626} 627 628// ResolvePath returns the matching filesystem path for the specified virtual path 629func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) { 630 if fs.mountPath != "" { 631 virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath) 632 } 633 if !path.IsAbs(virtualPath) { 634 virtualPath = path.Clean("/" + virtualPath) 635 } 636 return fs.Join("/", fs.config.KeyPrefix, virtualPath), nil 637} 638 639func (fs *S3Fs) resolve(name *string, prefix string) (string, bool) { 640 result := strings.TrimPrefix(*name, prefix) 641 isDir := strings.HasSuffix(result, "/") 642 if isDir { 643 result = strings.TrimSuffix(result, "/") 644 } 645 if strings.Contains(result, "/") { 646 i := strings.Index(result, "/") 647 isDir = true 648 result = result[:i] 649 } 650 return result, isDir 651} 652 653func (fs *S3Fs) checkIfBucketExists() error { 654 ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) 655 defer cancelFn() 656 _, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{ 657 Bucket: aws.String(fs.config.Bucket), 658 }) 659 metric.S3HeadBucketCompleted(err) 660 return err 661} 662 663func (fs *S3Fs) hasContents(name string) (bool, error) { 664 prefix := "" 665 if name != "/" && name != "." 
{ 666 prefix = strings.TrimPrefix(name, "/") 667 if !strings.HasSuffix(prefix, "/") { 668 prefix += "/" 669 } 670 } 671 maxResults := int64(2) 672 ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) 673 defer cancelFn() 674 results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ 675 Bucket: aws.String(fs.config.Bucket), 676 Prefix: aws.String(prefix), 677 MaxKeys: &maxResults, 678 }) 679 metric.S3ListObjectsCompleted(err) 680 if err != nil { 681 return false, err 682 } 683 // MinIO returns no contents while S3 returns 1 object 684 // with the key equal to the prefix for empty directories 685 for _, obj := range results.Contents { 686 name, _ := fs.resolve(obj.Key, prefix) 687 if name == "" || name == "/" { 688 continue 689 } 690 return true, nil 691 } 692 return false, nil 693} 694 695func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) { 696 ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) 697 defer cancelFn() 698 obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ 699 Bucket: aws.String(fs.config.Bucket), 700 Key: aws.String(name), 701 }) 702 metric.S3HeadObjectCompleted(err) 703 return obj, err 704} 705 706// GetMimeType returns the content type 707func (fs *S3Fs) GetMimeType(name string) (string, error) { 708 obj, err := fs.headObject(name) 709 if err != nil { 710 return "", err 711 } 712 return *obj.ContentType, err 713} 714 715// Close closes the fs 716func (*S3Fs) Close() error { 717 return nil 718} 719 720// GetAvailableDiskSize return the available size for the specified path 721func (*S3Fs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) { 722 return nil, ErrStorageSizeUnavailable 723} 724 725// ideally we should simply use url.PathEscape: 726// 727// https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/go/example_code/s3/s3_copy_object.go#L65 728// 729// but this cause issue with some vendors, see #483, 
the code below is copied from rclone 730func pathEscape(in string) string { 731 var u url.URL 732 u.Path = in 733 return strings.ReplaceAll(u.String(), "+", "%2B") 734} 735