1from __future__ import absolute_import, unicode_literals
2
3from datetime import datetime
4from pickle import dumps, loads
5
6import pytest
7from case import Mock, mock
8
9from celery import states
10from celery.exceptions import ImproperlyConfigured
11from celery.utils.objects import Bunch
12
# Dotted names of the Cassandra driver modules handed to ``mock.module``
# when decorating the test class in this file.
CASSANDRA_MODULES = ['cassandra'] + [
    'cassandra.' + submodule for submodule in ('auth', 'cluster', 'query')
]
19
20
21@mock.module(*CASSANDRA_MODULES)
22class test_CassandraBackend:
23
24    def setup(self):
25        self.app.conf.update(
26            cassandra_servers=['example.com'],
27            cassandra_keyspace='celery',
28            cassandra_table='task_results',
29        )
30
31    def test_init_no_cassandra(self, *modules):
32        # should raise ImproperlyConfigured when no python-driver
33        # installed.
34        from celery.backends import cassandra as mod
35        prev, mod.cassandra = mod.cassandra, None
36        try:
37            with pytest.raises(ImproperlyConfigured):
38                mod.CassandraBackend(app=self.app)
39        finally:
40            mod.cassandra = prev
41
42    def test_init_with_and_without_LOCAL_QUROM(self, *modules):
43        from celery.backends import cassandra as mod
44        mod.cassandra = Mock()
45
46        cons = mod.cassandra.ConsistencyLevel = Bunch(
47            LOCAL_QUORUM='foo',
48        )
49
50        self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
51        self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
52
53        mod.CassandraBackend(app=self.app)
54        cons.LOCAL_FOO = 'bar'
55        mod.CassandraBackend(app=self.app)
56
57        # no servers raises ImproperlyConfigured
58        with pytest.raises(ImproperlyConfigured):
59            self.app.conf.cassandra_servers = None
60            mod.CassandraBackend(
61                app=self.app, keyspace='b', column_family='c',
62            )
63
64    @pytest.mark.usefixtures('depends_on_current_app')
65    def test_reduce(self, *modules):
66        from celery.backends.cassandra import CassandraBackend
67        assert loads(dumps(CassandraBackend(app=self.app)))
68
69    def test_get_task_meta_for(self, *modules):
70        from celery.backends import cassandra as mod
71        mod.cassandra = Mock()
72
73        x = mod.CassandraBackend(app=self.app)
74        session = x._session = Mock()
75        execute = session.execute = Mock()
76        result_set = Mock()
77        result_set.one.return_value = [
78            states.SUCCESS, '1', datetime.now(), b'', b''
79        ]
80        execute.return_value = result_set
81        x.decode = Mock()
82        meta = x._get_task_meta_for('task_id')
83        assert meta['status'] == states.SUCCESS
84
85        result_set.one.return_value = []
86        x._session.execute.return_value = result_set
87        meta = x._get_task_meta_for('task_id')
88        assert meta['status'] == states.PENDING
89
90    def test_as_uri(self):
91        # Just ensure as_uri works properly
92        from celery.backends import cassandra as mod
93        mod.cassandra = Mock()
94
95        x = mod.CassandraBackend(app=self.app)
96        x.as_uri()
97        x.as_uri(include_password=False)
98
99    def test_store_result(self, *modules):
100        from celery.backends import cassandra as mod
101        mod.cassandra = Mock()
102
103        x = mod.CassandraBackend(app=self.app)
104        session = x._session = Mock()
105        session.execute = Mock()
106        x._store_result('task_id', 'result', states.SUCCESS)
107
108    def test_timeouting_cluster(self):
109        # Tests behavior when Cluster.connect raises
110        # cassandra.OperationTimedOut.
111        from celery.backends import cassandra as mod
112
113        class OTOExc(Exception):
114            pass
115
116        class VeryFaultyCluster(object):
117            def __init__(self, *args, **kwargs):
118                pass
119
120            def connect(self, *args, **kwargs):
121                raise OTOExc()
122
123            def shutdown(self):
124                pass
125
126        mod.cassandra = Mock()
127        mod.cassandra.OperationTimedOut = OTOExc
128        mod.cassandra.cluster = Mock()
129        mod.cassandra.cluster.Cluster = VeryFaultyCluster
130
131        x = mod.CassandraBackend(app=self.app)
132
133        with pytest.raises(OTOExc):
134            x._store_result('task_id', 'result', states.SUCCESS)
135        assert x._cluster is None
136        assert x._session is None
137
138    def test_create_result_table(self):
139        # Tests behavior when session.execute raises
140        # cassandra.AlreadyExists.
141        from celery.backends import cassandra as mod
142
143        class OTOExc(Exception):
144            pass
145
146        class FaultySession(object):
147            def __init__(self, *args, **kwargs):
148                pass
149
150            def execute(self, *args, **kwargs):
151                raise OTOExc()
152
153        class DummyCluster(object):
154
155            def __init__(self, *args, **kwargs):
156                pass
157
158            def connect(self, *args, **kwargs):
159                return FaultySession()
160
161        mod.cassandra = Mock()
162        mod.cassandra.cluster = Mock()
163        mod.cassandra.cluster.Cluster = DummyCluster
164        mod.cassandra.AlreadyExists = OTOExc
165
166        x = mod.CassandraBackend(app=self.app)
167        x._get_connection(write=True)
168        assert x._session is not None
169
170    def test_init_session(self):
171        # Tests behavior when Cluster.connect works properly
172        from celery.backends import cassandra as mod
173
174        class DummyCluster(object):
175
176            def __init__(self, *args, **kwargs):
177                pass
178
179            def connect(self, *args, **kwargs):
180                return Mock()
181
182        mod.cassandra = Mock()
183        mod.cassandra.cluster = Mock()
184        mod.cassandra.cluster.Cluster = DummyCluster
185
186        x = mod.CassandraBackend(app=self.app)
187        assert x._session is None
188        x._get_connection(write=True)
189        assert x._session is not None
190
191        s = x._session
192        x._get_connection()
193        assert s is x._session
194
195    def test_auth_provider(self):
196        # Ensure valid auth_provider works properly, and invalid one raises
197        # ImproperlyConfigured exception.
198        from celery.backends import cassandra as mod
199
200        class DummyAuth(object):
201            ValidAuthProvider = Mock()
202
203        mod.cassandra = Mock()
204        mod.cassandra.auth = DummyAuth
205
206        # Valid auth_provider
207        self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
208        self.app.conf.cassandra_auth_kwargs = {
209            'username': 'stuff'
210        }
211        mod.CassandraBackend(app=self.app)
212
213        # Invalid auth_provider
214        self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
215        self.app.conf.cassandra_auth_kwargs = {
216            'username': 'Jack'
217        }
218        with pytest.raises(ImproperlyConfigured):
219            mod.CassandraBackend(app=self.app)
220
221    def test_options(self):
222        # Ensure valid options works properly
223        from celery.backends import cassandra as mod
224
225        mod.cassandra = Mock()
226        # Valid options
227        self.app.conf.cassandra_options = {
228            'cql_version': '3.2.1',
229            'protocol_version': 3
230        }
231        mod.CassandraBackend(app=self.app)
232