1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package testing 18 19import ( 20 "fmt" 21 "sync" 22 23 jsonpatch "github.com/evanphx/json-patch" 24 "k8s.io/apimachinery/pkg/api/errors" 25 "k8s.io/apimachinery/pkg/api/meta" 26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 27 "k8s.io/apimachinery/pkg/runtime" 28 "k8s.io/apimachinery/pkg/runtime/schema" 29 "k8s.io/apimachinery/pkg/types" 30 "k8s.io/apimachinery/pkg/util/json" 31 "k8s.io/apimachinery/pkg/util/strategicpatch" 32 "k8s.io/apimachinery/pkg/watch" 33 restclient "k8s.io/client-go/rest" 34) 35 36// ObjectTracker keeps track of objects. It is intended to be used to 37// fake calls to a server by returning objects based on their kind, 38// namespace and name. 39type ObjectTracker interface { 40 // Add adds an object to the tracker. If object being added 41 // is a list, its items are added separately. 42 Add(obj runtime.Object) error 43 44 // Get retrieves the object by its kind, namespace and name. 45 Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) 46 47 // Create adds an object to the tracker in the specified namespace. 48 Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error 49 50 // Update updates an existing object in the tracker in the specified namespace. 51 Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error 52 53 // List retrieves all objects of a given kind in the given 54 // namespace. Only non-List kinds are accepted. 55 List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) 56 57 // Delete deletes an existing object from the tracker. If object 58 // didn't exist in the tracker prior to deletion, Delete returns 59 // no error. 60 Delete(gvr schema.GroupVersionResource, ns, name string) error 61 62 // Watch watches objects from the tracker. Watch returns a channel 63 // which will push added / modified / deleted object. 64 Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) 65} 66 67// ObjectScheme abstracts the implementation of common operations on objects. 68type ObjectScheme interface { 69 runtime.ObjectCreater 70 runtime.ObjectTyper 71} 72 73// ObjectReaction returns a ReactionFunc that applies core.Action to 74// the given tracker. 75func ObjectReaction(tracker ObjectTracker) ReactionFunc { 76 return func(action Action) (bool, runtime.Object, error) { 77 ns := action.GetNamespace() 78 gvr := action.GetResource() 79 // Here and below we need to switch on implementation types, 80 // not on interfaces, as some interfaces are identical 81 // (e.g. UpdateAction and CreateAction), so if we use them, 82 // updates and creates end up matching the same case branch. 83 switch action := action.(type) { 84 85 case ListActionImpl: 86 obj, err := tracker.List(gvr, action.GetKind(), ns) 87 return true, obj, err 88 89 case GetActionImpl: 90 obj, err := tracker.Get(gvr, ns, action.GetName()) 91 return true, obj, err 92 93 case CreateActionImpl: 94 objMeta, err := meta.Accessor(action.GetObject()) 95 if err != nil { 96 return true, nil, err 97 } 98 if action.GetSubresource() == "" { 99 err = tracker.Create(gvr, action.GetObject(), ns) 100 } else { 101 // TODO: Currently we're handling subresource creation as an update 102 // on the enclosing resource. This works for some subresources but 103 // might not be generic enough. 104 err = tracker.Update(gvr, action.GetObject(), ns) 105 } 106 if err != nil { 107 return true, nil, err 108 } 109 obj, err := tracker.Get(gvr, ns, objMeta.GetName()) 110 return true, obj, err 111 112 case UpdateActionImpl: 113 objMeta, err := meta.Accessor(action.GetObject()) 114 if err != nil { 115 return true, nil, err 116 } 117 err = tracker.Update(gvr, action.GetObject(), ns) 118 if err != nil { 119 return true, nil, err 120 } 121 obj, err := tracker.Get(gvr, ns, objMeta.GetName()) 122 return true, obj, err 123 124 case DeleteActionImpl: 125 err := tracker.Delete(gvr, ns, action.GetName()) 126 if err != nil { 127 return true, nil, err 128 } 129 return true, nil, nil 130 131 case PatchActionImpl: 132 obj, err := tracker.Get(gvr, ns, action.GetName()) 133 if err != nil { 134 return true, nil, err 135 } 136 137 old, err := json.Marshal(obj) 138 if err != nil { 139 return true, nil, err 140 } 141 142 switch action.GetPatchType() { 143 case types.JSONPatchType: 144 patch, err := jsonpatch.DecodePatch(action.GetPatch()) 145 if err != nil { 146 return true, nil, err 147 } 148 modified, err := patch.Apply(old) 149 if err != nil { 150 return true, nil, err 151 } 152 if err = json.Unmarshal(modified, obj); err != nil { 153 return true, nil, err 154 } 155 case types.MergePatchType: 156 modified, err := jsonpatch.MergePatch(old, action.GetPatch()) 157 if err != nil { 158 return true, nil, err 159 } 160 161 if err := json.Unmarshal(modified, obj); err != nil { 162 return true, nil, err 163 } 164 case types.StrategicMergePatchType: 165 mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj) 166 if err != nil { 167 return true, nil, err 168 } 169 if err = json.Unmarshal(mergedByte, obj); err != nil { 170 return true, nil, err 171 } 172 default: 173 return true, nil, fmt.Errorf("PatchType is not supported") 174 } 175 176 if err = tracker.Update(gvr, obj, ns); err != nil { 177 return true, nil, err 178 } 179 180 return true, obj, nil 181 182 default: 183 return false, nil, fmt.Errorf("no reaction implemented for %s", action) 184 } 185 } 186} 187 188type tracker struct { 189 scheme ObjectScheme 190 decoder runtime.Decoder 191 lock sync.RWMutex 192 objects map[schema.GroupVersionResource][]runtime.Object 193 // The value type of watchers is a map of which the key is either a namespace or 194 // all/non namespace aka "" and its value is list of fake watchers. 195 // Manipulations on resources will broadcast the notification events into the 196 // watchers' channel. Note that too many unhandled events (currently 100, 197 // see apimachinery/pkg/watch.DefaultChanSize) will cause a panic. 198 watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher 199} 200 201var _ ObjectTracker = &tracker{} 202 203// NewObjectTracker returns an ObjectTracker that can be used to keep track 204// of objects for the fake clientset. Mostly useful for unit tests. 205func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker { 206 return &tracker{ 207 scheme: scheme, 208 decoder: decoder, 209 objects: make(map[schema.GroupVersionResource][]runtime.Object), 210 watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), 211 } 212} 213 214func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) { 215 // Heuristic for list kind: original kind + List suffix. Might 216 // not always be true but this tracker has a pretty limited 217 // understanding of the actual API model. 218 listGVK := gvk 219 listGVK.Kind = listGVK.Kind + "List" 220 // GVK does have the concept of "internal version". The scheme recognizes 221 // the runtime.APIVersionInternal, but not the empty string. 222 if listGVK.Version == "" { 223 listGVK.Version = runtime.APIVersionInternal 224 } 225 226 list, err := t.scheme.New(listGVK) 227 if err != nil { 228 return nil, err 229 } 230 231 if !meta.IsListType(list) { 232 return nil, fmt.Errorf("%q is not a list type", listGVK.Kind) 233 } 234 235 t.lock.RLock() 236 defer t.lock.RUnlock() 237 238 objs, ok := t.objects[gvr] 239 if !ok { 240 return list, nil 241 } 242 243 matchingObjs, err := filterByNamespaceAndName(objs, ns, "") 244 if err != nil { 245 return nil, err 246 } 247 if err := meta.SetList(list, matchingObjs); err != nil { 248 return nil, err 249 } 250 return list.DeepCopyObject(), nil 251} 252 253func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { 254 t.lock.Lock() 255 defer t.lock.Unlock() 256 257 fakewatcher := watch.NewRaceFreeFake() 258 259 if _, exists := t.watchers[gvr]; !exists { 260 t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) 261 } 262 t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher) 263 return fakewatcher, nil 264} 265 266func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) { 267 errNotFound := errors.NewNotFound(gvr.GroupResource(), name) 268 269 t.lock.RLock() 270 defer t.lock.RUnlock() 271 272 objs, ok := t.objects[gvr] 273 if !ok { 274 return nil, errNotFound 275 } 276 277 matchingObjs, err := filterByNamespaceAndName(objs, ns, name) 278 if err != nil { 279 return nil, err 280 } 281 if len(matchingObjs) == 0 { 282 return nil, errNotFound 283 } 284 if len(matchingObjs) > 1 { 285 return nil, fmt.Errorf("more than one object matched gvr %s, ns: %q name: %q", gvr, ns, name) 286 } 287 288 // Only one object should match in the tracker if it works 289 // correctly, as Add/Update methods enforce kind/namespace/name 290 // uniqueness. 291 obj := matchingObjs[0].DeepCopyObject() 292 if status, ok := obj.(*metav1.Status); ok { 293 if status.Status != metav1.StatusSuccess { 294 return nil, &errors.StatusError{ErrStatus: *status} 295 } 296 } 297 298 return obj, nil 299} 300 301func (t *tracker) Add(obj runtime.Object) error { 302 if meta.IsListType(obj) { 303 return t.addList(obj, false) 304 } 305 objMeta, err := meta.Accessor(obj) 306 if err != nil { 307 return err 308 } 309 gvks, _, err := t.scheme.ObjectKinds(obj) 310 if err != nil { 311 return err 312 } 313 if len(gvks) == 0 { 314 return fmt.Errorf("no registered kinds for %v", obj) 315 } 316 for _, gvk := range gvks { 317 // NOTE: UnsafeGuessKindToResource is a heuristic and default match. The 318 // actual registration in apiserver can specify arbitrary route for a 319 // gvk. If a test uses such objects, it cannot preset the tracker with 320 // objects via Add(). Instead, it should trigger the Create() function 321 // of the tracker, where an arbitrary gvr can be specified. 322 gvr, _ := meta.UnsafeGuessKindToResource(gvk) 323 // Resource doesn't have the concept of "__internal" version, just set it to "". 324 if gvr.Version == runtime.APIVersionInternal { 325 gvr.Version = "" 326 } 327 328 err := t.add(gvr, obj, objMeta.GetNamespace(), false) 329 if err != nil { 330 return err 331 } 332 } 333 return nil 334} 335 336func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { 337 return t.add(gvr, obj, ns, false) 338} 339 340func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { 341 return t.add(gvr, obj, ns, true) 342} 343 344func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { 345 watches := []*watch.RaceFreeFakeWatcher{} 346 if t.watchers[gvr] != nil { 347 if w := t.watchers[gvr][ns]; w != nil { 348 watches = append(watches, w...) 349 } 350 if ns != metav1.NamespaceAll { 351 if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil { 352 watches = append(watches, w...) 353 } 354 } 355 } 356 return watches 357} 358 359func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error { 360 t.lock.Lock() 361 defer t.lock.Unlock() 362 363 gr := gvr.GroupResource() 364 365 // To avoid the object from being accidentally modified by caller 366 // after it's been added to the tracker, we always store the deep 367 // copy. 368 obj = obj.DeepCopyObject() 369 370 newMeta, err := meta.Accessor(obj) 371 if err != nil { 372 return err 373 } 374 375 // Propagate namespace to the new object if hasn't already been set. 376 if len(newMeta.GetNamespace()) == 0 { 377 newMeta.SetNamespace(ns) 378 } 379 380 if ns != newMeta.GetNamespace() { 381 msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace()) 382 return errors.NewBadRequest(msg) 383 } 384 385 for i, existingObj := range t.objects[gvr] { 386 oldMeta, err := meta.Accessor(existingObj) 387 if err != nil { 388 return err 389 } 390 if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() { 391 if replaceExisting { 392 for _, w := range t.getWatches(gvr, ns) { 393 w.Modify(obj) 394 } 395 t.objects[gvr][i] = obj 396 return nil 397 } 398 return errors.NewAlreadyExists(gr, newMeta.GetName()) 399 } 400 } 401 402 if replaceExisting { 403 // Tried to update but no matching object was found. 404 return errors.NewNotFound(gr, newMeta.GetName()) 405 } 406 407 t.objects[gvr] = append(t.objects[gvr], obj) 408 409 for _, w := range t.getWatches(gvr, ns) { 410 w.Add(obj) 411 } 412 413 return nil 414} 415 416func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error { 417 list, err := meta.ExtractList(obj) 418 if err != nil { 419 return err 420 } 421 errs := runtime.DecodeList(list, t.decoder) 422 if len(errs) > 0 { 423 return errs[0] 424 } 425 for _, obj := range list { 426 if err := t.Add(obj); err != nil { 427 return err 428 } 429 } 430 return nil 431} 432 433func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error { 434 t.lock.Lock() 435 defer t.lock.Unlock() 436 437 found := false 438 439 for i, existingObj := range t.objects[gvr] { 440 objMeta, err := meta.Accessor(existingObj) 441 if err != nil { 442 return err 443 } 444 if objMeta.GetNamespace() == ns && objMeta.GetName() == name { 445 obj := t.objects[gvr][i] 446 t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...) 447 for _, w := range t.getWatches(gvr, ns) { 448 w.Delete(obj) 449 } 450 found = true 451 break 452 } 453 } 454 455 if found { 456 return nil 457 } 458 459 return errors.NewNotFound(gvr.GroupResource(), name) 460} 461 462// filterByNamespaceAndName returns all objects in the collection that 463// match provided namespace and name. Empty namespace matches 464// non-namespaced objects. 465func filterByNamespaceAndName(objs []runtime.Object, ns, name string) ([]runtime.Object, error) { 466 var res []runtime.Object 467 468 for _, obj := range objs { 469 acc, err := meta.Accessor(obj) 470 if err != nil { 471 return nil, err 472 } 473 if ns != "" && acc.GetNamespace() != ns { 474 continue 475 } 476 if name != "" && acc.GetName() != name { 477 continue 478 } 479 res = append(res, obj) 480 } 481 482 return res, nil 483} 484 485func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc { 486 return func(action Action) (bool, watch.Interface, error) { 487 return true, watchInterface, err 488 } 489} 490 491// SimpleReactor is a Reactor. Each reaction function is attached to a given verb,resource tuple. "*" in either field matches everything for that value. 492// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions 493type SimpleReactor struct { 494 Verb string 495 Resource string 496 497 Reaction ReactionFunc 498} 499 500func (r *SimpleReactor) Handles(action Action) bool { 501 verbCovers := r.Verb == "*" || r.Verb == action.GetVerb() 502 if !verbCovers { 503 return false 504 } 505 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 506 if !resourceCovers { 507 return false 508 } 509 510 return true 511} 512 513func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) { 514 return r.Reaction(action) 515} 516 517// SimpleWatchReactor is a WatchReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. 518// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions 519type SimpleWatchReactor struct { 520 Resource string 521 522 Reaction WatchReactionFunc 523} 524 525func (r *SimpleWatchReactor) Handles(action Action) bool { 526 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 527 if !resourceCovers { 528 return false 529 } 530 531 return true 532} 533 534func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) { 535 return r.Reaction(action) 536} 537 538// SimpleProxyReactor is a ProxyReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. 539// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions. 540type SimpleProxyReactor struct { 541 Resource string 542 543 Reaction ProxyReactionFunc 544} 545 546func (r *SimpleProxyReactor) Handles(action Action) bool { 547 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 548 if !resourceCovers { 549 return false 550 } 551 552 return true 553} 554 555func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) { 556 return r.Reaction(action) 557} 558