1# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- 2# vi: set ft=python sts=4 ts=4 sw=4 noet : 3 4# This file is part of Fail2Ban. 5# 6# Fail2Ban is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10# 11# Fail2Ban is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with Fail2Ban; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 19 20# Fail2Ban developers 21 22__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko" 23__license__ = "GPL" 24 25from builtins import open as fopen 26import unittest 27import os 28import re 29import sys 30import time, datetime 31import tempfile 32import uuid 33 34try: 35 from systemd import journal 36except ImportError: 37 journal = None 38 39from ..server.jail import Jail 40from ..server.filterpoll import FilterPoll 41from ..server.filter import FailTicket, Filter, FileFilter, FileContainer 42from ..server.failmanager import FailManagerEmpty 43from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr 44from ..server.mytime import MyTime 45from ..server.utils import Utils, uni_decode 46from .databasetestcase import getFail2BanDb 47from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_alt_time, with_tmpdir, LogCaptureTestCase, \ 48 logSys as DefLogSys, CONFIG_DIR as STOCK_CONF_DIR 49from .dummyjail import DummyJail 50 51TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") 52 53 54# yoh: per Steven Hiscocks's insight while troubleshooting 55# 
https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836 56# adding a sufficiently large buffer might help to guarantee that 57# writes happen atomically. 58def open(*args): 59 """Overload built in open so we could assure sufficiently large buffer 60 61 Explicit .flush would be needed to assure that changes leave the buffer 62 """ 63 if len(args) == 2: 64 # ~50kB buffer should be sufficient for all tests here. 65 args = args + (50000,) 66 return fopen(*args) 67 68 69def _killfile(f, name): 70 try: 71 f.close() 72 except: 73 pass 74 try: 75 os.unlink(name) 76 except: 77 pass 78 79 # there might as well be the .bak file 80 if os.path.exists(name + '.bak'): 81 _killfile(None, name + '.bak') 82 83 84_maxWaitTime = unittest.F2B.maxWaitTime 85 86 87class _tmSerial(): 88 _last_s = -0x7fffffff 89 _last_m = -0x7fffffff 90 _str_s = "" 91 _str_m = "" 92 @staticmethod 93 def _tm(time): 94 # ## strftime it too slow for large time serializer : 95 # return MyTime.time2str(time) 96 c = _tmSerial 97 sec = (time % 60) 98 if c._last_s == time - sec: 99 return "%s%02u" % (c._str_s, sec) 100 mt = (time % 3600) 101 if c._last_m == time - mt: 102 c._last_s = time - sec 103 c._str_s = "%s%02u:" % (c._str_m, mt // 60) 104 return "%s%02u" % (c._str_s, sec) 105 c._last_m = time - mt 106 c._str_m = datetime.datetime.fromtimestamp(time).strftime("%Y-%m-%d %H:") 107 c._last_s = time - sec 108 c._str_s = "%s%02u:" % (c._str_m, mt // 60) 109 return "%s%02u" % (c._str_s, sec) 110 111_tm = _tmSerial._tm 112 113 114def _assert_equal_entries(utest, found, output, count=None): 115 """Little helper to unify comparisons with the target entries 116 117 and report helpful failure reports instead of millions of seconds ;) 118 """ 119 utest.assertEqual(found[0], output[0]) # IP 120 utest.assertEqual(found[1], count or output[1]) # count 121 found_time, output_time = \ 122 MyTime.localtime(found[2]),\ 123 MyTime.localtime(output[2]) 124 try: 125 utest.assertEqual(found_time, output_time) 126 
except AssertionError as e: 127 # assert more structured: 128 utest.assertEqual((float(found[2]), found_time), (float(output[2]), output_time)) 129 if len(output) > 3 and count is None: # match matches 130 # do not check if custom count (e.g. going through them twice) 131 if os.linesep != '\n' or sys.platform.startswith('cygwin'): 132 # on those where text file lines end with '\r\n', remove '\r' 133 srepr = lambda x: repr(x).replace(r'\r', '') 134 else: 135 srepr = repr 136 utest.assertEqual(srepr(found[3]), srepr(output[3])) 137 138 139def _ticket_tuple(ticket): 140 """Create a tuple for easy comparison from fail ticket 141 """ 142 attempts = ticket.getAttempt() 143 date = ticket.getTime() 144 ip = ticket.getIP() 145 matches = ticket.getMatches() 146 return (ip, attempts, date, matches) 147 148 149def _assert_correct_last_attempt(utest, filter_, output, count=None): 150 """Additional helper to wrap most common test case 151 152 Test filter to contain target ticket 153 """ 154 # one or multiple tickets: 155 if not isinstance(output[0], (tuple,list)): 156 tickcount = 1 157 failcount = (count if count else output[1]) 158 else: 159 tickcount = len(output) 160 failcount = (count if count else sum((o[1] for o in output))) 161 162 found = [] 163 if isinstance(filter_, DummyJail): 164 # get fail ticket from jail 165 found.append(_ticket_tuple(filter_.getFailTicket())) 166 else: 167 # when we are testing without jails 168 # wait for failures (up to max time) 169 Utils.wait_for( 170 lambda: filter_.failManager.getFailCount() >= (tickcount, failcount), 171 _maxWaitTime(10)) 172 # get fail ticket(s) from filter 173 while tickcount: 174 try: 175 found.append(_ticket_tuple(filter_.failManager.toBan())) 176 except FailManagerEmpty: 177 break 178 tickcount -= 1 179 180 if not isinstance(output[0], (tuple,list)): 181 utest.assertEqual(len(found), 1) 182 _assert_equal_entries(utest, found[0], output, count) 183 else: 184 # sort by string representation of ip (multiple failures with 
different ips): 185 found = sorted(found, key=lambda x: str(x)) 186 output = sorted(output, key=lambda x: str(x)) 187 for f, o in zip(found, output): 188 _assert_equal_entries(utest, f, o) 189 190 191def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""): 192 """Copy lines from one file to another (which might be already open) 193 194 Returns open fout 195 """ 196 # on old Python st_mtime is int, so we should give at least 1 sec so 197 # polling filter could detect the change 198 mtimesleep() 199 if isinstance(in_, str): # pragma: no branch - only used with str in test cases 200 fin = open(in_, 'rb') 201 else: 202 fin = in_ 203 # Skip 204 for i in range(skip): 205 fin.readline() 206 # Read 207 i = 0 208 lines = [] 209 while n is None or i < n: 210 l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n') 211 if terminal_line is not None and l == terminal_line: 212 break 213 lines.append(l) 214 i += 1 215 # Write: all at once and flush 216 if isinstance(fout, str): 217 fout = open(fout, mode) 218 fout.write('\n'.join(lines)+'\n') 219 fout.flush() 220 if isinstance(in_, str): # pragma: no branch - only used with str in test cases 221 # Opened earlier, therefore must close it 222 fin.close() 223 # to give other threads possibly some time to crunch 224 time.sleep(Utils.DEFAULT_SHORT_INTERVAL) 225 return fout 226 227 228TEST_JOURNAL_FIELDS = { 229 "SYSLOG_IDENTIFIER": "fail2ban-testcases", 230 "PRIORITY": "7", 231} 232def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # pragma: systemd no cover 233 """Copy lines from one file to systemd journal 234 235 Returns None 236 """ 237 if isinstance(in_, str): # pragma: no branch - only used with str in test cases 238 fin = open(in_, 'rb') 239 else: 240 fin = in_ 241 # Required for filtering 242 fields.update(TEST_JOURNAL_FIELDS) 243 # Skip 244 for i in range(skip): 245 fin.readline() 246 # Read/Write 247 i = 0 248 while n is None or i < n: 249 l = 
FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n') 250 if terminal_line is not None and l == terminal_line: 251 break 252 journal.send(MESSAGE=l.strip(), **fields) 253 i += 1 254 if isinstance(in_, str): # pragma: no branch - only used with str in test cases 255 # Opened earlier, therefore must close it 256 fin.close() 257 258 259# 260# Actual tests 261# 262 263class BasicFilter(unittest.TestCase): 264 265 def setUp(self): 266 super(BasicFilter, self).setUp() 267 self.filter = Filter(None) 268 269 def testGetSetUseDNS(self): 270 # default is warn 271 self.assertEqual(self.filter.getUseDns(), 'warn') 272 self.filter.setUseDns(True) 273 self.assertEqual(self.filter.getUseDns(), 'yes') 274 self.filter.setUseDns(False) 275 self.assertEqual(self.filter.getUseDns(), 'no') 276 277 def testGetSetDatePattern(self): 278 self.assertEqual(self.filter.getDatePattern(), 279 (None, "Default Detectors")) 280 self.filter.setDatePattern(r"^%Y-%m-%d-%H%M%S\.%f %z **") 281 self.assertEqual(self.filter.getDatePattern(), 282 (r"^%Y-%m-%d-%H%M%S\.%f %z **", 283 r"^Year-Month-Day-24hourMinuteSecond\.Microseconds Zone offset **")) 284 285 def testGetSetLogTimeZone(self): 286 self.assertEqual(self.filter.getLogTimeZone(), None) 287 self.filter.setLogTimeZone('UTC') 288 self.assertEqual(self.filter.getLogTimeZone(), 'UTC') 289 self.filter.setLogTimeZone('UTC-0400') 290 self.assertEqual(self.filter.getLogTimeZone(), 'UTC-0400') 291 self.filter.setLogTimeZone('UTC+0200') 292 self.assertEqual(self.filter.getLogTimeZone(), 'UTC+0200') 293 self.assertRaises(ValueError, self.filter.setLogTimeZone, 'not-a-time-zone') 294 295 def testAssertWrongTime(self): 296 self.assertRaises(AssertionError, 297 lambda: _assert_equal_entries(self, 298 ('1.1.1.1', 1, 1421262060.0), 299 ('1.1.1.1', 1, 1421262059.0), 300 1) 301 ) 302 303 def testTest_tm(self): 304 unittest.F2B.SkipIfFast() 305 ## test function "_tm" works correct (returns the same as slow strftime): 306 for i in range(1417512352, 
(1417512352 // 3600 + 3) * 3600): 307 tm = MyTime.time2str(i) 308 if _tm(i) != tm: # pragma: no cover - never reachable 309 self.assertEqual((_tm(i), i), (tm, i)) 310 311 def testWrongCharInTupleLine(self): 312 ## line tuple has different types (ascii after ascii / unicode): 313 for a1 in ('', '', b''): 314 for a2 in ('2016-09-05T20:18:56', '2016-09-05T20:18:56', b'2016-09-05T20:18:56'): 315 for a3 in ( 316 'Fail for "g\xc3\xb6ran" from 192.0.2.1', 317 'Fail for "g\xc3\xb6ran" from 192.0.2.1', 318 b'Fail for "g\xc3\xb6ran" from 192.0.2.1' 319 ): 320 # join should work if all arguments have the same type: 321 "".join([uni_decode(v) for v in (a1, a2, a3)]) 322 323 324class IgnoreIP(LogCaptureTestCase): 325 326 def setUp(self): 327 """Call before every test case.""" 328 LogCaptureTestCase.setUp(self) 329 self.jail = DummyJail() 330 self.filter = FileFilter(self.jail) 331 self.filter.ignoreSelf = False 332 333 def testIgnoreSelfIP(self): 334 ipList = ("127.0.0.1",) 335 # test ignoreSelf is false: 336 for ip in ipList: 337 self.assertFalse(self.filter.inIgnoreIPList(ip)) 338 self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule")) 339 # test ignoreSelf with true: 340 self.filter.ignoreSelf = True 341 self.pruneLog() 342 for ip in ipList: 343 self.assertTrue(self.filter.inIgnoreIPList(ip)) 344 self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule")) 345 346 def testIgnoreIPOK(self): 347 ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99" 348 for ip in ipList: 349 self.filter.addIgnoreIP(ip) 350 self.assertTrue(self.filter.inIgnoreIPList(ip)) 351 self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ip")) 352 353 def testIgnoreIPNOK(self): 354 ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0." 
355 for ip in ipList: 356 self.filter.addIgnoreIP(ip) 357 self.assertFalse(self.filter.inIgnoreIPList(ip)) 358 if not unittest.F2B.no_network: # pragma: no cover 359 self.assertLogged( 360 'Unable to find a corresponding IP address for 999.999.999.999', 361 'Unable to find a corresponding IP address for abcdef.abcdef', 362 'Unable to find a corresponding IP address for 192.168.0.', all=True) 363 364 def testIgnoreIPCIDR(self): 365 self.filter.addIgnoreIP('192.168.1.0/25') 366 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0')) 367 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1')) 368 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127')) 369 self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128')) 370 self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255')) 371 self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255')) 372 373 def testIgnoreIPMask(self): 374 self.filter.addIgnoreIP('192.168.1.0/255.255.255.128') 375 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0')) 376 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1')) 377 self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127')) 378 self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128')) 379 self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255')) 380 self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255')) 381 382 def testWrongIPMask(self): 383 self.filter.addIgnoreIP('192.168.1.0/255.255.0.0') 384 self.assertRaises(ValueError, self.filter.addIgnoreIP, '192.168.1.0/255.255.0.128') 385 386 def testIgnoreInProcessLine(self): 387 setUpMyTime() 388 try: 389 self.filter.addIgnoreIP('192.168.1.0/25') 390 self.filter.addFailRegex('<HOST>') 391 self.filter.setDatePattern(r'{^LN-BEG}EPOCH') 392 self.filter.processLineAndAdd('1387203300.222 192.168.1.32') 393 self.assertLogged('Ignore 192.168.1.32') 394 finally: 395 tearDownMyTime() 396 397 def _testTimeJump(self, inOperation=False): 398 try: 399 self.filter.addFailRegex('^<HOST>') 400 
self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s') 401 self.filter.setFindTime(10); # max 10 seconds back 402 self.filter.setMaxRetry(5); # don't ban here 403 self.filter.inOperation = inOperation 404 # 405 self.pruneLog('[phase 1] DST time jump') 406 # check local time jump (DST hole): 407 MyTime.setTime(1572137999) 408 self.filter.processLineAndAdd('2019-10-27 02:59:59 192.0.2.5'); # +1 = 1 409 MyTime.setTime(1572138000) 410 self.filter.processLineAndAdd('2019-10-27 02:00:00 192.0.2.5'); # +1 = 2 411 MyTime.setTime(1572138001) 412 self.filter.processLineAndAdd('2019-10-27 02:00:01 192.0.2.5'); # +1 = 3 413 self.assertLogged( 414 'Current failures from 1 IPs (IP:count): 192.0.2.5:1', 415 'Current failures from 1 IPs (IP:count): 192.0.2.5:2', 416 'Current failures from 1 IPs (IP:count): 192.0.2.5:3', 417 "Total # of detected failures: 3.", all=True, wait=True) 418 self.assertNotLogged('Ignore line') 419 # 420 self.pruneLog('[phase 2] UTC time jump (NTP correction)') 421 # check time drifting backwards (NTP correction): 422 MyTime.setTime(1572210000) 423 self.filter.processLineAndAdd('2019-10-27 22:00:00 CET 192.0.2.6'); # +1 = 1 424 MyTime.setTime(1572200000) 425 self.filter.processLineAndAdd('2019-10-27 22:00:01 CET 192.0.2.6'); # +1 = 2 (logged before correction) 426 self.filter.processLineAndAdd('2019-10-27 19:13:20 CET 192.0.2.6'); # +1 = 3 (logged after correction) 427 self.filter.processLineAndAdd('2019-10-27 19:13:21 CET 192.0.2.6'); # +1 = 4 428 self.assertLogged( 429 '192.0.2.6:1', '192.0.2.6:2', '192.0.2.6:3', '192.0.2.6:4', 430 "Total # of detected failures: 7.", all=True, wait=True) 431 self.assertNotLogged('Ignore line') 432 finally: 433 tearDownMyTime() 434 def testTimeJump(self): 435 self._testTimeJump(inOperation=False) 436 def testTimeJump_InOperation(self): 437 self._testTimeJump(inOperation=True) 438 439 def testWrongTimeZone(self): 440 try: 441 self.filter.addFailRegex('fail from <ADDR>$') 442 
self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s') 443 self.filter.setMaxRetry(5); # don't ban here 444 self.filter.inOperation = True; # real processing (all messages are new) 445 # current time is 1h later than log-entries: 446 MyTime.setTime(1572138000+3600) 447 # 448 self.pruneLog("[phase 1] simulate wrong TZ") 449 for i in (1,2,3): 450 self.filter.processLineAndAdd('2019-10-27 02:00:00 fail from 192.0.2.15'); # +3 = 3 451 self.assertLogged( 452 "Simulate NOW in operation since found time has too large deviation", 453 "Please check jail has possibly a timezone issue.", 454 "192.0.2.15:1", "192.0.2.15:2", "192.0.2.15:3", 455 "Total # of detected failures: 3.", wait=True) 456 # 457 self.pruneLog("[phase 2] wrong TZ given in log") 458 for i in (1,2,3): 459 self.filter.processLineAndAdd('2019-10-27 04:00:00 GMT fail from 192.0.2.16'); # +3 = 6 460 self.assertLogged( 461 "192.0.2.16:1", "192.0.2.16:2", "192.0.2.16:3", 462 "Total # of detected failures: 6.", all=True, wait=True) 463 self.assertNotLogged("Found a match but no valid date/time found") 464 # 465 self.pruneLog("[phase 3] other timestamp (don't match datepattern), regex matches") 466 for i in range(3): 467 self.filter.processLineAndAdd('27.10.2019 04:00:00 fail from 192.0.2.17'); # +3 = 9 468 self.assertLogged( 469 "Found a match but no valid date/time found", 470 "Match without a timestamp:", 471 "192.0.2.17:1", "192.0.2.17:2", "192.0.2.17:3", 472 "Total # of detected failures: 9.", all=True, wait=True) 473 finally: 474 tearDownMyTime() 475 476 def testAddAttempt(self): 477 self.filter.setMaxRetry(3) 478 for i in range(1, 1+3): 479 self.filter.addAttempt('192.0.2.1') 480 self.assertLogged('Attempt 192.0.2.1', '192.0.2.1:%d' % i, all=True, wait=True) 481 self.jail.actions._Actions__checkBan() 482 self.assertLogged('Ban 192.0.2.1', wait=True) 483 484 def testIgnoreCommand(self): 485 self.filter.ignoreCommand = sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>") 
486 self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1")) 487 self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0")) 488 self.assertLogged("returned successfully 0", "returned successfully 1", all=True) 489 self.pruneLog() 490 self.assertFalse(self.filter.inIgnoreIPList("")) 491 self.assertLogged("usage: ignorecommand IP", "returned 10", all=True) 492 493 def testIgnoreCommandForTicket(self): 494 # by host of IP (2001:db8::1 and 2001:db8::ffff map to "test-host" and "test-other" in the test-suite): 495 self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1' 496 self.pruneLog() 497 self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1"))) 498 self.assertLogged("returned successfully 0") 499 self.pruneLog() 500 self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff"))) 501 self.assertLogged("returned successfully 1") 502 # by user-name (ignore tester): 503 self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1' 504 self.pruneLog() 505 self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'}))) 506 self.assertLogged("returned successfully 0") 507 self.pruneLog() 508 self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'}))) 509 self.assertLogged("returned successfully 1", all=True) 510 511 def testIgnoreCache(self): 512 # like both test-cases above, just cached (so once per key)... 
513 self.filter.ignoreCache = {"key":"<ip>"} 514 self.filter.ignoreCommand = 'if [ "<ip>" = "10.0.0.1" ]; then exit 0; fi; exit 1' 515 for i in range(5): 516 self.pruneLog() 517 self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1")) 518 self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0")) 519 if not i: 520 self.assertLogged("returned successfully 0", "returned successfully 1", all=True) 521 else: 522 self.assertNotLogged("returned successfully 0", "returned successfully 1", all=True) 523 # by host of IP: 524 self.filter.ignoreCache = {"key":"<ip-host>"} 525 self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1' 526 for i in range(5): 527 self.pruneLog() 528 self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1"))) 529 self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff"))) 530 if not i: 531 self.assertLogged("returned successfully") 532 else: 533 self.assertNotLogged("returned successfully") 534 # by user-name: 535 self.filter.ignoreCache = {"key":"<F-USER>", "max-count":"10", "max-time":"1h"} 536 self.assertEqual(self.filter.ignoreCache, ["<F-USER>", 10, 60*60]) 537 self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1' 538 for i in range(5): 539 self.pruneLog() 540 self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'}))) 541 self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'}))) 542 if not i: 543 self.assertLogged("returned successfully") 544 else: 545 self.assertNotLogged("returned successfully") 546 547 def testIgnoreCauseOK(self): 548 ip = "93.184.216.34" 549 for ignore_source in ["dns", "ip", "command"]: 550 self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source) 551 self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source)) 552 553 def testIgnoreCauseNOK(self): 554 self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED") 555 self.assertNotLogged("[%s] 
Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")) 556 557 558class IgnoreIPDNS(LogCaptureTestCase): 559 560 def setUp(self): 561 """Call before every test case.""" 562 unittest.F2B.SkipIfNoNetwork() 563 LogCaptureTestCase.setUp(self) 564 self.jail = DummyJail() 565 self.filter = FileFilter(self.jail) 566 567 def testIgnoreIPDNS(self): 568 for dns in ("www.epfl.ch", "example.com"): 569 self.filter.addIgnoreIP(dns) 570 ips = DNSUtils.dnsToIp(dns) 571 self.assertTrue(len(ips) > 0) 572 # for each ip from dns check ip ignored: 573 for ip in ips: 574 ip = str(ip) 575 DefLogSys.debug(' ++ positive case for %s', ip) 576 self.assertTrue(self.filter.inIgnoreIPList(ip)) 577 # check another ips (with increment/decrement of first/last part) not ignored: 578 iparr = [] 579 ip2 = re.search(r'^([^.:]+)([.:])(.*?)([.:])([^.:]+)$', ip) 580 if ip2: 581 ip2 = ip2.groups() 582 for o in (0, 4): 583 for i in (1, -1): 584 ipo = list(ip2) 585 if ipo[1] == '.': 586 ipo[o] = str(int(ipo[o])+i) 587 else: 588 ipo[o] = '%x' % (int(ipo[o], 16)+i) 589 ipo = ''.join(ipo) 590 if ipo not in ips: 591 iparr.append(ipo) 592 self.assertTrue(len(iparr) > 0) 593 for ip in iparr: 594 DefLogSys.debug(' -- negative case for %s', ip) 595 self.assertFalse(self.filter.inIgnoreIPList(str(ip))) 596 597 def testIgnoreCmdApacheFakegooglebot(self): 598 unittest.F2B.SkipIfCfgMissing(stock=True) 599 cmd = os.path.join(STOCK_CONF_DIR, "filter.d/ignorecommands/apache-fakegooglebot") 600 ## below test direct as python module: 601 mod = Utils.load_python_module(cmd) 602 self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "128.178.222.69"]))) 603 self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "192.0.2.1"]))) 604 bot_ips = ['66.249.66.1'] 605 for ip in bot_ips: 606 self.assertTrue(mod.is_googlebot(mod.process_args([cmd, str(ip)])), "test of googlebot ip %s failed" % ip) 607 self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd]))) 608 self.assertRaises(ValueError, 
lambda: mod.is_googlebot(mod.process_args([cmd, "192.0"]))) 609 ## via command: 610 self.filter.ignoreCommand = cmd + " <ip>" 611 for ip in bot_ips: 612 self.assertTrue(self.filter.inIgnoreIPList(str(ip)), "test of googlebot ip %s failed" % ip) 613 self.assertLogged('-- returned successfully') 614 self.pruneLog() 615 self.assertFalse(self.filter.inIgnoreIPList("192.0")) 616 self.assertLogged('Argument must be a single valid IP.') 617 self.pruneLog() 618 self.filter.ignoreCommand = cmd + " bad arguments <ip>" 619 self.assertFalse(self.filter.inIgnoreIPList("192.0")) 620 self.assertLogged('Please provide a single IP as an argument.') 621 622 623 624class LogFile(LogCaptureTestCase): 625 626 MISSING = 'testcases/missingLogFile' 627 628 def setUp(self): 629 LogCaptureTestCase.setUp(self) 630 631 def tearDown(self): 632 LogCaptureTestCase.tearDown(self) 633 634 def testMissingLogFiles(self): 635 self.filter = FilterPoll(None) 636 self.assertRaises(IOError, self.filter.addLogPath, LogFile.MISSING) 637 638 639class LogFileFilterPoll(unittest.TestCase): 640 641 FILENAME = os.path.join(TEST_FILES_DIR, "testcase01.log") 642 643 def setUp(self): 644 """Call before every test case.""" 645 super(LogFileFilterPoll, self).setUp() 646 self.filter = FilterPoll(DummyJail()) 647 self.filter.addLogPath(LogFileFilterPoll.FILENAME) 648 649 def tearDown(self): 650 """Call after every test case.""" 651 super(LogFileFilterPoll, self).tearDown() 652 653 #def testOpen(self): 654 # self.filter.openLogFile(LogFile.FILENAME) 655 656 def testIsModified(self): 657 self.assertTrue(self.filter.isModified(LogFileFilterPoll.FILENAME)) 658 self.assertFalse(self.filter.isModified(LogFileFilterPoll.FILENAME)) 659 660 def testSeekToTimeSmallFile(self): 661 # speedup search using exact date pattern: 662 self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') 663 fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log') 664 time = 1417512352 665 f = open(fname, 'w') 666 fc = None 667 try: 668 fc 
= FileContainer(fname, self.filter.getLogEncoding()) 669 fc.open() 670 fc.setPos(0); self.filter.seekToTime(fc, time) 671 f.flush() 672 # empty : 673 fc.setPos(0); self.filter.seekToTime(fc, time) 674 self.assertEqual(fc.getPos(), 0) 675 # one entry with exact time: 676 f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time)) 677 f.flush() 678 fc.setPos(0); self.filter.seekToTime(fc, time) 679 680 # rewrite : 681 f.seek(0) 682 f.truncate() 683 fc.close() 684 fc = FileContainer(fname, self.filter.getLogEncoding()) 685 fc.open() 686 # no time - nothing should be found : 687 for i in range(10): 688 f.write("[sshd] error: PAM: failure len 1\n") 689 f.flush() 690 fc.setPos(0); self.filter.seekToTime(fc, time) 691 692 # rewrite 693 f.seek(0) 694 f.truncate() 695 fc.close() 696 fc = FileContainer(fname, self.filter.getLogEncoding()) 697 fc.open() 698 # one entry with smaller time: 699 f.write("%s [sshd] error: PAM: failure len 2\n" % _tm(time - 10)) 700 f.flush() 701 fc.setPos(0); self.filter.seekToTime(fc, time) 702 self.assertEqual(fc.getPos(), 53) 703 # two entries with smaller time: 704 f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time - 9)) 705 f.flush() 706 fc.setPos(0); self.filter.seekToTime(fc, time) 707 self.assertEqual(fc.getPos(), 110) 708 # check move after end (all of time smaller): 709 f.write("%s [sshd] error: PAM: failure\n" % _tm(time - 1)) 710 f.flush() 711 self.assertEqual(fc.getFileSize(), 157) 712 fc.setPos(0); self.filter.seekToTime(fc, time) 713 self.assertEqual(fc.getPos(), 157) 714 715 # stil one exact line: 716 f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time)) 717 f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time)) 718 f.flush() 719 fc.setPos(0); self.filter.seekToTime(fc, time) 720 self.assertEqual(fc.getPos(), 157) 721 722 # add something hereafter: 723 f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time + 2)) 724 f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time + 3)) 
725 f.flush() 726 fc.setPos(0); self.filter.seekToTime(fc, time) 727 self.assertEqual(fc.getPos(), 157) 728 # add something hereafter: 729 f.write("%s [sshd] error: PAM: failure\n" % _tm(time + 9)) 730 f.write("%s [sshd] error: PAM: failure len 4 3 2\n" % _tm(time + 9)) 731 f.flush() 732 fc.setPos(0); self.filter.seekToTime(fc, time) 733 self.assertEqual(fc.getPos(), 157) 734 # start search from current pos : 735 fc.setPos(157); self.filter.seekToTime(fc, time) 736 self.assertEqual(fc.getPos(), 157) 737 # start search from current pos : 738 fc.setPos(110); self.filter.seekToTime(fc, time) 739 self.assertEqual(fc.getPos(), 157) 740 741 finally: 742 if fc: 743 fc.close() 744 _killfile(f, fname) 745 746 def testSeekToTimeLargeFile(self): 747 # speedup search using exact date pattern: 748 self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') 749 fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log') 750 time = 1417512352 751 f = open(fname, 'w') 752 fc = None 753 count = 1000 if unittest.F2B.fast else 10000 754 try: 755 fc = FileContainer(fname, self.filter.getLogEncoding()) 756 fc.open() 757 f.seek(0) 758 # variable length of file (ca 45K or 450K before and hereafter): 759 # write lines with smaller as search time: 760 t = time - count - 1 761 for i in range(count): 762 f.write("%s [sshd] error: PAM: failure\n" % _tm(t)) 763 t += 1 764 f.flush() 765 fc.setPos(0); self.filter.seekToTime(fc, time) 766 self.assertEqual(fc.getPos(), 47*count) 767 # write lines with exact search time: 768 for i in range(10): 769 f.write("%s [sshd] error: PAM: failure\n" % _tm(time)) 770 f.flush() 771 fc.setPos(0); self.filter.seekToTime(fc, time) 772 self.assertEqual(fc.getPos(), 47*count) 773 fc.setPos(4*count); self.filter.seekToTime(fc, time) 774 self.assertEqual(fc.getPos(), 47*count) 775 # write lines with greater as search time: 776 t = time+1 777 for i in range(count//500): 778 for j in range(500): 779 f.write("%s [sshd] error: PAM: failure\n" % _tm(t)) 780 t += 1 781 
f.flush() 782 fc.setPos(0); self.filter.seekToTime(fc, time) 783 self.assertEqual(fc.getPos(), 47*count) 784 fc.setPos(53); self.filter.seekToTime(fc, time) 785 self.assertEqual(fc.getPos(), 47*count) 786 787 finally: 788 if fc: 789 fc.close() 790 _killfile(f, fname) 791 792class LogFileMonitor(LogCaptureTestCase): 793 """Few more tests for FilterPoll API 794 """ 795 def setUp(self): 796 """Call before every test case.""" 797 setUpMyTime() 798 LogCaptureTestCase.setUp(self) 799 self.filter = self.name = 'NA' 800 _, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures') 801 self.file = open(self.name, 'a') 802 self.filter = FilterPoll(DummyJail()) 803 self.filter.banASAP = False # avoid immediate ban in this tests 804 self.filter.addLogPath(self.name, autoSeek=False) 805 self.filter.active = True 806 self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>") 807 808 def tearDown(self): 809 tearDownMyTime() 810 LogCaptureTestCase.tearDown(self) 811 _killfile(self.file, self.name) 812 pass 813 814 def isModified(self, delay=2): 815 """Wait up to `delay` sec to assure that it was modified or not 816 """ 817 return Utils.wait_for(lambda: self.filter.isModified(self.name), _maxWaitTime(delay)) 818 819 def notModified(self, delay=2): 820 """Wait up to `delay` sec as long as it was not modified 821 """ 822 return Utils.wait_for(lambda: not self.filter.isModified(self.name), _maxWaitTime(delay)) 823 824 def testUnaccessibleLogFile(self): 825 os.chmod(self.name, 0) 826 self.filter.getFailures(self.name) 827 failure_was_logged = self._is_logged('Unable to open %s' % self.name) 828 # verify that we cannot access the file. 
Checking by name of user is not 829 # sufficient since could be a fakeroot or some other super-user 830 is_root = True 831 try: 832 with open(self.name) as f: # pragma: no cover - normally no root 833 f.read() 834 except IOError: 835 is_root = False 836 837 # If ran as root, those restrictive permissions would not 838 # forbid log to be read. 839 self.assertTrue(failure_was_logged != is_root) 840 841 def testNoLogFile(self): 842 _killfile(self.file, self.name) 843 self.filter.getFailures(self.name) 844 self.assertLogged('Unable to open %s' % self.name) 845 846 def testErrorProcessLine(self): 847 # speedup search using exact date pattern: 848 self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') 849 self.filter.sleeptime /= 1000.0 850 ## produce error with not callable processLine: 851 _org_processLine = self.filter.processLine 852 self.filter.processLine = None 853 for i in range(100): 854 self.file.write("line%d\n" % 1) 855 self.file.flush() 856 for i in range(100): 857 self.filter.getFailures(self.name) 858 self.assertLogged('Failed to process line:') 859 self.assertLogged('Too many errors at once') 860 self.pruneLog() 861 self.assertTrue(self.filter.idle) 862 self.filter.idle = False 863 self.filter.getFailures(self.name) 864 self.filter.processLine = _org_processLine 865 self.file.write("line%d\n" % 1) 866 self.file.flush() 867 self.filter.getFailures(self.name) 868 self.assertNotLogged('Failed to process line:') 869 870 def testRemovingFailRegex(self): 871 self.filter.delFailRegex(0) 872 self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid') 873 self.filter.delFailRegex(0) 874 self.assertLogged('Cannot remove regular expression. Index 0 is not valid') 875 876 def testRemovingIgnoreRegex(self): 877 self.filter.delIgnoreRegex(0) 878 self.assertLogged('Cannot remove regular expression. 
def testNewChangeViaIsModified(self):
	"""Check `isModified` of the filter across writes and a rename of the file.

	The file must be reported as modified once after each change (and not
	any longer afterwards), must not be reported when it is moved away and
	must be reported again when a new file appears under the same name.
	"""
	# it is a brand new one -- so first we think it is modified
	self.assertTrue(self.isModified())
	# but not any longer
	self.assertTrue(self.notModified())
	self.assertTrue(self.notModified())
	mtimesleep()				# to guarantee fresher mtime
	for i in range(4):			  # few changes
		# unless we write into it
		self.file.write("line%d\n" % i)
		self.file.flush()
		self.assertTrue(self.isModified())
		self.assertTrue(self.notModified())
		mtimesleep()				# to guarantee fresher mtime
	os.rename(self.name, self.name + '.old')
	f = None
	try:
		# we are not signaling as modified whenever
		# it gets away
		self.assertTrue(self.notModified(1))
		f = open(self.name, 'a')
		self.assertTrue(self.isModified())
		self.assertTrue(self.notModified())
		mtimesleep()
		f.write("line%d\n" % i)
		f.flush()
		self.assertTrue(self.isModified())
		self.assertTrue(self.notModified())
	finally:
		# clean up also if one of the assertions above failed, so neither
		# the new file nor the renamed one is left behind
		_killfile(f, self.name)
		# self.file is the handle of the renamed file, so pass it (and not
		# the path string) as the handle to close
		_killfile(self.file, self.name + '.old')
self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') 930 # 931 # if we rewrite the file at once 932 self.file.close() 933 _copy_lines_between_files(GetFailures.FILENAME_01, self.name).close() 934 self.filter.getFailures(self.name) 935 _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) 936 937 # What if file gets overridden 938 # yoh: skip so we skip those 2 identical lines which our 939 # filter "marked" as the known beginning, otherwise it 940 # would not detect "rotation" 941 self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, 942 skip=12, n=3, mode='w') 943 self.filter.getFailures(self.name) 944 #self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) 945 _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01) 946 947 def testNewChangeViaGetFailures_move(self): 948 # speedup search using exact date pattern: 949 self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?') 950 # 951 # if we move file into a new location while it has been open already 952 self.file.close() 953 self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, 954 n=14, mode='w') 955 self.filter.getFailures(self.name) 956 self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) 957 self.assertEqual(self.filter.failManager.getFailTotal(), 2) 958 959 # move aside, but leaving the handle still open... 
class CommonMonitorTestCase(unittest.TestCase):
	"""Base of the monitor test cases, provides helpers waiting on filter and jail.

	The helpers read `self.filter` and `self.jail`, which are not assigned
	in this class - presumably they are set by the derived test cases.
	"""

	def setUp(self):
		"""Call before every test case."""
		super(CommonMonitorTestCase, self).setUp()
		# sum of the failure counts awaited so far via waitFailTotal
		self._failTotal = 0

	def waitFailTotal(self, count, delay=1):
		"""Wait up to `delay` sec until `count` further failures are reached and jail is filled

		The awaited total is increased by `count` independently of the outcome.
		"""
		def _reached():
			found = self.filter.failManager.getFailTotal()
			return found >= (self._failTotal + count) and self.jail.isFilled()

		result = Utils.wait_for(_reached, _maxWaitTime(delay))
		self._failTotal += count
		return result

	def isFilled(self, delay=1):
		"""Wait up to `delay` sec to assure that the jail is filled
		"""
		check = self.jail.isFilled
		return Utils.wait_for(check, _maxWaitTime(delay))

	def isEmpty(self, delay=5):
		"""Wait up to `delay` sec to assure that the jail is empty again
		"""
		check = self.jail.isEmpty
		return Utils.wait_for(check, _maxWaitTime(delay))

	def waitForTicks(self, ticks, delay=2):
		"""Wait up to `delay` sec until the tick counter of the filter grew by `ticks`
		"""
		target = self.filter.ticks + ticks

		def _ticked():
			return self.filter.ticks >= target

		return Utils.wait_for(_ticked, _maxWaitTime(delay))
def assert_correct_last_attempt(self, failures, count=None):
	"""Wait for the expected failure count, then verify the last ticket of the jail.

	Waits (up to 10 sec) for `count` failures, resp. for `failures[1]` if
	no `count` is given, and compares the ticket against `failures`.
	"""
	expected = count or failures[1]
	reached = self.waitFailTotal(expected, 10)
	self.assertTrue(reached)
	_assert_correct_last_attempt(self, self.jail, failures, count=count)
def _test_grow_file(self, idle=False):
	"""Check that failures appended to the monitored file are found.

	With `idle` set the filter is switched to idle first; in this case the
	jail is expected to remain empty although lines were written.
	"""
	if idle:
		# shorten the sleep time and wait one tick after switching to idle
		self.filter.sleeptime /= 100.0
		self.filter.idle = True
		self.waitForTicks(1)
	# nothing was written yet, so there is nothing to ban
	self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

	# Now let's feed it with entries from the file
	_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=12)
	self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
	# and our dummy jail is empty as well
	self.assertFalse(len(self.jail))
	# since it should have not been enough

	_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
	if idle:
		# idle filter: jail has to stay empty, nothing more to check
		self.waitForTicks(1)
		self.assertTrue(self.isEmpty(1))
		return
	self.assertTrue(self.isFilled(10))
	# so we sleep a bit for it not to become empty,
	# and meanwhile pass to other thread(s) and filter should
	# have gathered new failures and passed them into the
	# DummyJail
	self.assertEqual(len(self.jail), 1)
	# and there should be no "stuck" ticket in failManager
	self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
	self.assert_correct_last_attempt(GetFailures.FAILURES_01)
	self.assertEqual(len(self.jail), 0)

	# just for fun let's copy all of them again and see if that results
	# in a new ban
	_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
	self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def _wait4failures(self, count=2):
	"""Wait until the jail queue is empty and exactly `count` failures were found.

	Asserts that the queue is empty, that there is no ticket to ban and
	that the total failure count of the filter equals `count`.
	"""
	# Poll might need more time
	# (isEmpty applies _maxWaitTime to its delay itself, so plain seconds
	# are passed here - like at all other call sites)
	self.assertTrue(self.isEmpty(5),
		"Queue must be empty but it is not: %s."
		% (', '.join(str(x) for x in self.jail.queue)))
	self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
	# give the filter some time to reach the expected count ...
	Utils.wait_for(lambda: self.filter.failManager.getFailTotal() >= count, _maxWaitTime(10))
	# ... which must be met exactly (not exceeded)
	self.assertEqual(self.filter.failManager.getFailTotal(), count)
def test_del_file(self):
	"""Check the reaction of the filter if the watched file gets deleted."""
	self.file.close()
	self.waitForTicks(1)
	# remove file (cause detection of log-rotation)...
	os.remove(self.name)
	self.waitForTicks(2)
	# the rest can be checked for filters with pending paths only:
	if not hasattr(self.filter, "getPendingPaths"):
		return

	def _is_pending():
		return self.name in self.filter.getPendingPaths()

	# check it was detected (in pending files):
	became_pending = Utils.wait_for(_is_pending, _maxWaitTime(10))
	self.assertTrue(became_pending)
	self.assertEqual(len(self.filter.getPendingPaths()), 1)
def _test_move_into_file(self, interim_kill=False):
	"""Check monitoring if a new file is moved over the monitored one.

	With `interim_kill` set the monitored file gets removed explicitly
	before the new file is moved into its place.
	"""
	sample = GetFailures.FILENAME_01
	expected = GetFailures.FAILURES_01

	def _expect(total):
		# last ticket has to match and the total count of failures as well
		self.assert_correct_last_attempt(expected)
		self.assertEqual(self.filter.failManager.getFailTotal(), total)

	# make sure that the file is monitored first
	written = _copy_lines_between_files(sample, self.name)
	written.close()
	_expect(3)

	if interim_kill:
		_killfile(None, self.name)
		time.sleep(Utils.DEFAULT_SHORT_INTERVAL) # let them know

	# now create a new one to override old one
	replacement = self.name + '.new'
	_copy_lines_between_files(sample, replacement, skip=12, n=3).close()
	os.rename(replacement, self.name)
	_expect(6)

	# and to make sure that it is now monitored for changes
	_copy_lines_between_files(sample, self.name, skip=12, n=3).close()
	_expect(9)
same directory and see if that doesn't affect 1250 open(self.name + '.bak2', 'w').close() 1251 _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3).close() 1252 self.assert_correct_last_attempt(GetFailures.FAILURES_01) 1253 self.assertEqual(self.filter.failManager.getFailTotal(), 6) 1254 _killfile(None, self.name + '.bak2') 1255 1256 def test_delLogPath(self): 1257 # Smoke test for removing of the path from being watched 1258 1259 # basic full test 1260 _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100) 1261 self.assert_correct_last_attempt(GetFailures.FAILURES_01) 1262 1263 # and now remove the LogPath 1264 self.filter.delLogPath(self.name) 1265 # wait a bit for filter (backend-threads): 1266 self.waitForTicks(2) 1267 1268 _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100) 1269 # so we should get no more failures detected 1270 self.assertTrue(self.isEmpty(10)) 1271 1272 # but then if we add it back again (no seek to time in FileFilter's, because in file used the same time) 1273 self.filter.addLogPath(self.name, autoSeek=False) 1274 # wait a bit for filter (backend-threads): 1275 self.waitForTicks(2) 1276 # Tricky catch here is that it should get them from the 1277 # tail written before, so let's not copy anything yet 1278 #_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100) 1279 # we should detect the failures 1280 self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above 1281 1282 # now copy and get even more 1283 _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3) 1284 # check for 3 failures (not 9), because 6 already get above... 
def _initFilter(self, **kwargs):
	"""Create the filter (passing `kwargs` to it) with journal matches and fail-regex.

	Two journal matches are added, for TEST_FIELD 1 and 2, both restricted
	to the UUID of this test case.
	"""
	self._getRuntimeJournal() # check journal available
	flt = Filter_(self.jail, **kwargs)
	self.filter = flt
	flt.banASAP = False # avoid immediate ban in this tests
	for test_field in ("TEST_FIELD=1", "TEST_FIELD=2"):
		flt.addJournalMatch([
			"SYSLOG_IDENTIFIER=fail2ban-testcases",
			test_field,
			"TEST_UUID=%s" % self.test_uuid])
	flt.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
def _getRuntimeJournal(self):
	"""Retrieve the path of the current system journal

	The path is looked up once and remembered in `self._runtimeJournal`.
	If it cannot be found, SkipTest exception is raised.
	"""
	if self._runtimeJournal is None:
		# Depending on the system, the journal could be found under /run or
		# /var/log (e.g. Debian), which are pointed to by different
		# systemd-path variables - check one at a time until the first hit
		for systemd_var in ('system-runtime-logs', 'system-state-logs'):
			result = Utils.executeCmd(
				'find "$(systemd-path %s)" -name system.journal' % systemd_var,
				timeout=10, shell=True, output=True
			)
			self.assertTrue(result)
			# first line of the output only
			first = str(result[1].decode('utf-8')).split('\n')[0]
			if first:
				break
		self._runtimeJournal = first
	if not self._runtimeJournal:
		raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)')
	return self._runtimeJournal
def assert_correct_ban(self, test_ip, test_attempts):
	"""Wait for `test_attempts` failures and verify the ticket taken from the jail.

	The ticket has to be for `test_ip` with exactly `test_attempts` attempts.
	"""
	# give Filter a chance to react
	reacted = self.waitFailTotal(test_attempts, 10)
	self.assertTrue(reacted)
	ticket = self.jail.getFailTicket()
	self.assertTrue(ticket)

	attempts, ip = ticket.getAttempt(), ticket.getIP()
	# matches are retrieved only, the result is not verified
	ticket.getMatches()

	self.assertEqual(ip, test_ip)
	self.assertEqual(attempts, test_attempts)
@with_alt_time
def test_grow_file_with_db(self):
	"""Check that a restarted filter does not find the old entries again.

	Runs the grow-file scenario with an in-memory database assigned to the
	jail, then restarts the filter twice (with adjusted test time) and
	checks each time that a newly generated failure is found and no
	further ticket is available in the jail.
	"""

	def _gen_falure(ip):
		# insert a new failure and check it is monitored:
		fields = self.journal_fields
		fields.update(TEST_JOURNAL_FIELDS)
		journal.send(MESSAGE="error: PAM: Authentication failure for test from "+ip, **fields)
		self.waitForTicks(1)
		self.assert_correct_ban(ip, 1)

	# coverage for update log:
	self.jail.database = getFail2BanDb(':memory:')
	self.jail.database.addJail(self.jail)
	MyTime.setTime(time.time())
	self._test_grow_file()
	# stop:
	self.filter.stop()
	self.filter.join()
	MyTime.setTime(time.time() + 2)
	# update log manually (should cause a seek to end of log without wait for next second):
	self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time(), 'TEST')
	# check seek to last (simulated) position succeeds (without bans of previous copied tickets):
	self._failTotal = 0
	self._initFilter()
	self.filter.setMaxRetry(1)
	self.filter.start()
	self.waitForTicks(1)
	# check new IP but no old IPs found:
	_gen_falure("192.0.2.5")
	self.assertFalse(self.jail.getFailTicket())

	# now the same with increased time (check now - findtime case):
	self.filter.stop()
	self.filter.join()
	MyTime.setTime(time.time() + 10000)
	self._failTotal = 0
	self._initFilter()
	self.filter.setMaxRetry(1)
	self.filter.start()
	self.waitForTicks(1)
	MyTime.setTime(time.time() + 3)
	# check new IP but no old IPs found:
	_gen_falure("192.0.2.6")
	self.assertFalse(self.jail.getFailTicket())
def test_WrongChar(self):
	"""Check that journal messages with non-ASCII characters are processed.

	After a regular ban from the sample file, messages with several
	variants of non-ASCII user names are sent to the journal directly;
	both source addresses have to end up as tickets in the jail.
	"""
	self._initFilter()
	self.filter.start()
	# Now let's feed it with entries from the file
	_copy_lines_to_journal(
		self.test_file, self.journal_fields, skip=15, n=4)
	self.waitForTicks(1)
	self.assertTrue(self.isFilled(10))
	self.assert_correct_ban("87.142.124.10", 4)
	# Add direct utf, unicode, blob:
	messages = [
		"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
		"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
		b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'),
		"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
		"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
		b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace'),
	]
	for message in messages:
		self.journal_fields.update(TEST_JOURNAL_FIELDS)
		journal.send(MESSAGE=message, **self.journal_fields)
	self.waitForTicks(1)
	self.waitFailTotal(6, 10)

	def _two_tickets():
		return len(self.jail) == 2

	self.assertTrue(Utils.wait_for(_two_tickets, 10))
	# take both tickets out of the jail, their order does not matter:
	found_ips = [self.jail.getFailTicket().getIP() for _ in range(2)]
	self.assertSortedEqual(found_ips, ["192.0.2.1", "192.0.2.2"])
def testCRLFFailures01(self):
	"""Check that the same failures are found if the log lines end with CR+LF.

	A copy of the first sample log is written with CR+LF line endings into
	a temporary file, which is then checked via `testGetFailures01`.
	"""
	# mkstemp creates the file atomically (no race as with mktemp)
	fd, fname = tempfile.mkstemp(prefix='tmp_fail2ban', suffix='crlf')
	fout = os.fdopen(fd, 'wb')
	try:
		# poor man unix2dos:
		with open(GetFailures.FILENAME_01, 'rb') as fin:
			for l in fin.read().splitlines():
				fout.write(l + b'\r\n')
		fout.close()

		# now see if we should be getting the "same" failures
		self.testGetFailures01(filename=fname)
	finally:
		# remove the temporary file also if the check above failed
		_killfile(fout, fname)
1667 output = ('203.162.223.135', 3, 1124013600.0) 1668 1669 self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2] - 4*60) 1670 self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown") 1671 self.filter.getFailures(GetFailures.FILENAME_03) 1672 _assert_correct_last_attempt(self, self.filter, output) 1673 1674 def testGetFailures03_Seek2(self): 1675 # same test as above but with seek to 'Aug 14 11:59:04' - so other output ... 1676 output = ('203.162.223.135', 2, 1124013600.0) 1677 self.filter.setMaxRetry(1) 1678 1679 self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2]) 1680 self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown") 1681 self.filter.getFailures(GetFailures.FILENAME_03) 1682 _assert_correct_last_attempt(self, self.filter, output) 1683 1684 def testGetFailures04(self): 1685 # because of not exact time in testcase04.log (no year), we should always use our test time: 1686 self.assertEqual(MyTime.time(), 1124013600) 1687 # should find exact 4 failures for *.186 and 2 failures for *.185 1688 output = (('212.41.96.186', 4, 1124013600.0), 1689 ('212.41.96.185', 2, 1124013598.0)) 1690 1691 # speedup search using exact date pattern: 1692 self.filter.setDatePattern((r'^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?', 1693 r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?', 1694 r'^EPOCH' 1695 )) 1696 self.filter.setMaxRetry(2) 1697 self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0) 1698 self.filter.addFailRegex(r"Invalid user .* <HOST>") 1699 self.filter.getFailures(GetFailures.FILENAME_04) 1700 1701 _assert_correct_last_attempt(self, self.filter, output) 1702 1703 def testGetFailuresWrongChar(self): 1704 self.filter.checkFindTime = False 1705 # write wrong utf-8 char: 1706 fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf') 1707 fout = fopen(fname, 'wb') 1708 try: 1709 # write: 1710 for l in ( 1711 b'2015-01-14 20:00:58 user \"test\xf1ing\" from \"192.0.2.0\"\n', # wrong utf-8 char 
1712 b'2015-01-14 20:00:59 user \"\xd1\xe2\xe5\xf2\xe0\" from \"192.0.2.0\"\n', # wrong utf-8 chars 1713 b'2015-01-14 20:01:00 user \"testing\" from \"192.0.2.0\"\n' # correct utf-8 chars 1714 ): 1715 fout.write(l) 1716 fout.close() 1717 # 1718 output = ('192.0.2.0', 3, 1421262060.0) 1719 failregex = r"^\s*user \"[^\"]*\" from \"<HOST>\"\s*$" 1720 1721 # test encoding auto or direct set of encoding: 1722 for enc in (None, 'utf-8', 'ascii'): 1723 if enc is not None: 1724 self.tearDown();self.setUp(); 1725 if DefLogSys.getEffectiveLevel() > 7: DefLogSys.setLevel(7); # ensure decode_line logs always 1726 self.filter.checkFindTime = False; 1727 self.filter.setLogEncoding(enc); 1728 # speedup search using exact date pattern: 1729 self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS') 1730 self.assertNotLogged('Error decoding line'); 1731 self.filter.addLogPath(fname) 1732 self.filter.addFailRegex(failregex) 1733 self.filter.getFailures(fname) 1734 _assert_correct_last_attempt(self, self.filter, output) 1735 1736 self.assertLogged('Error decoding line'); 1737 self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user '); 1738 self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user '); 1739 1740 finally: 1741 _killfile(fout, fname) 1742 1743 def testGetFailuresUseDNS(self): 1744 unittest.F2B.SkipIfNoNetwork() 1745 # We should still catch failures with usedns = no ;-) 1746 output_yes = ( 1747 ('93.184.216.34', 2, 1124013539.0, 1748 ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2', 1749 'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2'] 1750 ), 1751 ('2606:2800:220:1:248:1893:25c8:1946', 1, 1124013299.0, 1752 ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2'] 1753 ), 1754 ) 1755 1756 output_no = ( 1757 ('93.184.216.34', 
1, 1124013539.0, 1758 ['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2'] 1759 ) 1760 ) 1761 1762 # Actually no exception would be raised -- it will be just set to 'no' 1763 #self.assertRaises(ValueError, 1764 # FileFilter, None, useDns='wrong_value_for_useDns') 1765 1766 for useDns, output in ( 1767 ('yes', output_yes), 1768 ('no', output_no), 1769 ('warn', output_yes) 1770 ): 1771 self.pruneLog("[test-phase useDns=%s]" % useDns) 1772 jail = DummyJail() 1773 filter_ = FileFilter(jail, useDns=useDns) 1774 filter_.banASAP = False # avoid immediate ban in this tests 1775 filter_.active = True 1776 filter_.failManager.setMaxRetry(1) # we might have just few failures 1777 1778 filter_.addLogPath(GetFailures.FILENAME_USEDNS, autoSeek=False) 1779 filter_.addFailRegex(r"Failed .* from <HOST>") 1780 filter_.getFailures(GetFailures.FILENAME_USEDNS) 1781 _assert_correct_last_attempt(self, filter_, output) 1782 1783 def testGetFailuresMultiRegex(self): 1784 output = ('141.3.81.106', 8, 1124013541.0) 1785 1786 self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False) 1787 self.filter.addFailRegex(r"Failed .* from <HOST>") 1788 self.filter.addFailRegex(r"Accepted .* from <HOST>") 1789 self.filter.getFailures(GetFailures.FILENAME_02) 1790 _assert_correct_last_attempt(self, self.filter, output) 1791 1792 def testGetFailuresIgnoreRegex(self): 1793 self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False) 1794 self.filter.addFailRegex(r"Failed .* from <HOST>") 1795 self.filter.addFailRegex(r"Accepted .* from <HOST>") 1796 self.filter.addIgnoreRegex("for roehl") 1797 1798 self.filter.getFailures(GetFailures.FILENAME_02) 1799 1800 self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) 1801 1802 def testGetFailuresMultiLine(self): 1803 output = [("192.0.43.10", 2, 1124013599.0), 1804 ("192.0.43.11", 1, 1124013598.0)] 1805 self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) 1806 
self.filter.setMaxLines(100) 1807 self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") 1808 self.filter.setMaxRetry(1) 1809 1810 self.filter.getFailures(GetFailures.FILENAME_MULTILINE) 1811 1812 foundList = [] 1813 while True: 1814 try: 1815 foundList.append( 1816 _ticket_tuple(self.filter.failManager.toBan())[0:3]) 1817 except FailManagerEmpty: 1818 break 1819 self.assertSortedEqual(foundList, output) 1820 1821 def testGetFailuresMultiLineIgnoreRegex(self): 1822 output = [("192.0.43.10", 2, 1124013599.0)] 1823 self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) 1824 self.filter.setMaxLines(100) 1825 self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") 1826 self.filter.addIgnoreRegex("rsync error: Received SIGINT") 1827 self.filter.setMaxRetry(1) 1828 1829 self.filter.getFailures(GetFailures.FILENAME_MULTILINE) 1830 1831 _assert_correct_last_attempt(self, self.filter, output.pop()) 1832 1833 self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) 1834 1835 def testGetFailuresMultiLineMultiRegex(self): 1836 output = [("192.0.43.10", 2, 1124013599.0), 1837 ("192.0.43.11", 1, 1124013598.0), 1838 ("192.0.43.15", 1, 1124013598.0)] 1839 self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False) 1840 self.filter.setMaxLines(100) 1841 self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$") 1842 self.filter.addFailRegex(r"^.* sendmail\[.*, msgid=<(?P<msgid>[^>]+).*relay=\[<HOST>\].*$<SKIPLINES>^.+ spamd: result: Y \d+ .*,mid=<(?P=msgid)>(,bayes=[.\d]+)?(,autolearn=\S+)?\s*$") 1843 self.filter.setMaxRetry(1) 1844 1845 self.filter.getFailures(GetFailures.FILENAME_MULTILINE) 1846 1847 foundList = [] 1848 while True: 1849 try: 1850 foundList.append( 1851 
_ticket_tuple(self.filter.failManager.toBan())[0:3]) 1852 except FailManagerEmpty: 1853 break 1854 self.assertSortedEqual(foundList, output) 1855 1856 1857class DNSUtilsTests(unittest.TestCase): 1858 1859 def testCache(self): 1860 c = Utils.Cache(maxCount=5, maxTime=60) 1861 # not available : 1862 self.assertTrue(c.get('a') is None) 1863 self.assertEqual(c.get('a', 'test'), 'test') 1864 # exact 5 elements : 1865 for i in range(5): 1866 c.set(i, i) 1867 for i in range(5): 1868 self.assertEqual(c.get(i), i) 1869 # remove unavailable key: 1870 c.unset('a'); c.unset('a') 1871 1872 def testCacheMaxSize(self): 1873 c = Utils.Cache(maxCount=5, maxTime=60) 1874 # exact 5 elements : 1875 for i in range(5): 1876 c.set(i, i) 1877 self.assertEqual([c.get(i) for i in range(5)], [i for i in range(5)]) 1878 self.assertNotIn(-1, (c.get(i, -1) for i in range(5))) 1879 # add one - too many: 1880 c.set(10, i) 1881 # one element should be removed : 1882 self.assertIn(-1, (c.get(i, -1) for i in range(5))) 1883 # test max size (not expired): 1884 for i in range(10): 1885 c.set(i, 1) 1886 self.assertEqual(len(c), 5) 1887 1888 def testCacheMaxTime(self): 1889 # test max time (expired, timeout reached) : 1890 c = Utils.Cache(maxCount=5, maxTime=0.0005) 1891 for i in range(10): 1892 c.set(i, 1) 1893 st = time.time() 1894 self.assertTrue(Utils.wait_for(lambda: time.time() >= st + 0.0005, 1)) 1895 # we have still 5 elements (or fewer if too slow test mashine): 1896 self.assertTrue(len(c) <= 5) 1897 # but all that are expiered also: 1898 for i in range(10): 1899 self.assertTrue(c.get(i) is None) 1900 # here the whole cache should be empty: 1901 self.assertEqual(len(c), 0) 1902 1903 def testOverflowedIPCache(self): 1904 # test overflow of IP-cache multi-threaded (2 "parasite" threads flooding cache): 1905 from threading import Thread 1906 from random import shuffle 1907 # save original cache and use smaller cache during the test here: 1908 _org_cache = IPAddr.CACHE_OBJ 1909 cache = 
IPAddr.CACHE_OBJ = Utils.Cache(maxCount=5, maxTime=60) 1910 result = list() 1911 count = 1 if unittest.F2B.fast else 50 1912 try: 1913 # tester procedure of worker: 1914 def _TestCacheStr2IP(forw=True, result=[], random=False): 1915 try: 1916 c = count 1917 while c: 1918 c -= 1 1919 s = range(0, 256, 1) if forw else range(255, -1, -1) 1920 if random: shuffle([i for i in s]) 1921 for i in s: 1922 IPAddr('192.0.2.'+str(i), IPAddr.FAM_IPv4) 1923 IPAddr('2001:db8::'+str(i), IPAddr.FAM_IPv6) 1924 result.append(None) 1925 except Exception as e: 1926 DefLogSys.debug(e, exc_info=True) 1927 result.append(e) 1928 1929 # 2 workers flooding it forwards and backwards: 1930 th1 = Thread(target=_TestCacheStr2IP, args=(True, result)); th1.start() 1931 th2 = Thread(target=_TestCacheStr2IP, args=(False, result)); th2.start() 1932 # and here we flooding it with random IPs too: 1933 _TestCacheStr2IP(True, result, True) 1934 finally: 1935 # wait for end of threads and restore cache: 1936 th1.join() 1937 th2.join() 1938 IPAddr.CACHE_OBJ = _org_cache 1939 self.assertEqual(result, [None]*3) # no errors 1940 self.assertTrue(len(cache) <= cache.maxCount) 1941 1942 1943class DNSUtilsNetworkTests(unittest.TestCase): 1944 1945 def setUp(self): 1946 """Call before every test case.""" 1947 super(DNSUtilsNetworkTests, self).setUp() 1948 #unittest.F2B.SkipIfNoNetwork() 1949 1950 def test_IPAddr(self): 1951 ip4 = IPAddr('192.0.2.1') 1952 ip6 = IPAddr('2001:DB8::') 1953 self.assertTrue(ip4.isIPv4) 1954 self.assertTrue(ip4.isSingle) 1955 self.assertTrue(ip6.isIPv6) 1956 self.assertTrue(ip6.isSingle) 1957 self.assertTrue(asip('192.0.2.1').isIPv4) 1958 self.assertTrue(id(asip(ip4)) == id(ip4)) 1959 1960 def test_IPAddr_Raw(self): 1961 # raw string: 1962 r = IPAddr('xxx', IPAddr.CIDR_RAW) 1963 self.assertFalse(r.isIPv4) 1964 self.assertFalse(r.isIPv6) 1965 self.assertFalse(r.isSingle) 1966 self.assertTrue(r.isValid) 1967 self.assertEqual(r, 'xxx') 1968 self.assertEqual('xxx', str(r)) 1969 
self.assertNotEqual(r, IPAddr('xxx')) 1970 # raw (not IP, for example host:port as string): 1971 r = IPAddr('1:2', IPAddr.CIDR_RAW) 1972 self.assertFalse(r.isIPv4) 1973 self.assertFalse(r.isIPv6) 1974 self.assertFalse(r.isSingle) 1975 self.assertTrue(r.isValid) 1976 self.assertEqual(r, '1:2') 1977 self.assertEqual('1:2', str(r)) 1978 self.assertNotEqual(r, IPAddr('1:2')) 1979 # raw vs ip4 (raw is not an ip): 1980 r = IPAddr('93.184.0.1', IPAddr.CIDR_RAW) 1981 ip4 = IPAddr('93.184.0.1') 1982 self.assertNotEqual(ip4, r) 1983 self.assertNotEqual(r, ip4) 1984 self.assertTrue(r < ip4) 1985 self.assertTrue(r < ip4) 1986 # raw vs ip6 (raw is not an ip): 1987 r = IPAddr('1::2', IPAddr.CIDR_RAW) 1988 ip6 = IPAddr('1::2') 1989 self.assertNotEqual(ip6, r) 1990 self.assertNotEqual(r, ip6) 1991 self.assertTrue(r < ip6) 1992 self.assertTrue(r < ip6) 1993 1994 def testUseDns(self): 1995 res = DNSUtils.textToIp('www.example.com', 'no') 1996 self.assertSortedEqual(res, []) 1997 #unittest.F2B.SkipIfNoNetwork() 1998 res = DNSUtils.textToIp('www.example.com', 'warn') 1999 # sort ipaddr, IPv4 is always smaller as IPv6 2000 self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946']) 2001 res = DNSUtils.textToIp('www.example.com', 'yes') 2002 # sort ipaddr, IPv4 is always smaller as IPv6 2003 self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946']) 2004 2005 def testTextToIp(self): 2006 #unittest.F2B.SkipIfNoNetwork() 2007 # Test hostnames 2008 hostnames = [ 2009 'www.example.com', 2010 'doh1.2.3.4.buga.xxxxx.yyy.invalid', 2011 '1.2.3.4.buga.xxxxx.yyy.invalid', 2012 ] 2013 for s in hostnames: 2014 res = DNSUtils.textToIp(s, 'yes') 2015 if s == 'www.example.com': 2016 # sort ipaddr, IPv4 is always smaller as IPv6 2017 self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946']) 2018 else: 2019 self.assertSortedEqual(res, []) 2020 2021 def testIpToIp(self): 2022 # pure ips: 2023 for s in ('93.184.216.34', 
'2606:2800:220:1:248:1893:25c8:1946'): 2024 ips = DNSUtils.textToIp(s, 'yes') 2025 self.assertSortedEqual(ips, [s]) 2026 for ip in ips: 2027 self.assertTrue(isinstance(ip, IPAddr)) 2028 2029 def testIpToName(self): 2030 #unittest.F2B.SkipIfNoNetwork() 2031 res = DNSUtils.ipToName('8.8.4.4') 2032 self.assertTrue(res.endswith(('.google', '.google.com'))) 2033 # same as above, but with IPAddr: 2034 res = DNSUtils.ipToName(IPAddr('8.8.4.4')) 2035 self.assertTrue(res.endswith(('.google', '.google.com'))) 2036 # invalid ip (TEST-NET-1 according to RFC 5737) 2037 res = DNSUtils.ipToName('192.0.2.0') 2038 self.assertEqual(res, None) 2039 # invalid ip: 2040 res = DNSUtils.ipToName('192.0.2.888') 2041 self.assertEqual(res, None) 2042 2043 def testAddr2bin(self): 2044 res = IPAddr('10.0.0.0') 2045 self.assertEqual(res.addr, 167772160) 2046 res = IPAddr('10.0.0.0', cidr=None) 2047 self.assertEqual(res.addr, 167772160) 2048 res = IPAddr('10.0.0.0', cidr=32) 2049 self.assertEqual(res.addr, 167772160) 2050 res = IPAddr('10.0.0.1', cidr=32) 2051 self.assertEqual(res.addr, 167772161) 2052 self.assertTrue(res.isSingle) 2053 res = IPAddr('10.0.0.1', cidr=31) 2054 self.assertEqual(res.addr, 167772160) 2055 self.assertFalse(res.isSingle) 2056 2057 self.assertEqual(IPAddr('10.0.0.0').hexdump, '0a000000') 2058 self.assertEqual(IPAddr('1::2').hexdump, '00010000000000000000000000000002') 2059 self.assertEqual(IPAddr('xxx').hexdump, '') 2060 2061 self.assertEqual(IPAddr('192.0.2.0').getPTR(), '0.2.0.192.in-addr.arpa.') 2062 self.assertEqual(IPAddr('192.0.2.1').getPTR(), '1.2.0.192.in-addr.arpa.') 2063 self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:1946').getPTR(), 2064 '6.4.9.1.8.c.5.2.3.9.8.1.8.4.2.0.1.0.0.0.0.2.2.0.0.0.8.2.6.0.6.2.ip6.arpa.') 2065 2066 def testIPAddr_Equal6(self): 2067 self.assertEqual( 2068 IPAddr('2606:2800:220:1:248:1893::'), 2069 IPAddr('2606:2800:220:1:248:1893:0:0') 2070 ) 2071 # special case IPv6 in brackets: 2072 self.assertEqual( 2073 
IPAddr('[2606:2800:220:1:248:1893::]'), 2074 IPAddr('2606:2800:220:1:248:1893:0:0') 2075 ) 2076 2077 def testIPAddr_InInet(self): 2078 ip4net = IPAddr('93.184.0.1/24') 2079 ip6net = IPAddr('2606:2800:220:1:248:1893:25c8:0/120') 2080 self.assertFalse(ip4net.isSingle) 2081 self.assertFalse(ip6net.isSingle) 2082 # ip4: 2083 self.assertTrue(IPAddr('93.184.0.1').isInNet(ip4net)) 2084 self.assertTrue(IPAddr('93.184.0.255').isInNet(ip4net)) 2085 self.assertFalse(IPAddr('93.184.1.0').isInNet(ip4net)) 2086 self.assertFalse(IPAddr('93.184.0.1').isInNet(ip6net)) 2087 # ip6: 2088 self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:1').isInNet(ip6net)) 2089 self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:ff').isInNet(ip6net)) 2090 self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip6net)) 2091 self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip4net)) 2092 # raw not in net: 2093 self.assertFalse(IPAddr('93.184.0.1', IPAddr.CIDR_RAW).isInNet(ip4net)) 2094 self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:1', IPAddr.CIDR_RAW).isInNet(ip6net)) 2095 # invalid not in net: 2096 self.assertFalse(IPAddr('xxx').isInNet(ip4net)) 2097 2098 def testIPAddr_Compare(self): 2099 ip4 = [ 2100 IPAddr('93.184.0.1'), 2101 IPAddr('93.184.216.1'), 2102 IPAddr('93.184.216.34') 2103 ] 2104 ip6 = [ 2105 IPAddr('2606:2800:220:1:248:1893::'), 2106 IPAddr('2606:2800:220:1:248:1893:25c8:0'), 2107 IPAddr('2606:2800:220:1:248:1893:25c8:1946') 2108 ] 2109 # ip4 2110 self.assertNotEqual(ip4[0], None) 2111 self.assertTrue(ip4[0] is not None) 2112 self.assertFalse(ip4[0] is None) 2113 self.assertTrue(ip4[0] < ip4[1]) 2114 self.assertTrue(ip4[1] < ip4[2]) 2115 self.assertEqual(sorted(reversed(ip4)), ip4) 2116 # ip6 2117 self.assertNotEqual(ip6[0], None) 2118 self.assertTrue(ip6[0] is not None) 2119 self.assertFalse(ip6[0] is None) 2120 self.assertTrue(ip6[0] < ip6[1]) 2121 self.assertTrue(ip6[1] < ip6[2]) 2122 self.assertEqual(sorted(reversed(ip6)), ip6) 2123 # 
ip4 vs ip6 2124 self.assertNotEqual(ip4[0], ip6[0]) 2125 self.assertTrue(ip4[0] < ip6[0]) 2126 self.assertTrue(ip4[2] < ip6[2]) 2127 self.assertEqual(sorted(reversed(ip4+ip6)), ip4+ip6) 2128 # hashing (with string as key): 2129 d={ 2130 '93.184.216.34': 'ip4-test', 2131 '2606:2800:220:1:248:1893:25c8:1946': 'ip6-test' 2132 } 2133 d2 = dict([(IPAddr(k), v) for k, v in d.items()]) 2134 self.assertTrue(isinstance(list(d.keys())[0], str)) 2135 self.assertTrue(isinstance(list(d2.keys())[0], IPAddr)) 2136 self.assertEqual(d.get(ip4[2], ''), 'ip4-test') 2137 self.assertEqual(d.get(ip6[2], ''), 'ip6-test') 2138 self.assertEqual(d2.get(str(ip4[2]), ''), 'ip4-test') 2139 self.assertEqual(d2.get(str(ip6[2]), ''), 'ip6-test') 2140 # compare with string direct: 2141 self.assertEqual(d, d2) 2142 2143 def testIPAddr_CIDR(self): 2144 self.assertEqual(str(IPAddr('93.184.0.1', 24)), '93.184.0.0/24') 2145 self.assertEqual(str(IPAddr('192.168.1.0/255.255.255.128')), '192.168.1.0/25') 2146 self.assertEqual(IPAddr('93.184.0.1', 24).ntoa, '93.184.0.0/24') 2147 self.assertEqual(IPAddr('192.168.1.0/255.255.255.128').ntoa, '192.168.1.0/25') 2148 2149 self.assertEqual(IPAddr('93.184.0.1/32').ntoa, '93.184.0.1') 2150 self.assertEqual(IPAddr('93.184.0.1/255.255.255.255').ntoa, '93.184.0.1') 2151 2152 self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8::', 120)), '2606:2800:220:1:248:1893:25c8:0/120') 2153 self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8::', 120).ntoa, '2606:2800:220:1:248:1893:25c8:0/120') 2154 self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8:0/120')), '2606:2800:220:1:248:1893:25c8:0/120') 2155 self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:0/120').ntoa, '2606:2800:220:1:248:1893:25c8:0/120') 2156 2157 self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::', 25)), '2606:2880::/25') 2158 self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ff80::')), '2606:2880::/25') 2159 
self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff::')), 2160 '2606:28ff:220:1:248:1893:25c8:0/112') 2161 2162 self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/128')), 2163 '2606:28ff:220:1:248:1893:25c8:0') 2164 self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')), 2165 '2606:28ff:220:1:248:1893:25c8:0') 2166 2167 def testIPAddr_CIDR_Wrong(self): 2168 # too many plen representations: 2169 self.assertRaises(ValueError, IPAddr, '2606:28ff:220:1:248:1893:25c8::/ffff::/::1') 2170 2171 def testIPAddr_CIDR_Repr(self): 2172 self.assertEqual(["127.0.0.0/8", "::/32", "2001:db8::/32"], 2173 [IPAddr("127.0.0.0", 8), IPAddr("::1", 32), IPAddr("2001:db8::", 32)] 2174 ) 2175 2176 def testIPAddr_CompareDNS(self): 2177 #unittest.F2B.SkipIfNoNetwork() 2178 ips = IPAddr('example.com') 2179 self.assertTrue(IPAddr("93.184.216.34").isInNet(ips)) 2180 self.assertTrue(IPAddr("2606:2800:220:1:248:1893:25c8:1946").isInNet(ips)) 2181 2182 def testIPAddr_wrongDNS_IP(self): 2183 unittest.F2B.SkipIfNoNetwork() 2184 DNSUtils.dnsToIp('`this`.dns-is-wrong.`wrong-nic`-dummy') 2185 DNSUtils.ipToName('*') 2186 2187 def testIPAddr_Cached(self): 2188 ips = [DNSUtils.dnsToIp('example.com'), DNSUtils.dnsToIp('example.com')] 2189 for ip1, ip2 in zip(ips, ips): 2190 self.assertEqual(id(ip1), id(ip2)) 2191 ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2)) 2192 ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2)) 2193 2194 def testFQDN(self): 2195 sname = DNSUtils.getHostname(fqdn=False) 2196 lname = DNSUtils.getHostname(fqdn=True) 2197 # FQDN is not localhost if short hostname is not localhost too (or vice versa): 2198 self.assertEqual(lname != 'localhost', 2199 sname != 'localhost') 2200 # FQDN from short name should be long name: 2201 self.assertEqual(getfqdn(sname), 
lname) 2202 # FQDN from FQDN is the same: 2203 self.assertEqual(getfqdn(lname), lname) 2204 # coverage (targeting all branches): FQDN from loopback and DNS blackhole is always the same: 2205 self.assertIn(getfqdn('localhost.'), ('localhost', 'localhost.')) 2206 2207 def testFQDN_DNS(self): 2208 unittest.F2B.SkipIfNoNetwork() 2209 self.assertIn(getfqdn('as112.arpa.'), ('as112.arpa.', 'as112.arpa')) 2210 2211 2212class JailTests(unittest.TestCase): 2213 2214 def testSetBackend_gh83(self): 2215 # smoke test 2216 # Must not fail to initiate 2217 Jail('test', backend='polling') 2218 2219