1package manager
2
3import (
4	"io"
5	"sync"
6)
7
8// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the
9// SDK to accept an io.Reader that is not also an io.Seeker for unsigned
10// streaming payload API operations.
11//
12// A readSeekCloser wrapping an nonseekable io.Reader used in an API operation's
13// input will prevent that operation being retried in the case of
14// network errors, and cause operation requests to fail if yhe operation
15// requires payload signing.
16//
17// Note: If using with S3 PutObject to stream an object upload. The SDK's S3
18// Upload Manager(s3manager.Uploader) provides support for streaming
19// with the ability to retry network errors.
20func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser {
21	return &ReaderSeekerCloser{r}
22}
23
// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and
// io.Closer interfaces to the underlying object if they are available.
type ReaderSeekerCloser struct {
	// r is the wrapped reader. The methods of this type inspect its dynamic
	// type to discover optional capabilities (io.Seeker, io.Closer, Len() int).
	r io.Reader
}
29
30// seekerLen attempts to get the number of bytes remaining at the seeker's
31// current position.  Returns the number of bytes remaining or error.
32func seekerLen(s io.Seeker) (int64, error) {
33	// Determine if the seeker is actually seekable. ReaderSeekerCloser
34	// hides the fact that a io.Readers might not actually be seekable.
35	switch v := s.(type) {
36	case *ReaderSeekerCloser:
37		return v.GetLen()
38	}
39
40	return computeSeekerLength(s)
41}
42
43// GetLen returns the length of the bytes remaining in the underlying reader.
44// Checks first for Len(), then io.Seeker to determine the size of the
45// underlying reader.
46//
47// Will return -1 if the length cannot be determined.
48func (r *ReaderSeekerCloser) GetLen() (int64, error) {
49	if l, ok := r.HasLen(); ok {
50		return int64(l), nil
51	}
52
53	if s, ok := r.r.(io.Seeker); ok {
54		return computeSeekerLength(s)
55	}
56
57	return -1, nil
58}
59
// computeSeekerLength returns the number of bytes between the seeker's current
// offset and its end. It records the current offset, seeks to the end to
// measure, then seeks back to the recorded offset. If any of the three seeks
// fails, that error is returned with a length of zero.
func computeSeekerLength(s io.Seeker) (int64, error) {
	start, err := s.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, err
	}

	end, err := s.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}

	// Put the seeker back where it was so the caller's next read is unaffected.
	if _, err = s.Seek(start, io.SeekStart); err != nil {
		return 0, err
	}

	remaining := end - start
	return remaining, nil
}
78
79// HasLen returns the length of the underlying reader if the value implements
80// the Len() int method.
81func (r *ReaderSeekerCloser) HasLen() (int, bool) {
82	type lenner interface {
83		Len() int
84	}
85
86	if lr, ok := r.r.(lenner); ok {
87		return lr.Len(), true
88	}
89
90	return 0, false
91}
92
93// Read reads from the reader up to size of p. The number of bytes read, and
94// error if it occurred will be returned.
95//
96// If the reader is not an io.Reader zero bytes read, and nil error will be
97// returned.
98//
99// Performs the same functionality as io.Reader Read
100func (r *ReaderSeekerCloser) Read(p []byte) (int, error) {
101	switch t := r.r.(type) {
102	case io.Reader:
103		return t.Read(p)
104	}
105	return 0, nil
106}
107
108// Seek sets the offset for the next Read to offset, interpreted according to
109// whence: 0 means relative to the origin of the file, 1 means relative to the
110// current offset, and 2 means relative to the end. Seek returns the new offset
111// and an error, if any.
112//
113// If the ReaderSeekerCloser is not an io.Seeker nothing will be done.
114func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) {
115	switch t := r.r.(type) {
116	case io.Seeker:
117		return t.Seek(offset, whence)
118	}
119	return int64(0), nil
120}
121
122// IsSeeker returns if the underlying reader is also a seeker.
123func (r *ReaderSeekerCloser) IsSeeker() bool {
124	_, ok := r.r.(io.Seeker)
125	return ok
126}
127
128// Close closes the ReaderSeekerCloser.
129//
130// If the ReaderSeekerCloser is not an io.Closer nothing will be done.
131func (r *ReaderSeekerCloser) Close() error {
132	switch t := r.r.(type) {
133	case io.Closer:
134		return t.Close()
135	}
136	return nil
137}
138
// A WriteAtBuffer provides an in-memory buffer supporting the io.WriterAt
// interface. Can be used with the s3manager.Downloader to download content to
// a buffer in memory. Safe to use concurrently.
type WriteAtBuffer struct {
	buf []byte
	// m guards buf, and GrowthCoeff while WriteAt normalizes it.
	m sync.Mutex

	// GrowthCoeff defines the growth rate of the internal buffer. By
	// default, the growth rate is 1, where expanding the internal
	// buffer will allocate only enough capacity to fit the new expected
	// length. Values below 1 (and NaN) are reset to 1 the first time the
	// buffer has to grow.
	GrowthCoeff float64
}

// NewWriteAtBuffer creates a WriteAtBuffer with an internal buffer
// provided by buf. The slice is used directly, not copied.
func NewWriteAtBuffer(buf []byte) *WriteAtBuffer {
	return &WriteAtBuffer{buf: buf}
}

// WriteAt writes the bytes in p to the buffer starting at position pos,
// growing the buffer as needed. Bytes between the previous end of the buffer
// and pos are zero. Writes that overlap earlier writes overwrite them.
//
// It returns len(p) and a nil error on success. A negative pos is rejected
// with a non-nil error and zero bytes written; the buffer is left untouched.
func (b *WriteAtBuffer) WriteAt(p []byte, pos int64) (n int, err error) {
	// A negative position can never be addressed. Report it as an error, as
	// the standard library's ReadAt/WriteAt implementations do, rather than
	// panicking on the slice expression further down.
	if pos < 0 {
		return 0, errNegativeWriteAtOffset
	}

	pLen := len(p)
	expLen := pos + int64(pLen)

	b.m.Lock()
	defer b.m.Unlock()

	if int64(len(b.buf)) < expLen {
		if int64(cap(b.buf)) < expLen {
			// Written as a negated >= so that a NaN coefficient is also
			// reset; NaN would otherwise produce an invalid capacity and
			// make the allocation below panic.
			if !(b.GrowthCoeff >= 1) {
				b.GrowthCoeff = 1
			}
			newBuf := make([]byte, expLen, int64(b.GrowthCoeff*float64(expLen)))
			copy(newBuf, b.buf)
			b.buf = newBuf
		}
		b.buf = b.buf[:expLen]
	}
	copy(b.buf[pos:], p)
	return pLen, nil
}

// Bytes returns a slice of bytes written to the buffer. The returned slice is
// the internal buffer itself, not a copy.
func (b *WriteAtBuffer) Bytes() []byte {
	b.m.Lock()
	defer b.m.Unlock()
	return b.buf
}

// writeAtBufferError is a string-backed error type, which lets the errors
// returned by WriteAtBuffer be declared as constants.
type writeAtBufferError string

// Error implements the error interface.
func (e writeAtBufferError) Error() string { return string(e) }

// errNegativeWriteAtOffset is returned by WriteAtBuffer.WriteAt when the
// requested position is negative.
const errNegativeWriteAtOffset writeAtBufferError = "WriteAtBuffer.WriteAt: negative offset"
188