1# Copyright (c) 2019-2021, Felix Fontein <felix@fontein.de>
2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
3
4from __future__ import (absolute_import, division, print_function)
5__metaclass__ = type
6
7
8import os
9import os.path
10import socket as pysocket
11
12from ansible.module_utils.basic import missing_required_lib
13from ansible.module_utils.six import PY3
14
15try:
16    from docker.utils import socket as docker_socket
17    import struct
18except Exception:
19    # missing Docker SDK for Python handled in ansible_collections.community.docker.plugins.module_utils.common
20    pass
21
22from ansible_collections.community.docker.plugins.module_utils.socket_helper import (
23    make_unblocking,
24    shutdown_writing,
25    write_to_socket,
26)
27
28
29PARAMIKO_POLL_TIMEOUT = 0.01  # 10 milliseconds
30
31
32class DockerSocketHandlerBase(object):
33    def __init__(self, sock, selectors, log=None):
34        make_unblocking(sock)
35
36        self._selectors = selectors
37        if log is not None:
38            self._log = log
39        else:
40            self._log = lambda msg: True
41        self._paramiko_read_workaround = hasattr(sock, 'send_ready') and 'paramiko' in str(type(sock))
42
43        self._sock = sock
44        self._block_done_callback = None
45        self._block_buffer = []
46        self._eof = False
47        self._read_buffer = b''
48        self._write_buffer = b''
49        self._end_of_writing = False
50
51        self._current_stream = None
52        self._current_missing = 0
53        self._current_buffer = b''
54
55        self._selector = self._selectors.DefaultSelector()
56        self._selector.register(self._sock, self._selectors.EVENT_READ)
57
58    def __enter__(self):
59        return self
60
61    def __exit__(self, type, value, tb):
62        self._selector.close()
63
64    def set_block_done_callback(self, block_done_callback):
65        self._block_done_callback = block_done_callback
66        if self._block_done_callback is not None:
67            while self._block_buffer:
68                elt = self._block_buffer.remove(0)
69                self._block_done_callback(*elt)
70
71    def _add_block(self, stream_id, data):
72        if self._block_done_callback is not None:
73            self._block_done_callback(stream_id, data)
74        else:
75            self._block_buffer.append((stream_id, data))
76
77    def _read(self):
78        if self._eof:
79            return
80        if hasattr(self._sock, 'recv'):
81            try:
82                data = self._sock.recv(262144)
83            except Exception as e:
84                # After calling self._sock.shutdown(), OpenSSL's/urllib3's
85                # WrappedSocket seems to eventually raise ZeroReturnError in
86                # case of EOF
87                if 'OpenSSL.SSL.ZeroReturnError' in str(type(e)):
88                    self._eof = True
89                    return
90                else:
91                    raise
92        elif PY3 and isinstance(self._sock, getattr(pysocket, 'SocketIO')):
93            data = self._sock.read()
94        else:
95            data = os.read(self._sock.fileno())
96        if data is None:
97            # no data available
98            return
99        self._log('read {0} bytes'.format(len(data)))
100        if len(data) == 0:
101            # Stream EOF
102            self._eof = True
103            return
104        self._read_buffer += data
105        while len(self._read_buffer) > 0:
106            if self._current_missing > 0:
107                n = min(len(self._read_buffer), self._current_missing)
108                self._current_buffer += self._read_buffer[:n]
109                self._read_buffer = self._read_buffer[n:]
110                self._current_missing -= n
111                if self._current_missing == 0:
112                    self._add_block(self._current_stream, self._current_buffer)
113                    self._current_buffer = b''
114            if len(self._read_buffer) < 8:
115                break
116            self._current_stream, self._current_missing = struct.unpack('>BxxxL', self._read_buffer[:8])
117            self._read_buffer = self._read_buffer[8:]
118            if self._current_missing < 0:
119                # Stream EOF (as reported by docker daemon)
120                self._eof = True
121                break
122
123    def _handle_end_of_writing(self):
124        if self._end_of_writing and len(self._write_buffer) == 0:
125            self._end_of_writing = False
126            self._log('Shutting socket down for writing')
127            shutdown_writing(self._sock, self._log)
128
129    def _write(self):
130        if len(self._write_buffer) > 0:
131            written = write_to_socket(self._sock, self._write_buffer)
132            self._write_buffer = self._write_buffer[written:]
133            self._log('wrote {0} bytes, {1} are left'.format(written, len(self._write_buffer)))
134            if len(self._write_buffer) > 0:
135                self._selector.modify(self._sock, self._selectors.EVENT_READ | self._selectors.EVENT_WRITE)
136            else:
137                self._selector.modify(self._sock, self._selectors.EVENT_READ)
138            self._handle_end_of_writing()
139
140    def select(self, timeout=None, _internal_recursion=False):
141        if not _internal_recursion and self._paramiko_read_workaround and len(self._write_buffer) > 0:
142            # When the SSH transport is used, docker-py internally uses Paramiko, whose
143            # Channel object supports select(), but only for reading
144            # (https://github.com/paramiko/paramiko/issues/695).
145            if self._sock.send_ready():
146                self._write()
147                return True
148            while timeout is None or timeout > PARAMIKO_POLL_TIMEOUT:
149                result = self.select(PARAMIKO_POLL_TIMEOUT, _internal_recursion=True)
150                if self._sock.send_ready():
151                    self._read()
152                    result += 1
153                if result > 0:
154                    return True
155                if timeout is not None:
156                    timeout -= PARAMIKO_POLL_TIMEOUT
157        self._log('select... ({0})'.format(timeout))
158        events = self._selector.select(timeout)
159        for key, event in events:
160            if key.fileobj == self._sock:
161                self._log(
162                    'select event read:{0} write:{1}'.format(
163                        event & self._selectors.EVENT_READ != 0,
164                        event & self._selectors.EVENT_WRITE != 0))
165                if event & self._selectors.EVENT_READ != 0:
166                    self._read()
167                if event & self._selectors.EVENT_WRITE != 0:
168                    self._write()
169        result = len(events)
170        if self._paramiko_read_workaround and len(self._write_buffer) > 0:
171            if self._sock.send_ready():
172                self._write()
173                result += 1
174        return result > 0
175
176    def is_eof(self):
177        return self._eof
178
179    def end_of_writing(self):
180        self._end_of_writing = True
181        self._handle_end_of_writing()
182
183    def consume(self):
184        stdout = []
185        stderr = []
186
187        def append_block(stream_id, data):
188            if stream_id == docker_socket.STDOUT:
189                stdout.append(data)
190            elif stream_id == docker_socket.STDERR:
191                stderr.append(data)
192            else:
193                raise ValueError('{0} is not a valid stream ID'.format(stream_id))
194
195        self.end_of_writing()
196
197        self.set_block_done_callback(append_block)
198        while not self._eof:
199            self.select()
200        return b''.join(stdout), b''.join(stderr)
201
202    def write(self, str):
203        self._write_buffer += str
204        if len(self._write_buffer) == len(str):
205            self._write()
206
207
208class DockerSocketHandlerModule(DockerSocketHandlerBase):
209    def __init__(self, sock, module, selectors):
210        super(DockerSocketHandlerModule, self).__init__(sock, selectors, module.debug)
211
212
213def find_selectors(module):
214    try:
215        # ansible-base 2.10+ has selectors a compat version of selectors, which a bundled fallback:
216        from ansible.module_utils.compat import selectors
217        return selectors
218    except ImportError:
219        pass
220    try:
221        # Python 3.4+
222        import selectors
223        return selectors
224    except ImportError:
225        pass
226    try:
227        # backport package installed in the system
228        import selectors2
229        return selectors2
230    except ImportError:
231        pass
232    module.fail_json(msg=missing_required_lib('selectors2', reason='for handling stdin'))
233