1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package metaclient
5
6import (
7	"bytes"
8	"context"
9	"sync"
10	"time"
11
12	"github.com/spacemonkeygo/monkit/v3"
13	"github.com/zeebo/errs"
14
15	"storj.io/common/errs2"
16	"storj.io/common/macaroon"
17	"storj.io/common/pb"
18	"storj.io/common/rpc"
19	"storj.io/common/rpc/rpcstatus"
20	"storj.io/common/storj"
21	"storj.io/uplink/private/eestream"
22)
23
24var (
25	mon = monkit.Package()
26
27	// Error is the errs class of standard metainfo errors.
28	Error = errs.Class("metaclient")
29)
30
31// Client creates a grpcClient.
32type Client struct {
33	mu        sync.Mutex
34	conn      *rpc.Conn
35	client    pb.DRPCMetainfoClient
36	apiKeyRaw []byte
37
38	userAgent string
39}
40
41// ListItem is a single item in a listing.
42type ListItem struct {
43	Path     storj.Path
44	Pointer  *pb.Pointer
45	IsPrefix bool
46}
47
48// NewClient creates Metainfo API client.
49func NewClient(client pb.DRPCMetainfoClient, apiKey *macaroon.APIKey, userAgent string) *Client {
50	return &Client{
51		client:    client,
52		apiKeyRaw: apiKey.SerializeRaw(),
53
54		userAgent: userAgent,
55	}
56}
57
58// DialNodeURL dials to metainfo endpoint with the specified api key.
59func DialNodeURL(ctx context.Context, dialer rpc.Dialer, nodeURL string, apiKey *macaroon.APIKey, userAgent string) (*Client, error) {
60	url, err := storj.ParseNodeURL(nodeURL)
61	if err != nil {
62		return nil, Error.Wrap(err)
63	}
64
65	if url.ID.IsZero() {
66		return nil, Error.New("node ID is required in node URL %q", nodeURL)
67	}
68
69	conn, err := dialer.DialNodeURL(ctx, url)
70	if err != nil {
71		return nil, Error.Wrap(err)
72	}
73
74	return &Client{
75		conn:      conn,
76		client:    pb.NewDRPCMetainfoClient(conn),
77		apiKeyRaw: apiKey.SerializeRaw(),
78		userAgent: userAgent,
79	}, nil
80}
81
82// Close closes the dialed connection.
83func (client *Client) Close() error {
84	client.mu.Lock()
85	defer client.mu.Unlock()
86
87	if client.conn != nil {
88		err := client.conn.Close()
89		client.conn = nil
90		return Error.Wrap(err)
91	}
92
93	return nil
94}
95
96func (client *Client) header() *pb.RequestHeader {
97	return &pb.RequestHeader{
98		ApiKey:    client.apiKeyRaw,
99		UserAgent: []byte(client.userAgent),
100	}
101}
102
103// GetProjectInfo gets the ProjectInfo for the api key associated with the metainfo client.
104func (client *Client) GetProjectInfo(ctx context.Context) (response *pb.ProjectInfoResponse, err error) {
105	defer mon.Task()(&ctx)(&err)
106
107	err = WithRetry(ctx, func(ctx context.Context) error {
108		response, err = client.client.ProjectInfo(ctx, &pb.ProjectInfoRequest{
109			Header: client.header(),
110		})
111		return err
112	})
113	return response, err
114}
115
116// CreateBucketParams parameters for CreateBucket method.
117type CreateBucketParams struct {
118	Name []byte
119
120	// TODO remove those values when satellite will be adjusted
121	PathCipher                  storj.CipherSuite
122	PartnerID                   []byte
123	DefaultSegmentsSize         int64
124	DefaultRedundancyScheme     storj.RedundancyScheme
125	DefaultEncryptionParameters storj.EncryptionParameters
126}
127
128func (params *CreateBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketCreateRequest {
129	defaultRS := params.DefaultRedundancyScheme
130	defaultEP := params.DefaultEncryptionParameters
131
132	return &pb.BucketCreateRequest{
133		Header:             header,
134		Name:               params.Name,
135		PathCipher:         pb.CipherSuite(params.PathCipher),
136		PartnerId:          params.PartnerID,
137		DefaultSegmentSize: params.DefaultSegmentsSize,
138		DefaultRedundancyScheme: &pb.RedundancyScheme{
139			Type:             pb.RedundancyScheme_SchemeType(defaultRS.Algorithm),
140			MinReq:           int32(defaultRS.RequiredShares),
141			Total:            int32(defaultRS.TotalShares),
142			RepairThreshold:  int32(defaultRS.RepairShares),
143			SuccessThreshold: int32(defaultRS.OptimalShares),
144			ErasureShareSize: defaultRS.ShareSize,
145		},
146		DefaultEncryptionParameters: &pb.EncryptionParameters{
147			CipherSuite: pb.CipherSuite(defaultEP.CipherSuite),
148			BlockSize:   int64(defaultEP.BlockSize),
149		},
150	}
151}
152
153// BatchItem returns single item for batch request.
154func (params *CreateBucketParams) BatchItem() *pb.BatchRequestItem {
155	return &pb.BatchRequestItem{
156		Request: &pb.BatchRequestItem_BucketCreate{
157			BucketCreate: params.toRequest(nil),
158		},
159	}
160}
161
162// CreateBucketResponse response for CreateBucket request.
163type CreateBucketResponse struct {
164	Bucket Bucket
165}
166
167func newCreateBucketResponse(response *pb.BucketCreateResponse) (CreateBucketResponse, error) {
168	bucket, err := convertProtoToBucket(response.Bucket)
169	if err != nil {
170		return CreateBucketResponse{}, err
171	}
172	return CreateBucketResponse{
173		Bucket: bucket,
174	}, nil
175}
176
177// CreateBucket creates a new bucket.
178func (client *Client) CreateBucket(ctx context.Context, params CreateBucketParams) (respBucket Bucket, err error) {
179	defer mon.Task()(&ctx)(&err)
180
181	var response *pb.BucketCreateResponse
182	err = WithRetry(ctx, func(ctx context.Context) error {
183		response, err = client.client.CreateBucket(ctx, params.toRequest(client.header()))
184		return err
185	})
186	if err != nil {
187		return Bucket{}, Error.Wrap(err)
188	}
189
190	respBucket, err = convertProtoToBucket(response.Bucket)
191	if err != nil {
192		return Bucket{}, Error.Wrap(err)
193	}
194	return respBucket, nil
195}
196
// GetBucketParams holds the parameters of a GetBucket request.
type GetBucketParams struct {
	// Name is sent as the name of the requested bucket.
	Name []byte
}
201
202func (params *GetBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketGetRequest {
203	return &pb.BucketGetRequest{
204		Header: header,
205		Name:   params.Name,
206	}
207}
208
209// BatchItem returns single item for batch request.
210func (params *GetBucketParams) BatchItem() *pb.BatchRequestItem {
211	return &pb.BatchRequestItem{
212		Request: &pb.BatchRequestItem_BucketGet{
213			BucketGet: params.toRequest(nil),
214		},
215	}
216}
217
218// GetBucketResponse response for GetBucket request.
219type GetBucketResponse struct {
220	Bucket Bucket
221}
222
223func newGetBucketResponse(response *pb.BucketGetResponse) (GetBucketResponse, error) {
224	bucket, err := convertProtoToBucket(response.Bucket)
225	if err != nil {
226		return GetBucketResponse{}, err
227	}
228	return GetBucketResponse{
229		Bucket: bucket,
230	}, nil
231}
232
233// GetBucket returns a bucket.
234func (client *Client) GetBucket(ctx context.Context, params GetBucketParams) (respBucket Bucket, err error) {
235	defer mon.Task()(&ctx)(&err)
236
237	var response *pb.BucketGetResponse
238	err = WithRetry(ctx, func(ctx context.Context) error {
239		// TODO(moby) make sure bucket not found is properly handled
240		response, err = client.client.GetBucket(ctx, params.toRequest(client.header()))
241		return err
242	})
243	if err != nil {
244		if errs2.IsRPC(err, rpcstatus.NotFound) {
245			return Bucket{}, ErrBucketNotFound.Wrap(err)
246		}
247		return Bucket{}, Error.Wrap(err)
248	}
249
250	respBucket, err = convertProtoToBucket(response.Bucket)
251	if err != nil {
252		return Bucket{}, Error.Wrap(err)
253	}
254	return respBucket, nil
255}
256
// DeleteBucketParams holds the parameters of a DeleteBucket request.
type DeleteBucketParams struct {
	// Name is sent as the name of the bucket to delete.
	Name []byte

	// DeleteAll is forwarded as the DeleteAll flag of the request.
	DeleteAll bool
}
262
263func (params *DeleteBucketParams) toRequest(header *pb.RequestHeader) *pb.BucketDeleteRequest {
264	return &pb.BucketDeleteRequest{
265		Header:    header,
266		Name:      params.Name,
267		DeleteAll: params.DeleteAll,
268	}
269}
270
271// BatchItem returns single item for batch request.
272func (params *DeleteBucketParams) BatchItem() *pb.BatchRequestItem {
273	return &pb.BatchRequestItem{
274		Request: &pb.BatchRequestItem_BucketDelete{
275			BucketDelete: params.toRequest(nil),
276		},
277	}
278}
279
280// DeleteBucket deletes a bucket.
281func (client *Client) DeleteBucket(ctx context.Context, params DeleteBucketParams) (_ Bucket, err error) {
282	defer mon.Task()(&ctx)(&err)
283
284	var response *pb.BucketDeleteResponse
285	err = WithRetry(ctx, func(ctx context.Context) error {
286		// TODO(moby) make sure bucket not found is properly handled
287		response, err = client.client.DeleteBucket(ctx, params.toRequest(client.header()))
288		return err
289	})
290	if err != nil {
291		if errs2.IsRPC(err, rpcstatus.NotFound) {
292			return Bucket{}, ErrBucketNotFound.Wrap(err)
293		}
294		return Bucket{}, Error.Wrap(err)
295	}
296
297	respBucket, err := convertProtoToBucket(response.Bucket)
298	if err != nil {
299		return Bucket{}, Error.Wrap(err)
300	}
301	return respBucket, nil
302}
303
304// ListBucketsParams parmaters for ListBucketsParams method.
305type ListBucketsParams struct {
306	ListOpts BucketListOptions
307}
308
309func (params *ListBucketsParams) toRequest(header *pb.RequestHeader) *pb.BucketListRequest {
310	return &pb.BucketListRequest{
311		Header:    header,
312		Cursor:    []byte(params.ListOpts.Cursor),
313		Limit:     int32(params.ListOpts.Limit),
314		Direction: int32(params.ListOpts.Direction),
315	}
316}
317
318// BatchItem returns single item for batch request.
319func (params *ListBucketsParams) BatchItem() *pb.BatchRequestItem {
320	return &pb.BatchRequestItem{
321		Request: &pb.BatchRequestItem_BucketList{
322			BucketList: params.toRequest(nil),
323		},
324	}
325}
326
327// ListBucketsResponse response for ListBucket request.
328type ListBucketsResponse struct {
329	BucketList BucketList
330}
331
332func newListBucketsResponse(response *pb.BucketListResponse) ListBucketsResponse {
333	bucketList := BucketList{
334		More: response.More,
335	}
336	bucketList.Items = make([]Bucket, len(response.Items))
337	for i, item := range response.GetItems() {
338		bucketList.Items[i] = Bucket{
339			Name:    string(item.Name),
340			Created: item.CreatedAt,
341		}
342	}
343	return ListBucketsResponse{
344		BucketList: bucketList,
345	}
346}
347
348// ListBuckets lists buckets.
349func (client *Client) ListBuckets(ctx context.Context, params ListBucketsParams) (_ BucketList, err error) {
350	defer mon.Task()(&ctx)(&err)
351
352	var response *pb.BucketListResponse
353	err = WithRetry(ctx, func(ctx context.Context) error {
354		response, err = client.client.ListBuckets(ctx, params.toRequest(client.header()))
355		return err
356	})
357	if err != nil {
358		return BucketList{}, Error.Wrap(err)
359	}
360
361	resultBucketList := BucketList{
362		More: response.GetMore(),
363	}
364	resultBucketList.Items = make([]Bucket, len(response.GetItems()))
365	for i, item := range response.GetItems() {
366		resultBucketList.Items[i] = Bucket{
367			Name:    string(item.GetName()),
368			Created: item.GetCreatedAt(),
369		}
370	}
371	return resultBucketList, nil
372}
373
374func convertProtoToBucket(pbBucket *pb.Bucket) (bucket Bucket, err error) {
375	if pbBucket == nil {
376		return Bucket{}, nil
377	}
378
379	return Bucket{
380		Name:    string(pbBucket.GetName()),
381		Created: pbBucket.GetCreatedAt(),
382	}, nil
383}
384
385// BeginObjectParams parmaters for BeginObject method.
386type BeginObjectParams struct {
387	Bucket               []byte
388	EncryptedPath        []byte
389	Version              int32
390	Redundancy           storj.RedundancyScheme
391	EncryptionParameters storj.EncryptionParameters
392	ExpiresAt            time.Time
393}
394
395func (params *BeginObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectBeginRequest {
396	return &pb.ObjectBeginRequest{
397		Header:        header,
398		Bucket:        params.Bucket,
399		EncryptedPath: params.EncryptedPath,
400		Version:       params.Version,
401		ExpiresAt:     params.ExpiresAt,
402		RedundancyScheme: &pb.RedundancyScheme{
403			Type:             pb.RedundancyScheme_SchemeType(params.Redundancy.Algorithm),
404			ErasureShareSize: params.Redundancy.ShareSize,
405			MinReq:           int32(params.Redundancy.RequiredShares),
406			RepairThreshold:  int32(params.Redundancy.RepairShares),
407			SuccessThreshold: int32(params.Redundancy.OptimalShares),
408			Total:            int32(params.Redundancy.TotalShares),
409		},
410		EncryptionParameters: &pb.EncryptionParameters{
411			CipherSuite: pb.CipherSuite(params.EncryptionParameters.CipherSuite),
412			BlockSize:   int64(params.EncryptionParameters.BlockSize),
413		},
414	}
415}
416
417// BatchItem returns single item for batch request.
418func (params *BeginObjectParams) BatchItem() *pb.BatchRequestItem {
419	return &pb.BatchRequestItem{
420		Request: &pb.BatchRequestItem_ObjectBegin{
421			ObjectBegin: params.toRequest(nil),
422		},
423	}
424}
425
426// BeginObjectResponse response for BeginObject request.
427type BeginObjectResponse struct {
428	StreamID             storj.StreamID
429	RedundancyStrategy   eestream.RedundancyStrategy
430	EncryptionParameters storj.EncryptionParameters
431}
432
433func newBeginObjectResponse(response *pb.ObjectBeginResponse, redundancyStrategy eestream.RedundancyStrategy) BeginObjectResponse {
434	ep := storj.EncryptionParameters{}
435	if response.EncryptionParameters != nil {
436		ep = storj.EncryptionParameters{
437			CipherSuite: storj.CipherSuite(response.EncryptionParameters.CipherSuite),
438			BlockSize:   int32(response.EncryptionParameters.BlockSize),
439		}
440	}
441
442	return BeginObjectResponse{
443		StreamID:             response.StreamId,
444		RedundancyStrategy:   redundancyStrategy,
445		EncryptionParameters: ep,
446	}
447}
448
449// BeginObject begins object creation.
450func (client *Client) BeginObject(ctx context.Context, params BeginObjectParams) (_ BeginObjectResponse, err error) {
451	defer mon.Task()(&ctx)(&err)
452
453	var response *pb.ObjectBeginResponse
454	err = WithRetry(ctx, func(ctx context.Context) error {
455		response, err = client.client.BeginObject(ctx, params.toRequest(client.header()))
456		return err
457	})
458	if err != nil {
459		return BeginObjectResponse{}, Error.Wrap(err)
460	}
461
462	rs := eestream.RedundancyStrategy{}
463	if response.RedundancyScheme != nil {
464		rs, err = eestream.NewRedundancyStrategyFromProto(response.RedundancyScheme)
465		if err != nil {
466			return BeginObjectResponse{}, Error.Wrap(err)
467		}
468	}
469
470	return newBeginObjectResponse(response, rs), nil
471}
472
473// CommitObjectParams parmaters for CommitObject method.
474type CommitObjectParams struct {
475	StreamID storj.StreamID
476
477	EncryptedMetadataNonce        storj.Nonce
478	EncryptedMetadata             []byte
479	EncryptedMetadataEncryptedKey []byte
480}
481
482func (params *CommitObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectCommitRequest {
483	return &pb.ObjectCommitRequest{
484		Header:                        header,
485		StreamId:                      params.StreamID,
486		EncryptedMetadataNonce:        params.EncryptedMetadataNonce,
487		EncryptedMetadata:             params.EncryptedMetadata,
488		EncryptedMetadataEncryptedKey: params.EncryptedMetadataEncryptedKey,
489	}
490}
491
492// BatchItem returns single item for batch request.
493func (params *CommitObjectParams) BatchItem() *pb.BatchRequestItem {
494	return &pb.BatchRequestItem{
495		Request: &pb.BatchRequestItem_ObjectCommit{
496			ObjectCommit: params.toRequest(nil),
497		},
498	}
499}
500
501// CommitObject commits a created object.
502func (client *Client) CommitObject(ctx context.Context, params CommitObjectParams) (err error) {
503	defer mon.Task()(&ctx)(&err)
504
505	err = WithRetry(ctx, func(ctx context.Context) error {
506		_, err = client.client.CommitObject(ctx, params.toRequest(client.header()))
507		return err
508	})
509	return Error.Wrap(err)
510}
511
// GetObjectParams holds the parameters of a GetObject request.
type GetObjectParams struct {
	// Bucket, EncryptedPath and Version are sent as the location of the object.
	Bucket        []byte
	EncryptedPath []byte
	Version       int32

	// RedundancySchemePerSegment is forwarded as the request flag of the same
	// name.
	RedundancySchemePerSegment bool
}
520
521func (params *GetObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectGetRequest {
522	return &pb.ObjectGetRequest{
523		Header:                     header,
524		Bucket:                     params.Bucket,
525		EncryptedPath:              params.EncryptedPath,
526		Version:                    params.Version,
527		RedundancySchemePerSegment: params.RedundancySchemePerSegment,
528	}
529}
530
531// BatchItem returns single item for batch request.
532func (params *GetObjectParams) BatchItem() *pb.BatchRequestItem {
533	return &pb.BatchRequestItem{
534		Request: &pb.BatchRequestItem_ObjectGet{
535			ObjectGet: params.toRequest(nil),
536		},
537	}
538}
539
540// GetObjectResponse response for GetObject request.
541type GetObjectResponse struct {
542	Info RawObjectItem
543}
544
545func newGetObjectResponse(response *pb.ObjectGetResponse) GetObjectResponse {
546	return GetObjectResponse{
547		Info: newObjectInfo(response.Object),
548	}
549}
550
551func newObjectInfo(object *pb.Object) RawObjectItem {
552	if object == nil {
553		return RawObjectItem{}
554	}
555
556	info := RawObjectItem{
557		Bucket:        string(object.Bucket),
558		EncryptedPath: object.EncryptedPath,
559		Version:       uint32(object.Version),
560
561		StreamID: object.StreamId,
562
563		Created:                       object.CreatedAt,
564		Modified:                      object.CreatedAt,
565		PlainSize:                     object.PlainSize,
566		Expires:                       object.ExpiresAt,
567		EncryptedMetadata:             object.EncryptedMetadata,
568		EncryptedMetadataNonce:        object.EncryptedMetadataNonce,
569		EncryptedMetadataEncryptedKey: object.EncryptedMetadataEncryptedKey,
570
571		EncryptionParameters: storj.EncryptionParameters{
572			CipherSuite: storj.CipherSuite(object.EncryptionParameters.CipherSuite),
573			BlockSize:   int32(object.EncryptionParameters.BlockSize),
574		},
575	}
576
577	pbRS := object.RedundancyScheme
578	if pbRS != nil {
579		info.RedundancyScheme = storj.RedundancyScheme{
580			Algorithm:      storj.RedundancyAlgorithm(pbRS.Type),
581			ShareSize:      pbRS.ErasureShareSize,
582			RequiredShares: int16(pbRS.MinReq),
583			RepairShares:   int16(pbRS.RepairThreshold),
584			OptimalShares:  int16(pbRS.SuccessThreshold),
585			TotalShares:    int16(pbRS.Total),
586		}
587	}
588	return info
589}
590
591// GetObject gets single object.
592func (client *Client) GetObject(ctx context.Context, params GetObjectParams) (_ RawObjectItem, err error) {
593	defer mon.Task()(&ctx)(&err)
594
595	var response *pb.ObjectGetResponse
596	err = WithRetry(ctx, func(ctx context.Context) error {
597		response, err = client.client.GetObject(ctx, params.toRequest(client.header()))
598		return err
599	})
600	if err != nil {
601		if errs2.IsRPC(err, rpcstatus.NotFound) {
602			return RawObjectItem{}, ErrObjectNotFound.Wrap(err)
603		}
604		return RawObjectItem{}, Error.Wrap(err)
605	}
606
607	getResponse := newGetObjectResponse(response)
608	return getResponse.Info, nil
609}
610
// GetObjectIPsParams holds the parameters of a GetObjectIPs request: the
// bucket, encrypted path and version sent as the location of the object.
type GetObjectIPsParams struct {
	Bucket        []byte
	EncryptedPath []byte
	Version       int32
}
617
// GetObjectIPsResponse is the converted response of a GetObjectIPs request.
type GetObjectIPsResponse struct {
	// IPPorts is taken from the Ips field of the response.
	IPPorts [][]byte

	// The counts are copied from the fields of the same names in the response.
	SegmentCount       int64
	PieceCount         int64
	ReliablePieceCount int64
}
625
626func (params *GetObjectIPsParams) toRequest(header *pb.RequestHeader) *pb.ObjectGetIPsRequest {
627	return &pb.ObjectGetIPsRequest{
628		Header:        header,
629		Bucket:        params.Bucket,
630		EncryptedPath: params.EncryptedPath,
631		Version:       params.Version,
632	}
633}
634
635// GetObjectIPs returns the IP addresses of the nodes which hold the object.
636func (client *Client) GetObjectIPs(ctx context.Context, params GetObjectIPsParams) (r *GetObjectIPsResponse, err error) {
637	defer mon.Task()(&ctx)(&err)
638
639	var response *pb.ObjectGetIPsResponse
640	err = WithRetry(ctx, func(ctx context.Context) error {
641		response, err = client.client.GetObjectIPs(ctx, params.toRequest(client.header()))
642		return err
643	})
644	if err != nil {
645		if errs2.IsRPC(err, rpcstatus.NotFound) {
646			return nil, ErrObjectNotFound.Wrap(err)
647		}
648		return nil, Error.Wrap(err)
649	}
650
651	return &GetObjectIPsResponse{
652		IPPorts:            response.Ips,
653		SegmentCount:       response.SegmentCount,
654		PieceCount:         response.PieceCount,
655		ReliablePieceCount: response.ReliablePieceCount,
656	}, nil
657}
658
659// UpdateObjectMetadataParams are params for the UpdateObjectMetadata request.
660type UpdateObjectMetadataParams struct {
661	Bucket             []byte
662	EncryptedObjectKey []byte
663	Version            int32
664	StreamID           storj.StreamID
665
666	EncryptedMetadataNonce        storj.Nonce
667	EncryptedMetadata             []byte
668	EncryptedMetadataEncryptedKey []byte
669}
670
671func (params *UpdateObjectMetadataParams) toRequest(header *pb.RequestHeader) *pb.ObjectUpdateMetadataRequest {
672	return &pb.ObjectUpdateMetadataRequest{
673		Header:                        header,
674		Bucket:                        params.Bucket,
675		EncryptedObjectKey:            params.EncryptedObjectKey,
676		Version:                       params.Version,
677		StreamId:                      params.StreamID,
678		EncryptedMetadataNonce:        params.EncryptedMetadataNonce,
679		EncryptedMetadata:             params.EncryptedMetadata,
680		EncryptedMetadataEncryptedKey: params.EncryptedMetadataEncryptedKey,
681	}
682}
683
684// UpdateObjectMetadata replaces objects metadata.
685func (client *Client) UpdateObjectMetadata(ctx context.Context, params UpdateObjectMetadataParams) (err error) {
686	defer mon.Task()(&ctx)(&err)
687
688	err = WithRetry(ctx, func(ctx context.Context) error {
689		_, err = client.client.UpdateObjectMetadata(ctx, params.toRequest(client.header()))
690		return err
691	})
692	if err != nil {
693		if errs2.IsRPC(err, rpcstatus.NotFound) {
694			return ErrObjectNotFound.Wrap(err)
695		}
696	}
697
698	return Error.Wrap(err)
699}
700
701// BeginDeleteObjectParams parameters for BeginDeleteObject method.
702type BeginDeleteObjectParams struct {
703	Bucket        []byte
704	EncryptedPath []byte
705	Version       int32
706	StreamID      storj.StreamID
707	Status        int32
708}
709
710func (params *BeginDeleteObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectBeginDeleteRequest {
711	return &pb.ObjectBeginDeleteRequest{
712		Header:        header,
713		Bucket:        params.Bucket,
714		EncryptedPath: params.EncryptedPath,
715		Version:       params.Version,
716		StreamId:      &params.StreamID,
717		Status:        params.Status,
718	}
719}
720
721// BatchItem returns single item for batch request.
722func (params *BeginDeleteObjectParams) BatchItem() *pb.BatchRequestItem {
723	return &pb.BatchRequestItem{
724		Request: &pb.BatchRequestItem_ObjectBeginDelete{
725			ObjectBeginDelete: params.toRequest(nil),
726		},
727	}
728}
729
// BeginDeleteObjectResponse is the response of a BeginDeleteObject request.
// It carries no data.
type BeginDeleteObjectResponse struct{}
733
734func newBeginDeleteObjectResponse(response *pb.ObjectBeginDeleteResponse) BeginDeleteObjectResponse {
735	return BeginDeleteObjectResponse{}
736}
737
738// BeginDeleteObject begins object deletion process.
739func (client *Client) BeginDeleteObject(ctx context.Context, params BeginDeleteObjectParams) (_ RawObjectItem, err error) {
740	defer mon.Task()(&ctx)(&err)
741
742	var response *pb.ObjectBeginDeleteResponse
743	err = WithRetry(ctx, func(ctx context.Context) error {
744		// response.StreamID is not processed because satellite will always return nil
745		response, err = client.client.BeginDeleteObject(ctx, params.toRequest(client.header()))
746		return err
747	})
748	if err != nil {
749		if errs2.IsRPC(err, rpcstatus.NotFound) {
750			return RawObjectItem{}, ErrObjectNotFound.Wrap(err)
751		}
752		return RawObjectItem{}, Error.Wrap(err)
753	}
754
755	return newObjectInfo(response.Object), nil
756}
757
// ListObjectsParams holds the parameters of a ListObjects request.
type ListObjectsParams struct {
	// Bucket, EncryptedPrefix, EncryptedCursor and Limit are forwarded
	// unchanged in the request.
	Bucket          []byte
	EncryptedPrefix []byte
	EncryptedCursor []byte
	Limit           int32

	// IncludeCustomMetadata is sent as the Metadata include flag and the
	// negation of IncludeSystemMetadata as the ExcludeSystemMetadata flag.
	IncludeCustomMetadata bool
	IncludeSystemMetadata bool

	// Recursive is forwarded unchanged and Status is converted to
	// pb.Object_Status.
	Recursive bool
	Status    int32
}
769
770func (params *ListObjectsParams) toRequest(header *pb.RequestHeader) *pb.ObjectListRequest {
771	return &pb.ObjectListRequest{
772		Header:          header,
773		Bucket:          params.Bucket,
774		EncryptedPrefix: params.EncryptedPrefix,
775		EncryptedCursor: params.EncryptedCursor,
776		Limit:           params.Limit,
777		ObjectIncludes: &pb.ObjectListItemIncludes{
778			Metadata:              params.IncludeCustomMetadata,
779			ExcludeSystemMetadata: !params.IncludeSystemMetadata,
780		},
781		UseObjectIncludes: true,
782		Recursive:         params.Recursive,
783		Status:            pb.Object_Status(params.Status),
784	}
785}
786
787// BatchItem returns single item for batch request.
788func (params *ListObjectsParams) BatchItem() *pb.BatchRequestItem {
789	return &pb.BatchRequestItem{
790		Request: &pb.BatchRequestItem_ObjectList{
791			ObjectList: params.toRequest(nil),
792		},
793	}
794}
795
796// ListObjectsResponse response for ListObjects request.
797type ListObjectsResponse struct {
798	Items []RawObjectListItem
799	More  bool
800}
801
802func newListObjectsResponse(response *pb.ObjectListResponse, encryptedPrefix []byte, recursive bool) ListObjectsResponse {
803	objects := make([]RawObjectListItem, len(response.Items))
804	for i, object := range response.Items {
805		encryptedPath := object.EncryptedPath
806		isPrefix := false
807		if !recursive && len(encryptedPath) != 0 && encryptedPath[len(encryptedPath)-1] == '/' && !bytes.Equal(encryptedPath, encryptedPrefix) {
808			isPrefix = true
809		}
810
811		objects[i] = RawObjectListItem{
812			EncryptedPath:          object.EncryptedPath,
813			Version:                object.Version,
814			Status:                 int32(object.Status),
815			StatusAt:               object.StatusAt,
816			CreatedAt:              object.CreatedAt,
817			ExpiresAt:              object.ExpiresAt,
818			PlainSize:              object.PlainSize,
819			EncryptedMetadataNonce: object.EncryptedMetadataNonce,
820			EncryptedMetadata:      object.EncryptedMetadata,
821
822			IsPrefix: isPrefix,
823		}
824
825		if object.StreamId != nil {
826			objects[i].StreamID = *object.StreamId
827		}
828	}
829
830	return ListObjectsResponse{
831		Items: objects,
832		More:  response.More,
833	}
834}
835
836// ListObjects lists objects according to specific parameters.
837func (client *Client) ListObjects(ctx context.Context, params ListObjectsParams) (_ []RawObjectListItem, more bool, err error) {
838	defer mon.Task()(&ctx)(&err)
839
840	var response *pb.ObjectListResponse
841	err = WithRetry(ctx, func(ctx context.Context) error {
842		response, err = client.client.ListObjects(ctx, params.toRequest(client.header()))
843		return err
844	})
845	if err != nil {
846		return []RawObjectListItem{}, false, Error.Wrap(err)
847	}
848
849	listResponse := newListObjectsResponse(response, params.EncryptedPrefix, params.Recursive)
850	return listResponse.Items, listResponse.More, Error.Wrap(err)
851}
852
// ListPendingObjectStreamsParams holds the parameters of a
// ListPendingObjectStreams request.
type ListPendingObjectStreamsParams struct {
	// Bucket and EncryptedPath are forwarded unchanged in the request.
	Bucket        []byte
	EncryptedPath []byte

	// EncryptedCursor is sent as the StreamIdCursor of the request.
	EncryptedCursor []byte

	// Limit is forwarded unchanged in the request.
	Limit int32
}
860
861func (params *ListPendingObjectStreamsParams) toRequest(header *pb.RequestHeader) *pb.ObjectListPendingStreamsRequest {
862	return &pb.ObjectListPendingStreamsRequest{
863		Header:         header,
864		Bucket:         params.Bucket,
865		EncryptedPath:  params.EncryptedPath,
866		StreamIdCursor: params.EncryptedCursor,
867		Limit:          params.Limit,
868	}
869}
870
871// BatchItem returns single item for batch request.
872func (params *ListPendingObjectStreamsParams) BatchItem() *pb.BatchRequestItem {
873	return &pb.BatchRequestItem{
874		Request: &pb.BatchRequestItem_PendingStreams{
875			PendingStreams: params.toRequest(nil),
876		},
877	}
878}
879
880// ListPendingObjectStreamsResponse response for ListPendingObjectStreams request.
881type ListPendingObjectStreamsResponse struct {
882	Items []RawObjectListItem
883	More  bool
884}
885
886func newListPendingObjectStreamsResponse(response *pb.ObjectListPendingStreamsResponse) ListPendingObjectStreamsResponse {
887	objects := make([]RawObjectListItem, len(response.Items))
888	for i, object := range response.Items {
889
890		objects[i] = RawObjectListItem{
891			EncryptedPath:          object.EncryptedPath,
892			Version:                object.Version,
893			Status:                 int32(object.Status),
894			StatusAt:               object.StatusAt,
895			CreatedAt:              object.CreatedAt,
896			ExpiresAt:              object.ExpiresAt,
897			PlainSize:              object.PlainSize,
898			EncryptedMetadataNonce: object.EncryptedMetadataNonce,
899			EncryptedMetadata:      object.EncryptedMetadata,
900
901			IsPrefix: false,
902		}
903
904		if object.StreamId != nil {
905			objects[i].StreamID = *object.StreamId
906		}
907	}
908
909	return ListPendingObjectStreamsResponse{
910		Items: objects,
911		More:  response.More,
912	}
913}
914
915// ListPendingObjectStreams lists pending objects with the specified object key in the specified bucket.
916func (client *Client) ListPendingObjectStreams(ctx context.Context, params ListPendingObjectStreamsParams) (_ ListPendingObjectStreamsResponse, err error) {
917	defer mon.Task()(&ctx)(&err)
918
919	var response *pb.ObjectListPendingStreamsResponse
920	err = WithRetry(ctx, func(ctx context.Context) error {
921		response, err = client.client.ListPendingObjectStreams(ctx, params.toRequest(client.header()))
922		return err
923	})
924	if err != nil {
925		return ListPendingObjectStreamsResponse{}, Error.Wrap(err)
926	}
927
928	return newListPendingObjectStreamsResponse(response), nil
929}
930
931// SegmentListItem represents listed segment.
932type SegmentListItem struct {
933	Position          SegmentPosition
934	PlainSize         int64
935	PlainOffset       int64
936	CreatedAt         time.Time
937	EncryptedETag     []byte
938	EncryptedKeyNonce storj.Nonce
939	EncryptedKey      []byte
940}
941
942// ListSegmentsParams parameters for ListSegments method.
943type ListSegmentsParams struct {
944	StreamID []byte
945	Cursor   SegmentPosition
946	Limit    int32
947	Range    StreamRange
948}
949
950func (params *ListSegmentsParams) toRequest(header *pb.RequestHeader) *pb.SegmentListRequest {
951	return &pb.SegmentListRequest{
952		Header:   header,
953		StreamId: params.StreamID,
954		CursorPosition: &pb.SegmentPosition{
955			PartNumber: params.Cursor.PartNumber,
956			Index:      params.Cursor.Index,
957		},
958		Limit: params.Limit,
959		Range: params.Range.toProto(),
960	}
961}
962
963// BatchItem returns single item for batch request.
964func (params *ListSegmentsParams) BatchItem() *pb.BatchRequestItem {
965	return &pb.BatchRequestItem{
966		Request: &pb.BatchRequestItem_SegmentList{
967			SegmentList: params.toRequest(nil),
968		},
969	}
970}
971
972// ListSegmentsResponse response for ListSegments request.
973type ListSegmentsResponse struct {
974	Items                []SegmentListItem
975	More                 bool
976	EncryptionParameters storj.EncryptionParameters
977}
978
979func newListSegmentsResponse(response *pb.SegmentListResponse) ListSegmentsResponse {
980	segments := make([]SegmentListItem, len(response.Items))
981	for i, segment := range response.Items {
982		segments[i] = SegmentListItem{
983			Position: SegmentPosition{
984				PartNumber: segment.Position.PartNumber,
985				Index:      segment.Position.Index,
986			},
987			PlainSize:         segment.PlainSize,
988			PlainOffset:       segment.PlainOffset,
989			CreatedAt:         segment.CreatedAt,
990			EncryptedETag:     segment.EncryptedETag,
991			EncryptedKeyNonce: segment.EncryptedKeyNonce,
992			EncryptedKey:      segment.EncryptedKey,
993		}
994	}
995
996	ep := storj.EncryptionParameters{}
997	if response.EncryptionParameters != nil {
998		ep = storj.EncryptionParameters{
999			CipherSuite: storj.CipherSuite(response.EncryptionParameters.CipherSuite),
1000			BlockSize:   int32(response.EncryptionParameters.BlockSize),
1001		}
1002	}
1003
1004	return ListSegmentsResponse{
1005		Items:                segments,
1006		More:                 response.More,
1007		EncryptionParameters: ep,
1008	}
1009}
1010
1011// ListSegments lists segments according to specific parameters.
1012func (client *Client) ListSegments(ctx context.Context, params ListSegmentsParams) (_ ListSegmentsResponse, err error) {
1013	defer mon.Task()(&ctx)(&err)
1014
1015	var response *pb.SegmentListResponse
1016	err = WithRetry(ctx, func(ctx context.Context) error {
1017		response, err = client.client.ListSegments(ctx, params.toRequest(client.header()))
1018		return err
1019	})
1020	if err != nil {
1021		return ListSegmentsResponse{}, Error.Wrap(err)
1022	}
1023
1024	return newListSegmentsResponse(response), nil
1025}
1026
// BeginSegmentParams parameters for BeginSegment method.
//
// Every field is forwarded to pb.SegmentBeginRequest by toRequest.
type BeginSegmentParams struct {
	StreamID      storj.StreamID
	// Position is sent as the part number and index of the segment.
	Position      SegmentPosition
	// MaxOrderLimit is sent unchanged as the request's MaxOrderLimit.
	MaxOrderLimit int64
}
1033
1034func (params *BeginSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentBeginRequest {
1035	return &pb.SegmentBeginRequest{
1036		Header:   header,
1037		StreamId: params.StreamID,
1038		Position: &pb.SegmentPosition{
1039			PartNumber: params.Position.PartNumber,
1040			Index:      params.Position.Index,
1041		},
1042		MaxOrderLimit: params.MaxOrderLimit,
1043	}
1044}
1045
1046// BatchItem returns single item for batch request.
1047func (params *BeginSegmentParams) BatchItem() *pb.BatchRequestItem {
1048	return &pb.BatchRequestItem{
1049		Request: &pb.BatchRequestItem_SegmentBegin{
1050			SegmentBegin: params.toRequest(nil),
1051		},
1052	}
1053}
1054
// BeginSegmentResponse response for BeginSegment request.
type BeginSegmentResponse struct {
	SegmentID          storj.SegmentID
	// Limits are the addressed order limits exactly as received.
	Limits             []*pb.AddressedOrderLimit
	PiecePrivateKey    storj.PiecePrivateKey
	// RedundancyStrategy is built from the response's redundancy scheme and is
	// left at its zero value when the response carries no scheme.
	RedundancyStrategy eestream.RedundancyStrategy
}
1062
1063func newBeginSegmentResponse(response *pb.SegmentBeginResponse) (BeginSegmentResponse, error) {
1064	var rs eestream.RedundancyStrategy
1065	var err error
1066	if response.RedundancyScheme != nil {
1067		rs, err = eestream.NewRedundancyStrategyFromProto(response.RedundancyScheme)
1068		if err != nil {
1069			return BeginSegmentResponse{}, err
1070		}
1071	}
1072	return BeginSegmentResponse{
1073		SegmentID:          response.SegmentId,
1074		Limits:             response.AddressedLimits,
1075		PiecePrivateKey:    response.PrivateKey,
1076		RedundancyStrategy: rs,
1077	}, nil
1078}
1079
1080// BeginSegment begins a segment upload.
1081func (client *Client) BeginSegment(ctx context.Context, params BeginSegmentParams) (_ BeginSegmentResponse, err error) {
1082	defer mon.Task()(&ctx)(&err)
1083
1084	var response *pb.SegmentBeginResponse
1085	err = WithRetry(ctx, func(ctx context.Context) error {
1086		response, err = client.client.BeginSegment(ctx, params.toRequest(client.header()))
1087		return err
1088	})
1089	if err != nil {
1090		return BeginSegmentResponse{}, Error.Wrap(err)
1091	}
1092
1093	return newBeginSegmentResponse(response)
1094}
1095
// CommitSegmentParams parameters for CommitSegment method.
//
// Every field is forwarded to pb.SegmentCommitRequest by toRequest.
type CommitSegmentParams struct {
	SegmentID         storj.SegmentID
	// Encryption supplies the encrypted key and its nonce for the request.
	Encryption        SegmentEncryption
	SizeEncryptedData int64
	PlainSize         int64
	// EncryptedTag is sent as the request's EncryptedETag.
	EncryptedTag      []byte

	// UploadResult is sent unchanged as the request's upload results.
	UploadResult []*pb.SegmentPieceUploadResult
}
1106
1107func (params *CommitSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentCommitRequest {
1108	return &pb.SegmentCommitRequest{
1109		Header:    header,
1110		SegmentId: params.SegmentID,
1111
1112		EncryptedKeyNonce: params.Encryption.EncryptedKeyNonce,
1113		EncryptedKey:      params.Encryption.EncryptedKey,
1114		SizeEncryptedData: params.SizeEncryptedData,
1115		PlainSize:         params.PlainSize,
1116		EncryptedETag:     params.EncryptedTag,
1117		UploadResult:      params.UploadResult,
1118	}
1119}
1120
1121// BatchItem returns single item for batch request.
1122func (params *CommitSegmentParams) BatchItem() *pb.BatchRequestItem {
1123	return &pb.BatchRequestItem{
1124		Request: &pb.BatchRequestItem_SegmentCommit{
1125			SegmentCommit: params.toRequest(nil),
1126		},
1127	}
1128}
1129
1130// CommitSegment commits an uploaded segment.
1131func (client *Client) CommitSegment(ctx context.Context, params CommitSegmentParams) (err error) {
1132	defer mon.Task()(&ctx)(&err)
1133
1134	err = WithRetry(ctx, func(ctx context.Context) error {
1135		_, err = client.client.CommitSegment(ctx, params.toRequest(client.header()))
1136		return err
1137	})
1138
1139	return Error.Wrap(err)
1140}
1141
// MakeInlineSegmentParams parameters for MakeInlineSegment method.
//
// Every field is forwarded to pb.SegmentMakeInlineRequest by toRequest.
type MakeInlineSegmentParams struct {
	StreamID            storj.StreamID
	// Position is sent as the part number and index of the segment.
	Position            SegmentPosition
	// Encryption supplies the encrypted key and its nonce for the request.
	Encryption          SegmentEncryption
	EncryptedInlineData []byte
	PlainSize           int64
	// EncryptedTag is sent as the request's EncryptedETag.
	EncryptedTag        []byte
}
1151
1152func (params *MakeInlineSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentMakeInlineRequest {
1153	return &pb.SegmentMakeInlineRequest{
1154		Header:   header,
1155		StreamId: params.StreamID,
1156		Position: &pb.SegmentPosition{
1157			PartNumber: params.Position.PartNumber,
1158			Index:      params.Position.Index,
1159		},
1160		EncryptedKeyNonce:   params.Encryption.EncryptedKeyNonce,
1161		EncryptedKey:        params.Encryption.EncryptedKey,
1162		EncryptedInlineData: params.EncryptedInlineData,
1163		PlainSize:           params.PlainSize,
1164		EncryptedETag:       params.EncryptedTag,
1165	}
1166}
1167
1168// BatchItem returns single item for batch request.
1169func (params *MakeInlineSegmentParams) BatchItem() *pb.BatchRequestItem {
1170	return &pb.BatchRequestItem{
1171		Request: &pb.BatchRequestItem_SegmentMakeInline{
1172			SegmentMakeInline: params.toRequest(nil),
1173		},
1174	}
1175}
1176
1177// MakeInlineSegment creates an inline segment.
1178func (client *Client) MakeInlineSegment(ctx context.Context, params MakeInlineSegmentParams) (err error) {
1179	defer mon.Task()(&ctx)(&err)
1180
1181	err = WithRetry(ctx, func(ctx context.Context) error {
1182		_, err = client.client.MakeInlineSegment(ctx, params.toRequest(client.header()))
1183		return err
1184	})
1185
1186	return Error.Wrap(err)
1187}
1188
// DownloadObjectParams parameters for DownloadObject method.
//
// Every field is forwarded to pb.ObjectDownloadRequest by toRequest.
type DownloadObjectParams struct {
	Bucket             []byte
	EncryptedObjectKey []byte

	// Range is converted with StreamRange.toProto; the zero value
	// (StreamRangeAll) results in no range being sent.
	Range StreamRange
}
1196
// StreamRange contains range specification.
//
// Which of Start, Limit and Suffix is used depends on Mode; see toProto and
// Normalize.
type StreamRange struct {
	// Mode selects how the remaining fields are interpreted.
	Mode   StreamRangeMode
	// Start is used by StreamRangeStart and StreamRangeStartLimit.
	Start  int64
	// Limit is used by StreamRangeStartLimit.
	Limit  int64
	// Suffix is used by StreamRangeSuffix.
	Suffix int64
}
1204
// StreamRangeMode contains different modes for range.
type StreamRangeMode byte

const (
	// StreamRangeAll selects all. It is the zero value of StreamRangeMode.
	StreamRangeAll StreamRangeMode = iota
	// StreamRangeStart selects starting from range.Start.
	StreamRangeStart
	// StreamRangeStartLimit selects starting from range.Start up to range.Limit.
	// NOTE(review): this comment previously described the end as "range.End
	// (inclusive)"; StreamRange has no End field, and whether Limit is inclusive
	// should be confirmed against the handling of PlainLimit on the server.
	StreamRangeStartLimit
	// StreamRangeSuffix selects last range.Suffix bytes.
	StreamRangeSuffix
)
1218
1219func (streamRange StreamRange) toProto() *pb.Range {
1220	switch streamRange.Mode {
1221	case StreamRangeAll:
1222	case StreamRangeStart:
1223		return &pb.Range{
1224			Range: &pb.Range_Start{
1225				Start: &pb.RangeStart{
1226					PlainStart: streamRange.Start,
1227				},
1228			},
1229		}
1230	case StreamRangeStartLimit:
1231		return &pb.Range{
1232			Range: &pb.Range_StartLimit{
1233				StartLimit: &pb.RangeStartLimit{
1234					PlainStart: streamRange.Start,
1235					PlainLimit: streamRange.Limit,
1236				},
1237			},
1238		}
1239	case StreamRangeSuffix:
1240		return &pb.Range{
1241			Range: &pb.Range_Suffix{
1242				Suffix: &pb.RangeSuffix{
1243					PlainSuffix: streamRange.Suffix,
1244				},
1245			},
1246		}
1247	}
1248	return nil
1249}
1250
1251// Normalize converts the range to a StreamRangeStartLimit or StreamRangeAll.
1252func (streamRange StreamRange) Normalize(plainSize int64) StreamRange {
1253	switch streamRange.Mode {
1254	case StreamRangeAll:
1255		streamRange.Start = 0
1256		streamRange.Limit = plainSize
1257	case StreamRangeStart:
1258		streamRange.Mode = StreamRangeStartLimit
1259		streamRange.Limit = plainSize
1260	case StreamRangeStartLimit:
1261	case StreamRangeSuffix:
1262		streamRange.Mode = StreamRangeStartLimit
1263		streamRange.Start = plainSize - streamRange.Suffix
1264		streamRange.Limit = plainSize
1265	}
1266
1267	if streamRange.Start < 0 {
1268		streamRange.Start = 0
1269	}
1270	if streamRange.Limit > plainSize {
1271		streamRange.Limit = plainSize
1272	}
1273	streamRange.Suffix = 0
1274
1275	return streamRange
1276}
1277
1278func (params *DownloadObjectParams) toRequest(header *pb.RequestHeader) *pb.ObjectDownloadRequest {
1279	return &pb.ObjectDownloadRequest{
1280		Header:             header,
1281		Bucket:             params.Bucket,
1282		EncryptedObjectKey: params.EncryptedObjectKey,
1283		Range:              params.Range.toProto(),
1284	}
1285}
1286
1287// BatchItem returns single item for batch request.
1288func (params *DownloadObjectParams) BatchItem() *pb.BatchRequestItem {
1289	return &pb.BatchRequestItem{
1290		Request: &pb.BatchRequestItem_ObjectDownload{
1291			ObjectDownload: params.toRequest(nil),
1292		},
1293	}
1294}
1295
// DownloadObjectResponse response for DownloadObject request.
type DownloadObjectResponse struct {
	// Object is the object information converted with newObjectInfo.
	Object             RawObjectItem
	// DownloadedSegments has one entry per segment download in the response.
	DownloadedSegments []DownloadSegmentWithRSResponse
	// ListSegments is the converted segment listing from the response.
	ListSegments       ListSegmentsResponse
}
1302
1303func newDownloadObjectResponse(response *pb.ObjectDownloadResponse) DownloadObjectResponse {
1304	downloadedSegments := make([]DownloadSegmentWithRSResponse, 0, len(response.SegmentDownload))
1305	for _, segmentDownload := range response.SegmentDownload {
1306		downloadedSegments = append(downloadedSegments, newDownloadSegmentResponseWithRS(segmentDownload))
1307	}
1308	return DownloadObjectResponse{
1309		Object:             newObjectInfo(response.Object),
1310		DownloadedSegments: downloadedSegments,
1311		ListSegments:       newListSegmentsResponse(response.SegmentList),
1312	}
1313}
1314
1315// DownloadObject gets object information, lists segments and downloads the first segment.
1316func (client *Client) DownloadObject(ctx context.Context, params DownloadObjectParams) (_ DownloadObjectResponse, err error) {
1317	defer mon.Task()(&ctx)(&err)
1318
1319	var response *pb.ObjectDownloadResponse
1320	err = WithRetry(ctx, func(ctx context.Context) error {
1321		response, err = client.client.DownloadObject(ctx, params.toRequest(client.header()))
1322		return err
1323	})
1324	if err != nil {
1325		if errs2.IsRPC(err, rpcstatus.NotFound) {
1326			return DownloadObjectResponse{}, storj.ErrObjectNotFound.Wrap(err)
1327		}
1328		return DownloadObjectResponse{}, Error.Wrap(err)
1329	}
1330
1331	return newDownloadObjectResponse(response), nil
1332}
1333
// DownloadSegmentParams parameters for DownloadSegment method.
//
// The same parameters are also accepted by DownloadSegmentWithRS.
type DownloadSegmentParams struct {
	StreamID storj.StreamID
	// Position is sent as the request's cursor position (part number and index).
	Position SegmentPosition
}
1339
1340func (params *DownloadSegmentParams) toRequest(header *pb.RequestHeader) *pb.SegmentDownloadRequest {
1341	return &pb.SegmentDownloadRequest{
1342		Header:   header,
1343		StreamId: params.StreamID,
1344		CursorPosition: &pb.SegmentPosition{
1345			PartNumber: params.Position.PartNumber,
1346			Index:      params.Position.Index,
1347		},
1348	}
1349}
1350
1351// BatchItem returns single item for batch request.
1352func (params *DownloadSegmentParams) BatchItem() *pb.BatchRequestItem {
1353	return &pb.BatchRequestItem{
1354		Request: &pb.BatchRequestItem_SegmentDownload{
1355			SegmentDownload: params.toRequest(nil),
1356		},
1357	}
1358}
1359
// DownloadSegmentResponse response for DownloadSegment request.
type DownloadSegmentResponse struct {
	// Info holds the segment details converted from the protobuf response.
	Info SegmentDownloadResponseInfo

	// Limits are the addressed order limits from the response; entries whose
	// order limit is missing are replaced by nil in newDownloadSegmentResponse.
	Limits []*pb.AddressedOrderLimit
}
1366
1367func newDownloadSegmentResponse(response *pb.SegmentDownloadResponse) DownloadSegmentResponse {
1368	info := SegmentDownloadResponseInfo{
1369		SegmentID:           response.SegmentId,
1370		EncryptedSize:       response.SegmentSize,
1371		EncryptedInlineData: response.EncryptedInlineData,
1372		PiecePrivateKey:     response.PrivateKey,
1373		SegmentEncryption: SegmentEncryption{
1374			EncryptedKeyNonce: response.EncryptedKeyNonce,
1375			EncryptedKey:      response.EncryptedKey,
1376		},
1377	}
1378	if response.Next != nil {
1379		info.Next = SegmentPosition{
1380			PartNumber: response.Next.PartNumber,
1381			Index:      response.Next.Index,
1382		}
1383	}
1384
1385	for i := range response.AddressedLimits {
1386		if response.AddressedLimits[i].Limit == nil {
1387			response.AddressedLimits[i] = nil
1388		}
1389	}
1390	return DownloadSegmentResponse{
1391		Info:   info,
1392		Limits: response.AddressedLimits,
1393	}
1394}
1395
1396// DownloadSegment gets information for downloading remote segment or data
1397// from an inline segment.
1398func (client *Client) DownloadSegment(ctx context.Context, params DownloadSegmentParams) (_ SegmentDownloadResponseInfo, _ []*pb.AddressedOrderLimit, err error) {
1399	defer mon.Task()(&ctx)(&err)
1400
1401	var response *pb.SegmentDownloadResponse
1402	err = WithRetry(ctx, func(ctx context.Context) error {
1403		response, err = client.client.DownloadSegment(ctx, params.toRequest(client.header()))
1404		return err
1405	})
1406	if err != nil {
1407		if errs2.IsRPC(err, rpcstatus.NotFound) {
1408			return SegmentDownloadResponseInfo{}, nil, ErrObjectNotFound.Wrap(err)
1409		}
1410		return SegmentDownloadResponseInfo{}, nil, Error.Wrap(err)
1411	}
1412
1413	downloadResponse := newDownloadSegmentResponse(response)
1414	return downloadResponse.Info, downloadResponse.Limits, nil
1415}
1416
// DownloadSegmentWithRSResponse contains information for downloading remote segment or data from an inline segment.
type DownloadSegmentWithRSResponse struct {
	// Info holds the segment details, including the redundancy scheme.
	Info   SegmentDownloadInfo
	// Limits are the addressed order limits from the response; entries whose
	// order limit is missing are replaced by nil in
	// newDownloadSegmentResponseWithRS.
	Limits []*pb.AddressedOrderLimit
}
1422
// SegmentDownloadInfo represents information necessary for downloading segment (inline and remote).
//
// Values are copied from a pb.SegmentDownloadResponse by
// newDownloadSegmentResponseWithRS.
type SegmentDownloadInfo struct {
	SegmentID           storj.SegmentID
	PlainOffset         int64
	PlainSize           int64
	// EncryptedSize is taken from the response's SegmentSize.
	EncryptedSize       int64
	EncryptedInlineData []byte
	PiecePrivateKey     storj.PiecePrivateKey
	SegmentEncryption   SegmentEncryption
	// RedundancyScheme stays at its zero value when the response carries no
	// redundancy scheme.
	RedundancyScheme    storj.RedundancyScheme
	// Position is nil when the response carries no position.
	Position            *storj.SegmentPosition
}
1435
1436func newDownloadSegmentResponseWithRS(response *pb.SegmentDownloadResponse) DownloadSegmentWithRSResponse {
1437	info := SegmentDownloadInfo{
1438		SegmentID:           response.SegmentId,
1439		PlainOffset:         response.PlainOffset,
1440		PlainSize:           response.PlainSize,
1441		EncryptedSize:       response.SegmentSize,
1442		EncryptedInlineData: response.EncryptedInlineData,
1443		PiecePrivateKey:     response.PrivateKey,
1444		SegmentEncryption: SegmentEncryption{
1445			EncryptedKeyNonce: response.EncryptedKeyNonce,
1446			EncryptedKey:      response.EncryptedKey,
1447		},
1448	}
1449
1450	if response.Position != nil {
1451		info.Position = &storj.SegmentPosition{
1452			PartNumber: response.Position.PartNumber,
1453			Index:      response.Position.Index,
1454		}
1455	}
1456
1457	if response.RedundancyScheme != nil {
1458		info.RedundancyScheme = storj.RedundancyScheme{
1459			Algorithm:      storj.RedundancyAlgorithm(response.RedundancyScheme.Type),
1460			ShareSize:      response.RedundancyScheme.ErasureShareSize,
1461			RequiredShares: int16(response.RedundancyScheme.MinReq),
1462			RepairShares:   int16(response.RedundancyScheme.RepairThreshold),
1463			OptimalShares:  int16(response.RedundancyScheme.SuccessThreshold),
1464			TotalShares:    int16(response.RedundancyScheme.Total),
1465		}
1466	}
1467
1468	for i := range response.AddressedLimits {
1469		if response.AddressedLimits[i].Limit == nil {
1470			response.AddressedLimits[i] = nil
1471		}
1472	}
1473	return DownloadSegmentWithRSResponse{
1474		Info:   info,
1475		Limits: response.AddressedLimits,
1476	}
1477}
1478
1479// TODO replace DownloadSegment with DownloadSegmentWithRS in batch
1480
1481// DownloadSegmentWithRS gets information for downloading remote segment or data from an inline segment.
1482func (client *Client) DownloadSegmentWithRS(ctx context.Context, params DownloadSegmentParams) (_ DownloadSegmentWithRSResponse, err error) {
1483	defer mon.Task()(&ctx)(&err)
1484
1485	var response *pb.SegmentDownloadResponse
1486	err = WithRetry(ctx, func(ctx context.Context) error {
1487		response, err = client.client.DownloadSegment(ctx, params.toRequest(client.header()))
1488		return err
1489	})
1490	if err != nil {
1491		if errs2.IsRPC(err, rpcstatus.NotFound) {
1492			return DownloadSegmentWithRSResponse{}, ErrObjectNotFound.Wrap(err)
1493		}
1494		return DownloadSegmentWithRSResponse{}, Error.Wrap(err)
1495	}
1496
1497	return newDownloadSegmentResponseWithRS(response), nil
1498}
1499
// DeletePartParams contains information needed to delete part.
type DeletePartParams struct {
	StreamID   storj.StreamID
	// PartNumber is converted to int32 when the request is built by toRequest.
	PartNumber uint32
}
1505
1506func (params *DeletePartParams) toRequest(header *pb.RequestHeader) *pb.PartDeleteRequest {
1507	return &pb.PartDeleteRequest{
1508		Header: header,
1509
1510		StreamId:   params.StreamID,
1511		PartNumber: int32(params.PartNumber),
1512	}
1513}
1514
1515// BatchItem returns single item for batch request.
1516func (params *DeletePartParams) BatchItem() *pb.BatchRequestItem {
1517	return &pb.BatchRequestItem{
1518		Request: &pb.BatchRequestItem_PartDelete{
1519			PartDelete: params.toRequest(nil),
1520		},
1521	}
1522}
1523
1524// DeletePart deletes single part.
1525func (client *Client) DeletePart(ctx context.Context, params DeletePartParams) (err error) {
1526	defer mon.Task()(&ctx)(&err)
1527
1528	_, err = client.client.DeletePart(ctx, params.toRequest(client.header()))
1529	return err
1530}
1531
1532// RevokeAPIKey revokes the APIKey provided in the params.
1533func (client *Client) RevokeAPIKey(ctx context.Context, params RevokeAPIKeyParams) (err error) {
1534	defer mon.Task()(&ctx)(&err)
1535	err = WithRetry(ctx, func(ctx context.Context) error {
1536		_, err = client.client.RevokeAPIKey(ctx, params.toRequest(client.header()))
1537		return err
1538	})
1539	return Error.Wrap(err)
1540}
1541
// RevokeAPIKeyParams contain params for a RevokeAPIKey request.
type RevokeAPIKeyParams struct {
	// APIKey is sent unchanged as the request's ApiKey; it is the key to be
	// revoked, separate from the key carried in the request header.
	APIKey []byte
}
1546
1547func (r RevokeAPIKeyParams) toRequest(header *pb.RequestHeader) *pb.RevokeAPIKeyRequest {
1548	return &pb.RevokeAPIKeyRequest{
1549		Header: header,
1550		ApiKey: r.APIKey,
1551	}
1552}
1553
1554// Batch sends multiple requests in one batch.
1555func (client *Client) Batch(ctx context.Context, requests ...BatchItem) (resp []BatchResponse, err error) {
1556	defer mon.Task()(&ctx)(&err)
1557
1558	batchItems := make([]*pb.BatchRequestItem, len(requests))
1559	for i, request := range requests {
1560		batchItems[i] = request.BatchItem()
1561	}
1562	response, err := client.client.Batch(ctx, &pb.BatchRequest{
1563		Header:   client.header(),
1564		Requests: batchItems,
1565	})
1566	if err != nil {
1567		return []BatchResponse{}, Error.Wrap(err)
1568	}
1569
1570	resp = make([]BatchResponse, len(response.Responses))
1571	for i, response := range response.Responses {
1572		resp[i] = BatchResponse{
1573			pbRequest:  batchItems[i].Request,
1574			pbResponse: response.Response,
1575		}
1576	}
1577
1578	return resp, nil
1579}
1580
// SetRawAPIKey sets the client's raw API key. Mainly used for testing.
//
// The key is stored as given, without copying and without taking client.mu.
func (client *Client) SetRawAPIKey(key []byte) {
	client.apiKeyRaw = key
}
1585