1# -*- coding: utf-8 -*-
2
3# Copyright (C) 2012-2018 Red Hat, Inc.
4#
5# This copyrighted material is made available to anyone wishing to use,
6# modify, copy, or redistribute it subject to the terms and conditions of
7# the GNU General Public License v.2, or (at your option) any later version.
8# This program is distributed in the hope that it will be useful, but WITHOUT
9# ANY WARRANTY expressed or implied, including the implied warranties of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
11# Public License for more details.  You should have received a copy of the
12# GNU General Public License along with this program; if not, write to the
13# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
14# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
15# source code or documentation are not subject to the GNU General Public
16# License and may only be used or replicated with the express permission of
17# Red Hat, Inc.
18#
19
20from __future__ import absolute_import
21from __future__ import unicode_literals
22
23import contextlib
24import logging
25import os
26import re
27import shutil
28import sys
29import tempfile
30import unittest
31from functools import reduce
32
33import hawkey
34import hawkey.test
35import libdnf.transaction
36
37import dnf
38import dnf.conf
39import dnf.cli.cli
40import dnf.cli.demand
41import dnf.cli.option_parser
42import dnf.comps
43import dnf.exceptions
44import dnf.goal
45import dnf.i18n
46import dnf.package
47import dnf.persistor
48import dnf.pycomp
49import dnf.repo
50import dnf.sack
51
52
# Pick the mock implementation matching the running interpreter: on Python 3
# it comes from unittest.mock, otherwise from the ``mock`` module of the
# ``tests`` package, which needs the ``mock_open`` helper defined below.
if dnf.pycomp.PY3:
    from unittest import mock
    from unittest.mock import MagicMock, mock_open
else:
    from tests import mock
    from tests.mock import MagicMock

    def mock_open(mock=None, data=None):
        """Return a mock configured to stand in for the built-in ``open``.

        :param mock: the mock to configure; a new ``MagicMock`` with the
            ``file`` spec is created when it is None
        :param data: the object produced when the returned handle is entered
            as a context manager; the handle itself is used when it is None
        :return: the configured *mock*, whose call result is the file handle
        """
        if mock is None:
            mock = MagicMock(spec=file)

        handle = MagicMock(spec=file)
        # writing to the fake handle returns nothing
        handle.write.return_value = None
        if data is None:
            handle.__enter__.return_value = handle
        else:
            handle.__enter__.return_value = data
        mock.return_value = handle
        return mock
72
73
# The 'dnf' logger; mock_logger() builds its autospec from this object.
logger = logging.getLogger('dnf')
# Alias of unittest.skip.
skip = unittest.skip

# Matches a whole traceback: the "Traceback (most recent call last):" header,
# one or more 'File "...", line N, in name' frames (each optionally followed
# by one indented line) and a final line that starts with a non-space
# character.
TRACEBACK_RE = re.compile(
    r'(Traceback \(most recent call last\):\n'
    r'(?:  File "[^"\n]+", line \d+, in \w+\n'
    r'(?:    .+\n)?)+'
    r'\S.*\n)')
# Package name -> reason string; REASONS.get is what gets handed to
# dnf.comps.Solver as its last argument.
REASONS = {
    'hole': 'group',
    'pepper': 'group',
    'right': 'dep',
    'tour': 'group',
    'trampoline': 'group',
}
# @System.repo doesn't provide sha1header/pkgid, so the checksum is computed
# from an empty string.
RPMDB_CHECKSUM = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
# Solvable counts; presumably they mirror the fixture repositories under
# REPO_DIR -- verify against the fixtures whenever those change.
TOTAL_RPMDB_COUNT = 10
SYSTEM_NSOLVABLES = TOTAL_RPMDB_COUNT
MAIN_NSOLVABLES = 9
UPDATES_NSOLVABLES = 4
AVAILABLE_NSOLVABLES = MAIN_NSOLVABLES + UPDATES_NSOLVABLES
TOTAL_GROUPS = 5
TOTAL_NSOLVABLES = SYSTEM_NSOLVABLES + AVAILABLE_NSOLVABLES
99
100
101# testing infrastructure
102
103
def dnf_toplevel():
    """Return the normalized path two levels above this file's own path.

    The file name counts as the first level, so the result is the parent of
    the directory this module is stored in.
    """
    two_levels_up = os.path.join(__file__, '../../')
    return os.path.normpath(two_levels_up)
