# -*- coding: utf-8 -*-
import time
from unittest import mock

from django.test import TestCase
from django.test.utils import override_settings

from django_extensions.logging.filters import RateLimiterFilter


# NOTE(review): the misspelling is load-bearing. The expected messages below
# embed it verbatim, and the hex digest in the cache keys presumably derives
# from this exact value -- confirm against RateLimiterFilter before "fixing".
TEST_SUBJECT = 'test_subect'


class RateLimiterFilterTests(TestCase):
    """Tests for RateLimiterFilter."""

    def setUp(self):
        """Build the filter, a mocked log record, and freeze ``time.time``."""
        self.rate_limiter_filter = RateLimiterFilter()
        self.record = mock.Mock(msg=TEST_SUBJECT)
        self.record.getMessage.return_value = TEST_SUBJECT.encode()

        # ``time.time`` returns a sentinel string so the value written to the
        # cache can be asserted exactly.
        self.time_patch = mock.patch.object(
            time, 'time', return_value='test_time',
        )
        self.time_patch.start()
        # Registered via addCleanup rather than tearDown so the patch is
        # undone even if a later step of setUp raises.
        self.addCleanup(self.time_patch.stop)

    @override_settings(RATE_LIMITER_FILTER_PREFIX='test_prefix')
    @mock.patch('django.core.cache.cache')
    def test_should_incr_cache_with_custom_prefix_and_return_False(self, m_cache):
        """Both keys already cached: only the counter is incremented.

        With a custom prefix configured, the filter must not call
        ``cache.set``, must call ``cache.incr`` once on the prefixed count
        key, and must return ``False``.
        """
        m_cache.get_many.return_value = {
            'test_prefix:114392702498ad1d75c1829b9519b8c7': 10,
            'test_prefix:114392702498ad1d75c1829b9519b8c7:count': 1,
        }

        result = self.rate_limiter_filter.filter(self.record)

        self.assertIs(m_cache.set.called, False)
        m_cache.incr.assert_called_once_with(
            'test_prefix:114392702498ad1d75c1829b9519b8c7:count'
        )
        self.assertIs(result, False)

    @override_settings(RATE_LIMITER_FILTER_RATE=1)
    @mock.patch('django.core.cache.cache')
    def test_should_set_cache_key_with_custom_rate_and_return_True(self, m_cache):
        """Nothing cached: both keys are written and the record passes.

        With ``RATE_LIMITER_FILTER_RATE=1`` the count key is expected to be
        set to 1 with timeout 61, then the message key to the current time
        with timeout 1, in that order. The record message gains a ``[1x]``
        prefix and the filter returns ``True``.
        """
        m_cache.get_many.return_value = {}
        expected_calls = [
            mock.call('ratelimiterfilter:114392702498ad1d75c1829b9519b8c7:count', 1, 61),
            mock.call('ratelimiterfilter:114392702498ad1d75c1829b9519b8c7', 'test_time', 1),
        ]

        result = self.rate_limiter_filter.filter(self.record)

        self.assertEqual(self.record.msg, '[1x] test_subect')
        m_cache.set.assert_has_calls(expected_calls, any_order=False)
        self.assertIs(result, True)

    @mock.patch('django.core.cache.cache')
    def test_should_modify_record_msg_and_return_True(self, m_cache):
        """Default rate and prefix values.

        Only the count key is cached (999): the record message is prefixed
        with ``[999x]``, the message key is set exactly once with the
        current time and timeout 10, and the filter returns ``True``.
        """
        m_cache.get_many.return_value = {
            'ratelimiterfilter:114392702498ad1d75c1829b9519b8c7:count': 999,
        }

        result = self.rate_limiter_filter.filter(self.record)

        self.assertEqual(self.record.msg, '[999x] test_subect')
        m_cache.set.assert_called_once_with(
            'ratelimiterfilter:114392702498ad1d75c1829b9519b8c7', 'test_time', 10,
        )
        self.assertIs(result, True)