1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package podtopologyspread
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"sync/atomic"
24
25	v1 "k8s.io/api/core/v1"
26	"k8s.io/apimachinery/pkg/util/sets"
27	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
28	"k8s.io/kubernetes/pkg/scheduler/framework"
29)
30
31const preScoreStateKey = "PreScore" + Name
32const invalidScore = -1
33
34// preScoreState computed at PreScore and used at Score.
35// Fields are exported for comparison during testing.
36type preScoreState struct {
37	Constraints []topologySpreadConstraint
38	// IgnoredNodes is a set of node names which miss some Constraints[*].topologyKey.
39	IgnoredNodes sets.String
40	// TopologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
41	TopologyPairToPodCounts map[topologyPair]*int64
42	// TopologyNormalizingWeight is the weight we give to the counts per topology.
43	// This allows the pod counts of smaller topologies to not be watered down by
44	// bigger ones.
45	TopologyNormalizingWeight []float64
46}
47
48// Clone implements the mandatory Clone interface. We don't really copy the data since
49// there is no need for that.
50func (s *preScoreState) Clone() framework.StateData {
51	return s
52}
53
54// initPreScoreState iterates "filteredNodes" to filter out the nodes which
55// don't have required topologyKey(s), and initialize:
56// 1) s.TopologyPairToPodCounts: keyed with both eligible topology pair and node names.
57// 2) s.IgnoredNodes: the set of nodes that shouldn't be scored.
58// 3) s.TopologyNormalizingWeight: The weight to be given to each constraint based on the number of values in a topology.
59func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node) error {
60	var err error
61	if len(pod.Spec.TopologySpreadConstraints) > 0 {
62		s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
63		if err != nil {
64			return fmt.Errorf("obtaining pod's soft topology spread constraints: %w", err)
65		}
66	} else {
67		s.Constraints, err = pl.buildDefaultConstraints(pod, v1.ScheduleAnyway)
68		if err != nil {
69			return fmt.Errorf("setting default soft topology spread constraints: %w", err)
70		}
71	}
72	if len(s.Constraints) == 0 {
73		return nil
74	}
75	topoSize := make([]int, len(s.Constraints))
76	for _, node := range filteredNodes {
77		if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
78			// Nodes which don't have all required topologyKeys present are ignored
79			// when scoring later.
80			s.IgnoredNodes.Insert(node.Name)
81			continue
82		}
83		for i, constraint := range s.Constraints {
84			// per-node counts are calculated during Score.
85			if constraint.TopologyKey == v1.LabelHostname {
86				continue
87			}
88			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
89			if s.TopologyPairToPodCounts[pair] == nil {
90				s.TopologyPairToPodCounts[pair] = new(int64)
91				topoSize[i]++
92			}
93		}
94	}
95
96	s.TopologyNormalizingWeight = make([]float64, len(s.Constraints))
97	for i, c := range s.Constraints {
98		sz := topoSize[i]
99		if c.TopologyKey == v1.LabelHostname {
100			sz = len(filteredNodes) - len(s.IgnoredNodes)
101		}
102		s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
103	}
104	return nil
105}
106
107// PreScore builds and writes cycle state used by Score and NormalizeScore.
108func (pl *PodTopologySpread) PreScore(
109	ctx context.Context,
110	cycleState *framework.CycleState,
111	pod *v1.Pod,
112	filteredNodes []*v1.Node,
113) *framework.Status {
114	allNodes, err := pl.sharedLister.NodeInfos().List()
115	if err != nil {
116		return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err))
117	}
118
119	if len(filteredNodes) == 0 || len(allNodes) == 0 {
120		// No nodes to score.
121		return nil
122	}
123
124	state := &preScoreState{
125		IgnoredNodes:            sets.NewString(),
126		TopologyPairToPodCounts: make(map[topologyPair]*int64),
127	}
128	err = pl.initPreScoreState(state, pod, filteredNodes)
129	if err != nil {
130		return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err))
131	}
132
133	// return if incoming pod doesn't have soft topology spread Constraints.
134	if len(state.Constraints) == 0 {
135		cycleState.Write(preScoreStateKey, state)
136		return nil
137	}
138
139	// Ignore parsing errors for backwards compatibility.
140	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
141	processAllNode := func(i int) {
142		nodeInfo := allNodes[i]
143		node := nodeInfo.Node()
144		if node == nil {
145			return
146		}
147		// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
148		// (2) All topologyKeys need to be present in `node`
149		match, _ := requiredNodeAffinity.Match(node)
150		if !match || !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
151			return
152		}
153
154		for _, c := range state.Constraints {
155			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
156			// If current topology pair is not associated with any candidate node,
157			// continue to avoid unnecessary calculation.
158			// Per-node counts are also skipped, as they are done during Score.
159			tpCount := state.TopologyPairToPodCounts[pair]
160			if tpCount == nil {
161				continue
162			}
163			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
164			atomic.AddInt64(tpCount, int64(count))
165		}
166	}
167	pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
168
169	cycleState.Write(preScoreStateKey, state)
170	return nil
171}
172
173// Score invoked at the Score extension point.
174// The "score" returned in this function is the matching number of pods on the `nodeName`,
175// it is normalized later.
176func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
177	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
178	if err != nil {
179		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
180	}
181
182	node := nodeInfo.Node()
183	s, err := getPreScoreState(cycleState)
184	if err != nil {
185		return 0, framework.AsStatus(err)
186	}
187
188	// Return if the node is not qualified.
189	if s.IgnoredNodes.Has(node.Name) {
190		return 0, nil
191	}
192
193	// For each present <pair>, current node gets a credit of <matchSum>.
194	// And we sum up <matchSum> and return it as this node's score.
195	var score float64
196	for i, c := range s.Constraints {
197		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
198			var cnt int64
199			if c.TopologyKey == v1.LabelHostname {
200				cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
201			} else {
202				pair := topologyPair{key: c.TopologyKey, value: tpVal}
203				cnt = *s.TopologyPairToPodCounts[pair]
204			}
205			score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
206		}
207	}
208	return int64(score), nil
209}
210
211// NormalizeScore invoked after scoring all nodes.
212func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
213	s, err := getPreScoreState(cycleState)
214	if err != nil {
215		return framework.AsStatus(err)
216	}
217	if s == nil {
218		return nil
219	}
220
221	// Calculate <minScore> and <maxScore>
222	var minScore int64 = math.MaxInt64
223	var maxScore int64
224	for i, score := range scores {
225		// it's mandatory to check if <score.Name> is present in m.IgnoredNodes
226		if s.IgnoredNodes.Has(score.Name) {
227			scores[i].Score = invalidScore
228			continue
229		}
230		if score.Score < minScore {
231			minScore = score.Score
232		}
233		if score.Score > maxScore {
234			maxScore = score.Score
235		}
236	}
237
238	for i := range scores {
239		if scores[i].Score == invalidScore {
240			scores[i].Score = 0
241			continue
242		}
243		if maxScore == 0 {
244			scores[i].Score = framework.MaxNodeScore
245			continue
246		}
247		s := scores[i].Score
248		scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
249	}
250	return nil
251}
252
253// ScoreExtensions of the Score plugin.
254func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
255	return pl
256}
257
258func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
259	c, err := cycleState.Read(preScoreStateKey)
260	if err != nil {
261		return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err)
262	}
263
264	s, ok := c.(*preScoreState)
265	if !ok {
266		return nil, fmt.Errorf("%+v  convert to podtopologyspread.preScoreState error", c)
267	}
268	return s, nil
269}
270
// topologyNormalizingWeight returns the weight of a topology as the natural
// logarithm of (size + 2), where <size> is the number of values that exist
// for that topology.
// With <size> at least 1 (all nodes that passed the Filters are in the same
// topology) and k8s supporting 5k nodes, the result lies in the interval
// <1.09, 8.52>.
//
// Note: <size> may also be zero when no node has the required topologies.
// The weight is irrelevant in that case, as a 0 score is returned for all
// nodes.
func topologyNormalizingWeight(size int) float64 {
	shifted := size + 2
	return math.Log(float64(shifted))
}
283
// scoreForCount converts the number of matching pods in a topology domain
// into a score: the count is multiplied by the topology weight and
// `maxSkew-1` is added on top. The added term waters down the differences
// between topology domains, controlling the tolerance of the score to skews.
func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
	weighted := float64(cnt) * tpWeight
	tolerance := float64(maxSkew - 1)
	return weighted + tolerance
}
291