1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3compactor
16
17import (
18	"reflect"
19	"testing"
20	"time"
21
22	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
23	"go.etcd.io/etcd/pkg/testutil"
24
25	"github.com/jonboulle/clockwork"
26	"go.uber.org/zap"
27)
28
29func TestRevision(t *testing.T) {
30	fc := clockwork.NewFakeClock()
31	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
32	compactable := &fakeCompactable{testutil.NewRecorderStream()}
33	tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
34
35	tb.Run()
36	defer tb.Stop()
37
38	fc.Advance(revInterval)
39	rg.Wait(1)
40	// nothing happens
41
42	rg.SetRev(99) // will be 100
43	expectedRevision := int64(90)
44	fc.Advance(revInterval)
45	rg.Wait(1)
46	a, err := compactable.Wait(1)
47	if err != nil {
48		t.Fatal(err)
49	}
50	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
51		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
52	}
53
54	// skip the same revision
55	rg.SetRev(99) // will be 100
56	rg.Wait(1)
57	// nothing happens
58
59	rg.SetRev(199) // will be 200
60	expectedRevision = int64(190)
61	fc.Advance(revInterval)
62	rg.Wait(1)
63	a, err = compactable.Wait(1)
64	if err != nil {
65		t.Fatal(err)
66	}
67	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
68		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
69	}
70}
71
72func TestRevisionPause(t *testing.T) {
73	fc := clockwork.NewFakeClock()
74	rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
75	compactable := &fakeCompactable{testutil.NewRecorderStream()}
76	tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
77
78	tb.Run()
79	tb.Pause()
80
81	// tb will collect 3 hours of revisions but not compact since paused
82	n := int(time.Hour / revInterval)
83	for i := 0; i < 3*n; i++ {
84		fc.Advance(revInterval)
85	}
86	// tb ends up waiting for the clock
87
88	select {
89	case a := <-compactable.Chan():
90		t.Fatalf("unexpected action %v", a)
91	case <-time.After(10 * time.Millisecond):
92	}
93
94	// tb resumes to being blocked on the clock
95	tb.Resume()
96
97	// unblock clock, will kick off a compaction at hour 3:05
98	fc.Advance(revInterval)
99	rg.Wait(1)
100	a, err := compactable.Wait(1)
101	if err != nil {
102		t.Fatal(err)
103	}
104	wreq := &pb.CompactionRequest{Revision: int64(90)}
105	if !reflect.DeepEqual(a[0].Params[0], wreq) {
106		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
107	}
108}
109