1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package compactor
16
17import (
18	"reflect"
19	"testing"
20	"time"
21
22	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23	"github.com/coreos/etcd/pkg/testutil"
24
25	"github.com/jonboulle/clockwork"
26)
27
28func TestPeriodicHourly(t *testing.T) {
29	retentionHours := 2
30	retentionDuration := time.Duration(retentionHours) * time.Hour
31
32	fc := clockwork.NewFakeClock()
33	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
34	compactable := &fakeCompactable{testutil.NewRecorderStream()}
35	tb := newPeriodic(fc, retentionDuration, rg, compactable)
36
37	tb.Run()
38	defer tb.Stop()
39
40	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
41
42	// compaction doesn't happen til 2 hours elapse
43	for i := 0; i < initialIntervals; i++ {
44		rg.Wait(1)
45		fc.Advance(tb.getRetryInterval())
46	}
47
48	// very first compaction
49	a, err := compactable.Wait(1)
50	if err != nil {
51		t.Fatal(err)
52	}
53	expectedRevision := int64(1)
54	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
55		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
56	}
57
58	// simulate 3 hours
59	// now compactor kicks in, every hour
60	for i := 0; i < 3; i++ {
61		// advance one hour, one revision for each interval
62		for j := 0; j < intervalsPerPeriod; j++ {
63			rg.Wait(1)
64			fc.Advance(tb.getRetryInterval())
65		}
66
67		a, err = compactable.Wait(1)
68		if err != nil {
69			t.Fatal(err)
70		}
71
72		expectedRevision = int64((i + 1) * 10)
73		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
74			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
75		}
76	}
77}
78
79func TestPeriodicMinutes(t *testing.T) {
80	retentionMinutes := 5
81	retentionDuration := time.Duration(retentionMinutes) * time.Minute
82
83	fc := clockwork.NewFakeClock()
84	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
85	compactable := &fakeCompactable{testutil.NewRecorderStream()}
86	tb := newPeriodic(fc, retentionDuration, rg, compactable)
87
88	tb.Run()
89	defer tb.Stop()
90
91	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
92
93	// compaction doesn't happen til 5 minutes elapse
94	for i := 0; i < initialIntervals; i++ {
95		rg.Wait(1)
96		fc.Advance(tb.getRetryInterval())
97	}
98
99	// very first compaction
100	a, err := compactable.Wait(1)
101	if err != nil {
102		t.Fatal(err)
103	}
104	expectedRevision := int64(1)
105	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
106		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
107	}
108
109	// compaction happens at every interval
110	for i := 0; i < 5; i++ {
111		// advance 5-minute, one revision for each interval
112		for j := 0; j < intervalsPerPeriod; j++ {
113			rg.Wait(1)
114			fc.Advance(tb.getRetryInterval())
115		}
116
117		a, err := compactable.Wait(1)
118		if err != nil {
119			t.Fatal(err)
120		}
121
122		expectedRevision = int64((i + 1) * 10)
123		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
124			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
125		}
126	}
127}
128
129func TestPeriodicPause(t *testing.T) {
130	fc := clockwork.NewFakeClock()
131	retentionDuration := time.Hour
132	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
133	compactable := &fakeCompactable{testutil.NewRecorderStream()}
134	tb := newPeriodic(fc, retentionDuration, rg, compactable)
135
136	tb.Run()
137	tb.Pause()
138
139	n := tb.getRetentions()
140
141	// tb will collect 3 hours of revisions but not compact since paused
142	for i := 0; i < n*3; i++ {
143		rg.Wait(1)
144		fc.Advance(tb.getRetryInterval())
145	}
146	// t.revs = [21 22 23 24 25 26 27 28 29 30]
147
148	select {
149	case a := <-compactable.Chan():
150		t.Fatalf("unexpected action %v", a)
151	case <-time.After(10 * time.Millisecond):
152	}
153
154	// tb resumes to being blocked on the clock
155	tb.Resume()
156	rg.Wait(1)
157
158	// unblock clock, will kick off a compaction at T=3h6m by retry
159	fc.Advance(tb.getRetryInterval())
160
161	// T=3h6m
162	a, err := compactable.Wait(1)
163	if err != nil {
164		t.Fatal(err)
165	}
166
167	// compact the revision from hour 2:06
168	wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
169	if !reflect.DeepEqual(a[0].Params[0], wreq) {
170		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
171	}
172}
173