1/* 2Copyright 2011 The Perkeep Authors 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Package s3 registers the "s3" blobserver storage type, storing 19blobs in an Amazon Web Services' S3 storage bucket. 20 21Example low-level config: 22 23 "/r1/": { 24 "handler": "storage-s3", 25 "handlerArgs": { 26 "bucket": "foo", 27 "aws_region": "us-east-1", 28 "aws_access_key": "...", 29 "aws_secret_access_key": "...", 30 "skipStartupCheck": false 31 } 32 }, 33 34*/ 35package s3 // import "perkeep.org/pkg/blobserver/s3" 36 37import ( 38 "context" 39 "fmt" 40 "net/http" 41 "strings" 42 43 "perkeep.org/pkg/blob" 44 "perkeep.org/pkg/blobserver" 45 "perkeep.org/pkg/blobserver/memory" 46 "perkeep.org/pkg/blobserver/proxycache" 47 48 "github.com/aws/aws-sdk-go/aws" 49 "github.com/aws/aws-sdk-go/aws/awserr" 50 "github.com/aws/aws-sdk-go/aws/credentials" 51 "github.com/aws/aws-sdk-go/aws/session" 52 "github.com/aws/aws-sdk-go/service/s3" 53 "github.com/aws/aws-sdk-go/service/s3/s3iface" 54 "go4.org/fault" 55 "go4.org/jsonconfig" 56) 57 58var ( 59 _ blob.SubFetcher = (*s3Storage)(nil) 60 _ blobserver.MaxEnumerateConfig = (*s3Storage)(nil) 61) 62 63var ( 64 faultReceive = fault.NewInjector("s3_receive") 65 faultEnumerate = fault.NewInjector("s3_enumerate") 66 faultStat = fault.NewInjector("s3_stat") 67 faultGet = fault.NewInjector("s3_get") 68) 69 70const maxParallelHTTP = 5 71 72type s3Storage struct { 73 client s3iface.S3API 74 bucket string 75 // optional "directory" where the blobs are stored, instead of at the root of the bucket. 76 // S3 is actually flat, which in effect just means that all the objects should have this 77 // dirPrefix as a prefix of their key. 78 // If non empty, it should be a slash separated path with a trailing slash and no starting 79 // slash. 80 dirPrefix string 81 // hostname indicates the hostname of the server providing an S3 compatible endpoint. 82 // It should not be set for AWS's S3 since the correct endpoint will be 83 // automatically identified based on the bucket name (and, if provided, the 84 // 'aws_region' low-level config option). 85 hostname string 86} 87 88func (s *s3Storage) String() string { 89 if s.dirPrefix != "" { 90 return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q, directory %q", s.hostname, s.bucket, s.dirPrefix) 91 } 92 return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q", s.hostname, s.bucket) 93} 94 95func newFromConfig(l blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { 96 return newFromConfigWithTransport(l, config, nil) 97} 98 99// newFromConfigWithTransport constructs a s3 blobserver using the given 100// transport for all s3 requests. The transport may be set to 'nil' to use a 101// default transport. 102// This is used for unit tests. 103func newFromConfigWithTransport(_ blobserver.Loader, config jsonconfig.Obj, transport http.RoundTripper) (blobserver.Storage, error) { 104 hostname := config.OptionalString("hostname", "") 105 region := config.OptionalString("aws_region", "us-east-1") 106 107 cacheSize := config.OptionalInt64("cacheSize", 32<<20) 108 s3Cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials( 109 config.RequiredString("aws_access_key"), 110 config.RequiredString("aws_secret_access_key"), 111 "", 112 )) 113 if hostname != "" { 114 s3Cfg.WithEndpoint(hostname) 115 } 116 s3Cfg.WithRegion(region) 117 if transport != nil { 118 httpClient := *http.DefaultClient 119 httpClient.Transport = transport 120 s3Cfg.WithHTTPClient(&httpClient) 121 } 122 awsSession := session.New(s3Cfg) 123 124 bucket := config.RequiredString("bucket") 125 var dirPrefix string 126 if parts := strings.SplitN(bucket, "/", 2); len(parts) > 1 { 127 dirPrefix = parts[1] 128 bucket = parts[0] 129 } 130 if dirPrefix != "" && !strings.HasSuffix(dirPrefix, "/") { 131 dirPrefix += "/" 132 } 133 134 skipStartupCheck := config.OptionalBool("skipStartupCheck", false) 135 if err := config.Validate(); err != nil { 136 return nil, err 137 } 138 139 ctx := context.TODO() // TODO: 5 min timeout or something? 140 if !skipStartupCheck { 141 info, err := normalizeBucketLocation(ctx, awsSession, hostname, bucket) 142 if err != nil { 143 return nil, err 144 } 145 awsSession.Config.WithRegion(info.region) 146 awsSession.Config.WithEndpoint(info.endpoint) 147 if !info.isAWS { 148 awsSession.Config.WithS3ForcePathStyle(true) 149 } 150 } else { 151 // safer default if we can't determine more info 152 awsSession.Config.WithS3ForcePathStyle(true) 153 } 154 155 sto := &s3Storage{ 156 client: s3.New(awsSession), 157 bucket: bucket, 158 dirPrefix: dirPrefix, 159 hostname: hostname, 160 } 161 162 if cacheSize != 0 { 163 // This has two layers of LRU caching (proxycache and memory). 164 // We make the outer one 4x the size so that it doesn't evict from the 165 // underlying one when it's about to perform its own eviction. 166 return proxycache.New(cacheSize<<2, memory.NewCache(cacheSize), sto), nil 167 } 168 return sto, nil 169} 170 171func init() { 172 blobserver.RegisterStorageConstructor("s3", blobserver.StorageConstructor(newFromConfig)) 173} 174 175// isNotFound checks for s3 errors which indicate the object doesn't exist. 176func isNotFound(err error) bool { 177 if err == nil { 178 return false 179 } 180 if aerr, ok := err.(awserr.Error); ok { 181 return aerr.Code() == s3.ErrCodeNoSuchKey || 182 // Check 'NotFound' as well because it's returned for some requests, even 183 // though the API model doesn't include it (hence why there isn't an 184 // 's3.ErrCodeNotFound' for comparison) 185 aerr.Code() == "NotFound" 186 } 187 return false 188} 189