1# -*- coding: utf-8 -*- 2# Copyright (c) 2015 Spotify AB 3 4from __future__ import absolute_import, division, print_function 5import re 6 7import attr 8from six import iteritems, iterkeys, itervalues 9 10 11from .config import MEDIA_TYPES 12from .errors import InvalidRAMLError 13from .parameters import ( 14 Documentation, Header, Body, Response, URIParameter, QueryParameter, 15 FormParameter, SecurityScheme 16) 17from .parser_utils import ( 18 security_schemes 19) 20from .raml import RootNode, ResourceNode, ResourceTypeNode, TraitNode 21from .utils import ( 22 load_schema, _resource_type_lookup, 23 _get_resource_type, _get_trait, _get_attribute, 24 _get_inherited_attribute, _remove_duplicates, _create_uri_params, 25 _get, _create_base_param_obj, _get_res_type_attribute, 26 _get_inherited_type_params, _get_inherited_item, _get_attribute_dict, 27 get_inherited, set_param_object, set_params, _get_data_union, 28 _preserve_uri_order 29) 30 31 32__all__ = ["parse_raml"] 33 34 35def parse_raml(loaded_raml, config): 36 """ 37 Parse loaded RAML file into RAML/Python objects. 38 39 :param RAMLDict loaded_raml: OrderedDict of loaded RAML file 40 :returns: :py:class:`.raml.RootNode` object. 41 :raises: :py:class:`.errors.InvalidRAMLError` when RAML file is invalid 42 """ 43 44 validate = str(_get(config, "validate")).lower() == 'true' 45 46 # Postpone validating the root node until the end; otherwise, 47 # we end up with duplicate validation exceptions. 48 attr.set_run_validators(False) 49 root = create_root(loaded_raml, config) 50 attr.set_run_validators(validate) 51 52 root.security_schemes = create_sec_schemes(root.raml_obj, root) 53 root.traits = create_traits(root.raml_obj, root) 54 root.resource_types = create_resource_types(root.raml_obj, root) 55 root.resources = create_resources(root.raml_obj, [], root, 56 parent=None) 57 58 if validate: 59 attr.validate(root) # need to validate again for root node 60 61 if root.errors: 62 raise InvalidRAMLError(root.errors) 63 64 return root 65 66 67def create_root(raml, config): 68 """ 69 Creates a Root Node based off of the RAML's root section. 70 71 :param RAMLDict raml: loaded RAML file 72 :returns: :py:class:`.raml.RootNode` object with API root attributes set 73 """ 74 75 errors = [] 76 77 def protocols(): 78 explicit_protos = _get(raml, "protocols") 79 implicit_protos = re.findall(r"(https|http)", base_uri()) 80 implicit_protos = [p.upper() for p in implicit_protos] 81 82 return explicit_protos or implicit_protos or None 83 84 def base_uri(): 85 base_uri = _get(raml, "baseUri", "") 86 if "{version}" in base_uri: 87 base_uri = base_uri.replace("{version}", 88 str(_get(raml, "version"))) 89 return base_uri 90 91 def base_uri_params(): 92 data = _get(raml, "baseUriParameters", {}) 93 params = _create_base_param_obj(data, URIParameter, config, errors) 94 uri = _get(raml, "baseUri", "") 95 declared = _get(raml, "uriParameters", {}) 96 declared = list(iterkeys(declared)) 97 return _preserve_uri_order(uri, params, config, errors, declared) 98 99 def uri_params(): 100 data = _get(raml, "uriParameters", {}) 101 params = _create_base_param_obj(data, URIParameter, config, errors) 102 uri = _get(raml, "baseUri", "") 103 104 declared = [] 105 base = base_uri_params() 106 if base: 107 declared = [p.name for p in base] 108 return _preserve_uri_order(uri, params, config, errors, declared) 109 110 def docs(): 111 d = _get(raml, "documentation", []) 112 assert isinstance(d, list), "Error parsing documentation" 113 docs = [Documentation(_get(i, "title"), _get(i, "content")) for i in d] 114 return docs or None 115 116 def schemas(): 117 _schemas = _get(raml, "schemas") 118 if not _schemas: 119 return None 120 schemas = [] 121 for schema in _schemas: 122 value = load_schema(list(itervalues(schema))[0]) 123 schemas.append({list(iterkeys(schema))[0]: value}) 124 return schemas or None 125 126 return RootNode( 127 raml_obj=raml, 128 raw=raml, 129 title=_get(raml, "title"), 130 version=_get(raml, "version"), 131 protocols=protocols(), 132 base_uri=base_uri(), 133 base_uri_params=base_uri_params(), 134 uri_params=uri_params(), 135 media_type=_get(raml, "mediaType"), 136 documentation=docs(), 137 schemas=schemas(), 138 config=config, 139 secured_by=_get(raml, "securedBy"), 140 errors=errors 141 ) 142 143 144def create_sec_schemes(raml_data, root): 145 """ 146 Parse security schemes into ``SecurityScheme`` objects 147 148 :param dict raml_data: Raw RAML data 149 :param RootNode root: Root Node 150 :returns: list of :py:class:`.parameters.SecurityScheme` objects 151 """ 152 def _map_object_types(item): 153 return { 154 "headers": headers, 155 "body": body, 156 "responses": responses, 157 "queryParameters": query_params, 158 "uriParameters": uri_params, 159 "formParameters": form_params, 160 "usage": usage, 161 "mediaType": media_type, 162 "protocols": protocols, 163 "documentation": documentation, 164 }[item] 165 166 def headers(header_data): 167 _headers = [] 168 header_data = _get(header_data, "headers", {}) 169 for k, v in list(iteritems(header_data)): 170 h = _create_base_param_obj({k: v}, 171 Header, 172 root.config, 173 root.errors) 174 _headers.extend(h) 175 return _headers 176 177 def body(body_data): 178 body_data = _get(body_data, "body", {}) 179 _body = [] 180 for k, v in list(iteritems(body_data)): 181 body = Body( 182 mime_type=k, 183 raw=v, 184 schema=load_schema(_get(v, "schema")), 185 example=load_schema(_get(v, "example")), 186 form_params=_get(v, "formParameters"), 187 config=root.config, 188 errors=root.errors 189 ) 190 _body.append(body) 191 return _body 192 193 def responses(resp_data): 194 _resps = [] 195 resp_data = _get(resp_data, "responses", {}) 196 for k, v in list(iteritems(resp_data)): 197 response = Response( 198 code=k, 199 raw=v, 200 desc=_get(v, "description"), 201 headers=headers(_get(v, "headers", {})), 202 body=body(_get(v, "body", {})), 203 config=root.config, 204 errors=root.errors 205 ) 206 _resps.append(response) 207 return sorted(_resps, key=lambda x: x.code) 208 209 def query_params(param_data): 210 param_data = _get(param_data, "queryParameters", {}) 211 _params = [] 212 for k, v in list(iteritems(param_data)): 213 p = _create_base_param_obj({k: v}, 214 QueryParameter, 215 root.config, 216 root.errors) 217 _params.extend(p) 218 return _params 219 220 def uri_params(param_data): 221 param_data = _get(param_data, "uriParameters") 222 _params = [] 223 for k, v in list(iteritems(param_data)): 224 p = _create_base_param_obj({k: v}, 225 URIParameter, 226 root.config, 227 root.errors) 228 _params.extend(p) 229 return _params 230 231 def form_params(param_data): 232 param_data = _get(param_data, "formParameters", {}) 233 _params = [] 234 for k, v in list(iteritems(param_data)): 235 p = _create_base_param_obj({k: v}, 236 FormParameter, 237 root.config, 238 root.errors) 239 _params.extend(p) 240 return _params 241 242 def usage(desc_by_data): 243 return _get(desc_by_data, "usage") 244 245 def media_type(desc_by_data): 246 return _get(desc_by_data, "mediaType") 247 248 def protocols(desc_by_data): 249 return _get(desc_by_data, "protocols") 250 251 def documentation(desc_by_data): 252 d = _get(desc_by_data, "documentation", []) 253 assert isinstance(d, list), "Error parsing documentation" 254 docs = [Documentation(_get(i, "title"), _get(i, "content")) for i in d] 255 return docs or None 256 257 def set_property(node, obj, node_data): 258 func = _map_object_types(obj) 259 item_objs = func({obj: node_data}) 260 setattr(node, func.__name__, item_objs) 261 262 def initial_wrap(key, data): 263 return SecurityScheme( 264 name=key, 265 raw=data, 266 type=_get(data, "type"), 267 described_by=_get(data, "describedBy", {}), 268 desc=_get(data, "description"), 269 settings=_get(data, "settings"), 270 config=root.config, 271 errors=root.errors 272 ) 273 274 def final_wrap(node): 275 for obj, node_data in list(iteritems(node.described_by)): 276 set_property(node, obj, node_data) 277 return node 278 279 schemes = _get(raml_data, "securitySchemes", []) 280 scheme_objs = [] 281 for s in schemes: 282 name = list(iterkeys(s))[0] 283 data = list(itervalues(s))[0] 284 node = initial_wrap(name, data) 285 node = final_wrap(node) 286 scheme_objs.append(node) 287 return scheme_objs or None 288 289 290def create_traits(raml_data, root): 291 """ 292 Parse traits into ``Trait`` objects. 293 294 :param dict raml_data: Raw RAML data 295 :param RootNode root: Root Node 296 :returns: list of :py:class:`.raml.TraitNode` objects 297 """ 298 def description(): 299 return _get(data, "description") 300 301 def protocols(): 302 return _get(data, "protocols") 303 304 def query_params(): 305 return set_param_object(data, "queryParameters", root) 306 307 def uri_params(): 308 return set_param_object(data, "uriParameters", root) 309 310 def form_params(): 311 return set_param_object(data, "formParameters", root) 312 313 def base_uri_params(): 314 return set_param_object(data, "baseUriParameters", root) 315 316 def headers(data): 317 return set_param_object(data, "headers", root) 318 319 def body(data): 320 body = _get(data, "body", {}) 321 body_objects = [] 322 for key, value in list(iteritems(body)): 323 body = Body( 324 mime_type=key, 325 raw=value, 326 schema=load_schema(_get(value, "schema")), 327 example=load_schema(_get(value, "example")), 328 form_params=_get(value, "formParameters"), 329 config=root.config, 330 errors=root.errors 331 ) 332 body_objects.append(body) 333 return body_objects or None 334 335 def responses(): 336 response_objects = [] 337 for key, value in list(iteritems(_get(data, "responses", {}))): 338 response = Response( 339 code=key, 340 raw=value, 341 desc=_get(value, "description"), 342 headers=headers(value), 343 body=body(value), 344 config=root.config, 345 errors=root.errors 346 ) 347 response_objects.append(response) 348 return sorted(response_objects, key=lambda x: x.code) or None 349 350 def wrap(key, data): 351 return TraitNode( 352 name=key, 353 raw=data, 354 root=root, 355 query_params=query_params(), 356 uri_params=uri_params(), 357 form_params=form_params(), 358 base_uri_params=base_uri_params(), 359 headers=headers(data), 360 body=body(data), 361 responses=responses(), 362 desc=description(), 363 media_type=_get(data, "mediaType"), 364 usage=_get(data, "usage"), 365 protocols=protocols(), 366 errors=root.errors 367 ) 368 369 traits = _get(raml_data, "traits", []) 370 trait_objects = [] 371 for trait in traits: 372 name = list(iterkeys(trait))[0] 373 data = list(itervalues(trait))[0] 374 trait_objects.append(wrap(name, data)) 375 return trait_objects or None 376 377 378def create_resource_types(raml_data, root): 379 """ 380 Parse resourceTypes into ``ResourceTypeNode`` objects. 381 382 :param dict raml_data: Raw RAML data 383 :param RootNode root: Root Node 384 :returns: list of :py:class:`.raml.ResourceTypeNode` objects 385 """ 386 # TODO: move this outside somewhere - config? 387 accepted_methods = _get(root.config, "http_optional") 388 389 ##### 390 # Set ResourceTypeNode attributes 391 ##### 392 393 def headers(data): 394 _headers = _get(data, "headers", {}) 395 if _get(v, "type"): 396 _headers = _get_inherited_item(_headers, "headers", 397 resource_types, 398 meth, v) 399 400 header_objs = _create_base_param_obj(_headers, 401 Header, 402 root.config, 403 root.errors) 404 if header_objs: 405 for h in header_objs: 406 h.method = method(meth) 407 408 return header_objs 409 410 def body(data): 411 _body = _get(data, "body", default={}) 412 if _get(v, "type"): 413 _body = _get_inherited_item(_body, "body", resource_types, 414 meth, v) 415 416 body_objects = [] 417 for key, value in list(iteritems(_body)): 418 body = Body( 419 mime_type=key, 420 raw=value, 421 schema=load_schema(_get(value, "schema")), 422 example=load_schema(_get(value, "example")), 423 form_params=_get(value, "formParameters"), 424 config=root.config, 425 errors=root.errors 426 ) 427 body_objects.append(body) 428 return body_objects or None 429 430 def responses(data): 431 response_objects = [] 432 _responses = _get(data, "responses", {}) 433 if _get(v, "type"): 434 _responses = _get_inherited_item(_responses, "responses", 435 resource_types, meth, v) 436 437 for key, value in list(iteritems(_responses)): 438 _headers = _get(_get(data, "responses", {}), key, {}) 439 _headers = _get(_headers, "headers", {}) 440 header_objs = _create_base_param_obj(_headers, Header, 441 root.config, root.errors) 442 if header_objs: 443 for h in header_objs: 444 h.method = method(meth) 445 response = Response( 446 code=key, 447 raw={key: value}, 448 desc=_get(value, "description"), 449 headers=header_objs, 450 body=body(value), 451 config=root.config, 452 method=method(meth), 453 errors=root.errors 454 ) 455 response_objects.append(response) 456 if response_objects: 457 return sorted(response_objects, key=lambda x: x.code) 458 return None 459 460 def uri_params(data): 461 uri_params = _get_attribute_dict(data, "uriParameters", v) 462 463 if _get(v, "type"): 464 uri_params = _get_inherited_type_params(v, "uriParameters", 465 uri_params, resource_types) 466 return _create_base_param_obj(uri_params, 467 URIParameter, 468 root.config, 469 root.errors) 470 471 def base_uri_params(data): 472 uri_params = _get_attribute_dict(data, "baseUriParameters", v) 473 474 return _create_base_param_obj(uri_params, 475 URIParameter, 476 root.config, 477 root.errors) 478 479 def query_params(data): 480 query_params = _get_attribute_dict(data, "queryParameters", v) 481 482 if _get(v, "type"): 483 query_params = _get_inherited_type_params(v, "queryParameters", 484 query_params, 485 resource_types) 486 487 return _create_base_param_obj(query_params, 488 QueryParameter, 489 root.config, 490 root.errors) 491 492 def form_params(data): 493 form_params = _get_attribute_dict(data, "formParameters", v) 494 495 if _get(v, "type"): 496 form_params = _get_inherited_type_params(v, "formParameters", 497 form_params, 498 resource_types) 499 500 return _create_base_param_obj(form_params, 501 FormParameter, 502 root.config, 503 root.errors) 504 505 def description(): 506 # prefer the resourceType method description 507 if meth: 508 method_attr = _get(v, meth) 509 desc = _get(method_attr, "description") 510 return desc or _get(v, "description") 511 return _get(v, "description") 512 513 def type_(): 514 return _get(v, "type") 515 516 def method(meth): 517 if not meth: 518 return None 519 if "?" in meth: 520 return meth[:-1] 521 return meth 522 523 def optional(): 524 if meth: 525 return "?" in meth 526 527 def protocols(data): 528 m, r = _get_res_type_attribute(v, data, "protocols", None) 529 return m or r or root.protocols 530 531 def is_(data): 532 m, r = _get_res_type_attribute(v, data, "is", default=[]) 533 return m + r or None 534 535 def traits(data): 536 assigned = is_(data) 537 if assigned: 538 if root.traits: 539 trait_objs = [] 540 for trait in assigned: 541 obj = [t for t in root.traits if t.name == trait] 542 if obj: 543 trait_objs.append(obj[0]) 544 return trait_objs or None 545 546 def secured_by(data): 547 m, r = _get_res_type_attribute(v, data, "securedBy", []) 548 return m + r or None 549 550 def security_schemes_(data): 551 secured = secured_by(data) 552 return security_schemes(secured, root) 553 554 def wrap(key, data, meth, _v): 555 return ResourceTypeNode( 556 name=key, 557 raw=data, 558 root=root, 559 headers=headers(data), 560 body=body(data), 561 responses=responses(data), 562 uri_params=uri_params(data), 563 base_uri_params=base_uri_params(data), 564 query_params=query_params(data), 565 form_params=form_params(data), 566 media_type=_get(v, "mediaType"), 567 desc=description(), 568 type=type_(), 569 method=method(meth), 570 usage=_get(v, "usage"), 571 optional=optional(), 572 is_=is_(data), 573 traits=traits(data), 574 secured_by=secured_by(data), 575 security_schemes=security_schemes_(data), 576 display_name=_get(data, "displayName", key), 577 protocols=protocols(data), 578 errors=root.errors 579 ) 580 581 resource_types = _get(raml_data, "resourceTypes", []) 582 resource_type_objects = [] 583 child_res_type_objects = [] 584 child_res_type_names = [] 585 586 for res in resource_types: 587 for k, v in list(iteritems(res)): 588 if isinstance(v, dict): 589 if "type" in list(iterkeys(v)): 590 child_res_type_objects.append({k: v}) 591 child_res_type_names.append(k) 592 593 else: 594 for meth in list(iterkeys(v)): 595 if meth in accepted_methods: 596 method_data = _get(v, meth, {}) 597 resource = wrap(k, method_data, meth, v) 598 resource_type_objects.append(resource) 599 else: 600 meth = None 601 resource = wrap(k, {}, meth, v) 602 resource_type_objects.append(resource) 603 604 while child_res_type_objects: 605 child = child_res_type_objects.pop() 606 name = list(iterkeys(child))[0] 607 data = list(itervalues(child))[0] 608 parent = data.get("type") 609 if parent in child_res_type_names: 610 continue 611 p_data = [r for r in resource_types if list(iterkeys(r))[0] == parent] 612 p_data = p_data[0].get(parent) 613 res_data = _get_data_union(data, p_data) 614 615 for meth in list(iterkeys(res_data)): 616 if meth in accepted_methods: 617 method_data = _get(res_data, meth, {}) 618 comb_data = dict(list(iteritems(method_data)) + 619 list(iteritems(res_data))) 620 resource = ResourceTypeNode( 621 name=name, 622 raw=res_data, 623 root=root, 624 headers=headers(method_data), 625 body=body(method_data), 626 responses=responses(method_data), 627 uri_params=uri_params(comb_data), 628 base_uri_params=base_uri_params(comb_data), 629 query_params=query_params(method_data), 630 form_params=form_params(method_data), 631 media_type=_get(v, "mediaType"), 632 desc=description(), 633 type=_get(res_data, "type"), 634 method=method(meth), 635 usage=_get(res_data, "usage"), 636 optional=optional(), 637 is_=is_(res_data), 638 traits=traits(res_data), 639 secured_by=secured_by(res_data), 640 security_schemes=security_schemes_(res_data), 641 display_name=_get(method_data, "displayName", name), 642 protocols=protocols(res_data), 643 errors=root.errors 644 ) 645 resource_type_objects.append(resource) 646 647 return resource_type_objects or None 648 649 650def create_resources(node, resources, root, parent): 651 """ 652 Recursively traverses the RAML file via DFS to find each resource 653 endpoint. 654 655 :param dict node: Dictionary of node to traverse 656 :param list resources: List of collected ``ResourceNode`` s 657 :param RootNode root: The ``RootNode`` of the API 658 :param ResourceNode parent: Parent ``ResourceNode`` of current ``node`` 659 :returns: List of :py:class:`.raml.ResourceNode` objects. 660 """ 661 for k, v in list(iteritems(node)): 662 if k.startswith("/"): 663 avail = _get(root.config, "http_optional") 664 methods = [m for m in avail if m in list(iterkeys(v))] 665 if "type" in list(iterkeys(v)): 666 assigned = _resource_type_lookup(_get(v, "type"), root) 667 if hasattr(assigned, "method"): 668 if not assigned.optional: 669 methods.append(assigned.method) 670 methods = list(set(methods)) 671 if methods: 672 for m in methods: 673 child = create_node(name=k, 674 raw_data=v, 675 method=m, 676 parent=parent, 677 root=root) 678 resources.append(child) 679 # inherit resource type methods 680 elif "type" in list(iterkeys(v)): 681 if hasattr(assigned, "method"): 682 method = assigned.method 683 else: 684 method = None 685 child = create_node(name=k, 686 raw_data=v, 687 method=method, 688 parent=parent, 689 root=root) 690 resources.append(child) 691 else: 692 child = create_node(name=k, 693 raw_data=v, 694 method=None, 695 parent=parent, 696 root=root) 697 resources.append(child) 698 resources = create_resources(child.raw, resources, root, child) 699 return resources 700 701 702def create_node(name, raw_data, method, parent, root): 703 """ 704 Create a Resource Node object. 705 706 :param str name: Name of resource node 707 :param dict raw_data: Raw RAML data associated with resource node 708 :param str method: HTTP method associated with resource node 709 :param ResourceNode parent: Parent node object of resource node, if any 710 :param RootNode api: API ``RootNode`` that the resource node is attached to 711 :returns: :py:class:`.raml.ResourceNode` object 712 """ 713 ##### 714 # Node attribute functions 715 ##### 716 def path(): 717 """Set resource's relative URI path.""" 718 parent_path = "" 719 if parent: 720 parent_path = parent.path 721 return parent_path + name 722 723 def absolute_uri(): 724 """Set resource's absolute URI path.""" 725 uri = root.base_uri + path() 726 proto = protocols() 727 if proto: 728 uri = uri.split("://") 729 if len(uri) == 2: 730 uri = uri[1] 731 if root.protocols: 732 _proto = list(set(root.protocols) & set(proto)) 733 # if resource protocols and root protocols share a protocol 734 # then use that one 735 if _proto: 736 uri = _proto[0].lower() + "://" + uri 737 # if no shared protocols, use the first of the resource 738 # protocols 739 else: 740 uri = proto[0].lower() + "://" + uri 741 return uri 742 743 def protocols(): 744 """Set resource's supported protocols.""" 745 # trait = _get_trait("protocols", root, is_()) 746 kwargs = dict(root=root, 747 is_=is_(), 748 type_=type_(), 749 method=method, 750 data=raw_data, 751 parent=parent) 752 objects_to_inherit = [ 753 "traits", "types", "method", "resource", "parent" 754 ] 755 inherited = get_inherited("protocols", objects_to_inherit, **kwargs) 756 trait = inherited["traits"] 757 r_type = inherited["types"] 758 meth = inherited["method"] 759 res = inherited["resource"] 760 parent_ = inherited["parent"] 761 default = [root.base_uri.split("://")[0].upper()] 762 763 return meth or r_type or trait or res or parent_ or default 764 765 def headers(): 766 """Set resource's supported headers.""" 767 headers = _get_attribute("headers", method, raw_data) 768 header_objs = _get_inherited_attribute("headers", root, type_(), 769 method, is_()) 770 771 _headers = _create_base_param_obj(headers, 772 Header, 773 root.config, 774 root.errors, 775 method=method) 776 if _headers is None: 777 return header_objs or None 778 return _remove_duplicates(header_objs, _headers) 779 780 def body(): 781 """Set resource's supported request/response body.""" 782 bodies = _get_attribute("body", method, raw_data) 783 body_objects = _get_inherited_attribute("body", root, type_(), 784 method, is_()) 785 786 _body_objs = [] 787 for k, v in list(iteritems(bodies)): 788 if v is None: 789 continue 790 body = Body( 791 mime_type=k, 792 raw={k: v}, 793 schema=load_schema(_get(v, "schema")), 794 example=load_schema(_get(v, "example")), 795 form_params=_get(v, "formParameters"), 796 config=root.config, 797 errors=root.errors 798 ) 799 _body_objs.append(body) 800 if _body_objs == []: 801 return body_objects or None 802 return _remove_duplicates(body_objects, _body_objs) 803 804 def responses(): 805 """Set resource's expected responses.""" 806 def resp_headers(headers): 807 """Set response headers.""" 808 header_objs = [] 809 for k, v in list(iteritems(headers)): 810 header = Header( 811 name=k, 812 display_name=_get(v, "displayName", default=k), 813 method=method, 814 raw=headers, 815 type=_get(v, "type", default="string"), 816 desc=_get(v, "description"), 817 example=_get(v, "example"), 818 default=_get(v, "default"), 819 minimum=_get(v, "minimum"), 820 maximum=_get(v, "maximum"), 821 min_length=_get(v, "minLength"), 822 max_length=_get(v, "maxLength"), 823 enum=_get(v, "enum"), 824 repeat=_get(v, "repeat", default=False), 825 pattern=_get(v, "pattern"), 826 config=root.config, 827 errors=root.errors 828 ) 829 header_objs.append(header) 830 return header_objs or None 831 832 def resp_body(body): 833 """Set response body.""" 834 body_list = [] 835 default_body = {} 836 for (key, spec) in body.items(): 837 if key not in MEDIA_TYPES: 838 # if a root mediaType was defined, the response body 839 # may omit the mime_type definition 840 if key in ('schema', 'example'): 841 default_body[key] = load_schema(spec) if spec else {} 842 else: 843 mime_type = key 844 # spec might be '!!null' 845 raw = spec or body 846 _schema = {} 847 _example = {} 848 if spec: 849 _schema_spec = _get(spec, 'schema', '') 850 _example_spec = _get(spec, 'example', '') 851 if _schema_spec: 852 _schema = load_schema(_schema_spec) 853 if _example_spec: 854 _example = load_schema(_example_spec) 855 body_list.append(Body( 856 mime_type=mime_type, 857 raw=raw, 858 schema=_schema, 859 example=_example, 860 form_params=None, 861 config=root.config, 862 errors=root.errors 863 )) 864 if default_body: 865 body_list.append(Body( 866 mime_type=root.media_type, 867 raw=body, 868 schema=_get(default_body, 'schema'), 869 example=_get(default_body, 'example'), 870 form_params=None, 871 config=root.config, 872 errors=root.errors 873 )) 874 875 return body_list or None 876 877 resps = _get_attribute("responses", method, raw_data) 878 type_resp = _get_resource_type("responses", root, type_(), method) 879 trait_resp = _get_trait("responses", root, is_()) 880 resp_objs = type_resp + trait_resp 881 resp_codes = [r.code for r in resp_objs] 882 for k, v in list(iteritems(resps)): 883 if k in resp_codes: 884 resp = [r for r in resp_objs if r.code == k][0] 885 index = resp_objs.index(resp) 886 inherit_resp = resp_objs.pop(index) 887 headers = resp_headers(_get(v, "headers", default={})) 888 if inherit_resp.headers: 889 headers = _remove_duplicates(inherit_resp.headers, headers) 890 # if headers: 891 # headers.extend(inherit_resp.headers) 892 # else: 893 # headers = inherit_resp.headers 894 body = resp_body(_get(v, "body", {})) 895 if inherit_resp.body: 896 body = _remove_duplicates(inherit_resp.body, body) 897 # if body: 898 # body.extend(inherit_resp.body) 899 # else: 900 # body = inherit_resp.body 901 resp = Response( 902 code=k, 903 raw={k: v}, # should prob get data union 904 method=method, 905 desc=_get(v, "description") or inherit_resp.desc, 906 headers=headers, 907 body=body, 908 config=root.config, 909 errors=root.errors 910 ) 911 resp_objs.insert(index, resp) # preserve order 912 else: 913 _headers = _get(v, "headers", default={}) 914 _body = _get(v, "body", default={}) 915 resp = Response( 916 code=k, 917 raw={k: v}, 918 method=method, 919 desc=_get(v, "description"), 920 headers=resp_headers(_headers), 921 body=resp_body(_body), 922 config=root.config, 923 errors=root.errors 924 ) 925 resp_objs.append(resp) 926 927 return resp_objs or None 928 929 def uri_params(): 930 """Set resource's URI parameters.""" 931 unparsed_attr = "uriParameters" 932 parsed_attr = "uri_params" 933 root_params = root.uri_params 934 params = _create_uri_params(unparsed_attr, parsed_attr, root_params, 935 root, type_(), is_(), method, raw_data, 936 parent) 937 declared = [] 938 base = base_uri_params() 939 if base: 940 declared = [p.name for p in base] 941 942 return _preserve_uri_order(absolute_uri(), params, root.config, 943 root.errors, declared) 944 945 def base_uri_params(): 946 """Set resource's base URI parameters.""" 947 root_params = root.base_uri_params 948 kw = dict(type=type_(), is_=is_(), root_params=root_params) 949 params = set_params(raw_data, "base_uri_params", root, method, 950 inherit=True, **kw) 951 declared = [] 952 uri = root.uri_params 953 base = root.base_uri_params 954 if uri: 955 declared = [p.name for p in uri] 956 if base: 957 declared.extend([p.name for p in base]) 958 return _preserve_uri_order(root.base_uri, params, root.config, 959 root.errors, declared) 960 961 def query_params(): 962 kw = dict(type_=type_(), is_=is_()) 963 return set_params(raw_data, "query_params", root, method, 964 inherit=True, **kw) 965 966 def form_params(): 967 """Set resource's form parameters.""" 968 kw = dict(type_=type_(), is_=is_()) 969 return set_params(raw_data, "form_params", root, method, 970 inherit=True, **kw) 971 972 def media_type_(): 973 """Set resource's supported media types.""" 974 if method is None: 975 return None 976 kwargs = dict(root=root, 977 is_=is_(), 978 type_=type_(), 979 method=method, 980 data=raw_data) 981 objects_to_inherit = [ 982 "method", "traits", "types", "resource", "root" 983 ] 984 inherited = get_inherited("mediaType", objects_to_inherit, **kwargs) 985 meth = inherited.get("method") 986 trait = inherited.get("trait") 987 r_type = inherited.get("types") 988 res = inherited.get("resource") 989 root_ = inherited.get("root") 990 return meth or trait or r_type or res or root_ 991 992 def description(): 993 """Set resource's description.""" 994 desc = _get(raw_data, "description") 995 try: 996 desc = _get(_get(raw_data, method), "description") 997 if desc is None: 998 raise AttributeError 999 except AttributeError: 1000 if type_(): 1001 assigned = _resource_type_lookup(type_(), root) 1002 try: 1003 if assigned.method == method: 1004 desc = assigned.description.raw 1005 except AttributeError: 1006 pass 1007 else: 1008 desc = _get(raw_data, "description") 1009 return desc 1010 1011 def is_(): 1012 """Set resource's assigned trait names.""" 1013 is_list = [] 1014 res_level = _get(raw_data, "is") 1015 if res_level: 1016 assert isinstance(res_level, list), "Error parsing trait" 1017 is_list.extend(res_level) 1018 method_level = _get(raw_data, method, {}) 1019 if method_level: 1020 method_level = _get(method_level, "is") 1021 if method_level: 1022 assert isinstance(method_level, list), "Error parsing trait" 1023 is_list.extend(method_level) 1024 return is_list or None 1025 1026 def traits(): 1027 """Set resource's assigned trait objects.""" 1028 assigned = is_() 1029 if assigned: 1030 if root.traits: 1031 trait_objs = [] 1032 for trait in assigned: 1033 obj = [t for t in root.traits if t.name == trait] 1034 if obj: 1035 trait_objs.append(obj[0]) 1036 return trait_objs or None 1037 1038 # TODO: wow this function sucks. 1039 def type_(): 1040 """Set resource's assigned resource type name.""" 1041 __get_method = _get(raw_data, method, {}) 1042 assigned_type = _get(__get_method, "type") 1043 if assigned_type: 1044 if not isinstance(assigned_type, dict): 1045 return assigned_type 1046 return list(iterkeys(assigned_type))[0] # NOCOV 1047 1048 assigned_type = _get(raw_data, "type") 1049 if isinstance(assigned_type, dict): 1050 return list(iterkeys(assigned_type))[0] # NOCOV 1051 return assigned_type 1052 1053 def resource_type(): 1054 """Set resource's assigned resource type objects.""" 1055 if type_() and root.resource_types: 1056 assigned_name = type_() 1057 res_types = root.resource_types 1058 type_obj = [r for r in res_types if r.name == assigned_name] 1059 if type_obj: 1060 return type_obj[0] 1061 1062 def secured_by(): 1063 """ 1064 Set resource's assigned security scheme names and related paramters. 1065 """ 1066 if method is not None: 1067 method_level = _get(raw_data, method, {}) 1068 if method_level: 1069 secured_by = _get(method_level, "securedBy") 1070 if secured_by: 1071 return secured_by 1072 resource_level = _get(raw_data, "securedBy") 1073 if resource_level: 1074 return resource_level 1075 root_level = root.secured_by 1076 if root_level: 1077 return root_level 1078 1079 def security_schemes_(): 1080 """Set resource's assigned security scheme objects.""" 1081 secured = secured_by() 1082 return security_schemes(secured, root) 1083 1084 node = ResourceNode( 1085 name=name, 1086 raw=raw_data, 1087 method=method, 1088 parent=parent, 1089 root=root, 1090 display_name=_get(raw_data, "displayName", name), 1091 path=path(), 1092 absolute_uri=absolute_uri(), 1093 protocols=protocols(), 1094 headers=headers(), 1095 body=body(), 1096 responses=responses(), 1097 uri_params=uri_params(), 1098 base_uri_params=base_uri_params(), 1099 query_params=query_params(), 1100 form_params=form_params(), 1101 media_type=media_type_(), 1102 desc=description(), 1103 is_=is_(), 1104 traits=traits(), 1105 type=type_(), 1106 resource_type=resource_type(), 1107 secured_by=secured_by(), 1108 security_schemes=security_schemes_(), 1109 errors=root.errors 1110 ) 1111 if resource_type(): 1112 # correct inheritance (issue #23) 1113 node._inherit_type() 1114 return node 1115