1package swift 2 3import ( 4 "bufio" 5 "bytes" 6 "crypto/rand" 7 "crypto/sha1" 8 "encoding/hex" 9 "errors" 10 "fmt" 11 "io" 12 "os" 13 gopath "path" 14 "strconv" 15 "strings" 16 "time" 17) 18 19// NotLargeObject is returned if an operation is performed on an object which isn't large. 20var NotLargeObject = errors.New("Not a large object") 21 22// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded 23var readAfterWriteTimeout = 15 * time.Second 24 25// readAfterWriteWait defines the time to sleep between two retries 26var readAfterWriteWait = 200 * time.Millisecond 27 28// largeObjectCreateFile represents an open static or dynamic large object 29type largeObjectCreateFile struct { 30 conn *Connection 31 container string 32 objectName string 33 currentLength int64 34 filePos int64 35 chunkSize int64 36 segmentContainer string 37 prefix string 38 contentType string 39 checkHash bool 40 segments []Object 41 headers Headers 42 minChunkSize int64 43} 44 45func swiftSegmentPath(path string) (string, error) { 46 checksum := sha1.New() 47 random := make([]byte, 32) 48 if _, err := rand.Read(random); err != nil { 49 return "", err 50 } 51 path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...))) 52 return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil 53} 54 55func getSegment(segmentPath string, partNumber int) string { 56 return fmt.Sprintf("%s/%016d", segmentPath, partNumber) 57} 58 59func parseFullPath(manifest string) (container string, prefix string) { 60 components := strings.SplitN(manifest, "/", 2) 61 container = components[0] 62 if len(components) > 1 { 63 prefix = components[1] 64 } 65 return container, prefix 66} 67 68func (headers Headers) IsLargeObjectDLO() bool { 69 _, isDLO := headers["X-Object-Manifest"] 70 return isDLO 71} 72 73func (headers Headers) IsLargeObjectSLO() bool { 74 _, isSLO := headers["X-Static-Large-Object"] 75 return isSLO 76} 77 78func (headers Headers) IsLargeObject() bool { 79 return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO() 80} 81 82func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) { 83 if manifest, isDLO := headers["X-Object-Manifest"]; isDLO { 84 segmentContainer, segmentPath := parseFullPath(manifest) 85 segments, err := c.getAllDLOSegments(segmentContainer, segmentPath) 86 return segmentContainer, segments, err 87 } 88 if headers.IsLargeObjectSLO() { 89 return c.getAllSLOSegments(container, path) 90 } 91 return "", nil, NotLargeObject 92} 93 94// LargeObjectOpts describes how a large object should be created 95type LargeObjectOpts struct { 96 Container string // Name of container to place object 97 ObjectName string // Name of object 98 Flags int // Creation flags 99 CheckHash bool // If set Check the hash 100 Hash string // If set use this hash to check 101 ContentType string // Content-Type of the object 102 Headers Headers // Additional headers to upload the object with 103 ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set 104 MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info 105 SegmentContainer string // Name of the container to place segments 106 SegmentPrefix string // Prefix to use for the segments 107 NoBuffer bool // Prevents using a bufio.Writer to write segments 108} 109 110type LargeObjectFile interface { 111 io.Writer 112 io.Seeker 113 io.Closer 114 Size() int64 115 Flush() error 116} 117 118// largeObjectCreate creates a large object at opts.Container, opts.ObjectName. 119// 120// opts.Flags can have the following bits set 121// os.TRUNC - remove the contents of the large object if it exists 122// os.APPEND - write at the end of the large object 123func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) { 124 var ( 125 segmentPath string 126 segmentContainer string 127 segments []Object 128 currentLength int64 129 err error 130 ) 131 132 if opts.SegmentPrefix != "" { 133 segmentPath = opts.SegmentPrefix 134 } else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil { 135 return nil, err 136 } 137 138 if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil { 139 if opts.Flags&os.O_TRUNC != 0 { 140 c.LargeObjectDelete(opts.Container, opts.ObjectName) 141 } else { 142 currentLength = info.Bytes 143 if headers.IsLargeObject() { 144 segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers) 145 if err != nil { 146 return nil, err 147 } 148 if len(segments) > 0 { 149 segmentPath = gopath.Dir(segments[0].Name) 150 } 151 } else { 152 if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil { 153 return nil, err 154 } 155 segments = append(segments, info) 156 } 157 } 158 } else if err != ObjectNotFound { 159 return nil, err 160 } 161 162 // segmentContainer is not empty when the manifest already existed 163 if segmentContainer == "" { 164 if opts.SegmentContainer != "" { 165 segmentContainer = opts.SegmentContainer 166 } else { 167 segmentContainer = opts.Container + "_segments" 168 } 169 } 170 171 file := &largeObjectCreateFile{ 172 conn: c, 173 checkHash: opts.CheckHash, 174 container: opts.Container, 175 objectName: opts.ObjectName, 176 chunkSize: opts.ChunkSize, 177 minChunkSize: opts.MinChunkSize, 178 headers: opts.Headers, 179 segmentContainer: segmentContainer, 180 prefix: segmentPath, 181 segments: segments, 182 currentLength: currentLength, 183 } 184 185 if file.chunkSize == 0 { 186 file.chunkSize = 10 * 1024 * 1024 187 } 188 189 if file.minChunkSize > file.chunkSize { 190 file.chunkSize = file.minChunkSize 191 } 192 193 if opts.Flags&os.O_APPEND != 0 { 194 file.filePos = currentLength 195 } 196 197 return file, nil 198} 199 200// LargeObjectDelete deletes the large object named by container, path 201func (c *Connection) LargeObjectDelete(container string, objectName string) error { 202 _, headers, err := c.Object(container, objectName) 203 if err != nil { 204 return err 205 } 206 207 var objects [][]string 208 if headers.IsLargeObject() { 209 segmentContainer, segments, err := c.getAllSegments(container, objectName, headers) 210 if err != nil { 211 return err 212 } 213 for _, obj := range segments { 214 objects = append(objects, []string{segmentContainer, obj.Name}) 215 } 216 } 217 objects = append(objects, []string{container, objectName}) 218 219 info, err := c.cachedQueryInfo() 220 if err == nil && info.SupportsBulkDelete() && len(objects) > 0 { 221 filenames := make([]string, len(objects)) 222 for i, obj := range objects { 223 filenames[i] = obj[0] + "/" + obj[1] 224 } 225 _, err = c.doBulkDelete(filenames) 226 // Don't fail on ObjectNotFound because eventual consistency 227 // makes this situation normal. 228 if err != nil && err != Forbidden && err != ObjectNotFound { 229 return err 230 } 231 } else { 232 for _, obj := range objects { 233 if err := c.ObjectDelete(obj[0], obj[1]); err != nil { 234 return err 235 } 236 } 237 } 238 239 return nil 240} 241 242// LargeObjectGetSegments returns all the segments that compose an object 243// If the object is a Dynamic Large Object (DLO), it just returns the objects 244// that have the prefix as indicated by the manifest. 245// If the object is a Static Large Object (SLO), it retrieves the JSON content 246// of the manifest and return all the segments of it. 247func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) { 248 _, headers, err := c.Object(container, path) 249 if err != nil { 250 return "", nil, err 251 } 252 253 return c.getAllSegments(container, path, headers) 254} 255 256// Seek sets the offset for the next write operation 257func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) { 258 switch whence { 259 case 0: 260 file.filePos = offset 261 case 1: 262 file.filePos += offset 263 case 2: 264 file.filePos = file.currentLength + offset 265 default: 266 return -1, fmt.Errorf("invalid value for whence") 267 } 268 if file.filePos < 0 { 269 return -1, fmt.Errorf("negative offset") 270 } 271 return file.filePos, nil 272} 273 274func (file *largeObjectCreateFile) Size() int64 { 275 return file.currentLength 276} 277 278func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) { 279 endTimer := time.NewTimer(readAfterWriteTimeout) 280 defer endTimer.Stop() 281 waitingTime := readAfterWriteWait 282 for { 283 var headers Headers 284 var sz int64 285 if headers, sz, err = fn(); err == nil { 286 if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz { 287 return 288 } 289 } else { 290 return 291 } 292 waitTimer := time.NewTimer(waitingTime) 293 select { 294 case <-endTimer.C: 295 waitTimer.Stop() 296 err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz) 297 return 298 case <-waitTimer.C: 299 waitingTime *= 2 300 } 301 } 302} 303 304func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) { 305 err = withLORetry(expectedSize, func() (Headers, int64, error) { 306 var info Object 307 var headers Headers 308 info, headers, err = c.objectBase(container, objectName) 309 if err != nil { 310 return headers, 0, err 311 } 312 return headers, info.Bytes, nil 313 }) 314 return 315} 316 317// Write satisfies the io.Writer interface 318func (file *largeObjectCreateFile) Write(buf []byte) (int, error) { 319 var sz int64 320 var relativeFilePos int 321 writeSegmentIdx := 0 322 for i, obj := range file.segments { 323 if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) { 324 relativeFilePos = int(file.filePos - sz) 325 break 326 } 327 writeSegmentIdx++ 328 sz += obj.Bytes 329 } 330 sizeToWrite := len(buf) 331 for offset := 0; offset < sizeToWrite; { 332 newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos) 333 if err != nil { 334 return 0, err 335 } 336 if writeSegmentIdx < len(file.segments) { 337 file.segments[writeSegmentIdx] = *newSegment 338 } else { 339 file.segments = append(file.segments, *newSegment) 340 } 341 offset += n 342 writeSegmentIdx++ 343 relativeFilePos = 0 344 } 345 file.filePos += int64(sizeToWrite) 346 file.currentLength = 0 347 for _, obj := range file.segments { 348 file.currentLength += obj.Bytes 349 } 350 return sizeToWrite, nil 351} 352 353func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) { 354 var ( 355 readers []io.Reader 356 existingSegment *Object 357 segmentSize int 358 ) 359 segmentName := getSegment(file.prefix, writeSegmentIdx+1) 360 sizeToRead := int(file.chunkSize) 361 if writeSegmentIdx < len(file.segments) { 362 existingSegment = &file.segments[writeSegmentIdx] 363 if writeSegmentIdx != len(file.segments)-1 { 364 sizeToRead = int(existingSegment.Bytes) 365 } 366 if relativeFilePos > 0 { 367 headers := make(Headers) 368 headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10) 369 existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers) 370 if err != nil { 371 return nil, 0, err 372 } 373 defer existingSegmentReader.Close() 374 sizeToRead -= relativeFilePos 375 segmentSize += relativeFilePos 376 readers = []io.Reader{existingSegmentReader} 377 } 378 } 379 if sizeToRead > len(buf) { 380 sizeToRead = len(buf) 381 } 382 segmentSize += sizeToRead 383 readers = append(readers, bytes.NewReader(buf[:sizeToRead])) 384 if existingSegment != nil && segmentSize < int(existingSegment.Bytes) { 385 headers := make(Headers) 386 headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-" 387 tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers) 388 if err != nil { 389 return nil, 0, err 390 } 391 defer tailSegmentReader.Close() 392 segmentSize = int(existingSegment.Bytes) 393 readers = append(readers, tailSegmentReader) 394 } 395 segmentReader := io.MultiReader(readers...) 396 headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil) 397 if err != nil { 398 return nil, 0, err 399 } 400 return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil 401} 402 403func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile { 404 if !opts.NoBuffer { 405 return &bufferedLargeObjectFile{ 406 LargeObjectFile: lo, 407 bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)), 408 } 409 } 410 return lo 411} 412 413type bufferedLargeObjectFile struct { 414 LargeObjectFile 415 bw *bufio.Writer 416} 417 418func (blo *bufferedLargeObjectFile) Close() error { 419 err := blo.bw.Flush() 420 if err != nil { 421 return err 422 } 423 return blo.LargeObjectFile.Close() 424} 425 426func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) { 427 return blo.bw.Write(p) 428} 429 430func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) { 431 err := blo.bw.Flush() 432 if err != nil { 433 return 0, err 434 } 435 return blo.LargeObjectFile.Seek(offset, whence) 436} 437 438func (blo *bufferedLargeObjectFile) Size() int64 { 439 return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered()) 440} 441 442func (blo *bufferedLargeObjectFile) Flush() error { 443 err := blo.bw.Flush() 444 if err != nil { 445 return err 446 } 447 return blo.LargeObjectFile.Flush() 448} 449