1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
cdef class CallbackFailureHandler:
    """Converts a failed Core callback into an exception set on a future.

    The exception message is built from the Core function name and the error
    details supplied at construction time.
    """

    def __cinit__(self,
                  str core_function_name,
                  object error_details,
                  object exception_type):
        """Records what to report when a failure is handled.

        Args:
          core_function_name: Name interpolated into the failure message.
          error_details: Extra details interpolated into the failure message.
          exception_type: Callable invoked with the message to build the
            exception that is set on the future.
        """
        self._exception_type = exception_type
        self._error_details = error_details
        self._core_function_name = core_function_name

    cdef handle(self, object future):
        # Build the message first, then wrap it in the configured exception
        # type and hand it to the future.
        message = 'Failed "%s": %s' % (
            self._core_function_name,
            self._error_details,
        )
        failure = self._exception_type(message)
        future.set_exception(failure)
31
32
cdef class CallbackWrapper:
    """Bridges a Core completion-queue functor to a future.

    The functor returned by c_functor() is handed to Core; when it is run,
    functor_run resolves the wrapped future with a result or an exception.
    """

    def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
        """Populates the C context that functor_run reads back.

        Args:
          future: Object resolved by functor_run, either through
            set_result(None) or through the failure handler.
          loop: Stored in the context as a raw pointer; not otherwise used
            inside this class.
          failure_handler: Invoked with the future when the functor is run
            with a zero success flag.
        """
        self.context.functor.functor_run = self.functor_run
        # The casts below store raw pointers and do not take references; the
        # attributes assigned further down are what keep the objects alive.
        self.context.waiter = <cpython.PyObject*>future
        # NOTE(review): no strong reference to `loop` is kept in this class;
        # presumably the caller keeps it alive -- confirm.
        self.context.loop = <cpython.PyObject*>loop
        self.context.failure_handler = <cpython.PyObject*>failure_handler
        self.context.callback_wrapper = <cpython.PyObject*>self
        # NOTE(lidiz) Not using a list here, because this class is critical in
        # data path. We should make it as efficient as possible.
        self._reference_of_future = future
        self._reference_of_failure_handler = failure_handler
        # NOTE(lidiz) We need to ensure when Core invokes our callback, the
        # callback function itself is not deallocated. Otherwise, we will get
        # a segfault. We can view this as Core holding a ref.
        # This reference is released at the end of functor_run.
        cpython.Py_INCREF(self)

    @staticmethod
    cdef void functor_run(
            grpc_completion_queue_functor* functor,
            int success):
        # Entry point installed into context.functor.functor_run by __cinit__.
        # NOTE(review): the cast below treats the functor pointer as the
        # enclosing CallbackContext, which assumes `functor` is its first
        # member -- confirm against the struct declaration.
        cdef CallbackContext *context = <CallbackContext *>functor
        cdef object waiter = <object>context.waiter
        # A cancelled waiter is left untouched; only the reference below is
        # released.
        if not waiter.cancelled():
            if success == 0:
                # Failure: let the handler set an exception on the waiter.
                (<CallbackFailureHandler>context.failure_handler).handle(waiter)
            else:
                waiter.set_result(None)
        # Balances the Py_INCREF(self) performed in __cinit__.
        cpython.Py_DECREF(<object>context.callback_wrapper)

    cdef grpc_completion_queue_functor *c_functor(self):
        # Address of the functor embedded in this wrapper's context.
        return &self.context.functor
65
66
# Module-level failure handler that reports a failed
# grpc_completion_queue_shutdown as an InternalError with 'Unknown' details.
cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
    core_function_name='grpc_completion_queue_shutdown',
    error_details='Unknown',
    exception_type=InternalError,
)
71
72
class ExecuteBatchError(InternalError):
    """Signals that a batch of operations failed in Core.

    Raised by execute_batch when grpc_call_start_batch rejects the batch, and
    set on the awaited future when the completion callback reports failure.
    """
75
76
async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
                        tuple operations,
                        object loop):
    """The callback version of start batch operations.

    Starts `operations` on the call through grpc_call_start_batch and suspends
    until Core runs the completion functor.

    Args:
      grpc_call_wrapper: Wrapper whose `call` is handed to Core.
      operations: Tuple of operations to run as a single batch.
      loop: Event loop used to create the future that is awaited.

    Raises:
      ExecuteBatchError: If grpc_call_start_batch does not return
        GRPC_CALL_OK, or if the functor is run with a zero success flag (the
        exception is then set on the future by CallbackFailureHandler).
    """
    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
    batch_operation_tag.prepare()

    cdef object future = loop.create_future()
    cdef CallbackWrapper wrapper = CallbackWrapper(
        future,
        loop,
        CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
    cdef grpc_call_error error = grpc_call_start_batch(
        grpc_call_wrapper.call,
        batch_operation_tag.c_ops,
        batch_operation_tag.c_nops,
        wrapper.c_functor(), NULL)

    if error != GRPC_CALL_OK:
        # Core only runs the functor for a batch it accepted, so the reference
        # CallbackWrapper took on itself on Core's behalf would never be
        # released by functor_run. Drop it here so the wrapper (and the future
        # and failure handler it keeps alive) does not leak.
        # NOTE(review): batch_operation_tag.event() is skipped on this path;
        # confirm whether prepare() allocates anything event() would release.
        cpython.Py_DECREF(wrapper)
        raise ExecuteBatchError("Failed grpc_call_start_batch: {}".format(error))

    await future

    cdef grpc_event c_event
    # Tag.event must be called, otherwise messages won't be parsed from C
    batch_operation_tag.event(c_event)
103
104
cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata):
    """Returns `ops` with a SendInitialMetadataOperation placed in front."""
    # Eventually, this function should be the only function that produces
    # SendInitialMetadataOperation. So we have more control over the flag.
    cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
        metadata, _EMPTY_FLAG)
    return (initial_metadata_op,) + ops
