# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains the "roll-up" class, :class:`~.API`.
Everything else in the :mod:`~.schema` module is usually accessed
through an :class:`~.API` object.
"""

import collections
import dataclasses
import itertools
import keyword
import os
import sys
from typing import Callable, Container, Dict, FrozenSet, Mapping, Optional, Sequence, Set, Tuple
from types import MappingProxyType

from google.api_core import exceptions  # type: ignore
from google.api import resource_pb2  # type: ignore
from google.longrunning import operations_pb2  # type: ignore
from google.protobuf import descriptor_pb2

import grpc  # type: ignore

from gapic.schema import metadata
from gapic.schema import wrappers
from gapic.schema import naming as api_naming
from gapic.utils import cached_property
from gapic.utils import nth
from gapic.utils import Options
from gapic.utils import to_snake_case
from gapic.utils import RESERVED_NAMES


@dataclasses.dataclass(frozen=True)
class Proto:
    """A wrapper around a single proto file that belongs to an API.

    Attribute lookups that are not satisfied by this wrapper are forwarded
    to the underlying ``FileDescriptorProto`` (see :meth:`__getattr__`).
    """

    # The raw descriptor this object wraps.
    file_pb2: descriptor_pb2.FileDescriptorProto
    # Wrapped services, messages (nested ones included) and enums
    # (nested ones included) declared by the file.
    services: Mapping[str, wrappers.Service]
    all_messages: Mapping[str, wrappers.MessageType]
    all_enums: Mapping[str, wrappers.EnumType]
    # True for files generated directly, False for dependencies.
    file_to_generate: bool
    meta: metadata.Metadata = dataclasses.field(
        default_factory=metadata.Metadata,
    )

    def __getattr__(self, name: str):
        # Only reached when normal lookup fails; defer to the descriptor.
        return getattr(self.file_pb2, name)

    @classmethod
    def build(
        cls,
        file_descriptor: descriptor_pb2.FileDescriptorProto,
        file_to_generate: bool,
        naming: api_naming.Naming,
        opts: Options = Options(),
        prior_protos: Mapping[str, 'Proto'] = None,
        load_services: bool = True,
        all_resources: Optional[Mapping[str, wrappers.MessageType]] = None,
    ) -> 'Proto':
        """Construct a Proto from a file descriptor.

        Args:
            file_descriptor (~.FileDescriptorProto): The protocol buffer
                object describing the proto file.
            file_to_generate (bool): Whether this is a file which is
                to be directly generated, or a dependency.
            naming (~.Naming): The :class:`~.Naming` instance associated
                with the API.
            opts (~.Options): Options handed through to the builder.
            prior_protos (~.Proto): Previous, already processed protos.
                These are needed to look up messages in imported protos.
            load_services (bool): Toggle whether the proto file should
                load its services. Not doing so enables a two-pass fix for
                LRO response and metadata types in certain situations.
            all_resources (Mapping[str, ~.MessageType]): Resource messages
                handed through to the builder; an empty mapping is used
                when omitted.

        Returns:
            ~.Proto: The proto assembled by the builder.
        """
        # All of the assembly work lives in the private builder class.
        builder = _ProtoBuilder(
            file_descriptor,
            file_to_generate=file_to_generate,
            naming=naming,
            opts=opts,
            prior_protos=prior_protos or {},
            load_services=load_services,
            all_resources=all_resources or {},
        )
        return builder.proto

    @cached_property
    def enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return the enums declared at the top level of the file."""
        top_level = collections.OrderedDict()
        for key, enum in self.all_enums.items():
            # An enum whose address has a parent is nested; skip it.
            if enum.meta.address.parent:
                continue
            top_level[key] = enum
        return top_level

    @cached_property
    def messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return the messages declared at the top level of the file."""
        top_level = collections.OrderedDict()
        for key, message in self.all_messages.items():
            # A message whose address has a parent is nested; skip it.
            if message.meta.address.parent:
                continue
            top_level[key] = message
        return top_level

    @cached_property
    def resource_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return the resources visible at the file level, keyed by type.

        File-level resource definitions come first, followed by top-level
        messages that carry a resource annotation with a non-empty type.
        """
        answer = collections.OrderedDict()

        # Resources declared through the file-level option.
        file_options = self.file_pb2.options
        for res in file_options.Extensions[resource_pb2.resource_definition]:
            answer[res.type] = wrappers.CommonResource.build(res).message_type

        # Top-level messages annotated as resources.
        for msg in self.messages.values():
            resource_type = msg.options.Extensions[resource_pb2.resource].type
            if resource_type:
                answer[resource_type] = msg

        return answer

    @property
    def module_name(self) -> str:
        """Return the module name for this proto.

        Returns:
            str: The file's base name, without the ``.proto`` suffix,
                converted to snake case.
        """
        base_name = self.name.rpartition('/')[2]
        stem = base_name[:-len('.proto')]
        return to_snake_case(stem)

    @cached_property
    def names(self) -> FrozenSet[str]:
        """Return the set of names used by this proto.

        This is used for detecting naming collisions in the module names
        used for imports.
        """
        collisions: Set[str] = set()

        # Every enum, message, and field name is taken.
        for enum in self.all_enums.values():
            collisions.add(enum.name)
        for message in self.all_messages.values():
            collisions.add(message.name)
            collisions.update(
                field.name for field in message.fields.values()
            )

        # Group the packages seen for each imported module name.
        packages_by_module: Dict[str, Set[str]] = collections.defaultdict(set)
        for message in self.all_messages.values():
            for field_type in message.recursive_field_types:
                ident = field_type.ident
                packages_by_module[ident.module].add(ident.package)

        # A module name collides when distinct packages share it, or when
        # it is a reserved name.
        for module_name, packages in packages_by_module.items():
            if module_name in RESERVED_NAMES or len(packages) > 1:
                collisions.add(module_name)

        return frozenset(collisions)

    @cached_property
    def python_modules(self) -> Sequence[Tuple[str, str]]:
        """Return the Python modules that this proto needs to import.

        The result is sorted and free of duplicates.

        Returns:
            Sequence[Tuple[str, str]]: The package and module pair, intended
                for use in a ``from package import module`` type
                of statement.
        """
        own_import = self.meta.address.python_import

        imports = set()
        for message in self.all_messages.values():
            for field_type in message.field_types:
                candidate = field_type.ident.python_import
                # A module must never be asked to import itself.
                if candidate != own_import:
                    imports.add(candidate)

        return tuple(sorted(imports))

    def disambiguate(self, string: str) -> str:
        """Return a version of ``string`` that is unused in this proto.

        Usually the string is returned unchanged; underscores are prepended
        for as long as the candidate appears in :attr:`names`.
        """
        candidate = string
        while candidate in self.names:
            candidate = f'_{candidate}'
        return candidate
