1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package rolluparchive 5 6import ( 7 "context" 8 "time" 9 10 "github.com/spacemonkeygo/monkit/v3" 11 "github.com/zeebo/errs" 12 "go.uber.org/zap" 13 14 "storj.io/common/sync2" 15 "storj.io/storj/satellite/accounting" 16) 17 18// Error is a standard error class for this package. 19var ( 20 Error = errs.Class("rolluparchive") 21 mon = monkit.Package() 22) 23 24// Config contains configurable values for rollup archiver. 25type Config struct { 26 Interval time.Duration `help:"how frequently rollup archiver should run" releaseDefault:"24h" devDefault:"120s" testDefault:"$TESTINTERVAL"` 27 ArchiveAge time.Duration `help:"age at which a rollup is archived" default:"2160h" testDefault:"24h"` 28 BatchSize int `help:"number of records to delete per delete execution. Used only for crdb which is slow without limit." default:"500" testDefault:"1000"` 29 Enabled bool `help:"whether or not the rollup archive is enabled." default:"true"` 30} 31 32// Chore archives bucket and storagenode rollups at a given interval. 33// 34// architecture: Chore 35type Chore struct { 36 log *zap.Logger 37 Loop *sync2.Cycle 38 archiveAge time.Duration 39 batchSize int 40 nodeAccounting accounting.StoragenodeAccounting 41 projectAccounting accounting.ProjectAccounting 42} 43 44// New creates a new rollup archiver chore. 45func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, config Config) *Chore { 46 return &Chore{ 47 log: log, 48 Loop: sync2.NewCycle(config.Interval), 49 archiveAge: config.ArchiveAge, 50 batchSize: config.BatchSize, 51 nodeAccounting: sdb, 52 projectAccounting: pdb, 53 } 54} 55 56// Run starts the archiver chore. 57func (chore *Chore) Run(ctx context.Context) (err error) { 58 defer mon.Task()(&ctx)(&err) 59 if chore.archiveAge < 0 { 60 return Error.New("archive age can't be less than 0") 61 } 62 return chore.Loop.Run(ctx, func(ctx context.Context) error { 63 cutoff := time.Now().UTC().Add(-chore.archiveAge) 64 err := chore.ArchiveRollups(ctx, cutoff, chore.batchSize) 65 if err != nil { 66 chore.log.Error("error archiving SN and bucket bandwidth rollups", zap.Error(err)) 67 } 68 return nil 69 }) 70} 71 72// Close stops the service and releases any resources. 73func (chore *Chore) Close() error { 74 chore.Loop.Close() 75 return nil 76} 77 78// ArchiveRollups will remove old rollups from active rollup tables. 79func (chore *Chore) ArchiveRollups(ctx context.Context, cutoff time.Time, batchSize int) (err error) { 80 defer mon.Task()(&ctx)(&err) 81 nodeRollupsArchived, err := chore.nodeAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize) 82 if err != nil { 83 chore.log.Error("archiving bandwidth rollups", zap.Int("node rollups archived", nodeRollupsArchived), zap.Error(err)) 84 return Error.Wrap(err) 85 } 86 bucketRollupsArchived, err := chore.projectAccounting.ArchiveRollupsBefore(ctx, cutoff, batchSize) 87 if err != nil { 88 chore.log.Error("archiving bandwidth rollups", zap.Int("bucket rollups archived", bucketRollupsArchived), zap.Error(err)) 89 return Error.Wrap(err) 90 } 91 return nil 92} 93