# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cdef class CallbackFailureHandler:
    """Converts a failed Core callback into an exception set on a future."""

    def __cinit__(self,
                  str core_function_name,
                  object error_details,
                  object exception_type):
        """Stores what to report if the callback signals failure.

        Args:
          core_function_name: Name included in the failure message.
          error_details: Extra details formatted into the failure message.
          exception_type: Callable invoked with the formatted message to
            build the exception that is set on the future.
        """
        self._core_function_name = core_function_name
        self._error_details = error_details
        self._exception_type = exception_type

    cdef handle(self, object future):
        # Does not raise here; the exception is attached to the future so the
        # coroutine awaiting it observes the failure.
        future.set_exception(self._exception_type(
            'Failed "%s": %s' % (self._core_function_name, self._error_details)
        ))


cdef class CallbackWrapper:
    """Owns the functor/context handed to Core as a completion tag."""

    def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
        self.context.functor.functor_run = self.functor_run
        # The context stores borrowed (non-owning) pointers; the owning
        # references are kept in the attributes assigned below.
        self.context.waiter = <cpython.PyObject*>future
        self.context.loop = <cpython.PyObject*>loop
        self.context.failure_handler = <cpython.PyObject*>failure_handler
        self.context.callback_wrapper = <cpython.PyObject*>self
        # NOTE(lidiz) Not using a list here, because this class is critical in
        # data path. We should make it as efficient as possible.
        self._reference_of_future = future
        self._reference_of_failure_handler = failure_handler
        # NOTE(lidiz) We need to ensure when Core invokes our callback, the
        # callback function itself is not deallocated. Otherwise, we will get
        # a segfault. We can view this as Core holding a ref.
        #
        # This extra reference is released in functor_run(). Code that creates
        # a wrapper but never gets the functor invoked must release it itself.
        cpython.Py_INCREF(self)

    @staticmethod
    cdef void functor_run(
            grpc_completion_queue_functor* functor,
            int success):
        # The functor pointer is cast back to the CallbackContext it belongs
        # to, which gives access to the Python objects stored in __cinit__.
        cdef CallbackContext *context = <CallbackContext *>functor
        cdef object waiter = <object>context.waiter
        # A cancelled future is already done, so it must not be resolved again.
        if not waiter.cancelled():
            if success == 0:
                (<CallbackFailureHandler>context.failure_handler).handle(waiter)
            else:
                waiter.set_result(None)
        # Release the reference taken in __cinit__ on behalf of Core.
        cpython.Py_DECREF(<object>context.callback_wrapper)

    cdef grpc_completion_queue_functor *c_functor(self):
        # Pointer to the functor embedded in this wrapper's context; it is
        # only valid while the wrapper is alive.
        return &self.context.functor


cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
    'grpc_completion_queue_shutdown',
    'Unknown',
    InternalError)


class ExecuteBatchError(InternalError):
    """Raised when execute batch returns a failure from Core."""


async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
                        tuple operations,
                        object loop):
    """The callback version of start batch operations.

    Args:
      grpc_call_wrapper: Wrapper whose `call` the batch is started on.
      operations: Tuple of operations to execute as one batch.
      loop: Event loop used to create the future that is awaited.

    Raises:
      ExecuteBatchError: If grpc_call_start_batch rejects the batch, or if
        the completion callback reports failure.
    """
    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
    batch_operation_tag.prepare()

    cdef object future = loop.create_future()
    cdef CallbackWrapper wrapper = CallbackWrapper(
        future,
        loop,
        CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
    cdef grpc_call_error error = grpc_call_start_batch(
        grpc_call_wrapper.call,
        batch_operation_tag.c_ops,
        batch_operation_tag.c_nops,
        wrapper.c_functor(), NULL)

    if error != GRPC_CALL_OK:
        # The batch was rejected, so Core will never invoke functor_run and
        # the reference taken in CallbackWrapper.__cinit__ would never be
        # released. Drop it here to avoid leaking the wrapper, the future and
        # the failure handler. The local `wrapper` variable still holds its
        # own reference, so the object stays valid until this frame exits.
        cpython.Py_DECREF(wrapper)
        raise ExecuteBatchError("Failed grpc_call_start_batch: {}".format(error))

    await future

    cdef grpc_event c_event
    # Tag.event must be called, otherwise messages won't be parsed from C
    batch_operation_tag.event(c_event)


cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata):
    # Eventually, this function should be the only function that produces
    # SendInitialMetadataOperation. So we have more control over the flag.
    #
    # NOTE(review): this file uses both _EMPTY_FLAG and _EMPTY_FLAGS; both are
    # defined outside this file. Confirm the two names are intentional.
    return (SendInitialMetadataOperation(
        metadata,
        _EMPTY_FLAG
    ),) + ops


async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
                           object loop):
    """Retrieves parsed messages from Core.

    The messages may already be in Core's buffer, so there isn't a 1-to-1
    mapping between this and the underlying "socket.read()". Also, eventually,
    this function will end with an EOF, which reads empty message.
    """
    cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG)
    cdef tuple ops = (receive_op,)
    try:
        await execute_batch(grpc_call_wrapper, ops, loop)
    except ExecuteBatchError:
        # NOTE(lidiz) The receive message operation has two ways to indicate
        # finish state : 1) returns empty message due to EOF; 2) fails inside
        # the callback (e.g. cancelled).
        #
        # Since both indicate finish, they are better merged.
        _LOGGER.debug('Failed to receive any message from Core')
    # NOTE(lidiz) The returned message might be an empty bytes (aka. b'').
    # Please explicitly check if it is None or falsey string object!
    return receive_op.message()


async def _send_message(GrpcCallWrapper grpc_call_wrapper,
                        bytes message,
                        Operation send_initial_metadata_op,
                        int write_flag,
                        object loop):
    """Sends one message, optionally batched after an initial-metadata op.

    When `send_initial_metadata_op` is not None it is placed before the send
    message operation in the same batch.
    """
    cdef SendMessageOperation op = SendMessageOperation(message, write_flag)
    cdef tuple ops = (op,)
    if send_initial_metadata_op is not None:
        ops = (send_initial_metadata_op,) + ops
    await execute_batch(grpc_call_wrapper, ops, loop)


async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                 tuple metadata,
                                 int flags,
                                 object loop):
    """Sends `metadata` with `flags` as a single-operation batch."""
    cdef SendInitialMetadataOperation op = SendInitialMetadataOperation(
        metadata,
        flags)
    cdef tuple ops = (op,)
    await execute_batch(grpc_call_wrapper, ops, loop)


async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                    object loop):
    """Runs a receive-initial-metadata batch and returns the metadata."""
    cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
    cdef tuple ops = (op,)
    await execute_batch(grpc_call_wrapper, ops, loop)
    return op.initial_metadata()


async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper,
                                         grpc_status_code code,
                                         str details,
                                         tuple trailing_metadata,
                                         Operation send_initial_metadata_op,
                                         object loop):
    """Sends a non-ok status, optionally after an initial-metadata op.

    When `send_initial_metadata_op` is not None it is placed before the
    status operation in the same batch.
    """
    assert code != StatusCode.ok, 'Expecting non-ok status code.'
    cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
        trailing_metadata,
        code,
        details,
        _EMPTY_FLAGS,
    )
    cdef tuple ops = (op,)
    if send_initial_metadata_op is not None:
        ops = (send_initial_metadata_op,) + ops
    await execute_batch(grpc_call_wrapper, ops, loop)