1"""
2File server pluggable modules and generic backend functions
3"""
4
5
import errno
import fnmatch
import logging
import os
import re
import time
from collections.abc import Sequence

import salt.loader
import salt.payload
import salt.utils.data
import salt.utils.files
import salt.utils.path
import salt.utils.stringutils
import salt.utils.url
import salt.utils.versions
from salt.utils.args import get_function_argspec as _argspec
from salt.utils.decorators import ensure_unicode_args
22
# Module-level logger used by the helper functions and classes in this file
log = logging.getLogger(__name__)
24
25
def _unlock_cache(w_lock):
    """
    Remove a filesystem-based lock, whether it is a directory or a file

    Nothing is done when ``w_lock`` does not exist (or is neither a directory
    nor a regular file). A failure to remove the lock is logged at trace level
    instead of being raised.
    """
    if not os.path.exists(w_lock):
        return

    # Pick the removal call matching the kind of lock found on disk
    if os.path.isdir(w_lock):
        remove = os.rmdir
    elif os.path.isfile(w_lock):
        remove = os.unlink
    else:
        return

    try:
        remove(w_lock)
    except OSError as exc:
        log.trace("Error removing lockfile %s: %s", w_lock, exc)
39
40
def _lock_cache(w_lock):
    """
    Try to take a directory-based lock at ``w_lock``

    Returns ``True`` when this call created the lock directory and ``False``
    when the directory already exists. Any other ``OSError`` raised by
    ``os.mkdir`` is propagated to the caller.
    """
    try:
        os.mkdir(w_lock)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            # Somebody else already holds the lock
            return False
        raise
    log.trace("Lockfile %s created", w_lock)
    return True
51
52
def wait_lock(lk_fn, dest, wait_timeout=0):
    """
    Wait for the write lock ``lk_fn`` protecting ``dest`` to go away

    While the lock exists, the size of ``dest`` is polled once per second. If
    the size is found unchanged on three polls the writer is considered gone
    and the lock is removed. If ``dest`` does not show up as a file after an
    initial one second grace period the lock is removed right away.

    lk_fn
        Path of the lock file/directory

    dest
        Path of the file the lock protects

    wait_timeout
        Maximum number of seconds to wait. A falsy value disables the timeout.

    Always returns ``False``. Raises ``ValueError`` when ``wait_timeout``
    elapses while the lock is still present.
    """
    if not os.path.exists(lk_fn):
        return False

    if not os.path.exists(dest):
        # Give the writer a moment to create the dest. If it still is not
        # there, drop the lock so that the caller can start the write.
        time.sleep(1)
        if not os.path.isfile(dest):
            _unlock_cache(lk_fn)
            return False

    deadline = time.time() + wait_timeout if wait_timeout else None

    # Both the lock and the dest exist. Keep checking the size of the dest to
    # find out whether it is still being written, and also notice when the
    # lock disappears on its own.
    unchanged_polls = 0
    last_size = os.stat(dest).st_size
    while True:
        time.sleep(1)
        if not os.path.exists(lk_fn):
            return False
        current_size = os.stat(dest).st_size
        if current_size != last_size:
            last_size = current_size
        else:
            unchanged_polls += 1
            if unchanged_polls >= 3:
                # Nobody is writing to the file, remove the lock and proceed
                _unlock_cache(lk_fn)
                return False
        if deadline and time.time() > deadline:
            raise ValueError(
                "Timeout({}s) for {} (lock: {}) elapsed".format(
                    wait_timeout, dest, lk_fn
                )
            )
