1from __future__ import absolute_import, unicode_literals 2 3from datetime import datetime 4from pickle import dumps, loads 5 6import pytest 7from case import Mock, mock 8 9from celery import states 10from celery.exceptions import ImproperlyConfigured 11from celery.utils.objects import Bunch 12 13CASSANDRA_MODULES = [ 14 'cassandra', 15 'cassandra.auth', 16 'cassandra.cluster', 17 'cassandra.query', 18] 19 20 21@mock.module(*CASSANDRA_MODULES) 22class test_CassandraBackend: 23 24 def setup(self): 25 self.app.conf.update( 26 cassandra_servers=['example.com'], 27 cassandra_keyspace='celery', 28 cassandra_table='task_results', 29 ) 30 31 def test_init_no_cassandra(self, *modules): 32 # should raise ImproperlyConfigured when no python-driver 33 # installed. 34 from celery.backends import cassandra as mod 35 prev, mod.cassandra = mod.cassandra, None 36 try: 37 with pytest.raises(ImproperlyConfigured): 38 mod.CassandraBackend(app=self.app) 39 finally: 40 mod.cassandra = prev 41 42 def test_init_with_and_without_LOCAL_QUROM(self, *modules): 43 from celery.backends import cassandra as mod 44 mod.cassandra = Mock() 45 46 cons = mod.cassandra.ConsistencyLevel = Bunch( 47 LOCAL_QUORUM='foo', 48 ) 49 50 self.app.conf.cassandra_read_consistency = 'LOCAL_FOO' 51 self.app.conf.cassandra_write_consistency = 'LOCAL_FOO' 52 53 mod.CassandraBackend(app=self.app) 54 cons.LOCAL_FOO = 'bar' 55 mod.CassandraBackend(app=self.app) 56 57 # no servers raises ImproperlyConfigured 58 with pytest.raises(ImproperlyConfigured): 59 self.app.conf.cassandra_servers = None 60 mod.CassandraBackend( 61 app=self.app, keyspace='b', column_family='c', 62 ) 63 64 @pytest.mark.usefixtures('depends_on_current_app') 65 def test_reduce(self, *modules): 66 from celery.backends.cassandra import CassandraBackend 67 assert loads(dumps(CassandraBackend(app=self.app))) 68 69 def test_get_task_meta_for(self, *modules): 70 from celery.backends import cassandra as mod 71 mod.cassandra = Mock() 72 73 x = mod.CassandraBackend(app=self.app) 74 session = x._session = Mock() 75 execute = session.execute = Mock() 76 result_set = Mock() 77 result_set.one.return_value = [ 78 states.SUCCESS, '1', datetime.now(), b'', b'' 79 ] 80 execute.return_value = result_set 81 x.decode = Mock() 82 meta = x._get_task_meta_for('task_id') 83 assert meta['status'] == states.SUCCESS 84 85 result_set.one.return_value = [] 86 x._session.execute.return_value = result_set 87 meta = x._get_task_meta_for('task_id') 88 assert meta['status'] == states.PENDING 89 90 def test_as_uri(self): 91 # Just ensure as_uri works properly 92 from celery.backends import cassandra as mod 93 mod.cassandra = Mock() 94 95 x = mod.CassandraBackend(app=self.app) 96 x.as_uri() 97 x.as_uri(include_password=False) 98 99 def test_store_result(self, *modules): 100 from celery.backends import cassandra as mod 101 mod.cassandra = Mock() 102 103 x = mod.CassandraBackend(app=self.app) 104 session = x._session = Mock() 105 session.execute = Mock() 106 x._store_result('task_id', 'result', states.SUCCESS) 107 108 def test_timeouting_cluster(self): 109 # Tests behavior when Cluster.connect raises 110 # cassandra.OperationTimedOut. 111 from celery.backends import cassandra as mod 112 113 class OTOExc(Exception): 114 pass 115 116 class VeryFaultyCluster(object): 117 def __init__(self, *args, **kwargs): 118 pass 119 120 def connect(self, *args, **kwargs): 121 raise OTOExc() 122 123 def shutdown(self): 124 pass 125 126 mod.cassandra = Mock() 127 mod.cassandra.OperationTimedOut = OTOExc 128 mod.cassandra.cluster = Mock() 129 mod.cassandra.cluster.Cluster = VeryFaultyCluster 130 131 x = mod.CassandraBackend(app=self.app) 132 133 with pytest.raises(OTOExc): 134 x._store_result('task_id', 'result', states.SUCCESS) 135 assert x._cluster is None 136 assert x._session is None 137 138 def test_create_result_table(self): 139 # Tests behavior when session.execute raises 140 # cassandra.AlreadyExists. 141 from celery.backends import cassandra as mod 142 143 class OTOExc(Exception): 144 pass 145 146 class FaultySession(object): 147 def __init__(self, *args, **kwargs): 148 pass 149 150 def execute(self, *args, **kwargs): 151 raise OTOExc() 152 153 class DummyCluster(object): 154 155 def __init__(self, *args, **kwargs): 156 pass 157 158 def connect(self, *args, **kwargs): 159 return FaultySession() 160 161 mod.cassandra = Mock() 162 mod.cassandra.cluster = Mock() 163 mod.cassandra.cluster.Cluster = DummyCluster 164 mod.cassandra.AlreadyExists = OTOExc 165 166 x = mod.CassandraBackend(app=self.app) 167 x._get_connection(write=True) 168 assert x._session is not None 169 170 def test_init_session(self): 171 # Tests behavior when Cluster.connect works properly 172 from celery.backends import cassandra as mod 173 174 class DummyCluster(object): 175 176 def __init__(self, *args, **kwargs): 177 pass 178 179 def connect(self, *args, **kwargs): 180 return Mock() 181 182 mod.cassandra = Mock() 183 mod.cassandra.cluster = Mock() 184 mod.cassandra.cluster.Cluster = DummyCluster 185 186 x = mod.CassandraBackend(app=self.app) 187 assert x._session is None 188 x._get_connection(write=True) 189 assert x._session is not None 190 191 s = x._session 192 x._get_connection() 193 assert s is x._session 194 195 def test_auth_provider(self): 196 # Ensure valid auth_provider works properly, and invalid one raises 197 # ImproperlyConfigured exception. 198 from celery.backends import cassandra as mod 199 200 class DummyAuth(object): 201 ValidAuthProvider = Mock() 202 203 mod.cassandra = Mock() 204 mod.cassandra.auth = DummyAuth 205 206 # Valid auth_provider 207 self.app.conf.cassandra_auth_provider = 'ValidAuthProvider' 208 self.app.conf.cassandra_auth_kwargs = { 209 'username': 'stuff' 210 } 211 mod.CassandraBackend(app=self.app) 212 213 # Invalid auth_provider 214 self.app.conf.cassandra_auth_provider = 'SpiderManAuth' 215 self.app.conf.cassandra_auth_kwargs = { 216 'username': 'Jack' 217 } 218 with pytest.raises(ImproperlyConfigured): 219 mod.CassandraBackend(app=self.app) 220 221 def test_options(self): 222 # Ensure valid options works properly 223 from celery.backends import cassandra as mod 224 225 mod.cassandra = Mock() 226 # Valid options 227 self.app.conf.cassandra_options = { 228 'cql_version': '3.2.1', 229 'protocol_version': 3 230 } 231 mod.CassandraBackend(app=self.app) 232