# cache.py - apt cache abstraction # # Copyright (c) 2005-2009 Canonical # # Author: Michael Vogt # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA import os import weakref import apt_pkg from apt import Package from apt.deprecation import (AttributeDeprecatedBy, function_deprecated_by, deprecated_args) import apt.progress class FetchCancelledException(IOError): """Exception that is thrown when the user cancels a fetch operation.""" class FetchFailedException(IOError): """Exception that is thrown when fetching fails.""" class LockFailedException(IOError): """Exception that is thrown when locking fails.""" class Cache(object): """Dictionary-like package cache. This class has all the packages that are available in it's dictionary """ def __init__(self, progress=None, rootdir=None, memonly=False): self._callbacks = {} if memonly: # force apt to build its caches in memory apt_pkg.config.set("Dir::Cache::pkgcache", "") if rootdir: if os.path.exists(rootdir+"/etc/apt/apt.conf"): apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf") if os.path.isdir(rootdir+"/etc/apt/apt.conf.d"): apt_pkg.read_config_dir(apt_pkg.config, rootdir + "/etc/apt/apt.conf.d") apt_pkg.config.set("Dir", rootdir) apt_pkg.config.set("Dir::State::status", rootdir + "/var/lib/dpkg/status") self.open(progress) def _run_callbacks(self, name): """ internal helper to run a callback """ if name in self._callbacks: for callback in self._callbacks[name]: callback() def open(self, progress): """ Open the package cache, after that it can be used like a dictionary """ if progress is None: progress = apt.progress.OpProgress() self._run_callbacks("cache_pre_open") self._cache = apt_pkg.Cache(progress) self._depcache = apt_pkg.DepCache(self._cache) self._records = apt_pkg.PackageRecords(self._cache) self._list = apt_pkg.SourceList() self._list.read_main_list() self._set = set() self._weakref = weakref.WeakValueDictionary() progress.Op = "Building data structures" i=last=0 size=len(self._cache.packages) for pkg in self._cache.packages: if progress is not None and last+100 < i: progress.update(i/float(size)*100) last=i # drop stuff with no versions (cruft) if len(pkg.version_list) > 0: self._set.add(pkg.name) i += 1 progress.done() self._run_callbacks("cache_post_open") def __getitem__(self, key): """ look like a dictionary (get key) """ try: return self._weakref[key] except KeyError: if key in self._set: pkg = self._weakref[key] = Package(self, self._cache[key]) return pkg else: raise KeyError('The cache has no package named %r' % key) def __iter__(self): for pkgname in self._set: yield self[pkgname] raise StopIteration def has_key(self, key): return (key in self._set) def __contains__(self, key): return (key in self._set) def __len__(self): return len(self._set) def keys(self): return list(self._set) def get_changes(self): """ Get the marked changes """ changes = [] for pkg in self: if (pkg.marked_upgrade or pkg.marked_install or pkg.marked_delete or pkg.marked_downgrade or pkg.marked_reinstall): changes.append(pkg) return changes @deprecated_args def upgrade(self, dist_upgrade=False): """Upgrade all packages. If the parameter *dist_upgrade* is True, new dependencies will be installed as well (and conflicting packages may be removed). The default value is False. """ self.cache_pre_change() self._depcache.upgrade(dist_upgrade) self.cache_post_change() @property def required_download(self): """Get the size of the packages that are required to download.""" pm = apt_pkg.PackageManager(self._depcache) fetcher = apt_pkg.Acquire() pm.get_archives(fetcher, self._list, self._records) return fetcher.fetch_needed @property def required_space(self): """Get the size of the additional required space on the fs.""" return self._depcache.usr_size @property def req_reinstall_pkgs(self): """Return the packages not downloadable packages in reqreinst state.""" reqreinst = set() for pkg in self: if (not pkg.candidate.downloadable and (pkg._pkg.inst_state == apt_pkg.INSTSTATE_REINSTREQ or pkg._pkg.inst_state == apt_pkg.INSTSTATE_HOLD_REINSTREQ)): reqreinst.add(pkg.name) return reqreinst def _run_fetcher(self, fetcher): # do the actual fetching res = fetcher.run() # now check the result (this is the code from apt-get.cc) failed = False transient = False err_msg = "" for item in fetcher.items: if item.status == item.stat_done: continue if item.stat_idle: transient = True continue err_msg += "Failed to fetch %s %s\n" % (item.desc_uri, item.error_text) failed = True # we raise a exception if the download failed or it was cancelt if res == fetcher.result_cancelled: raise FetchCancelledException(err_msg) elif failed: raise FetchFailedException(err_msg) return res def _fetch_archives(self, fetcher, pm): """ fetch the needed archives """ # get lock lockfile = apt_pkg.config.find_dir("Dir::Cache::Archives") + "lock" lock = apt_pkg.get_lock(lockfile) if lock < 0: raise LockFailedException("Failed to lock %s" % lockfile) try: # this may as well throw a SystemError exception if not pm.get_archives(fetcher, self._list, self._records): return False # now run the fetcher, throw exception if something fails to be # fetched return self._run_fetcher(fetcher) finally: os.close(lock) def is_virtual_package(self, pkgname): """Return whether the package is a virtual package.""" pkg = self._cache[pkgname] return bool(pkg.provides_list and not pkg.version_list) def get_providing_packages(self, virtual): """ Return a list of packages which provide the virtual package of the specified name """ providers = [] try: vp = self._cache[virtual] if len(vp.version_list) != 0: return providers except KeyError: return providers for pkg in self: v = self._depcache.get_candidate_ver(pkg._pkg) if v is None: continue for p in v.provides_list: if virtual == p[0]: # we found a pkg that provides this virtual pkg providers.append(pkg) return providers @deprecated_args def update(self, fetch_progress=None): """Run the equivalent of apt-get update. The first parameter *fetch_progress* may be set to an instance of apt.progress.FetchProgress, the default is apt.progress.FetchProgress() . """ lockfile = apt_pkg.config.find_dir("Dir::State::Lists") + "lock" lock = apt_pkg.get_lock(lockfile) if lock < 0: raise LockFailedException("Failed to lock %s" % lockfile) try: if fetch_progress is None: fetch_progress = apt.progress.FetchProgress() return self._cache.update(fetch_progress, self._list) finally: os.close(lock) @deprecated_args def install_archives(self, pm, install_progress): """ The first parameter *pm* refers to an object returned by apt_pkg.PackageManager(). The second parameter *install_progress* refers to an InstallProgress() object of the module apt.progress. """ try: install_progress.start_update() except AttributeError: install_progress.startUpdate() res = install_progress.run(pm) try: install_progress.finish_update() except AttributeError: install_progress.finishUpdate() return res @deprecated_args def commit(self, fetch_progress=None, install_progress=None): """Apply the marked changes to the cache. The first parameter, *fetch_progress*, refers to a FetchProgress() object as found in apt.progress, the default being apt.progress.FetchProgress(). The second parameter, *install_progress*, is a apt.progress.InstallProgress() object. """ # FIXME: # use the new acquire/pkgmanager interface here, # raise exceptions when a download or install fails # and send proper error strings to the application. # Current a failed download will just display "error" # which is less than optimal! if fetch_progress is None: fetch_progress = apt.progress.FetchProgress() if install_progress is None: install_progress = apt.progress.InstallProgress() pm = apt_pkg.PackageManager(self._depcache) fetcher = apt_pkg.Acquire(fetch_progress) while True: # fetch archives first res = self._fetch_archives(fetcher, pm) # then install res = self.install_archives(pm, install_progress) if res == pm.result_completed: break if res == pm.result_failed: raise SystemError("installArchives() failed") # reload the fetcher for media swaping fetcher.shutdown() return (res == pm.result_completed) def clear(self): """ Unmark all changes """ self._depcache.init() # cache changes def cache_post_change(self): " called internally if the cache has changed, emit a signal then " self._run_callbacks("cache_post_change") def cache_pre_change(self): """ called internally if the cache is about to change, emit a signal then """ self._run_callbacks("cache_pre_change") def connect(self, name, callback): """ connect to a signal, currently only used for cache_{post,pre}_{changed,open} """ if not name in self._callbacks: self._callbacks[name] = [] self._callbacks[name].append(callback) if apt_pkg._COMPAT_0_7: _runCallbacks = function_deprecated_by(_run_callbacks) getChanges = function_deprecated_by(get_changes) requiredDownload = AttributeDeprecatedBy('required_download') additionalRequiredSpace = AttributeDeprecatedBy('required_space') reqReinstallPkgs = AttributeDeprecatedBy('req_reinstall_pkgs') _runFetcher = function_deprecated_by(_run_fetcher) _fetchArchives = function_deprecated_by(_fetch_archives) isVirtualPackage = function_deprecated_by(is_virtual_package) getProvidingPackages = function_deprecated_by(get_providing_packages) installArchives = function_deprecated_by(install_archives) cachePostChange = function_deprecated_by(cache_post_change) cachePreChange = function_deprecated_by(cache_pre_change) # ----------------------------- experimental interface class Filter(object): """ Filter base class """ def apply(self, pkg): """ Filter function, return True if the package matchs a filter criteria and False otherwise """ return True class MarkedChangesFilter(Filter): """ Filter that returns all marked changes """ def apply(self, pkg): if pkg.marked_install or pkg.marked_delete or pkg.marked_upgrade: return True else: return False class FilteredCache(object): """ A package cache that is filtered. Can work on a existing cache or create a new one """ def __init__(self, cache=None, progress=None): if cache is None: self.cache = Cache(progress) else: self.cache = cache self.cache.connect("cache_post_change", self.filter_cache_post_change) self.cache.connect("cache_post_open", self.filter_cache_post_change) self._filtered = {} self._filters = [] def __len__(self): return len(self._filtered) def __getitem__(self, key): return self.cache[key] def __iter__(self): for pkgname in self._filtered: yield self.cache[pkgname] def keys(self): return self._filtered.keys() def has_key(self, key): return (key in self._filtered) def __contains__(self, key): return (key in self._filtered) def _reapply_filter(self): " internal helper to refilter " self._filtered = {} for pkg in self.cache: for f in self._filters: if f.apply(pkg): self._filtered[pkg.name] = 1 break def set_filter(self, filter): """Set the current active filter.""" self._filters = [] self._filters.append(filter) #self._reapplyFilter() # force a cache-change event that will result in a refiltering self.cache.cache_post_change() def filter_cache_post_change(self): """Called internally if the cache changes, emit a signal then.""" #print "filterCachePostChange()" self._reapply_filter() # def connect(self, name, callback): # self.cache.connect(name, callback) def __getattr__(self, key): """we try to look exactly like a real cache.""" #print "getattr: %s " % key return getattr(self.cache, key) if apt_pkg._COMPAT_0_7: _reapplyFilter = function_deprecated_by(_reapply_filter) setFilter = function_deprecated_by(set_filter) filterCachePostChange = function_deprecated_by(\ filter_cache_post_change) def cache_pre_changed(): print "cache pre changed" def cache_post_changed(): print "cache post changed" def _test(): """Internal test code.""" print "Cache self test" apt_pkg.init() cache = Cache(apt.progress.OpTextProgress()) cache.connect("cache_pre_change", cache_pre_changed) cache.connect("cache_post_change", cache_post_changed) print ("aptitude" in cache) pkg = cache["aptitude"] print pkg.name print len(cache) for pkgname in cache.keys(): assert cache[pkgname].name == pkgname cache.upgrade() changes = cache.get_changes() print len(changes) for pkg in changes: assert pkg.name # see if fetching works for dir in ["/tmp/pytest", "/tmp/pytest/partial"]: if not os.path.exists(dir): os.mkdir(dir) apt_pkg.config.set("Dir::Cache::Archives", "/tmp/pytest") pm = apt_pkg.PackageManager(cache._depcache) fetcher = apt_pkg.Acquire(apt.progress.TextFetchProgress()) cache._fetch_archives(fetcher, pm) #sys.exit(1) print "Testing filtered cache (argument is old cache)" filtered = FilteredCache(cache) filtered.cache.connect("cache_pre_change", cache_pre_changed) filtered.cache.connect("cache_post_change", cache_post_changed) filtered.cache.upgrade() filtered.set_filter(MarkedChangesFilter()) print len(filtered) for pkgname in filtered.keys(): assert pkgname == filtered[pkg].name print len(filtered) print "Testing filtered cache (no argument)" filtered = FilteredCache(progress=apt.progress.OpTextProgress()) filtered.cache.connect("cache_pre_change", cache_pre_changed) filtered.cache.connect("cache_post_change", cache_post_changed) filtered.cache.upgrade() filtered.set_filter(MarkedChangesFilter()) print len(filtered) for pkgname in filtered.keys(): assert pkgname == filtered[pkgname].name print len(filtered) if __name__ == '__main__': _test()