1
2import json
3import sys
4import re
5import os
6import stat
7import fcntl
8import shutil
9import hashlib
10import tempfile
11import subprocess
12import base64
13import threading
14import pipes
15import uuid
16import codecs
17
18from distutils.spawn import find_executable
19from ansible_runner.exceptions import ConfigurationError
20
21try:
22    from collections.abc import Iterable, Mapping
23except ImportError:
24    from collections import Iterable, Mapping
25from io import StringIO
26from six import string_types, PY2, PY3, text_type, binary_type
27
28
class Bunch(object):

    '''
    Group an arbitrary set of keyword arguments into one object.

    A small variation on the classic Bunch pattern (Alex Martelli /
    Doug Hudgeon): attributes live directly in the instance __dict__.
    '''

    def __init__(self, **kwargs):
        self.update(**kwargs)

    def update(self, **kwargs):
        # Merge the given keyword arguments into the instance namespace.
        vars(self).update(kwargs)

    def get(self, key):
        # Look up only instance attributes; returns None when absent.
        return vars(self).get(key)
44
45
def isplaybook(obj):
    '''
    Inspects the object and returns if it is a playbook

    Args:
        obj (object): The object to be inspected by this function

    Returns:
        boolean: True if the object is a list and False if it is not
    '''
    # A playbook is any non-string, non-mapping iterable (typically a list
    # of plays).  ``str`` replaces six's string_types: this module already
    # requires Python 3 (it uses f-strings elsewhere).
    return isinstance(obj, Iterable) and not isinstance(obj, (str, Mapping))
57
58
def isinventory(obj):
    '''
    Inspects the object and returns if it is an inventory

    Args:
        obj (object): The object to be inspected by this function

    Returns:
        boolean: True if the object is an inventory dict and False if it is not
    '''
    # An inventory is either a mapping (host data) or a string (a path or
    # inline INI/YAML).  ``str`` replaces six's string_types: this module
    # already requires Python 3 (it uses f-strings elsewhere).
    return isinstance(obj, (Mapping, str))
