import json
import sys
import re
import os
import stat
import fcntl
import shutil
import hashlib
import tempfile
import subprocess
import base64
import threading
import pipes
import uuid
import codecs

# NOTE: distutils is deprecated (PEP 632) and removed in Python 3.12;
# get_executable_path() below prefers shutil.which() and only falls back
# to find_executable() for backward compatibility.
from distutils.spawn import find_executable
from ansible_runner.exceptions import ConfigurationError

try:
    from collections.abc import Iterable, Mapping
except ImportError:  # fallback for Pythons predating the collections.abc split
    from collections import Iterable, Mapping
from io import StringIO
from six import string_types, PY2, PY3, text_type, binary_type


class Bunch(object):

    '''
    Collect a bunch of variables together in an object.
    This is a slight modification of Alex Martelli's and Doug Hudgeon's Bunch pattern.
    '''

    def __init__(self, **kwargs):
        self.update(**kwargs)

    def update(self, **kwargs):
        # Merge keyword arguments straight into the instance namespace so
        # they are reachable as plain attributes.
        self.__dict__.update(kwargs)

    def get(self, key):
        # Dict-style lookup; returns None when the attribute is absent.
        return self.__dict__.get(key)


def isplaybook(obj):
    '''
    Inspects the object and returns if it is a playbook

    Args:
        obj (object): The object to be inspected by this function

    Returns:
        boolean: True if the object is a list and False if it is not
    '''
    # A playbook is any non-string, non-mapping iterable (i.e. list-like).
    return isinstance(obj, Iterable) and (not isinstance(obj, string_types) and not isinstance(obj, Mapping))


def isinventory(obj):
    '''
    Inspects the object and returns if it is an inventory

    Args:
        obj (object): The object to be inspected by this function

    Returns:
        boolean: True if the object is an inventory dict and False if it is not
    '''
    # Inventories are either mappings (host dicts) or strings (paths / inline data).
    return isinstance(obj, Mapping) or isinstance(obj, string_types)


def check_isolation_executable_installed(isolation_executable):
    '''
    Check that process isolation executable (e.g. podman, docker, bwrap) is installed.

    Args:
        isolation_executable (str): name of the executable to probe.

    Returns:
        bool: True when ``<executable> --version`` exits with status 0.

    Raises:
        RuntimeError: when the probe fails for any reason other than the
            executable simply not existing (ENOENT).
    '''
    cmd = [isolation_executable, '--version']
    try:
        # Context manager guarantees the stdout/stderr pipes are closed
        # (the original Popen call leaked them on the success path).
        with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            proc.communicate()
            return bool(proc.returncode == 0)
    except (OSError, ValueError) as e:
        if isinstance(e, ValueError) or getattr(e, 'errno', 1) != 2:  # ENOENT, no such file or directory
            raise RuntimeError(f'{isolation_executable} unavailable for unexpected reason.')
        return False


def dump_artifact(obj, path, filename=None):
    '''
    Write the artifact to disk at the specified path

    Args:
        obj (string): The string object to be dumped to disk in the specified
            path. The artifact filename will be automatically created

        path (string): The full path to the artifacts data directory.

        filename (string, optional): The name of file to write the artifact to.
            If the filename is not provided, then one will be generated.

    Returns:
        string: The full path filename for the artifact that was generated
    '''
    p_sha1 = None

    if not os.path.exists(path):
        os.makedirs(path, mode=0o700)
    else:
        # Directory already exists: hash the payload so we can skip the
        # write when an identical artifact is already on disk.
        p_sha1 = hashlib.sha1()
        p_sha1.update(obj.encode(encoding='UTF-8'))

    if filename is None:
        fd, fn = tempfile.mkstemp(dir=path)
        # BUGFIX: mkstemp() hands back an open file descriptor; close it
        # immediately so it does not leak (the file is rewritten below).
        os.close(fd)
    else:
        fn = os.path.join(path, filename)

    if os.path.exists(fn):
        c_sha1 = hashlib.sha1()
        with open(fn) as f:
            contents = f.read()
        c_sha1.update(contents.encode(encoding='UTF-8'))

    # Only write when the file is new or its contents differ from the payload.
    if not os.path.exists(fn) or p_sha1.hexdigest() != c_sha1.hexdigest():
        lock_fp = os.path.join(path, '.artifact_write_lock')
        lock_fd = os.open(lock_fp, os.O_RDWR | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR)
        fcntl.lockf(lock_fd, fcntl.LOCK_EX)

        try:
            with open(fn, 'w') as f:
                os.chmod(fn, stat.S_IRUSR)
                f.write(str(obj))
        finally:
            # Always release and remove the lock, even if the write failed.
            fcntl.lockf(lock_fd, fcntl.LOCK_UN)
            os.close(lock_fd)
            os.remove(lock_fp)

    return fn


