// Package gcsstore provides a Google Cloud Storage based backend.
//
// GCSStore is a storage backend that uses the GCSAPI interface in order to store uploads
// on GCS. Uploads will be represented by two files in GCS; the data file will be stored
// as an extensionless object [uid] and the JSON info file will be stored as [uid].info.
// In order to store uploads on GCS, make sure to specify the appropriate Google service
// account file path in the GCS_SERVICE_ACCOUNT_FILE environment variable. Also make sure that
// this service account file has the "https://www.googleapis.com/auth/devstorage.read_write"
// scope enabled so you can read and write data to the storage buckets associated with the
// service account file.
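//
// Below is a minimal wiring sketch. It assumes the NewGCSService constructor
// from the gcsservice file; the bucket name and object prefix are placeholders
// and should be adjusted to your setup:
//
//	service, err := gcsstore.NewGCSService(os.Getenv("GCS_SERVICE_ACCOUNT_FILE"))
//	if err != nil {
//		panic(err)
//	}
//
//	store := gcsstore.New("my-upload-bucket", service)
//	store.ObjectPrefix = "uploads"
//
//	composer := tusd.NewStoreComposer()
//	store.UseIn(composer)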
package gcsstore

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"

	"cloud.google.com/go/storage"
	"github.com/tus/tusd"
	"github.com/tus/tusd/uid"
)

// See the tusd.DataStore interface for documentation about the different
// methods.
type GCSStore struct {
	// Bucket specifies the GCS bucket that uploads will be stored in.
	Bucket string

	// ObjectPrefix is prepended to the name of each GCS object that is created.
	// It can be used to create a pseudo-directory structure in the bucket,
	// e.g. "path/to/my/uploads".
	ObjectPrefix string

	// Service specifies an interface used to communicate with the Google
	// Cloud Storage backend. The implementation can be found in the gcsservice file.
	Service GCSAPI
}

// New constructs a new GCS storage backend using the supplied GCS bucket name
// and service object.
func New(bucket string, service GCSAPI) GCSStore {
	return GCSStore{
		Bucket:  bucket,
		Service: service,
	}
}

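// UseIn adds this store as the core data store to the passed composer and
// registers the termination, finishing and reading extensions it implements.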
func (store GCSStore) UseIn(composer *tusd.StoreComposer) {
	composer.UseCore(store)
	composer.UseTerminater(store)
	composer.UseFinisher(store)
	composer.UseGetReader(store)
}

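// NewUpload creates a new upload by writing its info file ([uid].info) to GCS.
// If the supplied FileInfo does not carry an ID, a random one is generated.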
func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) {
	if info.ID == "" {
		info.ID = uid.Uid()
	}

	ctx := context.Background()
	err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info)
	if err != nil {
		return info.ID, err
	}

	return info.ID, nil
}

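// WriteChunk stores the data read from src as a temporary object named
// [uid]_[index], where the index is one greater than the highest index of the
// chunks already uploaded. It returns the number of bytes written.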
func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
	filterParams := GCSFilterParams{
		Bucket: store.Bucket,
		Prefix: prefix,
	}

	ctx := context.Background()
	names, err := store.Service.FilterObjects(ctx, filterParams)
	if err != nil {
		return 0, err
	}

	maxIdx := -1

	for _, name := range names {
		split := strings.Split(name, "_")
		idx, err := strconv.Atoi(split[len(split)-1])
		if err != nil {
			return 0, err
		}

		if idx > maxIdx {
			maxIdx = idx
		}
	}

	cid := fmt.Sprintf("%s_%d", store.keyWithPrefix(id), maxIdx+1)
	objectParams := GCSObjectParams{
		Bucket: store.Bucket,
		ID:     cid,
	}

	n, err := store.Service.WriteObject(ctx, objectParams, src)
	if err != nil {
		return 0, err
	}

	return n, err
}

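// CONCURRENT_SIZE_REQUESTS limits how many GetObjectSize requests GetInfo
// issues in parallel while summing up the upload's current offset.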
const CONCURRENT_SIZE_REQUESTS = 32

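// GetInfo reads the upload's info file from GCS, computes the current offset
// by summing the sizes of all objects belonging to the upload and persists the
// updated info back to GCS before returning it.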
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
	info := tusd.FileInfo{}
	i := fmt.Sprintf("%s.info", store.keyWithPrefix(id))

	params := GCSObjectParams{
		Bucket: store.Bucket,
		ID:     i,
	}

	ctx := context.Background()
	r, err := store.Service.ReadObject(ctx, params)
	if err != nil {
		if err == storage.ErrObjectNotExist {
			return info, tusd.ErrNotFound
		}
		return info, err
	}

	// A single Read call is not guaranteed to fill the buffer, so read until
	// the whole info file has been consumed.
	buf := make([]byte, r.Size())
	if _, err := io.ReadFull(r, buf); err != nil {
		return info, err
	}

	if err := json.Unmarshal(buf, &info); err != nil {
		return info, err
	}

	prefix := store.keyWithPrefix(id)
	filterParams := GCSFilterParams{
		Bucket: store.Bucket,
		Prefix: prefix,
	}

	names, err := store.Service.FilterObjects(ctx, filterParams)
	if err != nil {
		return info, err
	}

	var offset int64
	var firstError error
	var wg sync.WaitGroup

	sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS)
	errChan := make(chan error)
	// errDone signals that the error-collecting goroutine has finished, so
	// firstError can be read afterwards without a data race.
	errDone := make(chan struct{})
	ctxCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		defer close(errDone)
		for err := range errChan {
			if err != context.Canceled && firstError == nil {
				firstError = err
				cancel()
			}
		}
	}()

	for _, name := range names {
		sem <- struct{}{}
		wg.Add(1)
		params = GCSObjectParams{
			Bucket: store.Bucket,
			ID:     name,
		}

		go func(params GCSObjectParams) {
			defer func() {
				<-sem
				wg.Done()
			}()

			size, err := store.Service.GetObjectSize(ctxCancel, params)
			if err != nil {
				errChan <- err
				return
			}

			atomic.AddInt64(&offset, size)
		}(params)
	}

	wg.Wait()
	close(errChan)
	// Wait for the error collector to finish before reading firstError.
	<-errDone

	if firstError != nil {
		return info, firstError
	}

	info.Offset = offset
	err = store.writeInfo(ctx, store.keyWithPrefix(id), info)
	if err != nil {
		return info, err
	}

	return info, nil
}

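// writeInfo marshals the given FileInfo to JSON and stores it as the
// [uid].info object next to the upload's data.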
func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileInfo) error {
	data, err := json.Marshal(info)
	if err != nil {
		return err
	}

	r := bytes.NewReader(data)

	i := fmt.Sprintf("%s.info", id)
	params := GCSObjectParams{
		Bucket: store.Bucket,
		ID:     i,
	}

	_, err = store.Service.WriteObject(ctx, params, r)
	if err != nil {
		return err
	}

	return nil
}

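// FinishUpload composes all temporary chunk objects into the final data
// object [uid], deletes the chunks and copies the upload's metadata onto the
// composed object.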
func (store GCSStore) FinishUpload(id string) error {
	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
	filterParams := GCSFilterParams{
		Bucket: store.Bucket,
		Prefix: prefix,
	}

	ctx := context.Background()
	names, err := store.Service.FilterObjects(ctx, filterParams)
	if err != nil {
		return err
	}

	composeParams := GCSComposeParams{
		Bucket:      store.Bucket,
		Destination: store.keyWithPrefix(id),
		Sources:     names,
	}

	err = store.Service.ComposeObjects(ctx, composeParams)
	if err != nil {
		return err
	}

	err = store.Service.DeleteObjectsWithFilter(ctx, filterParams)
	if err != nil {
		return err
	}

	info, err := store.GetInfo(id)
	if err != nil {
		return err
	}

	objectParams := GCSObjectParams{
		Bucket: store.Bucket,
		ID:     store.keyWithPrefix(id),
	}

	err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData)
	if err != nil {
		return err
	}

	return nil
}

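// Terminate removes all objects belonging to the upload, i.e. the data
// object, the info file and any remaining chunk objects.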
func (store GCSStore) Terminate(id string) error {
	filterParams := GCSFilterParams{
		Bucket: store.Bucket,
		Prefix: store.keyWithPrefix(id),
	}

	ctx := context.Background()
	err := store.Service.DeleteObjectsWithFilter(ctx, filterParams)
	if err != nil {
		return err
	}

	return nil
}

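// GetReader returns an io.Reader for the upload's data object [uid].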
func (store GCSStore) GetReader(id string) (io.Reader, error) {
	params := GCSObjectParams{
		Bucket: store.Bucket,
		ID:     store.keyWithPrefix(id),
	}

	ctx := context.Background()
	r, err := store.Service.ReadObject(ctx, params)
	if err != nil {
		return nil, err
	}

	return r, nil
}

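// keyWithPrefix prepends the configured ObjectPrefix (ensuring a trailing
// slash) to the given key. If no prefix is set, the key is returned unchanged.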
func (store GCSStore) keyWithPrefix(key string) string {
	prefix := store.ObjectPrefix
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	return prefix + key
}