# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++

from libc cimport string
import errno

# gevent is imported lazily by init_grpc_gevent(); until then these are None.
gevent_g = None
gevent_socket = None
gevent_hub = None
gevent_event = None
# Event set whenever an asynchronous operation finishes (see run_loop).
g_event = None
# Greenlet group that every asynchronous operation is spawned into.
g_pool = None


def _spawn_greenlet(*args):
  """Spawn a greenlet in g_pool; args are forwarded to g_pool.spawn()."""
  g_pool.spawn(*args)


###############################
### socket implementation ###
###############################

cdef class SocketWrapper:
  """Python-side state attached to a grpc_custom_socket via its impl field."""

  def __cinit__(self):
    fork_handlers_and_grpc_init()
    self.sockopts = []
    self.socket = None
    self.c_socket = NULL
    self.c_buffer = NULL
    self.len = 0

  def __dealloc__(self):
    grpc_shutdown_blocking()


# Creates the SocketWrapper for `socket` and stores it in socket.impl.
# The wrapper is INCREF'd here; socket_destroy performs the matching DECREF.
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
  sw = SocketWrapper()
  sw.c_socket = socket
  sw.sockopts = []
  cpython.Py_INCREF(sw)
  # Python doesn't support AF_UNSPEC sockets, so we defer creation until
  # bind/connect when we know what type of socket we need
  sw.socket = None
  sw.closed = False
  sw.accepting_socket = NULL
  socket.impl = <void*>sw
  return grpc_error_none()


# Greenlet body for connect: connects, invokes connect_cb with the outcome
# and then sets g_event.
cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
  try:
    socket_wrapper.socket.connect(addr_tuple)
    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                              grpc_error_none())
  except IOError as io_error:
    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                              socket_error("connect", str(io_error)))
  g_event.set()


def socket_connect_async(socket_wrapper, addr_tuple):
  """Python-callable trampoline so the cdef connect body can be spawned."""
  socket_connect_async_cython(socket_wrapper, addr_tuple)


# Creates an IPv4 or IPv6 gevent socket depending on `addr`, applies the
# default socket options and starts the connect in a new greenlet.
cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
                         size_t addr_len,
                         grpc_custom_connect_callback cb) with gil:
  socket_wrapper = <SocketWrapper>socket.impl
  socket_wrapper.connect_cb = cb
  addr_tuple = sockaddr_to_tuple(addr, addr_len)
  if sockaddr_is_ipv4(addr, addr_len):
    py_socket = gevent_socket.socket(gevent_socket.AF_INET)
  else:
    py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
  applysockopts(py_socket)
  socket_wrapper.socket = py_socket
  _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)


# Drops the reference taken on the wrapper in socket_init /
# accept_callback_cython.
cdef void socket_destroy(grpc_custom_socket* socket) with gil:
  cpython.Py_DECREF(<SocketWrapper>socket.impl)


# Shuts down both directions of the socket. ENOTCONN is ignored; any other
# IOError is re-raised.
# NOTE(review): this function declares no exception return value, so how a
# re-raised error is surfaced depends on Cython's handling -- confirm.
cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
  try:
    (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
  except IOError as io_error:
    if io_error.errno != errno.ENOTCONN:
      raise


# Closes the Python socket (if one was created) and reports completion via
# `cb`, unless an accept is still pending on this socket.
cdef void socket_close(grpc_custom_socket* socket,
                       grpc_custom_close_callback cb) with gil:
  socket_wrapper = (<SocketWrapper>socket.impl)
  if socket_wrapper.socket is not None:
    socket_wrapper.socket.close()
    socket_wrapper.closed = True
    socket_wrapper.close_cb = cb
    # Delay the close callback until the accept() call has picked it up
    if socket_wrapper.accepting_socket != NULL:
      return
  socket_wrapper.close_cb(socket)


def socket_sendmsg(socket, write_bytes):
  """Send a list of byte strings on `socket` and return the sent byte count.

  Uses socket.sendmsg() when available, otherwise joins the buffers and
  falls back to socket.send().
  """
  try:
    return socket.sendmsg(write_bytes)
  except AttributeError:
    # sendmsg not available on all Pythons/Platforms
    return socket.send(b''.join(write_bytes))


# Greenlet body for write: keeps sending until every buffer in `write_bytes`
# has been fully written, then invokes write_cb and sets g_event.
cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
  try:
    while write_bytes:
      sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
      # Drop the bytes that were sent from the front of the buffer list.
      while sent_byte_count > 0:
        if sent_byte_count < len(write_bytes[0]):
          # First buffer only partially sent: keep its unsent tail.
          write_bytes[0] = write_bytes[0][sent_byte_count:]
          sent_byte_count = 0
        else:
          # First buffer fully sent: discard it and continue with the next.
          sent_byte_count -= len(write_bytes[0])
          write_bytes = write_bytes[1:]
    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                            grpc_error_none())
  except IOError as io_error:
    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                            socket_error("send", str(io_error)))
  g_event.set()