@dataclasses.dataclass(frozen=True)
class API:
    """A representation of a full API.

    This represents a top-down view of a complete API, as loaded from a
    set of protocol buffer files. Once the descriptors are loaded
    (see :meth:`build`), this object contains every message, method, service,
    and everything else needed to write a client library.

    An instance of this object is made available to every template
    (as ``api``).
    """
    naming: api_naming.Naming
    # Every proto known to the API, dependencies included.
    all_protos: Mapping[str, Proto]
    # Subpackage prefix that restricts what :attr:`protos` exposes; the
    # empty tuple means "no restriction".
    subpackage_view: Tuple[str, ...] = dataclasses.field(default_factory=tuple)

    @classmethod
    def build(
        cls,
        file_descriptors: Sequence[descriptor_pb2.FileDescriptorProto],
        package: str = '',
        opts: Options = Options(),
        prior_protos: Optional[Mapping[str, 'Proto']] = None,
    ) -> 'API':
        """Build the internal API schema based on the request.

        Note that this assigns to ``fd.name`` on each of the given file
        descriptors (the value only differs when the file name had to be
        disambiguated).

        Args:
            file_descriptors (Sequence[~.FileDescriptorProto]): A list of
                :class:`~.FileDescriptorProto` objects describing the
                API.
            package (str): A protocol buffer package, as a string, for which
                code should be explicitly generated (including subpackages).
                Protos with packages outside this list are considered imports
                rather than explicit targets.
            opts (~.options.Options): CLI options passed to the generator.
            prior_protos (~.Proto): Previous, already processed protos.
                These are needed to look up messages in imported protos.
                Primarily used for testing.

        Returns:
            ~.API: The API holding one :class:`~.Proto` per processed file
                (plus any ``prior_protos``).
        """
        # Save information about the overall naming for this API.
        naming = api_naming.Naming.build(*filter(
            lambda fd: fd.package.startswith(package),
            file_descriptors,
        ), opts=opts)

        def disambiguate_keyword_fname(
                full_path: str,
                visited_names: Container[str]) -> str:
            # Append "_" to the file stem when it is a Python keyword or
            # when the path was already seen; repeat while the new path is
            # itself already taken.
            path, fname = os.path.split(full_path)
            name, ext = os.path.splitext(fname)
            if name in keyword.kwlist or full_path in visited_names:
                name += "_"
                full_path = os.path.join(path, name + ext)
                if full_path in visited_names:
                    return disambiguate_keyword_fname(full_path, visited_names)

            return full_path

        # Iterate over each FileDescriptorProto and fill out a Proto
        # object describing it, and save these to the instance.
        #
        # The first pass gathers messages and enums but NOT services or methods.
        # This is a workaround for a limitation in protobuf annotations for
        # long running operations: the annotations are strings that reference
        # message types but do not require a proto import.
        # This hack attempts to address a common case where API authors,
        # not wishing to generate an 'unused import' warning,
        # don't import the proto file defining the real response or metadata
        # type into the proto file that defines an LRO.
        # We just load all the APIs types first and then
        # load the services and methods with the full scope of types.
        pre_protos: Dict[str, Proto] = dict(prior_protos or {})
        for fd in file_descriptors:
            fd.name = disambiguate_keyword_fname(fd.name, pre_protos)
            pre_protos[fd.name] = Proto.build(
                file_descriptor=fd,
                file_to_generate=fd.package.startswith(package),
                naming=naming,
                opts=opts,
                prior_protos=pre_protos,
                # Ugly, ugly hack.
                load_services=False,
            )

        # A file descriptor's file-level resources are NOT visible to any importers.
        # The only way to make referenced resources visible is to aggregate them at
        # the API level and then pass that around.
        all_file_resources = collections.ChainMap(
            *(proto.resource_messages for proto in pre_protos.values())
        )

        # Second pass uses all the messages and enums defined in the entire API.
        # This allows LRO returning methods to see all the types in the API,
        # bypassing the above missing import problem.
        protos: Dict[str, Proto] = {
            name: Proto.build(
                file_descriptor=proto.file_pb2,
                file_to_generate=proto.file_to_generate,
                naming=naming,
                opts=opts,
                prior_protos=pre_protos,
                all_resources=MappingProxyType(all_file_resources),
            )
            for name, proto in pre_protos.items()
        }

        # Done; return the API.
        return cls(naming=naming, all_protos=protos)

    @cached_property
    def enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return a map of all enums available in the API."""
        return collections.ChainMap(
            {},
            *[p.all_enums for p in self.protos.values()],
        )

    @cached_property
    def messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return a map of all messages available in the API."""
        return collections.ChainMap(
            {},
            *[p.all_messages for p in self.protos.values()],
        )

    @cached_property
    def top_level_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return a map of all messages that are NOT nested."""
        return {
            k: v
            for p in self.protos.values()
            for k, v in p.messages.items()
        }

    @cached_property
    def top_level_enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return a map of all enums that are NOT nested."""
        return {
            k: v
            for p in self.protos.values()
            for k, v in p.enums.items()
        }

    @cached_property
    def protos(self) -> Mapping[str, Proto]:
        """Return a map of all protos specific to this API.

        This property excludes imported protos that are dependencies
        of this API but not being directly generated, as well as protos
        whose subpackage does not start with :attr:`subpackage_view`.
        """
        view = self.subpackage_view
        return collections.OrderedDict([
            (k, v) for k, v in self.all_protos.items()
            if v.file_to_generate and
            v.meta.address.subpackage[:len(view)] == view
        ])

    @cached_property
    def services(self) -> Mapping[str, wrappers.Service]:
        """Return a map of all services available in the API."""
        return collections.ChainMap(
            {},
            *[p.services for p in self.protos.values()],
        )

    @cached_property
    def subpackages(self) -> Mapping[str, 'API']:
        """Return a map of all subpackages, if any.

        Each value in the mapping is another API object, but the ``protos``
        property only shows protos belonging to the subpackage.
        """
        answer: Dict[str, API] = collections.OrderedDict()

        # Get the actual subpackages we have.
        #
        # Note that this intentionally only goes one level deep; nested
        # subpackages can be accessed by requesting subpackages of the
        # derivative API objects returned here.
        #
        # The component to pick is the one directly below the current view
        # (index ``level``). Always picking index 0 would, for a non-empty
        # view, repeat the view's own first component and produce a child
        # view that matches no protos.
        level = len(self.subpackage_view)
        subpkg_names = sorted({
            p.meta.address.subpackage[level]
            for p in self.protos.values()
            if len(p.meta.address.subpackage) > level and
            p.meta.address.subpackage[:level] == self.subpackage_view
        })
        for subpkg_name in subpkg_names:
            answer[subpkg_name] = dataclasses.replace(
                self,
                subpackage_view=self.subpackage_view + (subpkg_name,),
            )
        return answer

    def requires_package(self, pkg: Tuple[str, ...]) -> bool:
        """Return True if any message in any known proto is in ``pkg``.

        Args:
            pkg (Tuple[str, ...]): The package to look for, compared against
                each message's ``ident.package``.

        Returns:
            bool: Whether at least one message (dependencies included)
                belongs to the given package.
        """
        return any(
            message.ident.package == pkg
            for proto in self.all_protos.values()
            for message in proto.all_messages.values()
        )
