1""" 2File server pluggable modules and generic backend functions 3""" 4 5 6import errno 7import fnmatch 8import logging 9import os 10import re 11import time 12from collections.abc import Sequence 13 14import salt.loader 15import salt.utils.data 16import salt.utils.files 17import salt.utils.path 18import salt.utils.url 19import salt.utils.versions 20from salt.utils.args import get_function_argspec as _argspec 21from salt.utils.decorators import ensure_unicode_args 22 23log = logging.getLogger(__name__) 24 25 26def _unlock_cache(w_lock): 27 """ 28 Unlock a FS file/dir based lock 29 """ 30 if not os.path.exists(w_lock): 31 return 32 try: 33 if os.path.isdir(w_lock): 34 os.rmdir(w_lock) 35 elif os.path.isfile(w_lock): 36 os.unlink(w_lock) 37 except OSError as exc: 38 log.trace("Error removing lockfile %s: %s", w_lock, exc) 39 40 41def _lock_cache(w_lock): 42 try: 43 os.mkdir(w_lock) 44 except OSError as exc: 45 if exc.errno != errno.EEXIST: 46 raise 47 return False 48 else: 49 log.trace("Lockfile %s created", w_lock) 50 return True 51 52 53def wait_lock(lk_fn, dest, wait_timeout=0): 54 """ 55 If the write lock is there, check to see if the file is actually being 56 written. If there is no change in the file size after a short sleep, 57 remove the lock and move forward. 58 """ 59 if not os.path.exists(lk_fn): 60 return False 61 if not os.path.exists(dest): 62 # The dest is not here, sleep for a bit, if the dest is not here yet 63 # kill the lockfile and start the write 64 time.sleep(1) 65 if not os.path.isfile(dest): 66 _unlock_cache(lk_fn) 67 return False 68 timeout = None 69 if wait_timeout: 70 timeout = time.time() + wait_timeout 71 # There is a lock file, the dest is there, stat the dest, sleep and check 72 # that the dest is being written, if it is not being written kill the lock 73 # file and continue. Also check if the lock file is gone. 74 s_count = 0 75 s_size = os.stat(dest).st_size 76 while True: 77 time.sleep(1) 78 if not os.path.exists(lk_fn): 79 return False 80 size = os.stat(dest).st_size 81 if size == s_size: 82 s_count += 1 83 if s_count >= 3: 84 # The file is not being written to, kill the lock and proceed 85 _unlock_cache(lk_fn) 86 return False 87 else: 88 s_size = size 89 if timeout: 90 if time.time() > timeout: 91 raise ValueError( 92 "Timeout({}s) for {} (lock: {}) elapsed".format( 93 wait_timeout, dest, lk_fn 94 ) 95 ) 96 return False 97 98 99def check_file_list_cache(opts, form, list_cache, w_lock): 100 """ 101 Checks the cache file to see if there is a new enough file list cache, and 102 returns the match (if found, along with booleans used by the fileserver 103 backend to determine if the cache needs to be refreshed/written). 104 """ 105 refresh_cache = False 106 save_cache = True 107 wait_lock(w_lock, list_cache, 5 * 60) 108 if not os.path.isfile(list_cache) and _lock_cache(w_lock): 109 refresh_cache = True 110 else: 111 attempt = 0 112 while attempt < 11: 113 try: 114 if os.path.exists(w_lock): 115 # wait for a filelist lock for max 15min 116 wait_lock(w_lock, list_cache, 15 * 60) 117 if os.path.exists(list_cache): 118 # calculate filelist age is possible 119 cache_stat = os.stat(list_cache) 120 # st_time can have a greater precision than time, removing 121 # float precision makes sure age will never be a negative 122 # number. 123 current_time = int(time.time()) 124 file_mtime = int(cache_stat.st_mtime) 125 if file_mtime > current_time: 126 log.debug( 127 "Cache file modified time is in the future, ignoring. " 128 "file=%s mtime=%s current_time=%s", 129 list_cache, 130 current_time, 131 file_mtime, 132 ) 133 age = -1 134 else: 135 age = current_time - file_mtime 136 else: 137 # if filelist does not exists yet, mark it as expired 138 age = opts.get("fileserver_list_cache_time", 20) + 1 139 if age < 0: 140 # Cache is from the future! Warn and mark cache invalid. 141 log.warning("The file list_cache was created in the future!") 142 if 0 <= age < opts.get("fileserver_list_cache_time", 20): 143 # Young enough! Load this sucker up! 144 with salt.utils.files.fopen(list_cache, "rb") as fp_: 145 log.debug( 146 "Returning file list from cache: age=%s cache_time=%s %s", 147 age, 148 opts.get("fileserver_list_cache_time", 20), 149 list_cache, 150 ) 151 return ( 152 salt.utils.data.decode( 153 salt.payload.load(fp_).get(form, []) 154 ), 155 False, 156 False, 157 ) 158 elif _lock_cache(w_lock): 159 # Set the w_lock and go 160 refresh_cache = True 161 break 162 except Exception: # pylint: disable=broad-except 163 time.sleep(0.2) 164 attempt += 1 165 continue 166 if attempt > 10: 167 save_cache = False 168 refresh_cache = True 169 return None, refresh_cache, save_cache 170 171 172def write_file_list_cache(opts, data, list_cache, w_lock): 173 """ 174 Checks the cache file to see if there is a new enough file list cache, and 175 returns the match (if found, along with booleans used by the fileserver 176 backend to determine if the cache needs to be refreshed/written). 177 """ 178 with salt.utils.files.fopen(list_cache, "w+b") as fp_: 179 fp_.write(salt.payload.dumps(data)) 180 _unlock_cache(w_lock) 181 log.trace("Lockfile %s removed", w_lock) 182 183 184def check_env_cache(opts, env_cache): 185 """ 186 Returns cached env names, if present. Otherwise returns None. 187 """ 188 if not os.path.isfile(env_cache): 189 return None 190 try: 191 with salt.utils.files.fopen(env_cache, "rb") as fp_: 192 log.trace("Returning env cache data from %s", env_cache) 193 return salt.utils.data.decode(salt.payload.load(fp_)) 194 except OSError: 195 pass 196 return None 197 198 199def generate_mtime_map(opts, path_map): 200 """ 201 Generate a dict of filename -> mtime 202 """ 203 file_map = {} 204 for saltenv, path_list in path_map.items(): 205 for path in path_list: 206 for directory, _, filenames in salt.utils.path.os_walk(path): 207 for item in filenames: 208 try: 209 file_path = os.path.join(directory, item) 210 # Don't walk any directories that match 211 # file_ignore_regex or glob 212 if is_file_ignored(opts, file_path): 213 continue 214 file_map[file_path] = os.path.getmtime(file_path) 215 except OSError: 216 # skip dangling symlinks 217 log.info( 218 "Failed to get mtime on %s, dangling symlink?", file_path 219 ) 220 continue 221 return file_map 222 223 224def diff_mtime_map(map1, map2): 225 """ 226 Is there a change to the mtime map? return a boolean 227 """ 228 # check if the mtimes are the same 229 if sorted(map1) != sorted(map2): 230 return True 231 232 # map1 and map2 are guaranteed to have same keys, 233 # so compare mtimes 234 for filename, mtime in map1.items(): 235 if map2[filename] != mtime: 236 return True 237 238 # we made it, that means we have no changes 239 return False 240 241 242def reap_fileserver_cache_dir(cache_base, find_func): 243 """ 244 Remove unused cache items assuming the cache directory follows a directory 245 convention: 246 247 cache_base -> saltenv -> relpath 248 """ 249 for saltenv in os.listdir(cache_base): 250 env_base = os.path.join(cache_base, saltenv) 251 for root, dirs, files in salt.utils.path.os_walk(env_base): 252 # if we have an empty directory, lets cleanup 253 # This will only remove the directory on the second time 254 # "_reap_cache" is called (which is intentional) 255 if not dirs and not files: 256 # only remove if empty directory is older than 60s 257 if time.time() - os.path.getctime(root) > 60: 258 os.rmdir(root) 259 continue 260 # if not, lets check the files in the directory 261 for file_ in files: 262 file_path = os.path.join(root, file_) 263 file_rel_path = os.path.relpath(file_path, env_base) 264 try: 265 filename, _, hash_type = file_rel_path.rsplit(".", 2) 266 except ValueError: 267 log.warning( 268 "Found invalid hash file [%s] when attempting to reap " 269 "cache directory", 270 file_, 271 ) 272 continue 273 # do we have the file? 274 ret = find_func(filename, saltenv=saltenv) 275 # if we don't actually have the file, lets clean up the cache 276 # object 277 if ret["path"] == "": 278 os.unlink(file_path) 279 280 281def is_file_ignored(opts, fname): 282 """ 283 If file_ignore_regex or file_ignore_glob were given in config, 284 compare the given file path against all of them and return True 285 on the first match. 286 """ 287 if opts["file_ignore_regex"]: 288 for regex in opts["file_ignore_regex"]: 289 if re.search(regex, fname): 290 log.debug("File matching file_ignore_regex. Skipping: %s", fname) 291 return True 292 293 if opts["file_ignore_glob"]: 294 for glob in opts["file_ignore_glob"]: 295 if fnmatch.fnmatch(fname, glob): 296 log.debug("File matching file_ignore_glob. Skipping: %s", fname) 297 return True 298 return False 299 300 301def clear_lock(clear_func, role, remote=None, lock_type="update"): 302 """ 303 Function to allow non-fileserver functions to clear update locks 304 305 clear_func 306 A function reference. This function will be run (with the ``remote`` 307 param as an argument) to clear the lock, and must return a 2-tuple of 308 lists, one containing messages describing successfully cleared locks, 309 and one containing messages describing errors encountered. 310 311 role 312 What type of lock is being cleared (gitfs, git_pillar, etc.). Used 313 solely for logging purposes. 314 315 remote 316 Optional string which should be used in ``func`` to pattern match so 317 that a subset of remotes can be targeted. 318 319 lock_type : update 320 Which type of lock to clear 321 322 Returns the return data from ``clear_func``. 323 """ 324 msg = "Clearing {} lock for {} remotes".format(lock_type, role) 325 if remote: 326 msg += " matching {}".format(remote) 327 log.debug(msg) 328 return clear_func(remote=remote, lock_type=lock_type) 329 330 331class Fileserver: 332 """ 333 Create a fileserver wrapper object that wraps the fileserver functions and 334 iterates over them to execute the desired function within the scope of the 335 desired fileserver backend. 336 """ 337 338 def __init__(self, opts): 339 self.opts = opts 340 self.servers = salt.loader.fileserver(opts, opts["fileserver_backend"]) 341 342 def backends(self, back=None): 343 """ 344 Return the backend list 345 """ 346 if not back: 347 back = self.opts["fileserver_backend"] 348 else: 349 if not isinstance(back, list): 350 try: 351 back = back.split(",") 352 except AttributeError: 353 back = str(back).split(",") 354 355 if isinstance(back, Sequence): 356 # The test suite uses an ImmutableList type (based on 357 # collections.abc.Sequence) for lists, which breaks this function in 358 # the test suite. This normalizes the value from the opts into a 359 # list if it is based on collections.abc.Sequence. 360 back = list(back) 361 362 ret = [] 363 if not isinstance(back, list): 364 return ret 365 366 # Avoid error logging when performing lookups in the LazyDict by 367 # instead doing the membership check on the result of a call to its 368 # .keys() attribute rather than on the LazyDict itself. 369 server_funcs = self.servers.keys() 370 try: 371 subtract_only = all(x.startswith("-") for x in back) 372 except AttributeError: 373 pass 374 else: 375 if subtract_only: 376 # Only subtracting backends from enabled ones 377 ret = self.opts["fileserver_backend"] 378 for sub in back: 379 if "{}.envs".format(sub[1:]) in server_funcs: 380 ret.remove(sub[1:]) 381 return ret 382 383 for sub in back: 384 if "{}.envs".format(sub) in server_funcs: 385 ret.append(sub) 386 return ret 387 388 def master_opts(self, load): 389 """ 390 Simplify master opts 391 """ 392 return self.opts 393 394 def update_opts(self): 395 # This fix func monkey patching by pillar 396 for name, func in self.servers.items(): 397 try: 398 if "__opts__" in func.__globals__: 399 func.__globals__["__opts__"].update(self.opts) 400 except AttributeError: 401 pass 402 403 def clear_cache(self, back=None): 404 """ 405 Clear the cache of all of the fileserver backends that support the 406 clear_cache function or the named backend(s) only. 407 """ 408 back = self.backends(back) 409 cleared = [] 410 errors = [] 411 for fsb in back: 412 fstr = "{}.clear_cache".format(fsb) 413 if fstr in self.servers: 414 log.debug("Clearing %s fileserver cache", fsb) 415 failed = self.servers[fstr]() 416 if failed: 417 errors.extend(failed) 418 else: 419 cleared.append( 420 "The {} fileserver cache was successfully cleared".format(fsb) 421 ) 422 return cleared, errors 423 424 def lock(self, back=None, remote=None): 425 """ 426 ``remote`` can either be a dictionary containing repo configuration 427 information, or a pattern. If the latter, then remotes for which the URL 428 matches the pattern will be locked. 429 """ 430 back = self.backends(back) 431 locked = [] 432 errors = [] 433 for fsb in back: 434 fstr = "{}.lock".format(fsb) 435 if fstr in self.servers: 436 msg = "Setting update lock for {} remotes".format(fsb) 437 if remote: 438 if not isinstance(remote, str): 439 errors.append( 440 "Badly formatted remote pattern '{}'".format(remote) 441 ) 442 continue 443 else: 444 msg += " matching {}".format(remote) 445 log.debug(msg) 446 good, bad = self.servers[fstr](remote=remote) 447 locked.extend(good) 448 errors.extend(bad) 449 return locked, errors 450 451 def clear_lock(self, back=None, remote=None): 452 """ 453 Clear the update lock for the enabled fileserver backends 454 455 back 456 Only clear the update lock for the specified backend(s). The 457 default is to clear the lock for all enabled backends 458 459 remote 460 If specified, then any remotes which contain the passed string will 461 have their lock cleared. 462 """ 463 back = self.backends(back) 464 cleared = [] 465 errors = [] 466 for fsb in back: 467 fstr = "{}.clear_lock".format(fsb) 468 if fstr in self.servers: 469 good, bad = clear_lock(self.servers[fstr], fsb, remote=remote) 470 cleared.extend(good) 471 errors.extend(bad) 472 return cleared, errors 473 474 def update(self, back=None, **kwargs): 475 """ 476 Update all of the enabled fileserver backends which support the update 477 function 478 """ 479 back = self.backends(back) 480 for fsb in back: 481 fstr = "{}.update".format(fsb) 482 if fstr in self.servers: 483 log.debug("Updating %s fileserver cache", fsb) 484 self.servers[fstr](**kwargs) 485 486 def update_intervals(self, back=None): 487 """ 488 Return the update intervals for all of the enabled fileserver backends 489 which support variable update intervals. 490 """ 491 back = self.backends(back) 492 ret = {} 493 for fsb in back: 494 fstr = "{}.update_intervals".format(fsb) 495 if fstr in self.servers: 496 ret[fsb] = self.servers[fstr]() 497 return ret 498 499 def envs(self, back=None, sources=False): 500 """ 501 Return the environments for the named backend or all backends 502 """ 503 back = self.backends(back) 504 ret = set() 505 if sources: 506 ret = {} 507 for fsb in back: 508 fstr = "{}.envs".format(fsb) 509 kwargs = ( 510 {"ignore_cache": True} 511 if "ignore_cache" in _argspec(self.servers[fstr]).args 512 and self.opts["__role"] == "minion" 513 else {} 514 ) 515 if sources: 516 ret[fsb] = self.servers[fstr](**kwargs) 517 else: 518 ret.update(self.servers[fstr](**kwargs)) 519 if sources: 520 return ret 521 return list(ret) 522 523 def file_envs(self, load=None): 524 """ 525 Return environments for all backends for requests from fileclient 526 """ 527 if load is None: 528 load = {} 529 load.pop("cmd", None) 530 return self.envs(**load) 531 532 def init(self, back=None): 533 """ 534 Initialize the backend, only do so if the fs supports an init function 535 """ 536 back = self.backends(back) 537 for fsb in back: 538 fstr = "{}.init".format(fsb) 539 if fstr in self.servers: 540 self.servers[fstr]() 541 542 def _find_file(self, load): 543 """ 544 Convenience function for calls made using the RemoteClient 545 """ 546 path = load.get("path") 547 if not path: 548 return {"path": "", "rel": ""} 549 tgt_env = load.get("saltenv", "base") 550 return self.find_file(path, tgt_env) 551 552 def file_find(self, load): 553 """ 554 Convenience function for calls made using the LocalClient 555 """ 556 path = load.get("path") 557 if not path: 558 return {"path": "", "rel": ""} 559 tgt_env = load.get("saltenv", "base") 560 return self.find_file(path, tgt_env) 561 562 def find_file(self, path, saltenv, back=None): 563 """ 564 Find the path and return the fnd structure, this structure is passed 565 to other backend interfaces. 566 """ 567 path = salt.utils.stringutils.to_unicode(path) 568 saltenv = salt.utils.stringutils.to_unicode(saltenv) 569 back = self.backends(back) 570 kwargs = {} 571 fnd = {"path": "", "rel": ""} 572 if os.path.isabs(path): 573 return fnd 574 if "../" in path: 575 return fnd 576 if salt.utils.url.is_escaped(path): 577 # don't attempt to find URL query arguments in the path 578 path = salt.utils.url.unescape(path) 579 else: 580 if "?" in path: 581 hcomps = path.split("?") 582 path = hcomps[0] 583 comps = hcomps[1].split("&") 584 for comp in comps: 585 if "=" not in comp: 586 # Invalid option, skip it 587 continue 588 args = comp.split("=", 1) 589 kwargs[args[0]] = args[1] 590 591 if "env" in kwargs: 592 # "env" is not supported; Use "saltenv". 593 kwargs.pop("env") 594 if "saltenv" in kwargs: 595 saltenv = kwargs.pop("saltenv") 596 597 if not isinstance(saltenv, str): 598 saltenv = str(saltenv) 599 600 for fsb in back: 601 fstr = "{}.find_file".format(fsb) 602 if fstr in self.servers: 603 fnd = self.servers[fstr](path, saltenv, **kwargs) 604 if fnd.get("path"): 605 fnd["back"] = fsb 606 return fnd 607 return fnd 608 609 def serve_file(self, load): 610 """ 611 Serve up a chunk of a file 612 """ 613 ret = {"data": "", "dest": ""} 614 615 if "env" in load: 616 # "env" is not supported; Use "saltenv". 617 load.pop("env") 618 619 if "path" not in load or "loc" not in load or "saltenv" not in load: 620 return ret 621 if not isinstance(load["saltenv"], str): 622 load["saltenv"] = str(load["saltenv"]) 623 624 fnd = self.find_file(load["path"], load["saltenv"]) 625 if not fnd.get("back"): 626 return ret 627 fstr = "{}.serve_file".format(fnd["back"]) 628 if fstr in self.servers: 629 return self.servers[fstr](load, fnd) 630 return ret 631 632 def __file_hash_and_stat(self, load): 633 """ 634 Common code for hashing and stating files 635 """ 636 if "env" in load: 637 # "env" is not supported; Use "saltenv". 638 load.pop("env") 639 640 if "path" not in load or "saltenv" not in load: 641 return "", None 642 if not isinstance(load["saltenv"], str): 643 load["saltenv"] = str(load["saltenv"]) 644 645 fnd = self.find_file( 646 salt.utils.stringutils.to_unicode(load["path"]), load["saltenv"] 647 ) 648 if not fnd.get("back"): 649 return "", None 650 stat_result = fnd.get("stat", None) 651 fstr = "{}.file_hash".format(fnd["back"]) 652 if fstr in self.servers: 653 return self.servers[fstr](load, fnd), stat_result 654 return "", None 655 656 def file_hash(self, load): 657 """ 658 Return the hash of a given file 659 """ 660 try: 661 return self.__file_hash_and_stat(load)[0] 662 except (IndexError, TypeError): 663 return "" 664 665 def file_hash_and_stat(self, load): 666 """ 667 Return the hash and stat result of a given file 668 """ 669 try: 670 return self.__file_hash_and_stat(load) 671 except (IndexError, TypeError): 672 return "", None 673 674 def clear_file_list_cache(self, load): 675 """ 676 Deletes the file_lists cache files 677 """ 678 if "env" in load: 679 # "env" is not supported; Use "saltenv". 680 load.pop("env") 681 682 saltenv = load.get("saltenv", []) 683 if saltenv is not None: 684 if not isinstance(saltenv, list): 685 try: 686 saltenv = [x.strip() for x in saltenv.split(",")] 687 except AttributeError: 688 saltenv = [x.strip() for x in str(saltenv).split(",")] 689 690 for idx, val in enumerate(saltenv): 691 if not isinstance(val, str): 692 saltenv[idx] = str(val) 693 694 ret = {} 695 fsb = self.backends(load.pop("fsbackend", None)) 696 list_cachedir = os.path.join(self.opts["cachedir"], "file_lists") 697 try: 698 file_list_backends = os.listdir(list_cachedir) 699 except OSError as exc: 700 if exc.errno == errno.ENOENT: 701 log.debug("No file list caches found") 702 return {} 703 else: 704 log.error( 705 "Failed to get list of saltenvs for which the master has " 706 "cached file lists: %s", 707 exc, 708 ) 709 710 for back in file_list_backends: 711 try: 712 cache_files = os.listdir(os.path.join(list_cachedir, back)) 713 except OSError as exc: 714 log.error( 715 "Failed to find file list caches for saltenv '%s': %s", back, exc 716 ) 717 continue 718 for cache_file in cache_files: 719 try: 720 cache_saltenv, extension = cache_file.rsplit(".", 1) 721 except ValueError: 722 # Filename has no dot in it. Not a cache file, ignore. 723 continue 724 if extension != "p": 725 # Filename does not end in ".p". Not a cache file, ignore. 726 continue 727 elif back not in fsb or ( 728 saltenv is not None and cache_saltenv not in saltenv 729 ): 730 log.debug( 731 "Skipping %s file list cache for saltenv '%s'", 732 back, 733 cache_saltenv, 734 ) 735 continue 736 try: 737 os.remove(os.path.join(list_cachedir, back, cache_file)) 738 except OSError as exc: 739 if exc.errno != errno.ENOENT: 740 log.error("Failed to remove %s: %s", exc.filename, exc.strerror) 741 else: 742 ret.setdefault(back, []).append(cache_saltenv) 743 log.debug( 744 "Removed %s file list cache for saltenv '%s'", 745 cache_saltenv, 746 back, 747 ) 748 749 # Ensure reproducible ordering of returns 750 for key in ret: 751 ret[key].sort() 752 753 return ret 754 755 @ensure_unicode_args 756 def file_list(self, load): 757 """ 758 Return a list of files from the dominant environment 759 """ 760 if "env" in load: 761 # "env" is not supported; Use "saltenv". 762 load.pop("env") 763 764 ret = set() 765 if "saltenv" not in load: 766 return [] 767 if not isinstance(load["saltenv"], str): 768 load["saltenv"] = str(load["saltenv"]) 769 770 for fsb in self.backends(load.pop("fsbackend", None)): 771 fstr = "{}.file_list".format(fsb) 772 if fstr in self.servers: 773 ret.update(self.servers[fstr](load)) 774 # some *fs do not handle prefix. Ensure it is filtered 775 prefix = load.get("prefix", "").strip("/") 776 if prefix != "": 777 ret = [f for f in ret if f.startswith(prefix)] 778 return sorted(ret) 779 780 @ensure_unicode_args 781 def file_list_emptydirs(self, load): 782 """ 783 List all emptydirs in the given environment 784 """ 785 if "env" in load: 786 # "env" is not supported; Use "saltenv". 787 load.pop("env") 788 789 ret = set() 790 if "saltenv" not in load: 791 return [] 792 if not isinstance(load["saltenv"], str): 793 load["saltenv"] = str(load["saltenv"]) 794 795 for fsb in self.backends(None): 796 fstr = "{}.file_list_emptydirs".format(fsb) 797 if fstr in self.servers: 798 ret.update(self.servers[fstr](load)) 799 # some *fs do not handle prefix. Ensure it is filtered 800 prefix = load.get("prefix", "").strip("/") 801 if prefix != "": 802 ret = [f for f in ret if f.startswith(prefix)] 803 return sorted(ret) 804 805 @ensure_unicode_args 806 def dir_list(self, load): 807 """ 808 List all directories in the given environment 809 """ 810 if "env" in load: 811 # "env" is not supported; Use "saltenv". 812 load.pop("env") 813 814 ret = set() 815 if "saltenv" not in load: 816 return [] 817 if not isinstance(load["saltenv"], str): 818 load["saltenv"] = str(load["saltenv"]) 819 820 for fsb in self.backends(load.pop("fsbackend", None)): 821 fstr = "{}.dir_list".format(fsb) 822 if fstr in self.servers: 823 ret.update(self.servers[fstr](load)) 824 # some *fs do not handle prefix. Ensure it is filtered 825 prefix = load.get("prefix", "").strip("/") 826 if prefix != "": 827 ret = [f for f in ret if f.startswith(prefix)] 828 return sorted(ret) 829 830 @ensure_unicode_args 831 def symlink_list(self, load): 832 """ 833 Return a list of symlinked files and dirs 834 """ 835 if "env" in load: 836 # "env" is not supported; Use "saltenv". 837 load.pop("env") 838 839 ret = {} 840 if "saltenv" not in load: 841 return {} 842 if not isinstance(load["saltenv"], str): 843 load["saltenv"] = str(load["saltenv"]) 844 845 for fsb in self.backends(load.pop("fsbackend", None)): 846 symlstr = "{}.symlink_list".format(fsb) 847 if symlstr in self.servers: 848 ret = self.servers[symlstr](load) 849 # some *fs do not handle prefix. Ensure it is filtered 850 prefix = load.get("prefix", "").strip("/") 851 if prefix != "": 852 ret = {x: y for x, y in ret.items() if x.startswith(prefix)} 853 return ret 854 855 856class FSChan: 857 """ 858 A class that mimics the transport channels allowing for local access to 859 to the fileserver class class structure 860 """ 861 862 def __init__(self, opts, **kwargs): 863 self.opts = opts 864 self.kwargs = kwargs 865 self.fs = Fileserver(self.opts) 866 self.fs.init() 867 if self.opts.get("file_client", "remote") == "local": 868 if "__fs_update" not in self.opts: 869 self.fs.update() 870 self.opts["__fs_update"] = True 871 else: 872 self.fs.update() 873 self.cmd_stub = {"master_tops": {}, "ext_nodes": {}} 874 875 def send( 876 self, load, tries=None, timeout=None, raw=False 877 ): # pylint: disable=unused-argument 878 """ 879 Emulate the channel send method, the tries and timeout are not used 880 """ 881 if "cmd" not in load: 882 log.error("Malformed request, no cmd: %s", load) 883 return {} 884 cmd = load["cmd"].lstrip("_") 885 if cmd in self.cmd_stub: 886 return self.cmd_stub[cmd] 887 if not hasattr(self.fs, cmd): 888 log.error("Malformed request, invalid cmd: %s", load) 889 return {} 890 return getattr(self.fs, cmd)(load) 891 892 def close(self): 893 pass 894