1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package compactor
16
17import (
18	"context"
19	"sync"
20	"time"
21
22	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23	"github.com/coreos/etcd/mvcc"
24
25	"github.com/jonboulle/clockwork"
26)
27
28// Revision compacts the log by purging revisions older than
29// the configured reivison number. Compaction happens every 5 minutes.
30type Revision struct {
31	clock     clockwork.Clock
32	retention int64
33
34	rg RevGetter
35	c  Compactable
36
37	ctx    context.Context
38	cancel context.CancelFunc
39
40	mu     sync.Mutex
41	paused bool
42}
43
44// NewRevision creates a new instance of Revisonal compactor that purges
45// the log older than retention revisions from the current revision.
46func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
47	return newRevision(clockwork.NewRealClock(), retention, rg, c)
48}
49
50func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
51	t := &Revision{
52		clock:     clock,
53		retention: retention,
54		rg:        rg,
55		c:         c,
56	}
57	t.ctx, t.cancel = context.WithCancel(context.Background())
58	return t
59}
60
61const revInterval = 5 * time.Minute
62
63// Run runs revision-based compactor.
64func (t *Revision) Run() {
65	prev := int64(0)
66	go func() {
67		for {
68			select {
69			case <-t.ctx.Done():
70				return
71			case <-t.clock.After(revInterval):
72				t.mu.Lock()
73				p := t.paused
74				t.mu.Unlock()
75				if p {
76					continue
77				}
78			}
79
80			rev := t.rg.Rev() - t.retention
81			if rev <= 0 || rev == prev {
82				continue
83			}
84
85			plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
86			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
87			if err == nil || err == mvcc.ErrCompacted {
88				prev = rev
89				plog.Noticef("Finished auto-compaction at revision %d", rev)
90			} else {
91				plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
92				plog.Noticef("Retry after %v", revInterval)
93			}
94		}
95	}()
96}
97
98// Stop stops revision-based compactor.
99func (t *Revision) Stop() {
100	t.cancel()
101}
102
103// Pause pauses revision-based compactor.
104func (t *Revision) Pause() {
105	t.mu.Lock()
106	defer t.mu.Unlock()
107	t.paused = true
108}
109
110// Resume resumes revision-based compactor.
111func (t *Revision) Resume() {
112	t.mu.Lock()
113	defer t.mu.Unlock()
114	t.paused = false
115}
116