cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
    # Allocate a uv_pipe_t for `handle` and initialize it on the libuv loop
    # referenced by `handle._loop`.
    #
    # `loop` is accepted for signature compatibility with the callers but
    # is not read here; the loop is taken from `handle._loop`.
    #
    # Raises MemoryError if the allocation fails, or the exception built by
    # convert_error() if uv_pipe_init() reports an error. In both cases
    # handle._abort_init() is called before raising.
    cdef int err

    handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t))
    if handle._handle is NULL:
        handle._abort_init()
        raise MemoryError()

    # Initialize pipe handle with ipc=0.
    # ipc=1 means that libuv will use recvmsg/sendmsg
    # instead of recv/send.
    err = uv.uv_pipe_init(handle._loop.uvloop,
                          <uv.uv_pipe_t*>handle._handle,
                          0)
    # Check the result before touching the handle any further, so that a
    # handle whose initialization failed is never modified.
    if err < 0:
        handle._abort_init()
        raise convert_error(err)

    # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
    # even if it is O_WRONLY, see also #317, libuv/libuv#2058
    handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE

    handle._finish_init()
23
24
cdef __pipe_open(UVStream handle, int fd):
    # Attach the existing file descriptor `fd` to the pipe handle via
    # uv_pipe_open(); a negative return code is converted to an exception
    # with convert_error() and raised.
    cdef int rc = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle,
                                  <uv.uv_file>fd)
    if rc < 0:
        raise convert_error(rc)
32
33
cdef __pipe_get_socket(UVSocketHandle handle):
    # Wrap the handle's file descriptor in an AF_UNIX / SOCK_STREAM
    # PseudoSocket (protocol number 0).
    return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, handle._fileno())
37
38
@cython.no_gc_clear
cdef class UnixServer(UVStreamServer):
    # Stream server whose underlying libuv handle is a uv_pipe_t.

    @staticmethod
    cdef UnixServer new(Loop loop, object protocol_factory, Server server,
                        object backlog,
                        object ssl,
                        object ssl_handshake_timeout,
                        object ssl_shutdown_timeout):
        # Factory: create the instance without running __init__, run the
        # base-class _init() with the server parameters, then set up the
        # pipe handle.
        cdef UnixServer srv = UnixServer.__new__(UnixServer)

        srv._init(loop, protocol_factory, server, backlog,
                  ssl, ssl_handshake_timeout, ssl_shutdown_timeout)
        __pipe_init_uv_handle(<UVStream>srv, loop)
        return srv

    cdef _new_socket(self):
        # Socket-like object built from this handle's file descriptor.
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        # Adopt an already-created descriptor and mark the server as open.
        self._ensure_alive()
        __pipe_open(<UVStream>self, sockfd)
        self._mark_as_open()

    cdef bind(self, str path):
        # Bind the pipe to `path`. On success the server is marked as open;
        # on failure the converted error is handed to _fatal_error().
        cdef int rc

        self._ensure_alive()
        rc = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle,
                             path.encode())
        if rc >= 0:
            self._mark_as_open()
            return

        self._fatal_error(convert_error(rc), True)

    cdef UVStream _make_new_transport(self, object protocol, object waiter,
                                      object context):
        # Build the UnixTransport used for a new connection on this server.
        return <UVStream>UnixTransport.new(self._loop, protocol,
                                           self._server, waiter, context)
82
83
@cython.no_gc_clear
cdef class UnixTransport(UVStream):
    # Stream transport whose underlying libuv handle is a uv_pipe_t.

    @staticmethod
    cdef UnixTransport new(Loop loop, object protocol, Server server,
                           object waiter, object context):
        # Factory: create the instance without running __init__, run the
        # base-class _init(), then set up the pipe handle.
        cdef UnixTransport tr = UnixTransport.__new__(UnixTransport)

        tr._init(loop, protocol, server, waiter, context)
        __pipe_init_uv_handle(<UVStream>tr, loop)
        return tr

    cdef _new_socket(self):
        # Socket-like object built from this handle's file descriptor.
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        # Adopt an already-created descriptor.
        __pipe_open(<UVStream>self, sockfd)

    cdef connect(self, char* addr):
        # Start a connect to `addr` through a _PipeConnectRequest; the
        # outcome is delivered to __pipe_connect_callback.
        cdef _PipeConnectRequest request = _PipeConnectRequest(self._loop,
                                                               self)
        request.connect(addr)
