1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package podtopologyspread 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "sync/atomic" 24 25 v1 "k8s.io/api/core/v1" 26 "k8s.io/apimachinery/pkg/util/sets" 27 "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" 28 "k8s.io/kubernetes/pkg/scheduler/framework" 29) 30 31const preScoreStateKey = "PreScore" + Name 32const invalidScore = -1 33 34// preScoreState computed at PreScore and used at Score. 35// Fields are exported for comparison during testing. 36type preScoreState struct { 37 Constraints []topologySpreadConstraint 38 // IgnoredNodes is a set of node names which miss some Constraints[*].topologyKey. 39 IgnoredNodes sets.String 40 // TopologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods. 41 TopologyPairToPodCounts map[topologyPair]*int64 42 // TopologyNormalizingWeight is the weight we give to the counts per topology. 43 // This allows the pod counts of smaller topologies to not be watered down by 44 // bigger ones. 45 TopologyNormalizingWeight []float64 46} 47 48// Clone implements the mandatory Clone interface. We don't really copy the data since 49// there is no need for that. 50func (s *preScoreState) Clone() framework.StateData { 51 return s 52} 53 54// initPreScoreState iterates "filteredNodes" to filter out the nodes which 55// don't have required topologyKey(s), and initialize: 56// 1) s.TopologyPairToPodCounts: keyed with both eligible topology pair and node names. 57// 2) s.IgnoredNodes: the set of nodes that shouldn't be scored. 58// 3) s.TopologyNormalizingWeight: The weight to be given to each constraint based on the number of values in a topology. 59func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node) error { 60 var err error 61 if len(pod.Spec.TopologySpreadConstraints) > 0 { 62 s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway) 63 if err != nil { 64 return fmt.Errorf("obtaining pod's soft topology spread constraints: %w", err) 65 } 66 } else { 67 s.Constraints, err = pl.buildDefaultConstraints(pod, v1.ScheduleAnyway) 68 if err != nil { 69 return fmt.Errorf("setting default soft topology spread constraints: %w", err) 70 } 71 } 72 if len(s.Constraints) == 0 { 73 return nil 74 } 75 topoSize := make([]int, len(s.Constraints)) 76 for _, node := range filteredNodes { 77 if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) { 78 // Nodes which don't have all required topologyKeys present are ignored 79 // when scoring later. 80 s.IgnoredNodes.Insert(node.Name) 81 continue 82 } 83 for i, constraint := range s.Constraints { 84 // per-node counts are calculated during Score. 85 if constraint.TopologyKey == v1.LabelHostname { 86 continue 87 } 88 pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} 89 if s.TopologyPairToPodCounts[pair] == nil { 90 s.TopologyPairToPodCounts[pair] = new(int64) 91 topoSize[i]++ 92 } 93 } 94 } 95 96 s.TopologyNormalizingWeight = make([]float64, len(s.Constraints)) 97 for i, c := range s.Constraints { 98 sz := topoSize[i] 99 if c.TopologyKey == v1.LabelHostname { 100 sz = len(filteredNodes) - len(s.IgnoredNodes) 101 } 102 s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz) 103 } 104 return nil 105} 106 107// PreScore builds and writes cycle state used by Score and NormalizeScore. 108func (pl *PodTopologySpread) PreScore( 109 ctx context.Context, 110 cycleState *framework.CycleState, 111 pod *v1.Pod, 112 filteredNodes []*v1.Node, 113) *framework.Status { 114 allNodes, err := pl.sharedLister.NodeInfos().List() 115 if err != nil { 116 return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err)) 117 } 118 119 if len(filteredNodes) == 0 || len(allNodes) == 0 { 120 // No nodes to score. 121 return nil 122 } 123 124 state := &preScoreState{ 125 IgnoredNodes: sets.NewString(), 126 TopologyPairToPodCounts: make(map[topologyPair]*int64), 127 } 128 err = pl.initPreScoreState(state, pod, filteredNodes) 129 if err != nil { 130 return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err)) 131 } 132 133 // return if incoming pod doesn't have soft topology spread Constraints. 134 if len(state.Constraints) == 0 { 135 cycleState.Write(preScoreStateKey, state) 136 return nil 137 } 138 139 // Ignore parsing errors for backwards compatibility. 140 requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod) 141 processAllNode := func(i int) { 142 nodeInfo := allNodes[i] 143 node := nodeInfo.Node() 144 if node == nil { 145 return 146 } 147 // (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity 148 // (2) All topologyKeys need to be present in `node` 149 match, _ := requiredNodeAffinity.Match(node) 150 if !match || !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) { 151 return 152 } 153 154 for _, c := range state.Constraints { 155 pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]} 156 // If current topology pair is not associated with any candidate node, 157 // continue to avoid unnecessary calculation. 158 // Per-node counts are also skipped, as they are done during Score. 159 tpCount := state.TopologyPairToPodCounts[pair] 160 if tpCount == nil { 161 continue 162 } 163 count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace) 164 atomic.AddInt64(tpCount, int64(count)) 165 } 166 } 167 pl.parallelizer.Until(ctx, len(allNodes), processAllNode) 168 169 cycleState.Write(preScoreStateKey, state) 170 return nil 171} 172 173// Score invoked at the Score extension point. 174// The "score" returned in this function is the matching number of pods on the `nodeName`, 175// it is normalized later. 176func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { 177 nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName) 178 if err != nil { 179 return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err)) 180 } 181 182 node := nodeInfo.Node() 183 s, err := getPreScoreState(cycleState) 184 if err != nil { 185 return 0, framework.AsStatus(err) 186 } 187 188 // Return if the node is not qualified. 189 if s.IgnoredNodes.Has(node.Name) { 190 return 0, nil 191 } 192 193 // For each present <pair>, current node gets a credit of <matchSum>. 194 // And we sum up <matchSum> and return it as this node's score. 195 var score float64 196 for i, c := range s.Constraints { 197 if tpVal, ok := node.Labels[c.TopologyKey]; ok { 198 var cnt int64 199 if c.TopologyKey == v1.LabelHostname { 200 cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)) 201 } else { 202 pair := topologyPair{key: c.TopologyKey, value: tpVal} 203 cnt = *s.TopologyPairToPodCounts[pair] 204 } 205 score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i]) 206 } 207 } 208 return int64(score), nil 209} 210 211// NormalizeScore invoked after scoring all nodes. 212func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { 213 s, err := getPreScoreState(cycleState) 214 if err != nil { 215 return framework.AsStatus(err) 216 } 217 if s == nil { 218 return nil 219 } 220 221 // Calculate <minScore> and <maxScore> 222 var minScore int64 = math.MaxInt64 223 var maxScore int64 224 for i, score := range scores { 225 // it's mandatory to check if <score.Name> is present in m.IgnoredNodes 226 if s.IgnoredNodes.Has(score.Name) { 227 scores[i].Score = invalidScore 228 continue 229 } 230 if score.Score < minScore { 231 minScore = score.Score 232 } 233 if score.Score > maxScore { 234 maxScore = score.Score 235 } 236 } 237 238 for i := range scores { 239 if scores[i].Score == invalidScore { 240 scores[i].Score = 0 241 continue 242 } 243 if maxScore == 0 { 244 scores[i].Score = framework.MaxNodeScore 245 continue 246 } 247 s := scores[i].Score 248 scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore 249 } 250 return nil 251} 252 253// ScoreExtensions of the Score plugin. 254func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions { 255 return pl 256} 257 258func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) { 259 c, err := cycleState.Read(preScoreStateKey) 260 if err != nil { 261 return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err) 262 } 263 264 s, ok := c.(*preScoreState) 265 if !ok { 266 return nil, fmt.Errorf("%+v convert to podtopologyspread.preScoreState error", c) 267 } 268 return s, nil 269} 270 271// topologyNormalizingWeight calculates the weight for the topology, based on 272// the number of values that exist for a topology. 273// Since <size> is at least 1 (all nodes that passed the Filters are in the 274// same topology), and k8s supports 5k nodes, the result is in the interval 275// <1.09, 8.52>. 276// 277// Note: <size> could also be zero when no nodes have the required topologies, 278// however we don't care about topology weight in this case as we return a 0 279// score for all nodes. 280func topologyNormalizingWeight(size int) float64 { 281 return math.Log(float64(size + 2)) 282} 283 284// scoreForCount calculates the score based on number of matching pods in a 285// topology domain, the constraint's maxSkew and the topology weight. 286// `maxSkew-1` is added to the score so that differences between topology 287// domains get watered down, controlling the tolerance of the score to skews. 288func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 { 289 return float64(cnt)*tpWeight + float64(maxSkew-1) 290} 291