106
107
def repo(reponame):
    """Return the path of *reponame* inside ``REPO_DIR``."""
    repo_path = os.path.join(REPO_DIR, reponame)
    return repo_path
110
111
def resource_path(path):
    """Return *path* joined onto the directory that contains this module."""
    return os.path.join(os.path.dirname(__file__), path)
115
116
# Fixture locations resolved relative to the directory of this module.
REPO_DIR = resource_path('repos')
COMPS_PATH = os.path.join(REPO_DIR, 'main_comps.xml')
# Going by its name, a path that is expected not to exist.
NONEXISTENT_FILE = resource_path('does-not/exist')
TOUR_44_PKG_PATH = resource_path('repos/rpm/tour-4-4.noarch.rpm')
TOUR_50_PKG_PATH = resource_path('repos/rpm/tour-5-0.noarch.rpm')
TOUR_51_PKG_PATH = resource_path('repos/rpm/tour-5-1.noarch.rpm')
# A fixed absolute path; unlike the paths above it is not module-relative.
USER_RUNDIR = '/tmp/dnf-user-rundir'
124
125
126# often used query
127
128
def installed_but(sack, *args):
    """Return a query of system-repo packages minus the named ones.

    The query starts from every package whose repo name equals
    ``hawkey.SYSTEM_REPO_NAME``; each name in *args* is then filtered out.
    """
    query = sack.query().filter(reponame__eq=hawkey.SYSTEM_REPO_NAME)
    for excluded_name in args:
        query = query.filter(name__neq=excluded_name)
    return query
132
133
134# patching the stdout
135
136
@contextlib.contextmanager
def patch_std_streams():
    """Swap ``sys.stdout`` and ``sys.stderr`` for ``StringIO`` objects.

    Yields the ``(stdout, stderr)`` pair of replacement streams; the patches
    are undone when the context exits.
    """
    make_buffer = dnf.pycomp.StringIO
    with mock.patch('sys.stdout', new_callable=make_buffer) as stdout:
        with mock.patch('sys.stderr', new_callable=make_buffer) as stderr:
            yield (stdout, stderr)
142
143
@contextlib.contextmanager
def wiretap_logs(logger_name, level, stream):
    """Record *logger_name* logs of at least *level* into the *stream*.

    While the context is active the logger's handlers are replaced by a
    single stream handler writing to *stream*; the previous level and
    handler list are put back on exit. Yields *stream*.
    """
    tapped = logging.getLogger(logger_name)
    saved_level = tapped.level
    tapped.setLevel(level)

    tap = logging.StreamHandler(stream)
    saved_handlers = tapped.handlers
    tapped.handlers = []
    tapped.addHandler(tap)

    try:
        yield stream
    finally:
        # undo in the same order as before: handler, level, handler list
        tapped.removeHandler(tap)
        tapped.setLevel(saved_level)
        tapped.handlers = saved_handlers
163
164
def command_configure(cmd, args):
    """Parse *args* for *cmd* and return the result of ``cmd.configure()``.

    The command's ``_basecmd`` is put in front of *args* before both the
    main and the command-specific parsing passes.
    """
    option_parser = dnf.cli.option_parser.OptionParser()
    full_args = [cmd._basecmd] + args
    option_parser.parse_main_args(full_args)
    option_parser.parse_command_args(cmd, full_args)
    configured = cmd.configure()
    return configured
