1// Copyright (C) 2020 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package rolluparchive
5
6import (
7	"context"
8	"time"
9
10	"github.com/spacemonkeygo/monkit/v3"
11	"github.com/zeebo/errs"
12	"go.uber.org/zap"
13
14	"storj.io/common/sync2"
15	"storj.io/storj/satellite/accounting"
16)
17
18// Error is a standard error class for this package.
19var (
20	Error = errs.Class("rolluparchive")
21	mon   = monkit.Package()
22)
23
24// Config contains configurable values for rollup archiver.
25type Config struct {
26	Interval   time.Duration `help:"how frequently rollup archiver should run" releaseDefault:"24h" devDefault:"120s" testDefault:"$TESTINTERVAL"`
27	ArchiveAge time.Duration `help:"age at which a rollup is archived" default:"2160h" testDefault:"24h"`
28	BatchSize  int           `help:"number of records to delete per delete execution. Used only for crdb which is slow without limit." default:"500" testDefault:"1000"`
29	Enabled    bool          `help:"whether or not the rollup archive is enabled." default:"true"`
30}
31
32// Chore archives bucket and storagenode rollups at a given interval.
33//
34// architecture: Chore
35type Chore struct {
36	log               *zap.Logger
37	Loop              *sync2.Cycle
38	archiveAge        time.Duration
39	batchSize         int
40	nodeAccounting    accounting.StoragenodeAccounting
41	projectAccounting accounting.ProjectAccounting
42}
43
44// New creates a new rollup archiver chore.
45func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, config Config) *Chore {
46	return &Chore{
47		log:               log,
48		Loop:              sync2.NewCycle(config.Interval),
49		archiveAge:        config.ArchiveAge,
50		batchSize:         config.BatchSize,
51		nodeAccounting:    sdb,
52		projectAccounting: pdb,
53	}
54}
55
56// Run starts the archiver chore.
57func (chore *Chore) Run(ctx context.Context) (err error) {
58	defer mon.Task()(&ctx)(&err)
59	if chore.archiveAge < 0 {
60		return Error.New("archive age can't be less than 0")
61	}
62	return chore.Loop.Run(ctx, func(ctx context.Context) error {
63		cutoff := time.Now().UTC().Add(-chore.archiveAge)
64		err := chore.ArchiveRollups(ctx, cutoff, chore.batchSize)
65		if err != nil {
66			chore.log.Error("error archiving SN and bucket bandwidth rollups", zap.Error(err))
67		}
68		return nil
69	})
70}
71
72// Close stops the service and releases any resources.
73func (chore *Chore) Close() error {
74	chore.Loop.Close()
75	return nil
76}
77
78// ArchiveRollups will remove old rollups from active rollup tables.
79func (chore *Chore) ArchiveRollups(ctx context.Context, cutoff time.Time, batchSize int) (err error) {
80	defer mon.Task()(&ctx)(&err)
81	nodeRollupsArchived, err := chore.nodeAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize)
82	if err != nil {
83		chore.log.Error("archiving bandwidth rollups", zap.Int("node rollups archived", nodeRollupsArchived), zap.Error(err))
84		return Error.Wrap(err)
85	}
86	bucketRollupsArchived, err := chore.projectAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize)
87	if err != nil {
88		chore.log.Error("archiving bandwidth rollups", zap.Int("bucket rollups archived", bucketRollupsArchived), zap.Error(err))
89		return Error.Wrap(err)
90	}
91	return nil
92}
93