package azure

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/log"
	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/pkg/storage/chunk"
	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
)

const (
	// Environment
	azureGlobal       = "AzureGlobal"
	azureChinaCloud   = "AzureChinaCloud"
	azureGermanCloud  = "AzureGermanCloud"
	azureUSGovernment = "AzureUSGovernment"
)

var (
	supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
	noClientKey           = azblob.ClientProvidedKeyOptions{}
	endpoints             = map[string]struct{ blobURLFmt, containerURLFmt string }{
		azureGlobal: {
			"https://%s.blob.core.windows.net/%s/%s",
			"https://%s.blob.core.windows.net/%s",
		},
		azureChinaCloud: {
			"https://%s.blob.core.chinacloudapi.cn/%s/%s",
			"https://%s.blob.core.chinacloudapi.cn/%s",
		},
		azureGermanCloud: {
			"https://%s.blob.core.cloudapi.de/%s/%s",
			"https://%s.blob.core.cloudapi.de/%s",
		},
		azureUSGovernment: {
			"https://%s.blob.core.usgovcloudapi.net/%s/%s",
			"https://%s.blob.core.usgovcloudapi.net/%s",
		},
	}
)

// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
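// A minimal YAML sketch of this config (field names follow the struct tags below; the
// values shown are illustrative assumptions, not documented defaults):
//
//	environment: AzureGlobal
//	container_name: cortex
//	account_name: myaccount
//	account_key: <secret>
//	request_timeout: 30s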
type BlobStorageConfig struct {
	Environment        string         `yaml:"environment"`
	ContainerName      string         `yaml:"container_name"`
	AccountName        string         `yaml:"account_name"`
	AccountKey         flagext.Secret `yaml:"account_key"`
	DownloadBufferSize int            `yaml:"download_buffer_size"`
	UploadBufferSize   int            `yaml:"upload_buffer_size"`
	UploadBufferCount  int            `yaml:"upload_buffer_count"`
	RequestTimeout     time.Duration  `yaml:"request_timeout"`
	MaxRetries         int            `yaml:"max_retries"`
	MinRetryDelay      time.Duration  `yaml:"min_retry_delay"`
	MaxRetryDelay      time.Duration  `yaml:"max_retry_delay"`
}

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
	c.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet.
func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", ")))
	f.StringVar(&c.ContainerName, prefix+"azure.container-name", "cortex", "Name of the blob container used to store chunks. This container must be created before running cortex.")
	f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used.")
	f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.")
	f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.")
	f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.")
	f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.")
	f.IntVar(&c.UploadBufferCount, prefix+"azure.upload-buffer-count", 1, "Number of buffers used to upload a chunk.")
	f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
	f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
	f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
}

// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {
	// blobService storage.Serv
	cfg          *BlobStorageConfig
	containerURL azblob.ContainerURL
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
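// A minimal usage sketch (hypothetical values; error handling abbreviated):
//
//	cfg := &BlobStorageConfig{Environment: azureGlobal, ContainerName: "chunks", AccountName: "myaccount"}
//	store, err := NewBlobStorage(cfg)
//	if err != nil { ... }
//	defer store.Stop()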
func NewBlobStorage(cfg *BlobStorageConfig) (*BlobStorage, error) {
	log.WarnExperimentalUse("Azure Blob Storage")
	blobStorage := &BlobStorage{
		cfg: cfg,
	}

	var err error
	blobStorage.containerURL, err = blobStorage.buildContainerURL()
	if err != nil {
		return nil, err
	}

	return blobStorage, nil
}

// Stop is a no-op, as there are no background workers with this driver currently.
func (b *BlobStorage) Stop() {}

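// GetObject returns a reader for the object at objectKey. If a request timeout is
// configured, the returned reader cancels the timeout context when it is closed.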
func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
	var cancel context.CancelFunc = func() {}
	if b.cfg.RequestTimeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, b.cfg.RequestTimeout)
	}

	rc, err := b.getObject(ctx, objectKey)
	if err != nil {
		// cancel the context if there is an error.
		cancel()
		return nil, err
	}
	// else return a wrapped ReadCloser which cancels the context while closing the reader.
	return chunk_util.NewReadCloserWithContextCancelFunc(rc, cancel), nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) {
	blockBlobURL, err := b.getBlobURL(objectKey)
	if err != nil {
		return nil, err
	}

	// Request access to the blob
	downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey)
	if err != nil {
		return nil, err
	}

	return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: b.cfg.MaxRetries}), nil
}

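// PutObject uploads the object to objectKey as a block blob, streaming it through the
// configured upload buffers.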
func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
	blockBlobURL, err := b.getBlobURL(objectKey)
	if err != nil {
		return err
	}

	bufferSize := b.cfg.UploadBufferSize
	maxBuffers := b.cfg.UploadBufferCount
	_, err = azblob.UploadStreamToBlockBlob(ctx, object, blockBlobURL,
		azblob.UploadStreamToBlockBlobOptions{BufferSize: bufferSize, MaxBuffers: maxBuffers})

	return err
}

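// getBlobURL builds a BlockBlobURL for the given blob ID, replacing ':' with '-' so the
// ID forms a valid blob name for the selected environment's endpoint.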
func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) {
	blobID = strings.Replace(blobID, ":", "-", -1)

	// generate url for new chunk blob
	u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID))
	if err != nil {
		return azblob.BlockBlobURL{}, err
	}

	azPipeline, err := b.newPipeline()
	if err != nil {
		return azblob.BlockBlobURL{}, err
	}

	return azblob.NewBlockBlobURL(*u, azPipeline), nil
}

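// buildContainerURL constructs the container URL for the configured account, container
// and Azure environment.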
func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
	u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName))
	if err != nil {
		return azblob.ContainerURL{}, err
	}

	azPipeline, err := b.newPipeline()
	if err != nil {
		return azblob.ContainerURL{}, err
	}

	return azblob.NewContainerURL(*u, azPipeline), nil
}

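// newPipeline creates an azblob pipeline authenticated with the shared key credential and
// configured with exponential retries based on the retry settings in the config.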
func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) {
	credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
	if err != nil {
		return nil, err
	}

	return azblob.NewPipeline(credential, azblob.PipelineOptions{
		Retry: azblob.RetryOptions{
			Policy:        azblob.RetryPolicyExponential,
			MaxTries:      int32(b.cfg.MaxRetries),
			TryTimeout:    b.cfg.RequestTimeout,
			RetryDelay:    b.cfg.MinRetryDelay,
			MaxRetryDelay: b.cfg.MaxRetryDelay,
		},
	}), nil
}

// List implements chunk.ObjectClient.
func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
	var storageObjects []chunk.StorageObject
	var commonPrefixes []chunk.StorageCommonPrefix

	for marker := (azblob.Marker{}); marker.NotDone(); {
		if ctx.Err() != nil {
			return nil, nil, ctx.Err()
		}

		listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix})
		if err != nil {
			return nil, nil, err
		}

		marker = listBlob.NextMarker

		// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
		for _, blobInfo := range listBlob.Segment.BlobItems {
			storageObjects = append(storageObjects, chunk.StorageObject{
				Key:        blobInfo.Name,
				ModifiedAt: blobInfo.Properties.LastModified,
			})
		}

		// Process the BlobPrefixes, i.e. the common prefixes (synthetic directories) under the listed prefix.
		for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
			commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(blobPrefix.Name))
		}
	}

	return storageObjects, commonPrefixes, nil
}

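// DeleteObject deletes the blob identified by blobID, including its snapshots.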
func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
	blockBlobURL, err := b.getBlobURL(blobID)
	if err != nil {
		return err
	}

	_, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
	return err
}

// Validate the config.
func (c *BlobStorageConfig) Validate() error {
	if !util.StringsContain(supportedEnvironments, c.Environment) {
		return fmt.Errorf("unsupported Azure blob storage environment: %s, please select one of: %s", c.Environment, strings.Join(supportedEnvironments, ", "))
	}
	return nil
}

func (b *BlobStorage) selectBlobURLFmt() string {
	return endpoints[b.cfg.Environment].blobURLFmt
}

func (b *BlobStorage) selectContainerURLFmt() string {
	return endpoints[b.cfg.Environment].containerURLFmt
}

// IsObjectNotFoundErr returns true if the error indicates the object was not found. Relevant to GetObject and DeleteObject operations.
func (b *BlobStorage) IsObjectNotFoundErr(err error) bool {
	var e azblob.StorageError
	if errors.As(err, &e) && e.ServiceCode() == azblob.ServiceCodeBlobNotFound {
		return true
	}

	return false
}