1# -*- coding: utf-8 -*- 2# (c) 2009-2021 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav 3# Licensed under the MIT license: 4# http://www.opensource.org/licenses/mit-license.php 5""" 6WSGI middleware that handles GET requests on collections to display directories. 7""" 8import os 9import sys 10from fnmatch import fnmatch 11 12from jinja2 import Environment, FileSystemLoader 13 14from wsgidav import __version__, compat, util 15from wsgidav.dav_error import HTTP_MEDIATYPE_NOT_SUPPORTED, HTTP_OK, DAVError 16from wsgidav.middleware import BaseMiddleware 17from wsgidav.util import safe_re_encode 18 19__docformat__ = "reStructuredText" 20 21_logger = util.get_module_logger(__name__) 22 23ASSET_SHARE = "/:dir_browser" 24 25DAVMOUNT_TEMPLATE = """ 26<dm:mount xmlns:dm="http://purl.org/NET/webdav/mount"> 27 <dm:url>{}</dm:url> 28</dm:mount> 29""".strip() 30 31msOfficeTypeToExtMap = { 32 "excel": ("xls", "xlt", "xlm", "xlsm", "xlsx", "xltm", "xltx"), 33 "powerpoint": ("pps", "ppt", "pptm", "pptx", "potm", "potx", "ppsm", "ppsx"), 34 "word": ("doc", "dot", "docm", "docx", "dotm", "dotx"), 35 "visio": ("vsd", "vsdm", "vsdx", "vstm", "vstx"), 36} 37msOfficeExtToTypeMap = {} 38for t, el in msOfficeTypeToExtMap.items(): 39 for e in el: 40 msOfficeExtToTypeMap[e] = t 41 42 43class WsgiDavDirBrowser(BaseMiddleware): 44 """WSGI middleware that handles GET requests on collections to display directories.""" 45 46 def __init__(self, wsgidav_app, next_app, config): 47 super(WsgiDavDirBrowser, self).__init__(wsgidav_app, next_app, config) 48 self.dir_config = config.get("dir_browser", {}) 49 50 htdocs_path = self.dir_config.get("htdocs_path") 51 if htdocs_path: 52 self.htdocs_path = os.path.realpath(htdocs_path) 53 else: 54 self.htdocs_path = os.path.join(os.path.dirname(__file__), "htdocs") 55 56 if not os.path.isdir(self.htdocs_path): 57 raise ValueError( 58 "Invalid dir_browser htdocs_path {!r}".format(self.htdocs_path) 59 ) 60 61 # Add an additional read-only FS provider that serves the dir_browser assets 62 self.wsgidav_app.add_provider(ASSET_SHARE, self.htdocs_path, readonly=True) 63 # and make sure we have anonymous access there 64 config.get("simple_dc", {}).get("user_mapping", {}).setdefault( 65 ASSET_SHARE, True 66 ) 67 68 # Prepare a Jinja2 template 69 templateLoader = FileSystemLoader(searchpath=self.htdocs_path) 70 templateEnv = Environment(loader=templateLoader) 71 self.template = templateEnv.get_template("template.html") 72 73 def is_disabled(self): 74 return self.dir_config.get("enable") is False 75 76 def __call__(self, environ, start_response): 77 path = environ["PATH_INFO"] 78 79 dav_res = None 80 if environ["wsgidav.provider"]: 81 dav_res = environ["wsgidav.provider"].get_resource_inst(path, environ) 82 83 if ( 84 environ["REQUEST_METHOD"] in ("GET", "HEAD") 85 and dav_res 86 and dav_res.is_collection 87 ): 88 89 if util.get_content_length(environ) != 0: 90 self._fail( 91 HTTP_MEDIATYPE_NOT_SUPPORTED, 92 "The server does not handle any body content.", 93 ) 94 95 if environ["REQUEST_METHOD"] == "HEAD": 96 return util.send_status_response( 97 environ, start_response, HTTP_OK, is_head=True 98 ) 99 100 # Support DAV mount (http://www.ietf.org/rfc/rfc4709.txt) 101 if self.dir_config.get("davmount") and "davmount" in environ.get( 102 "QUERY_STRING", "" 103 ): 104 collectionUrl = util.make_complete_url(environ) 105 collectionUrl = collectionUrl.split("?", 1)[0] 106 res = compat.to_bytes(DAVMOUNT_TEMPLATE.format(collectionUrl)) 107 # TODO: support <dm:open>%s</dm:open> 108 109 start_response( 110 "200 OK", 111 [ 112 ("Content-Type", "application/davmount+xml"), 113 ("Content-Length", str(len(res))), 114 ("Cache-Control", "private"), 115 ("Date", util.get_rfc1123_time()), 116 ], 117 ) 118 return [res] 119 120 context = self._get_context(environ, dav_res) 121 122 res = self.template.render(**context) 123 res = compat.to_bytes(res) 124 start_response( 125 "200 OK", 126 [ 127 ("Content-Type", "text/html"), 128 ("Content-Length", str(len(res))), 129 ("Cache-Control", "private"), 130 ("Date", util.get_rfc1123_time()), 131 ], 132 ) 133 return [res] 134 135 return self.next_app(environ, start_response) 136 137 def _fail(self, value, context_info=None, src_exception=None, err_condition=None): 138 """Wrapper to raise (and log) DAVError.""" 139 e = DAVError(value, context_info, src_exception, err_condition) 140 if self.verbose >= 4: 141 _logger.warning( 142 "Raising DAVError {}".format( 143 safe_re_encode(e.get_user_info(), sys.stdout.encoding) 144 ) 145 ) 146 raise e 147 148 def _get_context(self, environ, dav_res): 149 """ 150 @see: http://www.webdav.org/specs/rfc4918.html#rfc.section.9.4 151 """ 152 assert dav_res.is_collection 153 154 is_readonly = environ["wsgidav.provider"].is_readonly() 155 156 context = { 157 "htdocs": (self.config.get("mount_path") or "") + ASSET_SHARE, 158 "rows": [], 159 "version": __version__, 160 "display_path": compat.unquote(dav_res.get_href()), 161 "url": dav_res.get_href(), # util.make_complete_url(environ), 162 "parent_url": util.get_uri_parent(dav_res.get_href()), 163 "config": self.dir_config, 164 "is_readonly": is_readonly, 165 "access": "read-only" if is_readonly else "read-write", 166 "is_authenticated": False, 167 } 168 169 trailer = self.dir_config.get("response_trailer") 170 if trailer is True: 171 trailer = "${version} - ${time}" 172 173 if trailer: 174 trailer = trailer.replace( 175 "${version}", 176 "<a href='https://github.com/mar10/wsgidav/'>WsgiDAV/{}</a>".format( 177 __version__ 178 ), 179 ) 180 trailer = trailer.replace("${time}", util.get_rfc1123_time()) 181 182 context["trailer"] = trailer 183 184 rows = context["rows"] 185 186 # Ask collection for member info list 187 dirInfoList = dav_res.get_directory_info() 188 189 if dirInfoList is None: 190 # No pre-build info: traverse members 191 dirInfoList = [] 192 childList = dav_res.get_descendants(depth="1", add_self=False) 193 for res in childList: 194 di = res.get_display_info() 195 href = res.get_href() 196 ofe_prefix = None 197 tr_classes = [] 198 a_classes = [] 199 if res.is_collection: 200 tr_classes.append("directory") 201 202 if not is_readonly and not res.is_collection: 203 ext = os.path.splitext(href)[1].lstrip(".").lower() 204 officeType = msOfficeExtToTypeMap.get(ext) 205 if officeType: 206 if self.dir_config.get("ms_sharepoint_support"): 207 ofe_prefix = "ms-{}:ofe|u|".format(officeType) 208 a_classes.append("msoffice") 209 # elif self.dir_config.get("ms_sharepoint_plugin"): 210 # a_classes.append("msoffice") 211 # elif self.dir_config.get("ms_sharepoint_urls"): 212 # href = "ms-{}:ofe|u|{}".format(officeType, href) 213 214 entry = { 215 "href": href, 216 "ofe_prefix": ofe_prefix, 217 "a_class": " ".join(a_classes), 218 "tr_class": " ".join(tr_classes), 219 "display_name": res.get_display_name(), 220 "last_modified": res.get_last_modified(), 221 "is_collection": res.is_collection, 222 "content_length": res.get_content_length(), 223 "display_type": di.get("type"), 224 "display_type_comment": di.get("typeComment"), 225 } 226 227 dirInfoList.append(entry) 228 # 229 ignore_patterns = self.dir_config.get("ignore", []) 230 if compat.is_basestring(ignore_patterns): 231 ignore_patterns = ignore_patterns.split(",") 232 233 ignored_list = [] 234 for entry in dirInfoList: 235 # Skip ignore patterns 236 ignore = False 237 for pat in ignore_patterns: 238 if fnmatch(entry["display_name"], pat): 239 ignored_list.append(entry["display_name"]) 240 # _logger.debug("Ignore {}".format(entry["display_name"])) 241 ignore = True 242 break 243 if ignore: 244 continue 245 # 246 last_modified = entry.get("last_modified") 247 if last_modified is None: 248 entry["str_modified"] = "" 249 else: 250 entry["str_modified"] = util.get_rfc1123_time(last_modified) 251 252 entry["str_size"] = "-" 253 if not entry.get("is_collection"): 254 content_length = entry.get("content_length") 255 if content_length is not None: 256 entry["str_size"] = util.byte_number_string(content_length) 257 258 rows.append(entry) 259 if ignored_list: 260 _logger.debug( 261 "Dir browser ignored {} entries: {}".format( 262 len(ignored_list), ignored_list 263 ) 264 ) 265 266 # sort 267 sort = "name" 268 if sort == "name": 269 rows.sort( 270 key=lambda v: "{}{}".format( 271 not v["is_collection"], v["display_name"].lower() 272 ) 273 ) 274 275 if "wsgidav.auth.user_name" in environ: 276 context.update( 277 { 278 "is_authenticated": True, 279 "user_name": (environ.get("wsgidav.auth.user_name") or "anonymous"), 280 "realm": environ.get("wsgidav.auth.realm"), 281 "user_roles": ", ".join(environ.get("wsgidav.auth.roles") or []), 282 "user_permissions": ", ".join( 283 environ.get("wsgidav.auth.permissions") or [] 284 ), 285 } 286 ) 287 288 return context 289