1# Copyright (c) 2019-2021, Felix Fontein <felix@fontein.de> 2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 3 4from __future__ import (absolute_import, division, print_function) 5__metaclass__ = type 6 7 8import os 9import os.path 10import socket as pysocket 11 12from ansible.module_utils.basic import missing_required_lib 13from ansible.module_utils.six import PY3 14 15try: 16 from docker.utils import socket as docker_socket 17 import struct 18except Exception: 19 # missing Docker SDK for Python handled in ansible_collections.community.docker.plugins.module_utils.common 20 pass 21 22from ansible_collections.community.docker.plugins.module_utils.socket_helper import ( 23 make_unblocking, 24 shutdown_writing, 25 write_to_socket, 26) 27 28 29PARAMIKO_POLL_TIMEOUT = 0.01 # 10 milliseconds 30 31 32class DockerSocketHandlerBase(object): 33 def __init__(self, sock, selectors, log=None): 34 make_unblocking(sock) 35 36 self._selectors = selectors 37 if log is not None: 38 self._log = log 39 else: 40 self._log = lambda msg: True 41 self._paramiko_read_workaround = hasattr(sock, 'send_ready') and 'paramiko' in str(type(sock)) 42 43 self._sock = sock 44 self._block_done_callback = None 45 self._block_buffer = [] 46 self._eof = False 47 self._read_buffer = b'' 48 self._write_buffer = b'' 49 self._end_of_writing = False 50 51 self._current_stream = None 52 self._current_missing = 0 53 self._current_buffer = b'' 54 55 self._selector = self._selectors.DefaultSelector() 56 self._selector.register(self._sock, self._selectors.EVENT_READ) 57 58 def __enter__(self): 59 return self 60 61 def __exit__(self, type, value, tb): 62 self._selector.close() 63 64 def set_block_done_callback(self, block_done_callback): 65 self._block_done_callback = block_done_callback 66 if self._block_done_callback is not None: 67 while self._block_buffer: 68 elt = self._block_buffer.remove(0) 69 self._block_done_callback(*elt) 70 71 def _add_block(self, stream_id, data): 72 if self._block_done_callback is not None: 73 self._block_done_callback(stream_id, data) 74 else: 75 self._block_buffer.append((stream_id, data)) 76 77 def _read(self): 78 if self._eof: 79 return 80 if hasattr(self._sock, 'recv'): 81 try: 82 data = self._sock.recv(262144) 83 except Exception as e: 84 # After calling self._sock.shutdown(), OpenSSL's/urllib3's 85 # WrappedSocket seems to eventually raise ZeroReturnError in 86 # case of EOF 87 if 'OpenSSL.SSL.ZeroReturnError' in str(type(e)): 88 self._eof = True 89 return 90 else: 91 raise 92 elif PY3 and isinstance(self._sock, getattr(pysocket, 'SocketIO')): 93 data = self._sock.read() 94 else: 95 data = os.read(self._sock.fileno()) 96 if data is None: 97 # no data available 98 return 99 self._log('read {0} bytes'.format(len(data))) 100 if len(data) == 0: 101 # Stream EOF 102 self._eof = True 103 return 104 self._read_buffer += data 105 while len(self._read_buffer) > 0: 106 if self._current_missing > 0: 107 n = min(len(self._read_buffer), self._current_missing) 108 self._current_buffer += self._read_buffer[:n] 109 self._read_buffer = self._read_buffer[n:] 110 self._current_missing -= n 111 if self._current_missing == 0: 112 self._add_block(self._current_stream, self._current_buffer) 113 self._current_buffer = b'' 114 if len(self._read_buffer) < 8: 115 break 116 self._current_stream, self._current_missing = struct.unpack('>BxxxL', self._read_buffer[:8]) 117 self._read_buffer = self._read_buffer[8:] 118 if self._current_missing < 0: 119 # Stream EOF (as reported by docker daemon) 120 self._eof = True 121 break 122 123 def _handle_end_of_writing(self): 124 if self._end_of_writing and len(self._write_buffer) == 0: 125 self._end_of_writing = False 126 self._log('Shutting socket down for writing') 127 shutdown_writing(self._sock, self._log) 128 129 def _write(self): 130 if len(self._write_buffer) > 0: 131 written = write_to_socket(self._sock, self._write_buffer) 132 self._write_buffer = self._write_buffer[written:] 133 self._log('wrote {0} bytes, {1} are left'.format(written, len(self._write_buffer))) 134 if len(self._write_buffer) > 0: 135 self._selector.modify(self._sock, self._selectors.EVENT_READ | self._selectors.EVENT_WRITE) 136 else: 137 self._selector.modify(self._sock, self._selectors.EVENT_READ) 138 self._handle_end_of_writing() 139 140 def select(self, timeout=None, _internal_recursion=False): 141 if not _internal_recursion and self._paramiko_read_workaround and len(self._write_buffer) > 0: 142 # When the SSH transport is used, docker-py internally uses Paramiko, whose 143 # Channel object supports select(), but only for reading 144 # (https://github.com/paramiko/paramiko/issues/695). 145 if self._sock.send_ready(): 146 self._write() 147 return True 148 while timeout is None or timeout > PARAMIKO_POLL_TIMEOUT: 149 result = self.select(PARAMIKO_POLL_TIMEOUT, _internal_recursion=True) 150 if self._sock.send_ready(): 151 self._read() 152 result += 1 153 if result > 0: 154 return True 155 if timeout is not None: 156 timeout -= PARAMIKO_POLL_TIMEOUT 157 self._log('select... ({0})'.format(timeout)) 158 events = self._selector.select(timeout) 159 for key, event in events: 160 if key.fileobj == self._sock: 161 self._log( 162 'select event read:{0} write:{1}'.format( 163 event & self._selectors.EVENT_READ != 0, 164 event & self._selectors.EVENT_WRITE != 0)) 165 if event & self._selectors.EVENT_READ != 0: 166 self._read() 167 if event & self._selectors.EVENT_WRITE != 0: 168 self._write() 169 result = len(events) 170 if self._paramiko_read_workaround and len(self._write_buffer) > 0: 171 if self._sock.send_ready(): 172 self._write() 173 result += 1 174 return result > 0 175 176 def is_eof(self): 177 return self._eof 178 179 def end_of_writing(self): 180 self._end_of_writing = True 181 self._handle_end_of_writing() 182 183 def consume(self): 184 stdout = [] 185 stderr = [] 186 187 def append_block(stream_id, data): 188 if stream_id == docker_socket.STDOUT: 189 stdout.append(data) 190 elif stream_id == docker_socket.STDERR: 191 stderr.append(data) 192 else: 193 raise ValueError('{0} is not a valid stream ID'.format(stream_id)) 194 195 self.end_of_writing() 196 197 self.set_block_done_callback(append_block) 198 while not self._eof: 199 self.select() 200 return b''.join(stdout), b''.join(stderr) 201 202 def write(self, str): 203 self._write_buffer += str 204 if len(self._write_buffer) == len(str): 205 self._write() 206 207 208class DockerSocketHandlerModule(DockerSocketHandlerBase): 209 def __init__(self, sock, module, selectors): 210 super(DockerSocketHandlerModule, self).__init__(sock, selectors, module.debug) 211 212 213def find_selectors(module): 214 try: 215 # ansible-base 2.10+ has selectors a compat version of selectors, which a bundled fallback: 216 from ansible.module_utils.compat import selectors 217 return selectors 218 except ImportError: 219 pass 220 try: 221 # Python 3.4+ 222 import selectors 223 return selectors 224 except ImportError: 225 pass 226 try: 227 # backport package installed in the system 228 import selectors2 229 return selectors2 230 except ImportError: 231 pass 232 module.fail_json(msg=missing_required_lib('selectors2', reason='for handling stdin')) 233