1import os 2import pytest 3import json 4import responses 5import tests.common as common 6 7# integration tests requires nomad Vagrant VM or Binary running 8def test_register_job(nomad_setup): 9 10 with open("example.json") as fh: 11 job = json.loads(fh.read()) 12 nomad_setup.jobs.register_job(job) 13 assert "example" in nomad_setup.jobs 14 15 16def test_get_jobs(nomad_setup): 17 assert isinstance(nomad_setup.jobs.get_jobs(), list) == True 18 19 20def test_get_jobs_prefix(nomad_setup): 21 nomad_setup.jobs.get_jobs(prefix="ex") 22 23 24@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 8, 3), reason="Not supported in version") 25def test_parse_job(nomad_setup): 26 with open("example.nomad") as fh: 27 hcl = fh.read() 28 json_dict = nomad_setup.jobs.parse(hcl) 29 assert json_dict["Name"] == "example" 30 assert json_dict["Type"] == "service" 31 32 33def test_dunder_getitem_exist(nomad_setup): 34 j = nomad_setup.jobs["example"] 35 assert isinstance(j, dict) 36 37 38def test_dunder_getitem_not_exist(nomad_setup): 39 40 with pytest.raises(KeyError): 41 j = nomad_setup.jobs["redis"] 42 43 44def test_dunder_contain_exists(nomad_setup): 45 assert "example" in nomad_setup.jobs 46 47 48def test_dunder_contain_not_exist(nomad_setup): 49 assert "redis" not in nomad_setup.jobs 50 51 52def test_dunder_str(nomad_setup): 53 assert isinstance(str(nomad_setup.jobs), str) 54 55 56def test_dunder_repr(nomad_setup): 57 assert isinstance(repr(nomad_setup.jobs), str) 58 59 60def test_dunder_getattr(nomad_setup): 61 62 with pytest.raises(AttributeError): 63 d = nomad_setup.jobs.does_not_exist 64 65 66def test_dunder_iter(nomad_setup): 67 assert hasattr(nomad_setup.jobs, '__iter__') 68 for j in nomad_setup.jobs: 69 pass 70 71 72def test_dunder_len(nomad_setup): 73 assert len(nomad_setup.jobs) >= 0 74 75@responses.activate 76# 77# fix No data when you are using namespaces #82 78# 79def test_get_jobs_with_namespace(nomad_setup_with_namespace): 80 responses.add( 81 responses.GET, 82 "http://{ip}:{port}/v1/jobs?namespace={namespace}".format(ip=common.IP, port=common.NOMAD_PORT, namespace=common.NOMAD_NAMESPACE), 83 status=200, 84 json=[{"Region": "global","ID": "my-job", "ParentID": "", "Name": "my-job","Namespace": common.NOMAD_NAMESPACE, "Type": "batch", "Priority": 50}] 85 ) 86 assert common.NOMAD_NAMESPACE in nomad_setup_with_namespace.jobs.get_jobs()[0]["Namespace"] 87