1# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
2# vi: set ft=python sts=4 ts=4 sw=4 noet :
3
4# This file is part of Fail2Ban.
5#
6# Fail2Ban is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# Fail2Ban is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Fail2Ban; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19
20# Fail2Ban developers
21
22__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
23__license__ = "GPL"
24
25from builtins import open as fopen
26import unittest
27import os
28import re
29import sys
30import time, datetime
31import tempfile
32import uuid
33
34try:
35	from systemd import journal
36except ImportError:
37	journal = None
38
39from ..server.jail import Jail
40from ..server.filterpoll import FilterPoll
41from ..server.filter import FailTicket, Filter, FileFilter, FileContainer
42from ..server.failmanager import FailManagerEmpty
43from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr
44from ..server.mytime import MyTime
45from ..server.utils import Utils, uni_decode
46from .databasetestcase import getFail2BanDb
47from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_alt_time, with_tmpdir, LogCaptureTestCase, \
48	logSys as DefLogSys, CONFIG_DIR as STOCK_CONF_DIR
49from .dummyjail import DummyJail
50
# directory holding the static log-file fixtures used throughout these tests
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
52
53
54# yoh: per Steven Hiscocks's insight while troubleshooting
55# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
56# adding a sufficiently large buffer might help to guarantee that
57# writes happen atomically.
def open(*args):
	"""Replacement for the built-in open() that enforces a large write buffer.

	When called with exactly (name, mode), a ~50kB buffering argument is
	appended so that test writes are less likely to be split into partial
	lines; an explicit .flush() is still required to push changes out of
	the buffer.
	"""
	# only (name, mode) given -> append the ~50kB buffer size argument
	return fopen(*(args + (50000,) if len(args) == 2 else args))
67
68
def _killfile(f, name):
	"""Best-effort cleanup: close file object `f` and unlink file `name`.

	Either step may silently fail (e.g. `f` is None or the file is already
	gone).  A leftover backup file `name + '.bak'` is removed as well.
	"""
	for cleanup in ((lambda: f.close()), (lambda: os.unlink(name))):
		try:
			cleanup()
		except:
			pass
	# a backup copy may have been left behind as well
	bak = name + '.bak'
	if os.path.exists(bak):
		_killfile(None, bak)
82
83
# shortcut to the test-framework helper that scales wait timeouts (for slow environments)
_maxWaitTime = unittest.F2B.maxWaitTime
85
86
class _tmSerial():
	"""Fast timestamp-to-string serializer for long runs of close timestamps.

	strftime is too slow when formatting many consecutive timestamps, so
	the "%Y-%m-%d %H:" (hour) and "...:%M:" (minute) prefixes are cached
	in class attributes and only the seconds are formatted on the fast path.
	"""
	_last_s = -0x7fffffff
	_last_m = -0x7fffffff
	_str_s = ""
	_str_m = ""
	@staticmethod
	def _tm(ts):
		cls = _tmSerial
		sec = ts % 60
		minute_base = ts - sec
		# fast path: same minute as the previous call
		if cls._last_s == minute_base:
			return "%s%02u" % (cls._str_s, sec)
		mt = ts % 3600
		hour_base = ts - mt
		if cls._last_m != hour_base:
			# new hour: recompute the date/hour prefix via (slow) strftime
			cls._last_m = hour_base
			cls._str_m = datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:")
		# new minute within the cached hour: refresh the minute prefix
		cls._last_s = minute_base
		cls._str_s = "%s%02u:" % (cls._str_m, mt // 60)
		return "%s%02u" % (cls._str_s, sec)

# module-level shortcut
_tm = _tmSerial._tm
112
113
def _assert_equal_entries(utest, found, output, count=None):
	"""Little helper to unify comparisons with the target entries

	and report helpful failure reports instead of millions of seconds ;)

	`found` and `output` are (ip, count, time[, matches]) tuples; if
	`count` is given it overrides the expected attempt count and disables
	the matches comparison.
	"""
	utest.assertEqual(found[0], output[0])            # IP
	utest.assertEqual(found[1], count or output[1])   # count
	# compare broken-down local time first for a readable diff on failure
	found_time, output_time = \
				MyTime.localtime(found[2]),\
				MyTime.localtime(output[2])
	try:
		utest.assertEqual(found_time, output_time)
	except AssertionError as e:
		# assert more structured:
		utest.assertEqual((float(found[2]), found_time), (float(output[2]), output_time))
	if len(output) > 3 and count is None: # match matches
		# do not check if custom count (e.g. going through them twice)
		if os.linesep != '\n' or sys.platform.startswith('cygwin'):
			# on those where text file lines end with '\r\n', remove '\r'
			srepr = lambda x: repr(x).replace(r'\r', '')
		else:
			srepr = repr
		utest.assertEqual(srepr(found[3]), srepr(output[3]))
137
138
def _ticket_tuple(ticket):
	"""Serialize a fail ticket into an (ip, attempts, time, matches) tuple.

	Handy for simple equality comparisons inside assertions.
	"""
	return (
		ticket.getIP(),
		ticket.getAttempt(),
		ticket.getTime(),
		ticket.getMatches(),
	)
147
148
def _assert_correct_last_attempt(utest, filter_, output, count=None):
	"""Additional helper to wrap most common test case

	Test filter to contain target ticket

	`filter_` is either a DummyJail (the ticket is fetched from the jail)
	or a filter instance (tickets are taken from its fail manager, after
	waiting up to ~10 scaled seconds for the expected counts).  `output`
	is a single entry tuple or a list of such tuples; `count` overrides
	the expected total failure count.
	"""
	# one or multiple tickets:
	if not isinstance(output[0], (tuple,list)):
		tickcount = 1
		failcount = (count if count else output[1])
	else:
		tickcount = len(output)
		failcount = (count if count else sum((o[1] for o in output)))

	found = []
	if isinstance(filter_, DummyJail):
		# get fail ticket from jail
		found.append(_ticket_tuple(filter_.getFailTicket()))
	else:
		# when we are testing without jails
		# wait for failures (up to max time)
		Utils.wait_for(
			lambda: filter_.failManager.getFailCount() >= (tickcount, failcount),
			_maxWaitTime(10))
		# get fail ticket(s) from filter
		while tickcount:
			try:
				found.append(_ticket_tuple(filter_.failManager.toBan()))
			except FailManagerEmpty:
				break
			tickcount -= 1

	if not isinstance(output[0], (tuple,list)):
		utest.assertEqual(len(found), 1)
		_assert_equal_entries(utest, found[0], output, count)
	else:
		# sort by string representation of ip (multiple failures with different ips):
		found = sorted(found, key=lambda x: str(x))
		output = sorted(output, key=lambda x: str(x))
		for f, o in zip(found, output):
			_assert_equal_entries(utest, f, o)
189
190
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""):
	"""Copy lines from one file to another (which might be already open)

	Returns open fout

	`in_` and `fout` may each be a filename or an open file object (a
	`fout` filename is opened with the given `mode`).  After skipping
	`skip` lines, at most `n` lines are copied; copying also stops when
	`terminal_line` is encountered — with the default "" that means at
	EOF or at the first blank line (pass None to disable this check).
	"""
	# on old Python st_mtime is int, so we should give at least 1 sec so
	# polling filter could detect the change
	mtimesleep()
	if isinstance(in_, str): # pragma: no branch - only used with str in test cases
		fin = open(in_, 'rb')
	else:
		fin = in_
	# Skip
	for i in range(skip):
		fin.readline()
	# Read
	i = 0
	lines = []
	while n is None or i < n:
		l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n')
		if terminal_line is not None and l == terminal_line:
			break
		lines.append(l)
		i += 1
	# Write: all at once and flush
	if isinstance(fout, str):
		fout = open(fout, mode)
	fout.write('\n'.join(lines)+'\n')
	fout.flush()
	if isinstance(in_, str): # pragma: no branch - only used with str in test cases
		# Opened earlier, therefore must close it
		fin.close()
	# to give other threads possibly some time to crunch
	time.sleep(Utils.DEFAULT_SHORT_INTERVAL)
	return fout
226
227
# journal fields required so the test filter can match these entries
TEST_JOURNAL_FIELDS = {
	"SYSLOG_IDENTIFIER": "fail2ban-testcases",
	"PRIORITY": "7",
}
def _copy_lines_to_journal(in_, fields={}, n=None, skip=0, terminal_line=""): # pragma: systemd no cover
	"""Copy lines from one file to systemd journal

	Returns None

	`in_` may be a filename or an open file object.  After skipping
	`skip` lines, at most `n` lines are sent; sending also stops when
	`terminal_line` is encountered (pass None to disable this check).
	`fields` are extra journal fields; TEST_JOURNAL_FIELDS are always
	added on top of them.
	"""
	if isinstance(in_, str): # pragma: no branch - only used with str in test cases
		fin = open(in_, 'rb')
	else:
		fin = in_
	# Required for filtering; work on a merged copy so we never mutate the
	# caller's dict (nor the shared mutable default argument across calls):
	fields = dict(fields, **TEST_JOURNAL_FIELDS)
	# Skip
	for i in range(skip):
		fin.readline()
	# Read/Write
	i = 0
	while n is None or i < n:
		l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n')
		if terminal_line is not None and l == terminal_line:
			break
		journal.send(MESSAGE=l.strip(), **fields)
		i += 1
	if isinstance(in_, str): # pragma: no branch - only used with str in test cases
		# Opened earlier, therefore must close it
		fin.close()
