package azure

import (
	"context"
	"encoding/base64"
	"io"
	"net/http"
	"os"
	"path"
	"strings"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"

	"github.com/Azure/azure-sdk-for-go/storage"
	"github.com/cenkalti/backoff/v4"
)

// Backend stores data on an Azure endpoint.
type Backend struct {
	accountName  string
	container    *storage.Container
	sem          *backend.Semaphore
	prefix       string
	listMaxItems int
	backend.Layout
}

const defaultListMaxItems = 5000

// make sure that *Backend implements restic.Backend
var _ restic.Backend = &Backend{}

func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
	debug.Log("open, config %#v", cfg)

	client, err := storage.NewBasicClient(cfg.AccountName, cfg.AccountKey)
	if err != nil {
		return nil, errors.Wrap(err, "NewBasicClient")
	}

	client.HTTPClient = &http.Client{Transport: rt}

	service := client.GetBlobService()

	sem, err := backend.NewSemaphore(cfg.Connections)
	if err != nil {
		return nil, err
	}

	be := &Backend{
		container:   service.GetContainerReference(cfg.Container),
		accountName: cfg.AccountName,
		sem:         sem,
		prefix:      cfg.Prefix,
		Layout: &backend.DefaultLayout{
			Path: cfg.Prefix,
			Join: path.Join,
		},
		listMaxItems: defaultListMaxItems,
	}

	return be, nil
}

// Open opens the Azure backend at the specified container.
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
	return open(cfg, rt)
}

// Create opens the Azure backend at the specified container and creates the
// container if it does not exist yet.
func Create(cfg Config, rt http.RoundTripper) (*Backend, error) {
	be, err := open(cfg, rt)

	if err != nil {
		return nil, errors.Wrap(err, "open")
	}

	options := storage.CreateContainerOptions{
		Access: storage.ContainerAccessTypePrivate,
	}

	_, err = be.container.CreateIfNotExists(&options)
	if err != nil {
		return nil, errors.Wrap(err, "container.CreateIfNotExists")
	}

	return be, nil
}
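
// A minimal usage sketch (not part of the package API): assuming a Config
// populated elsewhere, e.g. from the repository location and the environment,
// the backend can be created roughly like this:
//
//	be, err := Create(cfg, http.DefaultTransport)
//	if err != nil {
//		// handle the error
//	}
//	defer be.Close()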

// SetListMaxItems sets the number of list items to load per request.
func (be *Backend) SetListMaxItems(i int) {
	be.listMaxItems = i
}
// IsNotExist returns true if the error is caused by a non-existent file.
func (be *Backend) IsNotExist(err error) bool {
	debug.Log("IsNotExist(%T, %#v)", err, err)
	return os.IsNotExist(err)
}

// Join combines path components with slashes.
func (be *Backend) Join(p ...string) string {
	return path.Join(p...)
}

// Location returns this backend's location (the container name and prefix).
func (be *Backend) Location() string {
	return be.Join(be.container.Name, be.prefix)
}

// Path returns the path within the container that is used for this backend.
func (be *Backend) Path() string {
	return be.prefix
}

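// azureAdapter wraps a restic.RewindReader for the Azure SDK: it provides the
// Len method the SDK uses to determine the upload size and a no-op Close so
// the SDK's HTTP client cannot close the underlying reader.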
type azureAdapter struct {
	restic.RewindReader
}

func (azureAdapter) Close() error { return nil }

func (a azureAdapter) Len() int {
	return int(a.Length())
}

// Save stores data in the backend at the handle.
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
	if err := h.Valid(); err != nil {
		return backoff.Permanent(err)
	}

	objName := be.Filename(h)

	debug.Log("Save %v at %v", h, objName)

	be.sem.GetToken()

	debug.Log("InsertObject(%v, %v)", be.container.Name, objName)

	var err error
	if rd.Length() < 256*1024*1024 {
		// wrap the reader so that the net/http client cannot close the reader,
		// CreateBlockBlobFromReader reads the length from `Len()`
		dataReader := azureAdapter{rd}

		// if it's smaller than 256 MiB, create the blob directly from the reader
		err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil)
	} else {
		// otherwise use the block-based upload in saveLarge
		err = be.saveLarge(ctx, objName, rd)
	}

	be.sem.ReleaseToken()
	debug.Log("%v, err %#v", objName, err)

	return errors.Wrap(err, "CreateBlockBlobFromReader")
}

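// saveLarge uploads rd to objName as an Azure block blob: the data is read in
// chunks, each chunk is uploaded with PutBlock under a base64-encoded block
// ID, and PutBlockList then commits the list of IDs to form the final blob.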
func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error {
	// create the file on the server
	file := be.container.GetBlobReference(objName)
	err := file.CreateBlockBlob(nil)
	if err != nil {
		return errors.Wrap(err, "CreateBlockBlob")
	}

	// read the data, in 100 MiB chunks
	buf := make([]byte, 100*1024*1024)
	var blocks []storage.Block
	uploadedBytes := 0

	for {
		n, err := io.ReadFull(rd, buf)
		if err == io.ErrUnexpectedEOF {
			err = nil
		}
		if err == io.EOF {
			// end of file reached, no bytes have been read at all
			break
		}

		if err != nil {
			return errors.Wrap(err, "ReadFull")
		}

		// buf may shrink here on the final, partial chunk; the next ReadFull
		// then returns io.EOF and the loop ends
		buf = buf[:n]
		uploadedBytes += n

		// upload it as a new "block", use the base64 hash for the ID
		h := restic.Hash(buf)
		id := base64.StdEncoding.EncodeToString(h[:])
		debug.Log("PutBlock %v with %d bytes", id, len(buf))
		err = file.PutBlock(id, buf, nil)
		if err != nil {
			return errors.Wrap(err, "PutBlock")
		}

		blocks = append(blocks, storage.Block{
			ID:     id,
			Status: "Uncommitted",
		})
	}

	// sanity check
	if uploadedBytes != int(rd.Length()) {
		return errors.Errorf("wrote %d bytes instead of the expected %d bytes", uploadedBytes, rd.Length())
	}

	debug.Log("uploaded %d parts: %v", len(blocks), blocks)
	err = file.PutBlockList(blocks, nil)
	debug.Log("PutBlockList returned %v", err)
	return errors.Wrap(err, "PutBlockList")
}

// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
	io.ReadCloser
	f func()
}

func (wr wrapReader) Close() error {
	err := wr.ReadCloser.Close()
	wr.f()
	return err
}

// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
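// It delegates to backend.DefaultLoad, which obtains the reader from
// openReader, passes it to fn and closes it once fn returns.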
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}

func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
	if err := h.Valid(); err != nil {
		return nil, backoff.Permanent(err)
	}

	if offset < 0 {
		return nil, errors.New("offset is negative")
	}

	if length < 0 {
		return nil, errors.Errorf("invalid length %d", length)
	}

	objName := be.Filename(h)
	blob := be.container.GetBlobReference(objName)

	start := uint64(offset)
	var end uint64

	if length > 0 {
		end = uint64(offset + int64(length) - 1)
	} else {
		// an end of 0 requests an open-ended range, i.e. read to the end of the blob
		end = 0
	}

	be.sem.GetToken()

	rd, err := blob.GetRange(&storage.GetBlobRangeOptions{Range: &storage.BlobRange{Start: start, End: end}})
	if err != nil {
		be.sem.ReleaseToken()
		return nil, err
	}

	closeRd := wrapReader{
		ReadCloser: rd,
		f: func() {
			debug.Log("Close()")
			be.sem.ReleaseToken()
		},
	}

	return closeRd, err
}

// Stat returns information about a blob.
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
	debug.Log("%v", h)

	objName := be.Filename(h)
	blob := be.container.GetBlobReference(objName)

	be.sem.GetToken()
	err := blob.GetProperties(nil)
	be.sem.ReleaseToken()

	if err != nil {
		debug.Log("blob.GetProperties err %v", err)
		return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties")
	}

	fi := restic.FileInfo{
		Size: int64(blob.Properties.ContentLength),
		Name: h.Name,
	}
	return fi, nil
}

// Test returns true if a blob of the given type and name exists in the backend.
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
	objName := be.Filename(h)

	be.sem.GetToken()
	found, err := be.container.GetBlobReference(objName).Exists()
	be.sem.ReleaseToken()

	if err != nil {
		return false, err
	}
	return found, nil
}

// Remove removes the blob with the given name and type.
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
	objName := be.Filename(h)

	be.sem.GetToken()
	_, err := be.container.GetBlobReference(objName).DeleteIfExists(nil)
	be.sem.ReleaseToken()

	debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
	return errors.Wrap(err, "blob.DeleteIfExists")
}

// List runs fn for each file in the backend which has the type t. When an
// error occurs (or fn returns an error), List stops and returns it.
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
	debug.Log("listing %v", t)

	prefix, _ := be.Basedir(t)

	// make sure prefix ends with a slash
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	params := storage.ListBlobsParameters{
		MaxResults: uint(be.listMaxItems),
		Prefix:     prefix,
	}

	for {
		be.sem.GetToken()
		obj, err := be.container.ListBlobs(params)
		be.sem.ReleaseToken()

		if err != nil {
			return err
		}

		debug.Log("got %v objects", len(obj.Blobs))

		for _, item := range obj.Blobs {
			m := strings.TrimPrefix(item.Name, prefix)
			if m == "" {
				continue
			}

			fi := restic.FileInfo{
				Name: path.Base(m),
				Size: item.Properties.ContentLength,
			}

			if ctx.Err() != nil {
				return ctx.Err()
			}

			err := fn(fi)
			if err != nil {
				return err
			}

			if ctx.Err() != nil {
				return ctx.Err()
			}
		}

		if obj.NextMarker == "" {
			break
		}
		params.Marker = obj.NextMarker
	}

	return ctx.Err()
}

// removeKeys deletes all blobs of the given type in the backend.
func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
	return be.List(ctx, t, func(fi restic.FileInfo) error {
		return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
	})
}

// Delete removes all restic data in the container. It will not remove the container itself.
func (be *Backend) Delete(ctx context.Context) error {
	alltypes := []restic.FileType{
		restic.PackFile,
		restic.KeyFile,
		restic.LockFile,
		restic.SnapshotFile,
		restic.IndexFile}

	for _, t := range alltypes {
		err := be.removeKeys(ctx, t)
		if err != nil {
			return err
		}
	}

	return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
}

// Close does nothing
func (be *Backend) Close() error { return nil }