"""
    :codeauthor: :email:`Bo Maryniuk <bo@suse.de>`
"""

import datetime

import salt.utils.ssdp as ssdp
import salt.utils.stringutils
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase, skipIf

try:
    import pytest
except ImportError:
    pytest = None


class Mocks:
    """
    Mixin with builders for SSDP objects created against a mocked ``socket`` module.
    """

    # Fallback values used when a builder is called without explicit ones.
    _DEFAULT_IP = "127.0.0.1"
    _DEFAULT_HOSTNAME = "localhost"

    def get_socket_mock(self, expected_ip, expected_hostname):
        """
        Build a stand-in for the ``socket`` module.

        :param expected_ip: address reported by ``getsockname()`` and ``gethostbyname()``
        :param expected_hostname: name reported by ``gethostname()``
        :return: ``MagicMock`` whose ``socket()`` call yields a mocked socket object
        """
        sck = MagicMock()
        sck.getsockname = MagicMock(return_value=(expected_ip, 123456))

        sock_mock = MagicMock()
        sock_mock.socket = MagicMock(return_value=sck)
        sock_mock.gethostname = MagicMock(return_value=expected_hostname)
        sock_mock.gethostbyname = MagicMock(return_value=expected_ip)

        return sock_mock

    def _build_with_mocked_socket(
        self, class_name, expected_ip, expected_hostname, config
    ):
        """
        Instantiate ``salt.utils.ssdp.<class_name>`` while ``socket`` is mocked.

        The class is resolved by name at call time, so a test that has patched
        the attribute on ``salt.utils.ssdp`` still gets its replacement. The
        configuration is taken as a plain dict so that its keys can never
        collide with this helper's own parameter names.

        :param class_name: name of the attribute of ``salt.utils.ssdp`` to call
        :param expected_ip: IP the mocked socket reports; ``None`` selects the default
        :param expected_hostname: hostname the mocked socket reports; ``None`` selects the default
        :param config: keyword arguments forwarded to the constructor
        :return: the constructed object
        """
        if expected_ip is None:
            expected_ip = self._DEFAULT_IP
        if expected_hostname is None:
            expected_hostname = self._DEFAULT_HOSTNAME
        sock_mock = self.get_socket_mock(expected_ip, expected_hostname)
        # The patch only has to be active during construction.
        with patch("salt.utils.ssdp.socket", sock_mock):
            return getattr(ssdp, class_name)(**config)

    def get_ssdp_factory(self, expected_ip=None, expected_hostname=None, **config):
        """
        Build an ``SSDPFactory`` against a mocked socket module.

        :param expected_ip: IP the mocked socket reports (default ``127.0.0.1``)
        :param expected_hostname: hostname the mocked socket reports (default ``localhost``)
        :param config: keyword arguments forwarded to ``SSDPFactory``
        :return: the constructed factory
        """
        return self._build_with_mocked_socket(
            "SSDPFactory", expected_ip, expected_hostname, config
        )

    def get_ssdp_discovery_client(
        self, expected_ip=None, expected_hostname=None, **config
    ):
        """
        Build an ``SSDPDiscoveryClient`` against a mocked socket module.

        :param expected_ip: IP the mocked socket reports (default ``127.0.0.1``)
        :param expected_hostname: hostname the mocked socket reports (default ``localhost``)
        :param config: keyword arguments forwarded to ``SSDPDiscoveryClient``
        :return: the constructed client
        """
        return self._build_with_mocked_socket(
            "SSDPDiscoveryClient", expected_ip, expected_hostname, config
        )

    def get_ssdp_discovery_server(
        self, expected_ip=None, expected_hostname=None, **config
    ):
        """
        Build an ``SSDPDiscoveryServer`` against a mocked socket module.

        :param expected_ip: IP the mocked socket reports (default ``127.0.0.1``)
        :param expected_hostname: hostname the mocked socket reports (default ``localhost``)
        :param config: keyword arguments forwarded to ``SSDPDiscoveryServer``
        :return: the constructed server
        """
        return self._build_with_mocked_socket(
            "SSDPDiscoveryServer", expected_ip, expected_hostname, config
        )


