# Copyright (C) 2017-2019 Open Information Security Foundation
# Copyright (c) 2011 Jason Ish
#
# You can copy, redistribute or modify this Program under the terms of
# the GNU General Public License version 2 as published by the Free
# Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

""" Module for parsing Snort-like rules.

Parsing is done using regular expressions and the job of this module
is to do its best at parsing out fields of interest from the rule
rather than perform a sanity check.

The methods that parse multiple rules for a provided input
(parse_file, parse_fileobj) return a list of rules instead of dict
keyed by ID as it's not the job of this module to detect or deal with
duplicate signature IDs.
"""

from __future__ import print_function

import sys
import re
import logging
import io

logger = logging.getLogger(__name__)

# Compile an re pattern for basic rule matching.
rule_pattern = re.compile(r"^(?P<enabled>#)*[\s#]*"
                          r"(?P<raw>"
                          r"(?P<header>[^()]+)"
                          r"\((?P<options>.*)\)"
                          r"$)")

# Rule actions we expect to see.
actions = (
    "alert", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop")

# Order of the whitespace separated fields in a full rule header.
_HEADER_FIELDS = (
    "action",
    "proto",
    "source_addr",
    "source_port",
    "direction",
    "dest_addr",
    "dest_port",
)


class NoEndOfOptionError(Exception):
    """Exception raised when the end of option terminator (semicolon) is
    missing."""
    pass


class _MissingFieldError(AttributeError, KeyError):
    """Raised when an unknown field is read from a Rule as an attribute.

    It is an AttributeError so that getattr()/hasattr()/copy.deepcopy()
    behave normally, and also a KeyError so that callers written against
    the previous behaviour (a plain dictionary lookup) keep working.
    """


class Rule(dict):
    """Class representing a rule.

    The Rule class is a class that also acts like a dictionary.

    Dictionary fields:

    - **group**: The group the rule belongs to, typically the filename.
    - **enabled**: True if rule is enabled (uncommented), False if
      disabled (commented)
    - **action**: The action of the rule (alert, pass, etc) as a
      string
    - **proto**: The protocol of the rule.
    - **source_addr**, **source_port**, **dest_addr**, **dest_port**:
      The address and port tokens of the rule header.
    - **direction**: The direction string of the rule.
    - **gid**: The gid of the rule as an integer
    - **sid**: The sid of the rule as an integer
    - **rev**: The revision of the rule as an integer
    - **msg**: The rule message as a string
    - **flowbits**: List of flowbit options in the rule
    - **metadata**: Metadata values as a list
    - **references**: References as a list
    - **classtype**: The classification type
    - **priority**: The rule priority, 0 if not provided
    - **noalert**: Is the rule a noalert rule
    - **features**: Features required by this rule
    - **raw**: The raw rule as read from the file or buffer

    Fields can be read as attributes (``rule.sid``).  Note that there is
    no matching ``__setattr__``: assigning ``rule.name = value`` stores a
    regular instance attribute which then takes precedence when read as
    an attribute, while ``rule["name"]`` keeps its dictionary value.

    :param enabled: Optional parameter to set the enabled state of the rule
    :param action: Optional parameter to set the action of the rule
    :param group: Optional parameter to set the group (filename) of the rule

    """

    def __init__(self, enabled=None, action=None, group=None):
        dict.__init__(self)
        self["enabled"] = enabled
        self["action"] = action
        self["proto"] = None
        self["source_addr"] = None
        self["source_port"] = None
        self["direction"] = None
        self["dest_addr"] = None
        self["dest_port"] = None
        self["group"] = group
        self["gid"] = 1
        self["sid"] = None
        self["rev"] = 0
        self["msg"] = None
        self["flowbits"] = []
        self["metadata"] = []
        self["references"] = []
        self["classtype"] = None
        self["priority"] = 0
        self["noalert"] = False

        self["features"] = []

        self["raw"] = None

    def __getattr__(self, name):
        """Fall back to the dictionary for attributes not found normally.

        :raises AttributeError: (also a KeyError) if no such field exists
        """
        try:
            return self[name]
        except KeyError:
            # Must be an AttributeError, otherwise hasattr() and the
            # special method probing done by copy.deepcopy() blow up.
            raise _MissingFieldError(name)

    @property
    def id(self):
        """ The ID of the rule.

        :returns: A tuple (gid, sid) representing the ID of the rule
        :rtype: A tuple of 2 ints
        """
        return (int(self.gid), int(self.sid))

    @property
    def idstr(self):
        """Return the gid and sid of the rule as a string formatted like:
        '[GID:SID]'"""
        return "[%s:%s]" % (str(self.gid), str(self.sid))

    def brief(self):
        """ A brief description of the rule.

        :returns: A brief description of the rule
        :rtype: string
        """
        return "%s[%d:%d] %s" % (
            "" if self.enabled else "# ", self.gid, self.sid, self.msg)

    def __hash__(self):
        # Rules hash by their raw text as stored in the dictionary.
        return hash(self["raw"])

    def __str__(self):
        """ The string representation of the rule.

        If the rule is disabled it will be returned as commented out.
        """
        return self.format()

    def format(self):
        """Return the rule text, commented out if the rule is disabled.

        If the rule is flagged noalert but its text has no ``noalert;``
        option, one is inserted in front of the ``sid`` option.
        """
        if self.noalert and "noalert;" not in self.raw:
            # Stored as an instance attribute (see class docstring); the
            # "raw" dictionary entry is left untouched.
            self.raw = re.sub(r'( *sid\: *[0-9]+\;)', r' noalert;\1', self.raw)
        return u"%s%s" % (u"" if self.enabled else u"# ", self.raw)


def find_opt_end(options):
    """ Find the end of an option (;) handling escapes.

    A semicolon directly preceded by a backslash is treated as escaped
    and skipped.

    :param options: The remaining, not yet parsed, option string

    :returns: The index of the terminating semicolon, or -1 if there is
        no unescaped semicolon
    """
    offset = 0

    while True:
        i = options.find(";", offset)
        if i < 0:
            return -1
        if i > 0 and options[i - 1] == "\\":
            # Escaped, resume the search right after this semicolon.
            offset = i + 1
        else:
            return i


class BadSidError(Exception):
    """Exception raised when a rule has no (or a zero) sid."""


def parse(buf, group=None):
    """ Parse a single rule for a string buffer.

    :param buf: A string buffer containing a single Snort-like rule
    :param group: Optional group (typically the filename) to record on
        the rule

    :returns: An instance of :py:class:`.Rule` representing the parsed
        rule, or None if the buffer does not look like a rule

    :raises NoEndOfOptionError: if an option is not terminated by a
        semicolon
    :raises BadSidError: if the rule has no sid
    """

    if isinstance(buf, bytes):
        buf = buf.decode("utf-8")
    buf = buf.strip()

    m = rule_pattern.match(buf)
    if not m:
        return None

    # A leading "#" means the rule is commented out.
    enabled = m.group("enabled") != "#"

    header = m.group("header").strip()

    rule = Rule(enabled=enabled, group=group)

    # If a decoder rule, the header will be one word.
    if len(header.split(" ")) == 1:
        action = header
        direction = None
    else:
        fields = {}
        rem = header
        for field in _HEADER_FIELDS:
            if not rem:
                # Header ran out before all fields were seen.
                return None
            if rem[0] == "[":
                # A bracketed list may contain spaces, take everything
                # up to and including the closing bracket.
                end = rem.find("]")
                if end < 0:
                    return None
                end += 1
            else:
                end = rem.find(" ")
                if end < 0:
                    end = len(rem)
            fields[field] = rem[:end].strip()
            rem = rem[end:].strip()

        action = fields.pop("action")
        direction = fields.pop("direction")
        rule.update(fields)

    if action not in actions:
        return None

    rule["action"] = action
    rule["direction"] = direction
    rule["header"] = header

    options = m.group("options")

    while options:
        index = find_opt_end(options)
        if index < 0:
            raise NoEndOfOptionError("no end of option")
        option = options[:index].strip()
        options = options[index + 1:].strip()

        if ":" in option:
            name, val = [x.strip() for x in option.split(":", 1)]
        else:
            name = option
            val = None

        if name in ["gid", "sid", "rev"]:
            rule[name] = int(val)
        elif name == "metadata":
            rule[name] += [v.strip() for v in val.split(",")]
        elif name == "flowbits":
            rule.flowbits.append(val)
            if val and "noalert" in val:
                rule["noalert"] = True
        elif name == "noalert":
            rule["noalert"] = True
        elif name == "reference":
            rule.references.append(val)
        elif name == "msg":
            if val and val.startswith('"') and val.endswith('"'):
                val = val[1:-1]
            rule[name] = val
        else:
            rule[name] = val

        if name.startswith("ja3"):
            rule["features"].append("ja3")

    if rule["msg"] is None:
        rule["msg"] = ""

    if not rule["sid"]:
        raise BadSidError("Sid cannot be of type null")

    rule["raw"] = m.group("raw").strip()

    return rule


def parse_fileobj(fileobj, group=None):
    """ Parse multiple rules from a file like object.

    A rule may span several lines if each line but the last ends with a
    backslash.  Rules that fail to parse, and byte lines that are not
    valid UTF-8, are logged and skipped.

    :param fileobj: A file like object to parse rules from.
    :param group: Optional group (typically the filename) to record on
        each rule

    :returns: A list of :py:class:`.Rule` instances, one for each rule parsed
    """
    rules = []
    buf = ""
    for line in fileobj:
        if isinstance(line, bytes):
            try:
                line = line.decode("utf-8")
            except UnicodeDecodeError as err:
                # Drop the line, and any continuation it belonged to,
                # instead of failing later on mixed bytes/str handling.
                logger.error("Failed to decode rule line: %r: %s", line, err)
                buf = ""
                continue
        stripped = line.rstrip()
        if stripped.endswith("\\"):
            # Line continuation: accumulate and read the next line.
            buf = "%s%s " % (buf, stripped[0:-1])
            continue
        buf = buf + line
        try:
            rule = parse(buf, group)
            if rule:
                rules.append(rule)
        except Exception as err:
            logger.error("Failed to parse rule: %s: %s", buf.rstrip(), err)
        buf = ""
    return rules


def parse_file(filename, group=None):
    """ Parse multiple rules from the provided filename.

    :param filename: Name of file to parse rules from
    :param group: Optional group to record on each rule

    :returns: A list of :py:class:`.Rule` instances, one for each rule parsed
    """
    with io.open(filename, encoding="utf-8") as fileobj:
        return parse_fileobj(fileobj, group)


class FlowbitResolver(object):
    """Enables disabled rules that set flowbits required by enabled rules."""

    setters = ["set", "setx", "unset", "toggle"]
    getters = ["isset", "isnotset"]

    def __init__(self):
        # All rules enabled by this resolver so far.
        self.enabled = []

    def resolve(self, rules):
        """Enable rules until no enabled rule has an unsatisfied flowbit.

        Enabling a rule can introduce new required flowbits, so this is
        repeated until a pass enables nothing.

        :param rules: A dict of rules; only its values are used

        :returns: The list of rules that were enabled
        """
        while True:
            required = self.get_required_flowbits(rules)
            enabled = self.set_required_flowbits(rules, required)
            if not enabled:
                return self.enabled
            self.enabled += enabled

    def set_required_flowbits(self, rules, required):
        """Enable each disabled rule that sets one of the required flowbits.

        :returns: The list of rules enabled by this call, each rule
            listed once
        """
        enabled = []
        disabled = [rule for rule in rules.values()
                    if rule and not rule.enabled]
        for rule in disabled:
            for option, value in map(self.parse_flowbit, rule.flowbits):
                if option in self.setters and value in required:
                    rule.enabled = True
                    enabled.append(rule)
                    # One match is enough, don't list the rule twice.
                    break
        return enabled

    def get_required_rules(self, rulemap, flowbits, include_enabled=False):
        """Returns a list of rules that need to be enabled in order to satisfy
        the list of required flowbits.

        :param rulemap: A dict of rules; only its values are used
        :param flowbits: The collection of required flowbit names
        :param include_enabled: Also return rules that are already enabled
        """
        required = []

        for rule in rulemap.values():
            if not rule:
                continue
            if rule.enabled and not include_enabled:
                continue
            for option, value in map(self.parse_flowbit, rule.flowbits):
                if option in self.setters and value in flowbits:
                    required.append(rule)
                    # One match is enough, don't list the rule twice.
                    break

        return required

    def get_required_flowbits(self, rules):
        """Return the set of flowbit names tested by the enabled rules."""
        required_flowbits = set()
        for rule in [rule for rule in rules.values() if rule and rule.enabled]:
            for option, value in map(self.parse_flowbit, rule.flowbits):
                if option in self.getters:
                    required_flowbits.add(value)
        return required_flowbits

    def parse_flowbit(self, flowbit):
        """Split a flowbits option value into (command, name).

        Whitespace around either part is removed so that "set, foo" and
        "isset,foo" refer to the same flowbit.

        :returns: A tuple (command, name); name is None for commands
            without an argument, and both are None if the option had no
            value at all
        """
        if not flowbit:
            # "flowbits;" without a value is stored as None by parse().
            return None, None
        tokens = [token.strip() for token in flowbit.split(",", 1)]
        if len(tokens) == 1:
            return tokens[0], None
        return tokens[0], tokens[1]


def enable_flowbit_dependencies(rulemap):
    """Helper function to resolve flowbits, wrapping the FlowbitResolver
    class. """
    resolver = FlowbitResolver()
    return resolver.resolve(rulemap)


def format_sidmsgmap(rule):
    """ Format a rule as a sid-msg.map entry.

    :returns: The entry as a string, or None (after logging an error) if
        the rule could not be formatted
    """
    try:
        return " || ".join([str(rule.sid), rule.msg] + rule.references)
    except Exception:
        logger.error("Failed to format rule as sid-msg.map: %s", str(rule))
        return None


def format_sidmsgmap_v2(rule):
    """ Format a rule as a v2 sid-msg.map entry.

    eg:
    gid || sid || rev || classification || priority || msg || ref0 || refN

    :returns: The entry as a string, or None (after logging an error) if
        the rule could not be formatted
    """
    try:
        return " || ".join([
            str(rule.gid), str(rule.sid), str(rule.rev),
            "NOCLASS" if rule.classtype is None else rule.classtype,
            str(rule.priority), rule.msg] + rule.references)
    except Exception:
        logger.error("Failed to format rule as sid-msg-v2.map: %s", str(rule))
        return None


def parse_var_names(var):
    """ Parse out the variable names from a string.

    :param var: A string possibly containing ``$NAME`` variables, or None

    :returns: A list of the variable names without the leading ``$``
    """
    if var is None:
        return []
    return re.findall(r"\$([\w_]+)", var)