summaryrefslogtreecommitdiff
path: root/tests/test_apt_cache.py
blob: 4df5e27d4b7fbde3f84cb34e7d4c90fc6dcdb276 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#!/usr/bin/python
#
# Copyright (C) 2010 Julian Andres Klode <jak@debian.org>
#               2010 Michael Vogt <mvo@ubuntu.com>
#
# Copying and distribution of this file, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved.
"""Unit tests for verifying the correctness of check_dep, etc in apt_pkg."""

import glob
import logging
import os
import shutil
import sys
import tempfile
import unittest


from test_all import get_library_dir
libdir = get_library_dir()
if libdir:
    sys.path.insert(0, libdir)

import apt
import apt_pkg


def if_sources_list_is_readable(f):
    def wrapper(*args, **kwargs):
        if os.access("/etc/apt/sources.list", os.R_OK):
            f(*args, **kwargs)
        else:
            logging.warning(
                "skipping '%s' because sources.list is not readable" % f)
    return wrapper


def get_open_file_descriptors():
    try:
        fds = os.listdir("/proc/self/fd")
    except OSError:
        logging.warning("failed to list /proc/self/fd")
        return set([])
    return set(map(int, fds))


class TestAptCache(unittest.TestCase):
    """ test the apt cache """

    def setUp(self):
        # reset any config manipulations done in the individual tests
        apt_pkg.init_config()
        # save/restore the apt config
        self._cnf = {}
        for item in apt_pkg.config.keys():
            self._cnf[item] = apt_pkg.config.find(item)
        apt_pkg.config.clear("APT::Update::Post-Invoke")
        apt_pkg.config.clear("APT::Update::Post-Invoke-Success")

    def tearDown(self):
        for item in self._cnf:
            apt_pkg.config.set(item, self._cnf[item])

    @if_sources_list_is_readable
    def test_apt_cache(self):
        """cache: iterate all packages and all dependencies """
        cache = apt.Cache()
        # number is not meaningful and just need to be "big enough",
        # the important bit is the test against __len__
        self.assertTrue(len(cache) > 100)
        # go over the cache and all dependencies, just to see if
        # that is possible and does not crash
        for pkg in cache:
            if pkg.candidate:
                for or_deps in pkg.candidate.dependencies:
                    for dep in or_deps:
                        self.assertTrue(dep.name)
                        self.assertTrue(isinstance(dep.relation, str))
                        self.assertTrue(dep.pre_depend in (True, False))

                # accessing record should take a reasonable time; in
                # particular, when using compressed indexes, it should not use
                # tons of seek operations
                r = pkg.candidate.record
                self.assertEqual(r['Package'], pkg.shortname)
                self.assertTrue('Version' in r)
                self.assertTrue(len(r['Description']) > 0)
                self.assertTrue(
                    str(r).startswith('Package: %s\n' % pkg.shortname))

    @if_sources_list_is_readable
    def test_cache_close_leak_fd(self):
        fds_before_open = get_open_file_descriptors()
        cache = apt.Cache()
        opened_fd = get_open_file_descriptors().difference(fds_before_open)
        cache.close()
        fds_after_close = get_open_file_descriptors()
        unclosed_fd = opened_fd.intersection(fds_after_close)
        self.assertEqual(fds_before_open, fds_after_close)
        self.assertEqual(unclosed_fd, set())

    def test_cache_open_twice_leaks_fds(self):
        cache = apt.Cache()
        fds_before_open = get_open_file_descriptors()
        cache.open()
        fds_after_open_twice = get_open_file_descriptors()
        self.assertEqual(fds_before_open, fds_after_open_twice)

    @if_sources_list_is_readable
    def test_cache_close_download_fails(self):
        cache = apt.Cache()
        self.assertEqual(cache.required_download, 0)
        cache.close()
        with self.assertRaises(apt.cache.CacheClosedException):
            cache.required_download

    def test_get_provided_packages(self):
        apt.apt_pkg.config.set("Apt::architecture", "i386")
        cache = apt.Cache(rootdir="./data/test-provides/")
        cache.open()
        if len(cache) == 0:
            logging.warning(
                "skipping test_get_provided_packages, cache empty?!?")
            return
        # a true virtual pkg
        l = cache.get_providing_packages("mail-transport-agent")
        self.assertTrue(len(l) > 0)
        self.assertTrue("postfix" in [p.name for p in l])
        self.assertTrue(
            "mail-transport-agent" in cache["postfix"].candidate.provides)

    def test_low_level_pkg_provides(self):
        apt.apt_pkg.config.set("Apt::architecture", "i386")
        # create highlevel cache and get the lowlevel one from it
        highlevel_cache = apt.Cache(rootdir="./data/test-provides")
        if len(highlevel_cache) == 0:
            logging.warning(
                "skipping test_log_level_pkg_provides, cache empty?!?")
            return
        # low level cache provides list of the pkg
        cache = highlevel_cache._cache
        l = cache["mail-transport-agent"].provides_list
        # arbitrary number, just needs to be higher enough
        self.assertEqual(len(l), 2)
        for (providesname, providesver, version) in l:
            self.assertEqual(providesname, "mail-transport-agent")
            if version.parent_pkg.name == "postfix":
                break
        else:
            self.assertNotReached()

    @if_sources_list_is_readable
    def test_dpkg_journal_dirty(self):
        # create tmp env
        tmpdir = tempfile.mkdtemp()
        dpkg_dir = os.path.join(tmpdir, "var", "lib", "dpkg")
        os.makedirs(os.path.join(dpkg_dir, "updates"))
        open(os.path.join(dpkg_dir, "status"), "w").close()
        apt_pkg.config.set("Dir::State::status",
                           os.path.join(dpkg_dir, "status"))
        cache = apt.Cache()
        # test empty
        self.assertFalse(cache.dpkg_journal_dirty)
        # that is ok, only [0-9] are dpkg jounral entries
        open(os.path.join(dpkg_dir, "updates", "xxx"), "w").close()
        self.assertFalse(cache.dpkg_journal_dirty)
        # that is a dirty journal
        open(os.path.join(dpkg_dir, "updates", "000"), "w").close()
        self.assertTrue(cache.dpkg_journal_dirty)

    @if_sources_list_is_readable
    def test_apt_update(self):
        rootdir = "./data/tmp"
        if os.path.exists(rootdir):
            shutil.rmtree(rootdir)
        try:
            os.makedirs(os.path.join(rootdir, "var/lib/apt/lists/partial"))
        except OSError:
            pass
        state_dir = os.path.join(rootdir, "var/lib/apt")
        lists_dir = os.path.join(rootdir, "var/lib/apt/lists")
        old_state = apt_pkg.config.find("dir::state")
        apt_pkg.config.set("dir::state", state_dir)
        # set a local sources.list that does not need the network
        base_sources = os.path.abspath(os.path.join(rootdir, "sources.list"))
        old_source_list = apt_pkg.config.find("dir::etc::sourcelist")
        old_source_parts = apt_pkg.config.find("dir::etc::sourceparts")
        apt_pkg.config.set("dir::etc::sourcelist", base_sources)
        # TODO: /dev/null is not a dir, perhaps find something better
        apt_pkg.config.set("dir::etc::sourceparts", "/dev/null")
        # main sources.list
        sources_list = base_sources
        with open(sources_list, "w") as f:
            repo = os.path.abspath("./data/test-repo2")
            f.write("deb copy:%s /\n" % repo)

        # test single sources.list fetching
        sources_list = os.path.join(rootdir, "test.list")
        with open(sources_list, "w") as f:
            repo_dir = os.path.abspath("./data/test-repo")
            f.write("deb copy:%s /\n" % repo_dir)

        self.assertTrue(os.path.exists(sources_list))
        # write marker to ensure listcleaner is not run
        open("./data/tmp/var/lib/apt/lists/marker", "w").close()

        # update a single sources.list
        cache = apt.Cache()
        cache.update(sources_list=sources_list)
        # verify we just got the excpected package file
        needle_packages = glob.glob(
            lists_dir + "/*tests_data_test-repo_Packages*")
        self.assertEqual(len(needle_packages), 1)
        # verify that we *only* got the Packages file from a single source
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(needle_packages, all_packages)
        # verify that the listcleaner was not run and the marker file is
        # still there
        self.assertTrue("marker" in os.listdir(lists_dir))
        # now run update again (without the "normal" sources.list that
        # contains test-repo2 and verify that we got the normal sources.list
        cache.update()
        needle_packages = glob.glob(
            lists_dir + "/*tests_data_test-repo2_Packages*")
        self.assertEqual(len(needle_packages), 1)
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(needle_packages, all_packages)

        # and another update with a single source only
        cache = apt.Cache()
        cache.update(sources_list=sources_list)
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(len(all_packages), 2)
        apt_pkg.config.set("dir::state", old_state)
        apt_pkg.config.set("dir::etc::sourcelist", old_source_list)
        apt_pkg.config.set("dir::etc::sourceparts", old_source_parts)

    def test_package_cmp(self):
        cache = apt.Cache(rootdir="/")
        l = []
        l.append(cache["intltool"])
        l.append(cache["python3"])
        l.append(cache["apt"])
        l.sort()
        self.assertEqual([p.name for p in l],
                         ["apt", "intltool", "python3"])

    def test_get_architectures(self):
        main_arch = apt.apt_pkg.config.get("APT::Architecture")
        arches = apt_pkg.get_architectures()
        self.assertTrue(main_arch in arches)


if __name__ == "__main__":
    unittest.main()