171
172
def command_run(cmd, args):
    """Configure *cmd* with *args*, then run it and return its result."""
    command_configure(cmd, args)
    outcome = cmd.run()
    return outcome
176
class Base(dnf.Base):
    """``dnf.Base`` constructed with ``detect_releasever`` patched out."""

    def __init__(self, *args, **kwargs):
        # the patch is active only while the parent constructor runs
        fixed_releasever = mock.patch('dnf.rpm.detect_releasever',
                                      return_value=69)
        with fixed_releasever:
            super(Base, self).__init__(*args, **kwargs)
181
182# mock objects
183
184
def mock_comps(history, seed_history):
    """Load the comps fixture and optionally seed *history* with groups.

    :param history: object whose ``group`` and ``env`` attributes are used
        to create and install the seeded groups and environment
    :param seed_history: when true, two groups and one environment are
        recorded as installed in *history*
    :return: the ``dnf.comps.Comps`` object read from ``COMPS_PATH``
    """
    comps = dnf.comps.Comps()
    comps._add_from_xml_filename(COMPS_PATH)
    if not seed_history:
        return comps

    # group id -> packages recorded for it, in installation order
    seeded_groups = (
        ('Peppers', ('hole', 'lotus')),
        ('somerset', ('pepper', 'trampoline', 'lotus')),
    )
    for group_id, pkg_names in seeded_groups:
        swdb_group = history.group.new(
            group_id, group_id, group_id, dnf.comps.MANDATORY)
        for pkg_name in pkg_names:
            swdb_group.addPackage(pkg_name, True, dnf.comps.MANDATORY)
        history.group.install(swdb_group)

    env_id = 'sugar-desktop-environment'
    swdb_env = history.env.new(env_id, env_id, env_id, dnf.comps.ALL_TYPES)
    for group_id in ('Peppers', 'somerset'):
        swdb_env.addGroup(group_id, True, dnf.comps.MANDATORY)
    history.env.install(swdb_env)
    return comps
211
212
def mock_logger():
    """Return an autospec mock modelled on the module-level ``logger``."""
    spec = logger
    return mock.create_autospec(spec)
215
216
class _BaseStubMixin(object):
    """A reusable class for creating `dnf.Base` stubs.

    See also: hawkey/test/python/__init__.py.

    Note that the TestSack in use currently always has its architecture set
    to "x86_64", so that unit tests behave the same on different arches.

    """
    def __init__(self, *extra_repos, **config_opts):
        """Build the base on a `FakeConf` and enable *extra_repos*."""
        super(_BaseStubMixin, self).__init__(FakeConf(**config_opts))
        for repo_id in extra_repos:
            extra = MockRepo(repo_id, self.conf)
            extra.enable()
            self._repos.add(extra)

        self._repo_persistor = FakePersistor()
        self._ds_callback = mock.Mock()
        self._history = None
        self._closed = False
        self._closing = False

    def add_test_dir_repo(self, id_, cachedir):
        """Add a repository located in a directory in the tests."""
        test_repo = dnf.repo.Repo(id_, cachedir)
        test_repo.baseurl = ['file://%s/%s' % (REPO_DIR, test_repo.id)]
        self.repos.add(test_repo)
        return test_repo

    def close(self):
        """Flag the base as closing, then close it via the parent class."""
        self._closing = True
        super(_BaseStubMixin, self).close()

    @property
    def history(self):
        """Return the history object, creating it on first access."""
        if not self._history:
            self._history = super(_BaseStubMixin, self).history
            if not self._closing:
                # don't reset db on close, it causes several tests to fail
                self._history.reset_db()
        return self._history

    @property
    def sack(self):
        """Return the current sack, initializing it when there is none."""
        return self._sack or self.init_sack()

    def _build_comps_solver(self):
        return dnf.comps.Solver(self.history, self._comps, REASONS.get)

    def _activate_persistor(self):
        """Do nothing; the stub keeps its `FakePersistor`."""

    def init_sack(self):
        """Create, load and configure a `TestSack`, and build the goal."""
        # Create the Sack, tell it how to build packages, passing in the Package
        # class and a Base reference.
        self._sack = TestSack(REPO_DIR, self)
        self._sack.load_system_repo()
        for enabled_repo in self.repos.iter_enabled():
            if enabled_repo.__class__ is not dnf.repo.Repo:
                # anything but a plain Repo is loaded from its .repo fixture
                fixture = "%s.repo" % enabled_repo.id
                self._sack.load_test_repo(enabled_repo.id, fixture)
            else:
                self._add_repo_to_sack(enabled_repo)

        self._sack._configure(self.conf.installonlypkgs)
        self._goal = dnf.goal.Goal(self._sack)
        self._goal.protect_running_kernel = self.conf.protect_running_kernel
        return self._sack

    def mock_cli(self):
        """Return a mock CLI bound to this base with a captured 'test' log."""
        log_stream = dnf.pycomp.StringIO()
        test_logger = logging.getLogger('test')
        test_logger.setLevel(logging.DEBUG)
        test_logger.addHandler(logging.StreamHandler(log_stream))
        return mock.Mock(base=self, log_stream=log_stream,
                         logger=test_logger,
                         demands=dnf.cli.demand.DemandSheet())

    def read_mock_comps(self, seed_history=True):
        """Load the mock comps into the base and return them."""
        loaded = mock_comps(self.history, seed_history)
        self._comps = loaded
        return self._comps

    def read_all_repos(self, opts=None):
        """Configure every known repository from *opts*."""
        for known_repo in self.repos.values():
            known_repo._configure_from_options(opts)

    def set_debuglevel(self, level):
        """Set the ``debuglevel`` option to *level* at runtime priority."""
        self.conf._set_value('debuglevel', level, dnf.conf.PRIO_RUNTIME)
