1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package v3compactor 16 17import ( 18 "reflect" 19 "testing" 20 "time" 21 22 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 23 "go.etcd.io/etcd/pkg/testutil" 24 25 "github.com/jonboulle/clockwork" 26 "go.uber.org/zap" 27) 28 29func TestRevision(t *testing.T) { 30 fc := clockwork.NewFakeClock() 31 rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} 32 compactable := &fakeCompactable{testutil.NewRecorderStream()} 33 tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) 34 35 tb.Run() 36 defer tb.Stop() 37 38 fc.Advance(revInterval) 39 rg.Wait(1) 40 // nothing happens 41 42 rg.SetRev(99) // will be 100 43 expectedRevision := int64(90) 44 fc.Advance(revInterval) 45 rg.Wait(1) 46 a, err := compactable.Wait(1) 47 if err != nil { 48 t.Fatal(err) 49 } 50 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 51 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 52 } 53 54 // skip the same revision 55 rg.SetRev(99) // will be 100 56 rg.Wait(1) 57 // nothing happens 58 59 rg.SetRev(199) // will be 200 60 expectedRevision = int64(190) 61 fc.Advance(revInterval) 62 rg.Wait(1) 63 a, err = compactable.Wait(1) 64 if err != nil { 65 t.Fatal(err) 66 } 67 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 68 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 69 } 70} 71 72func TestRevisionPause(t *testing.T) { 73 fc := clockwork.NewFakeClock() 74 rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 75 compactable := &fakeCompactable{testutil.NewRecorderStream()} 76 tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) 77 78 tb.Run() 79 tb.Pause() 80 81 // tb will collect 3 hours of revisions but not compact since paused 82 n := int(time.Hour / revInterval) 83 for i := 0; i < 3*n; i++ { 84 fc.Advance(revInterval) 85 } 86 // tb ends up waiting for the clock 87 88 select { 89 case a := <-compactable.Chan(): 90 t.Fatalf("unexpected action %v", a) 91 case <-time.After(10 * time.Millisecond): 92 } 93 94 // tb resumes to being blocked on the clock 95 tb.Resume() 96 97 // unblock clock, will kick off a compaction at hour 3:05 98 fc.Advance(revInterval) 99 rg.Wait(1) 100 a, err := compactable.Wait(1) 101 if err != nil { 102 t.Fatal(err) 103 } 104 wreq := &pb.CompactionRequest{Revision: int64(90)} 105 if !reflect.DeepEqual(a[0].Params[0], wreq) { 106 t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) 107 } 108} 109