// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/sync2"
)

// Error is the default audit errs class.
var Error = errs.Class("audit")

// Config contains configurable values for the audit chore and workers.
type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B" testDefault:"1.00 KB"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on segments" default:"2"`
}

// Worker contains information for populating the audit queue and processing audits.
type Worker struct {
	log      *zap.Logger
	queues   *Queues
	verifier *Verifier
	reporter *Reporter
	Loop     *sync2.Cycle
	limiter  *sync2.Limiter
}

// NewWorker instantiates Worker.
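//
// A minimal usage sketch (illustrative only; it assumes the logger, queues,
// verifier, reporter, and config value are constructed elsewhere):
//
//	worker, err := NewWorker(log, queues, verifier, reporter, config)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = worker.Close() }()
//	return worker.Run(ctx)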
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
	return &Worker{
		log: log,

		queues:   queues,
		verifier: verifier,
		reporter: reporter,
		Loop:     sync2.NewCycle(config.QueueInterval),
		limiter:  sync2.NewLimiter(config.WorkerConcurrency),
	}, nil
}

// Run runs audit service 2.0.
func (worker *Worker) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Wait for all audits to run.
	defer worker.limiter.Wait()

	return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)
		err = worker.process(ctx)
		if err != nil {
			worker.log.Error("process", zap.Error(Error.Wrap(err)))
		}
		return nil
	})
}

// Close halts the worker.
func (worker *Worker) Close() error {
	worker.Loop.Close()
	return nil
}

// process repeatedly removes an item from the queue and runs an audit.
func (worker *Worker) process(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// get the current queue
	queue := worker.queues.Fetch()

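	// Wait for audits still running from a previous cycle to finish before
	// pulling new segments from the queue.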
	worker.limiter.Wait()
	for {
		segment, err := queue.Next()
		if err != nil {
			if ErrEmptyQueue.Has(err) {
				// get a new queue and return if empty; otherwise continue working.
				queue = worker.queues.Fetch()
				if queue.Size() == 0 {
					return nil
				}
				continue
			}
			return err
		}

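		// Go blocks until a concurrency slot is free, so at most
		// WorkerConcurrency audits run at once.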
		worker.limiter.Go(ctx, func() {
			err := worker.work(ctx, segment)
			if err != nil {
				worker.log.Error("error(s) during audit",
					zap.String("Segment StreamID", segment.StreamID.String()),
					zap.Uint64("Segment Position", segment.Position.Encode()),
					zap.Error(err))
			}
		})
	}
}

func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
	defer mon.Task()(&ctx)(&err)

	var errlist errs.Group

	// First, attempt to reverify nodes for this segment that are in containment mode.
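	// Contained nodes have an unresolved pending audit from an earlier attempt;
	// they are reverified here and then skipped in the Verify step below.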
	report, err := worker.verifier.Reverify(ctx, segment)
	if err != nil {
		errlist.Add(err)
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, err = worker.reporter.RecordAudits(ctx, report)
	if err != nil {
		errlist.Add(err)
	}

	// Skip all reverified nodes in the next Verify step.
	skip := make(map[storj.NodeID]bool)
	for _, nodeID := range report.Successes {
		skip[nodeID] = true
	}
	for _, nodeID := range report.Offlines {
		skip[nodeID] = true
	}
	for _, nodeID := range report.Fails {
		skip[nodeID] = true
	}
	for _, pending := range report.PendingAudits {
		skip[pending.NodeID] = true
	}
	for _, nodeID := range report.Unknown {
		skip[nodeID] = true
	}

	// Next, audit the remaining nodes that are not in containment mode.
	report, err = worker.verifier.Verify(ctx, segment, skip)
	if err != nil {
		errlist.Add(err)
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, err = worker.reporter.RecordAudits(ctx, report)
	if err != nil {
		errlist.Add(err)
	}

	return errlist.Err()
}