1# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
2# vi: set ft=python sts=4 ts=4 sw=4 noet :
3
4# This file is part of Fail2Ban.
5#
6# Fail2Ban is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# Fail2Ban is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Fail2Ban; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19
20
21__author__ = "Steven Hiscocks"
22__copyright__ = "Copyright (c) 2013 Steven Hiscocks"
23__license__ = "GPL"
24
25import datetime
26import time
27from distutils.version import LooseVersion
28
29from systemd import journal
30if LooseVersion(getattr(journal, '__version__', "0")) < '204':
31	raise ImportError("Fail2Ban requires systemd >= 204")
32
33from .failmanager import FailManagerEmpty
34from .filter import JournalFilter, Filter
35from .mytime import MyTime
36from .utils import Utils
37from ..helpers import getLogger, logging, splitwords, uni_decode
38
39# Gets the instance of the logger.
40logSys = getLogger(__name__)
41
42
43##
44# Journal reader class.
45#
46# This class reads from systemd journal and detects login failures or anything
47# else that matches a given regular expression. This class is instantiated by
48# a Jail object.
49
50class FilterSystemd(JournalFilter): # pragma: systemd no cover
51	##
52	# Constructor.
53	#
54	# Initialize the filter object with default values.
55	# @param jail the jail object
56
57	def __init__(self, jail, **kwargs):
58		jrnlargs = FilterSystemd._getJournalArgs(kwargs)
59		JournalFilter.__init__(self, jail, **kwargs)
60		self.__modified = 0
61		# Initialise systemd-journal connection
62		self.__journal = journal.Reader(**jrnlargs)
63		self.__matches = []
64		self.setDatePattern(None)
65		logSys.debug("Created FilterSystemd")
66
67	@staticmethod
68	def _getJournalArgs(kwargs):
69		args = {'converters':{'__CURSOR': lambda x: x}}
70		try:
71			args['path'] = kwargs.pop('journalpath')
72		except KeyError:
73			pass
74
75		try:
76			args['files'] = kwargs.pop('journalfiles')
77		except KeyError:
78			pass
79		else:
80			import glob
81			p = args['files']
82			if not isinstance(p, (list, set, tuple)):
83				p = splitwords(p)
84			files = []
85			for p in p:
86				files.extend(glob.glob(p))
87			args['files'] = list(set(files))
88
89		# Default flags is SYSTEM_ONLY(4). This would lead to ignore user session files,
90		# so can prevent "Too many open files" errors on a lot of user sessions (see gh-2392):
91		try:
92			args['flags'] = int(kwargs.pop('journalflags'))
93		except KeyError:
94			# be sure all journal types will be opened if files specified (don't set flags):
95			if 'files' not in args or not len(args['files']):
96				args['flags'] = 4
97
98		return args
99
100	##
	# Add journal match filters from a list structure
