// Package gcsstore provides a Google cloud storage based backend.
//
// GCSStore is a storage backend that uses the GCSAPI interface in order to store uploads
// on GCS. Uploads will be represented by two files in GCS; the data file will be stored
// as an extensionless object [uid] and the JSON info file will be stored as [uid].info.
// In order to store uploads on GCS, make sure to specify the appropriate Google service
// account file path in the GCS_SERVICE_ACCOUNT_FILE environment variable. Also make sure that
// this service account file has the "https://www.googleapis.com/auth/devstorage.read_write"
// scope enabled so you can read and write data to the storage buckets associated with the
// service account file.
package gcsstore

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"

	"cloud.google.com/go/storage"
	"github.com/tus/tusd"
	"github.com/tus/tusd/uid"
)

// GCSStore holds the bucket name, optional object prefix and GCSAPI service
// used to store uploads. See the tusd.DataStore interface for documentation
// about the different methods.
type GCSStore struct {
	// Bucket specifies the GCS bucket that uploads will be stored in.
	Bucket string

	// ObjectPrefix is prepended to the name of each GCS object that is created.
	// It can be used to create a pseudo-directory structure in the bucket,
	// e.g. "path/to/my/uploads".
	ObjectPrefix string

	// Service specifies an interface used to communicate with the Google
	// cloud storage backend. Implementation can be seen in gcsservice file.
	Service GCSAPI
}