257
258
259#
260#  Actual tests
261#
262
class BasicFilter(unittest.TestCase):
	"""Unit tests for the basic getter/setter API of the Filter class."""

	def setUp(self):
		super(BasicFilter, self).setUp()
		# a filter without a jail is sufficient for pure get/set tests
		self.filter = Filter(None)

	def testGetSetUseDNS(self):
		# default is warn
		self.assertEqual(self.filter.getUseDns(), 'warn')
		self.filter.setUseDns(True)
		self.assertEqual(self.filter.getUseDns(), 'yes')
		self.filter.setUseDns(False)
		self.assertEqual(self.filter.getUseDns(), 'no')

	def testGetSetDatePattern(self):
		self.assertEqual(self.filter.getDatePattern(),
			(None, "Default Detectors"))
		self.filter.setDatePattern(r"^%Y-%m-%d-%H%M%S\.%f %z **")
		self.assertEqual(self.filter.getDatePattern(),
			(r"^%Y-%m-%d-%H%M%S\.%f %z **",
			r"^Year-Month-Day-24hourMinuteSecond\.Microseconds Zone offset **"))

	def testGetSetLogTimeZone(self):
		self.assertEqual(self.filter.getLogTimeZone(), None)
		self.filter.setLogTimeZone('UTC')
		self.assertEqual(self.filter.getLogTimeZone(), 'UTC')
		self.filter.setLogTimeZone('UTC-0400')
		self.assertEqual(self.filter.getLogTimeZone(), 'UTC-0400')
		self.filter.setLogTimeZone('UTC+0200')
		self.assertEqual(self.filter.getLogTimeZone(), 'UTC+0200')
		self.assertRaises(ValueError, self.filter.setLogTimeZone, 'not-a-time-zone')

	def testAssertWrongTime(self):
		# helper _assert_equal_entries must fail on differing timestamps
		self.assertRaises(AssertionError,
			lambda: _assert_equal_entries(self,
				('1.1.1.1', 1, 1421262060.0),
				('1.1.1.1', 1, 1421262059.0),
			1)
		)

	def testTest_tm(self):
		unittest.F2B.SkipIfFast()
		## test function "_tm" works correct (returns the same as slow strftime):
		for i in range(1417512352, (1417512352 // 3600 + 3) * 3600):
			tm = MyTime.time2str(i)
			if _tm(i) != tm: # pragma: no cover - never reachable
				self.assertEqual((_tm(i), i), (tm, i))

	def testWrongCharInTupleLine(self):
		## line tuple has different types (ascii after ascii / unicode):
		for a1 in ('', '', b''):
			for a2 in ('2016-09-05T20:18:56', '2016-09-05T20:18:56', b'2016-09-05T20:18:56'):
				for a3 in (
					'Fail for "g\xc3\xb6ran" from 192.0.2.1',
					'Fail for "g\xc3\xb6ran" from 192.0.2.1',
					b'Fail for "g\xc3\xb6ran" from 192.0.2.1'
				):
					# join should work if all arguments have the same type:
					"".join([uni_decode(v) for v in (a1, a2, a3)])
322
323
class IgnoreIP(LogCaptureTestCase):
	"""Tests for the various ignore facilities of FileFilter:

	ignoreself, ignoreip (plain IPs, CIDR and netmask notations),
	time-jump handling around failure registration, ignorecommand and
	the ignore cache.
	"""

	def setUp(self):
		"""Call before every test case."""
		LogCaptureTestCase.setUp(self)
		self.jail = DummyJail()
		self.filter = FileFilter(self.jail)
		# disabled by default so testIgnoreSelfIP controls it explicitly
		self.filter.ignoreSelf = False

	def testIgnoreSelfIP(self):
		ipList = ("127.0.0.1",)
		# test ignoreSelf is false:
		for ip in ipList:
			self.assertFalse(self.filter.inIgnoreIPList(ip))
			self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule"))
		# test ignoreSelf with true:
		self.filter.ignoreSelf = True
		self.pruneLog()
		for ip in ipList:
			self.assertTrue(self.filter.inIgnoreIPList(ip))
			self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule"))

	def testIgnoreIPOK(self):
		ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
		for ip in ipList:
			self.filter.addIgnoreIP(ip)
			self.assertTrue(self.filter.inIgnoreIPList(ip))
			self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ip"))

	def testIgnoreIPNOK(self):
		ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0."
		for ip in ipList:
			self.filter.addIgnoreIP(ip)
			self.assertFalse(self.filter.inIgnoreIPList(ip))
		if not unittest.F2B.no_network: # pragma: no cover
			self.assertLogged(
				'Unable to find a corresponding IP address for 999.999.999.999',
				'Unable to find a corresponding IP address for abcdef.abcdef',
				'Unable to find a corresponding IP address for 192.168.0.', all=True)

	def testIgnoreIPCIDR(self):
		# /25 covers 192.168.1.0 - 192.168.1.127 only
		self.filter.addIgnoreIP('192.168.1.0/25')
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))

	def testIgnoreIPMask(self):
		# netmask 255.255.255.128 is equivalent to the /25 CIDR above
		self.filter.addIgnoreIP('192.168.1.0/255.255.255.128')
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
		self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
		self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))

	def testWrongIPMask(self):
		self.filter.addIgnoreIP('192.168.1.0/255.255.0.0')
		# non-contiguous netmask must be rejected
		self.assertRaises(ValueError, self.filter.addIgnoreIP, '192.168.1.0/255.255.0.128')

	def testIgnoreInProcessLine(self):
		setUpMyTime()
		try:
			self.filter.addIgnoreIP('192.168.1.0/25')
			self.filter.addFailRegex('<HOST>')
			self.filter.setDatePattern(r'{^LN-BEG}EPOCH')
			self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
			self.assertLogged('Ignore 192.168.1.32')
		finally:
			tearDownMyTime()

	def _testTimeJump(self, inOperation=False):
		"""Check failure counting stays correct across a DST hole and an NTP time correction."""
		try:
			self.filter.addFailRegex('^<HOST>')
			self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s')
			self.filter.setFindTime(10); # max 10 seconds back
			self.filter.setMaxRetry(5); # don't ban here
			self.filter.inOperation = inOperation
			#
			self.pruneLog('[phase 1] DST time jump')
			# check local time jump (DST hole):
			MyTime.setTime(1572137999)
			self.filter.processLineAndAdd('2019-10-27 02:59:59 192.0.2.5'); # +1 = 1
			MyTime.setTime(1572138000)
			self.filter.processLineAndAdd('2019-10-27 02:00:00 192.0.2.5'); # +1 = 2
			MyTime.setTime(1572138001)
			self.filter.processLineAndAdd('2019-10-27 02:00:01 192.0.2.5'); # +1 = 3
			self.assertLogged(
				'Current failures from 1 IPs (IP:count): 192.0.2.5:1',
				'Current failures from 1 IPs (IP:count): 192.0.2.5:2',
				'Current failures from 1 IPs (IP:count): 192.0.2.5:3',
				"Total # of detected failures: 3.", all=True, wait=True)
			self.assertNotLogged('Ignore line')
			#
			self.pruneLog('[phase 2] UTC time jump (NTP correction)')
			# check time drifting backwards (NTP correction):
			MyTime.setTime(1572210000)
			self.filter.processLineAndAdd('2019-10-27 22:00:00 CET 192.0.2.6'); # +1 = 1
			MyTime.setTime(1572200000)
			self.filter.processLineAndAdd('2019-10-27 22:00:01 CET 192.0.2.6'); # +1 = 2 (logged before correction)
			self.filter.processLineAndAdd('2019-10-27 19:13:20 CET 192.0.2.6'); # +1 = 3 (logged after correction)
			self.filter.processLineAndAdd('2019-10-27 19:13:21 CET 192.0.2.6'); # +1 = 4
			self.assertLogged(
				'192.0.2.6:1', '192.0.2.6:2', '192.0.2.6:3', '192.0.2.6:4',
				"Total # of detected failures: 7.", all=True, wait=True)
			self.assertNotLogged('Ignore line')
		finally:
			tearDownMyTime()
	def testTimeJump(self):
		self._testTimeJump(inOperation=False)
	def testTimeJump_InOperation(self):
		self._testTimeJump(inOperation=True)

	def testWrongTimeZone(self):
		try:
			self.filter.addFailRegex('fail from <ADDR>$')
			self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s')
			self.filter.setMaxRetry(5); # don't ban here
			self.filter.inOperation = True; # real processing (all messages are new)
			# current time is 1h later than log-entries:
			MyTime.setTime(1572138000+3600)
			#
			self.pruneLog("[phase 1] simulate wrong TZ")
			for i in (1,2,3):
				self.filter.processLineAndAdd('2019-10-27 02:00:00 fail from 192.0.2.15'); # +3 = 3
			self.assertLogged(
				"Simulate NOW in operation since found time has too large deviation",
				"Please check jail has possibly a timezone issue.",
				"192.0.2.15:1", "192.0.2.15:2", "192.0.2.15:3",
				"Total # of detected failures: 3.", wait=True)
			#
			self.pruneLog("[phase 2] wrong TZ given in log")
			for i in (1,2,3):
				self.filter.processLineAndAdd('2019-10-27 04:00:00 GMT fail from 192.0.2.16'); # +3 = 6
			self.assertLogged(
				"192.0.2.16:1", "192.0.2.16:2", "192.0.2.16:3",
				"Total # of detected failures: 6.", all=True, wait=True)
			self.assertNotLogged("Found a match but no valid date/time found")
			#
			self.pruneLog("[phase 3] other timestamp (don't match datepattern), regex matches")
			for i in range(3):
				self.filter.processLineAndAdd('27.10.2019 04:00:00 fail from 192.0.2.17'); # +3 = 9
			self.assertLogged(
				"Found a match but no valid date/time found",
				"Match without a timestamp:",
				"192.0.2.17:1", "192.0.2.17:2", "192.0.2.17:3",
				"Total # of detected failures: 9.", all=True, wait=True)
		finally:
			tearDownMyTime()

	def testAddAttempt(self):
		self.filter.setMaxRetry(3)
		for i in range(1, 1+3):
			self.filter.addAttempt('192.0.2.1')
			self.assertLogged('Attempt 192.0.2.1', '192.0.2.1:%d' % i, all=True, wait=True)
		# invoke the (name-mangled) private checkBan directly to trigger ban processing
		self.jail.actions._Actions__checkBan()
		self.assertLogged('Ban 192.0.2.1', wait=True)

	def testIgnoreCommand(self):
		self.filter.ignoreCommand = sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>")
		self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
		self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
		self.assertLogged("returned successfully 0", "returned successfully 1", all=True)
		self.pruneLog()
		self.assertFalse(self.filter.inIgnoreIPList(""))
		self.assertLogged("usage: ignorecommand IP", "returned 10", all=True)

	def testIgnoreCommandForTicket(self):
		# by host of IP (2001:db8::1 and 2001:db8::ffff map to "test-host" and "test-other" in the test-suite):
		self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1'
		self.pruneLog()
		self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1")))
		self.assertLogged("returned successfully 0")
		self.pruneLog()
		self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff")))
		self.assertLogged("returned successfully 1")
		# by user-name (ignore tester):
		self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1'
		self.pruneLog()
		self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'})))
		self.assertLogged("returned successfully 0")
		self.pruneLog()
		self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'})))
		self.assertLogged("returned successfully 1", all=True)

	def testIgnoreCache(self):
		# like both test-cases above, just cached (so once per key)...
		self.filter.ignoreCache = {"key":"<ip>"}
		self.filter.ignoreCommand = 'if [ "<ip>" = "10.0.0.1" ]; then exit 0; fi; exit 1'
		for i in range(5):
			self.pruneLog()
			self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
			self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
			if not i:
				self.assertLogged("returned successfully 0", "returned successfully 1", all=True)
			else:
				self.assertNotLogged("returned successfully 0", "returned successfully 1", all=True)
		# by host of IP:
		self.filter.ignoreCache = {"key":"<ip-host>"}
		self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1'
		for i in range(5):
			self.pruneLog()
			self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1")))
			self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff")))
			if not i:
				self.assertLogged("returned successfully")
			else:
				self.assertNotLogged("returned successfully")
		# by user-name:
		self.filter.ignoreCache = {"key":"<F-USER>", "max-count":"10", "max-time":"1h"}
		self.assertEqual(self.filter.ignoreCache, ["<F-USER>", 10, 60*60])
		self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1'
		for i in range(5):
			self.pruneLog()
			self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'})))
			self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'})))
			if not i:
				self.assertLogged("returned successfully")
			else:
				self.assertNotLogged("returned successfully")

	def testIgnoreCauseOK(self):
		ip = "93.184.216.34"
		for ignore_source in ["dns", "ip", "command"]:
			self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
			self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))

	def testIgnoreCauseNOK(self):
		self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
		self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
