1"""
2    :codeauthor: :email:`Bo Maryniuk <bo@suse.de>`
3"""
4
5import datetime
6
7import salt.utils.ssdp as ssdp
8import salt.utils.stringutils
9from tests.support.mock import MagicMock, patch
10from tests.support.unit import TestCase, skipIf
11
12try:
13    import pytest
14except ImportError:
15    pytest = None
16
17
class Mocks:
    """
    Mixin with helpers that build SSDP objects while ``salt.utils.ssdp.socket``
    is replaced by a mock.
    """

    def get_socket_mock(self, expected_ip, expected_hostname):
        """
        Get a mock standing in for the ``socket`` module.

        :param expected_ip: address returned by ``socket().getsockname()`` and
                            by ``gethostbyname()``
        :param expected_hostname: name returned by ``gethostname()``
        :return: the mocked module object
        """
        sck = MagicMock()
        # Second tuple element is just a placeholder port value.
        sck.getsockname = MagicMock(return_value=(expected_ip, 123456))

        sock_mock = MagicMock()
        sock_mock.socket = MagicMock(return_value=sck)
        sock_mock.gethostname = MagicMock(return_value=expected_hostname)
        sock_mock.gethostbyname = MagicMock(return_value=expected_ip)

        return sock_mock

    def _build_with_mocked_socket(
        self, ssdp_cls, expected_ip, expected_hostname, config
    ):
        """
        Instantiate ``ssdp_cls(**config)`` with the socket module mocked out.

        The mock is only active while the constructor runs.

        :param ssdp_cls: class to instantiate
        :param expected_ip: IP the mocked socket reports ("127.0.0.1" if None)
        :param expected_hostname: host name the mocked socket reports
                                  ("localhost" if None)
        :param config: dict of keyword arguments passed to the constructor
        :return: the constructed instance
        """
        if expected_ip is None:
            expected_ip = "127.0.0.1"
        if expected_hostname is None:
            expected_hostname = "localhost"
        sock_mock = self.get_socket_mock(expected_ip, expected_hostname)
        with patch("salt.utils.ssdp.socket", sock_mock):
            return ssdp_cls(**config)

    def get_ssdp_factory(self, expected_ip=None, expected_hostname=None, **config):
        """
        Build an ``ssdp.SSDPFactory`` against a mocked socket module.

        :return: the factory instance
        """
        return self._build_with_mocked_socket(
            ssdp.SSDPFactory, expected_ip, expected_hostname, config
        )

    def get_ssdp_discovery_client(
        self, expected_ip=None, expected_hostname=None, **config
    ):
        """
        Build an ``ssdp.SSDPDiscoveryClient`` against a mocked socket module.

        :return: the client instance
        """
        return self._build_with_mocked_socket(
            ssdp.SSDPDiscoveryClient, expected_ip, expected_hostname, config
        )

    def get_ssdp_discovery_server(
        self, expected_ip=None, expected_hostname=None, **config
    ):
        """
        Build an ``ssdp.SSDPDiscoveryServer`` against a mocked socket module.

        :return: the server instance
        """
        return self._build_with_mocked_socket(
            ssdp.SSDPDiscoveryServer, expected_ip, expected_hostname, config
        )
