/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

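// deepEqualWithoutGeneration compares a cached node entry with the expected NodeInfo
// while ignoring the Generation field, which changes on every cache mutation.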
func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *framework.NodeInfo) error {
	if (actual == nil) != (expected == nil) {
		return errors.New("one of the actual or expected is nil and the other is not")
	}
	// Ignore generation field.
	if actual != nil {
		actual.info.Generation = 0
	}
	if expected != nil {
		expected.Generation = 0
	}
	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
		return fmt.Errorf("got node info %s, want %s", actual.info, expected)
	}
	return nil
}

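// hostPortInfoParam and hostPortInfoBuilder are test helpers for constructing
// framework.HostPortInfo values from (protocol, ip, port) triples.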
type hostPortInfoParam struct {
	protocol, ip string
	port         int32
}

type hostPortInfoBuilder struct {
	inputs []hostPortInfoParam
}

func newHostPortInfoBuilder() *hostPortInfoBuilder {
	return &hostPortInfoBuilder{}
}

func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder {
	b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port})
	return b
}

func (b *hostPortInfoBuilder) build() framework.HostPortInfo {
	res := make(framework.HostPortInfo)
	for _, param := range b.inputs {
		res.Add(param.ip, param.protocol, param.port)
	}
	return res
}

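// newNodeInfo builds a NodeInfo with the given aggregates pre-populated; the tests
// below use it to describe the expected cache state.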
func newNodeInfo(requestedResource *framework.Resource,
	nonzeroRequest *framework.Resource,
	pods []*v1.Pod,
	usedPorts framework.HostPortInfo,
	imageStates map[string]*framework.ImageStateSummary,
) *framework.NodeInfo {
	nodeInfo := framework.NewNodeInfo(pods...)
	nodeInfo.Requested = requestedResource
	nodeInfo.NonZeroRequested = nonzeroRequest
	nodeInfo.UsedPorts = usedPorts
	nodeInfo.ImageStates = imageStates
	return nodeInfo
}

// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// at the node level.
func TestAssumePodScheduled(t *testing.T) {
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "example.com/foo:3", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "example.com/foo:5", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "random-invalid-extended-key:100", []v1.ContainerPort{{}}),
	}

	tests := []struct {
		pods []*v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{
		pods: []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[1], testPods[2]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[1], testPods[2]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, { // test non-zero request
		pods: []*v1.Pod{testPods[3]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 0,
				Memory:   0,
			},
			&framework.Resource{
				MilliCPU: schedutil.DefaultMilliCPURequest,
				Memory:   schedutil.DefaultMemoryRequest,
			},
			[]*v1.Pod{testPods[3]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU:        100,
				Memory:          500,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3},
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[4]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4], testPods[5]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU:        300,
				Memory:          1524,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8},
			},
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[4], testPods[5]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[6]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[6]},
			newHostPortInfoBuilder().build(),
			make(map[string]*framework.ImageStateSummary),
		),
	},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod); err != nil {
					t.Fatalf("AssumePod failed: %v", err)
				}
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			for _, pod := range tt.pods {
				if err := cache.ForgetPod(pod); err != nil {
					t.Fatalf("ForgetPod failed: %v", err)
				}
				if err := isForgottenFromCache(pod, cache); err != nil {
					t.Errorf("pod %s: %v", pod.Name, err)
				}
			}
		})
	}
}

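// testExpirePodStruct describes a pod to assume, whether its binding finished,
// and the time at which it was assumed.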
type testExpirePodStruct struct {
	pod         *v1.Pod
	finishBind  bool
	assumedTime time.Time
}

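// assumeAndFinishBinding assumes the pod in the cache and marks its binding as
// finished at assumedTime, which starts the pod's expiration clock.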
func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
	return cache.finishBinding(pod, assumedTime)
}

// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	now := time.Now()
	ttl := 10 * time.Second
	tests := []struct {
		pods        []*testExpirePodStruct
		cleanupTime time.Time

		wNodeInfo *framework.NodeInfo
	}{{ // the assumed pod expires
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo:   nil,
	}, { // the first one expires; the second and third do not.
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
			{pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)},
			{pod: testPods[2]},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			&framework.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			// Order gets altered when removing pods.
			[]*v1.Pod{testPods[2], testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)

			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod.pod); err != nil {
					t.Fatal(err)
				}
				if !pod.finishBind {
					continue
				}
				if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil {
					t.Fatal(err)
				}
			}
			// pods that got bound and have assumedTime + ttl < cleanupTime will get
			// expired and removed
			cache.cleanupAssumedPods(tt.cleanupTime)
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{ // two pods are assumed at the same time, but only the first one is Add()ed and thus confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// check after expiration. confirmed pods shouldn't be expired.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

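// TestDump tests that Dump returns a snapshot consistent with the cached nodes and
// assumed pods.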
func TestDump(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
	}{{ // two pods are assumed at the same time, but only the first one is Add()ed and thus confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Errorf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Errorf("AddPod failed: %v", err)
				}
			}

			snapshot := cache.Dump()
			if len(snapshot.Nodes) != len(cache.nodes) {
				t.Errorf("Unequal number of nodes in the cache and its snapshot. expected: %v, got: %v", len(cache.nodes), len(snapshot.Nodes))
			}
			for name, ni := range snapshot.Nodes {
				nItem := cache.nodes[name]
				if !reflect.DeepEqual(ni, nItem.info) {
					t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni)
				}
			}
			if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
				t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
			}
		})
	}
}

// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

	assumedPod := makeBasePod(t, "assumed-node-1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	addedPod := makeBasePod(t, "actual-node", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	updatedPod := makeBasePod(t, "actual-node", "test-1", "200m", "500", "", []v1.ContainerPort{{HostPort: 90}})

	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate [][]*v1.Pod

		wNodeInfo map[string]*framework.NodeInfo
	}{{
		podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
		podsToAdd:    []*v1.Pod{addedPod.DeepCopy()},
		podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
		wNodeInfo: map[string]*framework.NodeInfo{
			"assumed-node-1": nil,
			"actual-node": newNodeInfo(
				&framework.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				&framework.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				[]*v1.Pod{updatedPod.DeepCopy()},
				newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
				make(map[string]*framework.ImageStateSummary),
			),
		},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			for _, podToUpdate := range tt.podsToUpdate {
				if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
			}
			for nodeName, expected := range tt.wNodeInfo {
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, expected); err != nil {
					t.Errorf("node %q: %v", nodeName, err)
				}
			}
		})
	}
}

// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pod *v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// It should be expired and removed.
			if err := isForgottenFromCache(tt.pod, cache); err != nil {
				t.Error(err)
			}
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			// Check the node info after the pod is added back.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestUpdatePod tests that a pod can be updated after it has been added.
func TestUpdatePod(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod

		wNodeInfo []*framework.NodeInfo
	}{{ // add a pod and then update it twice
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*framework.NodeInfo{newNodeInfo(
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		), newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

// TestUpdatePodAndGet tests that GetPod always returns the latest pod state.
func TestUpdatePodAndGet(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		pod *v1.Pod

		podToUpdate *v1.Pod
		handler     func(cache Cache, pod *v1.Pod) error

		assumePod bool
	}{
		{
			pod: testPods[0],

			podToUpdate: testPods[0],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AssumePod(pod)
			},
			assumePod: true,
		},
		{
			pod: testPods[0],

			podToUpdate: testPods[1],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AddPod(pod)
			},
			assumePod: false,
		},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)

			if err := tt.handler(cache, tt.pod); err != nil {
				t.Fatalf("unexpected err: %v", err)
			}

			if !tt.assumePod {
				if err := cache.UpdatePod(tt.pod, tt.podToUpdate); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
			}

			cachedPod, err := cache.GetPod(tt.pod)
			if err != nil {
				t.Fatalf("GetPod failed: %v", err)
			}
			if !reflect.DeepEqual(tt.podToUpdate, cachedPod) {
				t.Fatalf("pod get=%s, want=%s", cachedPod, tt.podToUpdate)
			}
		})
	}
}

// TestExpireAddUpdatePod tests the sequence in which a pod is expired, added, and then updated.
func TestExpireAddUpdatePod(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod

		wNodeInfo []*framework.NodeInfo
	}{{ // Pod is assumed, expired, and added. Then it would be updated twice.
		podsToAssume: []*v1.Pod{testPods[0]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*framework.NodeInfo{newNodeInfo(
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		), newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))

			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
	req := v1.ResourceList{
		v1.ResourceEphemeralStorage: resource.MustParse(ephemeralStorage),
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default-namespace",
			Name:      "pod-with-ephemeral-storage",
			UID:       types.UID("pod-with-ephemeral-storage"),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
			}},
			NodeName: nodeName,
		},
	}
}

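// TestEphemeralStorageResource tests that a pod's ephemeral storage request is
// accounted for in the node info and that the pod can be removed again.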
func TestEphemeralStorageResource(t *testing.T) {
	nodeName := "node"
	podE := makePodWithEphemeralStorage(nodeName, "500")
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *framework.NodeInfo
	}{
		{
			pod: podE,
			wNodeInfo: newNodeInfo(
				&framework.Resource{
					EphemeralStorage: 500,
				},
				&framework.Resource{
					MilliCPU: schedutil.DefaultMilliCPURequest,
					Memory:   schedutil.DefaultMemoryRequest,
				},
				[]*v1.Pod{podE},
				framework.HostPortInfo{},
				make(map[string]*framework.ImageStateSummary),
			),
		},
	}
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}
			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}
		})
	}
}

// TestRemovePod tests that after an added pod is removed, its information is subtracted from the node info.
func TestRemovePod(t *testing.T) {
	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		nodes     []*v1.Node
		pod       *v1.Pod
		wNodeInfo *framework.NodeInfo
	}{{
		nodes: []*v1.Node{
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
			},
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
			},
		},
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			nodeName := tt.pod.Spec.NodeName
			cache := newSchedulerCache(time.Second, time.Second, nil)
			// Add pod succeeds even before adding the nodes.
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
			for _, n := range tt.nodes {
				cache.AddNode(n)
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}

			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}

			// Node that owned the Pod should be at the head of the list.
			if cache.headNode.info.Node().Name != nodeName {
				t.Errorf("node %q is not at the head of the list", nodeName)
			}
		})
	}
}

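// TestForgetPod tests that an assumed pod can be forgotten, after which it is no
// longer present in the cache.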
func TestForgetPod(t *testing.T) {
	nodeName := "node"
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	pods := []*v1.Pod{basePod}
	now := time.Now()
	ttl := 10 * time.Second

	cache := newSchedulerCache(ttl, time.Second, nil)
	for _, pod := range pods {
		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
			t.Fatalf("assumePod failed: %v", err)
		}
		isAssumed, err := cache.IsAssumedPod(pod)
		if err != nil {
			t.Fatalf("IsAssumedPod failed: %v.", err)
		}
		if !isAssumed {
			t.Fatalf("Pod is expected to be assumed.")
		}
		assumedPod, err := cache.GetPod(pod)
		if err != nil {
			t.Fatalf("GetPod failed: %v.", err)
		}
		if assumedPod.Namespace != pod.Namespace {
			t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
		}
		if assumedPod.Name != pod.Name {
			t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
		}
	}
	for _, pod := range pods {
		if err := cache.ForgetPod(pod); err != nil {
			t.Fatalf("ForgetPod failed: %v", err)
		}
		if err := isForgottenFromCache(pod, cache); err != nil {
			t.Errorf("pod %q: %v", pod.Name, err)
		}
	}
}

// buildNodeInfo creates a NodeInfo by simulating node operations in cache.
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *framework.NodeInfo {
	expected := framework.NewNodeInfo()
	expected.SetNode(node)
	expected.Allocatable = framework.NewResource(node.Status.Allocatable)
	expected.Generation++
	for _, pod := range pods {
		expected.AddPod(pod)
	}
	return expected
}

// TestNodeOperators tests node operations of cache, including add, update
// and remove.
func TestNodeOperators(t *testing.T) {
	// Test data.
	nodeName := "test-node"
	cpu1 := resource.MustParse("1000m")
	mem100m := resource.MustParse("100m")
	cpuHalf := resource.MustParse("500m")
	mem50m := resource.MustParse("50m")
	resourceFooName := "example.com/foo"
	resourceFoo := resource.MustParse("1")

	tests := []struct {
		node *v1.Node
		pods []*v1.Pod
	}{
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
								Ports: []v1.ContainerPort{
									{
										Name:          "http",
										HostPort:      80,
										ContainerPort: 80,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod2",
						UID:  types.UID("pod2"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for i, test := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			expected := buildNodeInfo(test.node, test.pods)
			node := test.node

			cache := newSchedulerCache(time.Second, time.Second, nil)
			cache.AddNode(node)
			for _, pod := range test.pods {
				if err := cache.AddPod(pod); err != nil {
					t.Fatal(err)
				}
			}

			// Step 1: the node was added into cache successfully.
			got, found := cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in internal cache.", node.Name)
			}
			nodesList, err := cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
				t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
			}

			// Generations are globally unique. We check in our unit tests that they are incremented correctly.
			expected.Generation = got.info.Generation
			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
			}

			// Step 2: dump cached nodes successfully.
			cachedNodes := NewEmptySnapshot()
			if err := cache.UpdateSnapshot(cachedNodes); err != nil {
				t.Error(err)
			}
			newNode, found := cachedNodes.nodeInfoMap[node.Name]
			if !found || len(cachedNodes.nodeInfoMap) != 1 {
				t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
			}
			expected.Generation = newNode.Generation
			if !reflect.DeepEqual(newNode, expected) {
				t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
			}

			// Step 3: update node attribute successfully.
			node.Status.Allocatable[v1.ResourceMemory] = mem50m
			expected.Allocatable.Memory = mem50m.Value()

			cache.UpdateNode(nil, node)
			got, found = cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in scheduler cache after UpdateNode.", node.Name)
			}
			if got.info.Generation <= expected.Generation {
				t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.Generation, expected.Generation)
			}
			expected.Generation = got.info.Generation

			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to update node in scheduler cache:\n got: %+v \nexpected: %+v", got, expected)
			}
			// Check nodeTree after update
			nodesList, err = cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
				t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
			}

			// Step 4: the node can be removed even if it still has pods.
			if err := cache.RemoveNode(node); err != nil {
				t.Error(err)
			}
			if n, err := cache.getNodeInfo(node.Name); err != nil {
				t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
			} else if n != nil {
				t.Errorf("The node object for %v should be nil", node.Name)
			}
			// Check node is removed from nodeTree as well.
			nodesList, err = cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 0 || len(nodesList) != 0 {
				t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
			}
			// Pods are still in the pods cache.
			for _, p := range test.pods {
				if _, err := cache.GetPod(p); err != nil {
					t.Error(err)
				}
			}

			// Step 5: removing pods for the removed node still succeeds.
			for _, p := range test.pods {
				if err := cache.RemovePod(p); err != nil {
					t.Error(err)
				}
				if _, err := cache.GetPod(p); err == nil {
					t.Errorf("pod %q still in cache", p.Name)
				}
			}
		})
	}
}

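// TestSchedulerCache_UpdateSnapshot exercises incremental snapshot updates against
// sequences of node and pod operations, checking both the cache's node list order
// and the resulting snapshot.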
func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
	// Create a few nodes to be used in tests.
	nodes := []*v1.Node{}
	for i := 0; i < 10; i++ {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: fmt.Sprintf("test-node%v", i),
			},
			Status: v1.NodeStatus{
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("1000m"),
					v1.ResourceMemory: resource.MustParse("100m"),
				},
			},
		}
		nodes = append(nodes, node)
	}
	// Create a few nodes as updated versions of the above nodes
	updatedNodes := []*v1.Node{}
	for _, n := range nodes {
		updatedNode := n.DeepCopy()
		updatedNode.Status.Allocatable = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("2000m"),
			v1.ResourceMemory: resource.MustParse("500m"),
		}
		updatedNodes = append(updatedNodes, updatedNode)
	}

	// Create a few pods for tests.
	pods := []*v1.Pod{}
	for i := 0; i < 20; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i%10),
			},
		}
		pods = append(pods, pod)
	}

	// Create a few pods as updated versions of the above pods.
	updatedPods := []*v1.Pod{}
	for _, p := range pods {
		updatedPod := p.DeepCopy()
		priority := int32(1000)
		updatedPod.Spec.Priority = &priority
		updatedPods = append(updatedPods, updatedPod)
	}

	// Add a couple of pods with affinity, on the first and second nodes.
	podsWithAffinity := []*v1.Pod{}
	for i := 0; i < 2; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i),
				Affinity: &v1.Affinity{
					PodAffinity: &v1.PodAffinity{},
				},
			},
		}
		podsWithAffinity = append(podsWithAffinity, pod)
	}

	var cache *schedulerCache
	var snapshot *Snapshot
	type operation = func(t *testing.T)

	addNode := func(i int) operation {
		return func(t *testing.T) {
			cache.AddNode(nodes[i])
		}
	}
	removeNode := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemoveNode(nodes[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateNode := func(i int) operation {
		return func(t *testing.T) {
			cache.UpdateNode(nodes[i], updatedNodes[i])
		}
	}
	addPod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.AddPod(pods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	addPodWithAffinity := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.AddPod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removePod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemovePod(pods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removePodWithAffinity := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updatePod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateSnapshot := func() operation {
		return func(t *testing.T) {
			cache.UpdateSnapshot(snapshot)
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
		}
	}

	tests := []struct {
		name                         string
		operations                   []operation
		expected                     []*v1.Node
		expectedHavePodsWithAffinity int
	}{
		{
			name:       "Empty cache",
			operations: []operation{},
			expected:   []*v1.Node{},
		},
		{
			name:       "Single node",
			operations: []operation{addNode(1)},
			expected:   []*v1.Node{nodes[1]},
		},
		{
			name: "Add node, remove it, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), removeNode(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1]},
		},
		{
			name: "Add node and remove it in the same cycle, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), addNode(2), removeNode(1),
			},
			expected: []*v1.Node{nodes[2]},
		},
		{
			name: "Add a few nodes, and snapshot in the middle",
			operations: []operation{
				addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
				updateSnapshot(), addNode(3),
			},
			expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
		},
		{
			name: "Add a few nodes, and snapshot in the end",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6),
			},
			expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
		},
		{
			name: "Update some nodes",
			operations: []operation{
				addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
		},
		{
			name: "Add a few nodes, and remove all of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(2), removeNode(5), removeNode(6),
			},
			expected: []*v1.Node{},
		},
		{
			name: "Add a few nodes, and remove some of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(6),
			},
			expected: []*v1.Node{nodes[5], nodes[2]},
		},
		{
			name: "Add a few nodes, remove all of them, and add more",
			operations: []operation{
				addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
				addNode(7), addNode(9),
			},
			expected: []*v1.Node{nodes[9], nodes[7]},
		},
		{
			name: "Update nodes in particular order",
			operations: []operation{
				addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
				addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
		},
		{
			name: "Add some nodes and some pods",
			operations: []operation{
				addNode(0), addNode(2), addNode(8), updateSnapshot(),
				addPod(8), addPod(2),
			},
			expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
		},
		{
			name: "Updating a pod moves its node to the head",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
			},
			expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
		},
		{
			name: "Add pod before its node",
			operations: []operation{
				addNode(0), addPod(1), updatePod(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[0]},
		},
		{
			name: "Remove node before its pods",
			operations: []operation{
				addNode(0), addNode(1), addPod(1), addPod(11), updateSnapshot(),
				removeNode(1), updateSnapshot(),
				updatePod(1), updatePod(11), removePod(1), removePod(11),
			},
			expected: []*v1.Node{nodes[0]},
		},
		{
			name: "Add Pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 1,
		},
		{
			name: "Add multiple nodes with pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1), addPodWithAffinity(1), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 2,
		},
		{
			name: "Add then Remove pods with affinity",
			operations: []operation{
				addNode(0), addNode(1), addPodWithAffinity(0), updateSnapshot(), removePodWithAffinity(0), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[0], nodes[1]},
			expectedHavePodsWithAffinity: 0,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			for _, op := range test.operations {
				op(t)
			}

			if len(test.expected) != len(cache.nodes) {
				t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes))
			}
			var i int
			// Check that cache is in the expected state.
			for node := cache.headNode; node != nil; node = node.next {
				if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
					t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
				}
				i++
			}
			// Make sure we visited all the cached nodes in the above for loop.
			if i != len(cache.nodes) {
				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
			}

			// Check number of nodes with pods with affinity
			if len(snapshot.havePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
				t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
			}

			// Always update the snapshot at the end of operations and compare it.
			if err := cache.UpdateSnapshot(snapshot); err != nil {
				t.Error(err)
			}
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
		})
	}
}

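// compareCacheWithNodeInfoSnapshot verifies that the snapshot's node map and ordered
// node lists mirror the cache and its nodeTree.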
func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
	// Compare the map.
	if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes {
		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap))
	}
	for name, ni := range cache.nodes {
		want := ni.info
		if want.Node() == nil {
			want = nil
		}
		if !reflect.DeepEqual(snapshot.nodeInfoMap[name], want) {
			return fmt.Errorf("unexpected node info for node %q. Expected:\n%v, got:\n%v", name, ni.info, snapshot.nodeInfoMap[name])
		}
	}

	// Compare the lists.
	if len(snapshot.nodeInfoList) != cache.nodeTree.numNodes {
		return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoList))
	}

	expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	nodesList, err := cache.nodeTree.list()
	if err != nil {
		t.Fatal(err)
	}
	for _, nodeName := range nodesList {
		if n := snapshot.nodeInfoMap[nodeName]; n != nil {
			expectedNodeInfoList = append(expectedNodeInfoList, n)
			if len(n.PodsWithAffinity) > 0 {
				expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
			}
		} else {
			return fmt.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
		}
	}

	for i, expected := range expectedNodeInfoList {
		got := snapshot.nodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in NodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	for i, expected := range expectedHavePodsWithAffinityNodeInfoList {
		got := snapshot.havePodsWithAffinityNodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in HavePodsWithAffinityNodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	return nil
}

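// TestSchedulerCache_updateNodeInfoSnapshotList tests that the snapshot's node list
// follows the zone-interleaved order produced by the node tree; the case names
// reference bug 91601.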
func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
	// Create a few nodes to be used in tests.
	nodes := []*v1.Node{}
	i := 0
	// List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6
	for zone, nb := range []int{2, 6} {
		for j := 0; j < nb; j++ {
			nodes = append(nodes, &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: fmt.Sprintf("node-%d", i),
					Labels: map[string]string{
						v1.LabelTopologyRegion: fmt.Sprintf("region-%d", zone),
						v1.LabelTopologyZone:   fmt.Sprintf("zone-%d", zone),
					},
				},
			})
			i++
		}
	}

	var cache *schedulerCache
	var snapshot *Snapshot

	addNode := func(t *testing.T, i int) {
		cache.AddNode(nodes[i])
		_, ok := snapshot.nodeInfoMap[nodes[i].Name]
		if !ok {
			snapshot.nodeInfoMap[nodes[i].Name] = cache.nodes[nodes[i].Name].info
		}
	}

	updateSnapshot := func(t *testing.T) {
		cache.updateNodeInfoSnapshotList(snapshot, true)
		if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
			t.Error(err)
		}
	}

	tests := []struct {
		name       string
		operations func(t *testing.T)
		expected   []string
	}{
		{
			name:       "Empty cache",
			operations: func(t *testing.T) {},
			expected:   []string{},
		},
		{
			name: "Single node",
			operations: func(t *testing.T) {
				addNode(t, 0)
			},
			expected: []string{"node-0"},
		},
		{
			name: "Two nodes",
			operations: func(t *testing.T) {
				addNode(t, 0)
				updateSnapshot(t)
				addNode(t, 1)
			},
			expected: []string{"node-0", "node-1"},
		},
		{
			name: "bug 91601, two nodes, update the snapshot and add two nodes in different zones",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 3)
				updateSnapshot(t)
				addNode(t, 4)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-4"},
		},
		{
			name: "bug 91601, 6 nodes, one in a different zone",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 3)
				addNode(t, 4)
				addNode(t, 5)
				updateSnapshot(t)
				addNode(t, 6)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-4", "node-5", "node-6"},
		},
		{
			name: "bug 91601, 7 nodes, two in a different zone",
			operations: func(t *testing.T) {
				addNode(t, 2)
				updateSnapshot(t)
				addNode(t, 3)
				addNode(t, 4)
				updateSnapshot(t)
				addNode(t, 5)
				addNode(t, 6)
				addNode(t, 0)
				addNode(t, 1)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-1", "node-4", "node-5", "node-6"},
		},
		{
			name: "bug 91601, 7 nodes, two in a different zone, different zone order",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 1)
				updateSnapshot(t)
				addNode(t, 3)
				addNode(t, 4)
				updateSnapshot(t)
				addNode(t, 5)
				addNode(t, 6)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-1", "node-3", "node-0", "node-4", "node-5", "node-6"},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			test.operations(t)

			// Always update the snapshot at the end of operations and compare it.
			cache.updateNodeInfoSnapshotList(snapshot, true)
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
			nodeNames := make([]string, len(snapshot.nodeInfoList))
			for i, nodeInfo := range snapshot.nodeInfoList {
				nodeNames[i] = nodeInfo.Node().Name
			}
			if !reflect.DeepEqual(nodeNames, test.expected) {
				t.Errorf("The nodeInfoList is incorrect. Expected %v, got %v", test.expected, nodeNames)
			}
		})
	}
}

func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cachedNodes := NewEmptySnapshot()
		cache.UpdateSnapshot(cachedNodes)
	}
}

func BenchmarkExpirePods(b *testing.B) {
	podNums := []int{
		100,
		1000,
		10000,
	}
	for _, podNum := range podNums {
		name := fmt.Sprintf("%dPods", podNum)
		b.Run(name, func(b *testing.B) {
			benchmarkExpire(b, podNum)
		})
	}
}

func benchmarkExpire(b *testing.B, podNum int) {
	now := time.Now()
	for n := 0; n < b.N; n++ {
		b.StopTimer()
		cache := setupCacheWithAssumedPods(b, podNum, now)
		b.StartTimer()
		cache.cleanupAssumedPods(now.Add(2 * time.Second))
	}
}

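// testingMode is the subset of the testing interfaces needed by makeBasePod, so the
// helper can be shared between tests and benchmarks.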
type testingMode interface {
	Fatalf(format string, args ...interface{})
}

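// makeBasePod builds a pod with the given CPU/memory requests, an optional extended
// resource in "name:quantity" form, and the given host ports, assigned to nodeName.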
func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
	req := v1.ResourceList{}
	if cpu != "" {
		req = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse(cpu),
			v1.ResourceMemory: resource.MustParse(mem),
		}
		if extended != "" {
			parts := strings.Split(extended, ":")
			if len(parts) != 2 {
				t.Fatalf("Invalid extended resource string: \"%s\"", extended)
			}
			req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1])
		}
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID(objName),
			Namespace: "node_info_cache_test",
			Name:      objName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
				Ports: ports,
			}},
			NodeName: nodeName,
		},
	}
}

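// setupCacheOf1kNodes30kPods returns a cache populated, via AddPod, with 30 pods on
// each of 1000 nodes.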
func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < 1000; i++ {
		nodeName := fmt.Sprintf("node-%d", i)
		for j := 0; j < 30; j++ {
			objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
			pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

			if err := cache.AddPod(pod); err != nil {
				b.Fatalf("AddPod failed: %v", err)
			}
		}
	}
	return cache
}

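// setupCacheWithAssumedPods returns a cache holding podNum assumed pods, 10 per node,
// all with binding finished at assumedTime.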
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < podNum; i++ {
		nodeName := fmt.Sprintf("node-%d", i/10)
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
		pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

		err := assumeAndFinishBinding(cache, pod, assumedTime)
		if err != nil {
			b.Fatalf("assumePod failed: %v", err)
		}
	}
	return cache
}

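// isForgottenFromCache returns nil only if the pod is neither assumed nor present in
// the cache.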
func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
	if assumed, err := c.IsAssumedPod(p); err != nil {
		return err
	} else if assumed {
		return errors.New("still assumed")
	}
	if _, err := c.GetPod(p); err == nil {
		return errors.New("still in cache")
	}
	return nil
}

// getNodeInfo returns cached data for the node name.
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	n, ok := cache.nodes[nodeName]
	if !ok {
		return nil, fmt.Errorf("node %q not found in cache", nodeName)
	}

	return n.info.Node(), nil
}