1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package testing 18 19import ( 20 "fmt" 21 "reflect" 22 "sort" 23 "sync" 24 25 jsonpatch "github.com/evanphx/json-patch" 26 27 "k8s.io/apimachinery/pkg/api/errors" 28 "k8s.io/apimachinery/pkg/api/meta" 29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 30 "k8s.io/apimachinery/pkg/runtime" 31 "k8s.io/apimachinery/pkg/runtime/schema" 32 "k8s.io/apimachinery/pkg/types" 33 "k8s.io/apimachinery/pkg/util/json" 34 "k8s.io/apimachinery/pkg/util/strategicpatch" 35 "k8s.io/apimachinery/pkg/watch" 36 restclient "k8s.io/client-go/rest" 37) 38 39// ObjectTracker keeps track of objects. It is intended to be used to 40// fake calls to a server by returning objects based on their kind, 41// namespace and name. 42type ObjectTracker interface { 43 // Add adds an object to the tracker. If object being added 44 // is a list, its items are added separately. 45 Add(obj runtime.Object) error 46 47 // Get retrieves the object by its kind, namespace and name. 48 Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) 49 50 // Create adds an object to the tracker in the specified namespace. 51 Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error 52 53 // Update updates an existing object in the tracker in the specified namespace. 54 Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error 55 56 // List retrieves all objects of a given kind in the given 57 // namespace. Only non-List kinds are accepted. 58 List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) 59 60 // Delete deletes an existing object from the tracker. If object 61 // didn't exist in the tracker prior to deletion, Delete returns 62 // no error. 63 Delete(gvr schema.GroupVersionResource, ns, name string) error 64 65 // Watch watches objects from the tracker. Watch returns a channel 66 // which will push added / modified / deleted object. 67 Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) 68} 69 70// ObjectScheme abstracts the implementation of common operations on objects. 71type ObjectScheme interface { 72 runtime.ObjectCreater 73 runtime.ObjectTyper 74} 75 76// ObjectReaction returns a ReactionFunc that applies core.Action to 77// the given tracker. 78func ObjectReaction(tracker ObjectTracker) ReactionFunc { 79 return func(action Action) (bool, runtime.Object, error) { 80 ns := action.GetNamespace() 81 gvr := action.GetResource() 82 // Here and below we need to switch on implementation types, 83 // not on interfaces, as some interfaces are identical 84 // (e.g. UpdateAction and CreateAction), so if we use them, 85 // updates and creates end up matching the same case branch. 86 switch action := action.(type) { 87 88 case ListActionImpl: 89 obj, err := tracker.List(gvr, action.GetKind(), ns) 90 return true, obj, err 91 92 case GetActionImpl: 93 obj, err := tracker.Get(gvr, ns, action.GetName()) 94 return true, obj, err 95 96 case CreateActionImpl: 97 objMeta, err := meta.Accessor(action.GetObject()) 98 if err != nil { 99 return true, nil, err 100 } 101 if action.GetSubresource() == "" { 102 err = tracker.Create(gvr, action.GetObject(), ns) 103 } else { 104 // TODO: Currently we're handling subresource creation as an update 105 // on the enclosing resource. This works for some subresources but 106 // might not be generic enough. 107 err = tracker.Update(gvr, action.GetObject(), ns) 108 } 109 if err != nil { 110 return true, nil, err 111 } 112 obj, err := tracker.Get(gvr, ns, objMeta.GetName()) 113 return true, obj, err 114 115 case UpdateActionImpl: 116 objMeta, err := meta.Accessor(action.GetObject()) 117 if err != nil { 118 return true, nil, err 119 } 120 err = tracker.Update(gvr, action.GetObject(), ns) 121 if err != nil { 122 return true, nil, err 123 } 124 obj, err := tracker.Get(gvr, ns, objMeta.GetName()) 125 return true, obj, err 126 127 case DeleteActionImpl: 128 err := tracker.Delete(gvr, ns, action.GetName()) 129 if err != nil { 130 return true, nil, err 131 } 132 return true, nil, nil 133 134 case PatchActionImpl: 135 obj, err := tracker.Get(gvr, ns, action.GetName()) 136 if err != nil { 137 return true, nil, err 138 } 139 140 old, err := json.Marshal(obj) 141 if err != nil { 142 return true, nil, err 143 } 144 145 // reset the object in preparation to unmarshal, since unmarshal does not guarantee that fields 146 // in obj that are removed by patch are cleared 147 value := reflect.ValueOf(obj) 148 value.Elem().Set(reflect.New(value.Type().Elem()).Elem()) 149 150 switch action.GetPatchType() { 151 case types.JSONPatchType: 152 patch, err := jsonpatch.DecodePatch(action.GetPatch()) 153 if err != nil { 154 return true, nil, err 155 } 156 modified, err := patch.Apply(old) 157 if err != nil { 158 return true, nil, err 159 } 160 161 if err = json.Unmarshal(modified, obj); err != nil { 162 return true, nil, err 163 } 164 case types.MergePatchType: 165 modified, err := jsonpatch.MergePatch(old, action.GetPatch()) 166 if err != nil { 167 return true, nil, err 168 } 169 170 if err := json.Unmarshal(modified, obj); err != nil { 171 return true, nil, err 172 } 173 case types.StrategicMergePatchType: 174 mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj) 175 if err != nil { 176 return true, nil, err 177 } 178 if err = json.Unmarshal(mergedByte, obj); err != nil { 179 return true, nil, err 180 } 181 default: 182 return true, nil, fmt.Errorf("PatchType is not supported") 183 } 184 185 if err = tracker.Update(gvr, obj, ns); err != nil { 186 return true, nil, err 187 } 188 189 return true, obj, nil 190 191 default: 192 return false, nil, fmt.Errorf("no reaction implemented for %s", action) 193 } 194 } 195} 196 197type tracker struct { 198 scheme ObjectScheme 199 decoder runtime.Decoder 200 lock sync.RWMutex 201 objects map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object 202 // The value type of watchers is a map of which the key is either a namespace or 203 // all/non namespace aka "" and its value is list of fake watchers. 204 // Manipulations on resources will broadcast the notification events into the 205 // watchers' channel. Note that too many unhandled events (currently 100, 206 // see apimachinery/pkg/watch.DefaultChanSize) will cause a panic. 207 watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher 208} 209 210var _ ObjectTracker = &tracker{} 211 212// NewObjectTracker returns an ObjectTracker that can be used to keep track 213// of objects for the fake clientset. Mostly useful for unit tests. 214func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker { 215 return &tracker{ 216 scheme: scheme, 217 decoder: decoder, 218 objects: make(map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object), 219 watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), 220 } 221} 222 223func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) { 224 // Heuristic for list kind: original kind + List suffix. Might 225 // not always be true but this tracker has a pretty limited 226 // understanding of the actual API model. 227 listGVK := gvk 228 listGVK.Kind = listGVK.Kind + "List" 229 // GVK does have the concept of "internal version". The scheme recognizes 230 // the runtime.APIVersionInternal, but not the empty string. 231 if listGVK.Version == "" { 232 listGVK.Version = runtime.APIVersionInternal 233 } 234 235 list, err := t.scheme.New(listGVK) 236 if err != nil { 237 return nil, err 238 } 239 240 if !meta.IsListType(list) { 241 return nil, fmt.Errorf("%q is not a list type", listGVK.Kind) 242 } 243 244 t.lock.RLock() 245 defer t.lock.RUnlock() 246 247 objs, ok := t.objects[gvr] 248 if !ok { 249 return list, nil 250 } 251 252 matchingObjs, err := filterByNamespace(objs, ns) 253 if err != nil { 254 return nil, err 255 } 256 if err := meta.SetList(list, matchingObjs); err != nil { 257 return nil, err 258 } 259 return list.DeepCopyObject(), nil 260} 261 262func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { 263 t.lock.Lock() 264 defer t.lock.Unlock() 265 266 fakewatcher := watch.NewRaceFreeFake() 267 268 if _, exists := t.watchers[gvr]; !exists { 269 t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) 270 } 271 t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher) 272 return fakewatcher, nil 273} 274 275func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) { 276 errNotFound := errors.NewNotFound(gvr.GroupResource(), name) 277 278 t.lock.RLock() 279 defer t.lock.RUnlock() 280 281 objs, ok := t.objects[gvr] 282 if !ok { 283 return nil, errNotFound 284 } 285 286 matchingObj, ok := objs[types.NamespacedName{Namespace: ns, Name: name}] 287 if !ok { 288 return nil, errNotFound 289 } 290 291 // Only one object should match in the tracker if it works 292 // correctly, as Add/Update methods enforce kind/namespace/name 293 // uniqueness. 294 obj := matchingObj.DeepCopyObject() 295 if status, ok := obj.(*metav1.Status); ok { 296 if status.Status != metav1.StatusSuccess { 297 return nil, &errors.StatusError{ErrStatus: *status} 298 } 299 } 300 301 return obj, nil 302} 303 304func (t *tracker) Add(obj runtime.Object) error { 305 if meta.IsListType(obj) { 306 return t.addList(obj, false) 307 } 308 objMeta, err := meta.Accessor(obj) 309 if err != nil { 310 return err 311 } 312 gvks, _, err := t.scheme.ObjectKinds(obj) 313 if err != nil { 314 return err 315 } 316 317 if partial, ok := obj.(*metav1.PartialObjectMetadata); ok && len(partial.TypeMeta.APIVersion) > 0 { 318 gvks = []schema.GroupVersionKind{partial.TypeMeta.GroupVersionKind()} 319 } 320 321 if len(gvks) == 0 { 322 return fmt.Errorf("no registered kinds for %v", obj) 323 } 324 for _, gvk := range gvks { 325 // NOTE: UnsafeGuessKindToResource is a heuristic and default match. The 326 // actual registration in apiserver can specify arbitrary route for a 327 // gvk. If a test uses such objects, it cannot preset the tracker with 328 // objects via Add(). Instead, it should trigger the Create() function 329 // of the tracker, where an arbitrary gvr can be specified. 330 gvr, _ := meta.UnsafeGuessKindToResource(gvk) 331 // Resource doesn't have the concept of "__internal" version, just set it to "". 332 if gvr.Version == runtime.APIVersionInternal { 333 gvr.Version = "" 334 } 335 336 err := t.add(gvr, obj, objMeta.GetNamespace(), false) 337 if err != nil { 338 return err 339 } 340 } 341 return nil 342} 343 344func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { 345 return t.add(gvr, obj, ns, false) 346} 347 348func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error { 349 return t.add(gvr, obj, ns, true) 350} 351 352func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { 353 watches := []*watch.RaceFreeFakeWatcher{} 354 if t.watchers[gvr] != nil { 355 if w := t.watchers[gvr][ns]; w != nil { 356 watches = append(watches, w...) 357 } 358 if ns != metav1.NamespaceAll { 359 if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil { 360 watches = append(watches, w...) 361 } 362 } 363 } 364 return watches 365} 366 367func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error { 368 t.lock.Lock() 369 defer t.lock.Unlock() 370 371 gr := gvr.GroupResource() 372 373 // To avoid the object from being accidentally modified by caller 374 // after it's been added to the tracker, we always store the deep 375 // copy. 376 obj = obj.DeepCopyObject() 377 378 newMeta, err := meta.Accessor(obj) 379 if err != nil { 380 return err 381 } 382 383 // Propagate namespace to the new object if hasn't already been set. 384 if len(newMeta.GetNamespace()) == 0 { 385 newMeta.SetNamespace(ns) 386 } 387 388 if ns != newMeta.GetNamespace() { 389 msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace()) 390 return errors.NewBadRequest(msg) 391 } 392 393 _, ok := t.objects[gvr] 394 if !ok { 395 t.objects[gvr] = make(map[types.NamespacedName]runtime.Object) 396 } 397 398 namespacedName := types.NamespacedName{Namespace: newMeta.GetNamespace(), Name: newMeta.GetName()} 399 if _, ok = t.objects[gvr][namespacedName]; ok { 400 if replaceExisting { 401 for _, w := range t.getWatches(gvr, ns) { 402 w.Modify(obj) 403 } 404 t.objects[gvr][namespacedName] = obj 405 return nil 406 } 407 return errors.NewAlreadyExists(gr, newMeta.GetName()) 408 } 409 410 if replaceExisting { 411 // Tried to update but no matching object was found. 412 return errors.NewNotFound(gr, newMeta.GetName()) 413 } 414 415 t.objects[gvr][namespacedName] = obj 416 417 for _, w := range t.getWatches(gvr, ns) { 418 w.Add(obj) 419 } 420 421 return nil 422} 423 424func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error { 425 list, err := meta.ExtractList(obj) 426 if err != nil { 427 return err 428 } 429 errs := runtime.DecodeList(list, t.decoder) 430 if len(errs) > 0 { 431 return errs[0] 432 } 433 for _, obj := range list { 434 if err := t.Add(obj); err != nil { 435 return err 436 } 437 } 438 return nil 439} 440 441func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error { 442 t.lock.Lock() 443 defer t.lock.Unlock() 444 445 objs, ok := t.objects[gvr] 446 if !ok { 447 return errors.NewNotFound(gvr.GroupResource(), name) 448 } 449 450 namespacedName := types.NamespacedName{Namespace: ns, Name: name} 451 obj, ok := objs[namespacedName] 452 if !ok { 453 return errors.NewNotFound(gvr.GroupResource(), name) 454 } 455 456 delete(objs, namespacedName) 457 for _, w := range t.getWatches(gvr, ns) { 458 w.Delete(obj) 459 } 460 return nil 461} 462 463// filterByNamespace returns all objects in the collection that 464// match provided namespace. Empty namespace matches 465// non-namespaced objects. 466func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string) ([]runtime.Object, error) { 467 var res []runtime.Object 468 469 for _, obj := range objs { 470 acc, err := meta.Accessor(obj) 471 if err != nil { 472 return nil, err 473 } 474 if ns != "" && acc.GetNamespace() != ns { 475 continue 476 } 477 res = append(res, obj) 478 } 479 480 // Sort res to get deterministic order. 481 sort.Slice(res, func(i, j int) bool { 482 acc1, _ := meta.Accessor(res[i]) 483 acc2, _ := meta.Accessor(res[j]) 484 if acc1.GetNamespace() != acc2.GetNamespace() { 485 return acc1.GetNamespace() < acc2.GetNamespace() 486 } 487 return acc1.GetName() < acc2.GetName() 488 }) 489 return res, nil 490} 491 492func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc { 493 return func(action Action) (bool, watch.Interface, error) { 494 return true, watchInterface, err 495 } 496} 497 498// SimpleReactor is a Reactor. Each reaction function is attached to a given verb,resource tuple. "*" in either field matches everything for that value. 499// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions 500type SimpleReactor struct { 501 Verb string 502 Resource string 503 504 Reaction ReactionFunc 505} 506 507func (r *SimpleReactor) Handles(action Action) bool { 508 verbCovers := r.Verb == "*" || r.Verb == action.GetVerb() 509 if !verbCovers { 510 return false 511 } 512 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 513 if !resourceCovers { 514 return false 515 } 516 517 return true 518} 519 520func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) { 521 return r.Reaction(action) 522} 523 524// SimpleWatchReactor is a WatchReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. 525// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions 526type SimpleWatchReactor struct { 527 Resource string 528 529 Reaction WatchReactionFunc 530} 531 532func (r *SimpleWatchReactor) Handles(action Action) bool { 533 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 534 if !resourceCovers { 535 return false 536 } 537 538 return true 539} 540 541func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) { 542 return r.Reaction(action) 543} 544 545// SimpleProxyReactor is a ProxyReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. 546// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions. 547type SimpleProxyReactor struct { 548 Resource string 549 550 Reaction ProxyReactionFunc 551} 552 553func (r *SimpleProxyReactor) Handles(action Action) bool { 554 resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource 555 if !resourceCovers { 556 return false 557 } 558 559 return true 560} 561 562func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) { 563 return r.Reaction(action) 564} 565