310
311
class BaseCliStub(_BaseStubMixin, dnf.cli.cli.BaseCli):
    """A class mocking `dnf.cli.cli.BaseCli`."""

    def __init__(self, *extra_repos, **config_opts):
        """Initialize the base and give its output a mock terminal."""
        super(BaseCliStub, self).__init__(*extra_repos, **config_opts)
        terminal = MockTerminal()
        self.output.term = terminal
319
320
class DemandsStub(object):
    """An empty placeholder object, used as ``CliStub.demands``."""
323
324
class CliStub(object):
    """A class mocking `dnf.cli.Cli`."""

    def __init__(self, base):
        """Initialize the CLI."""
        self.base = base
        self.cli_commands = {}
        self.demands = DemandsStub()
        self.logger = logging.getLogger()
        self.register_command(dnf.cli.commands.HelpCommand)

    def redirect_logger(self, stdout=None, stderr=None):
        """Do nothing; the stub does not redirect logging."""
        return None

    def redirect_repo_progress(self, fo=sys.stderr):
        """Do nothing; the stub does not redirect repo progress."""
        return None

    def register_command(self, command):
        """Register given *command*."""
        # every alias of the command maps to the command itself
        by_alias = dict.fromkeys(command.aliases, command)
        self.cli_commands.update(by_alias)
345
346
class MockOutput(object):
    """Output stub with a mock terminal and no progress callbacks."""

    def __init__(self):
        terminal = MockTerminal()
        self.term = terminal

    def setup_progress_callbacks(self):
        """Return a pair of ``None`` values in place of real callbacks."""
        return None, None