@skipIf(pytest is None, "PyTest is missing")
class SSDPBaseTestCase(TestCase, Mocks):
    """
    Checks for the shared SSDP base class.
    """

    @staticmethod
    def exception_generic(*args, **kwargs):
        """
        Mock side effect: always raise a generic exception.
        """
        raise Exception("some network error")

    @staticmethod
    def exception_attr_error(*args, **kwargs):
        """
        Mock side effect: always raise ``AttributeError`` quoting the call arguments.
        """
        message = "attribute error: {}. {}".format(args, kwargs)
        raise AttributeError(message)

    @patch("salt.utils.ssdp._json", None)
    @patch("salt.utils.ssdp.asyncio", None)
    def test_base_avail(self):
        """
        ``_is_available()`` is true only when both ``_json`` and ``asyncio`` are set.
        """
        ssdp_base = ssdp.SSDPBase()

        # Both dependencies missing (see decorators).
        assert not ssdp_base._is_available()

        # Exactly one dependency present is still not enough.
        for present in ("salt.utils.ssdp._json", "salt.utils.ssdp.asyncio"):
            with patch(present, True):
                assert not ssdp_base._is_available()

        with patch("salt.utils.ssdp._json", True):
            with patch("salt.utils.ssdp.asyncio", True):
                assert ssdp_base._is_available()

    def test_base_protocol_settings(self):
        """
        ``DEFAULTS`` holds exactly the expected keys with the expected values.
        """
        expected = {
            "signature": "__salt_master_service",
            "answer": {},
            "port": 4520,
            "listen_ip": "0.0.0.0",
            "timeout": 3,
        }
        defaults = ssdp.SSDPBase().DEFAULTS

        for name in expected:
            assert name in defaults

        for name in defaults:
            assert name in expected

        for name, wanted in expected.items():
            assert defaults[name] == wanted

    def test_base_self_ip(self):
        """
        ``get_self_ip()`` returns the expected address, also when ``getsockname`` fails.
        """
        wanted_ip = "192.168.1.10"
        fake_socket = self.get_socket_mock(wanted_ip, "oxygen")
        ssdp_base = ssdp.SSDPBase()

        with patch("salt.utils.ssdp.socket", fake_socket):
            assert ssdp_base.get_self_ip() == wanted_ip

        # Make the socket lookup blow up; the same IP must still come back.
        broken = fake_socket.socket()
        broken.getsockname.side_effect = SSDPBaseTestCase.exception_generic
        with patch("salt.utils.ssdp.socket", fake_socket):
            assert ssdp_base.get_self_ip() == wanted_ip


