1/*
2Copyright 2011 The Perkeep Authors
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package s3 registers the "s3" blobserver storage type, storing
19blobs in an Amazon Web Services' S3 storage bucket.
20
21Example low-level config:
22
23     "/r1/": {
24         "handler": "storage-s3",
25         "handlerArgs": {
26            "bucket": "foo",
27            "aws_region": "us-east-1",
28            "aws_access_key": "...",
29            "aws_secret_access_key": "...",
30            "skipStartupCheck": false
31          }
32     },
33
34*/
35package s3 // import "perkeep.org/pkg/blobserver/s3"
36
37import (
38	"context"
39	"fmt"
40	"net/http"
41	"strings"
42
43	"perkeep.org/pkg/blob"
44	"perkeep.org/pkg/blobserver"
45	"perkeep.org/pkg/blobserver/memory"
46	"perkeep.org/pkg/blobserver/proxycache"
47
48	"github.com/aws/aws-sdk-go/aws"
49	"github.com/aws/aws-sdk-go/aws/awserr"
50	"github.com/aws/aws-sdk-go/aws/credentials"
51	"github.com/aws/aws-sdk-go/aws/session"
52	"github.com/aws/aws-sdk-go/service/s3"
53	"github.com/aws/aws-sdk-go/service/s3/s3iface"
54	"go4.org/fault"
55	"go4.org/jsonconfig"
56)
57
// Compile-time assertions that *s3Storage implements the optional
// interfaces listed below; the build fails if a required method is missing.
var (
	_ blob.SubFetcher               = (*s3Storage)(nil)
	_ blobserver.MaxEnumerateConfig = (*s3Storage)(nil)
)
62
// Fault injectors created via fault.NewInjector, one per storage operation
// name ("s3_receive", "s3_enumerate", "s3_stat", "s3_get").
// NOTE(review): their call sites are not in this part of the file; presumably
// each one is consulted by the matching storage method to simulate
// failures — confirm against the methods that use them.
var (
	faultReceive   = fault.NewInjector("s3_receive")
	faultEnumerate = fault.NewInjector("s3_enumerate")
	faultStat      = fault.NewInjector("s3_stat")
	faultGet       = fault.NewInjector("s3_get")
)
69
// maxParallelHTTP is not referenced in this part of the file.
// NOTE(review): presumably the upper bound on concurrent HTTP requests
// issued to S3 by the storage methods — confirm against its users.
const maxParallelHTTP = 5
71
// s3Storage is the blob storage implementation backed by a bucket on S3 or
// on a server providing an S3 compatible endpoint.
type s3Storage struct {
	// client is the S3 API client; newFromConfigWithTransport builds it from
	// the configured AWS session.
	client s3iface.S3API
	// bucket is the bucket name. Any "/directory" part given in the "bucket"
	// config value is split off into dirPrefix.
	bucket string
	// optional "directory" where the blobs are stored, instead of at the root of the bucket.
	// S3 is actually flat, which in effect just means that all the objects should have this
	// dirPrefix as a prefix of their key.
	// If non-empty, it should be a slash separated path with a trailing slash and no starting
	// slash.
	dirPrefix string
	// hostname indicates the hostname of the server providing an S3 compatible endpoint.
	// It should not be set for AWS's S3 since the correct endpoint will be
	// automatically identified based on the bucket name (and, if provided, the
	// 'aws_region' low-level config option).
	hostname string
}
87
88func (s *s3Storage) String() string {
89	if s.dirPrefix != "" {
90		return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q, directory %q", s.hostname, s.bucket, s.dirPrefix)
91	}
92	return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q", s.hostname, s.bucket)
93}
94
95func newFromConfig(l blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
96	return newFromConfigWithTransport(l, config, nil)
97}
98
99// newFromConfigWithTransport constructs a s3 blobserver using the given
100// transport for all s3 requests.  The transport may be set to 'nil' to use a
101// default transport.
102// This is used for unit tests.
103func newFromConfigWithTransport(_ blobserver.Loader, config jsonconfig.Obj, transport http.RoundTripper) (blobserver.Storage, error) {
104	hostname := config.OptionalString("hostname", "")
105	region := config.OptionalString("aws_region", "us-east-1")
106
107	cacheSize := config.OptionalInt64("cacheSize", 32<<20)
108	s3Cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(
109		config.RequiredString("aws_access_key"),
110		config.RequiredString("aws_secret_access_key"),
111		"",
112	))
113	if hostname != "" {
114		s3Cfg.WithEndpoint(hostname)
115	}
116	s3Cfg.WithRegion(region)
117	if transport != nil {
118		httpClient := *http.DefaultClient
119		httpClient.Transport = transport
120		s3Cfg.WithHTTPClient(&httpClient)
121	}
122	awsSession := session.New(s3Cfg)
123
124	bucket := config.RequiredString("bucket")
125	var dirPrefix string
126	if parts := strings.SplitN(bucket, "/", 2); len(parts) > 1 {
127		dirPrefix = parts[1]
128		bucket = parts[0]
129	}
130	if dirPrefix != "" && !strings.HasSuffix(dirPrefix, "/") {
131		dirPrefix += "/"
132	}
133
134	skipStartupCheck := config.OptionalBool("skipStartupCheck", false)
135	if err := config.Validate(); err != nil {
136		return nil, err
137	}
138
139	ctx := context.TODO() // TODO: 5 min timeout or something?
140	if !skipStartupCheck {
141		info, err := normalizeBucketLocation(ctx, awsSession, hostname, bucket)
142		if err != nil {
143			return nil, err
144		}
145		awsSession.Config.WithRegion(info.region)
146		awsSession.Config.WithEndpoint(info.endpoint)
147		if !info.isAWS {
148			awsSession.Config.WithS3ForcePathStyle(true)
149		}
150	} else {
151		// safer default if we can't determine more info
152		awsSession.Config.WithS3ForcePathStyle(true)
153	}
154
155	sto := &s3Storage{
156		client:    s3.New(awsSession),
157		bucket:    bucket,
158		dirPrefix: dirPrefix,
159		hostname:  hostname,
160	}
161
162	if cacheSize != 0 {
163		// This has two layers of LRU caching (proxycache and memory).
164		// We make the outer one 4x the size so that it doesn't evict from the
165		// underlying one when it's about to perform its own eviction.
166		return proxycache.New(cacheSize<<2, memory.NewCache(cacheSize), sto), nil
167	}
168	return sto, nil
169}
170
171func init() {
172	blobserver.RegisterStorageConstructor("s3", blobserver.StorageConstructor(newFromConfig))
173}
174
175// isNotFound checks for s3 errors which indicate the object doesn't exist.
176func isNotFound(err error) bool {
177	if err == nil {
178		return false
179	}
180	if aerr, ok := err.(awserr.Error); ok {
181		return aerr.Code() == s3.ErrCodeNoSuchKey ||
182			// Check 'NotFound' as well because it's returned for some requests, even
183			// though the API model doesn't include it (hence why there isn't an
184			// 's3.ErrCodeNotFound' for comparison)
185			aerr.Code() == "NotFound"
186	}
187	return false
188}
189