# package.py - apt package abstraction # # Copyright (c) 2005 Canonical # # Author: Michael Vogt # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA """Functionality related to packages.""" import gettext import httplib import sys import re import socket import urllib2 import apt_pkg __all__ = 'BaseDependency', 'Dependency', 'Origin', 'Package', 'Record' # Set a timeout for the changelog download socket.setdefaulttimeout(2) def _(string): """Return the translation of the string.""" return gettext.dgettext("python-apt", string) class BaseDependency(object): """A single dependency. Attributes defined here: name - The name of the dependency relation - The relation (>>,>=,==,<<,<=,) version - The version depended on preDepend - Boolean value whether this is a pre-dependency. """ def __init__(self, name, rel, ver, pre): self.name = name self.relation = rel self.version = ver self.preDepend = pre class Dependency(object): """Represent an Or-group of dependencies. Attributes defined here: or_dependencies - The possible choices """ def __init__(self, alternatives): self.or_dependencies = alternatives class Origin(object): """The origin of a version. Attributes defined here: archive - The archive (eg. unstable) component - The component (eg. main) label - The Label, as set in the Release file origin - The Origin, as set in the Release file site - The hostname of the site. trusted - Boolean value whether this is trustworthy. """ def __init__(self, pkg, VerFileIter): self.archive = VerFileIter.Archive self.component = VerFileIter.Component self.label = VerFileIter.Label self.origin = VerFileIter.Origin self.site = VerFileIter.Site # check the trust indexfile = pkg._list.FindIndex(VerFileIter) if indexfile and indexfile.IsTrusted: self.trusted = True else: self.trusted = False def __repr__(self): return ("") % (self.component, self.archive, self.origin, self.label, self.site, self.trusted) class Record(object): """Represent a pkgRecord. It can be accessed like a dictionary and can also give the original package record if accessed as a string. """ def __init__(self, record_str): self._rec = apt_pkg.ParseSection(record_str) def __str__(self): return str(self._rec) def __getitem__(self, key): return self._rec[key] def __contains__(self, key): return self._rec.has_key(key) def __iter__(self): return iter(self._rec.keys()) def iteritems(self): """An iterator over the (key, value) items of the record.""" for key in self._rec.keys(): yield key, self._rec[key] def get(self, key, default=None): """Return record[key] if key in record, else `default`. The parameter `default` must be either a string or None. """ return self._rec.get(key, default) def has_key(self, key): """deprecated form of 'key in x'.""" return self._rec.has_key(key) class Package(object): """Representation of a package in a cache. This class provides methods and properties for working with a package. It lets you mark the package for installation, check if it is installed, and much more. """ def __init__(self, cache, depcache, records, sourcelist, pcache, pkgiter): """ Init the Package object """ self._cache = cache # low level cache self._depcache = depcache self._records = records self._pkg = pkgiter self._list = sourcelist # sourcelist self._pcache = pcache # python cache in cache.py self._changelog = "" # Cached changelog def _lookupRecord(self, UseCandidate=True): """Internal helper that moves the Records to the right position. Must be called before _records is accessed. """ if UseCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer # check if we found a version if ver is None: #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, # UseCandidate) return False if ver.FileList is None: print "No FileList for: %s " % self._pkg.Name() return False f, index = ver.FileList.pop(0) self._records.Lookup((f, index)) return True @property def name(self): """Return the name of the package.""" return self._pkg.Name @property def id(self): """Return a uniq ID for the package. This can be used eg. to store additional information about the pkg.""" return self._pkg.ID def __hash__(self): """Return the hash of the object. This returns the same value as ID, which is unique.""" return self._pkg.ID @property def installedVersion(self): """Return the installed version as string.""" ver = self._pkg.CurrentVer if ver is not None: return ver.VerStr else: return None @property def candidateVersion(self): """Return the candidate version as string.""" ver = self._depcache.GetCandidateVer(self._pkg) if ver is not None: return ver.VerStr else: return None def _getDependencies(self, ver): """Get the dependencies for a given version of a package.""" depends_list = [] depends = ver.DependsList for t in ["PreDepends", "Depends"]: try: for depVerList in depends[t]: base_deps = [] for depOr in depVerList: base_deps.append(BaseDependency(depOr.TargetPkg.Name, depOr.CompType, depOr.TargetVer, (t == "PreDepends"))) depends_list.append(Dependency(base_deps)) except KeyError: pass return depends_list @property def candidateDependencies(self): """Return a list of candidate dependencies.""" candver = self._depcache.GetCandidateVer(self._pkg) if candver is None: return [] return self._getDependencies(candver) @property def installedDependencies(self): """Return a list of installed dependencies.""" ver = self._pkg.CurrentVer if ver is None: return [] return self._getDependencies(ver) @property def architecture(self): """Return the Architecture of the package""" if not self._lookupRecord(): return None sec = apt_pkg.ParseSection(self._records.Record) try: return sec["Architecture"] except KeyError: return None def _downloadable(self, useCandidate=True): """Return True if the package is downloadable.""" if useCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer if ver is None: return False return ver.Downloadable @property def candidateDownloadable(self): """Return True if the candidate is downloadable.""" return self._downloadable(True) @property def installedDownloadable(self): """Return True if the installed version is downloadable.""" return self._downloadable(False) @property def sourcePackageName(self): """Return the source package name as string.""" if not self._lookupRecord(): if not self._lookupRecord(False): return self._pkg.Name src = self._records.SourcePkg if src != "": return src else: return self._pkg.Name @property def homepage(self): """Return the homepage field as string.""" if not self._lookupRecord(): return None return self._records.Homepage @property def section(self): """Return the section of the package.""" return self._pkg.Section @property def priority(self): """Return the priority (of the candidate version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr else: return None @property def installedPriority(self): """Return the priority (of the installed version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr else: return None @property def summary(self): """Return the short description (one line summary).""" if not self._lookupRecord(): return "" ver = self._depcache.GetCandidateVer(self._pkg) desc_iter = ver.TranslatedDescription self._records.Lookup(desc_iter.FileList.pop(0)) return self._records.ShortDesc @property def description(self, format=True, useDots=False): """Return the formatted long description. Return the formated long description according to the Debian policy (Chapter 5.6.13). See http://www.debian.org/doc/debian-policy/ch-controlfields.html for more information. """ if not format: return self.rawDescription if not self._lookupRecord(): return "" # get the translated description ver = self._depcache.GetCandidateVer(self._pkg) desc_iter = ver.TranslatedDescription self._records.Lookup(desc_iter.FileList.pop(0)) desc = "" try: dsc = unicode(self._records.LongDesc, "utf-8") except UnicodeDecodeError, err: dsc = _("Invalid unicode in description for '%s' (%s). " "Please report.") % (self.name, err) lines = dsc.split("\n") for i in range(len(lines)): # Skip the first line, since its a duplication of the summary if i == 0: continue raw_line = lines[i] if raw_line.strip() == ".": # The line is just line break if not desc.endswith("\n"): desc += "\n" continue elif raw_line.startswith(" "): # The line should be displayed verbatim without word wrapping if not desc.endswith("\n"): line = "\n%s\n" % raw_line[2:] else: line = "%s\n" % raw_line[2:] elif raw_line.startswith(" "): # The line is part of a paragraph. if desc.endswith("\n") or desc == "": # Skip the leading white space line = raw_line[1:] else: line = raw_line else: line = raw_line # Use dots for lists if useDots: line = re.sub(r"^(\s*)(\*|0|o|-) ", ur"\1\u2022 ", line, 1) # Add current line to the description desc += line return desc @property def rawDescription(self): """return the long description (raw).""" if not self._lookupRecord(): return "" return self._records.LongDesc @property def candidateRecord(self): """Return the Record of the candidate version of the package.""" if not self._lookupRecord(True): return None return Record(self._records.Record) @property def installedRecord(self): """Return the Record of the candidate version of the package.""" if not self._lookupRecord(False): return None return Record(self._records.Record) # depcache states @property def markedInstall(self): """Return True if the package is marked for install.""" return self._depcache.MarkedInstall(self._pkg) @property def markedUpgrade(self): """Return True if the package is marked for upgrade.""" return self._depcache.MarkedUpgrade(self._pkg) @property def markedDelete(self): """Return True if the package is marked for delete.""" return self._depcache.MarkedDelete(self._pkg) @property def markedKeep(self): """Return True if the package is marked for keep.""" return self._depcache.MarkedKeep(self._pkg) @property def markedDowngrade(self): """ Package is marked for downgrade """ return self._depcache.MarkedDowngrade(self._pkg) @property def markedReinstall(self): """Return True if the package is marked for reinstall.""" return self._depcache.MarkedReinstall(self._pkg) @property def isInstalled(self): """Return True if the package is installed.""" return (self._pkg.CurrentVer is not None) @property def isUpgradable(self): """Return True if the package is upgradable.""" return self.isInstalled and self._depcache.IsUpgradable(self._pkg) @property def isAutoRemovable(self): """Return True if the package is no longer required. If the package has been installed automatically as a dependency of another package, and if no packages depend on it anymore, the package is no longer required. """ return self.isInstalled and self._depcache.IsGarbage(self._pkg) # sizes @property def packageSize(self): """Return the size of the candidate deb package.""" ver = self._depcache.GetCandidateVer(self._pkg) return ver.Size @property def installedPackageSize(self): """Return the size of the installed deb package.""" ver = self._pkg.CurrentVer return ver.Size @property def candidateInstalledSize(self, UseCandidate=True): """Return the size of the candidate installed package.""" ver = self._depcache.GetCandidateVer(self._pkg) @property def installedSize(self): """Return the size of the currently installed package.""" ver = self._pkg.CurrentVer if ver is None: return 0 return ver.InstalledSize @property def installedFiles(self): """Return a list of files installed by the package. Return a list of unicode names of the files which have been installed by this package """ path = "/var/lib/dpkg/info/%s.list" % self.name try: file_list = open(path) try: return file_list.read().decode().split("\n") finally: file_list.close() except EnvironmentError: return [] def getChangelog(self, uri=None, cancel_lock=None): """ Download the changelog of the package and return it as unicode string. The parameter `uri` refers to the uri of the changelog file. It may contain multiple named variables which will be substitued. These variables are (src_section, prefix, src_pkg, src_ver). An example is the Ubuntu changelog: "http://changelogs.ubuntu.com/changelogs/pool" \\ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ "/%(src_pkg)s_%(src_ver)s/changelog" The parameter `cancel_lock` refers to an instance of threading.Lock, which if set, prevents the download. """ # Return a cached changelog if available if self._changelog != "": return self._changelog if uri is None: if self.candidateOrigin[0].origin == "Debian": uri = "http://packages.debian.org/changelogs/pool" \ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \ "/%(src_pkg)s_%(src_ver)s/changelog" elif self.candidateOrigin[0].origin == "Ubuntu": uri = "http://changelogs.ubuntu.com/changelogs/pool" \ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \ "/%(src_pkg)s_%(src_ver)s/changelog" else: return _("The list of changes is not available") # get the src package name src_pkg = self.sourcePackageName # assume "main" section src_section = "main" # use the section of the candidate as a starting point section = self._depcache.GetCandidateVer(self._pkg).Section # get the source version, start with the binaries version bin_ver = self.candidateVersion src_ver = self.candidateVersion #print "bin: %s" % binver try: # FIXME: This try-statement is too long ... # try to get the source version of the pkg, this differs # for some (e.g. libnspr4 on ubuntu) # this feature only works if the correct deb-src are in the # sources.list # otherwise we fall back to the binary version number src_records = apt_pkg.GetPkgSrcRecords() src_rec = src_records.Lookup(src_pkg) if src_rec: src_ver = src_records.Version #if apt_pkg.VersionCompare(binver, srcver) > 0: # srcver = binver if not src_ver: src_ver = bin_ver #print "srcver: %s" % src_ver section = src_records.Section #print "srcsect: %s" % section else: # fail into the error handler raise SystemError except SystemError: src_ver = bin_ver l = section.split("/") if len(l) > 1: src_section = l[0] # lib is handled special prefix = src_pkg[0] if src_pkg.startswith("lib"): prefix = "lib" + src_pkg[3] # stip epoch l = src_ver.split(":") if len(l) > 1: src_ver = "".join(l[1:]) uri = uri % {"src_section": src_section, "prefix": prefix, "src_pkg": src_pkg, "src_ver": src_ver} try: # Check if the download was canceled if cancel_lock and cancel_lock.isSet(): return "" changelog_file = urllib2.urlopen(uri) # do only get the lines that are new changelog = "" regexp = "^%s \((.*)\)(.*)$" % (re.escape(src_pkg)) while True: # Check if the download was canceled if cancel_lock and cancel_lock.isSet(): return "" # Read changelog line by line line_raw = changelog_file.readline() if line_raw == "": break # The changelog is encoded in utf-8, but since there isn't any # http header, urllib2 seems to treat it as ascii line = line_raw.decode("utf-8") #print line.encode('utf-8') match = re.match(regexp, line) if match: # strip epoch from installed version # and from changelog too installed = self.installedVersion if installed and ":" in installed: installed = installed.split(":", 1)[1] changelog_ver = match.group(1) if changelog_ver and ":" in changelog_ver: changelog_ver = changelog_ver.split(":", 1)[1] if installed and \ apt_pkg.VersionCompare(changelog_ver, installed) <= 0: break # EOF (shouldn't really happen) changelog += line # Print an error if we failed to extract a changelog if len(changelog) == 0: changelog = _("The list of changes is not available") self._changelog = changelog # FIXME: Ubuntu-specific part. except urllib2.HTTPError: return _("The list of changes is not available yet.\n\n" "Please use http://launchpad.net/ubuntu/+source/%s/%s/" "+changelog\n" "until the changes become available or try again " "later.") % (src_pkg, src_ver) except (IOError, httplib.BadStatusLine): return _("Failed to download the list of changes. \nPlease " "check your Internet connection.") return self._changelog @property def candidateOrigin(self): """Return the Origin() of the candidate version.""" ver = self._depcache.GetCandidateVer(self._pkg) if not ver: return None origins = [] for (verFileIter, index) in ver.FileList: origins.append(Origin(self, verFileIter)) return origins # depcache actions def markKeep(self): """Mark a package for keep.""" self._pcache.cachePreChange() self._depcache.MarkKeep(self._pkg) self._pcache.cachePostChange() def markDelete(self, autoFix=True, purge=False): """Mark a package for install. If autoFix is True, the resolver will be run, trying to fix broken packages. This is the default. If purge is True, remove the configuration files of the package as well. The default is to keep the configuration. """ self._pcache.cachePreChange() self._depcache.MarkDelete(self._pkg, purge) # try to fix broken stuffsta if autoFix and self._depcache.BrokenCount > 0: Fix = apt_pkg.GetPkgProblemResolver(self._depcache) Fix.Clear(self._pkg) Fix.Protect(self._pkg) Fix.Remove(self._pkg) Fix.InstallProtect() Fix.Resolve() self._pcache.cachePostChange() def markInstall(self, autoFix=True, autoInst=True, fromUser=True): """Mark a package for install. If autoFix is True, the resolver will be run, trying to fix broken packages. This is the default. If autoInst is True, the dependencies of the packages will be installed automatically. This is the default. If fromUser is True, this package will not be marked as automatically installed. This is the default. Set it to False if you want to be able to remove the package at a later stage if no other package depends on it. """ self._pcache.cachePreChange() self._depcache.MarkInstall(self._pkg, autoInst, fromUser) # try to fix broken stuff if autoFix and self._depcache.BrokenCount > 0: fixer = apt_pkg.GetPkgProblemResolver(self._depcache) fixer.Clear(self._pkg) fixer.Protect(self._pkg) fixer.Resolve(True) self._pcache.cachePostChange() def markUpgrade(self): """Mark a package for upgrade.""" if self.isUpgradable: self.markInstall() else: # FIXME: we may want to throw a exception here sys.stderr.write(("MarkUpgrade() called on a non-upgrable pkg: " "'%s'\n") % self._pkg.Name) def commit(self, fprogress, iprogress): """Commit the changes. The parameter `fprogress` refers to a FetchProgress() object, as found in apt.progress. The parameter `iprogress` refers to an InstallProgress() object, as found in apt.progress. """ self._depcache.Commit(fprogress, iprogress) def _test(): """Self-test.""" print "Self-test for the Package modul" import random import apt apt_pkg.init() cache = apt_pkg.GetCache() depcache = apt_pkg.GetDepCache(cache) records = apt_pkg.GetPkgRecords(cache) sourcelist = apt_pkg.GetPkgSourceList() pkgiter = cache["apt-utils"] pkg = Package(cache, depcache, records, sourcelist, None, pkgiter) print "Name: %s " % pkg.name print "ID: %s " % pkg.id print "Priority (Candidate): %s " % pkg.priority print "Priority (Installed): %s " % pkg.installedPriority print "Installed: %s " % pkg.installedVersion print "Candidate: %s " % pkg.candidateVersion print "CandidateDownloadable: %s" % pkg.candidateDownloadable print "CandidateOrigins: %s" % pkg.candidateOrigin print "SourcePkg: %s " % pkg.sourcePackageName print "Section: %s " % pkg.section print "Summary: %s" % pkg.summary print "Description (formated) :\n%s" % pkg.description print "Description (unformated):\n%s" % pkg.rawDescription print "InstalledSize: %s " % pkg.installedSize print "PackageSize: %s " % pkg.packageSize print "Dependencies: %s" % pkg.installedDependencies for dep in pkg.candidateDependencies: print ",".join("%s (%s) (%s) (%s)" % (o.name, o.version, o.relation, o.preDepend) for o in dep.or_dependencies) print "arch: %s" % pkg.architecture print "homepage: %s" % pkg.homepage print "rec: ", pkg.candidateRecord # now test install/remove progress = apt.progress.OpTextProgress() cache = apt.Cache(progress) for i in True, False: print "Running install on random upgradable pkgs with AutoFix: %s " % i for pkg in cache: if pkg.isUpgradable: if random.randint(0, 1) == 1: pkg.markInstall(i) print "Broken: %s " % cache._depcache.BrokenCount print "InstCount: %s " % cache._depcache.InstCount print # get a new cache for i in True, False: print "Randomly remove some packages with AutoFix: %s" % i cache = apt.Cache(progress) for name in cache.keys(): if random.randint(0, 1) == 1: try: cache[name].markDelete(i) except SystemError: print "Error trying to remove: %s " % name print "Broken: %s " % cache._depcache.BrokenCount print "DelCount: %s " % cache._depcache.DelCount # self-test if __name__ == "__main__": _test()