1#
2# Analogue of `multiprocessing.connection` which uses queues instead of sockets
3#
4# multiprocessing/dummy/connection.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10__all__ = [ 'Client', 'Listener', 'Pipe' ]
11
12from queue import Queue
13
14
15families = [None]
16
17
18class Listener(object):
19
20    def __init__(self, address=None, family=None, backlog=1):
21        self._backlog_queue = Queue(backlog)
22
23    def accept(self):
24        return Connection(*self._backlog_queue.get())
25
26    def close(self):
27        self._backlog_queue = None
28
29    @property
30    def address(self):
31        return self._backlog_queue
32
33    def __enter__(self):
34        return self
35
36    def __exit__(self, exc_type, exc_value, exc_tb):
37        self.close()
38
39
40def Client(address):
41    _in, _out = Queue(), Queue()
42    address.put((_out, _in))
43    return Connection(_in, _out)
44
45
46def Pipe(duplex=True):
47    a, b = Queue(), Queue()
48    return Connection(a, b), Connection(b, a)
49
50
51class Connection(object):
52
53    def __init__(self, _in, _out):
54        self._out = _out
55        self._in = _in
56        self.send = self.send_bytes = _out.put
57        self.recv = self.recv_bytes = _in.get
58
59    def poll(self, timeout=0.0):
60        if self._in.qsize() > 0:
61            return True
62        if timeout <= 0.0:
63            return False
64        with self._in.not_empty:
65            self._in.not_empty.wait(timeout)
66        return self._in.qsize() > 0
67
68    def close(self):
69        pass
70
71    def __enter__(self):
72        return self
73
74    def __exit__(self, exc_type, exc_value, exc_tb):
75        self.close()
76