diff options
| author | Michael Vogt <michael.vogt@ubuntu.com> | 2009-01-13 17:22:27 +0100 |
|---|---|---|
| committer | Michael Vogt <michael.vogt@ubuntu.com> | 2009-01-13 17:22:27 +0100 |
| commit | 38d602dc83006c51dfe4ed594d691ea9b0679498 (patch) | |
| tree | b7aedfba82c44cad6c3012f879b5d6d7e8ad1425 /apt | |
| parent | 12cf58d12b969010f3d98b2974d72bbb950b775f (diff) | |
| parent | 614897f798d9f16591fbd29ebe2a6c5674102d2d (diff) | |
| download | python-apt-38d602dc83006c51dfe4ed594d691ea9b0679498.tar.gz | |
* apt/*.py:
- Almost complete cleanup of the code
- Remove inconsistent use of tabs and spaces (Closes: #505443)
- Improved documentation
* apt/debfile.py:
- Drop get*() methods, as they are deprecated and were
never in a stable release
- Make DscSrcPackage working
* apt/gtk/widgets.py:
- Fix the code and document the signals
* Introduce new documentation build with Sphinx
- Contains style Guide (Closes: #481562)
- debian/rules: Build the documentation here
- setup.py: Remove pydoc building and add new docs.
- debian/examples: Include examples from documentation
- debian/python-apt.docs:
+ Change html/ to build/doc/html.
+ Add build/doc/text for the text-only documentation
* setup.py:
- Only create build/data when building, not all the time
- Remove build/mo and build/data on clean -a
* debian/control:
- Remove the Conflicts on python2.3-apt, python2.4-apt, as
they are only needed for oldstable (sarge)
- Build-Depend on python-sphinx (>= 0.5)
* aptsources/distinfo.py:
- Allow @ in mirror urls (Closes: #478171) (LP: #223097)
* Merge Ben Finney's whitespace changes (Closes: #481563)
* Merge Ben Finney's do not use has_key() (Closes: #481878)
* Do not use deprecated form of raise statement (Closes: #494259)
* Add support for PkgRecords.SHA256Hash (Closes: #456113)
Diffstat (limited to 'apt')
| -rw-r--r-- | apt/README.apt | 8 | ||||
| -rw-r--r-- | apt/__init__.py | 3 | ||||
| -rw-r--r-- | apt/cache.py | 161 | ||||
| -rw-r--r-- | apt/cdrom.py | 81 | ||||
| -rw-r--r-- | apt/debfile.py | 549 | ||||
| -rw-r--r-- | apt/gtk/widgets.py | 306 | ||||
| -rw-r--r-- | apt/package.py | 550 | ||||
| -rw-r--r-- | apt/progress.py | 376 |
8 files changed, 1156 insertions, 878 deletions
diff --git a/apt/README.apt b/apt/README.apt index 30166a24..2a017bde 100644 --- a/apt/README.apt +++ b/apt/README.apt @@ -12,10 +12,4 @@ WARNING !!! The API is not 100% stable yet !!! Style Guides: ------------- - -Follow PEP08. - -Internal variables/methods are prefixed with a "_" (e.g. _foo). - - - +See ../doc/source/coding.rst diff --git a/apt/__init__.py b/apt/__init__.py index c6a2ff39..05407aff 100644 --- a/apt/__init__.py +++ b/apt/__init__.py @@ -6,7 +6,8 @@ import os # import some fancy classes from apt.package import Package from apt.cache import Cache -from apt.progress import OpProgress, FetchProgress, InstallProgress, CdromProgress +from apt.progress import ( + OpProgress, FetchProgress, InstallProgress, CdromProgress) from apt.cdrom import Cdrom from apt_pkg import SizeToStr, TimeToStr, VersionCompare diff --git a/apt/cache.py b/apt/cache.py index 79e58282..0065d14c 100644 --- a/apt/cache.py +++ b/apt/cache.py @@ -1,62 +1,68 @@ # cache.py - apt cache abstraction -# +# # Copyright (c) 2005 Canonical -# +# # Author: Michael Vogt <michael.vogt@ubuntu.com> -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA +import os +import sys + import apt_pkg from apt import Package import apt.progress -import os -import sys + class FetchCancelledException(IOError): - " Exception that is thrown when the user cancels a fetch operation " - pass + """Exception that is thrown when the user cancels a fetch operation.""" + + class FetchFailedException(IOError): - " Exception that is thrown when fetching fails " - pass + """Exception that is thrown when fetching fails.""" + + class LockFailedException(IOError): - " Exception that is thrown when locking fails " - pass + """Exception that is thrown when locking fails.""" + class Cache(object): - """ Dictionary-like package cache - This class has all the packages that are available in it's - dictionary + """Dictionary-like package cache. + + This class has all the packages that are available in it's + dictionary """ def __init__(self, progress=None, rootdir=None, memonly=False): self._callbacks = {} if memonly: # force apt to build its caches in memory - apt_pkg.Config.Set("Dir::Cache::pkgcache","") + apt_pkg.Config.Set("Dir::Cache::pkgcache", "") if rootdir: apt_pkg.Config.Set("Dir", rootdir) - apt_pkg.Config.Set("Dir::State::status", rootdir + "/var/lib/dpkg/status") + apt_pkg.Config.Set("Dir::State::status", + rootdir + "/var/lib/dpkg/status") self.open(progress) def _runCallbacks(self, name): """ internal helper to run a callback """ - if self._callbacks.has_key(name): + if name in self._callbacks: for callback in self._callbacks[name]: callback() - + def open(self, progress): """ Open the package cache, after that it can be used like a dictionary @@ -70,12 +76,12 @@ class Cache(object): self._dict = {} # build the packages dict - if progress != None: + if progress is not None: progress.Op = "Building data structures" i=last=0 size=len(self._cache.Packages) for pkg in self._cache.Packages: - if progress != None and last+100 < i: + if progress is not None and last+100 < i: progress.update(i/float(size)*100) last=i # drop stuff with no versions (cruft) @@ -83,12 +89,12 @@ class Cache(object): self._dict[pkg.Name] = Package(self._cache, self._depcache, self._records, self._list, self, pkg) - + i += 1 - if progress != None: + if progress is not None: progress.done() self._runCallbacks("cache_post_open") - + def __getitem__(self, key): """ look like a dictionary (get key) """ return self._dict[key] @@ -99,10 +105,10 @@ class Cache(object): raise StopIteration def has_key(self, key): - return self._dict.has_key(key) + return (key in self._dict) def __contains__(self, key): - return key in self._dict + return (key in self._dict) def __len__(self): return len(self._dict) @@ -112,7 +118,7 @@ class Cache(object): def getChanges(self): """ Get the marked changes """ - changes = [] + changes = [] for name in self._dict.keys(): p = self._dict[name] if p.markedUpgrade or p.markedInstall or p.markedDelete or \ @@ -130,21 +136,23 @@ class Cache(object): @property def requiredDownload(self): - """ get the size of the packages that are required to download """ + """Get the size of the packages that are required to download.""" pm = apt_pkg.GetPackageManager(self._depcache) fetcher = apt_pkg.GetAcquire() pm.GetArchives(fetcher, self._list, self._records) return fetcher.FetchNeeded + @property def additionalRequiredSpace(self): - """ get the size of the additional required space on the fs """ + """Get the size of the additional required space on the fs.""" return self._depcache.UsrSize + @property def reqReinstallPkgs(self): - " return the packages not downloadable packages in reqreinst state " + """Return the packages not downloadable packages in reqreinst state.""" reqreinst = set() for pkg in self: - if (not pkg.candidateDownloadable and + if (not pkg.candidateDownloadable and (pkg._pkg.InstState == apt_pkg.InstStateReInstReq or pkg._pkg.InstState == apt_pkg.InstStateHoldReInstReq)): reqreinst.add(pkg.name) @@ -153,7 +161,7 @@ class Cache(object): def _runFetcher(self, fetcher): # do the actual fetching res = fetcher.Run() - + # now check the result (this is the code from apt-get.cc) failed = False transient = False @@ -164,14 +172,15 @@ class Cache(object): if item.StatIdle: transient = True continue - errMsg += "Failed to fetch %s %s\n" % (item.DescURI,item.ErrorText) + errMsg += "Failed to fetch %s %s\n" % (item.DescURI, + item.ErrorText) failed = True # we raise a exception if the download failed or it was cancelt if res == fetcher.ResultCancelled: - raise FetchCancelledException, errMsg + raise FetchCancelledException(errMsg) elif failed: - raise FetchFailedException, errMsg + raise FetchFailedException(errMsg) return res def _fetchArchives(self, fetcher, pm): @@ -181,7 +190,7 @@ class Cache(object): lockfile = apt_pkg.Config.FindDir("Dir::Cache::Archives") + "lock" lock = apt_pkg.GetLock(lockfile) if lock < 0: - raise LockFailedException, "Failed to lock %s" % lockfile + raise LockFailedException("Failed to lock %s" % lockfile) try: # this may as well throw a SystemError exception @@ -193,6 +202,11 @@ class Cache(object): finally: os.close(lock) + def isVirtualPackage(self, pkgname): + """Return whether the package is a virtual package.""" + pkg = self._cache[pkgname] + return bool(pkg.ProvidesList and not pkg.VersionList) + def getProvidingPackages(self, virtual): """ Return a list of packages which provide the virtual package of the @@ -207,7 +221,7 @@ class Cache(object): return providers for pkg in self: v = self._depcache.GetCandidateVer(pkg._pkg) - if v == None: + if v is None: continue for p in v.ProvidesList: if virtual == p[0]: @@ -220,21 +234,21 @@ class Cache(object): lockfile = apt_pkg.Config.FindDir("Dir::State::Lists") + "lock" lock = apt_pkg.GetLock(lockfile) if lock < 0: - raise LockFailedException, "Failed to lock %s" % lockfile + raise LockFailedException("Failed to lock %s" % lockfile) try: - if fetchProgress == None: + if fetchProgress is None: fetchProgress = apt.progress.FetchProgress() return self._cache.Update(fetchProgress, self._list) finally: os.close(lock) - + def installArchives(self, pm, installProgress): installProgress.startUpdate() res = installProgress.run(pm) installProgress.finishUpdate() return res - + def commit(self, fetchProgress=None, installProgress=None): """ Apply the marked changes to the cache """ # FIXME: @@ -244,9 +258,9 @@ class Cache(object): # Current a failed download will just display "error" # which is less than optimal! - if fetchProgress == None: + if fetchProgress is None: fetchProgress = apt.progress.FetchProgress() - if installProgress == None: + if installProgress is None: installProgress = apt.progress.InstallProgress() pm = apt_pkg.GetPackageManager(self._depcache) @@ -260,16 +274,17 @@ class Cache(object): if res == pm.ResultCompleted: break if res == pm.ResultFailed: - raise SystemError, "installArchives() failed" + raise SystemError("installArchives() failed") # reload the fetcher for media swaping fetcher.Shutdown() return (res == pm.ResultCompleted) def clear(self): - """ Unmark all changes """ - self._depcache.Init() + """ Unmark all changes """ + self._depcache.Init() # cache changes + def cachePostChange(self): " called internally if the cache has changed, emit a signal then " self._runCallbacks("cache_post_change") @@ -282,34 +297,42 @@ class Cache(object): def connect(self, name, callback): """ connect to a signal, currently only used for cache_{post,pre}_{changed,open} """ - if not self._callbacks.has_key(name): + if not name in self._callbacks: self._callbacks[name] = [] self._callbacks[name].append(callback) + # ----------------------------- experimental interface + + class Filter(object): """ Filter base class """ + def apply(self, pkg): """ Filter function, return True if the package matchs a filter criteria and False otherwise """ return True + class MarkedChangesFilter(Filter): """ Filter that returns all marked changes """ + def apply(self, pkg): if pkg.markedInstall or pkg.markedDelete or pkg.markedUpgrade: return True else: return False + class FilteredCache(object): """ A package cache that is filtered. Can work on a existing cache or create a new one """ + def __init__(self, cache=None, progress=None): - if cache == None: + if cache is None: self.cache = Cache(progress) else: self.cache = cache @@ -317,17 +340,25 @@ class FilteredCache(object): self.cache.connect("cache_post_open", self.filterCachePostChange) self._filtered = {} self._filters = [] + def __len__(self): return len(self._filtered) - + def __getitem__(self, key): return self.cache._dict[key] + def __iter__(self): + for pkgname in self._filtered: + yield self.cache[pkgname] + def keys(self): return self._filtered.keys() def has_key(self, key): - return self._filtered.has_key(key) + return (key in self._filtered) + + def __contains__(self, key): + return (key in self._filtered) def _reapplyFilter(self): " internal helper to refilter " @@ -337,9 +368,9 @@ class FilteredCache(object): if f.apply(self.cache._dict[pkg]): self._filtered[pkg] = 1 break - + def setFilter(self, filter): - " set the current active filter " + """Set the current active filter.""" self._filters = [] self._filters.append(filter) #self._reapplyFilter() @@ -347,7 +378,7 @@ class FilteredCache(object): self.cache.cachePostChange() def filterCachePostChange(self): - " called internally if the cache changes, emit a signal then " + """Called internally if the cache changes, emit a signal then.""" #print "filterCachePostChange()" self._reapplyFilter() @@ -355,17 +386,15 @@ class FilteredCache(object): # self.cache.connect(name, callback) def __getattr__(self, key): - " we try to look exactly like a real cache " + """we try to look exactly like a real cache.""" #print "getattr: %s " % key - if self.__dict__.has_key(key): - return self.__dict__[key] - else: - return getattr(self.cache, key) - + return getattr(self.cache, key) + def cache_pre_changed(): print "cache pre changed" + def cache_post_changed(): print "cache post changed" @@ -377,7 +406,7 @@ if __name__ == "__main__": c = Cache(apt.progress.OpTextProgress()) c.connect("cache_pre_change", cache_pre_changed) c.connect("cache_post_change", cache_post_changed) - print c.has_key("aptitude") + print ("aptitude" in c) p = c["aptitude"] print p.name print len(c) @@ -397,7 +426,7 @@ if __name__ == "__main__": for d in ["/tmp/pytest", "/tmp/pytest/partial"]: if not os.path.exists(d): os.mkdir(d) - apt_pkg.Config.Set("Dir::Cache::Archives","/tmp/pytest") + apt_pkg.Config.Set("Dir::Cache::Archives", "/tmp/pytest") pm = apt_pkg.GetPackageManager(c._depcache) fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress()) c._fetchArchives(fetcher, pm) @@ -413,7 +442,7 @@ if __name__ == "__main__": for pkg in f.keys(): #print c[pkg].name x = f[pkg].name - + print len(f) print "Testing filtered cache (no argument)" @@ -426,5 +455,5 @@ if __name__ == "__main__": for pkg in f.keys(): #print c[pkg].name x = f[pkg].name - + print len(f) diff --git a/apt/cdrom.py b/apt/cdrom.py index 9d4b62cb..61250fc4 100644 --- a/apt/cdrom.py +++ b/apt/cdrom.py @@ -1,14 +1,49 @@ +# cdrom.py - CDROM handling +# +# Copyright (c) 2005 Canonical +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Classes related to cdrom handling.""" +import glob + import apt_pkg -from progress import CdromProgress +from apt.progress import CdromProgress + class Cdrom(object): + """Support for apt-cdrom like features. + + This class has several optional parameters for initialisation, which may + be used to influence the behaviour of the object: + + The optional parameter `progress` is a CdromProgress() subclass, which will + ask for the correct cdrom, etc. If not specified or None, a CdromProgress() + object will be used. + + The optional parameter `mountpoint` may be used to specify an alternative + mountpoint. + + If the optional parameter `nomount` is True, the cdroms will not be + mounted. This is the default behaviour. + """ + def __init__(self, progress=None, mountpoint=None, nomount=True): - """ Support for apt-cdrom like features. - Options: - - progress: optional progress.CdromProgress() subclass - - mountpoint: optional alternative mountpoint - - nomount: do not mess with mount/umount the CD - """ self._cdrom = apt_pkg.GetCdrom() if progress is None: self._progress = CdromProgress() @@ -16,32 +51,36 @@ class Cdrom(object): self._progress = progress # see if we have a alternative mountpoint if mountpoint is not None: - apt_pkg.Config.Set("Acquire::cdrom::mount",mountpoint) + apt_pkg.Config.Set("Acquire::cdrom::mount", mountpoint) # do not mess with mount points by default - if nomount is True: + if nomount: apt_pkg.Config.Set("APT::CDROM::NoMount", "true") else: apt_pkg.Config.Set("APT::CDROM::NoMount", "false") + def add(self): - " add cdrom to the sources.list " + """Add cdrom to the sources.list.""" return self._cdrom.Add(self._progress) + def ident(self): - " identify the cdrom " + """Identify the cdrom.""" (res, ident) = self._cdrom.Ident(self._progress) if res: return ident - return None + @property def inSourcesList(self): - " check if the cdrom is already in the current sources.list " - cdid = self.ident() - if cdid is None: + """Check if the cdrom is already in the current sources.list.""" + cd_id = self.ident() + if cd_id is None: # FIXME: throw exception instead return False - # FIXME: check sources.list.d/ as well - for line in open(apt_pkg.Config.FindFile("Dir::Etc::sourcelist")): - line = line.strip() - if not line.startswith("#") and cdid in line: - return True + # Get a list of files + src = glob.glob(apt_pkg.Config.FindDir("Dir::Etc::sourceparts") + '*') + src.append(apt_pkg.Config.FindFile("Dir::Etc::sourcelist")) + # Check each file + for fname in src: + for line in open(fname): + if not line.lstrip().startswith("#") and cd_id in line: + return True return False - diff --git a/apt/debfile.py b/apt/debfile.py index b1d436cd..f24f19f4 100644 --- a/apt/debfile.py +++ b/apt/debfile.py @@ -19,24 +19,25 @@ # along with GDebi; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # - -import warnings -warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning) -import apt_inst, apt_pkg -import sys -import os +"""Classes for working with locally available Debian packages.""" from gettext import gettext as _ +import os +import sys + +import apt_inst +import apt_pkg + # Constants for comparing the local package file with the version in the cache -(VERSION_NONE, - VERSION_OUTDATED, - VERSION_SAME, - VERSION_NEWER) = range(4) - +(VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4) + + class NoDebArchiveException(IOError): - pass + """Exception which is raised if a file is no Debian archive.""" + class DebPackage(object): + """A Debian Package (.deb file).""" _supported_data_members = ("data.tar.gz", "data.tar.bz2", "data.tar.lzma") @@ -44,11 +45,10 @@ class DebPackage(object): def __init__(self, filename=None, cache=None): self._cache = cache - self.file = filename - self._needPkgs = [] + self._need_pkgs = [] self._sections = {} - self._installedConflicts = set() - self._failureString = "" + self._installed_conflicts = set() + self._failure_string = "" if filename: self.open(filename) @@ -56,35 +56,42 @@ class DebPackage(object): " open given debfile " self.filename = filename if not apt_inst.arCheckMember(open(self.filename), "debian-binary"): - raise NoDebArchiveException, _("This is not a valid DEB archive, missing '%s' member" % "debian-binary") + raise NoDebArchiveException(_("This is not a valid DEB archive, " + "missing '%s' member" % + "debian-binary")) control = apt_inst.debExtractControl(open(self.filename)) self._sections = apt_pkg.ParseSection(control) self.pkgname = self._sections["Package"] def __getitem__(self, key): return self._sections[key] - + + @property def filelist(self): - """ return the list of files in the deb """ + """return the list of files in the deb.""" files = [] - def extract_cb(What,Name,Link,Mode,UID,GID,Size,MTime,Major,Minor): - #print "%s '%s','%s',%u,%u,%u,%u,%u,%u,%u"\ - # % (What,Name,Link,Mode,UID,GID,Size, MTime, Major, Minor) - files.append(Name) + + def extract_cb(what, name, *_): + files.append(name) + for member in self._supported_data_members: if apt_inst.arCheckMember(open(self.filename), member): try: - apt_inst.debExtract(open(self.filename), extract_cb, member) + apt_inst.debExtract(open(self.filename), extract_cb, + member) break - except SystemError, e: - return [_("List of files for '%s'could not be read" % self.filename)] + except SystemError: + return [_("List of files for '%s'could not be read" % + self.filename)] return files - filelist = property(filelist) - def _isOrGroupSatisfied(self, or_group): - """ this function gets a 'or_group' and analyzes if - at least one dependency of this group is already satisfied """ - self._dbg(2,"_checkOrGroup(): %s " % (or_group)) + def _is_or_group_satisfied(self, or_group): + """Return True if at least one dependency of the or-group is satisfied. + + This method gets an 'or_group' and analyzes if at least one dependency + of this group is already satisfied. + """ + self._dbg(2, "_checkOrGroup(): %s " % (or_group)) for dep in or_group: depname = dep[0] @@ -92,9 +99,10 @@ class DebPackage(object): oper = dep[2] # check for virtual pkgs - if not self._cache.has_key(depname): + if not depname in self._cache: if self._cache.isVirtualPackage(depname): - self._dbg(3,"_isOrGroupSatisfied(): %s is virtual dep" % depname) + self._dbg(3, "_isOrGroupSatisfied(): %s is virtual dep" % + depname) for pkg in self._cache.getProvidingPackages(depname): if pkg.isInstalled: return True @@ -102,24 +110,17 @@ class DebPackage(object): inst = self._cache[depname] instver = inst.installedVersion - if instver != None and apt_pkg.CheckDep(instver,oper,ver) == True: + if instver is not None and apt_pkg.CheckDep(instver, oper, ver): return True return False - - - def _satisfyOrGroup(self, or_group): - """ try to satisfy the or_group """ - - or_found = False - virtual_pkg = None + def _satisfy_or_group(self, or_group): + """Try to satisfy the or_group.""" for dep in or_group: - depname = dep[0] - ver = dep[1] - oper = dep[2] + depname, ver, oper = dep # if we don't have it in the cache, it may be virtual - if not self._cache.has_key(depname): + if not depname in self._cache: if not self._cache.isVirtualPackage(depname): continue providers = self._cache.getProvidingPackages(depname) @@ -128,55 +129,59 @@ class DebPackage(object): if len(providers) != 1: continue depname = providers[0].name - + # now check if we can satisfy the deps with the candidate(s) # in the cache - cand = self._cache[depname] - candver = self._cache._depcache.GetCandidateVer(cand._pkg) - if not candver: + pkg = self._cache[depname] + cand = self._cache._depcache.GetCandidateVer(pkg._pkg) + if not cand: continue - if not apt_pkg.CheckDep(candver.VerStr,oper,ver): + if not apt_pkg.CheckDep(cand.VerStr, oper, ver): continue # check if we need to install it - self._dbg(2,"Need to get: %s" % depname) - self._needPkgs.append(depname) + self._dbg(2, "Need to get: %s" % depname) + self._need_pkgs.append(depname) return True # if we reach this point, we failed or_str = "" for dep in or_group: or_str += dep[0] - if dep != or_group[len(or_group)-1]: + if dep != or_group[-1]: or_str += "|" - self._failureString += _("Dependency is not satisfiable: %s\n" % or_str) + self._failure_string += _("Dependency is not satisfiable: %s\n" % + or_str) return False - def _checkSinglePkgConflict(self, pkgname, ver, oper): - """ returns true if a pkg conflicts with a real installed/marked - pkg """ + def _check_single_pkg_conflict(self, pkgname, ver, oper): + """Return True if a pkg conflicts with a real installed/marked pkg.""" # FIXME: deal with conflicts against its own provides # (e.g. Provides: ftp-server, Conflicts: ftp-server) - self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % (pkgname, ver, oper)) - pkgver = None - cand = self._cache[pkgname] - if cand.isInstalled: - pkgver = cand.installedVersion - elif cand.markedInstall: - pkgver = cand.candidateVersion + self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % + (pkgname, ver, oper)) + + pkg = self._cache[pkgname] + if pkg.isInstalled: + pkgver = pkg.installedVersion + elif pkg.markedInstall: + pkgver = pkg.candidateVersion + else: + return False #print "pkg: %s" % pkgname #print "ver: %s" % ver #print "pkgver: %s " % pkgver #print "oper: %s " % oper - if (pkgver and apt_pkg.CheckDep(pkgver,oper,ver) and - not self.replacesRealPkg(pkgname, oper, ver)): - self._failureString += _("Conflicts with the installed package '%s'" % cand.name) + if (apt_pkg.CheckDep(pkgver, oper, ver) and not + self.replaces_real_pkg(pkgname, oper, ver)): + self._failure_string += _("Conflicts with the installed package " + "'%s'" % pkg.name) return True return False - def _checkConflictsOrGroup(self, or_group): - """ check the or-group for conflicts with installed pkgs """ - self._dbg(2,"_checkConflictsOrGroup(): %s " % (or_group)) + def _check_conflicts_or_group(self, or_group): + """Check the or-group for conflicts with installed pkgs.""" + self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group)) or_found = False virtual_pkg = None @@ -187,152 +192,116 @@ class DebPackage(object): oper = dep[2] # check conflicts with virtual pkgs - if not self._cache.has_key(depname): - # FIXME: we have to check for virtual replaces here as + if not depname in self._cache: + # FIXME: we have to check for virtual replaces here as # well (to pass tests/gdebi-test8.deb) if self._cache.isVirtualPackage(depname): for pkg in self._cache.getProvidingPackages(depname): self._dbg(3, "conflicts virtual check: %s" % pkg.name) # P/C/R on virtal pkg, e.g. ftpd - if self.pkgName == pkg.name: + if self.pkgname == pkg.name: self._dbg(3, "conflict on self, ignoring") continue - if self._checkSinglePkgConflict(pkg.name,ver,oper): - self._installedConflicts.add(pkg.name) + if self._check_single_pkg_conflict(pkg.name, ver, oper): + self._installed_conflicts.add(pkg.name) continue - if self._checkSinglePkgConflict(depname,ver,oper): - self._installedConflicts.add(depname) - return len(self._installedConflicts) != 0 - - def getConflicts(self): - """ - Return list of package names conflicting with this package. - - WARNING: This method will is deprecated. Please use the - attribute DebPackage.depends instead. - """ - return self.conflicts + if self._check_single_pkg_conflict(depname, ver, oper): + self._installed_conflicts.add(depname) + return bool(self._installed_conflicts) + @property def conflicts(self): - """ - List of package names conflicting with this package - """ - conflicts = [] + """List of package names conflicting with this package.""" key = "Conflicts" - if self._sections.has_key(key): - conflicts = apt_pkg.ParseDepends(self._sections[key]) - return conflicts - conflicts = property(conflicts) - - def getDepends(self): - """ - Return list of package names on which this package depends on. - - WARNING: This method will is deprecated. Please use the - attribute DebPackage.depends instead. - """ - return self.depends + try: + return apt_pkg.ParseDepends(self._sections[key]) + except KeyError: + return [] + @property def depends(self): - """ - List of package names on which this package depends on - """ + """List of package names on which this package depends on.""" depends = [] # find depends - for key in ["Depends","PreDepends"]: - if self._sections.has_key(key): + for key in "Depends", "PreDepends": + try: depends.extend(apt_pkg.ParseDepends(self._sections[key])) + except KeyError: + pass return depends - depends = property(depends) - - def getProvides(self): - """ - Return list of virtual packages which are provided by this package. - - WARNING: This method will is deprecated. Please use the - attribute DebPackage.provides instead. - """ - return self.provides + @property def provides(self): - """ - List of virtual packages which are provided by this package - """ - provides = [] + """List of virtual packages which are provided by this package.""" key = "Provides" - if self._sections.has_key(key): - provides = apt_pkg.ParseDepends(self._sections[key]) - return provides - provides = property(provides) - - def getReplaces(self): - """ - Return list of packages which are replaced by this package. - - WARNING: This method will is deprecated. Please use the - attribute DebPackage.replaces instead. - """ - return self.replaces + try: + return apt_pkg.ParseDepends(self._sections[key]) + except KeyError: + return [] + @property def replaces(self): - """ - List of packages which are replaced by this package - """ - replaces = [] + """List of packages which are replaced by this package.""" key = "Replaces" - if self._sections.has_key(key): - replaces = apt_pkg.ParseDepends(self._sections[key]) - return replaces - replaces = property(replaces) - - def replacesRealPkg(self, pkgname, oper, ver): - """ - return True if the deb packages replaces a real (not virtual) - packages named pkgname, oper, ver + try: + return apt_pkg.ParseDepends(self._sections[key]) + except KeyError: + return [] + + def replaces_real_pkg(self, pkgname, oper, ver): + """Return True if a given non-virtual package is replaced. + + Return True if the deb packages replaces a real (not virtual) + packages named (pkgname, oper, ver). """ - self._dbg(3, "replacesPkg() %s %s %s" % (pkgname,oper,ver)) - pkgver = None - cand = self._cache[pkgname] - if cand.isInstalled: - pkgver = cand.installedVersion - elif cand.markedInstall: - pkgver = cand.candidateVersion - for or_group in self.getReplaces(): + self._dbg(3, "replacesPkg() %s %s %s" % (pkgname, oper, ver)) + pkg = self._cache[pkgname] + if pkg.isInstalled: + pkgver = pkg.installedVersion + elif pkg.markedInstall: + pkgver = pkg.candidateVersion + else: + pkgver = None + for or_group in self.replaces: for (name, ver, oper) in or_group: - if (name == pkgname and - apt_pkg.CheckDep(pkgver,oper,ver)): - self._dbg(3, "we have a replaces in our package for the conflict against '%s'" % (pkgname)) + if (name == pkgname and apt_pkg.CheckDep(pkgver, oper, ver)): + self._dbg(3, "we have a replaces in our package for the " + "conflict against '%s'" % (pkgname)) return True return False - def checkConflicts(self): - """ check if the pkg conflicts with a existing or to be installed - package. Return True if the pkg is ok """ + def check_conflicts(self): + """Check if there are conflicts with existing or selected packages. + + Check if the package conflicts with a existing or to be installed + package. Return True if the pkg is OK. + """ res = True - for or_group in self.getConflicts(): - if self._checkConflictsOrGroup(or_group): + for or_group in self.conflicts: + if self._check_conflicts_or_group(or_group): #print "Conflicts with a exisiting pkg!" - #self._failureString = "Conflicts with a exisiting pkg!" + #self._failure_string = "Conflicts with a exisiting pkg!" res = False return res - - def compareToVersionInCache(self, useInstalled=True): - """ checks if the pkg is already installed or availabe in the cache - and if so in what version, returns if the version of the deb - is not available,older,same,newer + def compare_to_version_in_cache(self, use_installed=True): + """Compare the package to the version available in the cache. + + Checks if the package is already installed or availabe in the cache + and if so in what version, returns one of (VERSION_NONE, + VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER). """ - self._dbg(3,"compareToVersionInCache") + self._dbg(3, "compareToVersionInCache") pkgname = self._sections["Package"] debver = self._sections["Version"] - self._dbg(1,"debver: %s" % debver) - if self._cache.has_key(pkgname): - if useInstalled: + self._dbg(1, "debver: %s" % debver) + if pkgname in self._cache: + if use_installed: cachever = self._cache[pkgname].installedVersion else: cachever = self._cache[pkgname].candidateVersion - if cachever != None: - cmp = apt_pkg.VersionCompare(cachever,debver) + if cachever is not None: + cmp = apt_pkg.VersionCompare(cachever, debver) self._dbg(1, "CompareVersion(debver,instver): %s" % cmp) if cmp == 0: return VERSION_SAME @@ -342,82 +311,88 @@ class DebPackage(object): return VERSION_OUTDATED return VERSION_NONE - def checkDeb(self): - self._dbg(3,"checkDepends") + def check(self): + """Check if the package is installable.""" + self._dbg(3, "checkDepends") # check arch arch = self._sections["Architecture"] if arch != "all" and arch != apt_pkg.Config.Find("APT::Architecture"): - self._dbg(1,"ERROR: Wrong architecture dude!") - self._failureString = _("Wrong architecture '%s'" % arch) + self._dbg(1, "ERROR: Wrong architecture dude!") + self._failure_string = _("Wrong architecture '%s'" % arch) return False # check version - res = self.compareToVersionInCache() - if res == VERSION_OUTDATED: # the deb is older than the installed - self._failureString = _("A later version is already installed") + if self.compare_to_version_in_cache() == VERSION_OUTDATED: + # the deb is older than the installed + self._failure_string = _("A later version is already installed") return False # FIXME: this sort of error handling sux - self._failureString = "" + self._failure_string = "" # check conflicts - if not self.checkConflicts(): + if not self.check_conflicts(): return False # try to satisfy the dependencies - res = self._satisfyDepends(self.getDepends()) - if not res: + if not self._satisfy_depends(self.depends): return False # check for conflicts again (this time with the packages that are # makeed for install) - if not self.checkConflicts(): + if not self.check_conflicts(): return False if self._cache._depcache.BrokenCount > 0: - self._failureString = _("Failed to satisfy all dependencies (broken cache)") + self._failure_string = _("Failed to satisfy all dependencies " + "(broken cache)") # clean the cache again self._cache.clear() return False return True - def satisfyDependsStr(self, dependsstr): - return self._satisfyDepends(apt_pkg.ParseDepends(dependsstr)) + def satisfy_depends_str(self, dependsstr): + """Satisfy the dependencies in the given string.""" + return self._satisfy_depends(apt_pkg.ParseDepends(dependsstr)) - def _satisfyDepends(self, depends): + def _satisfy_depends(self, depends): + """Satisfy the dependencies.""" # turn off MarkAndSweep via a action group (if available) try: _actiongroup = apt_pkg.GetPkgActionGroup(self._cache._depcache) - except AttributeError, e: + except AttributeError: pass # check depends for or_group in depends: #print "or_group: %s" % or_group - #print "or_group satified: %s" % self._isOrGroupSatisfied(or_group) - if not self._isOrGroupSatisfied(or_group): - if not self._satisfyOrGroup(or_group): + #print "or_group satified: %s" % self._is_or_group_satisfied(or_group) + if not self._is_or_group_satisfied(or_group): + if not self._satisfy_or_group(or_group): return False # now try it out in the cache - for pkg in self._needPkgs: - try: - self._cache[pkg].markInstall(fromUser=False) - except SystemError, e: - self._failureString = _("Cannot install '%s'" % pkg) - self._cache.clear() - return False + for pkg in self._need_pkgs: + try: + self._cache[pkg].markInstall(fromUser=False) + except SystemError, e: + self._failure_string = _("Cannot install '%s'" % pkg) + self._cache.clear() + return False return True - def missingDeps(self): - self._dbg(1, "Installing: %s" % self._needPkgs) - if self._needPkgs == None: - self.checkDeb() - return self._needPkgs - missingDeps = property(missingDeps) + @property + def missing_deps(self): + """Return missing dependencies.""" + self._dbg(1, "Installing: %s" % self._need_pkgs) + if self._need_pkgs is None: + self.check() + return self._need_pkgs - def requiredChanges(self): - """ gets the required changes to satisfy the depends. - returns a tuple with (install, remove, unauthenticated) + @property + def required_changes(self): + """Get the changes required to satisfy the dependencies. + + Returns: a tuple with (install, remove, unauthenticated) """ install = [] remove = [] @@ -434,95 +409,118 @@ class DebPackage(object): unauthenticated.append(pkg.name) if pkg.markedDelete: remove.append(pkg.name) - return (install,remove, unauthenticated) - requiredChanges = property(requiredChanges) + return (install, remove, unauthenticated) def _dbg(self, level, msg): - """Write debugging output to sys.stderr. - """ + """Write debugging output to sys.stderr.""" if level <= self.debug: print >> sys.stderr, msg - def install(self, installProgress=None): - """ Install the package """ - if installProgress == None: - res = os.system("/usr/sbin/dpkg -i %s" % self.filename) + def install(self, install_progress=None): + """Install the package.""" + if install_progress is None: + return os.system("dpkg -i %s" % self.filename) else: - installProgress.startUpdate() - res = installProgress.run(self.filename) - installProgress.finishUpdate() - return res + try: + install_progress.start_update() + except AttributeError: + install_progress.startUpdate() + res = install_progress.run(self.filename) + try: + install_progress.finish_update() + except AttributeError: + install_progress.finishUpdate() + return res + class DscSrcPackage(DebPackage): + """A locally available source package.""" + def __init__(self, filename=None, cache=None): - DebPackage.__init__(self, filename, cache) - self.depends = [] - self.conflicts = [] - self.binaries = [] - if filename != None: + DebPackage.__init__(self, None, cache) + self._depends = [] + self._conflicts = [] + self._binaries = [] + if filename is not None: self.open(filename) - def getConflicts(self): - return self.conflicts - def getDepends(self): - return self.depends + + @property + def depends(self): + """Return the dependencies of the package""" + return self._depends + + @property + def conflicts(self): + """Return the dependencies of the package""" + return self._conflicts + def open(self, file): - depends_tags = ["Build-Depends:", "Build-Depends-Indep:"] - conflicts_tags = ["Build-Conflicts:", "Build-Conflicts-Indep:"] - for line in open(file): - # check b-d and b-c - for tag in depends_tags: - if line.startswith(tag): - key = line[len(tag):].strip() - self.depends.extend(apt_pkg.ParseSrcDepends(key)) - for tag in conflicts_tags: - if line.startswith(tag): - key = line[len(tag):].strip() - self.conflicts.extend(apt_pkg.ParseSrcDepends(key)) - # check binary and source and version - if line.startswith("Source:"): - self.pkgName = line[len("Source:"):].strip() - if line.startswith("Binary:"): - self.binaries = [pkg.strip() for pkg in line[len("Binary:"):].split(",")] - if line.startswith("Version:"): - self._sections["Version"] = line[len("Version:"):].strip() - # we are at the end - if line.startswith("-----BEGIN PGP SIGNATURE-"): - break + """Open the package.""" + depends_tags = ["Build-Depends", "Build-Depends-Indep"] + conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"] + + fobj = open(file) + tagfile = apt_pkg.ParseTagFile(fobj) + sec = tagfile.Section + try: + while tagfile.Step() == 1: + for tag in depends_tags: + if not sec.has_key(tag): + continue + self._depends.extend(apt_pkg.ParseSrcDepends(sec[tag])) + for tag in conflicts_tags: + if not sec.has_key(tag): + continue + self._conflicts.extend(apt_pkg.ParseSrcDepends(sec[tag])) + if sec.has_key('Source'): + self.pkgname = sec['Source'] + if sec.has_key('Binary'): + self.binaries = sec['Binary'].split(', ') + if sec.has_key('Version'): + self._sections['Version'] = sec['Version'] + finally: + del sec + del tagfile + fobj.close() + s = _("Install Build-Dependencies for " - "source package '%s' that builds %s\n" - ) % (self.pkgName, " ".join(self.binaries)) + "source package '%s' that builds %s\n") % (self.pkgname, + " ".join(self.binaries)) self._sections["Description"] = s - - def checkDeb(self): - if not self.checkConflicts(): - for pkgname in self._installedConflicts: + + def check(self): + """Check if the package is installable..""" + if not self.check_conflicts(): + for pkgname in self._installed_conflicts: if self._cache[pkgname]._pkg.Essential: - raise Exception, _("A essential package would be removed") + raise Exception(_("An essential package would be removed")) self._cache[pkgname].markDelete() # FIXME: a additional run of the checkConflicts() # after _satisfyDepends() should probably be done - return self._satisfyDepends(self.depends) + return self._satisfy_depends(self.depends) -if __name__ == "__main__": - from cache import Cache - from progress import DpkgInstallProgress + +def _test(): + """Test function""" + from apt.cache import Cache + from apt.progress import DpkgInstallProgress cache = Cache() vp = "www-browser" - print "%s virtual: %s" % (vp,cache.isVirtualPackage(vp)) + #print "%s virtual: %s" % (vp, cache.isVirtualPackage(vp)) providers = cache.getProvidingPackages(vp) print "Providers for %s :" % vp for pkg in providers: print " %s" % pkg.name - + d = DebPackage(sys.argv[1], cache) print "Deb: %s" % d.pkgname - if not d.checkDeb(): + if not d.check(): print "can't be satified" - print d._failureString - print "missing deps: %s" % d.missingDeps - print d.requiredChanges + print d._failure_string + print "missing deps: %s" % d.missing_deps + print d.required_changes print "Installing ..." ret = d.install(DpkgInstallProgress()) @@ -535,4 +533,7 @@ if __name__ == "__main__": s = DscSrcPackage(cache=cache) d = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)" - print s._satisfyDepends(apt_pkg.ParseDepends(d)) + print s._satisfy_depends(apt_pkg.ParseDepends(d)) + +if __name__ == "__main__": + _test() diff --git a/apt/gtk/widgets.py b/apt/gtk/widgets.py index 3a15258f..34cc2759 100644 --- a/apt/gtk/widgets.py +++ b/apt/gtk/widgets.py @@ -1,28 +1,26 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -widgets - GTK widgets to show the progress and status of apt - -Copyright (c) 2004,2005 Canonical Ltd. - -Authors: Michael Vogt <mvo@ubuntu.com> - Sebastian Heinlein <glatzor@ubuntu.com> - -This program is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License as -published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version. - -his program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -USA -""" +# +# Copyright (c) 2004-2005 Canonical +# +# Authors: Michael Vogt <michael.vogt@ubuntu.com> +# Sebastian Heinlein <glatzor@ubuntu.com> +# Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# his program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""GObject-powered progress classes and a GTK+ status widget.""" from gettext import gettext as _ import os @@ -39,15 +37,32 @@ import vte import apt import apt_pkg + +def mksig(params=(), run=gobject.SIGNAL_RUN_FIRST, rettype=gobject.TYPE_NONE): + """Simplified Create a gobject signal. + + This allows us to write signals easier, because we just need to define the + type of the parameters (in most cases). + + ``params`` is a tuple which defines the types of the arguments. + """ + return (run, rettype, params) + + class GOpProgress(gobject.GObject, apt.progress.OpProgress): + """Operation progress with GObject signals. + + Signals: - __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, - (gobject.TYPE_STRING, gobject.TYPE_INT)), - "status-started":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-finished":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ())} + * status-changed(str: operation, int: percent) + * status-started() - Not Implemented yet + * status-finished() + + """ + + __gsignals__ = {"status-changed": mksig((str, int)), + "status-started": mksig(), + "status-finished": mksig()} def __init__(self): apt.progress.OpProgress.__init__(self) @@ -55,31 +70,38 @@ class GOpProgress(gobject.GObject, apt.progress.OpProgress): self._context = glib.main_context_default() def update(self, percent): + """Called to update the percentage done""" self.emit("status-changed", self.op, percent) while self._context.pending(): self._context.iteration() def done(self): + """Called when all operation have finished.""" self.emit("status-finished") + class GInstallProgress(gobject.GObject, apt.progress.InstallProgress): + """Installation progress with GObject signals. + + Signals: + + * status-changed(str: status, int: percent) + * status-started() + * status-finished() + * status-timeout() + * status-error() + * status-conffile() + """ # Seconds until a maintainer script will be regarded as hanging INSTALL_TIMEOUT = 5 * 60 - __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, - (gobject.TYPE_STRING, gobject.TYPE_INT)), - "status-started":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-timeout":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-error":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-conffile":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-finished":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ())} + __gsignals__ = {"status-changed": mksig((str, int)), + "status-started": mksig(), + "status-timeout": mksig(), + "status-error": mksig(), + "status-conffile": mksig(), + "status-finished": mksig()} def __init__(self, term): apt.progress.InstallProgress.__init__(self) @@ -89,32 +111,57 @@ class GInstallProgress(gobject.GObject, apt.progress.InstallProgress): self.term = term reaper = vte.reaper_get() reaper.connect("child-exited", self.childExited) - self.env = ["VTE_PTY_KEEP_FD=%s"% self.writefd, + self.env = ["VTE_PTY_KEEP_FD=%s" % self.writefd, "DEBIAN_FRONTEND=gnome", "APT_LISTCHANGES_FRONTEND=gtk"] self._context = glib.main_context_default() def childExited(self, term, pid, status): + """Called when a child process exits""" self.apt_status = os.WEXITSTATUS(status) self.finished = True def error(self, pkg, errormsg): + """Called when an error happens. + + Emits: status-error() + """ self.emit("status-error") def conffile(self, current, new): + """Called during conffile. + + Emits: status-conffile() + """ self.emit("status-conffile") def startUpdate(self): + """Called when the update starts. + + Emits: status-started() + """ self.emit("status-started") def finishUpdate(self): + """Called when the update finished. + + Emits: status-finished() + """ self.emit("status-finished") def statusChange(self, pkg, percent, status): + """Called when the status changed. + + Emits: status-changed(status, percent) + """ self.time_last_update = time.time() self.emit("status-changed", status, percent) def updateInterface(self): + """Called periodically to update the interface. + + Emits: status-timeout() [When a timeout happens] + """ apt.progress.InstallProgress.updateInterface(self) while self._context.pending(): self._context.iteration() @@ -122,34 +169,55 @@ class GInstallProgress(gobject.GObject, apt.progress.InstallProgress): self.emit("status-timeout") def fork(self): + """Fork the process.""" return self.term.forkpty(envv=self.env) def waitChild(self): + """Wait for the child process to exit.""" while not self.finished: self.updateInterface() return self.apt_status -class GDpkgInstallProgress(apt.progress.DpkgInstallProgress,GInstallProgress): +class GDpkgInstallProgress(apt.progress.DpkgInstallProgress, GInstallProgress): + """An InstallProgress for local installations. + + Signals: + + * status-changed(str: status, int: percent) + * status-started() - Not Implemented yet + * status-finished() + * status-timeout() - When the maintainer script hangs + * status-error() - When an error happens + * status-conffile() - On Conffile + """ def run(self, debfile): + """Install the given package.""" apt.progress.DpkgInstallProgress.run(self, debfile) def updateInterface(self): + """Called periodically to update the interface. + + Emits: status-timeout() [When a timeout happens]""" apt.progress.DpkgInstallProgress.updateInterface(self) if self.time_last_update + self.INSTALL_TIMEOUT < time.time(): self.emit("status-timeout") class GFetchProgress(gobject.GObject, apt.progress.FetchProgress): + """A Fetch Progress with GObject signals. - __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, - (gobject.TYPE_STRING, gobject.TYPE_INT)), - "status-started":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ()), - "status-finished":(gobject.SIGNAL_RUN_FIRST, - gobject.TYPE_NONE, ())} + Signals: + + * status-changed(str: description, int: percent) + * status-started() + * status-finished() + """ + + __gsignals__ = {"status-changed": mksig((str, int)), + "status-started": mksig(), + "status-finished": mksig()} def __init__(self): apt.progress.FetchProgress.__init__(self) @@ -170,17 +238,17 @@ class GFetchProgress(gobject.GObject, apt.progress.FetchProgress): apt.progress.FetchProgress.pulse(self) currentItem = self.currentItems + 1 if currentItem > self.totalItems: - currentItem = self.totalItems + currentItem = self.totalItems if self.currentCPS > 0: text = (_("Downloading file %(current)li of %(total)li with " "%(speed)s/s") % \ - {"current" : currentItem, - "total" : self.totalItems, - "speed" : humanize_size(self.currentCPS)}) + {"current": currentItem, + "total": self.totalItems, + "speed": apt_pkg.SizeToStr(self.currentCPS)}) else: text = (_("Downloading file %(current)li of %(total)li") % \ - {"current" : currentItem, - "total" : self.totalItems }) + {"current": currentItem, + "total": self.totalItems}) self.emit("status-changed", text, self.percent) while self._context.pending(): self._context.iteration() @@ -188,33 +256,12 @@ class GFetchProgress(gobject.GObject, apt.progress.FetchProgress): class GtkAptProgress(gtk.VBox): - """ + """Graphical progress for installation/fetch/operations. + This widget provides a progress bar, a terminal and a status bar for showing the progress of package manipulation tasks. - - A simple example code snippet to install/remove a package: - - import pygtk - pygtk.require('2.0') - import gtk - - import apt.widgets - - win = gtk.Window() - progress = apt.widgets.GtkAptProgress() - win.set_title("GtkAptProgress Demo") - win.add(progress) - progress.show() - win.show() - - cache = apt.cache.Cache(progress.open)) - cache["xterm"].markDelete() - progress.show_terminal(expanded=True) - cache.commit(progress.fetch), - progress.install) - - gtk.main() """ + def __init__(self): gtk.VBox.__init__(self) self.set_spacing(6) @@ -226,7 +273,7 @@ class GtkAptProgress(gtk.VBox): self._progressbar = gtk.ProgressBar() # Setup the always italic status label self._label = gtk.Label() - attr_list = pango.AttrList() + attr_list = pango.AttrList() attr_list.insert(pango.AttrStyle(pango.STYLE_ITALIC, 0, -1)) self._label.set_attributes(attr_list) self._label.set_ellipsize(pango.ELLIPSIZE_END) @@ -239,86 +286,80 @@ class GtkAptProgress(gtk.VBox): self._progress_open = GOpProgress() self._progress_open.connect("status-changed", self._on_status_changed) self._progress_open.connect("status-started", self._on_status_started) - self._progress_open.connect("status-finished", self._on_status_finished) + self._progress_open.connect("status-finished", + self._on_status_finished) self._progress_fetch = GFetchProgress() self._progress_fetch.connect("status-changed", self._on_status_changed) self._progress_fetch.connect("status-started", self._on_status_started) - self._progress_fetch.connect("status-finished", + self._progress_fetch.connect("status-finished", self._on_status_finished) self._progress_install = GInstallProgress(self._terminal) self._progress_install.connect("status-changed", self._on_status_changed) - self._progress_install.connect("status-started", + self._progress_install.connect("status-started", self._on_status_started) - self._progress_install.connect("status-finished", + self._progress_install.connect("status-finished", self._on_status_finished) - self._progress_install.connect("status-timeout", + self._progress_install.connect("status-timeout", self._on_status_timeout) - self._progress_install.connect("status-error", + self._progress_install.connect("status-error", self._on_status_timeout) - self._progress_install.connect("status-conffile", + self._progress_install.connect("status-conffile", self._on_status_timeout) self._progress_dpkg_install = GDpkgInstallProgress(self._terminal) self._progress_dpkg_install.connect("status-changed", self._on_status_changed) - self._progress_dpkg_install.connect("status-started", + self._progress_dpkg_install.connect("status-started", self._on_status_started) - self._progress_dpkg_install.connect("status-finished", + self._progress_dpkg_install.connect("status-finished", self._on_status_finished) - self._progress_dpkg_install.connect("status-timeout", + self._progress_dpkg_install.connect("status-timeout", self._on_status_timeout) - self._progress_dpkg_install.connect("status-error", + self._progress_dpkg_install.connect("status-error", self._on_status_timeout) - self._progress_dpkg_install.connect("status-conffile", + self._progress_dpkg_install.connect("status-conffile", self._on_status_timeout) def clear(self): - """ - Reset all status information - """ + """Reset all status information.""" self._label.set_label("") - self._progress.set_fraction(0) + self._progressbar.set_fraction(0) self._expander.set_expanded(False) @property def open(self): - """ - Return the cache opening progress handler. - """ + """Return the cache opening progress handler.""" return self._progress_open @property def install(self): - """ - Return the install progress handler - """ + """Return the install progress handler.""" return self._progress_install @property def dpkg_install(self): - """ - Return the install progress handler for dpkg - """ + """Return the install progress handler for dpkg.""" return self._dpkg_progress_install - + @property def fetch(self): - """ - Return the fetch progress handler - """ + """Return the fetch progress handler.""" return self._progress_fetch def _on_status_started(self, progress): + """Called when something starts.""" self._on_status_changed(progress, _("Starting..."), 0) while gtk.events_pending(): gtk.main_iteration() def _on_status_finished(self, progress): + """Called when something finished.""" self._on_status_changed(progress, _("Complete"), 100) while gtk.events_pending(): gtk.main_iteration() def _on_status_changed(self, progress, status, percent): + """Called when the status changed.""" self._label.set_text(status) if percent is None: self._progressbar.pulse() @@ -328,18 +369,18 @@ class GtkAptProgress(gtk.VBox): gtk.main_iteration() def _on_status_timeout(self, progress): - selt._expander.set_expanded(True) + """Called when timeout happens.""" + self._expander.set_expanded(True) while gtk.events_pending(): gtk.main_iteration() def cancel_download(self): - """ - Cancel a currently running download - """ + """Cancel a currently running download.""" self._progress_fetch.cancel() def show_terminal(self, expanded=False): - """ + """Show the expander for the terminal. + Show an expander with a terminal widget which provides a way to interact with dpkg """ @@ -350,31 +391,33 @@ class GtkAptProgress(gtk.VBox): gtk.main_iteration() def hide_terminal(self): - """ - Hide the expander with the terminal widget - """ + """Hide the expander with the terminal widget.""" self._expander.hide() while gtk.events_pending(): gtk.main_iteration() def show(self): + """Show the Box""" gtk.HBox.show(self) self._label.show() self._progressbar.show() while gtk.events_pending(): gtk.main_iteration() -if __name__ == "__main__": + +def _test(): + """Test function""" import sys - import debfile + + from apt.debfile import DebPackage win = gtk.Window() - apt_progress = GAptProgress() + apt_progress = GtkAptProgress() win.set_title("GtkAptProgress Demo") win.add(apt_progress) apt_progress.show() win.show() - cache = apt.cache.Cache(apt_progress.get_open_progress()) + cache = apt.cache.Cache(apt_progress.open) pkg = cache["xterm"] if pkg.isInstalled: pkg.markDelete() @@ -382,13 +425,16 @@ if __name__ == "__main__": pkg.markInstall() apt_progress.show_terminal(True) try: - cache.commit(apt_progress.get_fetch_progress(), - apt_progress.get_install_progress()) - except: - pass + cache.commit(apt_progress.fetch, apt_progress.install) + except Exception, exc: + print >> sys.stderr, "Exception happened:", exc if len(sys.argv) > 1: deb = DebPackage(sys.argv[1], cache) - deb.install(apt_progress.get_dpkg_install_progress()) + deb.install(apt_progress.dpkg_install) gtk.main() + +if __name__ == "__main__": + _test() + # vim: ts=4 et sts=4 diff --git a/apt/package.py b/apt/package.py index 70ddbb1a..7817c64c 100644 --- a/apt/package.py +++ b/apt/package.py @@ -1,73 +1,153 @@ # package.py - apt package abstraction -# +# # Copyright (c) 2005 Canonical -# +# # Author: Michael Vogt <michael.vogt@ubuntu.com> -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA - +"""Functionality related to packages.""" +import gettext import httplib import sys -import random import re import socket -import string import urllib2 import apt_pkg + +__all__ = 'BaseDependency', 'Dependency', 'Origin', 'Package', 'Record' + + # Set a timeout for the changelog download socket.setdefaulttimeout(2) -#from gettext import gettext as _ -import gettext -def _(s): return gettext.dgettext("python-apt", s) + +def _(string): + """Return the translation of the string.""" + return gettext.dgettext("python-apt", string) + class BaseDependency(object): - " a single dependency " + """A single dependency. + + Attributes defined here: + name - The name of the dependency + relation - The relation (>>,>=,==,<<,<=,) + version - The version depended on + preDepend - Boolean value whether this is a pre-dependency. + """ + def __init__(self, name, rel, ver, pre): self.name = name self.relation = rel self.version = ver self.preDepend = pre + class Dependency(object): + """Represent an Or-group of dependencies. + + Attributes defined here: + or_dependencies - The possible choices + """ + def __init__(self, alternatives): self.or_dependencies = alternatives + +class Origin(object): + """The origin of a version. + + Attributes defined here: + archive - The archive (eg. unstable) + component - The component (eg. main) + label - The Label, as set in the Release file + origin - The Origin, as set in the Release file + site - The hostname of the site. + trusted - Boolean value whether this is trustworthy. + """ + + def __init__(self, pkg, VerFileIter): + self.archive = VerFileIter.Archive + self.component = VerFileIter.Component + self.label = VerFileIter.Label + self.origin = VerFileIter.Origin + self.site = VerFileIter.Site + # check the trust + indexfile = pkg._list.FindIndex(VerFileIter) + if indexfile and indexfile.IsTrusted: + self.trusted = True + else: + self.trusted = False + + def __repr__(self): + return ("<Origin component:'%s' archive:'%s' origin:'%s' label:'%s'" + "site:'%s' isTrusted:'%s'>") % (self.component, self.archive, + self.origin, self.label, + self.site, self.trusted) + + class Record(object): - """ represents a pkgRecord, can be accessed like a - dictionary and gives the original package record - if accessed as a string """ - def __init__(self, s): - self._str = s - self._rec = apt_pkg.ParseSection(s) + """Represent a pkgRecord. + + It can be accessed like a dictionary and can also give the original package + record if accessed as a string. + """ + + def __init__(self, record_str): + self._rec = apt_pkg.ParseSection(record_str) + def __str__(self): - return self._str + return str(self._rec) + def __getitem__(self, key): - k = self._rec.get(key) - if k is None: - raise KeyError - return k + return self._rec[key] + + def __contains__(self, key): + return self._rec.has_key(key) + + def __iter__(self): + return iter(self._rec.keys()) + + def iteritems(self): + """An iterator over the (key, value) items of the record.""" + for key in self._rec.keys(): + yield key, self._rec[key] + + def get(self, key, default=None): + """Return record[key] if key in record, else `default`. + + The parameter `default` must be either a string or None. + """ + return self._rec.get(key, default) + def has_key(self, key): + """deprecated form of 'key in x'.""" return self._rec.has_key(key) + class Package(object): - """ This class represents a package in the cache + """Representation of a package in a cache. + + This class provides methods and properties for working with a package. It + lets you mark the package for installation, check if it is installed, and + much more. """ + def __init__(self, cache, depcache, records, sourcelist, pcache, pkgiter): """ Init the Package object """ self._cache = cache # low level cache @@ -77,175 +157,186 @@ class Package(object): self._list = sourcelist # sourcelist self._pcache = pcache # python cache in cache.py self._changelog = "" # Cached changelog - pass - # helper def _lookupRecord(self, UseCandidate=True): - """ internal helper that moves the Records to the right - position, must be called before _records is accessed """ + """Internal helper that moves the Records to the right position. + + Must be called before _records is accessed. + """ if UseCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer # check if we found a version - if ver == None: - #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, UseCandidate) + if ver is None: + #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, + # UseCandidate) return False - - if ver.FileList == None: + + if ver.FileList is None: print "No FileList for: %s " % self._pkg.Name() return False f, index = ver.FileList.pop(0) - self._records.Lookup((f,index)) + self._records.Lookup((f, index)) return True - - # basic information (implemented as properties) - - # FIXME once python2.3 is dropped we can use @property instead - # of name = property(name) - + @property def name(self): - """ Return the name of the package """ + """Return the name of the package.""" return self._pkg.Name - name = property(name) + @property def id(self): - """ Return a uniq ID for the pkg, can be used to store - additional information about the pkg """ + """Return a uniq ID for the package. + + This can be used eg. to store additional information about the pkg.""" + return self._pkg.ID + + def __hash__(self): + """Return the hash of the object. + + This returns the same value as ID, which is unique.""" return self._pkg.ID - id = property(id) + @property def installedVersion(self): - """ Return the installed version as string """ + """Return the installed version as string.""" ver = self._pkg.CurrentVer - if ver != None: + if ver is not None: return ver.VerStr else: return None - installedVersion = property(installedVersion) + @property def candidateVersion(self): - """ Return the candidate version as string """ + """Return the candidate version as string.""" ver = self._depcache.GetCandidateVer(self._pkg) - if ver != None: + if ver is not None: return ver.VerStr else: return None - candidateVersion = property(candidateVersion) def _getDependencies(self, ver): + """Get the dependencies for a given version of a package.""" depends_list = [] depends = ver.DependsList for t in ["PreDepends", "Depends"]: - if not depends.has_key(t): - continue - for depVerList in depends[t]: - base_deps = [] - for depOr in depVerList: - base_deps.append(BaseDependency(depOr.TargetPkg.Name, depOr.CompType, depOr.TargetVer, (t == "PreDepends"))) - depends_list.append(Dependency(base_deps)) + try: + for depVerList in depends[t]: + base_deps = [] + for depOr in depVerList: + base_deps.append(BaseDependency(depOr.TargetPkg.Name, + depOr.CompType, depOr.TargetVer, + (t == "PreDepends"))) + depends_list.append(Dependency(base_deps)) + except KeyError: + pass return depends_list - + + @property def candidateDependencies(self): - """ return a list of candidate dependencies """ + """Return a list of candidate dependencies.""" candver = self._depcache.GetCandidateVer(self._pkg) - if candver == None: + if candver is None: return [] return self._getDependencies(candver) - candidateDependencies = property(candidateDependencies) - + + @property def installedDependencies(self): - """ return a list of installed dependencies """ + """Return a list of installed dependencies.""" ver = self._pkg.CurrentVer - if ver == None: + if ver is None: return [] return self._getDependencies(ver) - installedDependencies = property(installedDependencies) + @property def architecture(self): + """Return the Architecture of the package""" if not self._lookupRecord(): return None sec = apt_pkg.ParseSection(self._records.Record) - if sec.has_key("Architecture"): + try: return sec["Architecture"] - return None - architecture = property(architecture) + except KeyError: + return None def _downloadable(self, useCandidate=True): - """ helper, return if the version is downloadable """ + """Return True if the package is downloadable.""" if useCandidate: ver = self._depcache.GetCandidateVer(self._pkg) else: ver = self._pkg.CurrentVer - if ver == None: + if ver is None: return False return ver.Downloadable + + @property def candidateDownloadable(self): - " returns if the canidate is downloadable " - return self._downloadable(useCandidate=True) - candidateDownloadable = property(candidateDownloadable) + """Return True if the candidate is downloadable.""" + return self._downloadable(True) + @property def installedDownloadable(self): - " returns if the installed version is downloadable " - return self._downloadable(useCandidate=False) - installedDownloadable = property(installedDownloadable) + """Return True if the installed version is downloadable.""" + return self._downloadable(False) + @property def sourcePackageName(self): - """ Return the source package name as string """ + """Return the source package name as string.""" if not self._lookupRecord(): - if not self._lookupRecord(UseCandidate=False): + if not self._lookupRecord(False): return self._pkg.Name src = self._records.SourcePkg if src != "": return src else: return self._pkg.Name - sourcePackageName = property(sourcePackageName) + @property def homepage(self): - """ Return the homepage field as string """ + """Return the homepage field as string.""" if not self._lookupRecord(): return None return self._records.Homepage - homepage = property(homepage) + @property def section(self): - """ Return the section of the package""" + """Return the section of the package.""" return self._pkg.Section - section = property(section) + @property def priority(self): - """ Return the priority (of the candidate version)""" + """Return the priority (of the candidate version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr else: return None - priority = property(priority) + @property def installedPriority(self): - """ Return the priority (of the installed version)""" + """Return the priority (of the installed version).""" ver = self._depcache.GetCandidateVer(self._pkg) if ver: return ver.PriorityStr else: return None - installedPriority = property(installedPriority) + @property def summary(self): - """ Return the short description (one line summary) """ + """Return the short description (one line summary).""" if not self._lookupRecord(): return "" ver = self._depcache.GetCandidateVer(self._pkg) desc_iter = ver.TranslatedDescription self._records.Lookup(desc_iter.FileList.pop(0)) return self._records.ShortDesc - summary = property(summary) + @property def description(self, format=True, useDots=False): - """ + """Return the formatted long description. + Return the formated long description according to the Debian policy (Chapter 5.6.13). See http://www.debian.org/doc/debian-policy/ch-controlfields.html @@ -261,14 +352,15 @@ class Package(object): self._records.Lookup(desc_iter.FileList.pop(0)) desc = "" try: - s = unicode(self._records.LongDesc,"utf-8") - except UnicodeDecodeError,e: - s = _("Invalid unicode in description for '%s' (%s). " - "Please report.") % (self.name,e) - lines = string.split(s, "\n") + dsc = unicode(self._records.LongDesc, "utf-8") + except UnicodeDecodeError, err: + dsc = _("Invalid unicode in description for '%s' (%s). " + "Please report.") % (self.name, err) + lines = dsc.split("\n") for i in range(len(lines)): # Skip the first line, since its a duplication of the summary - if i == 0: continue + if i == 0: + continue raw_line = lines[i] if raw_line.strip() == ".": # The line is just line break @@ -296,138 +388,145 @@ class Package(object): # Add current line to the description desc += line return desc - description = property(description) + @property def rawDescription(self): - """ return the long description (raw)""" + """return the long description (raw).""" if not self._lookupRecord(): return "" return self._records.LongDesc - rawDescription = property(rawDescription) - + + @property def candidateRecord(self): - " return the full pkgrecord as string of the candidate version " + """Return the Record of the candidate version of the package.""" if not self._lookupRecord(True): return None return Record(self._records.Record) - candidateRecord = property(candidateRecord) + @property def installedRecord(self): - " return the full pkgrecord as string of the installed version " + """Return the Record of the candidate version of the package.""" if not self._lookupRecord(False): return None return Record(self._records.Record) - installedRecord = property(installedRecord) # depcache states + + @property def markedInstall(self): - """ Package is marked for install """ + """Return True if the package is marked for install.""" return self._depcache.MarkedInstall(self._pkg) - markedInstall = property(markedInstall) + @property def markedUpgrade(self): - """ Package is marked for upgrade """ + """Return True if the package is marked for upgrade.""" return self._depcache.MarkedUpgrade(self._pkg) - markedUpgrade = property(markedUpgrade) + @property def markedDelete(self): - """ Package is marked for delete """ + """Return True if the package is marked for delete.""" return self._depcache.MarkedDelete(self._pkg) - markedDelete = property(markedDelete) + @property def markedKeep(self): - """ Package is marked for keep """ + """Return True if the package is marked for keep.""" return self._depcache.MarkedKeep(self._pkg) - markedKeep = property(markedKeep) + @property def markedDowngrade(self): """ Package is marked for downgrade """ return self._depcache.MarkedDowngrade(self._pkg) - markedDowngrade = property(markedDowngrade) + @property def markedReinstall(self): - """ Package is marked for reinstall """ + """Return True if the package is marked for reinstall.""" return self._depcache.MarkedReinstall(self._pkg) - markedReinstall = property(markedReinstall) + @property def isInstalled(self): - """ Package is installed """ - return (self._pkg.CurrentVer != None) - isInstalled = property(isInstalled) + """Return True if the package is installed.""" + return (self._pkg.CurrentVer is not None) + @property def isUpgradable(self): - """ Package is upgradable """ + """Return True if the package is upgradable.""" return self.isInstalled and self._depcache.IsUpgradable(self._pkg) - isUpgradable = property(isUpgradable) + @property def isAutoRemovable(self): - """ - Package is installed as a automatic dependency and is - no longer required + """Return True if the package is no longer required. + + If the package has been installed automatically as a dependency of + another package, and if no packages depend on it anymore, the package + is no longer required. """ return self.isInstalled and self._depcache.IsGarbage(self._pkg) - isAutoRemovable = property(isAutoRemovable) - # size + # sizes + + @property def packageSize(self): - """ The size of the candidate deb package """ + """Return the size of the candidate deb package.""" ver = self._depcache.GetCandidateVer(self._pkg) return ver.Size - packageSize = property(packageSize) + @property def installedPackageSize(self): - """ The size of the installed deb package """ + """Return the size of the installed deb package.""" ver = self._pkg.CurrentVer return ver.Size - installedPackageSize = property(installedPackageSize) + @property def candidateInstalledSize(self, UseCandidate=True): - """ The size of the candidate installed package """ + """Return the size of the candidate installed package.""" ver = self._depcache.GetCandidateVer(self._pkg) - candidateInstalledSize = property(candidateInstalledSize) + @property def installedSize(self): - """ The size of the currently installed package """ + """Return the size of the currently installed package.""" ver = self._pkg.CurrentVer if ver is None: return 0 return ver.InstalledSize - installedSize = property(installedSize) + @property def installedFiles(self): - """ - Return the list of unicode names of the files which have + """Return a list of files installed by the package. + + Return a list of unicode names of the files which have been installed by this package """ path = "/var/lib/dpkg/info/%s.list" % self.name try: - list = open(path) - files = list.read().decode().split("\n") - list.close() - except: + file_list = open(path) + try: + return file_list.read().decode().split("\n") + finally: + file_list.close() + except EnvironmentError: return [] - return files - installedFiles = property(installedFiles) def getChangelog(self, uri=None, cancel_lock=None): """ - Download the changelog of the package and return it as unicode - string - - uri: Is the uri to the changelog file. The following named variables - will be substituted: src_section, prefix, src_pkg and src_ver - For example the Ubuntu changelog: - uri = "http://changelogs.ubuntu.com/changelogs/pool" \\ - "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ - "/%(src_pkg)s_%(src_ver)s/changelog" - cancel_lock: If this threading.Lock() is set, the download will be - canceled + Download the changelog of the package and return it as unicode + string. + + The parameter `uri` refers to the uri of the changelog file. It may + contain multiple named variables which will be substitued. These + variables are (src_section, prefix, src_pkg, src_ver). An example is + the Ubuntu changelog: + "http://changelogs.ubuntu.com/changelogs/pool" \\ + "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ + "/%(src_pkg)s_%(src_ver)s/changelog" + + The parameter `cancel_lock` refers to an instance of threading.Lock, + which if set, prevents the download. """ # Return a cached changelog if available if self._changelog != "": return self._changelog - if uri == None: + if uri is None: if self.candidateOrigin[0].origin == "Debian": uri = "http://packages.debian.org/changelogs/pool" \ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \ @@ -442,7 +541,7 @@ class Package(object): # get the src package name src_pkg = self.sourcePackageName - # assume "main" section + # assume "main" section src_section = "main" # use the section of the candidate as a starting point section = self._depcache.GetCandidateVer(self._pkg).Section @@ -452,9 +551,10 @@ class Package(object): src_ver = self.candidateVersion #print "bin: %s" % binver try: + # FIXME: This try-statement is too long ... # try to get the source version of the pkg, this differs # for some (e.g. libnspr4 on ubuntu) - # this feature only works if the correct deb-src are in the + # this feature only works if the correct deb-src are in the # sources.list # otherwise we fall back to the binary version number src_records = apt_pkg.GetPkgSrcRecords() @@ -471,7 +571,7 @@ class Package(object): else: # fail into the error handler raise SystemError - except SystemError, e: + except SystemError: src_ver = bin_ver l = section.split("/") @@ -484,26 +584,27 @@ class Package(object): prefix = "lib" + src_pkg[3] # stip epoch - l = string.split(src_ver,":") + l = src_ver.split(":") if len(l) > 1: src_ver = "".join(l[1:]) - uri = uri % {"src_section" : src_section, - "prefix" : prefix, - "src_pkg" : src_pkg, - "src_ver" : src_ver} + uri = uri % {"src_section": src_section, + "prefix": prefix, + "src_pkg": src_pkg, + "src_ver": src_ver} try: # Check if the download was canceled - if cancel_lock and cancel_lock.isSet(): return "" + if cancel_lock and cancel_lock.isSet(): + return "" changelog_file = urllib2.urlopen(uri) # do only get the lines that are new changelog = "" regexp = "^%s \((.*)\)(.*)$" % (re.escape(src_pkg)) - i=0 while True: # Check if the download was canceled - if cancel_lock and cancel_lock.isSet(): return "" + if cancel_lock and cancel_lock.isSet(): + return "" # Read changelog line by line line_raw = changelog_file.readline() if line_raw == "": @@ -519,7 +620,7 @@ class Package(object): # and from changelog too installed = self.installedVersion if installed and ":" in installed: - installed = installed.split(":",1)[1] + installed = installed.split(":", 1)[1] changelog_ver = match.group(1) if changelog_ver and ":" in changelog_ver: changelog_ver = changelog_ver.split(":", 1)[1] @@ -533,58 +634,47 @@ class Package(object): if len(changelog) == 0: changelog = _("The list of changes is not available") self._changelog = changelog - except urllib2.HTTPError,e: + + # FIXME: Ubuntu-specific part. + except urllib2.HTTPError: return _("The list of changes is not available yet.\n\n" "Please use http://launchpad.net/ubuntu/+source/%s/%s/" "+changelog\n" "until the changes become available or try again " - "later.") % (srcpkg, srcver), - except IOError, httplib.BadStatusLine: - return _("Failed to download the list of changes. \nPlease " - "check your Internet connection.") + "later.") % (src_pkg, src_ver) + except (IOError, httplib.BadStatusLine): + return _("Failed to download the list of changes. \nPlease " + "check your Internet connection.") return self._changelog - # canidate origin - class Origin: - def __init__(self, pkg, VerFileIter): - self.component = VerFileIter.Component - self.archive = VerFileIter.Archive - self.origin = VerFileIter.Origin - self.label = VerFileIter.Label - self.site = VerFileIter.Site - # check the trust - indexfile = pkg._list.FindIndex(VerFileIter) - if indexfile and indexfile.IsTrusted: - self.trusted = True - else: - self.trusted = False - def __repr__(self): - return "component: '%s' archive: '%s' origin: '%s' label: '%s' " \ - "site '%s' isTrusted: '%s'"% (self.component, self.archive, - self.origin, self.label, - self.site, self.trusted) - + @property def candidateOrigin(self): + """Return the Origin() of the candidate version.""" ver = self._depcache.GetCandidateVer(self._pkg) if not ver: return None origins = [] - for (verFileIter,index) in ver.FileList: - origins.append(self.Origin(self, verFileIter)) + for (verFileIter, index) in ver.FileList: + origins.append(Origin(self, verFileIter)) return origins - candidateOrigin = property(candidateOrigin) # depcache actions + def markKeep(self): - """ mark a package for keep """ + """Mark a package for keep.""" self._pcache.cachePreChange() self._depcache.MarkKeep(self._pkg) self._pcache.cachePostChange() + def markDelete(self, autoFix=True, purge=False): - """ mark a package for delete. Run the resolver if autoFix is set. - Mark the package as purge (remove with configuration) if 'purge' - is set. - """ + """Mark a package for install. + + If autoFix is True, the resolver will be run, trying to fix broken + packages. This is the default. + + If purge is True, remove the configuration files of the package as + well. The default is to keep the configuration. + """ self._pcache.cachePreChange() self._depcache.MarkDelete(self._pkg, purge) # try to fix broken stuffsta @@ -596,10 +686,20 @@ class Package(object): Fix.InstallProtect() Fix.Resolve() self._pcache.cachePostChange() + def markInstall(self, autoFix=True, autoInst=True, fromUser=True): - """ mark a package for install. Run the resolver if autoFix is set, - automatically install required dependencies if autoInst is set - record it as automatically installed when fromuser is set to false + """Mark a package for install. + + If autoFix is True, the resolver will be run, trying to fix broken + packages. This is the default. + + If autoInst is True, the dependencies of the packages will be installed + automatically. This is the default. + + If fromUser is True, this package will not be marked as automatically + installed. This is the default. Set it to False if you want to be able + to remove the package at a later stage if no other package depends on + it. """ self._pcache.cachePreChange() self._depcache.MarkInstall(self._pkg, autoInst, fromUser) @@ -610,24 +710,33 @@ class Package(object): fixer.Protect(self._pkg) fixer.Resolve(True) self._pcache.cachePostChange() + def markUpgrade(self): - """ mark a package for upgrade """ + """Mark a package for upgrade.""" if self.isUpgradable: self.markInstall() else: # FIXME: we may want to throw a exception here - sys.stderr.write("MarkUpgrade() called on a non-upgrable pkg: '%s'\n" %self._pkg.Name) + sys.stderr.write(("MarkUpgrade() called on a non-upgrable pkg: " + "'%s'\n") % self._pkg.Name) def commit(self, fprogress, iprogress): - """ commit the changes, need a FetchProgress and InstallProgress - object as argument + """Commit the changes. + + The parameter `fprogress` refers to a FetchProgress() object, as + found in apt.progress. + + The parameter `iprogress` refers to an InstallProgress() object, as + found in apt.progress. """ self._depcache.Commit(fprogress, iprogress) - -# self-test -if __name__ == "__main__": + +def _test(): + """Self-test.""" print "Self-test for the Package modul" + import random + import apt apt_pkg.init() cache = apt_pkg.GetCache() depcache = apt_pkg.GetDepCache(cache) @@ -653,35 +762,38 @@ if __name__ == "__main__": print "PackageSize: %s " % pkg.packageSize print "Dependencies: %s" % pkg.installedDependencies for dep in pkg.candidateDependencies: - print ",".join(["%s (%s) (%s) (%s)" % (o.name,o.version,o.relation, o.preDepend) for o in dep.or_dependencies]) + print ",".join("%s (%s) (%s) (%s)" % (o.name, o.version, o.relation, + o.preDepend) for o in dep.or_dependencies) print "arch: %s" % pkg.architecture print "homepage: %s" % pkg.homepage - print "rec: ",pkg.candidateRecord + print "rec: ", pkg.candidateRecord # now test install/remove - import apt progress = apt.progress.OpTextProgress() cache = apt.Cache(progress) - for i in [True, False]: + for i in True, False: print "Running install on random upgradable pkgs with AutoFix: %s " % i - for name in cache.keys(): - pkg = cache[name] + for pkg in cache: if pkg.isUpgradable: - if random.randint(0,1) == 1: + if random.randint(0, 1) == 1: pkg.markInstall(i) print "Broken: %s " % cache._depcache.BrokenCount print "InstCount: %s " % cache._depcache.InstCount print # get a new cache - for i in [True, False]: + for i in True, False: print "Randomly remove some packages with AutoFix: %s" % i cache = apt.Cache(progress) for name in cache.keys(): - if random.randint(0,1) == 1: + if random.randint(0, 1) == 1: try: cache[name].markDelete(i) except SystemError: print "Error trying to remove: %s " % name print "Broken: %s " % cache._depcache.BrokenCount print "DelCount: %s " % cache._depcache.DelCount + +# self-test +if __name__ == "__main__": + _test() diff --git a/apt/progress.py b/apt/progress.py index a8ab76b6..51eb2426 100644 --- a/apt/progress.py +++ b/apt/progress.py @@ -1,61 +1,82 @@ # Progress.py - progress reporting classes -# +# # Copyright (c) 2005 Canonical -# +# # Author: Michael Vogt <michael.vogt@ubuntu.com> -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA +"""progress reporting classes. -import sys +This module provides classes for progress reporting. They can be used with +e.g., for reporting progress on the cache opening process, the cache update +progress, or the package install progress. +""" + +import errno +import fcntl import os import re -import fcntl -import string -from errno import * import select +import sys + import apt_pkg -import apt + +__all__ = ('CdromProgress', 'DpkgInstallProgress', 'DumbInstallProgress', + 'FetchProgress', 'InstallProgress', 'OpProgress', 'OpTextProgress', + 'TextFetchProgress') + class OpProgress(object): - """ Abstract class to implement reporting on cache opening - Subclass this class to implement simple Operation progress reporting + """Abstract class to implement reporting on cache opening. + + Subclass this class to implement simple Operation progress reporting. """ + def __init__(self): - pass + self.op = None + self.subOp = None + def update(self, percent): - pass + """Called periodically to update the user interface.""" + def done(self): - pass + """Called once an operation has been completed.""" + class OpTextProgress(OpProgress): - """ A simple text based cache open reporting class """ + """A simple text based cache open reporting class.""" + def __init__(self): OpProgress.__init__(self) + def update(self, percent): - sys.stdout.write("\r%s: %.2i " % (self.subOp,percent)) + """Called periodically to update the user interface.""" + sys.stdout.write("\r%s: %.2i " % (self.subOp, percent)) sys.stdout.flush() + def done(self): + """Called once an operation has been completed.""" sys.stdout.write("\r%s: Done\n" % self.op) - class FetchProgress(object): - """ Report the download/fetching progress - Subclass this class to implement fetch progress reporting + """Report the download/fetching progress. + + Subclass this class to implement fetch progress reporting """ # download status constants @@ -64,46 +85,71 @@ class FetchProgress(object): dlFailed = 2 dlHit = 3 dlIgnored = 4 - dlStatusStr = {dlDone : "Done", - dlQueued : "Queued", - dlFailed : "Failed", - dlHit : "Hit", - dlIgnored : "Ignored"} - + dlStatusStr = {dlDone: "Done", + dlQueued: "Queued", + dlFailed: "Failed", + dlHit: "Hit", + dlIgnored: "Ignored"} + def __init__(self): self.eta = 0.0 self.percent = 0.0 - pass - + # Make checking easier + self.currentBytes = 0 + self.currentItems = 0 + self.totalBytes = 0 + self.totalItems = 0 + self.currentCPS = 0 + def start(self): - pass - + """Called when the fetching starts.""" + def stop(self): - pass - + """Called when all files have been fetched.""" + def updateStatus(self, uri, descr, shortDescr, status): - pass + """Called when the status of an item changes. + + This happens eg. when the downloads fails or is completed. + """ def pulse(self): - """ called periodically (to update the gui), importend to - return True to continue or False to cancel + """Called periodically to update the user interface. + + Return True to continue or False to cancel. """ - self.percent = ((self.currentBytes + self.currentItems)*100.0)/float(self.totalBytes+self.totalItems) + self.percent = (((self.currentBytes + self.currentItems) * 100.0) / + float(self.totalBytes + self.totalItems)) if self.currentCPS > 0: - self.eta = (self.totalBytes-self.currentBytes)/float(self.currentCPS) + self.eta = ((self.totalBytes - self.currentBytes) / + float(self.currentCPS)) return True + def mediaChange(self, medium, drive): - pass + """react to media change events.""" + class TextFetchProgress(FetchProgress): """ Ready to use progress object for terminal windows """ + def __init__(self): + FetchProgress.__init__(self) self.items = {} + def updateStatus(self, uri, descr, shortDescr, status): + """Called when the status of an item changes. + + This happens eg. when the downloads fails or is completed. + """ if status != self.dlQueued: print "\r%s %s" % (self.dlStatusStr[status], descr) self.items[uri] = status + def pulse(self): + """Called periodically to update the user interface. + + Return True to continue or False to cancel. + """ FetchProgress.pulse(self) if self.currentCPS > 0: s = "[%2.f%%] %sB/s %s" % (self.percent, @@ -114,102 +160,121 @@ class TextFetchProgress(FetchProgress): print "\r%s" % (s), sys.stdout.flush() return True + def stop(self): - print "\rDone downloading " + """Called when all files have been fetched.""" + print "\rDone downloading " + def mediaChange(self, medium, drive): - """ react to media change events """ - res = True; - print "Media change: please insert the disc labeled \ - '%s' in the drive '%s' and press enter" % (medium,drive) - s = sys.stdin.readline() - if(s == 'c' or s == 'C'): - res = false; - return res + """react to media change events.""" + print ("Media change: please insert the disc labeled " + "'%s' in the drive '%s' and press enter") % (medium, drive) + + return raw_input() not in ('c', 'C') + class DumbInstallProgress(object): - """ Report the install progress - Subclass this class to implement install progress reporting + """Report the install progress. + + Subclass this class to implement install progress reporting. """ - def __init__(self): - pass + def startUpdate(self): - pass + """Start update.""" + def run(self, pm): + """Start installation.""" return pm.DoInstall() + def finishUpdate(self): - pass + """Called when update has finished.""" + def updateInterface(self): - pass + """Called periodically to update the user interface""" + class InstallProgress(DumbInstallProgress): - """ A InstallProgress that is pretty useful. - It supports the attributes 'percent' 'status' and callbacks - for the dpkg errors and conffiles and status changes - """ + """An InstallProgress that is pretty useful. + + It supports the attributes 'percent' 'status' and callbacks for the dpkg + errors and conffiles and status changes. + """ + def __init__(self): DumbInstallProgress.__init__(self) self.selectTimeout = 0.1 (read, write) = os.pipe() - self.writefd=write + self.writefd = write self.statusfd = os.fdopen(read, "r") - fcntl.fcntl(self.statusfd.fileno(), fcntl.F_SETFL,os.O_NONBLOCK) + fcntl.fcntl(self.statusfd.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) self.read = "" self.percent = 0.0 self.status = "" + def error(self, pkg, errormsg): - " called when a error is detected during the install " - pass - def conffile(self,current,new): - " called when a conffile question from dpkg is detected " - pass + """Called when a error is detected during the install.""" + + def conffile(self, current, new): + """Called when a conffile question from dpkg is detected.""" + def statusChange(self, pkg, percent, status): - " called when the status changed " - pass + """Called when the status changed.""" + def updateInterface(self): - if self.statusfd != None: - try: - while not self.read.endswith("\n"): - self.read += os.read(self.statusfd.fileno(),1) - except OSError, (errno,errstr): - # resource temporarly unavailable is ignored - if errno != EAGAIN and errnor != EWOULDBLOCK: - print errstr - if self.read.endswith("\n"): - s = self.read - #print s - try: - (status, pkg, percent, status_str) = string.split(s, ":",3) - except ValueError, e: - # silently ignore lines that can't be parsed - self.read = "" - return - #print "percent: %s %s" % (pkg, float(percent)/100.0) - if status == "pmerror": - self.error(pkg,status_str) - elif status == "pmconffile": - # we get a string like this: - # 'current-conffile' 'new-conffile' useredited distedited - match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str) - if match: - self.conffile(match.group(1), match.group(2)) - elif status == "pmstatus": - if float(percent) != self.percent or \ - status_str != self.status: - self.statusChange(pkg, float(percent), status_str.strip()) - self.percent = float(percent) - self.status = string.strip(status_str) - self.read = "" + """Called periodically to update the interface.""" + if self.statusfd is None: + return + try: + while not self.read.endswith("\n"): + self.read += os.read(self.statusfd.fileno(), 1) + except OSError, (errno_, errstr): + # resource temporarly unavailable is ignored + if errno_ != errno.EAGAIN and errno_ != errno.EWOULDBLOCK: + print errstr + if not self.read.endswith("\n"): + return + + s = self.read + #print s + try: + (status, pkg, percent, status_str) = s.split(":", 3) + except ValueError: + # silently ignore lines that can't be parsed + self.read = "" + return + #print "percent: %s %s" % (pkg, float(percent)/100.0) + if status == "pmerror": + self.error(pkg, status_str) + elif status == "pmconffile": + # we get a string like this: + # 'current-conffile' 'new-conffile' useredited distedited + match = re.match("\s*\'(.*)\'\s*\'(.*)\'.*", status_str) + if match: + self.conffile(match.group(1), match.group(2)) + elif status == "pmstatus": + if float(percent) != self.percent or status_str != self.status: + self.statusChange(pkg, float(percent), + status_str.strip()) + self.percent = float(percent) + self.status = status_str.strip() + self.read = "" + def fork(self): + """Fork.""" return os.fork() + def waitChild(self): + """Wait for child progress to exit.""" while True: - select.select([self.statusfd],[],[], self.selectTimeout) + select.select([self.statusfd], [], [], self.selectTimeout) self.updateInterface() - (pid, res) = os.waitpid(self.child_pid,os.WNOHANG) + (pid, res) = os.waitpid(self.child_pid, os.WNOHANG) if pid == self.child_pid: break return res + def run(self, pm): + """Start installing.""" pid = self.fork() if pid == 0: # child @@ -219,29 +284,31 @@ class InstallProgress(DumbInstallProgress): res = self.waitChild() return os.WEXITSTATUS(res) -class CdromProgress: - """ Report the cdrom add progress - Subclass this class to implement cdrom add progress reporting + +class CdromProgress(object): + """Report the cdrom add progress. + + Subclass this class to implement cdrom add progress reporting. """ + def __init__(self): pass + def update(self, text, step): - """ update is called regularly so that the gui can be redrawn """ - pass + """Called periodically to update the user interface.""" + def askCdromName(self): - pass + """Called to ask for the name of the cdrom.""" + def changeCdrom(self): - pass + """Called to ask for the cdrom to be changed.""" class DpkgInstallProgress(InstallProgress): - """ - Progress handler for a local Debian package installation - """ + """Progress handler for a local Debian package installation.""" + def run(self, debfile): - """ - Start installing the given Debian package - """ + """Start installing the given Debian package.""" self.debfile = debfile self.debname = os.path.basename(debfile).split("_")[0] pid = self.fork() @@ -255,46 +322,35 @@ class DpkgInstallProgress(InstallProgress): return res def updateInterface(self): - """ - Process status messages from dpkg - """ - if self.statusfd != None: - while True: - try: - self.read += os.read(self.statusfd.fileno(),1) - except OSError, (errno,errstr): - # resource temporarly unavailable is ignored - if errno != 11: - print errstr - break - if self.read.endswith("\n"): - statusl = string.split(self.read, ":") - if len(statusl) < 3: - print "got garbage from dpkg: '%s'" % read - self.read = "" - break - status = statusl[2].strip() - #print status - if status == "error": - self.error(self.debname, status) - elif status == "conffile-prompt": - # we get a string like this: - # 'current-conffile' 'new-conffile' useredited distedited - match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str) - if match: - self.conffile(match.group(1), match.group(2)) - else: - self.status = status - self.read = "" - -# module test code -if __name__ == "__main__": - import apt_pkg - apt_pkg.init() - progress = OpTextProgress() - cache = apt_pkg.GetCache(progress) - depcache = apt_pkg.GetDepCache(cache) - depcache.Init(progress) - - fprogress = TextFetchProgress() - cache.Update(fprogress) + """Process status messages from dpkg.""" + if self.statusfd is None: + return + while True: + try: + self.read += os.read(self.statusfd.fileno(), 1) + except OSError, (errno_, errstr): + # resource temporarly unavailable is ignored + if errno_ != 11: + print errstr + break + if not self.read.endswith("\n"): + continue + + statusl = self.read.split(":") + if len(statusl) < 3: + print "got garbage from dpkg: '%s'" % self.read + self.read = "" + break + status = statusl[2].strip() + #print status + if status == "error": + self.error(self.debname, status) + elif status == "conffile-prompt": + # we get a string like this: + # 'current-conffile' 'new-conffile' useredited distedited + match = re.match("\s*\'(.*)\'\s*\'(.*)\'.*", statusl[3]) + if match: + self.conffile(match.group(1), match.group(2)) + else: + self.status = status + self.read = "" |
