1package azure 2 3import ( 4 "context" 5 "encoding/base64" 6 "io" 7 "net/http" 8 "os" 9 "path" 10 "strings" 11 12 "github.com/restic/restic/internal/backend" 13 "github.com/restic/restic/internal/debug" 14 "github.com/restic/restic/internal/errors" 15 "github.com/restic/restic/internal/restic" 16 17 "github.com/Azure/azure-sdk-for-go/storage" 18 "github.com/cenkalti/backoff/v4" 19) 20 21// Backend stores data on an azure endpoint. 22type Backend struct { 23 accountName string 24 container *storage.Container 25 sem *backend.Semaphore 26 prefix string 27 listMaxItems int 28 backend.Layout 29} 30 31const defaultListMaxItems = 5000 32 33// make sure that *Backend implements backend.Backend 34var _ restic.Backend = &Backend{} 35 36func open(cfg Config, rt http.RoundTripper) (*Backend, error) { 37 debug.Log("open, config %#v", cfg) 38 39 client, err := storage.NewBasicClient(cfg.AccountName, cfg.AccountKey) 40 if err != nil { 41 return nil, errors.Wrap(err, "NewBasicClient") 42 } 43 44 client.HTTPClient = &http.Client{Transport: rt} 45 46 service := client.GetBlobService() 47 48 sem, err := backend.NewSemaphore(cfg.Connections) 49 if err != nil { 50 return nil, err 51 } 52 53 be := &Backend{ 54 container: service.GetContainerReference(cfg.Container), 55 accountName: cfg.AccountName, 56 sem: sem, 57 prefix: cfg.Prefix, 58 Layout: &backend.DefaultLayout{ 59 Path: cfg.Prefix, 60 Join: path.Join, 61 }, 62 listMaxItems: defaultListMaxItems, 63 } 64 65 return be, nil 66} 67 68// Open opens the Azure backend at specified container. 69func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { 70 return open(cfg, rt) 71} 72 73// Create opens the Azure backend at specified container and creates the container if 74// it does not exist yet. 75func Create(cfg Config, rt http.RoundTripper) (*Backend, error) { 76 be, err := open(cfg, rt) 77 78 if err != nil { 79 return nil, errors.Wrap(err, "open") 80 } 81 82 options := storage.CreateContainerOptions{ 83 Access: storage.ContainerAccessTypePrivate, 84 } 85 86 _, err = be.container.CreateIfNotExists(&options) 87 if err != nil { 88 return nil, errors.Wrap(err, "container.CreateIfNotExists") 89 } 90 91 return be, nil 92} 93 94// SetListMaxItems sets the number of list items to load per request. 95func (be *Backend) SetListMaxItems(i int) { 96 be.listMaxItems = i 97} 98 99// IsNotExist returns true if the error is caused by a not existing file. 100func (be *Backend) IsNotExist(err error) bool { 101 debug.Log("IsNotExist(%T, %#v)", err, err) 102 return os.IsNotExist(err) 103} 104 105// Join combines path components with slashes. 106func (be *Backend) Join(p ...string) string { 107 return path.Join(p...) 108} 109 110// Location returns this backend's location (the container name). 111func (be *Backend) Location() string { 112 return be.Join(be.container.Name, be.prefix) 113} 114 115// Path returns the path in the bucket that is used for this backend. 116func (be *Backend) Path() string { 117 return be.prefix 118} 119 120type azureAdapter struct { 121 restic.RewindReader 122} 123 124func (azureAdapter) Close() error { return nil } 125 126func (a azureAdapter) Len() int { 127 return int(a.Length()) 128} 129 130// Save stores data in the backend at the handle. 131func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { 132 if err := h.Valid(); err != nil { 133 return backoff.Permanent(err) 134 } 135 136 objName := be.Filename(h) 137 138 debug.Log("Save %v at %v", h, objName) 139 140 be.sem.GetToken() 141 142 debug.Log("InsertObject(%v, %v)", be.container.Name, objName) 143 144 var err error 145 if rd.Length() < 256*1024*1024 { 146 // wrap the reader so that net/http client cannot close the reader 147 // CreateBlockBlobFromReader reads length from `Len()`` 148 dataReader := azureAdapter{rd} 149 150 // if it's smaller than 256miB, then just create the file directly from the reader 151 err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil) 152 } else { 153 // otherwise use the more complicated method 154 err = be.saveLarge(ctx, objName, rd) 155 156 } 157 158 be.sem.ReleaseToken() 159 debug.Log("%v, err %#v", objName, err) 160 161 return errors.Wrap(err, "CreateBlockBlobFromReader") 162} 163 164func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error { 165 // create the file on the server 166 file := be.container.GetBlobReference(objName) 167 err := file.CreateBlockBlob(nil) 168 if err != nil { 169 return errors.Wrap(err, "CreateBlockBlob") 170 } 171 172 // read the data, in 100 MiB chunks 173 buf := make([]byte, 100*1024*1024) 174 var blocks []storage.Block 175 uploadedBytes := 0 176 177 for { 178 n, err := io.ReadFull(rd, buf) 179 if err == io.ErrUnexpectedEOF { 180 err = nil 181 } 182 if err == io.EOF { 183 // end of file reached, no bytes have been read at all 184 break 185 } 186 187 if err != nil { 188 return errors.Wrap(err, "ReadFull") 189 } 190 191 buf = buf[:n] 192 uploadedBytes += n 193 194 // upload it as a new "block", use the base64 hash for the ID 195 h := restic.Hash(buf) 196 id := base64.StdEncoding.EncodeToString(h[:]) 197 debug.Log("PutBlock %v with %d bytes", id, len(buf)) 198 err = file.PutBlock(id, buf, nil) 199 if err != nil { 200 return errors.Wrap(err, "PutBlock") 201 } 202 203 blocks = append(blocks, storage.Block{ 204 ID: id, 205 Status: "Uncommitted", 206 }) 207 } 208 209 // sanity check 210 if uploadedBytes != int(rd.Length()) { 211 return errors.Errorf("wrote %d bytes instead of the expected %d bytes", uploadedBytes, rd.Length()) 212 } 213 214 debug.Log("uploaded %d parts: %v", len(blocks), blocks) 215 err = file.PutBlockList(blocks, nil) 216 debug.Log("PutBlockList returned %v", err) 217 return errors.Wrap(err, "PutBlockList") 218} 219 220// wrapReader wraps an io.ReadCloser to run an additional function on Close. 221type wrapReader struct { 222 io.ReadCloser 223 f func() 224} 225 226func (wr wrapReader) Close() error { 227 err := wr.ReadCloser.Close() 228 wr.f() 229 return err 230} 231 232// Load runs fn with a reader that yields the contents of the file at h at the 233// given offset. 234func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { 235 return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) 236} 237 238func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { 239 debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) 240 if err := h.Valid(); err != nil { 241 return nil, backoff.Permanent(err) 242 } 243 244 if offset < 0 { 245 return nil, errors.New("offset is negative") 246 } 247 248 if length < 0 { 249 return nil, errors.Errorf("invalid length %d", length) 250 } 251 252 objName := be.Filename(h) 253 blob := be.container.GetBlobReference(objName) 254 255 start := uint64(offset) 256 var end uint64 257 258 if length > 0 { 259 end = uint64(offset + int64(length) - 1) 260 } else { 261 end = 0 262 } 263 264 be.sem.GetToken() 265 266 rd, err := blob.GetRange(&storage.GetBlobRangeOptions{Range: &storage.BlobRange{Start: start, End: end}}) 267 if err != nil { 268 be.sem.ReleaseToken() 269 return nil, err 270 } 271 272 closeRd := wrapReader{ 273 ReadCloser: rd, 274 f: func() { 275 debug.Log("Close()") 276 be.sem.ReleaseToken() 277 }, 278 } 279 280 return closeRd, err 281} 282 283// Stat returns information about a blob. 284func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { 285 debug.Log("%v", h) 286 287 objName := be.Filename(h) 288 blob := be.container.GetBlobReference(objName) 289 290 be.sem.GetToken() 291 err := blob.GetProperties(nil) 292 be.sem.ReleaseToken() 293 294 if err != nil { 295 debug.Log("blob.GetProperties err %v", err) 296 return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") 297 } 298 299 fi := restic.FileInfo{ 300 Size: int64(blob.Properties.ContentLength), 301 Name: h.Name, 302 } 303 return fi, nil 304} 305 306// Test returns true if a blob of the given type and name exists in the backend. 307func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { 308 objName := be.Filename(h) 309 310 be.sem.GetToken() 311 found, err := be.container.GetBlobReference(objName).Exists() 312 be.sem.ReleaseToken() 313 314 if err != nil { 315 return false, err 316 } 317 return found, nil 318} 319 320// Remove removes the blob with the given name and type. 321func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { 322 objName := be.Filename(h) 323 324 be.sem.GetToken() 325 _, err := be.container.GetBlobReference(objName).DeleteIfExists(nil) 326 be.sem.ReleaseToken() 327 328 debug.Log("Remove(%v) at %v -> err %v", h, objName, err) 329 return errors.Wrap(err, "client.RemoveObject") 330} 331 332// List runs fn for each file in the backend which has the type t. When an 333// error occurs (or fn returns an error), List stops and returns it. 334func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { 335 debug.Log("listing %v", t) 336 337 prefix, _ := be.Basedir(t) 338 339 // make sure prefix ends with a slash 340 if !strings.HasSuffix(prefix, "/") { 341 prefix += "/" 342 } 343 344 params := storage.ListBlobsParameters{ 345 MaxResults: uint(be.listMaxItems), 346 Prefix: prefix, 347 } 348 349 for { 350 be.sem.GetToken() 351 obj, err := be.container.ListBlobs(params) 352 be.sem.ReleaseToken() 353 354 if err != nil { 355 return err 356 } 357 358 debug.Log("got %v objects", len(obj.Blobs)) 359 360 for _, item := range obj.Blobs { 361 m := strings.TrimPrefix(item.Name, prefix) 362 if m == "" { 363 continue 364 } 365 366 fi := restic.FileInfo{ 367 Name: path.Base(m), 368 Size: item.Properties.ContentLength, 369 } 370 371 if ctx.Err() != nil { 372 return ctx.Err() 373 } 374 375 err := fn(fi) 376 if err != nil { 377 return err 378 } 379 380 if ctx.Err() != nil { 381 return ctx.Err() 382 } 383 384 } 385 386 if obj.NextMarker == "" { 387 break 388 } 389 params.Marker = obj.NextMarker 390 } 391 392 return ctx.Err() 393} 394 395// Remove keys for a specified backend type. 396func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { 397 return be.List(ctx, t, func(fi restic.FileInfo) error { 398 return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) 399 }) 400} 401 402// Delete removes all restic keys in the bucket. It will not remove the bucket itself. 403func (be *Backend) Delete(ctx context.Context) error { 404 alltypes := []restic.FileType{ 405 restic.PackFile, 406 restic.KeyFile, 407 restic.LockFile, 408 restic.SnapshotFile, 409 restic.IndexFile} 410 411 for _, t := range alltypes { 412 err := be.removeKeys(ctx, t) 413 if err != nil { 414 return nil 415 } 416 } 417 418 return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) 419} 420 421// Close does nothing 422func (be *Backend) Close() error { return nil } 423