cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
    # Allocate a uv_pipe_t for `handle` and initialize it on the handle's
    # loop.
    #
    # Raises MemoryError if the allocation fails and the exception built
    # by convert_error() if uv_pipe_init() reports an error; in both cases
    # handle._abort_init() is called first.
    #
    # NOTE: the `loop` argument is not read here; the libuv loop is taken
    # from `handle._loop`.
    cdef int err

    handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t))
    if handle._handle is NULL:
        handle._abort_init()
        raise MemoryError()

    # Initialize pipe handle with ipc=0.
    # ipc=1 means that libuv will use recvmsg/sendmsg
    # instead of recv/send.
    err = uv.uv_pipe_init(handle._loop.uvloop,
                          <uv.uv_pipe_t*>handle._handle,
                          0)
    if err < 0:
        handle._abort_init()
        raise convert_error(err)

    # Only touch the flags once libuv has reported a successful init:
    # the memory comes from a raw malloc, so on failure its contents
    # are not guaranteed to be meaningful.
    #
    # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
    # even if it is O_WRONLY, see also #317, libuv/libuv#2058
    handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE

    handle._finish_init()


cdef __pipe_open(UVStream handle, int fd):
    # Attach the existing file descriptor `fd` to the pipe handle via
    # uv_pipe_open(); raises the converted libuv error on failure.
    cdef int err
    err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle,
                          <uv.uv_file>fd)
    if err < 0:
        raise convert_error(err)


cdef __pipe_get_socket(UVSocketHandle handle):
    # Build a PseudoSocket (AF_UNIX / SOCK_STREAM, protocol 0) around the
    # file descriptor reported by handle._fileno().
    fileno = handle._fileno()
    return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno)


@cython.no_gc_clear
cdef class UnixServer(UVStreamServer):
    # Stream server backed by a libuv pipe handle.

    @staticmethod
    cdef UnixServer new(Loop loop, object protocol_factory, Server server,
                        object backlog,
                        object ssl,
                        object ssl_handshake_timeout,
                        object ssl_shutdown_timeout):
        # Create a UnixServer, run the base-class _init() with the given
        # arguments, and set up its libuv pipe handle.

        cdef UnixServer handle
        handle = UnixServer.__new__(UnixServer)
        handle._init(loop, protocol_factory, server, backlog,
                     ssl, ssl_handshake_timeout, ssl_shutdown_timeout)
        __pipe_init_uv_handle(<UVStream>handle, loop)
        return handle

    cdef _new_socket(self):
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        # Adopt an already-created descriptor and mark the server as open.
        self._ensure_alive()
        __pipe_open(<UVStream>self, sockfd)
        self._mark_as_open()

    cdef bind(self, str path):
        # Bind the pipe to `path` (encoded with str.encode()).  A libuv
        # error is handed to _fatal_error() instead of being raised, and
        # the server is only marked as open when the bind succeeded.
        cdef int err
        self._ensure_alive()
        err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle,
                              path.encode())
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        self._mark_as_open()

    cdef UVStream _make_new_transport(self, object protocol, object waiter,
                                      object context):
        # Create the UnixTransport used for a new connection, sharing this
        # server's loop and Server object.
        cdef UnixTransport tr
        tr = UnixTransport.new(self._loop, protocol, self._server, waiter,
                               context)
        return <UVStream>tr


@cython.no_gc_clear
cdef class UnixTransport(UVStream):
    # Stream transport backed by a libuv pipe handle.

    @staticmethod
    cdef UnixTransport new(Loop loop, object protocol, Server server,
                           object waiter, object context):
        # Create a UnixTransport, run the base-class _init(), and set up
        # its libuv pipe handle.

        cdef UnixTransport handle
        handle = UnixTransport.__new__(UnixTransport)
        handle._init(loop, protocol, server, waiter, context)
        __pipe_init_uv_handle(<UVStream>handle, loop)
        return handle

    cdef _new_socket(self):
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        __pipe_open(<UVStream>self, sockfd)

    cdef connect(self, char* addr):
        # Start a connect to `addr` through a _PipeConnectRequest; the
        # outcome is delivered to __pipe_connect_callback.
        cdef _PipeConnectRequest req
        req = _PipeConnectRequest(self._loop, self)
        req.connect(addr)


