import base64
import struct

import pytest
import six
from twisted.internet import defer, reactor

import consul
import consul.twisted

Check = consul.Check


def sleep(seconds):
    """
    An asynchronous sleep function using twisted. Source:
    http://twistedmatrix.com/pipermail/twisted-python/2009-October/020788.html

    Returns a Deferred that the reactor fires, with ``seconds`` as its
    result, once ``seconds`` have elapsed.

    :type seconds: float
    """
    d = defer.Deferred()
    reactor.callLater(seconds, d.callback, seconds)
    return d


class TestConsul(object):
    """
    Tests for the ``consul.twisted`` client.

    Every test builds its own client from a fixture (``consul_port`` or
    ``acl_consul``) and is driven as a generator through
    ``pytest.inlineCallbacks``.
    """

    @pytest.inlineCallbacks
    def test_kv(self, consul_port):
        """A key reads as None before a put and as bytes after it."""
        c = consul.twisted.Consul(port=consul_port)
        index, data = yield c.kv.get('foo')
        assert data is None
        response = yield c.kv.put('foo', 'bar')
        assert response is True
        index, data = yield c.kv.get('foo')
        assert data['Value'] == six.b('bar')

    @pytest.inlineCallbacks
    def test_kv_binary(self, consul_port):
        """A struct-packed integer survives a put/get round trip."""
        c = consul.twisted.Consul(port=consul_port)
        yield c.kv.put('foo', struct.pack('i', 1000))
        index, data = yield c.kv.get('foo')
        assert struct.unpack('i', data['Value']) == (1000,)

    @pytest.inlineCallbacks
    def test_kv_missing(self, consul_port):
        """A get carrying an index sees a key that is written later."""
        c = consul.twisted.Consul(port=consul_port)
        # schedule the write of 'foo' for 20ms from now
        reactor.callLater(2.0 / 100, c.kv.put, 'foo', 'bar')
        # write an unrelated key first so the store has a usable index
        yield c.kv.put('index', 'bump')
        index, data = yield c.kv.get('foo')
        assert data is None
        # the assertion below relies on this call not returning until the
        # delayed put has landed
        index, data = yield c.kv.get('foo', index=index)
        assert data['Value'] == six.b('bar')

    @pytest.inlineCallbacks
    def test_kv_put_flags(self, consul_port):
        """Flags read back as 0 by default and as the value given to put."""
        c = consul.twisted.Consul(port=consul_port)
        yield c.kv.put('foo', 'bar')
        index, data = yield c.kv.get('foo')
        assert data['Flags'] == 0

        response = yield c.kv.put('foo', 'bar', flags=50)
        assert response is True
        index, data = yield c.kv.get('foo')
        assert data['Flags'] == 50

    @pytest.inlineCallbacks
    def test_kv_delete(self, consul_port):
        """Delete removes a single key, or a whole prefix with recurse."""
        c = consul.twisted.Consul(port=consul_port)
        yield c.kv.put('foo1', '1')
        yield c.kv.put('foo2', '2')
        yield c.kv.put('foo3', '3')
        index, data = yield c.kv.get('foo', recurse=True)
        assert [x['Key'] for x in data] == ['foo1', 'foo2', 'foo3']

        response = yield c.kv.delete('foo2')
        assert response is True
        index, data = yield c.kv.get('foo', recurse=True)
        assert [x['Key'] for x in data] == ['foo1', 'foo3']
        response = yield c.kv.delete('foo', recurse=True)
        assert response is True
        index, data = yield c.kv.get('foo', recurse=True)
        assert data is None

    @pytest.inlineCallbacks
    def test_kv_subscribe(self, consul_port):
        """A get carrying an index picks up a put made 10ms later."""
        c = consul.twisted.Consul(port=consul_port)

        @defer.inlineCallbacks
        def put():
            response = yield c.kv.put('foo', 'bar')
            assert response is True

        reactor.callLater(1.0 / 100, put)
        index, data = yield c.kv.get('foo')
        assert data is None
        index, data = yield c.kv.get('foo', index=index)
        assert data['Value'] == six.b('bar')

    @pytest.inlineCallbacks
    def test_transaction(self, consul_port):
        """A value set through txn.put is returned by a txn 'get'."""
        c = consul.twisted.Consul(port=consul_port)
        # the transaction payload carries the value base64 encoded
        value = base64.b64encode(b"1").decode("utf8")
        d = {"KV": {"Verb": "set", "Key": "asdf", "Value": value}}
        r = yield c.txn.put([d])
        assert r["Errors"] is None

        d = {"KV": {"Verb": "get", "Key": "asdf"}}
        r = yield c.txn.put([d])
        assert r["Results"][0]["KV"]["Value"] == value

    @pytest.inlineCallbacks
    def test_agent_services(self, consul_port):
        """Registering then deregistering a service updates the listing."""
        c = consul.twisted.Consul(port=consul_port)
        services = yield c.agent.services()
        assert services == {}
        response = yield c.agent.service.register('foo')
        assert response is True
        services = yield c.agent.services()
        assert services == {
            'foo': {
                'Port': 0,
                'ID': 'foo',
                'CreateIndex': 0,
                'ModifyIndex': 0,
                'EnableTagOverride': False,
                'Service': 'foo',
                'Tags': [],
                'Meta': {},
                'Address': ''}
        }
        response = yield c.agent.service.deregister('foo')
        assert response is True
        services = yield c.agent.services()
        assert services == {}

    @pytest.inlineCallbacks
    def test_catalog(self, consul_port):
        """Successive catalog.nodes calls see a node appear, then vanish."""
        c = consul.twisted.Consul(port=consul_port)

        @defer.inlineCallbacks
        def register():
            response = yield c.catalog.register('n1', '10.1.10.11')
            assert response is True
            # keep the node registered for 50ms before removing it
            yield sleep(50 / 1000.0)
            response = yield c.catalog.deregister('n1')
            assert response is True

        reactor.callLater(1.0 / 100, register)

        # the single node listed before 'n1' is registered
        index, nodes = yield c.catalog.nodes()
        assert len(nodes) == 1
        current = nodes[0]

        index, nodes = yield c.catalog.nodes(index=index)
        nodes.remove(current)
        assert [x['Node'] for x in nodes] == ['n1']

        index, nodes = yield c.catalog.nodes(index=index)
        nodes.remove(current)
        assert [x['Node'] for x in nodes] == []

    @pytest.inlineCallbacks
    def test_health_service(self, consul_port):
        """Services pass only while their ttl checks are kept alive."""
        c = consul.twisted.Consul(port=consul_port)

        # check there are no nodes for the service 'foo'
        index, nodes = yield c.health.service('foo')
        assert nodes == []

        # register two nodes, one with a long ttl, the other shorter
        yield c.agent.service.register(
            'foo', service_id='foo:1', check=Check.ttl('10s'))
        yield c.agent.service.register(
            'foo', service_id='foo:2', check=Check.ttl('100ms'))

        yield sleep(1.0)

        # check the nodes show for the /health/service endpoint
        index, nodes = yield c.health.service('foo')
        assert [node['Service']['ID'] for node in nodes] == \
            ['foo:1', 'foo:2']

        # but that they aren't passing their health check
        index, nodes = yield c.health.service('foo', passing=True)
        assert nodes == []

        # ping the two node's health check
        yield c.agent.check.ttl_pass('service:foo:1')
        yield c.agent.check.ttl_pass('service:foo:2')

        yield sleep(0.05)

        # both nodes are now available
        index, nodes = yield c.health.service('foo', passing=True)
        assert [node['Service']['ID'] for node in nodes] == \
            ['foo:1', 'foo:2']

        # wait until the short ttl node fails
        yield sleep(0.5)

        # only one node available
        index, nodes = yield c.health.service('foo', passing=True)
        assert [node['Service']['ID'] for node in nodes] == ['foo:1']

        # ping the failed node's health check
        yield c.agent.check.ttl_pass('service:foo:2')

        yield sleep(0.05)

        # check both nodes are available
        index, nodes = yield c.health.service('foo', passing=True)
        assert [node['Service']['ID'] for node in nodes] == \
            ['foo:1', 'foo:2']

        # deregister the nodes
        yield c.agent.service.deregister('foo:1')
        yield c.agent.service.deregister('foo:2')

        yield sleep(2)
        index, nodes = yield c.health.service('foo')
        assert nodes == []

    @pytest.inlineCallbacks
    def test_health_service_subscribe(self, consul_port):
        """A refreshed view of passing nodes follows the ttl check state."""
        c = consul.twisted.Consul(port=consul_port)

        class Config(object):
            # holds the most recent list of passing service ids

            def __init__(self):
                self.nodes = []
                self.index = None

            @defer.inlineCallbacks
            def update(self):
                # the returned index is stored, but every query is issued
                # with index=None
                self.index, nodes = yield c.health.service(
                    'foo', index=None, passing=True)
                self.nodes = [node['Service']['ID'] for node in nodes]

        config = Config()
        yield c.agent.service.register(
            'foo', service_id='foo:1', check=Check.ttl('40ms'))
        yield config.update()
        assert config.nodes == []

        # ping the service's health check
        yield c.agent.check.ttl_pass('service:foo:1')
        yield config.update()
        assert config.nodes == ['foo:1']

        # the service should fail
        yield sleep(0.8)
        yield config.update()
        assert config.nodes == []

        yield c.agent.service.deregister('foo:1')

    @pytest.inlineCallbacks
    def test_session(self, consul_port):
        """session.list reflects a session being created and destroyed."""
        c = consul.twisted.Consul(port=consul_port)

        index, services = yield c.session.list()
        assert services == []

        session_id = yield c.session.create()
        index, services = yield c.session.list(index=index)
        assert len(services)

        response = yield c.session.destroy(session_id)
        assert response is True

        index, services = yield c.session.list(index=index)
        assert services == []

    @pytest.inlineCallbacks
    def test_acl(self, acl_consul):
        """Listing ACLs with a newly created token is denied."""
        c = consul.twisted.Consul(
            port=acl_consul.port, token=acl_consul.token)

        rules = """
            key "" {
                policy = "read"
            }
            key "private/" {
                policy = "deny"
            }
        """
        token = yield c.acl.create(rules=rules)

        # record whether the denial is raised; it is thrown back into this
        # generator at the yield
        raised = False
        try:
            yield c.acl.list(token=token)
        except consul.ACLPermissionDenied:
            raised = True
        assert raised

        destroyed = yield c.acl.destroy(token)
        assert destroyed is True