1# -*- coding: utf-8 -*-
2# (c) 2009-2021 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
3# Licensed under the MIT license:
4# http://www.opensource.org/licenses/mit-license.php
5"""
6WSGI middleware that handles GET requests on collections to display directories.
7"""
8import os
9import sys
10from fnmatch import fnmatch
11
12from jinja2 import Environment, FileSystemLoader
13
14from wsgidav import __version__, compat, util
15from wsgidav.dav_error import HTTP_MEDIATYPE_NOT_SUPPORTED, HTTP_OK, DAVError
16from wsgidav.middleware import BaseMiddleware
17from wsgidav.util import safe_re_encode
18
__docformat__ = "reStructuredText"

# Module-level logger, obtained through the wsgidav utility helper.
_logger = util.get_module_logger(__name__)

# Share path under which the dir browser registers its read-only asset provider
# (the htdocs folder); it is also appended to the mount path to build the
# `htdocs` URL that is passed to the template.
ASSET_SHARE = "/:dir_browser"

# XML body of the DAV mount response (sent as `application/davmount+xml`).
# The `{}` placeholder is filled with the URL of the requested collection.
DAVMOUNT_TEMPLATE = """
<dm:mount xmlns:dm="http://purl.org/NET/webdav/mount">
  <dm:url>{}</dm:url>
</dm:mount>
""".strip()
30
# MS Office application name -> file extensions (lower case, without the dot)
# that are opened by this application.
msOfficeTypeToExtMap = {
    "excel": ("xls", "xlt", "xlm", "xlsm", "xlsx", "xltm", "xltx"),
    "powerpoint": ("pps", "ppt", "pptm", "pptx", "potm", "potx", "ppsm", "ppsx"),
    "word": ("doc", "dot", "docm", "docx", "dotm", "dotx"),
    "visio": ("vsd", "vsdm", "vsdx", "vstm", "vstx"),
}

