# Copyright 2008 Canonical Ltd.

# This file is part of launchpadlib.
#
# launchpadlib is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# launchpadlib is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with launchpadlib.  If not, see
# <http://www.gnu.org/licenses/>.

"""Testing API allows fake data to be used in unit tests.

Testing launchpadlib code is tricky, because it depends so heavily on a
remote, unique webservice: Launchpad.  This module helps you write tests for
your launchpadlib application that can be run locally and quickly.

Say you were writing some code that needed to call out to Launchpad and get
the branches owned by the logged-in person, and then do something to them. For
example, something like this::

  def collect_unique_names(lp):
      names = []
      for branch in lp.me.getBranches():
          names.append(branch.unique_name)
      return names

To test it, you would first prepare a L{FakeLaunchpad} object, and give it
some sample data of your own devising::

  lp = FakeLaunchpad()
  my_branches = [dict(unique_name='~foo/bar/baz')]
  lp.me = dict(getBranches=lambda status=None: my_branches)

Then, in the test, call your own code and assert that it behaves correctly
given the data::

  names = collect_unique_names(lp)
  self.assertEqual(['~foo/bar/baz'], names)

And that's it.

The L{FakeLaunchpad} code uses a WADL file to type-check any objects created
or returned.  This means you can be sure that you won't accidentally store
sample data with misspelled attribute names.

The WADL file that we use by default is for version 1.0 of the Launchpad API.
If you want to work against a more recent version of the API, download the
WADL yourself (see <https://help.launchpad.net/API/Hacking>) and construct
your C{FakeLaunchpad} like this::

  from wadllib.application import Application
  lp = FakeLaunchpad(
      application=Application('https://api.launchpad.net/devel/',
                              '/path/to/wadl.xml'))

Where 'https://api.launchpad.net/devel/' is the URL for the WADL file, found
also in the WADL file itself.
"""

from datetime import datetime
try:
    from collections.abc import Callable
except ImportError:
    # Python 2: the ABCs live directly in ``collections``.
    from collections import Callable
import sys
if sys.version_info[0] >= 3:
    # Python 3 has no ``basestring``; ``str`` is the only text type checked.
    basestring = str

JSON_MEDIA_TYPE = "application/json"


class IntegrityError(Exception):
    """Raised when bad sample data is used with a L{FakeLaunchpad} instance."""


class FakeLaunchpad(object):
    """A fake Launchpad API class for unit tests that depend on L{Launchpad}.

    Attribute reads and writes are delegated to a L{FakeRoot} built from the
    WADL application, so sample data is validated when it is assigned.

    @param credentials: Stored on the instance as C{credentials}; not
        otherwise used by this class.
    @param service_root: Accepted but ignored by this class.
    @param cache: Accepted but ignored by this class.
    @param timeout: Accepted but ignored by this class.
    @param proxy_info: Accepted but ignored by this class.
    @param application: A C{wadllib.application.Application} instance for a
        Launchpad WADL definition file.  When omitted, the application
        returned by C{launchpadlib.testing.resources.get_application} is
        used.
    """

    def __init__(self, credentials=None, service_root=None, cache=None,
                 timeout=None, proxy_info=None, application=None):
        if application is None:
            from launchpadlib.testing.resources import get_application
            application = get_application()
        root_resource = FakeRoot(application)
        # Write straight into __dict__: __setattr__ is overridden to store
        # sample data on the service root instead of on this object.
        self.__dict__.update({"credentials": credentials,
                              "_application": application,
                              "_service_root": root_resource})

    def __setattr__(self, name, values):
        """Set sample data.

        @param name: The name of the attribute.
        @param values: A dict representing an object matching a resource
            defined in Launchpad's WADL definition.
        """
        service_root = self._service_root
        setattr(service_root, name, values)

    def __getattr__(self, name):
        """Get sample data.

        @param name: The name of the attribute.
        @raises AttributeError: Raised if the service root has no such
            attribute, or if this instance has not been initialised.
        """
        # Go through __dict__ so that an instance without a service root
        # (e.g. created via __new__ or copy) raises AttributeError instead
        # of recursing into __getattr__ forever.
        try:
            service_root = self.__dict__["_service_root"]
        except KeyError:
            raise AttributeError(name)
        return getattr(service_root, name)

    @classmethod
    def login(cls, consumer_name, token_string, access_secret,
              service_root=None, cache=None, timeout=None, proxy_info=None):
        """Convenience for setting up access credentials.

        All arguments are ignored; a fake instance with placeholder
        credentials and the default application is returned.
        """
        from launchpadlib.testing.resources import get_application
        return cls(object(), application=get_application())

    @classmethod
    def get_token_and_login(cls, consumer_name, service_root=None,
                            cache=None, timeout=None, proxy_info=None):
        """Get credentials from Launchpad and log into the service root.

        All arguments are ignored; a fake instance with placeholder
        credentials and the default application is returned.
        """
        from launchpadlib.testing.resources import get_application
        return cls(object(), application=get_application())

    @classmethod
    def login_with(cls, consumer_name, service_root=None,
                   launchpadlib_dir=None, timeout=None, proxy_info=None):
        """Log in to Launchpad with possibly cached credentials.

        All arguments are ignored; a fake instance with placeholder
        credentials and the default application is returned.
        """
        from launchpadlib.testing.resources import get_application
        return cls(object(), application=get_application())


