1package azblob 2 3import ( 4 "context" 5 "encoding/base64" 6 "io" 7 "net/http" 8 9 "bytes" 10 "os" 11 "sync" 12 "time" 13 14 "errors" 15 16 "github.com/Azure/azure-pipeline-go/pipeline" 17) 18 19// CommonResponse returns the headers common to all blob REST API responses. 20type CommonResponse interface { 21 // ETag returns the value for header ETag. 22 ETag() ETag 23 24 // LastModified returns the value for header Last-Modified. 25 LastModified() time.Time 26 27 // RequestID returns the value for header x-ms-request-id. 28 RequestID() string 29 30 // Date returns the value for header Date. 31 Date() time.Time 32 33 // Version returns the value for header x-ms-version. 34 Version() string 35 36 // Response returns the raw HTTP response object. 37 Response() *http.Response 38} 39 40// UploadToBlockBlobOptions identifies options used by the UploadBufferToBlockBlob and UploadFileToBlockBlob functions. 41type UploadToBlockBlobOptions struct { 42 // BlockSize specifies the block size to use; the default (and maximum size) is BlockBlobMaxStageBlockBytes. 43 BlockSize int64 44 45 // Progress is a function that is invoked periodically as bytes are sent to the BlockBlobURL. 46 // Note that the progress reporting is not always increasing; it can go down when retrying a request. 47 Progress pipeline.ProgressReceiver 48 49 // BlobHTTPHeaders indicates the HTTP headers to be associated with the blob. 50 BlobHTTPHeaders BlobHTTPHeaders 51 52 // Metadata indicates the metadata to be associated with the blob when PutBlockList is called. 53 Metadata Metadata 54 55 // AccessConditions indicates the access conditions for the block blob. 56 AccessConditions BlobAccessConditions 57 58 // BlobAccessTier indicates the tier of blob 59 BlobAccessTier AccessTierType 60 61 // BlobTagsMap 62 BlobTagsMap BlobTagsMap 63 64 // ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data. 65 ClientProvidedKeyOptions ClientProvidedKeyOptions 66 67 // Parallelism indicates the maximum number of blocks to upload in parallel (0=default) 68 Parallelism uint16 69} 70 71// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob. 72func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64, 73 blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { 74 if o.BlockSize == 0 { 75 // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error 76 if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks { 77 return nil, errors.New("buffer is too large to upload to a block blob") 78 } 79 // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request 80 if readerSize <= BlockBlobMaxUploadBlobBytes { 81 o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified 82 } else { 83 o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks 84 if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB 85 o.BlockSize = BlobDefaultDownloadBlockSize 86 } 87 // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize). 88 } 89 } 90 91 if readerSize <= BlockBlobMaxUploadBlobBytes { 92 // If the size can fit in 1 Upload call, do it this way 93 var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize) 94 if o.Progress != nil { 95 body = pipeline.NewRequestBodyProgress(body, o.Progress) 96 } 97 return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions) 98 } 99 100 var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1) 101 102 blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs 103 progress := int64(0) 104 progressLock := &sync.Mutex{} 105 106 err := DoBatchTransfer(ctx, BatchTransferOptions{ 107 OperationName: "uploadReaderAtToBlockBlob", 108 TransferSize: readerSize, 109 ChunkSize: o.BlockSize, 110 Parallelism: o.Parallelism, 111 Operation: func(offset int64, count int64, ctx context.Context) error { 112 // This function is called once per block. 113 // It is passed this block's offset within the buffer and its count of bytes 114 // Prepare to read the proper block/section of the buffer 115 var body io.ReadSeeker = io.NewSectionReader(reader, offset, count) 116 blockNum := offset / o.BlockSize 117 if o.Progress != nil { 118 blockProgress := int64(0) 119 body = pipeline.NewRequestBodyProgress(body, 120 func(bytesTransferred int64) { 121 diff := bytesTransferred - blockProgress 122 blockProgress = bytesTransferred 123 progressLock.Lock() // 1 goroutine at a time gets a progress report 124 progress += diff 125 o.Progress(progress) 126 progressLock.Unlock() 127 }) 128 } 129 130 // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks 131 // at the same time causing PutBlockList to get a mix of blocks from all the clients. 132 blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes()) 133 _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions) 134 return err 135 }, 136 }) 137 if err != nil { 138 return nil, err 139 } 140 // All put blocks were successful, call Put Block List to finalize the blob 141 return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions) 142} 143 144// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob. 145func UploadBufferToBlockBlob(ctx context.Context, b []byte, 146 blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { 147 return uploadReaderAtToBlockBlob(ctx, bytes.NewReader(b), int64(len(b)), blockBlobURL, o) 148} 149 150// UploadFileToBlockBlob uploads a file in blocks to a block blob. 151func UploadFileToBlockBlob(ctx context.Context, file *os.File, 152 blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { 153 154 stat, err := file.Stat() 155 if err != nil { 156 return nil, err 157 } 158 return uploadReaderAtToBlockBlob(ctx, file, stat.Size(), blockBlobURL, o) 159} 160 161/////////////////////////////////////////////////////////////////////////////// 162 163const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB 164 165// DownloadFromBlobOptions identifies options used by the DownloadBlobToBuffer and DownloadBlobToFile functions. 166type DownloadFromBlobOptions struct { 167 // BlockSize specifies the block size to use for each parallel download; the default size is BlobDefaultDownloadBlockSize. 168 BlockSize int64 169 170 // Progress is a function that is invoked periodically as bytes are received. 171 Progress pipeline.ProgressReceiver 172 173 // AccessConditions indicates the access conditions used when making HTTP GET requests against the blob. 174 AccessConditions BlobAccessConditions 175 176 // ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data. 177 ClientProvidedKeyOptions ClientProvidedKeyOptions 178 179 // Parallelism indicates the maximum number of blocks to download in parallel (0=default) 180 Parallelism uint16 181 182 // RetryReaderOptionsPerBlock is used when downloading each block. 183 RetryReaderOptionsPerBlock RetryReaderOptions 184} 185 186// downloadBlobToWriterAt downloads an Azure blob to a buffer with parallel. 187func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64, 188 writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error { 189 if o.BlockSize == 0 { 190 o.BlockSize = BlobDefaultDownloadBlockSize 191 } 192 193 if count == CountToEnd { // If size not specified, calculate it 194 if initialDownloadResponse != nil { 195 count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it 196 } else { 197 // If we don't have the length at all, get it 198 dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false, o.ClientProvidedKeyOptions) 199 if err != nil { 200 return err 201 } 202 count = dr.ContentLength() - offset 203 } 204 } 205 206 if count <= 0 { 207 // The file is empty, there is nothing to download. 208 return nil 209 } 210 211 // Prepare and do parallel download. 212 progress := int64(0) 213 progressLock := &sync.Mutex{} 214 215 err := DoBatchTransfer(ctx, BatchTransferOptions{ 216 OperationName: "downloadBlobToWriterAt", 217 TransferSize: count, 218 ChunkSize: o.BlockSize, 219 Parallelism: o.Parallelism, 220 Operation: func(chunkStart int64, count int64, ctx context.Context) error { 221 dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions) 222 if err != nil { 223 return err 224 } 225 body := dr.Body(o.RetryReaderOptionsPerBlock) 226 if o.Progress != nil { 227 rangeProgress := int64(0) 228 body = pipeline.NewResponseBodyProgress( 229 body, 230 func(bytesTransferred int64) { 231 diff := bytesTransferred - rangeProgress 232 rangeProgress = bytesTransferred 233 progressLock.Lock() 234 progress += diff 235 o.Progress(progress) 236 progressLock.Unlock() 237 }) 238 } 239 _, err = io.Copy(newSectionWriter(writer, chunkStart, count), body) 240 body.Close() 241 return err 242 }, 243 }) 244 if err != nil { 245 return err 246 } 247 return nil 248} 249 250// DownloadBlobToBuffer downloads an Azure blob to a buffer with parallel. 251// Offset and count are optional, pass 0 for both to download the entire blob. 252func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64, 253 b []byte, o DownloadFromBlobOptions) error { 254 return downloadBlobToWriterAt(ctx, blobURL, offset, count, newBytesWriter(b), o, nil) 255} 256 257// DownloadBlobToFile downloads an Azure blob to a local file. 258// The file would be truncated if the size doesn't match. 259// Offset and count are optional, pass 0 for both to download the entire blob. 260func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, count int64, 261 file *os.File, o DownloadFromBlobOptions) error { 262 // 1. Calculate the size of the destination file 263 var size int64 264 265 if count == CountToEnd { 266 // Try to get Azure blob's size 267 props, err := blobURL.GetProperties(ctx, o.AccessConditions, o.ClientProvidedKeyOptions) 268 if err != nil { 269 return err 270 } 271 size = props.ContentLength() - offset 272 } else { 273 size = count 274 } 275 276 // 2. Compare and try to resize local file's size if it doesn't match Azure blob's size. 277 stat, err := file.Stat() 278 if err != nil { 279 return err 280 } 281 if stat.Size() != size { 282 if err = file.Truncate(size); err != nil { 283 return err 284 } 285 } 286 287 if size > 0 { 288 return downloadBlobToWriterAt(ctx, blobURL, offset, size, file, o, nil) 289 } else { // if the blob's size is 0, there is no need in downloading it 290 return nil 291 } 292} 293 294/////////////////////////////////////////////////////////////////////////////// 295 296// BatchTransferOptions identifies options used by DoBatchTransfer. 297type BatchTransferOptions struct { 298 TransferSize int64 299 ChunkSize int64 300 Parallelism uint16 301 Operation func(offset int64, chunkSize int64, ctx context.Context) error 302 OperationName string 303} 304 305// DoBatchTransfer helps to execute operations in a batch manner. 306// Can be used by users to customize batch works (for other scenarios that the SDK does not provide) 307func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error { 308 if o.ChunkSize == 0 { 309 return errors.New("ChunkSize cannot be 0") 310 } 311 312 if o.Parallelism == 0 { 313 o.Parallelism = 5 // default Parallelism 314 } 315 316 // Prepare and do parallel operations. 317 numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1) 318 operationChannel := make(chan func() error, o.Parallelism) // Create the channel that release 'Parallelism' goroutines concurrently 319 operationResponseChannel := make(chan error, numChunks) // Holds each response 320 ctx, cancel := context.WithCancel(ctx) 321 defer cancel() 322 323 // Create the goroutines that process each operation (in parallel). 324 for g := uint16(0); g < o.Parallelism; g++ { 325 //grIndex := g 326 go func() { 327 for f := range operationChannel { 328 err := f() 329 operationResponseChannel <- err 330 } 331 }() 332 } 333 334 // Add each chunk's operation to the channel. 335 for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { 336 curChunkSize := o.ChunkSize 337 338 if chunkNum == numChunks-1 { // Last chunk 339 curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total 340 } 341 offset := int64(chunkNum) * o.ChunkSize 342 343 operationChannel <- func() error { 344 return o.Operation(offset, curChunkSize, ctx) 345 } 346 } 347 close(operationChannel) 348 349 // Wait for the operations to complete. 350 var firstErr error = nil 351 for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { 352 responseError := <-operationResponseChannel 353 // record the first error (the original error which should cause the other chunks to fail with canceled context) 354 if responseError != nil && firstErr == nil { 355 cancel() // As soon as any operation fails, cancel all remaining operation calls 356 firstErr = responseError 357 } 358 } 359 return firstErr 360} 361 362//////////////////////////////////////////////////////////////////////////////////////////////// 363 364const _1MiB = 1024 * 1024 365 366type UploadStreamToBlockBlobOptions struct { 367 // BufferSize sizes the buffer used to read data from source. If < 1 MiB, defaults to 1 MiB. 368 BufferSize int 369 // MaxBuffers defines the number of simultaneous uploads will be performed to upload the file. 370 MaxBuffers int 371 BlobHTTPHeaders BlobHTTPHeaders 372 Metadata Metadata 373 AccessConditions BlobAccessConditions 374 BlobAccessTier AccessTierType 375 BlobTagsMap BlobTagsMap 376 ClientProvidedKeyOptions ClientProvidedKeyOptions 377} 378 379func (u *UploadStreamToBlockBlobOptions) defaults() { 380 if u.MaxBuffers == 0 { 381 u.MaxBuffers = 1 382 } 383 384 if u.BufferSize < _1MiB { 385 u.BufferSize = _1MiB 386 } 387} 388 389// UploadStreamToBlockBlob copies the file held in io.Reader to the Blob at blockBlobURL. 390// A Context deadline or cancellation will cause this to error. 391func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, 392 o UploadStreamToBlockBlobOptions) (CommonResponse, error) { 393 o.defaults() 394 395 result, err := copyFromReader(ctx, reader, blockBlobURL, o) 396 if err != nil { 397 return nil, err 398 } 399 400 return result, nil 401} 402 403// UploadStreamOptions (defunct) was used internally. This will be removed or made private in a future version. 404type UploadStreamOptions struct { 405 BufferSize int 406 MaxBuffers int 407} 408