def socket_write_async(socket_wrapper, write_bytes):
  """Python-callable trampoline so the cdef write body can be spawned."""
  socket_write_async_cython(socket_wrapper, write_bytes)


# Copies every slice of `buffer` into a Python bytes object and starts the
# write in a new greenlet.
cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
                       grpc_custom_write_callback cb) with gil:
  cdef char* start
  sw = <SocketWrapper>socket.impl
  sw.write_cb = cb
  write_bytes = []
  for i in range(buffer.count):
    start = grpc_slice_buffer_start(buffer, i)
    length = grpc_slice_buffer_length(buffer, i)
    write_bytes.append(<bytes>start[:length])
  _spawn_greenlet(socket_write_async, sw, write_bytes)


# Greenlet body for read: receives up to socket_wrapper.len bytes, copies
# them into socket_wrapper.c_buffer and invokes read_cb with the byte count
# (or -1 and an error on IOError), then sets g_event.
cdef socket_read_async_cython(SocketWrapper socket_wrapper):
  cdef char* buff_char_arr
  try:
    buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
    buff_char_arr = buff_str
    string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                           len(buff_str), grpc_error_none())
  except IOError as io_error:
    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                           -1, socket_error("recv", str(io_error)))
  g_event.set()


def socket_read_async(socket_wrapper):
  """Python-callable trampoline so the cdef read body can be spawned."""
  socket_read_async_cython(socket_wrapper)


# Records the destination buffer and callback on the wrapper and starts the
# read in a new greenlet.
cdef void socket_read(grpc_custom_socket* socket, char* buffer,
                      size_t length, grpc_custom_read_callback cb) with gil:
  sw = <SocketWrapper>socket.impl
  sw.read_cb = cb
  sw.c_buffer = buffer
  sw.len = length
  _spawn_greenlet(socket_read_async, sw)


# Writes the peer address of the socket into `addr` and its size into
# `length[0]`.
cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
  cdef grpc_resolved_address c_addr
  peer = (<SocketWrapper>socket.impl).socket.getpeername()

  hostname = str_to_bytes(peer[0])
  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
  length[0] = c_addr.len
  return grpc_error_none()


# Writes the local address of the socket into `addr` and its size into
# `length[0]`. When no Python socket has been created yet, ('0.0.0.0', 0)
# is reported.
cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
  cdef grpc_resolved_address c_addr
  if (<SocketWrapper>socket.impl).socket is None:
    peer = ('0.0.0.0', 0)
  else:
    peer = (<SocketWrapper>socket.impl).socket.getsockname()
  hostname = str_to_bytes(peer[0])
  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
  length[0] = c_addr.len
  return grpc_error_none()


def applysockopts(s):
  """Enable SO_REUSEADDR and TCP_NODELAY on socket `s`."""
  s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
  s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)


# Creates a gevent socket and binds it to `addr`. An AF_INET socket is tried
# first; if that raises gaierror the bind is retried with AF_INET6. Any
# IOError is converted into a grpc_error.
cdef grpc_error* socket_bind(grpc_custom_socket* socket,
                             const grpc_sockaddr* addr,
                             size_t len, int flags) with gil:
  addr_tuple = sockaddr_to_tuple(addr, len)
  try:
    try:
      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
      applysockopts(py_socket)
      py_socket.bind(addr_tuple)
    except gevent_socket.gaierror:
      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
      applysockopts(py_socket)
      py_socket.bind(addr_tuple)
    (<SocketWrapper>socket.impl).socket = py_socket
  except IOError as io_error:
    return socket_error("bind", str(io_error))
  else:
    return grpc_error_none()


