// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/sync2"
)

// Error is the default audit errs class.
var Error = errs.Class("audit")

// Config contains configurable values for audit chore and workers.
type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B" testDefault:"1.00 KB"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on segments" default:"2"`
}

// Worker contains information for populating audit queue and processing audits.
type Worker struct {
	log      *zap.Logger
	queues   *Queues    // source of per-segment audit work items
	verifier *Verifier  // performs the actual share verification
	reporter *Reporter  // records audit outcomes (e.g. to reputation/statdb)
	Loop     *sync2.Cycle   // periodic trigger; interval comes from Config.QueueInterval
	limiter  *sync2.Limiter // bounds concurrent audits to Config.WorkerConcurrency
}

// NewWorker instantiates Worker.
45func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) { 46 return &Worker{ 47 log: log, 48 49 queues: queues, 50 verifier: verifier, 51 reporter: reporter, 52 Loop: sync2.NewCycle(config.QueueInterval), 53 limiter: sync2.NewLimiter(config.WorkerConcurrency), 54 }, nil 55} 56 57// Run runs audit service 2.0. 58func (worker *Worker) Run(ctx context.Context) (err error) { 59 defer mon.Task()(&ctx)(&err) 60 61 // Wait for all audits to run. 62 defer worker.limiter.Wait() 63 64 return worker.Loop.Run(ctx, func(ctx context.Context) (err error) { 65 defer mon.Task()(&ctx)(&err) 66 err = worker.process(ctx) 67 if err != nil { 68 worker.log.Error("process", zap.Error(Error.Wrap(err))) 69 } 70 return nil 71 }) 72} 73 74// Close halts the worker. 75func (worker *Worker) Close() error { 76 worker.Loop.Close() 77 return nil 78} 79 80// process repeatedly removes an item from the queue and runs an audit. 81func (worker *Worker) process(ctx context.Context) (err error) { 82 defer mon.Task()(&ctx)(&err) 83 84 // get the current queue 85 queue := worker.queues.Fetch() 86 87 worker.limiter.Wait() 88 for { 89 segment, err := queue.Next() 90 if err != nil { 91 if ErrEmptyQueue.Has(err) { 92 // get a new queue and return if empty; otherwise continue working. 93 queue = worker.queues.Fetch() 94 if queue.Size() == 0 { 95 return nil 96 } 97 continue 98 } 99 return err 100 } 101 102 worker.limiter.Go(ctx, func() { 103 err := worker.work(ctx, segment) 104 if err != nil { 105 worker.log.Error("error(s) during audit", 106 zap.String("Segment StreamID", segment.StreamID.String()), 107 zap.Uint64("Segment Position", segment.Position.Encode()), 108 zap.Error(err)) 109 } 110 }) 111 } 112} 113 114func (worker *Worker) work(ctx context.Context, segment Segment) (err error) { 115 defer mon.Task()(&ctx)(&err) 116 117 var errlist errs.Group 118 119 // First, attempt to reverify nodes for this segment that are in containment mode. 
120 report, err := worker.verifier.Reverify(ctx, segment) 121 if err != nil { 122 errlist.Add(err) 123 } 124 125 // TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update 126 _, err = worker.reporter.RecordAudits(ctx, report) 127 if err != nil { 128 errlist.Add(err) 129 } 130 131 // Skip all reverified nodes in the next Verify step. 132 skip := make(map[storj.NodeID]bool) 133 for _, nodeID := range report.Successes { 134 skip[nodeID] = true 135 } 136 for _, nodeID := range report.Offlines { 137 skip[nodeID] = true 138 } 139 for _, nodeID := range report.Fails { 140 skip[nodeID] = true 141 } 142 for _, pending := range report.PendingAudits { 143 skip[pending.NodeID] = true 144 } 145 for _, nodeID := range report.Unknown { 146 skip[nodeID] = true 147 } 148 149 // Next, audit the the remaining nodes that are not in containment mode. 150 report, err = worker.verifier.Verify(ctx, segment, skip) 151 if err != nil { 152 errlist.Add(err) 153 } 154 155 // TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update 156 _, err = worker.reporter.RecordAudits(ctx, report) 157 if err != nil { 158 errlist.Add(err) 159 } 160 161 return errlist.Err() 162} 163