1// Copyright 2016 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"errors"
19	"fmt"
20
21	"golang.org/x/net/context"
22	raw "google.golang.org/api/storage/v1"
23)
24
25// CopierFrom creates a Copier that can copy src to dst.
26// You can immediately call Run on the returned Copier, or
27// you can configure it first.
28//
29// For Requester Pays buckets, the user project of dst is billed, unless it is empty,
30// in which case the user project of src is billed.
31func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier {
32	return &Copier{dst: dst, src: src}
33}
34
35// A Copier copies a source object to a destination.
36type Copier struct {
37	// ObjectAttrs are optional attributes to set on the destination object.
38	// Any attributes must be initialized before any calls on the Copier. Nil
39	// or zero-valued attributes are ignored.
40	ObjectAttrs
41
42	// RewriteToken can be set before calling Run to resume a copy
43	// operation. After Run returns a non-nil error, RewriteToken will
44	// have been updated to contain the value needed to resume the copy.
45	RewriteToken string
46
47	// ProgressFunc can be used to monitor the progress of a multi-RPC copy
48	// operation. If ProgressFunc is not nil and copying requires multiple
49	// calls to the underlying service (see
50	// https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then
51	// ProgressFunc will be invoked after each call with the number of bytes of
52	// content copied so far and the total size in bytes of the source object.
53	//
54	// ProgressFunc is intended to make upload progress available to the
55	// application. For example, the implementation of ProgressFunc may update
56	// a progress bar in the application's UI, or log the result of
57	// float64(copiedBytes)/float64(totalBytes).
58	//
59	// ProgressFunc should return quickly without blocking.
60	ProgressFunc func(copiedBytes, totalBytes uint64)
61
62	dst, src *ObjectHandle
63}
64
65// Run performs the copy.
66func (c *Copier) Run(ctx context.Context) (*ObjectAttrs, error) {
67	if err := c.src.validate(); err != nil {
68		return nil, err
69	}
70	if err := c.dst.validate(); err != nil {
71		return nil, err
72	}
73	// Convert destination attributes to raw form, omitting the bucket.
74	// If the bucket is included but name or content-type aren't, the service
75	// returns a 400 with "Required" as the only message. Omitting the bucket
76	// does not cause any problems.
77	rawObject := c.ObjectAttrs.toRawObject("")
78	for {
79		res, err := c.callRewrite(ctx, rawObject)
80		if err != nil {
81			return nil, err
82		}
83		if c.ProgressFunc != nil {
84			c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize))
85		}
86		if res.Done { // Finished successfully.
87			return newObject(res.Resource), nil
88		}
89	}
90}
91
92func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) {
93	call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj)
94
95	call.Context(ctx).Projection("full")
96	if c.RewriteToken != "" {
97		call.RewriteToken(c.RewriteToken)
98	}
99	if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil {
100		return nil, err
101	}
102	if c.dst.userProject != "" {
103		call.UserProject(c.dst.userProject)
104	} else if c.src.userProject != "" {
105		call.UserProject(c.src.userProject)
106	}
107	if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil {
108		return nil, err
109	}
110	if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
111		return nil, err
112	}
113	if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil {
114		return nil, err
115	}
116	var res *raw.RewriteResponse
117	var err error
118	setClientHeader(call.Header())
119	err = runWithRetry(ctx, func() error { res, err = call.Do(); return err })
120	if err != nil {
121		return nil, err
122	}
123	c.RewriteToken = res.RewriteToken
124	return res, nil
125}
126
127// ComposerFrom creates a Composer that can compose srcs into dst.
128// You can immediately call Run on the returned Composer, or you can
129// configure it first.
130//
131// The encryption key for the destination object will be used to decrypt all
132// source objects and encrypt the destination object. It is an error
133// to specify an encryption key for any of the source objects.
134func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer {
135	return &Composer{dst: dst, srcs: srcs}
136}
137
138// A Composer composes source objects into a destination object.
139//
140// For Requester Pays buckets, the user project of dst is billed.
141type Composer struct {
142	// ObjectAttrs are optional attributes to set on the destination object.
143	// Any attributes must be initialized before any calls on the Composer. Nil
144	// or zero-valued attributes are ignored.
145	ObjectAttrs
146
147	dst  *ObjectHandle
148	srcs []*ObjectHandle
149}
150
151// Run performs the compose operation.
152func (c *Composer) Run(ctx context.Context) (*ObjectAttrs, error) {
153	if err := c.dst.validate(); err != nil {
154		return nil, err
155	}
156	if len(c.srcs) == 0 {
157		return nil, errors.New("storage: at least one source object must be specified")
158	}
159
160	req := &raw.ComposeRequest{}
161	// Compose requires a non-empty Destination, so we always set it,
162	// even if the caller-provided ObjectAttrs is the zero value.
163	req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
164	for _, src := range c.srcs {
165		if err := src.validate(); err != nil {
166			return nil, err
167		}
168		if src.bucket != c.dst.bucket {
169			return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket)
170		}
171		if src.encryptionKey != nil {
172			return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object)
173		}
174		srcObj := &raw.ComposeRequestSourceObjects{
175			Name: src.object,
176		}
177		if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
178			return nil, err
179		}
180		req.SourceObjects = append(req.SourceObjects, srcObj)
181	}
182
183	call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
184	if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
185		return nil, err
186	}
187	if c.dst.userProject != "" {
188		call.UserProject(c.dst.userProject)
189	}
190	if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
191		return nil, err
192	}
193	var obj *raw.Object
194	var err error
195	setClientHeader(call.Header())
196	err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
197	if err != nil {
198		return nil, err
199	}
200	return newObject(obj), nil
201}
202