@cython.no_gc_clear
cdef class ReadUnixTransport(UVStream):
    # Pipe transport whose write-side methods (and abort()) all raise
    # NotImplementedError.

    @staticmethod
    cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
                               object waiter):
        cdef ReadUnixTransport handle
        handle = ReadUnixTransport.__new__(ReadUnixTransport)
        # This is only used in connect_read_pipe() and subprocess_shell/exec()
        # directly, we could simply copy the current context.
        handle._init(loop, protocol, server, waiter, Context_CopyCurrent())
        __pipe_init_uv_handle(<UVStream>handle, loop)
        return handle

    cdef _new_socket(self):
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        __pipe_open(<UVStream>self, sockfd)

    def get_write_buffer_limits(self):
        raise NotImplementedError

    def set_write_buffer_limits(self, high=None, low=None):
        raise NotImplementedError

    def get_write_buffer_size(self):
        raise NotImplementedError

    def write(self, data):
        raise NotImplementedError

    def writelines(self, list_of_data):
        raise NotImplementedError

    def write_eof(self):
        raise NotImplementedError

    def can_write_eof(self):
        raise NotImplementedError

    def abort(self):
        raise NotImplementedError


@cython.no_gc_clear
cdef class WriteUnixTransport(UVStream):
    # Pipe transport whose pause_reading()/resume_reading() raise
    # NotImplementedError.

    @staticmethod
    cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
                                object waiter):
        cdef WriteUnixTransport handle
        handle = WriteUnixTransport.__new__(WriteUnixTransport)

        # We listen for read events on write-end of the pipe. When
        # the read-end is closed, the uv_stream_t.read callback will
        # receive an error -- we want to silence that error, and just
        # close the transport.
        handle._close_on_read_error()

        # This is only used in connect_write_pipe() and subprocess_shell/exec()
        # directly, we could simply copy the current context.
        handle._init(loop, protocol, server, waiter, Context_CopyCurrent())
        __pipe_init_uv_handle(<UVStream>handle, loop)
        return handle

    cdef _new_socket(self):
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        __pipe_open(<UVStream>self, sockfd)

    def pause_reading(self):
        raise NotImplementedError

    def resume_reading(self):
        raise NotImplementedError


cdef class _PipeConnectRequest(UVRequest):
    # Request object pairing a uv_connect_t with the UnixTransport that
    # is being connected.
    cdef:
        UnixTransport transport
        uv.uv_connect_t _req_data

    def __cinit__(self, loop, transport):
        # Point the generic request at the embedded uv_connect_t and store
        # `self` in its data slot so the C callback can find this object.
        self.request = <uv.uv_req_t*> &self._req_data
        self.request.data = <void*>self
        self.transport = transport

    cdef connect(self, char* addr):
        # uv_pipe_connect returns void
        uv.uv_pipe_connect(<uv.uv_connect_t*>self.request,
                           <uv.uv_pipe_t*>self.transport._handle,
                           addr,
                           __pipe_connect_callback)


cdef void __pipe_connect_callback(uv.uv_connect_t* req, int status) with gil:
    # libuv connect callback: recover the _PipeConnectRequest from
    # req.data, report the result to the transport (None on success, a
    # converted error when status < 0), and always finish the request.
    cdef:
        _PipeConnectRequest wrapper
        UnixTransport transport

    wrapper = <_PipeConnectRequest> req.data
    transport = wrapper.transport

    if status < 0:
        exc = convert_error(status)
    else:
        exc = None

    try:
        transport._on_connect(exc)
    except BaseException as ex:
        # Anything raised by _on_connect() is routed to the transport's
        # _fatal_error() rather than escaping from this callback.
        transport._fatal_error(ex, False)
    finally:
        wrapper.on_done()