1# coding: utf-8
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6#     http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11# See the License for the specific language governing permissions and
12# limitations under the License.
13from __future__ import unicode_literals
14
15import operator
16import os
17import pytest
18import tempfile
19
20import testinfra
21import testinfra.backend
22from testinfra.backend.base import BaseBackend
23from testinfra.backend.base import HostSpec
24from testinfra.backend.winrm import _quote
25from testinfra.utils.ansible_runner import AnsibleRunner
# Every backend flavour exercised against the debian_stretch container, given
# as (connection scheme, flavour-specific query string) pairs.
_BACKEND_FLAVOURS = (
    ("ssh", ""),
    ("safe-ssh", ""),
    ("docker", ""),
    ("paramiko", ""),
    ("ansible", ""),
    ("ansible", "force_ansible=True"),
)


def _debian_hostspecs(user=None, options=""):
    """Build one debian_stretch hostspec per backend flavour.

    ``user`` is inserted as ``user@`` in front of the host name when given,
    and ``options`` is appended to the flavour's own query string.
    """
    target = "debian_stretch" if user is None else user + "@debian_stretch"
    specs = []
    for scheme, flavour_query in _BACKEND_FLAVOURS:
        query = "&".join(part for part in (flavour_query, options) if part)
        spec = scheme + "://" + target
        if query:
            spec += "?" + query
        specs.append(spec)
    return specs


# Plain connections, connections as "user", "user" escalating through sudo,
# and the default login switching to "user" through sudo.
HOSTS = _debian_hostspecs()
USER_HOSTS = _debian_hostspecs(user="user")
SUDO_HOSTS = _debian_hostspecs(user="user", options="sudo=True")
SUDO_USER_HOSTS = _debian_hostspecs(options="sudo=True&sudo_user=user")
58
59
@pytest.mark.testinfra_hosts(*(
    HOSTS + USER_HOSTS + SUDO_HOSTS + SUDO_USER_HOSTS))
def test_command(host):
    """Check exit codes, argument quoting and raw output of host.run()."""
    assert host.check_output("true") == ""
    # test that quoting is correct: "a c" must reach grep as one argument,
    # so it cannot match the "a b" that was echoed
    assert host.run("echo a b | grep -q %s", "a c").rc == 1

    result = host.run("echo out && echo err >&2 && exit 42")
    assert result.rc == 42
    backend = host.backend
    forced_ansible = (
        backend.get_connection_type() == "ansible"
        and backend.force_ansible
    )
    # with force_ansible the output is expected without its trailing newline
    line_end = b'' if forced_ansible else b'\n'
    assert result.stdout_bytes == b'out' + line_end
    assert result.stderr_bytes == b'err' + line_end

    missing = host.run("commandthatdoesnotexists")
    assert missing.rc == 127
79
80
@pytest.mark.testinfra_hosts(*HOSTS)
def test_encoding(host):
    """Check how a non-ASCII error message is exposed by each backend."""
    # stretch image is fr_FR@ISO-8859-15
    listing = host.run("ls -l %s", "/é")
    connection = host.backend.get_connection_type()

    if connection == "docker":
        # docker bug ?
        assert listing.stderr_bytes == (
            b"ls: impossible d'acc\xe9der \xe0 '/\xef\xbf\xbd': "
            b"Aucun fichier ou dossier de ce type\n"
        )
        return

    if connection == "ansible" and host.backend.force_ansible:
        # XXX: this encoding issue comes directly from ansible
        # not sure how to handle this...
        assert listing.stderr == (
            "ls: impossible d'accéder à '/é': "
            "Aucun fichier ou dossier de ce type")
        return

    # every other backend: check both the raw bytes and the decoded text
    assert listing.stderr_bytes == (
        b"ls: impossible d'acc\xe9der \xe0 '/\xe9': "
        b"Aucun fichier ou dossier de ce type\n"
    )
    assert listing.stderr == (
        "ls: impossible d'accéder à '/é': "
        "Aucun fichier ou dossier de ce type\n"
    )
109
110
@pytest.mark.testinfra_hosts(
    "ansible://debian_stretch?force_ansible=True")