353
354
class MockPackage(object):
    """A plain-attribute package stand-in built from a NEVRA string."""

    def __init__(self, nevra, repo=None):
        """Fill the package attributes from *nevra* and the optional *repo*."""
        self.baseurl = None
        self._chksum = (None, None)
        self.downloadsize = None
        self._header = None
        self.location = '%s.rpm' % nevra
        self.repo = repo
        if repo is None:
            self.reponame = None
        else:
            self.reponame = repo.id
        self.str = nevra
        self.buildtime = 0

        parsed = hawkey.split_nevra(nevra)
        self.name = parsed.name
        self.epoch = parsed.epoch
        self.version = parsed.version
        self.release = parsed.release
        self.arch = parsed.arch
        # the format reads epoch, version and release from the attributes
        # assigned just above
        self.evr = '%(epoch)d:%(version)s-%(release)s' % vars(self)
        self.pkgtup = (self.name, self.arch, str(self.epoch), self.version,
                       self.release)

    def __str__(self):
        """Return the NEVRA string the package was created from."""
        return self.str

    def localPkg(self):
        """Return the package file name placed in the repo's ``pkgdir``."""
        filename = os.path.basename(self.location)
        return os.path.join(self.repo.pkgdir, filename)

    def returnIdSum(self):
        """Return the stored checksum pair."""
        return self._chksum
384
385
class MockRepo(dnf.repo.Repo):
    """``dnf.repo.Repo`` whose ``_valid`` always returns ``None``."""

    def _valid(self):
        return
389
390
class MockQuery(dnf.query.Query):
    """A query stand-in holding `MockPackage` copies of a real query's result.

    The object is its own iterator: iteration advances the ``i`` cursor and
    nothing in this class rewinds it.
    """

    def __init__(self, query):
        """Snapshot the packages of *query* as `MockPackage` objects."""
        self.pkgs = [MockPackage(str(pkg)) for pkg in query.run()]
        self.i = 0
        self.n = len(self.pkgs)

    def __getitem__(self, key):
        """Return the package at *key*; raise KeyError at or past the end."""
        if not key < self.n:
            raise KeyError()
        return self.pkgs[key]

    def __iter__(self):
        return self

    def __len__(self):
        return self.n

    def filter(self, pkg):
        """Replace the held packages with the items of *pkg*; return self."""
        self.pkgs = []
        self.pkgs += pkg
        self.n = len(self.pkgs)
        return self

    def next(self):
        """Python 2 spelling of `__next__`."""
        return self.__next__()

    def __next__(self):
        """Return the package under the cursor and advance the cursor."""
        if not self.i < self.n:
            raise StopIteration()
        position = self.i
        self.i += 1
        return self.pkgs[position]

    def run(self):
        """Return the list of held packages."""
        return self.pkgs
428
429
class MockTerminal(object):
    """Terminal stub: 80 columns wide, with empty formatting sequences."""

    def __init__(self):
        self.MODE = {'bold': '', 'normal': ''}
        self.columns = self.real_columns = 80
        self.reinit = mock.Mock()

    def bold(self, s):
        """Return *s* unchanged."""
        return s
439
440
class TestSack(hawkey.test.TestSackMixin, dnf.sack.Sack):
    """A `dnf.sack.Sack` with hawkey's test mixin and a fixed architecture."""

    def __init__(self, repo_dir, base):
        """Initialize both parents; *base* is handed on as ``pkginitval``."""
        hawkey.test.TestSackMixin.__init__(self, repo_dir)
        sack_options = dict(
            arch=hawkey.test.FIXED_ARCH,
            pkgcls=dnf.package.Package,
            pkginitval=base,
            make_cache_dir=True,
        )
        dnf.sack.Sack.__init__(self, **sack_options)
449
450
class MockBase(_BaseStubMixin, Base):
    """A class mocking `dnf.Base`.

    It combines `_BaseStubMixin` with `Base` and adds nothing of its own.
    """
453
454
def mock_sack(*extra_repos):
    """Return the sack of a new `MockBase` created with *extra_repos*."""
    base = MockBase(*extra_repos)
    return base.sack
