1# coding: utf-8
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6#     http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11# See the License for the specific language governing permissions and
12# limitations under the License.
13from __future__ import unicode_literals
14
15import crypt
16import datetime
17import re
18import time
19
20import pytest
21
22from ipaddress import ip_address
23from ipaddress import IPv4Address
24from ipaddress import IPv6Address
25
26from testinfra.modules.socket import parse_socketspec
27
28all_images = pytest.mark.testinfra_hosts(*[
29    "docker://{}".format(image)
30    for image in (
31        "alpine", "archlinux", "centos_6", "centos_7",
32        "debian_stretch", "ubuntu_xenial"
33    )
34])
35
36
37@all_images
38def test_package(host, docker_image):
39    assert not host.package('zsh').is_installed
40    if docker_image in ("alpine", "archlinux"):
41        name = "openssh"
42    else:
43        name = "openssh-server"
44
45    ssh = host.package(name)
46    version = {
47        "alpine": "7.",
48        "archlinux": "8.",
49        "centos_6": "5.",
50        "centos_7": "7.",
51        "debian_stretch": "1:7.4",
52        "ubuntu_xenial": "1:7.2"
53    }[docker_image]
54    assert ssh.is_installed
55    assert ssh.version.startswith(version)
56    release = {
57        "alpine": "r5",
58        "archlinux": None,
59        "centos_6": ".el6",
60        "centos_7": ".el7",
61        "debian_stretch": None,
62        "ubuntu_xenial": None
63    }[docker_image]
64    if release is None:
65        with pytest.raises(NotImplementedError):
66            ssh.release
67    else:
68        assert release in ssh.release
69
70
71def test_held_package(host):
72    python = host.package("python")
73    assert python.is_installed
74    assert python.version.startswith("2.7.")
75
76
77@pytest.mark.destructive
78def test_uninstalled_package_version(host):
79    with pytest.raises(AssertionError) as excinfo:
80        host.package('zsh').version
81    assert 'Unexpected exit code 1 for CommandResult' in str(excinfo.value)
82    assert host.package('sudo').is_installed
83    host.check_output('apt-get -y remove sudo')
84    assert not host.package('sudo').is_installed
85    with pytest.raises(AssertionError) as excinfo:
86        host.package('sudo').version
87    assert ('The package sudo is not installed, dpkg-query output: '
88            'deinstall ok config-files 1.8.') in str(excinfo.value)
89
90
91@all_images
92def test_systeminfo(host, docker_image):
93    assert host.system_info.type == "linux"
94
95    release, distribution, codename = {
96        "alpine": (r"^3\.9\.", "alpine", None),
97        "archlinux": ("rolling", "arch", None),
98        "centos_6": (r"^6", "CentOS", None),
99        "centos_7": (r"^7$", "centos", None),
100        "debian_stretch": (r"^9\.", "debian", "stretch"),
101        "ubuntu_xenial": (r"^16\.04$", "ubuntu", "xenial")
102    }[docker_image]
103
104    assert host.system_info.distribution == distribution
105    assert host.system_info.codename == codename
106    assert re.match(release, host.system_info.release)
107
108
109@all_images
110def test_ssh_service(host, docker_image):
111    if docker_image in ("centos_6", "centos_7",
112                        "alpine", "archlinux"):
113        name = "sshd"
114    else:
115        name = "ssh"
116
117    ssh = host.service(name)
118    if docker_image == "ubuntu_xenial":
119        assert not ssh.is_running
120    else:
121        # wait at max 10 seconds for ssh is running
122        for _ in range(10):
123            if ssh.is_running:
124                break
125            time.sleep(1)
126        else:
127            if docker_image == "archlinux":
128                raise pytest.skip('FIXME: flapping test')
129            raise AssertionError('ssh is not running')
130
131    if docker_image == "ubuntu_xenial":
132        assert not ssh.is_enabled
133    else:
134        assert ssh.is_enabled
135
136
137@pytest.mark.parametrize("name,running,enabled", [
138    ("ntp", False, True),
139    ("salt-minion", False, False),
140])
141def test_service(host, name, running, enabled):
142    service = host.service(name)
143    assert service.is_running == running
144    assert service.is_enabled == enabled
145
146
147def test_salt(host):
148    ssh_version = host.salt("pkg.version", "openssh-server", local=True)
149    assert ssh_version.startswith("1:7.4")
150
151
152def test_puppet_resource(host):
153    resource = host.puppet_resource("package", "openssh-server")
154    assert resource["openssh-server"]["ensure"].startswith("1:7.4")
155
156
157def test_facter(host):
158    assert host.facter()["lsbdistcodename"] == "stretch"
159    assert host.facter("lsbdistcodename") == {
160        "lsbdistcodename": "stretch",
161    }
162
163
164def test_sysctl(host):
165    assert host.sysctl("kernel.hostname") == host.check_output("hostname")
166    assert isinstance(host.sysctl("kernel.panic"), int)
167
168
169def test_parse_socketspec():
170    assert parse_socketspec("tcp://22") == ("tcp", None, 22)
171    assert parse_socketspec("tcp://:::22") == ("tcp", "::", 22)
172    assert parse_socketspec("udp://0.0.0.0:22") == ("udp", "0.0.0.0", 22)
173    assert parse_socketspec("unix://can:be.any/thing:22") == (
174        "unix", "can:be.any/thing:22", None)
175
176
177def test_socket(host):
178    listening = host.socket.get_listening_sockets()
179    for spec in (
180        "tcp://0.0.0.0:22",
181        "tcp://:::22",
182        "unix:///run/systemd/private",
183    ):
184        assert spec in listening
185    for spec in (
186        "tcp://22",
187        "tcp://0.0.0.0:22",
188        "tcp://127.0.0.1:22",
189        "tcp://:::22",
190        "tcp://::1:22",
191        "unix:///run/systemd/private",
192    ):
193        socket = host.socket(spec)
194        assert socket.is_listening
195
196    assert not host.socket("tcp://4242").is_listening
197
198    if not host.backend.get_connection_type() == "docker":
199        # FIXME
200        for spec in (
201            "tcp://22",
202            "tcp://0.0.0.0:22",
203        ):
204            assert len(host.socket(spec).clients) >= 1
205
206
207@all_images
208def test_process(host, docker_image):
209    init = host.process.get(pid=1)
210    assert init.ppid == 0
211    if docker_image != "alpine":
212        # busybox ps doesn't have a euid equivalent
213        assert init.euid == 0
214    assert init.user == "root"
215
216    args, comm = {
217        "alpine": ("/sbin/init", "init"),
218        "archlinux": ("/usr/sbin/init", "systemd"),
219        "centos_6": ("/usr/sbin/sshd -D", "sshd"),
220        "centos_7": ("/usr/sbin/init", "systemd"),
221        "debian_stretch": ("/sbin/init", "systemd"),
222        "ubuntu_xenial": ("/sbin/init", "systemd")
223    }[docker_image]
224    assert init.args == args
225    assert init.comm == comm
226
227
228def test_user(host):
229    user = host.user("sshd")
230    assert user.exists
231    assert user.name == "sshd"
232    assert user.uid == 106
233    assert user.gid == 65534
234    assert user.group == "nogroup"
235    assert user.gids == [65534]
236    assert user.groups == ["nogroup"]
237    assert user.shell == "/usr/sbin/nologin"
238    assert user.home == "/run/sshd"
239    assert user.password == "*"
240
241
242def test_user_user(host):
243    user = host.user("user")
244    assert user.exists
245    assert user.gecos == "gecos.comment"
246
247
248def test_user_expiration_date(host):
249    assert host.user("root").expiration_date is None
250    assert host.user("user").expiration_date == (
251        datetime.datetime(2024, 10, 4, 0, 0))
252
253
254def test_nonexistent_user(host):
255    assert not host.user("zzzzzzzzzz").exists
256
257
258def test_current_user(host):
259    assert host.user().name == "root"
260    pw = host.user().password
261    assert crypt.crypt("foo", pw) == pw
262
263
264def test_group(host):
265    assert host.group("root").exists
266    assert host.group("root").gid == 0
267
268
269def test_empty_command_output(host):
270    assert host.check_output("printf ''") == ""
271
272
273def test_local_command(host):
274    assert host.get_host("local://").check_output("true") == ""
275
276
277def test_file(host):
278    host.check_output("mkdir -p /d && printf foo > /d/f && chmod 600 /d/f")
279    d = host.file("/d")
280    assert d.is_directory
281    assert not d.is_file
282    f = host.file("/d/f")
283    assert f.exists
284    assert f.is_file
285    assert f.content == b"foo"
286    assert f.content_string == "foo"
287    assert f.user == "root"
288    assert f.uid == 0
289    assert f.gid == 0
290    assert f.group == "root"
291    assert f.mode == 0o600
292    assert f.contains("fo")
293    assert not f.is_directory
294    assert not f.is_symlink
295    assert not f.is_pipe
296    assert f.linked_to == "/d/f"
297    assert f.size == 3
298    assert f.md5sum == "acbd18db4cc2f85cedef654fccc4a4d8"
299    assert f.sha256sum == (
300        "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"
301    )
302    host.check_output("ln -fsn /d/f /d/l")
303    link = host.file("/d/l")
304    assert link.is_symlink
305    assert link.is_file
306    assert link.linked_to == "/d/f"
307    assert link.linked_to == f
308    assert f == host.file('/d/f')
309    assert not d == f
310
311    host.check_output("rm -f /d/p && mkfifo /d/p")
312    assert host.file("/d/p").is_pipe
313
314
315def test_ansible_unavailable(host):
316    expected = ('Ansible module is only available with '
317                'ansible connection backend')
318    with pytest.raises(RuntimeError) as excinfo:
319        host.ansible("setup")
320    assert expected in str(excinfo.value)
321    with pytest.raises(RuntimeError) as excinfo:
322        host.ansible.get_variables()
323    assert expected in str(excinfo.value)
324
325
326@pytest.mark.testinfra_hosts("ansible://debian_stretch")
327def test_ansible_module(host):
328    setup = host.ansible("setup")["ansible_facts"]
329    assert setup["ansible_lsb"]["codename"] == "stretch"
330    passwd = host.ansible("file", "path=/etc/passwd state=file")
331    assert passwd["changed"] is False
332    assert passwd["gid"] == 0
333    assert passwd["group"] == "root"
334    assert passwd["mode"] == "0644"
335    assert passwd["owner"] == "root"
336    assert isinstance(passwd["size"], int)
337    assert passwd["path"] == "/etc/passwd"
338    # seems to vary with differents docker fs backend
339    assert passwd["state"] in ("file", "hard")
340    assert passwd["uid"] == 0
341
342    variables = host.ansible.get_variables()
343    assert variables["myvar"] == "foo"
344    assert variables["myhostvar"] == "bar"
345    assert variables["mygroupvar"] == "qux"
346    assert variables["inventory_hostname"] == "debian_stretch"
347    assert variables["group_names"] == ["testgroup"]
348    assert variables["groups"] == {
349        "all": ["debian_stretch"],
350        "testgroup": ["debian_stretch"],
351    }
352
353    with pytest.raises(host.ansible.AnsibleException) as excinfo:
354        host.ansible("command", "zzz")
355    assert excinfo.value.result['msg'] == \
356        "Skipped. You might want to try check=False"
357
358    try:
359        host.ansible("command", "zzz", check=False)
360    except host.ansible.AnsibleException as exc:
361        assert exc.result['rc'] == 2
362        # notez que the debian stretch container is set to LANG=fr_FR
363        assert exc.result['msg'] == ('[Errno 2] Aucun fichier ou dossier '
364                                     'de ce type')
365
366    result = host.ansible("command", "echo foo", check=False)
367    assert result['stdout'] == 'foo'
368
369
370@pytest.mark.testinfra_hosts("ansible://debian_stretch",
371                             "ansible://user@debian_stretch")
372def test_ansible_module_become(host):
373    user_name = host.user().name
374    assert host.ansible('shell', 'echo $USER',
375                        check=False)['stdout'] == user_name
376    assert host.ansible('shell', 'echo $USER',
377                        check=False, become=True)['stdout'] == 'root'
378
379    with host.sudo():
380        assert host.user().name == 'root'
381        assert host.ansible('shell', 'echo $USER',
382                            check=False)['stdout'] == user_name
383        assert host.ansible('shell', 'echo $USER',
384                            check=False, become=True)['stdout'] == 'root'
385
386
387@pytest.mark.destructive
388def test_supervisor(host):
389    # Wait supervisord is running
390    for _ in range(20):
391        if host.service("supervisor").is_running:
392            break
393        time.sleep(.5)
394    else:
395        raise RuntimeError("No running supervisor")
396
397    for _ in range(20):
398        service = host.supervisor("tail")
399        if service.status == "RUNNING":
400            break
401        else:
402            assert service.status == "STARTING"
403            time.sleep(.5)
404    else:
405        raise RuntimeError("No running tail in supervisor")
406
407    assert service.is_running
408    proc = host.process.get(pid=service.pid)
409    assert proc.comm == "tail"
410
411    services = host.supervisor.get_services()
412    assert len(services) == 1
413    assert services[0].name == "tail"
414    assert services[0].is_running
415    assert services[0].pid == service.pid
416
417    host.run("supervisorctl stop tail")
418    service = host.supervisor("tail")
419    assert not service.is_running
420    assert service.status == "STOPPED"
421    assert service.pid is None
422
423    host.run("service supervisor stop")
424    assert not host.service("supervisor").is_running
425    with pytest.raises(RuntimeError) as excinfo:
426        host.supervisor("tail").is_running
427    assert 'Is supervisor running' in str(excinfo.value)
428
429
430def test_mountpoint(host):
431    root_mount = host.mount_point('/')
432    assert root_mount.exists
433    assert isinstance(root_mount.options, list)
434    assert 'rw' in root_mount.options
435    assert root_mount.filesystem
436
437    fake_mount = host.mount_point('/fake/mount')
438    assert not fake_mount.exists
439
440    mountpoints = host.mount_point.get_mountpoints()
441    assert mountpoints
442    assert all(isinstance(m, host.mount_point) for m in mountpoints)
443    assert len([m for m in mountpoints if m.path == "/"]) == 1
444
445
446def test_sudo_from_root(host):
447    assert host.user().name == "root"
448    with host.sudo("user"):
449        assert host.user().name == "user"
450    assert host.user().name == "root"
451
452
453def test_sudo_fail_from_root(host):
454    assert host.user().name == "root"
455    with pytest.raises(AssertionError) as exc:
456        with host.sudo("unprivileged"):
457            assert host.user().name == "unprivileged"
458            host.check_output('ls /root/invalid')
459    assert str(exc.value).startswith('Unexpected exit code')
460    with host.sudo():
461        assert host.user().name == "root"
462
463
464@pytest.mark.testinfra_hosts("docker://user@debian_stretch")
465def test_sudo_to_root(host):
466    assert host.user().name == "user"
467    with host.sudo():
468        assert host.user().name == "root"
469        # Test nested sudo
470        with host.sudo("www-data"):
471            assert host.user().name == "www-data"
472    assert host.user().name == "user"
473
474
475def test_command_execution(host):
476    assert host.run("false").failed
477    assert host.run("true").succeeded
478
479
480def test_pip_package(host):
481    assert host.pip_package.get_packages()['pip']['version'] == '9.0.1'
482    pytest = host.pip_package.get_packages(pip_path='/v/bin/pip')['pytest']
483    assert pytest['version'].startswith('2.')
484    outdated = host.pip_package.get_outdated_packages(
485        pip_path='/v/bin/pip')['pytest']
486    assert outdated['current'] == pytest['version']
487    assert int(outdated['latest'].split('.')[0]) > 2
488
489
490def test_environment_home(host):
491    assert host.environment().get('HOME') == '/root'
492
493
494def test_iptables(host):
495    ssh_rule_str = \
496        '-A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT'
497    vip_redirect_rule_str = \
498        '-A PREROUTING -d 192.168.0.1/32 -j REDIRECT'
499    rules = host.iptables.rules()
500    input_rules = host.iptables.rules('filter', 'INPUT')
501    nat_rules = host.iptables.rules('nat')
502    nat_prerouting_rules = host.iptables.rules('nat', 'PREROUTING')
503    assert ssh_rule_str in rules
504    assert ssh_rule_str in input_rules
505    assert vip_redirect_rule_str in nat_rules
506    assert vip_redirect_rule_str in nat_prerouting_rules
507    # test ip6tables call works; ipv6 setup is a whole huge thing, but
508    # ensure we at least see the headings
509    v6_rules = host.iptables.rules(version=6)
510    assert '-P INPUT ACCEPT' in v6_rules
511    assert '-P FORWARD ACCEPT' in v6_rules
512    assert '-P OUTPUT ACCEPT' in v6_rules
513    v6_filter_rules = host.iptables.rules('filter', 'INPUT', version=6)
514    assert '-P INPUT ACCEPT' in v6_filter_rules
515
516
517@all_images
518def test_addr(host):
519    non_resolvable = host.addr('some_non_resolvable_host')
520    assert not non_resolvable.is_resolvable
521    assert not non_resolvable.is_reachable
522    assert not non_resolvable.port(80).is_reachable
523
524    # Some arbitrary internal IP, hopefully non reachable
525    # IP addresses are always resolvable no matter what
526    non_reachable_ip = host.addr('10.42.13.73')
527    assert non_reachable_ip.is_resolvable
528    assert non_reachable_ip.ipv4_addresses == ['10.42.13.73']
529    assert not non_reachable_ip.is_reachable
530    assert not non_reachable_ip.port(80).is_reachable
531
532    google_dns = host.addr('8.8.8.8')
533    assert google_dns.is_resolvable
534    assert google_dns.ipv4_addresses == ['8.8.8.8']
535    assert google_dns.is_reachable
536    assert google_dns.port(53).is_reachable
537    assert not google_dns.port(666).is_reachable
538
539    google_addr = host.addr('google.com')
540    assert google_addr.is_resolvable
541    assert google_addr.is_reachable
542    assert google_addr.port(443).is_reachable
543    assert not google_addr.port(666).is_reachable
544
545    for ip in google_addr.ipv4_addresses:
546        assert isinstance(ip_address(ip), IPv4Address)
547
548    for ip in google_addr.ipv6_addresses:
549        assert isinstance(ip_address(ip), IPv6Address)
550
551    for ip in google_addr.ip_addresses:
552        assert isinstance(ip_address(ip), (IPv4Address, IPv6Address))
553