1# Copyright 2021 The Matrix.org Foundation C.I.C.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15from http import HTTPStatus
16from typing import TYPE_CHECKING, Tuple
17
18from synapse.api.errors import SynapseError
19from synapse.http.servlet import (
20    RestServlet,
21    assert_params_in_dict,
22    parse_json_object_from_request,
23)
24from synapse.http.site import SynapseRequest
25from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
26from synapse.types import JsonDict
27
28if TYPE_CHECKING:
29    from synapse.server import HomeServer
30
31logger = logging.getLogger(__name__)
32
33
34class BackgroundUpdateEnabledRestServlet(RestServlet):
35    """Allows temporarily disabling background updates"""
36
37    PATTERNS = admin_patterns("/background_updates/enabled$")
38
39    def __init__(self, hs: "HomeServer"):
40        self._auth = hs.get_auth()
41        self._data_stores = hs.get_datastores()
42
43    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
44        await assert_requester_is_admin(self._auth, request)
45
46        # We need to check that all configured databases have updates enabled.
47        # (They *should* all be in sync.)
48        enabled = all(db.updates.enabled for db in self._data_stores.databases)
49
50        return HTTPStatus.OK, {"enabled": enabled}
51
52    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
53        await assert_requester_is_admin(self._auth, request)
54
55        body = parse_json_object_from_request(request)
56
57        enabled = body.get("enabled", True)
58
59        if not isinstance(enabled, bool):
60            raise SynapseError(
61                HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
62            )
63
64        for db in self._data_stores.databases:
65            db.updates.enabled = enabled
66
67            # If we're re-enabling them ensure that we start the background
68            # process again.
69            if enabled:
70                db.updates.start_doing_background_updates()
71
72        return HTTPStatus.OK, {"enabled": enabled}
73
74
75class BackgroundUpdateRestServlet(RestServlet):
76    """Fetch information about background updates"""
77
78    PATTERNS = admin_patterns("/background_updates/status$")
79
80    def __init__(self, hs: "HomeServer"):
81        self._auth = hs.get_auth()
82        self._data_stores = hs.get_datastores()
83
84    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
85        await assert_requester_is_admin(self._auth, request)
86
87        # We need to check that all configured databases have updates enabled.
88        # (They *should* all be in sync.)
89        enabled = all(db.updates.enabled for db in self._data_stores.databases)
90
91        current_updates = {}
92
93        for db in self._data_stores.databases:
94            update = db.updates.get_current_update()
95            if not update:
96                continue
97
98            current_updates[db.name()] = {
99                "name": update.name,
100                "total_item_count": update.total_item_count,
101                "total_duration_ms": update.total_duration_ms,
102                "average_items_per_ms": update.average_items_per_ms(),
103            }
104
105        return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates}
106
107
108class BackgroundUpdateStartJobRestServlet(RestServlet):
109    """Allows to start specific background updates"""
110
111    PATTERNS = admin_patterns("/background_updates/start_job$")
112
113    def __init__(self, hs: "HomeServer"):
114        self._auth = hs.get_auth()
115        self._store = hs.get_datastore()
116
117    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
118        await assert_requester_is_admin(self._auth, request)
119
120        body = parse_json_object_from_request(request)
121        assert_params_in_dict(body, ["job_name"])
122
123        job_name = body["job_name"]
124
125        if job_name == "populate_stats_process_rooms":
126            jobs = [
127                {
128                    "update_name": "populate_stats_process_rooms",
129                    "progress_json": "{}",
130                },
131            ]
132        elif job_name == "regenerate_directory":
133            jobs = [
134                {
135                    "update_name": "populate_user_directory_createtables",
136                    "progress_json": "{}",
137                    "depends_on": "",
138                },
139                {
140                    "update_name": "populate_user_directory_process_rooms",
141                    "progress_json": "{}",
142                    "depends_on": "populate_user_directory_createtables",
143                },
144                {
145                    "update_name": "populate_user_directory_process_users",
146                    "progress_json": "{}",
147                    "depends_on": "populate_user_directory_process_rooms",
148                },
149                {
150                    "update_name": "populate_user_directory_cleanup",
151                    "progress_json": "{}",
152                    "depends_on": "populate_user_directory_process_users",
153                },
154            ]
155        else:
156            raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
157
158        try:
159            await self._store.db_pool.simple_insert_many(
160                table="background_updates",
161                values=jobs,
162                desc=f"admin_api_run_{job_name}",
163            )
164        except self._store.db_pool.engine.module.IntegrityError:
165            raise SynapseError(
166                HTTPStatus.BAD_REQUEST,
167                "Job %s is already in queue of background updates." % (job_name,),
168            )
169
170        self._store.db_pool.updates.start_doing_background_updates()
171
172        return HTTPStatus.OK, {}
173