67
68
@skipIf(pytest is None, "PyTest is missing")
class SSDPBaseTestCase(TestCase, Mocks):
    """
    Tests covering the common SSDP base class.
    """

    @staticmethod
    def exception_generic(*args, **kwargs):
        """
        Mock side effect that always raises a generic exception.
        :return:
        """
        raise Exception("some network error")

    @staticmethod
    def exception_attr_error(*args, **kwargs):
        """
        Mock side effect that always raises an AttributeError describing the call.
        :return:
        """
        message = "attribute error: {}. {}".format(args, kwargs)
        raise AttributeError(message)

    @patch("salt.utils.ssdp._json", None)
    @patch("salt.utils.ssdp.asyncio", None)
    def test_base_avail(self):
        """
        The base class reports itself available only when both ``_json`` and
        ``asyncio`` are present in ``salt.utils.ssdp``.
        :return:
        """
        json_target = "salt.utils.ssdp._json"
        asyncio_target = "salt.utils.ssdp.asyncio"
        base = ssdp.SSDPBase()

        # Both dependencies are patched to None by the decorators.
        assert not base._is_available()

        # A single dependency is not enough.
        with patch(json_target, True):
            assert not base._is_available()
        with patch(asyncio_target, True):
            assert not base._is_available()

        # Both present.
        with patch(json_target, True), patch(asyncio_target, True):
            assert base._is_available()

    def test_base_protocol_settings(self):
        """
        The DEFAULTS table holds exactly the expected keys and values.
        :return:
        """
        expected = {
            "signature": "__salt_master_service",
            "answer": {},
            "port": 4520,
            "listen_ip": "0.0.0.0",
            "timeout": 3,
        }
        base = ssdp.SSDPBase()

        # Key sets must match in both directions.
        assert all(key in base.DEFAULTS for key in expected)
        assert all(key in expected for key in base.DEFAULTS)

        for key, value in expected.items():
            assert base.DEFAULTS[key] == value

    def test_base_self_ip(self):
        """
        ``get_self_ip`` returns the expected address, both when the socket
        answers and when ``getsockname`` raises.

        :return:
        """
        expected_ip, expected_host = "192.168.1.10", "oxygen"
        socket_target = "salt.utils.ssdp.socket"
        base = ssdp.SSDPBase()
        sock_mock = self.get_socket_mock(expected_ip, expected_host)

        with patch(socket_target, sock_mock):
            assert base.get_self_ip() == expected_ip

        # Break getsockname(); the same address is still expected back.
        broken_socket = sock_mock.socket()
        broken_socket.getsockname.side_effect = SSDPBaseTestCase.exception_generic
        with patch(socket_target, sock_mock):
            assert base.get_self_ip() == expected_ip