97
98
def check_file_list_cache(opts, form, list_cache, w_lock):
    """
    Checks the cache file to see if there is a new enough file list cache, and
    returns the match (if found, along with booleans used by the fileserver
    backend to determine if the cache needs to be refreshed/written).

    opts
        Options dictionary. ``fileserver_list_cache_time`` (default ``20``) is
        the maximum age in seconds for the cache to be considered fresh.

    form
        Key to read from the loaded cache data

    list_cache
        Path to the file list cache file

    w_lock
        Path to the write lock guarding ``list_cache``

    Returns a 3-tuple ``(match, refresh_cache, save_cache)``. ``match`` is the
    decoded cached value for ``form`` (an empty list if the key is missing)
    when a fresh cache was loaded, otherwise ``None``. When a fresh cache was
    loaded both booleans are ``False``. ``save_cache`` is ``False`` as well
    when all attempts to read the cache or take the lock failed, in which case
    ``refresh_cache`` is ``True``.
    """
    refresh_cache = False
    save_cache = True
    wait_lock(w_lock, list_cache, 5 * 60)
    if not os.path.isfile(list_cache) and _lock_cache(w_lock):
        refresh_cache = True
    else:
        attempt = 0
        while attempt < 11:
            try:
                if os.path.exists(w_lock):
                    # wait for a file list lock for at most 15 minutes
                    wait_lock(w_lock, list_cache, 15 * 60)
                cache_time = opts.get("fileserver_list_cache_time", 20)
                if os.path.exists(list_cache):
                    # calculate the file list age if possible
                    cache_stat = os.stat(list_cache)
                    # st_mtime can have a greater precision than time,
                    # removing float precision makes sure age will never be a
                    # negative number.
                    current_time = int(time.time())
                    file_mtime = int(cache_stat.st_mtime)
                    if file_mtime > current_time:
                        log.debug(
                            "Cache file modified time is in the future, ignoring. "
                            "file=%s mtime=%s current_time=%s",
                            list_cache,
                            file_mtime,
                            current_time,
                        )
                        age = -1
                    else:
                        age = current_time - file_mtime
                else:
                    # if the file list does not exist yet, mark it as expired
                    age = cache_time + 1
                if age < 0:
                    # Cache is from the future! Warn and mark cache invalid.
                    log.warning("The file list_cache was created in the future!")
                if 0 <= age < cache_time:
                    # Young enough! Load this sucker up!
                    with salt.utils.files.fopen(list_cache, "rb") as fp_:
                        log.debug(
                            "Returning file list from cache: age=%s cache_time=%s %s",
                            age,
                            cache_time,
                            list_cache,
                        )
                        return (
                            salt.utils.data.decode(
                                salt.payload.load(fp_).get(form, [])
                            ),
                            False,
                            False,
                        )
                elif _lock_cache(w_lock):
                    # Set the w_lock and go
                    refresh_cache = True
                    break
            except Exception:  # pylint: disable=broad-except
                # Any failure (unreadable or half-written cache, lock wait
                # timeout, ...) counts as a failed attempt; retry shortly.
                time.sleep(0.2)
                attempt += 1
                continue
        if attempt > 10:
            # Every attempt failed: have the caller rebuild the list without
            # writing it back to the cache.
            save_cache = False
            refresh_cache = True
    return None, refresh_cache, save_cache
170
171
def write_file_list_cache(opts, data, list_cache, w_lock):
    """
    Serialize ``data`` into the file list cache and release the write lock

    opts
        Not used by this function

    data
        The data to serialize with ``salt.payload.dumps``

    list_cache
        Path of the cache file to (over)write

    w_lock
        Path of the write lock to remove once the write is done

    Errors raised while opening or writing the cache file are propagated, but
    the lock is removed in every case.
    """
    try:
        with salt.utils.files.fopen(list_cache, "w+b") as fp_:
            fp_.write(salt.payload.dumps(data))
    finally:
        # Only release the lock once the cache file has been closed, so that
        # nobody waiting on the lock reads a partially written file, and do
        # not leave the lock behind when the write fails.
        _unlock_cache(w_lock)
        log.trace("Lockfile %s removed", w_lock)
