1// Copyright 2014 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "context" 19 "encoding/base64" 20 "errors" 21 "fmt" 22 "io" 23 "sync" 24 "unicode/utf8" 25 26 "google.golang.org/api/googleapi" 27 raw "google.golang.org/api/storage/v1" 28) 29 30// A Writer writes a Cloud Storage object. 31type Writer struct { 32 // ObjectAttrs are optional attributes to set on the object. Any attributes 33 // must be initialized before the first Write call. Nil or zero-valued 34 // attributes are ignored. 35 ObjectAttrs 36 37 // SendCRC specifies whether to transmit a CRC32C field. It should be set 38 // to true in addition to setting the Writer's CRC32C field, because zero 39 // is a valid CRC and normally a zero would not be transmitted. 40 // If a CRC32C is sent, and the data written does not match the checksum, 41 // the write will be rejected. 42 SendCRC32C bool 43 44 // ChunkSize controls the maximum number of bytes of the object that the 45 // Writer will attempt to send to the server in a single request. Objects 46 // smaller than the size will be sent in a single request, while larger 47 // objects will be split over multiple requests. The size will be rounded up 48 // to the nearest multiple of 256K. If zero, chunking will be disabled and 49 // the object will be uploaded in a single request. 50 // 51 // ChunkSize will default to a reasonable value. If you perform many concurrent 52 // writes of small objects, you may wish set ChunkSize to a value that matches 53 // your objects' sizes to avoid consuming large amounts of memory. 54 // 55 // ChunkSize must be set before the first Write call. 56 ChunkSize int 57 58 // ProgressFunc can be used to monitor the progress of a large write. 59 // operation. If ProgressFunc is not nil and writing requires multiple 60 // calls to the underlying service (see 61 // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload), 62 // then ProgressFunc will be invoked after each call with the number of bytes of 63 // content copied so far. 64 // 65 // ProgressFunc should return quickly without blocking. 66 ProgressFunc func(int64) 67 68 ctx context.Context 69 o *ObjectHandle 70 71 opened bool 72 pw *io.PipeWriter 73 74 donec chan struct{} // closed after err and obj are set. 75 obj *ObjectAttrs 76 77 mu sync.Mutex 78 err error 79} 80 81func (w *Writer) open() error { 82 attrs := w.ObjectAttrs 83 // Check the developer didn't change the object Name (this is unfortunate, but 84 // we don't want to store an object under the wrong name). 85 if attrs.Name != w.o.object { 86 return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object) 87 } 88 if !utf8.ValidString(attrs.Name) { 89 return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name) 90 } 91 if attrs.KMSKeyName != "" && w.o.encryptionKey != nil { 92 return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key") 93 } 94 pr, pw := io.Pipe() 95 w.pw = pw 96 w.opened = true 97 98 go w.monitorCancel() 99 100 if w.ChunkSize < 0 { 101 return errors.New("storage: Writer.ChunkSize must be non-negative") 102 } 103 mediaOpts := []googleapi.MediaOption{ 104 googleapi.ChunkSize(w.ChunkSize), 105 } 106 if c := attrs.ContentType; c != "" { 107 mediaOpts = append(mediaOpts, googleapi.ContentType(c)) 108 } 109 110 go func() { 111 defer close(w.donec) 112 113 rawObj := attrs.toRawObject(w.o.bucket) 114 if w.SendCRC32C { 115 rawObj.Crc32c = encodeUint32(attrs.CRC32C) 116 } 117 if w.MD5 != nil { 118 rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5) 119 } 120 if w.o.c.envHost != "" { 121 w.o.c.raw.BasePath = fmt.Sprintf("%s://%s", w.o.c.scheme, w.o.c.envHost) 122 } 123 call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj). 124 Media(pr, mediaOpts...). 125 Projection("full"). 126 Context(w.ctx) 127 128 if w.ProgressFunc != nil { 129 call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) }) 130 } 131 if attrs.KMSKeyName != "" { 132 call.KmsKeyName(attrs.KMSKeyName) 133 } 134 if attrs.PredefinedACL != "" { 135 call.PredefinedAcl(attrs.PredefinedACL) 136 } 137 if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil { 138 w.mu.Lock() 139 w.err = err 140 w.mu.Unlock() 141 pr.CloseWithError(err) 142 return 143 } 144 var resp *raw.Object 145 err := applyConds("NewWriter", w.o.gen, w.o.conds, call) 146 if err == nil { 147 if w.o.userProject != "" { 148 call.UserProject(w.o.userProject) 149 } 150 setClientHeader(call.Header()) 151 152 // The internals that perform call.Do automatically retry 153 // uploading chunks, hence no need to add retries here. 154 // See issue https://github.com/googleapis/google-cloud-go/issues/1507. 155 // 156 // However, since this whole call's internals involve making the initial 157 // resumable upload session, the first HTTP request is not retried. 158 // TODO: Follow-up with google.golang.org/gensupport to solve 159 // https://github.com/googleapis/google-api-go-client/issues/392. 160 resp, err = call.Do() 161 } 162 if err != nil { 163 w.mu.Lock() 164 w.err = err 165 w.mu.Unlock() 166 pr.CloseWithError(err) 167 return 168 } 169 w.obj = newObject(resp) 170 }() 171 return nil 172} 173 174// Write appends to w. It implements the io.Writer interface. 175// 176// Since writes happen asynchronously, Write may return a nil 177// error even though the write failed (or will fail). Always 178// use the error returned from Writer.Close to determine if 179// the upload was successful. 180func (w *Writer) Write(p []byte) (n int, err error) { 181 w.mu.Lock() 182 werr := w.err 183 w.mu.Unlock() 184 if werr != nil { 185 return 0, werr 186 } 187 if !w.opened { 188 if err := w.open(); err != nil { 189 return 0, err 190 } 191 } 192 n, err = w.pw.Write(p) 193 if err != nil { 194 w.mu.Lock() 195 werr := w.err 196 w.mu.Unlock() 197 // Preserve existing functionality that when context is canceled, Write will return 198 // context.Canceled instead of "io: read/write on closed pipe". This hides the 199 // pipe implementation detail from users and makes Write seem as though it's an RPC. 200 if werr == context.Canceled || werr == context.DeadlineExceeded { 201 return n, werr 202 } 203 } 204 return n, err 205} 206 207// Close completes the write operation and flushes any buffered data. 208// If Close doesn't return an error, metadata about the written object 209// can be retrieved by calling Attrs. 210func (w *Writer) Close() error { 211 if !w.opened { 212 if err := w.open(); err != nil { 213 return err 214 } 215 } 216 217 // Closing either the read or write causes the entire pipe to close. 218 if err := w.pw.Close(); err != nil { 219 return err 220 } 221 222 <-w.donec 223 w.mu.Lock() 224 defer w.mu.Unlock() 225 return w.err 226} 227 228// monitorCancel is intended to be used as a background goroutine. It monitors the 229// context, and when it observes that the context has been canceled, it manually 230// closes things that do not take a context. 231func (w *Writer) monitorCancel() { 232 select { 233 case <-w.ctx.Done(): 234 w.mu.Lock() 235 werr := w.ctx.Err() 236 w.err = werr 237 w.mu.Unlock() 238 239 // Closing either the read or write causes the entire pipe to close. 240 w.CloseWithError(werr) 241 case <-w.donec: 242 } 243} 244 245// CloseWithError aborts the write operation with the provided error. 246// CloseWithError always returns nil. 247// 248// Deprecated: cancel the context passed to NewWriter instead. 249func (w *Writer) CloseWithError(err error) error { 250 if !w.opened { 251 return nil 252 } 253 return w.pw.CloseWithError(err) 254} 255 256// Attrs returns metadata about a successfully-written object. 257// It's only valid to call it after Close returns nil. 258func (w *Writer) Attrs() *ObjectAttrs { 259 return w.obj 260} 261