# Reverse lookup: file extension -> MS Office application name.
# Built with a comprehension, so no loop variables leak into the module namespace.
msOfficeExtToTypeMap = {
    extension: office_type
    for office_type, extensions in msOfficeTypeToExtMap.items()
    for extension in extensions
}
41
42
class WsgiDavDirBrowser(BaseMiddleware):
    """WSGI middleware that handles GET requests on collections to display directories.

    A ``GET`` request on a collection is answered with an HTML index that is
    rendered from the Jinja2 template ``template.html`` (or with a DAV mount
    document, if that option is enabled and requested). A ``HEAD`` request on a
    collection only yields a status response. Every other request is passed on
    to the next application.
    """

    def __init__(self, wsgidav_app, next_app, config):
        """Read the ``dir_browser`` options, publish the assets, load the template.

        :param wsgidav_app: application object; its ``add_provider()`` is called
            to register the asset share.
        :param next_app: WSGI application that receives all requests that are
            not handled by this middleware.
        :param config: configuration dict. The ``dir_browser`` section is kept
            as ``self.dir_config``; ``simple_dc.user_mapping`` (if present) is
            extended with an entry for the asset share.
        :raises ValueError: if the htdocs folder is not an existing directory.
        """
        super(WsgiDavDirBrowser, self).__init__(wsgidav_app, next_app, config)
        self.dir_config = config.get("dir_browser", {})

        # Use the configured asset folder, or fall back to the `htdocs` folder
        # that lives next to this module
        htdocs_path = self.dir_config.get("htdocs_path")
        if htdocs_path:
            self.htdocs_path = os.path.realpath(htdocs_path)
        else:
            self.htdocs_path = os.path.join(os.path.dirname(__file__), "htdocs")

        if not os.path.isdir(self.htdocs_path):
            raise ValueError(
                "Invalid dir_browser htdocs_path {!r}".format(self.htdocs_path)
            )

        # Add an additional read-only FS provider that serves the dir_browser assets
        self.wsgidav_app.add_provider(ASSET_SHARE, self.htdocs_path, readonly=True)
        # and make sure we have anonymous access there
        config.get("simple_dc", {}).get("user_mapping", {}).setdefault(
            ASSET_SHARE, True
        )

        # Prepare a Jinja2 template.
        # Auto-escaping is mandatory here: resource names and paths are chosen by
        # whoever may write to the share and would otherwise be emitted as raw
        # HTML (stored XSS).
        template_loader = FileSystemLoader(searchpath=self.htdocs_path)
        template_env = Environment(loader=template_loader, autoescape=True)
        self.template = template_env.get_template("template.html")

    def is_disabled(self):
        """Return True if the option ``dir_browser.enable`` is explicitly ``False``."""
        return self.dir_config.get("enable") is False

    def __call__(self, environ, start_response):
        """Handle GET/HEAD on collections; delegate everything else to the next app.

        :param environ: WSGI environment. ``wsgidav.provider`` must be set (it
            may be falsy, in which case the request is delegated).
        :param start_response: WSGI ``start_response`` callable.
        :returns: the WSGI response iterable.
        :raises DAVError: (HTTP_MEDIATYPE_NOT_SUPPORTED) if the request on a
            collection comes with a non-zero content length.
        """
        path = environ["PATH_INFO"]

        provider = environ["wsgidav.provider"]
        dav_res = None
        if provider:
            dav_res = provider.get_resource_inst(path, environ)

        is_browse_request = (
            environ["REQUEST_METHOD"] in ("GET", "HEAD")
            and dav_res
            and dav_res.is_collection
        )
        if not is_browse_request:
            return self.next_app(environ, start_response)

        if util.get_content_length(environ) != 0:
            self._fail(
                HTTP_MEDIATYPE_NOT_SUPPORTED,
                "The server does not handle any body content.",
            )

        if environ["REQUEST_METHOD"] == "HEAD":
            return util.send_status_response(
                environ, start_response, HTTP_OK, is_head=True
            )

        # Support DAV mount (http://www.ietf.org/rfc/rfc4709.txt)
        if self.dir_config.get("davmount") and "davmount" in environ.get(
            "QUERY_STRING", ""
        ):
            # The mount document must point to the collection itself, so strip
            # the query string
            collection_url = util.make_complete_url(environ).split("?", 1)[0]
            body = compat.to_bytes(DAVMOUNT_TEMPLATE.format(collection_url))
            # TODO: support <dm:open>%s</dm:open>
            return self._send_body(start_response, "application/davmount+xml", body)

        context = self._get_context(environ, dav_res)
        body = compat.to_bytes(self.template.render(**context))
        return self._send_body(start_response, "text/html", body)

    @staticmethod
    def _send_body(start_response, content_type, body):
        """Start a ``200 OK`` response for `body` and return the WSGI iterable.

        :param start_response: WSGI ``start_response`` callable.
        :param content_type: value of the ``Content-Type`` header.
        :param body: the complete response body; its length is sent as
            ``Content-Length``.
        """
        start_response(
            "200 OK",
            [
                ("Content-Type", content_type),
                ("Content-Length", str(len(body))),
                ("Cache-Control", "private"),
                ("Date", util.get_rfc1123_time()),
            ],
        )
        return [body]

    def _fail(self, value, context_info=None, src_exception=None, err_condition=None):
        """Wrapper to raise (and log) DAVError.

        The arguments are passed to the ``DAVError`` constructor unchanged.
        A warning is logged before raising, if ``self.verbose`` is 4 or higher.
        """
        e = DAVError(value, context_info, src_exception, err_condition)
        if self.verbose >= 4:
            _logger.warning(
                "Raising DAVError {}".format(
                    safe_re_encode(e.get_user_info(), sys.stdout.encoding)
                )
            )
        raise e

    def _get_trailer(self):
        """Return the expanded ``response_trailer`` option for the template.

        ``True`` selects the default ``"${version} - ${time}"``. The
        placeholders ``${version}`` and ``${time}`` are replaced by a link to
        the project page and the current time. A falsy option is returned
        unchanged.
        """
        trailer = self.dir_config.get("response_trailer")
        if trailer is True:
            trailer = "${version} - ${time}"

        if not trailer:
            return trailer

        trailer = trailer.replace(
            "${version}",
            "<a href='https://github.com/mar10/wsgidav/'>WsgiDAV/{}</a>".format(
                __version__
            ),
        )
        trailer = trailer.replace("${time}", util.get_rfc1123_time())

        # The trailer is composed from the server configuration only and
        # contains HTML on purpose (the version link). Exempt it from
        # auto-escaping, the same way the template filter `|safe` would do.
        mark_safe = self.template.environment.filters["safe"]
        return mark_safe(trailer)

    def _make_entry(self, res, is_readonly):
        """Return the directory info dict for one member resource.

        :param res: a member of the displayed collection.
        :param is_readonly: True if the provider is read-only; the MS Office
            edit link is only prepared for writable shares.
        """
        di = res.get_display_info()
        href = res.get_href()
        is_collection = res.is_collection

        ofe_prefix = None
        tr_classes = []
        a_classes = []

        if is_collection:
            tr_classes.append("directory")
        elif not is_readonly:
            # Writable MS Office documents may get a link prefix that opens the
            # associated Office application for editing
            ext = os.path.splitext(href)[1].lstrip(".").lower()
            office_type = msOfficeExtToTypeMap.get(ext)
            if office_type and self.dir_config.get("ms_sharepoint_support"):
                ofe_prefix = "ms-{}:ofe|u|".format(office_type)
                a_classes.append("msoffice")

        return {
            "href": href,
            "ofe_prefix": ofe_prefix,
            "a_class": " ".join(a_classes),
            "tr_class": " ".join(tr_classes),
            "display_name": res.get_display_name(),
            "last_modified": res.get_last_modified(),
            "is_collection": is_collection,
            "content_length": res.get_content_length(),
            "display_type": di.get("type"),
            "display_type_comment": di.get("typeComment"),
        }

    def _get_context(self, environ, dav_res):
        """Return the dict of variables that is used to render the template.

        @see: http://www.webdav.org/specs/rfc4918.html#rfc.section.9.4

        :param environ: WSGI environment (``wsgidav.provider`` and the optional
            ``wsgidav.auth.*`` entries are read).
        :param dav_res: the collection that is displayed.
        :returns: dict with general page info, the ``trailer``, authentication
            info, and ``rows``: one dict per visible member, collections first,
            then sorted by lower-cased display name.
        """
        assert dav_res.is_collection

        is_readonly = environ["wsgidav.provider"].is_readonly()
        collection_href = dav_res.get_href()

        rows = []
        context = {
            "htdocs": (self.config.get("mount_path") or "") + ASSET_SHARE,
            "rows": rows,
            "version": __version__,
            "display_path": compat.unquote(collection_href),
            "url": collection_href,  # util.make_complete_url(environ),
            "parent_url": util.get_uri_parent(collection_href),
            "config": self.dir_config,
            "is_readonly": is_readonly,
            "access": "read-only" if is_readonly else "read-write",
            "is_authenticated": False,
        }
        context["trailer"] = self._get_trailer()

        # Ask collection for member info list
        dir_info_list = dav_res.get_directory_info()
        if dir_info_list is None:
            # No pre-build info: traverse members
            dir_info_list = [
                self._make_entry(res, is_readonly)
                for res in dav_res.get_descendants(depth="1", add_self=False)
            ]

        # The `ignore` option may be a list or a comma separated string of
        # fnmatch patterns
        ignore_patterns = self.dir_config.get("ignore", [])
        if compat.is_basestring(ignore_patterns):
            ignore_patterns = ignore_patterns.split(",")

        ignored_list = []
        for entry in dir_info_list:
            name = entry["display_name"]
            # Skip ignore patterns
            if any(fnmatch(name, pat) for pat in ignore_patterns):
                ignored_list.append(name)
                continue

            # Add the display strings for modification date and size
            last_modified = entry.get("last_modified")
            if last_modified is None:
                entry["str_modified"] = ""
            else:
                entry["str_modified"] = util.get_rfc1123_time(last_modified)

            entry["str_size"] = "-"
            if not entry.get("is_collection"):
                content_length = entry.get("content_length")
                if content_length is not None:
                    entry["str_size"] = util.byte_number_string(content_length)

            rows.append(entry)

        if ignored_list:
            _logger.debug(
                "Dir browser ignored {} entries: {}".format(
                    len(ignored_list), ignored_list
                )
            )

        # Sort by name (case insensitive), but list collections first
        rows.sort(key=lambda v: (not v["is_collection"], v["display_name"].lower()))

        if "wsgidav.auth.user_name" in environ:
            context.update(
                {
                    "is_authenticated": True,
                    "user_name": (environ.get("wsgidav.auth.user_name") or "anonymous"),
                    "realm": environ.get("wsgidav.auth.realm"),
                    "user_roles": ", ".join(environ.get("wsgidav.auth.roles") or []),
                    "user_permissions": ", ".join(
                        environ.get("wsgidav.auth.permissions") or []
                    ),
                }
            )

        return context
289