# Starts listening on the bound socket with a backlog of 50.
cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
  (<SocketWrapper>socket.impl).socket.listen(50)
  return grpc_error_none()


# Greenlet body for accept: accepts one connection on `s`, wraps it in a new
# SocketWrapper bound to the pending client handle and invokes accept_cb.
# If the listening socket was closed while the accept was pending, the
# deferred close callback is delivered afterwards.
cdef void accept_callback_cython(SocketWrapper s) except *:
  # Capture the pending client handle up front: the field is cleared before
  # accept_cb runs, and the error path must still be able to hand it back.
  cdef grpc_custom_socket* pending_client = s.accepting_socket
  try:
    conn, address = s.socket.accept()
    sw = SocketWrapper()
    sw.closed = False
    sw.c_socket = pending_client
    sw.sockopts = []
    sw.socket = conn
    sw.c_socket.impl = <void*>sw
    sw.accepting_socket = NULL
    # Reference owned by the C socket; released in socket_destroy.
    cpython.Py_INCREF(sw)
    s.accepting_socket = NULL
    s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
  except IOError as io_error:
    #TODO actual error
    s.accepting_socket = NULL
    # Pass the handle that was supplied to socket_accept rather than the
    # just-cleared field (which would always be NULL).
    # NOTE(review): assumes the callback's owner releases the client handle
    # on error -- confirm against the core accept callback.
    s.accept_cb(<grpc_custom_socket*>s.c_socket, pending_client,
                socket_error("accept", str(io_error)))
  if s.closed:
    s.close_cb(<grpc_custom_socket*>s.c_socket)
  g_event.set()


def socket_accept_async(s):
  """Python-callable trampoline so the cdef accept body can be spawned."""
  accept_callback_cython(s)


# Records the client handle and callback on the listening socket's wrapper
# and starts the accept in a new greenlet.
cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
                        grpc_custom_accept_callback cb) with gil:
  sw = <SocketWrapper>socket.impl
  sw.accepting_socket = client
  sw.accept_cb = cb
  _spawn_greenlet(socket_accept_async, sw)


#####################################
######Resolver implementation #######
#####################################

cdef class ResolveWrapper:
  """Holds the resolver handle, host and port of one asynchronous lookup."""

  def __cinit__(self):
    fork_handlers_and_grpc_init()
    self.c_resolver = NULL
    self.c_host = NULL
    self.c_port = NULL

  def __dealloc__(self):
    grpc_shutdown_blocking()


# Greenlet body for resolve: runs getaddrinfo and reports the addresses (or
# an error) through grpc_custom_resolve_callback, then sets g_event.
cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
  try:
    res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
                                 tuples_to_resolvaddr(res), grpc_error_none())
  except IOError as io_error:
    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
                                 <grpc_resolved_addresses*>0,
                                 socket_error("getaddrinfo", str(io_error)))
  g_event.set()


def socket_resolve_async_python(resolve_wrapper):
  """Python-callable trampoline so the cdef resolve body can be spawned."""
  socket_resolve_async_cython(resolve_wrapper)


# Stores the lookup parameters in a ResolveWrapper and starts the resolution
# in a new greenlet.
cdef void socket_resolve_async(grpc_custom_resolver* r, const char* host, const char* port) with gil:
  rw = ResolveWrapper()
  rw.c_resolver = r
  rw.c_host = host
  rw.c_port = port
  _spawn_greenlet(socket_resolve_async_python, rw)


# Resolves host/port synchronously via getaddrinfo and stores the result in
# res[0]; an IOError is converted into a grpc_error.
cdef grpc_error* socket_resolve(const char* host, const char* port,
                                grpc_resolved_addresses** res) with gil:
  try:
    result = gevent_socket.getaddrinfo(host, port)
    res[0] = tuples_to_resolvaddr(result)
    return grpc_error_none()
  except IOError as io_error:
    return socket_error("getaddrinfo", str(io_error))


###############################
### timer implementation ######
###############################