@skipIf(pytest is None, "PyTest is missing")
class SSDPFactoryTestCase(TestCase, Mocks):
    """
    Checks for the datagram protocol factory.
    """

    def test_attr_check(self):
        """
        Configuration values and the detected IP end up as factory attributes.
        """
        wanted_ip = "10.10.10.10"
        config = {
            ssdp.SSDPBase.SIGNATURE: "-signature-",
            ssdp.SSDPBase.ANSWER: {"this-is": "the-answer"},
        }
        factory = self.get_ssdp_factory(expected_ip=wanted_ip, **config)

        for attr, wanted in config.items():
            assert hasattr(factory, attr)
            assert getattr(factory, attr) == wanted
        assert not factory.disable_hidden
        assert factory.my_ip == wanted_ip

    def test_transport_sendto_success(self):
        """
        ``_sendto`` hands the payload to the transport and logs success.
        """
        factory = self.get_ssdp_factory()
        with patch.object(factory, "transport", MagicMock()) as transport, patch.object(
            factory, "log", MagicMock()
        ) as log:
            factory._sendto(data={"some": "data"}, addr="10.10.10.10")

            assert transport.sendto.called
            first_send = transport.sendto.mock_calls[0]
            assert first_send[1][0]["some"] == "data"
            assert first_send[2]["addr"] == "10.10.10.10"

            assert log.debug.called
            assert log.debug.mock_calls[0][1][0] == "Sent successfully"

    def test_transport_sendto_retry(self):
        """
        ``_sendto`` sleeps briefly and logs when the transport raises ``AttributeError``.
        """
        with patch("salt.utils.ssdp.time.sleep", MagicMock()) as sleep:
            failing_transport = MagicMock()
            failing_transport.sendto = MagicMock(
                side_effect=SSDPBaseTestCase.exception_attr_error
            )
            factory = self.get_ssdp_factory()
            with patch.object(factory, "transport", failing_transport), patch.object(
                factory, "log", MagicMock()
            ) as log:
                factory._sendto(data={"some": "data"}, addr="10.10.10.10")

                assert failing_transport.sendto.called
                assert sleep.called
                assert 0 < sleep.call_args[0][0] < 0.5
                assert log.debug.called
                assert "Permission error" in log.debug.mock_calls[0][1][0]

    def test_datagram_signature_bad(self):
        """
        A datagram without the signature is only logged, together with the sender.
        """
        sender = ("10.10.10.10", "foo.suse.de")
        factory = self.get_ssdp_factory()

        with patch.object(factory, "log", MagicMock()) as log:
            factory.datagram_received(data="nonsense", addr=sender)

            assert log.debug.called
            logged = log.debug.call_args[0]
            assert "Received bad signature from" in logged[0]
            assert logged[1] == sender[0]
            assert logged[2] == sender[1]

    def test_datagram_signature_wrong_timestamp_quiet(self):
        """
        An unparsable timestamp is logged and, in hidden mode, not answered.
        """
        sender = ("10.10.10.10", "foo.suse.de")
        packet = "{}nonsense".format(ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE])
        factory = self.get_ssdp_factory()

        with patch.object(factory, "log", MagicMock()) as log, patch.object(
            factory, "_sendto", MagicMock()
        ) as sendto:
            factory.datagram_received(data=packet, addr=sender)

            assert log.debug.called
            assert "Received invalid timestamp in package" in log.debug.call_args[0][0]
            assert not sendto.called

    def test_datagram_signature_wrong_timestamp_reply(self):
        """
        An unparsable timestamp is logged and answered when hiding is disabled.
        """
        sender = ("10.10.10.10", "foo.suse.de")
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        packet = "{}nonsense".format(signature)
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True

        with patch.object(factory, "log", MagicMock()) as log, patch.object(
            factory, "_sendto", MagicMock()
        ) as sendto:
            factory.datagram_received(data=packet, addr=sender)

            assert log.debug.called
            assert "Received invalid timestamp in package" in log.debug.call_args[0][0]
            assert sendto.called
            reply = sendto.call_args[0][0]
            assert "{}:E:Invalid timestamp".format(signature) == reply

    def test_datagram_signature_outdated_timestamp_quiet(self):
        """
        A packet older than the 20 second window is logged but not answered (hidden mode).
        """
        sender = ("10.10.10.10", "foo.suse.de")
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        packet = "{}{}".format(signature, "1516623820")
        factory = self.get_ssdp_factory()

        # Real datetime objects must be created before ``datetime`` is patched.
        packet_time = datetime.datetime.fromtimestamp(1516623820)
        now = datetime.datetime.fromtimestamp(1516623841)
        window = datetime.timedelta(seconds=20)

        # Order matters: the class is replaced first, then its attributes.
        with patch.object(factory, "log", MagicMock()) as log, patch.object(
            factory, "_sendto"
        ) as sendto, patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
            "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=now)
        ), patch(
            "salt.utils.ssdp.datetime.datetime.fromtimestamp",
            MagicMock(return_value=packet_time),
        ), patch(
            "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=window)
        ):
            factory.datagram_received(data=packet, addr=sender)

            assert log.debug.called
            assert not factory.disable_hidden
            assert not sendto.called
            assert "Received outdated package" in log.debug.call_args[0][0]

    def test_datagram_signature_outdated_timestamp_reply(self):
        """
        A packet older than the 20 second window gets an error reply when hiding is disabled.
        """
        sender = ("10.10.10.10", "foo.suse.de")
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        packet = "{}{}".format(signature, "1516623820")
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True

        # Real datetime objects must be created before ``datetime`` is patched.
        packet_time = datetime.datetime.fromtimestamp(1516623820)
        now = datetime.datetime.fromtimestamp(1516623841)
        window = datetime.timedelta(seconds=20)

        # Order matters: the class is replaced first, then its attributes.
        with patch.object(factory, "log", MagicMock()) as log, patch.object(
            factory, "_sendto"
        ) as sendto, patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
            "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=now)
        ), patch(
            "salt.utils.ssdp.datetime.datetime.fromtimestamp",
            MagicMock(return_value=packet_time),
        ), patch(
            "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=window)
        ):
            factory.datagram_received(data=packet, addr=sender)

            assert log.debug.called
            assert factory.disable_hidden
            assert sendto.called
            reply = sendto.call_args[0][0]
            assert reply == "{}:E:Timestamp is too old".format(signature)
            assert "Received outdated package" in log.debug.call_args[0][0]

    def test_datagram_signature_correct_timestamp_reply(self):
        """
        A packet exactly 20 seconds old is answered with the configured payload.
        """
        sender = ("10.10.10.10", "foo.suse.de")
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        packet = "{}{}".format(signature, "1516623820")
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True

        # Real datetime objects must be created before ``datetime`` is patched.
        packet_time = datetime.datetime.fromtimestamp(1516623820)
        now = datetime.datetime.fromtimestamp(1516623840)
        window = datetime.timedelta(seconds=20)

        # Order matters: the class is replaced first, then its attributes.
        with patch.object(factory, "log", MagicMock()) as log, patch.object(
            factory, "_sendto"
        ) as sendto, patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
            "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=now)
        ), patch(
            "salt.utils.ssdp.datetime.datetime.fromtimestamp",
            MagicMock(return_value=packet_time),
        ), patch(
            "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=window)
        ):
            factory.datagram_received(data=packet, addr=sender)

            assert log.debug.called
            assert factory.disable_hidden
            assert sendto.called
            wanted_reply = salt.utils.stringutils.to_bytes(
                "{}:@:{{}}".format(signature)
            )
            assert sendto.call_args[0][0] == wanted_reply
            assert 'Received "%s" from %s:%s' in log.debug.call_args[0][0]