def test_ansible_any_error_fatal(host):
    """Check that a failing command is still reported with
    ANSIBLE_ANY_ERRORS_FATAL set in the environment.

    The variable is set only for the duration of the test; whatever value
    it had before (including being unset) is put back afterwards.
    """
    variable = 'ANSIBLE_ANY_ERRORS_FATAL'
    previous = os.environ.get(variable)
    os.environ[variable] = 'True'
    try:
        out = host.run("echo out && echo err >&2 && exit 42")
        assert out.rc == 42
        assert out.stdout == 'out'
        assert out.stderr == 'err'
    finally:
        # Restore rather than blindly delete, so a value exported before
        # the test run is not lost for the tests that follow.
        if previous is None:
            del os.environ[variable]
        else:
            os.environ[variable] = previous
122
123
@pytest.mark.testinfra_hosts(*(USER_HOSTS + SUDO_USER_HOSTS))
def test_user_connection(host):
    """Commands run as "user" on the user and sudo_user hostspecs."""
    current_user = host.user()
    assert current_user.name == "user"
127
128
@pytest.mark.testinfra_hosts(*SUDO_HOSTS)
def test_sudo(host):
    """Commands run as root on the sudo=True hostspecs."""
    current_user = host.user()
    assert current_user.name == "root"
132
133
def test_ansible_get_hosts():
    """Check host pattern resolution against a small INI inventory."""
    # one ungrouped host, two leaf groups and two levels of parent groups;
    # the file intentionally has no trailing newline
    inventory_lines = [
        b'ungrp',
        b'[g1]',
        b'debian',
        b'[g2]',
        b'centos',
        b'[g3:children]',
        b'g1',
        b'g2',
        b'[g4:children]',
        b'g3',
    ]
    # (pattern, hosts expected for that pattern)
    cases = [
        ("all", ["centos", "debian", "ungrp"]),
        ("*", ["centos", "debian", "ungrp"]),
        ("g1", ["debian"]),
        ("*2", ["centos"]),
        ("*ia*", ["debian"]),
        ('*3', ["centos", "debian"]),
        ('*4', ["centos", "debian"]),
        ('ungrouped', ["ungrp"]),
        ('un*', ["ungrp"]),
        ('nope', []),
    ]
    with tempfile.NamedTemporaryFile() as inventory:
        inventory.write(b'\n'.join(inventory_lines))
        inventory.flush()
        for pattern, hosts in cases:
            # a fresh runner is built for every lookup
            runner = AnsibleRunner(inventory.name)
            assert runner.get_hosts(pattern) == hosts
162
163
def test_ansible_get_variables():
    """Check the variables reported for hosts of a small INI inventory."""
    inventory_content = (
        b'debian a=b c=d\n'
        b'centos e=f\n'
        b'[all:vars]\n'
        b'a=a\n'
        b'[g]\n'
        b'debian\n'
        b'[g:vars]\n'
        b'x=z\n'
    )
    expected_groups = {
        'all': ['centos', 'debian'],
        'g': ['debian'],
        'ungrouped': ['centos'],
    }
    # host name -> full variable mapping expected for it
    expected_variables = {
        "debian": {
            # the host-level a=b is expected, not a=a from [all:vars]
            'a': 'b',
            'c': 'd',
            'x': 'z',
            'inventory_hostname': 'debian',
            'group_names': ['g'],
            'groups': expected_groups,
        },
        "centos": {
            'a': 'a',
            'e': 'f',
            'inventory_hostname': 'centos',
            'group_names': ['ungrouped'],
            'groups': expected_groups,
        },
    }
    with tempfile.NamedTemporaryFile() as inventory:
        inventory.write(inventory_content)
        inventory.flush()
        for name in ("debian", "centos"):
            runner = AnsibleRunner(inventory.name)
            assert runner.get_variables(name) == expected_variables[name]