def cleanup_artifact_dir(path, num_keep=0):
    '''
    Rotate artifact directories under ``path``, keeping only the newest
    ``num_keep`` entries (by mtime). A ``num_keep`` of 0 disables
    artifact dir cleanup/rotation entirely.
    '''
    if num_keep < 1:
        return
    # Oldest first, so the leading entries are the ones to delete.
    all_paths = sorted([os.path.join(path, p) for p in os.listdir(path)],
                       key=lambda x: os.path.getmtime(x))
    total_remove = len(all_paths) - num_keep
    for f in range(total_remove):
        shutil.rmtree(all_paths[f])


def dump_artifacts(kwargs):
    '''
    Introspect the kwargs and dump objects to disk

    Mutates ``kwargs`` in place: playbook/inventory objects are serialized
    under ``private_data_dir`` and replaced by file paths, and env-related
    keys (envvars, extravars, passwords, settings, ssh_key, cmdline) are
    written to ``env/`` and popped from ``kwargs``.
    '''
    private_data_dir = kwargs.get('private_data_dir')
    if not private_data_dir:
        private_data_dir = tempfile.mkdtemp()
        kwargs['private_data_dir'] = private_data_dir

    if not os.path.exists(private_data_dir):
        raise ValueError('private_data_dir path is either invalid or does not exist')

    if 'role' in kwargs:
        # Synthesize a one-play playbook that applies the requested role.
        role = {'name': kwargs.pop('role')}
        if 'role_vars' in kwargs:
            role['vars'] = kwargs.pop('role_vars')

        play = [{'hosts': kwargs.pop('hosts', 'all'), 'roles': [role]}]

        if kwargs.pop('role_skip_facts', False):
            play[0]['gather_facts'] = False

        kwargs['playbook'] = play

        if 'envvars' not in kwargs:
            kwargs['envvars'] = {}

        # Ensure the private data dir's roles directory is always searched.
        roles_path = kwargs.pop('roles_path', None)
        if not roles_path:
            roles_path = os.path.join(private_data_dir, 'roles')
        else:
            roles_path += ':{}'.format(os.path.join(private_data_dir, 'roles'))

        kwargs['envvars']['ANSIBLE_ROLES_PATH'] = roles_path

    obj = kwargs.get('playbook')
    if obj and isplaybook(obj):
        path = os.path.join(private_data_dir, 'project')
        kwargs['playbook'] = dump_artifact(json.dumps(obj), path, 'main.json')

    obj = kwargs.get('inventory')
    if obj and isinventory(obj):
        path = os.path.join(private_data_dir, 'inventory')
        if isinstance(obj, Mapping):
            kwargs['inventory'] = dump_artifact(json.dumps(obj), path, 'hosts.json')
        elif isinstance(obj, string_types):
            # A string that is an existing path is used as-is; otherwise it
            # is treated as inline inventory content and dumped to a file.
            if not os.path.exists(obj):
                kwargs['inventory'] = dump_artifact(obj, path, 'hosts')

    for key in ('envvars', 'extravars', 'passwords', 'settings'):
        obj = kwargs.get(key)
        if obj and not os.path.exists(os.path.join(private_data_dir, 'env', key)):
            path = os.path.join(private_data_dir, 'env')
            dump_artifact(json.dumps(obj), path, key)
            kwargs.pop(key)

    for key in ('ssh_key', 'cmdline'):
        obj = kwargs.get(key)
        if obj and not os.path.exists(os.path.join(private_data_dir, 'env', key)):
            path = os.path.join(private_data_dir, 'env')
            dump_artifact(str(kwargs[key]), path, key)
            kwargs.pop(key)