457
458
class FakeConf(dnf.conf.Conf):
    """A `dnf.conf.Conf` preloaded with fixed defaults for the tests."""

    def __init__(self, **kwargs):
        """Set the test defaults, then apply *kwargs* on top of them.

        :param kwargs: option name -> value pairs; they are set after the
            built-in defaults and therefore override them. The ``cachedir``,
            ``logdir`` and ``persistdir`` options are left as given instead
            of being prefixed with the installroot.
        """
        super(FakeConf, self).__init__()
        self.substitutions['releasever'] = 'Fedora69'
        options = [
            ('assumeyes', None),
            ('best', False),
            ('cachedir', dnf.const.TMPDIR),
            ('clean_requirements_on_remove', False),
            ('color', 'never'),
            ('color_update_installed', 'normal'),
            ('color_update_remote', 'normal'),
            ('color_list_available_downgrade', 'dim'),
            ('color_list_available_install', 'normal'),
            ('color_list_available_reinstall', 'bold'),
            ('color_list_available_upgrade', 'bold'),
            ('color_list_installed_extra', 'bold'),
            ('color_list_installed_newer', 'bold'),
            ('color_list_installed_older', 'bold'),
            ('color_list_installed_reinstall', 'normal'),
            ('color_update_local', 'bold'),
            ('debug_solver', False),
            ('debuglevel', 2),
            ('defaultyes', False),
            ('disable_excludes', []),
            ('diskspacecheck', True),
            ('exclude', []),
            ('includepkgs', []),
            ('install_weak_deps', True),
            ('history_record', False),
            ('installonly_limit', 0),
            ('installonlypkgs', ['kernel']),
            ('installroot', '/tmp/dnf-test-installroot/'),
            ('ip_resolve', None),
            ('multilib_policy', 'best'),
            ('obsoletes', True),
            ('persistdir', dnf.const.PERSISTDIR),
            ('transformdb', False),
            ('protected_packages', ["dnf"]),
            ('protect_running_kernel', True),
            ('plugins', False),
            ('showdupesfromrepos', False),
            ('tsflags', []),
            ('strict', True),
        ] + list(kwargs.items())
        for optname, val in options:
            self._set_value(optname, val, dnf.conf.PRIO_DEFAULT)

        # TODO: consolidate with dnf.cli.Cli._read_conf_file()
        for opt in ('cachedir', 'logdir', 'persistdir'):
            # don't prepend installroot if option was specified by user
            # TODO: is this desired? ^^^ (tests won't pass without it ATM)
            if opt in kwargs:
                continue
            self.prepend_installroot(opt)

        try:
            os.makedirs(self.persistdir)
        except OSError:
            # Best effort: the directory may already exist or may not be
            # creatable. Only filesystem errors are ignored, so that e.g.
            # KeyboardInterrupt is no longer swallowed here.
            pass

    @property
    def releasever(self):
        """Return the ``releasever`` substitution value."""
        return self.substitutions['releasever']
523
524
class FakePersistor(object):
    """A repo persistor stub that remembers nothing between runs."""

    reset_last_makecache = False

    def __init__(self):
        # One set per instance: a class-level set would be shared by every
        # persistor, so entries added during one test would show up in the
        # next one.
        self.expired_to_add = set()

    def get_expired_repos(self):
        """Return an empty set; no repository is ever reported expired."""
        return set()

    def since_last_makecache(self):
        """Return None in place of a time since the last makecache."""
        return None

    def save(self):
        """Do nothing; there is no state to write."""
        pass
