1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Objects for use in testing gRPC Python-using application code."""
15
16import abc
17
18from google.protobuf import descriptor
19import grpc
20import six
21
22
class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a unary-unary RPC invoked by a system under test.

    Test code uses this object to "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC, to be "sent" to
            the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def terminate(self, response, trailing_metadata, code, details):
        """Ends the RPC.

        Args:
          response: The RPC's response.
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError
55
56
class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a unary-stream RPC invoked by a system under test.

    Test code uses this object to "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC, to be "sent" to
            the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def send_response(self, response):
        """Delivers one response to the system under test.

        Args:
          response: The response message to "send" to the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def terminate(self, trailing_metadata, code, details):
        """Ends the RPC.

        Args:
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError
97
98
class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a stream-unary RPC invoked by a system under test.

    Test code uses this object to "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC, to be "sent" to
            the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_request(self):
        """Takes one request that the system under test added to the RPC.

        Blocks until the system under test has added the request that is to
        be returned.

        Requests come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A request message that the system under test added to the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def requests_closed(self):
        """Blocks until the request stream is closed by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def terminate(self, response, trailing_metadata, code, details):
        """Ends the RPC.

        Args:
          response: The RPC's response.
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError
151
152
class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a stream-stream RPC invoked by a system under test.

    Test code uses this object to "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC, to be "sent" to
            the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_request(self):
        """Takes one request that the system under test added to the RPC.

        Blocks until the system under test has added the request that is to
        be returned.

        Requests come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A request message that the system under test added to the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def send_response(self, response):
        """Delivers one response to the system under test.

        Args:
          response: The response message to "send" to the system under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def requests_closed(self):
        """Blocks until the request stream is closed by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError

    @abc.abstractmethod
    def terminate(self, trailing_metadata, code, details):
        """Ends the RPC.

        Args:
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError
213
214
class Channel(six.with_metaclass(abc.ABCMeta, grpc.Channel)):
    """A stand-in grpc.Channel for testing a system that invokes RPCs."""

    @abc.abstractmethod
    def take_unary_unary(self, method_descriptor):
        """Takes a unary-unary RPC that the system under test is making.

        When no RPC currently being made by the system under test is
        identified by the given descriptor, this method blocks until the
        system under test invokes one.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a unary-unary
            RPC method.

        Returns:
          A (invocation_metadata, request, unary_unary_channel_rpc) tuple:
            the invocation metadata of the RPC, the request of the RPC, and
            a UnaryUnaryChannelRpc for "playing server" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_unary_stream(self, method_descriptor):
        """Takes a unary-stream RPC that the system under test is making.

        When no RPC currently being made by the system under test is
        identified by the given descriptor, this method blocks until the
        system under test invokes one.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a unary-stream
            RPC method.

        Returns:
          A (invocation_metadata, request, unary_stream_channel_rpc) tuple:
            the invocation metadata of the RPC, the request of the RPC, and
            a UnaryStreamChannelRpc for "playing server" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_stream_unary(self, method_descriptor):
        """Takes a stream-unary RPC that the system under test is making.

        When no RPC currently being made by the system under test is
        identified by the given descriptor, this method blocks until the
        system under test invokes one.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a stream-unary
            RPC method.

        Returns:
          A (invocation_metadata, stream_unary_channel_rpc) tuple: the
            invocation metadata of the RPC and a StreamUnaryChannelRpc for
            "playing server" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_stream_stream(self, method_descriptor):
        """Takes a stream-stream RPC that the system under test is making.

        When no RPC currently being made by the system under test is
        identified by the given descriptor, this method blocks until the
        system under test invokes one.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a
            stream-stream RPC method.

        Returns:
          A (invocation_metadata, stream_stream_channel_rpc) tuple: the
            invocation metadata of the RPC and a StreamStreamChannelRpc for
            "playing server" for the RPC.
        """
        raise NotImplementedError