112
113
async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
                           object loop):
    """Retrieves a parsed message from Core.

    The message may already be sitting in Core's buffer, so there isn't a
    1-to-1 mapping between this call and the underlying "socket.read()".
    Eventually this function ends with an EOF, which reads as an empty
    message.

    Returns:
      Whatever the receive operation's message() yields after the batch has
      been attempted.
    """
    cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG)
    try:
        await execute_batch(grpc_call_wrapper, (receive_op,), loop)
    except ExecuteBatchError:
        # NOTE(lidiz) The receive message operation has two ways to indicate
        # the finish state: 1) it returns an empty message due to EOF; 2) it
        # fails inside the callback (e.g. cancelled).
        #
        # Since both indicate finish, they are better merged.
        _LOGGER.debug('Failed to receive any message from Core')
    # NOTE(lidiz) The returned message might be an empty bytes (aka. b'').
    # Please explicitly check if it is None or falsey string object!
    return receive_op.message()
136
137
async def _send_message(GrpcCallWrapper grpc_call_wrapper,
                        bytes message,
                        Operation send_initial_metadata_op,
                        int write_flag,
                        object loop):
    """Sends one message, optionally preceded by initial metadata.

    Args:
      grpc_call_wrapper: Wrapper of the call the batch is executed on.
      message: Payload passed to SendMessageOperation.
      send_initial_metadata_op: Operation to run ahead of the send in the same
        batch, or None to send the message alone.
      write_flag: Flag passed to SendMessageOperation.
      loop: Event loop forwarded to execute_batch.
    """
    cdef SendMessageOperation send_op = SendMessageOperation(message, write_flag)
    cdef tuple batch
    if send_initial_metadata_op is None:
        batch = (send_op,)
    else:
        batch = (send_initial_metadata_op, send_op)
    await execute_batch(grpc_call_wrapper, batch, loop)
148
149
async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                 tuple metadata,
                                 int flags,
                                 object loop):
    """Sends initial metadata as a single-operation batch.

    Args:
      grpc_call_wrapper: Wrapper of the call the batch is executed on.
      metadata: Metadata passed to SendInitialMetadataOperation.
      flags: Flags passed to SendInitialMetadataOperation.
      loop: Event loop forwarded to execute_batch.
    """
    cdef tuple batch = (SendInitialMetadataOperation(metadata, flags),)
    await execute_batch(grpc_call_wrapper, batch, loop)
159
160
async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                    object loop):
    """Runs a receive-initial-metadata batch and returns its result.

    Returns:
      The value of the operation's initial_metadata() once the batch is done.
    """
    cdef ReceiveInitialMetadataOperation receive_op = ReceiveInitialMetadataOperation(
        _EMPTY_FLAGS)
    await execute_batch(grpc_call_wrapper, (receive_op,), loop)
    return receive_op.initial_metadata()
167
async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper,
                                         grpc_status_code code,
                                         str details,
                                         tuple trailing_metadata,
                                         Operation send_initial_metadata_op,
                                         object loop):
    """Sends a non-ok status, optionally preceded by initial metadata.

    Args:
      grpc_call_wrapper: Wrapper of the call the batch is executed on.
      code: Status code to send; asserted to differ from StatusCode.ok.
      details: Details passed to SendStatusFromServerOperation.
      trailing_metadata: Metadata passed to SendStatusFromServerOperation.
      send_initial_metadata_op: Operation to run ahead of the status in the
        same batch, or None to send the status alone.
      loop: Event loop forwarded to execute_batch.
    """
    assert code != StatusCode.ok, 'Expecting non-ok status code.'
    cdef SendStatusFromServerOperation status_op = SendStatusFromServerOperation(
        trailing_metadata, code, details, _EMPTY_FLAGS)
    cdef tuple batch
    if send_initial_metadata_op is None:
        batch = (status_op,)
    else:
        batch = (send_initial_metadata_op, status_op)
    await execute_batch(grpc_call_wrapper, batch, loop)
185