537
538
539# object matchers for asserts
540
541
class ObjectMatcher(object):
    """Class allowing partial matching of objects."""

    def __init__(self, type_=None, attrs=None):
        """Initialize a matcher instance.

        :param type_: the exact type a matching object must have, or None to
            accept any type
        :param attrs: attribute name -> expected value mapping, or None to
            skip the attribute checks
        """
        self._type = type_
        self._attrs = attrs

    def __eq__(self, other):
        """Test whether this object is equal to the *other* one.

        The *other* object matches when its type is exactly the requested
        one (if any) and every listed attribute exists on it and equals the
        expected value. An attribute that is missing on *other* makes the
        comparison fail instead of raising AttributeError.
        """
        if self._type is not None and type(other) is not self._type:
            return False

        if self._attrs:
            # unique sentinel, so that None stays a legal expected value
            missing = object()
            for attr, value in self._attrs.items():
                actual = getattr(other, attr, missing)
                if actual is missing or value != actual:
                    return False
        return True

    def __ne__(self, other):
        """Test whether this object is not equal to the *other* one."""
        return not self == other

    def __repr__(self):
        """Compute the "official" string representation of this object."""
        args_strs = []

        if self._type is not None:
            args_strs.append('type_=%s' % repr(self._type))

        if self._attrs:
            attrs_str = ', '.join('%s: %s' % (dnf.i18n.ucd(attr), repr(value))
                                  for attr, value in self._attrs.items())
            args_strs.append('attrs={%s}' % attrs_str)

        return '%s(%s)' % (type(self).__name__, ", ".join(args_strs))
579
580
581# test cases:
582
583
class TestCase(unittest.TestCase):
    """`unittest.TestCase` extended with a few convenience assertions."""

    if not dnf.pycomp.PY3:
        # give Python 2 the assertion names used under Python 3
        assertCountEqual = unittest.TestCase.assertItemsEqual
        assertRegex = unittest.TestCase.assertRegexpMatches

    def assertEmpty(self, collection):
        """Assert that *collection* has no items."""
        size = len(collection)
        return self.assertEqual(size, 0)

    def assertFile(self, path):
        """Assert the given path is a file."""
        is_file = os.path.isfile(path)
        return self.assertTrue(is_file)

    def assertLength(self, collection, length):
        """Assert that *collection* has exactly *length* items."""
        size = len(collection)
        return self.assertEqual(size, length)

    def assertPathDoesNotExist(self, path):
        """Assert that nothing exists at *path*."""
        exists = os.access(path, os.F_OK)
        return self.assertFalse(exists)

    def assertStartsWith(self, string, what):
        """Assert that *string* begins with *what*."""
        has_prefix = string.startswith(what)
        return self.assertTrue(has_prefix)

    def assertTracebackIn(self, end, string):
        """Test that a traceback ending with line *end* is in the *string*."""
        found = any(match.group().endswith(end)
                    for match in TRACEBACK_RE.finditer(string))
        self.assertTrue(found)

    def assertTransEqual(self, trans_pkgs, list):
        """Assert that the names of *trans_pkgs* equal *list*, in any order."""
        names = [pkg.name for pkg in trans_pkgs]
        return self.assertCountEqual(names, list)
