1#!/usr/bin/env python
2#
3#   BAREOS - Backup Archiving REcovery Open Sourced
4#
5#   Copyright (C) 2019-2020 Bareos GmbH & Co. KG
6#
7#   This program is Free Software; you can redistribute it and/or
8#   modify it under the terms of version three of the GNU Affero General Public
9#   License as published by the Free Software Foundation and included
10#   in the file LICENSE.
11#
12#   This program is distributed in the hope that it will be useful, but
13#   WITHOUT ANY WARRANTY; without even the implied warranty of
14#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15#   Affero General Public License for more details.
16#
17#   You should have received a copy of the GNU Affero General Public License
18#   along with this program; if not, write to the Free Software
19#   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20#   02110-1301, USA.
21
22# -*- coding: utf-8 -*-
23
24from __future__ import print_function
25import json
26import logging
27import os
28import re
29from time import sleep
30import unittest
31import warnings
32
33import bareos.bsock
34from bareos.bsock.constants import Constants
35from bareos.bsock.protocolmessages import ProtocolMessages
36from bareos.bsock.protocolversions import ProtocolVersions
37from bareos.bsock.lowlevel import LowLevel
38import bareos.exceptions
39
40
41class PythonBareosBase(unittest.TestCase):
42
43    director_name = u"bareos-dir"
44
45    director_address = "localhost"
46    director_port = 9101
47    director_root_password = "secret"
48    director_extra_options = {}
49    client = "bareos-fd"
50
51    filedaemon_address = "localhost"
52    filedaemon_port = 9102
53    filedaemon_director_password = u"secret"
54
55    # restorefile = '/usr/sbin/bconsole'
56    # path to store logging files
57    backup_directory = "tmp/data/"
58    debug = False
59    logpath = "{}/PythonBareosTest.log".format(os.getcwd())
60
61    def setUp(self):
62        # Configure the logger, for information about the timings set it to INFO
63        logging.basicConfig(
64            filename=self.logpath,
65            format="%(levelname)s %(module)s.%(funcName)s: %(message)s",
66            level=logging.INFO,
67        )
68        logger = logging.getLogger()
69        if self.debug:
70            logger.setLevel(logging.DEBUG)
71
72        # assertRegexpMatches has been renamed
73        # to assertRegex in Python 3.2
74        # and is deprecated now.
75        # This prevents a deprecation warning.
76        if hasattr(self, "assertRegexpMatches") and not hasattr(self, "assertRegex"):
77            self.assertRegex = self.assertRegexpMatches
78
79        logger.debug("setUp")
80
81    def tearDown(self):
82        logger = logging.getLogger()
83        logger.debug("tearDown\n\n\n")
84
85    def get_name_of_test(self):
86        return self.id().split(".", 1)[1]
87
88    def get_operator_username(self, tls=None):
89        if tls is None:
90            if bareos.bsock.DirectorConsole.is_tls_psk_available():
91                tls = True
92            else:
93                tls = False
94        if tls:
95            return u"admin-tls"
96        else:
97            return u"admin-notls"
98
99    def get_operator_password(self, username=None):
100        return bareos.bsock.Password(u"secret")
101
102
103class PythonBareosModuleTest(PythonBareosBase):
104    def versiontuple(self, versionstring):
105        version, separator, suffix = versionstring.partition("~")
106        return tuple(map(int, (version.split("."))))
107
108    def test_exception_connection_error(self):
109        """
110        Test if the ConnectionError exception deliveres the expected result.
111        """
112        logger = logging.getLogger()
113
114        message = "my test message"
115        try:
116            raise bareos.exceptions.ConnectionError("my test message")
117        except bareos.exceptions.ConnectionError as e:
118            logger.debug(
119                "signal: str={}, repr={}, args={}".format(str(e), repr(e), e.args)
120            )
121            self.assertEqual(message, str(e))
122
123    def test_exception_signal_received(self):
124        """
125        Test if the SignalReceivedException deliveres the expected result.
126        """
127        logger = logging.getLogger()
128
129        try:
130            raise bareos.exceptions.SignalReceivedException(Constants.BNET_TERMINATE)
131        except bareos.exceptions.SignalReceivedException as e:
132            logger.debug(
133                "signal: str={}, repr={}, args={}".format(str(e), repr(e), e.args)
134            )
135            self.assertIn("terminate", str(e))
136
137    def test_protocol_message(self):
138        logger = logging.getLogger()
139
140        name = u"Testname"
141        expected_regex_str = r"Hello {} calling version (.*)".format(name)
142        expected_regex = bytes(bytearray(expected_regex_str, "utf-8"))
143
144        hello_message = ProtocolMessages().hello(name)
145        logger.debug(hello_message)
146
147        self.assertRegex(hello_message, expected_regex)
148
149        version = re.search(expected_regex, hello_message).group(1).decode("utf-8")
150        logger.debug(u"version: {} ({})".format(version, type(version)))
151
152        self.assertGreaterEqual(
153            self.versiontuple(version), self.versiontuple(u"18.2.5")
154        )
155
156    def test_password(self):
157        password = bareos.bsock.Password("secret")
158        md5 = password.md5()
159        self.assertTrue(isinstance(md5, bytes))
160        self.assertEqual(md5, b"5ebe2294ecd0e0f08eab7690d2a6ee69")
161
162
163class PythonBareosPlainTest(PythonBareosBase):
164    def test_login_to_noexisting_host(self):
165        logger = logging.getLogger()
166
167        # try to connect to invalid host:port. Use port 9 (discard).
168        port = 9
169
170        bareos_password = bareos.bsock.Password(self.director_root_password)
171        with self.assertRaises(bareos.exceptions.ConnectionError):
172            director = bareos.bsock.DirectorConsole(
173                address=self.director_address,
174                port=port,
175                password=bareos_password,
176                **self.director_extra_options
177            )
178
179    def test_login_as_root(self):
180        logger = logging.getLogger()
181
182        bareos_password = bareos.bsock.Password(self.director_root_password)
183        director = bareos.bsock.DirectorConsole(
184            address=self.director_address,
185            port=self.director_port,
186            password=bareos_password,
187            **self.director_extra_options
188        )
189        whoami = director.call("whoami").decode("utf-8")
190        self.assertEqual("root", whoami.rstrip())
191
192    def test_login_as_user(self):
193        logger = logging.getLogger()
194
195        username = self.get_operator_username()
196        password = self.get_operator_password(username)
197
198        director = bareos.bsock.DirectorConsole(
199            address=self.director_address,
200            port=self.director_port,
201            name=username,
202            password=password,
203            **self.director_extra_options
204        )
205        whoami = director.call("whoami").decode("utf-8")
206        self.assertEqual(username, whoami.rstrip())
207
208    def test_login_with_not_existing_username(self):
209        """
210        Verify bareos.bsock.DirectorConsole raises an AuthenticationError exception.
211        """
212        logger = logging.getLogger()
213
214        username = "nonexistinguser"
215        password = "secret"
216
217        bareos_password = bareos.bsock.Password(password)
218        with self.assertRaises(bareos.exceptions.AuthenticationError):
219            with warnings.catch_warnings():
220                warnings.simplefilter("ignore")
221                director = bareos.bsock.DirectorConsole(
222                    address=self.director_address,
223                    port=self.director_port,
224                    name=username,
225                    password=bareos_password,
226                    **self.director_extra_options
227                )
228
229    def test_login_with_wrong_password(self):
230        """
231        Verify bareos.bsock.DirectorConsole raises an AuthenticationError exception.
232        """
233        logger = logging.getLogger()
234
235        username = self.get_operator_username()
236        password = "WRONGPASSWORD"
237
238        bareos_password = bareos.bsock.Password(password)
239        with self.assertRaises(bareos.exceptions.AuthenticationError):
240            with warnings.catch_warnings():
241                warnings.simplefilter("ignore")
242                director = bareos.bsock.DirectorConsole(
243                    address=self.director_address,
244                    port=self.director_port,
245                    name=username,
246                    password=bareos_password,
247                    **self.director_extra_options
248                )
249
250    def test_no_autodisplay_command(self):
251        """
252        The console has no access to the "autodisplay" command.
253        However, the initialization of DirectorConsole calls this command.
254        However, the error should not be visible to the console.
255        """
256        logger = logging.getLogger()
257
258        username = u"noautodisplaycommand"
259        password = u"secret"
260
261        bareos_password = bareos.bsock.Password(password)
262        director = bareos.bsock.DirectorConsole(
263            address=self.director_address,
264            port=self.director_port,
265            name=username,
266            password=bareos_password,
267            **self.director_extra_options
268        )
269
270        # get the list of all command
271        result = director.call(".help all")
272        # logger.debug(str(result))
273
274        # verify, the result contains command. We test for the list command.
275        self.assertIn(u"list", str(result))
276        # verify, the result does not contain the autodisplay command.
277        self.assertNotIn(u"autodisplay", str(result))
278
279        # check if the result of 'whoami' only contains the expected result.
280        result = director.call("whoami").decode("utf-8")
281        logger.debug(str(result))
282
283        self.assertEqual(username, result.rstrip())
284
285    def test_json_without_json_backend(self):
286        logger = logging.getLogger()
287
288        username = self.get_operator_username()
289        password = self.get_operator_password(username)
290
291        director = bareos.bsock.DirectorConsole(
292            address=self.director_address,
293            port=self.director_port,
294            name=username,
295            password=password,
296            **self.director_extra_options
297        )
298        result = director.call(".api json").decode("utf-8")
299        result = director.call("whoami").decode("utf-8")
300        logger.debug(str(result))
301        content = json.loads(str(result))
302        logger.debug(str(content))
303        self.assertEqual(content["result"]["whoami"].rstrip(), username)
304
305
306class PythonBareosProtocol124Test(PythonBareosBase):
307    """
308    There are 4 cases to check:
309    # console: notls, director: notls => login (notls)
310    # console: notls, director: tls   => login (notls!)
311    # console: tls, director: notls   => login (tls!)
312    # console: tls, director: tls     => login (tls)
313    """
314
315    def test_login_notls_notls(self):
316        """
317        console: notls, director: notls => login
318        """
319
320        logger = logging.getLogger()
321
322        username = self.get_operator_username(tls=False)
323        password = self.get_operator_password(username)
324
325        director = bareos.bsock.DirectorConsole(
326            address=self.director_address,
327            port=self.director_port,
328            protocolversion=ProtocolVersions.bareos_12_4,
329            tls_psk_enable=False,
330            name=username,
331            password=password,
332            **self.director_extra_options
333        )
334
335        whoami = director.call("whoami").decode("utf-8")
336        self.assertEqual(username, whoami.rstrip())
337
338        protocolversion = director.get_protocol_version()
339        self.assertEqual(protocolversion, ProtocolVersions.bareos_12_4)
340
341        # As there is no encryption,
342        # the socket should not contain a cipher() method.
343        self.assertFalse(hasattr(director.socket, "cipher"))
344
345    def test_login_notls_tls(self):
346        """
347        console: notls, director: tls => login
348
349        Login will succeed.
350        This happens only, because the old ProtocolVersions.bareos_12_4 is used.
351        When using the current protocol, this login will fail.
352        """
353
354        logger = logging.getLogger()
355
356        username = self.get_operator_username(tls=True)
357        password = self.get_operator_password(username)
358
359        director = bareos.bsock.DirectorConsole(
360            address=self.director_address,
361            port=self.director_port,
362            protocolversion=ProtocolVersions.bareos_12_4,
363            tls_psk_enable=False,
364            name=username,
365            password=password,
366            **self.director_extra_options
367        )
368        whoami = director.call("whoami").decode("utf-8")
369        self.assertEqual(username, whoami.rstrip())
370
371        protocolversion = director.get_protocol_version()
372        self.assertEqual(protocolversion, ProtocolVersions.bareos_12_4)
373
374        # As there is no encryption,
375        # the socket should not contain a cipher() method.
376        self.assertFalse(hasattr(director.socket, "cipher"))
377
378    @unittest.skipUnless(
379        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
380    )
381    def test_login_tls_notls(self):
382        """
383        console: tls, director: notls => login (TLS!)
384
385        The behavior of the Bareos Director is to use TLS-PSK
386        even if "TLS Enable = no" is set in the Console configuration of the Director.
387        """
388
389        logger = logging.getLogger()
390
391        username = self.get_operator_username(tls=False)
392        password = self.get_operator_password(username)
393
394        director = bareos.bsock.DirectorConsole(
395            address=self.director_address,
396            port=self.director_port,
397            protocolversion=ProtocolVersions.bareos_12_4,
398            tls_psk_enable=True,
399            name=username,
400            password=password,
401            **self.director_extra_options
402        )
403
404        whoami = director.call("whoami").decode("utf-8")
405        self.assertEqual(username, whoami.rstrip())
406
407        protocolversion = director.get_protocol_version()
408        self.assertEqual(protocolversion, ProtocolVersions.bareos_12_4)
409
410        self.assertTrue(hasattr(director.socket, "cipher"))
411        cipher = director.socket.cipher()
412        logger.debug(str(cipher))
413
414    @unittest.skipUnless(
415        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
416    )
417    def test_login_tls_tls(self):
418        """
419        console: tls, director: tls     => login
420        """
421
422        logger = logging.getLogger()
423
424        username = self.get_operator_username(tls=True)
425        password = self.get_operator_password(username)
426
427        director = bareos.bsock.DirectorConsole(
428            address=self.director_address,
429            port=self.director_port,
430            protocolversion=ProtocolVersions.bareos_12_4,
431            name=username,
432            password=password,
433            **self.director_extra_options
434        )
435
436        whoami = director.call("whoami").decode("utf-8")
437        self.assertEqual(username, whoami.rstrip())
438
439        protocolversion = director.get_protocol_version()
440        self.assertEqual(protocolversion, ProtocolVersions.bareos_12_4)
441
442        self.assertTrue(hasattr(director.socket, "cipher"))
443        cipher = director.socket.cipher()
444        logger.debug(str(cipher))
445
446
447class PythonBareosTlsPskTest(PythonBareosBase):
448    """
449    This test used the default protocol
450    (opposite to the PythonBareosProtocol124Test class,
451    which uses ProtocolVersions.bareos_12_4).
452    However, on authentication failures,
453    DirectorConsole falls back to ProtocolVersions.bareos_12_4.
454
455    There are 4 cases to check:
456    # console: notls, director: notls => login  (notls)
457    # console: notls, director: tls   => login! (notls)
458    # console: tls, director: notls   => login  (tls!)
459    # console: tls, director: tls     => login  (tls)
460    """
461
462    def test_tls_psk_identity(self):
463        """
464        Check if tls_psk_identity is in the expected format
465        and is of type "bytes".
466        """
467        core = LowLevel()
468        core.identity_prefix = "R_TEST"
469        core.name = "Test"
470        identity = core.get_tls_psk_identity()
471        self.assertTrue(isinstance(identity, bytes))
472        self.assertEqual(identity, b"R_TEST\x1eTest")
473
474    def test_login_notls_notls(self):
475        """
476        console: notls, director: notls => login
477        """
478
479        logger = logging.getLogger()
480
481        username = self.get_operator_username(tls=False)
482        password = self.get_operator_password(username)
483
484        director = bareos.bsock.DirectorConsole(
485            address=self.director_address,
486            port=self.director_port,
487            tls_psk_enable=False,
488            name=username,
489            password=password,
490            **self.director_extra_options
491        )
492
493        whoami = director.call("whoami").decode("utf-8")
494        self.assertEqual(username, whoami.rstrip())
495
496        # As there is no encryption,
497        # the socket should not contain a cipher() method.
498        self.assertFalse(hasattr(director.socket, "cipher"))
499
500    def test_login_notls_tls(self):
501        """
502        console: notls, director: tls => login
503
504        This works, because DirectorConsole falls back to the old protocol.
505        Set tls_psk_require=True, if this should not happen.
506        """
507
508        logger = logging.getLogger()
509
510        username = u"admin-tls"
511        password = self.get_operator_password(username)
512
513        director = bareos.bsock.DirectorConsole(
514            address=self.director_address,
515            port=self.director_port,
516            tls_psk_enable=False,
517            name=username,
518            password=password,
519            **self.director_extra_options
520        )
521
522        whoami = director.call("whoami").decode("utf-8")
523        self.assertEqual(username, whoami.rstrip())
524
525        # As there is no encryption,
526        # the socket should not contain a cipher() method.
527        self.assertFalse(hasattr(director.socket, "cipher"))
528
529    def test_login_notls_tls_fixedprotocolversion(self):
530        """
531        console: notls, director: tls => nologin
532
533        As the protocolversion is set to a fixed value,
534        there will be no fallback to the old protocol.
535        """
536
537        logger = logging.getLogger()
538
539        username = u"admin-tls"
540        password = self.get_operator_password(username)
541
542        with self.assertRaises(bareos.exceptions.AuthenticationError):
543            with warnings.catch_warnings():
544                warnings.simplefilter("ignore")
545                director = bareos.bsock.DirectorConsole(
546                    address=self.director_address,
547                    port=self.director_port,
548                    tls_psk_enable=False,
549                    protocolversion=ProtocolVersions.last,
550                    name=username,
551                    password=password,
552                    **self.director_extra_options
553                )
554
555    @unittest.skipUnless(
556        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
557    )
558    def test_login_tls_notls(self):
559        """
560        console: tls, director: notls => login (TLS!)
561
562        The behavior of the Bareos Director is to use TLS-PSK
563        even if "TLS Enable = no" is set in the Console configuration of the Director.
564        """
565
566        logger = logging.getLogger()
567
568        username = self.get_operator_username(tls=False)
569        password = self.get_operator_password(username)
570
571        director = bareos.bsock.DirectorConsole(
572            address=self.director_address,
573            port=self.director_port,
574            tls_psk_enable=True,
575            name=username,
576            password=password,
577            **self.director_extra_options
578        )
579
580        whoami = director.call("whoami").decode("utf-8")
581        self.assertEqual(username, whoami.rstrip())
582
583        self.assertTrue(hasattr(director.socket, "cipher"))
584        cipher = director.socket.cipher()
585        logger.debug(str(cipher))
586
587    @unittest.skipUnless(
588        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
589    )
590    def test_login_tls_tls(self):
591        """
592        console: tls, director: tls     => login
593        """
594
595        logger = logging.getLogger()
596
597        username = self.get_operator_username(tls=True)
598        password = self.get_operator_password(username)
599
600        director = bareos.bsock.DirectorConsole(
601            address=self.director_address,
602            port=self.director_port,
603            name=username,
604            password=password,
605            **self.director_extra_options
606        )
607
608        whoami = director.call("whoami").decode("utf-8")
609        self.assertEqual(username, whoami.rstrip())
610
611        self.assertTrue(hasattr(director.socket, "cipher"))
612        cipher = director.socket.cipher()
613        logger.debug(str(cipher))
614
615    @unittest.skipUnless(
616        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
617    )
618    def test_login_tls_tls_require(self):
619        """
620        console: tls, director: tls     => login
621        """
622
623        logger = logging.getLogger()
624
625        username = self.get_operator_username(tls=True)
626        password = self.get_operator_password(username)
627
628        director = bareos.bsock.DirectorConsole(
629            address=self.director_address,
630            port=self.director_port,
631            tls_psk_require=True,
632            name=username,
633            password=password,
634            **self.director_extra_options
635        )
636
637        whoami = director.call("whoami").decode("utf-8")
638        self.assertEqual(username, whoami.rstrip())
639
640        self.assertTrue(hasattr(director.socket, "cipher"))
641        cipher = director.socket.cipher()
642        logger.debug(str(cipher))
643
644    @unittest.skipUnless(
645        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
646    )
647    def test_login_tls_tls_fixedprotocolversion(self):
648        """
649        console: tls, director: tls     => login
650        """
651
652        logger = logging.getLogger()
653
654        username = self.get_operator_username(tls=True)
655        password = self.get_operator_password(username)
656
657        director = bareos.bsock.DirectorConsole(
658            address=self.director_address,
659            port=self.director_port,
660            protocolversion=ProtocolVersions.last,
661            tls_psk_require=True,
662            name=username,
663            password=password,
664            **self.director_extra_options
665        )
666
667        whoami = director.call("whoami").decode("utf-8")
668        self.assertEqual(username, whoami.rstrip())
669
670        self.assertTrue(hasattr(director.socket, "cipher"))
671        cipher = director.socket.cipher()
672        logger.debug(str(cipher))
673
674
675#
676# Test with JSON backend
677#
678
679
680class PythonBareosJsonBase(PythonBareosBase):
681
682    #
683    # Util
684    #
685
686    @staticmethod
687    def check_resource(director, resourcesname, name):
688        logger = logging.getLogger()
689        rc = False
690        try:
691            result = director.call(u".{}".format(resourcesname))
692            for i in result[resourcesname]:
693                if i["name"] == name:
694                    rc = True
695        except Exception as e:
696            logger.warn(str(e))
697        return rc
698
699    @staticmethod
700    def check_console(director, name):
701        return PythonBareosJsonBase.check_resource(director, "consoles", name)
702
703    @staticmethod
704    def check_jobname(director, name):
705        return PythonBareosJsonBase.check_resource(director, "jobs", name)
706
707    def configure_add(self, director, resourcesname, resourcename, cmd):
708        if not self.check_resource(director, resourcesname, resourcename):
709            result = director.call("configure add {}".format(cmd))
710            self.assertEqual(result["configure"]["add"]["name"], resourcename)
711            self.assertTrue(
712                self.check_resource(director, resourcesname, resourcename),
713                u"Failed to find resource {} in {}.".format(
714                    resourcename, resourcesname
715                ),
716            )
717
718    def run_job(self, director, jobname, level=None, wait=False):
719        logger = logging.getLogger()
720        run_parameter = ["job={}".format(jobname), "yes"]
721        if level:
722            run_parameter.append(u"level={}".format(level))
723        result = director.call("run {}".format(u" ".join(run_parameter)))
724        jobId = result["run"]["jobid"]
725        if wait:
726            result = director.call("wait jobid={}".format(jobId))
727            # "result": {
728            #    "job": {
729            #    "jobid": 1,
730            #    "jobstatuslong": "OK",
731            #    "jobstatus": "T",
732            #    "exitstatus": 0
733            #    }
734            # }
735            self.assertEqual(result["job"]["jobstatuslong"], u"OK")
736
737        return jobId
738
739    def _test_job_result(self, jobs, jobid):
740        logger = logging.getLogger()
741        for job in jobs:
742            if job["jobid"] == jobid:
743                files = int(job["jobfiles"])
744                logger.debug(u"Job {} contains {} files.".format(jobid, files))
745                self.assertTrue(files >= 1, u"Job {} contains no files.".format(jobid))
746                return True
747        self.fail("Failed to find job {}".format(jobid))
748        # add return to prevent pylint warning
749        return False
750
751    def _test_no_volume_in_pool(self, console, password, pool):
752        logger = logging.getLogger()
753        bareos_password = bareos.bsock.Password(password)
754        console_poolbotfull = bareos.bsock.DirectorConsoleJson(
755            address=self.director_address,
756            port=self.director_port,
757            name=console,
758            password=bareos_password,
759            **self.director_extra_options
760        )
761
762        result = console_poolbotfull.call("llist media all")
763        logger.debug(str(result))
764
765        self.assertGreaterEqual(len(result["volumes"]), 1)
766
767        for volume in result["volumes"]:
768            self.assertNotEqual(volume["pool"], pool)
769
770        return True
771
772    def _test_list_with_valid_jobid(self, director, jobid):
773        for cmd in ["list", "llist"]:
774            result = director.call("{} jobs".format(cmd))
775            self._test_job_result(result["jobs"], jobid)
776
777            listcmd = "{} jobid={}".format(cmd, jobid)
778            result = director.call(listcmd)
779            self._test_job_result(result["jobs"], jobid)
780
781    def _test_list_with_invalid_jobid(self, director, jobid):
782        for cmd in ["list", "llist"]:
783            result = director.call("{} jobs".format(cmd))
784            with self.assertRaises(AssertionError):
785                self._test_job_result(result["jobs"], jobid)
786
787            listcmd = "{} jobid={}".format(cmd, jobid)
788            result = director.call(listcmd)
789            self.assertEqual(
790                len(result),
791                0,
792                u"Command {} should not return results. Current result: {} visible".format(
793                    listcmd, str(result)
794                ),
795            )
796
797    def search_joblog(self, director, jobId, patterns):
798
799        if isinstance(patterns, list):
800            pattern_dict = {i: False for i in patterns}
801        else:
802            pattern_dict = {patterns: False}
803
804        result = director.call("list joblog jobid={}".format(jobId))
805        joblog_list = result["joblog"]
806
807        found = False
808        for pattern in pattern_dict:
809            for logentry in joblog_list:
810                if re.search(pattern, logentry["logtext"]):
811                    pattern_dict[pattern] = True
812            self.assertTrue(
813                pattern_dict[pattern],
814                'Failed to find pattern "{}" in Job Log of Job {}.'.format(
815                    pattern, jobId
816                ),
817            )
818
819    def run_job_and_search_joblog(self, director, jobname, level, patterns):
820
821        jobId = self.run_job(director, jobname, level, wait=True)
822        self.search_joblog(director, jobId, patterns)
823        return jobId
824
825    def list_jobid(self, director, jobid):
826        result = director.call("llist jobid={}".format(jobid))
827        try:
828            return result["jobs"][0]
829        except KeyError:
830            raise ValueError(u"jobid {} does not exist".format(jobid))
831
832
833class PythonBareosJsonBackendTest(PythonBareosJsonBase):
834    def test_json_backend(self):
835        logger = logging.getLogger()
836
837        username = self.get_operator_username()
838        password = self.get_operator_password(username)
839        client = self.client
840
841        director = bareos.bsock.DirectorConsoleJson(
842            address=self.director_address,
843            port=self.director_port,
844            name=username,
845            password=password,
846            **self.director_extra_options
847        )
848        result = director.call("list clients")
849        logger.debug(str(result))
850        # test if self.client is in the result of "list clients"
851        for i in result["clients"]:
852            logger.debug(str(i))
853            if i["name"] == client:
854                return
855        self.fail('Failed to retrieve client {} from "list clients"'.format(client))
856
857    def test_json_with_invalid_command(self):
858        logger = logging.getLogger()
859
860        username = self.get_operator_username()
861        password = self.get_operator_password(username)
862
863        director = bareos.bsock.DirectorConsoleJson(
864            address=self.director_address,
865            port=self.director_port,
866            name=username,
867            password=password,
868            **self.director_extra_options
869        )
870
871        with self.assertRaises(bareos.exceptions.JsonRpcErrorReceivedException):
872            result = director.call("invalidcommand")
873
874    def test_json_whoami(self):
875        logger = logging.getLogger()
876
877        username = self.get_operator_username()
878        password = self.get_operator_password(username)
879
880        director = bareos.bsock.DirectorConsoleJson(
881            address=self.director_address,
882            port=self.director_port,
883            name=username,
884            password=password,
885            **self.director_extra_options
886        )
887        result = director.call("whoami")
888        logger.debug(str(result))
889        self.assertEqual(username, result["whoami"])
890
891    @unittest.skip("Most commands do return valid JSON")
892    def test_json_backend_with_invalid_json_output(self):
893        logger = logging.getLogger()
894
895        # This command sends additional plain (none JSON) output.
896        # Therefore the result is not valid JSON.
897        # Used "show clients" earlier,
898        # however, this now produces valid output.
899        # Commands like 'status storage' (and 'status client') only produces empty output.
900        # The "messages" command shows plain outout in JSON mode,
901        # but only if there are pending messages.
902        bcmd = "show clients"
903
904        username = self.get_operator_username()
905        password = self.get_operator_password(username)
906
907        director_plain = bareos.bsock.DirectorConsole(
908            address=self.director_address,
909            port=self.director_port,
910            name=username,
911            password=password,
912            **self.director_extra_options
913        )
914
915        result = director_plain.call(bcmd)
916        logger.debug(str(result))
917
918        director_json = bareos.bsock.DirectorConsoleJson(
919            address=self.director_address,
920            port=self.director_port,
921            name=username,
922            password=password,
923            **self.director_extra_options
924        )
925
926        # The JsonRpcInvalidJsonReceivedException
927        # is inherited from JsonRpcErrorReceivedException,
928        # so both exceptions could by tried.
929        with self.assertRaises(bareos.exceptions.JsonRpcInvalidJsonReceivedException):
930            result = director_json.call(bcmd)
931
932        with self.assertRaises(bareos.exceptions.JsonRpcErrorReceivedException):
933            result = director_json.call(bcmd)
934
935    def test_json_no_api_command(self):
936        """
937        The bareos.bsock.DirectorConsoleJson calls .api on initialization.
938        This test verifies, that a exception is raised,
939        when it is not available.
940        """
941        logger = logging.getLogger()
942
943        username = "noapicommand"
944        password = "secret"
945
946        bareos_password = bareos.bsock.Password(password)
947        with self.assertRaises(bareos.exceptions.JsonRpcInvalidJsonReceivedException):
948            # We expect, that an exception is raised,
949            # as con class initialization,
950            # the ".api json" command is called
951            # and the "noapicommand" don't have access to this command.
952            director = bareos.bsock.DirectorConsoleJson(
953                address=self.director_address,
954                port=self.director_port,
955                name=username,
956                password=bareos_password,
957                **self.director_extra_options
958            )
959
960
961class PythonBareosAclTest(PythonBareosJsonBase):
962    def test_restore_with_client_acl(self):
963        """
964        Check if the restore command honors the client ACL.
965        Login as console with access only to client = bareos-fd.
966        Verify, that a restore can only be performed from this client.
967
968        It checks the intercative restore command,
969        therefore it can not use the Json console.
970        """
971        logger = logging.getLogger()
972
973        username = self.get_operator_username()
974        password = self.get_operator_password(username)
975
976        console_bareos_fd_username = u"client-bareos-fd"
977        console_bareos_fd_password = u"secret"
978
979        director_root = bareos.bsock.DirectorConsoleJson(
980            address=self.director_address,
981            port=self.director_port,
982            name=username,
983            password=password,
984            **self.director_extra_options
985        )
986
987        result = director_root.call("run job=backup-bareos-fd level=Full yes")
988        logger.debug(str(result))
989
990        jobIdBareosFdFull = result["run"]["jobid"]
991
992        result = director_root.call("wait jobid={}".format(jobIdBareosFdFull))
993
994        result = director_root.call("run job=backup-test2-fd level=Full yes")
995        logger.debug(str(result))
996
997        jobIdTestFdFull = result["run"]["jobid"]
998
999        result = director_root.call("wait jobid={}".format(jobIdTestFdFull))
1000
1001        #
1002        # login as console with ACLs
1003        #
1004        bareos_password = bareos.bsock.Password(console_bareos_fd_password)
1005        console_bareos_fd = bareos.bsock.DirectorConsole(
1006            address=self.director_address,
1007            port=self.director_port,
1008            name=console_bareos_fd_username,
1009            password=bareos_password,
1010            **self.director_extra_options
1011        )
1012
1013        result = console_bareos_fd.call("restore")
1014        logger.debug(str(result))
1015
1016        #
1017        # restore: 1: List last 20 Jobs run
1018        #
1019        # This requires access to the "sqlquery" command,
1020        # which this console does not have.
1021        #
1022        result = console_bareos_fd.call("1")
1023        logger.debug(str(result))
1024        self.assertEqual(b"SQL query not authorized.", result.strip())
1025
1026        result = console_bareos_fd.call("restore")
1027
1028        #
1029        # restore: 2: List Jobs where a given File is saved
1030        #
1031        # Only the bareos-fd client should be accessable
1032        # and is therefore autoselected.
1033        #
1034        result = console_bareos_fd.call("2")
1035        logger.debug(str(result))
1036        self.assertIn(b"Automatically selected Client: bareos-fd", result)
1037        result = console_bareos_fd.call("Makefile")
1038        logger.debug(str(result))
1039        self.assertIn(b"/Makefile ", result)
1040        # result = console_bareos_fd.call('.')
1041
1042        #
1043        # restore: 3: Enter list of comma separated JobIds to select
1044        #
1045        # We select a job that did run on a client
1046        # we don't have access to.
1047        # Therefore we expect that we are not allowed to access this jobid.
1048        # However, we got access.
1049        # TODO: This has to be fixed in the Bareos Director.
1050        #
1051        result = console_bareos_fd.call("3")
1052        logger.debug(str(result))
1053        result = console_bareos_fd.call(jobIdTestFdFull)
1054        logger.debug(str(result))
1055        # TODO: This is a bug.
1056        #       ACL checking does not work here,
1057        #       because this jobid should be accessable in this console.
1058        # self.assertIn(b'No Job found for JobId', result)
1059        result = console_bareos_fd.call("find *")
1060        logger.debug(str(result))
1061        self.assertIn(b"/Makefile", result)
1062        result = console_bareos_fd.call("done")
1063
1064        result = console_bareos_fd.call("restore")
1065
1066        #
1067        # 4: Enter SQL list command
1068        #
1069        # This requires access to the "sqlquery" command,
1070        # which this console does not have.
1071        #
1072        result = console_bareos_fd.call("4")
1073        logger.debug(str(result))
1074        self.assertEqual(b"SQL query not authorized.", result.strip())
1075
1076        result = console_bareos_fd.call("restore")
1077
1078        #
1079        # 5: Select the most recent backup for a client
1080        #
1081        # Only the bareos-fd client should be accessable
1082        # and is therefore autoselected.
1083        #
1084        result = console_bareos_fd.call("5")
1085        logger.debug(str(result))
1086        self.assertIn(b"Automatically selected Client: bareos-fd", result)
1087        result = console_bareos_fd.call("done")
1088        logger.debug(str(result))
1089        # result = console_bareos_fd.call('.')
1090
1091        # The remaining options are not tested,
1092        # as we assume that we have covered the relevant cases.
1093
1094
1095class PythonBareosJsonAclTest(PythonBareosJsonBase):
1096    def test_json_list_media_with_pool_acl(self):
1097        """
1098        This tests checks if the Pool ACL works with the "llist media all" command.
1099
1100        login as admin
1101          run a Full job. It gets stored in pool Full
1102          modify the backup directory
1103          run a Incremental job. It gets stored in pool Incremental
1104          verifies that at least 1 volume exists in the Full pool.
1105          verifies that at least 1 volume exists in the Incremental pool.
1106        login as console 'poolfull'
1107          verifies that "llist media all" only shows volumes from the Full pool.
1108        login as console 'poolnotfull'
1109          verifies that "llist media all" only shows volumes not from the Full pool.
1110        """
1111        logger = logging.getLogger()
1112
1113        username = self.get_operator_username()
1114        password = self.get_operator_password(username)
1115
1116        console_password = u"secret"
1117
1118        director_root = bareos.bsock.DirectorConsoleJson(
1119            address=self.director_address,
1120            port=self.director_port,
1121            name=username,
1122            password=password,
1123            **self.director_extra_options
1124        )
1125
1126        # result = director_root.call('run job=backup-bareos-fd level=Full yes')
1127        # logger.debug(str(result))
1128
1129        # jobIdFull = result['run']['jobid']
1130
1131        # wait for job to finish, otherwise next incremental is upgraded to Full.
1132        # result = director_root.call('wait jobid={}'.format(jobIdFull))
1133
1134        jobIdFull = self.run_job(director_root, "backup-bareos-fd", "Full", wait=True)
1135
1136        # make sure, timestamp differs
1137        sleep(2)
1138
1139        with open(u"{}/extrafile.txt".format(self.backup_directory), "a") as writer:
1140            writer.write("Test\n")
1141            writer.flush()
1142
1143        sleep(2)
1144
1145        # result = director_root.call('run job=backup-bareos-fd level=Incremental yes')
1146        # logger.debug(str(result))
1147
1148        # jobIdIncr = result['run']['jobid']
1149
1150        # result = director_root.call('wait jobid={}'.format(jobIdIncr))
1151
1152        jobIdIncr = self.run_job(
1153            director_root, "backup-bareos-fd", "Incremental", wait=True
1154        )
1155
1156        result = director_root.call("list jobs")
1157        logger.debug(str(result))
1158
1159        self._test_job_result(result["jobs"], jobIdFull)
1160        self._test_job_result(result["jobs"], jobIdIncr)
1161
1162        result = director_root.call("list volume pool=Full count")
1163        self.assertTrue(
1164            int(result["volumes"][0]["count"]) >= 1, u"Full pool contains no volumes."
1165        )
1166
1167        result = director_root.call("list volume pool=Incremental count")
1168        self.assertTrue(
1169            int(result["volumes"][0]["count"]) >= 1,
1170            u"Incremental pool contains no volumes.",
1171        )
1172
1173        # without Pool ACL restrictions,
1174        # 'list media all' returns all volumes from all pools.
1175        result = director_root.call("list media all")
1176        logger.debug(str(result))
1177        self.assertGreaterEqual(len(result["volumes"]), 2)
1178
1179        #
1180        # login as console 'poolfull'
1181        #
1182        bareos_password = bareos.bsock.Password(console_password)
1183        console_poolfull = bareos.bsock.DirectorConsoleJson(
1184            address=self.director_address,
1185            port=self.director_port,
1186            name="poolfull",
1187            password=bareos_password,
1188            **self.director_extra_options
1189        )
1190
1191        # 'list media all' returns an error,
1192        # as the current user has limited Pool ACL permissions.
1193        # This behavior describes the current behavior.
1194        # Improvements on the server side welcome.
1195        with self.assertRaises(bareos.exceptions.JsonRpcErrorReceivedException):
1196            result = console_poolfull.call("list media all")
1197
1198        result = console_poolfull.call("llist media all")
1199        logger.debug(str(result))
1200
1201        self.assertGreaterEqual(len(result["volumes"]), 1)
1202
1203        for volume in result["volumes"]:
1204            self.assertEqual(volume["pool"], "Full")
1205
1206        #
1207        # login as console 'poolnotfull'
1208        #
1209        self._test_no_volume_in_pool("poolnotfull", console_password, "Full")
1210
1211        #
1212        # use profile without Pool restrictions
1213        # and overwrite the Pool ACL in the console.
1214        #
1215        console_overwrite = "overwritepoolacl"
1216        self.configure_add(
1217            director_root,
1218            "consoles",
1219            console_overwrite,
1220            u"console={} password={} profile=operator poolacl=!Full tlsenable=no".format(
1221                console_overwrite, console_password
1222            ),
1223        )
1224
1225        # TODO:
1226        # IMHO this is a bug.
1227        # This console should not see volumes in the Full pool.
1228        # It needs to be fixed in the server side code.
1229        with self.assertRaises(AssertionError):
1230            self._test_no_volume_in_pool(console_overwrite, console_password, "Full")
1231
1232    def test_json_list_jobid_with_job_acl(self):
1233        """
1234        This tests checks if the Job ACL works with the "list jobs" and "list jobid=<>" commands.
1235
1236        login as operator
1237          run a backup-bareos-fd job.
1238          create and run a backup-bareos-fd-test job.
1239          verifies that both jobs are visible by the list command.
1240        login as a console that can only see backup-bareos-fd jobs
1241          verifies that the backup-bareos-fd is visible.
1242          verifies that the backup-bareos-fd is not visible.
1243        """
1244        logger = logging.getLogger()
1245
1246        username = self.get_operator_username()
1247        password = self.get_operator_password(username)
1248
1249        console_username = u"job-backup-bareos-fd"
1250        console_password = u"secret"
1251        jobname1 = u"backup-bareos-fd"
1252        jobname2 = u"backup-bareos-fd-test"
1253
1254        director_root = bareos.bsock.DirectorConsoleJson(
1255            address=self.director_address,
1256            port=self.director_port,
1257            name=username,
1258            password=password,
1259            **self.director_extra_options
1260        )
1261
1262        jobid1 = self.run_job(
1263            director=director_root, jobname=jobname1, level=u"Full", wait=True
1264        )
1265
1266        self.configure_add(
1267            director_root,
1268            "jobs",
1269            jobname2,
1270            "job name={} client=bareos-fd jobdefs=DefaultJob".format(jobname2),
1271        )
1272
1273        jobid2 = self.run_job(
1274            director=director_root, jobname=jobname2, level=u"Full", wait=True
1275        )
1276
1277        #
1278        # both jobid should be visible
1279        #
1280        self._test_list_with_valid_jobid(director_root, jobid1)
1281        self._test_list_with_valid_jobid(director_root, jobid2)
1282
1283        #
1284        # login as console_username
1285        #
1286        bareos_password = bareos.bsock.Password(console_password)
1287        director = bareos.bsock.DirectorConsoleJson(
1288            address=self.director_address,
1289            port=self.director_port,
1290            name=console_username,
1291            password=bareos_password,
1292            **self.director_extra_options
1293        )
1294
1295        #
1296        # only jobid1 should be visible
1297        #
1298        self._test_list_with_valid_jobid(director, jobid1)
1299        self._test_list_with_invalid_jobid(director, jobid2)
1300
1301
1302class PythonBareosJsonRunScriptTest(PythonBareosJsonBase):
1303    def test_backup_runscript_client_defaults(self):
1304        """
1305        Run a job which contains a runscript.
1306        Check the JobLog if the runscript worked as expected.
1307        """
1308        logger = logging.getLogger()
1309
1310        username = self.get_operator_username()
1311        password = self.get_operator_password(username)
1312
1313        jobname = "backup-bareos-fd-runscript-client-defaults"
1314        level = None
1315        expected_log = ": ClientBeforeJob: jobname={}".format(jobname)
1316
1317        director_root = bareos.bsock.DirectorConsoleJson(
1318            address=self.director_address,
1319            port=self.director_port,
1320            name=username,
1321            password=password,
1322            **self.director_extra_options
1323        )
1324
1325        # Example log entry:
1326        # {
1327        #    "time": "2019-12-02 00:07:34",
1328        #    "logtext": "bareos-dir JobId 76: BeforeJob: jobname=admin-runscript-server\n"
1329        # },
1330
1331        jobId = self.run_job_and_search_joblog(
1332            director_root, jobname, level, expected_log
1333        )
1334
1335    def test_backup_runscript_client(self):
1336        """
1337        Run a job which contains a runscript.
1338        Check the JobLog if the runscript worked as expected.
1339        """
1340        logger = logging.getLogger()
1341
1342        username = self.get_operator_username()
1343        password = self.get_operator_password(username)
1344
1345        jobname = "backup-bareos-fd-runscript-client"
1346        level = None
1347        expected_log = ": ClientBeforeJob: jobname={}".format(jobname)
1348
1349        director_root = bareos.bsock.DirectorConsoleJson(
1350            address=self.director_address,
1351            port=self.director_port,
1352            name=username,
1353            password=password,
1354            **self.director_extra_options
1355        )
1356
1357        # Example log entry:
1358        # {
1359        #    "time": "2019-12-02 00:07:34",
1360        #    "logtext": "bareos-dir JobId 76: ClientBeforeJob: jobname=admin-runscript-server\n"
1361        # },
1362
1363        jobId = self.run_job_and_search_joblog(
1364            director_root, jobname, level, expected_log
1365        )
1366
1367    def test_backup_runscript_server(self):
1368        """
1369        Run a job which contains a runscript.
1370        Check the JobLog if the runscript worked as expected.
1371        """
1372        logger = logging.getLogger()
1373
1374        username = self.get_operator_username()
1375        password = self.get_operator_password(username)
1376
1377        jobname = "backup-bareos-fd-runscript-server"
1378        level = None
1379        expected_logs = [
1380            ": BeforeJob: jobname={}".format(jobname),
1381            ": BeforeJob: daemon=bareos-dir",
1382        ]
1383
1384        director_root = bareos.bsock.DirectorConsoleJson(
1385            address=self.director_address,
1386            port=self.director_port,
1387            name=username,
1388            password=password,
1389            **self.director_extra_options
1390        )
1391
1392        # Example log entry:
1393        # {
1394        #    "time": "2019-12-02 00:07:34",
1395        #    "logtext": "bareos-dir JobId 76: BeforeJob: jobname=admin-runscript-server\n"
1396        # },
1397
1398        jobId = self.run_job_and_search_joblog(
1399            director_root, jobname, level, expected_logs
1400        )
1401
1402    def test_admin_runscript_server(self):
1403        """
1404        Run a job which contains a runscript.
1405        Check the JobLog if the runscript worked as expected.
1406        """
1407        logger = logging.getLogger()
1408
1409        username = self.get_operator_username()
1410        password = self.get_operator_password(username)
1411
1412        jobname = "admin-runscript-server"
1413        level = None
1414        expected_logs = [
1415            ": BeforeJob: jobname={}".format(jobname),
1416            ": BeforeJob: daemon=bareos-dir",
1417            ": BeforeJob: jobtype=Admin",
1418        ]
1419
1420        director_root = bareos.bsock.DirectorConsoleJson(
1421            address=self.director_address,
1422            port=self.director_port,
1423            name=username,
1424            password=password,
1425            **self.director_extra_options
1426        )
1427
1428        # Example log entry:
1429        # {
1430        #    "time": "2019-12-02 00:07:34",
1431        #    "logtext": "bareos-dir JobId 76: BeforeJob: jobname=admin-runscript-server\n"
1432        # },
1433
1434        jobId = self.run_job_and_search_joblog(
1435            director_root, jobname, level, expected_logs
1436        )
1437
1438    def test_admin_runscript_client(self):
1439        """
1440        RunScripts configured with "RunsOnClient = yes" (default)
1441        are not executed in Admin Jobs.
1442        Instead, a warning is written to the joblog.
1443        """
1444        logger = logging.getLogger()
1445
1446        username = self.get_operator_username()
1447        password = self.get_operator_password(username)
1448
1449        jobname = "admin-runscript-client"
1450        level = None
1451        expected_logs = [": Invalid runscript definition"]
1452
1453        director_root = bareos.bsock.DirectorConsoleJson(
1454            address=self.director_address,
1455            port=self.director_port,
1456            name=username,
1457            password=password,
1458            **self.director_extra_options
1459        )
1460
1461        # Example log entry:
1462        # {
1463        # "time": "2019-12-12 13:23:16",
1464        # "logtext": "bareos-dir JobId 7: Warning: Invalid runscript definition (command=...). Admin Jobs only support local runscripts.\n"
1465        # },
1466
1467        jobId = self.run_job_and_search_joblog(
1468            director_root, jobname, level, expected_logs
1469        )
1470
1471
1472class PythonBareosJsonConfigTest(PythonBareosJsonBase):
1473    def test_show_command(self):
1474        """
1475        Verify, that the "show" command delivers valid JSON.
1476        If the JSON is not valid, the "call" command would raise an exception.
1477        """
1478        logger = logging.getLogger()
1479
1480        username = self.get_operator_username()
1481        password = self.get_operator_password(username)
1482
1483        director = bareos.bsock.DirectorConsoleJson(
1484            address=self.director_address,
1485            port=self.director_port,
1486            name=username,
1487            password=password,
1488            **self.director_extra_options
1489        )
1490
1491        resourcesname = "clients"
1492        newclient = "test-client-fd"
1493        newpassword = "secret"
1494
1495        director.call("show all")
1496
1497        try:
1498            os.remove("etc/bareos/bareos-dir.d/client/{}.conf".format(newclient))
1499            director.call("reload")
1500        except OSError:
1501            pass
1502
1503        self.assertFalse(
1504            self.check_resource(director, resourcesname, newclient),
1505            u"Resource {} in {} already exists.".format(newclient, resourcesname),
1506        )
1507
1508        with self.assertRaises(bareos.exceptions.JsonRpcErrorReceivedException):
1509            director.call("show client={}".format(newclient))
1510
1511        self.configure_add(
1512            director,
1513            resourcesname,
1514            newclient,
1515            u"client={} password={} address=127.0.0.1".format(newclient, newpassword),
1516        )
1517
1518        director.call("show all")
1519        director.call("show all verbose")
1520        result = director.call("show client={}".format(newclient))
1521        logger.debug(str(result))
1522        director.call("show client={} verbose".format(newclient))
1523
1524
1525class PythonBareosDeleteTest(PythonBareosJsonBase):
1526    def test_delete_jobid(self):
1527        """"""
1528        logger = logging.getLogger()
1529
1530        username = self.get_operator_username()
1531        password = self.get_operator_password(username)
1532
1533        jobname = u"backup-bareos-fd"
1534
1535        director = bareos.bsock.DirectorConsoleJson(
1536            address=self.director_address,
1537            port=self.director_port,
1538            name=username,
1539            password=password,
1540            **self.director_extra_options
1541        )
1542
1543        jobid = self.run_job(director, jobname, wait=True)
1544        job = self.list_jobid(director, jobid)
1545        result = director.call("delete jobid={}".format(jobid))
1546        logger.debug(str(result))
1547        self.assertIn(jobid, result["deleted"]["jobids"])
1548        with self.assertRaises(ValueError):
1549            job = self.list_jobid(director, jobid)
1550
1551    def test_delete_jobids(self):
1552        """
1553        """
1554        logger = logging.getLogger()
1555
1556        username = self.get_operator_username()
1557        password = self.get_operator_password(username)
1558
1559        director = bareos.bsock.DirectorConsoleJson(
1560            address=self.director_address,
1561            port=self.director_port,
1562            name=username,
1563            password=password,
1564            **self.director_extra_options
1565        )
1566
1567        # Note:
1568        # If delete is called on a non existing jobid,
1569        # this jobid will neithertheless be returned as deleted.
1570        jobids = ["1001", "1002", "1003"]
1571        result = director.call("delete jobid={}".format(",".join(jobids)))
1572        logger.debug(str(result))
1573        for jobid in jobids:
1574            self.assertIn(jobid, result["deleted"]["jobids"])
1575            with self.assertRaises(ValueError):
1576                job = self.list_jobid(director, jobid)
1577
1578    def test_delete_jobid_paramter(self):
1579        """
1580        """
1581        logger = logging.getLogger()
1582
1583        username = self.get_operator_username()
1584        password = self.get_operator_password(username)
1585
1586        director = bareos.bsock.DirectorConsoleJson(
1587            address=self.director_address,
1588            port=self.director_port,
1589            name=username,
1590            password=password,
1591            **self.director_extra_options
1592        )
1593
1594        # Note:
1595        # If delete is called on a non existing jobid,
1596        # this jobid will neithertheless be returned as deleted.
1597        jobids = "jobid=1001,1002,1101-1110,1201 jobid=2001,2002,2101-2110,2201"
1598        #                 1 +  1 +      10  + 1       +  1 +  1 +      10 +  1
1599        number = 26
1600        result = director.call("delete {}".format(jobids))
1601        logger.debug(str(result))
1602        deleted_jobids = result["deleted"]["jobids"]
1603        self.assertEqual(
1604            len(deleted_jobids),
1605            number,
1606            "Failed: expecting {} jobids, found {} ({})".format(
1607                number, len(deleted_jobids), deleted_jobids
1608            ),
1609        )
1610
1611    def test_delete_invalid_jobids(self):
1612        """"""
1613        logger = logging.getLogger()
1614
1615        username = self.get_operator_username()
1616        password = self.get_operator_password(username)
1617
1618        director = bareos.bsock.DirectorConsoleJson(
1619            address=self.director_address,
1620            port=self.director_port,
1621            name=username,
1622            password=password,
1623            **self.director_extra_options
1624        )
1625
1626        with self.assertRaises(bareos.exceptions.JsonRpcErrorReceivedException):
1627            result = director.call("delete jobid={}".format("1000_INVALID"))
1628
1629    def test_delete_volume(self):
1630        """"""
1631        logger = logging.getLogger()
1632
1633        username = self.get_operator_username()
1634        password = self.get_operator_password(username)
1635
1636        jobname = u"backup-bareos-fd"
1637
1638        director = bareos.bsock.DirectorConsoleJson(
1639            address=self.director_address,
1640            port=self.director_port,
1641            name=username,
1642            password=password,
1643            **self.director_extra_options
1644        )
1645
1646        jobid = self.run_job(director, jobname, level="Full", wait=True)
1647        job = self.list_jobid(director, jobid)
1648        result = director.call("list volume jobid={}".format(jobid))
1649        volume = result["volumes"][0]["volumename"]
1650        result = director.call("delete volume={} yes".format(volume))
1651        logger.debug(str(result))
1652        self.assertIn(jobid, result["deleted"]["jobids"])
1653        self.assertIn(volume, result["deleted"]["volumes"])
1654        with self.assertRaises(ValueError):
1655            job = self.list_jobid(director, jobid)
1656
1657
1658class PythonBareosFiledaemonTest(PythonBareosBase):
1659    @unittest.skipUnless(
1660        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
1661    )
1662    def test_status(self):
1663        logger = logging.getLogger()
1664
1665        name = self.director_name
1666        password = self.filedaemon_director_password
1667
1668        bareos_password = bareos.bsock.Password(password)
1669        fd = bareos.bsock.FileDaemon(
1670            address=self.filedaemon_address,
1671            port=self.filedaemon_port,
1672            name=name,
1673            password=bareos_password,
1674            **self.director_extra_options
1675        )
1676
1677        result = fd.call(u"status")
1678
1679        expected_regex_str = r"{} \(director\) connected at: (.*)".format(name)
1680        expected_regex = bytes(bytearray(expected_regex_str, "utf-8"))
1681
1682        # logger.debug(expected_regex)
1683        # logger.debug(result)
1684
1685        # logger.debug(re.search(expected_regex, result).group(1).decode('utf-8'))
1686
1687        self.assertRegex(result, expected_regex)
1688
1689    @unittest.skipUnless(
1690        bareos.bsock.DirectorConsole.is_tls_psk_available(), "TLS-PSK is not available."
1691    )
1692    def test_execute_external_command(self):
1693        logger = logging.getLogger()
1694
1695        name = self.director_name
1696        password = self.filedaemon_director_password
1697
1698        # let the shell calculate 6 * 7
1699        # and verify, that the output containts 42.
1700        command = u"bash -c \"let result='6 * 7'; echo result=$result\""
1701        expected_regex_str = r"ClientBeforeJob: result=(42)"
1702        expected_regex = bytes(bytearray(expected_regex_str, "utf-8"))
1703
1704        bareos_password = bareos.bsock.Password(password)
1705        fd = bareos.bsock.FileDaemon(
1706            address=self.filedaemon_address,
1707            port=self.filedaemon_port,
1708            name=name,
1709            password=bareos_password,
1710            **self.director_extra_options
1711        )
1712
1713        fd.call(
1714            [
1715                "Run",
1716                "OnSuccess=1",
1717                "OnFailure=1",
1718                "AbortOnError=1",
1719                "When=2",
1720                "Command={}".format(command),
1721            ]
1722        )
1723        result = fd.call([u"RunBeforeNow"])
1724
1725        # logger.debug(expected_regex)
1726        logger.debug(result)
1727
1728        logger.debug(
1729            u"result is {}".format(
1730                re.search(expected_regex, result).group(1).decode("utf-8")
1731            )
1732        )
1733
1734        self.assertRegex(result, expected_regex)
1735
1736
1737class PythonBareosShowTest(PythonBareosJsonBase):
1738    def test_fileset(self):
1739        """
1740        Filesets are stored in the database,
1741        as soon as a job using them did run.
1742        We want to verify, that the catalog content is correct.
1743        As comparing the full content is difficult,
1744        we only check if the "Description" is stored in the catalog.
1745        """
1746        logger = logging.getLogger()
1747
1748        username = self.get_operator_username()
1749        password = self.get_operator_password(username)
1750
1751        jobname = u"backup-bareos-fd"
1752
1753        director = bareos.bsock.DirectorConsoleJson(
1754            address=self.director_address,
1755            port=self.director_port,
1756            name=username,
1757            password=password,
1758            **self.director_extra_options
1759        )
1760
1761        jobid = self.run_job(director, jobname, wait=True)
1762        result = director.call("llist jobid={}".format(jobid))
1763        fileset = result["jobs"][0]["fileset"]
1764        result = director.call("list fileset jobid={} limit=1".format(jobid))
1765        fileset_content_list = result["filesets"][0]["filesettext"]
1766
1767        result = director.call("show fileset={}".format(fileset))
1768        fileset_show_description = result["filesets"][fileset]["description"]
1769
1770        self.assertIn(fileset_show_description, fileset_content_list)
1771
1772
1773def get_env():
1774    """
1775    Get attributes as environment variables,
1776    if not available or set use defaults.
1777    """
1778    director_root_password = os.environ.get("dir_password")
1779    if director_root_password:
1780        PythonBareosBase.director_root_password = director_root_password
1781
1782    director_port = os.environ.get("BAREOS_DIRECTOR_PORT")
1783    if director_port:
1784        PythonBareosBase.director_port = director_port
1785
1786    filedaemon_director_password = os.environ.get("fd_password")
1787    if filedaemon_director_password:
1788        PythonBareosBase.filedaemon_director_password = filedaemon_director_password
1789
1790    filedaemon_port = os.environ.get("BAREOS_FD_PORT")
1791    if filedaemon_port:
1792        PythonBareosBase.filedaemon_port = filedaemon_port
1793
1794    backup_directory = os.environ.get("BackupDirectory")
1795    if backup_directory:
1796        PythonBareosBase.backup_directory = backup_directory
1797
1798    tls_version_str = os.environ.get("PYTHON_BAREOS_TLS_VERSION")
1799    if tls_version_str is not None:
1800        tls_version_parser = bareos.bsock.TlsVersionParser()
1801        tls_version = tls_version_parser.get_protocol_version_from_string(
1802            tls_version_str
1803        )
1804        if tls_version is not None:
1805            PythonBareosBase.director_extra_options["tls_version"] = tls_version
1806        else:
1807            print(
1808                "Environment variable PYTHON_BAREOS_TLS_VERSION has invalid value ({}). Valid values: {}".format(
1809                    tls_version_str,
1810                    ", ".join(tls_version_parser.get_protocol_versions()),
1811                )
1812            )
1813
1814    if os.environ.get("REGRESS_DEBUG"):
1815        PythonBareosBase.debug = True
1816
1817
1818if __name__ == "__main__":
1819    get_env()
1820    unittest.main()
1821