# -*- coding: utf-8 -*- # Copyright (C) 2012-2018 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # from __future__ import absolute_import from __future__ import unicode_literals import contextlib import logging import os import re import sys import unittest from functools import reduce import hawkey import hawkey.test import libdnf.transaction import dnf import dnf.conf import dnf.cli.cli import dnf.cli.demand import dnf.cli.option_parser import dnf.comps import dnf.exceptions import dnf.goal import dnf.i18n import dnf.package import dnf.persistor import dnf.pycomp import dnf.repo import dnf.sack if dnf.pycomp.PY3: from unittest import mock from unittest.mock import MagicMock, mock_open else: from tests import mock from tests.mock import MagicMock def mock_open(mock=None, data=None): if mock is None: mock = MagicMock(spec=file) handle = MagicMock(spec=file) handle.write.return_value = None if data is None: handle.__enter__.return_value = handle else: handle.__enter__.return_value = data mock.return_value = handle return mock logger = logging.getLogger('dnf') skip = unittest.skip TRACEBACK_RE = re.compile( r'(Traceback \(most recent call last\):\n' r'(?: File "[^"\n]+", line \d+, in \w+\n' r'(?: .+\n)?)+' r'\S.*\n)') REASONS = { 'hole': 'group', 'pepper': 'group', 'right': 'dep', 'tour': 'group', 'trampoline': 'group', } # @System.repo doesn't provide sha1header/pkgid # the checksum is computed from an empty string RPMDB_CHECKSUM = 'da39a3ee5e6b4b0d3255bfef95601890afd80709' TOTAL_RPMDB_COUNT = 10 SYSTEM_NSOLVABLES = TOTAL_RPMDB_COUNT MAIN_NSOLVABLES = 9 UPDATES_NSOLVABLES = 4 AVAILABLE_NSOLVABLES = MAIN_NSOLVABLES + UPDATES_NSOLVABLES TOTAL_GROUPS = 4 TOTAL_NSOLVABLES = SYSTEM_NSOLVABLES + AVAILABLE_NSOLVABLES # testing infrastructure def dnf_toplevel(): return os.path.normpath(os.path.join(__file__, '../../')) def repo(reponame): return os.path.join(REPO_DIR, reponame) def resource_path(path): this_dir = os.path.dirname(__file__) return os.path.join(this_dir, path) REPO_DIR = resource_path('repos') COMPS_PATH = os.path.join(REPO_DIR, 'main_comps.xml') NONEXISTENT_FILE = resource_path('does-not/exist') TOUR_44_PKG_PATH = resource_path('repos/rpm/tour-4-4.noarch.rpm') TOUR_50_PKG_PATH = resource_path('repos/rpm/tour-5-0.noarch.rpm') TOUR_51_PKG_PATH = resource_path('repos/rpm/tour-5-1.noarch.rpm') USER_RUNDIR = '/tmp/dnf-user-rundir' # often used query def installed_but(sack, *args): q = sack.query().filter(reponame__eq=hawkey.SYSTEM_REPO_NAME) return reduce(lambda query, name: query.filter(name__neq=name), args, q) # patching the stdout @contextlib.contextmanager def patch_std_streams(): with mock.patch('sys.stdout', new_callable=dnf.pycomp.StringIO) as stdout, \ mock.patch('sys.stderr', new_callable=dnf.pycomp.StringIO) as stderr: yield (stdout, stderr) @contextlib.contextmanager def wiretap_logs(logger_name, level, stream): """Record *logger_name* logs of at least *level* into the *stream*.""" logger = logging.getLogger(logger_name) orig_level = logger.level logger.setLevel(level) handler = logging.StreamHandler(stream) orig_handlers = logger.handlers logger.handlers = [] logger.addHandler(handler) try: yield stream finally: logger.removeHandler(handler) logger.setLevel(orig_level) logger.handlers = orig_handlers def command_configure(cmd, args): parser = dnf.cli.option_parser.OptionParser() args = [cmd._basecmd] + args parser.parse_main_args(args) parser.parse_command_args(cmd, args) return cmd.configure() def command_run(cmd, args): command_configure(cmd, args) return cmd.run() class Base(dnf.Base): def __init__(self, *args, **kwargs): with mock.patch('dnf.rpm.detect_releasever', return_value=69): super(Base, self).__init__(*args, **kwargs) # mock objects def mock_comps(history, seed_history): comps = dnf.comps.Comps() comps._add_from_xml_filename(COMPS_PATH) if seed_history: name = 'Peppers' pkg_types = dnf.comps.MANDATORY swdb_group = history.group.new(name, name, name, pkg_types) for pkg_name in ['hole', 'lotus']: swdb_group.addPackage(pkg_name, True, dnf.comps.MANDATORY) history.group.install(swdb_group) name = 'somerset' pkg_types = dnf.comps.MANDATORY swdb_group = history.group.new(name, name, name, pkg_types) for pkg_name in ['pepper', 'trampoline', 'lotus']: swdb_group.addPackage(pkg_name, True, dnf.comps.MANDATORY) history.group.install(swdb_group) name = 'sugar-desktop-environment' pkg_types = dnf.comps.ALL_TYPES swdb_env = history.env.new(name, name, name, pkg_types) for group_id in ['Peppers', 'somerset']: swdb_env.addGroup(group_id, True, dnf.comps.MANDATORY) history.env.install(swdb_env) return comps def mock_logger(): return mock.create_autospec(logger) class _BaseStubMixin(object): """A reusable class for creating `dnf.Base` stubs. See also: hawkey/test/python/__init__.py. Note that currently the used TestSack has always architecture set to "x86_64". This is to get the same behavior when running unit tests on different arches. """ def __init__(self, *extra_repos): super(_BaseStubMixin, self).__init__(FakeConf()) for r in extra_repos: repo = MockRepo(r, self.conf) repo.enable() self._repos.add(repo) self._repo_persistor = FakePersistor() self._ds_callback = mock.Mock() self._history = None self._closing = False def add_test_dir_repo(self, id_, cachedir): """Add a repository located in a directory in the tests.""" repo = dnf.repo.Repo(id_, cachedir) repo.baseurl = ['file://%s/%s' % (REPO_DIR, repo.id)] self.repos.add(repo) return repo def close(self): self._closing = True super(_BaseStubMixin, self).close() @property def history(self): if self._history: return self._history else: self._history = super(_BaseStubMixin, self).history if not self._closing: # don't reset db on close, it causes several tests to fail self._history.reset_db() return self._history @property def sack(self): if self._sack: return self._sack return self.init_sack() def _build_comps_solver(self): return dnf.comps.Solver(self.history, self._comps, REASONS.get) def _activate_persistor(self): pass def init_sack(self): # Create the Sack, tell it how to build packages, passing in the Package # class and a Base reference. self._sack = TestSack(REPO_DIR, self) self._sack.load_system_repo() for repo in self.repos.iter_enabled(): if repo.__class__ is dnf.repo.Repo: self._add_repo_to_sack(repo) else: fn = "%s.repo" % repo.id self._sack.load_test_repo(repo.id, fn) self._sack._configure(self.conf.installonlypkgs) self._goal = dnf.goal.Goal(self._sack) return self._sack def mock_cli(self): stream = dnf.pycomp.StringIO() logger = logging.getLogger('test') logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler(stream)) return mock.Mock(base=self, log_stream=stream, logger=logger, demands=dnf.cli.demand.DemandSheet()) def read_mock_comps(self, seed_history=True): self._comps = mock_comps(self.history, seed_history) return self._comps def read_all_repos(self, opts=None): for repo in self.repos.values(): repo._configure_from_options(opts) def set_debuglevel(self, level): self.conf._set_value('debuglevel', level, dnf.conf.PRIO_RUNTIME) class BaseCliStub(_BaseStubMixin, dnf.cli.cli.BaseCli): """A class mocking `dnf.cli.cli.BaseCli`.""" def __init__(self, *extra_repos): """Initialize the base.""" super(BaseCliStub, self).__init__(*extra_repos) self.output.term = MockTerminal() class DemandsStub(object): pass class CliStub(object): """A class mocking `dnf.cli.Cli`.""" def __init__(self, base): """Initialize the CLI.""" self.base = base self.cli_commands = {} self.demands = DemandsStub() self.logger = logging.getLogger() self.register_command(dnf.cli.commands.HelpCommand) def redirect_logger(self, stdout=None, stderr=None): return def redirect_repo_progress(self, fo=sys.stderr): return def register_command(self, command): """Register given *command*.""" self.cli_commands.update({alias: command for alias in command.aliases}) class MockOutput(object): def __init__(self): self.term = MockTerminal() def setup_progress_callbacks(self): return (None, None) class MockPackage(object): def __init__(self, nevra, repo=None): self.baseurl = None self._chksum = (None, None) self.downloadsize = None self._header = None self.location = '%s.rpm' % nevra self.repo = repo self.reponame = None if repo is None else repo.id self.str = nevra self.buildtime = 0 nevra = hawkey.split_nevra(nevra) self.name = nevra.name self.epoch = nevra.epoch self.version = nevra.version self.release = nevra.release self.arch = nevra.arch self.evr = '%(epoch)d:%(version)s-%(release)s' % vars(self) self.pkgtup = (self.name, self.arch, str(self.epoch), self.version, self.release) def __str__(self): return self.str def localPkg(self): return os.path.join(self.repo.pkgdir, os.path.basename(self.location)) def returnIdSum(self): return self._chksum class MockRepo(dnf.repo.Repo): def _valid(self): return None class MockQuery(dnf.query.Query): def __init__(self, query): self.pkgs = [MockPackage(str(p)) for p in query.run()] self.i = 0 self.n = len(self.pkgs) def __getitem__(self, key): if key < self.n: return self.pkgs[key] else: raise KeyError() def __iter__(self): return self def __len__(self): return self.n def filter(self, pkg): self.pkgs = [] self.pkgs.extend(pkg) self.n = len(self.pkgs) return self def next(self): return self.__next__() def __next__(self): if self.i < self.n: i = self.i self.i += 1 return self.pkgs[i] else: raise StopIteration() def run(self): return self.pkgs class MockTerminal(object): def __init__(self): self.MODE = {'bold': '', 'normal': ''} self.columns = 80 self.real_columns = 80 self.reinit = mock.Mock() def bold(self, s): return s class TestSack(hawkey.test.TestSackMixin, dnf.sack.Sack): def __init__(self, repo_dir, base): hawkey.test.TestSackMixin.__init__(self, repo_dir) dnf.sack.Sack.__init__(self, arch=hawkey.test.FIXED_ARCH, pkgcls=dnf.package.Package, pkginitval=base, make_cache_dir=True) class MockBase(_BaseStubMixin, Base): """A class mocking `dnf.Base`.""" def mock_sack(*extra_repos): return MockBase(*extra_repos).sack class FakeConf(dnf.conf.Conf): def __init__(self, **kwargs): super(FakeConf, self).__init__() self.substitutions['releasever'] = 'Fedora69' options = [ ('assumeyes', None), ('best', False), ('cachedir', dnf.const.TMPDIR), ('clean_requirements_on_remove', False), ('color', 'never'), ('color_update_installed', 'normal'), ('color_update_remote', 'normal'), ('color_list_available_downgrade', 'dim'), ('color_list_available_install', 'normal'), ('color_list_available_reinstall', 'bold'), ('color_list_available_upgrade', 'bold'), ('color_list_installed_extra', 'bold'), ('color_list_installed_newer', 'bold'), ('color_list_installed_older', 'bold'), ('color_list_installed_reinstall', 'normal'), ('color_update_local', 'bold'), ('debug_solver', False), ('debuglevel', 2), ('defaultyes', False), ('disable_excludes', []), ('diskspacecheck', True), ('exclude', []), ('includepkgs', []), ('install_weak_deps', True), ('history_record', False), ('installonly_limit', 0), ('installonlypkgs', ['kernel']), ('installroot', '/tmp/swdb/'), ('ip_resolve', None), ('multilib_policy', 'best'), ('obsoletes', True), ('persistdir', dnf.const.PERSISTDIR), ('transformdb', False), ('protected_packages', ["dnf"]), ('plugins', False), ('showdupesfromrepos', False), ('tsflags', []), ('strict', True), ] + list(kwargs.items()) for optname, val in options: self._set_value(optname, val, dnf.conf.PRIO_DEFAULT) # TODO: consolidate with dnf.cli.Cli._read_conf_file() for opt in ('cachedir', 'logdir', 'persistdir'): # don't prepend installroot if option was specified by user # TODO: is this desired? ^^^ (tests won't pass without it ATM) if opt in kwargs: continue self.prepend_installroot(opt) @property def releasever(self): return self.substitutions['releasever'] class FakePersistor(object): reset_last_makecache = False expired_to_add = set() def get_expired_repos(self): return set() def since_last_makecache(self): return None def save(self): pass # object matchers for asserts class ObjectMatcher(object): """Class allowing partial matching of objects.""" def __init__(self, type_=None, attrs=None): """Initialize a matcher instance.""" self._type = type_ self._attrs = attrs def __eq__(self, other): """Test whether this object is equal to the *other* one.""" if self._type is not None: if type(other) is not self._type: return False if self._attrs: for attr, value in self._attrs.items(): if value != getattr(other, attr): return False return True def __ne__(self, other): """Test whether this object is not equal to the *other* one.""" return not self == other def __repr__(self): """Compute the "official" string representation of this object.""" args_strs = [] if self._type is not None: args_strs.append('type_=%s' % repr(self._type)) if self._attrs: attrs_str = ', '.join('%s: %s' % (dnf.i18n.ucd(attr), repr(value)) for attr, value in self._attrs.items()) args_strs.append('attrs={%s}' % attrs_str) return '%s(%s)' % (type(self).__name__, ", ".join(args_strs)) # test cases: class TestCase(unittest.TestCase): if not dnf.pycomp.PY3: assertCountEqual = unittest.TestCase.assertItemsEqual assertRegex = unittest.TestCase.assertRegexpMatches def assertEmpty(self, collection): return self.assertEqual(len(collection), 0) def assertFile(self, path): """Assert the given path is a file.""" return self.assertTrue(os.path.isfile(path)) def assertLength(self, collection, length): return self.assertEqual(len(collection), length) def assertPathDoesNotExist(self, path): return self.assertFalse(os.access(path, os.F_OK)) def assertStartsWith(self, string, what): return self.assertTrue(string.startswith(what)) def assertTracebackIn(self, end, string): """Test that a traceback ending with line *end* is in the *string*.""" traces = (match.group() for match in TRACEBACK_RE.finditer(string)) self.assertTrue(any(trace.endswith(end) for trace in traces)) def assertTransEqual(self, trans_pkgs, list): return self.assertCountEqual([pkg.name for pkg in trans_pkgs], list) class DnfBaseTestCase(TestCase): # create base with specified test repos REPOS = [] # initialize mock sack INIT_SACK = False # initialize self.base._transaction INIT_TRANSACTION = False # False: self.base = MockBase() # True: self.base = BaseCliStub() BASE_CLI = False # None: self.cli = None # "init": self.cli = dnf.cli.cli.Cli(self.base) # "mock": self.cli = self.base.mock_cli() # "stub": self.cli = StubCli(self.base) CLI = None COMPS = False COMPS_SEED_HISTORY = False COMPS_SOLVER = False def setUp(self): if self.BASE_CLI: self.base = BaseCliStub(*self.REPOS) else: self.base = MockBase(*self.REPOS) if self.CLI is None: self.cli = None elif self.CLI == "init": self.cli = dnf.cli.cli.Cli(self.base) elif self.CLI == "mock": self.cli = self.base.mock_cli() elif self.CLI == "stub": self.cli = CliStub(self.base) else: raise ValueError("Invalid CLI value: {}".format(self.CLI)) if self.COMPS: self.base.read_mock_comps(seed_history=self.COMPS_SEED_HISTORY) if self.INIT_SACK: self.base.init_sack() if self.INIT_TRANSACTION: self.base._transaction = self.base.history.rpm if self.COMPS_SOLVER: self.solver = dnf.comps.Solver(self.history, self.comps, REASONS.get) else: self.solver = None def tearDown(self): self.base.close() @property def comps(self): return self.base.comps @property def goal(self): return self.base._goal @property def history(self): return self.base.history @property def sack(self): return self.base.sack def _swdb_begin(self, tsis=None): # history.beg() replaces persistor.commit() tsis = tsis or [] self.history.beg("", [], tsis) def _swdb_end(self, tsis=None): for tsi in self.history._swdb.getItems(): if tsi.getState() == libdnf.transaction.TransactionItemState_UNKNOWN: tsi.setState(libdnf.transaction.TransactionItemState_DONE) self.history.end("") self.history.close() def _swdb_commit(self, tsis=None): self._swdb_begin(tsis) self._swdb_end() self.history.close() class ResultTestCase(DnfBaseTestCase): allow_erasing = False def _get_installed(self, base): try: base.resolve(self.allow_erasing) except dnf.exceptions.DepsolveError: self.fail() installed = set(base.sack.query().installed()) for r in base._transaction.remove_set: installed.remove(r) installed.update(base._transaction.install_set) return installed def assertResult(self, base, pkgs): """Check whether the system contains the given pkgs. pkgs must be present. Any other pkgs result in an error. Pkgs are present if they are in the rpmdb and are not REMOVEd or they are INSTALLed. """ self.assertCountEqual(self._get_installed(base), pkgs) def installed_removed(self, base): try: base.resolve(self.allow_erasing) except dnf.exceptions.DepsolveError: self.fail() installed = base._transaction.install_set removed = base._transaction.remove_set return installed, removed