1package azblob
2
3import (
4	"context"
5	"encoding/base64"
6	"io"
7	"net/http"
8
9	"bytes"
10	"os"
11	"sync"
12	"time"
13
14	"errors"
15
16	"github.com/Azure/azure-pipeline-go/pipeline"
17)
18
// CommonResponse returns the headers common to all blob REST API responses.
// It also gives access to the raw HTTP response the headers were taken from.
type CommonResponse interface {
	// ETag returns the value for header ETag.
	ETag() ETag

	// LastModified returns the value for header Last-Modified.
	LastModified() time.Time

	// RequestID returns the value for header x-ms-request-id.
	RequestID() string

	// Date returns the value for header Date.
	Date() time.Time

	// Version returns the value for header x-ms-version.
	Version() string

	// Response returns the raw HTTP response object.
	Response() *http.Response
}
39
// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions.
type UploadToBlockBlobOptions struct {
	// BlockSize specifies the block size to use when the payload is uploaded as multiple blocks.
	// When it is 0, UploadBufferToBlockBlob picks a block size from the buffer length.
	BlockSize int64

	// Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
	// Note that the progress reporting is not always increasing; it can go down when retrying a request.
	Progress pipeline.ProgressReceiver

	// BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
	BlobHTTPHeaders BlobHTTPHeaders

	// Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
	Metadata Metadata

	// AccessConditions indicates the access conditions for the block blob.
	AccessConditions BlobAccessConditions

	// Parallelism indicates the maximum number of blocks to upload in parallel (0=default).
	// It only matters when the payload is uploaded as multiple blocks.
	Parallelism uint16
}
61
62// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
63func UploadBufferToBlockBlob(ctx context.Context, b []byte,
64	blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
65	bufferSize := int64(len(b))
66	if o.BlockSize == 0 {
67		// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
68		if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
69			return nil, errors.New("Buffer is too large to upload to a block blob")
70		}
71		// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
72		if bufferSize <= BlockBlobMaxUploadBlobBytes {
73			o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
74		} else {
75			o.BlockSize = bufferSize / BlockBlobMaxBlocks   // buffer / max blocks = block size to use all 50,000 blocks
76			if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
77				o.BlockSize = BlobDefaultDownloadBlockSize
78			}
79			// StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize).
80		}
81	}
82
83	if bufferSize <= BlockBlobMaxUploadBlobBytes {
84		// If the size can fit in 1 Upload call, do it this way
85		var body io.ReadSeeker = bytes.NewReader(b)
86		if o.Progress != nil {
87			body = pipeline.NewRequestBodyProgress(body, o.Progress)
88		}
89		return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
90	}
91
92	var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1)
93
94	blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
95	progress := int64(0)
96	progressLock := &sync.Mutex{}
97
98	err := doBatchTransfer(ctx, batchTransferOptions{
99		operationName: "UploadBufferToBlockBlob",
100		transferSize:  bufferSize,
101		chunkSize:     o.BlockSize,
102		parallelism:   o.Parallelism,
103		operation: func(offset int64, count int64) error {
104			// This function is called once per block.
105			// It is passed this block's offset within the buffer and its count of bytes
106			// Prepare to read the proper block/section of the buffer
107			var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count])
108			blockNum := offset / o.BlockSize
109			if o.Progress != nil {
110				blockProgress := int64(0)
111				body = pipeline.NewRequestBodyProgress(body,
112					func(bytesTransferred int64) {
113						diff := bytesTransferred - blockProgress
114						blockProgress = bytesTransferred
115						progressLock.Lock() // 1 goroutine at a time gets a progress report
116						progress += diff
117						o.Progress(progress)
118						progressLock.Unlock()
119					})
120			}
121
122			// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
123			// at the same time causing PutBlockList to get a mix of blocks from all the clients.
124			blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
125			_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil)
126			return err
127		},
128	})
129	if err != nil {
130		return nil, err
131	}
132	// All put blocks were successful, call Put Block List to finalize the blob
133	return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
134}
135
136// UploadFileToBlockBlob uploads a file in blocks to a block blob.
137func UploadFileToBlockBlob(ctx context.Context, file *os.File,
138	blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
139
140	stat, err := file.Stat()
141	if err != nil {
142		return nil, err
143	}
144	m := mmf{} // Default to an empty slice; used for 0-size file
145	if stat.Size() != 0 {
146		m, err = newMMF(file, false, 0, int(stat.Size()))
147		if err != nil {
148			return nil, err
149		}
150		defer m.unmap()
151	}
152	return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o)
153}
154
155///////////////////////////////////////////////////////////////////////////////
156
// BlobDefaultDownloadBlockSize is the block size (4MB) used by downloads when
// DownloadFromBlobOptions.BlockSize is 0. UploadBufferToBlockBlob also uses it as the
// lower bound for the block size it computes when none is specified.
const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB
158
// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions.
type DownloadFromBlobOptions struct {
	// BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize.
	BlockSize int64

	// Progress is a function that is invoked periodically as bytes are received.
	// It is passed the running total across all blocks.
	Progress pipeline.ProgressReceiver

	// AccessConditions indicates the access conditions used when making HTTP GET requests against the blob.
	AccessConditions BlobAccessConditions

	// Parallelism indicates the maximum number of blocks to download in parallel (0=default)
	Parallelism uint16

	// RetryReaderOptionsPerBlock is used when downloading each block.
	RetryReaderOptionsPerBlock RetryReaderOptions
}
176
177// downloadBlobToBuffer downloads an Azure blob to a buffer with parallel.
178func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
179	b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
180	if o.BlockSize == 0 {
181		o.BlockSize = BlobDefaultDownloadBlockSize
182	}
183
184	if count == CountToEnd { // If size not specified, calculate it
185		if initialDownloadResponse != nil {
186			count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
187		} else {
188			// If we don't have the length at all, get it
189			dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false)
190			if err != nil {
191				return err
192			}
193			count = dr.ContentLength() - offset
194		}
195	}
196
197	// Prepare and do parallel download.
198	progress := int64(0)
199	progressLock := &sync.Mutex{}
200
201	err := doBatchTransfer(ctx, batchTransferOptions{
202		operationName: "downloadBlobToBuffer",
203		transferSize:  count,
204		chunkSize:     o.BlockSize,
205		parallelism:   o.Parallelism,
206		operation: func(chunkStart int64, count int64) error {
207			dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false)
208			if err != nil {
209				return err
210			}
211			body := dr.Body(o.RetryReaderOptionsPerBlock)
212			if o.Progress != nil {
213				rangeProgress := int64(0)
214				body = pipeline.NewResponseBodyProgress(
215					body,
216					func(bytesTransferred int64) {
217						diff := bytesTransferred - rangeProgress
218						rangeProgress = bytesTransferred
219						progressLock.Lock()
220						progress += diff
221						o.Progress(progress)
222						progressLock.Unlock()
223					})
224			}
225			_, err = io.ReadFull(body, b[chunkStart:chunkStart+count])
226			body.Close()
227			return err
228		},
229	})
230	if err != nil {
231		return err
232	}
233	return nil
234}
235
// DownloadBlobToBuffer downloads an Azure blob to a buffer, fetching blocks in parallel.
// Offset and count are optional, pass 0 for both to download the entire blob.
// It delegates to downloadBlobToBuffer without an initial download response.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
	b []byte, o DownloadFromBlobOptions) error {
	return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil)
}
242
243// DownloadBlobToFile downloads an Azure blob to a local file.
244// The file would be truncated if the size doesn't match.
245// Offset and count are optional, pass 0 for both to download the entire blob.
246func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64,
247	file *os.File, o DownloadFromBlobOptions) error {
248	// 1. Calculate the size of the destination file
249	var size int64
250
251	if count == CountToEnd {
252		// Try to get Azure blob's size
253		props, err := blobURL.GetProperties(ctx, o.AccessConditions)
254		if err != nil {
255			return err
256		}
257		size = props.ContentLength() - offset
258	} else {
259		size = count
260	}
261
262	// 2. Compare and try to resize local file's size if it doesn't match Azure blob's size.
263	stat, err := file.Stat()
264	if err != nil {
265		return err
266	}
267	if stat.Size() != size {
268		if err = file.Truncate(size); err != nil {
269			return err
270		}
271	}
272
273	if size > 0 {
274		// 3. Set mmap and call downloadBlobToBuffer.
275		m, err := newMMF(file, true, 0, int(size))
276		if err != nil {
277			return err
278		}
279		defer m.unmap()
280		return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil)
281	} else { // if the blob's size is 0, there is no need in downloading it
282		return nil
283	}
284}
285
286///////////////////////////////////////////////////////////////////////////////
287
// batchTransferOptions identifies options used by doBatchTransfer.
type batchTransferOptions struct {
	// transferSize is the total number of bytes to transfer; it must not be negative.
	transferSize int64
	// chunkSize is the size of every chunk except possibly the last; it must be positive.
	chunkSize int64
	// parallelism is the maximum number of operations running at once (0 selects 5).
	parallelism uint16
	// operation is invoked for each chunk with the chunk's offset and size.
	// It may be invoked from several goroutines concurrently.
	operation func(offset int64, chunkSize int64) error
	// operationName labels the transfer; doBatchTransfer itself does not read it.
	operationName string
}

