1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# distutils: language=c++
15
16from libc cimport string
17import errno
18gevent_g = None
19gevent_socket = None
20gevent_hub = None
21gevent_event = None
22g_event = None
23g_pool = None
24
25def _spawn_greenlet(*args):
26  greenlet = g_pool.spawn(*args)
27
28###############################
29### socket implementation ###
30###############################
31
32cdef class SocketWrapper:
33  def __cinit__(self):
34    fork_handlers_and_grpc_init()
35    self.sockopts = []
36    self.socket = None
37    self.c_socket = NULL
38    self.c_buffer = NULL
39    self.len = 0
40
41  def __dealloc__(self):
42    grpc_shutdown_blocking()
43
44cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
45  sw = SocketWrapper()
46  sw.c_socket = socket
47  sw.sockopts = []
48  cpython.Py_INCREF(sw)
49  # Python doesn't support AF_UNSPEC sockets, so we defer creation until
50  # bind/connect when we know what type of socket we need
51  sw.socket = None
52  sw.closed = False
53  sw.accepting_socket = NULL
54  socket.impl = <void*>sw
55  return grpc_error_none()
56
57cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
58  try:
59    socket_wrapper.socket.connect(addr_tuple)
60    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
61                              grpc_error_none())
62  except IOError as io_error:
63    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
64                              socket_error("connect", str(io_error)))
65  g_event.set()
66
67def socket_connect_async(socket_wrapper, addr_tuple):
68  socket_connect_async_cython(socket_wrapper, addr_tuple)
69
70cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
71                         size_t addr_len,
72                         grpc_custom_connect_callback cb) with gil:
73  py_socket = None
74  socket_wrapper = <SocketWrapper>socket.impl
75  socket_wrapper.connect_cb = cb
76  addr_tuple = sockaddr_to_tuple(addr, addr_len)
77  if sockaddr_is_ipv4(addr, addr_len):
78      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
79  else:
80      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
81  applysockopts(py_socket)
82  socket_wrapper.socket = py_socket
83  _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
84
85cdef void socket_destroy(grpc_custom_socket* socket) with gil:
86  cpython.Py_DECREF(<SocketWrapper>socket.impl)
87
88cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
89  try:
90    (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
91  except IOError as io_error:
92    if io_error.errno != errno.ENOTCONN:
93      raise io_error
94
95cdef void socket_close(grpc_custom_socket* socket,
96                       grpc_custom_close_callback cb) with gil:
97  socket_wrapper = (<SocketWrapper>socket.impl)
98  if socket_wrapper.socket is not None:
99    socket_wrapper.socket.close()
100    socket_wrapper.closed = True
101    socket_wrapper.close_cb = cb
102    # Delay the close callback until the accept() call has picked it up
103    if socket_wrapper.accepting_socket != NULL:
104      return
105  socket_wrapper.close_cb(socket)
106
107def socket_sendmsg(socket, write_bytes):
108  try:
109    return socket.sendmsg(write_bytes)
110  except AttributeError:
111    # sendmsg not available on all Pythons/Platforms
112    return socket.send(b''.join(write_bytes))
113
114cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
115  try:
116    while write_bytes:
117      sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
118      while sent_byte_count > 0:
119        if sent_byte_count < len(write_bytes[0]):
120          write_bytes[0] = write_bytes[0][sent_byte_count:]
121          sent_byte_count = 0
122        else:
123          sent_byte_count -= len(write_bytes[0])
124          write_bytes = write_bytes[1:]
125    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
126                            grpc_error_none())
127  except IOError as io_error:
128    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
129                            socket_error("send", str(io_error)))
130  g_event.set()
131
132def socket_write_async(socket_wrapper, write_bytes):
133  socket_write_async_cython(socket_wrapper, write_bytes)
134
135cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
136                       grpc_custom_write_callback cb) with gil:
137  cdef char* start
138  sw = <SocketWrapper>socket.impl
139  sw.write_cb = cb
140  write_bytes = []
141  for i in range(buffer.count):
142    start = grpc_slice_buffer_start(buffer, i)
143    length = grpc_slice_buffer_length(buffer, i)
144    write_bytes.append(<bytes>start[:length])
145  _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes)
146
147cdef socket_read_async_cython(SocketWrapper socket_wrapper):
148  cdef char* buff_char_arr
149  try:
150    buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
151    buff_char_arr = buff_str
152    string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
153    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
154                           len(buff_str), grpc_error_none())
155  except IOError as io_error:
156    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
157                           -1, socket_error("recv", str(io_error)))
158  g_event.set()
159
160def socket_read_async(socket_wrapper):
161  socket_read_async_cython(socket_wrapper)
162
163cdef void socket_read(grpc_custom_socket* socket, char* buffer,
164                      size_t length, grpc_custom_read_callback cb) with gil:
165  sw = <SocketWrapper>socket.impl
166  sw.read_cb = cb
167  sw.c_buffer = buffer
168  sw.len = length
169  _spawn_greenlet(socket_read_async, sw)
170
171cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
172                                    const grpc_sockaddr* addr,
173                                    int* length) with gil:
174  cdef char* src_buf
175  peer = (<SocketWrapper>socket.impl).socket.getpeername()
176
177  cdef grpc_resolved_address c_addr
178  hostname = str_to_bytes(peer[0])
179  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
180  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
181  length[0] = c_addr.len
182  return grpc_error_none()
183
184cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
185                                    const grpc_sockaddr* addr,
186                                    int* length) with gil:
187  cdef char* src_buf
188  cdef grpc_resolved_address c_addr
189  if (<SocketWrapper>socket.impl).socket is None:
190    peer = ('0.0.0.0', 0)
191  else:
192    peer = (<SocketWrapper>socket.impl).socket.getsockname()
193  hostname = str_to_bytes(peer[0])
194  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
195  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
196  length[0] = c_addr.len
197  return grpc_error_none()
198
199def applysockopts(s):
200  s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
201  s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)
202
203cdef grpc_error* socket_bind(grpc_custom_socket* socket,
204                             const grpc_sockaddr* addr,
205                             size_t len, int flags) with gil:
206  addr_tuple = sockaddr_to_tuple(addr, len)
207  try:
208    try:
209      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
210      applysockopts(py_socket)
211      py_socket.bind(addr_tuple)
212    except gevent_socket.gaierror as e:
213      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
214      applysockopts(py_socket)
215      py_socket.bind(addr_tuple)
216    (<SocketWrapper>socket.impl).socket = py_socket
217  except IOError as io_error:
218    return socket_error("bind", str(io_error))
219  else:
220    return grpc_error_none()
221
222cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
223  (<SocketWrapper>socket.impl).socket.listen(50)
224  return grpc_error_none()
225
226cdef void accept_callback_cython(SocketWrapper s) except *:
227   try:
228     conn, address = s.socket.accept()
229     sw = SocketWrapper()
230     sw.closed = False
231     sw.c_socket = s.accepting_socket
232     sw.sockopts = []
233     sw.socket = conn
234     sw.c_socket.impl = <void*>sw
235     sw.accepting_socket = NULL
236     cpython.Py_INCREF(sw)
237     s.accepting_socket = NULL
238     s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
239   except IOError as io_error:
240      #TODO actual error
241      s.accepting_socket = NULL
242      s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
243                  socket_error("accept", str(io_error)))
244      if s.closed:
245        s.close_cb(<grpc_custom_socket*>s.c_socket)
246   g_event.set()
247
248def socket_accept_async(s):
249  accept_callback_cython(s)
250
251cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
252                        grpc_custom_accept_callback cb) with gil:
253  sw = <SocketWrapper>socket.impl
254  sw.accepting_socket = client
255  sw.accept_cb = cb
256  _spawn_greenlet(socket_accept_async, sw)
257
258#####################################
259######Resolver implementation #######
260#####################################
261
262cdef class ResolveWrapper:
263  def __cinit__(self):
264    fork_handlers_and_grpc_init()
265    self.c_resolver = NULL
266    self.c_host = NULL
267    self.c_port = NULL
268
269  def __dealloc__(self):
270    grpc_shutdown_blocking()
271
272cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
273  try:
274    res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
275    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
276                                 tuples_to_resolvaddr(res), grpc_error_none())
277  except IOError as io_error:
278    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
279                                 <grpc_resolved_addresses*>0,
280                                 socket_error("getaddrinfo", str(io_error)))
281  g_event.set()
282
283def socket_resolve_async_python(resolve_wrapper):
284  socket_resolve_async_cython(resolve_wrapper)
285
286cdef void socket_resolve_async(grpc_custom_resolver* r, const char* host, const char* port) with gil:
287  rw = ResolveWrapper()
288  rw.c_resolver = r
289  rw.c_host = host
290  rw.c_port = port
291  _spawn_greenlet(socket_resolve_async_python, rw)
292
293cdef grpc_error* socket_resolve(const char* host, const char* port,
294                                grpc_resolved_addresses** res) with gil:
295    try:
296      result = gevent_socket.getaddrinfo(host, port)
297      res[0] = tuples_to_resolvaddr(result)
298      return grpc_error_none()
299    except IOError as io_error:
300      return socket_error("getaddrinfo", str(io_error))
301
302###############################
303### timer implementation ######
304###############################
305
306cdef class TimerWrapper:
307  def __cinit__(self, deadline):
308    fork_handlers_and_grpc_init()
309    self.timer = gevent_hub.get_hub().loop.timer(deadline)
310    self.event = None
311
312  def start(self):
313    self.event = gevent_event.Event()
314    self.timer.start(self.on_finish)
315
316  def on_finish(self):
317    grpc_custom_timer_callback(self.c_timer, grpc_error_none())
318    self.timer.stop()
319    g_event.set()
320
321  def stop(self):
322    self.event.set()
323    self.timer.stop()
324
325  def __dealloc__(self):
326    grpc_shutdown_blocking()
327
328cdef void timer_start(grpc_custom_timer* t) with gil:
329  timer = TimerWrapper(t.timeout_ms / 1000.0)
330  timer.c_timer = t
331  t.timer = <void*>timer
332  timer.start()
333
334cdef void timer_stop(grpc_custom_timer* t) with gil:
335  time_wrapper = <object>t.timer
336  time_wrapper.stop()
337
338###############################
339### pollset implementation ###
340###############################
341
342cdef void init_loop() with gil:
343  pass
344
345cdef void destroy_loop() with gil:
346  g_pool.join()
347
348cdef void kick_loop() with gil:
349  g_event.set()
350
351cdef void run_loop(size_t timeout_ms) with gil:
352    timeout = timeout_ms / 1000.0
353    if timeout_ms > 0:
354      g_event.wait(timeout)
355      g_event.clear()
356
357###############################
358### Initializer ###############
359###############################
360
361cdef grpc_socket_vtable gevent_socket_vtable
362cdef grpc_custom_resolver_vtable gevent_resolver_vtable
363cdef grpc_custom_timer_vtable gevent_timer_vtable
364cdef grpc_custom_poller_vtable gevent_pollset_vtable
365
366def init_grpc_gevent():
367  # Lazily import gevent
368  global gevent_socket
369  global gevent_g
370  global gevent_hub
371  global gevent_event
372  global g_event
373  global g_pool
374  import gevent
375  gevent_g = gevent
376  import gevent.socket
377  gevent_socket = gevent.socket
378  import gevent.hub
379  gevent_hub = gevent.hub
380  import gevent.event
381  gevent_event = gevent.event
382  import gevent.pool
383
384  g_event = gevent.event.Event()
385  g_pool = gevent.pool.Group()
386
387  def cb_func(cb, args):
388    _spawn_greenlet(cb, *args)
389  set_async_callback_func(cb_func)
390
391  gevent_resolver_vtable.resolve = socket_resolve
392  gevent_resolver_vtable.resolve_async = socket_resolve_async
393
394  gevent_socket_vtable.init = socket_init
395  gevent_socket_vtable.connect = socket_connect
396  gevent_socket_vtable.destroy = socket_destroy
397  gevent_socket_vtable.shutdown = socket_shutdown
398  gevent_socket_vtable.close = socket_close
399  gevent_socket_vtable.write = socket_write
400  gevent_socket_vtable.read = socket_read
401  gevent_socket_vtable.getpeername = socket_getpeername
402  gevent_socket_vtable.getsockname = socket_getsockname
403  gevent_socket_vtable.bind = socket_bind
404  gevent_socket_vtable.listen = socket_listen
405  gevent_socket_vtable.accept = socket_accept
406
407  gevent_timer_vtable.start = timer_start
408  gevent_timer_vtable.stop = timer_stop
409
410  gevent_pollset_vtable.init = init_loop
411  gevent_pollset_vtable.poll = run_loop
412  gevent_pollset_vtable.kick = kick_loop
413  gevent_pollset_vtable.shutdown = destroy_loop
414
415  grpc_custom_iomgr_init(&gevent_socket_vtable,
416                         &gevent_resolver_vtable,
417                         &gevent_timer_vtable,
418                         &gevent_pollset_vtable)
419