1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package nodestats 5 6import ( 7 "context" 8 9 "github.com/spacemonkeygo/monkit/v3" 10 "go.uber.org/zap" 11 12 "storj.io/common/identity" 13 "storj.io/common/pb" 14 "storj.io/common/rpc/rpcstatus" 15 "storj.io/storj/satellite/accounting" 16 "storj.io/storj/satellite/overlay" 17 "storj.io/storj/satellite/payments/paymentsconfig" 18 "storj.io/storj/satellite/reputation" 19) 20 21var ( 22 mon = monkit.Package() 23) 24 25// Endpoint for querying node stats for the SNO. 26// 27// architecture: Endpoint 28type Endpoint struct { 29 pb.DRPCNodeStatsUnimplementedServer 30 31 log *zap.Logger 32 overlay overlay.DB 33 reputation *reputation.Service 34 accounting accounting.StoragenodeAccounting 35 config paymentsconfig.Config 36} 37 38// NewEndpoint creates new endpoint. 39func NewEndpoint(log *zap.Logger, overlay overlay.DB, reputation *reputation.Service, accounting accounting.StoragenodeAccounting, config paymentsconfig.Config) *Endpoint { 40 return &Endpoint{ 41 log: log, 42 overlay: overlay, 43 reputation: reputation, 44 accounting: accounting, 45 config: config, 46 } 47} 48 49// GetStats sends node stats for client node. 50func (e *Endpoint) GetStats(ctx context.Context, req *pb.GetStatsRequest) (_ *pb.GetStatsResponse, err error) { 51 defer mon.Task()(&ctx)(&err) 52 53 peer, err := identity.PeerIdentityFromContext(ctx) 54 if err != nil { 55 return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) 56 } 57 node, err := e.overlay.Get(ctx, peer.ID) 58 if err != nil { 59 if overlay.ErrNodeNotFound.Has(err) { 60 return nil, nil 61 } 62 e.log.Error("overlay.Get failed", zap.Error(err)) 63 return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) 64 } 65 reputationInfo, err := e.reputation.Get(ctx, peer.ID) 66 if err != nil { 67 e.log.Error("reputation.Get failed", zap.Error(err)) 68 return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) 69 } 70 71 auditScore := calculateReputationScore( 72 reputationInfo.AuditReputationAlpha, 73 reputationInfo.AuditReputationBeta) 74 75 unknownScore := calculateReputationScore( 76 reputationInfo.UnknownAuditReputationAlpha, 77 reputationInfo.UnknownAuditReputationBeta) 78 79 return &pb.GetStatsResponse{ 80 AuditCheck: &pb.ReputationStats{ 81 TotalCount: reputationInfo.TotalAuditCount, 82 SuccessCount: reputationInfo.AuditSuccessCount, 83 ReputationAlpha: reputationInfo.AuditReputationAlpha, 84 ReputationBeta: reputationInfo.AuditReputationBeta, 85 UnknownReputationAlpha: reputationInfo.UnknownAuditReputationAlpha, 86 UnknownReputationBeta: reputationInfo.UnknownAuditReputationBeta, 87 ReputationScore: auditScore, 88 UnknownReputationScore: unknownScore, 89 }, 90 OnlineScore: reputationInfo.OnlineScore, 91 Disqualified: reputationInfo.Disqualified, 92 Suspended: reputationInfo.UnknownAuditSuspended, 93 OfflineSuspended: reputationInfo.OfflineSuspended, 94 OfflineUnderReview: reputationInfo.UnderReview, 95 VettedAt: reputationInfo.VettedAt, 96 AuditHistory: reputation.AuditHistoryToPB(reputationInfo.AuditHistory), 97 JoinedAt: node.CreatedAt, 98 }, nil 99} 100 101// DailyStorageUsage returns slice of daily storage usage for given period of time sorted in ASC order by date. 102func (e *Endpoint) DailyStorageUsage(ctx context.Context, req *pb.DailyStorageUsageRequest) (_ *pb.DailyStorageUsageResponse, err error) { 103 defer mon.Task()(&ctx)(&err) 104 105 peer, err := identity.PeerIdentityFromContext(ctx) 106 if err != nil { 107 return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) 108 } 109 node, err := e.overlay.Get(ctx, peer.ID) 110 if err != nil { 111 if overlay.ErrNodeNotFound.Has(err) { 112 return nil, nil 113 } 114 e.log.Error("overlay.Get failed", zap.Error(err)) 115 return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) 116 } 117 118 nodeSpaceUsages, err := e.accounting.QueryStorageNodeUsage(ctx, node.Id, req.GetFrom(), req.GetTo()) 119 if err != nil { 120 e.log.Error("accounting.QueryStorageNodeUsage failed", zap.Error(err)) 121 return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) 122 } 123 124 return &pb.DailyStorageUsageResponse{ 125 NodeId: node.Id, 126 DailyStorageUsage: toProtoDailyStorageUsage(nodeSpaceUsages), 127 }, nil 128} 129 130// PricingModel returns pricing model for storagenode. 131func (e *Endpoint) PricingModel(ctx context.Context, req *pb.PricingModelRequest) (_ *pb.PricingModelResponse, err error) { 132 defer mon.Task()(&ctx)(&err) 133 134 return &pb.PricingModelResponse{ 135 EgressBandwidthPrice: e.config.NodeEgressBandwidthPrice, 136 RepairBandwidthPrice: e.config.NodeRepairBandwidthPrice, 137 DiskSpacePrice: e.config.NodeDiskSpacePrice, 138 AuditBandwidthPrice: e.config.NodeAuditBandwidthPrice, 139 }, nil 140} 141 142// toProtoDailyStorageUsage converts StorageNodeUsage to PB DailyStorageUsageResponse_StorageUsage. 143func toProtoDailyStorageUsage(usages []accounting.StorageNodeUsage) []*pb.DailyStorageUsageResponse_StorageUsage { 144 var pbUsages []*pb.DailyStorageUsageResponse_StorageUsage 145 146 for _, usage := range usages { 147 pbUsages = append(pbUsages, &pb.DailyStorageUsageResponse_StorageUsage{ 148 AtRestTotal: usage.StorageUsed, 149 Timestamp: usage.Timestamp, 150 }) 151 } 152 153 return pbUsages 154} 155 156// calculateReputationScore is helper method to calculate reputation score value. 157func calculateReputationScore(alpha, beta float64) float64 { 158 return alpha / (alpha + beta) 159} 160