// doBatchTransfer helps to execute operations in a batch manner.
//
// The range [0, transferSize) is cut into chunks of chunkSize bytes (the last chunk
// holds the remainder) and o.operation is called once per chunk by a pool of at most
// o.parallelism goroutines.
//
// After the first operation error, or once ctx is done, no further operation is
// started. The function waits for operations that are already running before it
// returns, so callers may release buffers the operations use as soon as it returns.
// It returns the first error recorded, or nil if every chunk was processed.
func doBatchTransfer(ctx context.Context, o batchTransferOptions) error {
	if o.chunkSize <= 0 {
		return errors.New("chunkSize must be greater than zero")
	}
	if o.transferSize < 0 {
		return errors.New("transferSize must not be negative")
	}
	if o.parallelism == 0 {
		o.parallelism = 5 // default parallelism
	}

	// Keep the chunk count in int64: narrowing it would silently drop chunks of large transfers.
	numChunks := (o.transferSize-1)/o.chunkSize + 1
	numWorkers := int64(o.parallelism)
	if numChunks < numWorkers {
		numWorkers = numChunks // never start more goroutines than there are chunks
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		workers     sync.WaitGroup
		errLock     sync.Mutex
		transferErr error // first error recorded; read only after workers.Wait()
	)
	// fail records err if it is the first one and stops further work from being started.
	fail := func(err error) {
		errLock.Lock()
		if transferErr == nil {
			transferErr = err
		}
		errLock.Unlock()
		cancel()
	}

	// Create the goroutines that process each operation (in parallel).
	operationChannel := make(chan func() error, numWorkers)
	workers.Add(int(numWorkers))
	for g := int64(0); g < numWorkers; g++ {
		go func() {
			defer workers.Done()
			for f := range operationChannel {
				// Drain, but do not run, operations queued before the transfer was cancelled.
				if err := ctx.Err(); err != nil {
					fail(err)
					continue
				}
				if err := f(); err != nil {
					fail(err)
				}
			}
		}()
	}

	// Add each chunk's operation to the channel.
dispatch:
	for chunkNum := int64(0); chunkNum < numChunks; chunkNum++ {
		offset := chunkNum * o.chunkSize
		curChunkSize := o.chunkSize
		if chunkNum == numChunks-1 { // Last chunk
			curChunkSize = o.transferSize - offset // Remove size of all transferred chunks from total
		}

		select {
		case <-ctx.Done():
			// Either an operation failed or the caller cancelled; stop handing out work.
			fail(ctx.Err())
			break dispatch
		case operationChannel <- func() error { return o.operation(offset, curChunkSize) }:
		}
	}
	close(operationChannel)

	// Wait for the operations to complete.
	workers.Wait()
	return transferErr
}
347
348////////////////////////////////////////////////////////////////////////////////////////////////
349
// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function.
type UploadStreamToBlockBlobOptions struct {
	// BufferSize is the size in bytes of each buffer the stream is read into.
	// A first chunk shorter than this is uploaded with a single Upload call instead of being staged.
	BufferSize       int
	// MaxBuffers is the maximum number of buffers (and uploading goroutines) used at once.
	MaxBuffers       int
	// BlobHTTPHeaders is passed to the final Upload or CommitBlockList call.
	BlobHTTPHeaders  BlobHTTPHeaders
	// Metadata is passed to the final Upload or CommitBlockList call.
	Metadata         Metadata
	// AccessConditions is passed to the final Upload or CommitBlockList call.
	AccessConditions BlobAccessConditions
}
357
358func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL,
359	o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
360	result, err := uploadStream(ctx, reader,
361		UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers},
362		&uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()})
363	if err != nil {
364		return nil, err
365	}
366	return result.(CommonResponse), nil
367}
368
// uploadStreamToBlockBlobOptions carries the state of one UploadStreamToBlockBlob call.
// Its start, chunk and end methods are the callbacks handed to uploadStream.
type uploadStreamToBlockBlobOptions struct {
	b             BlockBlobURL                   // destination block blob
	o             UploadStreamToBlockBlobOptions // caller-supplied options
	blockIDPrefix uuid   // UUID used with all blockIDs
	maxBlockNum   uint32 // defaults to 0
	firstBlock    []byte // Used only if maxBlockNum is 0
}
376
// start is the transfer's start callback. A block blob upload needs no setup,
// so it does nothing and returns a nil result and a nil error.
func (t *uploadStreamToBlockBlobOptions) start(ctx context.Context) (interface{}, error) {
	return nil, nil
}
380
381func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32, buffer []byte) error {
382	if num == 0 {
383		t.firstBlock = buffer
384
385		// If whole payload fits in 1 block, don't stage it; End will upload it with 1 I/O operation
386		// If the payload is exactly the same size as the buffer, there may be more content coming in.
387		if len(buffer) < t.o.BufferSize {
388			return nil
389		}
390	}
391	// Else, upload a staged block...
392	atomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
393		// Atomically remember (in t.numBlocks) the maximum block num we've ever seen
394		if startVal < num {
395			return num, nil
396		}
397		return startVal, nil
398	})
399	blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64()
400	_, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{}, nil)
401	return err
402}
403
404func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{}, error) {
405	// If the first block had the exact same size as the buffer
406	// we would have staged it as a block thinking that there might be more data coming
407	if t.maxBlockNum == 0 && len(t.firstBlock) != t.o.BufferSize {
408		// If whole payload fits in 1 block (block #0), upload it with 1 I/O operation
409		return t.b.Upload(ctx, bytes.NewReader(t.firstBlock),
410			t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
411	}
412	// Multiple blocks staged, commit them all now
413	blockID := newUuidBlockID(t.blockIDPrefix)
414	blockIDs := make([]string, t.maxBlockNum+1)
415	for bn := uint32(0); bn <= t.maxBlockNum; bn++ {
416		blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64()
417	}
418	return t.b.CommitBlockList(ctx, blockIDs, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions)
419}
420
421////////////////////////////////////////////////////////////////////////////////////////////////////
422
// iTransfer is the set of callbacks uploadStream drives while streaming.
type iTransfer interface {
	// start is called once before any data is read; a non-nil error aborts the transfer.
	start(ctx context.Context) (interface{}, error)
	// chunk is called for every non-empty buffer read from the stream, possibly from
	// several goroutines at once; num is the buffer's zero-based position in the stream.
	chunk(ctx context.Context, num uint32, buffer []byte) error
	// end is called after all chunk calls have returned, and only if none of them failed;
	// its results become the results of uploadStream.
	end(ctx context.Context) (interface{}, error)
}

// UploadStreamOptions identifies the buffering options used by uploadStream.
type UploadStreamOptions struct {
	// MaxBuffers is the maximum number of buffers (and uploading goroutines); it must be positive.
	MaxBuffers int
	// BufferSize is the size in bytes of each buffer; it must be positive.
	BufferSize int
}

// firstErr remembers the first non-nil error handed to set. It is safe for concurrent use.
type firstErr struct {
	lock       sync.Mutex
	finalError error
}

// set records err unless an error has already been recorded.
func (fe *firstErr) set(err error) {
	fe.lock.Lock()
	defer fe.lock.Unlock()
	if fe.finalError == nil {
		fe.finalError = err
	}
}

// get returns the recorded error, or nil if none has been recorded.
func (fe *firstErr) get() (err error) {
	fe.lock.Lock()
	defer fe.lock.Unlock()
	return fe.finalError
}

// uploadStream reads reader in buffers of o.BufferSize bytes and passes each non-empty
// buffer to t.chunk, using at most o.MaxBuffers buffers and uploading goroutines.
//
// t.start is called first and t.end last; t.end is called only when reading and every
// chunk succeeded, and its results are returned. io.EOF and io.ErrUnexpectedEOF from the
// reader mark the end of the stream and are not errors. On failure the first recorded
// error is returned with a nil result, and the stream is not read any further once the
// transfer has been cancelled.
//
// An error is returned immediately if o.BufferSize or o.MaxBuffers is not positive.
func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) {
	// Without a buffer to hand out the loop below would block forever, and with
	// zero-length buffers it would never reach the end of the stream.
	if o.BufferSize <= 0 {
		return nil, errors.New("UploadStreamOptions.BufferSize must be greater than zero")
	}
	if o.MaxBuffers <= 0 {
		return nil, errors.New("UploadStreamOptions.MaxBuffers must be greater than zero")
	}

	var failure firstErr
	ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything
	defer cancel()
	wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing
	type OutgoingMsg struct {
		chunkNum uint32
		buffer   []byte
	}

	// Create a channel to hold the buffers usable for incoming data
	incoming := make(chan []byte, o.MaxBuffers)
	outgoing := make(chan OutgoingMsg, o.MaxBuffers) // Channel holding outgoing buffers
	if result, err := t.start(ctx); err != nil {
		return result, err
	}

	numBuffers := 0 // The number of buffers & out going goroutines created so far
	injectBuffer := func() {
		// For each Buffer, create it and a goroutine to upload it
		incoming <- make([]byte, o.BufferSize) // Add the new buffer to the incoming channel so this goroutine can read from the reader into it
		numBuffers++
		go func() {
			for outgoingMsg := range outgoing {
				// Upload the outgoing buffer
				if err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer); err != nil {
					// Record the error BEFORE signalling completion; otherwise wg.Wait
					// could return and the transfer be finalized despite this failure.
					failure.set(err)
					cancel()
				}
				incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now
				wg.Done()                      // Indicate this buffer was sent
			}
		}()
	}
	injectBuffer() // Create our 1st buffer & outgoing goroutine

	// This goroutine grabs a buffer, reads from the stream into the buffer,
	// and inserts the buffer into the outgoing channel to be uploaded
	for c := uint32(0); true; c++ { // Iterate once per chunk
		// Stop consuming the stream once a chunk failed or the caller cancelled.
		if err := ctx.Err(); err != nil {
			failure.set(err) // no-op when a chunk error was already recorded
			break
		}

		var buffer []byte
		if numBuffers < o.MaxBuffers {
			select {
			// We're not at max buffers, see if a previously-created buffer is available
			case buffer = <-incoming:
			default:
				// No buffer available; inject a new buffer & go routine to process it
				injectBuffer()
				buffer = <-incoming // Grab the just-injected buffer
			}
		} else {
			// We are at max buffers, block until we get to reuse one
			buffer = <-incoming
		}
		n, err := io.ReadFull(reader, buffer)
		if err != nil { // Less than len(buffer) bytes were read
			buffer = buffer[:n] // Make slice match the # of read bytes
		}
		if len(buffer) > 0 {
			// Buffer not empty, upload it
			wg.Add(1) // We're posting a buffer to be sent
			outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer}
		}
		if err != nil { // The reader is done, no more outgoing buffers
			// io.EOF and io.ErrUnexpectedEOF only mean the stream ended; anything else is a failure.
			if err != io.EOF && err != io.ErrUnexpectedEOF {
				failure.set(err)
			}
			break
		}
	}
	// NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done
	close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty
	wg.Wait()       // Wait for all pending outgoing messages to complete
	if err := failure.get(); err != nil {
		return nil, err
	}
	// If no error, after all blocks uploaded, commit them to the blob & return the result
	return t.end(ctx)
}
539