package azure

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/log"
	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/pkg/storage/chunk"
	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
)

const (
	// Names of the Azure cloud environments accepted in
	// BlobStorageConfig.Environment. Each one is a key of the endpoints map.
	azureGlobal       = "AzureGlobal"
	azureChinaCloud   = "AzureChinaCloud"
	azureGermanCloud  = "AzureGermanCloud"
	azureUSGovernment = "AzureUSGovernment"
)

var (
	// supportedEnvironments lists every accepted environment name. It is used
	// both for the flag help text and for config validation.
	supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
	// noClientKey is the zero-value ClientProvidedKeyOptions passed to blob
	// download calls.
	noClientKey = azblob.ClientProvidedKeyOptions{}
	// endpoints maps an environment name to the fmt templates used to build
	// URLs for that cloud. blobURLFmt is formatted with account name,
	// container name and blob ID; containerURLFmt is formatted with account
	// name and container name.
	endpoints = map[string]struct{ blobURLFmt, containerURLFmt string }{
		azureGlobal: {
			"https://%s.blob.core.windows.net/%s/%s",
			"https://%s.blob.core.windows.net/%s",
		},
		azureChinaCloud: {
			"https://%s.blob.core.chinacloudapi.cn/%s/%s",
			"https://%s.blob.core.chinacloudapi.cn/%s",
		},
		azureGermanCloud: {
			"https://%s.blob.core.cloudapi.de/%s/%s",
			"https://%s.blob.core.cloudapi.de/%s",
		},
		azureUSGovernment: {
			"https://%s.blob.core.usgovcloudapi.net/%s/%s",
			"https://%s.blob.core.usgovcloudapi.net/%s",
		},
	}
)

// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
56type BlobStorageConfig struct { 57 Environment string `yaml:"environment"` 58 ContainerName string `yaml:"container_name"` 59 AccountName string `yaml:"account_name"` 60 AccountKey flagext.Secret `yaml:"account_key"` 61 DownloadBufferSize int `yaml:"download_buffer_size"` 62 UploadBufferSize int `yaml:"upload_buffer_size"` 63 UploadBufferCount int `yaml:"upload_buffer_count"` 64 RequestTimeout time.Duration `yaml:"request_timeout"` 65 MaxRetries int `yaml:"max_retries"` 66 MinRetryDelay time.Duration `yaml:"min_retry_delay"` 67 MaxRetryDelay time.Duration `yaml:"max_retry_delay"` 68} 69 70// RegisterFlags adds the flags required to config this to the given FlagSet 71func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) { 72 c.RegisterFlagsWithPrefix("", f) 73} 74 75// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet 76func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { 77 f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", "))) 78 f.StringVar(&c.ContainerName, prefix+"azure.container-name", "cortex", "Name of the blob container used to store chunks. 
This container must be created before running cortex.") 79 f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used") 80 f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.") 81 f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.") 82 f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.") 83 f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.") 84 f.IntVar(&c.UploadBufferCount, prefix+"azure.download-buffer-count", 1, "Number of buffers used to used to upload a chunk.") 85 f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.") 86 f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.") 87 f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") 88} 89 90// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks. 91// Implements ObjectStorage 92type BlobStorage struct { 93 // blobService storage.Serv 94 cfg *BlobStorageConfig 95 containerURL azblob.ContainerURL 96} 97 98// NewBlobStorage creates a new instance of the BlobStorage struct. 
99func NewBlobStorage(cfg *BlobStorageConfig) (*BlobStorage, error) { 100 log.WarnExperimentalUse("Azure Blob Storage") 101 blobStorage := &BlobStorage{ 102 cfg: cfg, 103 } 104 105 var err error 106 blobStorage.containerURL, err = blobStorage.buildContainerURL() 107 if err != nil { 108 return nil, err 109 } 110 111 return blobStorage, nil 112} 113 114// Stop is a no op, as there are no background workers with this driver currently 115func (b *BlobStorage) Stop() {} 116 117func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { 118 var cancel context.CancelFunc = func() {} 119 if b.cfg.RequestTimeout > 0 { 120 ctx, cancel = context.WithTimeout(ctx, b.cfg.RequestTimeout) 121 } 122 123 rc, err := b.getObject(ctx, objectKey) 124 if err != nil { 125 // cancel the context if there is an error. 126 cancel() 127 return nil, err 128 } 129 // else return a wrapped ReadCloser which cancels the context while closing the reader. 130 return chunk_util.NewReadCloserWithContextCancelFunc(rc, cancel), nil 131} 132 133func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) { 134 blockBlobURL, err := b.getBlobURL(objectKey) 135 if err != nil { 136 return nil, err 137 } 138 139 // Request access to the blob 140 downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey) 141 if err != nil { 142 return nil, err 143 } 144 145 return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: b.cfg.MaxRetries}), nil 146} 147 148func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { 149 blockBlobURL, err := b.getBlobURL(objectKey) 150 if err != nil { 151 return err 152 } 153 154 bufferSize := b.cfg.UploadBufferSize 155 maxBuffers := b.cfg.UploadBufferCount 156 _, err = azblob.UploadStreamToBlockBlob(ctx, object, blockBlobURL, 157 azblob.UploadStreamToBlockBlobOptions{BufferSize: bufferSize, 
MaxBuffers: maxBuffers}) 158 159 return err 160} 161 162func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) { 163 blobID = strings.Replace(blobID, ":", "-", -1) 164 165 // generate url for new chunk blob 166 u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID)) 167 if err != nil { 168 return azblob.BlockBlobURL{}, err 169 } 170 171 azPipeline, err := b.newPipeline() 172 if err != nil { 173 return azblob.BlockBlobURL{}, err 174 } 175 176 return azblob.NewBlockBlobURL(*u, azPipeline), nil 177} 178 179func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { 180 u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName)) 181 if err != nil { 182 return azblob.ContainerURL{}, err 183 } 184 185 azPipeline, err := b.newPipeline() 186 if err != nil { 187 return azblob.ContainerURL{}, err 188 } 189 190 return azblob.NewContainerURL(*u, azPipeline), nil 191} 192 193func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) { 194 credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value) 195 if err != nil { 196 return nil, err 197 } 198 199 return azblob.NewPipeline(credential, azblob.PipelineOptions{ 200 Retry: azblob.RetryOptions{ 201 Policy: azblob.RetryPolicyExponential, 202 MaxTries: (int32)(b.cfg.MaxRetries), 203 TryTimeout: b.cfg.RequestTimeout, 204 RetryDelay: b.cfg.MinRetryDelay, 205 MaxRetryDelay: b.cfg.MaxRetryDelay, 206 }, 207 }), nil 208} 209 210// List implements chunk.ObjectClient. 
211func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { 212 var storageObjects []chunk.StorageObject 213 var commonPrefixes []chunk.StorageCommonPrefix 214 215 for marker := (azblob.Marker{}); marker.NotDone(); { 216 if ctx.Err() != nil { 217 return nil, nil, ctx.Err() 218 } 219 220 listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix}) 221 if err != nil { 222 return nil, nil, err 223 } 224 225 marker = listBlob.NextMarker 226 227 // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) 228 for _, blobInfo := range listBlob.Segment.BlobItems { 229 storageObjects = append(storageObjects, chunk.StorageObject{ 230 Key: blobInfo.Name, 231 ModifiedAt: blobInfo.Properties.LastModified, 232 }) 233 } 234 235 // Process the BlobPrefixes so called commonPrefixes or synthetic directories in the listed synthetic directory 236 for _, blobPrefix := range listBlob.Segment.BlobPrefixes { 237 commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(blobPrefix.Name)) 238 } 239 } 240 241 return storageObjects, commonPrefixes, nil 242} 243 244func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error { 245 blockBlobURL, err := b.getBlobURL(blobID) 246 if err != nil { 247 return err 248 } 249 250 _, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}) 251 return err 252} 253 254// Validate the config. 
255func (c *BlobStorageConfig) Validate() error { 256 if !util.StringsContain(supportedEnvironments, c.Environment) { 257 return fmt.Errorf("unsupported Azure blob storage environment: %s, please select one of: %s ", c.Environment, strings.Join(supportedEnvironments, ", ")) 258 } 259 return nil 260} 261 262func (b *BlobStorage) selectBlobURLFmt() string { 263 return endpoints[b.cfg.Environment].blobURLFmt 264} 265 266func (b *BlobStorage) selectContainerURLFmt() string { 267 return endpoints[b.cfg.Environment].containerURLFmt 268} 269 270// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. 271func (b *BlobStorage) IsObjectNotFoundErr(err error) bool { 272 var e azblob.StorageError 273 if errors.As(err, &e) && e.ServiceCode() == azblob.ServiceCodeBlobNotFound { 274 return true 275 } 276 277 return false 278} 279