// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package operators

import (
	"context"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/rpc"
	"storj.io/common/storj"
	"storj.io/storj/multinode/nodes"
	"storj.io/storj/private/multinodepb"
)

// MaxOperatorsOnPage defines maximum limit on operators page.
// ListPaginated lowers any larger requested limit to this value.
const MaxOperatorsOnPage = 5

var (
	// mon is the monkit scope used to instrument the methods of this package.
	mon = monkit.Package()
	// Error is an error class for operators service error.
	Error = errs.Class("operators")
)

// Service exposes all operators related logic.
//
// architecture: Service
type Service struct {
	log *zap.Logger
	// dialer opens the rpc connection to a node in GetOperator.
	dialer rpc.Dialer
	// nodes supplies the paged list of nodes in ListPaginated.
	nodes nodes.DB
}

// NewService creates new instance of Service.
// The given logger, dialer and nodes database are stored as-is.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodes nodes.DB) *Service {
	return &Service{
		log:    log,
		dialer: dialer,
		nodes:  nodes,
	}
}

// ListPaginated returns paginated list of operators.
//
// The cursor limit is forced into the range [1, MaxOperatorsOnPage] and a
// cursor page of 0 is rejected with an error. Nodes that can not be reached
// are left out of the returned page; any other failure aborts the listing.
47func (service *Service) ListPaginated(ctx context.Context, cursor Cursor) (_ Page, err error) { 48 defer mon.Task()(&ctx)(&err) 49 if cursor.Limit > MaxOperatorsOnPage { 50 cursor.Limit = MaxOperatorsOnPage 51 } 52 if cursor.Limit < 1 { 53 cursor.Limit = 1 54 } 55 if cursor.Page == 0 { 56 return Page{}, Error.Wrap(errs.New("page can not be 0")) 57 } 58 page, err := service.nodes.ListPaged(ctx, nodes.Cursor{ 59 Limit: cursor.Limit, 60 Page: cursor.Page, 61 }) 62 if err != nil { 63 return Page{}, Error.Wrap(err) 64 } 65 66 var operators []Operator 67 for _, node := range page.Nodes { 68 operator, err := service.GetOperator(ctx, node) 69 if err != nil { 70 if nodes.ErrNodeNotReachable.Has(err) { 71 continue 72 } 73 74 return Page{}, Error.Wrap(err) 75 } 76 operators = append(operators, operator) 77 } 78 79 return Page{ 80 Operators: operators, 81 Offset: page.Offset, 82 Limit: page.Limit, 83 CurrentPage: page.CurrentPage, 84 PageCount: page.PageCount, 85 TotalCount: page.TotalCount, 86 }, nil 87} 88 89// GetOperator retrieves operator form node via rpc. 
90func (service *Service) GetOperator(ctx context.Context, node nodes.Node) (_ Operator, err error) { 91 defer mon.Task()(&ctx)(&err) 92 93 conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{ 94 ID: node.ID, 95 Address: node.PublicAddress, 96 }) 97 if err != nil { 98 return Operator{}, nodes.ErrNodeNotReachable.Wrap(err) 99 } 100 defer func() { 101 err = errs.Combine(err, conn.Close()) 102 }() 103 104 nodeClient := multinodepb.NewDRPCNodeClient(conn) 105 payoutClient := multinodepb.NewDRPCPayoutClient(conn) 106 header := &multinodepb.RequestHeader{ 107 ApiKey: node.APISecret, 108 } 109 110 operatorResponse, err := nodeClient.Operator(ctx, &multinodepb.OperatorRequest{Header: header}) 111 if err != nil { 112 return Operator{}, Error.Wrap(err) 113 } 114 undistributedResponse, err := payoutClient.Undistributed(ctx, &multinodepb.UndistributedRequest{Header: header}) 115 if err != nil { 116 return Operator{}, Error.Wrap(err) 117 } 118 119 return Operator{ 120 NodeID: node.ID, 121 Email: operatorResponse.Email, 122 Wallet: operatorResponse.Wallet, 123 WalletFeatures: operatorResponse.WalletFeatures, 124 Undistributed: undistributedResponse.Total, 125 }, nil 126} 127