1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 22 "cloud.google.com/go/internal/trace" 23 raw "google.golang.org/api/storage/v1" 24) 25 26// CopierFrom creates a Copier that can copy src to dst. 27// You can immediately call Run on the returned Copier, or 28// you can configure it first. 29// 30// For Requester Pays buckets, the user project of dst is billed, unless it is empty, 31// in which case the user project of src is billed. 32func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier { 33 return &Copier{dst: dst, src: src} 34} 35 36// A Copier copies a source object to a destination. 37type Copier struct { 38 // ObjectAttrs are optional attributes to set on the destination object. 39 // Any attributes must be initialized before any calls on the Copier. Nil 40 // or zero-valued attributes are ignored. 41 ObjectAttrs 42 43 // RewriteToken can be set before calling Run to resume a copy 44 // operation. After Run returns a non-nil error, RewriteToken will 45 // have been updated to contain the value needed to resume the copy. 46 RewriteToken string 47 48 // ProgressFunc can be used to monitor the progress of a multi-RPC copy 49 // operation. If ProgressFunc is not nil and copying requires multiple 50 // calls to the underlying service (see 51 // https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then 52 // ProgressFunc will be invoked after each call with the number of bytes of 53 // content copied so far and the total size in bytes of the source object. 54 // 55 // ProgressFunc is intended to make upload progress available to the 56 // application. For example, the implementation of ProgressFunc may update 57 // a progress bar in the application's UI, or log the result of 58 // float64(copiedBytes)/float64(totalBytes). 59 // 60 // ProgressFunc should return quickly without blocking. 61 ProgressFunc func(copiedBytes, totalBytes uint64) 62 63 // The Cloud KMS key, in the form projects/P/locations/L/keyRings/R/cryptoKeys/K, 64 // that will be used to encrypt the object. Overrides the object's KMSKeyName, if 65 // any. 66 // 67 // Providing both a DestinationKMSKeyName and a customer-supplied encryption key 68 // (via ObjectHandle.Key) on the destination object will result in an error when 69 // Run is called. 70 DestinationKMSKeyName string 71 72 dst, src *ObjectHandle 73} 74 75// Run performs the copy. 76func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { 77 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Copier.Run") 78 defer func() { trace.EndSpan(ctx, err) }() 79 80 if err := c.src.validate(); err != nil { 81 return nil, err 82 } 83 if err := c.dst.validate(); err != nil { 84 return nil, err 85 } 86 if c.DestinationKMSKeyName != "" && c.dst.encryptionKey != nil { 87 return nil, errors.New("storage: cannot use DestinationKMSKeyName with a customer-supplied encryption key") 88 } 89 // Convert destination attributes to raw form, omitting the bucket. 90 // If the bucket is included but name or content-type aren't, the service 91 // returns a 400 with "Required" as the only message. Omitting the bucket 92 // does not cause any problems. 93 rawObject := c.ObjectAttrs.toRawObject("") 94 for { 95 res, err := c.callRewrite(ctx, rawObject) 96 if err != nil { 97 return nil, err 98 } 99 if c.ProgressFunc != nil { 100 c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize)) 101 } 102 if res.Done { // Finished successfully. 103 return newObject(res.Resource), nil 104 } 105 } 106} 107 108func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) { 109 call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj) 110 111 call.Context(ctx).Projection("full") 112 if c.RewriteToken != "" { 113 call.RewriteToken(c.RewriteToken) 114 } 115 if c.DestinationKMSKeyName != "" { 116 call.DestinationKmsKeyName(c.DestinationKMSKeyName) 117 } 118 if c.PredefinedACL != "" { 119 call.DestinationPredefinedAcl(c.PredefinedACL) 120 } 121 if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil { 122 return nil, err 123 } 124 if c.dst.userProject != "" { 125 call.UserProject(c.dst.userProject) 126 } else if c.src.userProject != "" { 127 call.UserProject(c.src.userProject) 128 } 129 if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil { 130 return nil, err 131 } 132 if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { 133 return nil, err 134 } 135 if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil { 136 return nil, err 137 } 138 var res *raw.RewriteResponse 139 var err error 140 setClientHeader(call.Header()) 141 err = runWithRetry(ctx, func() error { res, err = call.Do(); return err }) 142 if err != nil { 143 return nil, err 144 } 145 c.RewriteToken = res.RewriteToken 146 return res, nil 147} 148 149// ComposerFrom creates a Composer that can compose srcs into dst. 150// You can immediately call Run on the returned Composer, or you can 151// configure it first. 152// 153// The encryption key for the destination object will be used to decrypt all 154// source objects and encrypt the destination object. It is an error 155// to specify an encryption key for any of the source objects. 156func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer { 157 return &Composer{dst: dst, srcs: srcs} 158} 159 160// A Composer composes source objects into a destination object. 161// 162// For Requester Pays buckets, the user project of dst is billed. 163type Composer struct { 164 // ObjectAttrs are optional attributes to set on the destination object. 165 // Any attributes must be initialized before any calls on the Composer. Nil 166 // or zero-valued attributes are ignored. 167 ObjectAttrs 168 169 // SendCRC specifies whether to transmit a CRC32C field. It should be set 170 // to true in addition to setting the Composer's CRC32C field, because zero 171 // is a valid CRC and normally a zero would not be transmitted. 172 // If a CRC32C is sent, and the data in the destination object does not match 173 // the checksum, the compose will be rejected. 174 SendCRC32C bool 175 176 dst *ObjectHandle 177 srcs []*ObjectHandle 178} 179 180// Run performs the compose operation. 181func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { 182 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Composer.Run") 183 defer func() { trace.EndSpan(ctx, err) }() 184 185 if err := c.dst.validate(); err != nil { 186 return nil, err 187 } 188 if len(c.srcs) == 0 { 189 return nil, errors.New("storage: at least one source object must be specified") 190 } 191 192 req := &raw.ComposeRequest{} 193 // Compose requires a non-empty Destination, so we always set it, 194 // even if the caller-provided ObjectAttrs is the zero value. 195 req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket) 196 if c.SendCRC32C { 197 req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C) 198 } 199 for _, src := range c.srcs { 200 if err := src.validate(); err != nil { 201 return nil, err 202 } 203 if src.bucket != c.dst.bucket { 204 return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket) 205 } 206 if src.encryptionKey != nil { 207 return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object) 208 } 209 srcObj := &raw.ComposeRequestSourceObjects{ 210 Name: src.object, 211 } 212 if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil { 213 return nil, err 214 } 215 req.SourceObjects = append(req.SourceObjects, srcObj) 216 } 217 218 call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx) 219 if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil { 220 return nil, err 221 } 222 if c.dst.userProject != "" { 223 call.UserProject(c.dst.userProject) 224 } 225 if c.PredefinedACL != "" { 226 call.DestinationPredefinedAcl(c.PredefinedACL) 227 } 228 if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { 229 return nil, err 230 } 231 var obj *raw.Object 232 setClientHeader(call.Header()) 233 err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) 234 if err != nil { 235 return nil, err 236 } 237 return newObject(obj), nil 238} 239