"""ApacheParser is a member object of the ApacheConfigurator class."""
import copy
import fnmatch
import logging
import re
from typing import Dict
from typing import List
from typing import Optional

from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import constants

try:
    from augeas import Augeas
except ImportError:  # pragma: no cover
    Augeas = None

logger = logging.getLogger(__name__)


class ApacheParser:
    """Class handles the fine details of parsing the Apache Configuration.

    .. todo:: Make parsing general... remove sites-available etc...

    :ivar str root: Normalized absolute path to the server root
        directory. Without trailing slash.
    :ivar dict modules: Names of the modules that are currently enabled,
        mapped to the module file path when it is known (``None`` otherwise).
    :ivar dict loc: Location to place directives, root - configuration origin,
        default - user config file, name - NameVirtualHost,

    """
    # Matches Apache variable references of the form ${NAME}
    arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
    # Characters that mark a path component as an fnmatch pattern
    fnmatch_chars = {"*", "?", "\\", "[", "]"}

    def __init__(self, root, vhostroot=None, version=(2, 4),
                 configurator=None):
        """Set up Augeas and parse the Apache configuration tree.

        :param str root: Path to the Apache server root directory
        :param vhostroot: Optional directory with additional virtual host
            files; when given, the files below it that match
            ``configurator.options.vhost_files`` are parsed as well
        :type vhostroot: str or None
        :param tuple version: Apache version. Runtime variables are only
            looked up from httpd for versions >= (2, 4)
        :param configurator: Owning configurator; its ``options``,
            ``save_notes`` and ``save()`` are used by the parser

        :raises .errors.NotSupportedError: If the Augeas version check fails
        :raises .errors.PluginError: If ``version`` is below (2, 4) and the
            configuration contains ``Define`` directives

        """
        # Note: Order is important here.

        # Needed for calling save() with reverter functionality that resides in
        # AugeasConfigurator superclass of ApacheConfigurator. This resolves
        # issues with aug.load() after adding new files / defines to parse tree
        self.configurator = configurator

        # Initialize augeas
        self.aug = init_augeas()

        if not self.check_aug_version():
            raise errors.NotSupportedError(
                "Apache plugin support requires libaugeas0 and augeas-lenses "
                "version 1.2.0 or higher, please make sure you have "
                "those installed.")

        self.modules: Dict[str, Optional[str]] = {}
        self.parser_paths: Dict[str, List[str]] = {}
        self.variables: Dict[str, str] = {}

        # Find configuration root and make sure augeas can parse it.
        self.root = os.path.abspath(root)
        self.loc = {"root": self._find_config_root()}
        self.parse_file(self.loc["root"])

        if version >= (2, 4):
            # Look up variables from httpd and add to DOM if not already parsed
            self.update_runtime_variables()

        # This problem has been fixed in Augeas 1.0
        self.standardize_excl()

        # Parse LoadModule directives from configuration files
        self.parse_modules()

        # Set up rest of locations
        self.loc.update(self._set_locations())

        # list of the active include paths, before modifications
        self.existing_paths = copy.deepcopy(self.parser_paths)

        # Must also attempt to parse additional virtual host root
        if vhostroot:
            self.parse_file(os.path.abspath(vhostroot) + "/" +
                            self.configurator.options.vhost_files)

        # check to see if there were unparsed define statements
        if version < (2, 4):
            if self.find_dir("Define", exclude=False):
                raise errors.PluginError("Error parsing runtime variables")

    def check_parsing_errors(self, lens):
        """Verify Augeas can parse all of the lens files.

        :param str lens: lens to check for errors

        :raises .errors.PluginError: If there has been an error in parsing with
            the specified lens.

        """
        error_files = self.aug.match("/augeas//error")

        for path in error_files:
            # Check to see if it was an error resulting from the use of
            # the httpd lens
            lens_path = self.aug.get(path + "/lens")
            # As aug.get may return null
            if lens_path and lens in lens_path:
                msg = (
                    "There has been an error in parsing the file {0} on line {1}: "
                    "{2}".format(
                        # Strip off /augeas/files and /error
                        path[13:len(path) - 6],
                        self.aug.get(path + "/line"),
                        self.aug.get(path + "/message")))
                raise errors.PluginError(msg)

    def check_aug_version(self):
        """Checks that we have recent enough version of libaugeas.

        If augeas version is recent enough, it will support case insensitive
        regexp matching.

        :returns: The (truthy) list of matches for the probe node when case
            insensitive matching works, a falsy value otherwise

        """
        self.aug.set("/test/path/testing/arg", "aRgUMeNT")
        try:
            matches = self.aug.match(
                "/test//*[self::arg=~regexp('argument', 'i')]")
        except RuntimeError:
            return False
        finally:
            # Always remove the probe node, whatever the outcome
            self.aug.remove("/test/path")
        return matches

    def unsaved_files(self):
        """Lists files that have modified Augeas DOM but the changes have not
        been written to the filesystem yet, used by `self.save()` and
        ApacheConfigurator to check the file state.

        :raises .errors.PluginError: If there was an error in Augeas, in
            an attempt to save the configuration, or an error creating a
            checkpoint

        :returns: `set` of unsaved files
        """
        save_state = self.aug.get("/augeas/save")
        self.aug.set("/augeas/save", "noop")
        # Existing Errors
        ex_errs = self.aug.match("/augeas//error")
        try:
            # This is a noop save
            self.aug.save()
        except (RuntimeError, IOError) as err:
            self._log_save_errors(ex_errs)
            # Erase Save Notes
            self.configurator.save_notes = ""
            raise errors.PluginError(
                "Error saving files, check logs for more info.") from err
        finally:
            # Return the original save method, also when the noop save
            # failed, so that Augeas is never left in "noop" mode
            self.aug.set("/augeas/save", save_state)

        # Retrieve list of modified files
        # Note: Noop saves can cause the file to be listed twice, I used a
        # set to remove this possibility. This is a known augeas 0.10 error.
        save_paths = self.aug.match("/augeas/events/saved")

        save_files = set()
        if save_paths:
            for path in save_paths:
                # Drop the leading "/files" (6 characters) of the Augeas path
                save_files.add(self.aug.get(path)[6:])
        return save_files

    def ensure_augeas_state(self):
        """Makes sure that all Augeas dom changes are written to files to avoid
        loss of configuration directives when doing additional augeas parsing,
        causing a possible augeas.load() resulting dom reset
        """

        if self.unsaved_files():
            self.configurator.save_notes += "(autosave)"
            self.configurator.save()

    def save(self, save_files):
        """Saves all changes to the configuration files.

        save() is called from ApacheConfigurator to handle the parser specific
        tasks of saving.

        :param list save_files: list of strings of file paths that we need to save.

        """
        self.configurator.save_notes = ""
        self.aug.save()

        # Force reload if files were modified
        # This is needed to recalculate augeas directive span
        if save_files:
            for sf in save_files:
                self.aug.remove("/files/"+sf)
            self.aug.load()

    def _log_save_errors(self, ex_errs):
        """Log errors due to bad Augeas save.

        :param list ex_errs: Existing errors before save

        """
        # Check for the root of save problems
        new_errs = self.aug.match("/augeas//error")
        logger.error("Unable to save files: %s. Attempted Save Notes: %s",
                     # Strip off /augeas/files and /error
                     ", ".join(err[13:len(err) - 6] for err in new_errs
                               # Only new errors caused by recent save
                               if err not in ex_errs), self.configurator.save_notes)

    def add_include(self, main_config, inc_path):
        """Add Include for a new configuration file if one does not exist

        :param str main_config: file path to main Apache config file
        :param str inc_path: path of file to include

        """
        # find_dir() makes the directive name case insensitive itself, so the
        # plain name has to be passed here; a pre-transformed pattern would be
        # escaped a second time and could never match an existing Include.
        if not self.find_dir("Include", inc_path):
            logger.debug("Adding Include %s to %s",
                         inc_path, get_aug_path(main_config))
            self.add_dir(
                get_aug_path(main_config),
                "Include", inc_path)

            # Add new path to parser paths
            new_dir = os.path.dirname(inc_path)
            new_file = os.path.basename(inc_path)
            self.existing_paths.setdefault(new_dir, []).append(new_file)

    def add_mod(self, mod_name):
        """Shortcut for updating parser modules.

        Registers both the ``<name>_module`` and ``mod_<name>.c`` spellings,
        without overwriting entries that are already present.

        :param str mod_name: Module name without prefix or suffix, e.g. ssl

        """
        if mod_name + "_module" not in self.modules:
            self.modules[mod_name + "_module"] = None
        if "mod_" + mod_name + ".c" not in self.modules:
            self.modules["mod_" + mod_name + ".c"] = None

    def reset_modules(self):
        """Reset the loaded modules list. This is called from cleanup to clear
        temporarily loaded modules."""
        self.modules = {}
        self.update_modules()
        self.parse_modules()

    def parse_modules(self):
        """Iterates on the configuration until no new modules are loaded.

        ..todo:: This should be attempted to be done with a binary to avoid
            the iteration issue.  Else... parse and enable mods at same time.

        """
        mods: Dict[str, str] = {}
        matches = self.find_dir("LoadModule")
        iterator = iter(matches)
        # Make sure prev_size != cur_size for do: while: iteration
        prev_size = -1

        while len(mods) != prev_size:
            prev_size = len(mods)

            # Zipping the iterator with itself consumes the matches pairwise:
            # (module name argument, module filename argument)
            for match_name, match_filename in zip(
                    iterator, iterator):
                mod_name = self.get_arg(match_name)
                mod_filename = self.get_arg(match_filename)
                if mod_name and mod_filename:
                    mods[mod_name] = mod_filename
                    # e.g. mod_ssl.so -> mod_ssl.c
                    mods[os.path.basename(mod_filename)[:-2] + "c"] = mod_filename
                else:
                    logger.debug("Could not read LoadModule directive from Augeas path: %s",
                                 match_name[6:])
        self.modules.update(mods)

    def update_runtime_variables(self):
        """Update Defines, Includes and Modules from httpd config dump data"""

        self.update_defines()
        self.update_includes()
        self.update_modules()

    def update_defines(self):
        """Updates the dictionary of known variables in the configuration"""

        self.variables = apache_util.parse_defines(self.configurator.options.ctl)

    def update_includes(self):
        """Get includes from httpd process, and add them to DOM if needed"""

        # Find_dir iterates over configuration for Include and IncludeOptional
        # directives to make sure we see the full include tree present in the
        # configuration files
        _ = self.find_dir("Include")

        matches = apache_util.parse_includes(self.configurator.options.ctl)
        if matches:
            for i in matches:
                if not self.parsed_in_current(i):
                    self.parse_file(i)

    def update_modules(self):
        """Get loaded modules from httpd process, and add them to DOM"""

        matches = apache_util.parse_modules(self.configurator.options.ctl)
        for mod in matches:
            self.add_mod(mod.strip())

    def filter_args_num(self, matches, args):
        """Filter out directives with specific number of arguments.

        This function makes the assumption that all related arguments are given
        in order.  Thus /files/apache/directive[5]/arg[2] must come immediately
        after /files/apache/directive[5]/arg[1]. Runs in 1 linear pass.

        :param list matches: Matches of all directives with arg nodes
        :param int args: Number of args you would like to filter

        :returns: List of directives that contain # of arguments.
            (arg is stripped off)

        """
        filtered = []
        if args == 1:
            for match in matches:
                if match.endswith("/arg"):
                    filtered.append(match[:-4])
        else:
            wanted = "/arg[%d]" % args
            following = "/arg[%d]" % (args + 1)
            for i, match in enumerate(matches):
                if match.endswith(wanted):
                    # Make sure we don't cause an IndexError (end of list)
                    # Check to make sure arg + 1 doesn't exist
                    if (i == (len(matches) - 1) or
                            not matches[i + 1].endswith(following)):
                        filtered.append(match[:-len(wanted)])

        return filtered

    def add_dir_to_ifmodssl(self, aug_conf_path, directive, args):
        """Adds directive and value to IfMod ssl block.

        Adds given directive and value along configuration path within
        an IfMod mod_ssl.c block.  If the IfMod block does not exist in
        the file, it is created.

        :param str aug_conf_path: Desired Augeas config path to add directive
        :param str directive: Directive you would like to add, e.g. Listen
        :param args: Values of the directive; str "443" or list of str
        :type args: list or str

        """
        # TODO: Add error checking code... does the path given even exist?
        #       Does it throw exceptions?
        if isinstance(args, str):
            # A bare string is a single argument; without this it would be
            # split into one argument per character below
            args = [args]
        if_mod_path = self.get_ifmod(aug_conf_path, "mod_ssl.c")
        # IfModule can have only one valid argument, so append after
        self.aug.insert(if_mod_path + "arg", "directive", False)
        nvh_path = if_mod_path + "directive[1]"
        self.aug.set(nvh_path, directive)
        if len(args) == 1:
            self.aug.set(nvh_path + "/arg", args[0])
        else:
            for i, arg in enumerate(args):
                self.aug.set("%s/arg[%d]" % (nvh_path, i + 1), arg)

    def get_ifmod(self, aug_conf_path, mod, beginning=False):
        """Returns the path to <IfMod mod> and creates one if it doesn't exist.

        :param str aug_conf_path: Augeas configuration path
        :param str mod: module ie. mod_ssl.c
        :param bool beginning: If the IfModule should be created to the beginning
            of augeas path DOM tree.

        :returns: Augeas path of the requested IfModule directive that pre-existed
            or was created during the process. The path may be dynamic,
            i.e. .../IfModule[last()]
        :rtype: str

        """
        if_mods = self.aug.match(("%s/IfModule/*[self::arg='%s']" %
                                  (aug_conf_path, mod)))
        if not if_mods:
            return self.create_ifmod(aug_conf_path, mod, beginning)

        # Strip off "arg" at end of first ifmod path
        return if_mods[0].rpartition("arg")[0]

    def create_ifmod(self, aug_conf_path, mod, beginning=False):
        """Creates a new <IfMod mod> and returns its path.

        :param str aug_conf_path: Augeas configuration path
        :param str mod: module ie. mod_ssl.c
        :param bool beginning: If the IfModule should be created to the beginning
            of augeas path DOM tree.

        :returns: Augeas path of the newly created IfModule directive.
            The path may be dynamic, i.e. .../IfModule[last()]
        :rtype: str

        """
        if beginning:
            c_path_arg = "{}/IfModule[1]/arg".format(aug_conf_path)
            # Insert IfModule before the first directive
            self.aug.insert("{}/directive[1]".format(aug_conf_path),
                            "IfModule", True)
            retpath = "{}/IfModule[1]/".format(aug_conf_path)
        else:
            c_path = "{}/IfModule[last() + 1]".format(aug_conf_path)
            c_path_arg = "{}/IfModule[last()]/arg".format(aug_conf_path)
            self.aug.set(c_path, "")
            retpath = "{}/IfModule[last()]/".format(aug_conf_path)
        self.aug.set(c_path_arg, mod)
        return retpath

    def add_dir(self, aug_conf_path, directive, args):
        """Appends directive to the end of the file given by aug_conf_path.

        .. note:: Not added to AugeasConfigurator because it may depend
            on the lens

        :param str aug_conf_path: Augeas configuration path to add directive
        :param str directive: Directive to add
        :param args: Value of the directive. ie. Listen 443, 443 is arg
        :type args: list or str

        """
        self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
        if isinstance(args, list):
            for i, value in enumerate(args, 1):
                self.aug.set(
                    "%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value)
        else:
            self.aug.set(aug_conf_path + "/directive[last()]/arg", args)

    def add_dir_beginning(self, aug_conf_path, dirname, args):
        """Adds the directive to the beginning of defined aug_conf_path.

        :param str aug_conf_path: Augeas configuration path to add directive
        :param str dirname: Directive to add
        :param args: Value of the directive. ie. Listen 443, 443 is arg
        :type args: list or str
        """
        first_dir = aug_conf_path + "/directive[1]"
        if self.aug.get(first_dir):
            # A first directive exists: insert the new node before it
            self.aug.insert(first_dir, "directive", True)
        else:
            self.aug.set(first_dir, "directive")

        self.aug.set(first_dir, dirname)
        if isinstance(args, list):
            for i, value in enumerate(args, 1):
                self.aug.set(first_dir + "/arg[%d]" % (i), value)
        else:
            self.aug.set(first_dir + "/arg", args)

    def add_comment(self, aug_conf_path, comment):
        """Adds the comment to the augeas path

        :param str aug_conf_path: Augeas configuration path to add directive
        :param str comment: Comment content

        """
        self.aug.set(aug_conf_path + "/#comment[last() + 1]", comment)

    def find_comments(self, arg, start=None):
        """Finds a comment with specified content from the provided DOM path

        :param str arg: Comment content to search
        :param str start: Beginning Augeas path to begin looking

        :returns: List of augeas paths containing the comment content
        :rtype: list

        """
        if not start:
            start = get_aug_path(self.root)

        comments = self.aug.match("%s//*[label() = '#comment']" % start)

        results = []
        for comment in comments:
            c_content = self.aug.get(comment)
            if c_content and arg in c_content:
                results.append(comment)
        return results

    def find_dir(self, directive, arg=None, start=None, exclude=True):
        """Finds directive in the configuration.

        Recursively searches through config files to find directives.
        The directive name is passed as plain text; it is turned into a case
        insensitive regex by this method.

        .. todo:: arg should probably be a list
        .. todo:: arg search currently only supports direct matching. It does
            not handle the case of variables or quoted arguments. This should
            be adapted to use a generic search for the directive and then do a
            case-insensitive self.get_arg filter

        Note: Augeas is inherently case sensitive while Apache is case
        insensitive.  Augeas 1.0 allows case insensitive regexes like
        regexp(/Listen/, "i"), however the version currently supported
        by Ubuntu 0.10 does not.  Thus I have included my own case insensitive
        transformation by calling case_i() on everything to maintain
        compatibility.

        :param str directive: Directive to look for
        :param arg: Specific value directive must have, None if all should
                    be considered
        :type arg: str or None

        :param str start: Beginning Augeas path to begin looking
        :param bool exclude: Whether or not to exclude directives based on
            variables and enabled modules

        :returns: Augeas paths of the matching argument nodes, in the order
            they were found
        :rtype: list

        """
        # Cannot place member variable in the definition of the function so...
        if not start:
            start = get_aug_path(self.loc["root"])

        # Include and IncludeOptional are always matched as well, so that the
        # search can follow them into the included files
        regex = "(%s)|(%s)|(%s)" % (case_i(directive),
                                    case_i("Include"),
                                    case_i("IncludeOptional"))
        matches = self.aug.match(
            "%s//*[self::directive=~regexp('%s')]" % (start, regex))

        if exclude:
            matches = self.exclude_dirs(matches)

        if arg is None:
            arg_suffix = "/arg"
        else:
            arg_suffix = "/*[self::arg=~regexp('%s')]" % case_i(arg)

        ordered_matches: List[str] = []

        # TODO: Wildcards should be included in alphabetical order
        # https://httpd.apache.org/docs/2.4/mod/core.html#include
        for match in matches:
            dir_ = self.aug.get(match).lower()
            if dir_ in ("include", "includeoptional"):
                ordered_matches.extend(self.find_dir(
                    directive, arg,
                    self._get_include_path(self.get_arg(match + "/arg")),
                    exclude))
            # This additionally allows Include
            if dir_ == directive.lower():
                ordered_matches.extend(self.aug.match(match + arg_suffix))

        return ordered_matches

    def get_all_args(self, match):
        """
        Tries to fetch all arguments for a directive. See get_arg.

        Note that if match is an ancestor node, it returns all names of
        child directives as well as the list of arguments.

        """

        if match[-1] != "/":
            match = match+"/"
        allargs = self.aug.match(match + '*')
        return [self.get_arg(arg) for arg in allargs]

    def get_arg(self, match):
        """Uses augeas.get to get argument value and interprets result.

        This also converts all variables and parameters appropriately.

        :param str match: Augeas path of the argument node

        :returns: The argument with surrounding quotes stripped and known
            variables substituted, or None when the node has no value
        :rtype: str or None

        :raises .errors.PluginError: If the argument references a variable
            that is not present in ``self.variables``

        """
        value = self.aug.get(match)

        # No need to strip quotes for variables, as apache2ctl already does
        # this, but we do need to strip quotes for all normal arguments.

        # Note: normal argument may be a quoted variable
        #       e.g. strip now, not later
        if not value:
            return None
        value = value.strip("'\"")

        variables = ApacheParser.arg_var_interpreter.findall(value)

        for var in variables:
            # Strip off ${ and }
            try:
                value = value.replace(var, self.variables[var[2:-1]])
            except KeyError as err:
                raise errors.PluginError(
                    "Error Parsing variable: %s" % var) from err

        return value

    def get_root_augpath(self):
        """
        Returns the Augeas path of root configuration.
        """
        return get_aug_path(self.loc["root"])

    def exclude_dirs(self, matches):
        """Exclude directives that are not loaded into the configuration.

        :param list matches: Augeas paths of directives

        :returns: The paths that pass both the IfModule and IfDefine filters
        :rtype: list

        """
        filters = [("ifmodule", self.modules.keys()), ("ifdefine", self.variables)]

        return [match for match in matches
                if all(self._pass_filter(match, filter_) for filter_ in filters)]

    def _pass_filter(self, match, filter_):
        """Determine if directive passes a filter.

        :param str match: Augeas path
        :param tuple filter_: pair of the form
            ("lowercase if directive", collection of relevant parameters)

        :returns: False if any enclosing block named by the filter has a
            condition that is not satisfied, True otherwise
        :rtype: bool

        """
        match_l = match.lower()
        last_match_idx = match_l.find(filter_[0])

        while last_match_idx != -1:
            # Check args
            end_of_if = match_l.find("/", last_match_idx)
            # This should be aug.get (vars are not used e.g. parser.aug_get)
            expression = self.aug.get(match[:end_of_if] + "/arg")

            if expression.startswith("!"):
                # Strip off "!"
                if expression[1:] in filter_[1]:
                    return False
            else:
                if expression not in filter_[1]:
                    return False

            # Continue with the next enclosing block of the same kind, if any
            last_match_idx = match_l.find(filter_[0], end_of_if)

        return True

    def standard_path_from_server_root(self, arg):
        """Ensure paths are consistent and absolute

        :param str arg: Argument of directive

        :returns: Standardized argument path
        :rtype: str
        """
        # Remove beginning and ending quotes
        arg = arg.strip("'\"")

        # Standardize the include argument based on server root
        if not arg.startswith("/"):
            # Normpath will condense ../
            arg = os.path.normpath(os.path.join(self.root, arg))
        else:
            arg = os.path.normpath(arg)
        return arg

    def _get_include_path(self, arg):
        """Converts an Apache Include directive into Augeas path.

        Converts an Apache Include directive argument into an Augeas
        searchable path

        .. todo:: convert to use os.path.join()

        :param str arg: Argument of Include directive

        :returns: Augeas path string
        :rtype: str

        """
        arg = self.standard_path_from_server_root(arg)

        # Attempts to add a transform to the file if one does not already exist
        if os.path.isdir(arg):
            self.parse_file(os.path.join(arg, "*"))
        else:
            self.parse_file(arg)

        # Argument represents an fnmatch regular expression, convert it
        # Split up the path and convert each into an Augeas accepted regex
        # then reassemble
        split_arg = arg.split("/")
        for idx, split in enumerate(split_arg):
            if any(char in ApacheParser.fnmatch_chars for char in split):
                # Turn it into an augeas regex
                # TODO: Can this instead be an augeas glob instead of regex
                split_arg[idx] = ("* [label()=~regexp('%s')]" %
                                  self.fnmatch_to_re(split))
        # Reassemble the argument
        # Note: This also normalizes the argument /serverroot/ -> /serverroot
        arg = "/".join(split_arg)

        return get_aug_path(arg)

    def fnmatch_to_re(self, clean_fn_match):
        """Method converts Apache's basic fnmatch to regular expression.

        Assumption - Configs are assumed to be well-formed and only writable by
        privileged users.

        https://apr.apache.org/docs/apr/2.0/apr__fnmatch_8h_source.html

        :param str clean_fn_match: Apache style filename match, like globs

        :returns: regex suitable for augeas
        :rtype: str

        """
        # Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
        # The slice drops the leading "(?s:" and the trailing ")\Z"
        return fnmatch.translate(clean_fn_match)[4:-3]  # pragma: no cover

    def parse_file(self, filepath):
        """Parse file with Augeas

        Checks to see if file_path is parsed by Augeas
        If filepath isn't parsed, the file is added and Augeas is reloaded

        :param str filepath: Apache config file path

        """
        use_new, remove_old = self._check_path_actions(filepath)
        # Ensure that we have the latest Augeas DOM state on disk before
        # calling aug.load() which reloads the state from disk
        self.ensure_augeas_state()
        # Test if augeas included file for Httpd.lens
        # Note: This works for augeas globs, ie. *.conf
        if use_new:
            inc_test = self.aug.match(
                "/augeas/load/Httpd['%s' =~ glob(incl)]" % filepath)
            if not inc_test:
                # Load up files
                # Note: aug.add_transform("Httpd.lns", [filepath]) doesn't
                # seem to work on TravisCI, hence the manual transform handling
                if remove_old:
                    self._remove_httpd_transform(filepath)
                self._add_httpd_transform(filepath)
                self.aug.load()

    def parsed_in_current(self, filep):
        """Checks if the file path is parsed by current Augeas parser config
        ie. returns True if the file is found on a path that's found in live
        Augeas configuration.

        :param str filep: Path to match

        :returns: True if file is parsed in existing configuration tree
        :rtype: bool
        """
        return self._parsed_by_parser_paths(filep, self.parser_paths)

    def parsed_in_original(self, filep):
        """Checks if the file path is parsed by existing Apache config.
        ie. returns True if the file is found on a path that matches Include or
        IncludeOptional statement in the Apache configuration.

        :param str filep: Path to match

        :returns: True if file is parsed in existing configuration tree
        :rtype: bool
        """
        return self._parsed_by_parser_paths(filep, self.existing_paths)

    def _parsed_by_parser_paths(self, filep, paths):
        """Helper function that searches through provided paths and returns
        True if file path is found in the set

        :param str filep: Path to match
        :param dict paths: Directory mapped to the list of file name patterns
            parsed in that directory

        :rtype: bool
        """
        return any(
            fnmatch.fnmatch(filep, os.path.join(directory, filename))
            for directory, filenames in paths.items()
            for filename in filenames)

    def _check_path_actions(self, filepath):
        """Determine actions to take with a new augeas path

        This helper function will return a tuple that defines
        if we should try to append the new filepath to augeas
        parser paths, and / or remove the old one with more
        narrow matching.

        :param str filepath: filepath to check the actions for

        :returns: Tuple of (use_new, remove_old)
        :rtype: tuple

        """
        existing_matches = self.parser_paths.get(os.path.dirname(filepath))
        if existing_matches is None:
            # Nothing is parsed from this directory yet
            return True, False

        # A "*" entry already covers every file of the directory
        use_new = "*" not in existing_matches
        # A new "*" supersedes the narrower entries of the directory
        remove_old = os.path.basename(filepath) == "*"
        return use_new, remove_old

    def _remove_httpd_transform(self, filepath):
        """Remove path from Augeas transform

        Removes every transform registered for the directory of ``filepath``.

        :param str filepath: filepath to remove
        """

        remove_dirname = os.path.dirname(filepath)
        remove_basenames = self.parser_paths[remove_dirname]
        for name in remove_basenames:
            remove_path = remove_dirname + "/" + name
            remove_inc = self.aug.match(
                "/augeas/load/Httpd/incl [. ='%s']" % remove_path)
            # Nothing to remove if Augeas does not know the transform
            if remove_inc:
                self.aug.remove(remove_inc[0])
        self.parser_paths.pop(remove_dirname)

    def _add_httpd_transform(self, incl):
        """Add a transform to Augeas.

        This function will correctly add a transform to augeas
        The existing augeas.add_transform in python doesn't seem to work for
        Travis CI as it loads in libaugeas.so.0.10.0

        :param str incl: filepath to include for transform

        """
        last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
        if last_include:
            # Insert a new node immediately after the last incl
            self.aug.insert(last_include[0], "incl", False)
            self.aug.set("/augeas/load/Httpd/incl[last()]", incl)
        # On first use... must load lens and add file to incl
        else:
            # Augeas uses base 1 indexing... insert at beginning...
            self.aug.set("/augeas/load/Httpd/lens", "Httpd.lns")
            self.aug.set("/augeas/load/Httpd/incl", incl)
        # Add included path to paths dictionary
        self.parser_paths.setdefault(
            os.path.dirname(incl), []).append(os.path.basename(incl))

    def standardize_excl(self):
        """Standardize the excl arguments for the Httpd lens in Augeas.

        Note: Hack!
        Standardize the excl arguments for the Httpd lens in Augeas
        Servers sometimes give incorrect defaults
        Note: This problem should be fixed in Augeas 1.0.  Unfortunately,
        Augeas 0.10 appears to be the most popular version currently.

        """
        # attempt to protect against augeas error in 0.10.0 - ubuntu
        # *.augsave -> /*.augsave upon augeas.load()
        # Try to avoid bad httpd files
        # There has to be a better way... but after a day and a half of testing
        # I had no luck
        # This is a hack... work around... submit to augeas if still not fixed

        excl = ["*.augnew", "*.augsave", "*.dpkg-dist", "*.dpkg-bak",
                "*.dpkg-new", "*.dpkg-old", "*.rpmsave", "*.rpmnew",
                "*~",
                self.root + "/*.augsave",
                self.root + "/*~",
                self.root + "/*/*augsave",
                self.root + "/*/*~",
                self.root + "/*/*/*.augsave",
                self.root + "/*/*/*~"]

        for i, excluded in enumerate(excl, 1):
            self.aug.set("/augeas/load/Httpd/excl[%d]" % i, excluded)

        self.aug.load()

    def _set_locations(self):
        """Set default location for directives.

        Locations are given as file_paths
        .. todo:: Make sure that files are included

        :returns: dict with the keys "default", "listen" and "name"
        :rtype: dict

        """
        default = self.loc["root"]

        # Use ports.conf for Listen / NameVirtualHost when the file exists
        temp = os.path.join(self.root, "ports.conf")
        if os.path.isfile(temp):
            listen = temp
            name = temp
        else:
            listen = default
            name = default

        return {"default": default, "listen": listen, "name": name}

    def _find_config_root(self):
        """Find the Apache Configuration Root file.

        :returns: Path of the first candidate file found below the server root
        :rtype: str

        :raises .errors.NoInstallationError: If none of the candidates exists

        """
        location = ["apache2.conf", "httpd.conf", "conf/httpd.conf"]
        for name in location:
            candidate = os.path.join(self.root, name)
            if os.path.isfile(candidate):
                return candidate
        raise errors.NoInstallationError("Could not find configuration root")


def case_i(string):
    """Returns case insensitive regex.

    Returns a sloppy, but necessary version of a case insensitive regex.
    Any string should be able to be submitted and the string is
    escaped and then made case insensitive.
    May be replaced by a more proper /i once augeas 1.0 is widely
    supported.

    :param str string: string to make case i regex

    """
    return "".join("[" + c.upper() + c.lower() + "]"
                   if c.isalpha() else c for c in re.escape(string))


def get_aug_path(file_path):
    """Return augeas path for full filepath.

    :param str file_path: Full filepath

    """
    return "/files%s" % file_path


def init_augeas() -> Augeas:
    """ Initialize the actual Augeas instance """

    if not Augeas:  # pragma: no cover
        raise errors.NoInstallationError("Problem in Augeas installation")

    return Augeas(
        # specify a directory to load our preferred lens from
        loadpath=constants.AUGEAS_LENS_DIR,
        # Do not save backup (we do it ourselves), do not load
        # anything by default
        flags=(Augeas.NONE |
               Augeas.NO_MODL_AUTOLOAD |
               Augeas.ENABLE_SPAN))