class _ProtoBuilder:
    """A "builder class" for Proto objects.

    The sole purpose of this class is to accept the information from the
    file descriptor and "piece together" the components of the :class:`~.Proto`
    object in-place.

    This allows the public :class:`~.Proto` object to be frozen, and free
    of the setup machinations.

    The correct usage of this class is always to create an instance, call
    the :attr:`proto` property, and then throw the builder away. Additionally,
    there should be no reason to use this class outside of this module.
    """
    # Placeholder documentation used when a path has no source location.
    EMPTY = descriptor_pb2.SourceCodeInfo.Location()

    def __init__(
        self,
        file_descriptor: descriptor_pb2.FileDescriptorProto,
        file_to_generate: bool,
        naming: api_naming.Naming,
        opts: Options = Options(),
        prior_protos: Optional[Mapping[str, Proto]] = None,
        load_services: bool = True,
        all_resources: Optional[Mapping[str, wrappers.MessageType]] = None,
    ):
        """Load enums, messages and (optionally) services from a descriptor.

        Args:
            file_descriptor (~.FileDescriptorProto): The file to wrap.
            file_to_generate (bool): Whether this file is generated
                directly, as opposed to being a dependency.
            naming (~.Naming): Naming information stored on every address.
            opts (~.Options): Generator options; ``opts.retry`` is consulted
                when loading methods.
            prior_protos (Mapping[str, ~.Proto]): Already processed protos,
                used to resolve types declared in other files.
            load_services (bool): When False, services are not loaded even
                for files that are being generated.
            all_resources (Mapping[str, ~.MessageType]): Resources handed to
                every loader; services store them as ``visible_resources``.

        Raises:
            TypeError: If a field references a type that was not resolved
                while loading and is not declared in this file either.
        """
        self.proto_messages: Dict[str, wrappers.MessageType] = {}
        self.proto_enums: Dict[str, wrappers.EnumType] = {}
        self.proto_services: Dict[str, wrappers.Service] = {}
        self.file_descriptor = file_descriptor
        self.file_to_generate = file_to_generate
        self.prior_protos = prior_protos or {}
        self.opts = opts

        # Iterate over the documentation and place it into a dictionary.
        #
        # The comments in protocol buffers are sorted by a concept called
        # the "path", which is a sequence of integers described in more
        # detail below; this code simply shifts from a list to a dict,
        # with tuples of paths as the dictionary keys.
        self.docs: Dict[Tuple[int, ...],
                        descriptor_pb2.SourceCodeInfo.Location] = {}
        for location in file_descriptor.source_code_info.location:
            self.docs[tuple(location.path)] = location

        # Everything has an "address", which is the proto where the thing
        # was declared.
        #
        # We put this together by a baton pass of sorts: everything in
        # this file *starts with* this address, which is appended to
        # for each item as it is loaded.
        self.address = metadata.Address(
            api_naming=naming,
            module=file_descriptor.name.split('/')[-1][:-len('.proto')],
            package=tuple(file_descriptor.package.split('.')),
        )

        # The same resource mapping is handed to every loader below.
        resources = all_resources or {}

        # Now iterate over the FileDescriptorProto and pull out each of
        # the messages, enums, and services.
        #
        # The hard-coded path keys sent here are based on how descriptor.proto
        # works; it uses the proto message number of the pieces of each
        # message (e.g. the hard-code `4` for `message_type` immediately
        # below is because `repeated DescriptorProto message_type = 4;` in
        # descriptor.proto itself).
        self._load_children(file_descriptor.enum_type, self._load_enum,
                            address=self.address, path=(5,),
                            resources=resources)
        self._load_children(file_descriptor.message_type, self._load_message,
                            address=self.address, path=(4,),
                            resources=resources)

        # Edge case: Protocol buffers is not particularly picky about
        # ordering, and it is possible that a message will have had a field
        # referencing another message which appears later in the file
        # (or itself, recursively).
        #
        # In this situation, we would not have come across the message yet,
        # and the field would have its original textual reference to the
        # message (`type_name`) but not its resolved message wrapper.
        orphan_field_gen = (
            (field.type_name.lstrip('.'), field)
            for message in self.proto_messages.values()
            for field in message.fields.values()
            if field.type_name and not (field.message or field.enum)
        )
        for key, field in orphan_field_gen:
            maybe_msg_type = self.proto_messages.get(key)
            maybe_enum_type = self.proto_enums.get(key)
            # object.__setattr__ is used because plain assignment is not
            # available on the wrapper (presumably a frozen dataclass).
            if maybe_msg_type:
                object.__setattr__(field, 'message', maybe_msg_type)
            elif maybe_enum_type:
                object.__setattr__(field, 'enum', maybe_enum_type)
            else:
                raise TypeError(
                    f"Unknown type referenced in "
                    f"{self.file_descriptor.name}: '{key}'"
                )

        # Only generate the service if this is a target file to be generated.
        # This prevents us from generating common services (e.g. LRO) when
        # they are being used as an import just to get types declared in the
        # same files.
        if file_to_generate and load_services:
            self._load_children(file_descriptor.service, self._load_service,
                                address=self.address, path=(6,),
                                resources=resources)
        # TODO(lukesneeringer): oneofs are on path 7.

    @property
    def proto(self) -> Proto:
        """Return a Proto dataclass object."""
        # Create a "context-naïve" proto.
        # This has everything but is ignorant of naming collisions in the
        # ultimate file that will be written.
        naive = Proto(
            all_enums=self.proto_enums,
            all_messages=self.proto_messages,
            file_pb2=self.file_descriptor,
            file_to_generate=self.file_to_generate,
            services=self.proto_services,
            meta=metadata.Metadata(
                address=self.address,
            ),
        )

        # If this is not a file being generated, we do not need to
        # do anything else.
        if not self.file_to_generate:
            return naive

        # Return a context-aware proto object.
        return dataclasses.replace(
            naive,
            all_enums=collections.OrderedDict(
                (k, v.with_context(collisions=naive.names))
                for k, v in naive.all_enums.items()
            ),
            all_messages=collections.OrderedDict(
                (k, v.with_context(collisions=naive.names))
                for k, v in naive.all_messages.items()
            ),
            services=collections.OrderedDict(
                # Note: services bind to themselves because services get their
                # own output files.
                (k, v.with_context(collisions=v.names))
                for k, v in naive.services.items()
            ),
            meta=naive.meta.with_context(collisions=naive.names),
        )

    @cached_property
    def api_enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return this file's enums layered over those of prior protos.

        The mapping wraps ``self.proto_enums`` itself (not a copy), so enums
        loaded after the first access are visible as well.
        """
        return collections.ChainMap(
            {},
            self.proto_enums,
            *[p.all_enums for p in self.prior_protos.values()],
        )

    @cached_property
    def api_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return this file's messages layered over those of prior protos.

        The mapping wraps ``self.proto_messages`` itself (not a copy), so
        messages loaded after the first access are visible as well.
        """
        return collections.ChainMap(
            {},
            self.proto_messages,
            *[p.all_messages for p in self.prior_protos.values()],
        )

    def _load_children(self,
                       children: Sequence, loader: Callable, *,
                       address: metadata.Address, path: Tuple[int, ...],
                       resources: Mapping[str, wrappers.MessageType]) -> Mapping:
        """Return wrapped versions of arbitrary children from a Descriptor.

        Args:
            children (list): A sequence of children of the given field to
                be loaded. For example, a FileDescriptorProto contains the
                lists ``message_type``, ``enum_type``, etc.; these are valid
                inputs for this argument.
            loader (Callable[Message, Address, Tuple[int]]): The function able
                to load the kind of message in ``children``. This should
                be one of the ``_load_{noun}`` methods on this class
                (e.g. ``_load_message``).
            address (~.metadata.Address): The address up to this point.
                This will include the package and may include outer messages.
            path (Tuple[int]): The location path up to this point. This is
                used to correspond to documentation in
                ``SourceCodeInfo.Location`` in ``descriptor.proto``.
            resources (Mapping[str, ~.MessageType]): Resources passed
                through unchanged to ``loader``.

        Return:
            Mapping[str, Union[~.MessageType, ~.Service, ~.EnumType]]: The
                objects that were loaded, keyed by their ``name``.
        """
        # Iterate over the list of children provided and call the
        # applicable loader function on each; the child's index extends
        # the source location path.
        answer = {}
        for i, child in enumerate(children):
            wrapped = loader(child, address=address, path=path + (i,),
                             resources=resources)
            answer[wrapped.name] = wrapped
        return answer

    def _get_oneofs(self,
                    oneof_pbs: Sequence[descriptor_pb2.OneofDescriptorProto],
                    address: metadata.Address, path: Tuple[int, ...],
                    ) -> Dict[str, wrappers.Oneof]:
        """Return a dictionary of wrapped oneofs for the given message.

        Args:
            oneof_pbs (Sequence[~.descriptor_pb2.OneofDescriptorProto]): A
                sequence of protobuf oneof objects.
            address (~.metadata.Address): An address object denoting the
                location of these oneofs. Currently unused.
            path (Tuple[int]): The source location path thus far, as
                understood by ``SourceCodeInfo.Location``. Currently unused.

        Returns:
            Mapping[str, ~.wrappers.Oneof]: A ordered mapping of
                :class:`~.wrappers.Oneof` objects, keyed by oneof name.
        """
        # Declaration order is preserved; fields refer to oneofs by index.
        return collections.OrderedDict(
            (oneof_pb.name, wrappers.Oneof(oneof_pb=oneof_pb))
            for oneof_pb in oneof_pbs
        )

    def _get_fields(self,
                    field_pbs: Sequence[descriptor_pb2.FieldDescriptorProto],
                    address: metadata.Address, path: Tuple[int, ...],
                    oneofs: Optional[Dict[str, wrappers.Oneof]] = None
                    ) -> Dict[str, wrappers.Field]:
        """Return a dictionary of wrapped fields for the given message.

        Args:
            field_pbs (Sequence[~.descriptor_pb2.FieldDescriptorProto]): A
                sequence of protobuf field objects.
            address (~.metadata.Address): An address object denoting the
                location of these fields.
            path (Tuple[int]): The source location path thus far, as
                understood by ``SourceCodeInfo.Location``.
            oneofs (Optional[Dict[str, ~.wrappers.Oneof]]): The oneofs of
                the enclosing message, in declaration order. Used to turn a
                field's ``oneof_index`` into the oneof's name.

        Returns:
            Mapping[str, ~.wrappers.Field]: A ordered mapping of
                :class:`~.wrappers.Field` objects.
        """
        # Iterate over the fields and collect them into a dictionary.
        #
        # The saving of the enum and message types rely on protocol buffers'
        # naming rules to trust that they will never collide.
        #
        # Note: If this field is a recursive reference to its own message,
        # then the message will not be in `api_messages` yet (because the
        # message wrapper is not yet created, because it needs this object
        # first) and this will be None. This case is addressed by the
        # orphan-field pass in `__init__`.
        answer: Dict[str, wrappers.Field] = collections.OrderedDict()
        for i, field_pb in enumerate(field_pbs):
            is_oneof = oneofs and field_pb.HasField('oneof_index')
            oneof_name = nth(
                (oneofs or {}).keys(),
                field_pb.oneof_index
            ) if is_oneof else None

            # Registry keys carry no leading dot, unlike `type_name`.
            type_key = field_pb.type_name.lstrip('.')
            field = wrappers.Field(
                field_pb=field_pb,
                enum=self.api_enums.get(type_key),
                message=self.api_messages.get(type_key),
                meta=metadata.Metadata(
                    address=address.child(field_pb.name, path + (i,)),
                    documentation=self.docs.get(path + (i,), self.EMPTY),
                ),
                oneof=oneof_name,
            )
            answer[field.name] = field

        # Done; return the answer.
        return answer

    def _get_retry_and_timeout(
        self,
        service_address: metadata.Address,
        meth_pb: descriptor_pb2.MethodDescriptorProto
    ) -> Tuple[Optional[wrappers.RetryInfo], Optional[float]]:
        """Returns the retry and timeout configuration of a method if it exists.

        Args:
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            meth_pb (~.descriptor_pb2.MethodDescriptorProto): A
                protobuf method objects.

        Returns:
            Tuple[Optional[~.wrappers.RetryInfo], Optional[float]]: The retry
                and timeout information for the method if it exists.
        """

        # If we got a gRPC service config, get the appropriate retry
        # and timeout information from it.
        retry = None
        timeout = None

        # This object should be a dictionary that conforms to the
        # gRPC service config proto:
        #   Repo: https://github.com/grpc/grpc-proto/
        #   Filename: grpc/service_config/service_config.proto
        #
        # We only care about a small piece, so we are just leaving
        # it as a dictionary and parsing accordingly.
        if self.opts.retry:
            # The gRPC service config uses a repeated `name` field
            # with a particular format, which we match against.
            # This defines the expected selector for *this* method.
            package = '.'.join(service_address.package)
            selector = {
                'service': f'{package}.{service_address.name}',
                'method': meth_pb.name,
            }

            # Find the method config that applies to us, if any.
            # A config without a `name` entry simply never matches (rather
            # than failing on a membership test against None).
            mc = next((c for c in self.opts.retry.get('methodConfig', [])
                       if selector in (c.get('name') or ())), None)
            if mc:
                # Set the timeout according to this method config.
                if mc.get('timeout'):
                    timeout = self._to_float(mc['timeout'])

                # Set the retry according to this method config.
                if 'retryPolicy' in mc:
                    r = mc['retryPolicy']
                    retry = wrappers.RetryInfo(
                        max_attempts=r.get('maxAttempts', 0),
                        initial_backoff=self._to_float(
                            r.get('initialBackoff', '0s'),
                        ),
                        max_backoff=self._to_float(r.get('maxBackoff', '0s')),
                        backoff_multiplier=r.get('backoffMultiplier', 0.0),
                        retryable_exceptions=frozenset(
                            exceptions.exception_class_for_grpc_status(
                                getattr(grpc.StatusCode, code),
                            )
                            for code in r.get('retryableStatusCodes', [])
                        ),
                    )

        return retry, timeout

    def _maybe_get_lro(
        self,
        service_address: metadata.Address,
        meth_pb: descriptor_pb2.MethodDescriptorProto
    ) -> Optional[wrappers.OperationInfo]:
        """Determines whether a method is a Long Running Operation (aka LRO)
        and, if it is, return an OperationInfo that includes the response
        and metadata types.

        Args:
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            meth_pb (~.descriptor_pb2.MethodDescriptorProto): A
                protobuf method objects.

        Returns:
            Optional[~.wrappers.OperationInfo]: The info for the long-running
                operation, if the passed method is an LRO.

        Raises:
            TypeError: If the method returns an Operation but its
                annotation lacks a response type or a metadata type.
            KeyError: If the annotated response or metadata type is not
                among the known messages.
        """
        lro = None

        # If the output type is google.longrunning.Operation, we use
        # a specialized object in its place.
        if meth_pb.output_type.endswith('google.longrunning.Operation'):
            op = meth_pb.options.Extensions[operations_pb2.operation_info]
            if not op.response_type or not op.metadata_type:
                raise TypeError(
                    f'rpc {meth_pb.name} returns a google.longrunning.'
                    'Operation, but is missing a response type or '
                    'metadata type.',
                )
            response_key = service_address.resolve(op.response_type)
            metadata_key = service_address.resolve(op.metadata_type)
            lro = wrappers.OperationInfo(
                response_type=self.api_messages[response_key],
                metadata_type=self.api_messages[metadata_key],
            )

        return lro

    def _get_methods(self,
                     methods: Sequence[descriptor_pb2.MethodDescriptorProto],
                     service_address: metadata.Address, path: Tuple[int, ...],
                     ) -> Mapping[str, wrappers.Method]:
        """Return a dictionary of wrapped methods for the given service.

        Args:
            methods (Sequence[~.descriptor_pb2.MethodDescriptorProto]): A
                sequence of protobuf method objects.
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            path (Tuple[int]): The source location path thus far, as understood
                by ``SourceCodeInfo.Location``.

        Returns:
            Mapping[str, ~.wrappers.Method]: A ordered mapping of
                :class:`~.wrappers.Method` objects.

        Raises:
            KeyError: If a method's input or output type is not among the
                known messages.
        """
        # Iterate over the methods and collect them into a dictionary.
        answer: Dict[str, wrappers.Method] = collections.OrderedDict()
        for i, meth_pb in enumerate(methods):
            retry, timeout = self._get_retry_and_timeout(
                service_address,
                meth_pb
            )

            # Create the method wrapper object.
            answer[meth_pb.name] = wrappers.Method(
                input=self.api_messages[meth_pb.input_type.lstrip('.')],
                lro=self._maybe_get_lro(service_address, meth_pb),
                method_pb=meth_pb,
                meta=metadata.Metadata(
                    address=service_address.child(meth_pb.name, path + (i,)),
                    documentation=self.docs.get(path + (i,), self.EMPTY),
                ),
                output=self.api_messages[meth_pb.output_type.lstrip('.')],
                retry=retry,
                timeout=timeout,
            )

        # Done; return the answer.
        return answer

    def _load_message(self,
                      message_pb: descriptor_pb2.DescriptorProto,
                      address: metadata.Address,
                      path: Tuple[int, ...],
                      resources: Mapping[str, wrappers.MessageType],
                      ) -> wrappers.MessageType:
        """Load message descriptions from DescriptorProtos.

        The wrapped message is registered in ``self.proto_messages`` (as are
        its nested messages, and nested enums in ``self.proto_enums``)
        and returned.
        """
        address = address.child(message_pb.name, path)

        # Load all nested items.
        #
        # Note: This occurs before piecing together this message's fields
        # because if nested types are present, they are generally the
        # type of one of this message's fields, and they need to be in
        # the registry for the field's message or enum attributes to be
        # set correctly.
        nested_enums = self._load_children(
            message_pb.enum_type,
            address=address,
            loader=self._load_enum,
            path=path + (4,),
            resources=resources,
        )
        nested_messages = self._load_children(
            message_pb.nested_type,
            address=address,
            loader=self._load_message,
            path=path + (3,),
            resources=resources,
        )

        oneofs = self._get_oneofs(
            message_pb.oneof_decl,
            address=address,
            path=path + (7,),
        )

        # Create a dictionary of all the fields for this message.
        fields = self._get_fields(
            message_pb.field,
            address=address,
            path=path + (2,),
            oneofs=oneofs,
        )
        fields.update(self._get_fields(
            message_pb.extension,
            address=address,
            path=path + (6,),
            oneofs=oneofs,
        ))

        # Create a message corresponding to this descriptor.
        self.proto_messages[address.proto] = wrappers.MessageType(
            fields=fields,
            message_pb=message_pb,
            nested_enums=nested_enums,
            nested_messages=nested_messages,
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            oneofs=oneofs,
        )
        return self.proto_messages[address.proto]

    def _load_enum(self,
                   enum: descriptor_pb2.EnumDescriptorProto,
                   address: metadata.Address,
                   path: Tuple[int, ...],
                   resources: Mapping[str, wrappers.MessageType],
                   ) -> wrappers.EnumType:
        """Load enum descriptions from EnumDescriptorProtos.

        The wrapped enum is registered in ``self.proto_enums`` and returned.
        ``resources`` is accepted for loader-signature compatibility and is
        not used.
        """
        address = address.child(enum.name, path)

        # Put together wrapped objects for the enum values.
        values = []
        for i, enum_value in enumerate(enum.value):
            values.append(wrappers.EnumValueType(
                enum_value_pb=enum_value,
                meta=metadata.Metadata(
                    address=address,
                    documentation=self.docs.get(path + (2, i), self.EMPTY),
                ),
            ))

        # Load the enum itself.
        self.proto_enums[address.proto] = wrappers.EnumType(
            enum_pb=enum,
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            values=values,
        )
        return self.proto_enums[address.proto]

    def _load_service(self,
                      service: descriptor_pb2.ServiceDescriptorProto,
                      address: metadata.Address,
                      path: Tuple[int, ...],
                      resources: Mapping[str, wrappers.MessageType],
                      ) -> wrappers.Service:
        """Load comments for a service and its methods.

        The wrapped service is registered in ``self.proto_services`` and
        returned; ``resources`` becomes its ``visible_resources``.
        """
        address = address.child(service.name, path)

        # Put together a dictionary of the service's methods.
        methods = self._get_methods(
            service.method,
            service_address=address,
            path=path + (2,),
        )

        # Load the comments for the service itself.
        self.proto_services[address.proto] = wrappers.Service(
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            methods=methods,
            service_pb=service,
            visible_resources=resources,
        )
        return self.proto_services[address.proto]

    def _to_float(self, s: str) -> float:
        """Convert a protobuf duration string (e.g. `"30s"`) to float.

        The final character is treated as the unit and dropped. A value
        ending in ``n`` is read as an integer and divided by 1e9; anything
        else is parsed as a float as-is.
        """
        return int(s[:-1]) / 1e9 if s.endswith('n') else float(s[:-1])