556
557
class IgnoreIPDNS(LogCaptureTestCase):
	"""DNS-name based ignoreip tests (require network access)."""

	def setUp(self):
		"""Call before every test case."""
		unittest.F2B.SkipIfNoNetwork()
		LogCaptureTestCase.setUp(self)
		self.jail = DummyJail()
		self.filter = FileFilter(self.jail)

	def testIgnoreIPDNS(self):
		for dns in ("www.epfl.ch", "example.com"):
			self.filter.addIgnoreIP(dns)
			ips = DNSUtils.dnsToIp(dns)
			self.assertTrue(len(ips) > 0)
			# for each ip from dns check ip ignored:
			for ip in ips:
				ip = str(ip)
				DefLogSys.debug('  ++ positive case for %s', ip)
				self.assertTrue(self.filter.inIgnoreIPList(ip))
				# check another ips (with increment/decrement of first/last part) not ignored:
				iparr = []
				ip2 = re.search(r'^([^.:]+)([.:])(.*?)([.:])([^.:]+)$', ip)
				if ip2:
					ip2 = ip2.groups()
					for o in (0, 4):
						for i in (1, -1):
							ipo = list(ip2)
							# IPv4 parts are decimal, IPv6 parts hexadecimal:
							if ipo[1] == '.':
								ipo[o] = str(int(ipo[o])+i)
							else:
								ipo[o] = '%x' % (int(ipo[o], 16)+i)
							ipo = ''.join(ipo)
							if ipo not in ips:
								iparr.append(ipo)
				self.assertTrue(len(iparr) > 0)
				for ip in iparr:
					DefLogSys.debug('  -- negative case for %s', ip)
					self.assertFalse(self.filter.inIgnoreIPList(str(ip)))

	def testIgnoreCmdApacheFakegooglebot(self):
		unittest.F2B.SkipIfCfgMissing(stock=True)
		cmd = os.path.join(STOCK_CONF_DIR, "filter.d/ignorecommands/apache-fakegooglebot")
		## below test direct as python module:
		mod = Utils.load_python_module(cmd)
		self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "128.178.222.69"])))
		self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "192.0.2.1"])))
		bot_ips = ['66.249.66.1']
		for ip in bot_ips:
			self.assertTrue(mod.is_googlebot(mod.process_args([cmd, str(ip)])), "test of googlebot ip %s failed" % ip)
		self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd])))
		self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd, "192.0"])))
		## via command:
		self.filter.ignoreCommand = cmd + " <ip>"
		for ip in bot_ips:
			self.assertTrue(self.filter.inIgnoreIPList(str(ip)), "test of googlebot ip %s failed" % ip)
			self.assertLogged('-- returned successfully')
			self.pruneLog()
		self.assertFalse(self.filter.inIgnoreIPList("192.0"))
		self.assertLogged('Argument must be a single valid IP.')
		self.pruneLog()
		self.filter.ignoreCommand = cmd + " bad arguments <ip>"
		self.assertFalse(self.filter.inIgnoreIPList("192.0"))
		self.assertLogged('Please provide a single IP as an argument.')
621
622
623
class LogFile(LogCaptureTestCase):
	"""Sanity check: adding a non-existing log path must raise IOError."""

	MISSING = 'testcases/missingLogFile'

	def setUp(self):
		super(LogFile, self).setUp()

	def tearDown(self):
		super(LogFile, self).tearDown()

	def testMissingLogFiles(self):
		self.filter = FilterPoll(None)
		self.assertRaises(IOError, self.filter.addLogPath, LogFile.MISSING)
637
638
class LogFileFilterPoll(unittest.TestCase):
	"""Tests of the FilterPoll API: modification detection and the
	seekToTime positioning inside small and large log files."""

	FILENAME = os.path.join(TEST_FILES_DIR, "testcase01.log")

	def setUp(self):
		"""Call before every test case."""
		super(LogFileFilterPoll, self).setUp()
		self.filter = FilterPoll(DummyJail())
		self.filter.addLogPath(LogFileFilterPoll.FILENAME)

	def tearDown(self):
		"""Call after every test case."""
		super(LogFileFilterPoll, self).tearDown()

	#def testOpen(self):
	#	self.filter.openLogFile(LogFile.FILENAME)

	def testIsModified(self):
		# first call sees the file as new, second call must report no change
		self.assertTrue(self.filter.isModified(LogFileFilterPoll.FILENAME))
		self.assertFalse(self.filter.isModified(LogFileFilterPoll.FILENAME))

	def testSeekToTimeSmallFile(self):
		# speedup search using exact date pattern:
		self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
		fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
		# NOTE: local 'time' (epoch seconds) shadows the time module in this test
		time = 1417512352
		f = open(fname, 'w')
		fc = None
		try:
			fc = FileContainer(fname, self.filter.getLogEncoding())
			fc.open()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			f.flush()
			# empty :
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 0)
			# one entry with exact time:
			f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)

			# rewrite :
			f.seek(0)
			f.truncate()
			fc.close()
			fc = FileContainer(fname, self.filter.getLogEncoding())
			fc.open()
			# no time - nothing should be found :
			for i in range(10):
				f.write("[sshd] error: PAM: failure len 1\n")
				f.flush()
				fc.setPos(0); self.filter.seekToTime(fc, time)

			# rewrite
			f.seek(0)
			f.truncate()
			fc.close()
			fc = FileContainer(fname, self.filter.getLogEncoding())
			fc.open()
			# one entry with smaller time:
			f.write("%s [sshd] error: PAM: failure len 2\n" % _tm(time - 10))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			# expected positions below are byte offsets of line boundaries
			self.assertEqual(fc.getPos(), 53)
			# two entries with smaller time:
			f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time - 9))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 110)
			# check move after end (all of time smaller):
			f.write("%s [sshd] error: PAM: failure\n" % _tm(time - 1))
			f.flush()
			self.assertEqual(fc.getFileSize(), 157)
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)

			# stil one exact line:
			f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time))
			f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)

			# add something hereafter:
			f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time + 2))
			f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time + 3))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)
			# add something hereafter:
			f.write("%s [sshd] error: PAM: failure\n" % _tm(time + 9))
			f.write("%s [sshd] error: PAM: failure len 4 3 2\n" % _tm(time + 9))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)
			# start search from current pos :
			fc.setPos(157); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)
			# start search from current pos :
			fc.setPos(110); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 157)

		finally:
			if fc:
				fc.close()
			_killfile(f, fname)

	def testSeekToTimeLargeFile(self):
		# speedup search using exact date pattern:
		self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
		fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
		# NOTE: local 'time' (epoch seconds) shadows the time module in this test
		time = 1417512352
		f = open(fname, 'w')
		fc = None
		count = 1000 if unittest.F2B.fast else 10000
		try:
			fc = FileContainer(fname, self.filter.getLogEncoding())
			fc.open()
			f.seek(0)
			# variable length of file (ca 45K or 450K before and hereafter):
			# write lines with smaller as search time:
			t = time - count - 1
			for i in range(count):
				f.write("%s [sshd] error: PAM: failure\n" % _tm(t))
				t += 1
			f.flush()
			# each line is 47 bytes, hence position 47*count after all smaller times
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 47*count)
			# write lines with exact search time:
			for i in range(10):
				f.write("%s [sshd] error: PAM: failure\n" % _tm(time))
			f.flush()
			fc.setPos(0); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 47*count)
			fc.setPos(4*count); self.filter.seekToTime(fc, time)
			self.assertEqual(fc.getPos(), 47*count)
			# write lines with greater as search time:
			t = time+1
			for i in range(count//500):
				for j in range(500):
					f.write("%s [sshd] error: PAM: failure\n" % _tm(t))
					t += 1
				f.flush()
				fc.setPos(0); self.filter.seekToTime(fc, time)
				self.assertEqual(fc.getPos(), 47*count)
				fc.setPos(53); self.filter.seekToTime(fc, time)
				self.assertEqual(fc.getPos(), 47*count)

		finally:
			if fc:
				fc.close()
			_killfile(f, fname)
791
792class LogFileMonitor(LogCaptureTestCase):
793	"""Few more tests for FilterPoll API
794	"""
	def setUp(self):
		"""Call before every test case."""
		setUpMyTime()
		LogCaptureTestCase.setUp(self)
		# 'NA' placeholders keep tearDown safe even if the following lines fail
		self.filter = self.name = 'NA'
		_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
		self.file = open(self.name, 'a')
		self.filter = FilterPoll(DummyJail())
		self.filter.banASAP = False # avoid immediate ban in this tests
		self.filter.addLogPath(self.name, autoSeek=False)
		self.filter.active = True
		self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
807
808	def tearDown(self):
809		tearDownMyTime()
810		LogCaptureTestCase.tearDown(self)
811		_killfile(self.file, self.name)
812		pass
813
814	def isModified(self, delay=2):
815		"""Wait up to `delay` sec to assure that it was modified or not
816		"""
817		return Utils.wait_for(lambda: self.filter.isModified(self.name), _maxWaitTime(delay))
818
819	def notModified(self, delay=2):
820		"""Wait up to `delay` sec as long as it was not modified
821		"""
822		return Utils.wait_for(lambda: not self.filter.isModified(self.name), _maxWaitTime(delay))
823
	def testUnaccessibleLogFile(self):
		"""Filter must log 'Unable to open' for an unreadable file (unless running as root)."""
		os.chmod(self.name, 0)
		self.filter.getFailures(self.name)
		failure_was_logged = self._is_logged('Unable to open %s' % self.name)
		# verify that we cannot access the file. Checking by name of user is not
		# sufficient since could be a fakeroot or some other super-user
		is_root = True
		try:
			with open(self.name) as f: # pragma: no cover - normally no root
				f.read()
		except IOError:
			is_root = False

		# If ran as root, those restrictive permissions would not
		# forbid log to be read.
		self.assertTrue(failure_was_logged != is_root)
840
841	def testNoLogFile(self):
842		_killfile(self.file, self.name)
843		self.filter.getFailures(self.name)
844		self.assertLogged('Unable to open %s' % self.name)
845
846	def testErrorProcessLine(self):
847		# speedup search using exact date pattern:
848		self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
849		self.filter.sleeptime /= 1000.0
850		## produce error with not callable processLine:
851		_org_processLine = self.filter.processLine
852		self.filter.processLine = None
853		for i in range(100):
854			self.file.write("line%d\n" % 1)
855		self.file.flush()
856		for i in range(100):
857			self.filter.getFailures(self.name)
858		self.assertLogged('Failed to process line:')
859		self.assertLogged('Too many errors at once')
860		self.pruneLog()
861		self.assertTrue(self.filter.idle)
862		self.filter.idle = False
863		self.filter.getFailures(self.name)
864		self.filter.processLine = _org_processLine
865		self.file.write("line%d\n" % 1)
866		self.file.flush()
867		self.filter.getFailures(self.name)
868		self.assertNotLogged('Failed to process line:')
869
870	def testRemovingFailRegex(self):
871		self.filter.delFailRegex(0)
872		self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid')
873		self.filter.delFailRegex(0)
874		self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
875
876	def testRemovingIgnoreRegex(self):
877		self.filter.delIgnoreRegex(0)
878		self.assertLogged('Cannot remove regular expression. Index 0 is not valid')
879
880	def testNewChangeViaIsModified(self):
881		# it is a brand new one -- so first we think it is modified
882		self.assertTrue(self.isModified())
883		# but not any longer
884		self.assertTrue(self.notModified())
885		self.assertTrue(self.notModified())
886		mtimesleep()				# to guarantee freshier mtime
887		for i in range(4):			  # few changes
888			# unless we write into it
889			self.file.write("line%d\n" % i)
890			self.file.flush()
891			self.assertTrue(self.isModified())
892			self.assertTrue(self.notModified())
893			mtimesleep()				# to guarantee freshier mtime
894		os.rename(self.name, self.name + '.old')
895		# we are not signaling as modified whenever
896		# it gets away
897		self.assertTrue(self.notModified(1))
898		f = open(self.name, 'a')
899		self.assertTrue(self.isModified())
900		self.assertTrue(self.notModified())
901		mtimesleep()
902		f.write("line%d\n" % i)
903		f.flush()
904		self.assertTrue(self.isModified())
905		self.assertTrue(self.notModified())
906		_killfile(f, self.name)
907		_killfile(self.name, self.name + '.old')
908		pass
909
910	def testNewChangeViaGetFailures_simple(self):
911		# speedup search using exact date pattern:
912		self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
913		# suck in lines from this sample log file
914		self.filter.getFailures(self.name)
915		self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
916
917		# Now let's feed it with entries from the file
918		_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
919		self.filter.getFailures(self.name)
920		self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
921		# and it should have not been enough
922
923		_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
924		self.filter.getFailures(self.name)
925		_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
926
927	def testNewChangeViaGetFailures_rewrite(self):
928		# speedup search using exact date pattern:
929		self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
930		#
931		# if we rewrite the file at once
932		self.file.close()
933		_copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
934		self.filter.getFailures(self.name)
935		_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
936
937		# What if file gets overridden
938		# yoh: skip so we skip those 2 identical lines which our
939		# filter "marked" as the known beginning, otherwise it
940		# would not detect "rotation"
941		self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
942											  skip=12, n=3, mode='w')
943		self.filter.getFailures(self.name)
944		#self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
945		_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
946
947	def testNewChangeViaGetFailures_move(self):
948		# speedup search using exact date pattern:
949		self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
950		#
951		# if we move file into a new location while it has been open already
952		self.file.close()
953		self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
954											  n=14, mode='w')
955		self.filter.getFailures(self.name)
956		self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
957		self.assertEqual(self.filter.failManager.getFailTotal(), 2)
958
959		# move aside, but leaving the handle still open...
960		os.rename(self.name, self.name + '.bak')
961		_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1).close()
962		self.filter.getFailures(self.name)
963		_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
964		self.assertEqual(self.filter.failManager.getFailTotal(), 3)
965
966
class CommonMonitorTestCase(unittest.TestCase):

	def setUp(self):
		"""Reset the expected-failure counter before each test case."""
		super(CommonMonitorTestCase, self).setUp()
		self._failTotal = 0

	def waitFailTotal(self, count, delay=1):
		"""Wait up to `delay` sec until `count` additional failures were registered.

		Advances the internal expectation by `count` and returns the wait result.
		"""
		expected = self._failTotal + count
		def _reached():
			return self.filter.failManager.getFailTotal() >= expected and self.jail.isFilled()
		ret = Utils.wait_for(_reached, _maxWaitTime(delay))
		self._failTotal = expected
		return ret

	def isFilled(self, delay=1):
		"""Wait up to `delay` sec for the jail queue to contain something."""
		return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay))

	def isEmpty(self, delay=5):
		"""Wait up to `delay` sec for the jail queue to become empty again."""
		return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay))

	def waitForTicks(self, ticks, delay=2):
		"""Wait up to `delay` sec until the filter performed `ticks` more cycles."""
		target = self.filter.ticks + ticks
		return Utils.wait_for(lambda: self.filter.ticks >= target, _maxWaitTime(delay))
