summaryrefslogtreecommitdiff
path: root/apt
diff options
context:
space:
mode:
authorMichael Vogt <michael.vogt@ubuntu.com>2009-01-13 17:22:27 +0100
committerMichael Vogt <michael.vogt@ubuntu.com>2009-01-13 17:22:27 +0100
commit38d602dc83006c51dfe4ed594d691ea9b0679498 (patch)
treeb7aedfba82c44cad6c3012f879b5d6d7e8ad1425 /apt
parent12cf58d12b969010f3d98b2974d72bbb950b775f (diff)
parent614897f798d9f16591fbd29ebe2a6c5674102d2d (diff)
downloadpython-apt-38d602dc83006c51dfe4ed594d691ea9b0679498.tar.gz
* apt/*.py:
- Almost complete cleanup of the code - Remove inconsistent use of tabs and spaces (Closes: #505443) - Improved documentation * apt/debfile.py: - Drop get*() methods, as they are deprecated and were never in a stable release - Make DscSrcPackage working * apt/gtk/widgets.py: - Fix the code and document the signals * Introduce new documentation build with Sphinx - Contains style Guide (Closes: #481562) - debian/rules: Build the documentation here - setup.py: Remove pydoc building and add new docs. - debian/examples: Include examples from documentation - debian/python-apt.docs: + Change html/ to build/doc/html. + Add build/doc/text for the text-only documentation * setup.py: - Only create build/data when building, not all the time - Remove build/mo and build/data on clean -a * debian/control: - Remove the Conflicts on python2.3-apt, python2.4-apt, as they are only needed for oldstable (sarge) - Build-Depend on python-sphinx (>= 0.5) * aptsources/distinfo.py: - Allow @ in mirror urls (Closes: #478171) (LP: #223097) * Merge Ben Finney's whitespace changes (Closes: #481563) * Merge Ben Finney's do not use has_key() (Closes: #481878) * Do not use deprecated form of raise statement (Closes: #494259) * Add support for PkgRecords.SHA256Hash (Closes: #456113)
Diffstat (limited to 'apt')
-rw-r--r--apt/README.apt8
-rw-r--r--apt/__init__.py3
-rw-r--r--apt/cache.py161
-rw-r--r--apt/cdrom.py81
-rw-r--r--apt/debfile.py549
-rw-r--r--apt/gtk/widgets.py306
-rw-r--r--apt/package.py550
-rw-r--r--apt/progress.py376
8 files changed, 1156 insertions, 878 deletions
diff --git a/apt/README.apt b/apt/README.apt
index 30166a24..2a017bde 100644
--- a/apt/README.apt
+++ b/apt/README.apt
@@ -12,10 +12,4 @@ WARNING !!! The API is not 100% stable yet !!!
Style Guides:
-------------
-
-Follow PEP08.
-
-Internal variables/methods are prefixed with a "_" (e.g. _foo).
-
-
-
+See ../doc/source/coding.rst
diff --git a/apt/__init__.py b/apt/__init__.py
index c6a2ff39..05407aff 100644
--- a/apt/__init__.py
+++ b/apt/__init__.py
@@ -6,7 +6,8 @@ import os
# import some fancy classes
from apt.package import Package
from apt.cache import Cache
-from apt.progress import OpProgress, FetchProgress, InstallProgress, CdromProgress
+from apt.progress import (
+ OpProgress, FetchProgress, InstallProgress, CdromProgress)
from apt.cdrom import Cdrom
from apt_pkg import SizeToStr, TimeToStr, VersionCompare
diff --git a/apt/cache.py b/apt/cache.py
index 79e58282..0065d14c 100644
--- a/apt/cache.py
+++ b/apt/cache.py
@@ -1,62 +1,68 @@
# cache.py - apt cache abstraction
-#
+#
# Copyright (c) 2005 Canonical
-#
+#
# Author: Michael Vogt <michael.vogt@ubuntu.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
-#
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
+#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
+import os
+import sys
+
import apt_pkg
from apt import Package
import apt.progress
-import os
-import sys
+
class FetchCancelledException(IOError):
- " Exception that is thrown when the user cancels a fetch operation "
- pass
+ """Exception that is thrown when the user cancels a fetch operation."""
+
+
class FetchFailedException(IOError):
- " Exception that is thrown when fetching fails "
- pass
+ """Exception that is thrown when fetching fails."""
+
+
class LockFailedException(IOError):
- " Exception that is thrown when locking fails "
- pass
+ """Exception that is thrown when locking fails."""
+
class Cache(object):
- """ Dictionary-like package cache
- This class has all the packages that are available in it's
- dictionary
+ """Dictionary-like package cache.
+
+ This class has all the packages that are available in it's
+ dictionary
"""
def __init__(self, progress=None, rootdir=None, memonly=False):
self._callbacks = {}
if memonly:
# force apt to build its caches in memory
- apt_pkg.Config.Set("Dir::Cache::pkgcache","")
+ apt_pkg.Config.Set("Dir::Cache::pkgcache", "")
if rootdir:
apt_pkg.Config.Set("Dir", rootdir)
- apt_pkg.Config.Set("Dir::State::status", rootdir + "/var/lib/dpkg/status")
+ apt_pkg.Config.Set("Dir::State::status",
+ rootdir + "/var/lib/dpkg/status")
self.open(progress)
def _runCallbacks(self, name):
""" internal helper to run a callback """
- if self._callbacks.has_key(name):
+ if name in self._callbacks:
for callback in self._callbacks[name]:
callback()
-
+
def open(self, progress):
""" Open the package cache, after that it can be used like
a dictionary
@@ -70,12 +76,12 @@ class Cache(object):
self._dict = {}
# build the packages dict
- if progress != None:
+ if progress is not None:
progress.Op = "Building data structures"
i=last=0
size=len(self._cache.Packages)
for pkg in self._cache.Packages:
- if progress != None and last+100 < i:
+ if progress is not None and last+100 < i:
progress.update(i/float(size)*100)
last=i
# drop stuff with no versions (cruft)
@@ -83,12 +89,12 @@ class Cache(object):
self._dict[pkg.Name] = Package(self._cache, self._depcache,
self._records, self._list,
self, pkg)
-
+
i += 1
- if progress != None:
+ if progress is not None:
progress.done()
self._runCallbacks("cache_post_open")
-
+
def __getitem__(self, key):
""" look like a dictionary (get key) """
return self._dict[key]
@@ -99,10 +105,10 @@ class Cache(object):
raise StopIteration
def has_key(self, key):
- return self._dict.has_key(key)
+ return (key in self._dict)
def __contains__(self, key):
- return key in self._dict
+ return (key in self._dict)
def __len__(self):
return len(self._dict)
@@ -112,7 +118,7 @@ class Cache(object):
def getChanges(self):
""" Get the marked changes """
- changes = []
+ changes = []
for name in self._dict.keys():
p = self._dict[name]
if p.markedUpgrade or p.markedInstall or p.markedDelete or \
@@ -130,21 +136,23 @@ class Cache(object):
@property
def requiredDownload(self):
- """ get the size of the packages that are required to download """
+ """Get the size of the packages that are required to download."""
pm = apt_pkg.GetPackageManager(self._depcache)
fetcher = apt_pkg.GetAcquire()
pm.GetArchives(fetcher, self._list, self._records)
return fetcher.FetchNeeded
+
@property
def additionalRequiredSpace(self):
- """ get the size of the additional required space on the fs """
+ """Get the size of the additional required space on the fs."""
return self._depcache.UsrSize
+
@property
def reqReinstallPkgs(self):
- " return the packages not downloadable packages in reqreinst state "
+ """Return the packages not downloadable packages in reqreinst state."""
reqreinst = set()
for pkg in self:
- if (not pkg.candidateDownloadable and
+ if (not pkg.candidateDownloadable and
(pkg._pkg.InstState == apt_pkg.InstStateReInstReq or
pkg._pkg.InstState == apt_pkg.InstStateHoldReInstReq)):
reqreinst.add(pkg.name)
@@ -153,7 +161,7 @@ class Cache(object):
def _runFetcher(self, fetcher):
# do the actual fetching
res = fetcher.Run()
-
+
# now check the result (this is the code from apt-get.cc)
failed = False
transient = False
@@ -164,14 +172,15 @@ class Cache(object):
if item.StatIdle:
transient = True
continue
- errMsg += "Failed to fetch %s %s\n" % (item.DescURI,item.ErrorText)
+ errMsg += "Failed to fetch %s %s\n" % (item.DescURI,
+ item.ErrorText)
failed = True
# we raise a exception if the download failed or it was cancelt
if res == fetcher.ResultCancelled:
- raise FetchCancelledException, errMsg
+ raise FetchCancelledException(errMsg)
elif failed:
- raise FetchFailedException, errMsg
+ raise FetchFailedException(errMsg)
return res
def _fetchArchives(self, fetcher, pm):
@@ -181,7 +190,7 @@ class Cache(object):
lockfile = apt_pkg.Config.FindDir("Dir::Cache::Archives") + "lock"
lock = apt_pkg.GetLock(lockfile)
if lock < 0:
- raise LockFailedException, "Failed to lock %s" % lockfile
+ raise LockFailedException("Failed to lock %s" % lockfile)
try:
# this may as well throw a SystemError exception
@@ -193,6 +202,11 @@ class Cache(object):
finally:
os.close(lock)
+ def isVirtualPackage(self, pkgname):
+ """Return whether the package is a virtual package."""
+ pkg = self._cache[pkgname]
+ return bool(pkg.ProvidesList and not pkg.VersionList)
+
def getProvidingPackages(self, virtual):
"""
Return a list of packages which provide the virtual package of the
@@ -207,7 +221,7 @@ class Cache(object):
return providers
for pkg in self:
v = self._depcache.GetCandidateVer(pkg._pkg)
- if v == None:
+ if v is None:
continue
for p in v.ProvidesList:
if virtual == p[0]:
@@ -220,21 +234,21 @@ class Cache(object):
lockfile = apt_pkg.Config.FindDir("Dir::State::Lists") + "lock"
lock = apt_pkg.GetLock(lockfile)
if lock < 0:
- raise LockFailedException, "Failed to lock %s" % lockfile
+ raise LockFailedException("Failed to lock %s" % lockfile)
try:
- if fetchProgress == None:
+ if fetchProgress is None:
fetchProgress = apt.progress.FetchProgress()
return self._cache.Update(fetchProgress, self._list)
finally:
os.close(lock)
-
+
def installArchives(self, pm, installProgress):
installProgress.startUpdate()
res = installProgress.run(pm)
installProgress.finishUpdate()
return res
-
+
def commit(self, fetchProgress=None, installProgress=None):
""" Apply the marked changes to the cache """
# FIXME:
@@ -244,9 +258,9 @@ class Cache(object):
# Current a failed download will just display "error"
# which is less than optimal!
- if fetchProgress == None:
+ if fetchProgress is None:
fetchProgress = apt.progress.FetchProgress()
- if installProgress == None:
+ if installProgress is None:
installProgress = apt.progress.InstallProgress()
pm = apt_pkg.GetPackageManager(self._depcache)
@@ -260,16 +274,17 @@ class Cache(object):
if res == pm.ResultCompleted:
break
if res == pm.ResultFailed:
- raise SystemError, "installArchives() failed"
+ raise SystemError("installArchives() failed")
# reload the fetcher for media swaping
fetcher.Shutdown()
return (res == pm.ResultCompleted)
def clear(self):
- """ Unmark all changes """
- self._depcache.Init()
+ """ Unmark all changes """
+ self._depcache.Init()
# cache changes
+
def cachePostChange(self):
" called internally if the cache has changed, emit a signal then "
self._runCallbacks("cache_post_change")
@@ -282,34 +297,42 @@ class Cache(object):
def connect(self, name, callback):
""" connect to a signal, currently only used for
cache_{post,pre}_{changed,open} """
- if not self._callbacks.has_key(name):
+ if not name in self._callbacks:
self._callbacks[name] = []
self._callbacks[name].append(callback)
+
# ----------------------------- experimental interface
+
+
class Filter(object):
""" Filter base class """
+
def apply(self, pkg):
""" Filter function, return True if the package matchs a
filter criteria and False otherwise
"""
return True
+
class MarkedChangesFilter(Filter):
""" Filter that returns all marked changes """
+
def apply(self, pkg):
if pkg.markedInstall or pkg.markedDelete or pkg.markedUpgrade:
return True
else:
return False
+
class FilteredCache(object):
""" A package cache that is filtered.
Can work on a existing cache or create a new one
"""
+
def __init__(self, cache=None, progress=None):
- if cache == None:
+ if cache is None:
self.cache = Cache(progress)
else:
self.cache = cache
@@ -317,17 +340,25 @@ class FilteredCache(object):
self.cache.connect("cache_post_open", self.filterCachePostChange)
self._filtered = {}
self._filters = []
+
def __len__(self):
return len(self._filtered)
-
+
def __getitem__(self, key):
return self.cache._dict[key]
+ def __iter__(self):
+ for pkgname in self._filtered:
+ yield self.cache[pkgname]
+
def keys(self):
return self._filtered.keys()
def has_key(self, key):
- return self._filtered.has_key(key)
+ return (key in self._filtered)
+
+ def __contains__(self, key):
+ return (key in self._filtered)
def _reapplyFilter(self):
" internal helper to refilter "
@@ -337,9 +368,9 @@ class FilteredCache(object):
if f.apply(self.cache._dict[pkg]):
self._filtered[pkg] = 1
break
-
+
def setFilter(self, filter):
- " set the current active filter "
+ """Set the current active filter."""
self._filters = []
self._filters.append(filter)
#self._reapplyFilter()
@@ -347,7 +378,7 @@ class FilteredCache(object):
self.cache.cachePostChange()
def filterCachePostChange(self):
- " called internally if the cache changes, emit a signal then "
+ """Called internally if the cache changes, emit a signal then."""
#print "filterCachePostChange()"
self._reapplyFilter()
@@ -355,17 +386,15 @@ class FilteredCache(object):
# self.cache.connect(name, callback)
def __getattr__(self, key):
- " we try to look exactly like a real cache "
+ """we try to look exactly like a real cache."""
#print "getattr: %s " % key
- if self.__dict__.has_key(key):
- return self.__dict__[key]
- else:
- return getattr(self.cache, key)
-
+ return getattr(self.cache, key)
+
def cache_pre_changed():
print "cache pre changed"
+
def cache_post_changed():
print "cache post changed"
@@ -377,7 +406,7 @@ if __name__ == "__main__":
c = Cache(apt.progress.OpTextProgress())
c.connect("cache_pre_change", cache_pre_changed)
c.connect("cache_post_change", cache_post_changed)
- print c.has_key("aptitude")
+ print ("aptitude" in c)
p = c["aptitude"]
print p.name
print len(c)
@@ -397,7 +426,7 @@ if __name__ == "__main__":
for d in ["/tmp/pytest", "/tmp/pytest/partial"]:
if not os.path.exists(d):
os.mkdir(d)
- apt_pkg.Config.Set("Dir::Cache::Archives","/tmp/pytest")
+ apt_pkg.Config.Set("Dir::Cache::Archives", "/tmp/pytest")
pm = apt_pkg.GetPackageManager(c._depcache)
fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress())
c._fetchArchives(fetcher, pm)
@@ -413,7 +442,7 @@ if __name__ == "__main__":
for pkg in f.keys():
#print c[pkg].name
x = f[pkg].name
-
+
print len(f)
print "Testing filtered cache (no argument)"
@@ -426,5 +455,5 @@ if __name__ == "__main__":
for pkg in f.keys():
#print c[pkg].name
x = f[pkg].name
-
+
print len(f)
diff --git a/apt/cdrom.py b/apt/cdrom.py
index 9d4b62cb..61250fc4 100644
--- a/apt/cdrom.py
+++ b/apt/cdrom.py
@@ -1,14 +1,49 @@
+# cdrom.py - CDROM handling
+#
+# Copyright (c) 2005 Canonical
+# Copyright (c) 2009 Julian Andres Klode <jak@debian.org>
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Classes related to cdrom handling."""
+import glob
+
import apt_pkg
-from progress import CdromProgress
+from apt.progress import CdromProgress
+
class Cdrom(object):
+ """Support for apt-cdrom like features.
+
+ This class has several optional parameters for initialisation, which may
+ be used to influence the behaviour of the object:
+
+ The optional parameter `progress` is a CdromProgress() subclass, which will
+ ask for the correct cdrom, etc. If not specified or None, a CdromProgress()
+ object will be used.
+
+ The optional parameter `mountpoint` may be used to specify an alternative
+ mountpoint.
+
+ If the optional parameter `nomount` is True, the cdroms will not be
+ mounted. This is the default behaviour.
+ """
+
def __init__(self, progress=None, mountpoint=None, nomount=True):
- """ Support for apt-cdrom like features.
- Options:
- - progress: optional progress.CdromProgress() subclass
- - mountpoint: optional alternative mountpoint
- - nomount: do not mess with mount/umount the CD
- """
self._cdrom = apt_pkg.GetCdrom()
if progress is None:
self._progress = CdromProgress()
@@ -16,32 +51,36 @@ class Cdrom(object):
self._progress = progress
# see if we have a alternative mountpoint
if mountpoint is not None:
- apt_pkg.Config.Set("Acquire::cdrom::mount",mountpoint)
+ apt_pkg.Config.Set("Acquire::cdrom::mount", mountpoint)
# do not mess with mount points by default
- if nomount is True:
+ if nomount:
apt_pkg.Config.Set("APT::CDROM::NoMount", "true")
else:
apt_pkg.Config.Set("APT::CDROM::NoMount", "false")
+
def add(self):
- " add cdrom to the sources.list "
+ """Add cdrom to the sources.list."""
return self._cdrom.Add(self._progress)
+
def ident(self):
- " identify the cdrom "
+ """Identify the cdrom."""
(res, ident) = self._cdrom.Ident(self._progress)
if res:
return ident
- return None
+
@property
def inSourcesList(self):
- " check if the cdrom is already in the current sources.list "
- cdid = self.ident()
- if cdid is None:
+ """Check if the cdrom is already in the current sources.list."""
+ cd_id = self.ident()
+ if cd_id is None:
# FIXME: throw exception instead
return False
- # FIXME: check sources.list.d/ as well
- for line in open(apt_pkg.Config.FindFile("Dir::Etc::sourcelist")):
- line = line.strip()
- if not line.startswith("#") and cdid in line:
- return True
+ # Get a list of files
+ src = glob.glob(apt_pkg.Config.FindDir("Dir::Etc::sourceparts") + '*')
+ src.append(apt_pkg.Config.FindFile("Dir::Etc::sourcelist"))
+ # Check each file
+ for fname in src:
+ for line in open(fname):
+ if not line.lstrip().startswith("#") and cd_id in line:
+ return True
return False
-
diff --git a/apt/debfile.py b/apt/debfile.py
index b1d436cd..f24f19f4 100644
--- a/apt/debfile.py
+++ b/apt/debfile.py
@@ -19,24 +19,25 @@
# along with GDebi; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
-
-import warnings
-warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
-import apt_inst, apt_pkg
-import sys
-import os
+"""Classes for working with locally available Debian packages."""
from gettext import gettext as _
+import os
+import sys
+
+import apt_inst
+import apt_pkg
+
# Constants for comparing the local package file with the version in the cache
-(VERSION_NONE,
- VERSION_OUTDATED,
- VERSION_SAME,
- VERSION_NEWER) = range(4)
-
+(VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4)
+
+
class NoDebArchiveException(IOError):
- pass
+ """Exception which is raised if a file is no Debian archive."""
+
class DebPackage(object):
+ """A Debian Package (.deb file)."""
_supported_data_members = ("data.tar.gz", "data.tar.bz2", "data.tar.lzma")
@@ -44,11 +45,10 @@ class DebPackage(object):
def __init__(self, filename=None, cache=None):
self._cache = cache
- self.file = filename
- self._needPkgs = []
+ self._need_pkgs = []
self._sections = {}
- self._installedConflicts = set()
- self._failureString = ""
+ self._installed_conflicts = set()
+ self._failure_string = ""
if filename:
self.open(filename)
@@ -56,35 +56,42 @@ class DebPackage(object):
" open given debfile "
self.filename = filename
if not apt_inst.arCheckMember(open(self.filename), "debian-binary"):
- raise NoDebArchiveException, _("This is not a valid DEB archive, missing '%s' member" % "debian-binary")
+ raise NoDebArchiveException(_("This is not a valid DEB archive, "
+ "missing '%s' member" %
+ "debian-binary"))
control = apt_inst.debExtractControl(open(self.filename))
self._sections = apt_pkg.ParseSection(control)
self.pkgname = self._sections["Package"]
def __getitem__(self, key):
return self._sections[key]
-
+
+ @property
def filelist(self):
- """ return the list of files in the deb """
+ """return the list of files in the deb."""
files = []
- def extract_cb(What,Name,Link,Mode,UID,GID,Size,MTime,Major,Minor):
- #print "%s '%s','%s',%u,%u,%u,%u,%u,%u,%u"\
- # % (What,Name,Link,Mode,UID,GID,Size, MTime, Major, Minor)
- files.append(Name)
+
+ def extract_cb(what, name, *_):
+ files.append(name)
+
for member in self._supported_data_members:
if apt_inst.arCheckMember(open(self.filename), member):
try:
- apt_inst.debExtract(open(self.filename), extract_cb, member)
+ apt_inst.debExtract(open(self.filename), extract_cb,
+ member)
break
- except SystemError, e:
- return [_("List of files for '%s'could not be read" % self.filename)]
+ except SystemError:
+ return [_("List of files for '%s'could not be read" %
+ self.filename)]
return files
- filelist = property(filelist)
- def _isOrGroupSatisfied(self, or_group):
- """ this function gets a 'or_group' and analyzes if
- at least one dependency of this group is already satisfied """
- self._dbg(2,"_checkOrGroup(): %s " % (or_group))
+ def _is_or_group_satisfied(self, or_group):
+ """Return True if at least one dependency of the or-group is satisfied.
+
+ This method gets an 'or_group' and analyzes if at least one dependency
+ of this group is already satisfied.
+ """
+ self._dbg(2, "_checkOrGroup(): %s " % (or_group))
for dep in or_group:
depname = dep[0]
@@ -92,9 +99,10 @@ class DebPackage(object):
oper = dep[2]
# check for virtual pkgs
- if not self._cache.has_key(depname):
+ if not depname in self._cache:
if self._cache.isVirtualPackage(depname):
- self._dbg(3,"_isOrGroupSatisfied(): %s is virtual dep" % depname)
+ self._dbg(3, "_isOrGroupSatisfied(): %s is virtual dep" %
+ depname)
for pkg in self._cache.getProvidingPackages(depname):
if pkg.isInstalled:
return True
@@ -102,24 +110,17 @@ class DebPackage(object):
inst = self._cache[depname]
instver = inst.installedVersion
- if instver != None and apt_pkg.CheckDep(instver,oper,ver) == True:
+ if instver is not None and apt_pkg.CheckDep(instver, oper, ver):
return True
return False
-
-
- def _satisfyOrGroup(self, or_group):
- """ try to satisfy the or_group """
-
- or_found = False
- virtual_pkg = None
+ def _satisfy_or_group(self, or_group):
+ """Try to satisfy the or_group."""
for dep in or_group:
- depname = dep[0]
- ver = dep[1]
- oper = dep[2]
+ depname, ver, oper = dep
# if we don't have it in the cache, it may be virtual
- if not self._cache.has_key(depname):
+ if not depname in self._cache:
if not self._cache.isVirtualPackage(depname):
continue
providers = self._cache.getProvidingPackages(depname)
@@ -128,55 +129,59 @@ class DebPackage(object):
if len(providers) != 1:
continue
depname = providers[0].name
-
+
# now check if we can satisfy the deps with the candidate(s)
# in the cache
- cand = self._cache[depname]
- candver = self._cache._depcache.GetCandidateVer(cand._pkg)
- if not candver:
+ pkg = self._cache[depname]
+ cand = self._cache._depcache.GetCandidateVer(pkg._pkg)
+ if not cand:
continue
- if not apt_pkg.CheckDep(candver.VerStr,oper,ver):
+ if not apt_pkg.CheckDep(cand.VerStr, oper, ver):
continue
# check if we need to install it
- self._dbg(2,"Need to get: %s" % depname)
- self._needPkgs.append(depname)
+ self._dbg(2, "Need to get: %s" % depname)
+ self._need_pkgs.append(depname)
return True
# if we reach this point, we failed
or_str = ""
for dep in or_group:
or_str += dep[0]
- if dep != or_group[len(or_group)-1]:
+ if dep != or_group[-1]:
or_str += "|"
- self._failureString += _("Dependency is not satisfiable: %s\n" % or_str)
+ self._failure_string += _("Dependency is not satisfiable: %s\n" %
+ or_str)
return False
- def _checkSinglePkgConflict(self, pkgname, ver, oper):
- """ returns true if a pkg conflicts with a real installed/marked
- pkg """
+ def _check_single_pkg_conflict(self, pkgname, ver, oper):
+ """Return True if a pkg conflicts with a real installed/marked pkg."""
# FIXME: deal with conflicts against its own provides
# (e.g. Provides: ftp-server, Conflicts: ftp-server)
- self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % (pkgname, ver, oper))
- pkgver = None
- cand = self._cache[pkgname]
- if cand.isInstalled:
- pkgver = cand.installedVersion
- elif cand.markedInstall:
- pkgver = cand.candidateVersion
+ self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" %
+ (pkgname, ver, oper))
+
+ pkg = self._cache[pkgname]
+ if pkg.isInstalled:
+ pkgver = pkg.installedVersion
+ elif pkg.markedInstall:
+ pkgver = pkg.candidateVersion
+ else:
+ return False
#print "pkg: %s" % pkgname
#print "ver: %s" % ver
#print "pkgver: %s " % pkgver
#print "oper: %s " % oper
- if (pkgver and apt_pkg.CheckDep(pkgver,oper,ver) and
- not self.replacesRealPkg(pkgname, oper, ver)):
- self._failureString += _("Conflicts with the installed package '%s'" % cand.name)
+ if (apt_pkg.CheckDep(pkgver, oper, ver) and not
+ self.replaces_real_pkg(pkgname, oper, ver)):
+ self._failure_string += _("Conflicts with the installed package "
+ "'%s'" % pkg.name)
return True
return False
- def _checkConflictsOrGroup(self, or_group):
- """ check the or-group for conflicts with installed pkgs """
- self._dbg(2,"_checkConflictsOrGroup(): %s " % (or_group))
+ def _check_conflicts_or_group(self, or_group):
+ """Check the or-group for conflicts with installed pkgs."""
+ self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group))
or_found = False
virtual_pkg = None
@@ -187,152 +192,116 @@ class DebPackage(object):
oper = dep[2]
# check conflicts with virtual pkgs
- if not self._cache.has_key(depname):
- # FIXME: we have to check for virtual replaces here as
+ if not depname in self._cache:
+ # FIXME: we have to check for virtual replaces here as
# well (to pass tests/gdebi-test8.deb)
if self._cache.isVirtualPackage(depname):
for pkg in self._cache.getProvidingPackages(depname):
self._dbg(3, "conflicts virtual check: %s" % pkg.name)
# P/C/R on virtal pkg, e.g. ftpd
- if self.pkgName == pkg.name:
+ if self.pkgname == pkg.name:
self._dbg(3, "conflict on self, ignoring")
continue
- if self._checkSinglePkgConflict(pkg.name,ver,oper):
- self._installedConflicts.add(pkg.name)
+ if self._check_single_pkg_conflict(pkg.name, ver, oper):
+ self._installed_conflicts.add(pkg.name)
continue
- if self._checkSinglePkgConflict(depname,ver,oper):
- self._installedConflicts.add(depname)
- return len(self._installedConflicts) != 0
-
- def getConflicts(self):
- """
- Return list of package names conflicting with this package.
-
- WARNING: This method will is deprecated. Please use the
- attribute DebPackage.depends instead.
- """
- return self.conflicts
+ if self._check_single_pkg_conflict(depname, ver, oper):
+ self._installed_conflicts.add(depname)
+ return bool(self._installed_conflicts)
+ @property
def conflicts(self):
- """
- List of package names conflicting with this package
- """
- conflicts = []
+ """List of package names conflicting with this package."""
key = "Conflicts"
- if self._sections.has_key(key):
- conflicts = apt_pkg.ParseDepends(self._sections[key])
- return conflicts
- conflicts = property(conflicts)
-
- def getDepends(self):
- """
- Return list of package names on which this package depends on.
-
- WARNING: This method will is deprecated. Please use the
- attribute DebPackage.depends instead.
- """
- return self.depends
+ try:
+ return apt_pkg.ParseDepends(self._sections[key])
+ except KeyError:
+ return []
+ @property
def depends(self):
- """
- List of package names on which this package depends on
- """
+ """List of package names on which this package depends on."""
depends = []
# find depends
- for key in ["Depends","PreDepends"]:
- if self._sections.has_key(key):
+ for key in "Depends", "PreDepends":
+ try:
depends.extend(apt_pkg.ParseDepends(self._sections[key]))
+ except KeyError:
+ pass
return depends
- depends = property(depends)
-
- def getProvides(self):
- """
- Return list of virtual packages which are provided by this package.
-
- WARNING: This method will is deprecated. Please use the
- attribute DebPackage.provides instead.
- """
- return self.provides
+ @property
def provides(self):
- """
- List of virtual packages which are provided by this package
- """
- provides = []
+ """List of virtual packages which are provided by this package."""
key = "Provides"
- if self._sections.has_key(key):
- provides = apt_pkg.ParseDepends(self._sections[key])
- return provides
- provides = property(provides)
-
- def getReplaces(self):
- """
- Return list of packages which are replaced by this package.
-
- WARNING: This method will is deprecated. Please use the
- attribute DebPackage.replaces instead.
- """
- return self.replaces
+ try:
+ return apt_pkg.ParseDepends(self._sections[key])
+ except KeyError:
+ return []
+ @property
def replaces(self):
- """
- List of packages which are replaced by this package
- """
- replaces = []
+ """List of packages which are replaced by this package."""
key = "Replaces"
- if self._sections.has_key(key):
- replaces = apt_pkg.ParseDepends(self._sections[key])
- return replaces
- replaces = property(replaces)
-
- def replacesRealPkg(self, pkgname, oper, ver):
- """
- return True if the deb packages replaces a real (not virtual)
- packages named pkgname, oper, ver
+ try:
+ return apt_pkg.ParseDepends(self._sections[key])
+ except KeyError:
+ return []
+
+ def replaces_real_pkg(self, pkgname, oper, ver):
+ """Return True if a given non-virtual package is replaced.
+
+ Return True if the deb packages replaces a real (not virtual)
+ packages named (pkgname, oper, ver).
"""
- self._dbg(3, "replacesPkg() %s %s %s" % (pkgname,oper,ver))
- pkgver = None
- cand = self._cache[pkgname]
- if cand.isInstalled:
- pkgver = cand.installedVersion
- elif cand.markedInstall:
- pkgver = cand.candidateVersion
- for or_group in self.getReplaces():
+ self._dbg(3, "replacesPkg() %s %s %s" % (pkgname, oper, ver))
+ pkg = self._cache[pkgname]
+ if pkg.isInstalled:
+ pkgver = pkg.installedVersion
+ elif pkg.markedInstall:
+ pkgver = pkg.candidateVersion
+ else:
+ pkgver = None
+ for or_group in self.replaces:
for (name, ver, oper) in or_group:
- if (name == pkgname and
- apt_pkg.CheckDep(pkgver,oper,ver)):
- self._dbg(3, "we have a replaces in our package for the conflict against '%s'" % (pkgname))
+ if (name == pkgname and apt_pkg.CheckDep(pkgver, oper, ver)):
+ self._dbg(3, "we have a replaces in our package for the "
+ "conflict against '%s'" % (pkgname))
return True
return False
- def checkConflicts(self):
- """ check if the pkg conflicts with a existing or to be installed
- package. Return True if the pkg is ok """
+ def check_conflicts(self):
+ """Check if there are conflicts with existing or selected packages.
+
+ Check if the package conflicts with a existing or to be installed
+ package. Return True if the pkg is OK.
+ """
res = True
- for or_group in self.getConflicts():
- if self._checkConflictsOrGroup(or_group):
+ for or_group in self.conflicts:
+ if self._check_conflicts_or_group(or_group):
#print "Conflicts with a exisiting pkg!"
- #self._failureString = "Conflicts with a exisiting pkg!"
+ #self._failure_string = "Conflicts with a exisiting pkg!"
res = False
return res
-
- def compareToVersionInCache(self, useInstalled=True):
- """ checks if the pkg is already installed or availabe in the cache
- and if so in what version, returns if the version of the deb
- is not available,older,same,newer
+ def compare_to_version_in_cache(self, use_installed=True):
+ """Compare the package to the version available in the cache.
+
+ Checks if the package is already installed or availabe in the cache
+ and if so in what version, returns one of (VERSION_NONE,
+ VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER).
"""
- self._dbg(3,"compareToVersionInCache")
+ self._dbg(3, "compareToVersionInCache")
pkgname = self._sections["Package"]
debver = self._sections["Version"]
- self._dbg(1,"debver: %s" % debver)
- if self._cache.has_key(pkgname):
- if useInstalled:
+ self._dbg(1, "debver: %s" % debver)
+ if pkgname in self._cache:
+ if use_installed:
cachever = self._cache[pkgname].installedVersion
else:
cachever = self._cache[pkgname].candidateVersion
- if cachever != None:
- cmp = apt_pkg.VersionCompare(cachever,debver)
+ if cachever is not None:
+ cmp = apt_pkg.VersionCompare(cachever, debver)
self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
if cmp == 0:
return VERSION_SAME
@@ -342,82 +311,88 @@ class DebPackage(object):
return VERSION_OUTDATED
return VERSION_NONE
- def checkDeb(self):
- self._dbg(3,"checkDepends")
+ def check(self):
+ """Check if the package is installable."""
+ self._dbg(3, "checkDepends")
# check arch
arch = self._sections["Architecture"]
if arch != "all" and arch != apt_pkg.Config.Find("APT::Architecture"):
- self._dbg(1,"ERROR: Wrong architecture dude!")
- self._failureString = _("Wrong architecture '%s'" % arch)
+ self._dbg(1, "ERROR: Wrong architecture dude!")
+ self._failure_string = _("Wrong architecture '%s'" % arch)
return False
# check version
- res = self.compareToVersionInCache()
- if res == VERSION_OUTDATED: # the deb is older than the installed
- self._failureString = _("A later version is already installed")
+ if self.compare_to_version_in_cache() == VERSION_OUTDATED:
+ # the deb is older than the installed
+ self._failure_string = _("A later version is already installed")
return False
# FIXME: this sort of error handling sux
- self._failureString = ""
+ self._failure_string = ""
# check conflicts
- if not self.checkConflicts():
+ if not self.check_conflicts():
return False
# try to satisfy the dependencies
- res = self._satisfyDepends(self.getDepends())
- if not res:
+ if not self._satisfy_depends(self.depends):
return False
# check for conflicts again (this time with the packages that are
# makeed for install)
- if not self.checkConflicts():
+ if not self.check_conflicts():
return False
if self._cache._depcache.BrokenCount > 0:
- self._failureString = _("Failed to satisfy all dependencies (broken cache)")
+ self._failure_string = _("Failed to satisfy all dependencies "
+ "(broken cache)")
# clean the cache again
self._cache.clear()
return False
return True
- def satisfyDependsStr(self, dependsstr):
- return self._satisfyDepends(apt_pkg.ParseDepends(dependsstr))
+ def satisfy_depends_str(self, dependsstr):
+ """Satisfy the dependencies in the given string."""
+ return self._satisfy_depends(apt_pkg.ParseDepends(dependsstr))
- def _satisfyDepends(self, depends):
+ def _satisfy_depends(self, depends):
+ """Satisfy the dependencies."""
# turn off MarkAndSweep via a action group (if available)
try:
_actiongroup = apt_pkg.GetPkgActionGroup(self._cache._depcache)
- except AttributeError, e:
+ except AttributeError:
pass
# check depends
for or_group in depends:
#print "or_group: %s" % or_group
- #print "or_group satified: %s" % self._isOrGroupSatisfied(or_group)
- if not self._isOrGroupSatisfied(or_group):
- if not self._satisfyOrGroup(or_group):
+ #print "or_group satified: %s" % self._is_or_group_satisfied(or_group)
+ if not self._is_or_group_satisfied(or_group):
+ if not self._satisfy_or_group(or_group):
return False
# now try it out in the cache
- for pkg in self._needPkgs:
- try:
- self._cache[pkg].markInstall(fromUser=False)
- except SystemError, e:
- self._failureString = _("Cannot install '%s'" % pkg)
- self._cache.clear()
- return False
+ for pkg in self._need_pkgs:
+ try:
+ self._cache[pkg].markInstall(fromUser=False)
+ except SystemError, e:
+ self._failure_string = _("Cannot install '%s'" % pkg)
+ self._cache.clear()
+ return False
return True
- def missingDeps(self):
- self._dbg(1, "Installing: %s" % self._needPkgs)
- if self._needPkgs == None:
- self.checkDeb()
- return self._needPkgs
- missingDeps = property(missingDeps)
+ @property
+ def missing_deps(self):
+ """Return missing dependencies."""
+ self._dbg(1, "Installing: %s" % self._need_pkgs)
+ if self._need_pkgs is None:
+ self.check()
+ return self._need_pkgs
- def requiredChanges(self):
- """ gets the required changes to satisfy the depends.
- returns a tuple with (install, remove, unauthenticated)
+ @property
+ def required_changes(self):
+ """Get the changes required to satisfy the dependencies.
+
+ Returns: a tuple with (install, remove, unauthenticated)
"""
install = []
remove = []
@@ -434,95 +409,118 @@ class DebPackage(object):
unauthenticated.append(pkg.name)
if pkg.markedDelete:
remove.append(pkg.name)
- return (install,remove, unauthenticated)
- requiredChanges = property(requiredChanges)
+ return (install, remove, unauthenticated)
def _dbg(self, level, msg):
- """Write debugging output to sys.stderr.
- """
+ """Write debugging output to sys.stderr."""
if level <= self.debug:
print >> sys.stderr, msg
- def install(self, installProgress=None):
- """ Install the package """
- if installProgress == None:
- res = os.system("/usr/sbin/dpkg -i %s" % self.filename)
+ def install(self, install_progress=None):
+ """Install the package."""
+ if install_progress is None:
+ return os.system("dpkg -i %s" % self.filename)
else:
- installProgress.startUpdate()
- res = installProgress.run(self.filename)
- installProgress.finishUpdate()
- return res
+ try:
+ install_progress.start_update()
+ except AttributeError:
+ install_progress.startUpdate()
+ res = install_progress.run(self.filename)
+ try:
+ install_progress.finish_update()
+ except AttributeError:
+ install_progress.finishUpdate()
+ return res
+
class DscSrcPackage(DebPackage):
+ """A locally available source package."""
+
def __init__(self, filename=None, cache=None):
- DebPackage.__init__(self, filename, cache)
- self.depends = []
- self.conflicts = []
- self.binaries = []
- if filename != None:
+ DebPackage.__init__(self, None, cache)
+ self._depends = []
+ self._conflicts = []
+ self._binaries = []
+ if filename is not None:
self.open(filename)
- def getConflicts(self):
- return self.conflicts
- def getDepends(self):
- return self.depends
+
+ @property
+ def depends(self):
+ """Return the dependencies of the package"""
+ return self._depends
+
+ @property
+ def conflicts(self):
+ """Return the dependencies of the package"""
+ return self._conflicts
+
def open(self, file):
- depends_tags = ["Build-Depends:", "Build-Depends-Indep:"]
- conflicts_tags = ["Build-Conflicts:", "Build-Conflicts-Indep:"]
- for line in open(file):
- # check b-d and b-c
- for tag in depends_tags:
- if line.startswith(tag):
- key = line[len(tag):].strip()
- self.depends.extend(apt_pkg.ParseSrcDepends(key))
- for tag in conflicts_tags:
- if line.startswith(tag):
- key = line[len(tag):].strip()
- self.conflicts.extend(apt_pkg.ParseSrcDepends(key))
- # check binary and source and version
- if line.startswith("Source:"):
- self.pkgName = line[len("Source:"):].strip()
- if line.startswith("Binary:"):
- self.binaries = [pkg.strip() for pkg in line[len("Binary:"):].split(",")]
- if line.startswith("Version:"):
- self._sections["Version"] = line[len("Version:"):].strip()
- # we are at the end
- if line.startswith("-----BEGIN PGP SIGNATURE-"):
- break
+ """Open the package."""
+ depends_tags = ["Build-Depends", "Build-Depends-Indep"]
+ conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"]
+
+ fobj = open(file)
+ tagfile = apt_pkg.ParseTagFile(fobj)
+ sec = tagfile.Section
+ try:
+ while tagfile.Step() == 1:
+ for tag in depends_tags:
+ if not sec.has_key(tag):
+ continue
+ self._depends.extend(apt_pkg.ParseSrcDepends(sec[tag]))
+ for tag in conflicts_tags:
+ if not sec.has_key(tag):
+ continue
+ self._conflicts.extend(apt_pkg.ParseSrcDepends(sec[tag]))
+ if sec.has_key('Source'):
+ self.pkgname = sec['Source']
+ if sec.has_key('Binary'):
+ self.binaries = sec['Binary'].split(', ')
+ if sec.has_key('Version'):
+ self._sections['Version'] = sec['Version']
+ finally:
+ del sec
+ del tagfile
+ fobj.close()
+
s = _("Install Build-Dependencies for "
- "source package '%s' that builds %s\n"
- ) % (self.pkgName, " ".join(self.binaries))
+ "source package '%s' that builds %s\n") % (self.pkgname,
+ " ".join(self.binaries))
self._sections["Description"] = s
-
- def checkDeb(self):
- if not self.checkConflicts():
- for pkgname in self._installedConflicts:
+
+ def check(self):
+ """Check if the package is installable.."""
+ if not self.check_conflicts():
+ for pkgname in self._installed_conflicts:
if self._cache[pkgname]._pkg.Essential:
- raise Exception, _("A essential package would be removed")
+ raise Exception(_("An essential package would be removed"))
self._cache[pkgname].markDelete()
# FIXME: a additional run of the checkConflicts()
# after _satisfyDepends() should probably be done
- return self._satisfyDepends(self.depends)
+ return self._satisfy_depends(self.depends)
-if __name__ == "__main__":
- from cache import Cache
- from progress import DpkgInstallProgress
+
+def _test():
+ """Test function"""
+ from apt.cache import Cache
+ from apt.progress import DpkgInstallProgress
cache = Cache()
vp = "www-browser"
- print "%s virtual: %s" % (vp,cache.isVirtualPackage(vp))
+ #print "%s virtual: %s" % (vp, cache.isVirtualPackage(vp))
providers = cache.getProvidingPackages(vp)
print "Providers for %s :" % vp
for pkg in providers:
print " %s" % pkg.name
-
+
d = DebPackage(sys.argv[1], cache)
print "Deb: %s" % d.pkgname
- if not d.checkDeb():
+ if not d.check():
print "can't be satified"
- print d._failureString
- print "missing deps: %s" % d.missingDeps
- print d.requiredChanges
+ print d._failure_string
+ print "missing deps: %s" % d.missing_deps
+ print d.required_changes
print "Installing ..."
ret = d.install(DpkgInstallProgress())
@@ -535,4 +533,7 @@ if __name__ == "__main__":
s = DscSrcPackage(cache=cache)
d = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
- print s._satisfyDepends(apt_pkg.ParseDepends(d))
+ print s._satisfy_depends(apt_pkg.ParseDepends(d))
+
+if __name__ == "__main__":
+ _test()
diff --git a/apt/gtk/widgets.py b/apt/gtk/widgets.py
index 3a15258f..34cc2759 100644
--- a/apt/gtk/widgets.py
+++ b/apt/gtk/widgets.py
@@ -1,28 +1,26 @@
#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-widgets - GTK widgets to show the progress and status of apt
-
-Copyright (c) 2004,2005 Canonical Ltd.
-
-Authors: Michael Vogt <mvo@ubuntu.com>
- Sebastian Heinlein <glatzor@ubuntu.com>
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License as
-published by the Free Software Foundation; either version 2 of the
-License, or (at your option) any later version.
-
-his program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
-USA
-"""
+#
+# Copyright (c) 2004-2005 Canonical
+#
+# Authors: Michael Vogt <michael.vogt@ubuntu.com>
+# Sebastian Heinlein <glatzor@ubuntu.com>
+# Julian Andres Klode <jak@debian.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# his program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""GObject-powered progress classes and a GTK+ status widget."""
from gettext import gettext as _
import os
@@ -39,15 +37,32 @@ import vte
import apt
import apt_pkg
+
+def mksig(params=(), run=gobject.SIGNAL_RUN_FIRST, rettype=gobject.TYPE_NONE):
+ """Simplified Create a gobject signal.
+
+ This allows us to write signals easier, because we just need to define the
+ type of the parameters (in most cases).
+
+ ``params`` is a tuple which defines the types of the arguments.
+ """
+ return (run, rettype, params)
+
+
class GOpProgress(gobject.GObject, apt.progress.OpProgress):
+ """Operation progress with GObject signals.
+
+ Signals:
- __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
- (gobject.TYPE_STRING, gobject.TYPE_INT)),
- "status-started":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-finished":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ())}
+ * status-changed(str: operation, int: percent)
+ * status-started() - Not Implemented yet
+ * status-finished()
+
+ """
+
+ __gsignals__ = {"status-changed": mksig((str, int)),
+ "status-started": mksig(),
+ "status-finished": mksig()}
def __init__(self):
apt.progress.OpProgress.__init__(self)
@@ -55,31 +70,38 @@ class GOpProgress(gobject.GObject, apt.progress.OpProgress):
self._context = glib.main_context_default()
def update(self, percent):
+ """Called to update the percentage done"""
self.emit("status-changed", self.op, percent)
while self._context.pending():
self._context.iteration()
def done(self):
+ """Called when all operation have finished."""
self.emit("status-finished")
+
class GInstallProgress(gobject.GObject, apt.progress.InstallProgress):
+ """Installation progress with GObject signals.
+
+ Signals:
+
+ * status-changed(str: status, int: percent)
+ * status-started()
+ * status-finished()
+ * status-timeout()
+ * status-error()
+ * status-conffile()
+ """
# Seconds until a maintainer script will be regarded as hanging
INSTALL_TIMEOUT = 5 * 60
- __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
- (gobject.TYPE_STRING, gobject.TYPE_INT)),
- "status-started":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-timeout":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-error":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-conffile":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-finished":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ())}
+ __gsignals__ = {"status-changed": mksig((str, int)),
+ "status-started": mksig(),
+ "status-timeout": mksig(),
+ "status-error": mksig(),
+ "status-conffile": mksig(),
+ "status-finished": mksig()}
def __init__(self, term):
apt.progress.InstallProgress.__init__(self)
@@ -89,32 +111,57 @@ class GInstallProgress(gobject.GObject, apt.progress.InstallProgress):
self.term = term
reaper = vte.reaper_get()
reaper.connect("child-exited", self.childExited)
- self.env = ["VTE_PTY_KEEP_FD=%s"% self.writefd,
+ self.env = ["VTE_PTY_KEEP_FD=%s" % self.writefd,
"DEBIAN_FRONTEND=gnome",
"APT_LISTCHANGES_FRONTEND=gtk"]
self._context = glib.main_context_default()
def childExited(self, term, pid, status):
+ """Called when a child process exits"""
self.apt_status = os.WEXITSTATUS(status)
self.finished = True
def error(self, pkg, errormsg):
+ """Called when an error happens.
+
+ Emits: status-error()
+ """
self.emit("status-error")
def conffile(self, current, new):
+ """Called during conffile.
+
+ Emits: status-conffile()
+ """
self.emit("status-conffile")
def startUpdate(self):
+ """Called when the update starts.
+
+ Emits: status-started()
+ """
self.emit("status-started")
def finishUpdate(self):
+ """Called when the update finished.
+
+ Emits: status-finished()
+ """
self.emit("status-finished")
def statusChange(self, pkg, percent, status):
+ """Called when the status changed.
+
+ Emits: status-changed(status, percent)
+ """
self.time_last_update = time.time()
self.emit("status-changed", status, percent)
def updateInterface(self):
+ """Called periodically to update the interface.
+
+ Emits: status-timeout() [When a timeout happens]
+ """
apt.progress.InstallProgress.updateInterface(self)
while self._context.pending():
self._context.iteration()
@@ -122,34 +169,55 @@ class GInstallProgress(gobject.GObject, apt.progress.InstallProgress):
self.emit("status-timeout")
def fork(self):
+ """Fork the process."""
return self.term.forkpty(envv=self.env)
def waitChild(self):
+ """Wait for the child process to exit."""
while not self.finished:
self.updateInterface()
return self.apt_status
-class GDpkgInstallProgress(apt.progress.DpkgInstallProgress,GInstallProgress):
+class GDpkgInstallProgress(apt.progress.DpkgInstallProgress, GInstallProgress):
+ """An InstallProgress for local installations.
+
+ Signals:
+
+ * status-changed(str: status, int: percent)
+ * status-started() - Not Implemented yet
+ * status-finished()
+ * status-timeout() - When the maintainer script hangs
+ * status-error() - When an error happens
+ * status-conffile() - On Conffile
+ """
def run(self, debfile):
+ """Install the given package."""
apt.progress.DpkgInstallProgress.run(self, debfile)
def updateInterface(self):
+ """Called periodically to update the interface.
+
+ Emits: status-timeout() [When a timeout happens]"""
apt.progress.DpkgInstallProgress.updateInterface(self)
if self.time_last_update + self.INSTALL_TIMEOUT < time.time():
self.emit("status-timeout")
class GFetchProgress(gobject.GObject, apt.progress.FetchProgress):
+ """A Fetch Progress with GObject signals.
- __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
- (gobject.TYPE_STRING, gobject.TYPE_INT)),
- "status-started":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ()),
- "status-finished":(gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ())}
+ Signals:
+
+ * status-changed(str: description, int: percent)
+ * status-started()
+ * status-finished()
+ """
+
+ __gsignals__ = {"status-changed": mksig((str, int)),
+ "status-started": mksig(),
+ "status-finished": mksig()}
def __init__(self):
apt.progress.FetchProgress.__init__(self)
@@ -170,17 +238,17 @@ class GFetchProgress(gobject.GObject, apt.progress.FetchProgress):
apt.progress.FetchProgress.pulse(self)
currentItem = self.currentItems + 1
if currentItem > self.totalItems:
- currentItem = self.totalItems
+ currentItem = self.totalItems
if self.currentCPS > 0:
text = (_("Downloading file %(current)li of %(total)li with "
"%(speed)s/s") % \
- {"current" : currentItem,
- "total" : self.totalItems,
- "speed" : humanize_size(self.currentCPS)})
+ {"current": currentItem,
+ "total": self.totalItems,
+ "speed": apt_pkg.SizeToStr(self.currentCPS)})
else:
text = (_("Downloading file %(current)li of %(total)li") % \
- {"current" : currentItem,
- "total" : self.totalItems })
+ {"current": currentItem,
+ "total": self.totalItems})
self.emit("status-changed", text, self.percent)
while self._context.pending():
self._context.iteration()
@@ -188,33 +256,12 @@ class GFetchProgress(gobject.GObject, apt.progress.FetchProgress):
class GtkAptProgress(gtk.VBox):
- """
+ """Graphical progress for installation/fetch/operations.
+
This widget provides a progress bar, a terminal and a status bar for
showing the progress of package manipulation tasks.
-
- A simple example code snippet to install/remove a package:
-
- import pygtk
- pygtk.require('2.0')
- import gtk
-
- import apt.widgets
-
- win = gtk.Window()
- progress = apt.widgets.GtkAptProgress()
- win.set_title("GtkAptProgress Demo")
- win.add(progress)
- progress.show()
- win.show()
-
- cache = apt.cache.Cache(progress.open))
- cache["xterm"].markDelete()
- progress.show_terminal(expanded=True)
- cache.commit(progress.fetch),
- progress.install)
-
- gtk.main()
"""
+
def __init__(self):
gtk.VBox.__init__(self)
self.set_spacing(6)
@@ -226,7 +273,7 @@ class GtkAptProgress(gtk.VBox):
self._progressbar = gtk.ProgressBar()
# Setup the always italic status label
self._label = gtk.Label()
- attr_list = pango.AttrList()
+ attr_list = pango.AttrList()
attr_list.insert(pango.AttrStyle(pango.STYLE_ITALIC, 0, -1))
self._label.set_attributes(attr_list)
self._label.set_ellipsize(pango.ELLIPSIZE_END)
@@ -239,86 +286,80 @@ class GtkAptProgress(gtk.VBox):
self._progress_open = GOpProgress()
self._progress_open.connect("status-changed", self._on_status_changed)
self._progress_open.connect("status-started", self._on_status_started)
- self._progress_open.connect("status-finished", self._on_status_finished)
+ self._progress_open.connect("status-finished",
+ self._on_status_finished)
self._progress_fetch = GFetchProgress()
self._progress_fetch.connect("status-changed", self._on_status_changed)
self._progress_fetch.connect("status-started", self._on_status_started)
- self._progress_fetch.connect("status-finished",
+ self._progress_fetch.connect("status-finished",
self._on_status_finished)
self._progress_install = GInstallProgress(self._terminal)
self._progress_install.connect("status-changed",
self._on_status_changed)
- self._progress_install.connect("status-started",
+ self._progress_install.connect("status-started",
self._on_status_started)
- self._progress_install.connect("status-finished",
+ self._progress_install.connect("status-finished",
self._on_status_finished)
- self._progress_install.connect("status-timeout",
+ self._progress_install.connect("status-timeout",
self._on_status_timeout)
- self._progress_install.connect("status-error",
+ self._progress_install.connect("status-error",
self._on_status_timeout)
- self._progress_install.connect("status-conffile",
+ self._progress_install.connect("status-conffile",
self._on_status_timeout)
self._progress_dpkg_install = GDpkgInstallProgress(self._terminal)
self._progress_dpkg_install.connect("status-changed",
self._on_status_changed)
- self._progress_dpkg_install.connect("status-started",
+ self._progress_dpkg_install.connect("status-started",
self._on_status_started)
- self._progress_dpkg_install.connect("status-finished",
+ self._progress_dpkg_install.connect("status-finished",
self._on_status_finished)
- self._progress_dpkg_install.connect("status-timeout",
+ self._progress_dpkg_install.connect("status-timeout",
self._on_status_timeout)
- self._progress_dpkg_install.connect("status-error",
+ self._progress_dpkg_install.connect("status-error",
self._on_status_timeout)
- self._progress_dpkg_install.connect("status-conffile",
+ self._progress_dpkg_install.connect("status-conffile",
self._on_status_timeout)
def clear(self):
- """
- Reset all status information
- """
+ """Reset all status information."""
self._label.set_label("")
- self._progress.set_fraction(0)
+ self._progressbar.set_fraction(0)
self._expander.set_expanded(False)
@property
def open(self):
- """
- Return the cache opening progress handler.
- """
+ """Return the cache opening progress handler."""
return self._progress_open
@property
def install(self):
- """
- Return the install progress handler
- """
+ """Return the install progress handler."""
return self._progress_install
@property
def dpkg_install(self):
- """
- Return the install progress handler for dpkg
- """
+ """Return the install progress handler for dpkg."""
return self._dpkg_progress_install
-
+
@property
def fetch(self):
- """
- Return the fetch progress handler
- """
+ """Return the fetch progress handler."""
return self._progress_fetch
def _on_status_started(self, progress):
+ """Called when something starts."""
self._on_status_changed(progress, _("Starting..."), 0)
while gtk.events_pending():
gtk.main_iteration()
def _on_status_finished(self, progress):
+ """Called when something finished."""
self._on_status_changed(progress, _("Complete"), 100)
while gtk.events_pending():
gtk.main_iteration()
def _on_status_changed(self, progress, status, percent):
+ """Called when the status changed."""
self._label.set_text(status)
if percent is None:
self._progressbar.pulse()
@@ -328,18 +369,18 @@ class GtkAptProgress(gtk.VBox):
gtk.main_iteration()
def _on_status_timeout(self, progress):
- selt._expander.set_expanded(True)
+ """Called when timeout happens."""
+ self._expander.set_expanded(True)
while gtk.events_pending():
gtk.main_iteration()
def cancel_download(self):
- """
- Cancel a currently running download
- """
+ """Cancel a currently running download."""
self._progress_fetch.cancel()
def show_terminal(self, expanded=False):
- """
+ """Show the expander for the terminal.
+
Show an expander with a terminal widget which provides a way
to interact with dpkg
"""
@@ -350,31 +391,33 @@ class GtkAptProgress(gtk.VBox):
gtk.main_iteration()
def hide_terminal(self):
- """
- Hide the expander with the terminal widget
- """
+ """Hide the expander with the terminal widget."""
self._expander.hide()
while gtk.events_pending():
gtk.main_iteration()
def show(self):
+ """Show the Box"""
gtk.HBox.show(self)
self._label.show()
self._progressbar.show()
while gtk.events_pending():
gtk.main_iteration()
-if __name__ == "__main__":
+
+def _test():
+ """Test function"""
import sys
- import debfile
+
+ from apt.debfile import DebPackage
win = gtk.Window()
- apt_progress = GAptProgress()
+ apt_progress = GtkAptProgress()
win.set_title("GtkAptProgress Demo")
win.add(apt_progress)
apt_progress.show()
win.show()
- cache = apt.cache.Cache(apt_progress.get_open_progress())
+ cache = apt.cache.Cache(apt_progress.open)
pkg = cache["xterm"]
if pkg.isInstalled:
pkg.markDelete()
@@ -382,13 +425,16 @@ if __name__ == "__main__":
pkg.markInstall()
apt_progress.show_terminal(True)
try:
- cache.commit(apt_progress.get_fetch_progress(),
- apt_progress.get_install_progress())
- except:
- pass
+ cache.commit(apt_progress.fetch, apt_progress.install)
+ except Exception, exc:
+ print >> sys.stderr, "Exception happened:", exc
if len(sys.argv) > 1:
deb = DebPackage(sys.argv[1], cache)
- deb.install(apt_progress.get_dpkg_install_progress())
+ deb.install(apt_progress.dpkg_install)
gtk.main()
+
+if __name__ == "__main__":
+ _test()
+
# vim: ts=4 et sts=4
diff --git a/apt/package.py b/apt/package.py
index 70ddbb1a..7817c64c 100644
--- a/apt/package.py
+++ b/apt/package.py
@@ -1,73 +1,153 @@
# package.py - apt package abstraction
-#
+#
# Copyright (c) 2005 Canonical
-#
+#
# Author: Michael Vogt <michael.vogt@ubuntu.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
-#
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
+#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
-
+"""Functionality related to packages."""
+import gettext
import httplib
import sys
-import random
import re
import socket
-import string
import urllib2
import apt_pkg
+
+__all__ = 'BaseDependency', 'Dependency', 'Origin', 'Package', 'Record'
+
+
# Set a timeout for the changelog download
socket.setdefaulttimeout(2)
-#from gettext import gettext as _
-import gettext
-def _(s): return gettext.dgettext("python-apt", s)
+
+def _(string):
+ """Return the translation of the string."""
+ return gettext.dgettext("python-apt", string)
+
class BaseDependency(object):
- " a single dependency "
+ """A single dependency.
+
+ Attributes defined here:
+ name - The name of the dependency
+ relation - The relation (>>,>=,==,<<,<=,)
+ version - The version depended on
+ preDepend - Boolean value whether this is a pre-dependency.
+ """
+
def __init__(self, name, rel, ver, pre):
self.name = name
self.relation = rel
self.version = ver
self.preDepend = pre
+
class Dependency(object):
+ """Represent an Or-group of dependencies.
+
+ Attributes defined here:
+ or_dependencies - The possible choices
+ """
+
def __init__(self, alternatives):
self.or_dependencies = alternatives
+
+class Origin(object):
+ """The origin of a version.
+
+ Attributes defined here:
+ archive - The archive (eg. unstable)
+ component - The component (eg. main)
+ label - The Label, as set in the Release file
+ origin - The Origin, as set in the Release file
+ site - The hostname of the site.
+ trusted - Boolean value whether this is trustworthy.
+ """
+
+ def __init__(self, pkg, VerFileIter):
+ self.archive = VerFileIter.Archive
+ self.component = VerFileIter.Component
+ self.label = VerFileIter.Label
+ self.origin = VerFileIter.Origin
+ self.site = VerFileIter.Site
+ # check the trust
+ indexfile = pkg._list.FindIndex(VerFileIter)
+ if indexfile and indexfile.IsTrusted:
+ self.trusted = True
+ else:
+ self.trusted = False
+
+ def __repr__(self):
+ return ("<Origin component:'%s' archive:'%s' origin:'%s' label:'%s'"
+ "site:'%s' isTrusted:'%s'>") % (self.component, self.archive,
+ self.origin, self.label,
+ self.site, self.trusted)
+
+
class Record(object):
- """ represents a pkgRecord, can be accessed like a
- dictionary and gives the original package record
- if accessed as a string """
- def __init__(self, s):
- self._str = s
- self._rec = apt_pkg.ParseSection(s)
+ """Represent a pkgRecord.
+
+ It can be accessed like a dictionary and can also give the original package
+ record if accessed as a string.
+ """
+
+ def __init__(self, record_str):
+ self._rec = apt_pkg.ParseSection(record_str)
+
def __str__(self):
- return self._str
+ return str(self._rec)
+
def __getitem__(self, key):
- k = self._rec.get(key)
- if k is None:
- raise KeyError
- return k
+ return self._rec[key]
+
+ def __contains__(self, key):
+ return self._rec.has_key(key)
+
+ def __iter__(self):
+ return iter(self._rec.keys())
+
+ def iteritems(self):
+ """An iterator over the (key, value) items of the record."""
+ for key in self._rec.keys():
+ yield key, self._rec[key]
+
+ def get(self, key, default=None):
+ """Return record[key] if key in record, else `default`.
+
+ The parameter `default` must be either a string or None.
+ """
+ return self._rec.get(key, default)
+
def has_key(self, key):
+ """deprecated form of 'key in x'."""
return self._rec.has_key(key)
+
class Package(object):
- """ This class represents a package in the cache
+ """Representation of a package in a cache.
+
+ This class provides methods and properties for working with a package. It
+ lets you mark the package for installation, check if it is installed, and
+ much more.
"""
+
def __init__(self, cache, depcache, records, sourcelist, pcache, pkgiter):
""" Init the Package object """
self._cache = cache # low level cache
@@ -77,175 +157,186 @@ class Package(object):
self._list = sourcelist # sourcelist
self._pcache = pcache # python cache in cache.py
self._changelog = "" # Cached changelog
- pass
- # helper
def _lookupRecord(self, UseCandidate=True):
- """ internal helper that moves the Records to the right
- position, must be called before _records is accessed """
+ """Internal helper that moves the Records to the right position.
+
+ Must be called before _records is accessed.
+ """
if UseCandidate:
ver = self._depcache.GetCandidateVer(self._pkg)
else:
ver = self._pkg.CurrentVer
# check if we found a version
- if ver == None:
- #print "No version for: %s (Candidate: %s)" % (self._pkg.Name, UseCandidate)
+ if ver is None:
+ #print "No version for: %s (Candidate: %s)" % (self._pkg.Name,
+ # UseCandidate)
return False
-
- if ver.FileList == None:
+
+ if ver.FileList is None:
print "No FileList for: %s " % self._pkg.Name()
return False
f, index = ver.FileList.pop(0)
- self._records.Lookup((f,index))
+ self._records.Lookup((f, index))
return True
-
- # basic information (implemented as properties)
-
- # FIXME once python2.3 is dropped we can use @property instead
- # of name = property(name)
-
+ @property
def name(self):
- """ Return the name of the package """
+ """Return the name of the package."""
return self._pkg.Name
- name = property(name)
+ @property
def id(self):
- """ Return a uniq ID for the pkg, can be used to store
- additional information about the pkg """
+ """Return a uniq ID for the package.
+
+ This can be used eg. to store additional information about the pkg."""
+ return self._pkg.ID
+
+ def __hash__(self):
+ """Return the hash of the object.
+
+ This returns the same value as ID, which is unique."""
return self._pkg.ID
- id = property(id)
+ @property
def installedVersion(self):
- """ Return the installed version as string """
+ """Return the installed version as string."""
ver = self._pkg.CurrentVer
- if ver != None:
+ if ver is not None:
return ver.VerStr
else:
return None
- installedVersion = property(installedVersion)
+ @property
def candidateVersion(self):
- """ Return the candidate version as string """
+ """Return the candidate version as string."""
ver = self._depcache.GetCandidateVer(self._pkg)
- if ver != None:
+ if ver is not None:
return ver.VerStr
else:
return None
- candidateVersion = property(candidateVersion)
def _getDependencies(self, ver):
+ """Get the dependencies for a given version of a package."""
depends_list = []
depends = ver.DependsList
for t in ["PreDepends", "Depends"]:
- if not depends.has_key(t):
- continue
- for depVerList in depends[t]:
- base_deps = []
- for depOr in depVerList:
- base_deps.append(BaseDependency(depOr.TargetPkg.Name, depOr.CompType, depOr.TargetVer, (t == "PreDepends")))
- depends_list.append(Dependency(base_deps))
+ try:
+ for depVerList in depends[t]:
+ base_deps = []
+ for depOr in depVerList:
+ base_deps.append(BaseDependency(depOr.TargetPkg.Name,
+ depOr.CompType, depOr.TargetVer,
+ (t == "PreDepends")))
+ depends_list.append(Dependency(base_deps))
+ except KeyError:
+ pass
return depends_list
-
+
+ @property
def candidateDependencies(self):
- """ return a list of candidate dependencies """
+ """Return a list of candidate dependencies."""
candver = self._depcache.GetCandidateVer(self._pkg)
- if candver == None:
+ if candver is None:
return []
return self._getDependencies(candver)
- candidateDependencies = property(candidateDependencies)
-
+
+ @property
def installedDependencies(self):
- """ return a list of installed dependencies """
+ """Return a list of installed dependencies."""
ver = self._pkg.CurrentVer
- if ver == None:
+ if ver is None:
return []
return self._getDependencies(ver)
- installedDependencies = property(installedDependencies)
+ @property
def architecture(self):
+ """Return the Architecture of the package"""
if not self._lookupRecord():
return None
sec = apt_pkg.ParseSection(self._records.Record)
- if sec.has_key("Architecture"):
+ try:
return sec["Architecture"]
- return None
- architecture = property(architecture)
+ except KeyError:
+ return None
def _downloadable(self, useCandidate=True):
- """ helper, return if the version is downloadable """
+ """Return True if the package is downloadable."""
if useCandidate:
ver = self._depcache.GetCandidateVer(self._pkg)
else:
ver = self._pkg.CurrentVer
- if ver == None:
+ if ver is None:
return False
return ver.Downloadable
+
+ @property
def candidateDownloadable(self):
- " returns if the canidate is downloadable "
- return self._downloadable(useCandidate=True)
- candidateDownloadable = property(candidateDownloadable)
+ """Return True if the candidate is downloadable."""
+ return self._downloadable(True)
+ @property
def installedDownloadable(self):
- " returns if the installed version is downloadable "
- return self._downloadable(useCandidate=False)
- installedDownloadable = property(installedDownloadable)
+ """Return True if the installed version is downloadable."""
+ return self._downloadable(False)
+ @property
def sourcePackageName(self):
- """ Return the source package name as string """
+ """Return the source package name as string."""
if not self._lookupRecord():
- if not self._lookupRecord(UseCandidate=False):
+ if not self._lookupRecord(False):
return self._pkg.Name
src = self._records.SourcePkg
if src != "":
return src
else:
return self._pkg.Name
- sourcePackageName = property(sourcePackageName)
+ @property
def homepage(self):
- """ Return the homepage field as string """
+ """Return the homepage field as string."""
if not self._lookupRecord():
return None
return self._records.Homepage
- homepage = property(homepage)
+ @property
def section(self):
- """ Return the section of the package"""
+ """Return the section of the package."""
return self._pkg.Section
- section = property(section)
+ @property
def priority(self):
- """ Return the priority (of the candidate version)"""
+ """Return the priority (of the candidate version)."""
ver = self._depcache.GetCandidateVer(self._pkg)
if ver:
return ver.PriorityStr
else:
return None
- priority = property(priority)
+ @property
def installedPriority(self):
- """ Return the priority (of the installed version)"""
+ """Return the priority (of the installed version)."""
ver = self._depcache.GetCandidateVer(self._pkg)
if ver:
return ver.PriorityStr
else:
return None
- installedPriority = property(installedPriority)
+ @property
def summary(self):
- """ Return the short description (one line summary) """
+ """Return the short description (one line summary)."""
if not self._lookupRecord():
return ""
ver = self._depcache.GetCandidateVer(self._pkg)
desc_iter = ver.TranslatedDescription
self._records.Lookup(desc_iter.FileList.pop(0))
return self._records.ShortDesc
- summary = property(summary)
+ @property
def description(self, format=True, useDots=False):
- """
+ """Return the formatted long description.
+
Return the formated long description according to the Debian policy
(Chapter 5.6.13).
See http://www.debian.org/doc/debian-policy/ch-controlfields.html
@@ -261,14 +352,15 @@ class Package(object):
self._records.Lookup(desc_iter.FileList.pop(0))
desc = ""
try:
- s = unicode(self._records.LongDesc,"utf-8")
- except UnicodeDecodeError,e:
- s = _("Invalid unicode in description for '%s' (%s). "
- "Please report.") % (self.name,e)
- lines = string.split(s, "\n")
+ dsc = unicode(self._records.LongDesc, "utf-8")
+ except UnicodeDecodeError, err:
+ dsc = _("Invalid unicode in description for '%s' (%s). "
+ "Please report.") % (self.name, err)
+ lines = dsc.split("\n")
for i in range(len(lines)):
# Skip the first line, since its a duplication of the summary
- if i == 0: continue
+ if i == 0:
+ continue
raw_line = lines[i]
if raw_line.strip() == ".":
# The line is just line break
@@ -296,138 +388,145 @@ class Package(object):
# Add current line to the description
desc += line
return desc
- description = property(description)
+ @property
def rawDescription(self):
- """ return the long description (raw)"""
+ """return the long description (raw)."""
if not self._lookupRecord():
return ""
return self._records.LongDesc
- rawDescription = property(rawDescription)
-
+
+ @property
def candidateRecord(self):
- " return the full pkgrecord as string of the candidate version "
+ """Return the Record of the candidate version of the package."""
if not self._lookupRecord(True):
return None
return Record(self._records.Record)
- candidateRecord = property(candidateRecord)
+ @property
def installedRecord(self):
- " return the full pkgrecord as string of the installed version "
+ """Return the Record of the candidate version of the package."""
if not self._lookupRecord(False):
return None
return Record(self._records.Record)
- installedRecord = property(installedRecord)
# depcache states
+
+ @property
def markedInstall(self):
- """ Package is marked for install """
+ """Return True if the package is marked for install."""
return self._depcache.MarkedInstall(self._pkg)
- markedInstall = property(markedInstall)
+ @property
def markedUpgrade(self):
- """ Package is marked for upgrade """
+ """Return True if the package is marked for upgrade."""
return self._depcache.MarkedUpgrade(self._pkg)
- markedUpgrade = property(markedUpgrade)
+ @property
def markedDelete(self):
- """ Package is marked for delete """
+ """Return True if the package is marked for delete."""
return self._depcache.MarkedDelete(self._pkg)
- markedDelete = property(markedDelete)
+ @property
def markedKeep(self):
- """ Package is marked for keep """
+ """Return True if the package is marked for keep."""
return self._depcache.MarkedKeep(self._pkg)
- markedKeep = property(markedKeep)
+ @property
def markedDowngrade(self):
""" Package is marked for downgrade """
return self._depcache.MarkedDowngrade(self._pkg)
- markedDowngrade = property(markedDowngrade)
+ @property
def markedReinstall(self):
- """ Package is marked for reinstall """
+ """Return True if the package is marked for reinstall."""
return self._depcache.MarkedReinstall(self._pkg)
- markedReinstall = property(markedReinstall)
+ @property
def isInstalled(self):
- """ Package is installed """
- return (self._pkg.CurrentVer != None)
- isInstalled = property(isInstalled)
+ """Return True if the package is installed."""
+ return (self._pkg.CurrentVer is not None)
+ @property
def isUpgradable(self):
- """ Package is upgradable """
+ """Return True if the package is upgradable."""
return self.isInstalled and self._depcache.IsUpgradable(self._pkg)
- isUpgradable = property(isUpgradable)
+ @property
def isAutoRemovable(self):
- """
- Package is installed as a automatic dependency and is
- no longer required
+ """Return True if the package is no longer required.
+
+ If the package has been installed automatically as a dependency of
+ another package, and if no packages depend on it anymore, the package
+ is no longer required.
"""
return self.isInstalled and self._depcache.IsGarbage(self._pkg)
- isAutoRemovable = property(isAutoRemovable)
- # size
+ # sizes
+
+ @property
def packageSize(self):
- """ The size of the candidate deb package """
+ """Return the size of the candidate deb package."""
ver = self._depcache.GetCandidateVer(self._pkg)
return ver.Size
- packageSize = property(packageSize)
+ @property
def installedPackageSize(self):
- """ The size of the installed deb package """
+ """Return the size of the installed deb package."""
ver = self._pkg.CurrentVer
return ver.Size
- installedPackageSize = property(installedPackageSize)
+ @property
def candidateInstalledSize(self, UseCandidate=True):
- """ The size of the candidate installed package """
+ """Return the size of the candidate installed package."""
ver = self._depcache.GetCandidateVer(self._pkg)
- candidateInstalledSize = property(candidateInstalledSize)
+ @property
def installedSize(self):
- """ The size of the currently installed package """
+ """Return the size of the currently installed package."""
ver = self._pkg.CurrentVer
if ver is None:
return 0
return ver.InstalledSize
- installedSize = property(installedSize)
+ @property
def installedFiles(self):
- """
- Return the list of unicode names of the files which have
+ """Return a list of files installed by the package.
+
+ Return a list of unicode names of the files which have
been installed by this package
"""
path = "/var/lib/dpkg/info/%s.list" % self.name
try:
- list = open(path)
- files = list.read().decode().split("\n")
- list.close()
- except:
+ file_list = open(path)
+ try:
+ return file_list.read().decode().split("\n")
+ finally:
+ file_list.close()
+ except EnvironmentError:
return []
- return files
- installedFiles = property(installedFiles)
def getChangelog(self, uri=None, cancel_lock=None):
"""
- Download the changelog of the package and return it as unicode
- string
-
- uri: Is the uri to the changelog file. The following named variables
- will be substituted: src_section, prefix, src_pkg and src_ver
- For example the Ubuntu changelog:
- uri = "http://changelogs.ubuntu.com/changelogs/pool" \\
- "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\
- "/%(src_pkg)s_%(src_ver)s/changelog"
- cancel_lock: If this threading.Lock() is set, the download will be
- canceled
+ Download the changelog of the package and return it as unicode
+ string.
+
+ The parameter `uri` refers to the uri of the changelog file. It may
+ contain multiple named variables which will be substitued. These
+ variables are (src_section, prefix, src_pkg, src_ver). An example is
+ the Ubuntu changelog:
+ "http://changelogs.ubuntu.com/changelogs/pool" \\
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+
+ The parameter `cancel_lock` refers to an instance of threading.Lock,
+ which if set, prevents the download.
"""
# Return a cached changelog if available
if self._changelog != "":
return self._changelog
- if uri == None:
+ if uri is None:
if self.candidateOrigin[0].origin == "Debian":
uri = "http://packages.debian.org/changelogs/pool" \
"/%(src_section)s/%(prefix)s/%(src_pkg)s" \
@@ -442,7 +541,7 @@ class Package(object):
# get the src package name
src_pkg = self.sourcePackageName
- # assume "main" section
+ # assume "main" section
src_section = "main"
# use the section of the candidate as a starting point
section = self._depcache.GetCandidateVer(self._pkg).Section
@@ -452,9 +551,10 @@ class Package(object):
src_ver = self.candidateVersion
#print "bin: %s" % binver
try:
+ # FIXME: This try-statement is too long ...
# try to get the source version of the pkg, this differs
# for some (e.g. libnspr4 on ubuntu)
- # this feature only works if the correct deb-src are in the
+ # this feature only works if the correct deb-src are in the
# sources.list
# otherwise we fall back to the binary version number
src_records = apt_pkg.GetPkgSrcRecords()
@@ -471,7 +571,7 @@ class Package(object):
else:
# fail into the error handler
raise SystemError
- except SystemError, e:
+ except SystemError:
src_ver = bin_ver
l = section.split("/")
@@ -484,26 +584,27 @@ class Package(object):
prefix = "lib" + src_pkg[3]
# stip epoch
- l = string.split(src_ver,":")
+ l = src_ver.split(":")
if len(l) > 1:
src_ver = "".join(l[1:])
- uri = uri % {"src_section" : src_section,
- "prefix" : prefix,
- "src_pkg" : src_pkg,
- "src_ver" : src_ver}
+ uri = uri % {"src_section": src_section,
+ "prefix": prefix,
+ "src_pkg": src_pkg,
+ "src_ver": src_ver}
try:
# Check if the download was canceled
- if cancel_lock and cancel_lock.isSet(): return ""
+ if cancel_lock and cancel_lock.isSet():
+ return ""
changelog_file = urllib2.urlopen(uri)
# do only get the lines that are new
changelog = ""
regexp = "^%s \((.*)\)(.*)$" % (re.escape(src_pkg))
- i=0
while True:
# Check if the download was canceled
- if cancel_lock and cancel_lock.isSet(): return ""
+ if cancel_lock and cancel_lock.isSet():
+ return ""
# Read changelog line by line
line_raw = changelog_file.readline()
if line_raw == "":
@@ -519,7 +620,7 @@ class Package(object):
# and from changelog too
installed = self.installedVersion
if installed and ":" in installed:
- installed = installed.split(":",1)[1]
+ installed = installed.split(":", 1)[1]
changelog_ver = match.group(1)
if changelog_ver and ":" in changelog_ver:
changelog_ver = changelog_ver.split(":", 1)[1]
@@ -533,58 +634,47 @@ class Package(object):
if len(changelog) == 0:
changelog = _("The list of changes is not available")
self._changelog = changelog
- except urllib2.HTTPError,e:
+
+ # FIXME: Ubuntu-specific part.
+ except urllib2.HTTPError:
return _("The list of changes is not available yet.\n\n"
"Please use http://launchpad.net/ubuntu/+source/%s/%s/"
"+changelog\n"
"until the changes become available or try again "
- "later.") % (srcpkg, srcver),
- except IOError, httplib.BadStatusLine:
- return _("Failed to download the list of changes. \nPlease "
- "check your Internet connection.")
+ "later.") % (src_pkg, src_ver)
+ except (IOError, httplib.BadStatusLine):
+ return _("Failed to download the list of changes. \nPlease "
+ "check your Internet connection.")
return self._changelog
- # canidate origin
- class Origin:
- def __init__(self, pkg, VerFileIter):
- self.component = VerFileIter.Component
- self.archive = VerFileIter.Archive
- self.origin = VerFileIter.Origin
- self.label = VerFileIter.Label
- self.site = VerFileIter.Site
- # check the trust
- indexfile = pkg._list.FindIndex(VerFileIter)
- if indexfile and indexfile.IsTrusted:
- self.trusted = True
- else:
- self.trusted = False
- def __repr__(self):
- return "component: '%s' archive: '%s' origin: '%s' label: '%s' " \
- "site '%s' isTrusted: '%s'"% (self.component, self.archive,
- self.origin, self.label,
- self.site, self.trusted)
-
+ @property
def candidateOrigin(self):
+ """Return the Origin() of the candidate version."""
ver = self._depcache.GetCandidateVer(self._pkg)
if not ver:
return None
origins = []
- for (verFileIter,index) in ver.FileList:
- origins.append(self.Origin(self, verFileIter))
+ for (verFileIter, index) in ver.FileList:
+ origins.append(Origin(self, verFileIter))
return origins
- candidateOrigin = property(candidateOrigin)
# depcache actions
+
def markKeep(self):
- """ mark a package for keep """
+ """Mark a package for keep."""
self._pcache.cachePreChange()
self._depcache.MarkKeep(self._pkg)
self._pcache.cachePostChange()
+
def markDelete(self, autoFix=True, purge=False):
- """ mark a package for delete. Run the resolver if autoFix is set.
- Mark the package as purge (remove with configuration) if 'purge'
- is set.
- """
+ """Mark a package for install.
+
+ If autoFix is True, the resolver will be run, trying to fix broken
+ packages. This is the default.
+
+ If purge is True, remove the configuration files of the package as
+ well. The default is to keep the configuration.
+ """
self._pcache.cachePreChange()
self._depcache.MarkDelete(self._pkg, purge)
# try to fix broken stuffsta
@@ -596,10 +686,20 @@ class Package(object):
Fix.InstallProtect()
Fix.Resolve()
self._pcache.cachePostChange()
+
def markInstall(self, autoFix=True, autoInst=True, fromUser=True):
- """ mark a package for install. Run the resolver if autoFix is set,
- automatically install required dependencies if autoInst is set
- record it as automatically installed when fromuser is set to false
+ """Mark a package for install.
+
+ If autoFix is True, the resolver will be run, trying to fix broken
+ packages. This is the default.
+
+ If autoInst is True, the dependencies of the packages will be installed
+ automatically. This is the default.
+
+ If fromUser is True, this package will not be marked as automatically
+ installed. This is the default. Set it to False if you want to be able
+ to remove the package at a later stage if no other package depends on
+ it.
"""
self._pcache.cachePreChange()
self._depcache.MarkInstall(self._pkg, autoInst, fromUser)
@@ -610,24 +710,33 @@ class Package(object):
fixer.Protect(self._pkg)
fixer.Resolve(True)
self._pcache.cachePostChange()
+
def markUpgrade(self):
- """ mark a package for upgrade """
+ """Mark a package for upgrade."""
if self.isUpgradable:
self.markInstall()
else:
# FIXME: we may want to throw a exception here
- sys.stderr.write("MarkUpgrade() called on a non-upgrable pkg: '%s'\n" %self._pkg.Name)
+ sys.stderr.write(("MarkUpgrade() called on a non-upgrable pkg: "
+ "'%s'\n") % self._pkg.Name)
def commit(self, fprogress, iprogress):
- """ commit the changes, need a FetchProgress and InstallProgress
- object as argument
+ """Commit the changes.
+
+ The parameter `fprogress` refers to a FetchProgress() object, as
+ found in apt.progress.
+
+ The parameter `iprogress` refers to an InstallProgress() object, as
+ found in apt.progress.
"""
self._depcache.Commit(fprogress, iprogress)
-
-# self-test
-if __name__ == "__main__":
+
+def _test():
+ """Self-test."""
print "Self-test for the Package modul"
+ import random
+ import apt
apt_pkg.init()
cache = apt_pkg.GetCache()
depcache = apt_pkg.GetDepCache(cache)
@@ -653,35 +762,38 @@ if __name__ == "__main__":
print "PackageSize: %s " % pkg.packageSize
print "Dependencies: %s" % pkg.installedDependencies
for dep in pkg.candidateDependencies:
- print ",".join(["%s (%s) (%s) (%s)" % (o.name,o.version,o.relation, o.preDepend) for o in dep.or_dependencies])
+ print ",".join("%s (%s) (%s) (%s)" % (o.name, o.version, o.relation,
+ o.preDepend) for o in dep.or_dependencies)
print "arch: %s" % pkg.architecture
print "homepage: %s" % pkg.homepage
- print "rec: ",pkg.candidateRecord
+ print "rec: ", pkg.candidateRecord
# now test install/remove
- import apt
progress = apt.progress.OpTextProgress()
cache = apt.Cache(progress)
- for i in [True, False]:
+ for i in True, False:
print "Running install on random upgradable pkgs with AutoFix: %s " % i
- for name in cache.keys():
- pkg = cache[name]
+ for pkg in cache:
if pkg.isUpgradable:
- if random.randint(0,1) == 1:
+ if random.randint(0, 1) == 1:
pkg.markInstall(i)
print "Broken: %s " % cache._depcache.BrokenCount
print "InstCount: %s " % cache._depcache.InstCount
print
# get a new cache
- for i in [True, False]:
+ for i in True, False:
print "Randomly remove some packages with AutoFix: %s" % i
cache = apt.Cache(progress)
for name in cache.keys():
- if random.randint(0,1) == 1:
+ if random.randint(0, 1) == 1:
try:
cache[name].markDelete(i)
except SystemError:
print "Error trying to remove: %s " % name
print "Broken: %s " % cache._depcache.BrokenCount
print "DelCount: %s " % cache._depcache.DelCount
+
+# self-test
+if __name__ == "__main__":
+ _test()
diff --git a/apt/progress.py b/apt/progress.py
index a8ab76b6..51eb2426 100644
--- a/apt/progress.py
+++ b/apt/progress.py
@@ -1,61 +1,82 @@
# Progress.py - progress reporting classes
-#
+#
# Copyright (c) 2005 Canonical
-#
+#
# Author: Michael Vogt <michael.vogt@ubuntu.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
-#
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
+#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
+"""progress reporting classes.
-import sys
+This module provides classes for progress reporting. They can be used with
+e.g., for reporting progress on the cache opening process, the cache update
+progress, or the package install progress.
+"""
+
+import errno
+import fcntl
import os
import re
-import fcntl
-import string
-from errno import *
import select
+import sys
+
import apt_pkg
-import apt
+
+__all__ = ('CdromProgress', 'DpkgInstallProgress', 'DumbInstallProgress',
+ 'FetchProgress', 'InstallProgress', 'OpProgress', 'OpTextProgress',
+ 'TextFetchProgress')
+
class OpProgress(object):
- """ Abstract class to implement reporting on cache opening
- Subclass this class to implement simple Operation progress reporting
+ """Abstract class to implement reporting on cache opening.
+
+ Subclass this class to implement simple Operation progress reporting.
"""
+
def __init__(self):
- pass
+ self.op = None
+ self.subOp = None
+
def update(self, percent):
- pass
+ """Called periodically to update the user interface."""
+
def done(self):
- pass
+ """Called once an operation has been completed."""
+
class OpTextProgress(OpProgress):
- """ A simple text based cache open reporting class """
+ """A simple text based cache open reporting class."""
+
def __init__(self):
OpProgress.__init__(self)
+
def update(self, percent):
- sys.stdout.write("\r%s: %.2i " % (self.subOp,percent))
+ """Called periodically to update the user interface."""
+ sys.stdout.write("\r%s: %.2i " % (self.subOp, percent))
sys.stdout.flush()
+
def done(self):
+ """Called once an operation has been completed."""
sys.stdout.write("\r%s: Done\n" % self.op)
-
class FetchProgress(object):
- """ Report the download/fetching progress
- Subclass this class to implement fetch progress reporting
+ """Report the download/fetching progress.
+
+ Subclass this class to implement fetch progress reporting
"""
# download status constants
@@ -64,46 +85,71 @@ class FetchProgress(object):
dlFailed = 2
dlHit = 3
dlIgnored = 4
- dlStatusStr = {dlDone : "Done",
- dlQueued : "Queued",
- dlFailed : "Failed",
- dlHit : "Hit",
- dlIgnored : "Ignored"}
-
+ dlStatusStr = {dlDone: "Done",
+ dlQueued: "Queued",
+ dlFailed: "Failed",
+ dlHit: "Hit",
+ dlIgnored: "Ignored"}
+
def __init__(self):
self.eta = 0.0
self.percent = 0.0
- pass
-
+ # Make checking easier
+ self.currentBytes = 0
+ self.currentItems = 0
+ self.totalBytes = 0
+ self.totalItems = 0
+ self.currentCPS = 0
+
def start(self):
- pass
-
+ """Called when the fetching starts."""
+
def stop(self):
- pass
-
+ """Called when all files have been fetched."""
+
def updateStatus(self, uri, descr, shortDescr, status):
- pass
+ """Called when the status of an item changes.
+
+ This happens eg. when the downloads fails or is completed.
+ """
def pulse(self):
- """ called periodically (to update the gui), importend to
- return True to continue or False to cancel
+ """Called periodically to update the user interface.
+
+ Return True to continue or False to cancel.
"""
- self.percent = ((self.currentBytes + self.currentItems)*100.0)/float(self.totalBytes+self.totalItems)
+ self.percent = (((self.currentBytes + self.currentItems) * 100.0) /
+ float(self.totalBytes + self.totalItems))
if self.currentCPS > 0:
- self.eta = (self.totalBytes-self.currentBytes)/float(self.currentCPS)
+ self.eta = ((self.totalBytes - self.currentBytes) /
+ float(self.currentCPS))
return True
+
def mediaChange(self, medium, drive):
- pass
+ """react to media change events."""
+
class TextFetchProgress(FetchProgress):
""" Ready to use progress object for terminal windows """
+
def __init__(self):
+ FetchProgress.__init__(self)
self.items = {}
+
def updateStatus(self, uri, descr, shortDescr, status):
+ """Called when the status of an item changes.
+
+ This happens eg. when the downloads fails or is completed.
+ """
if status != self.dlQueued:
print "\r%s %s" % (self.dlStatusStr[status], descr)
self.items[uri] = status
+
def pulse(self):
+ """Called periodically to update the user interface.
+
+ Return True to continue or False to cancel.
+ """
FetchProgress.pulse(self)
if self.currentCPS > 0:
s = "[%2.f%%] %sB/s %s" % (self.percent,
@@ -114,102 +160,121 @@ class TextFetchProgress(FetchProgress):
print "\r%s" % (s),
sys.stdout.flush()
return True
+
def stop(self):
- print "\rDone downloading "
+ """Called when all files have been fetched."""
+ print "\rDone downloading "
+
def mediaChange(self, medium, drive):
- """ react to media change events """
- res = True;
- print "Media change: please insert the disc labeled \
- '%s' in the drive '%s' and press enter" % (medium,drive)
- s = sys.stdin.readline()
- if(s == 'c' or s == 'C'):
- res = false;
- return res
+ """react to media change events."""
+ print ("Media change: please insert the disc labeled "
+ "'%s' in the drive '%s' and press enter") % (medium, drive)
+
+ return raw_input() not in ('c', 'C')
+
class DumbInstallProgress(object):
- """ Report the install progress
- Subclass this class to implement install progress reporting
+ """Report the install progress.
+
+ Subclass this class to implement install progress reporting.
"""
- def __init__(self):
- pass
+
def startUpdate(self):
- pass
+ """Start update."""
+
def run(self, pm):
+ """Start installation."""
return pm.DoInstall()
+
def finishUpdate(self):
- pass
+ """Called when update has finished."""
+
def updateInterface(self):
- pass
+ """Called periodically to update the user interface"""
+
class InstallProgress(DumbInstallProgress):
- """ A InstallProgress that is pretty useful.
- It supports the attributes 'percent' 'status' and callbacks
- for the dpkg errors and conffiles and status changes
- """
+ """An InstallProgress that is pretty useful.
+
+ It supports the attributes 'percent' 'status' and callbacks for the dpkg
+ errors and conffiles and status changes.
+ """
+
def __init__(self):
DumbInstallProgress.__init__(self)
self.selectTimeout = 0.1
(read, write) = os.pipe()
- self.writefd=write
+ self.writefd = write
self.statusfd = os.fdopen(read, "r")
- fcntl.fcntl(self.statusfd.fileno(), fcntl.F_SETFL,os.O_NONBLOCK)
+ fcntl.fcntl(self.statusfd.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
self.read = ""
self.percent = 0.0
self.status = ""
+
def error(self, pkg, errormsg):
- " called when a error is detected during the install "
- pass
- def conffile(self,current,new):
- " called when a conffile question from dpkg is detected "
- pass
+ """Called when a error is detected during the install."""
+
+ def conffile(self, current, new):
+ """Called when a conffile question from dpkg is detected."""
+
def statusChange(self, pkg, percent, status):
- " called when the status changed "
- pass
+ """Called when the status changed."""
+
def updateInterface(self):
- if self.statusfd != None:
- try:
- while not self.read.endswith("\n"):
- self.read += os.read(self.statusfd.fileno(),1)
- except OSError, (errno,errstr):
- # resource temporarly unavailable is ignored
- if errno != EAGAIN and errnor != EWOULDBLOCK:
- print errstr
- if self.read.endswith("\n"):
- s = self.read
- #print s
- try:
- (status, pkg, percent, status_str) = string.split(s, ":",3)
- except ValueError, e:
- # silently ignore lines that can't be parsed
- self.read = ""
- return
- #print "percent: %s %s" % (pkg, float(percent)/100.0)
- if status == "pmerror":
- self.error(pkg,status_str)
- elif status == "pmconffile":
- # we get a string like this:
- # 'current-conffile' 'new-conffile' useredited distedited
- match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str)
- if match:
- self.conffile(match.group(1), match.group(2))
- elif status == "pmstatus":
- if float(percent) != self.percent or \
- status_str != self.status:
- self.statusChange(pkg, float(percent), status_str.strip())
- self.percent = float(percent)
- self.status = string.strip(status_str)
- self.read = ""
+ """Called periodically to update the interface."""
+ if self.statusfd is None:
+ return
+ try:
+ while not self.read.endswith("\n"):
+ self.read += os.read(self.statusfd.fileno(), 1)
+ except OSError, (errno_, errstr):
+ # resource temporarly unavailable is ignored
+ if errno_ != errno.EAGAIN and errno_ != errno.EWOULDBLOCK:
+ print errstr
+ if not self.read.endswith("\n"):
+ return
+
+ s = self.read
+ #print s
+ try:
+ (status, pkg, percent, status_str) = s.split(":", 3)
+ except ValueError:
+ # silently ignore lines that can't be parsed
+ self.read = ""
+ return
+ #print "percent: %s %s" % (pkg, float(percent)/100.0)
+ if status == "pmerror":
+ self.error(pkg, status_str)
+ elif status == "pmconffile":
+ # we get a string like this:
+ # 'current-conffile' 'new-conffile' useredited distedited
+ match = re.match("\s*\'(.*)\'\s*\'(.*)\'.*", status_str)
+ if match:
+ self.conffile(match.group(1), match.group(2))
+ elif status == "pmstatus":
+ if float(percent) != self.percent or status_str != self.status:
+ self.statusChange(pkg, float(percent),
+ status_str.strip())
+ self.percent = float(percent)
+ self.status = status_str.strip()
+ self.read = ""
+
def fork(self):
+ """Fork."""
return os.fork()
+
def waitChild(self):
+ """Wait for child progress to exit."""
while True:
- select.select([self.statusfd],[],[], self.selectTimeout)
+ select.select([self.statusfd], [], [], self.selectTimeout)
self.updateInterface()
- (pid, res) = os.waitpid(self.child_pid,os.WNOHANG)
+ (pid, res) = os.waitpid(self.child_pid, os.WNOHANG)
if pid == self.child_pid:
break
return res
+
def run(self, pm):
+ """Start installing."""
pid = self.fork()
if pid == 0:
# child
@@ -219,29 +284,31 @@ class InstallProgress(DumbInstallProgress):
res = self.waitChild()
return os.WEXITSTATUS(res)
-class CdromProgress:
- """ Report the cdrom add progress
- Subclass this class to implement cdrom add progress reporting
+
+class CdromProgress(object):
+ """Report the cdrom add progress.
+
+ Subclass this class to implement cdrom add progress reporting.
"""
+
def __init__(self):
pass
+
def update(self, text, step):
- """ update is called regularly so that the gui can be redrawn """
- pass
+ """Called periodically to update the user interface."""
+
def askCdromName(self):
- pass
+ """Called to ask for the name of the cdrom."""
+
def changeCdrom(self):
- pass
+ """Called to ask for the cdrom to be changed."""
class DpkgInstallProgress(InstallProgress):
- """
- Progress handler for a local Debian package installation
- """
+ """Progress handler for a local Debian package installation."""
+
def run(self, debfile):
- """
- Start installing the given Debian package
- """
+ """Start installing the given Debian package."""
self.debfile = debfile
self.debname = os.path.basename(debfile).split("_")[0]
pid = self.fork()
@@ -255,46 +322,35 @@ class DpkgInstallProgress(InstallProgress):
return res
def updateInterface(self):
- """
- Process status messages from dpkg
- """
- if self.statusfd != None:
- while True:
- try:
- self.read += os.read(self.statusfd.fileno(),1)
- except OSError, (errno,errstr):
- # resource temporarly unavailable is ignored
- if errno != 11:
- print errstr
- break
- if self.read.endswith("\n"):
- statusl = string.split(self.read, ":")
- if len(statusl) < 3:
- print "got garbage from dpkg: '%s'" % read
- self.read = ""
- break
- status = statusl[2].strip()
- #print status
- if status == "error":
- self.error(self.debname, status)
- elif status == "conffile-prompt":
- # we get a string like this:
- # 'current-conffile' 'new-conffile' useredited distedited
- match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str)
- if match:
- self.conffile(match.group(1), match.group(2))
- else:
- self.status = status
- self.read = ""
-
-# module test code
-if __name__ == "__main__":
- import apt_pkg
- apt_pkg.init()
- progress = OpTextProgress()
- cache = apt_pkg.GetCache(progress)
- depcache = apt_pkg.GetDepCache(cache)
- depcache.Init(progress)
-
- fprogress = TextFetchProgress()
- cache.Update(fprogress)
+ """Process status messages from dpkg."""
+ if self.statusfd is None:
+ return
+ while True:
+ try:
+ self.read += os.read(self.statusfd.fileno(), 1)
+ except OSError, (errno_, errstr):
+ # resource temporarly unavailable is ignored
+ if errno_ != 11:
+ print errstr
+ break
+ if not self.read.endswith("\n"):
+ continue
+
+ statusl = self.read.split(":")
+ if len(statusl) < 3:
+ print "got garbage from dpkg: '%s'" % self.read
+ self.read = ""
+ break
+ status = statusl[2].strip()
+ #print status
+ if status == "error":
+ self.error(self.debname, status)
+ elif status == "conffile-prompt":
+ # we get a string like this:
+ # 'current-conffile' 'new-conffile' useredited distedited
+ match = re.match("\s*\'(.*)\'\s*\'(.*)\'.*", statusl[3])
+ if match:
+ self.conffile(match.group(1), match.group(2))
+ else:
+ self.status = status
+ self.read = ""