# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
from lxml import etree
import os
import unittest
import time

import pytz
import werkzeug
import werkzeug.routing
import werkzeug.utils

from functools import partial

import odoo
from odoo import api, models
from odoo import registry, SUPERUSER_ID
from odoo.http import request
from odoo.tools.safe_eval import safe_eval
from odoo.osv.expression import FALSE_DOMAIN
from odoo.addons.http_routing.models.ir_http import ModelConverter, _guess_mimetype
from odoo.addons.portal.controllers.portal import _build_url_w_params

logger = logging.getLogger(__name__)


def sitemap_qs2dom(qs, route, field='name'):
    """ Convert a query string (which may contain a path) to a search domain.

    :param qs: the query string typed by the user, possibly a path
    :param route: the route the query string is compared against
    :param field: name of the field to match with ``ilike``
    :return: ``[]`` when ``qs`` is empty or contained in ``route``, a single
             ``ilike`` leaf on ``field`` when exactly one path segment of
             ``qs`` is not part of ``route``, a false domain otherwise
    """
    dom = []
    if qs and qs.lower() not in route:
        needles = qs.strip('/').split('/')
        # needles will be altered to keep only the elements which are not in route
        # diff(from=['shop', 'product'], to=['shop', 'product', 'product']) => to=['product']
        unittest.util.unorderable_list_difference(route.strip('/').split('/'), needles)
        if len(needles) == 1:
            dom = [(field, 'ilike', needles[0])]
        else:
            # Return a copy: a caller extending the result in place must not
            # be able to alter the shared FALSE_DOMAIN constant.
            dom = list(FALSE_DOMAIN)
    return dom


def get_request_website():
    """ Return the website set on `request` if called in a frontend context
    (website=True on route).
    This method can typically be used to check if we are in the frontend.

    This method is easy to mock during python tests to simulate frontend
    context, rather than mocking every method accessing request.website.

    Don't import directly the method or it won't be mocked during tests, do:
    ```
    from odoo.addons.website.models import ir_http
    my_var = ir_http.get_request_website()
    ```
    """
    return request and getattr(request, 'website', False) or False