146
147
@skipIf(pytest is None, "PyTest is missing")
class SSDPFactoryTestCase(TestCase, Mocks):
    """
    Test socket protocol
    """

    # Timestamp embedded in the datagrams of the clock-dependent tests.
    SENT_TS = 1516623820

    def _receive_with_clock(self, factory, data, addr, now_ts):
        """
        Deliver a datagram to the factory while the date/time helpers reachable
        through ``salt.utils.ssdp.datetime`` are pinned to fixed values.

        :param factory: the factory under test
        :param data: datagram payload
        :param addr: sender address tuple
        :param now_ts: epoch seconds that the patched ``datetime.now()`` reports
        :return: ``(log, sendto)`` mocks that replaced ``factory.log`` and
                 ``factory._sendto`` during the call
        """
        # Real datetime objects must be built before datetime is patched.
        ahead_dt = datetime.datetime.fromtimestamp(now_ts)
        curnt_dt = datetime.datetime.fromtimestamp(self.SENT_TS)
        delta = datetime.timedelta(0, 20)
        log = MagicMock()
        sendto = MagicMock()
        # Patch order matters: ``datetime.datetime`` is swapped for a mock
        # first, then ``now``/``fromtimestamp`` are set on that mock.
        with patch.object(factory, "log", log), patch.object(
            factory, "_sendto", sendto
        ), patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
            "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=ahead_dt)
        ), patch(
            "salt.utils.ssdp.datetime.datetime.fromtimestamp",
            MagicMock(return_value=curnt_dt),
        ), patch(
            "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=delta)
        ):
            factory.datagram_received(data=data, addr=addr)
        return log, sendto

    def test_attr_check(self):
        """
        Tests attributes are set to the base class

        :return:
        """
        config = {
            ssdp.SSDPBase.SIGNATURE: "-signature-",
            ssdp.SSDPBase.ANSWER: {"this-is": "the-answer"},
        }
        expected_ip = "10.10.10.10"
        factory = self.get_ssdp_factory(expected_ip=expected_ip, **config)
        for attr in [ssdp.SSDPBase.SIGNATURE, ssdp.SSDPBase.ANSWER]:
            assert hasattr(factory, attr)
            assert getattr(factory, attr) == config[attr]
        assert not factory.disable_hidden
        assert factory.my_ip == expected_ip

    def test_transport_sendto_success(self):
        """
        Test transport send_to.

        :return:
        """
        transport = MagicMock()
        log = MagicMock()
        factory = self.get_ssdp_factory()
        with patch.object(factory, "transport", transport), patch.object(
            factory, "log", log
        ):
            data = {"some": "data"}
            addr = "10.10.10.10"
            factory._sendto(data=data, addr=addr)
            assert factory.transport.sendto.called
            assert factory.transport.sendto.mock_calls[0][1][0]["some"] == "data"
            assert factory.transport.sendto.mock_calls[0][2]["addr"] == "10.10.10.10"
            assert factory.log.debug.called
            assert factory.log.debug.mock_calls[0][1][0] == "Sent successfully"

    def test_transport_sendto_retry(self):
        """
        Test transport send_to when the transport keeps raising AttributeError.

        :return:
        """
        with patch("salt.utils.ssdp.time.sleep", MagicMock()):
            transport = MagicMock()
            transport.sendto = MagicMock(
                side_effect=SSDPBaseTestCase.exception_attr_error
            )
            log = MagicMock()
            factory = self.get_ssdp_factory()
            with patch.object(factory, "transport", transport), patch.object(
                factory, "log", log
            ):
                data = {"some": "data"}
                addr = "10.10.10.10"
                factory._sendto(data=data, addr=addr)
                assert factory.transport.sendto.called
                assert ssdp.time.sleep.called
                # The last back-off interval must lie strictly inside (0, 0.5).
                assert 0 < ssdp.time.sleep.call_args[0][0] < 0.5
                assert factory.log.debug.called
                assert "Permission error" in factory.log.debug.mock_calls[0][1][0]

    def test_datagram_signature_bad(self):
        """
        Test datagram_received on bad signature

        :return:
        """
        factory = self.get_ssdp_factory()
        data = "nonsense"
        addr = "10.10.10.10", "foo.suse.de"

        with patch.object(factory, "log", MagicMock()):
            factory.datagram_received(data=data, addr=addr)
            assert factory.log.debug.called
            assert "Received bad signature from" in factory.log.debug.call_args[0][0]
            assert factory.log.debug.call_args[0][1] == addr[0]
            assert factory.log.debug.call_args[0][2] == addr[1]

    def test_datagram_signature_wrong_timestamp_quiet(self):
        """
        Test datagram receives a wrong timestamp (no reply).

        :return:
        """
        factory = self.get_ssdp_factory()
        data = "{}nonsense".format(ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE])
        addr = "10.10.10.10", "foo.suse.de"
        with patch.object(factory, "log", MagicMock()), patch.object(
            factory, "_sendto", MagicMock()
        ):
            factory.datagram_received(data=data, addr=addr)
            assert factory.log.debug.called
            assert (
                "Received invalid timestamp in package"
                in factory.log.debug.call_args[0][0]
            )
            assert not factory._sendto.called

    def test_datagram_signature_wrong_timestamp_reply(self):
        """
        Test datagram receives a wrong timestamp.

        :return:
        """
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        data = "{}nonsense".format(signature)
        addr = "10.10.10.10", "foo.suse.de"
        with patch.object(factory, "log", MagicMock()), patch.object(
            factory, "_sendto", MagicMock()
        ):
            factory.datagram_received(data=data, addr=addr)
            assert factory.log.debug.called
            assert (
                "Received invalid timestamp in package"
                in factory.log.debug.call_args[0][0]
            )
            assert factory._sendto.called
            assert (
                "{}:E:Invalid timestamp".format(signature)
                == factory._sendto.call_args[0][0]
            )

    def test_datagram_signature_outdated_timestamp_quiet(self):
        """
        Test if datagram processing reacts on outdated message (more than 20 seconds). Quiet mode.
        :return:
        """
        factory = self.get_ssdp_factory()
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        data = "{}{}".format(signature, self.SENT_TS)
        addr = "10.10.10.10", "foo.suse.de"

        # "Now" is 21 seconds after the datagram's timestamp.
        log, sendto = self._receive_with_clock(
            factory, data, addr, now_ts=self.SENT_TS + 21
        )
        assert log.debug.called
        assert not factory.disable_hidden
        assert not sendto.called
        assert "Received outdated package" in log.debug.call_args[0][0]

    def test_datagram_signature_outdated_timestamp_reply(self):
        """
        Test if datagram processing reacts on outdated message (more than 20 seconds). Reply mode.
        :return:
        """
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        data = "{}{}".format(signature, self.SENT_TS)
        addr = "10.10.10.10", "foo.suse.de"

        # "Now" is 21 seconds after the datagram's timestamp.
        log, sendto = self._receive_with_clock(
            factory, data, addr, now_ts=self.SENT_TS + 21
        )
        assert log.debug.called
        assert factory.disable_hidden
        assert sendto.called
        assert sendto.call_args[0][0] == "{}:E:Timestamp is too old".format(signature)
        assert "Received outdated package" in log.debug.call_args[0][0]

    def test_datagram_signature_correct_timestamp_reply(self):
        """
        Test if datagram processing sends out correct reply within 20 seconds.
        :return:
        """
        factory = self.get_ssdp_factory()
        factory.disable_hidden = True
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        data = "{}{}".format(signature, self.SENT_TS)
        addr = "10.10.10.10", "foo.suse.de"

        # "Now" is exactly 20 seconds after the datagram's timestamp.
        log, sendto = self._receive_with_clock(
            factory, data, addr, now_ts=self.SENT_TS + 20
        )
        assert log.debug.called
        assert factory.disable_hidden
        assert sendto.called
        assert sendto.call_args[0][0] == salt.utils.stringutils.to_bytes(
            "{}:@:{{}}".format(signature)
        )
        assert 'Received "%s" from %s:%s' in log.debug.call_args[0][0]
