1# -*- coding: utf-8 -*-
2from datetime import datetime
3from datetime import timedelta
4from haproxy import filters
5from haproxy.line import Line
6from haproxy.tests.test_log_line import LogLineBaseTest
7
8
class FiltersTest(LogLineBaseTest):
    """Tests for the filter factories exposed by ``haproxy.filters``.

    Each test overrides one attribute of the test case (for example
    ``self.status``), generates a raw log line with the inherited
    ``_build_test_string`` helper, parses it with ``Line`` and checks the
    value returned by the filter under test for that line.
    """

    # Accept dates shared by the ``filter_time_frame`` tests.
    _ACCEPT_DATES = (
        '09/Dec/2013:10:53:42.33',
        '19/Jan/2014:12:39:16.63',
        '29/Jun/2012:15:27:23.66',
    )

    def _build_log_line(self, **fields):
        """Return a parsed ``Line`` generated after overriding ``fields``.

        Every keyword argument is stored as an attribute on the test case
        before the raw line is generated, so
        ``self._build_log_line(status='404')`` is the same as setting
        ``self.status = '404'`` and then parsing the output of
        ``_build_test_string``.
        """
        for name, value in fields.items():
            setattr(self, name, value)
        return Line(self._build_test_string())

    def _filter_results(self, filter_func, field, values):
        """Return what ``filter_func`` answers for one log line per value.

        For each item in ``values`` (in order) the attribute called
        ``field`` is set to that item, a fresh line is built and parsed, and
        the filter is applied to it. The results are returned as a list in
        the same order as ``values``.
        """
        return [
            filter_func(self._build_log_line(**{field: value}))
            for value in values
        ]

    def test_filter_ip(self):
        """Check that filter_ip filter works as expected."""
        ip_one = '1.2.3.4'
        ip_two = '2.3.4.5'
        log_line = self._build_log_line(headers=' {{{0}}} '.format(ip_one))

        filter_func_ip_one = filters.filter_ip(ip_one)
        filter_func_ip_two = filters.filter_ip(ip_two)

        # The line was built with ip_one in its ``headers`` field, so only
        # the filter created for ip_one may accept it.
        self.assertTrue(filter_func_ip_one(log_line))
        self.assertFalse(filter_func_ip_two(log_line))

    def test_filter_ip_range(self):
        """Check that filter_ip_range filter works as expected."""
        ips = ('1.2.3.4', '2.3.4.5', '1.2.3.6', )

        results = self._filter_results(
            filters.filter_ip_range('1.2.3'),
            'headers',
            (' {{{0}}} '.format(ip) for ip in ips),
        )

        self.assertEqual(results, [True, False, True, ])

    def test_filter_path(self):
        """Check that filter_path filter works as expected."""
        method = 'GET'
        protocol = 'HTTP/1.1'
        paths = ('/path/to/image', '/something/else', '/another/image/here', )

        results = self._filter_results(
            filters.filter_path('/image'),
            'http_request',
            ('{0} {1} {2}'.format(method, path, protocol) for path in paths),
        )

        self.assertEqual(results, [True, False, True, ])

    def test_filter_ssl(self):
        """Check that filter_ssl filter works as expected."""
        method = 'GET'
        protocol = 'HTTP/1.1'
        paths = ('/ssl_path:443/image',
                 '/something/else',
                 '/another:443/ssl', )

        results = self._filter_results(
            filters.filter_ssl(),
            'http_request',
            ('{0} {1} {2}'.format(method, path, protocol) for path in paths),
        )

        self.assertEqual(results, [True, False, True, ])

    def test_filter_slow_requests(self):
        """Check that filter_slow_requests filter works as expected."""
        # Only the 13000 value is above the 10000 given to the filter.
        results = self._filter_results(
            filters.filter_slow_requests('10000'),
            'tr',
            (45, 13000, 4566),
        )

        self.assertEqual(results, [False, True, False, ])

    def test_filter_wait_on_queues(self):
        """Check that filter_wait_on_queues filter works as expected."""
        # Only the 45 value is below the 50 given to the filter.
        results = self._filter_results(
            filters.filter_wait_on_queues('50'),
            'tw',
            (45, 13000, 4566),
        )

        self.assertEqual(results, [True, False, False, ])

    def test_str_to_timedelta(self):
        """Check that deltas are converted to timedelta objects."""
        cases = (
            ('45s', timedelta(seconds=45)),
            ('2m', timedelta(minutes=2)),
            ('13h', timedelta(hours=13)),
            ('1d', timedelta(days=1)),
        )

        for delta_str, expected in cases:
            data = filters._delta_str_to_timedelta(delta_str)
            self.assertEqual(expected, data)

    def test_str_to_datetime(self):
        """Check that start dates are converted to datetime objects."""
        cases = (
            ('11/Dec/2013', datetime(2013, 12, 11)),
            ('11/Dec/2013:13', datetime(2013, 12, 11, hour=13)),
            ('11/Dec/2013:14:15',
             datetime(2013, 12, 11, hour=14, minute=15)),
            ('11/Dec/2013:14:15:16',
             datetime(2013, 12, 11, hour=14, minute=15, second=16)),
        )

        for date_str, expected in cases:
            data = filters._date_str_to_datetime(date_str)
            self.assertEqual(expected, data)

    def test_filter_time_frame_no_limit(self):
        """Test that filter_time_frame accepts every line without limits.

        When both the start and the delta are empty strings no log line is
        expected to be rejected.
        """
        results = self._filter_results(
            filters.filter_time_frame('', ''),
            'accept_date',
            self._ACCEPT_DATES,
        )

        self.assertEqual(results, [True, True, True, ])

    def test_filter_time_frame_only_start(self):
        """Test filter_time_frame when only a start date is given.

        With a start of 3/Oct/2013 and an empty delta, the two lines dated
        after the start are expected to be accepted and the one dated
        before it (29/Jun/2012) rejected.
        """
        results = self._filter_results(
            filters.filter_time_frame('3/Oct/2013', ''),
            'accept_date',
            self._ACCEPT_DATES,
        )

        self.assertEqual(results, [True, True, False, ])

    def test_filter_time_frame_start_and_delta(self):
        """Test filter_time_frame when both start and delta are given.

        With a start of 29/Jun/2012:15 and a delta of 30m, only the line
        dated 29/Jun/2012:15:27:23 is expected to be accepted; the two
        later lines are expected to be rejected.
        """
        results = self._filter_results(
            filters.filter_time_frame('29/Jun/2012:15', '30m'),
            'accept_date',
            self._ACCEPT_DATES,
        )

        self.assertEqual(results, [False, False, True, ])

    def test_filter_status_code(self):
        """Test that the status_code filter works as expected."""
        filter_func = filters.filter_status_code('404')

        self.assertTrue(filter_func(self._build_log_line(status='404')))
        self.assertFalse(filter_func(self._build_log_line(status='200')))

    def test_filter_status_code_family(self):
        """Test that the status_code_family filter works as expected."""
        results = self._filter_results(
            filters.filter_status_code_family('4'),
            'status',
            ('404', '503', '401', ),
        )

        self.assertEqual(results, [True, False, True, ])

    def test_filter_http_method(self):
        """Test that the http_method filter works as expected."""
        http_methods = ('GET', 'POST', 'PUT', 'GET', )

        results = self._filter_results(
            filters.filter_http_method('GET'),
            'http_request',
            ('{0} /something HTTP/1.1'.format(http_method)
             for http_method in http_methods),
        )

        self.assertEqual(results, [True, False, False, True, ])

    def test_filter_backend(self):
        """Test that the backend filter works as expected."""
        results = self._filter_results(
            filters.filter_backend('default'),
            'backend_name',
            ('default', 'anonymous', 'registered', ),
        )

        self.assertEqual(results, [True, False, False, ])

    def test_filter_frontend(self):
        """Test that the frontend filter works as expected."""
        results = self._filter_results(
            filters.filter_frontend('loadbalancer'),
            'frontend_name',
            ('loadbalancer', 'other', 'loadbalancer', ),
        )

        self.assertEqual(results, [True, False, True, ])

    def test_filter_server(self):
        """Test that the server filter works as expected."""
        results = self._filter_results(
            filters.filter_server('instance8'),
            'server_name',
            ('instance7', 'instance', 'instance8', ),
        )

        self.assertEqual(results, [False, False, True, ])

    def test_filter_response_size(self):
        """Test that the size filter works as expected."""
        # does not matter if the filter parameter has a leading sign plus
        # or not, it should return the same results.
        filter_func_1 = filters.filter_response_size('400')
        filter_func_2 = filters.filter_response_size('+400')

        results_1 = []
        results_2 = []
        for size in ('300', '+399', '+598', '401', ):
            # Both filters are applied to the very same parsed line.
            log_line = self._build_log_line(bytes=size)
            results_1.append(filter_func_1(log_line))
            results_2.append(filter_func_2(log_line))

        self.assertEqual(results_1, [False, False, True, True, ])
        self.assertEqual(results_1, results_2)
284