1# Copyright 2020 Dirk Klimpel
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import urllib.parse
15from http import HTTPStatus
16
17from parameterized import parameterized
18
19from twisted.test.proto_helpers import MemoryReactor
20
21import synapse.rest.admin
22from synapse.api.errors import Codes
23from synapse.rest.client import login
24from synapse.server import HomeServer
25from synapse.util import Clock
26
27from tests import unittest
28
29
class DeviceRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for the admin API endpoint that operates on a single device of a
    user: `/_synapse/admin/v2/users/<user_id>/devices/<device_id>`
    (exercised here with GET, PUT and DELETE).
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin and a regular user, log both in, and build the URL
        of the regular user's device that the tests operate on.
        """
        self.handler = hs.get_device_handler()

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")
        # The login above leaves the user with exactly one device (see
        # test_delete_device); that device is the target of all requests.
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.other_user_device_id = res[0]["device_id"]

        self.url = "/_synapse/admin/v2/users/%s/devices/%s" % (
            urllib.parse.quote(self.other_user),
            self.other_user_device_id,
        )

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_no_auth(self, method: str) -> None:
        """
        Try to access a device of a user without authentication.
        """
        channel = self.make_request(method, self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_requester_is_no_admin(self, method: str) -> None:
        """
        If the requester is not a server admin, an error is returned.
        """
        channel = self.make_request(
            method,
            self.url,
            access_token=self.other_user_token,
        )

        self.assertEqual(
            HTTPStatus.FORBIDDEN,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_user_does_not_exist(self, method: str) -> None:
        """
        Tests that a request for a user that does not exist returns
        HTTPStatus.NOT_FOUND.
        """
        url = (
            "/_synapse/admin/v2/users/@unknown_person:test/devices/%s"
            % self.other_user_device_id
        )

        channel = self.make_request(
            method,
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_user_is_not_local(self, method: str) -> None:
        """
        Tests that a request for a user that is not local returns
        HTTPStatus.BAD_REQUEST.
        """
        url = (
            "/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices/%s"
            % self.other_user_device_id
        )

        channel = self.make_request(
            method,
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual("Can only lookup local users", channel.json_body["error"])

    def test_unknown_device(self) -> None:
        """
        Tests that a GET for a device that does not exist returns
        HTTPStatus.NOT_FOUND, while PUT and DELETE return HTTPStatus.OK.
        """
        url = "/_synapse/admin/v2/users/%s/devices/unknown_device" % urllib.parse.quote(
            self.other_user
        )

        channel = self.make_request(
            "GET",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

        channel = self.make_request(
            "PUT",
            url,
            access_token=self.admin_user_tok,
        )

        # Updating an unknown device returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        channel = self.make_request(
            "DELETE",
            url,
            access_token=self.admin_user_tok,
        )

        # Deleting an unknown device returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

    def test_update_device_too_long_display_name(self) -> None:
        """
        Update a device with a display name that is invalid (too long).
        """
        # Import the limit explicitly instead of relying on
        # `synapse.handlers.device` having been loaded as a side effect of
        # some other import.
        from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN

        # Set initial display name.
        update = {"display_name": "new display"}
        self.get_success(
            self.handler.update_device(
                self.other_user, self.other_user_device_id, update
            )
        )

        # Request to update a device display name with a new value that is
        # one character longer than allowed.
        update = {"display_name": "a" * (MAX_DEVICE_DISPLAY_NAME_LEN + 1)}

        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
            content=update,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"])

        # Ensure the display name was not updated.
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new display", channel.json_body["display_name"])

    def test_update_no_display_name(self) -> None:
        """
        Tests that an update for a device without a JSON body returns
        HTTPStatus.OK and leaves the display name unchanged.
        """
        # Set initial display name.
        update = {"display_name": "new display"}
        self.get_success(
            self.handler.update_device(
                self.other_user, self.other_user_device_id, update
            )
        )

        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure the display name was not updated.
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new display", channel.json_body["display_name"])

    def test_update_display_name(self) -> None:
        """
        Tests a normal successful update of the display name.
        """
        # Set new display_name
        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
            content={"display_name": "new displayname"},
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Check new display_name
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new displayname", channel.json_body["display_name"])

    def test_get_device(self) -> None:
        """
        Tests that a normal lookup for a device is successful.
        """
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        # Check that all fields are available. This runs before any field is
        # indexed so that a missing key fails as an assertion, not a KeyError.
        self.assertIn("user_id", channel.json_body)
        self.assertIn("device_id", channel.json_body)
        self.assertIn("display_name", channel.json_body)
        self.assertIn("last_seen_ip", channel.json_body)
        self.assertIn("last_seen_ts", channel.json_body)
        self.assertEqual(self.other_user, channel.json_body["user_id"])

    def test_delete_device(self) -> None:
        """
        Tests that removing a device is successful.
        """
        # Count the number of devices of the user.
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        number_devices = len(res)
        self.assertEqual(1, number_devices)

        # Delete device
        channel = self.make_request(
            "DELETE",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure that the number of devices is decreased
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(number_devices - 1, len(res))
289
290
class DevicesRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for listing the devices of a user through the admin API
    (`GET /_synapse/admin/v2/users/<user_id>/devices`).
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin (logged in) and a regular user, and build the
        device-list URL for the regular user.
        """
        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        # The regular user is only registered, not logged in:
        # test_user_has_no_devices relies on there being no device yet.
        self.other_user = self.register_user("user", "pass")

        quoted_user_id = urllib.parse.quote(self.other_user)
        self.url = "/_synapse/admin/v2/users/%s/devices" % quoted_user_id

    def test_no_auth(self) -> None:
        """
        Listing the devices of a user without authentication is rejected.
        """
        response = self.make_request("GET", self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED, response.code, msg=response.json_body
        )
        self.assertEqual(Codes.MISSING_TOKEN, response.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        A requester who is not a server admin gets an error.
        """
        non_admin_token = self.login("user", "pass")

        response = self.make_request("GET", self.url, access_token=non_admin_token)

        self.assertEqual(HTTPStatus.FORBIDDEN, response.code, msg=response.json_body)
        self.assertEqual(Codes.FORBIDDEN, response.json_body["errcode"])

    def test_user_does_not_exist(self) -> None:
        """
        Listing devices of an unknown local user returns HTTPStatus.NOT_FOUND.
        """
        response = self.make_request(
            "GET",
            "/_synapse/admin/v2/users/@unknown_person:test/devices",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, response.code, msg=response.json_body)
        self.assertEqual(Codes.NOT_FOUND, response.json_body["errcode"])

    def test_user_is_not_local(self) -> None:
        """
        Listing devices of a user on another domain returns
        HTTPStatus.BAD_REQUEST.
        """
        response = self.make_request(
            "GET",
            "/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(
            HTTPStatus.BAD_REQUEST, response.code, msg=response.json_body
        )
        self.assertEqual("Can only lookup local users", response.json_body["error"])

    def test_user_has_no_devices(self) -> None:
        """
        Listing devices succeeds and yields an empty result when the user
        has no devices.
        """
        response = self.make_request("GET", self.url, access_token=self.admin_user_tok)

        self.assertEqual(HTTPStatus.OK, response.code, msg=response.json_body)
        body = response.json_body
        self.assertEqual(0, body["total"])
        self.assertEqual(0, len(body["devices"]))

    def test_get_devices(self) -> None:
        """
        Listing devices succeeds and returns every device with all fields.
        """
        # Each login creates one device for the user.
        expected_count = 5
        for _ in range(expected_count):
            self.login("user", "pass")

        response = self.make_request("GET", self.url, access_token=self.admin_user_tok)

        self.assertEqual(HTTPStatus.OK, response.code, msg=response.json_body)
        body = response.json_body
        self.assertEqual(expected_count, body["total"])
        devices = body["devices"]
        self.assertEqual(expected_count, len(devices))
        self.assertEqual(self.other_user, devices[0]["user_id"])

        # Every returned device must expose the complete set of fields.
        required_fields = (
            "user_id",
            "device_id",
            "display_name",
            "last_seen_ip",
            "last_seen_ts",
        )
        for device in devices:
            for field in required_fields:
                self.assertIn(field, device)
413
414
class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for the admin API endpoint that deletes several devices of a user
    at once: `POST /_synapse/admin/v2/users/<user_id>/delete_devices`.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin (logged in) and a regular user, and build the
        delete-devices URL for the regular user.
        """
        self.handler = hs.get_device_handler()

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")

        self.url = "/_synapse/admin/v2/users/%s/delete_devices" % urllib.parse.quote(
            self.other_user
        )

    def test_no_auth(self) -> None:
        """
        Try to delete devices of a user without authentication.
        """
        channel = self.make_request("POST", self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        If the requester is not a server admin, an error is returned.
        """
        other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url,
            access_token=other_user_token,
        )

        self.assertEqual(
            HTTPStatus.FORBIDDEN,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_user_does_not_exist(self) -> None:
        """
        Tests that a request for a user that does not exist returns
        HTTPStatus.NOT_FOUND.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:test/delete_devices"
        channel = self.make_request(
            "POST",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_user_is_not_local(self) -> None:
        """
        Tests that a request for a user that is not local returns
        HTTPStatus.BAD_REQUEST.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:unknown_domain/delete_devices"

        channel = self.make_request(
            "POST",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual("Can only lookup local users", channel.json_body["error"])

    def test_unknown_devices(self) -> None:
        """
        Tests that removing devices that do not exist returns HTTPStatus.OK.
        """
        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.admin_user_tok,
            content={"devices": ["unknown_device1", "unknown_device2"]},
        )

        # Deleting unknown devices returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

    def test_delete_devices(self) -> None:
        """
        Tests that removing several devices at once is successful.
        """

        # Create devices: each login adds one device for the user.
        number_devices = 5
        for _ in range(number_devices):
            self.login("user", "pass")

        # Get devices
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(number_devices, len(res))

        # Create list of device IDs
        device_ids = [str(d["device_id"]) for d in res]

        # Delete devices
        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.admin_user_tok,
            content={"devices": device_ids},
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure that no device is left afterwards.
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(0, len(res))
540