import re
import os.path
from carbon import log
from twisted.internet.task import LoopingCall


class RegexList:
    """Maintain a list of compiled regexes for whitelist/blacklist matching.

    Patterns are loaded from a text file, one regular expression per line.
    Blank lines and lines whose first non-whitespace character is '#' are
    skipped. Once read_from() has been called, the file is re-checked on a
    10 second LoopingCall and re-read whenever its mtime advances.
    """

    def __init__(self):
        # Compiled pattern objects currently in effect.
        self.regex_list = []
        # Path of the rules file; set by read_from().
        self.list_file = None
        # Periodic task that re-checks the rules file.
        self.read_task = LoopingCall(self.read_list)
        # mtime of the rules file at the last successful read.
        self.rules_last_read = 0.0

    def read_from(self, list_file):
        """Load rules from list_file now and schedule periodic re-reads."""
        self.list_file = list_file
        self.read_list()
        # now=False: the initial read was just done synchronously above.
        self.read_task.start(10, now=False)

    def read_list(self):
        """Reload the rules from self.list_file if it changed on disk.

        A missing file clears the rules. If the file's mtime cannot be
        obtained or the file cannot be read, the failure is logged and the
        previously loaded rules are left in place. Lines that are not valid
        regular expressions are logged and ignored.
        """
        # Clear rules and move on if file isn't there
        if not os.path.exists(self.list_file):
            self.regex_list = []
            # Forget the old mtime so that a file that reappears (possibly
            # carrying an older timestamp) is always read again.
            self.rules_last_read = 0.0
            return

        try:
            mtime = os.path.getmtime(self.list_file)
        except OSError:
            log.err("Failed to get mtime of %s" % self.list_file)
            return

        if mtime <= self.rules_last_read:
            return

        # Begin read
        new_regex_list = []
        try:
            with open(self.list_file) as list_fh:
                for line in list_fh:
                    pattern = line.strip()
                    # Test the stripped text so indented comments are
                    # skipped as well.
                    if not pattern or pattern.startswith('#'):
                        continue
                    try:
                        new_regex_list.append(re.compile(pattern))
                    except re.error:
                        log.err("Failed to parse '%s' in '%s'. Ignoring line" % (pattern, self.list_file))
        except (IOError, OSError):
            # The file may vanish or become unreadable between the checks
            # above and the read. Letting the error escape would propagate
            # out of the LoopingCall callback, so log it and keep the
            # current rules; the next tick will try again.
            log.err("Failed to read %s" % self.list_file)
            return

        self.regex_list = new_regex_list
        self.rules_last_read = mtime

    def __contains__(self, value):
        """Return True if any loaded regex matches somewhere in value."""
        return any(regex.search(value) for regex in self.regex_list)

    def __nonzero__(self):
        """Return True when at least one rule is loaded."""
        return bool(self.regex_list)

    __bool__ = __nonzero__  # py2/3 compatibility


WhiteList = RegexList()
BlackList = RegexList()