# cache.py - apt cache abstraction # # Copyright (c) 2005-2009 Canonical # # Author: Michael Vogt # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA import os import weakref import apt_pkg from apt import Package import apt.progress class FetchCancelledException(IOError): """Exception that is thrown when the user cancels a fetch operation.""" class FetchFailedException(IOError): """Exception that is thrown when fetching fails.""" class LockFailedException(IOError): """Exception that is thrown when locking fails.""" class Cache(object): """Dictionary-like package cache. This class has all the packages that are available in it's dictionary """ def __init__(self, progress=None, rootdir=None, memonly=False): self._callbacks = {} self._weakref = weakref.WeakValueDictionary() self._set = set() if memonly: # force apt to build its caches in memory apt_pkg.Config.Set("Dir::Cache::pkgcache", "") if rootdir: if os.path.exists(rootdir+"/etc/apt/apt.conf"): apt_pkg.ReadConfigFile(apt_pkg.Config, rootdir + "/etc/apt/apt.conf") if os.path.isdir(rootdir+"/etc/apt/apt.conf.d"): apt_pkg.ReadConfigDir(apt_pkg.Config, rootdir + "/etc/apt/apt.conf.d") apt_pkg.Config.Set("Dir", rootdir) apt_pkg.Config.Set("Dir::State::status", rootdir + "/var/lib/dpkg/status") # create required dirs/files when run with special rootdir # automatically self._check_and_create_required_dirs(rootdir) # Call InitSystem so the change to Dir::State::Status is actually # recognized (LP: #320665) apt_pkg.InitSystem() self.open(progress) def _check_and_create_required_dirs(self, rootdir): """ check if the required apt directories/files are there and if not create them """ files = ["/var/lib/dpkg/status", "/etc/apt/sources.list", ] dirs = ["/var/lib/dpkg", "/etc/apt/", "/var/cache/apt/archives/partial", "/var/lib/apt/lists/partial", ] for d in dirs: if not os.path.exists(rootdir+d): print "creating: ",rootdir+d os.makedirs(rootdir+d) for f in files: if not os.path.exists(rootdir+f): open(rootdir+f,"w") def _runCallbacks(self, name): """ internal helper to run a callback """ if name in self._callbacks: for callback in self._callbacks[name]: callback() def open(self, progress=None): """ Open the package cache, after that it can be used like a dictionary """ if progress is None: progress = apt.progress.OpProgress() self._runCallbacks("cache_pre_open") self._cache = apt_pkg.GetCache(progress) self._depcache = apt_pkg.GetDepCache(self._cache) self._records = apt_pkg.GetPkgRecords(self._cache) self._list = apt_pkg.GetPkgSourceList() self._list.ReadMainList() self._set.clear() self._weakref.clear() progress.Op = "Building data structures" i=last=0 size=len(self._cache.Packages) for pkg in self._cache.Packages: if progress is not None and last+100 < i: progress.update(i/float(size)*100) last=i # drop stuff with no versions (cruft) if len(pkg.VersionList) > 0: self._set.add(pkg.Name) i += 1 progress.done() self._runCallbacks("cache_post_open") def __getitem__(self, key): """ look like a dictionary (get key) """ try: return self._weakref[key] except KeyError: if key in self._set: pkg = self._weakref[key] = Package(self, self._cache[key]) return pkg else: raise KeyError('The cache has no package named %r' % key) def __iter__(self): for pkgname in self._set: yield self[pkgname] raise StopIteration def has_key(self, key): return (key in self._set) def __contains__(self, key): return (key in self._set) def __len__(self): return len(self._set) def keys(self): return list(self._set) def getChanges(self): """ Get the marked changes """ changes = [] for p in self: if p.markedUpgrade or p.markedInstall or p.markedDelete or \ p.markedDowngrade or p.markedReinstall: changes.append(p) return changes def upgrade(self, distUpgrade=False): """ Upgrade the all package, DistUpgrade will also install new dependencies """ self.cachePreChange() self._depcache.Upgrade(distUpgrade) self.cachePostChange() @property def requiredDownload(self): """Get the size of the packages that are required to download.""" pm = apt_pkg.GetPackageManager(self._depcache) fetcher = apt_pkg.GetAcquire() pm.GetArchives(fetcher, self._list, self._records) return fetcher.FetchNeeded @property def additionalRequiredSpace(self): """Get the size of the additional required space on the fs.""" return self._depcache.UsrSize @property def reqReinstallPkgs(self): """Return the packages not downloadable packages in reqreinst state.""" reqreinst = set() for pkg in self: if (not pkg.candidate.downloadable and (pkg._pkg.InstState == apt_pkg.InstStateReInstReq or pkg._pkg.InstState == apt_pkg.InstStateHoldReInstReq)): reqreinst.add(pkg.name) return reqreinst def _runFetcher(self, fetcher): # do the actual fetching res = fetcher.Run() # now check the result (this is the code from apt-get.cc) failed = False transient = False errMsg = "" for item in fetcher.Items: if item.Status == item.StatDone: continue if item.StatIdle: transient = True continue errMsg += "Failed to fetch %s %s\n" % (item.DescURI, item.ErrorText) failed = True # we raise a exception if the download failed or it was cancelt if res == fetcher.ResultCancelled: raise FetchCancelledException(errMsg) elif failed: raise FetchFailedException(errMsg) return res def _fetchArchives(self, fetcher, pm): """ fetch the needed archives """ # get lock lockfile = apt_pkg.Config.FindDir("Dir::Cache::Archives") + "lock" lock = apt_pkg.GetLock(lockfile) if lock < 0: raise LockFailedException("Failed to lock %s" % lockfile) try: # this may as well throw a SystemError exception if not pm.GetArchives(fetcher, self._list, self._records): return False # now run the fetcher, throw exception if something fails to be # fetched return self._runFetcher(fetcher) finally: os.close(lock) def isVirtualPackage(self, pkgname): """Return whether the package is a virtual package.""" pkg = self._cache[pkgname] return bool(pkg.ProvidesList and not pkg.VersionList) def getProvidingPackages(self, virtual): """ Return a list of packages which provide the virtual package of the specified name """ providers = [] try: vp = self._cache[virtual] if len(vp.VersionList) != 0: return providers except KeyError: return providers for pkg in self: v = self._depcache.GetCandidateVer(pkg._pkg) if v is None: continue for p in v.ProvidesList: if virtual == p[0]: # we found a pkg that provides this virtual pkg providers.append(pkg) return providers def update(self, fetchProgress=None): " run the equivalent of apt-get update " lockfile = apt_pkg.Config.FindDir("Dir::State::Lists") + "lock" lock = apt_pkg.GetLock(lockfile) if lock < 0: raise LockFailedException("Failed to lock %s" % lockfile) try: if fetchProgress is None: fetchProgress = apt.progress.FetchProgress() return self._cache.Update(fetchProgress, self._list) finally: os.close(lock) def installArchives(self, pm, installProgress): installProgress.startUpdate() res = installProgress.run(pm) installProgress.finishUpdate() return res def commit(self, fetchProgress=None, installProgress=None): """ Apply the marked changes to the cache """ # FIXME: # use the new acquire/pkgmanager interface here, # raise exceptions when a download or install fails # and send proper error strings to the application. # Current a failed download will just display "error" # which is less than optimal! if fetchProgress is None: fetchProgress = apt.progress.FetchProgress() if installProgress is None: installProgress = apt.progress.InstallProgress() pm = apt_pkg.GetPackageManager(self._depcache) fetcher = apt_pkg.GetAcquire(fetchProgress) while True: # fetch archives first res = self._fetchArchives(fetcher, pm) # then install res = self.installArchives(pm, installProgress) if res == pm.ResultCompleted: break if res == pm.ResultFailed: raise SystemError("installArchives() failed") # reload the fetcher for media swaping fetcher.Shutdown() return (res == pm.ResultCompleted) def clear(self): """ Unmark all changes """ self._depcache.Init() # cache changes def cachePostChange(self): " called internally if the cache has changed, emit a signal then " self._runCallbacks("cache_post_change") def cachePreChange(self): """ called internally if the cache is about to change, emit a signal then """ self._runCallbacks("cache_pre_change") def connect(self, name, callback): """ connect to a signal, currently only used for cache_{post,pre}_{changed,open} """ if not name in self._callbacks: self._callbacks[name] = [] self._callbacks[name].append(callback) @property def broken_count(self): """Return the number of packages with broken dependencies.""" return self._depcache.broken_count @property def delete_count(self): """Return the number of packages marked for deletion.""" return self._depcache.del_count @property def install_count(self): """Return the number of packages marked for installation.""" return self._depcache.inst_count @property def keep_count(self): """Return the number of packages marked as keep.""" return self._depcache.keep_count # ----------------------------- experimental interface class Filter(object): """ Filter base class """ def apply(self, pkg): """ Filter function, return True if the package matchs a filter criteria and False otherwise """ return True class MarkedChangesFilter(Filter): """ Filter that returns all marked changes """ def apply(self, pkg): if pkg.markedInstall or pkg.markedDelete or pkg.markedUpgrade: return True else: return False class FilteredCache(object): """ A package cache that is filtered. Can work on a existing cache or create a new one """ def __init__(self, cache=None, progress=None): if cache is None: self.cache = Cache(progress) else: self.cache = cache self.cache.connect("cache_post_change", self.filterCachePostChange) self.cache.connect("cache_post_open", self.filterCachePostChange) self._filtered = {} self._filters = [] def __len__(self): return len(self._filtered) def __getitem__(self, key): return self.cache[key] def __iter__(self): for pkgname in self._filtered: yield self.cache[pkgname] def keys(self): return self._filtered.keys() def has_key(self, key): return (key in self._filtered) def __contains__(self, key): return (key in self._filtered) def _reapplyFilter(self): " internal helper to refilter " self._filtered = {} for pkg in self.cache: for f in self._filters: if f.apply(pkg): self._filtered[pkg.name] = 1 break def setFilter(self, filter): """Set the current active filter.""" self._filters = [] self._filters.append(filter) #self._reapplyFilter() # force a cache-change event that will result in a refiltering self.cache.cachePostChange() def filterCachePostChange(self): """Called internally if the cache changes, emit a signal then.""" #print "filterCachePostChange()" self._reapplyFilter() # def connect(self, name, callback): # self.cache.connect(name, callback) def __getattr__(self, key): """we try to look exactly like a real cache.""" #print "getattr: %s " % key return getattr(self.cache, key) def cache_pre_changed(): print "cache pre changed" def cache_post_changed(): print "cache post changed" # internal test code if __name__ == "__main__": print "Cache self test" apt_pkg.init() c = Cache(apt.progress.OpTextProgress()) c.connect("cache_pre_change", cache_pre_changed) c.connect("cache_post_change", cache_post_changed) print ("aptitude" in c) p = c["aptitude"] print p.name print len(c) for pkg in c.keys(): x= c[pkg].name c.upgrade() changes = c.getChanges() print len(changes) for p in changes: #print p.name x = p.name # see if fetching works for d in ["/tmp/pytest", "/tmp/pytest/partial"]: if not os.path.exists(d): os.mkdir(d) apt_pkg.Config.Set("Dir::Cache::Archives", "/tmp/pytest") pm = apt_pkg.GetPackageManager(c._depcache) fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress()) c._fetchArchives(fetcher, pm) #sys.exit(1) print "Testing filtered cache (argument is old cache)" f = FilteredCache(c) f.cache.connect("cache_pre_change", cache_pre_changed) f.cache.connect("cache_post_change", cache_post_changed) f.cache.upgrade() f.setFilter(MarkedChangesFilter()) print len(f) for pkg in f.keys(): #print c[pkg].name x = f[pkg].name print len(f) print "Testing filtered cache (no argument)" f = FilteredCache(progress=apt.progress.OpTextProgress()) f.cache.connect("cache_pre_change", cache_pre_changed) f.cache.connect("cache_post_change", cache_post_changed) f.cache.upgrade() f.setFilter(MarkedChangesFilter()) print len(f) for pkg in f.keys(): #print c[pkg].name x = f[pkg].name print len(f)