1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import time
16import unittest
17
18import grpc
19import grpc_testing
20
21from tests.testing import _application_common
22from tests.testing import _application_testing_common
23from tests.testing import _server_application
24from tests.testing.proto import services_pb2
25
26
27class FirstServiceServicerTest(unittest.TestCase):
28
29    def setUp(self):
30        self._real_time = grpc_testing.strict_real_time()
31        self._fake_time = grpc_testing.strict_fake_time(time.time())
32        servicer = _server_application.FirstServiceServicer()
33        descriptors_to_servicers = {
34            _application_testing_common.FIRST_SERVICE: servicer
35        }
36        self._real_time_server = grpc_testing.server_from_dictionary(
37            descriptors_to_servicers, self._real_time)
38        self._fake_time_server = grpc_testing.server_from_dictionary(
39            descriptors_to_servicers, self._fake_time)
40
41    def test_successful_unary_unary(self):
42        rpc = self._real_time_server.invoke_unary_unary(
43            _application_testing_common.FIRST_SERVICE_UNUN, (),
44            _application_common.UNARY_UNARY_REQUEST, None)
45        initial_metadata = rpc.initial_metadata()
46        response, trailing_metadata, code, details = rpc.termination()
47
48        self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
49        self.assertIs(code, grpc.StatusCode.OK)
50
51    def test_successful_unary_stream(self):
52        rpc = self._real_time_server.invoke_unary_stream(
53            _application_testing_common.FIRST_SERVICE_UNSTRE, (),
54            _application_common.UNARY_STREAM_REQUEST, None)
55        initial_metadata = rpc.initial_metadata()
56        trailing_metadata, code, details = rpc.termination()
57
58        self.assertIs(code, grpc.StatusCode.OK)
59
60    def test_successful_stream_unary(self):
61        rpc = self._real_time_server.invoke_stream_unary(
62            _application_testing_common.FIRST_SERVICE_STREUN, (), None)
63        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
64        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
65        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
66        rpc.requests_closed()
67        initial_metadata = rpc.initial_metadata()
68        response, trailing_metadata, code, details = rpc.termination()
69
70        self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
71        self.assertIs(code, grpc.StatusCode.OK)
72
73    def test_successful_stream_stream(self):
74        rpc = self._real_time_server.invoke_stream_stream(
75            _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
76        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
77        initial_metadata = rpc.initial_metadata()
78        responses = [
79            rpc.take_response(),
80            rpc.take_response(),
81        ]
82        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
83        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
84        responses.extend([
85            rpc.take_response(),
86            rpc.take_response(),
87            rpc.take_response(),
88            rpc.take_response(),
89        ])
90        rpc.requests_closed()
91        trailing_metadata, code, details = rpc.termination()
92
93        for response in responses:
94            self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
95                             response)
96        self.assertIs(code, grpc.StatusCode.OK)
97
98    def test_mutating_stream_stream(self):
99        rpc = self._real_time_server.invoke_stream_stream(
100            _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
101        rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
102        initial_metadata = rpc.initial_metadata()
103        responses = [
104            rpc.take_response()
105            for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
106        ]
107        rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
108        responses.extend([
109            rpc.take_response()
110            for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
111        ])
112        rpc.requests_closed()
113        _, _, _ = rpc.termination()
114        expected_responses = (
115            services_pb2.Bottom(first_bottom_field=0),
116            services_pb2.Bottom(first_bottom_field=1),
117            services_pb2.Bottom(first_bottom_field=0),
118            services_pb2.Bottom(first_bottom_field=1),
119        )
120        self.assertSequenceEqual(expected_responses, responses)
121
122    def test_server_rpc_idempotence(self):
123        rpc = self._real_time_server.invoke_unary_unary(
124            _application_testing_common.FIRST_SERVICE_UNUN, (),
125            _application_common.UNARY_UNARY_REQUEST, None)
126        first_initial_metadata = rpc.initial_metadata()
127        second_initial_metadata = rpc.initial_metadata()
128        third_initial_metadata = rpc.initial_metadata()
129        first_termination = rpc.termination()
130        second_termination = rpc.termination()
131        third_termination = rpc.termination()
132
133        for later_initial_metadata in (
134                second_initial_metadata,
135                third_initial_metadata,
136        ):
137            self.assertEqual(first_initial_metadata, later_initial_metadata)
138        response = first_termination[0]
139        terminal_metadata = first_termination[1]
140        code = first_termination[2]
141        details = first_termination[3]
142        for later_termination in (
143                second_termination,
144                third_termination,
145        ):
146            self.assertEqual(response, later_termination[0])
147            self.assertEqual(terminal_metadata, later_termination[1])
148            self.assertIs(code, later_termination[2])
149            self.assertEqual(details, later_termination[3])
150        self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
151        self.assertIs(code, grpc.StatusCode.OK)
152
153    def test_misbehaving_client_unary_unary(self):
154        rpc = self._real_time_server.invoke_unary_unary(
155            _application_testing_common.FIRST_SERVICE_UNUN, (),
156            _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
157        initial_metadata = rpc.initial_metadata()
158        response, trailing_metadata, code, details = rpc.termination()
159
160        self.assertIsNot(code, grpc.StatusCode.OK)
161
162    def test_infinite_request_stream_real_time(self):
163        rpc = self._real_time_server.invoke_stream_unary(
164            _application_testing_common.FIRST_SERVICE_STREUN, (),
165            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
166        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
167        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
168        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
169        initial_metadata = rpc.initial_metadata()
170        self._real_time.sleep_for(
171            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
172        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
173        response, trailing_metadata, code, details = rpc.termination()
174
175        self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
176
177    def test_infinite_request_stream_fake_time(self):
178        rpc = self._fake_time_server.invoke_stream_unary(
179            _application_testing_common.FIRST_SERVICE_STREUN, (),
180            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
181        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
182        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
183        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
184        initial_metadata = rpc.initial_metadata()
185        self._fake_time.sleep_for(
186            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
187        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
188        response, trailing_metadata, code, details = rpc.termination()
189
190        self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
191
192    def test_servicer_context_abort(self):
193        rpc = self._real_time_server.invoke_unary_unary(
194            _application_testing_common.FIRST_SERVICE_UNUN, (),
195            _application_common.ABORT_REQUEST, None)
196        _, _, code, _ = rpc.termination()
197        self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED)
198        rpc = self._real_time_server.invoke_unary_unary(
199            _application_testing_common.FIRST_SERVICE_UNUN, (),
200            _application_common.ABORT_SUCCESS_QUERY, None)
201        response, _, code, _ = rpc.termination()
202        self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response)
203        self.assertIs(code, grpc.StatusCode.OK)
204
205
206if __name__ == '__main__':
207    unittest.main(verbosity=2)
208