378
379
@skipIf(pytest is None, "PyTest is missing")
class SSDPServerTestCase(TestCase, Mocks):
    """
    Server-related test cases
    """

    def test_config_detached(self):
        """
        Test if configuration is not a reference.

        The server's own copy of the answer carries the IP returned by the
        patched ``get_self_ip``; the dict passed in must stay untouched.
        :return:
        """
        old_ip = "10.10.10.10"
        new_ip = "20.20.20.20"
        config = {"answer": {"master": old_ip}}
        with patch(
            "salt.utils.ssdp.SSDPDiscoveryServer.get_self_ip",
            MagicMock(return_value=new_ip),
        ):
            srv = ssdp.SSDPDiscoveryServer(**config)
            assert srv._config["answer"]["master"] == new_ip
            assert config["answer"]["master"] == old_ip

    def test_run(self):
        """
        Test server runner.

        Runs the server against a mocked asyncio module and checks that the
        event loop is obtained, driven and closed, and that the datagram
        endpoint is created with the configured address and broadcast enabled.
        :return:
        """
        listen_ip = "10.10.10.10"
        listen_port = 12345
        with patch("salt.utils.ssdp.SSDPFactory", MagicMock()):
            config = {
                "answer": {"master": "10.10.10.10"},
                ssdp.SSDPBase.LISTEN_IP: listen_ip,
                ssdp.SSDPBase.PORT: listen_port,
            }
            srv = self.get_ssdp_discovery_server(**config)
            srv.create_datagram_endpoint = MagicMock()
            srv.log = MagicMock()

            trnsp = MagicMock()
            proto = MagicMock()
            loop = MagicMock()
            loop.run_until_complete = MagicMock(return_value=(trnsp, proto))

            aio = MagicMock()
            aio.ported = False
            aio.get_event_loop = MagicMock(return_value=loop)

            with patch("salt.utils.ssdp.asyncio", aio):
                srv.run()

                # Assert on ``loop`` directly: calling aio.get_event_loop() from
                # the test would make the ``.called`` check below always pass.
                assert aio.get_event_loop.called
                assert loop.run_until_complete.called
                assert loop.create_datagram_endpoint.called
                assert loop.run_forever.called
                assert trnsp.close.called
                assert loop.close.called
                assert srv.log.info.called
                assert (
                    srv.log.info.call_args[0][0]
                    == "Stopping service discovery listener."
                )

                # Keyword arguments are inspected only after the call itself
                # has been asserted, and keys are checked before being read.
                cde_args = loop.create_datagram_endpoint.call_args[1]
                assert "allow_broadcast" in cde_args
                assert cde_args["allow_broadcast"]
                assert "local_addr" in cde_args
                cfg_ip_addr, cfg_port = cde_args["local_addr"]

                assert cfg_ip_addr != ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.LISTEN_IP]
                assert cfg_ip_addr == listen_ip
                assert cfg_port != ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.PORT]
                assert cfg_port == listen_port
