package azblob

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"io"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
)

// CommonResponse returns the headers common to all blob REST API responses.
type CommonResponse interface {
	// ETag returns the value for header ETag.
	ETag() ETag

	// LastModified returns the value for header Last-Modified.
	LastModified() time.Time

	// RequestID returns the value for header x-ms-request-id.
	RequestID() string

	// Date returns the value for header Date.
	Date() time.Time

	// Version returns the value for header x-ms-version.
	Version() string

	// Response returns the raw HTTP response object.
	Response() *http.Response
}
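
// The helper below is a minimal sketch (illustrative only, not part of the
// public API) of how the CommonResponse headers are typically consumed after
// an upload completes:
func exampleCommonResponse(resp CommonResponse) {
	_ = resp.ETag()         // e.g. feed into If-Match access conditions on a later update
	_ = resp.LastModified() // time.Time of the blob's last modification
	_ = resp.RequestID()    // x-ms-request-id, useful when filing support requests
}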

// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions.
type UploadToBlockBlobOptions struct {
	// BlockSize specifies the block size to use; the maximum size is BlockBlobMaxStageBlockBytes.
	// If zero, a block size is computed from the total transfer size.
	BlockSize int64

	// Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
	// Note that the progress reporting is not always increasing; it can go down when retrying a request.
	Progress pipeline.ProgressReceiver

	// BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
	BlobHTTPHeaders BlobHTTPHeaders

	// Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
	Metadata Metadata

	// AccessConditions indicates the access conditions for the block blob.
	AccessConditions BlobAccessConditions

	// BlobAccessTier indicates the access tier of the blob.
	BlobAccessTier AccessTierType

	// BlobTagsMap indicates the tags to be associated with the blob.
	BlobTagsMap BlobTagsMap

	// ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data.
	ClientProvidedKeyOptions ClientProvidedKeyOptions

	// Parallelism indicates the maximum number of blocks to upload in parallel (0=default).
	Parallelism uint16
}
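
// A minimal sketch (illustrative only, not part of the public API) of
// populating UploadToBlockBlobOptions; the specific values here are
// placeholders, not recommendations:
func exampleUploadToBlockBlobOptions() UploadToBlockBlobOptions {
	return UploadToBlockBlobOptions{
		BlockSize:   4 * _1MiB, // stage 4 MiB blocks
		Parallelism: 16,        // up to 16 concurrent StageBlock calls
		Progress: func(bytesTransferred int64) {
			// Called as bytes are sent; the total may regress when a block is retried.
		},
	}
}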

// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
	blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
	if o.BlockSize == 0 {
		// If readerSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
		if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
			return nil, errors.New("buffer is too large to upload to a block blob")
		}
		// If readerSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
		if readerSize <= BlockBlobMaxUploadBlobBytes {
			o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
		} else {
			o.BlockSize = readerSize / BlockBlobMaxBlocks   // buffer / max blocks = block size to use all 50,000 blocks
			if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
				o.BlockSize = BlobDefaultDownloadBlockSize
			}
			// StageBlock will be called with blocks of o.BlockSize bytes and a parallelism of o.Parallelism.
		}
	}

	if readerSize <= BlockBlobMaxUploadBlobBytes {
		// If the size can fit in 1 Upload call, do it this way
		var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
		if o.Progress != nil {
			body = pipeline.NewRequestBodyProgress(body, o.Progress)
		}
		return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
	}

	var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)

	blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
	progress := int64(0)
	progressLock := &sync.Mutex{}

	err := DoBatchTransfer(ctx, BatchTransferOptions{
		OperationName: "uploadReaderAtToBlockBlob",
		TransferSize:  readerSize,
		ChunkSize:     o.BlockSize,
		Parallelism:   o.Parallelism,
		Operation: func(offset int64, count int64, ctx context.Context) error {
			// This function is called once per block.
			// It is passed this block's offset within the buffer and its count of bytes.
			// Prepare to read the proper block/section of the buffer.
			var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
			blockNum := offset / o.BlockSize
			if o.Progress != nil {
				blockProgress := int64(0)
				body = pipeline.NewRequestBodyProgress(body,
					func(bytesTransferred int64) {
						diff := bytesTransferred - blockProgress
						blockProgress = bytesTransferred
						progressLock.Lock() // 1 goroutine at a time gets a progress report
						progress += diff
						o.Progress(progress)
						progressLock.Unlock()
					})
			}

			// Block IDs are unique values to avoid issues if 2+ clients are uploading blocks
			// at the same time, causing PutBlockList to get a mix of blocks from all the clients.
			blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
			_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
			return err
		},
	})
	if err != nil {
		return nil, err
	}
	// All put blocks were successful, call Put Block List to finalize the blob
	return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
	blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
	return uploadReaderAtToBlockBlob(ctx, bytes.NewReader(b), int64(len(b)), blockBlobURL, o)
}
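
// A minimal usage sketch (illustrative only): upload an in-memory payload,
// assuming blockBlobURL was built elsewhere with NewBlockBlobURL and a pipeline:
func exampleUploadBuffer(ctx context.Context, blockBlobURL BlockBlobURL, data []byte) error {
	_, err := UploadBufferToBlockBlob(ctx, data, blockBlobURL, UploadToBlockBlobOptions{
		BlockSize:   4 * _1MiB,
		Parallelism: 16,
	})
	return err
}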

// UploadFileToBlockBlob uploads a file in blocks to a block blob.
func UploadFileToBlockBlob(ctx context.Context, file *os.File,
	blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {

	stat, err := file.Stat()
	if err != nil {
		return nil, err
	}
	return uploadReaderAtToBlockBlob(ctx, file, stat.Size(), blockBlobURL, o)
}
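
// A minimal usage sketch (illustrative only): open a local file and hand it to
// UploadFileToBlockBlob, which sizes blocks automatically when BlockSize is zero:
func exampleUploadFile(ctx context.Context, blockBlobURL BlockBlobURL, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = UploadFileToBlockBlob(ctx, f, blockBlobURL, UploadToBlockBlobOptions{})
	return err
}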

///////////////////////////////////////////////////////////////////////////////

const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB

// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions.
type DownloadFromBlobOptions struct {
	// BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize.
	BlockSize int64

	// Progress is a function that is invoked periodically as bytes are received.
	Progress pipeline.ProgressReceiver

	// AccessConditions indicates the access conditions used when making HTTP GET requests against the blob.
	AccessConditions BlobAccessConditions

	// ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data.
	ClientProvidedKeyOptions ClientProvidedKeyOptions

	// Parallelism indicates the maximum number of blocks to download in parallel (0=default).
	Parallelism uint16

	// RetryReaderOptionsPerBlock is used when downloading each block.
	RetryReaderOptionsPerBlock RetryReaderOptions
}

// downloadBlobToWriterAt downloads an Azure blob to an io.WriterAt in parallel.
func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64,
	writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
	if o.BlockSize == 0 {
		o.BlockSize = BlobDefaultDownloadBlockSize
	}

	if count == CountToEnd { // If size not specified, calculate it
		if initialDownloadResponse != nil {
			count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
		} else {
			// If we don't have the length at all, get it
			dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false, o.ClientProvidedKeyOptions)
			if err != nil {
				return err
			}
			count = dr.ContentLength() - offset
		}
	}

	if count <= 0 {
		// The file is empty; there is nothing to download.
		return nil
	}

	// Prepare and do parallel download.
	progress := int64(0)
	progressLock := &sync.Mutex{}

	err := DoBatchTransfer(ctx, BatchTransferOptions{
		OperationName: "downloadBlobToWriterAt",
		TransferSize:  count,
		ChunkSize:     o.BlockSize,
		Parallelism:   o.Parallelism,
		Operation: func(chunkStart int64, count int64, ctx context.Context) error {
			dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions)
			if err != nil {
				return err
			}
			body := dr.Body(o.RetryReaderOptionsPerBlock)
			if o.Progress != nil {
				rangeProgress := int64(0)
				body = pipeline.NewResponseBodyProgress(
					body,
					func(bytesTransferred int64) {
						diff := bytesTransferred - rangeProgress
						rangeProgress = bytesTransferred
						progressLock.Lock()
						progress += diff
						o.Progress(progress)
						progressLock.Unlock()
					})
			}
			_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
			body.Close()
			return err
		},
	})
	if err != nil {
		return err
	}
	return nil
}

// DownloadBlobToBuffer downloads an Azure blob to a buffer in parallel.
// Offset and count are optional; pass 0 for both to download the entire blob.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
	b []byte, o DownloadFromBlobOptions) error {
	return downloadBlobToWriterAt(ctx, blobURL, offset, count, newBytesWriter(b), o, nil)
}
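
// A minimal usage sketch (illustrative only): size a buffer from the blob's
// properties, then download the whole blob into it in parallel:
func exampleDownloadToBuffer(ctx context.Context, blobURL BlobURL) ([]byte, error) {
	props, err := blobURL.GetProperties(ctx, BlobAccessConditions{}, ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}
	b := make([]byte, props.ContentLength())
	err = DownloadBlobToBuffer(ctx, blobURL, 0, CountToEnd, b, DownloadFromBlobOptions{})
	return b, err
}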

// DownloadBlobToFile downloads an Azure blob to a local file.
// The file is truncated if its size doesn't match the download size.
// Offset and count are optional; pass 0 for both to download the entire blob.
func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64,
	file *os.File, o DownloadFromBlobOptions) error {
	// 1. Calculate the size of the destination file
	var size int64

	if count == CountToEnd {
		// Try to get Azure blob's size
		props, err := blobURL.GetProperties(ctx, o.AccessConditions, o.ClientProvidedKeyOptions)
		if err != nil {
			return err
		}
		size = props.ContentLength() - offset
	} else {
		size = count
	}

	// 2. Compare and try to resize local file's size if it doesn't match Azure blob's size.
	stat, err := file.Stat()
	if err != nil {
		return err
	}
	if stat.Size() != size {
		if err = file.Truncate(size); err != nil {
			return err
		}
	}

	if size > 0 {
		return downloadBlobToWriterAt(ctx, blobURL, offset, size, file, o, nil)
	} else { // if the blob's size is 0, there is no need to download it
		return nil
	}
}
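
// A minimal usage sketch (illustrative only): create (or truncate) a local
// file and download the entire blob into it:
func exampleDownloadToFile(ctx context.Context, blobURL BlobURL, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return DownloadBlobToFile(ctx, blobURL, 0, CountToEnd, f, DownloadFromBlobOptions{})
}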

///////////////////////////////////////////////////////////////////////////////

// BatchTransferOptions identifies options used by DoBatchTransfer.
type BatchTransferOptions struct {
	TransferSize  int64
	ChunkSize     int64
	Parallelism   uint16
	Operation     func(offset int64, chunkSize int64, ctx context.Context) error
	OperationName string
}

// DoBatchTransfer helps to execute operations in a batch manner.
// It can be used by callers to customize batched work for scenarios the SDK does not provide.
func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
	if o.ChunkSize == 0 {
		return errors.New("ChunkSize cannot be 0")
	}

	if o.Parallelism == 0 {
		o.Parallelism = 5 // default Parallelism
	}

	// Prepare and do parallel operations.
	numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1)
	operationChannel := make(chan func() error, o.Parallelism) // Create the channel that feeds work to the 'Parallelism' goroutines
	operationResponseChannel := make(chan error, numChunks)    // Holds each response
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Create the goroutines that process each operation (in parallel).
	for g := uint16(0); g < o.Parallelism; g++ {
		go func() {
			for f := range operationChannel {
				err := f()
				operationResponseChannel <- err
			}
		}()
	}

	// Add each chunk's operation to the channel.
	for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
		curChunkSize := o.ChunkSize

		if chunkNum == numChunks-1 { // Last chunk
			curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total
		}
		offset := int64(chunkNum) * o.ChunkSize

		operationChannel <- func() error {
			return o.Operation(offset, curChunkSize, ctx)
		}
	}
	close(operationChannel)

	// Wait for the operations to complete.
	var firstErr error = nil
	for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
		responseError := <-operationResponseChannel
		// Record the first error (the original error, which should cause the other chunks to fail with a canceled context).
		if responseError != nil && firstErr == nil {
			cancel() // As soon as any operation fails, cancel all remaining operation calls
			firstErr = responseError
		}
	}
	return firstErr
}
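
// A minimal sketch (illustrative only) of driving DoBatchTransfer directly for
// custom chunked work; the Operation below is a placeholder that would normally
// perform I/O for its assigned byte range:
func exampleBatchTransfer(ctx context.Context) error {
	return DoBatchTransfer(ctx, BatchTransferOptions{
		OperationName: "exampleBatchTransfer",
		TransferSize:  10 * _1MiB,
		ChunkSize:     _1MiB,
		Parallelism:   4,
		Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
			// Process bytes [offset, offset+chunkSize) here; returning a non-nil
			// error cancels the remaining chunks via the shared context.
			return nil
		},
	})
}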

////////////////////////////////////////////////////////////////////////////////////////////////

const _1MiB = 1024 * 1024

// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function.
type UploadStreamToBlockBlobOptions struct {
	// BufferSize sizes the buffer used to read data from source. If < 1 MiB, defaults to 1 MiB.
	BufferSize int
	// MaxBuffers defines the number of simultaneous uploads that will be performed to upload the file.
	MaxBuffers               int
	BlobHTTPHeaders          BlobHTTPHeaders
	Metadata                 Metadata
	AccessConditions         BlobAccessConditions
	BlobAccessTier           AccessTierType
	BlobTagsMap              BlobTagsMap
	ClientProvidedKeyOptions ClientProvidedKeyOptions
}

func (u *UploadStreamToBlockBlobOptions) defaults() {
	if u.MaxBuffers == 0 {
		u.MaxBuffers = 1
	}

	if u.BufferSize < _1MiB {
		u.BufferSize = _1MiB
	}
}

// UploadStreamToBlockBlob copies the file held in io.Reader to the Blob at blockBlobURL.
// A Context deadline or cancellation will cause this to error.
func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL,
	o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
	o.defaults()

	result, err := copyFromReader(ctx, reader, blockBlobURL, o)
	if err != nil {
		return nil, err
	}

	return result, nil
}
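
// A minimal usage sketch (illustrative only): stream from any io.Reader, such
// as a network connection whose length is not known in advance:
func exampleUploadStream(ctx context.Context, blockBlobURL BlockBlobURL, src io.Reader) error {
	_, err := UploadStreamToBlockBlob(ctx, src, blockBlobURL, UploadStreamToBlockBlobOptions{
		BufferSize: 2 * _1MiB, // read the source in 2 MiB chunks
		MaxBuffers: 3,         // keep up to 3 chunks in flight
	})
	return err
}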

// UploadStreamOptions (defunct) was used internally. This will be removed or made private in a future version.
type UploadStreamOptions struct {
	BufferSize int
	MaxBuffers int
}