1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_status with gRPC AsyncIO stack."""
15
16import logging
17import traceback
18import unittest
19
20import grpc
21from google.protobuf import any_pb2
22from google.rpc import code_pb2, error_details_pb2, status_pb2
23from grpc.experimental import aio
24
25from grpc_status import rpc_status
26from tests_aio.unit._test_base import AioTestBase
27
28_STATUS_OK = '/test/StatusOK'
29_STATUS_NOT_OK = '/test/StatusNotOk'
30_ERROR_DETAILS = '/test/ErrorDetails'
31_INCONSISTENT = '/test/Inconsistent'
32_INVALID_CODE = '/test/InvalidCode'
33
34_REQUEST = b'\x00\x00\x00'
35_RESPONSE = b'\x01\x01\x01'
36
37_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
38
39_STATUS_DETAILS = 'This is an error detail'
40_STATUS_DETAILS_ANOTHER = 'This is another error detail'
41
42
43async def _ok_unary_unary(request, servicer_context):
44    return _RESPONSE
45
46
47async def _not_ok_unary_unary(request, servicer_context):
48    await servicer_context.abort(grpc.StatusCode.INTERNAL, _STATUS_DETAILS)
49
50
51async def _error_details_unary_unary(request, servicer_context):
52    details = any_pb2.Any()
53    details.Pack(
54        error_details_pb2.DebugInfo(stack_entries=traceback.format_stack(),
55                                    detail='Intentionally invoked'))
56    rich_status = status_pb2.Status(
57        code=code_pb2.INTERNAL,
58        message=_STATUS_DETAILS,
59        details=[details],
60    )
61    await servicer_context.abort_with_status(rpc_status.to_status(rich_status))
62
63
64async def _inconsistent_unary_unary(request, servicer_context):
65    rich_status = status_pb2.Status(
66        code=code_pb2.INTERNAL,
67        message=_STATUS_DETAILS,
68    )
69    servicer_context.set_code(grpc.StatusCode.NOT_FOUND)
70    servicer_context.set_details(_STATUS_DETAILS_ANOTHER)
71    # User put inconsistent status information in trailing metadata
72    servicer_context.set_trailing_metadata(
73        ((_GRPC_DETAILS_METADATA_KEY, rich_status.SerializeToString()),))
74
75
76async def _invalid_code_unary_unary(request, servicer_context):
77    rich_status = status_pb2.Status(
78        code=42,
79        message='Invalid code',
80    )
81    await servicer_context.abort_with_status(rpc_status.to_status(rich_status))
82
83
84class _GenericHandler(grpc.GenericRpcHandler):
85
86    def service(self, handler_call_details):
87        if handler_call_details.method == _STATUS_OK:
88            return grpc.unary_unary_rpc_method_handler(_ok_unary_unary)
89        elif handler_call_details.method == _STATUS_NOT_OK:
90            return grpc.unary_unary_rpc_method_handler(_not_ok_unary_unary)
91        elif handler_call_details.method == _ERROR_DETAILS:
92            return grpc.unary_unary_rpc_method_handler(
93                _error_details_unary_unary)
94        elif handler_call_details.method == _INCONSISTENT:
95            return grpc.unary_unary_rpc_method_handler(
96                _inconsistent_unary_unary)
97        elif handler_call_details.method == _INVALID_CODE:
98            return grpc.unary_unary_rpc_method_handler(
99                _invalid_code_unary_unary)
100        else:
101            return None
102
103
104class StatusTest(AioTestBase):
105
106    async def setUp(self):
107        self._server = aio.server()
108        self._server.add_generic_rpc_handlers((_GenericHandler(),))
109        port = self._server.add_insecure_port('[::]:0')
110        await self._server.start()
111
112        self._channel = aio.insecure_channel('localhost:%d' % port)
113
114    async def tearDown(self):
115        await self._server.stop(None)
116        await self._channel.close()
117
118    async def test_status_ok(self):
119        call = self._channel.unary_unary(_STATUS_OK)(_REQUEST)
120
121        # Succeed RPC doesn't have status
122        status = await rpc_status.aio.from_call(call)
123        self.assertIs(status, None)
124
125    async def test_status_not_ok(self):
126        call = self._channel.unary_unary(_STATUS_NOT_OK)(_REQUEST)
127        with self.assertRaises(aio.AioRpcError) as exception_context:
128            await call
129        rpc_error = exception_context.exception
130
131        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
132        # Failed RPC doesn't automatically generate status
133        status = await rpc_status.aio.from_call(call)
134        self.assertIs(status, None)
135
136    async def test_error_details(self):
137        call = self._channel.unary_unary(_ERROR_DETAILS)(_REQUEST)
138        with self.assertRaises(aio.AioRpcError) as exception_context:
139            await call
140        rpc_error = exception_context.exception
141
142        status = await rpc_status.aio.from_call(call)
143        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
144        self.assertEqual(status.code, code_pb2.Code.Value('INTERNAL'))
145
146        # Check if the underlying proto message is intact
147        self.assertTrue(status.details[0].Is(
148            error_details_pb2.DebugInfo.DESCRIPTOR))
149        info = error_details_pb2.DebugInfo()
150        status.details[0].Unpack(info)
151        self.assertIn('_error_details_unary_unary', info.stack_entries[-1])
152
153    async def test_code_message_validation(self):
154        call = self._channel.unary_unary(_INCONSISTENT)(_REQUEST)
155        with self.assertRaises(aio.AioRpcError) as exception_context:
156            await call
157        rpc_error = exception_context.exception
158        self.assertEqual(rpc_error.code(), grpc.StatusCode.NOT_FOUND)
159
160        # Code/Message validation failed
161        with self.assertRaises(ValueError):
162            await rpc_status.aio.from_call(call)
163
164    async def test_invalid_code(self):
165        with self.assertRaises(aio.AioRpcError) as exception_context:
166            await self._channel.unary_unary(_INVALID_CODE)(_REQUEST)
167        rpc_error = exception_context.exception
168        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
169        # Invalid status code exception raised during coversion
170        self.assertIn('Invalid status code', rpc_error.details())
171
172
173if __name__ == '__main__':
174    logging.basicConfig()
175    unittest.main(verbosity=2)
176