1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package statefulset 18 19import ( 20 "bytes" 21 "encoding/json" 22 "fmt" 23 "regexp" 24 "strconv" 25 26 apps "k8s.io/api/apps/v1" 27 "k8s.io/api/core/v1" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/runtime" 30 "k8s.io/apimachinery/pkg/util/strategicpatch" 31 "k8s.io/client-go/kubernetes/scheme" 32 podutil "k8s.io/kubernetes/pkg/api/v1/pod" 33 "k8s.io/kubernetes/pkg/controller" 34 "k8s.io/kubernetes/pkg/controller/history" 35) 36 /* patchCodec is the legacy codec for the apps/v1 group version; this file uses it to encode StatefulSets when building and applying revision patches. */ 37var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion) 38 39// overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker. 40// Generally used to tie break between StatefulSets that have overlapping selectors. 
41type overlappingStatefulSets []*apps.StatefulSet 42 43func (o overlappingStatefulSets) Len() int { return len(o) } 44 45func (o overlappingStatefulSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] } 46 47func (o overlappingStatefulSets) Less(i, j int) bool { 48 if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) { 49 return o[i].Name < o[j].Name 50 } 51 return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) 52} 53 54// statefulPodRegex is a regular expression that extracts the parent StatefulSet and ordinal from the Name of a Pod 55var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$") 56 57// getParentNameAndOrdinal gets the name of pod's parent StatefulSet and pod's ordinal as extracted from its Name. If 58// the Pod was not created by a StatefulSet, its parent is considered to be empty string, and its ordinal is considered 59// to be -1. 60func getParentNameAndOrdinal(pod *v1.Pod) (string, int) { 61 parent := "" 62 ordinal := -1 63 subMatches := statefulPodRegex.FindStringSubmatch(pod.Name) 64 if len(subMatches) < 3 { 65 return parent, ordinal 66 } 67 parent = subMatches[1] 68 if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil { 69 ordinal = int(i) 70 } 71 return parent, ordinal 72} 73 74// getParentName gets the name of pod's parent StatefulSet. If pod has not parent, the empty string is returned. 75func getParentName(pod *v1.Pod) string { 76 parent, _ := getParentNameAndOrdinal(pod) 77 return parent 78} 79 80// getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned. 81func getOrdinal(pod *v1.Pod) int { 82 _, ordinal := getParentNameAndOrdinal(pod) 83 return ordinal 84} 85 86// getPodName gets the name of set's child Pod with an ordinal index of ordinal 87func getPodName(set *apps.StatefulSet, ordinal int) string { 88 return fmt.Sprintf("%s-%d", set.Name, ordinal) 89} 90 91// getPersistentVolumeClaimName gets the name of PersistentVolumeClaim for a Pod with an ordinal index of ordinal. 
claim 92// must be a PersistentVolumeClaim from set's VolumeClaims template. 93func getPersistentVolumeClaimName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim, ordinal int) string { 94 // NOTE: This name format is used by the heuristics for zone spreading in ChooseZoneForVolume 95 return fmt.Sprintf("%s-%s-%d", claim.Name, set.Name, ordinal) 96} 97 98// isMemberOf tests if pod is a member of set. 99func isMemberOf(set *apps.StatefulSet, pod *v1.Pod) bool { 100 return getParentName(pod) == set.Name 101} 102 103// identityMatches returns true if pod has a valid identity and network identity for a member of set. 104func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool { 105 parent, ordinal := getParentNameAndOrdinal(pod) 106 return ordinal >= 0 && 107 set.Name == parent && 108 pod.Name == getPodName(set, ordinal) && 109 pod.Namespace == set.Namespace && 110 pod.Labels[apps.StatefulSetPodNameLabel] == pod.Name 111} 112 113// storageMatches returns true if pod's Volumes cover the set of PersistentVolumeClaims 114func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool { 115 ordinal := getOrdinal(pod) 116 if ordinal < 0 { 117 return false 118 } 119 volumes := make(map[string]v1.Volume, len(pod.Spec.Volumes)) 120 for _, volume := range pod.Spec.Volumes { 121 volumes[volume.Name] = volume 122 } 123 for _, claim := range set.Spec.VolumeClaimTemplates { 124 volume, found := volumes[claim.Name] 125 if !found || 126 volume.VolumeSource.PersistentVolumeClaim == nil || 127 volume.VolumeSource.PersistentVolumeClaim.ClaimName != 128 getPersistentVolumeClaimName(set, &claim, ordinal) { 129 return false 130 } 131 } 132 return true 133} 134 135// getPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The 136// returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined 137// by getPersistentVolumeClaimName. 
138func getPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) map[string]v1.PersistentVolumeClaim { 139 ordinal := getOrdinal(pod) 140 templates := set.Spec.VolumeClaimTemplates 141 claims := make(map[string]v1.PersistentVolumeClaim, len(templates)) 142 for i := range templates { 143 claim := templates[i] 144 claim.Name = getPersistentVolumeClaimName(set, &claim, ordinal) 145 claim.Namespace = set.Namespace 146 if claim.Labels != nil { 147 for key, value := range set.Spec.Selector.MatchLabels { 148 claim.Labels[key] = value 149 } 150 } else { 151 claim.Labels = set.Spec.Selector.MatchLabels 152 } 153 claims[templates[i].Name] = claim 154 } 155 return claims 156} 157 158// updateStorage updates pod's Volumes to conform with the PersistentVolumeClaim of set's templates. If pod has 159// conflicting local Volumes these are replaced with Volumes that conform to the set's templates. 160func updateStorage(set *apps.StatefulSet, pod *v1.Pod) { 161 currentVolumes := pod.Spec.Volumes 162 claims := getPersistentVolumeClaims(set, pod) 163 newVolumes := make([]v1.Volume, 0, len(claims)) 164 for name, claim := range claims { 165 newVolumes = append(newVolumes, v1.Volume{ 166 Name: name, 167 VolumeSource: v1.VolumeSource{ 168 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ 169 ClaimName: claim.Name, 170 // TODO: Use source definition to set this value when we have one. 171 ReadOnly: false, 172 }, 173 }, 174 }) 175 } 176 for i := range currentVolumes { 177 if _, ok := claims[currentVolumes[i].Name]; !ok { 178 newVolumes = append(newVolumes, currentVolumes[i]) 179 } 180 } 181 pod.Spec.Volumes = newVolumes 182} 183 184func initIdentity(set *apps.StatefulSet, pod *v1.Pod) { 185 updateIdentity(set, pod) 186 // Set these immutable fields only on initial Pod creation, not updates. 
187 pod.Spec.Hostname = pod.Name 188 pod.Spec.Subdomain = set.Spec.ServiceName 189} 190 191// updateIdentity updates pod's name, hostname, and subdomain, and StatefulSetPodNameLabel to conform to set's name 192// and headless service. 193func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) { 194 pod.Name = getPodName(set, getOrdinal(pod)) 195 pod.Namespace = set.Namespace 196 if pod.Labels == nil { 197 pod.Labels = make(map[string]string) 198 } 199 pod.Labels[apps.StatefulSetPodNameLabel] = pod.Name 200} 201 202// isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady. 203func isRunningAndReady(pod *v1.Pod) bool { 204 return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) 205} 206 207func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool { 208 return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now()) 209} 210 211// isCreated returns true if pod has been created and is maintained by the API server 212func isCreated(pod *v1.Pod) bool { 213 return pod.Status.Phase != "" 214} 215 216// isFailed returns true if pod has a Phase of PodFailed 217func isFailed(pod *v1.Pod) bool { 218 return pod.Status.Phase == v1.PodFailed 219} 220 221// isTerminating returns true if pod's DeletionTimestamp has been set 222func isTerminating(pod *v1.Pod) bool { 223 return pod.DeletionTimestamp != nil 224} 225 226// isHealthy returns true if pod is running and ready and has not been terminated 227func isHealthy(pod *v1.Pod) bool { 228 return isRunningAndReady(pod) && !isTerminating(pod) 229} 230 231// allowsBurst is true if the alpha burst annotation is set. 
232func allowsBurst(set *apps.StatefulSet) bool { 233 return set.Spec.PodManagementPolicy == apps.ParallelPodManagement 234} 235 236// setPodRevision sets the revision of Pod to revision by adding the StatefulSetRevisionLabel 237func setPodRevision(pod *v1.Pod, revision string) { 238 if pod.Labels == nil { 239 pod.Labels = make(map[string]string) 240 } 241 pod.Labels[apps.StatefulSetRevisionLabel] = revision 242} 243 244// getPodRevision gets the revision of Pod by inspecting the StatefulSetRevisionLabel. If pod has no revision the empty 245// string is returned. 246func getPodRevision(pod *v1.Pod) string { 247 if pod.Labels == nil { 248 return "" 249 } 250 return pod.Labels[apps.StatefulSetRevisionLabel] 251} 252 253// newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal. 254func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod { 255 pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, metav1.NewControllerRef(set, controllerKind)) 256 pod.Name = getPodName(set, ordinal) 257 initIdentity(set, pod) 258 updateStorage(set, pod) 259 return pod 260} 261 262// newVersionedStatefulSetPod creates a new Pod for a StatefulSet. currentSet is the representation of the set at the 263// current revision. updateSet is the representation of the set at the updateRevision. currentRevision is the name of 264// the current revision. updateRevision is the name of the update revision. ordinal is the ordinal of the Pod. If the 265// returned error is nil, the returned Pod is valid. 
266func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod { 267 if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType && 268 (currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) || 269 (currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) { 270 pod := newStatefulSetPod(currentSet, ordinal) 271 setPodRevision(pod, currentRevision) 272 return pod 273 } 274 pod := newStatefulSetPod(updateSet, ordinal) 275 setPodRevision(pod, updateRevision) 276 return pod 277} 278 279// Match check if the given StatefulSet's template matches the template stored in the given history. 280func Match(ss *apps.StatefulSet, history *apps.ControllerRevision) (bool, error) { 281 patch, err := getPatch(ss) 282 if err != nil { 283 return false, err 284 } 285 return bytes.Equal(patch, history.Data.Raw), nil 286} 287 288// getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a 289// previous version. If the returned error is nil the patch is valid. The current state that we save is just the 290// PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously 291// recorded patches. 
292func getPatch(set *apps.StatefulSet) ([]byte, error) { 293 data, err := runtime.Encode(patchCodec, set) 294 if err != nil { 295 return nil, err 296 } 297 var raw map[string]interface{} 298 err = json.Unmarshal(data, &raw) 299 if err != nil { 300 return nil, err 301 } 302 objCopy := make(map[string]interface{}) 303 specCopy := make(map[string]interface{}) 304 spec := raw["spec"].(map[string]interface{}) 305 template := spec["template"].(map[string]interface{}) 306 specCopy["template"] = template 307 template["$patch"] = "replace" 308 objCopy["spec"] = specCopy 309 patch, err := json.Marshal(objCopy) 310 return patch, err 311} 312 313// newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set. 314// The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned 315// ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set 316// to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet. 317func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) { 318 patch, err := getPatch(set) 319 if err != nil { 320 return nil, err 321 } 322 cr, err := history.NewControllerRevision(set, 323 controllerKind, 324 set.Spec.Template.Labels, 325 runtime.RawExtension{Raw: patch}, 326 revision, 327 collisionCount) 328 if err != nil { 329 return nil, err 330 } 331 if cr.ObjectMeta.Annotations == nil { 332 cr.ObjectMeta.Annotations = make(map[string]string) 333 } 334 for key, value := range set.Annotations { 335 cr.ObjectMeta.Annotations[key] = value 336 } 337 return cr, nil 338} 339 340// ApplyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error 341// is nil, the returned StatefulSet is valid. 
342func ApplyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) { 343 clone := set.DeepCopy() 344 patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone) 345 if err != nil { 346 return nil, err 347 } 348 restoredSet := &apps.StatefulSet{} 349 err = json.Unmarshal(patched, restoredSet) 350 if err != nil { 351 return nil, err 352 } 353 return restoredSet, nil 354} 355 356// nextRevision finds the next valid revision number based on revisions. If the length of revisions 357// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method 358// assumes that revisions has been sorted by Revision. 359func nextRevision(revisions []*apps.ControllerRevision) int64 { 360 count := len(revisions) 361 if count <= 0 { 362 return 1 363 } 364 return revisions[count-1].Revision + 1 365} 366 367// inconsistentStatus returns true if the ObservedGeneration of status is greater than set's 368// Generation or if any of the status's fields do not match those of set's status. 369func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool { 370 return status.ObservedGeneration > set.Status.ObservedGeneration || 371 status.Replicas != set.Status.Replicas || 372 status.CurrentReplicas != set.Status.CurrentReplicas || 373 status.ReadyReplicas != set.Status.ReadyReplicas || 374 status.UpdatedReplicas != set.Status.UpdatedReplicas || 375 status.CurrentRevision != set.Status.CurrentRevision || 376 status.AvailableReplicas != set.Status.AvailableReplicas || 377 status.UpdateRevision != set.Status.UpdateRevision 378} 379 380// completeRollingUpdate completes a rolling update when all of set's replica Pods have been updated 381// to the updateRevision. status's currentRevision is set to updateRevision and its' updateRevision 382// is set to the empty string. 
status's currentReplicas is set to updateReplicas and its updateReplicas 383// are set to 0. 384func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) { 385 if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType && 386 status.UpdatedReplicas == status.Replicas && 387 status.ReadyReplicas == status.Replicas { 388 status.CurrentReplicas = status.UpdatedReplicas 389 status.CurrentRevision = status.UpdateRevision 390 } 391} 392 393// ascendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted 394// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed 395// to the front of the list. 396type ascendingOrdinal []*v1.Pod 397 398func (ao ascendingOrdinal) Len() int { 399 return len(ao) 400} 401 402func (ao ascendingOrdinal) Swap(i, j int) { 403 ao[i], ao[j] = ao[j], ao[i] 404} 405 406func (ao ascendingOrdinal) Less(i, j int) bool { 407 return getOrdinal(ao[i]) < getOrdinal(ao[j]) 408} 409