998
999
def get_monitor_failures_testcase(Filter_):
	"""Generator of TestCase's for different filters/backends
	"""

	# add Filter_'s name so we could easily identify bad cows
	testclass_name = tempfile.mktemp(
		'fail2ban', 'monitorfailures_%s_' % (Filter_.__name__,))

	class MonitorFailures(CommonMonitorTestCase):
		# class-level counter used to build a unique log filename per test
		count = 0

		def setUp(self):
			"""Call before every test case."""
			super(MonitorFailures, self).setUp()
			setUpMyTime()
			# placeholders first, so tearDown can still run if the lines below fail:
			self.filter = self.name = 'NA'
			self.name = '%s-%d' % (testclass_name, self.count)
			MonitorFailures.count += 1 # so we have unique filenames across tests
			self.file = open(self.name, 'a')
			self.jail = DummyJail()
			self.filter = Filter_(self.jail)
			self.filter.banASAP = False # avoid immediate ban in this tests
			self.filter.addLogPath(self.name, autoSeek=False)
			# speedup search using exact date pattern:
			self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
			self.filter.active = True
			self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
			self.filter.start()
			# If filter is polling it would sleep a bit to guarantee that
			# we have initial time-stamp difference to trigger "actions"
			self._sleep_4_poll()
			#print "D: started filter %s" % self.filter

		def tearDown(self):
			"""Call after every test case: stop filter thread and remove temp log."""
			tearDownMyTime()
			#print "D: SLEEPING A BIT"
			#import time; time.sleep(5)
			#print "D: TEARING DOWN"
			self.filter.stop()
			#print "D: WAITING FOR FILTER TO STOP"
			self.filter.join()		  # wait for the thread to terminate
			#print "D: KILLING THE FILE"
			_killfile(self.file, self.name)
			#time.sleep(0.2)			  # Give FS time to ack the removal
			super(MonitorFailures, self).tearDown()

		def _sleep_4_poll(self):
			"""Wait until the (poll-based) filter thread is alive."""
			# Since FilterPoll relies on time stamps and some
			# actions might be happening too fast in the tests,
			# sleep a bit to guarantee reliable time stamps
			if isinstance(self.filter, FilterPoll):
				Utils.wait_for(self.filter.isAlive, _maxWaitTime(5))

		def assert_correct_last_attempt(self, failures, count=None):
			"""Wait for expected failure count, then compare the last jail ticket."""
			self.assertTrue(self.waitFailTotal(count if count else failures[1], 10))
			_assert_correct_last_attempt(self, self.jail, failures, count=count)

		def test_grow_file(self):
			self._test_grow_file()

		def test_grow_file_in_idle(self):
			self._test_grow_file(True)

		def _test_grow_file(self, idle=False):
			"""Append lines to the monitored file; expect a ban (or none when idle)."""
			if idle:
				self.filter.sleeptime /= 100.0
				self.filter.idle = True
				self.waitForTicks(1)
			# suck in lines from this sample log file
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

			# Now let's feed it with entries from the file
			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=12)
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
			# and our dummy jail is empty as well
			self.assertFalse(len(self.jail))
			# since it should have not been enough

			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
			if idle:
				self.waitForTicks(1)
				self.assertTrue(self.isEmpty(1))
				return
			self.assertTrue(self.isFilled(10))
			# so we sleep a bit for it not to become empty,
			# and meanwhile pass to other thread(s) and filter should
			# have gathered new failures and passed them into the
			# DummyJail
			self.assertEqual(len(self.jail), 1)
			# and there should be no "stuck" ticket in failManager
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(len(self.jail), 0)

			#return
			# just for fun let's copy all of them again and see if that results
			# in a new ban
			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)

		def test_rewrite_file(self):
			"""Overwriting the monitored file must be detected as rotation."""
			# if we rewrite the file at once
			self.file.close()
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)

			# What if file gets overridden
			# yoh: skip so we skip those 2 identical lines which our
			# filter "marked" as the known beginning, otherwise it
			# would not detect "rotation"
			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
												  skip=12, n=3, mode='w')
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)

		def _wait4failures(self, count=2):
			"""Wait until exactly `count` failures are registered while queue stays empty."""
			# Poll might need more time
			self.assertTrue(self.isEmpty(_maxWaitTime(5)),
							"Queue must be empty but it is not: %s."
							% (', '.join([str(x) for x in self.jail.queue])))
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
			Utils.wait_for(lambda: self.filter.failManager.getFailTotal() >= count, _maxWaitTime(10))
			self.assertEqual(self.filter.failManager.getFailTotal(), count)

		def test_move_file(self):
			"""Rotation by moving the file aside must be followed correctly."""
			# if we move file into a new location while it has been open already
			self.file.close()
			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
												  n=14, mode='w')
			self._wait4failures()

			# move aside, but leaving the handle still open...
			os.rename(self.name, self.name + '.bak')
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 3)

			# now remove the moved file
			_killfile(None, self.name + '.bak')
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 6)

		def test_pyinotify_delWatch(self):
			"""Cover _delWatch error path of the pyinotify backend (no-op for others)."""
			if hasattr(self.filter, '_delWatch'): # pyinotify only
				m = self.filter._FilterPyinotify__monitor
				# remove existing watch:
				self.assertTrue(self.filter._delWatch(m.get_wd(self.name)))
				# mockup get_path to allow once find path for invalid wd-value:
				_org_get_path = m.get_path
				def _get_path(wd):
					#m.get_path = _org_get_path
					return 'test'
				m.get_path = _get_path
				# try remove watch using definitely not existing handle:
				self.assertFalse(self.filter._delWatch(0x7fffffff))
				m.get_path = _org_get_path

		def test_del_file(self):
			"""Deleting the monitored file moves it to the filter's pending paths."""
			# test filter reaction by delete watching file:
			self.file.close()
			self.waitForTicks(1)
			# remove file (cause detection of log-rotation)...
			os.unlink(self.name)
			# check it was detected (in pending files):
			self.waitForTicks(2)
			if hasattr(self.filter, "getPendingPaths"):
				self.assertTrue(Utils.wait_for(lambda: self.name in self.filter.getPendingPaths(), _maxWaitTime(10)))
				self.assertEqual(len(self.filter.getPendingPaths()), 1)

		@with_tmpdir
		def test_move_dir(self, tmp):
			"""Directory-based log rotation: rename parent dir, recreate log, repeat."""
			self.file.close()
			self.filter.setMaxRetry(10)
			self.filter.delLogPath(self.name)
			_killfile(None, self.name)
			# if we rename parent dir into a new location (simulate directory-base log rotation)
			tmpsub1 = os.path.join(tmp, "1")
			tmpsub2 = os.path.join(tmp, "2")
			os.mkdir(tmpsub1)
			self.name = os.path.join(tmpsub1, os.path.basename(self.name))
			os.close(os.open(self.name, os.O_CREAT|os.O_APPEND)); # create empty file
			self.filter.addLogPath(self.name, autoSeek=False)

			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
												  skip=12, n=1, mode='w')
			self.file.close()
			self._wait4failures(1)

			# rotate whole directory: rename directory 1 as 2a:
			os.rename(tmpsub1, tmpsub2 + 'a')
			os.mkdir(tmpsub1)
			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
												  skip=12, n=1, mode='w')
			self.file.close()
			self._wait4failures(2)

			# rotate whole directory: rename directory 1 as 2b:
			os.rename(tmpsub1, tmpsub2 + 'b')
			# wait a bit in-between (try to increase coverage, should find pending file for pending dir):
			self.waitForTicks(2)
			os.mkdir(tmpsub1)
			self.waitForTicks(2)
			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
												  skip=12, n=1, mode='w')
			self.file.close()
			self._wait4failures(3)

			# stop before tmpdir deleted (just prevents many monitor events)
			self.filter.stop()
			self.filter.join()


		def _test_move_into_file(self, interim_kill=False):
			"""Replace the monitored file by moving a new file onto its path."""
			# if we move a new file into the location of an old (monitored) file
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
			# make sure that it is monitored first
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 3)

			if interim_kill:
				_killfile(None, self.name)
				time.sleep(Utils.DEFAULT_SHORT_INTERVAL)				  # let them know

			# now create a new one to override old one
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name + '.new',
				skip=12, n=3).close()
			os.rename(self.name + '.new', self.name)
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 6)

			# and to make sure that it now monitored for changes
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name,
				skip=12, n=3).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 9)

		def test_move_into_file(self):
			self._test_move_into_file(interim_kill=False)

		def test_move_into_file_after_removed(self):
			# exactly as above test + remove file explicitly
			# to test against possible drop-out of the file from monitoring
		    self._test_move_into_file(interim_kill=True)

		def test_new_bogus_file(self):
			"""A bogus sibling file in the watched directory must not disturb monitoring."""
			# to make sure that watching whole directory does not effect
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)

			# create a bogus file in the same directory and see if that doesn't affect
			open(self.name + '.bak2', 'w').close()
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3).close()
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			self.assertEqual(self.filter.failManager.getFailTotal(), 6)
			_killfile(None, self.name + '.bak2')

		def test_delLogPath(self):
			"""Removing and re-adding the log path stops and resumes monitoring."""
			# Smoke test for removing of the path from being watched

			# basic full test
			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)

			# and now remove the LogPath
			self.filter.delLogPath(self.name)
			# wait a bit for filter (backend-threads):
			self.waitForTicks(2)

			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
			# so we should get no more failures detected
			self.assertTrue(self.isEmpty(10))

			# but then if we add it back again (no seek to time in FileFilter's, because in file used the same time)
			self.filter.addLogPath(self.name, autoSeek=False)
			# wait a bit for filter (backend-threads):
			self.waitForTicks(2)
			# Tricky catch here is that it should get them from the
			# tail written before, so let's not copy anything yet
			#_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
			# we should detect the failures
			self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above

			# now copy and get even more
			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=12, n=3)
			# check for 3 failures (not 9), because 6 already get above...
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
			# total count in this test:
			self.assertEqual(self.filter.failManager.getFailTotal(), 12)

	cls = MonitorFailures
	cls.__qualname__ = cls.__name__ = "MonitorFailures<%s>(%s)" \
			  % (Filter_.__name__, testclass_name) # 'tempfile')
	return cls
