package swift

import (
	"bufio"
	"bytes"
	"crypto/rand"
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	gopath "path"
	"strconv"
	"strings"
	"time"
)

// NotLargeObject is returned if an operation is performed on an object which isn't large.
var NotLargeObject = errors.New("Not a large object")

// readAfterWriteTimeout defines how long we wait for an object to become visible after it has been uploaded
var readAfterWriteTimeout = 15 * time.Second

// readAfterWriteWait defines the time to sleep between two retries
var readAfterWriteWait = 200 * time.Millisecond

// largeObjectCreateFile represents an open static or dynamic large object
type largeObjectCreateFile struct {
	conn             *Connection
	container        string
	objectName       string
	currentLength    int64
	filePos          int64
	chunkSize        int64
	segmentContainer string
	prefix           string
	contentType      string
	checkHash        bool
	segments         []Object
	headers          Headers
	minChunkSize     int64
}

// swiftSegmentPath returns a unique, randomised path under "segments/"
// derived from the given object path.
func swiftSegmentPath(path string) (string, error) {
	checksum := sha1.New()
	random := make([]byte, 32)
	if _, err := rand.Read(random); err != nil {
		return "", err
	}
	path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
	return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
}

// getSegment returns the name of segment number partNumber below segmentPath.
func getSegment(segmentPath string, partNumber int) string {
	return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
}

// parseFullPath splits a manifest value of the form "container/prefix" into
// its container and prefix components.
func parseFullPath(manifest string) (container string, prefix string) {
	components := strings.SplitN(manifest, "/", 2)
	container = components[0]
	if len(components) > 1 {
		prefix = components[1]
	}
	return container, prefix
}

// IsLargeObjectDLO returns true if the headers are for a Dynamic Large Object (DLO)
func (headers Headers) IsLargeObjectDLO() bool {
	_, isDLO := headers["X-Object-Manifest"]
	return isDLO
}

// IsLargeObjectSLO returns true if the headers are for a Static Large Object (SLO)
func (headers Headers) IsLargeObjectSLO() bool {
	_, isSLO := headers["X-Static-Large-Object"]
	return isSLO
}

// IsLargeObject returns true if the headers are for either kind of large object
func (headers Headers) IsLargeObject() bool {
	return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
}

// getAllSegments returns the segment container and the list of segments for a
// large object, or NotLargeObject if the headers do not describe one.
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
	if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
		segmentContainer, segmentPath := parseFullPath(manifest)
		segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
		return segmentContainer, segments, err
	}
	if headers.IsLargeObjectSLO() {
		return c.getAllSLOSegments(container, path)
	}
	return "", nil, NotLargeObject
}

// LargeObjectOpts describes how a large object should be created
type LargeObjectOpts struct {
	Container        string  // Name of container to place object
	ObjectName       string  // Name of object
	Flags            int     // Creation flags
	CheckHash        bool    // If set, check the hash
	Hash             string  // If set, use this hash to check
	ContentType      string  // Content-Type of the object
	Headers          Headers // Additional headers to upload the object with
	ChunkSize        int64   // Size of chunks of the object, defaults to 10MB if not set
	MinChunkSize     int64   // Minimum chunk size, automatically set for SLOs based on info
	SegmentContainer string  // Name of the container to place segments
	SegmentPrefix    string  // Prefix to use for the segments
	NoBuffer         bool    // Prevents using a bufio.Writer to write segments
}

// LargeObjectFile is a large object open for writing
type LargeObjectFile interface {
	io.Writer
	io.Seeker
	io.Closer
	Size() int64
	Flush() error
}
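
// The sketch below is illustrative only: it shows how LargeObjectOpts is
// typically filled in and how the resulting LargeObjectFile is used. It
// assumes the DynamicLargeObjectCreate constructor defined elsewhere in this
// package; the container and object names are purely hypothetical.
func exampleLargeObjectUpload(c *Connection, src io.Reader) error {
	opts := LargeObjectOpts{
		Container:   "videos",         // hypothetical container
		ObjectName:  "movie.mp4",      // hypothetical object name
		ContentType: "video/mp4",
		ChunkSize:   64 * 1024 * 1024, // 64 MiB segments instead of the 10 MiB default
	}
	out, err := c.DynamicLargeObjectCreate(&opts)
	if err != nil {
		return err
	}
	// Segments are created transparently as the write position crosses
	// chunk boundaries.
	if _, err := io.Copy(out, src); err != nil {
		out.Close()
		return err
	}
	// Close flushes buffered data and, in the DLO/SLO wrappers, uploads
	// the manifest.
	return out.Close()
}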

// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
//
// opts.Flags can have the following bits set
//   os.O_TRUNC  - remove the contents of the large object if it exists
//   os.O_APPEND - write at the end of the large object
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
	var (
		segmentPath      string
		segmentContainer string
		segments         []Object
		currentLength    int64
		err              error
	)

	if opts.SegmentPrefix != "" {
		segmentPath = opts.SegmentPrefix
	} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
		return nil, err
	}

	if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
		if opts.Flags&os.O_TRUNC != 0 {
			c.LargeObjectDelete(opts.Container, opts.ObjectName)
		} else {
			currentLength = info.Bytes
			if headers.IsLargeObject() {
				segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
				if err != nil {
					return nil, err
				}
				if len(segments) > 0 {
					segmentPath = gopath.Dir(segments[0].Name)
				}
			} else {
				if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
					return nil, err
				}
				segments = append(segments, info)
			}
		}
	} else if err != ObjectNotFound {
		return nil, err
	}

	// segmentContainer is not empty when the manifest already existed
	if segmentContainer == "" {
		if opts.SegmentContainer != "" {
			segmentContainer = opts.SegmentContainer
		} else {
			segmentContainer = opts.Container + "_segments"
		}
	}

	file := &largeObjectCreateFile{
		conn:             c,
		checkHash:        opts.CheckHash,
		container:        opts.Container,
		objectName:       opts.ObjectName,
		chunkSize:        opts.ChunkSize,
		minChunkSize:     opts.MinChunkSize,
		headers:          opts.Headers,
		segmentContainer: segmentContainer,
		prefix:           segmentPath,
		segments:         segments,
		currentLength:    currentLength,
	}

	if file.chunkSize == 0 {
		file.chunkSize = 10 * 1024 * 1024
	}

	if file.minChunkSize > file.chunkSize {
		file.chunkSize = file.minChunkSize
	}

	if opts.Flags&os.O_APPEND != 0 {
		file.filePos = currentLength
	}

	return file, nil
}
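
// Illustrative sketch only: appending to an existing large object via the
// os.O_APPEND flag handled by largeObjectCreate above. It assumes the
// DynamicLargeObjectCreateFile constructor defined elsewhere in this package,
// which honours opts.Flags; the container and object names are hypothetical.
func exampleLargeObjectAppend(c *Connection, extra []byte) error {
	opts := LargeObjectOpts{
		Container:  "videos",    // hypothetical container
		ObjectName: "movie.mp4", // hypothetical object name
		Flags:      os.O_APPEND, // start writing at the current end of the object
	}
	out, err := c.DynamicLargeObjectCreateFile(&opts)
	if err != nil {
		return err
	}
	if _, err := out.Write(extra); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}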

// LargeObjectDelete deletes the large object named by container, objectName,
// including all of its segments.
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
	_, headers, err := c.Object(container, objectName)
	if err != nil {
		return err
	}

	var objects [][]string
	if headers.IsLargeObject() {
		segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
		if err != nil {
			return err
		}
		for _, obj := range segments {
			objects = append(objects, []string{segmentContainer, obj.Name})
		}
	}
	objects = append(objects, []string{container, objectName})

	info, err := c.cachedQueryInfo()
	if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
		filenames := make([]string, len(objects))
		for i, obj := range objects {
			filenames[i] = obj[0] + "/" + obj[1]
		}
		_, err = c.doBulkDelete(filenames)
		// Don't fail on ObjectNotFound because eventual consistency
		// makes this situation normal.
		if err != nil && err != Forbidden && err != ObjectNotFound {
			return err
		}
	} else {
		for _, obj := range objects {
			if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
				return err
			}
		}
	}

	return nil
}

// LargeObjectGetSegments returns all the segments that compose an object.
//
// If the object is a Dynamic Large Object (DLO), it simply returns the objects
// that have the prefix indicated by the manifest.
// If the object is a Static Large Object (SLO), it retrieves the JSON content
// of the manifest and returns all of its segments.
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
	_, headers, err := c.Object(container, path)
	if err != nil {
		return "", nil, err
	}

	return c.getAllSegments(container, path, headers)
}
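
// Illustrative sketch only: listing the segments that make up a large object
// with LargeObjectGetSegments above. The container and object names are
// hypothetical.
func exampleListSegments(c *Connection) error {
	segmentContainer, segments, err := c.LargeObjectGetSegments("videos", "movie.mp4")
	if err != nil {
		return err
	}
	for _, segment := range segments {
		fmt.Printf("%s/%s: %d bytes (etag %s)\n", segmentContainer, segment.Name, segment.Bytes, segment.Hash)
	}
	return nil
}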

// Seek sets the offset for the next write operation
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		file.filePos = offset
	case io.SeekCurrent:
		file.filePos += offset
	case io.SeekEnd:
		file.filePos = file.currentLength + offset
	default:
		return -1, fmt.Errorf("invalid value for whence")
	}
	if file.filePos < 0 {
		return -1, fmt.Errorf("negative offset")
	}
	return file.filePos, nil
}

// Size returns the current length of the large object
func (file *largeObjectCreateFile) Size() int64 {
	return file.currentLength
}

// withLORetry calls fn until the returned headers no longer describe a DLO or
// the reported size matches expectedSize, retrying with exponential backoff
// (starting at readAfterWriteWait) and giving up after readAfterWriteTimeout.
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
	endTimer := time.NewTimer(readAfterWriteTimeout)
	defer endTimer.Stop()
	waitingTime := readAfterWriteWait
	for {
		var headers Headers
		var sz int64
		if headers, sz, err = fn(); err == nil {
			if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
				return
			}
		} else {
			return
		}
		waitTimer := time.NewTimer(waitingTime)
		select {
		case <-endTimer.C:
			waitTimer.Stop()
			err = fmt.Errorf("timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
			return
		case <-waitTimer.C:
			waitingTime *= 2
		}
	}
}

// waitForSegmentsToShowUp polls the object until its reported size matches
// expectedSize, allowing for the eventual consistency of segment listings.
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
	err = withLORetry(expectedSize, func() (Headers, int64, error) {
		var info Object
		var headers Headers
		info, headers, err = c.objectBase(container, objectName)
		if err != nil {
			return headers, 0, err
		}
		return headers, info.Bytes, nil
	})
	return
}

// Write satisfies the io.Writer interface
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
	// Find the segment that contains the current file position and the
	// offset of that position within it. The last segment also catches
	// positions within minChunkSize of its start, so it is extended
	// rather than an undersized new segment being created.
	var sz int64
	var relativeFilePos int
	writeSegmentIdx := 0
	for i, obj := range file.segments {
		if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
			relativeFilePos = int(file.filePos - sz)
			break
		}
		writeSegmentIdx++
		sz += obj.Bytes
	}
	// Write the buffer out segment by segment, overwriting existing
	// segments in place and appending new ones as needed.
	sizeToWrite := len(buf)
	for offset := 0; offset < sizeToWrite; {
		newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
		if err != nil {
			return 0, err
		}
		if writeSegmentIdx < len(file.segments) {
			file.segments[writeSegmentIdx] = *newSegment
		} else {
			file.segments = append(file.segments, *newSegment)
		}
		offset += n
		writeSegmentIdx++
		relativeFilePos = 0
	}
	file.filePos += int64(sizeToWrite)
	// Recompute the total length from the (possibly updated) segment sizes.
	file.currentLength = 0
	for _, obj := range file.segments {
		file.currentLength += obj.Bytes
	}
	return sizeToWrite, nil
}

// writeSegment writes buf into segment number writeSegmentIdx+1 starting at
// relativeFilePos, preserving any existing data before and after the range
// being overwritten. It returns the resulting segment and the number of bytes
// of buf consumed.
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
	var (
		readers         []io.Reader
		existingSegment *Object
		segmentSize     int
	)
	segmentName := getSegment(file.prefix, writeSegmentIdx+1)
	sizeToRead := int(file.chunkSize)
	if writeSegmentIdx < len(file.segments) {
		existingSegment = &file.segments[writeSegmentIdx]
		if writeSegmentIdx != len(file.segments)-1 {
			sizeToRead = int(existingSegment.Bytes)
		}
		if relativeFilePos > 0 {
			// Keep the existing data that precedes the write position.
			headers := make(Headers)
			headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
			existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
			if err != nil {
				return nil, 0, err
			}
			defer existingSegmentReader.Close()
			sizeToRead -= relativeFilePos
			segmentSize += relativeFilePos
			readers = []io.Reader{existingSegmentReader}
		}
	}
	if sizeToRead > len(buf) {
		sizeToRead = len(buf)
	}
	segmentSize += sizeToRead
	readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
	if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
		// Keep the existing data that follows the range being overwritten.
		headers := make(Headers)
		headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
		tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
		if err != nil {
			return nil, 0, err
		}
		defer tailSegmentReader.Close()
		segmentSize = int(existingSegment.Bytes)
		readers = append(readers, tailSegmentReader)
	}
	segmentReader := io.MultiReader(readers...)
	headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
	if err != nil {
		return nil, 0, err
	}
	return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
}

// withBuffer wraps lo in a bufio.Writer of opts.ChunkSize bytes unless
// opts.NoBuffer is set, so that small writes are coalesced into whole segments.
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
	if !opts.NoBuffer {
		return &bufferedLargeObjectFile{
			LargeObjectFile: lo,
			bw:              bufio.NewWriterSize(lo, int(opts.ChunkSize)),
		}
	}
	return lo
}

// bufferedLargeObjectFile wraps a LargeObjectFile in a bufio.Writer so that
// small writes are batched; Seek, Flush and Close flush the buffer first, and
// Size accounts for bytes still buffered.
type bufferedLargeObjectFile struct {
	LargeObjectFile
	bw *bufio.Writer
}

func (blo *bufferedLargeObjectFile) Close() error {
	err := blo.bw.Flush()
	if err != nil {
		return err
	}
	return blo.LargeObjectFile.Close()
}

func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
	return blo.bw.Write(p)
}

func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
	err := blo.bw.Flush()
	if err != nil {
		return 0, err
	}
	return blo.LargeObjectFile.Seek(offset, whence)
}

func (blo *bufferedLargeObjectFile) Size() int64 {
	return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
}

func (blo *bufferedLargeObjectFile) Flush() error {
	err := blo.bw.Flush()
	if err != nil {
		return err
	}
	return blo.LargeObjectFile.Flush()
}