107
108
@cython.no_gc_clear
cdef class ReadUnixTransport(UVStream):
    # Pipe-backed transport on which every write-side method raises
    # NotImplementedError.

    @staticmethod
    cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
                               object waiter):
        cdef ReadUnixTransport reader = \
            ReadUnixTransport.__new__(ReadUnixTransport)

        # Only connect_read_pipe() and subprocess_shell/exec() create this
        # transport directly, so copying the current context is enough.
        reader._init(loop, protocol, server, waiter, Context_CopyCurrent())
        __pipe_init_uv_handle(<UVStream>reader, loop)
        return reader

    cdef _new_socket(self):
        # Socket-like object built from this handle's file descriptor.
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        # Adopt an already-created descriptor.
        __pipe_open(<UVStream>self, sockfd)

    # --- Write-side API: not implemented on this transport. ---

    def get_write_buffer_limits(self):
        raise NotImplementedError

    def set_write_buffer_limits(self, high=None, low=None):
        raise NotImplementedError

    def get_write_buffer_size(self):
        raise NotImplementedError

    def write(self, data):
        raise NotImplementedError

    def writelines(self, list_of_data):
        raise NotImplementedError

    def write_eof(self):
        raise NotImplementedError

    def can_write_eof(self):
        raise NotImplementedError

    def abort(self):
        raise NotImplementedError
152
153
@cython.no_gc_clear
cdef class WriteUnixTransport(UVStream):
    # Pipe-backed transport on which pause_reading()/resume_reading()
    # raise NotImplementedError.

    @staticmethod
    cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
                                object waiter):
        cdef WriteUnixTransport writer = \
            WriteUnixTransport.__new__(WriteUnixTransport)

        # Read events are watched on the write end of the pipe. Once the
        # read end is closed, the uv_stream_t.read callback receives an
        # error; that error should be silenced and the transport simply
        # closed.
        writer._close_on_read_error()

        # Only connect_write_pipe() and subprocess_shell/exec() create this
        # transport directly, so copying the current context is enough.
        writer._init(loop, protocol, server, waiter, Context_CopyCurrent())
        __pipe_init_uv_handle(<UVStream>writer, loop)
        return writer

    cdef _new_socket(self):
        # Socket-like object built from this handle's file descriptor.
        return __pipe_get_socket(<UVSocketHandle>self)

    cdef _open(self, int sockfd):
        # Adopt an already-created descriptor.
        __pipe_open(<UVStream>self, sockfd)

    # --- Read flow control: not implemented on this transport. ---

    def pause_reading(self):
        raise NotImplementedError

    def resume_reading(self):
        raise NotImplementedError
186
187
cdef class _PipeConnectRequest(UVRequest):
    # Connect request for a UnixTransport; the uv_connect_t storage is
    # embedded in the object itself.
    cdef UnixTransport transport
    cdef uv.uv_connect_t _req_data

    def __cinit__(self, loop, transport):
        # Point the generic request pointer at the embedded uv_connect_t
        # and store a back-reference to this object in its `data` field so
        # that the connect callback can recover the wrapper.
        self.request = <uv.uv_req_t*> &self._req_data
        self.request.data = <void*>self
        self.transport = transport

    cdef connect(self, char* addr):
        cdef uv.uv_connect_t* connect_req = <uv.uv_connect_t*>self.request
        cdef uv.uv_pipe_t* pipe = <uv.uv_pipe_t*>self.transport._handle

        # uv_pipe_connect returns void; the result is reported through
        # __pipe_connect_callback.
        uv.uv_pipe_connect(connect_req, pipe, addr, __pipe_connect_callback)
204
cdef void __pipe_connect_callback(uv.uv_connect_t* req, int status) with gil:
    # Connect callback passed to uv_pipe_connect(): recover the request
    # wrapper from req.data, report the outcome to the transport and
    # always mark the request as done.
    cdef:
        _PipeConnectRequest conn_req
        UnixTransport tr

    conn_req = <_PipeConnectRequest> req.data
    tr = conn_req.transport

    # A negative status is converted to an exception; otherwise the
    # transport is told there was no error.
    error = convert_error(status) if status < 0 else None

    try:
        tr._on_connect(error)
    except BaseException as ex:
        conn_req.transport._fatal_error(ex, False)
    finally:
        conn_req.on_done()
224