package awss3

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/araddon/gou"
	"github.com/pborman/uuid"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/lytics/cloudstorage"
	"github.com/lytics/cloudstorage/csbufio"
)

const (
	// StoreType = "s3" is the storage-type key used to select this driver
	// from cloudstorage.NewStore(config).
	StoreType = "s3"

	// Configuration Keys.  These are the names of keys
	// to look for in the json map[string]string settings to extract config values.

	// ConfKeyAccessKey config key name of the aws access_key(id) for auth
	ConfKeyAccessKey = "access_key"
	// ConfKeyAccessSecret config key name of the aws access secret
	ConfKeyAccessSecret = "access_secret"
	// ConfKeyARN config key name of the aws ARN name of user
	ConfKeyARN = "arn"
	// ConfKeyDisableSSL config key name of the disable-ssl flag
	ConfKeyDisableSSL = "disable_ssl"

	// Authentication Sources

	// AuthAccessKey is for using aws access key/secret pairs
	AuthAccessKey cloudstorage.AuthMethod = "aws_access_key"
)
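
// A hedged example of the settings this driver reads; the JSON shape is a
// sketch and the key/secret values are placeholders:
//
//	{
//	    "access_key":    "AKIA...",
//	    "access_secret": "...",
//	    "disable_ssl":   false
//	}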

var (
	// Retries is the number of times to retry failed operations.
	Retries = 3
	// PageSize is the default page size for listing.
	PageSize = 2000

	// ErrNoS3Session no valid session
	ErrNoS3Session = fmt.Errorf("no valid aws session was created")
	// ErrNoAccessKey error for no access_key
	ErrNoAccessKey = fmt.Errorf("no settings.access_key")
	// ErrNoAccessSecret error for no settings.access_secret
	ErrNoAccessSecret = fmt.Errorf("no settings.access_secret")
	// ErrNoAuth error for no findable auth
	ErrNoAuth = fmt.Errorf("no auth provided")
)

func init() {
	// Register this driver (s3) in the cloudstorage driver registry.
	cloudstorage.Register(StoreType, func(conf *cloudstorage.Config) (cloudstorage.Store, error) {
		client, sess, err := NewClient(conf)
		if err != nil {
			return nil, err
		}
		return NewStore(client, sess, conf)
	})
}
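
// A minimal usage sketch for creating a store through the registry; the
// Config.Type field and the import path are assumptions not shown in this
// file, and bucket/paths are placeholders:
//
//	import "github.com/lytics/cloudstorage/awss3" // registers "s3" in init()
//
//	conf := &cloudstorage.Config{
//		Type:       awss3.StoreType,
//		AuthMethod: awss3.AuthAccessKey,
//		Bucket:     "my-bucket",
//		TmpDir:     "/tmp/localcache",
//	}
//	// access_key / access_secret are supplied via conf.Settings.
//	store, err := cloudstorage.NewStore(conf)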

type (
	// FS is a simple wrapper for accessing s3 files. It does not yet expose a
	// streaming Reader/Writer interface, so it is not ideal for very large files.
	FS struct {
		PageSize  int
		ID        string
		client    *s3.S3
		sess      *session.Session
		endpoint  string
		bucket    string
		cachepath string
	}

	object struct {
		fs         *FS
		o          *s3.GetObjectOutput
		cachedcopy *os.File

		name      string    // aka "key" in s3
		updated   time.Time // LastModified in s3
		metadata  map[string]string
		bucket    string
		readonly  bool
		opened    bool
		cachepath string

		infoOnce sync.Once
		infoErr  error
	}
)

// NewClient creates a new AWS S3 client.  It uses cloudstorage.Config to read
// the necessary settings such as bucket, region, and auth.
func NewClient(conf *cloudstorage.Config) (*s3.S3, *session.Session, error) {

	awsConf := aws.NewConfig().
		WithHTTPClient(http.DefaultClient).
		WithMaxRetries(aws.UseServiceDefaultRetries).
		WithLogger(aws.NewDefaultLogger()).
		WithLogLevel(aws.LogOff).
		WithSleepDelay(time.Sleep)

	if conf.Region != "" {
		awsConf.WithRegion(conf.Region)
	} else {
		awsConf.WithRegion("us-east-1")
	}

	switch conf.AuthMethod {
	case AuthAccessKey:
		accessKey := conf.Settings.String(ConfKeyAccessKey)
		if accessKey == "" {
			return nil, nil, ErrNoAccessKey
		}
		secretKey := conf.Settings.String(ConfKeyAccessSecret)
		if secretKey == "" {
			return nil, nil, ErrNoAccessSecret
		}
		awsConf.WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, ""))
	default:
		return nil, nil, ErrNoAuth
	}

	if conf.BaseUrl != "" {
		awsConf.WithEndpoint(conf.BaseUrl).WithS3ForcePathStyle(true)
	}

	if conf.Settings.Bool(ConfKeyDisableSSL) {
		awsConf.WithDisableSSL(true)
	}

	// session.New is deprecated; NewSession also surfaces setup errors.
	sess, err := session.NewSession(awsConf)
	if err != nil {
		return nil, nil, ErrNoS3Session
	}

	s3Client := s3.New(sess)

	return s3Client, sess, nil
}

// NewStore creates an AWS S3 storage client of type cloudstorage.Store.
func NewStore(c *s3.S3, sess *session.Session, conf *cloudstorage.Config) (*FS, error) {

	if conf.TmpDir == "" {
		return nil, fmt.Errorf("unable to create cachepath. config.tmpdir=%q", conf.TmpDir)
	}
	err := os.MkdirAll(conf.TmpDir, 0775)
	if err != nil {
		return nil, fmt.Errorf("unable to create cachepath. config.tmpdir=%q err=%v", conf.TmpDir, err)
	}

	uid := uuid.NewUUID().String()
	uid = strings.Replace(uid, "-", "", -1)

	return &FS{
		client:    c,
		sess:      sess,
		bucket:    conf.Bucket,
		cachepath: conf.TmpDir,
		ID:        uid,
		PageSize:  cloudstorage.MaxResults,
	}, nil
}

// Type of store = "s3"
func (f *FS) Type() string {
	return StoreType
}

// Client gets access to the underlying s3 cloud storage client.
func (f *FS) Client() interface{} {
	return f.client
}

// String returns the store's base path in the form s3://bucket/.
func (f *FS) String() string {
	return fmt.Sprintf("s3://%s/", f.bucket)
}

// NewObject creates a new object of type s3; it errors if the object
// already exists.
func (f *FS) NewObject(objectname string) (cloudstorage.Object, error) {
	obj, err := f.Get(context.Background(), objectname)
	if err != nil && err != cloudstorage.ErrObjectNotFound {
		return nil, err
	} else if obj != nil {
		return nil, cloudstorage.ErrObjectExists
	}

	cf := cloudstorage.CachePathObj(f.cachepath, objectname, f.ID)

	return &object{
		fs:         f,
		name:       objectname,
		metadata:   map[string]string{cloudstorage.ContentTypeKey: cloudstorage.ContentType(objectname)},
		bucket:     f.bucket,
		cachedcopy: nil,
		cachepath:  cf,
	}, nil
}
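
// A hedged sketch of creating and writing a new object, assuming the
// cloudstorage.Object interface exposes the Write/Close implemented below;
// the object name is a placeholder and error handling is elided:
//
//	obj, err := store.NewObject("data/file.csv")
//	// Write goes to a local cached copy; Close syncs it up to s3
//	// and removes the cached copy.
//	_, err = obj.Write([]byte("hello"))
//	err = obj.Close()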

// Get returns a single object with metadata from this store.
func (f *FS) Get(ctx context.Context, objectpath string) (cloudstorage.Object, error) {

	obj, err := f.getObjectMeta(ctx, objectpath)
	if err != nil {
		return nil, err
	} else if obj == nil {
		return nil, cloudstorage.ErrObjectNotFound
	}

	return obj, nil
}

// getObjectMeta gets metadata for a single object via a HEAD request.
func (f *FS) getObjectMeta(ctx context.Context, objectname string) (*object, error) {

	req := &s3.HeadObjectInput{
		Key:    aws.String(objectname),
		Bucket: aws.String(f.bucket),
	}

	res, err := f.client.HeadObjectWithContext(ctx, req)
	if err != nil {
		// translate the string error to a typed error
		if strings.Contains(err.Error(), "Not Found") {
			return nil, cloudstorage.ErrObjectNotFound
		}
		return nil, err
	}

	return newObjectFromHead(f, objectname, res), nil
}

func (f *FS) getS3OpenObject(ctx context.Context, objectname string) (*s3.GetObjectOutput, error) {

	res, err := f.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Key:    aws.String(objectname),
		Bucket: aws.String(f.bucket),
	})
	if err != nil {
		// translate the string error to a typed error
		if strings.Contains(err.Error(), "NoSuchKey") {
			return nil, cloudstorage.ErrObjectNotFound
		}
		return nil, err
	}
	return res, nil
}

func convertMetaData(m map[string]*string) (map[string]string, error) {
	result := make(map[string]string, len(m))
	for key, value := range m {
		if value != nil {
			result[strings.ToLower(key)] = *value
		} else {
			result[strings.ToLower(key)] = ""
		}
	}
	return result, nil
}

// List objects from this store.
func (f *FS) List(ctx context.Context, q cloudstorage.Query) (*cloudstorage.ObjectsResponse, error) {

	itemLimit := int64(f.PageSize)
	if q.PageSize > 0 {
		itemLimit = int64(q.PageSize)
	}

	params := &s3.ListObjectsInput{
		Bucket:  aws.String(f.bucket),
		Marker:  &q.Marker,
		MaxKeys: &itemLimit,
		Prefix:  &q.Prefix,
	}

	resp, err := f.client.ListObjectsWithContext(ctx, params)
	if err != nil {
		gou.Warnf("err = %v", err)
		return nil, err
	}

	objResp := &cloudstorage.ObjectsResponse{
		Objects: make(cloudstorage.Objects, len(resp.Contents)),
	}

	for i, o := range resp.Contents {
		objResp.Objects[i] = newObject(f, o)
	}

	if resp.IsTruncated != nil && *resp.IsTruncated {
		q.Marker = *resp.Contents[len(resp.Contents)-1].Key
	}

	return objResp, nil
}

// Objects returns an iterator over the objects in the s3 bucket that match the Query q.
// If q is the zero Query, no filtering is done.
func (f *FS) Objects(ctx context.Context, q cloudstorage.Query) (cloudstorage.ObjectIterator, error) {
	return cloudstorage.NewObjectPageIterator(ctx, f, q), nil
}
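
// A hedged sketch of iterating objects by prefix; the exact done/error
// semantics are owned by the cloudstorage page iterator and are only
// checked loosely here:
//
//	iter, _ := store.Objects(ctx, cloudstorage.Query{Prefix: "data/"})
//	for {
//		o, err := iter.Next()
//		if err != nil {
//			break // iterator exhausted, or a real error worth inspecting
//		}
//		fmt.Println(o.Name())
//	}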

// Folders lists the common prefixes ("folders") under the query prefix.
func (f *FS) Folders(ctx context.Context, q cloudstorage.Query) ([]string, error) {

	q.Delimiter = "/"

	itemLimit := int64(f.PageSize)
	if q.PageSize > 0 {
		itemLimit = int64(q.PageSize)
	}

	params := &s3.ListObjectsInput{
		Bucket:    aws.String(f.bucket),
		MaxKeys:   &itemLimit,
		Prefix:    &q.Prefix,
		Delimiter: &q.Delimiter,
	}

	folders := make([]string, 0)

	// Bail out early if the context has already been canceled.
	select {
	case <-ctx.Done():
		return folders, ctx.Err()
	default:
	}

	if q.Marker != "" {
		params.Marker = &q.Marker
	}
	resp, err := f.client.ListObjectsWithContext(ctx, params)
	if err != nil {
		return nil, err
	}
	for _, cp := range resp.CommonPrefixes {
		folders = append(folders, strings.TrimPrefix(*cp.Prefix, `/`))
	}
	return folders, nil
}
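
// Sketch: listing the top-level "folders" under a prefix (assumes keys use
// "/" as a path separator; names and results are placeholders):
//
//	folders, err := store.Folders(ctx, cloudstorage.Query{Prefix: "data/"})
//	// folders -> e.g. []string{"data/2017/", "data/2018/"}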

/*
// Copy from src to destination. Disabled: kept as a sketch of how copy could
// be implemented server-side with the s3 CopyObject API.
func (f *FS) Copy(ctx context.Context, src, des cloudstorage.Object) error {

	so, ok := src.(*object)
	if !ok {
		return fmt.Errorf("Copy source file expected s3 but got %T", src)
	}
	do, ok := des.(*object)
	if !ok {
		return fmt.Errorf("Copy destination expected s3 but got %T", des)
	}

	// CopySource is "sourcebucket/sourcekey"; the key should be URL-encoded
	// if it contains special characters.
	_, err := f.client.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String(do.bucket),
		Key:        aws.String(do.name),
		CopySource: aws.String(so.bucket + "/" + so.name),
	})
	return err
}

// Move is a Copy followed by a Delete of the source.
func (f *FS) Move(ctx context.Context, src, des cloudstorage.Object) error {

	if err := f.Copy(ctx, src, des); err != nil {
		return err
	}
	return f.Delete(ctx, src.Name())
}
*/

// NewReader creates a file reader.
func (f *FS) NewReader(o string) (io.ReadCloser, error) {
	return f.NewReaderWithContext(context.Background(), o)
}

// NewReaderWithContext creates a new file reader with the provided context.
func (f *FS) NewReaderWithContext(ctx context.Context, objectname string) (io.ReadCloser, error) {
	res, err := f.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Key:    aws.String(objectname),
		Bucket: aws.String(f.bucket),
	})
	if err != nil {
		// translate the string error to a typed error
		if strings.Contains(err.Error(), "NoSuchKey") {
			return nil, cloudstorage.ErrObjectNotFound
		}
		return nil, err
	}
	return res.Body, nil
}
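
// A hedged sketch of streaming an object down to a local file; the key and
// local path are placeholders and error handling is elided:
//
//	rc, err := store.NewReaderWithContext(ctx, "data/file.csv")
//	if err == nil {
//		defer rc.Close()
//		out, _ := os.Create("/tmp/file.csv")
//		defer out.Close()
//		io.Copy(out, rc)
//	}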

// NewWriter creates an object writer.
func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteCloser, error) {
	return f.NewWriterWithContext(context.Background(), objectName, metadata)
}

// NewWriterWithContext creates a writer with the provided context and metadata.
func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string) (io.WriteCloser, error) {

	// Create an uploader with the session and default options.
	uploader := s3manager.NewUploader(f.sess)

	pr, pw := io.Pipe()
	bw := csbufio.NewWriter(pw)

	go func() {
		// TODO:  this needs to be managed, ie shutdown signals, close, handle err etc.

		// Upload the file to S3, streaming from the pipe.
		_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket: aws.String(f.bucket),
			Key:    aws.String(objectName),
			Body:   pr,
		})
		if err != nil {
			gou.Warnf("could not upload %v", err)
			// Propagate the failure to the writer side so subsequent
			// writes fail instead of silently succeeding.
			pr.CloseWithError(err)
		}
	}()

	return bw, nil
}
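
// A hedged sketch of the streaming-writer flow above; names are placeholders.
// Close flushes the buffered writer, which closes the pipe and lets the
// background upload finish (per the TODO, upload errors may only surface on
// later writes):
//
//	w, err := store.NewWriterWithContext(ctx, "data/out.json", nil)
//	if err == nil {
//		_, _ = w.Write([]byte(`{"ok":true}`))
//		err = w.Close()
//	}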

// Delete the requested object path.
func (f *FS) Delete(ctx context.Context, obj string) error {
	params := &s3.DeleteObjectInput{
		Bucket: aws.String(f.bucket),
		Key:    aws.String(obj),
	}

	_, err := f.client.DeleteObjectWithContext(ctx, params)
	return err
}

func newObject(f *FS, o *s3.Object) *object {
	obj := &object{
		fs:        f,
		name:      *o.Key,
		bucket:    f.bucket,
		cachepath: cloudstorage.CachePathObj(f.cachepath, *o.Key, f.ID),
	}
	if o.LastModified != nil {
		obj.updated = *o.LastModified
	}
	return obj
}

func newObjectFromHead(f *FS, name string, o *s3.HeadObjectOutput) *object {
	obj := &object{
		fs:        f,
		name:      name,
		bucket:    f.bucket,
		cachepath: cloudstorage.CachePathObj(f.cachepath, name, f.ID),
	}
	if o.LastModified != nil {
		obj.updated = *o.LastModified
	}
	// Copy the user metadata from the HEAD response, lower-casing keys.
	obj.metadata, _ = convertMetaData(o.Metadata)
	return obj
}

func (o *object) StorageSource() string {
	return StoreType
}
func (o *object) Name() string {
	return o.name
}
func (o *object) String() string {
	return o.name
}
func (o *object) Updated() time.Time {
	return o.updated
}
func (o *object) MetaData() map[string]string {
	return o.metadata
}
func (o *object) SetMetaData(meta map[string]string) {
	o.metadata = meta
}

func (o *object) Delete() error {
	return o.fs.Delete(context.Background(), o.name)
}

// Open ensures a local cached copy of the object exists and returns a file
// handle to it, downloading the object from s3 when it already exists remotely.
func (o *object) Open(accesslevel cloudstorage.AccessLevel) (*os.File, error) {
	if o.opened {
		return nil, fmt.Errorf("the store object is already opened. %s", o.name)
	}

	var errs []error
	var cachedcopy *os.File
	var err error
	readonly := accesslevel == cloudstorage.ReadOnly

	err = os.MkdirAll(path.Dir(o.cachepath), 0775)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating cachedcopy dir. cachepath=%s object=%s err=%v", o.cachepath, o.name, err)
	}

	err = cloudstorage.EnsureDir(o.cachepath)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating cachedcopy's dir. cachepath=%s err=%v", o.cachepath, err)
	}

	cachedcopy, err = os.Create(o.cachepath)
	if err != nil {
		return nil, fmt.Errorf("error occurred creating file. local=%s err=%v", o.cachepath, err)
	}

	for try := 0; try < Retries; try++ {
		if o.o == nil {
			obj, err := o.fs.getS3OpenObject(context.Background(), o.name)
			if err != nil && err != cloudstorage.ErrObjectNotFound {
				// Transient failure; back off and retry.
				errs = append(errs, fmt.Errorf("error getting object err=%v", err))
				cloudstorage.Backoff(try)
				continue
			}
			// ErrObjectNotFound is fine: this is a new object.

			if obj != nil {
				o.o = obj
			}
		}

		if o.o != nil {
			// we have a preexisting object, so lets download it..
			defer o.o.Body.Close()

			if _, err := cachedcopy.Seek(0, io.SeekStart); err != nil {
				return nil, fmt.Errorf("error seeking to start of cachedcopy err=%v", err) // don't retry on local fs errors
			}

			_, err = io.Copy(cachedcopy, o.o.Body)
			if err != nil {
				errs = append(errs, fmt.Errorf("error copying bytes. err=%v", err))
				// recreate the cachedcopy file in case it has incomplete data
				if err := os.Remove(o.cachepath); err != nil {
					return nil, fmt.Errorf("error resetting the cachedcopy err=%v", err) // don't retry on local fs errors
				}
				if cachedcopy, err = os.Create(o.cachepath); err != nil {
					return nil, fmt.Errorf("error creating a new cachedcopy file. local=%s err=%v", o.cachepath, err)
				}

				cloudstorage.Backoff(try)
				continue
			}
		}

		if readonly {
			cachedcopy.Close()
			cachedcopy, err = os.Open(o.cachepath)
			if err != nil {
				name := "unknown"
				if cachedcopy != nil {
					name = cachedcopy.Name()
				}
				return nil, fmt.Errorf("error opening file. local=%s object=%s tfile=%v err=%v", o.cachepath, o.name, name, err)
			}
		} else {
			if _, err := cachedcopy.Seek(0, io.SeekStart); err != nil {
				return nil, fmt.Errorf("error seeking to start of cachedcopy err=%v", err) // don't retry on local fs errors
			}
		}

		o.cachedcopy = cachedcopy
		o.readonly = readonly
		o.opened = true
		return o.cachedcopy, nil
	}

	return nil, fmt.Errorf("fetch error: retry count reached. obj=%s tfile=%v errs:[%v]", o.name, o.cachepath, errs)
}
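
// A hedged sketch of read-only access via the cached copy, assuming the
// cloudstorage.Object interface exposes the Open/Close implemented here;
// the key is a placeholder:
//
//	obj, err := store.Get(context.Background(), "data/file.csv")
//	f, err := obj.Open(cloudstorage.ReadOnly)
//	// ... read from f ...
//	err = obj.Close() // removes the local cached copy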

// File gets the current file handle for the cached copy.
func (o *object) File() *os.File {
	return o.cachedcopy
}

// Read bytes from the underlying/cached file.
func (o *object) Read(p []byte) (n int, err error) {
	return o.cachedcopy.Read(p)
}

// Write bytes to the local file; they are synced to s3 on Close/Sync.
func (o *object) Write(p []byte) (n int, err error) {
	if o.cachedcopy == nil {
		_, err := o.Open(cloudstorage.ReadWrite)
		if err != nil {
			return 0, err
		}
	}
	return o.cachedcopy.Write(p)
}

// Sync syncs any changes in the file up to s3.
func (o *object) Sync() error {

	if !o.opened {
		return fmt.Errorf("object isn't opened object:%s", o.name)
	}
	if o.readonly {
		return fmt.Errorf("trying to Sync a readonly object:%s", o.name)
	}

	cachedcopy, err := os.OpenFile(o.cachepath, os.O_RDWR, 0664)
	if err != nil {
		return fmt.Errorf("couldn't open localfile for syncing. local=%s err=%v", o.cachepath, err)
	}
	defer cachedcopy.Close()

	// Create an uploader with the session and default options.
	uploader := s3manager.NewUploader(o.fs.sess)

	if _, err := cachedcopy.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("error seeking to start of cachedcopy err=%v", err) // don't retry on local filesystem errors
	}

	// Upload the file to S3.
	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(o.fs.bucket),
		Key:    aws.String(o.name),
		Body:   cachedcopy,
	})
	if err != nil {
		gou.Warnf("could not upload %v", err)
		return fmt.Errorf("failed to upload file, %v", err)
	}
	return nil
}

// Close syncs a writable object up to s3, closes the local file handle, and
// removes the cached copy.
func (o *object) Close() error {
	if !o.opened {
		return nil
	}
	defer func() {
		os.Remove(o.cachepath)
		o.cachedcopy = nil
		o.opened = false
	}()

	serr := o.cachedcopy.Sync()
	cerr := o.cachedcopy.Close()
	if serr != nil || cerr != nil {
		return fmt.Errorf("error on sync and closing localfile. %s sync=%v, err=%v", o.cachepath, serr, cerr)
	}

	if !o.readonly {
		// Sync reopens the cached copy from disk; the deferred cleanup above
		// runs after this, so the file still exists here.
		err := o.Sync()
		if err != nil {
			gou.Errorf("error on sync %v", err)
			return err
		}
	}
	return nil
}

// Release this object and clean up the cached copy.
func (o *object) Release() error {
	if o.cachedcopy != nil {
		gou.Infof("release %q vs %q", o.cachedcopy.Name(), o.cachepath)
		o.cachedcopy.Close()
		return os.Remove(o.cachepath)
	}
	os.Remove(o.cachepath)
	return nil
}