613
614
class DnfBaseTestCase(TestCase):
    """Test case that builds a stub base, and optionally a CLI, per test.

    Subclasses tune the fixture through the class attributes below. Every
    test gets its own temporary installroot, removed again in tearDown().
    """

    # create base with specified test repos
    REPOS = []

    # initialize mock sack
    INIT_SACK = False

    # initialize self.base._transaction
    INIT_TRANSACTION = False

    # False: self.base = MockBase()
    # True: self.base = BaseCliStub()
    BASE_CLI = False

    # None: self.cli = None
    # "init": self.cli = dnf.cli.cli.Cli(self.base)
    # "mock": self.cli = self.base.mock_cli()
    # "stub": self.cli = CliStub(self.base)
    CLI = None

    # True: load the mock comps into the base
    COMPS = False
    # handed to read_mock_comps() as seed_history
    COMPS_SEED_HISTORY = False
    # True: self.solver = dnf.comps.Solver(...)
    # False: self.solver = None
    COMPS_SOLVER = False

    def setUp(self):
        """Create the installroot, the base and the optional extras."""
        self._installroot = tempfile.mkdtemp(prefix="dnf_test_installroot_")

        if self.BASE_CLI:
            self.base = BaseCliStub(*self.REPOS, installroot=self._installroot)
        else:
            self.base = MockBase(*self.REPOS, installroot=self._installroot)

        if self.CLI is None:
            self.cli = None
        elif self.CLI == "init":
            self.cli = dnf.cli.cli.Cli(self.base)
        elif self.CLI == "mock":
            self.cli = self.base.mock_cli()
        elif self.CLI == "stub":
            self.cli = CliStub(self.base)
        else:
            raise ValueError("Invalid CLI value: {}".format(self.CLI))

        if self.COMPS:
            self.base.read_mock_comps(seed_history=self.COMPS_SEED_HISTORY)

        if self.INIT_SACK:
            self.base.init_sack()

        if self.INIT_TRANSACTION:
            self.base._transaction = self.base.history.rpm

        if self.COMPS_SOLVER:
            self.solver = dnf.comps.Solver(self.history, self.comps, REASONS.get)
        else:
            self.solver = None

    def tearDown(self):
        """Close the base and remove the temporary installroot.

        The installroot is removed even when closing the base fails. As a
        safety net it is only deleted when it lies under "/tmp/" or under
        the directory tempfile actually uses, which may differ from /tmp
        (e.g. when TMPDIR is set).
        """
        try:
            self.base.close()
        finally:
            temp_roots = ("/tmp/", os.path.join(tempfile.gettempdir(), ""))
            if self._installroot.startswith(temp_roots):
                shutil.rmtree(self._installroot)

    @property
    def comps(self):
        """The comps of the base."""
        return self.base.comps

    @property
    def goal(self):
        """The goal of the base."""
        return self.base._goal

    @property
    def history(self):
        """The history of the base."""
        return self.base.history

    @property
    def sack(self):
        """The sack of the base."""
        return self.base.sack

    def _swdb_begin(self, tsis=None):
        """Begin a history transaction with the given items (default: none)."""
        # history.beg() replaces persistor.commit()
        tsis = tsis or []
        self.history.beg("", [], tsis)

    def _swdb_end(self, tsis=None):
        """Mark items still in the unknown state as done and end the history.

        The *tsis* argument is accepted but not used.
        """
        for tsi in self.history._swdb.getItems():
            if tsi.getState() == libdnf.transaction.TransactionItemState_UNKNOWN:
                tsi.setState(libdnf.transaction.TransactionItemState_DONE)
        self.history.end("")
        self.history.close()

    def _swdb_commit(self, tsis=None):
        """Begin and immediately end a history transaction with *tsis*."""
        self._swdb_begin(tsis)
        self._swdb_end()
        # NOTE(review): _swdb_end() has already closed the history, so this
        # second close looks redundant -- confirm before removing it.
        self.history.close()
710
711
class ResultTestCase(DnfBaseTestCase):
    """Test case with helpers for checking the outcome of a resolved base."""

    # handed to base.resolve() by the helpers below
    allow_erasing = False

    def _get_installed(self, base):
        """Resolve *base* and return the packages present afterwards.

        That is the installed packages, minus the transaction's remove set,
        plus its install set. The test fails on a depsolving error.
        """
        try:
            base.resolve(self.allow_erasing)
        except dnf.exceptions.DepsolveError:
            self.fail()

        present = set(base.sack.query().installed())
        for removed_pkg in base._transaction.remove_set:
            present.remove(removed_pkg)
        present.update(base._transaction.install_set)
        return present

    def assertResult(self, base, pkgs):
        """Check whether the system contains the given pkgs.

        pkgs must be present. Any other pkgs result in an error. Pkgs are
        present if they are in the rpmdb and are not REMOVEd or they are
        INSTALLed.
        """
        present = self._get_installed(base)
        self.assertCountEqual(present, pkgs)

    def installed_removed(self, base):
        """Resolve *base* and return its ``(install_set, remove_set)`` pair.

        The test fails on a depsolving error.
        """
        try:
            base.resolve(self.allow_erasing)
        except dnf.exceptions.DepsolveError:
            self.fail()

        return base._transaction.install_set, base._transaction.remove_set
747