1package awss3 2 3import ( 4 "fmt" 5 "io" 6 "net/http" 7 "os" 8 "path" 9 "strings" 10 "sync" 11 "time" 12 13 "github.com/araddon/gou" 14 "github.com/pborman/uuid" 15 "golang.org/x/net/context" 16 17 "github.com/aws/aws-sdk-go/aws" 18 "github.com/aws/aws-sdk-go/aws/credentials" 19 "github.com/aws/aws-sdk-go/aws/session" 20 "github.com/aws/aws-sdk-go/service/s3" 21 "github.com/aws/aws-sdk-go/service/s3/s3manager" 22 23 "github.com/lytics/cloudstorage" 24 "github.com/lytics/cloudstorage/csbufio" 25) 26 27const ( 28 // StoreType = "s3" this is used to define the storage type to create 29 // from cloudstorage.NewStore(config) 30 StoreType = "s3" 31 32 // Configuration Keys. These are the names of keys 33 // to look for in the json map[string]string to extract for config. 34 35 // ConfKeyAccessKey config key name of the aws access_key(id) for auth 36 ConfKeyAccessKey = "access_key" 37 // ConfKeyAccessSecret config key name of the aws acccess secret 38 ConfKeyAccessSecret = "access_secret" 39 // ConfKeyARN config key name of the aws ARN name of user 40 ConfKeyARN = "arn" 41 // ConfKeyDisableSSL config key name of disabling ssl flag 42 ConfKeyDisableSSL = "disable_ssl" 43 // Authentication Source's 44 45 // AuthAccessKey is for using aws access key/secret pairs 46 AuthAccessKey cloudstorage.AuthMethod = "aws_access_key" 47) 48 49var ( 50 // Retries number of times to retry upon failures. 51 Retries = 3 52 // PageSize is default page size 53 PageSize = 2000 54 55 // ErrNoS3Session no valid session 56 ErrNoS3Session = fmt.Errorf("no valid aws session was created") 57 // ErrNoAccessKey error for no access_key 58 ErrNoAccessKey = fmt.Errorf("no settings.access_key") 59 // ErrNoAccessSecret error for no settings.access_secret 60 ErrNoAccessSecret = fmt.Errorf("no settings.access_secret") 61 // ErrNoAuth error for no findable auth 62 ErrNoAuth = fmt.Errorf("No auth provided") 63) 64 65func init() { 66 // Register this Driver (s3) in cloudstorage driver registry. 
67 cloudstorage.Register(StoreType, func(conf *cloudstorage.Config) (cloudstorage.Store, error) { 68 client, sess, err := NewClient(conf) 69 if err != nil { 70 return nil, err 71 } 72 return NewStore(client, sess, conf) 73 }) 74} 75 76type ( 77 // FS Simple wrapper for accessing s3 files, it doesn't currently implement a 78 // Reader/Writer interface so not useful for stream reading of large files yet. 79 FS struct { 80 PageSize int 81 ID string 82 client *s3.S3 83 sess *session.Session 84 endpoint string 85 bucket string 86 cachepath string 87 } 88 89 object struct { 90 fs *FS 91 o *s3.GetObjectOutput 92 cachedcopy *os.File 93 94 name string // aka "key" in s3 95 updated time.Time // LastModifyied in s3 96 metadata map[string]string 97 bucket string 98 readonly bool 99 opened bool 100 cachepath string 101 102 infoOnce sync.Once 103 infoErr error 104 } 105) 106 107// NewClient create new AWS s3 Client. Uses cloudstorage.Config to read 108// necessary config settings such as bucket, region, auth. 109func NewClient(conf *cloudstorage.Config) (*s3.S3, *session.Session, error) { 110 111 awsConf := aws.NewConfig(). 112 WithHTTPClient(http.DefaultClient). 113 WithMaxRetries(aws.UseServiceDefaultRetries). 114 WithLogger(aws.NewDefaultLogger()). 115 WithLogLevel(aws.LogOff). 
116 WithSleepDelay(time.Sleep) 117 118 if conf.Region != "" { 119 awsConf.WithRegion(conf.Region) 120 } else { 121 awsConf.WithRegion("us-east-1") 122 } 123 124 switch conf.AuthMethod { 125 case AuthAccessKey: 126 accessKey := conf.Settings.String(ConfKeyAccessKey) 127 if accessKey == "" { 128 return nil, nil, ErrNoAccessKey 129 } 130 secretKey := conf.Settings.String(ConfKeyAccessSecret) 131 if secretKey == "" { 132 return nil, nil, ErrNoAccessSecret 133 } 134 awsConf.WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")) 135 default: 136 return nil, nil, ErrNoAuth 137 } 138 139 if conf.BaseUrl != "" { 140 awsConf.WithEndpoint(conf.BaseUrl).WithS3ForcePathStyle(true) 141 } 142 143 disableSSL := conf.Settings.Bool(ConfKeyDisableSSL) 144 if disableSSL { 145 awsConf.WithDisableSSL(true) 146 } 147 148 sess := session.New(awsConf) 149 if sess == nil { 150 return nil, nil, ErrNoS3Session 151 } 152 153 s3Client := s3.New(sess) 154 155 return s3Client, sess, nil 156} 157 158// NewStore Create AWS S3 storage client of type cloudstorage.Store 159func NewStore(c *s3.S3, sess *session.Session, conf *cloudstorage.Config) (*FS, error) { 160 161 if conf.TmpDir == "" { 162 return nil, fmt.Errorf("unable to create cachepath. config.tmpdir=%q", conf.TmpDir) 163 } 164 err := os.MkdirAll(conf.TmpDir, 0775) 165 if err != nil { 166 return nil, fmt.Errorf("unable to create cachepath. config.tmpdir=%q err=%v", conf.TmpDir, err) 167 } 168 169 uid := uuid.NewUUID().String() 170 uid = strings.Replace(uid, "-", "", -1) 171 172 return &FS{ 173 client: c, 174 sess: sess, 175 bucket: conf.Bucket, 176 cachepath: conf.TmpDir, 177 ID: uid, 178 PageSize: cloudstorage.MaxResults, 179 }, nil 180} 181 182// Type of store = "s3" 183func (f *FS) Type() string { 184 return StoreType 185} 186 187// Client gets access to the underlying s3 cloud storage client. 
188func (f *FS) Client() interface{} { 189 return f.client 190} 191 192// String function to provide s3://..../file path 193func (f *FS) String() string { 194 return fmt.Sprintf("s3://%s/", f.bucket) 195} 196 197// NewObject of Type s3. 198func (f *FS) NewObject(objectname string) (cloudstorage.Object, error) { 199 obj, err := f.Get(context.Background(), objectname) 200 if err != nil && err != cloudstorage.ErrObjectNotFound { 201 return nil, err 202 } else if obj != nil { 203 return nil, cloudstorage.ErrObjectExists 204 } 205 206 cf := cloudstorage.CachePathObj(f.cachepath, objectname, f.ID) 207 208 return &object{ 209 fs: f, 210 name: objectname, 211 metadata: map[string]string{cloudstorage.ContentTypeKey: cloudstorage.ContentType(objectname)}, 212 bucket: f.bucket, 213 cachedcopy: nil, 214 cachepath: cf, 215 }, nil 216} 217 218// Get a single File Object 219func (f *FS) Get(ctx context.Context, objectpath string) (cloudstorage.Object, error) { 220 221 obj, err := f.getObjectMeta(ctx, objectpath) 222 if err != nil { 223 return nil, err 224 } else if obj == nil { 225 return nil, cloudstorage.ErrObjectNotFound 226 } 227 228 return obj, nil 229} 230 231// get single object 232func (f *FS) getObjectMeta(ctx context.Context, objectname string) (*object, error) { 233 234 req := &s3.HeadObjectInput{ 235 Key: aws.String(objectname), 236 Bucket: aws.String(f.bucket), 237 } 238 239 res, err := f.client.HeadObjectWithContext(ctx, req) 240 if err != nil { 241 // translate the string error to typed error 242 if strings.Contains(err.Error(), "Not Found") { 243 return nil, cloudstorage.ErrObjectNotFound 244 } 245 return nil, err 246 } 247 248 return newObjectFromHead(f, objectname, res), nil 249} 250 251func (f *FS) getS3OpenObject(ctx context.Context, objectname string) (*s3.GetObjectOutput, error) { 252 253 res, err := f.client.GetObjectWithContext(ctx, &s3.GetObjectInput{ 254 Key: aws.String(objectname), 255 Bucket: aws.String(f.bucket), 256 }) 257 if err != nil { 258 // 
translate the string error to typed error 259 if strings.Contains(err.Error(), "NoSuchKey") { 260 return nil, cloudstorage.ErrObjectNotFound 261 } 262 return nil, err 263 } 264 return res, nil 265} 266 267func convertMetaData(m map[string]*string) (map[string]string, error) { 268 result := make(map[string]string, len(m)) 269 for key, value := range m { 270 if value != nil { 271 result[strings.ToLower(key)] = *value 272 } else { 273 result[strings.ToLower(key)] = "" 274 } 275 276 } 277 return result, nil 278} 279 280// List objects from this store. 281func (f *FS) List(ctx context.Context, q cloudstorage.Query) (*cloudstorage.ObjectsResponse, error) { 282 283 itemLimit := int64(f.PageSize) 284 if q.PageSize > 0 { 285 itemLimit = int64(q.PageSize) 286 } 287 288 params := &s3.ListObjectsInput{ 289 Bucket: aws.String(f.bucket), 290 Marker: &q.Marker, 291 MaxKeys: &itemLimit, 292 Prefix: &q.Prefix, 293 } 294 295 resp, err := f.client.ListObjects(params) 296 if err != nil { 297 gou.Warnf("err = %v", err) 298 return nil, err 299 } 300 301 objResp := &cloudstorage.ObjectsResponse{ 302 Objects: make(cloudstorage.Objects, len(resp.Contents)), 303 } 304 305 for i, o := range resp.Contents { 306 objResp.Objects[i] = newObject(f, o) 307 } 308 309 if resp.IsTruncated != nil && *resp.IsTruncated { 310 q.Marker = *resp.Contents[len(resp.Contents)-1].Key 311 } 312 313 return objResp, nil 314} 315 316// Objects returns an iterator over the objects in the s3 bucket that match the Query q. 317// If q is nil, no filtering is done. 318func (f *FS) Objects(ctx context.Context, q cloudstorage.Query) (cloudstorage.ObjectIterator, error) { 319 return cloudstorage.NewObjectPageIterator(ctx, f, q), nil 320} 321 322// Folders get folders list. 323func (f *FS) Folders(ctx context.Context, q cloudstorage.Query) ([]string, error) { 324 325 q.Delimiter = "/" 326 327 // Think we should just put 1 here right? 
328 itemLimit := int64(f.PageSize) 329 if q.PageSize > 0 { 330 itemLimit = int64(q.PageSize) 331 } 332 333 params := &s3.ListObjectsInput{ 334 Bucket: aws.String(f.bucket), 335 MaxKeys: &itemLimit, 336 Prefix: &q.Prefix, 337 Delimiter: &q.Delimiter, 338 } 339 340 folders := make([]string, 0) 341 342 for { 343 select { 344 case <-ctx.Done(): 345 // If has been closed 346 return folders, ctx.Err() 347 default: 348 if q.Marker != "" { 349 params.Marker = &q.Marker 350 } 351 resp, err := f.client.ListObjectsWithContext(ctx, params) 352 if err != nil { 353 return nil, err 354 } 355 for _, cp := range resp.CommonPrefixes { 356 folders = append(folders, strings.TrimPrefix(*cp.Prefix, `/`)) 357 } 358 return folders, nil 359 } 360 } 361} 362 363/* 364// Copy from src to destination 365func (f *FS) Copy(ctx context.Context, src, des cloudstorage.Object) error { 366 367 so, ok := src.(*object) 368 if !ok { 369 return fmt.Errorf("Copy source file expected s3 but got %T", src) 370 } 371 do, ok := des.(*object) 372 if !ok { 373 return fmt.Errorf("Copy destination expected s3 but got %T", des) 374 } 375 376 oh := so.b.Object(so.name) 377 dh := do.b.Object(do.name) 378 379 _, err := dh.CopierFrom(oh).Run(ctx) 380 return err 381} 382 383// Move which is a Copy & Delete 384func (f *FS) Move(ctx context.Context, src, des cloudstorage.Object) error { 385 386 so, ok := src.(*object) 387 if !ok { 388 return fmt.Errorf("Move source file expected s3 but got %T", src) 389 } 390 do, ok := des.(*object) 391 if !ok { 392 return fmt.Errorf("Move destination expected s3 but got %T", des) 393 } 394 395 oh := so.b.Object(so.name) 396 dh := do.b.Object(des.name) 397 398 if _, err := dh.CopierFrom(oh).Run(ctx); err != nil { 399 return err 400 } 401 402 return oh.Delete(ctx) 403} 404*/ 405// NewReader create file reader. 
406func (f *FS) NewReader(o string) (io.ReadCloser, error) { 407 return f.NewReaderWithContext(context.Background(), o) 408} 409 410// NewReaderWithContext create new File reader with context. 411func (f *FS) NewReaderWithContext(ctx context.Context, objectname string) (io.ReadCloser, error) { 412 res, err := f.client.GetObjectWithContext(ctx, &s3.GetObjectInput{ 413 Key: aws.String(objectname), 414 Bucket: aws.String(f.bucket), 415 }) 416 if err != nil { 417 // translate the string error to typed error 418 if strings.Contains(err.Error(), "NoSuchKey") { 419 return nil, cloudstorage.ErrObjectNotFound 420 } 421 return nil, err 422 } 423 return res.Body, nil 424} 425 426// NewWriter create Object Writer. 427func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteCloser, error) { 428 return f.NewWriterWithContext(context.Background(), objectName, metadata) 429} 430 431// NewWriterWithContext create writer with provided context and metadata. 432func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string) (io.WriteCloser, error) { 433 434 // Create an uploader with the session and default options 435 uploader := s3manager.NewUploader(f.sess) 436 437 pr, pw := io.Pipe() 438 bw := csbufio.NewWriter(pw) 439 440 go func() { 441 // TODO: this needs to be managed, ie shutdown signals, close, handler err etc. 442 443 // Upload the file to S3. 444 _, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{ 445 Bucket: aws.String(f.bucket), 446 Key: aws.String(objectName), 447 Body: pr, 448 }) 449 if err != nil { 450 gou.Warnf("could not upload %v", err) 451 } 452 }() 453 454 return bw, nil 455} 456 457// Delete requested object path string. 
func (f *FS) Delete(ctx context.Context, obj string) error {
	params := &s3.DeleteObjectInput{
		Bucket: aws.String(f.bucket),
		Key:    aws.String(obj),
	}

	_, err := f.client.DeleteObjectWithContext(ctx, params)
	if err != nil {
		return err
	}
	return nil
}

// newObject builds a local object wrapper from an s3 list entry.
func newObject(f *FS, o *s3.Object) *object {
	obj := &object{
		fs:        f,
		name:      *o.Key,
		bucket:    f.bucket,
		cachepath: cloudstorage.CachePathObj(f.cachepath, *o.Key, f.ID),
	}
	if o.LastModified != nil {
		obj.updated = *o.LastModified
	}
	return obj
}

// newObjectFromHead builds a local object wrapper from a HEAD response,
// including its (lower-cased) user metadata.
func newObjectFromHead(f *FS, name string, o *s3.HeadObjectOutput) *object {
	obj := &object{
		fs:        f,
		name:      name,
		bucket:    f.bucket,
		cachepath: cloudstorage.CachePathObj(f.cachepath, name, f.ID),
	}
	if o.LastModified != nil {
		obj.updated = *o.LastModified
	}
	// metadata?
	obj.metadata, _ = convertMetaData(o.Metadata)
	return obj
}

// StorageSource returns the driver type ("s3").
func (o *object) StorageSource() string {
	return StoreType
}

// Name returns the object key.
func (o *object) Name() string {
	return o.name
}

// String returns the object key.
func (o *object) String() string {
	return o.name
}

// Updated returns the LastModified time captured from s3.
func (o *object) Updated() time.Time {
	return o.updated
}

// MetaData returns the object's metadata map.
func (o *object) MetaData() map[string]string {
	return o.metadata
}

// SetMetaData replaces the object's metadata map.
func (o *object) SetMetaData(meta map[string]string) {
	o.metadata = meta
}

// Delete removes the object from the bucket.
func (o *object) Delete() error {
	return o.fs.Delete(context.Background(), o.name)
}

// Open downloads the object into a local cache file and returns the open
// file handle. With cloudstorage.ReadOnly the handle is reopened read-only;
// otherwise it is positioned at the start for read/write. Downloads are
// retried up to Retries times with backoff. Errors on the local filesystem
// are returned immediately (not retried).
func (o *object) Open(accesslevel cloudstorage.AccessLevel) (*os.File, error) {
	if o.opened {
		return nil, fmt.Errorf("the store object is already opened. %s", o.name)
	}

	var errs []error = make([]error, 0)
	var cachedcopy *os.File = nil
	var err error
	var readonly = accesslevel == cloudstorage.ReadOnly

	err = os.MkdirAll(path.Dir(o.cachepath), 0775)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating cachedcopy dir. cachepath=%s object=%s err=%v", o.cachepath, o.name, err)
	}

	err = cloudstorage.EnsureDir(o.cachepath)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating cachedcopy's dir. cachepath=%s err=%v", o.cachepath, err)
	}

	cachedcopy, err = os.Create(o.cachepath)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating file. local=%s err=%v", o.cachepath, err)
	}

	for try := 0; try < Retries; try++ {
		if o.o == nil {
			// No remote handle yet; try to GET the object.
			obj, err := o.fs.getS3OpenObject(context.Background(), o.name)
			if err != nil {
				if err == cloudstorage.ErrObjectNotFound {
					// New, this is fine
				} else {
					// lets re-try
					errs = append(errs, fmt.Errorf("error getting object err=%v", err))
					cloudstorage.Backoff(try)
					continue
				}
			}

			if obj != nil {
				o.o = obj
			}
		}

		if o.o != nil {
			// we have a preexisting object, so lets download it..
			// NOTE(review): defer inside a retry loop — the body is only
			// closed when Open returns, not per iteration; confirm this is
			// acceptable given Retries is small.
			defer o.o.Body.Close()

			if _, err := cachedcopy.Seek(0, os.SEEK_SET); err != nil {
				return nil, fmt.Errorf("error seeking to start of cachedcopy err=%v", err) //don't retry on local fs errors
			}

			_, err = io.Copy(cachedcopy, o.o.Body)
			if err != nil {
				errs = append(errs, fmt.Errorf("error coping bytes. err=%v", err))
				//recreate the cachedcopy file incase it has incomplete data
				if err := os.Remove(o.cachepath); err != nil {
					return nil, fmt.Errorf("error resetting the cachedcopy err=%v", err) //don't retry on local fs errors
				}
				if cachedcopy, err = os.Create(o.cachepath); err != nil {
					return nil, fmt.Errorf("error creating a new cachedcopy file. local=%s err=%v", o.cachepath, err)
				}

				cloudstorage.Backoff(try)
				continue
			}
		}

		if readonly {
			// Reopen read-only so callers cannot write through the handle.
			cachedcopy.Close()
			cachedcopy, err = os.Open(o.cachepath)
			if err != nil {
				name := "unknown"
				if cachedcopy != nil {
					name = cachedcopy.Name()
				}
				return nil, fmt.Errorf("error opening file. local=%s object=%s tfile=%v err=%v", o.cachepath, o.name, name, err)
			}
		} else {
			// Rewind so the caller reads from the beginning.
			if _, err := cachedcopy.Seek(0, os.SEEK_SET); err != nil {
				return nil, fmt.Errorf("error seeking to start of cachedcopy err=%v", err) //don't retry on local fs errors
			}
		}

		o.cachedcopy = cachedcopy
		o.readonly = readonly
		o.opened = true
		return o.cachedcopy, nil
	}

	return nil, fmt.Errorf("fetch error retry cnt reached: obj=%s tfile=%v errs:[%v]", o.name, o.cachepath, errs)
}