1293
1294
def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
	"""Generator of TestCase's for journal based filters/backends
	"""

	testclass_name = "monitorjournalfailures_%s" % (Filter_.__name__,)

	class MonitorJournalFailures(CommonMonitorTestCase):
		def setUp(self):
			"""Call before every test case."""
			super(MonitorJournalFailures, self).setUp()
			self._runtimeJournal = None
			self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log")
			self.jail = DummyJail()
			self.filter = None
			# UUID used to ensure that only messages generated
			# as part of this test are picked up by the filter
			self.test_uuid = str(uuid.uuid4())
			self.name = "%s-%s" % (testclass_name, self.test_uuid)
			self.journal_fields = {
				'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}

		def _initFilter(self, **kwargs):
			"""Create the journal filter; skips the test if no journal available."""
			self._getRuntimeJournal() # check journal available
			self.filter = Filter_(self.jail, **kwargs)
			self.filter.banASAP = False # avoid immediate ban in this tests
			self.filter.addJournalMatch([
				"SYSLOG_IDENTIFIER=fail2ban-testcases",
				"TEST_FIELD=1",
				"TEST_UUID=%s" % self.test_uuid])
			self.filter.addJournalMatch([
				"SYSLOG_IDENTIFIER=fail2ban-testcases",
				"TEST_FIELD=2",
				"TEST_UUID=%s" % self.test_uuid])
			self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")

		def tearDown(self):
			"""Call after every test case: stop the filter thread if it runs."""
			if self.filter and self.filter.active:
				self.filter.stop()
				self.filter.join()		  # wait for the thread to terminate
			super(MonitorJournalFailures, self).tearDown()

		def _getRuntimeJournal(self):
			"""Retrieve current system journal path

			If not found, SkipTest exception will be raised.
			"""
			# we can cache it:
			if self._runtimeJournal is None:
				# Depending on the system, it could be found under /run or /var/log (e.g. Debian)
				# which are pointed by different systemd-path variables.  We will
				# check one at a time until the first hit
				for systemd_var in 'system-runtime-logs', 'system-state-logs':
					tmp = Utils.executeCmd(
						'find "$(systemd-path %s)" -name system.journal' % systemd_var,
						timeout=10, shell=True, output=True
					)
					self.assertTrue(tmp)
					out = str(tmp[1].decode('utf-8')).split('\n')[0]
					if out: break
				self._runtimeJournal = out
			if self._runtimeJournal:
				return self._runtimeJournal
			raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)')

		def testJournalFilesArg(self):
			"""Filter accepts the `journalfiles` argument."""
			# retrieve current system journal path
			jrnlfile = self._getRuntimeJournal()
			self._initFilter(journalfiles=jrnlfile)

		def testJournalFilesAndFlagsArgs(self):
			"""Filter accepts `journalfiles` together with `journalflags`."""
			# retrieve current system journal path
			jrnlfile = self._getRuntimeJournal()
			self._initFilter(journalfiles=jrnlfile, journalflags=0)

		def testJournalPathArg(self):
			"""Filter accepts `journalpath`; seeking a day back finds nothing to ban."""
			# retrieve current system journal path
			jrnlpath = self._getRuntimeJournal()
			jrnlpath = os.path.dirname(jrnlpath)
			self._initFilter(journalpath=jrnlpath)
			self.filter.seekToTime(
				datetime.datetime.now() - datetime.timedelta(days=1)
			)
			self.filter.start()
			self.waitForTicks(2)
			self.assertTrue(self.isEmpty(1))
			self.assertEqual(len(self.jail), 0)
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

		def testJournalFlagsArg(self):
			"""Filter accepts the `journalflags` argument."""
			self._initFilter(journalflags=0) # e. g. 2 - journal.RUNTIME_ONLY

		def assert_correct_ban(self, test_ip, test_attempts):
			"""Wait for the failures, then verify the resulting ticket's IP and attempts."""
			self.assertTrue(self.waitFailTotal(test_attempts, 10)) # give Filter a chance to react
			ticket = self.jail.getFailTicket()
			self.assertTrue(ticket)

			attempts = ticket.getAttempt()
			ip = ticket.getIP()
			ticket.getMatches()

			self.assertEqual(ip, test_ip)
			self.assertEqual(attempts, test_attempts)

		def test_grow_file(self):
			self._test_grow_file()

		def test_grow_file_in_idle(self):
			self._test_grow_file(True)

		def _test_grow_file(self, idle=False):
			"""Feed journal entries; expect a ban (or none when filter is idle)."""
			self._initFilter()
			self.filter.start()
			if idle:
				self.filter.sleeptime /= 100.0
				self.filter.idle = True
				self.waitForTicks(1)
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

			# Now let's feed it with entries from the file
			_copy_lines_to_journal(
				self.test_file, self.journal_fields, n=2)
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
			# and our dummy jail is empty as well
			self.assertFalse(len(self.jail))
			# since it should have not been enough

			_copy_lines_to_journal(
				self.test_file, self.journal_fields, skip=2, n=3)
			if idle:
				self.waitForTicks(1)
				self.assertTrue(self.isEmpty(1))
				return
			self.assertTrue(self.isFilled(10))
			# so we sleep for up to 6 sec for it not to become empty,
			# and meanwhile pass to other thread(s) and filter should
			# have gathered new failures and passed them into the
			# DummyJail
			self.assertEqual(len(self.jail), 1)
			# and there should be no "stuck" ticket in failManager
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
			self.assert_correct_ban("193.168.0.128", 3)
			self.assertEqual(len(self.jail), 0)

			# Lets read some more to check it bans again
			_copy_lines_to_journal(
				self.test_file, self.journal_fields, skip=5, n=4)
			self.assert_correct_ban("193.168.0.128", 3)

		@with_alt_time
		def test_grow_file_with_db(self):
			"""Journal position persisted in database prevents re-banning old entries."""

			def _gen_falure(ip):
				# insert new failures and check it is monitored:
				fields = self.journal_fields
				fields.update(TEST_JOURNAL_FIELDS)
				journal.send(MESSAGE="error: PAM: Authentication failure for test from "+ip, **fields)
				self.waitForTicks(1)
				self.assert_correct_ban(ip, 1)

			# coverage for update log:
			self.jail.database = getFail2BanDb(':memory:')
			self.jail.database.addJail(self.jail)
			MyTime.setTime(time.time())
			self._test_grow_file()
			# stop:
			self.filter.stop()
			self.filter.join()
			MyTime.setTime(time.time() + 2)
			# update log manually (should cause a seek to end of log without wait for next second):
			self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time(), 'TEST')
			# check seek to last (simulated) position succeeds (without bans of previous copied tickets):
			self._failTotal = 0
			self._initFilter()
			self.filter.setMaxRetry(1)
			self.filter.start()
			self.waitForTicks(1)
			# check new IP but no old IPs found:
			_gen_falure("192.0.2.5")
			self.assertFalse(self.jail.getFailTicket())

			# now the same with increased time (check now - findtime case):
			self.filter.stop()
			self.filter.join()
			MyTime.setTime(time.time() + 10000)
			self._failTotal = 0
			self._initFilter()
			self.filter.setMaxRetry(1)
			self.filter.start()
			self.waitForTicks(1)
			MyTime.setTime(time.time() + 3)
			# check new IP but no old IPs found:
			_gen_falure("192.0.2.6")
			self.assertFalse(self.jail.getFailTicket())

		def test_delJournalMatch(self):
			"""Removing a journal match stops detection; re-adding resumes it."""
			self._initFilter()
			self.filter.start()
			# Smoke test for removing of match

			# basic full test
			_copy_lines_to_journal(
				self.test_file, self.journal_fields, n=5)
			self.assert_correct_ban("193.168.0.128", 3)

			# and now remove the JournalMatch
			self.filter.delJournalMatch([
				"SYSLOG_IDENTIFIER=fail2ban-testcases",
				"TEST_FIELD=1",
				"TEST_UUID=%s" % self.test_uuid])

			_copy_lines_to_journal(
				self.test_file, self.journal_fields, n=5, skip=5)
			# so we should get no more failures detected
			self.assertTrue(self.isEmpty(10))

			# but then if we add it back again
			self.filter.addJournalMatch([
				"SYSLOG_IDENTIFIER=fail2ban-testcases",
				"TEST_FIELD=1",
				"TEST_UUID=%s" % self.test_uuid])
			self.assert_correct_ban("193.168.0.128", 4)
			_copy_lines_to_journal(
				self.test_file, self.journal_fields, n=6, skip=10)
			# we should detect the failures
			self.assertTrue(self.isFilled(10))

		def test_WrongChar(self):
			"""Non-ASCII / differently encoded journal messages are handled correctly."""
			self._initFilter()
			self.filter.start()
			# Now let's feed it with entries from the file
			_copy_lines_to_journal(
				self.test_file, self.journal_fields, skip=15, n=4)
			self.waitForTicks(1)
			self.assertTrue(self.isFilled(10))
			self.assert_correct_ban("87.142.124.10", 4)
			# Add direct utf, unicode, blob:
			for l in (
		    "error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
		   "error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
		   b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'),
		    "error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
		   "error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
		   b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace')
			):
				fields = self.journal_fields
				fields.update(TEST_JOURNAL_FIELDS)
				journal.send(MESSAGE=l, **fields)
			self.waitForTicks(1)
			self.waitFailTotal(6, 10)
			self.assertTrue(Utils.wait_for(lambda: len(self.jail) == 2, 10))
			self.assertSortedEqual([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()],
				["192.0.2.1", "192.0.2.2"])

	cls = MonitorJournalFailures
	cls.__qualname__ = cls.__name__ = "MonitorJournalFailures<%s>(%s)" \
			  % (Filter_.__name__, testclass_name)
	return cls
