diff options
| author | Julian Andres Klode <jak@debian.org> | 2009-01-09 21:49:42 +0100 |
|---|---|---|
| committer | Julian Andres Klode <jak@debian.org> | 2009-01-09 21:49:42 +0100 |
| commit | 9c0e4ac3ec3657c5b77ec4a50b47af08625d3985 (patch) | |
| tree | 5d50dc578374b172129c74161689edcb74822064 | |
| parent | 47feb903a8bc9b5ac4d93167b4c316ca82caa1b8 (diff) | |
| download | python-apt-9c0e4ac3ec3657c5b77ec4a50b47af08625d3985.tar.gz | |
* apt/package.py: Cleanup, move Origin to module-level, enhance Record
This commit includes multiple changes:
First of all, the code has been adjusted to follow PEP 8 and the documentation
has been improved.
Secondly, the Origin class has been moved out of the Package class into the
module, and its __repr__ has been changed.
Thirdly, the Record class has been enhances with some new methods, and the
attribute containing the string has been removed, as we can reproduce the
string using the TagSection in the _rec attribute. The methods added are
__contains__(), __iter__(), get() and iteritems().
Fourthly, the method getChangelog() has been fixed in some areas, especially
in the exception handling.
The method/property installedFiles() is now enclosing its file access into
a try/finally block.
All the testing code has been moved into a _test() function for now, until
the new testing functionality is implemented.
| -rw-r--r-- | apt/package.py | 442 |
1 files changed, 277 insertions, 165 deletions
diff --git a/apt/package.py b/apt/package.py index a4840e35..7817c64c 100644 --- a/apt/package.py +++ b/apt/package.py @@ -18,56 +18,136 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA - +"""Functionality related to packages.""" +import gettext import httplib import sys -import random import re import socket -import string import urllib2 import apt_pkg + +__all__ = 'BaseDependency', 'Dependency', 'Origin', 'Package', 'Record' + + # Set a timeout for the changelog download socket.setdefaulttimeout(2) -#from gettext import gettext as _ -import gettext -def _(s): return gettext.dgettext("python-apt", s) + +def _(string): + """Return the translation of the string.""" + return gettext.dgettext("python-apt", string) + class BaseDependency(object): - " a single dependency " + """A single dependency. + + Attributes defined here: + name - The name of the dependency + relation - The relation (>>,>=,==,<<,<=,) + version - The version depended on + preDepend - Boolean value whether this is a pre-dependency. + """ + def __init__(self, name, rel, ver, pre): self.name = name self.relation = rel self.version = ver self.preDepend = pre + class Dependency(object): + """Represent an Or-group of dependencies. + + Attributes defined here: + or_dependencies - The possible choices + """ + def __init__(self, alternatives): self.or_dependencies = alternatives + +class Origin(object): + """The origin of a version. + + Attributes defined here: + archive - The archive (eg. unstable) + component - The component (eg. main) + label - The Label, as set in the Release file + origin - The Origin, as set in the Release file + site - The hostname of the site. + trusted - Boolean value whether this is trustworthy. + """ + + def __init__(self, pkg, VerFileIter): + self.archive = VerFileIter.Archive + self.component = VerFileIter.Component + self.label = VerFileIter.Label + self.origin = VerFileIter.Origin + self.site = VerFileIter.Site + # check the trust + indexfile = pkg._list.FindIndex(VerFileIter) + if indexfile and indexfile.IsTrusted: + self.trusted = True + else: + self.trusted = False + + def __repr__(self): + return ("<Origin component:'%s' archive:'%s' origin:'%s' label:'%s'" + "site:'%s' isTrusted:'%s'>") % (self.component, self.archive, + self.origin, self.label, + self.site, self.trusted) + + class Record(object): - """ represents a pkgRecord, can be accessed like a - dictionary and gives the original package record - if accessed as a string """ - def __init__(self, s): - self._str = s - self._rec = apt_pkg.ParseSection(s) + """Represent a pkgRecord. + + It can be accessed like a dictionary and can also give the original package + record if accessed as a string. + """ + + def __init__(self, record_str): + self._rec = apt_pkg.ParseSection(record_str) + def __str__(self): - return self._str + return str(self._rec) + def __getitem__(self, key): - k = self._rec.get(key) - if k is None: - raise KeyError - return k + return self._rec[key] + + def __contains__(self, key): + return self._rec.has_key(key) + + def __iter__(self): + return iter(self._rec.keys()) + + def iteritems(self): + """An iterator over the (key, value) items of the record.""" + for key in self._rec.keys(): + yield key, self._rec[key] + + def get(self, key, default=None): + """Return record[key] if key in record, else `default`. + + The parameter `default` must be either a string or None. + """ + return self._rec.get(key, default) + def has_key(self, key): + """deprecated form of 'key in x'.""" return self._rec.has_key(key) + class Package(object): - """ This class represents a package in the cache + """Representation of a package in a cache. + + This class provides methods and properties for working with a package. It + lets you mark the package for installation, check if it is installed, and + much more. """ + def __init__(self, cache, depcache, records, sourcelist, pcache, pkgiter): """ Init the Package object """ self._cache = cache # low level cache @@ -77,122 +157,135 @@ class Package(object): self._list = sourcelist # sourcelist self._pcache = pcache # python cache in cache.py self._changelog = "" # Cached changelog - pass - # helper def _lookupRecord(self, UseCandidate=True): - """ internal helper that moves the Records to the right - position, must be called before _records is accessed """ + """Internal helper that moves the Records to the right position. + + Must be called before _records is accessed. + """ if UseCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer # check if we found a version - if ver == None: - #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, UseCandidate) + if ver is None: + #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, + # UseCandidate) return False - if ver.FileList == None: + if ver.FileList is None: print "No FileList for: %s " % self._pkg.Name() return False f, index = ver.FileList.pop(0) - self._records.Lookup((f,index)) + self._records.Lookup((f, index)) return True - @property def name(self): - """ Return the name of the package """ + """Return the name of the package.""" return self._pkg.Name @property def id(self): - """ Return a uniq ID for the pkg, can be used to store - additional information about the pkg """ + """Return a uniq ID for the package. + + This can be used eg. to store additional information about the pkg.""" + return self._pkg.ID + + def __hash__(self): + """Return the hash of the object. + + This returns the same value as ID, which is unique.""" return self._pkg.ID @property def installedVersion(self): - """ Return the installed version as string """ + """Return the installed version as string.""" ver = self._pkg.CurrentVer - if ver != None: + if ver is not None: return ver.VerStr else: return None @property def candidateVersion(self): - """ Return the candidate version as string """ + """Return the candidate version as string.""" ver = self._depcache.GetCandidateVer(self._pkg) - if ver != None: + if ver is not None: return ver.VerStr else: return None def _getDependencies(self, ver): + """Get the dependencies for a given version of a package.""" depends_list = [] depends = ver.DependsList for t in ["PreDepends", "Depends"]: - if not depends.has_key(t): - continue - for depVerList in depends[t]: - base_deps = [] - for depOr in depVerList: - base_deps.append(BaseDependency(depOr.TargetPkg.Name, depOr.CompType, depOr.TargetVer, (t == "PreDepends"))) - depends_list.append(Dependency(base_deps)) + try: + for depVerList in depends[t]: + base_deps = [] + for depOr in depVerList: + base_deps.append(BaseDependency(depOr.TargetPkg.Name, + depOr.CompType, depOr.TargetVer, + (t == "PreDepends"))) + depends_list.append(Dependency(base_deps)) + except KeyError: + pass return depends_list @property def candidateDependencies(self): - """ return a list of candidate dependencies """ + """Return a list of candidate dependencies.""" candver = self._depcache.GetCandidateVer(self._pkg) - if candver == None: + if candver is None: return [] return self._getDependencies(candver) @property def installedDependencies(self): - """ return a list of installed dependencies """ + """Return a list of installed dependencies.""" ver = self._pkg.CurrentVer - if ver == None: + if ver is None: return [] return self._getDependencies(ver) @property def architecture(self): + """Return the Architecture of the package""" if not self._lookupRecord(): return None sec = apt_pkg.ParseSection(self._records.Record) - if sec.has_key("Architecture"): + try: return sec["Architecture"] - return None + except KeyError: + return None def _downloadable(self, useCandidate=True): - """ helper, return if the version is downloadable """ + """Return True if the package is downloadable.""" if useCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer - if ver == None: + if ver is None: return False return ver.Downloadable @property def candidateDownloadable(self): - " returns if the canidate is downloadable " - return self._downloadable(useCandidate=True) + """Return True if the candidate is downloadable.""" + return self._downloadable(True) @property def installedDownloadable(self): - " returns if the installed version is downloadable " - return self._downloadable(useCandidate=False) + """Return True if the installed version is downloadable.""" + return self._downloadable(False) @property def sourcePackageName(self): - """ Return the source package name as string """ + """Return the source package name as string.""" if not self._lookupRecord(): - if not self._lookupRecord(UseCandidate=False): + if not self._lookupRecord(False): return self._pkg.Name src = self._records.SourcePkg if src != "": @@ -202,19 +295,19 @@ class Package(object): @property def homepage(self): - """ Return the homepage field as string """ + """Return the homepage field as string.""" if not self._lookupRecord(): return None return self._records.Homepage @property def section(self): - """ Return the section of the package""" + """Return the section of the package.""" return self._pkg.Section @property def priority(self): - """ Return the priority (of the candidate version)""" + """Return the priority (of the candidate version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr @@ -223,7 +316,7 @@ class Package(object): @property def installedPriority(self): - """ Return the priority (of the installed version)""" + """Return the priority (of the installed version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr @@ -232,7 +325,7 @@ class Package(object): @property def summary(self): - """ Return the short description (one line summary) """ + """Return the short description (one line summary).""" if not self._lookupRecord(): return "" ver = self._depcache.GetCandidateVer(self._pkg) @@ -242,7 +335,8 @@ class Package(object): @property def description(self, format=True, useDots=False): - """ + """Return the formatted long description. + Return the formated long description according to the Debian policy (Chapter 5.6.13). See http://www.debian.org/doc/debian-policy/ch-controlfields.html @@ -258,14 +352,15 @@ class Package(object): self._records.Lookup(desc_iter.FileList.pop(0)) desc = "" try: - s = unicode(self._records.LongDesc,"utf-8") - except UnicodeDecodeError,e: - s = _("Invalid unicode in description for '%s' (%s). " - "Please report.") % (self.name,e) - lines = string.split(s, "\n") + dsc = unicode(self._records.LongDesc, "utf-8") + except UnicodeDecodeError, err: + dsc = _("Invalid unicode in description for '%s' (%s). " + "Please report.") % (self.name, err) + lines = dsc.split("\n") for i in range(len(lines)): # Skip the first line, since its a duplication of the summary - if i == 0: continue + if i == 0: + continue raw_line = lines[i] if raw_line.strip() == ".": # The line is just line break @@ -296,44 +391,45 @@ class Package(object): @property def rawDescription(self): - """ return the long description (raw)""" + """return the long description (raw).""" if not self._lookupRecord(): return "" return self._records.LongDesc @property def candidateRecord(self): - " return the full pkgrecord as string of the candidate version " + """Return the Record of the candidate version of the package.""" if not self._lookupRecord(True): return None return Record(self._records.Record) @property def installedRecord(self): - " return the full pkgrecord as string of the installed version " + """Return the Record of the candidate version of the package.""" if not self._lookupRecord(False): return None return Record(self._records.Record) # depcache states + @property def markedInstall(self): - """ Package is marked for install """ + """Return True if the package is marked for install.""" return self._depcache.MarkedInstall(self._pkg) @property def markedUpgrade(self): - """ Package is marked for upgrade """ + """Return True if the package is marked for upgrade.""" return self._depcache.MarkedUpgrade(self._pkg) @property def markedDelete(self): - """ Package is marked for delete """ + """Return True if the package is marked for delete.""" return self._depcache.MarkedDelete(self._pkg) @property def markedKeep(self): - """ Package is marked for keep """ + """Return True if the package is marked for keep.""" return self._depcache.MarkedKeep(self._pkg) @property @@ -343,49 +439,51 @@ class Package(object): @property def markedReinstall(self): - """ Package is marked for reinstall """ + """Return True if the package is marked for reinstall.""" return self._depcache.MarkedReinstall(self._pkg) @property def isInstalled(self): - """ Package is installed """ - return (self._pkg.CurrentVer != None) + """Return True if the package is installed.""" + return (self._pkg.CurrentVer is not None) @property def isUpgradable(self): - """ Package is upgradable """ + """Return True if the package is upgradable.""" return self.isInstalled and self._depcache.IsUpgradable(self._pkg) @property def isAutoRemovable(self): - """ - Package is installed as a automatic dependency and is - no longer required + """Return True if the package is no longer required. + + If the package has been installed automatically as a dependency of + another package, and if no packages depend on it anymore, the package + is no longer required. """ return self.isInstalled and self._depcache.IsGarbage(self._pkg) + # sizes - # size @property def packageSize(self): - """ The size of the candidate deb package """ + """Return the size of the candidate deb package.""" ver = self._depcache.GetCandidateVer(self._pkg) return ver.Size @property def installedPackageSize(self): - """ The size of the installed deb package """ + """Return the size of the installed deb package.""" ver = self._pkg.CurrentVer return ver.Size @property def candidateInstalledSize(self, UseCandidate=True): - """ The size of the candidate installed package """ + """Return the size of the candidate installed package.""" ver = self._depcache.GetCandidateVer(self._pkg) @property def installedSize(self): - """ The size of the currently installed package """ + """Return the size of the currently installed package.""" ver = self._pkg.CurrentVer if ver is None: return 0 @@ -393,38 +491,42 @@ class Package(object): @property def installedFiles(self): - """ - Return the list of unicode names of the files which have + """Return a list of files installed by the package. + + Return a list of unicode names of the files which have been installed by this package """ path = "/var/lib/dpkg/info/%s.list" % self.name try: - list = open(path) - files = list.read().decode().split("\n") - list.close() - except: + file_list = open(path) + try: + return file_list.read().decode().split("\n") + finally: + file_list.close() + except EnvironmentError: return [] - return files def getChangelog(self, uri=None, cancel_lock=None): """ Download the changelog of the package and return it as unicode - string - - uri: Is the uri to the changelog file. The following named variables - will be substituted: src_section, prefix, src_pkg and src_ver - For example the Ubuntu changelog: - uri = "http://changelogs.ubuntu.com/changelogs/pool" \\ - "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ - "/%(src_pkg)s_%(src_ver)s/changelog" - cancel_lock: If this threading.Lock() is set, the download will be - canceled + string. + + The parameter `uri` refers to the uri of the changelog file. It may + contain multiple named variables which will be substitued. These + variables are (src_section, prefix, src_pkg, src_ver). An example is + the Ubuntu changelog: + "http://changelogs.ubuntu.com/changelogs/pool" \\ + "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ + "/%(src_pkg)s_%(src_ver)s/changelog" + + The parameter `cancel_lock` refers to an instance of threading.Lock, + which if set, prevents the download. """ # Return a cached changelog if available if self._changelog != "": return self._changelog - if uri == None: + if uri is None: if self.candidateOrigin[0].origin == "Debian": uri = "http://packages.debian.org/changelogs/pool" \ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \ @@ -449,6 +551,7 @@ class Package(object): src_ver = self.candidateVersion #print "bin: %s" % binver try: + # FIXME: This try-statement is too long ... # try to get the source version of the pkg, this differs # for some (e.g. libnspr4 on ubuntu) # this feature only works if the correct deb-src are in the @@ -468,7 +571,7 @@ class Package(object): else: # fail into the error handler raise SystemError - except SystemError, e: + except SystemError: src_ver = bin_ver l = section.split("/") @@ -481,26 +584,27 @@ class Package(object): prefix = "lib" + src_pkg[3] # stip epoch - l = string.split(src_ver,":") + l = src_ver.split(":") if len(l) > 1: src_ver = "".join(l[1:]) - uri = uri % {"src_section" : src_section, - "prefix" : prefix, - "src_pkg" : src_pkg, - "src_ver" : src_ver} + uri = uri % {"src_section": src_section, + "prefix": prefix, + "src_pkg": src_pkg, + "src_ver": src_ver} try: # Check if the download was canceled - if cancel_lock and cancel_lock.isSet(): return "" + if cancel_lock and cancel_lock.isSet(): + return "" changelog_file = urllib2.urlopen(uri) # do only get the lines that are new changelog = "" regexp = "^%s \((.*)\)(.*)$" % (re.escape(src_pkg)) - i=0 while True: # Check if the download was canceled - if cancel_lock and cancel_lock.isSet(): return "" + if cancel_lock and cancel_lock.isSet(): + return "" # Read changelog line by line line_raw = changelog_file.readline() if line_raw == "": @@ -516,7 +620,7 @@ class Package(object): # and from changelog too installed = self.installedVersion if installed and ":" in installed: - installed = installed.split(":",1)[1] + installed = installed.split(":", 1)[1] changelog_ver = match.group(1) if changelog_ver and ":" in changelog_ver: changelog_ver = changelog_ver.split(":", 1)[1] @@ -530,59 +634,47 @@ class Package(object): if len(changelog) == 0: changelog = _("The list of changes is not available") self._changelog = changelog - except urllib2.HTTPError,e: + + # FIXME: Ubuntu-specific part. + except urllib2.HTTPError: return _("The list of changes is not available yet.\n\n" "Please use http://launchpad.net/ubuntu/+source/%s/%s/" "+changelog\n" "until the changes become available or try again " - "later.") % (srcpkg, srcver), - except IOError, httplib.BadStatusLine: - return _("Failed to download the list of changes. \nPlease " - "check your Internet connection.") + "later.") % (src_pkg, src_ver) + except (IOError, httplib.BadStatusLine): + return _("Failed to download the list of changes. \nPlease " + "check your Internet connection.") return self._changelog - # canidate origin - class Origin: - def __init__(self, pkg, VerFileIter): - self.component = VerFileIter.Component - self.archive = VerFileIter.Archive - self.origin = VerFileIter.Origin - self.label = VerFileIter.Label - self.site = VerFileIter.Site - # check the trust - indexfile = pkg._list.FindIndex(VerFileIter) - if indexfile and indexfile.IsTrusted: - self.trusted = True - else: - self.trusted = False - def __repr__(self): - return "component: '%s' archive: '%s' origin: '%s' label: '%s' " \ - "site '%s' isTrusted: '%s'"% (self.component, self.archive, - self.origin, self.label, - self.site, self.trusted) - @property def candidateOrigin(self): + """Return the Origin() of the candidate version.""" ver = self._depcache.GetCandidateVer(self._pkg) if not ver: return None origins = [] - for (verFileIter,index) in ver.FileList: - origins.append(self.Origin(self, verFileIter)) + for (verFileIter, index) in ver.FileList: + origins.append(Origin(self, verFileIter)) return origins # depcache actions + def markKeep(self): - """ mark a package for keep """ + """Mark a package for keep.""" self._pcache.cachePreChange() self._depcache.MarkKeep(self._pkg) self._pcache.cachePostChange() def markDelete(self, autoFix=True, purge=False): - """ mark a package for delete. Run the resolver if autoFix is set. - Mark the package as purge (remove with configuration) if 'purge' - is set. - """ + """Mark a package for install. + + If autoFix is True, the resolver will be run, trying to fix broken + packages. This is the default. + + If purge is True, remove the configuration files of the package as + well. The default is to keep the configuration. + """ self._pcache.cachePreChange() self._depcache.MarkDelete(self._pkg, purge) # try to fix broken stuffsta @@ -596,9 +688,18 @@ class Package(object): self._pcache.cachePostChange() def markInstall(self, autoFix=True, autoInst=True, fromUser=True): - """ mark a package for install. Run the resolver if autoFix is set, - automatically install required dependencies if autoInst is set - record it as automatically installed when fromuser is set to false + """Mark a package for install. + + If autoFix is True, the resolver will be run, trying to fix broken + packages. This is the default. + + If autoInst is True, the dependencies of the packages will be installed + automatically. This is the default. + + If fromUser is True, this package will not be marked as automatically + installed. This is the default. Set it to False if you want to be able + to remove the package at a later stage if no other package depends on + it. """ self._pcache.cachePreChange() self._depcache.MarkInstall(self._pkg, autoInst, fromUser) @@ -611,23 +712,31 @@ class Package(object): self._pcache.cachePostChange() def markUpgrade(self): - """ mark a package for upgrade """ + """Mark a package for upgrade.""" if self.isUpgradable: self.markInstall() else: # FIXME: we may want to throw a exception here - sys.stderr.write("MarkUpgrade() called on a non-upgrable pkg: '%s'\n" %self._pkg.Name) + sys.stderr.write(("MarkUpgrade() called on a non-upgrable pkg: " + "'%s'\n") % self._pkg.Name) def commit(self, fprogress, iprogress): - """ commit the changes, need a FetchProgress and InstallProgress - object as argument + """Commit the changes. + + The parameter `fprogress` refers to a FetchProgress() object, as + found in apt.progress. + + The parameter `iprogress` refers to an InstallProgress() object, as + found in apt.progress. """ self._depcache.Commit(fprogress, iprogress) -# self-test -if __name__ == "__main__": +def _test(): + """Self-test.""" print "Self-test for the Package modul" + import random + import apt apt_pkg.init() cache = apt_pkg.GetCache() depcache = apt_pkg.GetDepCache(cache) @@ -653,35 +762,38 @@ if __name__ == "__main__": print "PackageSize: %s " % pkg.packageSize print "Dependencies: %s" % pkg.installedDependencies for dep in pkg.candidateDependencies: - print ",".join(["%s (%s) (%s) (%s)" % (o.name,o.version,o.relation, o.preDepend) for o in dep.or_dependencies]) + print ",".join("%s (%s) (%s) (%s)" % (o.name, o.version, o.relation, + o.preDepend) for o in dep.or_dependencies) print "arch: %s" % pkg.architecture print "homepage: %s" % pkg.homepage - print "rec: ",pkg.candidateRecord + print "rec: ", pkg.candidateRecord # now test install/remove - import apt progress = apt.progress.OpTextProgress() cache = apt.Cache(progress) - for i in [True, False]: + for i in True, False: print "Running install on random upgradable pkgs with AutoFix: %s " % i - for name in cache.keys(): - pkg = cache[name] + for pkg in cache: if pkg.isUpgradable: - if random.randint(0,1) == 1: + if random.randint(0, 1) == 1: pkg.markInstall(i) print "Broken: %s " % cache._depcache.BrokenCount print "InstCount: %s " % cache._depcache.InstCount print # get a new cache - for i in [True, False]: + for i in True, False: print "Randomly remove some packages with AutoFix: %s" % i cache = apt.Cache(progress) for name in cache.keys(): - if random.randint(0,1) == 1: + if random.randint(0, 1) == 1: try: cache[name].markDelete(i) except SystemError: print "Error trying to remove: %s " % name print "Broken: %s " % cache._depcache.BrokenCount print "DelCount: %s " % cache._depcache.DelCount + +# self-test +if __name__ == "__main__": + _test() |
