1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package compactor 16 17import ( 18 "reflect" 19 "testing" 20 "time" 21 22 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 23 "github.com/coreos/etcd/pkg/testutil" 24 25 "github.com/jonboulle/clockwork" 26) 27 28func TestPeriodicHourly(t *testing.T) { 29 retentionHours := 2 30 retentionDuration := time.Duration(retentionHours) * time.Hour 31 32 fc := clockwork.NewFakeClock() 33 rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} 34 compactable := &fakeCompactable{testutil.NewRecorderStream()} 35 tb := newPeriodic(fc, retentionDuration, rg, compactable) 36 37 tb.Run() 38 defer tb.Stop() 39 40 initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 41 42 // compaction doesn't happen til 2 hours elapse 43 for i := 0; i < initialIntervals; i++ { 44 rg.Wait(1) 45 fc.Advance(tb.getRetryInterval()) 46 } 47 48 // very first compaction 49 a, err := compactable.Wait(1) 50 if err != nil { 51 t.Fatal(err) 52 } 53 expectedRevision := int64(1) 54 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 55 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 56 } 57 58 // simulate 3 hours 59 // now compactor kicks in, every hour 60 for i := 0; i < 3; i++ { 61 // advance one hour, one revision for each interval 62 for j := 0; j < intervalsPerPeriod; j++ { 63 rg.Wait(1) 64 fc.Advance(tb.getRetryInterval()) 65 } 66 67 a, err = compactable.Wait(1) 68 if err != nil { 69 t.Fatal(err) 70 } 71 72 expectedRevision = int64((i + 1) * 10) 73 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 74 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 75 } 76 } 77} 78 79func TestPeriodicMinutes(t *testing.T) { 80 retentionMinutes := 5 81 retentionDuration := time.Duration(retentionMinutes) * time.Minute 82 83 fc := clockwork.NewFakeClock() 84 rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} 85 compactable := &fakeCompactable{testutil.NewRecorderStream()} 86 tb := newPeriodic(fc, retentionDuration, rg, compactable) 87 88 tb.Run() 89 defer tb.Stop() 90 91 initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 92 93 // compaction doesn't happen til 5 minutes elapse 94 for i := 0; i < initialIntervals; i++ { 95 rg.Wait(1) 96 fc.Advance(tb.getRetryInterval()) 97 } 98 99 // very first compaction 100 a, err := compactable.Wait(1) 101 if err != nil { 102 t.Fatal(err) 103 } 104 expectedRevision := int64(1) 105 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 106 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 107 } 108 109 // compaction happens at every interval 110 for i := 0; i < 5; i++ { 111 // advance 5-minute, one revision for each interval 112 for j := 0; j < intervalsPerPeriod; j++ { 113 rg.Wait(1) 114 fc.Advance(tb.getRetryInterval()) 115 } 116 117 a, err := compactable.Wait(1) 118 if err != nil { 119 t.Fatal(err) 120 } 121 122 expectedRevision = int64((i + 1) * 10) 123 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { 124 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) 125 } 126 } 127} 128 129func TestPeriodicPause(t *testing.T) { 130 fc := clockwork.NewFakeClock() 131 retentionDuration := time.Hour 132 rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} 133 compactable := &fakeCompactable{testutil.NewRecorderStream()} 134 tb := newPeriodic(fc, retentionDuration, rg, compactable) 135 136 tb.Run() 137 tb.Pause() 138 139 n := tb.getRetentions() 140 141 // tb will collect 3 hours of revisions but not compact since paused 142 for i := 0; i < n*3; i++ { 143 rg.Wait(1) 144 fc.Advance(tb.getRetryInterval()) 145 } 146 // t.revs = [21 22 23 24 25 26 27 28 29 30] 147 148 select { 149 case a := <-compactable.Chan(): 150 t.Fatalf("unexpected action %v", a) 151 case <-time.After(10 * time.Millisecond): 152 } 153 154 // tb resumes to being blocked on the clock 155 tb.Resume() 156 rg.Wait(1) 157 158 // unblock clock, will kick off a compaction at T=3h6m by retry 159 fc.Advance(tb.getRetryInterval()) 160 161 // T=3h6m 162 a, err := compactable.Wait(1) 163 if err != nil { 164 t.Fatal(err) 165 } 166 167 // compact the revision from hour 2:06 168 wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} 169 if !reflect.DeepEqual(a[0].Params[0], wreq) { 170 t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) 171 } 172} 173