182
183
def check_env_cache(opts, env_cache):
    """
    Load the cached environment names from ``env_cache``

    Returns the decoded contents of the cache file, or ``None`` when the file
    does not exist or an ``OSError`` is raised while reading it. ``opts`` is
    not used.
    """
    if os.path.isfile(env_cache):
        try:
            with salt.utils.files.fopen(env_cache, "rb") as fp_:
                log.trace("Returning env cache data from %s", env_cache)
                return salt.utils.data.decode(salt.payload.load(fp_))
        except OSError:
            # An unreadable cache is treated like a missing one
            return None
    return None
197
198
def generate_mtime_map(opts, path_map):
    """
    Build a dict mapping file path -> modification time

    opts
        Options dictionary, handed to ``is_file_ignored``

    path_map
        Dictionary whose values are lists of directories to walk

    Files matched by ``is_file_ignored`` are left out, and so are files whose
    mtime cannot be read.
    """
    mtimes = {}
    for roots in path_map.values():
        for root in roots:
            for dirpath, _, names in salt.utils.path.os_walk(root):
                for name in names:
                    full_path = os.path.join(dirpath, name)
                    try:
                        # Leave out anything matching file_ignore_regex or
                        # file_ignore_glob
                        if not is_file_ignored(opts, full_path):
                            mtimes[full_path] = os.path.getmtime(full_path)
                    except OSError:
                        # skip dangling symlinks
                        log.info(
                            "Failed to get mtime on %s, dangling symlink?", full_path
                        )
    return mtimes
222
223
def diff_mtime_map(map1, map2):
    """
    Is there a change to the mtime map? return a boolean

    map1, map2
        Dictionaries mapping file name -> mtime

    Returns ``True`` when the two maps do not hold the same set of file names
    or when any file name has a different mtime, ``False`` otherwise.
    """
    # A different set of files is a change. The key views compare like sets,
    # so there is no need to sort the keys (nor for them to be orderable).
    if map1.keys() != map2.keys():
        return True

    # map1 and map2 are guaranteed to have the same keys at this point, so
    # only the mtimes are left to compare
    return any(map2[filename] != mtime for filename, mtime in map1.items())
240
241
def reap_fileserver_cache_dir(cache_base, find_func):
    """
    Remove unused cache items assuming the cache directory follows a directory
    convention:

    cache_base -> saltenv -> relpath

    cache_base
        Directory holding one sub-directory per saltenv

    find_func
        Callable invoked as ``find_func(filename, saltenv=saltenv)``. It must
        return a dictionary whose ``path`` value is an empty string when the
        file is not known, in which case the cache file is deleted.
    """
    for saltenv in os.listdir(cache_base):
        env_base = os.path.join(cache_base, saltenv)
        for root, dirs, files in salt.utils.path.os_walk(env_base):
            if not (dirs or files):
                # Empty directory, lets cleanup. This will only remove the
                # directory on the second time "_reap_cache" is called (which
                # is intentional), and only when the empty directory is older
                # than 60s
                idle_for = time.time() - os.path.getctime(root)
                if idle_for > 60:
                    os.rmdir(root)
                continue
            # Not empty, check every file in the directory
            for file_ in files:
                file_path = os.path.join(root, file_)
                parts = os.path.relpath(file_path, env_base).rsplit(".", 2)
                if len(parts) != 3:
                    # Fewer than two dots in the relative path, so the name
                    # cannot be split into its three expected components
                    log.warning(
                        "Found invalid hash file [%s] when attempting to reap "
                        "cache directory",
                        file_,
                    )
                    continue
                filename = parts[0]
                # If we don't actually have the file anymore, clean up the
                # cache object
                if find_func(filename, saltenv=saltenv)["path"] == "":
                    os.unlink(file_path)
279
280
def is_file_ignored(opts, fname):
    """
    Tell whether ``fname`` matches one of the configured ignore patterns

    The ``file_ignore_regex`` patterns from ``opts`` are tried first (using
    ``re.search``), then the ``file_ignore_glob`` patterns (using
    ``fnmatch.fnmatch``). Returns ``True`` on the first match and ``False``
    when nothing matches or no patterns are configured.
    """
    for pattern in opts["file_ignore_regex"] or ():
        if re.search(pattern, fname):
            log.debug("File matching file_ignore_regex. Skipping: %s", fname)
            return True

    for pattern in opts["file_ignore_glob"] or ():
        if fnmatch.fnmatch(fname, pattern):
            log.debug("File matching file_ignore_glob. Skipping: %s", fname)
            return True

    return False
