1# coding: utf-8 2# Licensed under the Apache License, Version 2.0 (the "License"); 3# you may not use this file except in compliance with the License. 4# You may obtain a copy of the License at 5# 6# http://www.apache.org/licenses/LICENSE-2.0 7# 8# Unless required by applicable law or agreed to in writing, software 9# distributed under the License is distributed on an "AS IS" BASIS, 10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11# See the License for the specific language governing permissions and 12# limitations under the License. 13from __future__ import unicode_literals 14 15import crypt 16import datetime 17import re 18import time 19 20import pytest 21 22from ipaddress import ip_address 23from ipaddress import IPv4Address 24from ipaddress import IPv6Address 25 26from testinfra.modules.socket import parse_socketspec 27 28all_images = pytest.mark.testinfra_hosts(*[ 29 "docker://{}".format(image) 30 for image in ( 31 "alpine", "archlinux", "centos_6", "centos_7", 32 "debian_stretch", "ubuntu_xenial" 33 ) 34]) 35 36 37@all_images 38def test_package(host, docker_image): 39 assert not host.package('zsh').is_installed 40 if docker_image in ("alpine", "archlinux"): 41 name = "openssh" 42 else: 43 name = "openssh-server" 44 45 ssh = host.package(name) 46 version = { 47 "alpine": "7.", 48 "archlinux": "8.", 49 "centos_6": "5.", 50 "centos_7": "7.", 51 "debian_stretch": "1:7.4", 52 "ubuntu_xenial": "1:7.2" 53 }[docker_image] 54 assert ssh.is_installed 55 assert ssh.version.startswith(version) 56 release = { 57 "alpine": "r5", 58 "archlinux": None, 59 "centos_6": ".el6", 60 "centos_7": ".el7", 61 "debian_stretch": None, 62 "ubuntu_xenial": None 63 }[docker_image] 64 if release is None: 65 with pytest.raises(NotImplementedError): 66 ssh.release 67 else: 68 assert release in ssh.release 69 70 71def test_held_package(host): 72 python = host.package("python") 73 assert python.is_installed 74 assert python.version.startswith("2.7.") 75 76 77@pytest.mark.destructive 78def test_uninstalled_package_version(host): 79 with pytest.raises(AssertionError) as excinfo: 80 host.package('zsh').version 81 assert 'Unexpected exit code 1 for CommandResult' in str(excinfo.value) 82 assert host.package('sudo').is_installed 83 host.check_output('apt-get -y remove sudo') 84 assert not host.package('sudo').is_installed 85 with pytest.raises(AssertionError) as excinfo: 86 host.package('sudo').version 87 assert ('The package sudo is not installed, dpkg-query output: ' 88 'deinstall ok config-files 1.8.') in str(excinfo.value) 89 90 91@all_images 92def test_systeminfo(host, docker_image): 93 assert host.system_info.type == "linux" 94 95 release, distribution, codename = { 96 "alpine": (r"^3\.9\.", "alpine", None), 97 "archlinux": ("rolling", "arch", None), 98 "centos_6": (r"^6", "CentOS", None), 99 "centos_7": (r"^7$", "centos", None), 100 "debian_stretch": (r"^9\.", "debian", "stretch"), 101 "ubuntu_xenial": (r"^16\.04$", "ubuntu", "xenial") 102 }[docker_image] 103 104 assert host.system_info.distribution == distribution 105 assert host.system_info.codename == codename 106 assert re.match(release, host.system_info.release) 107 108 109@all_images 110def test_ssh_service(host, docker_image): 111 if docker_image in ("centos_6", "centos_7", 112 "alpine", "archlinux"): 113 name = "sshd" 114 else: 115 name = "ssh" 116 117 ssh = host.service(name) 118 if docker_image == "ubuntu_xenial": 119 assert not ssh.is_running 120 else: 121 # wait at max 10 seconds for ssh is running 122 for _ in range(10): 123 if ssh.is_running: 124 break 125 time.sleep(1) 126 else: 127 if docker_image == "archlinux": 128 raise pytest.skip('FIXME: flapping test') 129 raise AssertionError('ssh is not running') 130 131 if docker_image == "ubuntu_xenial": 132 assert not ssh.is_enabled 133 else: 134 assert ssh.is_enabled 135 136 137@pytest.mark.parametrize("name,running,enabled", [ 138 ("ntp", False, True), 139 ("salt-minion", False, False), 140]) 141def test_service(host, name, running, enabled): 142 service = host.service(name) 143 assert service.is_running == running 144 assert service.is_enabled == enabled 145 146 147def test_salt(host): 148 ssh_version = host.salt("pkg.version", "openssh-server", local=True) 149 assert ssh_version.startswith("1:7.4") 150 151 152def test_puppet_resource(host): 153 resource = host.puppet_resource("package", "openssh-server") 154 assert resource["openssh-server"]["ensure"].startswith("1:7.4") 155 156 157def test_facter(host): 158 assert host.facter()["lsbdistcodename"] == "stretch" 159 assert host.facter("lsbdistcodename") == { 160 "lsbdistcodename": "stretch", 161 } 162 163 164def test_sysctl(host): 165 assert host.sysctl("kernel.hostname") == host.check_output("hostname") 166 assert isinstance(host.sysctl("kernel.panic"), int) 167 168 169def test_parse_socketspec(): 170 assert parse_socketspec("tcp://22") == ("tcp", None, 22) 171 assert parse_socketspec("tcp://:::22") == ("tcp", "::", 22) 172 assert parse_socketspec("udp://0.0.0.0:22") == ("udp", "0.0.0.0", 22) 173 assert parse_socketspec("unix://can:be.any/thing:22") == ( 174 "unix", "can:be.any/thing:22", None) 175 176 177def test_socket(host): 178 listening = host.socket.get_listening_sockets() 179 for spec in ( 180 "tcp://0.0.0.0:22", 181 "tcp://:::22", 182 "unix:///run/systemd/private", 183 ): 184 assert spec in listening 185 for spec in ( 186 "tcp://22", 187 "tcp://0.0.0.0:22", 188 "tcp://127.0.0.1:22", 189 "tcp://:::22", 190 "tcp://::1:22", 191 "unix:///run/systemd/private", 192 ): 193 socket = host.socket(spec) 194 assert socket.is_listening 195 196 assert not host.socket("tcp://4242").is_listening 197 198 if not host.backend.get_connection_type() == "docker": 199 # FIXME 200 for spec in ( 201 "tcp://22", 202 "tcp://0.0.0.0:22", 203 ): 204 assert len(host.socket(spec).clients) >= 1 205 206 207@all_images 208def test_process(host, docker_image): 209 init = host.process.get(pid=1) 210 assert init.ppid == 0 211 if docker_image != "alpine": 212 # busybox ps doesn't have a euid equivalent 213 assert init.euid == 0 214 assert init.user == "root" 215 216 args, comm = { 217 "alpine": ("/sbin/init", "init"), 218 "archlinux": ("/usr/sbin/init", "systemd"), 219 "centos_6": ("/usr/sbin/sshd -D", "sshd"), 220 "centos_7": ("/usr/sbin/init", "systemd"), 221 "debian_stretch": ("/sbin/init", "systemd"), 222 "ubuntu_xenial": ("/sbin/init", "systemd") 223 }[docker_image] 224 assert init.args == args 225 assert init.comm == comm 226 227 228def test_user(host): 229 user = host.user("sshd") 230 assert user.exists 231 assert user.name == "sshd" 232 assert user.uid == 106 233 assert user.gid == 65534 234 assert user.group == "nogroup" 235 assert user.gids == [65534] 236 assert user.groups == ["nogroup"] 237 assert user.shell == "/usr/sbin/nologin" 238 assert user.home == "/run/sshd" 239 assert user.password == "*" 240 241 242def test_user_user(host): 243 user = host.user("user") 244 assert user.exists 245 assert user.gecos == "gecos.comment" 246 247 248def test_user_expiration_date(host): 249 assert host.user("root").expiration_date is None 250 assert host.user("user").expiration_date == ( 251 datetime.datetime(2024, 10, 4, 0, 0)) 252 253 254def test_nonexistent_user(host): 255 assert not host.user("zzzzzzzzzz").exists 256 257 258def test_current_user(host): 259 assert host.user().name == "root" 260 pw = host.user().password 261 assert crypt.crypt("foo", pw) == pw 262 263 264def test_group(host): 265 assert host.group("root").exists 266 assert host.group("root").gid == 0 267 268 269def test_empty_command_output(host): 270 assert host.check_output("printf ''") == "" 271 272 273def test_local_command(host): 274 assert host.get_host("local://").check_output("true") == "" 275 276 277def test_file(host): 278 host.check_output("mkdir -p /d && printf foo > /d/f && chmod 600 /d/f") 279 d = host.file("/d") 280 assert d.is_directory 281 assert not d.is_file 282 f = host.file("/d/f") 283 assert f.exists 284 assert f.is_file 285 assert f.content == b"foo" 286 assert f.content_string == "foo" 287 assert f.user == "root" 288 assert f.uid == 0 289 assert f.gid == 0 290 assert f.group == "root" 291 assert f.mode == 0o600 292 assert f.contains("fo") 293 assert not f.is_directory 294 assert not f.is_symlink 295 assert not f.is_pipe 296 assert f.linked_to == "/d/f" 297 assert f.size == 3 298 assert f.md5sum == "acbd18db4cc2f85cedef654fccc4a4d8" 299 assert f.sha256sum == ( 300 "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae" 301 ) 302 host.check_output("ln -fsn /d/f /d/l") 303 link = host.file("/d/l") 304 assert link.is_symlink 305 assert link.is_file 306 assert link.linked_to == "/d/f" 307 assert link.linked_to == f 308 assert f == host.file('/d/f') 309 assert not d == f 310 311 host.check_output("rm -f /d/p && mkfifo /d/p") 312 assert host.file("/d/p").is_pipe 313 314 315def test_ansible_unavailable(host): 316 expected = ('Ansible module is only available with ' 317 'ansible connection backend') 318 with pytest.raises(RuntimeError) as excinfo: 319 host.ansible("setup") 320 assert expected in str(excinfo.value) 321 with pytest.raises(RuntimeError) as excinfo: 322 host.ansible.get_variables() 323 assert expected in str(excinfo.value) 324 325 326@pytest.mark.testinfra_hosts("ansible://debian_stretch") 327def test_ansible_module(host): 328 setup = host.ansible("setup")["ansible_facts"] 329 assert setup["ansible_lsb"]["codename"] == "stretch" 330 passwd = host.ansible("file", "path=/etc/passwd state=file") 331 assert passwd["changed"] is False 332 assert passwd["gid"] == 0 333 assert passwd["group"] == "root" 334 assert passwd["mode"] == "0644" 335 assert passwd["owner"] == "root" 336 assert isinstance(passwd["size"], int) 337 assert passwd["path"] == "/etc/passwd" 338 # seems to vary with differents docker fs backend 339 assert passwd["state"] in ("file", "hard") 340 assert passwd["uid"] == 0 341 342 variables = host.ansible.get_variables() 343 assert variables["myvar"] == "foo" 344 assert variables["myhostvar"] == "bar" 345 assert variables["mygroupvar"] == "qux" 346 assert variables["inventory_hostname"] == "debian_stretch" 347 assert variables["group_names"] == ["testgroup"] 348 assert variables["groups"] == { 349 "all": ["debian_stretch"], 350 "testgroup": ["debian_stretch"], 351 } 352 353 with pytest.raises(host.ansible.AnsibleException) as excinfo: 354 host.ansible("command", "zzz") 355 assert excinfo.value.result['msg'] == \ 356 "Skipped. You might want to try check=False" 357 358 try: 359 host.ansible("command", "zzz", check=False) 360 except host.ansible.AnsibleException as exc: 361 assert exc.result['rc'] == 2 362 # notez que the debian stretch container is set to LANG=fr_FR 363 assert exc.result['msg'] == ('[Errno 2] Aucun fichier ou dossier ' 364 'de ce type') 365 366 result = host.ansible("command", "echo foo", check=False) 367 assert result['stdout'] == 'foo' 368 369 370@pytest.mark.testinfra_hosts("ansible://debian_stretch", 371 "ansible://user@debian_stretch") 372def test_ansible_module_become(host): 373 user_name = host.user().name 374 assert host.ansible('shell', 'echo $USER', 375 check=False)['stdout'] == user_name 376 assert host.ansible('shell', 'echo $USER', 377 check=False, become=True)['stdout'] == 'root' 378 379 with host.sudo(): 380 assert host.user().name == 'root' 381 assert host.ansible('shell', 'echo $USER', 382 check=False)['stdout'] == user_name 383 assert host.ansible('shell', 'echo $USER', 384 check=False, become=True)['stdout'] == 'root' 385 386 387@pytest.mark.destructive 388def test_supervisor(host): 389 # Wait supervisord is running 390 for _ in range(20): 391 if host.service("supervisor").is_running: 392 break 393 time.sleep(.5) 394 else: 395 raise RuntimeError("No running supervisor") 396 397 for _ in range(20): 398 service = host.supervisor("tail") 399 if service.status == "RUNNING": 400 break 401 else: 402 assert service.status == "STARTING" 403 time.sleep(.5) 404 else: 405 raise RuntimeError("No running tail in supervisor") 406 407 assert service.is_running 408 proc = host.process.get(pid=service.pid) 409 assert proc.comm == "tail" 410 411 services = host.supervisor.get_services() 412 assert len(services) == 1 413 assert services[0].name == "tail" 414 assert services[0].is_running 415 assert services[0].pid == service.pid 416 417 host.run("supervisorctl stop tail") 418 service = host.supervisor("tail") 419 assert not service.is_running 420 assert service.status == "STOPPED" 421 assert service.pid is None 422 423 host.run("service supervisor stop") 424 assert not host.service("supervisor").is_running 425 with pytest.raises(RuntimeError) as excinfo: 426 host.supervisor("tail").is_running 427 assert 'Is supervisor running' in str(excinfo.value) 428 429 430def test_mountpoint(host): 431 root_mount = host.mount_point('/') 432 assert root_mount.exists 433 assert isinstance(root_mount.options, list) 434 assert 'rw' in root_mount.options 435 assert root_mount.filesystem 436 437 fake_mount = host.mount_point('/fake/mount') 438 assert not fake_mount.exists 439 440 mountpoints = host.mount_point.get_mountpoints() 441 assert mountpoints 442 assert all(isinstance(m, host.mount_point) for m in mountpoints) 443 assert len([m for m in mountpoints if m.path == "/"]) == 1 444 445 446def test_sudo_from_root(host): 447 assert host.user().name == "root" 448 with host.sudo("user"): 449 assert host.user().name == "user" 450 assert host.user().name == "root" 451 452 453def test_sudo_fail_from_root(host): 454 assert host.user().name == "root" 455 with pytest.raises(AssertionError) as exc: 456 with host.sudo("unprivileged"): 457 assert host.user().name == "unprivileged" 458 host.check_output('ls /root/invalid') 459 assert str(exc.value).startswith('Unexpected exit code') 460 with host.sudo(): 461 assert host.user().name == "root" 462 463 464@pytest.mark.testinfra_hosts("docker://user@debian_stretch") 465def test_sudo_to_root(host): 466 assert host.user().name == "user" 467 with host.sudo(): 468 assert host.user().name == "root" 469 # Test nested sudo 470 with host.sudo("www-data"): 471 assert host.user().name == "www-data" 472 assert host.user().name == "user" 473 474 475def test_command_execution(host): 476 assert host.run("false").failed 477 assert host.run("true").succeeded 478 479 480def test_pip_package(host): 481 assert host.pip_package.get_packages()['pip']['version'] == '9.0.1' 482 pytest = host.pip_package.get_packages(pip_path='/v/bin/pip')['pytest'] 483 assert pytest['version'].startswith('2.') 484 outdated = host.pip_package.get_outdated_packages( 485 pip_path='/v/bin/pip')['pytest'] 486 assert outdated['current'] == pytest['version'] 487 assert int(outdated['latest'].split('.')[0]) > 2 488 489 490def test_environment_home(host): 491 assert host.environment().get('HOME') == '/root' 492 493 494def test_iptables(host): 495 ssh_rule_str = \ 496 '-A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT' 497 vip_redirect_rule_str = \ 498 '-A PREROUTING -d 192.168.0.1/32 -j REDIRECT' 499 rules = host.iptables.rules() 500 input_rules = host.iptables.rules('filter', 'INPUT') 501 nat_rules = host.iptables.rules('nat') 502 nat_prerouting_rules = host.iptables.rules('nat', 'PREROUTING') 503 assert ssh_rule_str in rules 504 assert ssh_rule_str in input_rules 505 assert vip_redirect_rule_str in nat_rules 506 assert vip_redirect_rule_str in nat_prerouting_rules 507 # test ip6tables call works; ipv6 setup is a whole huge thing, but 508 # ensure we at least see the headings 509 v6_rules = host.iptables.rules(version=6) 510 assert '-P INPUT ACCEPT' in v6_rules 511 assert '-P FORWARD ACCEPT' in v6_rules 512 assert '-P OUTPUT ACCEPT' in v6_rules 513 v6_filter_rules = host.iptables.rules('filter', 'INPUT', version=6) 514 assert '-P INPUT ACCEPT' in v6_filter_rules 515 516 517@all_images 518def test_addr(host): 519 non_resolvable = host.addr('some_non_resolvable_host') 520 assert not non_resolvable.is_resolvable 521 assert not non_resolvable.is_reachable 522 assert not non_resolvable.port(80).is_reachable 523 524 # Some arbitrary internal IP, hopefully non reachable 525 # IP addresses are always resolvable no matter what 526 non_reachable_ip = host.addr('10.42.13.73') 527 assert non_reachable_ip.is_resolvable 528 assert non_reachable_ip.ipv4_addresses == ['10.42.13.73'] 529 assert not non_reachable_ip.is_reachable 530 assert not non_reachable_ip.port(80).is_reachable 531 532 google_dns = host.addr('8.8.8.8') 533 assert google_dns.is_resolvable 534 assert google_dns.ipv4_addresses == ['8.8.8.8'] 535 assert google_dns.is_reachable 536 assert google_dns.port(53).is_reachable 537 assert not google_dns.port(666).is_reachable 538 539 google_addr = host.addr('google.com') 540 assert google_addr.is_resolvable 541 assert google_addr.is_reachable 542 assert google_addr.port(443).is_reachable 543 assert not google_addr.port(666).is_reachable 544 545 for ip in google_addr.ipv4_addresses: 546 assert isinstance(ip_address(ip), IPv4Address) 547 548 for ip in google_addr.ipv6_addresses: 549 assert isinstance(ip_address(ip), IPv6Address) 550 551 for ip in google_addr.ip_addresses: 552 assert isinstance(ip_address(ip), (IPv4Address, IPv6Address)) 553