1552
1553
class GetFailures(LogCaptureTestCase):
	"""Tests of FileFilter.getFailures on the static sample logs in TEST_FILES_DIR."""

	FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
	FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
	FILENAME_03 = os.path.join(TEST_FILES_DIR, "testcase03.log")
	FILENAME_04 = os.path.join(TEST_FILES_DIR, "testcase04.log")
	FILENAME_USEDNS = os.path.join(TEST_FILES_DIR, "testcase-usedns.log")
	FILENAME_MULTILINE = os.path.join(TEST_FILES_DIR, "testcase-multiline.log")

	# so that they could be reused by other tests
	# tuple: (ip, attempt count, unix time of last attempt, matched log lines)
	FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
				  ['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128']*3)
1566
	def setUp(self):
		"""Call before every test case."""
		LogCaptureTestCase.setUp(self)
		setUpMyTime()
		# fresh dummy jail and a file-based filter under test:
		self.jail = DummyJail()
		self.filter = FileFilter(self.jail)
		self.filter.banASAP = False # avoid immediate ban in this tests
		self.filter.active = True
		# speedup search using exact date pattern:
		self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
		# TODO Test this
		#self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
		#self.filter.setTimePattern("%b %d %H:%M:%S")
1580
	def tearDown(self):
		"""Call after every test case."""
		# restore real time before releasing the log capture:
		tearDownMyTime()
		LogCaptureTestCase.tearDown(self)