293
294
class UnaryUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a unary-unary RPC serviced by a system under test.

    Test code uses this object to "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        Blocks until the system under test has added initial metadata to the
        RPC. Providing one or more response messages, or terminating the RPC,
        also ends the wait, because either one causes gRPC Python to
        synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (response, trailing_metadata, code, details) sequence holding the
            response, trailing metadata, code, and details of the RPC.
        """
        raise NotImplementedError
329
330
class UnaryStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a unary-stream RPC serviced by a system under test.

    Test code uses this object to "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        Blocks until the system under test has added initial metadata to the
        RPC. Providing one or more response messages, or terminating the RPC,
        also ends the wait, because either one causes gRPC Python to
        synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def take_response(self):
        """Takes one response that the system under test added to the RPC.

        Responses come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A response message that the system under test added to the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (trailing_metadata, code, details) sequence holding the trailing
            metadata, code, and details of the RPC.
        """
        raise NotImplementedError
377
378
class StreamUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a stream-unary RPC serviced by a system under test.

    Test code uses this object to "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        Blocks until the system under test has added initial metadata to the
        RPC. Providing one or more response messages, or terminating the RPC,
        also ends the wait, because either one causes gRPC Python to
        synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def send_request(self, request):
        """Delivers one request to the system under test.

        Args:
          request: The request message of the RPC to "send" to the system
            under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def requests_closed(self):
        """Signals that the RPC's request stream has ended."""
        raise NotImplementedError

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (response, trailing_metadata, code, details) sequence holding the
            response, trailing metadata, code, and details of the RPC.
        """
        raise NotImplementedError
428
429
class StreamStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
    """Test-side handle on a stream-stream RPC serviced by a system under test.

    Test code uses this object to "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        Blocks until the system under test has added initial metadata to the
        RPC. Providing one or more response messages, or terminating the RPC,
        also ends the wait, because either one causes gRPC Python to
        synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def send_request(self, request):
        """Delivers one request to the system under test.

        Args:
          request: The request message of the RPC to "send" to the system
            under test.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def requests_closed(self):
        """Signals that the RPC's request stream has ended."""
        raise NotImplementedError

    @abc.abstractmethod
    def take_response(self):
        """Takes one response that the system under test added to the RPC.

        Responses come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A response message that the system under test added to the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (trailing_metadata, code, details) sequence holding the trailing
            metadata, code, and details of the RPC.
        """
        raise NotImplementedError
