1import os
2import sys
3import time
4
5import pytest
6from qcelemental.models import AtomicInput
7
8from qcengine import util
9from qcengine.exceptions import InputError
10
11
def test_model_wrapper():
    """Check that model_wrapper raises InputError for data that does not fit the model."""
    # A dict with none of the fields AtomicInput expects.
    malformed_payload = {"bad": "yup"}

    with pytest.raises(InputError):
        util.model_wrapper(malformed_payload, AtomicInput)
16
17
def test_compute_wrapper_capture():
    """Check that compute_wrapper swaps sys.stdout while active and restores it on exit."""
    # Hold the stream object itself and compare by identity; comparing
    # against ``id()`` of a stored id would be true for any stream.
    original_stdout = sys.stdout
    stdout_was_replaced = None

    with util.compute_wrapper(capture_output=True):
        # Record the observation instead of asserting here: compute_wrapper
        # can absorb exceptions raised in its body, which would hide a
        # failing assert.
        stdout_was_replaced = sys.stdout is not original_stdout

    assert stdout_was_replaced is True
    assert sys.stdout is original_stdout
26
27
def test_compute_wrapper_capture_exception():
    """Check compute_wrapper when its body raises.

    The exception must not propagate, sys.stdout must be restored, and the
    yielded metadata must record the failure.
    """
    # Hold the stream object itself and compare by identity; comparing
    # against ``id()`` of a stored id would be true for any stream.
    original_stdout = sys.stdout
    stdout_was_replaced = None

    with util.compute_wrapper(capture_output=True) as metadata:
        # Record the observation instead of asserting here: an
        # AssertionError raised inside the block would be absorbed just
        # like the KeyError below and reported only as success=False.
        stdout_was_replaced = sys.stdout is not original_stdout

        raise KeyError("Hello there!")

    assert stdout_was_replaced is True
    assert sys.stdout is original_stdout

    assert metadata["success"] is False
    assert metadata["error_type"] == "unknown_error"
41
42
def test_terminate():
    """Check that terminate_process stops a long-running child promptly.

    A ``sleep 30`` child is started and terminated straight away; the whole
    ``popen`` block must then finish in under a second rather than 30.
    """
    # Monotonic clock: an elapsed-time check must not be skewed by
    # wall-clock adjustments that happen while the test runs.
    started = time.monotonic()
    with util.popen(["sleep", "30"]) as proc:
        util.terminate_process(proc["proc"])

    assert (time.monotonic() - started) < 1
51
52
def test_tmpdir():
    """Check the ``child``, ``parent`` and ``suffix`` options of temporary_directory."""
    sep = os.path.sep

    # ``child`` names the final path component.
    with util.temporary_directory(child="this") as child_dir:
        leaf = str(child_dir).split(sep)[-1]
        assert leaf == "this"

    # ``parent`` + ``child``: the child sits directly inside the given parent.
    with util.temporary_directory() as parentdir:
        with util.temporary_directory(parent=parentdir, child="this") as nested_dir:
            pieces = str(nested_dir).rsplit(sep, 1)
            assert pieces[-1] == "this"
            assert pieces[0] == str(parentdir)

    # ``suffix`` ends up at the tail of the final path component.
    with util.temporary_directory(suffix="this") as suffixed_dir:
        leaf = str(suffixed_dir).split(sep)[-1]
        assert leaf.endswith("this")
65
66
def test_disk_files():
    """Check that disk_files fills the ``outfiles`` mapping in place.

    After the context exits, the wildcard key ``"thing*"`` must hold a
    mapping indexed by file name, and the plain key ``"other"`` must hold
    that file's contents directly.
    """
    inputs = {"thing1": "hello", "thing2": "world", "other": "everyone"}
    collected = {"thing*": None, "other": None}

    with util.temporary_directory(suffix="this") as workdir:
        with util.disk_files(infiles=inputs, outfiles=collected, cwd=workdir):
            # Nothing to run; only the write-then-collect round trip matters.
            pass

    # The requested keys are preserved; only their values are filled in.
    assert set(collected) == {"thing*", "other"}
    wildcard_hits = collected["thing*"]
    assert wildcard_hits["thing1"] == "hello"
    assert collected["other"] == "everyone"
78
79
def test_popen_tee_output(capsys):
    """Check the stdout captured by popen with and without ``pass_output_forward``."""
    # Default call: the child's output is available on the handle.
    with util.popen(["echo", "hello"]) as quiet:
        quiet["proc"].wait()
    quiet_text = quiet["stdout"].strip()
    assert quiet_text == "hello"

    # pass_output_forward=True: the output is still captured on the handle
    # and also reaches this process's stdout, where capsys can see it.
    with util.popen(["echo", "hello"], pass_output_forward=True) as forwarded:
        forwarded["proc"].wait()
    assert forwarded["stdout"] == "hello\n"
    # First capsys read in this test, so it covers both calls above.
    assert capsys.readouterr().out == "hello\n"
92