1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package main 5 6import ( 7 "context" 8 "io" 9 "net" 10 "time" 11 12 "github.com/zeebo/errs" 13 "go.uber.org/zap" 14 15 "storj.io/common/storj" 16 "storj.io/storj/satellite/accounting" 17 "storj.io/storj/satellite/compensation" 18 "storj.io/storj/satellite/overlay" 19 "storj.io/storj/satellite/satellitedb" 20) 21 22func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io.Writer) (err error) { 23 periodInfo := compensation.PeriodInfo{ 24 Period: period, 25 Rates: &compensation.Rates{ 26 AtRestGBHours: generateInvoicesCfg.Compensation.Rates.AtRestGBHours, 27 GetTB: generateInvoicesCfg.Compensation.Rates.GetTB, 28 PutTB: generateInvoicesCfg.Compensation.Rates.PutTB, 29 GetRepairTB: generateInvoicesCfg.Compensation.Rates.GetRepairTB, 30 PutRepairTB: generateInvoicesCfg.Compensation.Rates.PutRepairTB, 31 GetAuditTB: generateInvoicesCfg.Compensation.Rates.GetAuditTB, 32 }, 33 SurgePercent: generateInvoicesCfg.SurgePercent, 34 DisposePercent: generateInvoicesCfg.Compensation.DisposePercent, 35 WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents, 36 } 37 38 db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"}) 39 if err != nil { 40 return errs.New("error connecting to master database on satellite: %+v", err) 41 } 42 defer func() { err = errs.Combine(err, db.Close()) }() 43 44 if err := db.CheckVersion(ctx); err != nil { 45 zap.L().Fatal("Failed satellite database version check.", zap.Error(err)) 46 return errs.New("Error checking version for satellitedb: %+v", err) 47 } 48 49 periodUsage, err := db.StoragenodeAccounting().QueryStorageNodePeriodUsage(ctx, period) 50 if err != nil { 51 return err 52 } 53 54 periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage)) 55 for _, usage := range periodUsage { 56 periodUsageByNode[usage.NodeID] = usage 57 } 58 59 var allNodes []*overlay.NodeDossier 60 err = db.OverlayCache().IterateAllNodeDossiers(ctx, 61 func(ctx context.Context, node *overlay.NodeDossier) error { 62 allNodes = append(allNodes, node) 63 return nil 64 }) 65 if err != nil { 66 return err 67 } 68 69 invoices := make([]compensation.Invoice, 0, len(allNodes)) 70 for _, node := range allNodes { 71 totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, node.Id) 72 if err != nil { 73 return err 74 } 75 76 var gracefulExit *time.Time 77 if node.ExitStatus.ExitSuccess { 78 gracefulExit = node.ExitStatus.ExitFinishedAt 79 } 80 nodeAddress, _, err := net.SplitHostPort(node.Address.Address) 81 if err != nil { 82 return errs.New("unable to split node %q address %q", node.Id, node.Address.Address) 83 } 84 var nodeLastIP string 85 if node.LastIPPort != "" { 86 nodeLastIP, _, err = net.SplitHostPort(node.LastIPPort) 87 if err != nil { 88 return errs.New("unable to split node %q last ip:port %q", node.Id, node.LastIPPort) 89 } 90 } 91 92 // the zero value of period usage is acceptable for if the node does not have 93 // any usage for the period. 94 usage := periodUsageByNode[node.Id] 95 nodeInfo := compensation.NodeInfo{ 96 ID: node.Id, 97 CreatedAt: node.CreatedAt, 98 LastContactSuccess: node.Reputation.LastContactSuccess, 99 Disqualified: node.Disqualified, 100 GracefulExit: gracefulExit, 101 UsageAtRest: usage.AtRestTotal, 102 UsageGet: usage.GetTotal, 103 UsagePut: usage.PutTotal, 104 UsageGetRepair: usage.GetRepairTotal, 105 UsagePutRepair: usage.PutRepairTotal, 106 UsageGetAudit: usage.GetAuditTotal, 107 TotalHeld: totalAmounts.TotalHeld, 108 TotalDisposed: totalAmounts.TotalDisposed, 109 TotalPaid: totalAmounts.TotalPaid, 110 TotalDistributed: totalAmounts.TotalDistributed, 111 } 112 113 invoice := compensation.Invoice{ 114 Period: period, 115 NodeID: compensation.NodeID(node.Id), 116 NodeWallet: node.Operator.Wallet, 117 NodeWalletFeatures: node.Operator.WalletFeatures, 118 NodeAddress: nodeAddress, 119 NodeLastIP: nodeLastIP, 120 } 121 122 if err := invoice.MergeNodeInfo(nodeInfo); err != nil { 123 return err 124 } 125 invoices = append(invoices, invoice) 126 periodInfo.Nodes = append(periodInfo.Nodes, nodeInfo) 127 } 128 129 statements, err := compensation.GenerateStatements(periodInfo) 130 if err != nil { 131 return err 132 } 133 134 for i := 0; i < len(statements); i++ { 135 if err := invoices[i].MergeStatement(statements[i]); err != nil { 136 return err 137 } 138 } 139 140 if err := compensation.WriteInvoices(out, invoices); err != nil { 141 return err 142 } 143 144 return nil 145} 146 147func recordPeriod(ctx context.Context, paystubsCSV, paymentsCSV string) (int, int, error) { 148 paystubs, err := compensation.LoadPaystubs(paystubsCSV) 149 if err != nil { 150 return 0, 0, err 151 } 152 153 payments, err := compensation.LoadPayments(paymentsCSV) 154 if err != nil { 155 return 0, 0, err 156 } 157 158 db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"}) 159 if err != nil { 160 return 0, 0, errs.New("error connecting to master database on satellite: %+v", err) 161 } 162 defer func() { err = errs.Combine(err, db.Close()) }() 163 164 if err := db.CheckVersion(ctx); err != nil { 165 zap.L().Fatal("Failed satellite database version check.", zap.Error(err)) 166 return 0, 0, errs.New("Error checking version for satellitedb: %+v", err) 167 } 168 169 if err := db.Compensation().RecordPeriod(ctx, paystubs, payments); err != nil { 170 return 0, 0, err 171 } 172 173 return len(paystubs), len(payments), nil 174} 175 176func recordOneOffPayments(ctx context.Context, paymentsCSV string) (int, error) { 177 payments, err := compensation.LoadPayments(paymentsCSV) 178 if err != nil { 179 return 0, err 180 } 181 182 db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"}) 183 if err != nil { 184 return 0, errs.New("error connecting to master database on satellite: %+v", err) 185 } 186 defer func() { err = errs.Combine(err, db.Close()) }() 187 188 if err := db.CheckVersion(ctx); err != nil { 189 zap.L().Fatal("Failed satellite database version check.", zap.Error(err)) 190 return 0, errs.New("Error checking version for satellitedb: %+v", err) 191 } 192 193 if err := db.Compensation().RecordPayments(ctx, payments); err != nil { 194 return 0, err 195 } 196 197 return len(payments), nil 198} 199