1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import time
16import unittest
17
18import grpc
19import grpc_testing
20
21from tests.testing import _application_common
22from tests.testing import _application_testing_common
23from tests.testing import _server_application
24
25
26class FirstServiceServicerTest(unittest.TestCase):
27
28    def setUp(self):
29        self._real_time = grpc_testing.strict_real_time()
30        self._fake_time = grpc_testing.strict_fake_time(time.time())
31        servicer = _server_application.FirstServiceServicer()
32        descriptors_to_servicers = {
33            _application_testing_common.FIRST_SERVICE: servicer
34        }
35        self._real_time_server = grpc_testing.server_from_dictionary(
36            descriptors_to_servicers, self._real_time)
37        self._fake_time_server = grpc_testing.server_from_dictionary(
38            descriptors_to_servicers, self._fake_time)
39
40    def test_successful_unary_unary(self):
41        rpc = self._real_time_server.invoke_unary_unary(
42            _application_testing_common.FIRST_SERVICE_UNUN, (),
43            _application_common.UNARY_UNARY_REQUEST, None)
44        initial_metadata = rpc.initial_metadata()
45        response, trailing_metadata, code, details = rpc.termination()
46
47        self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
48        self.assertIs(code, grpc.StatusCode.OK)
49
50    def test_successful_unary_stream(self):
51        rpc = self._real_time_server.invoke_unary_stream(
52            _application_testing_common.FIRST_SERVICE_UNSTRE, (),
53            _application_common.UNARY_STREAM_REQUEST, None)
54        initial_metadata = rpc.initial_metadata()
55        trailing_metadata, code, details = rpc.termination()
56
57        self.assertIs(code, grpc.StatusCode.OK)
58
59    def test_successful_stream_unary(self):
60        rpc = self._real_time_server.invoke_stream_unary(
61            _application_testing_common.FIRST_SERVICE_STREUN, (), None)
62        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
63        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
64        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
65        rpc.requests_closed()
66        initial_metadata = rpc.initial_metadata()
67        response, trailing_metadata, code, details = rpc.termination()
68
69        self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
70        self.assertIs(code, grpc.StatusCode.OK)
71
72    def test_successful_stream_stream(self):
73        rpc = self._real_time_server.invoke_stream_stream(
74            _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
75        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
76        initial_metadata = rpc.initial_metadata()
77        responses = [
78            rpc.take_response(),
79            rpc.take_response(),
80        ]
81        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
82        rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
83        responses.extend([
84            rpc.take_response(),
85            rpc.take_response(),
86            rpc.take_response(),
87            rpc.take_response(),
88        ])
89        rpc.requests_closed()
90        trailing_metadata, code, details = rpc.termination()
91
92        for response in responses:
93            self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
94                             response)
95        self.assertIs(code, grpc.StatusCode.OK)
96
97    def test_server_rpc_idempotence(self):
98        rpc = self._real_time_server.invoke_unary_unary(
99            _application_testing_common.FIRST_SERVICE_UNUN, (),
100            _application_common.UNARY_UNARY_REQUEST, None)
101        first_initial_metadata = rpc.initial_metadata()
102        second_initial_metadata = rpc.initial_metadata()
103        third_initial_metadata = rpc.initial_metadata()
104        first_termination = rpc.termination()
105        second_termination = rpc.termination()
106        third_termination = rpc.termination()
107
108        for later_initial_metadata in (
109                second_initial_metadata,
110                third_initial_metadata,
111        ):
112            self.assertEqual(first_initial_metadata, later_initial_metadata)
113        response = first_termination[0]
114        terminal_metadata = first_termination[1]
115        code = first_termination[2]
116        details = first_termination[3]
117        for later_termination in (
118                second_termination,
119                third_termination,
120        ):
121            self.assertEqual(response, later_termination[0])
122            self.assertEqual(terminal_metadata, later_termination[1])
123            self.assertIs(code, later_termination[2])
124            self.assertEqual(details, later_termination[3])
125        self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
126        self.assertIs(code, grpc.StatusCode.OK)
127
128    def test_misbehaving_client_unary_unary(self):
129        rpc = self._real_time_server.invoke_unary_unary(
130            _application_testing_common.FIRST_SERVICE_UNUN, (),
131            _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
132        initial_metadata = rpc.initial_metadata()
133        response, trailing_metadata, code, details = rpc.termination()
134
135        self.assertIsNot(code, grpc.StatusCode.OK)
136
137    def test_infinite_request_stream_real_time(self):
138        rpc = self._real_time_server.invoke_stream_unary(
139            _application_testing_common.FIRST_SERVICE_STREUN, (),
140            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
141        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
142        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
143        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
144        initial_metadata = rpc.initial_metadata()
145        self._real_time.sleep_for(
146            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
147        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
148        response, trailing_metadata, code, details = rpc.termination()
149
150        self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
151
152    def test_infinite_request_stream_fake_time(self):
153        rpc = self._fake_time_server.invoke_stream_unary(
154            _application_testing_common.FIRST_SERVICE_STREUN, (),
155            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
156        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
157        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
158        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
159        initial_metadata = rpc.initial_metadata()
160        self._fake_time.sleep_for(
161            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
162        rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
163        response, trailing_metadata, code, details = rpc.termination()
164
165        self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
166
167
168if __name__ == '__main__':
169    unittest.main(verbosity=2)
170