def collect_new_events(event_path, old_events):
    '''
    Collect new events for the 'events' generator property

    Yields ``(event_dict, old_events)`` for each well-formed, not-yet-seen
    ``<counter>-<uuid>.json`` file under ``event_path``, marking each file
    as seen in ``old_events``.
    '''
    dir_events = os.listdir(event_path)
    dir_events_actual = []
    for each_file in dir_events:
        if re.match(r"^[0-9]+-.+json$", each_file):
            # Skip partial events and anything already processed.
            if '-partial' not in each_file and each_file not in old_events.keys():
                dir_events_actual.append(each_file)
    # Sort numerically by the leading event counter.
    dir_events_actual.sort(key=lambda filenm: int(filenm.split("-", 1)[0]))
    for event_file in dir_events_actual:
        with codecs.open(os.path.join(event_path, event_file), 'r', encoding='utf-8') as event_file_actual:
            try:
                event = json.load(event_file_actual)
            except ValueError:
                # File is still being written; stop and retry on next poll.
                break

        old_events[event_file] = True
        yield event, old_events


class OutputEventFilter(object):
    '''
    File-like object that looks for encoded job events in stdout data.
    '''

    # Base64 payload bracketed by ANSI "erase line" (\x1b[K) markers; the
    # payload may be interleaved with cursor-left (\x1b[<n>D) sequences.
    EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')

    def __init__(self, handle, event_callback,
                 suppress_ansible_output=False, output_json=False):
        self._event_callback = event_callback
        self._counter = 0
        self._start_line = 0
        self._handle = handle
        self._buffer = StringIO()
        self._last_chunk = ''
        self._current_event_data = None
        self.output_json = output_json
        self.suppress_ansible_output = suppress_ansible_output

    def flush(self):
        self._handle.flush()

    def write(self, data):
        self._buffer.write(data)

        # keep a sliding window of the last chunk written so we can detect
        # event tokens and determine if we need to perform a search of the full
        # buffer
        should_search = '\x1b[K' in (self._last_chunk + data)
        self._last_chunk = data

        # Only bother searching the buffer if we recently saw a start/end
        # token (\x1b[K)
        while should_search:
            value = self._buffer.getvalue()
            match = self.EVENT_DATA_RE.search(value)
            if not match:
                break
            try:
                base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
                event_data = json.loads(base64.b64decode(base64_data).decode('utf-8'))
            except ValueError:
                event_data = {}
            event_data = self._emit_event(value[:match.start()], event_data)
            if not self.output_json:
                stdout_actual = event_data['stdout'] if 'stdout' in event_data else None
            else:
                stdout_actual = json.dumps(event_data)
            # Everything after the matched token goes back in the buffer.
            remainder = value[match.end():]
            self._buffer = StringIO()
            self._buffer.write(remainder)

            if stdout_actual and stdout_actual != "{}":
                if not self.suppress_ansible_output:
                    sys.stdout.write(
                        stdout_actual.encode('utf-8') if PY2 else stdout_actual
                    )
                    sys.stdout.write("\n")
                    sys.stdout.flush()
                self._handle.write(stdout_actual + "\n")
                self._handle.flush()

            self._last_chunk = remainder
        else:
            # while/else: runs when no event token was found in the buffer.
            # Verbose stdout outside of event data context
            if data and '\n' in data and self._current_event_data is None:
                # emit events for all complete lines we know about
                lines = self._buffer.getvalue().splitlines(True)  # keep ends
                remainder = None
                # if last line is not a complete line, then exclude it
                if '\n' not in lines[-1]:
                    remainder = lines.pop()
                # emit all complete lines
                for line in lines:
                    self._emit_event(line)
                    if not self.suppress_ansible_output:
                        sys.stdout.write(
                            line.encode('utf-8') if PY2 else line
                        )
                    self._handle.write(line)
                    self._handle.flush()
                self._buffer = StringIO()
                # put final partial line back on buffer
                if remainder:
                    self._buffer.write(remainder)

    def close(self):
        # Flush any buffered stdout as a final event, then signal EOF.
        value = self._buffer.getvalue()
        if value:
            self._emit_event(value)
            self._buffer = StringIO()
        self._event_callback(dict(event='EOF'))
        self._handle.close()

    def _emit_event(self, buffered_stdout, next_event_data=None):
        # Attach buffered stdout to the current event (if any), or emit
        # per-line 'verbose' events for stdout seen outside event context.
        next_event_data = next_event_data or {}
        if self._current_event_data:
            event_data = self._current_event_data
            stdout_chunks = [buffered_stdout]
        elif buffered_stdout:
            event_data = dict(event='verbose')
            stdout_chunks = buffered_stdout.splitlines(True)
        else:
            event_data = dict()
            stdout_chunks = []

        for stdout_chunk in stdout_chunks:
            if event_data.get('event') == 'verbose':
                event_data['uuid'] = str(uuid.uuid4())
            self._counter += 1
            event_data['counter'] = self._counter
            # Strip the trailing line ending (assumed 2 chars) from stdout.
            event_data['stdout'] = stdout_chunk[:-2] if len(stdout_chunk) > 2 else ""
            n_lines = stdout_chunk.count('\n')
            event_data['start_line'] = self._start_line
            event_data['end_line'] = self._start_line + n_lines
            self._start_line += n_lines
            if self._event_callback:
                self._event_callback(event_data)
        if next_event_data.get('uuid', None):
            self._current_event_data = next_event_data
        else:
            self._current_event_data = None
        return event_data