200
201
@pytest.mark.parametrize('kwargs,inventory,expected', [
    # local connection with become settings
    ({}, b'host ansible_connection=local ansible_become=yes ansible_become_user=u', {  # noqa
        'NAME': 'local',
        'sudo': True,
        'sudo_user': 'u',
    }),
    # no connection given, and "smart": both expect the ssh backend
    ({}, b'host', {
        'NAME': 'ssh',
        'host.name': 'host',
    }),
    ({}, b'host ansible_connection=smart', {
        'NAME': 'ssh',
        'host.name': 'host',
    }),
    # both spellings of the private key variable
    ({}, b'host ansible_host=127.0.1.1 ansible_user=u ansible_ssh_private_key_file=key ansible_port=2222 ansible_become=yes ansible_become_user=u', {  # noqa
        'NAME': 'ssh',
        'sudo': True,
        'sudo_user': 'u',
        'host.name': '127.0.1.1',
        'host.port': '2222',
        'ssh_identity_file': 'key',
    }),
    ({}, b'host ansible_host=127.0.1.1 ansible_user=u ansible_private_key_file=key ansible_port=2222 ansible_become=yes ansible_become_user=u', {  # noqa
        'NAME': 'ssh',
        'sudo': True,
        'sudo_user': 'u',
        'host.name': '127.0.1.1',
        'host.port': '2222',
        'ssh_identity_file': 'key',
    }),
    # common and extra ssh args, separately and combined
    ({}, b'host ansible_ssh_common_args="-o LogLevel=FATAL"', {
        'NAME': 'ssh',
        'host.name': 'host',
        'ssh_extra_args': '-o LogLevel=FATAL',
    }),
    ({}, b'host ansible_ssh_extra_args="-o LogLevel=FATAL"', {
        'NAME': 'ssh',
        'host.name': 'host',
        'ssh_extra_args': '-o LogLevel=FATAL',
    }),
    ({}, b'host ansible_ssh_common_args="-o StrictHostKeyChecking=no" ansible_ssh_extra_args="-o LogLevel=FATAL"', {  # noqa
        'NAME': 'ssh',
        'host.name': 'host',
        'ssh_extra_args': '-o StrictHostKeyChecking=no -o LogLevel=FATAL',
    }),
    # docker connection, bare and fully configured
    ({}, b'host ansible_connection=docker', {
        'NAME': 'docker',
        'name': 'host',
        'user': None,
    }),
    ({}, b'host ansible_connection=docker ansible_become=yes ansible_become_user=u ansible_user=z ansible_host=container', {  # noqa
        'NAME': 'docker',
        'name': 'container',
        'user': 'z',
        'sudo': True,
        'sudo_user': 'u',
    }),
    # keyword arguments passed to get_host()
    ({'ssh_config': '/ssh_config', 'ssh_identity_file': '/id_ed25519'},
        b'host', {
        'NAME': 'ssh',
        'host.name': 'host',
        'ssh_config': '/ssh_config',
        'ssh_identity_file': '/id_ed25519',
    }),
])
def test_ansible_get_host(kwargs, inventory, expected):
    """Check the backend built from a one-line inventory for 'host'.

    ``expected`` maps (possibly dotted) attribute paths on the backend to
    the values they must hold.
    """
    with tempfile.NamedTemporaryFile() as inventory_file:
        inventory_file.write(inventory + b'\n')
        inventory_file.flush()
        runner = AnsibleRunner(inventory_file.name)
        backend = runner.get_host('host', **kwargs).backend
        for path in expected:
            # attrgetter resolves dotted paths such as 'host.name'
            actual = operator.attrgetter(path)(backend)
            assert actual == expected[path]
274
275
@pytest.mark.parametrize('inventory,expected', [
    (b'host', (
        'ssh -o ConnectTimeout=10 -o ControlMaster=auto '
        '-o ControlPersist=60s host true')),
    # avoid interference between our ssh backend and ansible_ssh_extra_args
    (b'host ansible_ssh_extra_args="-o ConnectTimeout=5 -o ControlMaster=auto '
     b'-o ControlPersist=10s"', (
        'ssh -o ConnectTimeout=5 -o ControlMaster=auto -o '
        'ControlPersist=10s host true')),
    # escape %
    (b'host ansible_ssh_extra_args="-o ControlPath ~/.ssh/ansible/cp/%r@%h-%p"', (  # noqa
        'ssh -o ControlPath ~/.ssh/ansible/cp/%r@%h-%p -o ConnectTimeout=10 '
        '-o ControlMaster=auto -o ControlPersist=60s host true')),
])
def test_ansible_ssh_command(inventory, expected):
    """Check the ssh command line built for 'host' from the inventory."""
    with tempfile.NamedTemporaryFile() as inventory_file:
        inventory_file.write(inventory + b'\n')
        inventory_file.flush()
        runner = AnsibleRunner(inventory_file.name)
        ssh_backend = runner.get_host('host').backend
        parts, arguments = ssh_backend._build_ssh_command('true')
        template = ' '.join(parts)
        assert ssh_backend.quote(template, *arguments) == expected
