1import json 2import pytest 3import os 4import responses 5import tests.common as common 6 7 8def test_register_job(nomad_setup): 9 10 with open("example.json") as fh: 11 job = json.loads(fh.read()) 12 nomad_setup.job.register_job("example", job) 13 assert "example" in nomad_setup.job 14 15 16# integration tests requires nomad Vagrant VM or Binary running 17@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 18def test_get_evaluation(nomad_setup): 19 assert "example" == nomad_setup.deployments.get_deployments()[0]["JobID"] 20 21 22@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 23def test_get_deployments_prefix(nomad_setup): 24 deployments = nomad_setup.deployments.get_deployments() 25 prefix = deployments[0]["ID"][:4] 26 nomad_setup.deployments.get_deployments(prefix=prefix) 27 28 29@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 30def test_dunder_getitem_exist(nomad_setup): 31 jobID = nomad_setup.deployments.get_deployments()[0]["ID"] 32 d = nomad_setup.deployment[jobID] 33 assert isinstance(d, dict) 34 35 36@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 37def test_dunder_getitem_not_exist(nomad_setup): 38 with pytest.raises(KeyError): 39 _ = nomad_setup.deployments["nope"] 40 41 42@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 43def test_dunder_contain_exists(nomad_setup): 44 jobID = nomad_setup.deployments.get_deployments()[0]["ID"] 45 assert jobID in nomad_setup.deployments 46 47 48@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 49def test_dunder_contain_not_exist(nomad_setup): 50 assert "nope" not in nomad_setup.deployments 51 52 53@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 54def test_dunder_len(nomad_setup): 55 assert len(nomad_setup.deployments) == 1 56 57 58@pytest.mark.skipif(tuple(int(i) for i in os.environ.get("NOMAD_VERSION").split(".")) < (0, 6, 0), reason="Not supported in version") 59def test_dunder_iter(nomad_setup): 60 for d in nomad_setup.deployments: 61 pass 62 63 64def test_dunder_str(nomad_setup): 65 assert isinstance(str(nomad_setup.deployments), str) 66 67 68def test_dunder_repr(nomad_setup): 69 assert isinstance(repr(nomad_setup.deployments), str) 70 71 72def test_dunder_getattr(nomad_setup): 73 74 with pytest.raises(AttributeError): 75 _ = nomad_setup.deployments.does_not_exist 76 77@responses.activate 78# 79# fix No data when you are using namespaces #82 80# 81def test_get_deployments_with_namespace(nomad_setup_with_namespace): 82 responses.add( 83 responses.GET, 84 "http://{ip}:{port}/v1/deployments?namespace={namespace}".format(ip=common.IP, port=common.NOMAD_PORT, namespace=common.NOMAD_NAMESPACE), 85 status=200, 86 json=[{"ID": "70638f62-5c19-193e-30d6-f9d6e689ab8e","JobID": "example", "JobVersion": 1, "JobModifyIndex": 17, "JobSpecModifyIndex": 17, "JobCreateIndex": 7,"Namespace": common.NOMAD_NAMESPACE, "Name": "example.cache[0]"}] 87 ) 88 assert common.NOMAD_NAMESPACE in nomad_setup_with_namespace.deployments.get_deployments()[0]["Namespace"] 89