@skipIf(pytest is None, "PyTest is missing")
class SSDPServerTestCase(TestCase, Mocks):
    """
    Checks for the discovery server.
    """

    def test_config_detached(self):
        """
        The server works on its own copy of the configuration.
        """
        original_ip = "10.10.10.10"
        detected_ip = "20.20.20.20"
        config = {"answer": {"master": original_ip}}

        with patch(
            "salt.utils.ssdp.SSDPDiscoveryServer.get_self_ip",
            MagicMock(return_value=detected_ip),
        ):
            server = ssdp.SSDPDiscoveryServer(**config)
            assert server._config["answer"]["master"] == detected_ip
            # The caller's dict must be left untouched.
            assert config["answer"]["master"] == original_ip

    def test_run(self):
        """
        ``run()`` drives the (mocked) event loop and binds to the configured address.
        """
        with patch("salt.utils.ssdp.SSDPFactory", MagicMock()):
            listen_ip = "10.10.10.10"
            listen_port = 12345
            config = {
                "answer": {"master": "10.10.10.10"},
                ssdp.SSDPBase.LISTEN_IP: listen_ip,
                ssdp.SSDPBase.PORT: listen_port,
            }
            server = self.get_ssdp_discovery_server(**config)
            server.create_datagram_endpoint = MagicMock()
            server.log = MagicMock()

            transport = MagicMock()
            protocol = MagicMock()
            loop = MagicMock()
            loop.run_until_complete = MagicMock(return_value=(transport, protocol))

            fake_asyncio = MagicMock()
            fake_asyncio.ported = False
            fake_asyncio.get_event_loop = MagicMock(return_value=loop)

            with patch("salt.utils.ssdp.asyncio", fake_asyncio):
                server.run()
                event_loop = fake_asyncio.get_event_loop()
                endpoint_kwargs = event_loop.create_datagram_endpoint.call_args[1]
                bound_ip, bound_port = endpoint_kwargs["local_addr"]

                assert fake_asyncio.get_event_loop.called
                assert event_loop.run_until_complete.called
                assert event_loop.create_datagram_endpoint.called
                assert event_loop.run_forever.called
                assert transport.close.called
                assert loop.close.called
                assert server.log.info.called
                last_info = server.log.info.call_args[0][0]
                assert last_info == "Stopping service discovery listener."
                assert "allow_broadcast" in endpoint_kwargs
                assert endpoint_kwargs["allow_broadcast"]
                assert "local_addr" in endpoint_kwargs
                # Configured values must win over the built-in defaults.
                assert bound_ip != ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.LISTEN_IP]
                assert bound_ip == listen_ip
                assert bound_port != ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.PORT]
                assert bound_port == listen_port