491
492
class Server(six.with_metaclass(abc.ABCMeta)):
    """A server for testing a system that services RPCs."""

    @abc.abstractmethod
    def invoke_unary_unary(
            self, method_descriptor, invocation_metadata, request, timeout):
        """Starts a unary-unary RPC for the system under test to service.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a unary-unary
            RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          request: The request of the RPC.
          timeout: The RPC's time limit as a duration in seconds, or None
            when the RPC has no time limit.

        Returns:
          A UnaryUnaryServerRpc for "playing client" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def invoke_unary_stream(
            self, method_descriptor, invocation_metadata, request, timeout):
        """Starts a unary-stream RPC for the system under test to service.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a unary-stream
            RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          request: The request of the RPC.
          timeout: The RPC's time limit as a duration in seconds, or None
            when the RPC has no time limit.

        Returns:
          A UnaryStreamServerRpc for "playing client" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def invoke_stream_unary(
            self, method_descriptor, invocation_metadata, timeout):
        """Starts a stream-unary RPC for the system under test to service.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a stream-unary
            RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          timeout: The RPC's time limit as a duration in seconds, or None
            when the RPC has no time limit.

        Returns:
          A StreamUnaryServerRpc for "playing client" for the RPC.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def invoke_stream_stream(
            self, method_descriptor, invocation_metadata, timeout):
        """Starts a stream-stream RPC for the system under test to service.

        Args:
          method_descriptor: A descriptor.MethodDescriptor for a
            stream-stream RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          timeout: The RPC's time limit as a duration in seconds, or None
            when the RPC has no time limit.

        Returns:
          A StreamStreamServerRpc for "playing client" for the RPC.
        """
        raise NotImplementedError
565
566
class Time(six.with_metaclass(abc.ABCMeta)):
    """A simulation of time.

    An implementation is not required to follow the real time reported by the
    Python interpreter. As long as systems under test use
    RpcContext.is_active and RpcContext.time_remaining for querying RPC
    liveness, an implementation may be used to change the passage of time in
    tests.
    """

    @abc.abstractmethod
    def time(self):
        """Returns the current test time.

        Returns:
          The current test time, over which this object has authority.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def call_in(self, behavior, delay):
        """Schedules a behavior to be called once a delay has elapsed.

        Args:
          behavior: A behavior that will be called with no arguments.
          delay: The duration in seconds to wait before calling the behavior.

        Returns:
          A grpc.Future that can be used to cancel the call of the behavior
            before it is executed.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def call_at(self, behavior, time):
        """Schedules a behavior to be called at a given test time.

        Args:
          behavior: A behavior that will be called with no arguments.
          time: The test time at which the behavior is to be called.

        Returns:
          A grpc.Future that can be used to cancel the call of the behavior
            before it is executed.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def sleep_for(self, duration):
        """Blocks for a span of test time.

        Args:
          duration: The length of test time, in seconds, for which to block.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def sleep_until(self, time):
        """Blocks until a given test time.

        Args:
          time: The test time until which to block.
        """
        raise NotImplementedError
630
631
def strict_real_time():
    """Builds a Time that is backed by the Python interpreter's time.

    The instance returned is "strict" about the callbacks submitted to it:
    before it describes the time as having advanced beyond some time t, it
    ensures that every callback registered to be called at t has been called.

    Returns:
      A Time backed by the "system" (Python interpreter's) time.
    """
    # Function-scope import (presumably to avoid an import cycle -- confirm).
    from grpc_testing import _time as time_module
    real_time = time_module.StrictRealTime()
    return real_time
645
646
def strict_fake_time(now):
    """Builds a Time that test code can manipulate.

    The instance returned keeps its own internal representation of time,
    independent of real time. That representation advances only when user
    code calls the instance's sleep_for and sleep_until methods.

    The instance returned is "strict" about the callbacks submitted to it:
    before it describes the time as having advanced beyond some time t, it
    ensures that every callback registered to be called at t has been called.

    Args:
      now: Forwarded unchanged to _time.StrictFakeTime; presumably the test
        time at which the returned instance starts (TODO confirm).

    Returns:
      A Time that simulates the passage of time.
    """
    # Function-scope import (presumably to avoid an import cycle -- confirm).
    from grpc_testing import _time as time_module
    fake_time = time_module.StrictFakeTime(now)
    return fake_time
664
665
def channel(service_descriptors, time):
    """Builds a Channel for testing a system that uses gRPC Python.

    Args:
      service_descriptors: An iterable of descriptor.ServiceDescriptors that
        describe the RPCs the system under test will make on the returned
        Channel.
      time: The Time to use for tests.

    Returns:
      A Channel for use in tests.
    """
    # Function-scope import (presumably to avoid an import cycle -- confirm).
    from grpc_testing import _channel as channel_module
    test_channel = channel_module.testing_channel(service_descriptors, time)
    return test_channel
680
681
def server_from_dictionary(descriptors_to_servicers, time):
    """Builds a Server for testing a system that uses gRPC Python.

    Args:
      descriptors_to_servicers: A dictionary whose keys are
        descriptor.ServiceDescriptors defining RPC services and whose values
        are the servicer objects implementing those services (usually
        instances of classes that implement "Servicer" interfaces defined in
        generated "_pb2_grpc" modules).
      time: The Time to use for tests.

    Returns:
      A Server for use in tests.
    """
    # Function-scope import (presumably to avoid an import cycle -- confirm).
    from grpc_testing import _server as server_module
    test_server = server_module.server_from_dictionary(
        descriptors_to_servicers, time)
    return test_server
697