def open_fifo_write(path, data):
    '''open_fifo_write opens the fifo named pipe in a new thread.
    This blocks the thread until an external process (such as ssh-agent)
    reads data from the pipe.
    '''
    os.mkfifo(path, stat.S_IRUSR | stat.S_IWUSR)

    def _write_and_close(p, d):
        # BUGFIX: use a context manager so the pipe is flushed and closed
        # deterministically (the original left the file object to the GC).
        with open(p, 'wb') as fifo:
            fifo.write(d)

    threading.Thread(target=_write_and_close, args=(path, data)).start()


def args2cmdline(*args):
    '''Join ``args`` into a single shell-safe command line string.'''
    return ' '.join([pipes.quote(a) for a in args])


def ensure_str(s, encoding='utf-8', errors='strict'):
    """
    Copied from six==1.12

    Coerce *s* to `str`.
    For Python 2:
      - `unicode` -> encoded to `str`
      - `str` -> `str`
    For Python 3:
      - `str` -> `str`
      - `bytes` -> decoded to `str`
    """
    if not isinstance(s, (text_type, binary_type)):
        raise TypeError("not expecting type '%s'" % type(s))
    if PY2 and isinstance(s, text_type):
        s = s.encode(encoding, errors)
    elif PY3 and isinstance(s, binary_type):
        s = s.decode(encoding, errors)
    return s


def sanitize_container_name(original_name):
    """
    Docker and podman will only accept certain characters in container names
    This takes a given name from user-specified values and replaces the
    invalid characters so it can be used in docker/podman CLI commands
    """
    return re.sub(r'[^a-zA-Z0-9_-]', '_', text_type(original_name))


def cli_mounts():
    '''Return the default env/path mount specs for containerized execution.'''
    return [
        {
            'ENVS': ['SSH_AUTH_SOCK'],
            'PATHS': [
                {
                    'src': '{}/.ssh/'.format(os.environ['HOME']),
                    'dest': '/home/runner/.ssh/'
                },
                {
                    'src': '/etc/ssh/ssh_known_hosts',
                    'dest': '/etc/ssh/ssh_known_hosts'
                }
            ]
        },
    ]


def santize_json_response(data):
    '''
    Removes warning message from response message emitted by ansible
    command line utilities.

    NOTE: the misspelled name is retained because it is public API.

    :param action: The string data to be santizied
    :type action: str
    '''
    # Drop everything before the first '{' so only the JSON body remains.
    start_re = re.compile(r"{(.|\n)*", re.MULTILINE)
    data = start_re.search(data).group().strip()
    return data


def get_executable_path(name):
    '''
    Resolve ``name`` to a full executable path using the PATH environment.

    :param name: command name to look up
    :type name: str
    :raises ConfigurationError: when the command cannot be found

    shutil.which() is the supported replacement for the deprecated
    distutils.spawn.find_executable() (PEP 632); the latter is kept as a
    fallback for exact backward compatibility.
    '''
    exec_path = shutil.which(name) or find_executable(name)
    if exec_path is None:
        raise ConfigurationError(f"{name} command not found")
    return exec_path