1import os 2import random 3import shutil 4import time 5import unittest 6from ast import literal_eval 7 8from .config import tape, TweepyTestCase, use_replay, username 9from tweepy import API, FileCache, Friendship, MemoryCache 10from tweepy.parsers import Parser 11 12test_tweet_id = '266367358078169089' 13tweet_text = 'testing 1000' 14 15"""Unit tests""" 16 17 18class TweepyErrorTests(unittest.TestCase): 19 20 def testpickle(self): 21 """Verify exceptions can be pickled and unpickled.""" 22 import pickle 23 from tweepy.error import TweepError 24 25 e = TweepError('no reason', {'status': 200}) 26 e2 = pickle.loads(pickle.dumps(e)) 27 28 self.assertEqual(e.reason, e2.reason) 29 self.assertEqual(e.response, e2.response) 30 31 32class TweepyAPITests(TweepyTestCase): 33 34 #@tape.use_cassette('testfailure.json') 35 #def testapierror(self): 36 # from tweepy.error import TweepError 37 # 38 # with self.assertRaises(TweepError) as cm: 39 # self.api.direct_messages() 40 # 41 # reason, = literal_eval(cm.exception.reason) 42 # self.assertEqual(reason['message'], 'Bad Authentication data.') 43 # self.assertEqual(reason['code'], 215) 44 # self.assertEqual(cm.exception.api_code, 215) 45 46 # TODO: Actually have some sort of better assertion 47 @tape.use_cassette('testgetoembed.json') 48 def testgetoembed(self): 49 data = self.api.get_oembed("https://twitter.com/Twitter/status/" + test_tweet_id) 50 self.assertEqual(data['author_name'], "Twitter") 51 52 @tape.use_cassette('testparserargumenthastobeaparserinstance.json') 53 def testparserargumenthastobeaparserinstance(self): 54 """ Testing the issue https://github.com/tweepy/tweepy/issues/421""" 55 self.assertRaises(TypeError, API, self.auth, parser=Parser) 56 57 @tape.use_cassette('testhometimeline.json') 58 def testhometimeline(self): 59 self.api.home_timeline() 60 61 @tape.use_cassette('testusertimeline.json') 62 def testusertimeline(self): 63 self.api.user_timeline() 64 self.api.user_timeline('twitter') 65 66 @tape.use_cassette('testmentionstimeline.json') 67 def testmentionstimeline(self): 68 self.api.mentions_timeline() 69 70 @tape.use_cassette('testretweetsofme.json') 71 def testretweetsofme(self): 72 self.api.retweets_of_me() 73 74 def testretweet(self): 75 self.skipTest('Missing method to retrieve random Tweet to Retweet') 76 77 @tape.use_cassette('testretweets.json') 78 def testretweets(self): 79 self.api.retweets(test_tweet_id) 80 81 @tape.use_cassette('testretweeters.json') 82 def testretweeters(self): 83 self.api.retweeters(test_tweet_id) 84 85 @tape.use_cassette('testgetstatus.json') 86 def testgetstatus(self): 87 self.api.get_status(id=test_tweet_id) 88 89 @tape.use_cassette('testupdateanddestroystatus.json') 90 def testupdateanddestroystatus(self): 91 # test update 92 update = self.api.update_status(status=tweet_text) 93 self.assertEqual(update.text, tweet_text) 94 95 # test destroy 96 deleted = self.api.destroy_status(id=update.id) 97 self.assertEqual(deleted.id, update.id) 98 99 @tape.use_cassette('testupdateanddestroystatus.json') 100 def testupdateanddestroystatuswithoutkwarg(self): 101 # test update, passing text as a positional argument (#554) 102 update = self.api.update_status(tweet_text) 103 self.assertEqual(update.text, tweet_text) 104 105 # test destroy 106 deleted = self.api.destroy_status(id=update.id) 107 self.assertEqual(deleted.id, update.id) 108 109 @tape.use_cassette('testupdatestatuswithmedia.yaml', serializer='yaml') 110 def testupdatestatuswithmedia(self): 111 update = self.api.update_with_media('examples/banner.png', status=tweet_text) 112 self.assertIn(tweet_text + ' https://t.co', update.text) 113 114 @tape.use_cassette('testgetuser.json') 115 def testgetuser(self): 116 u = self.api.get_user('Twitter') 117 self.assertEqual(u.screen_name, 'Twitter') 118 119 u = self.api.get_user(783214) 120 self.assertEqual(u.screen_name, 'Twitter') 121 122 @tape.use_cassette('testlookupusers.json') 123 def testlookupusers(self): 124 def check(users): 125 self.assertEqual(len(users), 2) 126 check(self.api.lookup_users(user_ids=[6844292, 6253282])) 127 check(self.api.lookup_users(screen_names=['twitterapi', 'twitter'])) 128 129 @tape.use_cassette('testsearchusers.json') 130 def testsearchusers(self): 131 self.api.search_users('twitter') 132 133 @tape.use_cassette('testme.json') 134 def testme(self): 135 me = self.api.me() 136 self.assertEqual(me.screen_name, username) 137 138 @tape.use_cassette('testlistdirectmessages.json') 139 def testlistdirectmessages(self): 140 self.api.list_direct_messages() 141 142 @tape.use_cassette('testsendanddestroydirectmessage.json') 143 def testsendanddestroydirectmessage(self): 144 me = self.api.me() 145 146 # send 147 sent_dm = self.api.send_direct_message(me.id, text='test message') 148 self.assertEqual(sent_dm.message_create['message_data']['text'], 'test message') 149 self.assertEqual(int(sent_dm.message_create['sender_id']), me.id) 150 self.assertEqual(int(sent_dm.message_create['target']['recipient_id']), me.id) 151 152 # destroy 153 self.api.destroy_direct_message(sent_dm.id) 154 155 @tape.use_cassette('testcreatedestroyfriendship.json') 156 def testcreatedestroyfriendship(self): 157 enemy = self.api.destroy_friendship('Twitter') 158 self.assertEqual(enemy.screen_name, 'Twitter') 159 160 friend = self.api.create_friendship('Twitter') 161 self.assertEqual(friend.screen_name, 'Twitter') 162 163 @tape.use_cassette('testshowfriendship.json') 164 def testshowfriendship(self): 165 source, target = self.api.show_friendship(target_screen_name='twitter') 166 self.assertTrue(isinstance(source, Friendship)) 167 self.assertTrue(isinstance(target, Friendship)) 168 169 @tape.use_cassette('testfriendsids.json') 170 def testfriendsids(self): 171 self.api.friends_ids(username) 172 173 @tape.use_cassette('testfollowersids.json') 174 def testfollowersids(self): 175 self.api.followers_ids(username) 176 177 @tape.use_cassette('testfriends.json') 178 def testfriends(self): 179 self.api.friends(username) 180 181 @tape.use_cassette('testfollowers.json') 182 def testfollowers(self): 183 self.api.followers(username) 184 185 @tape.use_cassette('testverifycredentials.json') 186 def testverifycredentials(self): 187 self.assertNotEqual(self.api.verify_credentials(), False) 188 189 # make sure that `me.status.entities` is not an empty dict 190 me = self.api.verify_credentials(include_entities=True) 191 self.assertTrue(me.status.entities) 192 193 # `status` shouldn't be included 194 me = self.api.verify_credentials(skip_status=True) 195 self.assertFalse(hasattr(me, 'status')) 196 197 @tape.use_cassette('testratelimitstatus.json') 198 def testratelimitstatus(self): 199 self.api.rate_limit_status() 200 201 @tape.use_cassette('testupdateprofilecolors.json') 202 def testupdateprofilecolors(self): 203 original = self.api.me() 204 updated = self.api.update_profile(profile_link_color='D0F900') 205 206 # restore colors 207 self.api.update_profile( 208 profile_link_color=original.profile_link_color, 209 ) 210 211 self.assertEqual(updated.profile_background_color, '000000') 212 self.assertEqual(updated.profile_text_color, '000000') 213 self.assertEqual(updated.profile_link_color, 'D0F900') 214 self.assertEqual(updated.profile_sidebar_fill_color, '000000') 215 self.assertEqual(updated.profile_sidebar_border_color, '000000') 216 217 """ 218 def testupateprofileimage(self): 219 self.api.update_profile_image('examples/profile.png') 220 """ 221 222 @tape.use_cassette('testupdateprofilebannerimage.yaml', serializer='yaml') 223 def testupdateprofilebannerimage(self): 224 self.api.update_profile_banner('examples/banner.png') 225 226 @tape.use_cassette('testupdateprofile.json') 227 def testupdateprofile(self): 228 original = self.api.me() 229 profile = { 230 'name': 'Tweepy test 123', 231 'location': 'pytopia', 232 'description': 'just testing things out' 233 } 234 updated = self.api.update_profile(**profile) 235 self.api.update_profile( 236 name = original.name, url = original.url, 237 location = original.location, description = original.description 238 ) 239 240 for k,v in profile.items(): 241 if k == 'email': continue 242 self.assertEqual(getattr(updated, k), v) 243 244 @tape.use_cassette('testfavorites.json') 245 def testfavorites(self): 246 self.api.favorites() 247 248 @tape.use_cassette('testcreatedestroyfavorite.json') 249 def testcreatedestroyfavorite(self): 250 self.api.create_favorite(145344012) 251 self.api.destroy_favorite(145344012) 252 253 @tape.use_cassette('testcreatedestroyblock.json') 254 def testcreatedestroyblock(self): 255 self.api.create_block('twitter') 256 self.api.destroy_block('twitter') 257 self.api.create_friendship('twitter') # restore 258 259 @tape.use_cassette('testblocks.json') 260 def testblocks(self): 261 self.api.blocks() 262 263 @tape.use_cassette('testblocksids.json') 264 def testblocksids(self): 265 self.api.blocks_ids() 266 267 # TODO: Rewrite test to be less brittle. It fails way too often. 268 # def testcreateupdatedestroylist(self): 269 # params = { 270 # 'owner_screen_name': username, 271 # 'slug': 'tweeps' 272 # } 273 # l = self.api.create_list(name=params['slug'], **params) 274 # l = self.api.update_list(list_id=l.id, description='updated!') 275 # self.assertEqual(l.description, 'updated!') 276 # self.api.destroy_list(list_id=l.id) 277 278 @tape.use_cassette('testlistsall.json') 279 def testlistsall(self): 280 self.api.lists_all() 281 282 @tape.use_cassette('testlistsmemberships.json') 283 def testlistsmemberships(self): 284 self.api.lists_memberships() 285 286 @tape.use_cassette('testlistssubscriptions.json') 287 def testlistssubscriptions(self): 288 self.api.lists_subscriptions() 289 290 @tape.use_cassette('testlisttimeline.json') 291 def testlisttimeline(self): 292 self.api.list_timeline('Twitter', 'Official-Twitter-Accounts') 293 294 @tape.use_cassette('testgetlist.json') 295 def testgetlist(self): 296 self.api.get_list(owner_screen_name='Twitter', slug='Official-Twitter-Accounts') 297 298 @tape.use_cassette('testaddremovelistmember.json') 299 def testaddremovelistmember(self): 300 params = { 301 'slug': 'test', 302 'owner_screen_name': username, 303 'screen_name': 'twitter' 304 } 305 306 def assert_list(l): 307 self.assertEqual(l.name, params['slug']) 308 309 assert_list(self.api.add_list_member(**params)) 310 assert_list(self.api.remove_list_member(**params)) 311 312 @tape.use_cassette('testaddremovelistmembers.json') 313 def testaddremovelistmembers(self): 314 params = { 315 'slug': 'test', 316 'owner_screen_name': username, 317 'screen_name': ['Twitter', 'TwitterAPI'] 318 } 319 320 def assert_list(l): 321 self.assertEqual(l.name, params['slug']) 322 323 assert_list(self.api.add_list_members(**params)) 324 assert_list(self.api.remove_list_members(**params)) 325 326 @tape.use_cassette('testlistmembers.json') 327 def testlistmembers(self): 328 self.api.list_members('Twitter', 'Official-Twitter-Accounts') 329 330 @tape.use_cassette('testshowlistmember.json') 331 def testshowlistmember(self): 332 self.assertTrue(self.api.show_list_member(owner_screen_name='Twitter', slug='Official-Twitter-Accounts', screen_name='TwitterAPI')) 333 334 @tape.use_cassette('testsubscribeunsubscribelist.json') 335 def testsubscribeunsubscribelist(self): 336 params = { 337 'owner_screen_name': 'Twitter', 338 'slug': 'Official-Twitter-Accounts' 339 } 340 self.api.subscribe_list(**params) 341 self.api.unsubscribe_list(**params) 342 343 @tape.use_cassette('testlistsubscribers.json') 344 def testlistsubscribers(self): 345 self.api.list_subscribers('Twitter', 'Official-Twitter-Accounts') 346 347 @tape.use_cassette('testshowlistsubscriber.json') 348 def testshowlistsubscriber(self): 349 self.assertTrue(self.api.show_list_subscriber('Twitter', 'Official-Twitter-Accounts', 'TwitterMktg')) 350 351 @tape.use_cassette('testsavedsearches.json') 352 def testsavedsearches(self): 353 s = self.api.create_saved_search('test') 354 self.api.saved_searches() 355 self.assertEqual(self.api.get_saved_search(s.id).query, 'test') 356 self.api.destroy_saved_search(s.id) 357 358 @tape.use_cassette('testsearch.json') 359 def testsearch(self): 360 self.api.search('tweepy') 361 362 @tape.use_cassette('testgeoapis.json') 363 def testgeoapis(self): 364 def place_name_in_list(place_name, place_list): 365 """Return True if a given place_name is in place_list.""" 366 return any(x.full_name.lower() == place_name.lower() for x in place_list) 367 368 twitter_hq = self.api.geo_similar_places(lat='37.7821120598956', 369 long='-122.400612831116', 370 name='South of Market Child Care') 371 # Assumes that twitter_hq is first Place returned... 372 self.assertEqual(twitter_hq[0].id, '1d019624e6b4dcff') 373 # Test various API functions using Austin, TX, USA 374 self.assertEqual(self.api.geo_id(id='1ffd3558f2e98349').full_name, 'Dogpatch, San Francisco') 375 self.assertTrue(place_name_in_list('Austin, TX', 376 self.api.reverse_geocode(lat=30.2673701685, long= -97.7426147461))) # Austin, TX, USA 377 378 @tape.use_cassette('testsupportedlanguages.json') 379 def testsupportedlanguages(self): 380 languages = self.api.supported_languages() 381 expected_dict = { 382 "code": "en", 383 "debug": False, 384 "local_name": "English", 385 "name": "English", 386 "status": "production" 387 } 388 self.assertTrue(expected_dict in languages) 389 390 @tape.use_cassette('testcachedresult.json') 391 def testcachedresult(self): 392 self.api.cache = MemoryCache() 393 self.api.home_timeline() 394 self.assertFalse(self.api.cached_result) 395 self.api.home_timeline() 396 self.assertTrue(self.api.cached_result) 397 398 @tape.use_cassette('testcachedresult.json') 399 def testcachedifferentqueryparameters(self): 400 self.api.cache = MemoryCache() 401 402 user1 = self.api.get_user('TweepyDev') 403 self.assertFalse(self.api.cached_result) 404 self.assertEqual('TweepyDev', user1.screen_name) 405 406 user2 = self.api.get_user('Twitter') 407 self.assertEqual('Twitter', user2.screen_name) 408 self.assertFalse(self.api.cached_result) 409 410 411 412class TweepyCacheTests(unittest.TestCase): 413 timeout = 0.5 414 memcache_servers = ['127.0.0.1:11211'] # must be running for test to pass 415 416 def _run_tests(self, do_cleanup=True): 417 # test store and get 418 self.cache.store('testkey', 'testvalue') 419 self.assertEqual(self.cache.get('testkey'), 'testvalue', 420 'Stored value does not match retrieved value') 421 422 # test timeout 423 time.sleep(self.timeout) 424 self.assertEqual(self.cache.get('testkey'), None, 425 'Cache entry should have expired') 426 427 # test cleanup 428 if do_cleanup: 429 self.cache.store('testkey', 'testvalue') 430 time.sleep(self.timeout) 431 self.cache.cleanup() 432 self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed') 433 434 # test count 435 for i in range(0, 20): 436 self.cache.store('testkey%i' % i, 'testvalue') 437 self.assertEqual(self.cache.count(), 20, 'Count is wrong') 438 439 # test flush 440 self.cache.flush() 441 self.assertEqual(self.cache.count(), 0, 'Cache failed to flush') 442 443 def testmemorycache(self): 444 self.cache = MemoryCache(timeout=self.timeout) 445 self._run_tests() 446 447 def testfilecache(self): 448 os.mkdir('cache_test_dir') 449 try: 450 self.cache = FileCache('cache_test_dir', self.timeout) 451 self._run_tests() 452 self.cache.flush() 453 finally: 454 if os.path.exists('cache_test_dir'): 455 shutil.rmtree('cache_test_dir') 456 457if __name__ == '__main__': 458 unittest.main() 459