# Copyright 2020 Dirk Klimpel
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib.parse
from http import HTTPStatus

from parameterized import parameterized

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.errors import Codes

# Imported explicitly: the tests below read this constant, and relying on
# `synapse.rest.admin` to pull in `synapse.handlers.device` as a side effect
# is fragile.
from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.util import Clock

from tests import unittest


class DeviceRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for the single-device admin endpoint
    `/_synapse/admin/v2/users/<user_id>/devices/<device_id>` (GET, PUT, DELETE).
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin and a regular user, log both in, and build the URL
        of the regular user's first device.
        """
        self.handler = hs.get_device_handler()

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        # The first device returned for the user is the one under test.
        devices = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.other_user_device_id = devices[0]["device_id"]

        self.url = "/_synapse/admin/v2/users/%s/devices/%s" % (
            urllib.parse.quote(self.other_user),
            self.other_user_device_id,
        )

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_no_auth(self, method: str) -> None:
        """
        Try to get, update or delete a device of a user without authentication.
        """
        channel = self.make_request(method, self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_requester_is_no_admin(self, method: str) -> None:
        """
        If the user is not a server admin, an error is returned.
        """
        channel = self.make_request(
            method,
            self.url,
            access_token=self.other_user_token,
        )

        self.assertEqual(
            HTTPStatus.FORBIDDEN,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_user_does_not_exist(self, method: str) -> None:
        """
        Tests that a request for a user that does not exist returns
        HTTPStatus.NOT_FOUND.
        """
        url = (
            "/_synapse/admin/v2/users/@unknown_person:test/devices/%s"
            % self.other_user_device_id
        )

        channel = self.make_request(
            method,
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    @parameterized.expand(["GET", "PUT", "DELETE"])
    def test_user_is_not_local(self, method: str) -> None:
        """
        Tests that a request for a user that is not local returns
        HTTPStatus.BAD_REQUEST.
        """
        url = (
            "/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices/%s"
            % self.other_user_device_id
        )

        channel = self.make_request(
            method,
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual("Can only lookup local users", channel.json_body["error"])

    def test_unknown_device(self) -> None:
        """
        Tests requests for a device that does not exist: GET returns
        HTTPStatus.NOT_FOUND, while PUT and DELETE return HTTPStatus.OK.
        """
        url = "/_synapse/admin/v2/users/%s/devices/unknown_device" % urllib.parse.quote(
            self.other_user
        )

        channel = self.make_request(
            "GET",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

        channel = self.make_request(
            "PUT",
            url,
            access_token=self.admin_user_tok,
        )

        # Updating an unknown device returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        channel = self.make_request(
            "DELETE",
            url,
            access_token=self.admin_user_tok,
        )

        # Deleting an unknown device returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

    def test_update_device_too_long_display_name(self) -> None:
        """
        Update a device with a display name that is invalid (too long).
        """
        # Set initial display name.
        update = {"display_name": "new display"}
        self.get_success(
            self.handler.update_device(
                self.other_user, self.other_user_device_id, update
            )
        )

        # Request to update a device display name with a new value that is
        # one character longer than allowed.
        update = {"display_name": "a" * (MAX_DEVICE_DISPLAY_NAME_LEN + 1)}

        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
            content=update,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"])

        # Ensure the display name was not updated.
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new display", channel.json_body["display_name"])

    def test_update_no_display_name(self) -> None:
        """
        Tests that an update for a device without a JSON body returns
        HTTPStatus.OK and leaves the display name unchanged.
        """
        # Set initial display name.
        update = {"display_name": "new display"}
        self.get_success(
            self.handler.update_device(
                self.other_user, self.other_user_device_id, update
            )
        )

        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure the display name was not updated.
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new display", channel.json_body["display_name"])

    def test_update_display_name(self) -> None:
        """
        Tests a normal successful update of the display name.
        """
        # Set new display_name
        channel = self.make_request(
            "PUT",
            self.url,
            access_token=self.admin_user_tok,
            content={"display_name": "new displayname"},
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Check new display_name
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual("new displayname", channel.json_body["display_name"])

    def test_get_device(self) -> None:
        """
        Tests that a normal lookup for a device is successful.
        """
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual(self.other_user, channel.json_body["user_id"])
        # Check that all fields are available
        self.assertIn("user_id", channel.json_body)
        self.assertIn("device_id", channel.json_body)
        self.assertIn("display_name", channel.json_body)
        self.assertIn("last_seen_ip", channel.json_body)
        self.assertIn("last_seen_ts", channel.json_body)

    def test_delete_device(self) -> None:
        """
        Tests that the removal of a device is successful.
        """
        # Count the number of devices of the user.
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        number_devices = len(res)
        self.assertEqual(1, number_devices)

        # Delete device
        channel = self.make_request(
            "DELETE",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure that the number of devices is decreased
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(number_devices - 1, len(res))


class DevicesRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for the device-listing admin endpoint
    `/_synapse/admin/v2/users/<user_id>/devices` (GET).
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin (logged in) and a regular user (not logged in), and
        build the URL for listing the regular user's devices.
        """
        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")

        self.url = "/_synapse/admin/v2/users/%s/devices" % urllib.parse.quote(
            self.other_user
        )

    def test_no_auth(self) -> None:
        """
        Try to list devices of a user without authentication.
        """
        channel = self.make_request("GET", self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        If the user is not a server admin, an error is returned.
        """
        other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "GET",
            self.url,
            access_token=other_user_token,
        )

        self.assertEqual(
            HTTPStatus.FORBIDDEN,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_user_does_not_exist(self) -> None:
        """
        Tests that a lookup for a user that does not exist returns
        HTTPStatus.NOT_FOUND.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:test/devices"
        channel = self.make_request(
            "GET",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_user_is_not_local(self) -> None:
        """
        Tests that a lookup for a user that is not local returns
        HTTPStatus.BAD_REQUEST.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices"

        channel = self.make_request(
            "GET",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual("Can only lookup local users", channel.json_body["error"])

    def test_user_has_no_devices(self) -> None:
        """
        Tests that a normal lookup for devices is successful
        if the user has no devices.
        """

        # Get devices
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual(0, channel.json_body["total"])
        self.assertEqual(0, len(channel.json_body["devices"]))

    def test_get_devices(self) -> None:
        """
        Tests that a normal lookup for devices is successful.
        """
        # Create devices: each login is expected to add one device.
        number_devices = 5
        for _ in range(number_devices):
            self.login("user", "pass")

        # Get devices
        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertEqual(number_devices, channel.json_body["total"])
        self.assertEqual(number_devices, len(channel.json_body["devices"]))
        # Check that all fields are available and that every returned device
        # (not only the first one) belongs to the requested user.
        for d in channel.json_body["devices"]:
            self.assertIn("user_id", d)
            self.assertEqual(self.other_user, d["user_id"])
            self.assertIn("device_id", d)
            self.assertIn("display_name", d)
            self.assertIn("last_seen_ip", d)
            self.assertIn("last_seen_ts", d)


class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
    """
    Tests for the bulk device deletion admin endpoint
    `/_synapse/admin/v2/users/<user_id>/delete_devices` (POST).
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """
        Register an admin (logged in) and a regular user (not logged in), and
        build the URL for deleting the regular user's devices.
        """
        self.handler = hs.get_device_handler()

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")

        self.url = "/_synapse/admin/v2/users/%s/delete_devices" % urllib.parse.quote(
            self.other_user
        )

    def test_no_auth(self) -> None:
        """
        Try to delete devices of a user without authentication.
        """
        channel = self.make_request("POST", self.url, b"{}")

        self.assertEqual(
            HTTPStatus.UNAUTHORIZED,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        If the user is not a server admin, an error is returned.
        """
        other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url,
            access_token=other_user_token,
        )

        self.assertEqual(
            HTTPStatus.FORBIDDEN,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_user_does_not_exist(self) -> None:
        """
        Tests that a request for a user that does not exist returns
        HTTPStatus.NOT_FOUND.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:test/delete_devices"
        channel = self.make_request(
            "POST",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_user_is_not_local(self) -> None:
        """
        Tests that a request for a user that is not local returns
        HTTPStatus.BAD_REQUEST.
        """
        url = "/_synapse/admin/v2/users/@unknown_person:unknown_domain/delete_devices"

        channel = self.make_request(
            "POST",
            url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
        self.assertEqual("Can only lookup local users", channel.json_body["error"])

    def test_unknown_devices(self) -> None:
        """
        Tests that the removal of devices that do not exist returns
        HTTPStatus.OK.
        """
        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.admin_user_tok,
            content={"devices": ["unknown_device1", "unknown_device2"]},
        )

        # Deleting unknown devices returns status HTTPStatus.OK
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

    def test_delete_devices(self) -> None:
        """
        Tests that the removal of several devices is successful.
        """

        # Create devices: each login is expected to add one device.
        number_devices = 5
        for _ in range(number_devices):
            self.login("user", "pass")

        # Get devices
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(number_devices, len(res))

        # Create list of device IDs
        device_ids = [str(d["device_id"]) for d in res]

        # Delete devices
        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.admin_user_tok,
            content={"devices": device_ids},
        )

        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # Ensure that no devices are left for the user
        res = self.get_success(self.handler.get_devices_by_user(self.other_user))
        self.assertEqual(0, len(res))