1# Copyright 2020 Dirk Klimpel
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15from http import HTTPStatus
16from typing import TYPE_CHECKING, Tuple
17
18from synapse.api.errors import NotFoundError, SynapseError
19from synapse.http.servlet import (
20    RestServlet,
21    assert_params_in_dict,
22    parse_json_object_from_request,
23)
24from synapse.http.site import SynapseRequest
25from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
26from synapse.types import JsonDict, UserID
27
28if TYPE_CHECKING:
29    from synapse.server import HomeServer
30
31logger = logging.getLogger(__name__)
32
33
34class DeviceRestServlet(RestServlet):
35    """
36    Get, update or delete the given user's device
37    """
38
39    PATTERNS = admin_patterns(
40        "/users/(?P<user_id>[^/]*)/devices/(?P<device_id>[^/]*)$", "v2"
41    )
42
43    def __init__(self, hs: "HomeServer"):
44        super().__init__()
45        self.auth = hs.get_auth()
46        self.device_handler = hs.get_device_handler()
47        self.store = hs.get_datastore()
48        self.is_mine = hs.is_mine
49
50    async def on_GET(
51        self, request: SynapseRequest, user_id: str, device_id: str
52    ) -> Tuple[int, JsonDict]:
53        await assert_requester_is_admin(self.auth, request)
54
55        target_user = UserID.from_string(user_id)
56        if not self.is_mine(target_user):
57            raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
58
59        u = await self.store.get_user_by_id(target_user.to_string())
60        if u is None:
61            raise NotFoundError("Unknown user")
62
63        device = await self.device_handler.get_device(
64            target_user.to_string(), device_id
65        )
66        if device is None:
67            raise NotFoundError("No device found")
68        return HTTPStatus.OK, device
69
70    async def on_DELETE(
71        self, request: SynapseRequest, user_id: str, device_id: str
72    ) -> Tuple[int, JsonDict]:
73        await assert_requester_is_admin(self.auth, request)
74
75        target_user = UserID.from_string(user_id)
76        if not self.is_mine(target_user):
77            raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
78
79        u = await self.store.get_user_by_id(target_user.to_string())
80        if u is None:
81            raise NotFoundError("Unknown user")
82
83        await self.device_handler.delete_device(target_user.to_string(), device_id)
84        return HTTPStatus.OK, {}
85
86    async def on_PUT(
87        self, request: SynapseRequest, user_id: str, device_id: str
88    ) -> Tuple[int, JsonDict]:
89        await assert_requester_is_admin(self.auth, request)
90
91        target_user = UserID.from_string(user_id)
92        if not self.is_mine(target_user):
93            raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
94
95        u = await self.store.get_user_by_id(target_user.to_string())
96        if u is None:
97            raise NotFoundError("Unknown user")
98
99        body = parse_json_object_from_request(request, allow_empty_body=True)
100        await self.device_handler.update_device(
101            target_user.to_string(), device_id, body
102        )
103        return HTTPStatus.OK, {}
104
105
106class DevicesRestServlet(RestServlet):
107    """
108    Retrieve the given user's devices
109    """
110
111    PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
112
113    def __init__(self, hs: "HomeServer"):
114        self.auth = hs.get_auth()
115        self.device_handler = hs.get_device_handler()
116        self.store = hs.get_datastore()
117        self.is_mine = hs.is_mine
118
119    async def on_GET(
120        self, request: SynapseRequest, user_id: str
121    ) -> Tuple[int, JsonDict]:
122        await assert_requester_is_admin(self.auth, request)
123
124        target_user = UserID.from_string(user_id)
125        if not self.is_mine(target_user):
126            raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
127
128        u = await self.store.get_user_by_id(target_user.to_string())
129        if u is None:
130            raise NotFoundError("Unknown user")
131
132        devices = await self.device_handler.get_devices_by_user(target_user.to_string())
133        return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
134
135
136class DeleteDevicesRestServlet(RestServlet):
137    """
138    API for bulk deletion of devices. Accepts a JSON object with a devices
139    key which lists the device_ids to delete.
140    """
141
142    PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/delete_devices$", "v2")
143
144    def __init__(self, hs: "HomeServer"):
145        self.auth = hs.get_auth()
146        self.device_handler = hs.get_device_handler()
147        self.store = hs.get_datastore()
148        self.is_mine = hs.is_mine
149
150    async def on_POST(
151        self, request: SynapseRequest, user_id: str
152    ) -> Tuple[int, JsonDict]:
153        await assert_requester_is_admin(self.auth, request)
154
155        target_user = UserID.from_string(user_id)
156        if not self.is_mine(target_user):
157            raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
158
159        u = await self.store.get_user_by_id(target_user.to_string())
160        if u is None:
161            raise NotFoundError("Unknown user")
162
163        body = parse_json_object_from_request(request, allow_empty_body=False)
164        assert_params_in_dict(body, ["devices"])
165
166        await self.device_handler.delete_devices(
167            target_user.to_string(), body["devices"]
168        )
169        return HTTPStatus.OK, {}
170