def find_by_attribute(element, name, value):
    """Find children of 'element' where attribute 'name' is equal to 'value'.

    @param element: An iterable of children, each supporting C{get(name)}.
    @param name: The attribute name to look up on each child.
    @param value: The value the attribute must equal.
    @return: A list of the matching children, in iteration order.
    """
    return [child for child in element if child.get(name) == value]


def strip_suffix(string, suffix):
    """Return C{string} with a trailing C{suffix} removed, if present.

    @param string: The string to strip.
    @param suffix: The suffix to remove.  An empty suffix leaves C{string}
        unchanged.
    @return: C{string} without the suffix, or C{string} itself if it does
        not end with C{suffix}.
    """
    # An empty suffix must be special-cased: string[:-0] is string[:0],
    # which would return the empty string instead of the input.
    if suffix and string.endswith(suffix):
        return string[:-len(suffix)]
    return string


class FakeResource(object):
    """
    Represents valid sample data on L{FakeLaunchpad} instances.

    @ivar _children: A dictionary of child resources, each of type
        C{FakeResource}.
    @ivar _values: A dictionary of values associated with this resource.  e.g.
        "display_name" or "date_created".  The values of this dictionary will
        never be C{FakeResource}s.

    Note that if C{_children} has a key, then C{_values} will not, and vice
    versa. That is, they are distinct dicts.
    """

    # Method names that are always available and are never looked up in, or
    # validated against, the WADL definition.
    special_methods = ["lp_save"]

    def __init__(self, application, resource_type, values=None):
        """Construct a FakeResource.

        @param application: A C{wadllib.application.Application} instance.
        @param resource_type: A C{wadllib.application.ResourceType} instance
            for this resource.
        @param values: Optionally, a dict representing attribute key/value
            pairs for this resource.
        """
        if values is None:
            values = {}
        # Bypass the overridden __setattr__, which would treat these as
        # sample data and validate them against the WADL.
        self.__dict__.update({"_application": application,
                              "_resource_type": resource_type,
                              "_children": {},
                              "_values": values})

    def __setattr__(self, name, value):
        """Set sample data.

        C{value} can be a dict representing an object matching a resource
        defined in the WADL definition.  Alternatively, C{value} could be a
        resource itself.  Either way, it is checked for type correctness
        against the WADL definition.

        @raises IntegrityError: Raised if the data does not match the WADL
            definition.
        """
        if isinstance(value, dict):
            self._children[name] = self._create_child_resource(name, value)
        else:
            # Validate a copy so that a failed check leaves the stored
            # values untouched.
            values = {}
            values.update(self._values)
            values[name] = value
            # Confirm that the new 'values' dict is a partial type match for
            # this resource.
            self._check_resource_type(self._resource_type, values)
            self.__dict__["_values"] = values

    def __getattr__(self, name, _marker=object()):
        """Get sample data.

        Child resources take precedence over plain values.  Callable values
        are returned wrapped so that their results are validated when they
        are called.  Names in C{special_methods} that have no sample data
        resolve to a no-argument callable returning C{True}.

        @param name: The name of the attribute.
        @raises AttributeError: Raised if there is no sample data for
            C{name}, or if this instance has not been initialised.
        """
        # Read state through __dict__: if it is missing (instance created
        # via __new__ or copy), attribute access on self would re-enter
        # __getattr__ and recurse without bound.
        state = self.__dict__
        if "_children" not in state or "_values" not in state:
            raise AttributeError(name)
        result = state["_children"].get(name, _marker)
        if result is _marker:
            result = state["_values"].get(name, _marker)
        if isinstance(result, Callable):
            return self._wrap_method(name, result)
        if name in self.special_methods:
            return lambda: True
        if result is _marker:
            raise AttributeError("%r has no attribute '%s'" % (self, name))
        return result

    def _wrap_method(self, name, method):
        """Wrapper around methods validates results when it's run.

        @param name: The name of the method.
        @param method: The callable to run when the method is called.
        @return: A callable that forwards its arguments to C{method} via
            L{_run_method}.
        """
        def wrapper(*args, **kwargs):
            return self._run_method(name, method, *args, **kwargs)
        return wrapper

    def _create_child_resource(self, name, values):
        """
        Ensure that C{values} is a valid object for the C{name} attribute and
        return a resource object to represent it as API data.

        The attribute is looked up on the application's root resource, first
        as C{<name>_collection_link} and then as C{<name>_link}.

        @param name: The name of the attribute to check the C{values} object
            against.
        @param values: A dict with key/value pairs representing attributes and
            methods of an object matching the C{name} resource's definition.
        @return: A L{FakeEntry} for an ordinary resource or a
            L{FakeCollection} for a resource that represents a collection.
        @raises IntegrityError: Raised if C{name} isn't a valid attribute for
            this resource or if C{values} isn't a valid object for the C{name}
            attribute.
        """
        root_resource = self._application.get_resource_by_path("")
        is_link = False
        param = root_resource.get_parameter(name + "_collection_link",
                                            JSON_MEDIA_TYPE)
        if param is None:
            is_link = True
            param = root_resource.get_parameter(name + "_link",
                                                JSON_MEDIA_TYPE)
        if param is None:
            raise IntegrityError("%s isn't a valid property." % (name,))
        resource_type = self._get_resource_type(param)
        if is_link:
            self._check_resource_type(resource_type, values)
            return FakeEntry(self._application, resource_type, values)
        else:
            child_name, child_resource_type = (
                self._check_collection_type(resource_type, values))
            return FakeCollection(self._application, resource_type, values,
                                  child_name, child_resource_type)

    def _get_resource_type(self, param):
        """Get the resource type for C{param}.

        @param param: An object representing a C{_link} or C{_collection_link}
            parameter.
        @return: The resource type for the parameter, or None if one isn't
            available.
        """
        # The parameter is expected to have exactly one child element; the
        # unpacking raises ValueError otherwise.
        [link] = list(param.tag)
        name = link.get("resource_type")
        return self._application.get_resource_type(name)

    def _check_resource_type(self, resource_type, partial_object):
        """
        Ensure that attributes and methods defined for C{partial_object} match
        attributes and methods defined for C{resource_type}.

        @param resource_type: The resource type to check the attributes and
            methods against.
        @param partial_object: A dict with key/value pairs representing
            attributes and methods.
        @raises IntegrityError: Raised if a key is not a valid attribute or
            method, or if an attribute value has the wrong type.
        """
        for name, value in partial_object.items():
            if isinstance(value, Callable):
                # Performs an integrity check.
                self._get_method(resource_type, name)
            else:
                self._check_attribute(resource_type, name, value)

    def _check_collection_type(self, resource_type, partial_object):
        """
        Ensure that attributes and methods defined for C{partial_object} match
        attributes and methods defined for C{resource_type}.  Collection
        entries are treated specially.

        @param resource_type: The resource type to check the attributes and
            methods against.
        @param partial_object: A dict with key/value pairs representing
            attributes and methods.
        @return: (name, resource_type), where 'name' is the name of the child
            resource type and 'resource_type' is the corresponding resource
            type.  Both are C{None} when C{partial_object} has no
            C{"entries"} key.
        @raises IntegrityError: Raised if a key is not a valid attribute or
            method, or if a value (or entry) has the wrong type.
        """
        # Keep the result separate from the loop variable: otherwise any key
        # iterated after "entries" would overwrite the child type's name.
        child_name = None
        child_resource_type = None
        for name, value in partial_object.items():
            if name == "entries":
                child_name, child_resource_type = (
                    self._check_entries(resource_type, value))
            elif isinstance(value, Callable):
                # Performs an integrity check.
                self._get_method(resource_type, name)
            else:
                self._check_attribute(resource_type, name, value)
        return child_name, child_resource_type

    def _find_representation_id(self, resource_type, name):
        """Find the WADL XML id for the representation of C{resource_type}.

        Looks in the WADL for the first representation associated with the
        method for a resource type.

        @param resource_type: The resource type that owns the method.
        @param name: The name of the method whose representation is wanted.
        @raises IntegrityError: Raised if C{name} is not a method of
            C{resource_type}.
        @return: An XML id (a string), or C{None} if no representation of the
            method carries an C{href}.
        """
        get_method = self._get_method(resource_type, name)
        for response in get_method:
            for representation in response:
                representation_url = representation.get("href")
                if representation_url is not None:
                    return self._application.lookup_xml_id(
                        representation_url)

    def _check_attribute(self, resource_type, name, value):
        """
        Ensure that C{value} is a valid C{name} attribute on C{resource_type}.

        Does this by finding the representation for the default, canonical GET
        method (as opposed to the many "named" GET methods that exist.)

        @param resource_type: The resource type to check the attribute
            against.
        @param name: The name of the attribute.
        @param value: The value to check.
        @raises IntegrityError: Raised if C{name} is not a valid attribute
            name or if C{value}'s type is not valid for the attribute.
        """
        xml_id = self._find_representation_id(resource_type, 'get')
        self._check_attribute_representation(xml_id, name, value)

    def _check_attribute_representation(self, xml_id, name, value):
        """
        Ensure that C{value} is a valid value for C{name} with the
        representation definition matching C{xml_id}.

        Only two type checks are made: parameters without a declared type
        must be strings, and C{xsd:dateTime} parameters must be C{datetime}
        instances.  Values for any other declared type are accepted as-is.

        @param xml_id: The XML ID for the representation to check the
            attribute against.
        @param name: The name of the attribute.
        @param value: The value to check.
        @raises IntegrityError: Raised if C{name} is not a valid attribute
            name or if C{value}'s type is not valid for the attribute.
        """
        representation = self._application.representation_definitions[xml_id]
        parameters = dict((child.get("name"), child)
                          for child in representation.tag)
        if name not in parameters:
            raise IntegrityError("%s not found" % name)
        parameter = parameters[name]
        data_type = parameter.get("type")
        if data_type is None:
            if not isinstance(value, basestring):
                raise IntegrityError(
                    "%s is not a str or unicode for %s" % (value, name))
        elif data_type == "xsd:dateTime":
            if not isinstance(value, datetime):
                raise IntegrityError(
                    "%s is not a datetime for %s" % (value, name))

    def _get_method(self, resource_type, name):
        """Get the C{name} method on C{resource_type}.

        @param resource_type: The method's resource type.
        @param name: The name of the method.
        @raises IntegrityError: Raised if a method called C{name} is not
            available on C{resource_type}.
        @return: The XML element for the method from the WADL, or C{None}
            for names listed in C{special_methods}, which are not looked up
            in the WADL.
        """
        if name in self.special_methods:
            return
        resource_name = resource_type.tag.get("id")
        # Method elements are identified as "<resource id>-<method name>".
        xml_id = "%s-%s" % (resource_name, name)
        try:
            [get_method] = find_by_attribute(resource_type.tag, 'id', xml_id)
        except ValueError:
            # Raised by the unpacking when there is not exactly one match.
            raise IntegrityError(
                "%s is not a method of %s" % (name, resource_name))
        return get_method

    def _run_method(self, name, method, *args, **kwargs):
        """Run a method and convert its result into a L{FakeResource}.

        If the result represents an object it is validated against the WADL
        definition before being returned.  Results of methods listed in
        C{special_methods} are returned unchanged.

        @param name: The name of the method.
        @param method: A callable.
        @param args: Arguments to pass to the callable.
        @param kwargs: Keyword arguments to pass to the callable.
        @return: A L{FakeResource} representing the result if it's an object.
        @raises IntegrityError: Raised if the return value from the method
            isn't valid.
        """
        result = method(*args, **kwargs)
        if name in self.special_methods:
            return result
        else:
            return self._create_resource(self._resource_type, name, result)

    def _create_resource(self, resource_type, name, result):
        """Create a new L{FakeResource} for C{resource_type} method call result.

        @param resource_type: The resource type of the method.
        @param name: The name of the method on C{resource_type}.
        @param result: The result of calling the method.
        @raises IntegrityError: Raised if C{result} is an invalid return value
            for the method.
        @return: A L{FakeResource} for C{result}.
        """
        resource_name = resource_type.tag.get("id")
        # A name equal to the resource type's own id means "the resource
        # itself", whose representation is that of its default GET method.
        if resource_name == name:
            name = "get"
        xml_id = self._find_representation_id(resource_type, name)
        # Map the representation id onto a resource type id: drop a "-full"
        # suffix, then fall back to "<id>-resource" if that is not known.
        xml_id = strip_suffix(xml_id, '-full')
        if xml_id not in self._application.resource_types:
            xml_id += '-resource'
        result_resource_type = self._application.resource_types[xml_id]
        self._check_resource_type(result_resource_type, result)
        # XXX: Should this wrap in collection?
        return FakeResource(self._application, result_resource_type, result)

    def _get_child_resource_type(self, resource_type):
        """Get the name and resource type for the entries in a collection.

        @param resource_type: The resource type for a collection.
        @return: (name, resource_type), where 'name' is the name of the child
            resource type and 'resource_type' is the corresponding resource
            type.
        """
        xml_id = self._find_representation_id(resource_type, 'get')
        representation_definition = (
            self._application.representation_definitions[xml_id])

        # Both unpackings require exactly one match and raise ValueError
        # otherwise.
        [entry_links] = find_by_attribute(
            representation_definition.tag, 'name', 'entry_links')
        [link] = list(entry_links)
        resource_type_url = link.get("resource_type")
        # The child type's name is the fragment of its resource type URL.
        resource_type_name = resource_type_url.split("#")[1]
        return (
            resource_type_name,
            self._application.get_resource_type(resource_type_url))

    def _check_entries(self, resource_type, entries):
        """Ensure that C{entries} are valid for a C{resource_type} collection.

        @param resource_type: The resource type of the collection the entries
            are in.
        @param entries: A list of dicts representing objects in the
            collection.
        @return: (name, resource_type), where 'name' is the name of the child
            resource type and 'resource_type' is the corresponding resource
            type.
        @raises IntegrityError: Raised if an entry does not match the child
            resource type.
        """
        name, child_resource_type = self._get_child_resource_type(
            resource_type)
        for entry in entries:
            self._check_resource_type(child_resource_type, entry)
        return name, child_resource_type

    def __repr__(self):
        """
        The resource type, identifier if available, and memory address are
        used to generate a representation of this fake resource.
        """
        name = self._resource_type.tag.get("id")
        # Prefer the "name" value, then "id", then a generic placeholder.
        key = "object"
        key = self._values.get("id", key)
        key = self._values.get("name", key)
        return "<%s %s %s at %s>" % (
            self.__class__.__name__, name, key, hex(id(self)))


