1import base64
2import struct
3
4import pytest
5import six
6from twisted.internet import defer, reactor
7
8import consul
9import consul.twisted
10
11Check = consul.Check
12
13
14def sleep(seconds):
15    """
16    An asynchronous sleep function using twsited. Source:
17    http://twistedmatrix.com/pipermail/twisted-python/2009-October/020788.html
18
19    :type seconds: float
20    """
21    d = defer.Deferred()
22    reactor.callLater(seconds, d.callback, seconds)
23    return d
24
25
26class TestConsul(object):
27    @pytest.inlineCallbacks
28    def test_kv(self, consul_port):
29        c = consul.twisted.Consul(port=consul_port)
30        index, data = yield c.kv.get('foo')
31        assert data is None
32        response = yield c.kv.put('foo', 'bar')
33        assert response is True
34        index, data = yield c.kv.get('foo')
35        assert data['Value'] == six.b('bar')
36
37    @pytest.inlineCallbacks
38    def test_kv_binary(self, consul_port):
39        c = consul.twisted.Consul(port=consul_port)
40        yield c.kv.put('foo', struct.pack('i', 1000))
41        index, data = yield c.kv.get('foo')
42        assert struct.unpack('i', data['Value']) == (1000,)
43
44    @pytest.inlineCallbacks
45    def test_kv_missing(self, consul_port):
46        c = consul.twisted.Consul(port=consul_port)
47        reactor.callLater(2.0 / 100, c.kv.put, 'foo', 'bar')
48        yield c.kv.put('index', 'bump')
49        index, data = yield c.kv.get('foo')
50        assert data is None
51        index, data = yield c.kv.get('foo', index=index)
52        assert data['Value'] == six.b('bar')
53
54    @pytest.inlineCallbacks
55    def test_kv_put_flags(self, consul_port):
56        c = consul.twisted.Consul(port=consul_port)
57        yield c.kv.put('foo', 'bar')
58        index, data = yield c.kv.get('foo')
59        assert data['Flags'] == 0
60
61        response = yield c.kv.put('foo', 'bar', flags=50)
62        assert response is True
63        index, data = yield c.kv.get('foo')
64        assert data['Flags'] == 50
65
66    @pytest.inlineCallbacks
67    def test_kv_delete(self, consul_port):
68        c = consul.twisted.Consul(port=consul_port)
69        yield c.kv.put('foo1', '1')
70        yield c.kv.put('foo2', '2')
71        yield c.kv.put('foo3', '3')
72        index, data = yield c.kv.get('foo', recurse=True)
73        assert [x['Key'] for x in data] == ['foo1', 'foo2', 'foo3']
74
75        response = yield c.kv.delete('foo2')
76        assert response is True
77        index, data = yield c.kv.get('foo', recurse=True)
78        assert [x['Key'] for x in data] == ['foo1', 'foo3']
79        response = yield c.kv.delete('foo', recurse=True)
80        assert response is True
81        index, data = yield c.kv.get('foo', recurse=True)
82        assert data is None
83
84    @pytest.inlineCallbacks
85    def test_kv_subscribe(self, consul_port):
86        c = consul.twisted.Consul(port=consul_port)
87
88        @defer.inlineCallbacks
89        def put():
90            response = yield c.kv.put('foo', 'bar')
91            assert response is True
92
93        reactor.callLater(1.0 / 100, put)
94        index, data = yield c.kv.get('foo')
95        assert data is None
96        index, data = yield c.kv.get('foo', index=index)
97        assert data['Value'] == six.b('bar')
98
99    @pytest.inlineCallbacks
100    def test_transaction(self, consul_port):
101        c = consul.twisted.Consul(port=consul_port)
102        value = base64.b64encode(b"1").decode("utf8")
103        d = {"KV": {"Verb": "set", "Key": "asdf", "Value": value}}
104        r = yield c.txn.put([d])
105        assert r["Errors"] is None
106
107        d = {"KV": {"Verb": "get", "Key": "asdf"}}
108        r = yield c.txn.put([d])
109        assert r["Results"][0]["KV"]["Value"] == value
110
111    @pytest.inlineCallbacks
112    def test_agent_services(self, consul_port):
113        c = consul.twisted.Consul(port=consul_port)
114        services = yield c.agent.services()
115        assert services == {}
116        response = yield c.agent.service.register('foo')
117        assert response is True
118        services = yield c.agent.services()
119        assert services == {
120            'foo': {
121                'Port': 0,
122                'ID': 'foo',
123                'CreateIndex': 0,
124                'ModifyIndex': 0,
125                'EnableTagOverride': False,
126                'Service': 'foo',
127                'Tags': [],
128                'Meta': {},
129                'Address': ''}
130        }
131        response = yield c.agent.service.deregister('foo')
132        assert response is True
133        services = yield c.agent.services()
134        assert services == {}
135
136    @pytest.inlineCallbacks
137    def test_catalog(self, consul_port):
138        c = consul.twisted.Consul(port=consul_port)
139
140        @defer.inlineCallbacks
141        def register():
142            response = yield c.catalog.register('n1', '10.1.10.11')
143            assert response is True
144            yield sleep(50 / 1000.0)
145            response = yield c.catalog.deregister('n1')
146            assert response is True
147
148        reactor.callLater(1.0 / 100, register)
149
150        index, nodes = yield c.catalog.nodes()
151        assert len(nodes) == 1
152        current = nodes[0]
153
154        index, nodes = yield c.catalog.nodes(index=index)
155        nodes.remove(current)
156        assert [x['Node'] for x in nodes] == ['n1']
157
158        index, nodes = yield c.catalog.nodes(index=index)
159        nodes.remove(current)
160        assert [x['Node'] for x in nodes] == []
161
162    @pytest.inlineCallbacks
163    def test_health_service(self, consul_port):
164        c = consul.twisted.Consul(port=consul_port)
165
166        # check there are no nodes for the service 'foo'
167        index, nodes = yield c.health.service('foo')
168        assert nodes == []
169
170        # register two nodes, one with a long ttl, the other shorter
171        yield c.agent.service.register(
172            'foo', service_id='foo:1', check=Check.ttl('10s'))
173        yield c.agent.service.register(
174            'foo', service_id='foo:2', check=Check.ttl('100ms'))
175
176        yield sleep(1.0)
177
178        # check the nodes show for the /health/service endpoint
179        index, nodes = yield c.health.service('foo')
180        assert [node['Service']['ID'] for node in nodes] == \
181               ['foo:1', 'foo:2']
182
183        # but that they aren't passing their health check
184        index, nodes = yield c.health.service('foo', passing=True)
185        assert nodes == []
186
187        # ping the two node's health check
188        yield c.agent.check.ttl_pass('service:foo:1')
189        yield c.agent.check.ttl_pass('service:foo:2')
190
191        yield sleep(0.05)
192
193        # both nodes are now available
194        index, nodes = yield c.health.service('foo', passing=True)
195        assert [node['Service']['ID'] for node in nodes] == \
196               ['foo:1', 'foo:2']
197
198        # wait until the short ttl node fails
199        yield sleep(0.5)
200
201        # only one node available
202        index, nodes = yield c.health.service('foo', passing=True)
203        assert [node['Service']['ID'] for node in nodes] == ['foo:1']
204
205        # ping the failed node's health check
206        yield c.agent.check.ttl_pass('service:foo:2')
207
208        yield sleep(0.05)
209
210        # check both nodes are available
211        index, nodes = yield c.health.service('foo', passing=True)
212        assert [node['Service']['ID'] for node in nodes] == \
213               ['foo:1', 'foo:2']
214
215        # deregister the nodes
216        yield c.agent.service.deregister('foo:1')
217        yield c.agent.service.deregister('foo:2')
218
219        yield sleep(2)
220        index, nodes = yield c.health.service('foo')
221        assert nodes == []
222
223    @pytest.inlineCallbacks
224    def test_health_service_subscribe(self, consul_port):
225        c = consul.twisted.Consul(port=consul_port)
226
227        class Config(object):
228            def __init__(self):
229                self.nodes = []
230                self.index = None
231
232            @defer.inlineCallbacks
233            def update(self):
234                self.index, nodes = yield c.health.service(
235                    'foo', index=None, passing=True)
236                self.nodes = [node['Service']['ID'] for node in nodes]
237
238        config = Config()
239        yield c.agent.service.register(
240            'foo', service_id='foo:1', check=Check.ttl('40ms'))
241        yield config.update()
242        assert config.nodes == []
243
244        # ping the service's health check
245        yield c.agent.check.ttl_pass('service:foo:1')
246        yield config.update()
247        assert config.nodes == ['foo:1']
248
249        # the service should fail
250        yield sleep(0.8)
251        yield config.update()
252        assert config.nodes == []
253
254        yield c.agent.service.deregister('foo:1')
255
256    @pytest.inlineCallbacks
257    def test_session(self, consul_port):
258        c = consul.twisted.Consul(port=consul_port)
259
260        index, services = yield c.session.list()
261        assert services == []
262
263        session_id = yield c.session.create()
264        index, services = yield c.session.list(index=index)
265        assert len(services)
266
267        response = yield c.session.destroy(session_id)
268        assert response is True
269
270        index, services = yield c.session.list(index=index)
271        assert services == []
272
273    @pytest.inlineCallbacks
274    def test_acl(self, acl_consul):
275        c = consul.twisted.Consul(
276            port=acl_consul.port, token=acl_consul.token)
277
278        rules = """
279            key "" {
280                policy = "read"
281            }
282            key "private/" {
283                policy = "deny"
284            }
285        """
286        token = yield c.acl.create(rules=rules)
287
288        raised = False
289        try:
290            yield c.acl.list(token=token)
291        except consul.ACLPermissionDenied:
292            raised = True
293        assert raised
294
295        destroyed = yield c.acl.destroy(token)
296        assert destroyed is True
297