1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package watch 18 19import ( 20 "context" 21 "reflect" 22 goruntime "runtime" 23 "sort" 24 "testing" 25 "time" 26 27 "github.com/davecgh/go-spew/spew" 28 29 corev1 "k8s.io/api/core/v1" 30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 31 "k8s.io/apimachinery/pkg/runtime" 32 "k8s.io/apimachinery/pkg/runtime/schema" 33 "k8s.io/apimachinery/pkg/util/diff" 34 "k8s.io/apimachinery/pkg/watch" 35 fakeclientset "k8s.io/client-go/kubernetes/fake" 36 testcore "k8s.io/client-go/testing" 37 "k8s.io/client-go/tools/cache" 38) 39 40// TestEventProcessorExit is expected to timeout if the event processor fails 41// to exit when stopped. 42func TestEventProcessorExit(t *testing.T) { 43 event := watch.Event{} 44 45 tests := []struct { 46 name string 47 write func(e *eventProcessor) 48 }{ 49 { 50 name: "exit on blocked read", 51 write: func(e *eventProcessor) { 52 e.push(event) 53 }, 54 }, 55 { 56 name: "exit on blocked write", 57 write: func(e *eventProcessor) { 58 e.push(event) 59 e.push(event) 60 }, 61 }, 62 } 63 for _, test := range tests { 64 t.Run(test.name, func(t *testing.T) { 65 out := make(chan watch.Event) 66 e := newEventProcessor(out) 67 68 test.write(e) 69 70 exited := make(chan struct{}) 71 go func() { 72 e.run() 73 close(exited) 74 }() 75 76 <-out 77 e.stop() 78 goruntime.Gosched() 79 <-exited 80 }) 81 } 82} 83 84type apiInt int 85 86func (apiInt) GetObjectKind() schema.ObjectKind { return nil } 87func (apiInt) DeepCopyObject() runtime.Object { return nil } 88 89func TestEventProcessorOrdersEvents(t *testing.T) { 90 out := make(chan watch.Event) 91 e := newEventProcessor(out) 92 go e.run() 93 94 numProcessed := 0 95 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 96 go func() { 97 for i := 0; i < 1000; i++ { 98 e := <-out 99 if got, want := int(e.Object.(apiInt)), i; got != want { 100 t.Errorf("unexpected event: got=%d, want=%d", got, want) 101 } 102 numProcessed++ 103 } 104 cancel() 105 }() 106 107 for i := 0; i < 1000; i++ { 108 e.push(watch.Event{Object: apiInt(i)}) 109 } 110 111 <-ctx.Done() 112 e.stop() 113 114 if numProcessed != 1000 { 115 t.Errorf("unexpected number of events processed: %d", numProcessed) 116 } 117 118} 119 120type byEventTypeAndName []watch.Event 121 122func (a byEventTypeAndName) Len() int { return len(a) } 123func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 124func (a byEventTypeAndName) Less(i, j int) bool { 125 if a[i].Type < a[j].Type { 126 return true 127 } 128 129 if a[i].Type > a[j].Type { 130 return false 131 } 132 133 return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name 134} 135 136func TestNewInformerWatcher(t *testing.T) { 137 // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. 138 tt := []struct { 139 name string 140 objects []runtime.Object 141 events []watch.Event 142 }{ 143 { 144 name: "basic test", 145 objects: []runtime.Object{ 146 &corev1.Secret{ 147 ObjectMeta: metav1.ObjectMeta{ 148 Name: "pod-1", 149 }, 150 StringData: map[string]string{ 151 "foo-1": "initial", 152 }, 153 }, 154 &corev1.Secret{ 155 ObjectMeta: metav1.ObjectMeta{ 156 Name: "pod-2", 157 }, 158 StringData: map[string]string{ 159 "foo-2": "initial", 160 }, 161 }, 162 &corev1.Secret{ 163 ObjectMeta: metav1.ObjectMeta{ 164 Name: "pod-3", 165 }, 166 StringData: map[string]string{ 167 "foo-3": "initial", 168 }, 169 }, 170 }, 171 events: []watch.Event{ 172 { 173 Type: watch.Added, 174 Object: &corev1.Secret{ 175 ObjectMeta: metav1.ObjectMeta{ 176 Name: "pod-4", 177 }, 178 StringData: map[string]string{ 179 "foo-4": "initial", 180 }, 181 }, 182 }, 183 { 184 Type: watch.Modified, 185 Object: &corev1.Secret{ 186 ObjectMeta: metav1.ObjectMeta{ 187 Name: "pod-2", 188 }, 189 StringData: map[string]string{ 190 "foo-2": "new", 191 }, 192 }, 193 }, 194 { 195 Type: watch.Deleted, 196 Object: &corev1.Secret{ 197 ObjectMeta: metav1.ObjectMeta{ 198 Name: "pod-3", 199 }, 200 }, 201 }, 202 }, 203 }, 204 } 205 206 for _, tc := range tt { 207 t.Run(tc.name, func(t *testing.T) { 208 var expected []watch.Event 209 for _, o := range tc.objects { 210 expected = append(expected, watch.Event{ 211 Type: watch.Added, 212 Object: o.DeepCopyObject(), 213 }) 214 } 215 for _, e := range tc.events { 216 expected = append(expected, *e.DeepCopy()) 217 } 218 219 fake := fakeclientset.NewSimpleClientset(tc.objects...) 220 fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false) 221 fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil)) 222 223 for _, e := range tc.events { 224 fakeWatch.Action(e.Type, e.Object) 225 } 226 227 lw := &cache.ListWatch{ 228 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { 229 return fake.CoreV1().Secrets("").List(options) 230 }, 231 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { 232 return fake.CoreV1().Secrets("").Watch(options) 233 }, 234 } 235 _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) 236 237 var result []watch.Event 238 loop: 239 for { 240 var event watch.Event 241 var ok bool 242 select { 243 case event, ok = <-w.ResultChan(): 244 if !ok { 245 t.Errorf("Failed to read event: channel is already closed!") 246 return 247 } 248 249 result = append(result, *event.DeepCopy()) 250 case <-time.After(time.Second * 1): 251 // All the events are buffered -> this means we are done 252 // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event 253 break loop 254 } 255 } 256 257 // Informers don't guarantee event order so we need to sort these arrays to compare them 258 sort.Sort(byEventTypeAndName(expected)) 259 sort.Sort(byEventTypeAndName(result)) 260 261 if !reflect.DeepEqual(expected, result) { 262 t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result))) 263 return 264 } 265 266 // Fill in some data to test watch closing while there are some events to be read 267 for _, e := range tc.events { 268 fakeWatch.Action(e.Type, e.Object) 269 } 270 271 // Stop before reading all the data to make sure the informer can deal with closed channel 272 w.Stop() 273 274 <-done 275 }) 276 } 277 278} 279