class Http(models.AbstractModel):
    _inherit = 'ir.http'

    @classmethod
    def routing_map(cls, key=None):
        """ Return the routing map, keyed by ``request.website_routing`` when
        no explicit ``key`` is given and a request is bound.
        """
        key = key or (request and request.website_routing)
        return super(Http, cls).routing_map(key=key)

    @classmethod
    def clear_caches(cls):
        """ Clear the routing map before delegating to the parent cache clearing. """
        super(Http, cls)._clear_routing_map()
        return super(Http, cls).clear_caches()

    @classmethod
    def _slug_matching(cls, adapter, endpoint, **kw):
        """ Build the URL of ``endpoint`` for the given routing arguments,
        keeping the query string of the current request.

        Record arguments are flagged with the ``slug_matching`` context key,
        which ``ModelConverter.to_url`` reads.

        :raise werkzeug.exceptions.NotFound: if building the URL hits a
            missing record
        """
        for arg in kw:
            if isinstance(kw[arg], models.BaseModel):
                kw[arg] = kw[arg].with_context(slug_matching=True)
        qs = request.httprequest.query_string.decode('utf-8')
        try:
            return adapter.build(endpoint, kw) + (qs and '?%s' % qs or '')
        except odoo.exceptions.MissingError:
            raise werkzeug.exceptions.NotFound()

    @classmethod
    def _match(cls, path_info, key=None):
        """ Match ``path_info``, keyed by ``request.website_routing`` when no
        explicit ``key`` is given and a request is bound.
        """
        key = key or (request and request.website_routing)
        return super(Http, cls)._match(path_info, key=key)

    @classmethod
    def _generate_routing_rules(cls, modules, converters):
        """ Yield the ``(url, endpoint, routing)`` rules of the parent
        implementation, altered by the ``website.rewrite`` records of type
        308 and 404 that apply to the website being routed:

        * 308: the new URL is yielded and, when it differs from the original
          one, the original URL is yielded too with a ``redirect_to`` routing
          entry pointing to the new one;
        * 404: the rule is skipped.
        """
        website_id = request.website_routing
        logger.debug("_generate_routing_rules for website: %s", website_id)
        domain = [('redirect_type', 'in', ('308', '404')), '|', ('website_id', '=', False), ('website_id', '=', website_id)]

        rewrites = {x.url_from: x for x in request.env['website.rewrite'].sudo().search(domain)}
        cls._rewrite_len[website_id] = len(rewrites)

        for url, endpoint, routing in super(Http, cls)._generate_routing_rules(modules, converters):
            routing = dict(routing)
            if url in rewrites:
                rewrite = rewrites[url]
                url_to = rewrite.url_to
                if rewrite.redirect_type == '308':
                    logger.debug('Add rule %s for %s', url_to, website_id)
                    yield url_to, endpoint, routing  # yield new url

                    if url != url_to:
                        logger.debug('Redirect from %s to %s for website %s', url, url_to, website_id)
                        _slug_matching = partial(cls._slug_matching, endpoint=endpoint)
                        routing['redirect_to'] = _slug_matching
                        yield url, endpoint, routing  # yield original redirected to new url
                elif rewrite.redirect_type == '404':
                    logger.debug('Return 404 for %s for website %s', url, website_id)
                    continue
            else:
                yield url, endpoint, routing

    @classmethod
    def _get_converters(cls):
        """ Get the converters list for custom url pattern werkzeug need to
            match Rule. This override adds the website ones.
        """
        return dict(
            super(Http, cls)._get_converters(),
            model=ModelConverter,
        )

    @classmethod
    def _auth_method_public(cls):
        """ If no user logged, set the public user of current website, or default
            public user as request uid.
            After this method `request.env` can be called, since the `request.uid` is
            set. The `env` lazy property of `request` will be correct.
        """
        if not request.session.uid:
            env = api.Environment(request.cr, SUPERUSER_ID, request.context)
            website = env['website'].get_current_website()
            request.uid = website and website._get_cached('user_id')

        if not request.uid:
            super(Http, cls)._auth_method_public()

    @classmethod
    def _register_website_track(cls, response):
        """ Hand a successful (status 200) response over to the website
        visitor tracking when the rendered template has ``track`` set.

        :return: always ``False``
        """
        if getattr(response, 'status_code', 0) != 200:
            return False

        template = False
        # Bound up front so the name exists even for a response exposing
        # neither `qcontext` nor `_cached_page`.
        website_page = False
        if hasattr(response, 'qcontext'):  # classic response
            main_object = response.qcontext.get('main_object')
            website_page = getattr(main_object, '_name', False) == 'website.page' and main_object
            template = response.qcontext.get('response_template')
        elif hasattr(response, '_cached_page'):
            website_page, template = response._cached_page, response._cached_template

        view = template and request.env['website'].get_template(template)
        if view and view.track:
            request.env['website.visitor']._handle_webpage_dispatch(response, website_page)

        return False

    @classmethod
    def _dispatch(cls):
        """
        In case of rerouting for translate (e.g. when visiting odoo.com/fr_BE/),
        _dispatch calls reroute() that returns _dispatch with altered request properties.
        The second _dispatch will continue until end of process. When second _dispatch is finished, the first _dispatch
        call receive the new altered request and continue.
        At the end, 2 calls of _dispatch (and this override) are made with exact same request properties, instead of one.
        As the response has not been sent back to the client, the visitor cookie does not exist yet when second _dispatch call
        is treated in _handle_webpage_dispatch, leading to create 2 visitors with exact same properties.
        To avoid this, we check if, !!! before calling super !!!, we are in a rerouting request. If not, it means that we are
        handling the original request, in which we should create the visitor. We ignore every other rerouting requests.
        """
        is_rerouting = hasattr(request, 'routing_iteration')

        if request.session.db:
            reg = registry(request.session.db)
            with reg.cursor() as cr:
                env = api.Environment(cr, SUPERUSER_ID, {})
                request.website_routing = env['website'].get_current_website().id

        response = super(Http, cls)._dispatch()

        if not is_rerouting:
            cls._register_website_track(response)
        return response

    @classmethod
    def _add_dispatch_parameters(cls, func):
        """ Set ``request.website`` and extend ``request.context`` with the
        website id, the allowed companies and, when it is missing from the
        context and the session geoip provides a valid one, the timezone.
        """

        # DEPRECATED for /website/force/<website_id> - remove me in master~saas-14.4
        # Force website with query string parameter, typically set from website selector in frontend navbar and inside tests
        force_website_id = request.httprequest.args.get('fw')
        if (force_website_id and request.session.get('force_website_id') != force_website_id
                and request.env.user.has_group('website.group_multi_website')
                and request.env.user.has_group('website.group_website_publisher')):
            request.env['website']._force_website(force_website_id)

        context = {}
        if not request.context.get('tz'):
            context['tz'] = request.session.get('geoip', {}).get('time_zone')
            try:
                pytz.timezone(context['tz'] or '')
            except pytz.UnknownTimeZoneError:
                # missing or unknown timezone: do not propagate it
                context.pop('tz')

        request.website = request.env['website'].get_current_website()  # can use `request.env` since auth methods are called
        context['website_id'] = request.website.id
        # This is mainly to avoid access errors in website controllers where there is no
        # context (eg: /shop), and it's not going to propagate to the global context of the tab
        # If the company of the website is not in the allowed companies of the user, set the main
        # company of the user.
        website_company_id = request.website._get_cached('company_id')
        if website_company_id in request.env.user.company_ids.ids:
            context['allowed_company_ids'] = [website_company_id]
        else:
            context['allowed_company_ids'] = request.env.user.company_id.ids

        # modify bound context
        request.context = dict(request.context, **context)

        super(Http, cls)._add_dispatch_parameters(func)

        if request.routing_iteration == 1:
            request.website = request.website.with_context(request.context)

    @classmethod
    def _get_frontend_langs(cls):
        """ Return the codes of all available languages when a website is set
        on the request, the parent result otherwise.
        """
        if get_request_website():
            return [code for code, *_ in request.env['res.lang'].get_available()]
        else:
            return super()._get_frontend_langs()

    @classmethod
    def _get_default_lang(cls):
        """ Return the default language of the request website, or the parent
        result when no website is set on the request.
        """
        if getattr(request, 'website', False):
            return request.env['res.lang'].browse(request.website._get_cached('default_lang_id'))
        return super(Http, cls)._get_default_lang()

    @classmethod
    def _get_translation_frontend_modules_name(cls):
        """ Extend the parent list with every initialized or server-wide
        module whose name starts with ``website``.
        """
        mods = super(Http, cls)._get_translation_frontend_modules_name()
        installed = request.registry._init_modules | set(odoo.conf.server_wide_modules)
        return mods + [mod for mod in installed if mod.startswith('website')]

    @classmethod
    def _serve_page(cls):
        """ Serve the ``website.page`` whose URL is the requested path.

        :return: a redirection to the path without its trailing slash when no
                 page matches a path ending with ``/``, the rendered (or
                 cached) page response when the page can be shown to the
                 current user, ``False`` otherwise
        """
        req_page = request.httprequest.path
        page_domain = [('url', '=', req_page)] + request.website.website_domain()

        published_domain = page_domain
        # specific page first
        page = request.env['website.page'].sudo().search(published_domain, order='website_id asc', limit=1)

        # redirect without trailing /
        if not page and req_page != "/" and req_page.endswith("/"):
            return request.redirect(req_page[:-1])

        if page:
            # prefetch all menus (it will prefetch website.page too)
            request.website.menu_id

        if page and (request.website.is_publisher() or page.is_visible):
            need_to_cache = False
            cache_key = page._get_cache_key(request)
            if (
                page.cache_time  # cache > 0
                and request.httprequest.method == "GET"
                and request.env.user._is_public()    # only cache for unlogged user
                and 'nocache' not in request.params  # allow bypass cache / debug
                and not request.session.debug
                and len(cache_key) and cache_key[-1] is not None  # nocache via expr
            ):
                need_to_cache = True
                try:
                    r = page._get_cache_response(cache_key)
                    if r['time'] + page.cache_time > time.time():
                        response = werkzeug.Response(r['content'], mimetype=r['contenttype'])
                        response._cached_template = r['template']
                        response._cached_page = page
                        return response
                except KeyError:
                    # no usable cache entry: render the page below
                    pass

            _, ext = os.path.splitext(req_page)
            response = request.render(page.view_id.id, {
                'deletable': True,
                'main_object': page,
            }, mimetype=_guess_mimetype(ext))

            if need_to_cache and response.status_code == 200:
                r = response.render()
                page._set_cache_response(cache_key, {
                    'content': r,
                    'contenttype': response.headers['Content-Type'],
                    'time': time.time(),
                    'template': getattr(response, 'qcontext', {}).get('response_template')
                })
            return response
        return False

    @classmethod
    def _serve_redirect(cls):
        """ Return the first 301/302 ``website.rewrite`` of the current
        website whose ``url_from`` matches the requested path, with or
        without a trailing slash (an empty recordset if none).
        """
        req_page = request.httprequest.path
        domain = [
            ('redirect_type', 'in', ('301', '302')),
            # trailing / could have been removed by server_page
            '|', ('url_from', '=', req_page.rstrip('/')), ('url_from', '=', req_page + '/')
        ]
        domain += request.website.website_domain()
        return request.env['website.rewrite'].sudo().search(domain, limit=1)

    @classmethod
    def _serve_fallback(cls, exception):
        """ Try, in order, the parent fallback, a website page and a website
        redirection; return ``False`` when none applies or when the request
        is not a frontend one.
        """
        # serve attachment before
        parent = super(Http, cls)._serve_fallback(exception)
        if parent:  # attachment
            return parent
        if not request.is_frontend:
            return False
        website_page = cls._serve_page()
        if website_page:
            return website_page

        redirect = cls._serve_redirect()
        if redirect:
            return request.redirect(_build_url_w_params(redirect.url_to, request.params), code=redirect.redirect_type)

        return False

    @classmethod
    def _get_exception_code_values(cls, exception):
        """ Map ``NotFound`` (for website publishers) and password-protected
        ``Forbidden`` exceptions to the dedicated ``page_404`` and
        ``protected_403`` codes, adding the requested path to the values.
        """
        code, values = super(Http, cls)._get_exception_code_values(exception)
        if isinstance(exception, werkzeug.exceptions.NotFound) and request.website.is_publisher():
            code = 'page_404'
            values['path'] = request.httprequest.path[1:]
        if isinstance(exception, werkzeug.exceptions.Forbidden) and \
                exception.description == "website_visibility_password_required":
            code = 'protected_403'
            values['path'] = request.httprequest.path
        return (code, values)

    @classmethod
    def _get_values_500_error(cls, env, values, exception):
        """ Add to the 500 error values the view in which a qweb exception
        was raised (``values['view']``) and whether the current user may
        edit it (``values['editable']``).
        """
        View = env["ir.ui.view"]
        values = super(Http, cls)._get_values_500_error(env, values, exception)
        if 'qweb_exception' in values:
            try:
                # exception.name might be int, string
                exception_template = int(exception.name)
            except ValueError:
                exception_template = exception.name
            view = View._view_obj(exception_template)
            if exception.html and exception.html in view.arch:
                values['view'] = view
            else:
                # There might be 2 cases where the exception code can't be found
                # in the view, either the error is in a child view or the code
                # contains branding (<div t-att-data="request.browse('ok')"/>).
                et = etree.fromstring(view.with_context(inherit_branding=False).read_combined(['arch'])['arch'])
                nodes = et.xpath(exception.path)
                # An xpath without match gives an empty result: indexing it
                # would raise IndexError, so only serialize an existing node.
                line = etree.tostring(nodes[0], encoding='unicode') if nodes else False
                if line:
                    values['view'] = View._views_get(exception_template).filtered(
                        lambda v: line in v.arch
                    )
                    values['view'] = values['view'] and values['view'][0]
        # Needed to show reset template on translated pages (`_prepare_qcontext` will set it for main lang)
        values['editable'] = request.uid and request.website.is_publisher()
        return values

    @classmethod
    def _get_error_html(cls, env, code, values):
        """ Render the ``website.page_404`` / ``website.protected_403``
        templates for the matching codes and return them with the status
        part of the code (``'404'`` / ``'403'``); delegate any other code to
        the parent implementation.
        """
        if code in ('page_404', 'protected_403'):
            return code.split('_')[1], env['ir.ui.view']._render_template('website.%s' % code, values)
        return super(Http, cls)._get_error_html(env, code, values)

    def binary_content(self, xmlid=None, model='ir.attachment', id=None, field='datas',
                       unique=False, filename=None, filename_field='name', download=False,
                       mimetype=None, default_mimetype='application/octet-stream',
                       access_token=None):
        """ Delegate to the parent ``binary_content``, switching to sudo
        first when the targeted record has a ``website_published`` field and
        is published.
        """
        obj = None
        if xmlid:
            obj = self._xmlid_to_obj(self.env, xmlid)
        elif id and model in self.env:
            obj = self.env[model].browse(int(id))
        if obj and 'website_published' in obj._fields:
            if self.env[obj._name].sudo().search([('id', '=', obj.id), ('website_published', '=', True)]):
                self = self.sudo()
        return super(Http, self).binary_content(
            xmlid=xmlid, model=model, id=id, field=field, unique=unique, filename=filename,
            filename_field=filename_field, download=download, mimetype=mimetype,
            default_mimetype=default_mimetype, access_token=access_token)

    @classmethod
    def _xmlid_to_obj(cls, env, xmlid):
        """ Return the attachment of the current website whose ``key`` is
        ``xmlid`` when that website has a theme and such an attachment
        exists (public attachments only, searched as sudo, for share users);
        fall back on the parent implementation otherwise.
        """
        website_id = env['website'].get_current_website()
        if website_id and website_id.theme_id:
            domain = [('key', '=', xmlid), ('website_id', '=', website_id.id)]
            Attachment = env['ir.attachment']
            if request.env.user.share:
                domain.append(('public', '=', True))
                Attachment = Attachment.sudo()
            obj = Attachment.search(domain)
            if obj:
                return obj[0]

        return super(Http, cls)._xmlid_to_obj(env, xmlid)

    @api.model
    def get_frontend_session_info(self):
        """ Extend the frontend session info with ``is_website_user`` and,
        for website publishers, the id and company of the current website.
        """
        session_info = super(Http, self).get_frontend_session_info()
        session_info.update({
            'is_website_user': request.env.user.id == request.website.user_id.id,
        })
        if request.env.user.has_group('website.group_website_publisher'):
            session_info.update({
                'website_id': request.website.id,
                'website_company_id': request.website._get_cached('company_id'),
            })
        return session_info


