1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package metaclient 5 6import ( 7 "bytes" 8 "context" 9 "sync" 10 "time" 11 12 "github.com/spacemonkeygo/monkit/v3" 13 "github.com/zeebo/errs" 14 15 "storj.io/common/errs2" 16 "storj.io/common/macaroon" 17 "storj.io/common/pb" 18 "storj.io/common/rpc" 19 "storj.io/common/rpc/rpcstatus" 20 "storj.io/common/storj" 21 "storj.io/uplink/private/eestream" 22) 23 24var ( 25 mon = monkit.Package() 26 27 // Error is the errs class of standard metainfo errors. 28 Error = errs.Class("metaclient") 29) 30 31// Client creates a grpcClient. 32type Client struct { 33 mu sync.Mutex 34 conn *rpc.Conn 35 client pb.DRPCMetainfoClient 36 apiKeyRaw []byte 37 38 userAgent string 39} 40 41// ListItem is a single item in a listing. 42type ListItem struct { 43 Path storj.Path 44 Pointer *pb.Pointer 45 IsPrefix bool 46} 47 48// NewClient creates Metainfo API client. 49func NewClient(client pb.DRPCMetainfoClient, apiKey *macaroon.APIKey, userAgent string) *Client { 50 return &Client{ 51 client: client, 52 apiKeyRaw: apiKey.SerializeRaw(), 53 54 userAgent: userAgent, 55 } 56} 57 58// DialNodeURL dials to metainfo endpoint with the specified api key. 59func DialNodeURL(ctx context.Context, dialer rpc.Dialer, nodeURL string, apiKey *macaroon.APIKey, userAgent string) (*Client, error) { 60 url, err := storj.ParseNodeURL(nodeURL) 61 if err != nil { 62 return nil, Error.Wrap(err) 63 } 64 65 if url.ID.IsZero() { 66 return nil, Error.New("node ID is required in node URL %q", nodeURL) 67 } 68 69 conn, err := dialer.DialNodeURL(ctx, url) 70 if err != nil { 71 return nil, Error.Wrap(err) 72 } 73 74 return &Client{ 75 conn: conn, 76 client: pb.NewDRPCMetainfoClient(conn), 77 apiKeyRaw: apiKey.SerializeRaw(), 78 userAgent: userAgent, 79 }, nil 80} 81 82// Close closes the dialed connection. 83func (client *Client) Close() error { 84 client.mu.Lock() 85 defer client.mu.Unlock() 86 87 if client.conn != nil { 88 err := client.conn.Close() 89 client.conn = nil 90 return Error.Wrap(err) 91 } 92 93 return nil 94} 95 96func (client *Client) header() *pb.RequestHeader { 97 return &pb.RequestHeader{ 98 ApiKey: client.apiKeyRaw, 99 UserAgent: []byte(client.userAgent), 100 } 101} 102 103// GetProjectInfo gets the ProjectInfo for the api key associated with the metainfo client. 104func (client *Client) GetProjectInfo(ctx context.Context) (response *pb.ProjectInfoResponse, err error) { 105 defer mon.Task()(&ctx)(&err) 106 107 err = WithRetry(ctx, func(ctx context.Context) error { 108 response, err = client.client.ProjectInfo(ctx, &pb.ProjectInfoRequest{ 109 Header: client.header(), 110 }) 111 return err 112 }) 113 return response, err 114} 115 116// CreateBucketParams parameters for CreateBucket method. 117type CreateBucketParams struct { 118 Name []byte 119 120 // TODO remove those values when satellite will be adjusted 121 PathCipher storj.CipherSuite 122 PartnerID []byte 123 DefaultSegmentsSize int64 124 DefaultRedundancyScheme storj.RedundancyScheme 125 DefaultEncryptionParameters storj.EncryptionParameters 126} 127 128func (params *CreateBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketCreateRequest { 129 defaultRS := params.DefaultRedundancyScheme 130 defaultEP := params.DefaultEncryptionParameters 131 132 return &pb.BucketCreateRequest{ 133 Header: header, 134 Name: params.Name, 135 PathCipher: pb.CipherSuite(params.PathCipher), 136 PartnerId: params.PartnerID, 137 DefaultSegmentSize: params.DefaultSegmentsSize, 138 DefaultRedundancyScheme: &pb.RedundancyScheme{ 139 Type: pb.RedundancyScheme_SchemeType(defaultRS.Algorithm), 140 MinReq: int32(defaultRS.RequiredShares), 141 Total: int32(defaultRS.TotalShares), 142 RepairThreshold: int32(defaultRS.RepairShares), 143 SuccessThreshold: int32(defaultRS.OptimalShares), 144 ErasureShareSize: defaultRS.ShareSize, 145 }, 146 DefaultEncryptionParameters: &pb.EncryptionParameters{ 147 CipherSuite: pb.CipherSuite(defaultEP.CipherSuite), 148 BlockSize: int64(defaultEP.BlockSize), 149 }, 150 } 151} 152 153// BatchItem returns single item for batch request. 154func (params *CreateBucketParams) BatchItem() *pb.BatchRequestItem { 155 return &pb.BatchRequestItem{ 156 Request: &pb.BatchRequestItem_BucketCreate{ 157 BucketCreate: params.toRequest(nil), 158 }, 159 } 160} 161 162// CreateBucketResponse response for CreateBucket request. 163type CreateBucketResponse struct { 164 Bucket Bucket 165} 166 167func newCreateBucketResponse(response *pb.BucketCreateResponse) (CreateBucketResponse, error) { 168 bucket, err := convertProtoToBucket(response.Bucket) 169 if err != nil { 170 return CreateBucketResponse{}, err 171 } 172 return CreateBucketResponse{ 173 Bucket: bucket, 174 }, nil 175} 176 177// CreateBucket creates a new bucket. 178func (client *Client) CreateBucket(ctx context.Context, params CreateBucketParams) (respBucket Bucket, err error) { 179 defer mon.Task()(&ctx)(&err) 180 181 var response *pb.BucketCreateResponse 182 err = WithRetry(ctx, func(ctx context.Context) error { 183 response, err = client.client.CreateBucket(ctx, params.toRequest(client.header())) 184 return err 185 }) 186 if err != nil { 187 return Bucket{}, Error.Wrap(err) 188 } 189 190 respBucket, err = convertProtoToBucket(response.Bucket) 191 if err != nil { 192 return Bucket{}, Error.Wrap(err) 193 } 194 return respBucket, nil 195} 196 197// GetBucketParams parmaters for GetBucketParams method. 198type GetBucketParams struct { 199 Name []byte 200} 201 202func (params *GetBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketGetRequest { 203 return &pb.BucketGetRequest{ 204 Header: header, 205 Name: params.Name, 206 } 207} 208 209// BatchItem returns single item for batch request. 210func (params *GetBucketParams) BatchItem() *pb.BatchRequestItem { 211 return &pb.BatchRequestItem{ 212 Request: &pb.BatchRequestItem_BucketGet{ 213 BucketGet: params.toRequest(nil), 214 }, 215 } 216} 217 218// GetBucketResponse response for GetBucket request. 219type GetBucketResponse struct { 220 Bucket Bucket 221} 222 223func newGetBucketResponse(response *pb.BucketGetResponse) (GetBucketResponse, error) { 224 bucket, err := convertProtoToBucket(response.Bucket) 225 if err != nil { 226 return GetBucketResponse{}, err 227 } 228 return GetBucketResponse{ 229 Bucket: bucket, 230 }, nil 231} 232 233// GetBucket returns a bucket. 234func (client *Client) GetBucket(ctx context.Context, params GetBucketParams) (respBucket Bucket, err error) { 235 defer mon.Task()(&ctx)(&err) 236 237 var response *pb.BucketGetResponse 238 err = WithRetry(ctx, func(ctx context.Context) error { 239 // TODO(moby) make sure bucket not found is properly handled 240 response, err = client.client.GetBucket(ctx, params.toRequest(client.header())) 241 return err 242 }) 243 if err != nil { 244 if errs2.IsRPC(err, rpcstatus.NotFound) { 245 return Bucket{}, ErrBucketNotFound.Wrap(err) 246 } 247 return Bucket{}, Error.Wrap(err) 248 } 249 250 respBucket, err = convertProtoToBucket(response.Bucket) 251 if err != nil { 252 return Bucket{}, Error.Wrap(err) 253 } 254 return respBucket, nil 255} 256 257// DeleteBucketParams parmaters for DeleteBucket method. 258type DeleteBucketParams struct { 259 Name []byte 260 DeleteAll bool 261} 262 263func (params *DeleteBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketDeleteRequest { 264 return &pb.BucketDeleteRequest{ 265 Header: header, 266 Name: params.Name, 267 DeleteAll: params.DeleteAll, 268 } 269} 270 271// BatchItem returns single item for batch request. 272func (params *DeleteBucketParams) BatchItem() *pb.BatchRequestItem { 273 return &pb.BatchRequestItem{ 274 Request: &pb.BatchRequestItem_BucketDelete{ 275 BucketDelete: params.toRequest(nil), 276 }, 277 } 278} 279 280// DeleteBucket deletes a bucket. 281func (client *Client) DeleteBucket(ctx context.Context, params DeleteBucketParams) (_ Bucket, err error) { 282 defer mon.Task()(&ctx)(&err) 283 284 var response *pb.BucketDeleteResponse 285 err = WithRetry(ctx, func(ctx context.Context) error { 286 // TODO(moby) make sure bucket not found is properly handled 287 response, err = client.client.DeleteBucket(ctx, params.toRequest(client.header())) 288 return err 289 }) 290 if err != nil { 291 if errs2.IsRPC(err, rpcstatus.NotFound) { 292 return Bucket{}, ErrBucketNotFound.Wrap(err) 293 } 294 return Bucket{}, Error.Wrap(err) 295 } 296 297 respBucket, err := convertProtoToBucket(response.Bucket) 298 if err != nil { 299 return Bucket{}, Error.Wrap(err) 300 } 301 return respBucket, nil 302} 303 304// ListBucketsParams parmaters for ListBucketsParams method. 305type ListBucketsParams struct { 306 ListOpts BucketListOptions 307} 308 309func (params *ListBucketsParams) toRequest(header *pb.RequestHeader) *pb.BucketListRequest { 310 return &pb.BucketListRequest{ 311 Header: header, 312 Cursor: []byte(params.ListOpts.Cursor), 313 Limit: int32(params.ListOpts.Limit), 314 Direction: int32(params.ListOpts.Direction), 315 } 316} 317 318// BatchItem returns single item for batch request. 319func (params *ListBucketsParams) BatchItem() *pb.BatchRequestItem { 320 return &pb.BatchRequestItem{ 321 Request: &pb.BatchRequestItem_BucketList{ 322 BucketList: params.toRequest(nil), 323 }, 324 } 325} 326 327// ListBucketsResponse response for ListBucket request. 328type ListBucketsResponse struct { 329 BucketList BucketList 330} 331 332func newListBucketsResponse(response *pb.BucketListResponse) ListBucketsResponse { 333 bucketList := BucketList{ 334 More: response.More, 335 } 336 bucketList.Items = make([]Bucket, len(response.Items)) 337 for i, item := range response.GetItems() { 338 bucketList.Items[i] = Bucket{ 339 Name: string(item.Name), 340 Created: item.CreatedAt, 341 } 342 } 343 return ListBucketsResponse{ 344 BucketList: bucketList, 345 } 346} 347 348// ListBuckets lists buckets. 349func (client *Client) ListBuckets(ctx context.Context, params ListBucketsParams) (_ BucketList, err error) { 350 defer mon.Task()(&ctx)(&err) 351 352 var response *pb.BucketListResponse 353 err = WithRetry(ctx, func(ctx context.Context) error { 354 response, err = client.client.ListBuckets(ctx, params.toRequest(client.header())) 355 return err 356 }) 357 if err != nil { 358 return BucketList{}, Error.Wrap(err) 359 } 360 361 resultBucketList := BucketList{ 362 More: response.GetMore(), 363 } 364 resultBucketList.Items = make([]Bucket, len(response.GetItems())) 365 for i, item := range response.GetItems() { 366 resultBucketList.Items[i] = Bucket{ 367 Name: string(item.GetName()), 368 Created: item.GetCreatedAt(), 369 } 370 } 371 return resultBucketList, nil 372} 373 374func convertProtoToBucket(pbBucket *pb.Bucket) (bucket Bucket, err error) { 375 if pbBucket == nil { 376 return Bucket{}, nil 377 } 378 379 return Bucket{ 380 Name: string(pbBucket.GetName()), 381 Created: pbBucket.GetCreatedAt(), 382 }, nil 383} 384 385// BeginObjectParams parmaters for BeginObject method. 386type BeginObjectParams struct { 387 Bucket []byte 388 EncryptedPath []byte 389 Version int32 390 Redundancy storj.RedundancyScheme 391 EncryptionParameters storj.EncryptionParameters 392 ExpiresAt time.Time 393} 394 395func (params *BeginObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectBeginRequest { 396 return &pb.ObjectBeginRequest{ 397 Header: header, 398 Bucket: params.Bucket, 399 EncryptedPath: params.EncryptedPath, 400 Version: params.Version, 401 ExpiresAt: params.ExpiresAt, 402 RedundancyScheme: &pb.RedundancyScheme{ 403 Type: pb.RedundancyScheme_SchemeType(params.Redundancy.Algorithm), 404 ErasureShareSize: params.Redundancy.ShareSize, 405 MinReq: int32(params.Redundancy.RequiredShares), 406 RepairThreshold: int32(params.Redundancy.RepairShares), 407 SuccessThreshold: int32(params.Redundancy.OptimalShares), 408 Total: int32(params.Redundancy.TotalShares), 409 }, 410 EncryptionParameters: &pb.EncryptionParameters{ 411 CipherSuite: pb.CipherSuite(params.EncryptionParameters.CipherSuite), 412 BlockSize: int64(params.EncryptionParameters.BlockSize), 413 }, 414 } 415} 416 417// BatchItem returns single item for batch request. 418func (params *BeginObjectParams) BatchItem() *pb.BatchRequestItem { 419 return &pb.BatchRequestItem{ 420 Request: &pb.BatchRequestItem_ObjectBegin{ 421 ObjectBegin: params.toRequest(nil), 422 }, 423 } 424} 425 426// BeginObjectResponse response for BeginObject request. 427type BeginObjectResponse struct { 428 StreamID storj.StreamID 429 RedundancyStrategy eestream.RedundancyStrategy 430 EncryptionParameters storj.EncryptionParameters 431} 432 433func newBeginObjectResponse(response *pb.ObjectBeginResponse, redundancyStrategy eestream.RedundancyStrategy) BeginObjectResponse { 434 ep := storj.EncryptionParameters{} 435 if response.EncryptionParameters != nil { 436 ep = storj.EncryptionParameters{ 437 CipherSuite: storj.CipherSuite(response.EncryptionParameters.CipherSuite), 438 BlockSize: int32(response.EncryptionParameters.BlockSize), 439 } 440 } 441 442 return BeginObjectResponse{ 443 StreamID: response.StreamId, 444 RedundancyStrategy: redundancyStrategy, 445 EncryptionParameters: ep, 446 } 447} 448 449// BeginObject begins object creation. 450func (client *Client) BeginObject(ctx context.Context, params BeginObjectParams) (_ BeginObjectResponse, err error) { 451 defer mon.Task()(&ctx)(&err) 452 453 var response *pb.ObjectBeginResponse 454 err = WithRetry(ctx, func(ctx context.Context) error { 455 response, err = client.client.BeginObject(ctx, params.toRequest(client.header())) 456 return err 457 }) 458 if err != nil { 459 return BeginObjectResponse{}, Error.Wrap(err) 460 } 461 462 rs := eestream.RedundancyStrategy{} 463 if response.RedundancyScheme != nil { 464 rs, err = eestream.NewRedundancyStrategyFromProto(response.RedundancyScheme) 465 if err != nil { 466 return BeginObjectResponse{}, Error.Wrap(err) 467 } 468 } 469 470 return newBeginObjectResponse(response, rs), nil 471} 472 473// CommitObjectParams parmaters for CommitObject method. 474type CommitObjectParams struct { 475 StreamID storj.StreamID 476 477 EncryptedMetadataNonce storj.Nonce 478 EncryptedMetadata []byte 479 EncryptedMetadataEncryptedKey []byte 480} 481 482func (params *CommitObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectCommitRequest { 483 return &pb.ObjectCommitRequest{ 484 Header: header, 485 StreamId: params.StreamID, 486 EncryptedMetadataNonce: params.EncryptedMetadataNonce, 487 EncryptedMetadata: params.EncryptedMetadata, 488 EncryptedMetadataEncryptedKey: params.EncryptedMetadataEncryptedKey, 489 } 490} 491 492// BatchItem returns single item for batch request. 493func (params *CommitObjectParams) BatchItem() *pb.BatchRequestItem { 494 return &pb.BatchRequestItem{ 495 Request: &pb.BatchRequestItem_ObjectCommit{ 496 ObjectCommit: params.toRequest(nil), 497 }, 498 } 499} 500 501// CommitObject commits a created object. 502func (client *Client) CommitObject(ctx context.Context, params CommitObjectParams) (err error) { 503 defer mon.Task()(&ctx)(&err) 504 505 err = WithRetry(ctx, func(ctx context.Context) error { 506 _, err = client.client.CommitObject(ctx, params.toRequest(client.header())) 507 return err 508 }) 509 return Error.Wrap(err) 510} 511 512// GetObjectParams parameters for GetObject method. 513type GetObjectParams struct { 514 Bucket []byte 515 EncryptedPath []byte 516 Version int32 517 518 RedundancySchemePerSegment bool 519} 520 521func (params *GetObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectGetRequest { 522 return &pb.ObjectGetRequest{ 523 Header: header, 524 Bucket: params.Bucket, 525 EncryptedPath: params.EncryptedPath, 526 Version: params.Version, 527 RedundancySchemePerSegment: params.RedundancySchemePerSegment, 528 } 529} 530 531// BatchItem returns single item for batch request. 532func (params *GetObjectParams) BatchItem() *pb.BatchRequestItem { 533 return &pb.BatchRequestItem{ 534 Request: &pb.BatchRequestItem_ObjectGet{ 535 ObjectGet: params.toRequest(nil), 536 }, 537 } 538} 539 540// GetObjectResponse response for GetObject request. 541type GetObjectResponse struct { 542 Info RawObjectItem 543} 544 545func newGetObjectResponse(response *pb.ObjectGetResponse) GetObjectResponse { 546 return GetObjectResponse{ 547 Info: newObjectInfo(response.Object), 548 } 549} 550 551func newObjectInfo(object *pb.Object) RawObjectItem { 552 if object == nil { 553 return RawObjectItem{} 554 } 555 556 info := RawObjectItem{ 557 Bucket: string(object.Bucket), 558 EncryptedPath: object.EncryptedPath, 559 Version: uint32(object.Version), 560 561 StreamID: object.StreamId, 562 563 Created: object.CreatedAt, 564 Modified: object.CreatedAt, 565 PlainSize: object.PlainSize, 566 Expires: object.ExpiresAt, 567 EncryptedMetadata: object.EncryptedMetadata, 568 EncryptedMetadataNonce: object.EncryptedMetadataNonce, 569 EncryptedMetadataEncryptedKey: object.EncryptedMetadataEncryptedKey, 570 571 EncryptionParameters: storj.EncryptionParameters{ 572 CipherSuite: storj.CipherSuite(object.EncryptionParameters.CipherSuite), 573 BlockSize: int32(object.EncryptionParameters.BlockSize), 574 }, 575 } 576 577 pbRS := object.RedundancyScheme 578 if pbRS != nil { 579 info.RedundancyScheme = storj.RedundancyScheme{ 580 Algorithm: storj.RedundancyAlgorithm(pbRS.Type), 581 ShareSize: pbRS.ErasureShareSize, 582 RequiredShares: int16(pbRS.MinReq), 583 RepairShares: int16(pbRS.RepairThreshold), 584 OptimalShares: int16(pbRS.SuccessThreshold), 585 TotalShares: int16(pbRS.Total), 586 } 587 } 588 return info 589} 590 591// GetObject gets single object. 592func (client *Client) GetObject(ctx context.Context, params GetObjectParams) (_ RawObjectItem, err error) { 593 defer mon.Task()(&ctx)(&err) 594 595 var response *pb.ObjectGetResponse 596 err = WithRetry(ctx, func(ctx context.Context) error { 597 response, err = client.client.GetObject(ctx, params.toRequest(client.header())) 598 return err 599 }) 600 if err != nil { 601 if errs2.IsRPC(err, rpcstatus.NotFound) { 602 return RawObjectItem{}, ErrObjectNotFound.Wrap(err) 603 } 604 return RawObjectItem{}, Error.Wrap(err) 605 } 606 607 getResponse := newGetObjectResponse(response) 608 return getResponse.Info, nil 609} 610 611// GetObjectIPsParams are params for the GetObjectIPs request. 612type GetObjectIPsParams struct { 613 Bucket []byte 614 EncryptedPath []byte 615 Version int32 616} 617 618// GetObjectIPsResponse is the response from GetObjectIPs. 619type GetObjectIPsResponse struct { 620 IPPorts [][]byte 621 SegmentCount int64 622 PieceCount int64 623 ReliablePieceCount int64 624} 625 626func (params *GetObjectIPsParams) toRequest(header *pb.RequestHeader) *pb.ObjectGetIPsRequest { 627 return &pb.ObjectGetIPsRequest{ 628 Header: header, 629 Bucket: params.Bucket, 630 EncryptedPath: params.EncryptedPath, 631 Version: params.Version, 632 } 633} 634 635// GetObjectIPs returns the IP addresses of the nodes which hold the object. 636func (client *Client) GetObjectIPs(ctx context.Context, params GetObjectIPsParams) (r *GetObjectIPsResponse, err error) { 637 defer mon.Task()(&ctx)(&err) 638 639 var response *pb.ObjectGetIPsResponse 640 err = WithRetry(ctx, func(ctx context.Context) error { 641 response, err = client.client.GetObjectIPs(ctx, params.toRequest(client.header())) 642 return err 643 }) 644 if err != nil { 645 if errs2.IsRPC(err, rpcstatus.NotFound) { 646 return nil, ErrObjectNotFound.Wrap(err) 647 } 648 return nil, Error.Wrap(err) 649 } 650 651 return &GetObjectIPsResponse{ 652 IPPorts: response.Ips, 653 SegmentCount: response.SegmentCount, 654 PieceCount: response.PieceCount, 655 ReliablePieceCount: response.ReliablePieceCount, 656 }, nil 657} 658 659// UpdateObjectMetadataParams are params for the UpdateObjectMetadata request. 660type UpdateObjectMetadataParams struct { 661 Bucket []byte 662 EncryptedObjectKey []byte 663 Version int32 664 StreamID storj.StreamID 665 666 EncryptedMetadataNonce storj.Nonce 667 EncryptedMetadata []byte 668 EncryptedMetadataEncryptedKey []byte 669} 670 671func (params *UpdateObjectMetadataParams) toRequest(header *pb.RequestHeader) *pb.ObjectUpdateMetadataRequest { 672 return &pb.ObjectUpdateMetadataRequest{ 673 Header: header, 674 Bucket: params.Bucket, 675 EncryptedObjectKey: params.EncryptedObjectKey, 676 Version: params.Version, 677 StreamId: params.StreamID, 678 EncryptedMetadataNonce: params.EncryptedMetadataNonce, 679 EncryptedMetadata: params.EncryptedMetadata, 680 EncryptedMetadataEncryptedKey: params.EncryptedMetadataEncryptedKey, 681 } 682} 683 684// UpdateObjectMetadata replaces objects metadata. 685func (client *Client) UpdateObjectMetadata(ctx context.Context, params UpdateObjectMetadataParams) (err error) { 686 defer mon.Task()(&ctx)(&err) 687 688 err = WithRetry(ctx, func(ctx context.Context) error { 689 _, err = client.client.UpdateObjectMetadata(ctx, params.toRequest(client.header())) 690 return err 691 }) 692 if err != nil { 693 if errs2.IsRPC(err, rpcstatus.NotFound) { 694 return ErrObjectNotFound.Wrap(err) 695 } 696 } 697 698 return Error.Wrap(err) 699} 700 701// BeginDeleteObjectParams parameters for BeginDeleteObject method. 702type BeginDeleteObjectParams struct { 703 Bucket []byte 704 EncryptedPath []byte 705 Version int32 706 StreamID storj.StreamID 707 Status int32 708} 709 710func (params *BeginDeleteObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectBeginDeleteRequest { 711 return &pb.ObjectBeginDeleteRequest{ 712 Header: header, 713 Bucket: params.Bucket, 714 EncryptedPath: params.EncryptedPath, 715 Version: params.Version, 716 StreamId: ¶ms.StreamID, 717 Status: params.Status, 718 } 719} 720 721// BatchItem returns single item for batch request. 722func (params *BeginDeleteObjectParams) BatchItem() *pb.BatchRequestItem { 723 return &pb.BatchRequestItem{ 724 Request: &pb.BatchRequestItem_ObjectBeginDelete{ 725 ObjectBeginDelete: params.toRequest(nil), 726 }, 727 } 728} 729 730// BeginDeleteObjectResponse response for BeginDeleteObject request. 731type BeginDeleteObjectResponse struct { 732} 733 734func newBeginDeleteObjectResponse(response *pb.ObjectBeginDeleteResponse) BeginDeleteObjectResponse { 735 return BeginDeleteObjectResponse{} 736} 737 738// BeginDeleteObject begins object deletion process. 739func (client *Client) BeginDeleteObject(ctx context.Context, params BeginDeleteObjectParams) (_ RawObjectItem, err error) { 740 defer mon.Task()(&ctx)(&err) 741 742 var response *pb.ObjectBeginDeleteResponse 743 err = WithRetry(ctx, func(ctx context.Context) error { 744 // response.StreamID is not processed because satellite will always return nil 745 response, err = client.client.BeginDeleteObject(ctx, params.toRequest(client.header())) 746 return err 747 }) 748 if err != nil { 749 if errs2.IsRPC(err, rpcstatus.NotFound) { 750 return RawObjectItem{}, ErrObjectNotFound.Wrap(err) 751 } 752 return RawObjectItem{}, Error.Wrap(err) 753 } 754 755 return newObjectInfo(response.Object), nil 756} 757 758// ListObjectsParams parameters for ListObjects method. 759type ListObjectsParams struct { 760 Bucket []byte 761 EncryptedPrefix []byte 762 EncryptedCursor []byte 763 Limit int32 764 IncludeCustomMetadata bool 765 IncludeSystemMetadata bool 766 Recursive bool 767 Status int32 768} 769 770func (params *ListObjectsParams) toRequest(header *pb.RequestHeader) *pb.ObjectListRequest { 771 return &pb.ObjectListRequest{ 772 Header: header, 773 Bucket: params.Bucket, 774 EncryptedPrefix: params.EncryptedPrefix, 775 EncryptedCursor: params.EncryptedCursor, 776 Limit: params.Limit, 777 ObjectIncludes: &pb.ObjectListItemIncludes{ 778 Metadata: params.IncludeCustomMetadata, 779 ExcludeSystemMetadata: !params.IncludeSystemMetadata, 780 }, 781 UseObjectIncludes: true, 782 Recursive: params.Recursive, 783 Status: pb.Object_Status(params.Status), 784 } 785} 786 787// BatchItem returns single item for batch request. 788func (params *ListObjectsParams) BatchItem() *pb.BatchRequestItem { 789 return &pb.BatchRequestItem{ 790 Request: &pb.BatchRequestItem_ObjectList{ 791 ObjectList: params.toRequest(nil), 792 }, 793 } 794} 795 796// ListObjectsResponse response for ListObjects request. 797type ListObjectsResponse struct { 798 Items []RawObjectListItem 799 More bool 800} 801 802func newListObjectsResponse(response *pb.ObjectListResponse, encryptedPrefix []byte, recursive bool) ListObjectsResponse { 803 objects := make([]RawObjectListItem, len(response.Items)) 804 for i, object := range response.Items { 805 encryptedPath := object.EncryptedPath 806 isPrefix := false 807 if !recursive && len(encryptedPath) != 0 && encryptedPath[len(encryptedPath)-1] == '/' && !bytes.Equal(encryptedPath, encryptedPrefix) { 808 isPrefix = true 809 } 810 811 objects[i] = RawObjectListItem{ 812 EncryptedPath: object.EncryptedPath, 813 Version: object.Version, 814 Status: int32(object.Status), 815 StatusAt: object.StatusAt, 816 CreatedAt: object.CreatedAt, 817 ExpiresAt: object.ExpiresAt, 818 PlainSize: object.PlainSize, 819 EncryptedMetadataNonce: object.EncryptedMetadataNonce, 820 EncryptedMetadata: object.EncryptedMetadata, 821 822 IsPrefix: isPrefix, 823 } 824 825 if object.StreamId != nil { 826 objects[i].StreamID = *object.StreamId 827 } 828 } 829 830 return ListObjectsResponse{ 831 Items: objects, 832 More: response.More, 833 } 834} 835 836// ListObjects lists objects according to specific parameters. 837func (client *Client) ListObjects(ctx context.Context, params ListObjectsParams) (_ []RawObjectListItem, more bool, err error) { 838 defer mon.Task()(&ctx)(&err) 839 840 var response *pb.ObjectListResponse 841 err = WithRetry(ctx, func(ctx context.Context) error { 842 response, err = client.client.ListObjects(ctx, params.toRequest(client.header())) 843 return err 844 }) 845 if err != nil { 846 return []RawObjectListItem{}, false, Error.Wrap(err) 847 } 848 849 listResponse := newListObjectsResponse(response, params.EncryptedPrefix, params.Recursive) 850 return listResponse.Items, listResponse.More, Error.Wrap(err) 851} 852 853// ListPendingObjectStreamsParams parameters for ListPendingObjectStreams method. 854type ListPendingObjectStreamsParams struct { 855 Bucket []byte 856 EncryptedPath []byte 857 EncryptedCursor []byte 858 Limit int32 859} 860 861func (params *ListPendingObjectStreamsParams) toRequest(header *pb.RequestHeader) *pb.ObjectListPendingStreamsRequest { 862 return &pb.ObjectListPendingStreamsRequest{ 863 Header: header, 864 Bucket: params.Bucket, 865 EncryptedPath: params.EncryptedPath, 866 StreamIdCursor: params.EncryptedCursor, 867 Limit: params.Limit, 868 } 869} 870 871// BatchItem returns single item for batch request. 872func (params *ListPendingObjectStreamsParams) BatchItem() *pb.BatchRequestItem { 873 return &pb.BatchRequestItem{ 874 Request: &pb.BatchRequestItem_PendingStreams{ 875 PendingStreams: params.toRequest(nil), 876 }, 877 } 878} 879 880// ListPendingObjectStreamsResponse response for ListPendingObjectStreams request. 881type ListPendingObjectStreamsResponse struct { 882 Items []RawObjectListItem 883 More bool 884} 885 886func newListPendingObjectStreamsResponse(response *pb.ObjectListPendingStreamsResponse) ListPendingObjectStreamsResponse { 887 objects := make([]RawObjectListItem, len(response.Items)) 888 for i, object := range response.Items { 889 890 objects[i] = RawObjectListItem{ 891 EncryptedPath: object.EncryptedPath, 892 Version: object.Version, 893 Status: int32(object.Status), 894 StatusAt: object.StatusAt, 895 CreatedAt: object.CreatedAt, 896 ExpiresAt: object.ExpiresAt, 897 PlainSize: object.PlainSize, 898 EncryptedMetadataNonce: object.EncryptedMetadataNonce, 899 EncryptedMetadata: object.EncryptedMetadata, 900 901 IsPrefix: false, 902 } 903 904 if object.StreamId != nil { 905 objects[i].StreamID = *object.StreamId 906 } 907 } 908 909 return ListPendingObjectStreamsResponse{ 910 Items: objects, 911 More: response.More, 912 } 913} 914 915// ListPendingObjectStreams lists pending objects with the specified object key in the specified bucket. 916func (client *Client) ListPendingObjectStreams(ctx context.Context, params ListPendingObjectStreamsParams) (_ ListPendingObjectStreamsResponse, err error) { 917 defer mon.Task()(&ctx)(&err) 918 919 var response *pb.ObjectListPendingStreamsResponse 920 err = WithRetry(ctx, func(ctx context.Context) error { 921 response, err = client.client.ListPendingObjectStreams(ctx, params.toRequest(client.header())) 922 return err 923 }) 924 if err != nil { 925 return ListPendingObjectStreamsResponse{}, Error.Wrap(err) 926 } 927 928 return newListPendingObjectStreamsResponse(response), nil 929} 930 931// SegmentListItem represents listed segment. 932type SegmentListItem struct { 933 Position SegmentPosition 934 PlainSize int64 935 PlainOffset int64 936 CreatedAt time.Time 937 EncryptedETag []byte 938 EncryptedKeyNonce storj.Nonce 939 EncryptedKey []byte 940} 941 942// ListSegmentsParams parameters for ListSegments method. 943type ListSegmentsParams struct { 944 StreamID []byte 945 Cursor SegmentPosition 946 Limit int32 947 Range StreamRange 948} 949 950func (params *ListSegmentsParams) toRequest(header *pb.RequestHeader) *pb.SegmentListRequest { 951 return &pb.SegmentListRequest{ 952 Header: header, 953 StreamId: params.StreamID, 954 CursorPosition: &pb.SegmentPosition{ 955 PartNumber: params.Cursor.PartNumber, 956 Index: params.Cursor.Index, 957 }, 958 Limit: params.Limit, 959 Range: params.Range.toProto(), 960 } 961} 962 963// BatchItem returns single item for batch request. 964func (params *ListSegmentsParams) BatchItem() *pb.BatchRequestItem { 965 return &pb.BatchRequestItem{ 966 Request: &pb.BatchRequestItem_SegmentList{ 967 SegmentList: params.toRequest(nil), 968 }, 969 } 970} 971 972// ListSegmentsResponse response for ListSegments request. 973type ListSegmentsResponse struct { 974 Items []SegmentListItem 975 More bool 976 EncryptionParameters storj.EncryptionParameters 977} 978 979func newListSegmentsResponse(response *pb.SegmentListResponse) ListSegmentsResponse { 980 segments := make([]SegmentListItem, len(response.Items)) 981 for i, segment := range response.Items { 982 segments[i] = SegmentListItem{ 983 Position: SegmentPosition{ 984 PartNumber: segment.Position.PartNumber, 985 Index: segment.Position.Index, 986 }, 987 PlainSize: segment.PlainSize, 988 PlainOffset: segment.PlainOffset, 989 CreatedAt: segment.CreatedAt, 990 EncryptedETag: segment.EncryptedETag, 991 EncryptedKeyNonce: segment.EncryptedKeyNonce, 992 EncryptedKey: segment.EncryptedKey, 993 } 994 } 995 996 ep := storj.EncryptionParameters{} 997 if response.EncryptionParameters != nil { 998 ep = storj.EncryptionParameters{ 999 CipherSuite: storj.CipherSuite(response.EncryptionParameters.CipherSuite), 1000 BlockSize: int32(response.EncryptionParameters.BlockSize), 1001 } 1002 } 1003 1004 return ListSegmentsResponse{ 1005 Items: segments, 1006 More: response.More, 1007 EncryptionParameters: ep, 1008 } 1009} 1010 1011// ListSegments lists segments according to specific parameters. 1012func (client *Client) ListSegments(ctx context.Context, params ListSegmentsParams) (_ ListSegmentsResponse, err error) { 1013 defer mon.Task()(&ctx)(&err) 1014 1015 var response *pb.SegmentListResponse 1016 err = WithRetry(ctx, func(ctx context.Context) error { 1017 response, err = client.client.ListSegments(ctx, params.toRequest(client.header())) 1018 return err 1019 }) 1020 if err != nil { 1021 return ListSegmentsResponse{}, Error.Wrap(err) 1022 } 1023 1024 return newListSegmentsResponse(response), nil 1025} 1026 1027// BeginSegmentParams parameters for BeginSegment method. 1028type BeginSegmentParams struct { 1029 StreamID storj.StreamID 1030 Position SegmentPosition 1031 MaxOrderLimit int64 1032} 1033 1034func (params *BeginSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentBeginRequest { 1035 return &pb.SegmentBeginRequest{ 1036 Header: header, 1037 StreamId: params.StreamID, 1038 Position: &pb.SegmentPosition{ 1039 PartNumber: params.Position.PartNumber, 1040 Index: params.Position.Index, 1041 }, 1042 MaxOrderLimit: params.MaxOrderLimit, 1043 } 1044} 1045 1046// BatchItem returns single item for batch request. 1047func (params *BeginSegmentParams) BatchItem() *pb.BatchRequestItem { 1048 return &pb.BatchRequestItem{ 1049 Request: &pb.BatchRequestItem_SegmentBegin{ 1050 SegmentBegin: params.toRequest(nil), 1051 }, 1052 } 1053} 1054 1055// BeginSegmentResponse response for BeginSegment request. 1056type BeginSegmentResponse struct { 1057 SegmentID storj.SegmentID 1058 Limits []*pb.AddressedOrderLimit 1059 PiecePrivateKey storj.PiecePrivateKey 1060 RedundancyStrategy eestream.RedundancyStrategy 1061} 1062 1063func newBeginSegmentResponse(response *pb.SegmentBeginResponse) (BeginSegmentResponse, error) { 1064 var rs eestream.RedundancyStrategy 1065 var err error 1066 if response.RedundancyScheme != nil { 1067 rs, err = eestream.NewRedundancyStrategyFromProto(response.RedundancyScheme) 1068 if err != nil { 1069 return BeginSegmentResponse{}, err 1070 } 1071 } 1072 return BeginSegmentResponse{ 1073 SegmentID: response.SegmentId, 1074 Limits: response.AddressedLimits, 1075 PiecePrivateKey: response.PrivateKey, 1076 RedundancyStrategy: rs, 1077 }, nil 1078} 1079 1080// BeginSegment begins a segment upload. 1081func (client *Client) BeginSegment(ctx context.Context, params BeginSegmentParams) (_ BeginSegmentResponse, err error) { 1082 defer mon.Task()(&ctx)(&err) 1083 1084 var response *pb.SegmentBeginResponse 1085 err = WithRetry(ctx, func(ctx context.Context) error { 1086 response, err = client.client.BeginSegment(ctx, params.toRequest(client.header())) 1087 return err 1088 }) 1089 if err != nil { 1090 return BeginSegmentResponse{}, Error.Wrap(err) 1091 } 1092 1093 return newBeginSegmentResponse(response) 1094} 1095 1096// CommitSegmentParams parameters for CommitSegment method. 1097type CommitSegmentParams struct { 1098 SegmentID storj.SegmentID 1099 Encryption SegmentEncryption 1100 SizeEncryptedData int64 1101 PlainSize int64 1102 EncryptedTag []byte 1103 1104 UploadResult []*pb.SegmentPieceUploadResult 1105} 1106 1107func (params *CommitSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentCommitRequest { 1108 return &pb.SegmentCommitRequest{ 1109 Header: header, 1110 SegmentId: params.SegmentID, 1111 1112 EncryptedKeyNonce: params.Encryption.EncryptedKeyNonce, 1113 EncryptedKey: params.Encryption.EncryptedKey, 1114 SizeEncryptedData: params.SizeEncryptedData, 1115 PlainSize: params.PlainSize, 1116 EncryptedETag: params.EncryptedTag, 1117 UploadResult: params.UploadResult, 1118 } 1119} 1120 1121// BatchItem returns single item for batch request. 1122func (params *CommitSegmentParams) BatchItem() *pb.BatchRequestItem { 1123 return &pb.BatchRequestItem{ 1124 Request: &pb.BatchRequestItem_SegmentCommit{ 1125 SegmentCommit: params.toRequest(nil), 1126 }, 1127 } 1128} 1129 1130// CommitSegment commits an uploaded segment. 1131func (client *Client) CommitSegment(ctx context.Context, params CommitSegmentParams) (err error) { 1132 defer mon.Task()(&ctx)(&err) 1133 1134 err = WithRetry(ctx, func(ctx context.Context) error { 1135 _, err = client.client.CommitSegment(ctx, params.toRequest(client.header())) 1136 return err 1137 }) 1138 1139 return Error.Wrap(err) 1140} 1141 1142// MakeInlineSegmentParams parameters for MakeInlineSegment method. 1143type MakeInlineSegmentParams struct { 1144 StreamID storj.StreamID 1145 Position SegmentPosition 1146 Encryption SegmentEncryption 1147 EncryptedInlineData []byte 1148 PlainSize int64 1149 EncryptedTag []byte 1150} 1151 1152func (params *MakeInlineSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentMakeInlineRequest { 1153 return &pb.SegmentMakeInlineRequest{ 1154 Header: header, 1155 StreamId: params.StreamID, 1156 Position: &pb.SegmentPosition{ 1157 PartNumber: params.Position.PartNumber, 1158 Index: params.Position.Index, 1159 }, 1160 EncryptedKeyNonce: params.Encryption.EncryptedKeyNonce, 1161 EncryptedKey: params.Encryption.EncryptedKey, 1162 EncryptedInlineData: params.EncryptedInlineData, 1163 PlainSize: params.PlainSize, 1164 EncryptedETag: params.EncryptedTag, 1165 } 1166} 1167 1168// BatchItem returns single item for batch request. 1169func (params *MakeInlineSegmentParams) BatchItem() *pb.BatchRequestItem { 1170 return &pb.BatchRequestItem{ 1171 Request: &pb.BatchRequestItem_SegmentMakeInline{ 1172 SegmentMakeInline: params.toRequest(nil), 1173 }, 1174 } 1175} 1176 1177// MakeInlineSegment creates an inline segment. 1178func (client *Client) MakeInlineSegment(ctx context.Context, params MakeInlineSegmentParams) (err error) { 1179 defer mon.Task()(&ctx)(&err) 1180 1181 err = WithRetry(ctx, func(ctx context.Context) error { 1182 _, err = client.client.MakeInlineSegment(ctx, params.toRequest(client.header())) 1183 return err 1184 }) 1185 1186 return Error.Wrap(err) 1187} 1188 1189// DownloadObjectParams parameters for DownloadSegment method. 1190type DownloadObjectParams struct { 1191 Bucket []byte 1192 EncryptedObjectKey []byte 1193 1194 Range StreamRange 1195} 1196 1197// StreamRange contains range specification. 1198type StreamRange struct { 1199 Mode StreamRangeMode 1200 Start int64 1201 Limit int64 1202 Suffix int64 1203} 1204 1205// StreamRangeMode contains different modes for range. 1206type StreamRangeMode byte 1207 1208const ( 1209 // StreamRangeAll selects all. 1210 StreamRangeAll StreamRangeMode = iota 1211 // StreamRangeStart selects starting from range.Start. 1212 StreamRangeStart 1213 // StreamRangeStartLimit selects starting from range.Start to range.End (inclusive). 1214 StreamRangeStartLimit 1215 // StreamRangeSuffix selects last range.Suffix bytes. 1216 StreamRangeSuffix 1217) 1218 1219func (streamRange StreamRange) toProto() *pb.Range { 1220 switch streamRange.Mode { 1221 case StreamRangeAll: 1222 case StreamRangeStart: 1223 return &pb.Range{ 1224 Range: &pb.Range_Start{ 1225 Start: &pb.RangeStart{ 1226 PlainStart: streamRange.Start, 1227 }, 1228 }, 1229 } 1230 case StreamRangeStartLimit: 1231 return &pb.Range{ 1232 Range: &pb.Range_StartLimit{ 1233 StartLimit: &pb.RangeStartLimit{ 1234 PlainStart: streamRange.Start, 1235 PlainLimit: streamRange.Limit, 1236 }, 1237 }, 1238 } 1239 case StreamRangeSuffix: 1240 return &pb.Range{ 1241 Range: &pb.Range_Suffix{ 1242 Suffix: &pb.RangeSuffix{ 1243 PlainSuffix: streamRange.Suffix, 1244 }, 1245 }, 1246 } 1247 } 1248 return nil 1249} 1250 1251// Normalize converts the range to a StreamRangeStartLimit or StreamRangeAll. 1252func (streamRange StreamRange) Normalize(plainSize int64) StreamRange { 1253 switch streamRange.Mode { 1254 case StreamRangeAll: 1255 streamRange.Start = 0 1256 streamRange.Limit = plainSize 1257 case StreamRangeStart: 1258 streamRange.Mode = StreamRangeStartLimit 1259 streamRange.Limit = plainSize 1260 case StreamRangeStartLimit: 1261 case StreamRangeSuffix: 1262 streamRange.Mode = StreamRangeStartLimit 1263 streamRange.Start = plainSize - streamRange.Suffix 1264 streamRange.Limit = plainSize 1265 } 1266 1267 if streamRange.Start < 0 { 1268 streamRange.Start = 0 1269 } 1270 if streamRange.Limit > plainSize { 1271 streamRange.Limit = plainSize 1272 } 1273 streamRange.Suffix = 0 1274 1275 return streamRange 1276} 1277 1278func (params *DownloadObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectDownloadRequest { 1279 return &pb.ObjectDownloadRequest{ 1280 Header: header, 1281 Bucket: params.Bucket, 1282 EncryptedObjectKey: params.EncryptedObjectKey, 1283 Range: params.Range.toProto(), 1284 } 1285} 1286 1287// BatchItem returns single item for batch request. 1288func (params *DownloadObjectParams) BatchItem() *pb.BatchRequestItem { 1289 return &pb.BatchRequestItem{ 1290 Request: &pb.BatchRequestItem_ObjectDownload{ 1291 ObjectDownload: params.toRequest(nil), 1292 }, 1293 } 1294} 1295 1296// DownloadObjectResponse response for DownloadSegment request. 1297type DownloadObjectResponse struct { 1298 Object RawObjectItem 1299 DownloadedSegments []DownloadSegmentWithRSResponse 1300 ListSegments ListSegmentsResponse 1301} 1302 1303func newDownloadObjectResponse(response *pb.ObjectDownloadResponse) DownloadObjectResponse { 1304 downloadedSegments := make([]DownloadSegmentWithRSResponse, 0, len(response.SegmentDownload)) 1305 for _, segmentDownload := range response.SegmentDownload { 1306 downloadedSegments = append(downloadedSegments, newDownloadSegmentResponseWithRS(segmentDownload)) 1307 } 1308 return DownloadObjectResponse{ 1309 Object: newObjectInfo(response.Object), 1310 DownloadedSegments: downloadedSegments, 1311 ListSegments: newListSegmentsResponse(response.SegmentList), 1312 } 1313} 1314 1315// DownloadObject gets object information, lists segments and downloads the first segment. 1316func (client *Client) DownloadObject(ctx context.Context, params DownloadObjectParams) (_ DownloadObjectResponse, err error) { 1317 defer mon.Task()(&ctx)(&err) 1318 1319 var response *pb.ObjectDownloadResponse 1320 err = WithRetry(ctx, func(ctx context.Context) error { 1321 response, err = client.client.DownloadObject(ctx, params.toRequest(client.header())) 1322 return err 1323 }) 1324 if err != nil { 1325 if errs2.IsRPC(err, rpcstatus.NotFound) { 1326 return DownloadObjectResponse{}, storj.ErrObjectNotFound.Wrap(err) 1327 } 1328 return DownloadObjectResponse{}, Error.Wrap(err) 1329 } 1330 1331 return newDownloadObjectResponse(response), nil 1332} 1333 1334// DownloadSegmentParams parameters for DownloadSegment method. 1335type DownloadSegmentParams struct { 1336 StreamID storj.StreamID 1337 Position SegmentPosition 1338} 1339 1340func (params *DownloadSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentDownloadRequest { 1341 return &pb.SegmentDownloadRequest{ 1342 Header: header, 1343 StreamId: params.StreamID, 1344 CursorPosition: &pb.SegmentPosition{ 1345 PartNumber: params.Position.PartNumber, 1346 Index: params.Position.Index, 1347 }, 1348 } 1349} 1350 1351// BatchItem returns single item for batch request. 1352func (params *DownloadSegmentParams) BatchItem() *pb.BatchRequestItem { 1353 return &pb.BatchRequestItem{ 1354 Request: &pb.BatchRequestItem_SegmentDownload{ 1355 SegmentDownload: params.toRequest(nil), 1356 }, 1357 } 1358} 1359 1360// DownloadSegmentResponse response for DownloadSegment request. 1361type DownloadSegmentResponse struct { 1362 Info SegmentDownloadResponseInfo 1363 1364 Limits []*pb.AddressedOrderLimit 1365} 1366 1367func newDownloadSegmentResponse(response *pb.SegmentDownloadResponse) DownloadSegmentResponse { 1368 info := SegmentDownloadResponseInfo{ 1369 SegmentID: response.SegmentId, 1370 EncryptedSize: response.SegmentSize, 1371 EncryptedInlineData: response.EncryptedInlineData, 1372 PiecePrivateKey: response.PrivateKey, 1373 SegmentEncryption: SegmentEncryption{ 1374 EncryptedKeyNonce: response.EncryptedKeyNonce, 1375 EncryptedKey: response.EncryptedKey, 1376 }, 1377 } 1378 if response.Next != nil { 1379 info.Next = SegmentPosition{ 1380 PartNumber: response.Next.PartNumber, 1381 Index: response.Next.Index, 1382 } 1383 } 1384 1385 for i := range response.AddressedLimits { 1386 if response.AddressedLimits[i].Limit == nil { 1387 response.AddressedLimits[i] = nil 1388 } 1389 } 1390 return DownloadSegmentResponse{ 1391 Info: info, 1392 Limits: response.AddressedLimits, 1393 } 1394} 1395 1396// DownloadSegment gets information for downloading remote segment or data 1397// from an inline segment. 1398func (client *Client) DownloadSegment(ctx context.Context, params DownloadSegmentParams) (_ SegmentDownloadResponseInfo, _ []*pb.AddressedOrderLimit, err error) { 1399 defer mon.Task()(&ctx)(&err) 1400 1401 var response *pb.SegmentDownloadResponse 1402 err = WithRetry(ctx, func(ctx context.Context) error { 1403 response, err = client.client.DownloadSegment(ctx, params.toRequest(client.header())) 1404 return err 1405 }) 1406 if err != nil { 1407 if errs2.IsRPC(err, rpcstatus.NotFound) { 1408 return SegmentDownloadResponseInfo{}, nil, ErrObjectNotFound.Wrap(err) 1409 } 1410 return SegmentDownloadResponseInfo{}, nil, Error.Wrap(err) 1411 } 1412 1413 downloadResponse := newDownloadSegmentResponse(response) 1414 return downloadResponse.Info, downloadResponse.Limits, nil 1415} 1416 1417// DownloadSegmentWithRSResponse contains information for downloading remote segment or data from an inline segment. 1418type DownloadSegmentWithRSResponse struct { 1419 Info SegmentDownloadInfo 1420 Limits []*pb.AddressedOrderLimit 1421} 1422 1423// SegmentDownloadInfo represents information necessary for downloading segment (inline and remote). 1424type SegmentDownloadInfo struct { 1425 SegmentID storj.SegmentID 1426 PlainOffset int64 1427 PlainSize int64 1428 EncryptedSize int64 1429 EncryptedInlineData []byte 1430 PiecePrivateKey storj.PiecePrivateKey 1431 SegmentEncryption SegmentEncryption 1432 RedundancyScheme storj.RedundancyScheme 1433 Position *storj.SegmentPosition 1434} 1435 1436func newDownloadSegmentResponseWithRS(response *pb.SegmentDownloadResponse) DownloadSegmentWithRSResponse { 1437 info := SegmentDownloadInfo{ 1438 SegmentID: response.SegmentId, 1439 PlainOffset: response.PlainOffset, 1440 PlainSize: response.PlainSize, 1441 EncryptedSize: response.SegmentSize, 1442 EncryptedInlineData: response.EncryptedInlineData, 1443 PiecePrivateKey: response.PrivateKey, 1444 SegmentEncryption: SegmentEncryption{ 1445 EncryptedKeyNonce: response.EncryptedKeyNonce, 1446 EncryptedKey: response.EncryptedKey, 1447 }, 1448 } 1449 1450 if response.Position != nil { 1451 info.Position = &storj.SegmentPosition{ 1452 PartNumber: response.Position.PartNumber, 1453 Index: response.Position.Index, 1454 } 1455 } 1456 1457 if response.RedundancyScheme != nil { 1458 info.RedundancyScheme = storj.RedundancyScheme{ 1459 Algorithm: storj.RedundancyAlgorithm(response.RedundancyScheme.Type), 1460 ShareSize: response.RedundancyScheme.ErasureShareSize, 1461 RequiredShares: int16(response.RedundancyScheme.MinReq), 1462 RepairShares: int16(response.RedundancyScheme.RepairThreshold), 1463 OptimalShares: int16(response.RedundancyScheme.SuccessThreshold), 1464 TotalShares: int16(response.RedundancyScheme.Total), 1465 } 1466 } 1467 1468 for i := range response.AddressedLimits { 1469 if response.AddressedLimits[i].Limit == nil { 1470 response.AddressedLimits[i] = nil 1471 } 1472 } 1473 return DownloadSegmentWithRSResponse{ 1474 Info: info, 1475 Limits: response.AddressedLimits, 1476 } 1477} 1478 1479// TODO replace DownloadSegment with DownloadSegmentWithRS in batch 1480 1481// DownloadSegmentWithRS gets information for downloading remote segment or data from an inline segment. 1482func (client *Client) DownloadSegmentWithRS(ctx context.Context, params DownloadSegmentParams) (_ DownloadSegmentWithRSResponse, err error) { 1483 defer mon.Task()(&ctx)(&err) 1484 1485 var response *pb.SegmentDownloadResponse 1486 err = WithRetry(ctx, func(ctx context.Context) error { 1487 response, err = client.client.DownloadSegment(ctx, params.toRequest(client.header())) 1488 return err 1489 }) 1490 if err != nil { 1491 if errs2.IsRPC(err, rpcstatus.NotFound) { 1492 return DownloadSegmentWithRSResponse{}, ErrObjectNotFound.Wrap(err) 1493 } 1494 return DownloadSegmentWithRSResponse{}, Error.Wrap(err) 1495 } 1496 1497 return newDownloadSegmentResponseWithRS(response), nil 1498} 1499 1500// DeletePartParams contains information needed to delete part. 1501type DeletePartParams struct { 1502 StreamID storj.StreamID 1503 PartNumber uint32 1504} 1505 1506func (params *DeletePartParams) toRequest(header *pb.RequestHeader) *pb.PartDeleteRequest { 1507 return &pb.PartDeleteRequest{ 1508 Header: header, 1509 1510 StreamId: params.StreamID, 1511 PartNumber: int32(params.PartNumber), 1512 } 1513} 1514 1515// BatchItem returns single item for batch request. 1516func (params *DeletePartParams) BatchItem() *pb.BatchRequestItem { 1517 return &pb.BatchRequestItem{ 1518 Request: &pb.BatchRequestItem_PartDelete{ 1519 PartDelete: params.toRequest(nil), 1520 }, 1521 } 1522} 1523 1524// DeletePart deletes single part. 1525func (client *Client) DeletePart(ctx context.Context, params DeletePartParams) (err error) { 1526 defer mon.Task()(&ctx)(&err) 1527 1528 _, err = client.client.DeletePart(ctx, params.toRequest(client.header())) 1529 return err 1530} 1531 1532// RevokeAPIKey revokes the APIKey provided in the params. 1533func (client *Client) RevokeAPIKey(ctx context.Context, params RevokeAPIKeyParams) (err error) { 1534 defer mon.Task()(&ctx)(&err) 1535 err = WithRetry(ctx, func(ctx context.Context) error { 1536 _, err = client.client.RevokeAPIKey(ctx, params.toRequest(client.header())) 1537 return err 1538 }) 1539 return Error.Wrap(err) 1540} 1541 1542// RevokeAPIKeyParams contain params for a RevokeAPIKey request. 1543type RevokeAPIKeyParams struct { 1544 APIKey []byte 1545} 1546 1547func (r RevokeAPIKeyParams) toRequest(header *pb.RequestHeader) *pb.RevokeAPIKeyRequest { 1548 return &pb.RevokeAPIKeyRequest{ 1549 Header: header, 1550 ApiKey: r.APIKey, 1551 } 1552} 1553 1554// Batch sends multiple requests in one batch. 1555func (client *Client) Batch(ctx context.Context, requests ...BatchItem) (resp []BatchResponse, err error) { 1556 defer mon.Task()(&ctx)(&err) 1557 1558 batchItems := make([]*pb.BatchRequestItem, len(requests)) 1559 for i, request := range requests { 1560 batchItems[i] = request.BatchItem() 1561 } 1562 response, err := client.client.Batch(ctx, &pb.BatchRequest{ 1563 Header: client.header(), 1564 Requests: batchItems, 1565 }) 1566 if err != nil { 1567 return []BatchResponse{}, Error.Wrap(err) 1568 } 1569 1570 resp = make([]BatchResponse, len(response.Responses)) 1571 for i, response := range response.Responses { 1572 resp[i] = BatchResponse{ 1573 pbRequest: batchItems[i].Request, 1574 pbResponse: response.Response, 1575 } 1576 } 1577 1578 return resp, nil 1579} 1580 1581// SetRawAPIKey sets the client's raw API key. Mainly used for testing. 1582func (client *Client) SetRawAPIKey(key []byte) { 1583 client.apiKeyRaw = key 1584} 1585