1// 2// Copyright (c) 2018, Joyent, Inc. All rights reserved. 3// 4// This Source Code Form is subject to the terms of the Mozilla Public 5// License, v. 2.0. If a copy of the MPL was not distributed with this 6// file, You can obtain one at http://mozilla.org/MPL/2.0/. 7// 8 9package storage 10 11import ( 12 "context" 13 "encoding/json" 14 "io" 15 "net/http" 16 "net/url" 17 "path" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/joyent/triton-go/client" 23 tt "github.com/joyent/triton-go/errors" 24 "github.com/pkg/errors" 25) 26 27type ObjectsClient struct { 28 client *client.Client 29} 30 31// AbortMpuInput represents parameters to an AbortMpu operation 32type AbortMpuInput struct { 33 PartsDirectoryPath string 34} 35 36func (s *ObjectsClient) AbortMultipartUpload(ctx context.Context, input *AbortMpuInput) error { 37 return abortMpu(*s, ctx, input) 38} 39 40// CommitMpuInput represents parameters to a CommitMpu operation 41type CommitMpuInput struct { 42 Id string 43 Headers map[string]string 44 Body CommitMpuBody 45} 46 47// CommitMpuBody represents the body of a CommitMpu request 48type CommitMpuBody struct { 49 Parts []string `json:"parts"` 50} 51 52func (s *ObjectsClient) CommitMultipartUpload(ctx context.Context, input *CommitMpuInput) error { 53 return commitMpu(*s, ctx, input) 54} 55 56// CreateMpuInput represents parameters to a CreateMpu operation. 57type CreateMpuInput struct { 58 Body CreateMpuBody 59 ContentLength uint64 60 ContentMD5 string 61 DurabilityLevel uint64 62 ForceInsert bool //Force the creation of the directory tree 63} 64 65// CreateMpuOutput represents the response from a CreateMpu operation 66type CreateMpuOutput struct { 67 Id string `json:"id"` 68 PartsDirectory string `json:"partsDirectory"` 69} 70 71// CreateMpuBody represents the body of a CreateMpu request. 72type CreateMpuBody struct { 73 ObjectPath string `json:"objectPath"` 74 Headers map[string]string `json:"headers,omitempty"` 75} 76 77func (s *ObjectsClient) CreateMultipartUpload(ctx context.Context, input *CreateMpuInput) (*CreateMpuOutput, error) { 78 return createMpu(*s, ctx, input) 79} 80 81// GetObjectInput represents parameters to a GetObject operation. 82type GetInfoInput struct { 83 ObjectPath string 84 Headers map[string]string 85} 86 87// GetObjectOutput contains the outputs for a GetObject operation. It is your 88// responsibility to ensure that the io.ReadCloser ObjectReader is closed. 89type GetInfoOutput struct { 90 ContentLength uint64 91 ContentType string 92 LastModified time.Time 93 ContentMD5 string 94 ETag string 95 Metadata map[string]string 96} 97 98// GetInfo sends a HEAD request to an object in the Manta service. This function 99// does not return a response body. 100func (s *ObjectsClient) GetInfo(ctx context.Context, input *GetInfoInput) (*GetInfoOutput, error) { 101 absPath := absFileInput(s.client.AccountName, input.ObjectPath) 102 103 headers := &http.Header{} 104 for key, value := range input.Headers { 105 headers.Set(key, value) 106 } 107 108 reqInput := client.RequestInput{ 109 Method: http.MethodHead, 110 Path: string(absPath), 111 Headers: headers, 112 } 113 _, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput) 114 if err != nil { 115 return nil, errors.Wrap(err, "unable to get info") 116 } 117 118 response := &GetInfoOutput{ 119 ContentType: respHeaders.Get("Content-Type"), 120 ContentMD5: respHeaders.Get("Content-MD5"), 121 ETag: respHeaders.Get("Etag"), 122 } 123 124 lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified")) 125 if err == nil { 126 response.LastModified = lastModified 127 } 128 129 contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64) 130 if err == nil { 131 response.ContentLength = contentLength 132 } 133 134 metadata := map[string]string{} 135 for key, values := range respHeaders { 136 if strings.HasPrefix(key, "m-") { 137 metadata[key] = strings.Join(values, ", ") 138 } 139 } 140 response.Metadata = metadata 141 142 return response, nil 143} 144 145// IsDir is a convenience wrapper around the GetInfo function which takes an 146// ObjectPath and returns a boolean whether or not the object is a directory 147// type in Manta. Returns an error if GetInfo failed upstream for some reason. 148func (s *ObjectsClient) IsDir(ctx context.Context, objectPath string) (bool, error) { 149 info, err := s.GetInfo(ctx, &GetInfoInput{ 150 ObjectPath: objectPath, 151 }) 152 if err != nil { 153 return false, err 154 } 155 if info != nil { 156 return strings.HasSuffix(info.ContentType, "type=directory"), nil 157 } 158 return false, nil 159} 160 161// GetObjectInput represents parameters to a GetObject operation. 162type GetObjectInput struct { 163 ObjectPath string 164 Headers map[string]string 165} 166 167// GetObjectOutput contains the outputs for a GetObject operation. It is your 168// responsibility to ensure that the io.ReadCloser ObjectReader is closed. 169type GetObjectOutput struct { 170 ContentLength uint64 171 ContentType string 172 LastModified time.Time 173 ContentMD5 string 174 ETag string 175 Metadata map[string]string 176 ObjectReader io.ReadCloser 177} 178 179// Get retrieves an object from the Manta service. If error is nil (i.e. the 180// call returns successfully), it is your responsibility to close the 181// io.ReadCloser named ObjectReader in the operation output. 182func (s *ObjectsClient) Get(ctx context.Context, input *GetObjectInput) (*GetObjectOutput, error) { 183 absPath := absFileInput(s.client.AccountName, input.ObjectPath) 184 185 headers := &http.Header{} 186 for key, value := range input.Headers { 187 headers.Set(key, value) 188 } 189 190 reqInput := client.RequestInput{ 191 Method: http.MethodGet, 192 Path: string(absPath), 193 Headers: headers, 194 } 195 respBody, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput) 196 if err != nil { 197 return nil, errors.Wrap(err, "unable to get object") 198 } 199 200 response := &GetObjectOutput{ 201 ContentType: respHeaders.Get("Content-Type"), 202 ContentMD5: respHeaders.Get("Content-MD5"), 203 ETag: respHeaders.Get("Etag"), 204 ObjectReader: respBody, 205 } 206 207 lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified")) 208 if err == nil { 209 response.LastModified = lastModified 210 } 211 212 contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64) 213 if err == nil { 214 response.ContentLength = contentLength 215 } 216 217 metadata := map[string]string{} 218 for key, values := range respHeaders { 219 if strings.HasPrefix(key, "m-") { 220 metadata[key] = strings.Join(values, ", ") 221 } 222 } 223 response.Metadata = metadata 224 225 return response, nil 226} 227 228// DeleteObjectInput represents parameters to a DeleteObject operation. 229type DeleteObjectInput struct { 230 ObjectPath string 231 Headers map[string]string 232} 233 234// DeleteObject deletes an object. 235func (s *ObjectsClient) Delete(ctx context.Context, input *DeleteObjectInput) error { 236 absPath := absFileInput(s.client.AccountName, input.ObjectPath) 237 238 headers := &http.Header{} 239 for key, value := range input.Headers { 240 headers.Set(key, value) 241 } 242 243 reqInput := client.RequestInput{ 244 Method: http.MethodDelete, 245 Path: string(absPath), 246 Headers: headers, 247 } 248 respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput) 249 if respBody != nil { 250 defer respBody.Close() 251 } 252 if err != nil { 253 return errors.Wrap(err, "unable to delete object") 254 } 255 256 return nil 257} 258 259// GetMpuInput represents parameters to a GetMpu operation 260type GetMpuInput struct { 261 PartsDirectoryPath string 262} 263 264type GetMpuHeaders struct { 265 ContentLength int64 `json:"content-length"` 266 ContentMd5 string `json:"content-md5"` 267} 268 269type GetMpuOutput struct { 270 Id string `json:"id"` 271 State string `json:"state"` 272 PartsDirectory string `json:"partsDirectory"` 273 TargetObject string `json:"targetObject"` 274 Headers GetMpuHeaders `json:"headers"` 275 NumCopies int64 `json:"numCopies"` 276 CreationTimeMs int64 `json:"creationTimeMs"` 277} 278 279func (s *ObjectsClient) GetMultipartUpload(ctx context.Context, input *GetMpuInput) (*GetMpuOutput, error) { 280 return getMpu(*s, ctx, input) 281} 282 283type ListMpuPartsInput struct { 284 Id string 285} 286 287type ListMpuPart struct { 288 ETag string 289 PartNumber int 290 Size int64 291} 292 293type ListMpuPartsOutput struct { 294 Parts []ListMpuPart 295} 296 297func (s *ObjectsClient) ListMultipartUploadParts(ctx context.Context, input *ListMpuPartsInput) (*ListMpuPartsOutput, error) { 298 return listMpuParts(*s, ctx, input) 299} 300 301// PutObjectMetadataInput represents parameters to a PutObjectMetadata operation. 302type PutObjectMetadataInput struct { 303 ObjectPath string 304 ContentType string 305 Metadata map[string]string 306} 307 308// PutObjectMetadata allows you to overwrite the HTTP headers for an already 309// existing object, without changing the data. Note this is an idempotent "replace" 310// operation, so you must specify the complete set of HTTP headers you want 311// stored on each request. 312// 313// You cannot change "critical" headers: 314// - Content-Length 315// - Content-MD5 316// - Durability-Level 317func (s *ObjectsClient) PutMetadata(ctx context.Context, input *PutObjectMetadataInput) error { 318 absPath := absFileInput(s.client.AccountName, input.ObjectPath) 319 query := &url.Values{} 320 query.Set("metadata", "true") 321 322 headers := &http.Header{} 323 headers.Set("Content-Type", input.ContentType) 324 for key, value := range input.Metadata { 325 headers.Set(key, value) 326 } 327 328 reqInput := client.RequestInput{ 329 Method: http.MethodPut, 330 Path: string(absPath), 331 Query: query, 332 Headers: headers, 333 } 334 respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput) 335 if respBody != nil { 336 defer respBody.Close() 337 } 338 if err != nil { 339 return errors.Wrap(err, "unable to put metadata") 340 } 341 342 return nil 343} 344 345// PutObjectInput represents parameters to a PutObject operation. 346type PutObjectInput struct { 347 ObjectPath string 348 DurabilityLevel uint64 349 ContentType string 350 ContentMD5 string 351 IfMatch string 352 IfModifiedSince *time.Time 353 ContentLength uint64 354 MaxContentLength uint64 355 ObjectReader io.Reader 356 Headers map[string]string 357 ForceInsert bool //Force the creation of the directory tree 358} 359 360func (s *ObjectsClient) Put(ctx context.Context, input *PutObjectInput) error { 361 absPath := absFileInput(s.client.AccountName, input.ObjectPath) 362 if input.ForceInsert { 363 absDirName := _AbsCleanPath(path.Dir(string(absPath))) 364 exists, err := checkDirectoryTreeExists(*s, ctx, absDirName) 365 if err != nil { 366 return err 367 } 368 if !exists { 369 err := createDirectory(*s, ctx, absDirName) 370 if err != nil { 371 return err 372 } 373 return putObject(*s, ctx, input, absPath) 374 } 375 } 376 377 return putObject(*s, ctx, input, absPath) 378} 379 380// UploadPartInput represents parameters to a UploadPart operation. 381type UploadPartInput struct { 382 Id string 383 PartNum uint64 384 ContentMD5 string 385 Headers map[string]string 386 ObjectReader io.Reader 387} 388 389// UploadPartOutput represents the response from a 390type UploadPartOutput struct { 391 Part string `json:"part"` 392} 393 394func (s *ObjectsClient) UploadPart(ctx context.Context, input *UploadPartInput) (*UploadPartOutput, error) { 395 return uploadPart(*s, ctx, input) 396} 397 398// _AbsCleanPath is an internal type that means the input has been 399// path.Clean()'ed and is an absolute path. 400type _AbsCleanPath string 401 402func absFileInput(accountName, objPath string) _AbsCleanPath { 403 cleanInput := path.Clean(objPath) 404 if strings.HasPrefix(cleanInput, path.Join("/", accountName, "/")) { 405 return _AbsCleanPath(cleanInput) 406 } 407 408 cleanAbs := path.Clean(path.Join("/", accountName, objPath)) 409 return _AbsCleanPath(cleanAbs) 410} 411 412func putObject(c ObjectsClient, ctx context.Context, input *PutObjectInput, absPath _AbsCleanPath) error { 413 if input.MaxContentLength != 0 && input.ContentLength != 0 { 414 return errors.New("ContentLength and MaxContentLength may not both be set to non-zero values.") 415 } 416 417 headers := &http.Header{} 418 for key, value := range input.Headers { 419 headers.Set(key, value) 420 } 421 if input.DurabilityLevel != 0 { 422 headers.Set("Durability-Level", strconv.FormatUint(input.DurabilityLevel, 10)) 423 } 424 if input.ContentType != "" { 425 headers.Set("Content-Type", input.ContentType) 426 } 427 if input.ContentMD5 != "" { 428 headers.Set("Content-MD$", input.ContentMD5) 429 } 430 if input.IfMatch != "" { 431 headers.Set("If-Match", input.IfMatch) 432 } 433 if input.IfModifiedSince != nil { 434 headers.Set("If-Modified-Since", input.IfModifiedSince.Format(time.RFC1123)) 435 } 436 if input.ContentLength != 0 { 437 headers.Set("Content-Length", strconv.FormatUint(input.ContentLength, 10)) 438 } 439 if input.MaxContentLength != 0 { 440 headers.Set("Max-Content-Length", strconv.FormatUint(input.MaxContentLength, 10)) 441 } 442 443 reqInput := client.RequestNoEncodeInput{ 444 Method: http.MethodPut, 445 Path: string(absPath), 446 Headers: headers, 447 Body: input.ObjectReader, 448 } 449 respBody, _, err := c.client.ExecuteRequestNoEncode(ctx, reqInput) 450 if respBody != nil { 451 defer respBody.Close() 452 } 453 if err != nil { 454 return errors.Wrap(err, "unable to put object") 455 } 456 457 return nil 458} 459 460func createDirectory(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) error { 461 dirClient := &DirectoryClient{ 462 client: c.client, 463 } 464 465 // An abspath starts w/ a leading "/" which gets added to the slice as an 466 // empty string. Start all array math at 1. 467 parts := strings.Split(string(absPath), "/") 468 if len(parts) < 2 { 469 return errors.New("no path components to create directory") 470 } 471 472 folderPath := parts[1] 473 // Don't attempt to create a manta account as a directory 474 for i := 2; i < len(parts); i++ { 475 part := parts[i] 476 folderPath = path.Clean(path.Join("/", folderPath, part)) 477 err := dirClient.Put(ctx, &PutDirectoryInput{ 478 DirectoryName: folderPath, 479 }) 480 if err != nil { 481 return err 482 } 483 } 484 485 return nil 486} 487 488func abortMpu(c ObjectsClient, ctx context.Context, input *AbortMpuInput) error { 489 reqInput := client.RequestInput{ 490 Method: http.MethodPost, 491 Path: input.PartsDirectoryPath + "/abort", 492 Headers: &http.Header{}, 493 Body: nil, 494 } 495 respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput) 496 if err != nil { 497 return errors.Wrap(err, "unable to abort mpu") 498 } 499 500 if respBody != nil { 501 defer respBody.Close() 502 } 503 504 return nil 505} 506 507func commitMpu(c ObjectsClient, ctx context.Context, input *CommitMpuInput) error { 508 headers := &http.Header{} 509 for key, value := range input.Headers { 510 headers.Set(key, value) 511 } 512 513 // The mpu directory prefix length is derived from the final character 514 // in the mpu identifier which we'll call P. The mpu prefix itself is 515 // the first P characters of the mpu identifier. In order to derive the 516 // correct directory structure we need to parse this information from 517 // the mpu identifier 518 id := input.Id 519 idLength := len(id) 520 prefixLen, err := strconv.Atoi(id[idLength-1 : idLength]) 521 if err != nil { 522 return errors.Wrap(err, "unable to commit mpu due to invalid mpu prefix length") 523 } 524 prefix := id[:prefixLen] 525 partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/commit" 526 527 reqInput := client.RequestInput{ 528 Method: http.MethodPost, 529 Path: partPath, 530 Headers: headers, 531 Body: input.Body, 532 } 533 respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput) 534 if err != nil { 535 return errors.Wrap(err, "unable to commit mpu") 536 } 537 538 if respBody != nil { 539 defer respBody.Close() 540 } 541 542 return nil 543} 544 545func createMpu(c ObjectsClient, ctx context.Context, input *CreateMpuInput) (*CreateMpuOutput, error) { 546 absPath := absFileInput(c.client.AccountName, input.Body.ObjectPath) 547 548 // Because some clients will be treating Manta like S3, they will 549 // include slashes in object names which we'll need to convert to 550 // directories 551 if input.ForceInsert { 552 absDirName := _AbsCleanPath(path.Dir(string(absPath))) 553 exists, _ := checkDirectoryTreeExists(c, ctx, absDirName) 554 if !exists { 555 err := createDirectory(c, ctx, absDirName) 556 if err != nil { 557 return nil, errors.Wrap(err, "unable to create directory for create mpu operation") 558 } 559 } 560 } 561 headers := &http.Header{} 562 for key, value := range input.Body.Headers { 563 headers.Set(key, value) 564 } 565 if input.DurabilityLevel != 0 { 566 headers.Set("Durability-Level", strconv.FormatUint(input.DurabilityLevel, 10)) 567 } 568 if input.ContentLength != 0 { 569 headers.Set("Content-Length", strconv.FormatUint(input.ContentLength, 10)) 570 } 571 if input.ContentMD5 != "" { 572 headers.Set("Content-MD5", input.ContentMD5) 573 } 574 575 input.Body.ObjectPath = string(absPath) 576 reqInput := client.RequestInput{ 577 Method: http.MethodPost, 578 Path: "/" + c.client.AccountName + "/uploads", 579 Headers: headers, 580 Body: input.Body, 581 } 582 respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput) 583 if err != nil { 584 return nil, errors.Wrap(err, "unable to create mpu") 585 } 586 if respBody != nil { 587 defer respBody.Close() 588 } 589 590 response := &CreateMpuOutput{} 591 decoder := json.NewDecoder(respBody) 592 if err = decoder.Decode(&response); err != nil { 593 return nil, errors.Wrap(err, "unable to decode create mpu response") 594 } 595 596 return response, nil 597} 598 599func getMpu(c ObjectsClient, ctx context.Context, input *GetMpuInput) (*GetMpuOutput, error) { 600 headers := &http.Header{} 601 602 reqInput := client.RequestInput{ 603 Method: http.MethodGet, 604 Path: input.PartsDirectoryPath + "/state", 605 Headers: headers, 606 } 607 respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput) 608 if err != nil { 609 return nil, errors.Wrap(err, "unable to get mpu") 610 } 611 612 response := &GetMpuOutput{} 613 decoder := json.NewDecoder(respBody) 614 if err = decoder.Decode(&response); err != nil { 615 return nil, errors.Wrap(err, "unable to decode get mpu response") 616 } 617 618 return response, nil 619} 620 621func listMpuParts(c ObjectsClient, ctx context.Context, input *ListMpuPartsInput) (*ListMpuPartsOutput, error) { 622 id := input.Id 623 idLength := len(id) 624 prefixLen, err := strconv.Atoi(id[idLength-1 : idLength]) 625 if err != nil { 626 return nil, errors.Wrap(err, "unable to upload part") 627 } 628 prefix := id[:prefixLen] 629 partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/" 630 listDirInput := ListDirectoryInput{ 631 DirectoryName: partPath, 632 } 633 634 dirClient := &DirectoryClient{ 635 client: c.client, 636 } 637 638 listDirOutput, err := dirClient.List(ctx, &listDirInput) 639 if err != nil { 640 return nil, errors.Wrap(err, "unable to list mpu parts") 641 } 642 643 var parts []ListMpuPart 644 for num, part := range listDirOutput.Entries { 645 parts = append(parts, ListMpuPart{ 646 ETag: part.ETag, 647 PartNumber: num, 648 Size: int64(part.Size), 649 }) 650 } 651 652 listMpuPartsOutput := &ListMpuPartsOutput{ 653 Parts: parts, 654 } 655 656 return listMpuPartsOutput, nil 657} 658 659func uploadPart(c ObjectsClient, ctx context.Context, input *UploadPartInput) (*UploadPartOutput, error) { 660 headers := &http.Header{} 661 for key, value := range input.Headers { 662 headers.Set(key, value) 663 } 664 665 if input.ContentMD5 != "" { 666 headers.Set("Content-MD5", input.ContentMD5) 667 } 668 669 // The mpu directory prefix length is derived from the final character 670 // in the mpu identifier which we'll call P. The mpu prefix itself is 671 // the first P characters of the mpu identifier. In order to derive the 672 // correct directory structure we need to parse this information from 673 // the mpu identifier 674 id := input.Id 675 idLength := len(id) 676 partNum := strconv.FormatUint(input.PartNum, 10) 677 prefixLen, err := strconv.Atoi(id[idLength-1 : idLength]) 678 if err != nil { 679 return nil, errors.Wrap(err, "unable to upload part due to invalid mpu prefix length") 680 } 681 prefix := id[:prefixLen] 682 partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/" + partNum 683 684 reqInput := client.RequestNoEncodeInput{ 685 Method: http.MethodPut, 686 Path: partPath, 687 Headers: headers, 688 Body: input.ObjectReader, 689 } 690 respBody, respHeader, err := c.client.ExecuteRequestNoEncode(ctx, reqInput) 691 if respBody != nil { 692 defer respBody.Close() 693 } 694 if err != nil { 695 return nil, errors.Wrap(err, "unable to upload part") 696 } 697 698 uploadPartOutput := &UploadPartOutput{ 699 Part: respHeader.Get("Etag"), 700 } 701 return uploadPartOutput, nil 702} 703 704func checkDirectoryTreeExists(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) (bool, error) { 705 exists, err := c.IsDir(ctx, string(absPath)) 706 if err != nil { 707 if tt.IsResourceNotFoundError(err) || tt.IsStatusNotFoundCode(err) { 708 return false, nil 709 } 710 return false, err 711 } 712 if exists { 713 return true, nil 714 } 715 716 return false, nil 717} 718