1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package main
5
6import (
7	"context"
8	"io"
9	"net"
10	"time"
11
12	"github.com/zeebo/errs"
13	"go.uber.org/zap"
14
15	"storj.io/common/storj"
16	"storj.io/storj/satellite/accounting"
17	"storj.io/storj/satellite/compensation"
18	"storj.io/storj/satellite/overlay"
19	"storj.io/storj/satellite/satellitedb"
20)
21
22func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io.Writer) (err error) {
23	periodInfo := compensation.PeriodInfo{
24		Period: period,
25		Rates: &compensation.Rates{
26			AtRestGBHours: generateInvoicesCfg.Compensation.Rates.AtRestGBHours,
27			GetTB:         generateInvoicesCfg.Compensation.Rates.GetTB,
28			PutTB:         generateInvoicesCfg.Compensation.Rates.PutTB,
29			GetRepairTB:   generateInvoicesCfg.Compensation.Rates.GetRepairTB,
30			PutRepairTB:   generateInvoicesCfg.Compensation.Rates.PutRepairTB,
31			GetAuditTB:    generateInvoicesCfg.Compensation.Rates.GetAuditTB,
32		},
33		SurgePercent:     generateInvoicesCfg.SurgePercent,
34		DisposePercent:   generateInvoicesCfg.Compensation.DisposePercent,
35		WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents,
36	}
37
38	db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
39	if err != nil {
40		return errs.New("error connecting to master database on satellite: %+v", err)
41	}
42	defer func() { err = errs.Combine(err, db.Close()) }()
43
44	if err := db.CheckVersion(ctx); err != nil {
45		zap.L().Fatal("Failed satellite database version check.", zap.Error(err))
46		return errs.New("Error checking version for satellitedb: %+v", err)
47	}
48
49	periodUsage, err := db.StoragenodeAccounting().QueryStorageNodePeriodUsage(ctx, period)
50	if err != nil {
51		return err
52	}
53
54	periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage))
55	for _, usage := range periodUsage {
56		periodUsageByNode[usage.NodeID] = usage
57	}
58
59	var allNodes []*overlay.NodeDossier
60	err = db.OverlayCache().IterateAllNodeDossiers(ctx,
61		func(ctx context.Context, node *overlay.NodeDossier) error {
62			allNodes = append(allNodes, node)
63			return nil
64		})
65	if err != nil {
66		return err
67	}
68
69	invoices := make([]compensation.Invoice, 0, len(allNodes))
70	for _, node := range allNodes {
71		totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, node.Id)
72		if err != nil {
73			return err
74		}
75
76		var gracefulExit *time.Time
77		if node.ExitStatus.ExitSuccess {
78			gracefulExit = node.ExitStatus.ExitFinishedAt
79		}
80		nodeAddress, _, err := net.SplitHostPort(node.Address.Address)
81		if err != nil {
82			return errs.New("unable to split node %q address %q", node.Id, node.Address.Address)
83		}
84		var nodeLastIP string
85		if node.LastIPPort != "" {
86			nodeLastIP, _, err = net.SplitHostPort(node.LastIPPort)
87			if err != nil {
88				return errs.New("unable to split node %q last ip:port %q", node.Id, node.LastIPPort)
89			}
90		}
91
92		// the zero value of period usage is acceptable for if the node does not have
93		// any usage for the period.
94		usage := periodUsageByNode[node.Id]
95		nodeInfo := compensation.NodeInfo{
96			ID:                 node.Id,
97			CreatedAt:          node.CreatedAt,
98			LastContactSuccess: node.Reputation.LastContactSuccess,
99			Disqualified:       node.Disqualified,
100			GracefulExit:       gracefulExit,
101			UsageAtRest:        usage.AtRestTotal,
102			UsageGet:           usage.GetTotal,
103			UsagePut:           usage.PutTotal,
104			UsageGetRepair:     usage.GetRepairTotal,
105			UsagePutRepair:     usage.PutRepairTotal,
106			UsageGetAudit:      usage.GetAuditTotal,
107			TotalHeld:          totalAmounts.TotalHeld,
108			TotalDisposed:      totalAmounts.TotalDisposed,
109			TotalPaid:          totalAmounts.TotalPaid,
110			TotalDistributed:   totalAmounts.TotalDistributed,
111		}
112
113		invoice := compensation.Invoice{
114			Period:             period,
115			NodeID:             compensation.NodeID(node.Id),
116			NodeWallet:         node.Operator.Wallet,
117			NodeWalletFeatures: node.Operator.WalletFeatures,
118			NodeAddress:        nodeAddress,
119			NodeLastIP:         nodeLastIP,
120		}
121
122		if err := invoice.MergeNodeInfo(nodeInfo); err != nil {
123			return err
124		}
125		invoices = append(invoices, invoice)
126		periodInfo.Nodes = append(periodInfo.Nodes, nodeInfo)
127	}
128
129	statements, err := compensation.GenerateStatements(periodInfo)
130	if err != nil {
131		return err
132	}
133
134	for i := 0; i < len(statements); i++ {
135		if err := invoices[i].MergeStatement(statements[i]); err != nil {
136			return err
137		}
138	}
139
140	if err := compensation.WriteInvoices(out, invoices); err != nil {
141		return err
142	}
143
144	return nil
145}
146
147func recordPeriod(ctx context.Context, paystubsCSV, paymentsCSV string) (int, int, error) {
148	paystubs, err := compensation.LoadPaystubs(paystubsCSV)
149	if err != nil {
150		return 0, 0, err
151	}
152
153	payments, err := compensation.LoadPayments(paymentsCSV)
154	if err != nil {
155		return 0, 0, err
156	}
157
158	db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
159	if err != nil {
160		return 0, 0, errs.New("error connecting to master database on satellite: %+v", err)
161	}
162	defer func() { err = errs.Combine(err, db.Close()) }()
163
164	if err := db.CheckVersion(ctx); err != nil {
165		zap.L().Fatal("Failed satellite database version check.", zap.Error(err))
166		return 0, 0, errs.New("Error checking version for satellitedb: %+v", err)
167	}
168
169	if err := db.Compensation().RecordPeriod(ctx, paystubs, payments); err != nil {
170		return 0, 0, err
171	}
172
173	return len(paystubs), len(payments), nil
174}
175
176func recordOneOffPayments(ctx context.Context, paymentsCSV string) (int, error) {
177	payments, err := compensation.LoadPayments(paymentsCSV)
178	if err != nil {
179		return 0, err
180	}
181
182	db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
183	if err != nil {
184		return 0, errs.New("error connecting to master database on satellite: %+v", err)
185	}
186	defer func() { err = errs.Combine(err, db.Close()) }()
187
188	if err := db.CheckVersion(ctx); err != nil {
189		zap.L().Fatal("Failed satellite database version check.", zap.Error(err))
190		return 0, errs.New("Error checking version for satellitedb: %+v", err)
191	}
192
193	if err := db.Compensation().RecordPayments(ctx, payments); err != nil {
194		return 0, err
195	}
196
197	return len(payments), nil
198}
199