102	#
103	# @param matches list structure with journal matches
104
105	def _addJournalMatches(self, matches):
106		if self.__matches:
107			self.__journal.add_disjunction() # Add OR
108		newMatches = []
109		for match in matches:
110			newMatches.append([])
111			for match_element in match:
112				self.__journal.add_match(match_element)
113				newMatches[-1].append(match_element)
114			self.__journal.add_disjunction()
115		self.__matches.extend(newMatches)
116
117	##
118	# Add a journal match filter
119	#
120	# @param match journalctl syntax matches in list structure
121
122	def addJournalMatch(self, match):
123		newMatches = [[]]
124		for match_element in match:
125			if match_element == "+":
126				newMatches.append([])
127			else:
128				newMatches[-1].append(match_element)
129		try:
130			self._addJournalMatches(newMatches)
131		except ValueError:
132			logSys.error(
133				"Error adding journal match for: %r", " ".join(match))
134			self.resetJournalMatches()
135			raise
136		else:
137			logSys.info("[%s] Added journal match for: %r", self.jailName,
138				" ".join(match))
139	##
140	# Reset a journal match filter called on removal or failure
141	#
142	# @return None
143
144	def resetJournalMatches(self):
145		self.__journal.flush_matches()
146		logSys.debug("[%s] Flushed all journal matches", self.jailName)
147		match_copy = self.__matches[:]
148		self.__matches = []
149		try:
150			self._addJournalMatches(match_copy)
151		except ValueError:
152			logSys.error("Error restoring journal matches")
153			raise
154		else:
155			logSys.debug("Journal matches restored")
156
157	##
158	# Delete a journal match filter
159	#
160	# @param match journalctl syntax matches
161
162	def delJournalMatch(self, match=None):
163		# clear all:
164		if match is None:
165			if not self.__matches:
166				return
167			del self.__matches[:]
168		# delete by index:
169		elif match in self.__matches:
170			del self.__matches[self.__matches.index(match)]
171		else:
172			raise ValueError("Match %r not found" % match)
173		self.resetJournalMatches()
174		logSys.info("[%s] Removed journal match for: %r", self.jailName,
175			match if match else '*')
176
177	##
178	# Get current journal match filter
179	#
180	# @return journalctl syntax matches
181
182	def getJournalMatch(self):
183		return self.__matches
184
185	##
186	# Get journal reader
187	#
188	# @return journal reader
189
190	def getJournalReader(self):
191		return self.__journal
192
193	def getJrnEntTime(self, logentry):
194		""" Returns time of entry as tuple (ISO-str, Posix)."""
195		date = logentry.get('_SOURCE_REALTIME_TIMESTAMP')
196		if date is None:
197				date = logentry.get('__REALTIME_TIMESTAMP')
198		return (date.isoformat(), time.mktime(date.timetuple()) + date.microsecond/1.0E6)
199
200	##
201	# Format journal log entry into syslog style
202	#
	# @param logentry systemd journal entry dict
	# @return tuple of the line tuple (empty prefix, ISO time, formatted log line) and the POSIX time
205
206	def formatJournalEntry(self, logentry):
207		# Be sure, all argument of line tuple should have the same type:
208		enc = self.getLogEncoding()
209		logelements = []
210		v = logentry.get('_HOSTNAME')
211		if v:
212			logelements.append(uni_decode(v, enc))
213		v = logentry.get('SYSLOG_IDENTIFIER')
214		if not v:
215			v = logentry.get('_COMM')
216		if v:
217			logelements.append(uni_decode(v, enc))
218			v = logentry.get('SYSLOG_PID')
219			if not v:
220				v = logentry.get('_PID')
221			if v:
222				try: # [integer] (if already numeric):
223					v = "[%i]" % v
224				except TypeError:
225					try: # as [integer] (try to convert to int):
226						v = "[%i]" % int(v, 0)
227					except (TypeError, ValueError): # fallback - [string] as it is
228						v = "[%s]" % v
229				logelements[-1] += v
230			logelements[-1] += ":"
231			if logelements[-1] == "kernel:":
232				monotonic = logentry.get('_SOURCE_MONOTONIC_TIMESTAMP')
233				if monotonic is None:
234					monotonic = logentry.get('__MONOTONIC_TIMESTAMP')[0]
235				logelements.append("[%12.6f]" % monotonic.total_seconds())
236		msg = logentry.get('MESSAGE','')
237		if isinstance(msg, list):
238			logelements.append(" ".join(uni_decode(v, enc) for v in msg))
239		else:
240			logelements.append(uni_decode(msg, enc))
241
242		logline = " ".join(logelements)
243
244		date = self.getJrnEntTime(logentry)
245		logSys.log(5, "[%s] Read systemd journal entry: %s %s", self.jailName,
246			date[0], logline)
247		## use the same type for 1st argument:
248		return ((logline[:0], date[0], logline.replace('\n', '\\n')), date[1])
249
250	def seekToTime(self, date):
251		if not isinstance(date, datetime.datetime):
252			date = datetime.datetime.fromtimestamp(date)
253		self.__journal.seek_realtime(date)
254
255	##
256	# Main loop.
257	#
	# Periodically check for new journal entries matching the filter and
	# hand them over to FailManager
