1import os
2import random
3import shutil
4import time
5import unittest
6from ast import literal_eval
7
8from .config import tape, TweepyTestCase, use_replay, username
9from tweepy import API, FileCache, Friendship, MemoryCache
10from tweepy.parsers import Parser
11
# ID of the existing Tweet passed to the get_oembed, retweets, retweeters and
# get_status tests below.
test_tweet_id = '266367358078169089'
# Status text posted, and then compared against the response, by the
# update_status / update_with_media tests below.
tweet_text = 'testing 1000'

"""Unit tests"""
16
17
class TweepyErrorTests(unittest.TestCase):
    """Tests for tweepy's exception type, TweepError."""

    def testpickle(self):
        """Verify exceptions can be pickled and unpickled."""
        import pickle
        from tweepy.error import TweepError

        original = TweepError('no reason', {'status': 200})
        serialized = pickle.dumps(original)
        restored = pickle.loads(serialized)

        # The round-tripped copy must carry the same reason and response;
        # reason is checked first, then response.
        for attr in ('reason', 'response'):
            self.assertEqual(getattr(original, attr), getattr(restored, attr))
30
31
class TweepyAPITests(TweepyTestCase):
    """Tests for the tweepy.API endpoint wrappers.

    Each test that talks to the API is decorated with ``tape.use_cassette``
    and its own cassette file name (presumably recorded HTTP traffic replayed
    by the ``tape`` object from ``.config`` -- confirm there). ``self.api``
    and ``self.auth`` are not assigned in this class; they are expected to be
    provided by ``TweepyTestCase``.

    Many tests make no assertion: they only check that the call completes
    without raising.
    """

    #@tape.use_cassette('testfailure.json')
    #def testapierror(self):
    #    from tweepy.error import TweepError
    #
    #    with self.assertRaises(TweepError) as cm:
    #        self.api.direct_messages()
    #
    #    reason, = literal_eval(cm.exception.reason)
    #    self.assertEqual(reason['message'], 'Bad Authentication data.')
    #    self.assertEqual(reason['code'], 215)
    #    self.assertEqual(cm.exception.api_code, 215)

    # TODO: Actually have some sort of better assertion
    @tape.use_cassette('testgetoembed.json')
    def testgetoembed(self):
        """get_oembed() for the reference Tweet reports 'Twitter' as author."""
        data = self.api.get_oembed("https://twitter.com/Twitter/status/" + test_tweet_id)
        self.assertEqual(data['author_name'], "Twitter")

    @tape.use_cassette('testparserargumenthastobeaparserinstance.json')
    def testparserargumenthastobeaparserinstance(self):
        """ Testing the issue https://github.com/tweepy/tweepy/issues/421"""
        # Passing the Parser class itself (not an instance) must be rejected.
        self.assertRaises(TypeError, API, self.auth, parser=Parser)

    @tape.use_cassette('testhometimeline.json')
    def testhometimeline(self):
        """Smoke test: home_timeline() completes without raising."""
        self.api.home_timeline()

    @tape.use_cassette('testusertimeline.json')
    def testusertimeline(self):
        """Smoke test: user_timeline() with no argument and with 'twitter'."""
        self.api.user_timeline()
        self.api.user_timeline('twitter')

    @tape.use_cassette('testmentionstimeline.json')
    def testmentionstimeline(self):
        """Smoke test: mentions_timeline() completes without raising."""
        self.api.mentions_timeline()

    @tape.use_cassette('testretweetsofme.json')
    def testretweetsofme(self):
        """Smoke test: retweets_of_me() completes without raising."""
        self.api.retweets_of_me()

    def testretweet(self):
        """Always skipped; see the skip message for the reason."""
        self.skipTest('Missing method to retrieve random Tweet to Retweet')

    @tape.use_cassette('testretweets.json')
    def testretweets(self):
        """Smoke test: retweets() of the reference Tweet."""
        self.api.retweets(test_tweet_id)

    @tape.use_cassette('testretweeters.json')
    def testretweeters(self):
        """Smoke test: retweeters() of the reference Tweet."""
        self.api.retweeters(test_tweet_id)

    @tape.use_cassette('testgetstatus.json')
    def testgetstatus(self):
        """Smoke test: get_status() of the reference Tweet."""
        self.api.get_status(id=test_tweet_id)

    @tape.use_cassette('testupdateanddestroystatus.json')
    def testupdateanddestroystatus(self):
        """update_status(status=...) posts the text; destroy_status removes it."""
        update = self.api.update_status(status=tweet_text)
        # Destroy before asserting so a failed text check cannot leave the
        # status behind; the request order (update, destroy) is unchanged.
        deleted = self.api.destroy_status(id=update.id)

        # test update
        self.assertEqual(update.text, tweet_text)
        # test destroy
        self.assertEqual(deleted.id, update.id)

    # Uses the same cassette file as testupdateanddestroystatus.
    @tape.use_cassette('testupdateanddestroystatus.json')
    def testupdateanddestroystatuswithoutkwarg(self):
        """Same as testupdateanddestroystatus, with the text passed positionally."""
        # test update, passing text as a positional argument (#554)
        update = self.api.update_status(tweet_text)
        # Destroy before asserting so a failed text check cannot leave the
        # status behind; the request order (update, destroy) is unchanged.
        deleted = self.api.destroy_status(id=update.id)

        self.assertEqual(update.text, tweet_text)
        # test destroy
        self.assertEqual(deleted.id, update.id)

    @tape.use_cassette('testupdatestatuswithmedia.yaml', serializer='yaml')
    def testupdatestatuswithmedia(self):
        """update_with_media() returns text containing the status plus a t.co link."""
        update = self.api.update_with_media('examples/banner.png', status=tweet_text)
        self.assertIn(tweet_text + ' https://t.co', update.text)

    @tape.use_cassette('testgetuser.json')
    def testgetuser(self):
        """get_user() yields screen_name 'Twitter' for both 'Twitter' and 783214."""
        u = self.api.get_user('Twitter')
        self.assertEqual(u.screen_name, 'Twitter')

        u = self.api.get_user(783214)
        self.assertEqual(u.screen_name, 'Twitter')

    @tape.use_cassette('testlookupusers.json')
    def testlookupusers(self):
        """lookup_users() returns two users, by user IDs and by screen names."""
        def check(users):
            self.assertEqual(len(users), 2)
        check(self.api.lookup_users(user_ids=[6844292, 6253282]))
        check(self.api.lookup_users(screen_names=['twitterapi', 'twitter']))

    @tape.use_cassette('testsearchusers.json')
    def testsearchusers(self):
        """Smoke test: search_users('twitter') completes without raising."""
        self.api.search_users('twitter')

    @tape.use_cassette('testme.json')
    def testme(self):
        """me() returns a user whose screen_name equals the configured username."""
        me = self.api.me()
        self.assertEqual(me.screen_name, username)

    @tape.use_cassette('testlistdirectmessages.json')
    def testlistdirectmessages(self):
        """Smoke test: list_direct_messages() completes without raising."""
        self.api.list_direct_messages()

    @tape.use_cassette('testsendanddestroydirectmessage.json')
    def testsendanddestroydirectmessage(self):
        """Send a direct message to the authenticated user, check it, destroy it."""
        me = self.api.me()

        # send
        sent_dm = self.api.send_direct_message(me.id, text='test message')
        try:
            message = sent_dm.message_create
            self.assertEqual(message['message_data']['text'], 'test message')
            # IDs in the message payload are converted with int() before
            # being compared with me.id.
            self.assertEqual(int(message['sender_id']), me.id)
            self.assertEqual(int(message['target']['recipient_id']), me.id)
        finally:
            # destroy -- in ``finally`` so a failed assertion does not leave
            # the message behind.
            self.api.destroy_direct_message(sent_dm.id)

    @tape.use_cassette('testcreatedestroyfriendship.json')
    def testcreatedestroyfriendship(self):
        """destroy_friendship and create_friendship each return the 'Twitter' user."""
        enemy = self.api.destroy_friendship('Twitter')
        self.assertEqual(enemy.screen_name, 'Twitter')

        friend = self.api.create_friendship('Twitter')
        self.assertEqual(friend.screen_name, 'Twitter')

    @tape.use_cassette('testshowfriendship.json')
    def testshowfriendship(self):
        """show_friendship() returns a (source, target) pair of Friendship objects."""
        source, target = self.api.show_friendship(target_screen_name='twitter')
        self.assertIsInstance(source, Friendship)
        self.assertIsInstance(target, Friendship)

    @tape.use_cassette('testfriendsids.json')
    def testfriendsids(self):
        """Smoke test: friends_ids() for the configured username."""
        self.api.friends_ids(username)

    @tape.use_cassette('testfollowersids.json')
    def testfollowersids(self):
        """Smoke test: followers_ids() for the configured username."""
        self.api.followers_ids(username)

    @tape.use_cassette('testfriends.json')
    def testfriends(self):
        """Smoke test: friends() for the configured username."""
        self.api.friends(username)

    @tape.use_cassette('testfollowers.json')
    def testfollowers(self):
        """Smoke test: followers() for the configured username."""
        self.api.followers(username)

    @tape.use_cassette('testverifycredentials.json')
    def testverifycredentials(self):
        """verify_credentials() honours include_entities and skip_status."""
        self.assertNotEqual(self.api.verify_credentials(), False)

        # make sure that `me.status.entities` is not an empty dict
        me = self.api.verify_credentials(include_entities=True)
        self.assertTrue(me.status.entities)

        # `status` shouldn't be included
        me = self.api.verify_credentials(skip_status=True)
        self.assertFalse(hasattr(me, 'status'))

    @tape.use_cassette('testratelimitstatus.json')
    def testratelimitstatus(self):
        """Smoke test: rate_limit_status() completes without raising."""
        self.api.rate_limit_status()

    @tape.use_cassette('testupdateprofilecolors.json')
    def testupdateprofilecolors(self):
        """update_profile(profile_link_color=...) changes the link colour."""
        original = self.api.me()
        updated = self.api.update_profile(profile_link_color='D0F900')

        # restore colors (before asserting, so a failure leaves no residue)
        self.api.update_profile(
            profile_link_color=original.profile_link_color,
        )

        # Only the link colour was sent; the other expected values presumably
        # reflect the test account's existing profile -- TODO confirm.
        self.assertEqual(updated.profile_background_color, '000000')
        self.assertEqual(updated.profile_text_color, '000000')
        self.assertEqual(updated.profile_link_color, 'D0F900')
        self.assertEqual(updated.profile_sidebar_fill_color, '000000')
        self.assertEqual(updated.profile_sidebar_border_color, '000000')

    # Disabled test, kept for reference:
    # def testupateprofileimage(self):
    #     self.api.update_profile_image('examples/profile.png')

    @tape.use_cassette('testupdateprofilebannerimage.yaml', serializer='yaml')
    def testupdateprofilebannerimage(self):
        """Smoke test: update_profile_banner() with the example banner image."""
        self.api.update_profile_banner('examples/banner.png')

    @tape.use_cassette('testupdateprofile.json')
    def testupdateprofile(self):
        """update_profile() applies name, location and description."""
        original = self.api.me()
        profile = {
            'name': 'Tweepy test 123',
            'location': 'pytopia',
            'description': 'just testing things out'
        }
        updated = self.api.update_profile(**profile)
        # Restore the original profile before asserting.
        self.api.update_profile(
            name=original.name, url=original.url,
            location=original.location, description=original.description
        )

        for k, v in profile.items():
            self.assertEqual(getattr(updated, k), v)

    @tape.use_cassette('testfavorites.json')
    def testfavorites(self):
        """Smoke test: favorites() completes without raising."""
        self.api.favorites()

    @tape.use_cassette('testcreatedestroyfavorite.json')
    def testcreatedestroyfavorite(self):
        """Smoke test: create_favorite() then destroy_favorite() on one ID."""
        self.api.create_favorite(145344012)
        self.api.destroy_favorite(145344012)

    @tape.use_cassette('testcreatedestroyblock.json')
    def testcreatedestroyblock(self):
        """Smoke test: create_block() then destroy_block() on 'twitter'."""
        self.api.create_block('twitter')
        self.api.destroy_block('twitter')
        self.api.create_friendship('twitter')  # restore

    @tape.use_cassette('testblocks.json')
    def testblocks(self):
        """Smoke test: blocks() completes without raising."""
        self.api.blocks()

    @tape.use_cassette('testblocksids.json')
    def testblocksids(self):
        """Smoke test: blocks_ids() completes without raising."""
        self.api.blocks_ids()

    # TODO: Rewrite test to be less brittle. It fails way too often.
    # def testcreateupdatedestroylist(self):
    #     params = {
    #         'owner_screen_name': username,
    #         'slug': 'tweeps'
    #     }
    #     l = self.api.create_list(name=params['slug'], **params)
    #     l = self.api.update_list(list_id=l.id, description='updated!')
    #     self.assertEqual(l.description, 'updated!')
    #     self.api.destroy_list(list_id=l.id)

    @tape.use_cassette('testlistsall.json')
    def testlistsall(self):
        """Smoke test: lists_all() completes without raising."""
        self.api.lists_all()

    @tape.use_cassette('testlistsmemberships.json')
    def testlistsmemberships(self):
        """Smoke test: lists_memberships() completes without raising."""
        self.api.lists_memberships()

    @tape.use_cassette('testlistssubscriptions.json')
    def testlistssubscriptions(self):
        """Smoke test: lists_subscriptions() completes without raising."""
        self.api.lists_subscriptions()

    @tape.use_cassette('testlisttimeline.json')
    def testlisttimeline(self):
        """Smoke test: list_timeline() for Twitter's official-accounts list."""
        self.api.list_timeline('Twitter', 'Official-Twitter-Accounts')

    @tape.use_cassette('testgetlist.json')
    def testgetlist(self):
        """Smoke test: get_list() by owner screen name and slug."""
        self.api.get_list(owner_screen_name='Twitter', slug='Official-Twitter-Accounts')

    @tape.use_cassette('testaddremovelistmember.json')
    def testaddremovelistmember(self):
        """add_list_member / remove_list_member each return the list named 'test'."""
        params = {
            'slug': 'test',
            'owner_screen_name': username,
            'screen_name': 'twitter'
        }

        def assert_list(l):
            # The returned list's name is expected to equal the slug.
            self.assertEqual(l.name, params['slug'])

        assert_list(self.api.add_list_member(**params))
        assert_list(self.api.remove_list_member(**params))

    @tape.use_cassette('testaddremovelistmembers.json')
    def testaddremovelistmembers(self):
        """add_list_members / remove_list_members each return the list named 'test'."""
        params = {
            'slug': 'test',
            'owner_screen_name': username,
            'screen_name': ['Twitter', 'TwitterAPI']
        }

        def assert_list(l):
            # The returned list's name is expected to equal the slug.
            self.assertEqual(l.name, params['slug'])

        assert_list(self.api.add_list_members(**params))
        assert_list(self.api.remove_list_members(**params))

    @tape.use_cassette('testlistmembers.json')
    def testlistmembers(self):
        """Smoke test: list_members() for Twitter's official-accounts list."""
        self.api.list_members('Twitter', 'Official-Twitter-Accounts')

    @tape.use_cassette('testshowlistmember.json')
    def testshowlistmember(self):
        """show_list_member() returns a truthy value for 'TwitterAPI'."""
        self.assertTrue(self.api.show_list_member(
            owner_screen_name='Twitter',
            slug='Official-Twitter-Accounts',
            screen_name='TwitterAPI'))

    @tape.use_cassette('testsubscribeunsubscribelist.json')
    def testsubscribeunsubscribelist(self):
        """Smoke test: subscribe_list() then unsubscribe_list() on one list."""
        params = {
            'owner_screen_name': 'Twitter',
            'slug': 'Official-Twitter-Accounts'
        }
        self.api.subscribe_list(**params)
        self.api.unsubscribe_list(**params)

    @tape.use_cassette('testlistsubscribers.json')
    def testlistsubscribers(self):
        """Smoke test: list_subscribers() for Twitter's official-accounts list."""
        self.api.list_subscribers('Twitter', 'Official-Twitter-Accounts')

    @tape.use_cassette('testshowlistsubscriber.json')
    def testshowlistsubscriber(self):
        """show_list_subscriber() returns a truthy value for 'TwitterMktg'."""
        self.assertTrue(self.api.show_list_subscriber('Twitter', 'Official-Twitter-Accounts', 'TwitterMktg'))

    @tape.use_cassette('testsavedsearches.json')
    def testsavedsearches(self):
        """Create a saved search, list and fetch it, then destroy it."""
        s = self.api.create_saved_search('test')
        try:
            self.api.saved_searches()
            self.assertEqual(self.api.get_saved_search(s.id).query, 'test')
        finally:
            # Destroy in ``finally`` so a failed check does not leave the
            # saved search behind.
            self.api.destroy_saved_search(s.id)

    @tape.use_cassette('testsearch.json')
    def testsearch(self):
        """Smoke test: search('tweepy') completes without raising."""
        self.api.search('tweepy')

    @tape.use_cassette('testgeoapis.json')
    def testgeoapis(self):
        """Exercise geo_similar_places, geo_id and reverse_geocode."""
        def place_name_in_list(place_name, place_list):
            """Return True if a given place_name is in place_list."""
            # Comparison is case-insensitive on each place's full_name.
            wanted = place_name.lower()
            return any(x.full_name.lower() == wanted for x in place_list)

        twitter_hq = self.api.geo_similar_places(lat='37.7821120598956',
                                                 long='-122.400612831116',
                                                 name='South of Market Child Care')
        # Assumes that twitter_hq is first Place returned...
        self.assertEqual(twitter_hq[0].id, '1d019624e6b4dcff')
        # Look a place up directly by its ID.
        self.assertEqual(self.api.geo_id(id='1ffd3558f2e98349').full_name, 'Dogpatch, San Francisco')
        # Reverse-geocode coordinates in Austin, TX, USA.
        self.assertTrue(place_name_in_list('Austin, TX',
            self.api.reverse_geocode(lat=30.2673701685, long=-97.7426147461)))

    @tape.use_cassette('testsupportedlanguages.json')
    def testsupportedlanguages(self):
        """supported_languages() includes the expected English entry."""
        languages = self.api.supported_languages()
        expected_dict = {
            "code": "en",
            "debug": False,
            "local_name": "English",
            "name": "English",
            "status": "production"
        }
        self.assertIn(expected_dict, languages)

    @tape.use_cassette('testcachedresult.json')
    def testcachedresult(self):
        """With a MemoryCache set, only the second identical call is cached."""
        self.api.cache = MemoryCache()
        self.api.home_timeline()
        self.assertFalse(self.api.cached_result)
        self.api.home_timeline()
        self.assertTrue(self.api.cached_result)

    # Uses the same cassette file as testcachedresult.
    @tape.use_cassette('testcachedresult.json')
    def testcachedifferentqueryparameters(self):
        """get_user() calls with different arguments must not share a cache entry."""
        self.api.cache = MemoryCache()

        user1 = self.api.get_user('TweepyDev')
        self.assertFalse(self.api.cached_result)
        self.assertEqual('TweepyDev', user1.screen_name)

        user2 = self.api.get_user('Twitter')
        self.assertEqual('Twitter', user2.screen_name)
        self.assertFalse(self.api.cached_result)