1585
1586	def testFilterAPI(self):
1587		self.assertEqual(self.filter.getLogs(), [])
1588		self.assertEqual(self.filter.getLogCount(), 0)
1589		self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
1590		self.assertEqual(self.filter.getLogCount(), 1)
1591		self.assertEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01])
1592		self.filter.addLogPath(GetFailures.FILENAME_02, tail=True)
1593		self.assertEqual(self.filter.getLogCount(), 2)
1594		self.assertSortedEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01, GetFailures.FILENAME_02])
1595
1596	def testTail(self):
1597		# There must be no containters registered, otherwise [-1] indexing would be wrong
1598		self.assertEqual(self.filter.getLogs(), [])
1599		self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
1600		self.assertEqual(self.filter.getLogs()[-1].getPos(), 1653)
1601		self.filter.getLogs()[-1].close()
1602		self.assertEqual(self.filter.getLogs()[-1].readline(), "")
1603		self.filter.delLogPath(GetFailures.FILENAME_01)
1604		self.assertEqual(self.filter.getLogs(), [])
1605
1606	def testNoLogAdded(self):
1607		self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
1608		self.assertTrue(self.filter.containsLogPath(GetFailures.FILENAME_01))
1609		self.filter.delLogPath(GetFailures.FILENAME_01)
1610		self.assertFalse(self.filter.containsLogPath(GetFailures.FILENAME_01))
1611		# and unknown (safety and cover)
1612		self.assertFalse(self.filter.containsLogPath('unknown.log'))
1613		self.filter.delLogPath('unknown.log')
1614
1615
1616	def testGetFailures01(self, filename=None, failures=None):
1617		filename = filename or GetFailures.FILENAME_01
1618		failures = failures or GetFailures.FAILURES_01
1619
1620		self.filter.addLogPath(filename, autoSeek=0)
1621		self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$")
1622		self.filter.getFailures(filename)
1623		_assert_correct_last_attempt(self, self.filter,  failures)
1624
1625	def testCRLFFailures01(self):
1626		# We first adjust logfile/failures to end with CR+LF
1627		fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
1628		# poor man unix2dos:
1629		fin, fout = open(GetFailures.FILENAME_01, 'rb'), open(fname, 'wb')
1630		for l in fin.read().splitlines():
1631			fout.write(l + b'\r\n')
1632		fin.close()
1633		fout.close()
1634
1635		# now see if we should be getting the "same" failures
1636		self.testGetFailures01(filename=fname)
1637		_killfile(fout, fname)
1638
1639	def testGetFailures02(self):
1640		output = ('141.3.81.106', 4, 1124013539.0,
1641				  ['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2'
1642				   % m for m in (53, 54, 57, 58)])
1643
1644		self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=0)
1645		self.filter.addFailRegex(r"Failed .* from <HOST>")
1646		self.filter.getFailures(GetFailures.FILENAME_02)
1647		_assert_correct_last_attempt(self, self.filter, output)
1648
1649	def testGetFailures03(self):
1650		output = ('203.162.223.135', 6, 1124013600.0)
1651
1652		self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=0)
1653		self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
1654		self.filter.getFailures(GetFailures.FILENAME_03)
1655		_assert_correct_last_attempt(self, self.filter, output)
1656
1657	def testGetFailures03_InOperation(self):
1658		output = ('203.162.223.135', 9, 1124013600.0)
1659
1660		self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=0)
1661		self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
1662		self.filter.getFailures(GetFailures.FILENAME_03, inOperation=True)
1663		_assert_correct_last_attempt(self, self.filter, output)
1664
1665	def testGetFailures03_Seek1(self):
1666		# same test as above but with seek to 'Aug 14 11:55:04' - so other output ...
1667		output = ('203.162.223.135', 3, 1124013600.0)
1668
1669		self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2] - 4*60)
1670		self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
1671		self.filter.getFailures(GetFailures.FILENAME_03)
1672		_assert_correct_last_attempt(self, self.filter, output)
1673
1674	def testGetFailures03_Seek2(self):
1675		# same test as above but with seek to 'Aug 14 11:59:04' - so other output ...
1676		output = ('203.162.223.135', 2, 1124013600.0)
1677		self.filter.setMaxRetry(1)
1678
1679		self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2])
1680		self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
1681		self.filter.getFailures(GetFailures.FILENAME_03)
1682		_assert_correct_last_attempt(self, self.filter, output)
1683
1684	def testGetFailures04(self):
1685		# because of not exact time in testcase04.log (no year), we should always use our test time:
1686		self.assertEqual(MyTime.time(), 1124013600)
1687		# should find exact 4 failures for *.186 and 2 failures for *.185
1688		output = (('212.41.96.186', 4, 1124013600.0),
1689				  ('212.41.96.185', 2, 1124013598.0))
1690
1691		# speedup search using exact date pattern:
1692		self.filter.setDatePattern((r'^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?',
1693			r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?',
1694			r'^EPOCH'
1695		))
1696		self.filter.setMaxRetry(2)
1697		self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0)
1698		self.filter.addFailRegex(r"Invalid user .* <HOST>")
1699		self.filter.getFailures(GetFailures.FILENAME_04)
1700
1701		_assert_correct_last_attempt(self, self.filter, output)
1702
1703	def testGetFailuresWrongChar(self):
1704		self.filter.checkFindTime = False
1705		# write wrong utf-8 char:
1706		fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
1707		fout = fopen(fname, 'wb')
1708		try:
1709			# write:
1710			for l in (
1711				b'2015-01-14 20:00:58 user \"test\xf1ing\" from \"192.0.2.0\"\n',          # wrong utf-8 char
1712				b'2015-01-14 20:00:59 user \"\xd1\xe2\xe5\xf2\xe0\" from \"192.0.2.0\"\n', # wrong utf-8 chars
1713				b'2015-01-14 20:01:00 user \"testing\" from \"192.0.2.0\"\n'               # correct utf-8 chars
1714			):
1715				fout.write(l)
1716			fout.close()
1717			#
1718			output = ('192.0.2.0', 3, 1421262060.0)
1719			failregex = r"^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"
1720
1721			# test encoding auto or direct set of encoding:
1722			for enc in (None, 'utf-8', 'ascii'):
1723				if enc is not None:
1724					self.tearDown();self.setUp();
1725					if DefLogSys.getEffectiveLevel() > 7: DefLogSys.setLevel(7); # ensure decode_line logs always
1726					self.filter.checkFindTime = False;
1727					self.filter.setLogEncoding(enc);
1728				# speedup search using exact date pattern:
1729				self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
1730				self.assertNotLogged('Error decoding line');
1731				self.filter.addLogPath(fname)
1732				self.filter.addFailRegex(failregex)
1733				self.filter.getFailures(fname)
1734				_assert_correct_last_attempt(self, self.filter, output)
1735
1736				self.assertLogged('Error decoding line');
1737				self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user ');
1738				self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user ');
1739
1740		finally:
1741			_killfile(fout, fname)
1742
1743	def testGetFailuresUseDNS(self):
1744		unittest.F2B.SkipIfNoNetwork()
1745		# We should still catch failures with usedns = no ;-)
1746		output_yes = (
1747			('93.184.216.34', 2, 1124013539.0,
1748			  ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2',
1749			   'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
1750			),
1751			('2606:2800:220:1:248:1893:25c8:1946', 1, 1124013299.0,
1752			  ['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
1753			),
1754		)
1755
1756		output_no = (
1757			('93.184.216.34', 1, 1124013539.0,
1758			  ['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
1759			)
1760		)
1761
1762		# Actually no exception would be raised -- it will be just set to 'no'
1763		#self.assertRaises(ValueError,
1764		#				  FileFilter, None, useDns='wrong_value_for_useDns')
1765
1766		for useDns, output in (
1767			('yes',  output_yes),
1768			('no',   output_no),
1769			('warn', output_yes)
1770		):
1771			self.pruneLog("[test-phase useDns=%s]" % useDns)
1772			jail = DummyJail()
1773			filter_ = FileFilter(jail, useDns=useDns)
1774			filter_.banASAP = False # avoid immediate ban in this tests
1775			filter_.active = True
1776			filter_.failManager.setMaxRetry(1)	# we might have just few failures
1777
1778			filter_.addLogPath(GetFailures.FILENAME_USEDNS, autoSeek=False)
1779			filter_.addFailRegex(r"Failed .* from <HOST>")
1780			filter_.getFailures(GetFailures.FILENAME_USEDNS)
1781			_assert_correct_last_attempt(self, filter_, output)
1782
1783	def testGetFailuresMultiRegex(self):
1784		output = ('141.3.81.106', 8, 1124013541.0)
1785
1786		self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
1787		self.filter.addFailRegex(r"Failed .* from <HOST>")
1788		self.filter.addFailRegex(r"Accepted .* from <HOST>")
1789		self.filter.getFailures(GetFailures.FILENAME_02)
1790		_assert_correct_last_attempt(self, self.filter, output)
1791
1792	def testGetFailuresIgnoreRegex(self):
1793		self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
1794		self.filter.addFailRegex(r"Failed .* from <HOST>")
1795		self.filter.addFailRegex(r"Accepted .* from <HOST>")
1796		self.filter.addIgnoreRegex("for roehl")
1797
1798		self.filter.getFailures(GetFailures.FILENAME_02)
1799
1800		self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
1801
1802	def testGetFailuresMultiLine(self):
1803		output = [("192.0.43.10", 2, 1124013599.0),
1804			("192.0.43.11", 1, 1124013598.0)]
1805		self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
1806		self.filter.setMaxLines(100)
1807		self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
1808		self.filter.setMaxRetry(1)
1809
1810		self.filter.getFailures(GetFailures.FILENAME_MULTILINE)
1811
1812		foundList = []
1813		while True:
1814			try:
1815				foundList.append(
1816					_ticket_tuple(self.filter.failManager.toBan())[0:3])
1817			except FailManagerEmpty:
1818				break
1819		self.assertSortedEqual(foundList, output)
1820
1821	def testGetFailuresMultiLineIgnoreRegex(self):
1822		output = [("192.0.43.10", 2, 1124013599.0)]
1823		self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
1824		self.filter.setMaxLines(100)
1825		self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
1826		self.filter.addIgnoreRegex("rsync error: Received SIGINT")
1827		self.filter.setMaxRetry(1)
1828
1829		self.filter.getFailures(GetFailures.FILENAME_MULTILINE)
1830
1831		_assert_correct_last_attempt(self, self.filter, output.pop())
1832
1833		self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
1834
1835	def testGetFailuresMultiLineMultiRegex(self):
1836		output = [("192.0.43.10", 2, 1124013599.0),
1837			("192.0.43.11", 1, 1124013598.0),
1838			("192.0.43.15", 1, 1124013598.0)]
1839		self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
1840		self.filter.setMaxLines(100)
1841		self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
1842		self.filter.addFailRegex(r"^.* sendmail\[.*, msgid=<(?P<msgid>[^>]+).*relay=\[<HOST>\].*$<SKIPLINES>^.+ spamd: result: Y \d+ .*,mid=<(?P=msgid)>(,bayes=[.\d]+)?(,autolearn=\S+)?\s*$")
1843		self.filter.setMaxRetry(1)
1844
1845		self.filter.getFailures(GetFailures.FILENAME_MULTILINE)
1846
1847		foundList = []
1848		while True:
1849			try:
1850				foundList.append(
1851					_ticket_tuple(self.filter.failManager.toBan())[0:3])
1852			except FailManagerEmpty:
1853				break
1854		self.assertSortedEqual(foundList, output)
1855
1856
class DNSUtilsTests(unittest.TestCase):
	"""Unit tests for ``Utils.Cache`` and the shared IP-address cache of ``IPAddr``."""

	def testCache(self):
		"""Basic cache semantics: miss, default value, set/get, idempotent unset."""
		c = Utils.Cache(maxCount=5, maxTime=60)
		# not available :
		self.assertTrue(c.get('a') is None)
		self.assertEqual(c.get('a', 'test'), 'test')
		# exact 5 elements :
		for i in range(5):
			c.set(i, i)
		for i in range(5):
			self.assertEqual(c.get(i), i)
		# remove unavailable key (must not raise, also when repeated):
		c.unset('a'); c.unset('a')

	def testCacheMaxSize(self):
		"""Cache evicts an entry once it grows past maxCount elements."""
		c = Utils.Cache(maxCount=5, maxTime=60)
		# exact 5 elements :
		for i in range(5):
			c.set(i, i)
		self.assertEqual([c.get(i) for i in range(5)], [i for i in range(5)])
		self.assertNotIn(-1, (c.get(i, -1) for i in range(5)))
		# add one - too many (stored value itself is irrelevant for this test;
		# previously the stale loop variable `i` was used here):
		c.set(10, 10)
		# one element should be removed :
		self.assertIn(-1, (c.get(i, -1) for i in range(5)))
		# test max size (not expired):
		for i in range(10):
			c.set(i, 1)
		self.assertEqual(len(c), 5)

	def testCacheMaxTime(self):
		"""Entries become unavailable after maxTime and the cache empties itself."""
		# test max time (expired, timeout reached) :
		c = Utils.Cache(maxCount=5, maxTime=0.0005)
		for i in range(10):
			c.set(i, 1)
		st = time.time()
		self.assertTrue(Utils.wait_for(lambda: time.time() >= st + 0.0005, 1))
		# we have still 5 elements (or fewer if too slow test machine):
		self.assertTrue(len(c) <= 5)
		# but all of them are expired as well:
		for i in range(10):
			self.assertTrue(c.get(i) is None)
		# here the whole cache should be empty:
		self.assertEqual(len(c), 0)

	def testOverflowedIPCache(self):
		"""Flood the (shrunken) IPAddr cache from 3 concurrent workers.

		Two threads flood it with sequential IPs (forwards and backwards) while
		the main thread floods it with randomly ordered IPs; no worker may fail
		and the cache must never exceed its configured maxCount.
		"""
		from threading import Thread
		from random import shuffle
		# save original cache and use smaller cache during the test here:
		_org_cache = IPAddr.CACHE_OBJ
		cache = IPAddr.CACHE_OBJ = Utils.Cache(maxCount=5, maxTime=60)
		result = list()
		count = 1 if unittest.F2B.fast else 50
		try:
			# tester procedure of worker:
			def _TestCacheStr2IP(forw=True, result=None, random=False):
				if result is None: result = []
				try:
					c = count
					while c:
						c -= 1
						s = list(range(0, 256, 1)) if forw else list(range(255, -1, -1))
						# bug fix: shuffle the sequence actually iterated below;
						# previously a throwaway copy was shuffled, so the
						# "random" worker still ran in sequential order:
						if random: shuffle(s)
						for i in s:
							IPAddr('192.0.2.'+str(i), IPAddr.FAM_IPv4)
							IPAddr('2001:db8::'+str(i), IPAddr.FAM_IPv6)
					result.append(None)
				except Exception as e:
					DefLogSys.debug(e, exc_info=True)
					result.append(e)

			# 2 workers flooding it forwards and backwards:
			th1 = Thread(target=_TestCacheStr2IP, args=(True,  result)); th1.start()
			th2 = Thread(target=_TestCacheStr2IP, args=(False, result)); th2.start()
			# and here we flooding it with random IPs too:
			_TestCacheStr2IP(True, result, True)
		finally:
			# wait for end of threads and restore cache:
			th1.join()
			th2.join()
			IPAddr.CACHE_OBJ = _org_cache
		self.assertEqual(result, [None]*3) # no errors
		self.assertTrue(len(cache) <= cache.maxCount)
1941
1942
class DNSUtilsNetworkTests(unittest.TestCase):
	"""Tests for IPAddr parsing/comparison and the DNS helpers of DNSUtils.

	Several tests resolve real hostnames (example.com etc.) and therefore
	depend on network availability and on the currently published records.
	"""

	def setUp(self):
		"""Call before every test case."""
		super(DNSUtilsNetworkTests, self).setUp()
		#unittest.F2B.SkipIfNoNetwork()

	def test_IPAddr(self):
		"""Family detection for plain IPv4/IPv6 and asip conversion/identity."""
		ip4 = IPAddr('192.0.2.1')
		ip6 = IPAddr('2001:DB8::')
		self.assertTrue(ip4.isIPv4)
		self.assertTrue(ip4.isSingle)
		self.assertTrue(ip6.isIPv6)
		self.assertTrue(ip6.isSingle)
		self.assertTrue(asip('192.0.2.1').isIPv4)
		# asip on an IPAddr returns the very same object:
		self.assertTrue(id(asip(ip4)) == id(ip4))

	def test_IPAddr_Raw(self):
		"""CIDR_RAW addresses are valid but neither IPv4 nor IPv6 and never equal real IPs."""
		# raw string:
		r = IPAddr('xxx', IPAddr.CIDR_RAW)
		self.assertFalse(r.isIPv4)
		self.assertFalse(r.isIPv6)
		self.assertFalse(r.isSingle)
		self.assertTrue(r.isValid)
		self.assertEqual(r, 'xxx')
		self.assertEqual('xxx', str(r))
		self.assertNotEqual(r, IPAddr('xxx'))
		# raw (not IP, for example host:port as string):
		r = IPAddr('1:2', IPAddr.CIDR_RAW)
		self.assertFalse(r.isIPv4)
		self.assertFalse(r.isIPv6)
		self.assertFalse(r.isSingle)
		self.assertTrue(r.isValid)
		self.assertEqual(r, '1:2')
		self.assertEqual('1:2', str(r))
		self.assertNotEqual(r, IPAddr('1:2'))
		# raw vs ip4 (raw is not an ip):
		r = IPAddr('93.184.0.1', IPAddr.CIDR_RAW)
		ip4 = IPAddr('93.184.0.1')
		self.assertNotEqual(ip4, r)
		self.assertNotEqual(r, ip4)
		self.assertTrue(r < ip4)
		self.assertTrue(r < ip4)
		# raw vs ip6 (raw is not an ip):
		r = IPAddr('1::2', IPAddr.CIDR_RAW)
		ip6 = IPAddr('1::2')
		self.assertNotEqual(ip6, r)
		self.assertNotEqual(r, ip6)
		self.assertTrue(r < ip6)
		self.assertTrue(r < ip6)

	def testUseDns(self):
		"""textToIp honors usedns modes 'no', 'warn' and 'yes'."""
		res = DNSUtils.textToIp('www.example.com', 'no')
		self.assertSortedEqual(res, [])
		#unittest.F2B.SkipIfNoNetwork()
		res = DNSUtils.textToIp('www.example.com', 'warn')
		# sort ipaddr, IPv4 is always smaller as IPv6
		self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
		res = DNSUtils.textToIp('www.example.com', 'yes')
		# sort ipaddr, IPv4 is always smaller as IPv6
		self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])

	def testTextToIp(self):
		"""Resolvable hostnames yield IPs; unresolvable ones yield an empty result."""
		#unittest.F2B.SkipIfNoNetwork()
		# Test hostnames
		hostnames = [
			'www.example.com',
			'doh1.2.3.4.buga.xxxxx.yyy.invalid',
			'1.2.3.4.buga.xxxxx.yyy.invalid',
			]
		for s in hostnames:
			res = DNSUtils.textToIp(s, 'yes')
			if s == 'www.example.com':
				# sort ipaddr, IPv4 is always smaller as IPv6
				self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
			else:
				self.assertSortedEqual(res, [])

	def testIpToIp(self):
		"""Literal IPs pass through textToIp unchanged and become IPAddr objects."""
		# pure ips:
		for s in ('93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'):
			ips = DNSUtils.textToIp(s, 'yes')
			self.assertSortedEqual(ips, [s])
			for ip in ips:
				self.assertTrue(isinstance(ip, IPAddr))

	def testIpToName(self):
		"""Reverse lookup works for real IPs and returns None for invalid ones."""
		#unittest.F2B.SkipIfNoNetwork()
		res = DNSUtils.ipToName('8.8.4.4')
		self.assertTrue(res.endswith(('.google', '.google.com')))
		# same as above, but with IPAddr:
		res = DNSUtils.ipToName(IPAddr('8.8.4.4'))
		self.assertTrue(res.endswith(('.google', '.google.com')))
		# invalid ip (TEST-NET-1 according to RFC 5737)
		res = DNSUtils.ipToName('192.0.2.0')
		self.assertEqual(res, None)
		# invalid ip:
		res = DNSUtils.ipToName('192.0.2.888')
		self.assertEqual(res, None)

	def testAddr2bin(self):
		"""Binary representation, hexdump and PTR names of addresses."""
		res = IPAddr('10.0.0.0')
		self.assertEqual(res.addr, 167772160)
		res = IPAddr('10.0.0.0', cidr=None)
		self.assertEqual(res.addr, 167772160)
		res = IPAddr('10.0.0.0', cidr=32)
		self.assertEqual(res.addr, 167772160)
		res = IPAddr('10.0.0.1', cidr=32)
		self.assertEqual(res.addr, 167772161)
		self.assertTrue(res.isSingle)
		# /31 masks the last bit away -- no single address anymore:
		res = IPAddr('10.0.0.1', cidr=31)
		self.assertEqual(res.addr, 167772160)
		self.assertFalse(res.isSingle)

		self.assertEqual(IPAddr('10.0.0.0').hexdump, '0a000000')
		self.assertEqual(IPAddr('1::2').hexdump, '00010000000000000000000000000002')
		self.assertEqual(IPAddr('xxx').hexdump, '')

		self.assertEqual(IPAddr('192.0.2.0').getPTR(), '0.2.0.192.in-addr.arpa.')
		self.assertEqual(IPAddr('192.0.2.1').getPTR(), '1.2.0.192.in-addr.arpa.')
		self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:1946').getPTR(),
			'6.4.9.1.8.c.5.2.3.9.8.1.8.4.2.0.1.0.0.0.0.2.2.0.0.0.8.2.6.0.6.2.ip6.arpa.')

	def testIPAddr_Equal6(self):
		"""Abbreviated and zero-filled IPv6 notations compare equal."""
		self.assertEqual(
			IPAddr('2606:2800:220:1:248:1893::'),
			IPAddr('2606:2800:220:1:248:1893:0:0')
		)
		# special case IPv6 in brackets:
		self.assertEqual(
			IPAddr('[2606:2800:220:1:248:1893::]'),
			IPAddr('2606:2800:220:1:248:1893:0:0')
		)

	def testIPAddr_InInet(self):
		"""isInNet membership for IPv4/IPv6 subnets, raw and invalid addresses."""
		ip4net = IPAddr('93.184.0.1/24')
		ip6net = IPAddr('2606:2800:220:1:248:1893:25c8:0/120')
		self.assertFalse(ip4net.isSingle)
		self.assertFalse(ip6net.isSingle)
		# ip4:
		self.assertTrue(IPAddr('93.184.0.1').isInNet(ip4net))
		self.assertTrue(IPAddr('93.184.0.255').isInNet(ip4net))
		self.assertFalse(IPAddr('93.184.1.0').isInNet(ip4net))
		self.assertFalse(IPAddr('93.184.0.1').isInNet(ip6net))
		# ip6:
		self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:1').isInNet(ip6net))
		self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:ff').isInNet(ip6net))
		self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip6net))
		self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip4net))
		# raw not in net:
		self.assertFalse(IPAddr('93.184.0.1', IPAddr.CIDR_RAW).isInNet(ip4net))
		self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:1', IPAddr.CIDR_RAW).isInNet(ip6net))
		# invalid not in net:
		self.assertFalse(IPAddr('xxx').isInNet(ip4net))

	def testIPAddr_Compare(self):
		"""Ordering, equality, and hashing of IPAddr (IPv4 sorts before IPv6)."""
		ip4 = [
			IPAddr('93.184.0.1'),
			IPAddr('93.184.216.1'),
			IPAddr('93.184.216.34')
		]
		ip6 = [
			IPAddr('2606:2800:220:1:248:1893::'),
			IPAddr('2606:2800:220:1:248:1893:25c8:0'),
			IPAddr('2606:2800:220:1:248:1893:25c8:1946')
		]
		# ip4
		self.assertNotEqual(ip4[0], None)
		self.assertTrue(ip4[0] is not None)
		self.assertFalse(ip4[0] is None)
		self.assertTrue(ip4[0] < ip4[1])
		self.assertTrue(ip4[1] < ip4[2])
		self.assertEqual(sorted(reversed(ip4)), ip4)
		# ip6
		self.assertNotEqual(ip6[0], None)
		self.assertTrue(ip6[0] is not None)
		self.assertFalse(ip6[0] is None)
		self.assertTrue(ip6[0] < ip6[1])
		self.assertTrue(ip6[1] < ip6[2])
		self.assertEqual(sorted(reversed(ip6)), ip6)
		# ip4 vs ip6
		self.assertNotEqual(ip4[0], ip6[0])
		self.assertTrue(ip4[0] < ip6[0])
		self.assertTrue(ip4[2] < ip6[2])
		self.assertEqual(sorted(reversed(ip4+ip6)), ip4+ip6)
		# hashing (with string as key):
		d={
			'93.184.216.34': 'ip4-test',
			'2606:2800:220:1:248:1893:25c8:1946': 'ip6-test'
		}
		d2 = dict([(IPAddr(k), v) for k, v in d.items()])
		self.assertTrue(isinstance(list(d.keys())[0], str))
		self.assertTrue(isinstance(list(d2.keys())[0], IPAddr))
		self.assertEqual(d.get(ip4[2], ''), 'ip4-test')
		self.assertEqual(d.get(ip6[2], ''), 'ip6-test')
		self.assertEqual(d2.get(str(ip4[2]), ''), 'ip4-test')
		self.assertEqual(d2.get(str(ip6[2]), ''), 'ip6-test')
		# compare with string direct:
		self.assertEqual(d, d2)

	def testIPAddr_CIDR(self):
		"""CIDR/netmask parsing and canonical string representation."""
		self.assertEqual(str(IPAddr('93.184.0.1', 24)), '93.184.0.0/24')
		self.assertEqual(str(IPAddr('192.168.1.0/255.255.255.128')), '192.168.1.0/25')
		self.assertEqual(IPAddr('93.184.0.1', 24).ntoa, '93.184.0.0/24')
		self.assertEqual(IPAddr('192.168.1.0/255.255.255.128').ntoa, '192.168.1.0/25')

		self.assertEqual(IPAddr('93.184.0.1/32').ntoa, '93.184.0.1')
		self.assertEqual(IPAddr('93.184.0.1/255.255.255.255').ntoa, '93.184.0.1')

		self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8::', 120)), '2606:2800:220:1:248:1893:25c8:0/120')
		self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8::', 120).ntoa, '2606:2800:220:1:248:1893:25c8:0/120')
		self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8:0/120')), '2606:2800:220:1:248:1893:25c8:0/120')
		self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:0/120').ntoa, '2606:2800:220:1:248:1893:25c8:0/120')

		self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::', 25)), '2606:2880::/25')
		self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ff80::')), '2606:2880::/25')
		self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff::')),
			'2606:28ff:220:1:248:1893:25c8:0/112')

		self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/128')),
			'2606:28ff:220:1:248:1893:25c8:0')
		self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')),
			'2606:28ff:220:1:248:1893:25c8:0')

	def testIPAddr_CIDR_Wrong(self):
		"""Multiple prefix-length representations in one address must raise."""
		# too many plen representations:
		self.assertRaises(ValueError, IPAddr, '2606:28ff:220:1:248:1893:25c8::/ffff::/::1')

	def testIPAddr_CIDR_Repr(self):
		"""repr of networks compares equal to their canonical string form."""
		self.assertEqual(["127.0.0.0/8", "::/32", "2001:db8::/32"],
			[IPAddr("127.0.0.0", 8), IPAddr("::1", 32), IPAddr("2001:db8::", 32)]
		)

	def testIPAddr_CompareDNS(self):
		"""An IPAddr built from a hostname matches all of its resolved IPs."""
		#unittest.F2B.SkipIfNoNetwork()
		ips = IPAddr('example.com')
		self.assertTrue(IPAddr("93.184.216.34").isInNet(ips))
		self.assertTrue(IPAddr("2606:2800:220:1:248:1893:25c8:1946").isInNet(ips))

	def testIPAddr_wrongDNS_IP(self):
		"""Lookups of garbage names/IPs must not raise (coverage)."""
		unittest.F2B.SkipIfNoNetwork()
		DNSUtils.dnsToIp('`this`.dns-is-wrong.`wrong-nic`-dummy')
		DNSUtils.ipToName('*')

	def testIPAddr_Cached(self):
		"""Repeated lookups/constructions return the identical cached objects."""
		ips = [DNSUtils.dnsToIp('example.com'), DNSUtils.dnsToIp('example.com')]
		# bug fix: compare the two lookup results element-wise (zip(*ips));
		# zip(ips, ips) paired every element with itself, so the identity
		# assertion below was a tautology:
		for ip1, ip2 in zip(*ips):
			self.assertEqual(id(ip1), id(ip2))
		ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2))
		ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))

	def testFQDN(self):
		"""getfqdn consistency between short and fully-qualified host names."""
		sname = DNSUtils.getHostname(fqdn=False)
		lname = DNSUtils.getHostname(fqdn=True)
		# FQDN is not localhost if short hostname is not localhost too (or vice versa):
		self.assertEqual(lname != 'localhost',
		                 sname != 'localhost')
		# FQDN from short name should be long name:
		self.assertEqual(getfqdn(sname), lname)
		# FQDN from FQDN is the same:
		self.assertEqual(getfqdn(lname), lname)
		# coverage (targeting all branches): FQDN from loopback and DNS blackhole is always the same:
		self.assertIn(getfqdn('localhost.'), ('localhost', 'localhost.'))

	def testFQDN_DNS(self):
		"""getfqdn over real DNS (AS112 zone)."""
		unittest.F2B.SkipIfNoNetwork()
		self.assertIn(getfqdn('as112.arpa.'), ('as112.arpa.', 'as112.arpa'))
2210
2211
class JailTests(unittest.TestCase):
	"""Minimal construction tests for the Jail class."""

	def testSetBackend_gh83(self):
		"""Smoke test (gh-83): a jail with the polling backend must initialize
		without raising; the constructor call itself is the whole assertion."""
		Jail('test', backend='polling')
2218
2219