# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timedelta
from haproxy import filters
from haproxy.line import Line
from haproxy.tests.test_log_line import LogLineBaseTest


class FiltersTest(LogLineBaseTest):
    """Tests for the filter factories and helpers in ``haproxy.filters``.

    Every filter test follows the same recipe: override one attribute on
    the test case, render a raw log line with ``self._build_test_string()``,
    parse it into a ``Line`` and check what the filter callable returns.
    """

    def _build_line(self, **overrides):
        """Return a ``Line`` parsed from a freshly built raw log string.

        Each keyword argument is set as an attribute on the test case
        before ``self._build_test_string()`` is called, so the overrides
        stay on the instance afterwards (exactly as a plain assignment
        would).
        """
        for name, value in overrides.items():
            setattr(self, name, value)
        return Line(self._build_test_string())

    def _filter_results(self, filter_func, attribute, values):
        """Return the ``filter_func`` result for one log line per value.

        For every item in ``values`` the test-case attribute named
        ``attribute`` is set to that item, a log line is built and the
        filter result is collected, preserving the order of ``values``.
        """
        return [
            filter_func(self._build_line(**{attribute: value}))
            for value in values
        ]

    def test_filter_ip(self):
        """Check that filter_ip filter works as expected."""
        ip_one = '1.2.3.4'
        ip_two = '2.3.4.5'
        # only ip_one is put in the captured headers of the log line
        log_line = self._build_line(headers=' {{{0}}} '.format(ip_one))

        filter_func_ip_one = filters.filter_ip(ip_one)
        filter_func_ip_two = filters.filter_ip(ip_two)

        self.assertTrue(filter_func_ip_one(log_line))
        self.assertFalse(filter_func_ip_two(log_line))

    def test_filter_ip_range(self):
        """Check that filter_ip_range filter works as expected."""
        filter_func = filters.filter_ip_range('1.2.3')

        ips = ('1.2.3.4', '2.3.4.5', '1.2.3.6', )
        headers = [' {{{0}}} '.format(ip) for ip in ips]
        results = self._filter_results(filter_func, 'headers', headers)

        self.assertEqual(results, [True, False, True, ])

    def test_filter_path(self):
        """Check that filter_path filter works as expected."""
        filter_func = filters.filter_path('/image')
        method = 'GET'
        protocol = 'HTTP/1.1'

        paths = ('/path/to/image', '/something/else', '/another/image/here', )
        requests = [
            '{0} {1} {2}'.format(method, path, protocol) for path in paths
        ]
        results = self._filter_results(filter_func, 'http_request', requests)

        self.assertEqual(results, [True, False, True, ])

    def test_filter_ssl(self):
        """Check that filter_ssl filter works as expected."""
        filter_func = filters.filter_ssl()
        method = 'GET'
        protocol = 'HTTP/1.1'

        # the first and last paths carry the ':443' marker
        paths = ('/ssl_path:443/image',
                 '/something/else',
                 '/another:443/ssl', )
        requests = [
            '{0} {1} {2}'.format(method, path, protocol) for path in paths
        ]
        results = self._filter_results(filter_func, 'http_request', requests)

        self.assertEqual(results, [True, False, True, ])

    def test_filter_slow_requests(self):
        """Check that filter_slow_requests filter works as expected."""
        filter_func = filters.filter_slow_requests('10000')

        # only the 13000 response time is above the 10000 threshold
        results = self._filter_results(filter_func, 'tr', (45, 13000, 4566))

        self.assertEqual(results, [False, True, False, ])

    def test_filter_wait_on_queues(self):
        """Check that filter_wait_on_queues filter works as expected"""
        filter_func = filters.filter_wait_on_queues('50')

        # only the 45 waiting time is below the 50 limit
        results = self._filter_results(filter_func, 'tw', (45, 13000, 4566))

        self.assertEqual(results, [True, False, False, ])

    def test_str_to_timedelta(self):
        """Check that deltas are converted to timedelta objects."""
        data = filters._delta_str_to_timedelta('45s')
        self.assertEqual(timedelta(seconds=45), data)

        data = filters._delta_str_to_timedelta('2m')
        self.assertEqual(timedelta(minutes=2), data)

        data = filters._delta_str_to_timedelta('13h')
        self.assertEqual(timedelta(hours=13), data)

        data = filters._delta_str_to_timedelta('1d')
        self.assertEqual(timedelta(days=1), data)

    def test_str_to_datetime(self):
        """Check that start are converted to datetime objects."""
        data = filters._date_str_to_datetime('11/Dec/2013')
        self.assertEqual(datetime(2013, 12, 11), data)

        data = filters._date_str_to_datetime('11/Dec/2013:13')
        self.assertEqual(datetime(2013, 12, 11, hour=13), data)

        data = filters._date_str_to_datetime('11/Dec/2013:14:15')
        self.assertEqual(datetime(2013, 12, 11, hour=14, minute=15), data)

        data = filters._date_str_to_datetime('11/Dec/2013:14:15:16')
        self.assertEqual(datetime(2013, 12, 11, hour=14, minute=15, second=16),
                         data)

    def test_filter_time_frame_no_limit(self):
        """Test that if empty strings are passed to filter_time_frame all log
        lines are accepted.
        """
        filter_func = filters.filter_time_frame('', '')

        accept_dates = ('09/Dec/2013:10:53:42.33',
                        '19/Jan/2014:12:39:16.63',
                        '29/Jun/2012:15:27:23.66')
        results = self._filter_results(filter_func, 'accept_date',
                                       accept_dates)

        self.assertEqual(results, [True, True, True, ])

    def test_filter_time_frame_only_start(self):
        """Test that if only a start is passed to filter_time_frame, log
        lines accepted before it are rejected and later ones are kept.
        """
        filter_func = filters.filter_time_frame('3/Oct/2013', '')

        accept_dates = ('09/Dec/2013:10:53:42.33',
                        '19/Jan/2014:12:39:16.63',
                        '29/Jun/2012:15:27:23.66')
        results = self._filter_results(filter_func, 'accept_date',
                                       accept_dates)

        self.assertEqual(results, [True, True, False, ])

    def test_filter_time_frame_start_and_delta(self):
        """Test that if a start and a delta are passed to filter_time_frame
        only the log lines that fall inside that time frame are accepted.
        """
        filter_func = filters.filter_time_frame('29/Jun/2012:15', '30m')

        accept_dates = ('09/Dec/2013:10:53:42.33',
                        '19/Jan/2014:12:39:16.63',
                        '29/Jun/2012:15:27:23.66')
        results = self._filter_results(filter_func, 'accept_date',
                                       accept_dates)

        self.assertEqual(results, [False, False, True, ])

    def test_filter_status_code(self):
        """Test that the status_code filter works as expected."""
        filter_func = filters.filter_status_code('404')

        log_line = self._build_line(status='404')
        self.assertTrue(filter_func(log_line))

        log_line = self._build_line(status='200')
        self.assertFalse(filter_func(log_line))

    def test_filter_status_code_family(self):
        """Test that the status_code_family filter works as expected."""
        filter_func = filters.filter_status_code_family('4')

        statuses = ('404', '503', '401', )
        results = self._filter_results(filter_func, 'status', statuses)

        self.assertEqual(results, [True, False, True, ])

    def test_filter_http_method(self):
        """Test that the http_method filter works as expected."""
        filter_func = filters.filter_http_method('GET')

        http_methods = ('GET', 'POST', 'PUT', 'GET', )
        requests = [
            '{0} /something HTTP/1.1'.format(http_method)
            for http_method in http_methods
        ]
        results = self._filter_results(filter_func, 'http_request', requests)

        self.assertEqual(results, [True, False, False, True, ])

    def test_filter_backend(self):
        """Test that the backend filter works as expected."""
        filter_func = filters.filter_backend('default')

        backend_names = ('default', 'anonymous', 'registered', )
        results = self._filter_results(filter_func, 'backend_name',
                                       backend_names)

        self.assertEqual(results, [True, False, False, ])

    def test_filter_frontend(self):
        """Test that the frontend filter works as expected."""
        filter_func = filters.filter_frontend('loadbalancer')

        frontend_names = ('loadbalancer', 'other', 'loadbalancer', )
        results = self._filter_results(filter_func, 'frontend_name',
                                       frontend_names)

        self.assertEqual(results, [True, False, True, ])

    def test_filter_server(self):
        """Test that the server filter works as expected."""
        filter_func = filters.filter_server('instance8')

        # 'instance' must not match: the whole server name is compared
        server_names = ('instance7', 'instance', 'instance8', )
        results = self._filter_results(filter_func, 'server_name',
                                       server_names)

        self.assertEqual(results, [False, False, True, ])

    def test_filter_response_size(self):
        """Test that the size filter works as expected."""
        # does not matter if the filter parameter has a leading sign plus
        # or not, it should return the same results.
        filter_func_1 = filters.filter_response_size('400')
        filter_func_2 = filters.filter_response_size('+400')

        results_1 = []
        results_2 = []
        for size in ('300', '+399', '+598', '401', ):
            # both filters are applied to the very same parsed line
            log_line = self._build_line(bytes=size)
            results_1.append(filter_func_1(log_line))
            results_2.append(filter_func_2(log_line))

        self.assertEqual(results_1, [False, False, True, True, ])
        self.assertEqual(results_1, results_2)