class FakeRoot(FakeResource):
    """Fake root object for an application."""

    def __init__(self, application):
        """Create a L{FakeResource} for the service root of C{application}.

        @param application: A C{wadllib.application.Application} instance.
        """
        resource_type = application.get_resource_type(
            application.markup_url + "#service-root")
        super(FakeRoot, self).__init__(application, resource_type)


class FakeEntry(FakeResource):
    """A fake resource for an entry."""


class FakeCollection(FakeResource):
    """A fake resource for a collection."""

    def __init__(self, application, resource_type, values=None,
                 name=None, child_resource_type=None):
        """Construct a FakeCollection.

        @param application: A C{wadllib.application.Application} instance.
        @param resource_type: The resource type of the collection.
        @param values: Optionally, a dict of sample data; its C{"entries"}
            value, if any, supplies the collection's items.
        @param name: The name of the child resource type.
        @param child_resource_type: The resource type of each entry.
        """
        super(FakeCollection, self).__init__(application, resource_type,
                                             values)
        self.__dict__.update({"_name": name,
                              "_child_resource_type": child_resource_type})

    def __iter__(self):
        """Iterate items if this resource has an C{entries} attribute."""
        entries = self._values.get("entries", ())
        for entry in entries:
            yield self._create_resource(self._child_resource_type, self._name,
                                        entry)

    def __getitem__(self, key):
        """Look up a slice, or a subordinate resource by index.

        @param key: An individual object key or a C{slice}.
        @raises ValueError: Raised if a slice has a negative start point, or
            an end point that is missing or negative.
        @raises IndexError: Raised if an invalid key is provided.
        @return: A L{FakeResource} instance for the entry matching C{key}, or
            a list of them for a slice.
        """
        entries = list(self)
        if isinstance(key, slice):
            start = key.start or 0
            stop = key.stop
            if start < 0:
                raise ValueError("Collection slices must have a nonnegative "
                                 "start point.")
            # An open-ended slice has stop=None; comparing None with 0 is a
            # TypeError on Python 3, so test for it explicitly.
            if stop is None or stop < 0:
                raise ValueError("Collection slices must have a definite, "
                                 "nonnegative end point.")
            return entries[key]
        elif isinstance(key, int):
            return entries[key]
        else:
            raise IndexError("Do not support index lookups yet.")