1// Copyright (C) 2021 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package operators
5
6import (
7	"context"
8
9	"github.com/spacemonkeygo/monkit/v3"
10	"github.com/zeebo/errs"
11	"go.uber.org/zap"
12
13	"storj.io/common/rpc"
14	"storj.io/common/storj"
15	"storj.io/storj/multinode/nodes"
16	"storj.io/storj/private/multinodepb"
17)
18
19// MaxOperatorsOnPage defines maximum limit on operators page.
20const MaxOperatorsOnPage = 5
21
22var (
23	mon = monkit.Package()
24	// Error is an error class for operators service error.
25	Error = errs.Class("operators")
26)
27
28// Service exposes all operators related logic.
29//
30// architecture: Service
31type Service struct {
32	log    *zap.Logger
33	dialer rpc.Dialer
34	nodes  nodes.DB
35}
36
37// NewService creates new instance of Service.
38func NewService(log *zap.Logger, dialer rpc.Dialer, nodes nodes.DB) *Service {
39	return &Service{
40		log:    log,
41		dialer: dialer,
42		nodes:  nodes,
43	}
44}
45
46// ListPaginated returns paginated list of operators.
47func (service *Service) ListPaginated(ctx context.Context, cursor Cursor) (_ Page, err error) {
48	defer mon.Task()(&ctx)(&err)
49	if cursor.Limit > MaxOperatorsOnPage {
50		cursor.Limit = MaxOperatorsOnPage
51	}
52	if cursor.Limit < 1 {
53		cursor.Limit = 1
54	}
55	if cursor.Page == 0 {
56		return Page{}, Error.Wrap(errs.New("page can not be 0"))
57	}
58	page, err := service.nodes.ListPaged(ctx, nodes.Cursor{
59		Limit: cursor.Limit,
60		Page:  cursor.Page,
61	})
62	if err != nil {
63		return Page{}, Error.Wrap(err)
64	}
65
66	var operators []Operator
67	for _, node := range page.Nodes {
68		operator, err := service.GetOperator(ctx, node)
69		if err != nil {
70			if nodes.ErrNodeNotReachable.Has(err) {
71				continue
72			}
73
74			return Page{}, Error.Wrap(err)
75		}
76		operators = append(operators, operator)
77	}
78
79	return Page{
80		Operators:   operators,
81		Offset:      page.Offset,
82		Limit:       page.Limit,
83		CurrentPage: page.CurrentPage,
84		PageCount:   page.PageCount,
85		TotalCount:  page.TotalCount,
86	}, nil
87}
88
89// GetOperator retrieves operator form node via rpc.
90func (service *Service) GetOperator(ctx context.Context, node nodes.Node) (_ Operator, err error) {
91	defer mon.Task()(&ctx)(&err)
92
93	conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
94		ID:      node.ID,
95		Address: node.PublicAddress,
96	})
97	if err != nil {
98		return Operator{}, nodes.ErrNodeNotReachable.Wrap(err)
99	}
100	defer func() {
101		err = errs.Combine(err, conn.Close())
102	}()
103
104	nodeClient := multinodepb.NewDRPCNodeClient(conn)
105	payoutClient := multinodepb.NewDRPCPayoutClient(conn)
106	header := &multinodepb.RequestHeader{
107		ApiKey: node.APISecret,
108	}
109
110	operatorResponse, err := nodeClient.Operator(ctx, &multinodepb.OperatorRequest{Header: header})
111	if err != nil {
112		return Operator{}, Error.Wrap(err)
113	}
114	undistributedResponse, err := payoutClient.Undistributed(ctx, &multinodepb.UndistributedRequest{Header: header})
115	if err != nil {
116		return Operator{}, Error.Wrap(err)
117	}
118
119	return Operator{
120		NodeID:         node.ID,
121		Email:          operatorResponse.Email,
122		Wallet:         operatorResponse.Wallet,
123		WalletFeatures: operatorResponse.WalletFeatures,
124		Undistributed:  undistributedResponse.Total,
125	}, nil
126}
127