// New constructs a new GCS storage backend using the supplied GCS bucket name
// and service object.
48func New(bucket string, service GCSAPI) GCSStore { 49 return GCSStore{ 50 Bucket: bucket, 51 Service: service, 52 } 53} 54 55func (store GCSStore) UseIn(composer *tusd.StoreComposer) { 56 composer.UseCore(store) 57 composer.UseTerminater(store) 58 composer.UseFinisher(store) 59 composer.UseGetReader(store) 60} 61 62func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) { 63 if info.ID == "" { 64 info.ID = uid.Uid() 65 } 66 67 ctx := context.Background() 68 err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info) 69 if err != nil { 70 return info.ID, err 71 } 72 73 return info.ID, nil 74} 75 76func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { 77 prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) 78 filterParams := GCSFilterParams{ 79 Bucket: store.Bucket, 80 Prefix: prefix, 81 } 82 83 ctx := context.Background() 84 names, err := store.Service.FilterObjects(ctx, filterParams) 85 if err != nil { 86 return 0, err 87 } 88 89 maxIdx := -1 90 91 for _, name := range names { 92 split := strings.Split(name, "_") 93 idx, err := strconv.Atoi(split[len(split)-1]) 94 if err != nil { 95 return 0, err 96 } 97 98 if idx > maxIdx { 99 maxIdx = idx 100 } 101 } 102 103 cid := fmt.Sprintf("%s_%d", store.keyWithPrefix(id), maxIdx+1) 104 objectParams := GCSObjectParams{ 105 Bucket: store.Bucket, 106 ID: cid, 107 } 108 109 n, err := store.Service.WriteObject(ctx, objectParams, src) 110 if err != nil { 111 return 0, err 112 } 113 114 return n, err 115} 116 117const CONCURRENT_SIZE_REQUESTS = 32 118 119func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { 120 info := tusd.FileInfo{} 121 i := fmt.Sprintf("%s.info", store.keyWithPrefix(id)) 122 123 params := GCSObjectParams{ 124 Bucket: store.Bucket, 125 ID: i, 126 } 127 128 ctx := context.Background() 129 r, err := store.Service.ReadObject(ctx, params) 130 if err != nil { 131 if err == storage.ErrObjectNotExist { 132 return info, tusd.ErrNotFound 133 } 134 
return info, err 135 } 136 137 buf := make([]byte, r.Size()) 138 _, err = r.Read(buf) 139 if err != nil { 140 return info, err 141 } 142 143 if err := json.Unmarshal(buf, &info); err != nil { 144 return info, err 145 } 146 147 prefix := fmt.Sprintf("%s", store.keyWithPrefix(id)) 148 filterParams := GCSFilterParams{ 149 Bucket: store.Bucket, 150 Prefix: prefix, 151 } 152 153 names, err := store.Service.FilterObjects(ctx, filterParams) 154 if err != nil { 155 return info, err 156 } 157 158 var offset int64 = 0 159 var firstError error = nil 160 var wg sync.WaitGroup 161 162 sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS) 163 errChan := make(chan error) 164 ctxCancel, cancel := context.WithCancel(ctx) 165 defer cancel() 166 167 go func() { 168 for err := range errChan { 169 if err != context.Canceled && firstError == nil { 170 firstError = err 171 cancel() 172 } 173 } 174 }() 175 176 for _, name := range names { 177 sem <- struct{}{} 178 wg.Add(1) 179 params = GCSObjectParams{ 180 Bucket: store.Bucket, 181 ID: name, 182 } 183 184 go func(params GCSObjectParams) { 185 defer func() { 186 <-sem 187 wg.Done() 188 }() 189 190 size, err := store.Service.GetObjectSize(ctxCancel, params) 191 192 if err != nil { 193 errChan <- err 194 return 195 } 196 197 atomic.AddInt64(&offset, size) 198 }(params) 199 } 200 201 wg.Wait() 202 close(errChan) 203 204 if firstError != nil { 205 return info, firstError 206 } 207 208 info.Offset = offset 209 err = store.writeInfo(ctx, store.keyWithPrefix(id), info) 210 if err != nil { 211 return info, err 212 } 213 214 return info, nil 215} 216 217func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileInfo) error { 218 data, err := json.Marshal(info) 219 if err != nil { 220 return err 221 } 222 223 r := bytes.NewReader(data) 224 225 i := fmt.Sprintf("%s.info", id) 226 params := GCSObjectParams{ 227 Bucket: store.Bucket, 228 ID: i, 229 } 230 231 _, err = store.Service.WriteObject(ctx, params, r) 232 if err != nil { 233 
return err 234 } 235 236 return nil 237} 238 239func (store GCSStore) FinishUpload(id string) error { 240 prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) 241 filterParams := GCSFilterParams{ 242 Bucket: store.Bucket, 243 Prefix: prefix, 244 } 245 246 ctx := context.Background() 247 names, err := store.Service.FilterObjects(ctx, filterParams) 248 if err != nil { 249 return err 250 } 251 252 composeParams := GCSComposeParams{ 253 Bucket: store.Bucket, 254 Destination: store.keyWithPrefix(id), 255 Sources: names, 256 } 257 258 err = store.Service.ComposeObjects(ctx, composeParams) 259 if err != nil { 260 return err 261 } 262 263 err = store.Service.DeleteObjectsWithFilter(ctx, filterParams) 264 if err != nil { 265 return err 266 } 267 268 info, err := store.GetInfo(id) 269 if err != nil { 270 return err 271 } 272 273 objectParams := GCSObjectParams{ 274 Bucket: store.Bucket, 275 ID: store.keyWithPrefix(id), 276 } 277 278 err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData) 279 if err != nil { 280 return err 281 } 282 283 return nil 284} 285 286func (store GCSStore) Terminate(id string) error { 287 filterParams := GCSFilterParams{ 288 Bucket: store.Bucket, 289 Prefix: store.keyWithPrefix(id), 290 } 291 292 ctx := context.Background() 293 err := store.Service.DeleteObjectsWithFilter(ctx, filterParams) 294 if err != nil { 295 return err 296 } 297 298 return nil 299} 300 301func (store GCSStore) GetReader(id string) (io.Reader, error) { 302 params := GCSObjectParams{ 303 Bucket: store.Bucket, 304 ID: store.keyWithPrefix(id), 305 } 306 307 ctx := context.Background() 308 r, err := store.Service.ReadObject(ctx, params) 309 if err != nil { 310 return nil, err 311 } 312 313 return r, nil 314} 315 316func (store GCSStore) keyWithPrefix(key string) string { 317 prefix := store.ObjectPrefix 318 if prefix != "" && !strings.HasSuffix(prefix, "/") { 319 prefix += "/" 320 } 321 return prefix + key 322} 323