453
454
@skipIf(pytest is None, "PyTest is missing")
class SSDPClientTestCase(TestCase, Mocks):
    """
    Tests for the discovery client.
    """

    class Resource:
        """
        Stand-in for a network reader: hands out queued (data, address) pairs.
        """

        def __init__(self):
            self.pool = [
                ("some", "10.10.10.10"),
                ("data", "20.20.20.20"),
                ("data", "10.10.10.10"),
                (None, None),
            ]

        def read(self, *args, **kwargs):
            # Arguments are ignored; the oldest queued entry is returned.
            head = self.pool.pop(0)
            return head

    def test_config_passed(self):
        """
        Values given to the client end up in its configuration.
        :return:
        """
        config = {
            ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
            ssdp.SSDPBase.TIMEOUT: 5,
            ssdp.SSDPBase.PORT: 12345,
        }
        clnt = self.get_ssdp_discovery_client(**config)
        checked_keys = (
            ssdp.SSDPBase.SIGNATURE,
            ssdp.SSDPBase.PORT,
            ssdp.SSDPBase.TIMEOUT,
        )
        for key in checked_keys:
            assert clnt._config[key] == config[key]

    def test_config_detached(self):
        """
        Changing the client's configuration leaves the passed dict untouched.
        :return:
        """
        config = {ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server"}
        clnt = self.get_ssdp_discovery_client(**config)
        clnt._config["foo"] = "bar"
        assert "foo" in clnt._config
        assert "foo" not in config

    def test_query(self):
        """
        The client sends signature plus timestamp to the broadcast address.
        :return:
        """
        f_time = 1111
        config = {
            ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
            ssdp.SSDPBase.PORT: 4000,
        }
        socket_mock = MagicMock()
        clock_mock = MagicMock(return_value=f_time)
        with patch("salt.utils.ssdp.socket", socket_mock), patch(
            "salt.utils.ssdp.time.time", clock_mock
        ):
            clnt = ssdp.SSDPDiscoveryClient(**config)
            clnt._query()
            sendto = clnt._socket.sendto
            assert sendto.called
            message, target = sendto.call_args[0]
            expected_message = salt.utils.stringutils.to_bytes(
                "{}{}".format(config[ssdp.SSDPBase.SIGNATURE], f_time)
            )
            assert message == expected_message
            assert target[0] == "<broadcast>"
            assert target[1] == config[ssdp.SSDPBase.PORT]

    def test_get_masters_map(self):
        """
        Collected answers are grouped by the address they came from.
        :return:
        """
        expected = {
            "10.10.10.10": ["some", "data"],
            "20.20.20.20": ["data"],
        }
        response = {}
        with patch("salt.utils.ssdp.socket", MagicMock()):
            clnt = ssdp.SSDPDiscoveryClient()
            clnt._socket.recvfrom = SSDPClientTestCase.Resource().read
            clnt.log = MagicMock()
            clnt._collect_masters_map(response=response)
            for address in expected:
                assert address in response
            for address, payloads in expected.items():
                assert response[address] == payloads

    def test_get_masters_map_error_handling(self):
        """
        An exception while receiving is logged and nothing is collected.
        :return:
        """
        error_msg = "fake testing timeout just had happened"
        response = {}
        with patch("salt.utils.ssdp.socket", MagicMock()):
            clnt = ssdp.SSDPDiscoveryClient()
            clnt._socket.recvfrom = MagicMock(side_effect=Exception(error_msg))
            clnt.log = MagicMock()
            clnt._collect_masters_map(response=response)
            log_error = clnt.log.error
            assert log_error.called
            logged_args = log_error.call_args[0]
            assert "Discovery master collection failure" in logged_args[0]
            assert error_msg == str(logged_args[1])
            assert not response

    def test_discover_no_masters(self):
        """
        With nothing collected, discovery logs that no master was found.
        :return:
        """
        clnt = self.get_ssdp_discovery_client()
        # Neither querying nor collecting does anything here.
        clnt._query = MagicMock()
        clnt._collect_masters_map = MagicMock()
        clnt.log = MagicMock()
        clnt.discover()

        log_info = clnt.log.info
        assert log_info.called
        assert log_info.call_args[0][0] == "No master has been discovered."

    def test_discover_general_error(self):
        """
        An error answer from a publisher is logged exactly once.
        :return:
        """
        error = "Admins on strike due to broken coffee machine"
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        fake_resource = SSDPClientTestCase.Resource()
        fake_resource.pool = [
            ("{}:E:{}".format(signature, error), "10.10.10.10"),
            (None, None),
        ]

        with patch("salt.utils.ssdp.socket", MagicMock()):
            clnt = ssdp.SSDPDiscoveryClient()
            clnt._socket.recvfrom = fake_resource.read
            clnt._query = MagicMock()
            clnt.log = MagicMock()
            clnt.discover()

            log_error = clnt.log.error
            assert len(log_error.mock_calls) == 1
            logged_args = log_error.call_args[0]
            logged_kwargs = log_error.call_args[1]
            assert "Error response from the service publisher" in logged_args[0]
            assert "10.10.10.10" == logged_args[1]
            assert logged_kwargs == {}
            assert logged_args[2] == error

    def test_discover_timestamp_error(self):
        """
        An error answer mentioning the timestamp produces a second log entry
        about a shifted timestamp, for the same address.
        :return:
        """
        error = (
            "We only support a 1200 bps connection. Routing timestamp problems on"
            " neural net."
        )
        signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
        fake_resource = SSDPClientTestCase.Resource()
        fake_resource.pool = [
            ("{}:E:{}".format(signature, error), "10.10.10.10"),
            (None, None),
        ]

        with patch("salt.utils.ssdp.socket", MagicMock()):
            clnt = ssdp.SSDPDiscoveryClient()
            clnt._socket.recvfrom = fake_resource.read
            clnt._query = MagicMock()
            clnt.log = MagicMock()
            clnt.discover()

            logged = clnt.log.error.mock_calls
            assert len(logged) == 2
            # Each recorded call is a (name, args, kwargs) triple.
            first_args, first_kwargs = logged[0][1], logged[0][2]
            second_args = logged[1][1]
            assert "Error response from the service publisher" in first_args[0]
            assert first_args[2] == error
            assert first_kwargs == {}
            assert "Publisher sent shifted timestamp" in second_args[0]
            assert second_args[1] == first_args[1] == "10.10.10.10"
650