299
300
def clear_lock(clear_func, role, remote=None, lock_type="update"):
    """
    Function to allow non-fileserver functions to clear update locks

    clear_func
        A function reference. It is called with the ``remote`` and
        ``lock_type`` keyword arguments to clear the lock, and must return a
        2-tuple of lists, one containing messages describing successfully
        cleared locks, and one containing messages describing errors
        encountered.

    role
        What type of lock is being cleared (gitfs, git_pillar, etc.). Used
        solely for logging purposes.

    remote
        Optional string which should be used in ``clear_func`` to pattern
        match so that a subset of remotes can be targeted.

    lock_type : update
        Which type of lock to clear

    Returns the return data from ``clear_func``.
    """
    suffix = " matching {}".format(remote) if remote else ""
    log.debug("Clearing {} lock for {} remotes".format(lock_type, role) + suffix)
    return clear_func(remote=remote, lock_type=lock_type)
329
330
class Fileserver:
    """
    Create a fileserver wrapper object that wraps the fileserver functions and
    iterates over them to execute the desired function within the scope of the
    desired fileserver backend.
    """

    def __init__(self, opts):
        """
        Keep a reference to ``opts`` and load the functions of the backends
        listed in ``opts["fileserver_backend"]``
        """
        self.opts = opts
        self.servers = salt.loader.fileserver(opts, opts["fileserver_backend"])

    def backends(self, back=None):
        """
        Return the backend list

        back
            Backends to consider, as a list or a comma-separated string. When
            empty, the backends from ``opts["fileserver_backend"]`` are used.
            When every entry starts with ``-``, the named backends are
            subtracted from the enabled ones instead.

        Only backends providing an ``envs`` function are kept when filtering.
        The returned list is always a new list; the configured
        ``fileserver_backend`` option is never modified.
        """
        if not back:
            back = self.opts["fileserver_backend"]
        elif not isinstance(back, list):
            try:
                back = back.split(",")
            except AttributeError:
                back = str(back).split(",")

        if isinstance(back, Sequence):
            # The test suite uses an ImmutableList type (based on
            # collections.abc.Sequence) for lists, which breaks this function in
            # the test suite. This normalizes the value from the opts into a
            # list if it is based on collections.abc.Sequence.
            back = list(back)

        ret = []
        if not isinstance(back, list):
            return ret

        # Avoid error logging when performing lookups in the LazyDict by
        # instead doing the membership check on the result of a call to its
        # .keys() attribute rather than on the LazyDict itself.
        server_funcs = self.servers.keys()
        try:
            subtract_only = all(x.startswith("-") for x in back)
        except AttributeError:
            pass
        else:
            if subtract_only:
                # Only subtracting backends from enabled ones. Work on a copy
                # so that the configured backends are left untouched.
                ret = list(self.opts["fileserver_backend"])
                for sub in back:
                    name = sub[1:]
                    # The membership check keeps a backend which is named
                    # twice, or which is not enabled, from raising ValueError
                    if "{}.envs".format(name) in server_funcs and name in ret:
                        ret.remove(name)
                return ret

        for sub in back:
            if "{}.envs".format(sub) in server_funcs:
                ret.append(sub)
        return ret

    def master_opts(self, load):
        """
        Simplify master opts
        """
        return self.opts

    def update_opts(self):
        """
        Update the ``__opts__`` global of every loaded fileserver function
        with the contents of ``self.opts``
        """
        # This fixes func monkey patching by pillar
        for name, func in self.servers.items():
            try:
                if "__opts__" in func.__globals__:
                    func.__globals__["__opts__"].update(self.opts)
            except AttributeError:
                pass

    def clear_cache(self, back=None):
        """
        Clear the cache of all of the fileserver backends that support the
        clear_cache function or the named backend(s) only.

        Returns a 2-tuple of lists: messages for the caches which were cleared
        and the errors returned by the backends.
        """
        back = self.backends(back)
        cleared = []
        errors = []
        for fsb in back:
            fstr = "{}.clear_cache".format(fsb)
            if fstr in self.servers:
                log.debug("Clearing %s fileserver cache", fsb)
                failed = self.servers[fstr]()
                if failed:
                    errors.extend(failed)
                else:
                    cleared.append(
                        "The {} fileserver cache was successfully cleared".format(fsb)
                    )
        return cleared, errors

    def lock(self, back=None, remote=None):
        """
        ``remote`` can either be a dictionary containing repo configuration
        information, or a pattern. If the latter, then remotes for which the URL
        matches the pattern will be locked.

        Returns a 2-tuple of lists: the messages returned by the backends for
        the locks which were set, and the errors encountered.
        """
        back = self.backends(back)
        locked = []
        errors = []
        for fsb in back:
            fstr = "{}.lock".format(fsb)
            if fstr in self.servers:
                msg = "Setting update lock for {} remotes".format(fsb)
                if remote:
                    if not isinstance(remote, str):
                        errors.append(
                            "Badly formatted remote pattern '{}'".format(remote)
                        )
                        continue
                    else:
                        msg += " matching {}".format(remote)
                log.debug(msg)
                good, bad = self.servers[fstr](remote=remote)
                locked.extend(good)
                errors.extend(bad)
        return locked, errors

    def clear_lock(self, back=None, remote=None):
        """
        Clear the update lock for the enabled fileserver backends

        back
            Only clear the update lock for the specified backend(s). The
            default is to clear the lock for all enabled backends

        remote
            If specified, then any remotes which contain the passed string will
            have their lock cleared.
        """
        back = self.backends(back)
        cleared = []
        errors = []
        for fsb in back:
            fstr = "{}.clear_lock".format(fsb)
            if fstr in self.servers:
                # This is the module-level clear_lock() function, not this
                # method
                good, bad = clear_lock(self.servers[fstr], fsb, remote=remote)
                cleared.extend(good)
                errors.extend(bad)
        return cleared, errors

    def update(self, back=None, **kwargs):
        """
        Update all of the enabled fileserver backends which support the update
        function

        Any additional keyword arguments are passed on to the update function
        of each backend.
        """
        back = self.backends(back)
        for fsb in back:
            fstr = "{}.update".format(fsb)
            if fstr in self.servers:
                log.debug("Updating %s fileserver cache", fsb)
                self.servers[fstr](**kwargs)

    def update_intervals(self, back=None):
        """
        Return the update intervals for all of the enabled fileserver backends
        which support variable update intervals.
        """
        back = self.backends(back)
        ret = {}
        for fsb in back:
            fstr = "{}.update_intervals".format(fsb)
            if fstr in self.servers:
                ret[fsb] = self.servers[fstr]()
        return ret

    def envs(self, back=None, sources=False):
        """
        Return the environments for the named backend or all backends

        When ``sources`` is true a dictionary mapping each backend to its
        environments is returned, otherwise a list of the environments of all
        the backends without duplicates.
        """
        back = self.backends(back)
        ret = set()
        if sources:
            ret = {}
        for fsb in back:
            fstr = "{}.envs".format(fsb)
            # On a minion, ask the backends which support it to bypass their
            # cache
            kwargs = (
                {"ignore_cache": True}
                if "ignore_cache" in _argspec(self.servers[fstr]).args
                and self.opts["__role"] == "minion"
                else {}
            )
            if sources:
                ret[fsb] = self.servers[fstr](**kwargs)
            else:
                ret.update(self.servers[fstr](**kwargs))
        if sources:
            return ret
        return list(ret)

    def file_envs(self, load=None):
        """
        Return environments for all backends for requests from fileclient

        The ``cmd`` key is removed from ``load``, whose remaining items are
        passed to ``envs`` as keyword arguments.
        """
        if load is None:
            load = {}
        load.pop("cmd", None)
        return self.envs(**load)

    def init(self, back=None):
        """
        Initialize the backend, only do so if the fs supports an init function
        """
        back = self.backends(back)
        for fsb in back:
            fstr = "{}.init".format(fsb)
            if fstr in self.servers:
                self.servers[fstr]()

    def _find_file(self, load):
        """
        Convenience function for calls made using the RemoteClient

        Looks up ``load["path"]`` in ``load["saltenv"]`` (``base`` when not
        given). An empty result is returned when the load has no path.
        """
        path = load.get("path")
        if not path:
            return {"path": "", "rel": ""}
        tgt_env = load.get("saltenv", "base")
        return self.find_file(path, tgt_env)

    def file_find(self, load):
        """
        Convenience function for calls made using the LocalClient

        Performs the same lookup as ``_find_file``.
        """
        return self._find_file(load)

    def find_file(self, path, saltenv, back=None):
        """
        Find the path and return the fnd structure, this structure is passed
        to other backend interfaces.

        Absolute paths and paths containing ``../`` are never looked up. Unless
        the path is escaped, ``key=value`` pairs found after a ``?`` in the
        path are passed to the backends as keyword arguments, and a
        ``saltenv`` pair overrides the ``saltenv`` argument.

        The result of the first backend returning a non-empty ``path`` is
        returned, with the name of that backend added under the ``back`` key.
        """
        path = salt.utils.stringutils.to_unicode(path)
        saltenv = salt.utils.stringutils.to_unicode(saltenv)
        back = self.backends(back)
        kwargs = {}
        fnd = {"path": "", "rel": ""}
        if os.path.isabs(path):
            return fnd
        if "../" in path:
            return fnd
        if salt.utils.url.is_escaped(path):
            # don't attempt to find URL query arguments in the path
            path = salt.utils.url.unescape(path)
        else:
            if "?" in path:
                hcomps = path.split("?")
                path = hcomps[0]
                comps = hcomps[1].split("&")
                for comp in comps:
                    if "=" not in comp:
                        # Invalid option, skip it
                        continue
                    args = comp.split("=", 1)
                    kwargs[args[0]] = args[1]

        if "env" in kwargs:
            # "env" is not supported; Use "saltenv".
            kwargs.pop("env")
        if "saltenv" in kwargs:
            saltenv = kwargs.pop("saltenv")

        if not isinstance(saltenv, str):
            saltenv = str(saltenv)

        for fsb in back:
            fstr = "{}.find_file".format(fsb)
            if fstr in self.servers:
                fnd = self.servers[fstr](path, saltenv, **kwargs)
                if fnd.get("path"):
                    fnd["back"] = fsb
                    return fnd
        return fnd

    def serve_file(self, load):
        """
        Serve up a chunk of a file

        ``load`` must contain the ``path``, ``loc`` and ``saltenv`` keys,
        otherwise an empty result is returned. The work is done by the
        ``serve_file`` function of the backend in which the file was found.
        """
        ret = {"data": "", "dest": ""}

        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        if "path" not in load or "loc" not in load or "saltenv" not in load:
            return ret
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        fnd = self.find_file(load["path"], load["saltenv"])
        if not fnd.get("back"):
            return ret
        fstr = "{}.serve_file".format(fnd["back"])
        if fstr in self.servers:
            return self.servers[fstr](load, fnd)
        return ret

    def __file_hash_and_stat(self, load):
        """
        Common code for hashing and stating files

        Returns a 2-tuple made of the return of the backend's ``file_hash``
        function and the ``stat`` value of the fnd structure, or
        ``("", None)`` when the file cannot be found.
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        if "path" not in load or "saltenv" not in load:
            return "", None
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        fnd = self.find_file(
            salt.utils.stringutils.to_unicode(load["path"]), load["saltenv"]
        )
        if not fnd.get("back"):
            return "", None
        stat_result = fnd.get("stat", None)
        fstr = "{}.file_hash".format(fnd["back"])
        if fstr in self.servers:
            return self.servers[fstr](load, fnd), stat_result
        return "", None

    def file_hash(self, load):
        """
        Return the hash of a given file
        """
        try:
            return self.__file_hash_and_stat(load)[0]
        except (IndexError, TypeError):
            return ""

    def file_hash_and_stat(self, load):
        """
        Return the hash and stat result of a given file
        """
        try:
            return self.__file_hash_and_stat(load)
        except (IndexError, TypeError):
            return "", None

    def clear_file_list_cache(self, load):
        """
        Deletes the file_lists cache files

        The ``saltenv`` key of ``load`` (a list or a comma-separated string)
        restricts the deletion to these saltenvs, ``None`` meaning all of
        them. The ``fsbackend`` key restricts it to these backends.

        Returns a dictionary mapping each backend to the sorted list of
        saltenvs for which a cache file was removed.
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        saltenv = load.get("saltenv", [])
        if saltenv is not None:
            if not isinstance(saltenv, list):
                try:
                    saltenv = [x.strip() for x in saltenv.split(",")]
                except AttributeError:
                    saltenv = [x.strip() for x in str(saltenv).split(",")]

            # Build a new list instead of modifying the one from the load
            saltenv = [val if isinstance(val, str) else str(val) for val in saltenv]

        ret = {}
        fsb = self.backends(load.pop("fsbackend", None))
        list_cachedir = os.path.join(self.opts["cachedir"], "file_lists")
        try:
            file_list_backends = os.listdir(list_cachedir)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                log.debug("No file list caches found")
            else:
                log.error(
                    "Failed to get list of saltenvs for which the master has "
                    "cached file lists: %s",
                    exc,
                )
            # Without a directory listing there is nothing which can be
            # removed
            return {}

        for back in file_list_backends:
            try:
                cache_files = os.listdir(os.path.join(list_cachedir, back))
            except OSError as exc:
                log.error(
                    "Failed to find file list caches for saltenv '%s': %s", back, exc
                )
                continue
            for cache_file in cache_files:
                try:
                    cache_saltenv, extension = cache_file.rsplit(".", 1)
                except ValueError:
                    # Filename has no dot in it. Not a cache file, ignore.
                    continue
                if extension != "p":
                    # Filename does not end in ".p". Not a cache file, ignore.
                    continue
                elif back not in fsb or (
                    saltenv is not None and cache_saltenv not in saltenv
                ):
                    log.debug(
                        "Skipping %s file list cache for saltenv '%s'",
                        back,
                        cache_saltenv,
                    )
                    continue
                try:
                    os.remove(os.path.join(list_cachedir, back, cache_file))
                except OSError as exc:
                    if exc.errno != errno.ENOENT:
                        log.error("Failed to remove %s: %s", exc.filename, exc.strerror)
                else:
                    ret.setdefault(back, []).append(cache_saltenv)
                    log.debug(
                        "Removed %s file list cache for saltenv '%s'",
                        back,
                        cache_saltenv,
                    )

        # Ensure reproducible ordering of returns
        for key in ret:
            ret[key].sort()

        return ret

    @ensure_unicode_args
    def file_list(self, load):
        """
        Return a list of files from the dominant environment

        The files of all the backends (or of those named by the ``fsbackend``
        key of ``load``) are merged, restricted to those starting with the
        ``prefix`` key of ``load`` when it is set, and returned sorted.
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        ret = set()
        if "saltenv" not in load:
            return []
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        for fsb in self.backends(load.pop("fsbackend", None)):
            fstr = "{}.file_list".format(fsb)
            if fstr in self.servers:
                ret.update(self.servers[fstr](load))
        # some *fs do not handle prefix. Ensure it is filtered
        prefix = load.get("prefix", "").strip("/")
        if prefix != "":
            ret = [f for f in ret if f.startswith(prefix)]
        return sorted(ret)

    @ensure_unicode_args
    def file_list_emptydirs(self, load):
        """
        List all emptydirs in the given environment
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        ret = set()
        if "saltenv" not in load:
            return []
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        # NOTE(review): unlike file_list and dir_list, the "fsbackend" key of
        # the load is not honored here - confirm that this is intended
        for fsb in self.backends(None):
            fstr = "{}.file_list_emptydirs".format(fsb)
            if fstr in self.servers:
                ret.update(self.servers[fstr](load))
        # some *fs do not handle prefix. Ensure it is filtered
        prefix = load.get("prefix", "").strip("/")
        if prefix != "":
            ret = [f for f in ret if f.startswith(prefix)]
        return sorted(ret)

    @ensure_unicode_args
    def dir_list(self, load):
        """
        List all directories in the given environment
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        ret = set()
        if "saltenv" not in load:
            return []
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        for fsb in self.backends(load.pop("fsbackend", None)):
            fstr = "{}.dir_list".format(fsb)
            if fstr in self.servers:
                ret.update(self.servers[fstr](load))
        # some *fs do not handle prefix. Ensure it is filtered
        prefix = load.get("prefix", "").strip("/")
        if prefix != "":
            ret = [f for f in ret if f.startswith(prefix)]
        return sorted(ret)

    @ensure_unicode_args
    def symlink_list(self, load):
        """
        Return a list of symlinked files and dirs

        The return is a dictionary, restricted to the keys starting with the
        ``prefix`` key of ``load`` when it is set.
        """
        if "env" in load:
            # "env" is not supported; Use "saltenv".
            load.pop("env")

        ret = {}
        if "saltenv" not in load:
            return {}
        if not isinstance(load["saltenv"], str):
            load["saltenv"] = str(load["saltenv"])

        for fsb in self.backends(load.pop("fsbackend", None)):
            symlstr = "{}.symlink_list".format(fsb)
            if symlstr in self.servers:
                # NOTE(review): the return of each backend replaces the one of
                # the previous backend instead of being merged with it -
                # confirm that this is intended
                ret = self.servers[symlstr](load)
        # some *fs do not handle prefix. Ensure it is filtered
        prefix = load.get("prefix", "").strip("/")
        if prefix != "":
            ret = {x: y for x, y in ret.items() if x.startswith(prefix)}
        return ret
854
855
class FSChan:
    """
    A class that mimics the transport channels, allowing for local access to
    the fileserver class structure
    """

    def __init__(self, opts, **kwargs):
        """
        Create and initialize a ``Fileserver`` for ``opts`` and update its
        backends

        With ``file_client`` set to ``local`` the update is only run when the
        ``__fs_update`` key is not yet in ``opts``, and the key is set
        afterwards.
        """
        self.opts = opts
        self.kwargs = kwargs
        self.fs = Fileserver(self.opts)
        self.fs.init()
        local_client = self.opts.get("file_client", "remote") == "local"
        if not local_client:
            self.fs.update()
        elif "__fs_update" not in self.opts:
            self.fs.update()
            self.opts["__fs_update"] = True
        # Commands answered with a fixed return instead of a fileserver call
        self.cmd_stub = {"master_tops": {}, "ext_nodes": {}}

    def send(
        self, load, tries=None, timeout=None, raw=False
    ):  # pylint: disable=unused-argument
        """
        Emulate the channel send method, the tries and timeout are not used

        The ``cmd`` of the load, stripped from its leading underscores, names
        the fileserver method which gets called with the load. An empty
        dictionary is returned for a load without ``cmd`` or with a ``cmd``
        the fileserver does not have.
        """
        if "cmd" not in load:
            log.error("Malformed request, no cmd: %s", load)
            return {}

        cmd = load["cmd"].lstrip("_")
        if cmd in self.cmd_stub:
            return self.cmd_stub[cmd]

        try:
            handler = getattr(self.fs, cmd)
        except AttributeError:
            log.error("Malformed request, invalid cmd: %s", load)
            return {}
        return handler(load)

    def close(self):
        """
        Nothing to close, only there to match the channel interface
        """
894