// File get the current file handle for cached copy.
func (o *object) File() *os.File {
	return o.cachedcopy
}

// Read bytes from underlying/cached file
func (o *object) Read(p []byte) (n int, err error) {
	return o.cachedcopy.Read(p)
}

// Write bytes to local file, will be synced on close/sync.
func (o *object) Write(p []byte) (n int, err error) {
	if o.cachedcopy == nil {
		// Lazily open the cached copy for read/write on first Write.
		_, err := o.Open(cloudstorage.ReadWrite)
		if err != nil {
			return 0, err
		}
	}
	return o.cachedcopy.Write(p)
}

// Sync syncs any changes in file up to s3.
func (o *object) Sync() error {

	if !o.opened {
		return fmt.Errorf("object isn't opened object:%s", o.name)
	}
	if o.readonly {
		return fmt.Errorf("trying to Sync a readonly object:%s", o.name)
	}

	// Reopen from the cache path so we upload what is on disk.
	cachedcopy, err := os.OpenFile(o.cachepath, os.O_RDWR, 0664)
	if err != nil {
		return fmt.Errorf("couldn't open localfile for sync'ing. local=%s err=%v", o.cachepath, err)
	}
	defer cachedcopy.Close()

	// Create an uploader with the session and default options
	uploader := s3manager.NewUploader(o.fs.sess)

	if _, err := cachedcopy.Seek(0, os.SEEK_SET); err != nil {
		return fmt.Errorf("error seeking to start of cachedcopy err=%v", err) //don't retry on local filesystem errors
	}

	// Upload the file to S3.
	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(o.fs.bucket),
		Key:    aws.String(o.name),
		Body:   cachedcopy,
	})
	if err != nil {
		gou.Warnf("could not upload %v", err)
		return fmt.Errorf("failed to upload file, %v", err)
	}
	return nil
}

// Close this object: flush and close the cached copy, upload it if it was
// writable, then remove the local cache file.
func (o *object) Close() error {
	if !o.opened {
		return nil
	}
	// The deferred cleanup runs at return, after the Sync below, so the
	// o.opened check in the if-statement still sees true.
	defer func() {
		os.Remove(o.cachepath)
		o.cachedcopy = nil
		o.opened = false
	}()

	serr := o.cachedcopy.Sync()
	cerr := o.cachedcopy.Close()
	if serr != nil || cerr != nil {
		return fmt.Errorf("error on sync and closing localfile. %s sync=%v, err=%v", o.cachepath, serr, cerr)
	}

	if o.opened && !o.readonly {
		err := o.Sync()
		if err != nil {
			gou.Errorf("error on sync %v", err)
			return err
		}
	}
	return nil
}

// Release this object, cleanup cached copy.
func (o *object) Release() error {
	if o.cachedcopy != nil {
		gou.Infof("release %q vs %q", o.cachedcopy.Name(), o.cachepath)
		o.cachedcopy.Close()
		return os.Remove(o.cachepath)
	}
	os.Remove(o.cachepath)
	return nil
}