298
299
def test_ansible_no_host():
    """Check host discovery when no host pattern is given.

    With a one-host inventory, the host is returned both by
    ``AnsibleRunner.get_hosts()`` and by ``testinfra.get_hosts([None], ...)``.
    With an empty inventory, lookups must raise ``RuntimeError``, except for
    the literal ``localhost`` pattern.
    """
    with tempfile.NamedTemporaryFile() as f:
        f.write(b'host\n')
        f.flush()
        assert AnsibleRunner(f.name).get_hosts() == ['host']
        hosts = testinfra.get_hosts(
            [None], connection='ansible', ansible_inventory=f.name)
        assert [h.backend.get_pytest_id() for h in hosts] == ['ansible://host']
    with tempfile.NamedTemporaryFile() as f:
        # empty or no inventory should not return any hosts except for
        # localhost
        nohost = (
            'No inventory was parsed (missing file ?), '
            'only implicit localhost is available')
        # The calls below must raise, so there is no return value to
        # compare: an assertion on the result could never be evaluated.
        with pytest.raises(RuntimeError) as exc:
            AnsibleRunner(f.name).get_hosts()
        assert str(exc.value) == nohost
        with pytest.raises(RuntimeError) as exc:
            AnsibleRunner(f.name).get_hosts('local*')
        assert str(exc.value) == nohost
        assert AnsibleRunner(f.name).get_hosts('localhost') == ['localhost']
321
322
def test_ansible_config():
    """Check that the inventory named in ANSIBLE_CONFIG is used.

    The runner is created without an inventory path, so the host can only
    come from the configuration file the environment variable points to.
    """
    env_key = 'ANSIBLE_CONFIG'
    with tempfile.NamedTemporaryFile(suffix='.cfg') as cfg:
        with tempfile.NamedTemporaryFile() as inventory:
            cfg.write(b'[defaults]\n')
            cfg.write(b'inventory=' + inventory.name.encode() + b'\n')
            cfg.flush()
            inventory.write(b'h\n')
            inventory.flush()

            previous = os.environ.get(env_key)
            os.environ[env_key] = cfg.name
            try:
                assert AnsibleRunner(None).get_hosts('all') == ['h']
            finally:
                # leave the environment as it was found
                if previous is None:
                    del os.environ[env_key]
                else:
                    os.environ[env_key] = previous
343
344
def test_backend_importables():
    """Every declared backend resolves to a class reporting its own name."""
    # just check that all declared backend are importable and NAME is set
    # correctly
    for name in testinfra.backend.BACKENDS:
        backend_class = testinfra.backend.get_backend_class(name)
        assert backend_class.get_connection_type() == name
351
352
@pytest.mark.testinfra_hosts("docker://centos_7", "ssh://centos_7")
def test_docker_encoding(host):
    """Round-trip a non-ASCII string through a host whose preferred
    encoding is ASCII, via command output and via a file."""
    preferred = host.check_output(
        "python -c 'import locale;print(locale.getpreferredencoding())'")
    assert preferred == "ANSI_X3.4-1968"

    text = "ťēꞩƫìṇḟřặ ṧꝕèȃǩ ửƫᵮ8"
    echoed = host.check_output("echo %s | tee /tmp/s.txt", text)
    assert echoed == text
    stored = host.file("/tmp/s.txt").content_string
    assert stored.strip() == text