cdef class TimerWrapper:
  """Wraps a timer created on the gevent hub loop for a grpc_custom_timer."""

  def __cinit__(self, deadline):
    fork_handlers_and_grpc_init()
    self.timer = gevent_hub.get_hub().loop.timer(deadline)
    self.event = None

  def start(self):
    """Create the wrapper's event and start the timer with on_finish."""
    self.event = gevent_event.Event()
    self.timer.start(self.on_finish)

  def on_finish(self):
    """Timer callback: notify gRPC, stop the timer and set g_event."""
    grpc_custom_timer_callback(self.c_timer, grpc_error_none())
    self.timer.stop()
    g_event.set()

  def stop(self):
    """Set the wrapper's event and stop the timer."""
    self.event.set()
    self.timer.stop()

  def __dealloc__(self):
    grpc_shutdown_blocking()


# Creates and starts a TimerWrapper for `t` and stores it in t.timer.
cdef void timer_start(grpc_custom_timer* t) with gil:
  # timeout_ms is divided by 1000 (presumably milliseconds -> seconds).
  timer = TimerWrapper(t.timeout_ms / 1000.0)
  timer.c_timer = t
  t.timer = <void*>timer
  timer.start()


# Stops the TimerWrapper stored in t.timer.
cdef void timer_stop(grpc_custom_timer* t) with gil:
  time_wrapper = <object>t.timer
  time_wrapper.stop()


###############################
### pollset implementation ###
###############################

# No per-loop initialization is required.
cdef void init_loop() with gil:
  pass


# Waits for every greenlet in g_pool to finish.
cdef void destroy_loop() with gil:
  g_pool.join()


# Wakes up a run_loop() that is waiting on g_event.
cdef void kick_loop() with gil:
  g_event.set()


# Waits on g_event for up to timeout_ms (skipped when timeout_ms is 0) and
# then clears the event.
cdef void run_loop(size_t timeout_ms) with gil:
  if timeout_ms > 0:
    g_event.wait(timeout_ms / 1000.0)
    g_event.clear()


###############################
### Initializer ###############
###############################

cdef grpc_socket_vtable gevent_socket_vtable
cdef grpc_custom_resolver_vtable gevent_resolver_vtable
cdef grpc_custom_timer_vtable gevent_timer_vtable
cdef grpc_custom_poller_vtable gevent_pollset_vtable


def init_grpc_gevent():
  """Import gevent, fill in the module globals and vtables, and register
  them with grpc_custom_iomgr_init()."""
  # Lazily import gevent
  global gevent_socket
  global gevent_g
  global gevent_hub
  global gevent_event
  global g_event
  global g_pool
  import gevent
  gevent_g = gevent
  import gevent.socket
  gevent_socket = gevent.socket
  import gevent.hub
  gevent_hub = gevent.hub
  import gevent.event
  gevent_event = gevent.event
  import gevent.pool

  g_event = gevent.event.Event()
  g_pool = gevent.pool.Group()

  # Asynchronous callbacks are run in greenlets spawned into g_pool.
  def cb_func(cb, args):
    _spawn_greenlet(cb, *args)
  set_async_callback_func(cb_func)

  gevent_resolver_vtable.resolve = socket_resolve
  gevent_resolver_vtable.resolve_async = socket_resolve_async

  gevent_socket_vtable.init = socket_init
  gevent_socket_vtable.connect = socket_connect
  gevent_socket_vtable.destroy = socket_destroy
  gevent_socket_vtable.shutdown = socket_shutdown
  gevent_socket_vtable.close = socket_close
  gevent_socket_vtable.write = socket_write
  gevent_socket_vtable.read = socket_read
  gevent_socket_vtable.getpeername = socket_getpeername
  gevent_socket_vtable.getsockname = socket_getsockname
  gevent_socket_vtable.bind = socket_bind
  gevent_socket_vtable.listen = socket_listen
  gevent_socket_vtable.accept = socket_accept

  gevent_timer_vtable.start = timer_start
  gevent_timer_vtable.stop = timer_stop

  gevent_pollset_vtable.init = init_loop
  gevent_pollset_vtable.poll = run_loop
  gevent_pollset_vtable.kick = kick_loop
  gevent_pollset_vtable.shutdown = destroy_loop

  grpc_custom_iomgr_init(&gevent_socket_vtable,
                         &gevent_resolver_vtable,
                         &gevent_timer_vtable,
                         &gevent_pollset_vtable)