class ModelConverter(ModelConverter):

    def to_url(self, value):
        """ Return the ``_converter_value`` context key of ``value`` (its id
        as string by default) when the record carries the ``slug_matching``
        context key, the parent conversion otherwise.
        """
        if value.env.context.get('slug_matching'):
            return value.env.context.get('_converter_value', str(value.id))
        return super().to_url(value)

    def generate(self, uid, dom=None, args=None):
        """ Yield the records of the converter model matching the converter
        domain, optionally restricted by ``dom``, searched as user ``uid``.

        :param uid: id of the user to search with
        :param dom: optional extra domain appended to the converter domain
        :param args: optional evaluation context of the converter domain;
                     when given, ``current_website_id`` is added to it in place
        """
        Model = request.env[self.model].with_user(uid)
        # `args` defaults to None: make it a dict before updating it
        if args is None:
            args = {}
        # Allow to current_website_id directly in route domain
        args.update(current_website_id=request.env['website'].get_current_website().id)
        domain = safe_eval(self.domain, args.copy())
        if dom:
            domain += dom
        for record in Model.search(domain):
            # return record so URL will be the real endpoint URL as the record will go through `slug()`
            # the same way as endpoint URL is retrieved during dispatch (301 redirect), see `to_url()` from ModelConverter
            yield record