# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains the "roll-up" class, :class:`~.API`.
Everything else in the :mod:`~.schema` module is usually accessed
through an :class:`~.API` object.
"""

import collections
import dataclasses
import itertools
import keyword
import os
import sys
from typing import Callable, Container, Dict, FrozenSet, Mapping, Optional, Sequence, Set, Tuple
from types import MappingProxyType

from google.api_core import exceptions  # type: ignore
from google.api import resource_pb2  # type: ignore
from google.longrunning import operations_pb2  # type: ignore
from google.protobuf import descriptor_pb2

import grpc  # type: ignore

from gapic.schema import metadata
from gapic.schema import wrappers
from gapic.schema import naming as api_naming
from gapic.utils import cached_property
from gapic.utils import nth
from gapic.utils import Options
from gapic.utils import to_snake_case
from gapic.utils import RESERVED_NAMES


@dataclasses.dataclass(frozen=True)
class Proto:
    """A representation of a particular proto file within an API."""

    file_pb2: descriptor_pb2.FileDescriptorProto
    services: Mapping[str, wrappers.Service]
    all_messages: Mapping[str, wrappers.MessageType]
    all_enums: Mapping[str, wrappers.EnumType]
    file_to_generate: bool
    meta: metadata.Metadata = dataclasses.field(
        default_factory=metadata.Metadata,
    )

    def __getattr__(self, name: str):
        """Delegate attribute lookups to the underlying FileDescriptorProto."""
        return getattr(self.file_pb2, name)

    @classmethod
    def build(
            cls,
            file_descriptor: descriptor_pb2.FileDescriptorProto,
            file_to_generate: bool,
            naming: api_naming.Naming,
            opts: Options = Options(),
            prior_protos: Optional[Mapping[str, 'Proto']] = None,
            load_services: bool = True,
            all_resources: Optional[Mapping[str, wrappers.MessageType]] = None,
    ) -> 'Proto':
        """Build and return a Proto instance.

        Args:
            file_descriptor (~.FileDescriptorProto): The protocol buffer
                object describing the proto file.
            file_to_generate (bool): Whether this is a file which is
                to be directly generated, or a dependency.
            naming (~.Naming): The :class:`~.Naming` instance associated
                with the API.
            opts (~.options.Options): CLI options passed to the generator.
            prior_protos (Mapping[str, ~.Proto]): Previous, already processed
                protos. These are needed to look up messages in imported
                protos.
            load_services (bool): Toggle whether the proto file should
                load its services. Not doing so enables a two-pass fix for
                LRO response and metadata types in certain situations.
            all_resources (Optional[Mapping[str, ~.MessageType]]): A mapping
                of all file-level resource messages declared across the API,
                used to make resources referenced by this proto's services
                visible to them.
        """
        return _ProtoBuilder(
            file_descriptor,
            file_to_generate=file_to_generate,
            naming=naming,
            opts=opts,
            prior_protos=prior_protos or {},
            load_services=load_services,
            all_resources=all_resources or {},
        ).proto

    @cached_property
    def enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return top-level enums on the proto."""
        return collections.OrderedDict([
            (k, v) for k, v in self.all_enums.items()
            if not v.meta.address.parent
        ])

    @cached_property
    def messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return top-level messages on the proto."""
        return collections.OrderedDict(
            (k, v) for k, v in self.all_messages.items()
            if not v.meta.address.parent
        )

    @cached_property
    def resource_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return the file level resources of the proto."""
        file_resource_messages = (
            (res.type, wrappers.CommonResource.build(res).message_type)
            for res in self.file_pb2.options.Extensions[resource_pb2.resource_definition]
        )
        resource_messages = (
            (msg.options.Extensions[resource_pb2.resource].type, msg)
            for msg in self.messages.values()
            if msg.options.Extensions[resource_pb2.resource].type
        )
        return collections.OrderedDict(
            itertools.chain(
                file_resource_messages, resource_messages,
            )
        )

    @property
    def module_name(self) -> str:
        """Return the appropriate module name for this proto file.

        Returns:
            str: The module name for this proto file (which is the proto
                file name in snake case, without the ``.proto`` extension).
        """
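        # For example (illustrative): a file named
        # ``google/example/v1/foo_service.proto`` yields the module name
        # ``foo_service``.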
        return to_snake_case(self.name.split('/')[-1][:-len('.proto')])

    @cached_property
    def names(self) -> FrozenSet[str]:
        """Return a set of names used by this proto.

        This is used for detecting naming collisions in the module names
        used for imports.
        """
        # Add names of all enums, messages, and fields.
        answer: Set[str] = {e.name for e in self.all_enums.values()}
        for message in self.all_messages.values():
            answer.update(f.name for f in message.fields.values())
            answer.add(message.name)

        # Identify any import module names where the same module name is used
        # from distinct packages.
        modules: Dict[str, Set[str]] = collections.defaultdict(set)
        for m in self.all_messages.values():
            for t in m.recursive_field_types:
                modules[t.ident.module].add(t.ident.package)

        answer.update(
            module_name
            for module_name, packages in modules.items()
            if len(packages) > 1 or module_name in RESERVED_NAMES
        )

        # Return the set of collision names.
        return frozenset(answer)

    @cached_property
    def python_modules(self) -> Sequence[Tuple[str, str]]:
        """Return a sequence of Python modules, for import.

        The results of this method are in alphabetical order (by package,
        then module), and do not contain duplicates.

        Returns:
            Sequence[Tuple[str, str]]: The package and module pair, intended
            for use in a ``from package import module`` type
            of statement.
        """
        self_reference = self.meta.address.python_import

        answer = {
            t.ident.python_import
            for m in self.all_messages.values()
            # Sanity check: ensure that a module is not trying to import
            # itself.
            for t in m.field_types if t.ident.python_import != self_reference
        }

        # Done; return the sorted sequence.
        return tuple(sorted(answer))

    def disambiguate(self, string: str) -> str:
        """Return a disambiguated string for the context of this proto.

        This is used to avoid naming collisions. Generally, this method
        returns the same string, but it returns a modified version if
        the string would collide with a message or field name in this proto.
        """
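        # Illustrative example (hypothetical name): if this proto already
        # defines something named ``metadata``, then
        # ``proto.disambiguate('metadata')`` returns ``'_metadata'``.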
        if string in self.names:
            return self.disambiguate(f'_{string}')
        return string


@dataclasses.dataclass(frozen=True)
class API:
    """A representation of a full API.

    This represents a top-down view of a complete API, as loaded from a
    set of protocol buffer files. Once the descriptors are loaded
    (see :meth:`build`), this object contains every message, method, service,
    and everything else needed to write a client library.

    An instance of this object is made available to every template
    (as ``api``).
    """
    naming: api_naming.Naming
    all_protos: Mapping[str, Proto]
    subpackage_view: Tuple[str, ...] = dataclasses.field(default_factory=tuple)

    @classmethod
    def build(
        cls,
        file_descriptors: Sequence[descriptor_pb2.FileDescriptorProto],
        package: str = '',
        opts: Options = Options(),
        prior_protos: Optional[Mapping[str, 'Proto']] = None,
    ) -> 'API':
        """Build the internal API schema based on the request.

        Args:
            file_descriptors (Sequence[~.FileDescriptorProto]): A list of
                :class:`~.FileDescriptorProto` objects describing the
                API.
            package (str): A protocol buffer package, as a string, for which
                code should be explicitly generated (including subpackages).
                Protos with packages outside this package are considered
                imports rather than explicit targets.
            opts (~.options.Options): CLI options passed to the generator.
            prior_protos (Mapping[str, ~.Proto]): Previous, already processed
                protos. These are needed to look up messages in imported
                protos. Primarily used for testing.
        """
        # Save information about the overall naming for this API.
        naming = api_naming.Naming.build(*filter(
            lambda fd: fd.package.startswith(package),
            file_descriptors,
        ), opts=opts)

        def disambiguate_keyword_fname(
                full_path: str,
                visited_names: Container[str]) -> str:
            path, fname = os.path.split(full_path)
            name, ext = os.path.splitext(fname)
            if name in keyword.kwlist or full_path in visited_names:
                name += "_"
                full_path = os.path.join(path, name + ext)
                if full_path in visited_names:
                    return disambiguate_keyword_fname(full_path, visited_names)

            return full_path

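        # For example (illustrative): a file named ``import.proto`` would
        # otherwise produce a module named after a Python keyword, so it is
        # renamed to ``import_.proto`` before its Proto is built.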
        # Iterate over each FileDescriptorProto, fill out a Proto
        # object describing it, and save these to the instance.
        #
        # The first pass gathers messages and enums but NOT services or methods.
        # This is a workaround for a limitation in protobuf annotations for
        # long-running operations: the annotations are strings that reference
        # message types but do not require a proto import.
        # This hack attempts to address a common case where API authors,
        # not wishing to generate an 'unused import' warning,
        # don't import the proto file defining the real response or metadata
        # type into the proto file that defines an LRO.
        # We just load all of the API's types first and then
        # load the services and methods with the full scope of types.
        pre_protos: Dict[str, Proto] = dict(prior_protos or {})
        for fd in file_descriptors:
            fd.name = disambiguate_keyword_fname(fd.name, pre_protos)
            pre_protos[fd.name] = Proto.build(
                file_descriptor=fd,
                file_to_generate=fd.package.startswith(package),
                naming=naming,
                opts=opts,
                prior_protos=pre_protos,
                # Ugly, ugly hack.
                load_services=False,
            )

        # A file descriptor's file-level resources are NOT visible to any importers.
        # The only way to make referenced resources visible is to aggregate them at
        # the API level and then pass that around.
        all_file_resources = collections.ChainMap(
            *(proto.resource_messages for proto in pre_protos.values())
        )

        # Second pass uses all the messages and enums defined in the entire API.
        # This allows LRO returning methods to see all the types in the API,
        # bypassing the above missing import problem.
        protos: Dict[str, Proto] = {
            name: Proto.build(
                file_descriptor=proto.file_pb2,
                file_to_generate=proto.file_to_generate,
                naming=naming,
                opts=opts,
                prior_protos=pre_protos,
                all_resources=MappingProxyType(all_file_resources),
            )
            for name, proto in pre_protos.items()
        }

        # Done; return the API.
        return cls(naming=naming, all_protos=protos)

    @cached_property
    def enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return a map of all enums available in the API."""
        return collections.ChainMap({},
                                    *[p.all_enums for p in self.protos.values()],
                                    )

    @cached_property
    def messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return a map of all messages available in the API."""
        return collections.ChainMap({},
                                    *[p.all_messages for p in self.protos.values()],
                                    )

    @cached_property
    def top_level_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return a map of all messages that are NOT nested."""
        return {
            k: v
            for p in self.protos.values()
            for k, v in p.messages.items()
        }

    @cached_property
    def top_level_enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return a map of all enums that are NOT nested."""
        return {
            k: v
            for p in self.protos.values()
            for k, v in p.enums.items()
        }

    @cached_property
    def protos(self) -> Mapping[str, Proto]:
        """Return a map of all protos specific to this API.

        This property excludes imported protos that are dependencies
        of this API but are not being directly generated.
        """
        view = self.subpackage_view
        return collections.OrderedDict([
            (k, v) for k, v in self.all_protos.items()
            if v.file_to_generate and
            v.meta.address.subpackage[:len(view)] == view
        ])

    @cached_property
    def services(self) -> Mapping[str, wrappers.Service]:
        """Return a map of all services available in the API."""
        return collections.ChainMap({},
                                    *[p.services for p in self.protos.values()],
                                    )

    @cached_property
    def subpackages(self) -> Mapping[str, 'API']:
        """Return a map of all subpackages, if any.

        Each value in the mapping is another API object, but the ``protos``
        property only shows protos belonging to the subpackage.
        """
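        # Illustrative example (hypothetical packages): if the API's proto
        # package is ``google.cloud.foo.v1`` and one file declares the
        # package ``google.cloud.foo.v1.admin``, that file's protos appear
        # under ``api.subpackages['admin']``.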
        answer: Dict[str, API] = collections.OrderedDict()

        # Get the actual subpackages we have.
        #
        # Note that this intentionally only goes one level deep; nested
        # subpackages can be accessed by requesting subpackages of the
        # derivative API objects returned here.
        level = len(self.subpackage_view)
        for subpkg_name in sorted({p.meta.address.subpackage[0]
                                   for p in self.protos.values()
                                   if len(p.meta.address.subpackage) > level and
                                   p.meta.address.subpackage[:level] == self.subpackage_view}):
            answer[subpkg_name] = dataclasses.replace(self,
                                                      subpackage_view=self.subpackage_view +
                                                      (subpkg_name,),
                                                      )
        return answer

    def requires_package(self, pkg: Tuple[str, ...]) -> bool:
        """Return True if any message in the API belongs to the given proto package."""
        return any(
            message.ident.package == pkg
            for proto in self.all_protos.values()
            for message in proto.all_messages.values()
        )


class _ProtoBuilder:
    """A "builder class" for Proto objects.

    The sole purpose of this class is to accept the information from the
    file descriptor and "piece together" the components of the :class:`~.Proto`
    object in-place.

    This allows the public :class:`~.Proto` object to be frozen, and free
    of the setup machinations.

    The correct usage of this class is always to create an instance, call
    the :attr:`proto` property, and then throw the builder away. Additionally,
    there should be no reason to use this class outside of this module.
    """
    EMPTY = descriptor_pb2.SourceCodeInfo.Location()

    def __init__(
        self,
        file_descriptor: descriptor_pb2.FileDescriptorProto,
        file_to_generate: bool,
        naming: api_naming.Naming,
        opts: Options = Options(),
        prior_protos: Optional[Mapping[str, Proto]] = None,
        load_services: bool = True,
        all_resources: Optional[Mapping[str, wrappers.MessageType]] = None,
    ):
        self.proto_messages: Dict[str, wrappers.MessageType] = {}
        self.proto_enums: Dict[str, wrappers.EnumType] = {}
        self.proto_services: Dict[str, wrappers.Service] = {}
        self.file_descriptor = file_descriptor
        self.file_to_generate = file_to_generate
        self.prior_protos = prior_protos or {}
        self.opts = opts

        # Iterate over the documentation and place it into a dictionary.
        #
        # The comments in protocol buffers are sorted by a concept called
        # the "path", which is a sequence of integers described in more
        # detail below; this code simply shifts from a list to a dict,
        # with tuples of paths as the dictionary keys.
        self.docs: Dict[Tuple[int, ...],
                        descriptor_pb2.SourceCodeInfo.Location] = {}
        for location in file_descriptor.source_code_info.location:
            self.docs[tuple(location.path)] = location
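        # For example (illustrative): the path (4, 0, 2, 1) refers to the
        # second field of the first message in the file, because
        # ``message_type`` is field 4 of FileDescriptorProto and ``field``
        # is field 2 of DescriptorProto.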

        # Everything has an "address", which is the proto where the thing
        # was declared.
        #
        # We put this together by a baton pass of sorts: everything in
        # this file *starts with* this address, which is appended to
        # for each item as it is loaded.
        self.address = metadata.Address(
            api_naming=naming,
            module=file_descriptor.name.split('/')[-1][:-len('.proto')],
            package=tuple(file_descriptor.package.split('.')),
        )

        # Now iterate over the FileDescriptorProto and pull out each of
        # the messages, enums, and services.
        #
        # The hard-coded path keys sent here are based on how descriptor.proto
        # works; it uses the proto field number of the pieces of each
        # message (e.g. the hard-coded `4` for `message_type` below is
        # because `repeated DescriptorProto message_type = 4;` appears in
        # descriptor.proto itself).
        self._load_children(file_descriptor.enum_type, self._load_enum,
                            address=self.address, path=(5,),
                            resources=all_resources or {})
        self._load_children(file_descriptor.message_type, self._load_message,
                            address=self.address, path=(4,),
                            resources=all_resources or {})

        # Edge case: Protocol buffers is not particularly picky about
        # ordering, and it is possible that a message will have had a field
        # referencing another message which appears later in the file
        # (or itself, recursively).
        #
        # In this situation, we would not have come across the message yet,
        # and the field would have its original textual reference to the
        # message (`type_name`) but not its resolved message wrapper.
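        # Illustrative example (hypothetical messages): given
        # ``message A { B b = 1; }`` followed by ``message B { ... }`` in the
        # same file, the field ``A.b`` is wrapped before ``B`` has been
        # loaded, so its ``message`` attribute is backfilled here.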
        orphan_field_gen = (
            (field.type_name.lstrip('.'), field)
            for message in self.proto_messages.values()
            for field in message.fields.values()
            if field.type_name and not (field.message or field.enum)
        )
        for key, field in orphan_field_gen:
            maybe_msg_type = self.proto_messages.get(key)
            maybe_enum_type = self.proto_enums.get(key)
            if maybe_msg_type:
                object.__setattr__(field, 'message', maybe_msg_type)
            elif maybe_enum_type:
                object.__setattr__(field, 'enum', maybe_enum_type)
            else:
                raise TypeError(
                    f"Unknown type referenced in "
                    f"{self.file_descriptor.name}: '{key}'"
                )

        # Only generate the service if this is a target file to be generated.
        # This prevents us from generating common services (e.g. LRO) when
        # they are being used as an import just to get types declared in the
        # same files.
        if file_to_generate and load_services:
            self._load_children(file_descriptor.service, self._load_service,
                                address=self.address, path=(6,),
                                resources=all_resources or {})
        # TODO(lukesneeringer): oneofs are on path 7.

    @property
    def proto(self) -> Proto:
        """Return a Proto dataclass object."""
        # Create a "context-naïve" proto.
        # This has everything but is ignorant of naming collisions in the
        # ultimate file that will be written.
        naive = Proto(
            all_enums=self.proto_enums,
            all_messages=self.proto_messages,
            file_pb2=self.file_descriptor,
            file_to_generate=self.file_to_generate,
            services=self.proto_services,
            meta=metadata.Metadata(
                address=self.address,
            ),
        )

        # If this is not a file being generated, we do not need to
        # do anything else.
        if not self.file_to_generate:
            return naive

        # Return a context-aware proto object.
        return dataclasses.replace(
            naive,
            all_enums=collections.OrderedDict(
                (k, v.with_context(collisions=naive.names))
                for k, v in naive.all_enums.items()
            ),
            all_messages=collections.OrderedDict(
                (k, v.with_context(collisions=naive.names))
                for k, v in naive.all_messages.items()
            ),
            services=collections.OrderedDict(
                # Note: services bind to themselves because services get their
                # own output files.
                (k, v.with_context(collisions=v.names))
                for k, v in naive.services.items()
            ),
            meta=naive.meta.with_context(collisions=naive.names),
        )

    @cached_property
    def api_enums(self) -> Mapping[str, wrappers.EnumType]:
        """Return a map of all enums visible to this proto, including
        those from previously processed protos."""
        return collections.ChainMap({}, self.proto_enums,
                                    *[p.all_enums for p in self.prior_protos.values()],
                                    )

    @cached_property
    def api_messages(self) -> Mapping[str, wrappers.MessageType]:
        """Return a map of all messages visible to this proto, including
        those from previously processed protos."""
        return collections.ChainMap({}, self.proto_messages,
                                    *[p.all_messages for p in self.prior_protos.values()],
                                    )

    def _load_children(self,
                       children: Sequence, loader: Callable, *,
                       address: metadata.Address, path: Tuple[int, ...],
                       resources: Mapping[str, wrappers.MessageType]) -> Mapping:
        """Return wrapped versions of arbitrary children from a Descriptor.

        Args:
            children (list): A sequence of children of the given field to
                be loaded. For example, a FileDescriptorProto contains the
                lists ``message_type``, ``enum_type``, etc.; these are valid
                inputs for this argument.
            loader (Callable[Message, Address, Tuple[int]]): The function able
                to load the kind of message in ``children``. This should
                be one of the ``_load_{noun}`` methods on this class
                (e.g. ``_load_message``).
            address (~.metadata.Address): The address up to this point.
                This will include the package and may include outer messages.
            path (Tuple[int]): The location path up to this point. This is
                used to correspond to documentation in
                ``SourceCodeInfo.Location`` in ``descriptor.proto``.

        Returns:
            Mapping[str, Union[~.MessageType, ~.Service, ~.EnumType]]: A
                mapping of the objects that were loaded, keyed by name.
        """
        # Iterate over the list of children provided and call the
        # applicable loader function on each.
        answer = {}
        for child, i in zip(children, range(0, sys.maxsize)):
            wrapped = loader(child, address=address, path=path + (i,),
                             resources=resources)
            answer[wrapped.name] = wrapped
        return answer

    def _get_oneofs(self,
                    oneof_pbs: Sequence[descriptor_pb2.OneofDescriptorProto],
                    address: metadata.Address, path: Tuple[int, ...],
                    ) -> Dict[str, wrappers.Oneof]:
        """Return a dictionary of wrapped oneofs for the given message.

        Args:
            oneof_pbs (Sequence[~.descriptor_pb2.OneofDescriptorProto]): A
                sequence of protobuf oneof descriptor objects.
            address (~.metadata.Address): An address object denoting the
                location of these oneofs.
            path (Tuple[int]): The source location path thus far, as
                understood by ``SourceCodeInfo.Location``.

        Returns:
            Mapping[str, ~.wrappers.Oneof]: An ordered mapping of
                :class:`~.wrappers.Oneof` objects.
        """
        # Iterate over the oneofs and collect them into a dictionary.
        answer = collections.OrderedDict(
            (oneof_pb.name, wrappers.Oneof(oneof_pb=oneof_pb))
            for oneof_pb in oneof_pbs
        )

        # Done; return the answer.
        return answer

    def _get_fields(self,
                    field_pbs: Sequence[descriptor_pb2.FieldDescriptorProto],
                    address: metadata.Address, path: Tuple[int, ...],
                    oneofs: Optional[Dict[str, wrappers.Oneof]] = None
                    ) -> Dict[str, wrappers.Field]:
        """Return a dictionary of wrapped fields for the given message.

        Args:
            field_pbs (Sequence[~.descriptor_pb2.FieldDescriptorProto]): A
                sequence of protobuf field objects.
            address (~.metadata.Address): An address object denoting the
                location of these fields.
            path (Tuple[int]): The source location path thus far, as
                understood by ``SourceCodeInfo.Location``.
            oneofs (Optional[Dict[str, ~.wrappers.Oneof]]): An optional
                mapping of the message's oneofs, used to associate each
                field with the oneof containing it, if any.

        Returns:
            Mapping[str, ~.wrappers.Field]: An ordered mapping of
                :class:`~.wrappers.Field` objects.
        """
        # Iterate over the fields and collect them into a dictionary.
        #
        # The saving of the enum and message types relies on protocol buffers'
        # naming rules to trust that they will never collide.
        #
        # Note: If this field is a recursive reference to its own message,
        # then the message will not be in `api_messages` yet (because the
        # message wrapper is not yet created, because it needs this object
        # first) and this will be None. This case is addressed in the
        # `_load_message` method.
        answer: Dict[str, wrappers.Field] = collections.OrderedDict()
        for i, field_pb in enumerate(field_pbs):
            is_oneof = oneofs and field_pb.HasField('oneof_index')
            oneof_name = nth(
                (oneofs or {}).keys(),
                field_pb.oneof_index
            ) if is_oneof else None

            field = wrappers.Field(
                field_pb=field_pb,
                enum=self.api_enums.get(field_pb.type_name.lstrip('.')),
                message=self.api_messages.get(field_pb.type_name.lstrip('.')),
                meta=metadata.Metadata(
                    address=address.child(field_pb.name, path + (i,)),
                    documentation=self.docs.get(path + (i,), self.EMPTY),
                ),
                oneof=oneof_name,
            )
            answer[field.name] = field

        # Done; return the answer.
        return answer

    def _get_retry_and_timeout(
        self,
        service_address: metadata.Address,
        meth_pb: descriptor_pb2.MethodDescriptorProto
    ) -> Tuple[Optional[wrappers.RetryInfo], Optional[float]]:
        """Return the retry and timeout configuration of a method if it exists.

        Args:
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            meth_pb (~.descriptor_pb2.MethodDescriptorProto): A
                protobuf method object.

        Returns:
            Tuple[Optional[~.wrappers.RetryInfo], Optional[float]]: The retry
                and timeout information for the method if it exists.
        """

        # If we got a gRPC service config, get the appropriate retry
        # and timeout information from it.
        retry = None
        timeout = None

        # This object should be a dictionary that conforms to the
        # gRPC service config proto:
        #   Repo: https://github.com/grpc/grpc-proto/
        #   Filename: grpc/service_config/service_config.proto
        #
        # We only care about a small piece, so we are just leaving
        # it as a dictionary and parsing accordingly.
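        # An illustrative (hypothetical) example of the shape this code
        # expects:
        #
        #   {'methodConfig': [{
        #       'name': [{'service': 'example.v1.FooService',
        #                 'method': 'GetFoo'}],
        #       'timeout': '30s',
        #       'retryPolicy': {
        #           'maxAttempts': 5,
        #           'initialBackoff': '0.1s',
        #           'maxBackoff': '10s',
        #           'backoffMultiplier': 1.3,
        #           'retryableStatusCodes': ['UNAVAILABLE'],
        #       },
        #   }]}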
        if self.opts.retry:
            # The gRPC service config uses a repeated `name` field
            # with a particular format, which we match against.
            # This defines the expected selector for *this* method.
            selector = {
                'service': '{package}.{service_name}'.format(
                    package='.'.join(service_address.package),
                    service_name=service_address.name,
                ),
                'method': meth_pb.name,
            }

            # Find the method config that applies to us, if any.
            mc = next((c for c in self.opts.retry.get('methodConfig', [])
                       if selector in c.get('name')), None)
            if mc:
                # Set the timeout according to this method config.
                if mc.get('timeout'):
                    timeout = self._to_float(mc['timeout'])

                # Set the retry according to this method config.
                if 'retryPolicy' in mc:
                    r = mc['retryPolicy']
                    retry = wrappers.RetryInfo(
                        max_attempts=r.get('maxAttempts', 0),
                        initial_backoff=self._to_float(
                            r.get('initialBackoff', '0s'),
                        ),
                        max_backoff=self._to_float(r.get('maxBackoff', '0s')),
                        backoff_multiplier=r.get('backoffMultiplier', 0.0),
                        retryable_exceptions=frozenset(
                            exceptions.exception_class_for_grpc_status(
                                getattr(grpc.StatusCode, code),
                            )
                            for code in r.get('retryableStatusCodes', [])
                        ),
                    )

        return retry, timeout

    def _maybe_get_lro(
        self,
        service_address: metadata.Address,
        meth_pb: descriptor_pb2.MethodDescriptorProto
    ) -> Optional[wrappers.OperationInfo]:
        """Determine whether a method is a Long Running Operation (aka LRO)
        and, if it is, return an OperationInfo that includes the response
        and metadata types.

        Args:
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            meth_pb (~.descriptor_pb2.MethodDescriptorProto): A
                protobuf method object.

        Returns:
            Optional[~.wrappers.OperationInfo]: The info for the long-running
                operation, if the passed method is an LRO.
        """
        lro = None

        # If the output type is google.longrunning.Operation, we use
        # a specialized object in its place.
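        # Illustrative example (hypothetical names) of the annotation this
        # reads, as it appears in a proto file:
        #
        #   rpc CreateFoo(CreateFooRequest)
        #       returns (google.longrunning.Operation) {
        #     option (google.longrunning.operation_info) = {
        #       response_type: "Foo"
        #       metadata_type: "CreateFooMetadata"
        #     };
        #   }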
        if meth_pb.output_type.endswith('google.longrunning.Operation'):
            op = meth_pb.options.Extensions[operations_pb2.operation_info]
            if not op.response_type or not op.metadata_type:
                raise TypeError(
                    f'rpc {meth_pb.name} returns a google.longrunning.'
                    'Operation, but is missing a response type or '
                    'metadata type.',
                )
            response_key = service_address.resolve(op.response_type)
            metadata_key = service_address.resolve(op.metadata_type)
            lro = wrappers.OperationInfo(
                response_type=self.api_messages[response_key],
                metadata_type=self.api_messages[metadata_key],
            )

        return lro

    def _get_methods(self,
                     methods: Sequence[descriptor_pb2.MethodDescriptorProto],
                     service_address: metadata.Address, path: Tuple[int, ...],
                     ) -> Mapping[str, wrappers.Method]:
        """Return a dictionary of wrapped methods for the given service.

        Args:
            methods (Sequence[~.descriptor_pb2.MethodDescriptorProto]): A
                sequence of protobuf method objects.
            service_address (~.metadata.Address): An address object for the
                service, denoting the location of these methods.
            path (Tuple[int]): The source location path thus far, as understood
                by ``SourceCodeInfo.Location``.

        Returns:
            Mapping[str, ~.wrappers.Method]: An ordered mapping of
                :class:`~.wrappers.Method` objects.
        """
        # Iterate over the methods and collect them into a dictionary.
        answer: Dict[str, wrappers.Method] = collections.OrderedDict()
        for i, meth_pb in enumerate(methods):
            retry, timeout = self._get_retry_and_timeout(
                service_address,
                meth_pb
            )

            # Create the method wrapper object.
            answer[meth_pb.name] = wrappers.Method(
                input=self.api_messages[meth_pb.input_type.lstrip('.')],
                lro=self._maybe_get_lro(service_address, meth_pb),
                method_pb=meth_pb,
                meta=metadata.Metadata(
                    address=service_address.child(meth_pb.name, path + (i,)),
                    documentation=self.docs.get(path + (i,), self.EMPTY),
                ),
                output=self.api_messages[meth_pb.output_type.lstrip('.')],
                retry=retry,
                timeout=timeout,
            )

        # Done; return the answer.
        return answer

    def _load_message(self,
                      message_pb: descriptor_pb2.DescriptorProto,
                      address: metadata.Address,
                      path: Tuple[int],
                      resources: Mapping[str, wrappers.MessageType],
                      ) -> wrappers.MessageType:
        """Load message descriptions from DescriptorProtos."""
        address = address.child(message_pb.name, path)

        # Load all nested items.
        #
        # Note: This occurs before piecing together this message's fields
        # because if nested types are present, they are generally the
        # type of one of this message's fields, and they need to be in
        # the registry for the field's message or enum attributes to be
        # set correctly.
        nested_enums = self._load_children(
            message_pb.enum_type,
            address=address,
            loader=self._load_enum,
            path=path + (4,),
            resources=resources,
        )
        nested_messages = self._load_children(
            message_pb.nested_type,
            address=address,
            loader=self._load_message,
            path=path + (3,),
            resources=resources,
        )

        oneofs = self._get_oneofs(
            message_pb.oneof_decl,
            address=address,
            path=path + (7,),
        )

        # Create a dictionary of all the fields for this message.
        fields = self._get_fields(
            message_pb.field,
            address=address,
            path=path + (2,),
            oneofs=oneofs,
        )
        fields.update(self._get_fields(
            message_pb.extension,
            address=address,
            path=path + (6,),
            oneofs=oneofs,
        ))

        # Create a message corresponding to this descriptor.
        self.proto_messages[address.proto] = wrappers.MessageType(
            fields=fields,
            message_pb=message_pb,
            nested_enums=nested_enums,
            nested_messages=nested_messages,
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            oneofs=oneofs,
        )
        return self.proto_messages[address.proto]

    def _load_enum(self,
                   enum: descriptor_pb2.EnumDescriptorProto,
                   address: metadata.Address,
                   path: Tuple[int],
                   resources: Mapping[str, wrappers.MessageType],
                   ) -> wrappers.EnumType:
        """Load enum descriptions from EnumDescriptorProtos."""
        address = address.child(enum.name, path)

        # Put together wrapped objects for the enum values.
        values = []
        for enum_value, i in zip(enum.value, range(0, sys.maxsize)):
            values.append(wrappers.EnumValueType(
                enum_value_pb=enum_value,
                meta=metadata.Metadata(
                    address=address,
                    documentation=self.docs.get(path + (2, i), self.EMPTY),
                ),
            ))

        # Load the enum itself.
        self.proto_enums[address.proto] = wrappers.EnumType(
            enum_pb=enum,
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            values=values,
        )
        return self.proto_enums[address.proto]

    def _load_service(self,
                      service: descriptor_pb2.ServiceDescriptorProto,
                      address: metadata.Address,
                      path: Tuple[int],
                      resources: Mapping[str, wrappers.MessageType],
                      ) -> wrappers.Service:
        """Load service descriptions from ServiceDescriptorProtos."""
        address = address.child(service.name, path)

        # Put together a dictionary of the service's methods.
        methods = self._get_methods(
            service.method,
            service_address=address,
            path=path + (2,),
        )

        # Load the comments for the service itself.
        self.proto_services[address.proto] = wrappers.Service(
            meta=metadata.Metadata(
                address=address,
                documentation=self.docs.get(path, self.EMPTY),
            ),
            methods=methods,
            service_pb=service,
            visible_resources=resources,
        )
        return self.proto_services[address.proto]

    def _to_float(self, s: str) -> float:
        """Convert a protobuf duration string (e.g. `"30s"`) to float."""
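        # For example (per the conversion below): "30s" -> 30.0 and
        # "0.1s" -> 0.1; values ending in "n" are divided by 1e9, which
        # treats them as nanoseconds.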
        return int(s[:-1]) / 1e9 if s.endswith('n') else float(s[:-1])