@skipIf(pytest is None, "PyTest is missing")
class SSDPClientTestCase(TestCase, Mocks):
    """
    Checks for the discovery client.
    """

    class Resource:
        """
        Canned replacement for ``socket.recvfrom``: hands out queued ``(data, address)`` pairs.
        """

        def __init__(self):
            # The final ``(None, None)`` entry is the last item handed out.
            self.pool = [
                ("some", "10.10.10.10"),
                ("data", "20.20.20.20"),
                ("data", "10.10.10.10"),
                (None, None),
            ]

        def read(self, *args, **kwargs):
            """
            Return the next queued pair, ignoring all arguments.
            """
            head = self.pool.pop(0)
            return head

    def test_config_passed(self):
        """
        Options given to the client show up in its configuration.
        """
        config = {
            ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
            ssdp.SSDPBase.TIMEOUT: 5,
            ssdp.SSDPBase.PORT: 12345,
        }
        client = self.get_ssdp_discovery_client(**config)

        for option in (
            ssdp.SSDPBase.SIGNATURE,
            ssdp.SSDPBase.PORT,
            ssdp.SSDPBase.TIMEOUT,
        ):
            assert client._config[option] == config[option]

    def test_config_detached(self):
        """
        Changing the client's configuration does not leak into the caller's dict.
        """
        config = {ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server"}
        client = self.get_ssdp_discovery_client(**config)

        client._config["foo"] = "bar"

        assert "foo" in client._config
        assert "foo" not in config

    def test_query(self):
        """
        ``_query()`` broadcasts ``<signature><timestamp>`` to the configured port.
        """
        frozen_time = 1111
        config = {
            ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
            ssdp.SSDPBase.PORT: 4000,
        }

        with patch("salt.utils.ssdp.socket", MagicMock()), patch(
            "salt.utils.ssdp.time.time", MagicMock(return_value=frozen_time)
        ):
            client = ssdp.SSDPDiscoveryClient(**config)
            client._query()

            sendto = client._socket.sendto
            assert sendto.called
            message, target = sendto.call_args[0]
            wanted_message = salt.utils.stringutils.to_bytes(
                "{}{}".format(config[ssdp.SSDPBase.SIGNATURE], frozen_time)
            )
            assert message == wanted_message
            assert target[0] == "<broadcast>"
            assert target[1] == config[ssdp.SSDPBase.PORT]

    def test_get_masters_map(self):
        """
        Received answers are grouped per sender address.
        """
        collected = {}
        with patch("salt.utils.ssdp.socket", MagicMock()):
            client = ssdp.SSDPDiscoveryClient()
            client._socket.recvfrom = self.Resource().read
            client.log = MagicMock()

            client._collect_masters_map(response=collected)

            assert "10.10.10.10" in collected
            assert "20.20.20.20" in collected
            assert collected["10.10.10.10"] == ["some", "data"]
            assert collected["20.20.20.20"] == ["data"]

    def test_get_masters_map_error_handling(self):
        """
        A failing ``recvfrom`` is logged as an error and leaves the map empty.
        """
        failure_text = "fake testing timeout just had happened"
        collected = {}
        with patch("salt.utils.ssdp.socket", MagicMock()):
            client = ssdp.SSDPDiscoveryClient()
            client._socket.recvfrom = MagicMock(side_effect=Exception(failure_text))
            client.log = MagicMock()

            client._collect_masters_map(response=collected)

            assert client.log.error.called
            logged = client.log.error.call_args[0]
            assert "Discovery master collection failure" in logged[0]
            assert failure_text == str(logged[1])
            assert not collected

    def test_discover_no_masters(self):
        """
        ``discover()`` reports when nothing was collected.
        """
        client = self.get_ssdp_discovery_client()
        client._query = MagicMock()
        client._collect_masters_map = MagicMock()
        client.log = MagicMock()

        client.discover()

        assert client.log.info.called
        assert client.log.info.call_args[0][0] == "No master has been discovered."

    def test_discover_general_error(self):
        """
        An error answer from a publisher is logged exactly once.
        """
        error = "Admins on strike due to broken coffee machine"
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        answers = self.Resource()
        answers.pool = [
            ("{}:E:{}".format(signature, error), "10.10.10.10"),
            (None, None),
        ]

        with patch("salt.utils.ssdp.socket", MagicMock()):
            client = ssdp.SSDPDiscoveryClient()
            client._socket.recvfrom = answers.read
            client._query = MagicMock()
            client.log = MagicMock()

            client.discover()

            log_error = client.log.error
            assert len(log_error.mock_calls) == 1
            positional, keywords = log_error.call_args[0], log_error.call_args[1]
            assert "Error response from the service publisher" in positional[0]
            assert "10.10.10.10" == positional[1]
            assert keywords == {}
            assert positional[2] == error

    def test_discover_timestamp_error(self):
        """
        An error answer mentioning a timestamp is logged twice: the error and the shift notice.
        """
        error = (
            "We only support a 1200 bps connection. Routing timestamp problems on"
            " neural net."
        )
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        answers = self.Resource()
        answers.pool = [
            ("{}:E:{}".format(signature, error), "10.10.10.10"),
            (None, None),
        ]

        with patch("salt.utils.ssdp.socket", MagicMock()):
            client = ssdp.SSDPDiscoveryClient()
            client._socket.recvfrom = answers.read
            client._query = MagicMock()
            client.log = MagicMock()

            client.discover()

            error_calls = client.log.error.mock_calls
            assert len(error_calls) == 2

            first, second = error_calls[0], error_calls[1]
            assert "Error response from the service publisher" in first[1][0]
            assert first[1][2] == error
            assert first[2] == {}
            assert "Publisher sent shifted timestamp" in second[1][0]
            assert second[1][1] == first[1][1] == "10.10.10.10"