70
71
def check_isolation_executable_installed(isolation_executable):
    '''
    Check that process isolation executable (e.g. podman, docker, bwrap) is installed.

    Args:
        isolation_executable (str): name or path of the executable to probe.

    Returns:
        bool: True if running ``<executable> --version`` exits 0.

    Raises:
        RuntimeError: when the probe fails for any reason other than the
            executable being absent.
    '''
    cmd = [isolation_executable, '--version']
    try:
        # subprocess.run is the modern one-shot replacement for
        # Popen + communicate; output is captured and discarded.
        proc = subprocess.run(cmd, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        return proc.returncode == 0
    except (OSError, ValueError) as e:
        # errno 2 == ENOENT ("no such file or directory") means the
        # executable simply is not installed; anything else is unexpected.
        if isinstance(e, ValueError) or getattr(e, 'errno', 1) != 2:
            raise RuntimeError(f'{isolation_executable} unavailable for unexpected reason.')
        return False
86
87
def dump_artifact(obj, path, filename=None):
    '''
    Write the artifact to disk at the specified path

    Args:
        obj (string): The string object to be dumped to disk in the specified
            path.  The artifact filename will be automatically created

        path (string): The full path to the artifacts data directory.

        filename (string, optional): The name of file to write the artifact to.
            If the filename is not provided, then one will be generated.

    Returns:
        string: The full path filename for the artifact that was generated
    '''
    # Hash the payload up front.  Previously this was only computed when
    # ``path`` already existed, which crashed (AttributeError on None) for a
    # fresh directory with filename=None, since mkstemp creates the file.
    p_sha1 = hashlib.sha1()
    p_sha1.update(obj.encode(encoding='UTF-8'))

    if not os.path.exists(path):
        os.makedirs(path, mode=0o700)

    if filename is None:
        fd, fn = tempfile.mkstemp(dir=path)
        # mkstemp returns an open descriptor; close it so it is not leaked.
        os.close(fd)
    else:
        fn = os.path.join(path, filename)

    if os.path.exists(fn):
        c_sha1 = hashlib.sha1()
        with open(fn, encoding='utf-8') as f:
            contents = f.read()
        c_sha1.update(contents.encode(encoding='UTF-8'))

    # Rewrite only when the file is missing or its contents differ.
    if not os.path.exists(fn) or p_sha1.hexdigest() != c_sha1.hexdigest():
        # Serialize concurrent writers via an advisory lock file.
        lock_fp = os.path.join(path, '.artifact_write_lock')
        lock_fd = os.open(lock_fp, os.O_RDWR | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR)
        fcntl.lockf(lock_fd, fcntl.LOCK_EX)

        try:
            with open(fn, 'w', encoding='utf-8') as f:
                # Make the artifact read-only for the owner; the already-open
                # handle can still write.
                os.chmod(fn, stat.S_IRUSR)
                f.write(str(obj))
        finally:
            fcntl.lockf(lock_fd, fcntl.LOCK_UN)
            os.close(lock_fd)
            os.remove(lock_fp)

    return fn
138
139
def cleanup_artifact_dir(path, num_keep=0):
    '''
    Rotate artifact directories under *path*, keeping only the *num_keep*
    most recently modified entries.  A value below 1 disables cleanup.
    '''
    if num_keep < 1:
        return
    entries = [os.path.join(path, name) for name in os.listdir(path)]
    entries.sort(key=os.path.getmtime)
    # Oldest entries come first; remove everything beyond the keep count.
    for stale in entries[:max(0, len(entries) - num_keep)]:
        shutil.rmtree(stale)
149
150
def dump_artifacts(kwargs):
    '''
    Introspect the kwargs and dump objects to disk
    '''
    private_data_dir = kwargs.get('private_data_dir')
    if not private_data_dir:
        private_data_dir = tempfile.mkdtemp()
        kwargs['private_data_dir'] = private_data_dir

    if not os.path.exists(private_data_dir):
        raise ValueError('private_data_dir path is either invalid or does not exist')

    # A bare role is expanded into a one-play playbook that applies it.
    if 'role' in kwargs:
        role_entry = {'name': kwargs.pop('role')}
        if 'role_vars' in kwargs:
            role_entry['vars'] = kwargs.pop('role_vars')

        play = {'hosts': kwargs.pop('hosts', 'all'), 'roles': [role_entry]}
        if kwargs.pop('role_skip_facts', False):
            play['gather_facts'] = False
        kwargs['playbook'] = [play]

        kwargs.setdefault('envvars', {})

        roles_path = kwargs.pop('roles_path', None)
        if not roles_path:
            roles_path = os.path.join(private_data_dir, 'roles')
        else:
            roles_path += ':{}'.format(os.path.join(private_data_dir, 'roles'))
        kwargs['envvars']['ANSIBLE_ROLES_PATH'] = roles_path

    playbook = kwargs.get('playbook')
    if playbook and isplaybook(playbook):
        project_path = os.path.join(private_data_dir, 'project')
        kwargs['playbook'] = dump_artifact(json.dumps(playbook), project_path, 'main.json')

    inventory = kwargs.get('inventory')
    if inventory and isinventory(inventory):
        inventory_path = os.path.join(private_data_dir, 'inventory')
        if isinstance(inventory, Mapping):
            kwargs['inventory'] = dump_artifact(json.dumps(inventory), inventory_path, 'hosts.json')
        elif isinstance(inventory, string_types) and not os.path.exists(inventory):
            # A string that is not an existing path is treated as inline
            # inventory content.
            kwargs['inventory'] = dump_artifact(inventory, inventory_path, 'hosts')

    env_path = os.path.join(private_data_dir, 'env')

    # JSON-serialized env artifacts; never overwrite an existing file.
    for key in ('envvars', 'extravars', 'passwords', 'settings'):
        value = kwargs.get(key)
        if value and not os.path.exists(os.path.join(env_path, key)):
            dump_artifact(json.dumps(value), env_path, key)
            kwargs.pop(key)

    # Plain-string env artifacts; never overwrite an existing file.
    for key in ('ssh_key', 'cmdline'):
        value = kwargs.get(key)
        if value and not os.path.exists(os.path.join(env_path, key)):
            dump_artifact(str(value), env_path, key)
            kwargs.pop(key)
213
214
def collect_new_events(event_path, old_events):
    '''
    Generator yielding (event, old_events) pairs for each completed job
    event file in *event_path* that has not been seen in *old_events*.
    '''
    event_name_re = re.compile("^[0-9]+-.+json$")
    pending = [
        name for name in os.listdir(event_path)
        if event_name_re.match(name)
        and '-partial' not in name
        and name not in old_events.keys()
    ]
    # Event files are prefixed with their counter; replay them in order.
    pending.sort(key=lambda name: int(name.split("-", 1)[0]))
    for name in pending:
        with codecs.open(os.path.join(event_path, name), 'r', encoding='utf-8') as fh:
            try:
                event = json.load(fh)
            except ValueError:
                # A file that is not yet valid JSON means the writer is not
                # done; stop and retry on the next call.
                break

        old_events[name] = True
        yield event, old_events
235
236
class OutputEventFilter(object):
    '''
    File-like object that looks for encoded job events in stdout data.

    Wraps an output ``handle`` and scans everything written through it for
    base64-encoded event payloads bracketed by ESC[K ANSI sequences.  Decoded
    events — plus plain "verbose" lines seen outside an event context — are
    passed to ``event_callback`` as dicts.
    '''

    # An event payload is a run of base64 characters interleaved with
    # ESC[<n>D (cursor-left) sequences, bracketed on both sides by
    # ESC[K (erase-line) markers.
    EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')

    def __init__(self, handle, event_callback,
                 suppress_ansible_output=False, output_json=False):
        # handle: file-like object receiving the filtered stdout text.
        # event_callback: callable invoked with each decoded event dict.
        # suppress_ansible_output: when True, do not echo to sys.stdout.
        # output_json: when True, write the JSON event instead of its stdout.
        self._event_callback = event_callback
        self._counter = 0        # monotonically increasing event counter
        self._start_line = 0     # running stdout line number across events
        self._handle = handle
        self._buffer = StringIO()  # accumulates data until a full event/line is seen
        self._last_chunk = ''
        self._current_event_data = None  # pending event context for following stdout
        self.output_json = output_json
        self.suppress_ansible_output = suppress_ansible_output

    def flush(self):
        '''Flush the underlying handle.'''
        self._handle.flush()

    def write(self, data):
        '''Buffer *data*, emitting any complete events or verbose lines found.'''
        self._buffer.write(data)

        # keep a sliding window of the last chunk written so we can detect
        # event tokens and determine if we need to perform a search of the full
        # buffer
        should_search = '\x1b[K' in (self._last_chunk + data)
        self._last_chunk = data

        # Only bother searching the buffer if we recently saw a start/end
        # token (\x1b[K)
        while should_search:
            value = self._buffer.getvalue()
            match = self.EVENT_DATA_RE.search(value)
            if not match:
                break
            try:
                # Strip the interleaved cursor-movement sequences, then
                # decode the base64 JSON payload.
                base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
                event_data = json.loads(base64.b64decode(base64_data).decode('utf-8'))
            except ValueError:
                # Corrupt or partial payload: fall back to an empty event.
                event_data = {}
            event_data = self._emit_event(value[:match.start()], event_data)
            if not self.output_json:
                stdout_actual = event_data['stdout'] if 'stdout' in event_data else None
            else:
                stdout_actual = json.dumps(event_data)
            # Everything after the matched token stays buffered for the
            # next pass of this loop.
            remainder = value[match.end():]
            self._buffer = StringIO()
            self._buffer.write(remainder)

            if stdout_actual and stdout_actual != "{}":
                if not self.suppress_ansible_output:
                    sys.stdout.write(
                        stdout_actual.encode('utf-8') if PY2 else stdout_actual
                    )
                    sys.stdout.write("\n")
                    sys.stdout.flush()
                self._handle.write(stdout_actual + "\n")
                self._handle.flush()

            self._last_chunk = remainder
        else:
            # NOTE: this is a while/else.  ``should_search`` is never
            # reassigned inside the loop, so the loop only exits via the
            # ``break`` above (which skips this branch); the else therefore
            # runs exactly when no token was seen and the loop never ran.
            # Verbose stdout outside of event data context
            if data and '\n' in data and self._current_event_data is None:
                # emit events for all complete lines we know about
                lines = self._buffer.getvalue().splitlines(True)  # keep ends
                remainder = None
                # if last line is not a complete line, then exclude it
                if '\n' not in lines[-1]:
                    remainder = lines.pop()
                # emit all complete lines
                for line in lines:
                    self._emit_event(line)
                    if not self.suppress_ansible_output:
                        sys.stdout.write(
                            line.encode('utf-8') if PY2 else line
                        )
                    self._handle.write(line)
                    self._handle.flush()
                self._buffer = StringIO()
                # put final partial line back on buffer
                if remainder:
                    self._buffer.write(remainder)

    def close(self):
        '''Emit any buffered data as a final event, send EOF, and close the handle.'''
        value = self._buffer.getvalue()
        if value:
            self._emit_event(value)
            self._buffer = StringIO()
        self._event_callback(dict(event='EOF'))
        self._handle.close()

    def _emit_event(self, buffered_stdout, next_event_data=None):
        '''
        Build and dispatch event dict(s) for *buffered_stdout*.

        If a decoded event context is pending (``self._current_event_data``),
        the whole chunk is attributed to it; otherwise each complete line
        becomes its own 'verbose' event.  Returns the last event dict built
        (an empty dict when there was nothing to attribute).
        '''
        next_event_data = next_event_data or {}
        if self._current_event_data:
            event_data = self._current_event_data
            stdout_chunks = [buffered_stdout]
        elif buffered_stdout:
            event_data = dict(event='verbose')
            stdout_chunks = buffered_stdout.splitlines(True)
        else:
            event_data = dict()
            stdout_chunks = []

        for stdout_chunk in stdout_chunks:
            if event_data.get('event') == 'verbose':
                # verbose events are synthesized here, so each gets its own id
                event_data['uuid'] = str(uuid.uuid4())
            self._counter += 1
            event_data['counter'] = self._counter
            # drops the final two characters — presumably a '\r\n' line
            # ending (TODO confirm); chunks of length <= 2 become ""
            event_data['stdout'] = stdout_chunk[:-2] if len(stdout_chunk) > 2 else ""
            n_lines = stdout_chunk.count('\n')
            event_data['start_line'] = self._start_line
            event_data['end_line'] = self._start_line + n_lines
            self._start_line += n_lines
            if self._event_callback:
                self._event_callback(event_data)
        # A decoded event carrying a uuid becomes the context for the stdout
        # that follows it; anything else clears the context.
        if next_event_data.get('uuid', None):
            self._current_event_data = next_event_data
        else:
            self._current_event_data = None
        return event_data
360
361
def open_fifo_write(path, data):
    '''open_fifo_write opens the fifo named pipe in a new thread.
    This blocks the thread until an external process (such as ssh-agent)
    reads data from the pipe.
    '''
    os.mkfifo(path, stat.S_IRUSR | stat.S_IWUSR)

    def _write_then_close(fifo_path, payload):
        # open() blocks until a reader attaches; close the pipe explicitly
        # so the reader sees EOF instead of relying on garbage collection
        # to release the file object.
        with open(fifo_path, 'wb') as fifo:
            fifo.write(payload)

    threading.Thread(target=_write_then_close, args=(path, data)).start()
370
371
def args2cmdline(*args):
    '''
    Quote each argument for safe use in a POSIX shell and join them with
    spaces into a single command-line string.
    '''
    # pipes.quote is an undocumented alias of shlex.quote, and the pipes
    # module is removed in Python 3.13; import the supported name locally
    # so the module-level imports stay untouched.
    import shlex
    return ' '.join([shlex.quote(a) for a in args])
374
375
def ensure_str(s, encoding='utf-8', errors='strict'):
    """
    Coerce *s* to `str`.

    Originally copied from six==1.12; this module is Python-3-only (it uses
    f-strings), so the PY2 branch and the six type shims are unnecessary:
      - `str` -> returned unchanged
      - `bytes` -> decoded to `str` using *encoding* and *errors*

    Raises:
        TypeError: if *s* is neither `str` nor `bytes`.
    """
    if not isinstance(s, (str, bytes)):
        raise TypeError("not expecting type '%s'" % type(s))
    if isinstance(s, bytes):
        s = s.decode(encoding, errors)
    return s
395
396
def sanitize_container_name(original_name):
    """
    Docker and podman will only accept certain characters in container names
    This takes a given name from user-specified values and replaces the
    invalid characters so it can be used in docker/podman CLI commands

    Args:
        original_name: any value; it is stringified before sanitizing.

    Returns:
        str: the name with every character outside [a-zA-Z0-9_-]
            replaced by an underscore.
    """
    # str() replaces six.text_type: this module already requires Python 3
    # (it uses f-strings elsewhere).
    return re.sub('[^a-zA-Z0-9_-]', '_', str(original_name))
404
405
def cli_mounts():
    '''
    Return the default list of environment-variable passthroughs and
    host-to-container path mappings used when building CLI container
    commands.
    '''
    ssh_dir_src = '{}/.ssh/'.format(os.environ['HOME'])
    return [
        {
            'ENVS': ['SSH_AUTH_SOCK'],
            'PATHS': [
                {
                    'src': ssh_dir_src,
                    'dest': '/home/runner/.ssh/',
                },
                {
                    'src': '/etc/ssh/ssh_known_hosts',
                    'dest': '/etc/ssh/ssh_known_hosts',
                },
            ],
        },
    ]
422
423
def santize_json_response(data):
    '''
    Removes warning message from response message emitted by ansible
    command line utilities.
    :param action: The string data to be santizied
    :type action: str
    '''
    # Keep everything from the first '{' to the end of the string,
    # trimming surrounding whitespace.
    json_start = re.compile("{(.|\n)*", re.MULTILINE)
    return json_start.search(data).group().strip()
434
435
def get_executable_path(name):
    '''
    Resolve *name* to a full executable path via the PATH environment.

    Raises:
        ConfigurationError: if the command cannot be found.
    '''
    # shutil.which replaces distutils.spawn.find_executable; distutils
    # (and find_executable with it) was removed in Python 3.12.
    exec_path = shutil.which(name)
    if exec_path is None:
        raise ConfigurationError(f"{name} command not found")
    return exec_path
441