package azblob

import (
	"context"
	"encoding/base64"
	"io"
	"net/http"

	"bytes"
	"os"
	"sync"
	"time"

	"errors"

	"github.com/Azure/azure-pipeline-go/pipeline"
)

// CommonResponse returns the headers common to all blob REST API responses.
// It is the result type of the high-level upload helpers in this file, which
// may finish with either an Upload call or a CommitBlockList call.
type CommonResponse interface {
	// ETag returns the value for header ETag.
	ETag() ETag

	// LastModified returns the value for header Last-Modified.
	LastModified() time.Time

	// RequestID returns the value for header x-ms-request-id.
	RequestID() string

	// Date returns the value for header Date.
	Date() time.Time

	// Version returns the value for header x-ms-version.
	Version() string

	// Response returns the raw HTTP response object.
	Response() *http.Response
}

// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions.
// The zero value is usable; every field is optional.
type UploadToBlockBlobOptions struct {
	// BlockSize specifies the size in bytes of each staged block. When zero,
	// UploadBufferToBlockBlob derives a block size from the buffer length
	// (at least BlobDefaultDownloadBlockSize for multi-block uploads).
	BlockSize int64

	// Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL.
	// Note that the progress reporting is not always increasing; it can go down when retrying a request.
	Progress pipeline.ProgressReceiver

	// BlobHTTPHeaders indicates the HTTP headers to be associated with the blob.
	BlobHTTPHeaders BlobHTTPHeaders

	// Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
	Metadata Metadata

	// AccessConditions indicates the access conditions for the block blob.
	AccessConditions BlobAccessConditions

	// Parallelism indicates the maximum number of blocks to upload in parallel
	// (0 selects the default used by doBatchTransfer, which is 5).
	Parallelism uint16
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
63func UploadBufferToBlockBlob(ctx context.Context, b []byte, 64 blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { 65 bufferSize := int64(len(b)) 66 if o.BlockSize == 0 { 67 // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error 68 if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks { 69 return nil, errors.New("Buffer is too large to upload to a block blob") 70 } 71 // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request 72 if bufferSize <= BlockBlobMaxUploadBlobBytes { 73 o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified 74 } else { 75 o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks 76 if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB 77 o.BlockSize = BlobDefaultDownloadBlockSize 78 } 79 // StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize). 80 } 81 } 82 83 if bufferSize <= BlockBlobMaxUploadBlobBytes { 84 // If the size can fit in 1 Upload call, do it this way 85 var body io.ReadSeeker = bytes.NewReader(b) 86 if o.Progress != nil { 87 body = pipeline.NewRequestBodyProgress(body, o.Progress) 88 } 89 return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions) 90 } 91 92 var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1) 93 94 blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs 95 progress := int64(0) 96 progressLock := &sync.Mutex{} 97 98 err := doBatchTransfer(ctx, batchTransferOptions{ 99 operationName: "UploadBufferToBlockBlob", 100 transferSize: bufferSize, 101 chunkSize: o.BlockSize, 102 parallelism: o.Parallelism, 103 operation: func(offset int64, count int64) error { 104 // This function is called once per block. 
105 // It is passed this block's offset within the buffer and its count of bytes 106 // Prepare to read the proper block/section of the buffer 107 var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count]) 108 blockNum := offset / o.BlockSize 109 if o.Progress != nil { 110 blockProgress := int64(0) 111 body = pipeline.NewRequestBodyProgress(body, 112 func(bytesTransferred int64) { 113 diff := bytesTransferred - blockProgress 114 blockProgress = bytesTransferred 115 progressLock.Lock() // 1 goroutine at a time gets a progress report 116 progress += diff 117 o.Progress(progress) 118 progressLock.Unlock() 119 }) 120 } 121 122 // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks 123 // at the same time causing PutBlockList to get a mix of blocks from all the clients. 124 blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes()) 125 _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil) 126 return err 127 }, 128 }) 129 if err != nil { 130 return nil, err 131 } 132 // All put blocks were successful, call Put Block List to finalize the blob 133 return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions) 134} 135 136// UploadFileToBlockBlob uploads a file in blocks to a block blob. 
137func UploadFileToBlockBlob(ctx context.Context, file *os.File, 138 blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { 139 140 stat, err := file.Stat() 141 if err != nil { 142 return nil, err 143 } 144 m := mmf{} // Default to an empty slice; used for 0-size file 145 if stat.Size() != 0 { 146 m, err = newMMF(file, false, 0, int(stat.Size())) 147 if err != nil { 148 return nil, err 149 } 150 defer m.unmap() 151 } 152 return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o) 153} 154 155/////////////////////////////////////////////////////////////////////////////// 156 157const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB 158 159// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions. 160type DownloadFromBlobOptions struct { 161 // BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize. 162 BlockSize int64 163 164 // Progress is a function that is invoked periodically as bytes are received. 165 Progress pipeline.ProgressReceiver 166 167 // AccessConditions indicates the access conditions used when making HTTP GET requests against the blob. 168 AccessConditions BlobAccessConditions 169 170 // Parallelism indicates the maximum number of blocks to download in parallel (0=default) 171 Parallelism uint16 172 173 // RetryReaderOptionsPerBlock is used when downloading each block. 174 RetryReaderOptionsPerBlock RetryReaderOptions 175} 176 177// downloadBlobToBuffer downloads an Azure blob to a buffer with parallel. 
178func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64, 179 b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error { 180 if o.BlockSize == 0 { 181 o.BlockSize = BlobDefaultDownloadBlockSize 182 } 183 184 if count == CountToEnd { // If size not specified, calculate it 185 if initialDownloadResponse != nil { 186 count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it 187 } else { 188 // If we don't have the length at all, get it 189 dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false) 190 if err != nil { 191 return err 192 } 193 count = dr.ContentLength() - offset 194 } 195 } 196 197 // Prepare and do parallel download. 198 progress := int64(0) 199 progressLock := &sync.Mutex{} 200 201 err := doBatchTransfer(ctx, batchTransferOptions{ 202 operationName: "downloadBlobToBuffer", 203 transferSize: count, 204 chunkSize: o.BlockSize, 205 parallelism: o.Parallelism, 206 operation: func(chunkStart int64, count int64) error { 207 dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false) 208 if err != nil { 209 return err 210 } 211 body := dr.Body(o.RetryReaderOptionsPerBlock) 212 if o.Progress != nil { 213 rangeProgress := int64(0) 214 body = pipeline.NewResponseBodyProgress( 215 body, 216 func(bytesTransferred int64) { 217 diff := bytesTransferred - rangeProgress 218 rangeProgress = bytesTransferred 219 progressLock.Lock() 220 progress += diff 221 o.Progress(progress) 222 progressLock.Unlock() 223 }) 224 } 225 _, err = io.ReadFull(body, b[chunkStart:chunkStart+count]) 226 body.Close() 227 return err 228 }, 229 }) 230 if err != nil { 231 return err 232 } 233 return nil 234} 235 236// DownloadBlobToBuffer downloads an Azure blob to a buffer with parallel. 237// Offset and count are optional, pass 0 for both to download the entire blob. 
238func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64, 239 b []byte, o DownloadFromBlobOptions) error { 240 return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil) 241} 242 243// DownloadBlobToFile downloads an Azure blob to a local file. 244// The file would be truncated if the size doesn't match. 245// Offset and count are optional, pass 0 for both to download the entire blob. 246func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64, 247 file *os.File, o DownloadFromBlobOptions) error { 248 // 1. Calculate the size of the destination file 249 var size int64 250 251 if count == CountToEnd { 252 // Try to get Azure blob's size 253 props, err := blobURL.GetProperties(ctx, o.AccessConditions) 254 if err != nil { 255 return err 256 } 257 size = props.ContentLength() - offset 258 } else { 259 size = count 260 } 261 262 // 2. Compare and try to resize local file's size if it doesn't match Azure blob's size. 263 stat, err := file.Stat() 264 if err != nil { 265 return err 266 } 267 if stat.Size() != size { 268 if err = file.Truncate(size); err != nil { 269 return err 270 } 271 } 272 273 if size > 0 { 274 // 3. Set mmap and call downloadBlobToBuffer. 275 m, err := newMMF(file, true, 0, int(size)) 276 if err != nil { 277 return err 278 } 279 defer m.unmap() 280 return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil) 281 } else { // if the blob's size is 0, there is no need in downloading it 282 return nil 283 } 284} 285 286/////////////////////////////////////////////////////////////////////////////// 287 288// BatchTransferOptions identifies options used by doBatchTransfer. 289type batchTransferOptions struct { 290 transferSize int64 291 chunkSize int64 292 parallelism uint16 293 operation func(offset int64, chunkSize int64) error 294 operationName string 295} 296 297// doBatchTransfer helps to execute operations in a batch manner. 
298func doBatchTransfer(ctx context.Context, o batchTransferOptions) error { 299 // Prepare and do parallel operations. 300 numChunks := uint16(((o.transferSize - 1) / o.chunkSize) + 1) 301 operationChannel := make(chan func() error, o.parallelism) // Create the channel that release 'parallelism' goroutines concurrently 302 operationResponseChannel := make(chan error, numChunks) // Holds each response 303 ctx, cancel := context.WithCancel(ctx) 304 defer cancel() 305 306 // Create the goroutines that process each operation (in parallel). 307 if o.parallelism == 0 { 308 o.parallelism = 5 // default parallelism 309 } 310 for g := uint16(0); g < o.parallelism; g++ { 311 //grIndex := g 312 go func() { 313 for f := range operationChannel { 314 //fmt.Printf("[%s] gr-%d start action\n", o.operationName, grIndex) 315 err := f() 316 operationResponseChannel <- err 317 //fmt.Printf("[%s] gr-%d end action\n", o.operationName, grIndex) 318 } 319 }() 320 } 321 322 // Add each chunk's operation to the channel. 323 for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { 324 curChunkSize := o.chunkSize 325 326 if chunkNum == numChunks-1 { // Last chunk 327 curChunkSize = o.transferSize - (int64(chunkNum) * o.chunkSize) // Remove size of all transferred chunks from total 328 } 329 offset := int64(chunkNum) * o.chunkSize 330 331 operationChannel <- func() error { 332 return o.operation(offset, curChunkSize) 333 } 334 } 335 close(operationChannel) 336 337 // Wait for the operations to complete. 
338 for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { 339 responseError := <-operationResponseChannel 340 if responseError != nil { 341 cancel() // As soon as any operation fails, cancel all remaining operation calls 342 return responseError // No need to process anymore responses 343 } 344 } 345 return nil 346} 347 348//////////////////////////////////////////////////////////////////////////////////////////////// 349 350type UploadStreamToBlockBlobOptions struct { 351 BufferSize int 352 MaxBuffers int 353 BlobHTTPHeaders BlobHTTPHeaders 354 Metadata Metadata 355 AccessConditions BlobAccessConditions 356} 357 358func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, 359 o UploadStreamToBlockBlobOptions) (CommonResponse, error) { 360 result, err := uploadStream(ctx, reader, 361 UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers}, 362 &uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()}) 363 if err != nil { 364 return nil, err 365 } 366 return result.(CommonResponse), nil 367} 368 369type uploadStreamToBlockBlobOptions struct { 370 b BlockBlobURL 371 o UploadStreamToBlockBlobOptions 372 blockIDPrefix uuid // UUID used with all blockIDs 373 maxBlockNum uint32 // defaults to 0 374 firstBlock []byte // Used only if maxBlockNum is 0 375} 376 377func (t *uploadStreamToBlockBlobOptions) start(ctx context.Context) (interface{}, error) { 378 return nil, nil 379} 380 381func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32, buffer []byte) error { 382 if num == 0 { 383 t.firstBlock = buffer 384 385 // If whole payload fits in 1 block, don't stage it; End will upload it with 1 I/O operation 386 // If the payload is exactly the same size as the buffer, there may be more content coming in. 387 if len(buffer) < t.o.BufferSize { 388 return nil 389 } 390 } 391 // Else, upload a staged block... 
392 atomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) { 393 // Atomically remember (in t.numBlocks) the maximum block num we've ever seen 394 if startVal < num { 395 return num, nil 396 } 397 return startVal, nil 398 }) 399 blockID := newUuidBlockID(t.blockIDPrefix).WithBlockNumber(num).ToBase64() 400 _, err := t.b.StageBlock(ctx, blockID, bytes.NewReader(buffer), LeaseAccessConditions{}, nil) 401 return err 402} 403 404func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{}, error) { 405 // If the first block had the exact same size as the buffer 406 // we would have staged it as a block thinking that there might be more data coming 407 if t.maxBlockNum == 0 && len(t.firstBlock) != t.o.BufferSize { 408 // If whole payload fits in 1 block (block #0), upload it with 1 I/O operation 409 return t.b.Upload(ctx, bytes.NewReader(t.firstBlock), 410 t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions) 411 } 412 // Multiple blocks staged, commit them all now 413 blockID := newUuidBlockID(t.blockIDPrefix) 414 blockIDs := make([]string, t.maxBlockNum+1) 415 for bn := uint32(0); bn <= t.maxBlockNum; bn++ { 416 blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64() 417 } 418 return t.b.CommitBlockList(ctx, blockIDs, t.o.BlobHTTPHeaders, t.o.Metadata, t.o.AccessConditions) 419} 420 421//////////////////////////////////////////////////////////////////////////////////////////////////// 422 423type iTransfer interface { 424 start(ctx context.Context) (interface{}, error) 425 chunk(ctx context.Context, num uint32, buffer []byte) error 426 end(ctx context.Context) (interface{}, error) 427} 428 429type UploadStreamOptions struct { 430 MaxBuffers int 431 BufferSize int 432} 433 434type firstErr struct { 435 lock sync.Mutex 436 finalError error 437} 438 439func (fe *firstErr) set(err error) { 440 fe.lock.Lock() 441 if fe.finalError == nil { 442 fe.finalError = err 443 } 444 fe.lock.Unlock() 445} 446 447func (fe 
*firstErr) get() (err error) { 448 fe.lock.Lock() 449 err = fe.finalError 450 fe.lock.Unlock() 451 return 452} 453 454func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) { 455 firstErr := firstErr{} 456 ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything 457 defer cancel() 458 wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing 459 type OutgoingMsg struct { 460 chunkNum uint32 461 buffer []byte 462 } 463 464 // Create a channel to hold the buffers usable for incoming datsa 465 incoming := make(chan []byte, o.MaxBuffers) 466 outgoing := make(chan OutgoingMsg, o.MaxBuffers) // Channel holding outgoing buffers 467 if result, err := t.start(ctx); err != nil { 468 return result, err 469 } 470 471 numBuffers := 0 // The number of buffers & out going goroutines created so far 472 injectBuffer := func() { 473 // For each Buffer, create it and a goroutine to upload it 474 incoming <- make([]byte, o.BufferSize) // Add the new buffer to the incoming channel so this goroutine can from the reader into it 475 numBuffers++ 476 go func() { 477 for outgoingMsg := range outgoing { 478 // Upload the outgoing buffer 479 err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer) 480 wg.Done() // Indicate this buffer was sent 481 if nil != err { 482 // NOTE: finalErr could be assigned to multiple times here which is OK, 483 // some error will be returned. 
484 firstErr.set(err) 485 cancel() 486 } 487 incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now 488 } 489 }() 490 } 491 injectBuffer() // Create our 1st buffer & outgoing goroutine 492 493 // This goroutine grabs a buffer, reads from the stream into the buffer, 494 // and inserts the buffer into the outgoing channel to be uploaded 495 for c := uint32(0); true; c++ { // Iterate once per chunk 496 var buffer []byte 497 if numBuffers < o.MaxBuffers { 498 select { 499 // We're not at max buffers, see if a previously-created buffer is available 500 case buffer = <-incoming: 501 break 502 default: 503 // No buffer available; inject a new buffer & go routine to process it 504 injectBuffer() 505 buffer = <-incoming // Grab the just-injected buffer 506 } 507 } else { 508 // We are at max buffers, block until we get to reuse one 509 buffer = <-incoming 510 } 511 n, err := io.ReadFull(reader, buffer) 512 if err != nil { // Less than len(buffer) bytes were read 513 buffer = buffer[:n] // Make slice match the # of read bytes 514 } 515 if len(buffer) > 0 { 516 // Buffer not empty, upload it 517 wg.Add(1) // We're posting a buffer to be sent 518 outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer} 519 } 520 if err != nil { // The reader is done, no more outgoing buffers 521 if err == io.EOF || err == io.ErrUnexpectedEOF { 522 err = nil // This function does NOT return an error if io.ReadFull returns io.EOF or io.ErrUnexpectedEOF 523 } else { 524 firstErr.set(err) 525 } 526 break 527 } 528 } 529 // NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done 530 close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty 531 wg.Wait() // Wait for all pending outgoing messages to complete 532 err := firstErr.get() 533 if err == nil { 534 // If no error, after all blocks uploaded, commit them to the blob & return the result 535 return t.end(ctx) 536 } 537 
return nil, err 538} 539