1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package v3compactor 16 17import ( 18 "context" 19 "sync" 20 "time" 21 22 pb "go.etcd.io/etcd/api/v3/etcdserverpb" 23 "go.etcd.io/etcd/server/v3/mvcc" 24 25 "github.com/jonboulle/clockwork" 26 "go.uber.org/zap" 27) 28 29// Revision compacts the log by purging revisions older than 30// the configured reivison number. Compaction happens every 5 minutes. 31type Revision struct { 32 lg *zap.Logger 33 34 clock clockwork.Clock 35 retention int64 36 37 rg RevGetter 38 c Compactable 39 40 ctx context.Context 41 cancel context.CancelFunc 42 43 mu sync.Mutex 44 paused bool 45} 46 47// newRevision creates a new instance of Revisonal compactor that purges 48// the log older than retention revisions from the current revision. 49func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { 50 rc := &Revision{ 51 lg: lg, 52 clock: clock, 53 retention: retention, 54 rg: rg, 55 c: c, 56 } 57 rc.ctx, rc.cancel = context.WithCancel(context.Background()) 58 return rc 59} 60 61const revInterval = 5 * time.Minute 62 63// Run runs revision-based compactor. 64func (rc *Revision) Run() { 65 prev := int64(0) 66 go func() { 67 for { 68 select { 69 case <-rc.ctx.Done(): 70 return 71 case <-rc.clock.After(revInterval): 72 rc.mu.Lock() 73 p := rc.paused 74 rc.mu.Unlock() 75 if p { 76 continue 77 } 78 } 79 80 rev := rc.rg.Rev() - rc.retention 81 if rev <= 0 || rev == prev { 82 continue 83 } 84 85 now := time.Now() 86 rc.lg.Info( 87 "starting auto revision compaction", 88 zap.Int64("revision", rev), 89 zap.Int64("revision-compaction-retention", rc.retention), 90 ) 91 _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev}) 92 if err == nil || err == mvcc.ErrCompacted { 93 prev = rev 94 rc.lg.Info( 95 "completed auto revision compaction", 96 zap.Int64("revision", rev), 97 zap.Int64("revision-compaction-retention", rc.retention), 98 zap.Duration("took", time.Since(now)), 99 ) 100 } else { 101 rc.lg.Warn( 102 "failed auto revision compaction", 103 zap.Int64("revision", rev), 104 zap.Int64("revision-compaction-retention", rc.retention), 105 zap.Duration("retry-interval", revInterval), 106 zap.Error(err), 107 ) 108 } 109 } 110 }() 111} 112 113// Stop stops revision-based compactor. 114func (rc *Revision) Stop() { 115 rc.cancel() 116} 117 118// Pause pauses revision-based compactor. 119func (rc *Revision) Pause() { 120 rc.mu.Lock() 121 rc.paused = true 122 rc.mu.Unlock() 123} 124 125// Resume resumes revision-based compactor. 126func (rc *Revision) Resume() { 127 rc.mu.Lock() 128 rc.paused = false 129 rc.mu.Unlock() 130} 131