409
410
411
class TweepyCacheTests(unittest.TestCase):
    """Run one shared scenario (store/get/expiry/cleanup/count/flush)
    against tweepy's MemoryCache and FileCache."""

    # Timeout, in seconds, passed to each cache under test; _run_tests also
    # sleeps for this long before expecting an entry to be gone.
    timeout = 0.5
    # Not referenced by any test visible in this class.
    memcache_servers = ['127.0.0.1:11211']  # must be running for test to pass

    def _run_tests(self, do_cleanup=True):
        """Exercise the cache stored in ``self.cache``.

        ``self.cache`` must be set by the caller and provide ``store``,
        ``get``, ``cleanup``, ``count`` and ``flush``.

        :param do_cleanup: when false, the ``cleanup()`` check is skipped.
        """
        # test store and get
        self.cache.store('testkey', 'testvalue')
        self.assertEqual(self.cache.get('testkey'), 'testvalue',
            'Stored value does not match retrieved value')

        # test timeout
        time.sleep(self.timeout)
        self.assertIsNone(self.cache.get('testkey'),
            'Cache entry should have expired')

        # test cleanup
        if do_cleanup:
            self.cache.store('testkey', 'testvalue')
            time.sleep(self.timeout)
            self.cache.cleanup()
            self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')

        # test count
        for i in range(20):
            self.cache.store('testkey%i' % i, 'testvalue')
        self.assertEqual(self.cache.count(), 20, 'Count is wrong')

        # test flush
        self.cache.flush()
        self.assertEqual(self.cache.count(), 0, 'Cache failed to flush')

    def testmemorycache(self):
        """Run the shared scenario against MemoryCache."""
        self.cache = MemoryCache(timeout=self.timeout)
        self._run_tests()

    def testfilecache(self):
        """Run the shared scenario against FileCache in a scratch directory."""
        import tempfile

        # A unique temporary directory avoids failing on a stale
        # 'cache_test_dir' left in the working directory by an aborted run.
        cache_dir = tempfile.mkdtemp(prefix='cache_test_dir')
        try:
            self.cache = FileCache(cache_dir, self.timeout)
            self._run_tests()
            self.cache.flush()
        finally:
            if os.path.exists(cache_dir):
                shutil.rmtree(cache_dir)
456
# Hand control to unittest's command-line runner when this file is executed
# as a script rather than imported.
if __name__ == '__main__':
    unittest.main()
459