260
	def run(self):
		"""Main loop: read new journal entries and process them until stopped.

		The journal is positioned at max(last position stored in the database,
		now - findtime). Then, while the filter is active, the loop waits for
		journal changes, reads entries in batches of at most 100, passes every
		formatted entry to `processLineAndAdd` and stores the position of the
		last processed entry in the database (if there is one). After the loop
		has ended, the journal reader is closed.

		@return True
		"""

		if not self.getJournalMatch():
			logSys.notice(
				"Jail started without 'journalmatch' set. "
				"Jail regexs will be checked against all journal entries, "
				"which is not advised for performance reasons.")

		# Try to obtain the last known time (position of journal)
		start_time = 0
		if self.jail.database is not None:
			start_time = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0
		# Seek to max(last_known_time, now - findtime) in journal
		start_time = max( start_time, MyTime.time() - int(self.getFindTime()) )
		self.seekToTime(start_time)
		# Move back one entry to ensure do not end up in dead space
		# if start time beyond end of journal
		try:
			self.__journal.get_previous()
		except OSError:
			pass # Reading failure, so safe to ignore

		while self.active:
			# wait for records (or for timeout in sleeptime seconds):
			try:
				## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
				## don't use `journal.close()` for it, because in some python/systemd implementation it may
				## cause abnormal program termination
				#self.__journal.wait(self.sleeptime) != journal.NOP
				##
				## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
				## (the condition also ends the wait as soon as the filter gets inactive)
				Utils.wait_for(lambda: not self.active or \
					self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
					self.sleeptime, 0.00001)
				if self.idle:
					# because journal.wait will return immediately if we have records in journal,
					# just wait a little bit here for not idle, to prevent high load:
					if not Utils.wait_for(lambda: not self.active or not self.idle,
						self.sleeptime * 10, self.sleeptime
					):
						# still idle - count the tick and start the next cycle:
						self.ticks += 1
						continue
				# number of entries processed in this cycle:
				self.__modified = 0
				while self.active:
					logentry = None
					try:
						logentry = self.__journal.get_next()
					except OSError as e:
						# logentry stays None, so the batch ends below:
						logSys.error("Error reading line from systemd journal: %s",
							e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
					self.ticks += 1
					if logentry:
						# line is ((prefix, ISO time, log line), POSIX time):
						line = self.formatJournalEntry(logentry)
						self.processLineAndAdd(*line)
						self.__modified += 1
						# limit the size of a batch, so bans and the stored
						# position get updated in between:
						if self.__modified >= 100: # todo: should be configurable
							break
					else:
						# no (more) entries available:
						break
				if self.__modified:
					# NOTE(review): presumably with banASAP the bans were already
					# performed while the lines were processed - confirm in Filter.
					if not self.banASAP: # pragma: no cover
						self.performBan()
					self.__modified = 0
					# update position in log (time and iso string);
					# line is the last entry processed in the batch above:
					if self.jail.database is not None:
						self.jail.database.updateJournal(self.jail, 'systemd-journal', line[1], line[0][1])
			except Exception as e: # pragma: no cover
				if not self.active: # if not active - error by stop...
					break
				logSys.error("Caught unhandled exception in main cycle: %r", e,
					exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
				# incr common error counter:
				self.commonError()

		logSys.debug("[%s] filter terminated", self.jailName)

		# close journal (a failure is only logged):
		try:
			if self.__journal:
				self.__journal.close()
		except Exception as e: # pragma: no cover
			logSys.error("Close journal failed: %r", e,
				exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)

		logSys.debug("[%s] filter exited (systemd)", self.jailName)
		return True
347
348	def status(self, flavor="basic"):
349		ret = super(FilterSystemd, self).status(flavor=flavor)
350		ret.append(("Journal matches",
351			[" + ".join(" ".join(match) for match in self.__matches)]))
352		return ret
353