1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"context"
19	"errors"
20	"fmt"
21
22	"cloud.google.com/go/internal/trace"
23	raw "google.golang.org/api/storage/v1"
24)
25
26// CopierFrom creates a Copier that can copy src to dst.
27// You can immediately call Run on the returned Copier, or
28// you can configure it first.
29//
30// For Requester Pays buckets, the user project of dst is billed, unless it is empty,
31// in which case the user project of src is billed.
32func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier {
33	return &Copier{dst: dst, src: src}
34}
35
36// A Copier copies a source object to a destination.
37type Copier struct {
38	// ObjectAttrs are optional attributes to set on the destination object.
39	// Any attributes must be initialized before any calls on the Copier. Nil
40	// or zero-valued attributes are ignored.
41	ObjectAttrs
42
43	// RewriteToken can be set before calling Run to resume a copy
44	// operation. After Run returns a non-nil error, RewriteToken will
45	// have been updated to contain the value needed to resume the copy.
46	RewriteToken string
47
48	// ProgressFunc can be used to monitor the progress of a multi-RPC copy
49	// operation. If ProgressFunc is not nil and copying requires multiple
50	// calls to the underlying service (see
51	// https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then
52	// ProgressFunc will be invoked after each call with the number of bytes of
53	// content copied so far and the total size in bytes of the source object.
54	//
55	// ProgressFunc is intended to make upload progress available to the
56	// application. For example, the implementation of ProgressFunc may update
57	// a progress bar in the application's UI, or log the result of
58	// float64(copiedBytes)/float64(totalBytes).
59	//
60	// ProgressFunc should return quickly without blocking.
61	ProgressFunc func(copiedBytes, totalBytes uint64)
62
63	// The Cloud KMS key, in the form projects/P/locations/L/keyRings/R/cryptoKeys/K,
64	// that will be used to encrypt the object. Overrides the object's KMSKeyName, if
65	// any.
66	//
67	// Providing both a DestinationKMSKeyName and a customer-supplied encryption key
68	// (via ObjectHandle.Key) on the destination object will result in an error when
69	// Run is called.
70	DestinationKMSKeyName string
71
72	dst, src *ObjectHandle
73}
74
75// Run performs the copy.
76func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
77	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Copier.Run")
78	defer func() { trace.EndSpan(ctx, err) }()
79
80	if err := c.src.validate(); err != nil {
81		return nil, err
82	}
83	if err := c.dst.validate(); err != nil {
84		return nil, err
85	}
86	if c.DestinationKMSKeyName != "" && c.dst.encryptionKey != nil {
87		return nil, errors.New("storage: cannot use DestinationKMSKeyName with a customer-supplied encryption key")
88	}
89	// Convert destination attributes to raw form, omitting the bucket.
90	// If the bucket is included but name or content-type aren't, the service
91	// returns a 400 with "Required" as the only message. Omitting the bucket
92	// does not cause any problems.
93	rawObject := c.ObjectAttrs.toRawObject("")
94	for {
95		res, err := c.callRewrite(ctx, rawObject)
96		if err != nil {
97			return nil, err
98		}
99		if c.ProgressFunc != nil {
100			c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize))
101		}
102		if res.Done { // Finished successfully.
103			return newObject(res.Resource), nil
104		}
105	}
106}
107
108func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) {
109	call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj)
110
111	call.Context(ctx).Projection("full")
112	if c.RewriteToken != "" {
113		call.RewriteToken(c.RewriteToken)
114	}
115	if c.DestinationKMSKeyName != "" {
116		call.DestinationKmsKeyName(c.DestinationKMSKeyName)
117	}
118	if c.PredefinedACL != "" {
119		call.DestinationPredefinedAcl(c.PredefinedACL)
120	}
121	if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil {
122		return nil, err
123	}
124	if c.dst.userProject != "" {
125		call.UserProject(c.dst.userProject)
126	} else if c.src.userProject != "" {
127		call.UserProject(c.src.userProject)
128	}
129	if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil {
130		return nil, err
131	}
132	if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
133		return nil, err
134	}
135	if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil {
136		return nil, err
137	}
138	var res *raw.RewriteResponse
139	var err error
140	setClientHeader(call.Header())
141	err = runWithRetry(ctx, func() error { res, err = call.Do(); return err })
142	if err != nil {
143		return nil, err
144	}
145	c.RewriteToken = res.RewriteToken
146	return res, nil
147}
148
149// ComposerFrom creates a Composer that can compose srcs into dst.
150// You can immediately call Run on the returned Composer, or you can
151// configure it first.
152//
153// The encryption key for the destination object will be used to decrypt all
154// source objects and encrypt the destination object. It is an error
155// to specify an encryption key for any of the source objects.
156func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer {
157	return &Composer{dst: dst, srcs: srcs}
158}
159
// A Composer composes source objects into a destination object. Obtain one
// from ObjectHandle.ComposerFrom, optionally set its exported fields, then
// call Run.
//
// For Requester Pays buckets, the user project of dst is billed.
type Composer struct {
	// ObjectAttrs are optional attributes to set on the destination object.
	// Any attributes must be initialized before any calls on the Composer. Nil
	// or zero-valued attributes are ignored.
	ObjectAttrs

	// SendCRC32C specifies whether to transmit a CRC32C field. It should be set
	// to true in addition to setting the Composer's CRC32C field, because zero
	// is a valid CRC and normally a zero would not be transmitted.
	// If a CRC32C is sent, and the data in the destination object does not match
	// the checksum, the compose will be rejected.
	SendCRC32C bool

	// dst is the handle of the object that Run writes.
	dst  *ObjectHandle
	// srcs are the handles of the objects to compose, in the order given to
	// ComposerFrom.
	srcs []*ObjectHandle
}
179
180// Run performs the compose operation.
181func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
182	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Composer.Run")
183	defer func() { trace.EndSpan(ctx, err) }()
184
185	if err := c.dst.validate(); err != nil {
186		return nil, err
187	}
188	if len(c.srcs) == 0 {
189		return nil, errors.New("storage: at least one source object must be specified")
190	}
191
192	req := &raw.ComposeRequest{}
193	// Compose requires a non-empty Destination, so we always set it,
194	// even if the caller-provided ObjectAttrs is the zero value.
195	req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
196	if c.SendCRC32C {
197		req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C)
198	}
199	for _, src := range c.srcs {
200		if err := src.validate(); err != nil {
201			return nil, err
202		}
203		if src.bucket != c.dst.bucket {
204			return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket)
205		}
206		if src.encryptionKey != nil {
207			return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object)
208		}
209		srcObj := &raw.ComposeRequestSourceObjects{
210			Name: src.object,
211		}
212		if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
213			return nil, err
214		}
215		req.SourceObjects = append(req.SourceObjects, srcObj)
216	}
217
218	call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
219	if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
220		return nil, err
221	}
222	if c.dst.userProject != "" {
223		call.UserProject(c.dst.userProject)
224	}
225	if c.PredefinedACL != "" {
226		call.DestinationPredefinedAcl(c.PredefinedACL)
227	}
228	if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
229		return nil, err
230	}
231	var obj *raw.Object
232	setClientHeader(call.Header())
233	err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
234	if err != nil {
235		return nil, err
236	}
237	return newObject(obj), nil
238}
239