1# Copyright 2021 The Matrix.org Foundation C.I.C. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import logging 15from http import HTTPStatus 16from typing import TYPE_CHECKING, Tuple 17 18from synapse.api.errors import SynapseError 19from synapse.http.servlet import ( 20 RestServlet, 21 assert_params_in_dict, 22 parse_json_object_from_request, 23) 24from synapse.http.site import SynapseRequest 25from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin 26from synapse.types import JsonDict 27 28if TYPE_CHECKING: 29 from synapse.server import HomeServer 30 31logger = logging.getLogger(__name__) 32 33 34class BackgroundUpdateEnabledRestServlet(RestServlet): 35 """Allows temporarily disabling background updates""" 36 37 PATTERNS = admin_patterns("/background_updates/enabled$") 38 39 def __init__(self, hs: "HomeServer"): 40 self._auth = hs.get_auth() 41 self._data_stores = hs.get_datastores() 42 43 async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: 44 await assert_requester_is_admin(self._auth, request) 45 46 # We need to check that all configured databases have updates enabled. 47 # (They *should* all be in sync.) 48 enabled = all(db.updates.enabled for db in self._data_stores.databases) 49 50 return HTTPStatus.OK, {"enabled": enabled} 51 52 async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: 53 await assert_requester_is_admin(self._auth, request) 54 55 body = parse_json_object_from_request(request) 56 57 enabled = body.get("enabled", True) 58 59 if not isinstance(enabled, bool): 60 raise SynapseError( 61 HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean" 62 ) 63 64 for db in self._data_stores.databases: 65 db.updates.enabled = enabled 66 67 # If we're re-enabling them ensure that we start the background 68 # process again. 69 if enabled: 70 db.updates.start_doing_background_updates() 71 72 return HTTPStatus.OK, {"enabled": enabled} 73 74 75class BackgroundUpdateRestServlet(RestServlet): 76 """Fetch information about background updates""" 77 78 PATTERNS = admin_patterns("/background_updates/status$") 79 80 def __init__(self, hs: "HomeServer"): 81 self._auth = hs.get_auth() 82 self._data_stores = hs.get_datastores() 83 84 async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: 85 await assert_requester_is_admin(self._auth, request) 86 87 # We need to check that all configured databases have updates enabled. 88 # (They *should* all be in sync.) 89 enabled = all(db.updates.enabled for db in self._data_stores.databases) 90 91 current_updates = {} 92 93 for db in self._data_stores.databases: 94 update = db.updates.get_current_update() 95 if not update: 96 continue 97 98 current_updates[db.name()] = { 99 "name": update.name, 100 "total_item_count": update.total_item_count, 101 "total_duration_ms": update.total_duration_ms, 102 "average_items_per_ms": update.average_items_per_ms(), 103 } 104 105 return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates} 106 107 108class BackgroundUpdateStartJobRestServlet(RestServlet): 109 """Allows to start specific background updates""" 110 111 PATTERNS = admin_patterns("/background_updates/start_job$") 112 113 def __init__(self, hs: "HomeServer"): 114 self._auth = hs.get_auth() 115 self._store = hs.get_datastore() 116 117 async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: 118 await assert_requester_is_admin(self._auth, request) 119 120 body = parse_json_object_from_request(request) 121 assert_params_in_dict(body, ["job_name"]) 122 123 job_name = body["job_name"] 124 125 if job_name == "populate_stats_process_rooms": 126 jobs = [ 127 { 128 "update_name": "populate_stats_process_rooms", 129 "progress_json": "{}", 130 }, 131 ] 132 elif job_name == "regenerate_directory": 133 jobs = [ 134 { 135 "update_name": "populate_user_directory_createtables", 136 "progress_json": "{}", 137 "depends_on": "", 138 }, 139 { 140 "update_name": "populate_user_directory_process_rooms", 141 "progress_json": "{}", 142 "depends_on": "populate_user_directory_createtables", 143 }, 144 { 145 "update_name": "populate_user_directory_process_users", 146 "progress_json": "{}", 147 "depends_on": "populate_user_directory_process_rooms", 148 }, 149 { 150 "update_name": "populate_user_directory_cleanup", 151 "progress_json": "{}", 152 "depends_on": "populate_user_directory_process_users", 153 }, 154 ] 155 else: 156 raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") 157 158 try: 159 await self._store.db_pool.simple_insert_many( 160 table="background_updates", 161 values=jobs, 162 desc=f"admin_api_run_{job_name}", 163 ) 164 except self._store.db_pool.engine.module.IntegrityError: 165 raise SynapseError( 166 HTTPStatus.BAD_REQUEST, 167 "Job %s is already in queue of background updates." % (job_name,), 168 ) 169 170 self._store.db_pool.updates.start_doing_background_updates() 171 172 return HTTPStatus.OK, {} 173