361
362
@pytest.mark.parametrize('hostspec,expected', [
    # user, password and port in every combination
    ('u:P@h:p', HostSpec('h', 'p', 'u', 'P')),
    ('u@h:p', HostSpec('h', 'p', 'u', None)),
    ('u:P@h', HostSpec('h', None, 'u', 'P')),
    ('u@h', HostSpec('h', None, 'u', None)),
    ('h', HostSpec('h', None, None, None)),
    # percent-encoded user and password
    ('pr%C3%A9nom@h', HostSpec('h', None, 'prénom', None)),
    ('pr%C3%A9nom:p%40ss%3Aw0rd@h', HostSpec('h', None, 'prénom',
                                             'p@ss:w0rd')),
    # ipv6 matching
    ('[2001:db8:a0b:12f0::1]',
     HostSpec('2001:db8:a0b:12f0::1', None, None, None)),
    ('user:password@[2001:db8:a0b:12f0::1]',
     HostSpec('2001:db8:a0b:12f0::1', None, 'user', 'password')),
    ('user:password@[2001:4800:7819:103:be76:4eff:fe04:9229]:22',
     HostSpec('2001:4800:7819:103:be76:4eff:fe04:9229', '22',
              'user', 'password')),
])
def test_parse_hostspec(hostspec, expected):
    """Check that a hostspec string parses into the expected HostSpec."""
    parsed = BaseBackend.parse_hostspec(hostspec)
    assert parsed == expected
383
384
@pytest.mark.parametrize('hostspec,pod,container,namespace,kubeconfig', [
    ('kubectl://pod', 'pod', None, None, None),
    ('kubectl://pod?namespace=n', 'pod', None, 'n', None),
    ('kubectl://pod?container=c&namespace=n', 'pod', 'c', 'n', None),
    ('kubectl://pod?namespace=n&kubeconfig=k', 'pod', None, 'n', 'k')
])
def test_kubectl_hostspec(hostspec, pod, container, namespace, kubeconfig):
    """Check the attributes the kubectl backend takes from its hostspec."""
    kubectl = testinfra.get_host(hostspec).backend
    actual = (
        kubectl.name,
        kubectl.container,
        kubectl.namespace,
        kubectl.kubeconfig,
    )
    assert actual == (pod, container, namespace, kubeconfig)
397
398
@pytest.mark.parametrize('arg_string,expected', [
    # path containing a space
    (
        'C:\\Users\\vagrant\\This Dir\\salt',
        '"C:\\Users\\vagrant\\This Dir\\salt"'
    ),
    # path without spaces
    (
        'C:\\Users\\vagrant\\AppData\\Local\\Temp\\kitchen\\etc\\salt',
        '"C:\\Users\\vagrant\\AppData\\Local\\Temp\\kitchen\\etc\\salt"'
    ),
])
def test_winrm_quote(arg_string, expected):
    """Check that Windows paths come back wrapped in double quotes."""
    quoted = _quote(arg_string)
    assert quoted == expected
411
412
@pytest.mark.parametrize('hostspec,expected', [
    # defaults
    ('ssh://h',
        'ssh -o ConnectTimeout=10 -o ControlMaster=auto '
        '-o ControlPersist=60s h true'),
    # custom connection timeout
    ('ssh://h?timeout=1',
        'ssh -o ConnectTimeout=1 -o ControlMaster=auto '
        '-o ControlPersist=60s h true'),
    # user and port
    ('ssh://u@h:2222',
        'ssh -o User=u -o Port=2222 -o ConnectTimeout=10 '
        '-o ControlMaster=auto -o ControlPersist=60s h true'),
    # ssh config file
    ('ssh://h:2222?ssh_config=/f',
        'ssh -F /f -o Port=2222 -o ConnectTimeout=10 '
        '-o ControlMaster=auto -o ControlPersist=60s h true'),
    # identity file
    ('ssh://u@h?ssh_identity_file=/id',
        'ssh -o User=u -i /id -o ConnectTimeout=10 '
        '-o ControlMaster=auto -o ControlPersist=60s h true'),
    # custom ControlPersist, then 0 which drops the ControlMaster options
    ('ssh://h?controlpersist=1',
        'ssh -o ConnectTimeout=10 '
        '-o ControlMaster=auto -o ControlPersist=1s h true'),
    ('ssh://h?controlpersist=0',
        'ssh -o ConnectTimeout=10 h true')
])
def test_ssh_hostspec(hostspec, expected):
    """Check the ssh command line built from an ssh:// hostspec."""
    ssh_backend = testinfra.get_host(hostspec).backend
    parts, arguments = ssh_backend._build_ssh_command('true')
    template = ' '.join(parts)
    assert ssh_backend.quote(template, *arguments) == expected
440