1package manager 2 3import ( 4 "io" 5 "sync" 6) 7 8// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the 9// SDK to accept an io.Reader that is not also an io.Seeker for unsigned 10// streaming payload API operations. 11// 12// A readSeekCloser wrapping an nonseekable io.Reader used in an API operation's 13// input will prevent that operation being retried in the case of 14// network errors, and cause operation requests to fail if yhe operation 15// requires payload signing. 16// 17// Note: If using with S3 PutObject to stream an object upload. The SDK's S3 18// Upload Manager(s3manager.Uploader) provides support for streaming 19// with the ability to retry network errors. 20func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser { 21 return &ReaderSeekerCloser{r} 22} 23 24// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and 25// io.Closer interfaces to the underlying object if they are available. 26type ReaderSeekerCloser struct { 27 r io.Reader 28} 29 30// seekerLen attempts to get the number of bytes remaining at the seeker's 31// current position. Returns the number of bytes remaining or error. 32func seekerLen(s io.Seeker) (int64, error) { 33 // Determine if the seeker is actually seekable. ReaderSeekerCloser 34 // hides the fact that a io.Readers might not actually be seekable. 35 switch v := s.(type) { 36 case *ReaderSeekerCloser: 37 return v.GetLen() 38 } 39 40 return computeSeekerLength(s) 41} 42 43// GetLen returns the length of the bytes remaining in the underlying reader. 44// Checks first for Len(), then io.Seeker to determine the size of the 45// underlying reader. 46// 47// Will return -1 if the length cannot be determined. 48func (r *ReaderSeekerCloser) GetLen() (int64, error) { 49 if l, ok := r.HasLen(); ok { 50 return int64(l), nil 51 } 52 53 if s, ok := r.r.(io.Seeker); ok { 54 return computeSeekerLength(s) 55 } 56 57 return -1, nil 58} 59 60func computeSeekerLength(s io.Seeker) (int64, error) { 61 curOffset, err := s.Seek(0, io.SeekCurrent) 62 if err != nil { 63 return 0, err 64 } 65 66 endOffset, err := s.Seek(0, io.SeekEnd) 67 if err != nil { 68 return 0, err 69 } 70 71 _, err = s.Seek(curOffset, io.SeekStart) 72 if err != nil { 73 return 0, err 74 } 75 76 return endOffset - curOffset, nil 77} 78 79// HasLen returns the length of the underlying reader if the value implements 80// the Len() int method. 81func (r *ReaderSeekerCloser) HasLen() (int, bool) { 82 type lenner interface { 83 Len() int 84 } 85 86 if lr, ok := r.r.(lenner); ok { 87 return lr.Len(), true 88 } 89 90 return 0, false 91} 92 93// Read reads from the reader up to size of p. The number of bytes read, and 94// error if it occurred will be returned. 95// 96// If the reader is not an io.Reader zero bytes read, and nil error will be 97// returned. 98// 99// Performs the same functionality as io.Reader Read 100func (r *ReaderSeekerCloser) Read(p []byte) (int, error) { 101 switch t := r.r.(type) { 102 case io.Reader: 103 return t.Read(p) 104 } 105 return 0, nil 106} 107 108// Seek sets the offset for the next Read to offset, interpreted according to 109// whence: 0 means relative to the origin of the file, 1 means relative to the 110// current offset, and 2 means relative to the end. Seek returns the new offset 111// and an error, if any. 112// 113// If the ReaderSeekerCloser is not an io.Seeker nothing will be done. 114func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) { 115 switch t := r.r.(type) { 116 case io.Seeker: 117 return t.Seek(offset, whence) 118 } 119 return int64(0), nil 120} 121 122// IsSeeker returns if the underlying reader is also a seeker. 123func (r *ReaderSeekerCloser) IsSeeker() bool { 124 _, ok := r.r.(io.Seeker) 125 return ok 126} 127 128// Close closes the ReaderSeekerCloser. 129// 130// If the ReaderSeekerCloser is not an io.Closer nothing will be done. 131func (r *ReaderSeekerCloser) Close() error { 132 switch t := r.r.(type) { 133 case io.Closer: 134 return t.Close() 135 } 136 return nil 137} 138 139// A WriteAtBuffer provides a in memory buffer supporting the io.WriterAt interface 140// Can be used with the s3manager.Downloader to download content to a buffer 141// in memory. Safe to use concurrently. 142type WriteAtBuffer struct { 143 buf []byte 144 m sync.Mutex 145 146 // GrowthCoeff defines the growth rate of the internal buffer. By 147 // default, the growth rate is 1, where expanding the internal 148 // buffer will allocate only enough capacity to fit the new expected 149 // length. 150 GrowthCoeff float64 151} 152 153// NewWriteAtBuffer creates a WriteAtBuffer with an internal buffer 154// provided by buf. 155func NewWriteAtBuffer(buf []byte) *WriteAtBuffer { 156 return &WriteAtBuffer{buf: buf} 157} 158 159// WriteAt writes a slice of bytes to a buffer starting at the position provided 160// The number of bytes written will be returned, or error. Can overwrite previous 161// written slices if the write ats overlap. 162func (b *WriteAtBuffer) WriteAt(p []byte, pos int64) (n int, err error) { 163 pLen := len(p) 164 expLen := pos + int64(pLen) 165 b.m.Lock() 166 defer b.m.Unlock() 167 if int64(len(b.buf)) < expLen { 168 if int64(cap(b.buf)) < expLen { 169 if b.GrowthCoeff < 1 { 170 b.GrowthCoeff = 1 171 } 172 newBuf := make([]byte, expLen, int64(b.GrowthCoeff*float64(expLen))) 173 copy(newBuf, b.buf) 174 b.buf = newBuf 175 } 176 b.buf = b.buf[:expLen] 177 } 178 copy(b.buf[pos:], p) 179 return pLen, nil 180} 181 182// Bytes returns a slice of bytes written to the buffer. 183func (b *WriteAtBuffer) Bytes() []byte { 184 b.m.Lock() 185 defer b.m.Unlock() 186 return b.buf 187} 188