summaryrefslogtreecommitdiff
path: root/apt
diff options
context:
space:
mode:
authorMichael Vogt <mvo@debian.org>2008-12-15 14:42:01 +0100
committerMichael Vogt <mvo@debian.org>2008-12-15 14:42:01 +0100
commit12cf58d12b969010f3d98b2974d72bbb950b775f (patch)
treedc7f0af010aefa0b76a256f594d6dc4ae3163aaa /apt
parentbe8fa1ee4581faeadfb6d067301ee9e7761dc1c7 (diff)
parent481a90a820564dc12567348c16773e15cc129560 (diff)
downloadpython-apt-12cf58d12b969010f3d98b2974d72bbb950b775f.tar.gz
merge mvo branch, upload to experimental
Diffstat (limited to 'apt')
-rw-r--r--apt/cache.py37
-rw-r--r--apt/debfile.py508
-rw-r--r--apt/gtk/__init__.py0
-rw-r--r--apt/gtk/widgets.py394
-rw-r--r--apt/package.py204
-rw-r--r--apt/progress.py58
6 files changed, 1178 insertions, 23 deletions
diff --git a/apt/cache.py b/apt/cache.py
index bbf2165b..79e58282 100644
--- a/apt/cache.py
+++ b/apt/cache.py
@@ -129,6 +129,17 @@ class Cache(object):
self.cachePostChange()
@property
+ def requiredDownload(self):
+ """ get the size of the packages that are required to download """
+ pm = apt_pkg.GetPackageManager(self._depcache)
+ fetcher = apt_pkg.GetAcquire()
+ pm.GetArchives(fetcher, self._list, self._records)
+ return fetcher.FetchNeeded
+ @property
+ def additionalRequiredSpace(self):
+ """ get the size of the additional required space on the fs """
+ return self._depcache.UsrSize
+ @property
def reqReinstallPkgs(self):
" return the packages not downloadable packages in reqreinst state "
reqreinst = set()
@@ -182,6 +193,28 @@ class Cache(object):
finally:
os.close(lock)
+ def getProvidingPackages(self, virtual):
+ """
+ Return a list of packages which provide the virtual package of the
+ specified name
+ """
+ providers = []
+ try:
+ vp = self._cache[virtual]
+ if len(vp.VersionList) != 0:
+ return providers
+ except KeyError:
+ return providers
+ for pkg in self:
+ v = self._depcache.GetCandidateVer(pkg._pkg)
+ if v == None:
+ continue
+ for p in v.ProvidesList:
+ if virtual == p[0]:
+ # we found a pkg that provides this virtual pkg
+ providers.append(pkg)
+ return providers
+
def update(self, fetchProgress=None):
" run the equivalent of apt-get update "
lockfile = apt_pkg.Config.FindDir("Dir::State::Lists") + "lock"
@@ -232,6 +265,10 @@ class Cache(object):
fetcher.Shutdown()
return (res == pm.ResultCompleted)
+ def clear(self):
+ """ Unmark all changes """
+ self._depcache.Init()
+
# cache changes
def cachePostChange(self):
" called internally if the cache has changed, emit a signal then "
diff --git a/apt/debfile.py b/apt/debfile.py
index ddde5bf1..b1d436cd 100644
--- a/apt/debfile.py
+++ b/apt/debfile.py
@@ -1,9 +1,38 @@
-import apt_inst
-import apt_pkg
-from apt_inst import arCheckMember
+# Copyright (c) 2005-2007 Canonical
+#
+# AUTHOR:
+# Michael Vogt <mvo@ubuntu.com>
+#
+# This file is part of GDebi
+#
+# GDebi is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; either version 2 of the License, or (at
+# your option) any later version.
+#
+# GDebi is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GDebi; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+import warnings
+warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
+import apt_inst, apt_pkg
+import sys
+import os
from gettext import gettext as _
+# Constants for comparing the local package file with the version in the cache
+(VERSION_NONE,
+ VERSION_OUTDATED,
+ VERSION_SAME,
+ VERSION_NEWER) = range(4)
+
class NoDebArchiveException(IOError):
pass
@@ -11,15 +40,22 @@ class DebPackage(object):
_supported_data_members = ("data.tar.gz", "data.tar.bz2", "data.tar.lzma")
- def __init__(self, filename=None):
- self._section = {}
+ debug = 0
+
+ def __init__(self, filename=None, cache=None):
+ self._cache = cache
+ self.file = filename
+ self._needPkgs = []
+ self._sections = {}
+ self._installedConflicts = set()
+ self._failureString = ""
if filename:
self.open(filename)
def open(self, filename):
" open given debfile "
self.filename = filename
- if not arCheckMember(open(self.filename), "debian-binary"):
+ if not apt_inst.arCheckMember(open(self.filename), "debian-binary"):
raise NoDebArchiveException, _("This is not a valid DEB archive, missing '%s' member" % "debian-binary")
control = apt_inst.debExtractControl(open(self.filename))
self._sections = apt_pkg.ParseSection(control)
@@ -36,7 +72,7 @@ class DebPackage(object):
# % (What,Name,Link,Mode,UID,GID,Size, MTime, Major, Minor)
files.append(Name)
for member in self._supported_data_members:
- if arCheckMember(open(self.filename), member):
+ if apt_inst.arCheckMember(open(self.filename), member):
try:
apt_inst.debExtract(open(self.filename), extract_cb, member)
break
@@ -45,14 +81,458 @@ class DebPackage(object):
return files
filelist = property(filelist)
+ def _isOrGroupSatisfied(self, or_group):
+ """ this function gets a 'or_group' and analyzes if
+ at least one dependency of this group is already satisfied """
+ self._dbg(2,"_checkOrGroup(): %s " % (or_group))
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # check for virtual pkgs
+ if not self._cache.has_key(depname):
+ if self._cache.isVirtualPackage(depname):
+ self._dbg(3,"_isOrGroupSatisfied(): %s is virtual dep" % depname)
+ for pkg in self._cache.getProvidingPackages(depname):
+ if pkg.isInstalled:
+ return True
+ continue
+
+ inst = self._cache[depname]
+ instver = inst.installedVersion
+ if instver != None and apt_pkg.CheckDep(instver,oper,ver) == True:
+ return True
+ return False
+
+
+ def _satisfyOrGroup(self, or_group):
+ """ try to satisfy the or_group """
+
+ or_found = False
+ virtual_pkg = None
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # if we don't have it in the cache, it may be virtual
+ if not self._cache.has_key(depname):
+ if not self._cache.isVirtualPackage(depname):
+ continue
+ providers = self._cache.getProvidingPackages(depname)
+ # if a package just has a single virtual provider, we
+ # just pick that (just like apt)
+ if len(providers) != 1:
+ continue
+ depname = providers[0].name
+
+ # now check if we can satisfy the deps with the candidate(s)
+ # in the cache
+ cand = self._cache[depname]
+ candver = self._cache._depcache.GetCandidateVer(cand._pkg)
+ if not candver:
+ continue
+ if not apt_pkg.CheckDep(candver.VerStr,oper,ver):
+ continue
+
+ # check if we need to install it
+ self._dbg(2,"Need to get: %s" % depname)
+ self._needPkgs.append(depname)
+ return True
+
+ # if we reach this point, we failed
+ or_str = ""
+ for dep in or_group:
+ or_str += dep[0]
+ if dep != or_group[len(or_group)-1]:
+ or_str += "|"
+ self._failureString += _("Dependency is not satisfiable: %s\n" % or_str)
+ return False
+
+ def _checkSinglePkgConflict(self, pkgname, ver, oper):
+ """ returns true if a pkg conflicts with a real installed/marked
+ pkg """
+ # FIXME: deal with conflicts against its own provides
+ # (e.g. Provides: ftp-server, Conflicts: ftp-server)
+ self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % (pkgname, ver, oper))
+ pkgver = None
+ cand = self._cache[pkgname]
+ if cand.isInstalled:
+ pkgver = cand.installedVersion
+ elif cand.markedInstall:
+ pkgver = cand.candidateVersion
+ #print "pkg: %s" % pkgname
+ #print "ver: %s" % ver
+ #print "pkgver: %s " % pkgver
+ #print "oper: %s " % oper
+ if (pkgver and apt_pkg.CheckDep(pkgver,oper,ver) and
+ not self.replacesRealPkg(pkgname, oper, ver)):
+ self._failureString += _("Conflicts with the installed package '%s'" % cand.name)
+ return True
+ return False
+
+ def _checkConflictsOrGroup(self, or_group):
+ """ check the or-group for conflicts with installed pkgs """
+ self._dbg(2,"_checkConflictsOrGroup(): %s " % (or_group))
+
+ or_found = False
+ virtual_pkg = None
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # check conflicts with virtual pkgs
+ if not self._cache.has_key(depname):
+ # FIXME: we have to check for virtual replaces here as
+ # well (to pass tests/gdebi-test8.deb)
+ if self._cache.isVirtualPackage(depname):
+ for pkg in self._cache.getProvidingPackages(depname):
+ self._dbg(3, "conflicts virtual check: %s" % pkg.name)
+ # P/C/R on virtal pkg, e.g. ftpd
+ if self.pkgName == pkg.name:
+ self._dbg(3, "conflict on self, ignoring")
+ continue
+ if self._checkSinglePkgConflict(pkg.name,ver,oper):
+ self._installedConflicts.add(pkg.name)
+ continue
+ if self._checkSinglePkgConflict(depname,ver,oper):
+ self._installedConflicts.add(depname)
+ return len(self._installedConflicts) != 0
+
+ def getConflicts(self):
+ """
+ Return list of package names conflicting with this package.
+
+ WARNING: This method will is deprecated. Please use the
+ attribute DebPackage.depends instead.
+ """
+ return self.conflicts
+
+ def conflicts(self):
+ """
+ List of package names conflicting with this package
+ """
+ conflicts = []
+ key = "Conflicts"
+ if self._sections.has_key(key):
+ conflicts = apt_pkg.ParseDepends(self._sections[key])
+ return conflicts
+ conflicts = property(conflicts)
+
+ def getDepends(self):
+ """
+ Return list of package names on which this package depends on.
+
+ WARNING: This method will is deprecated. Please use the
+ attribute DebPackage.depends instead.
+ """
+ return self.depends
+
+ def depends(self):
+ """
+ List of package names on which this package depends on
+ """
+ depends = []
+ # find depends
+ for key in ["Depends","PreDepends"]:
+ if self._sections.has_key(key):
+ depends.extend(apt_pkg.ParseDepends(self._sections[key]))
+ return depends
+ depends = property(depends)
+
+ def getProvides(self):
+ """
+ Return list of virtual packages which are provided by this package.
+
+ WARNING: This method will is deprecated. Please use the
+ attribute DebPackage.provides instead.
+ """
+ return self.provides
+
+ def provides(self):
+ """
+ List of virtual packages which are provided by this package
+ """
+ provides = []
+ key = "Provides"
+ if self._sections.has_key(key):
+ provides = apt_pkg.ParseDepends(self._sections[key])
+ return provides
+ provides = property(provides)
+
+ def getReplaces(self):
+ """
+ Return list of packages which are replaced by this package.
+
+ WARNING: This method will is deprecated. Please use the
+ attribute DebPackage.replaces instead.
+ """
+ return self.replaces
+
+ def replaces(self):
+ """
+ List of packages which are replaced by this package
+ """
+ replaces = []
+ key = "Replaces"
+ if self._sections.has_key(key):
+ replaces = apt_pkg.ParseDepends(self._sections[key])
+ return replaces
+ replaces = property(replaces)
+ def replacesRealPkg(self, pkgname, oper, ver):
+ """
+ return True if the deb packages replaces a real (not virtual)
+ packages named pkgname, oper, ver
+ """
+ self._dbg(3, "replacesPkg() %s %s %s" % (pkgname,oper,ver))
+ pkgver = None
+ cand = self._cache[pkgname]
+ if cand.isInstalled:
+ pkgver = cand.installedVersion
+ elif cand.markedInstall:
+ pkgver = cand.candidateVersion
+ for or_group in self.getReplaces():
+ for (name, ver, oper) in or_group:
+ if (name == pkgname and
+ apt_pkg.CheckDep(pkgver,oper,ver)):
+ self._dbg(3, "we have a replaces in our package for the conflict against '%s'" % (pkgname))
+ return True
+ return False
+
+ def checkConflicts(self):
+ """ check if the pkg conflicts with a existing or to be installed
+ package. Return True if the pkg is ok """
+ res = True
+ for or_group in self.getConflicts():
+ if self._checkConflictsOrGroup(or_group):
+ #print "Conflicts with a exisiting pkg!"
+ #self._failureString = "Conflicts with a exisiting pkg!"
+ res = False
+ return res
+
+
+ def compareToVersionInCache(self, useInstalled=True):
+ """ checks if the pkg is already installed or availabe in the cache
+ and if so in what version, returns if the version of the deb
+ is not available,older,same,newer
+ """
+ self._dbg(3,"compareToVersionInCache")
+ pkgname = self._sections["Package"]
+ debver = self._sections["Version"]
+ self._dbg(1,"debver: %s" % debver)
+ if self._cache.has_key(pkgname):
+ if useInstalled:
+ cachever = self._cache[pkgname].installedVersion
+ else:
+ cachever = self._cache[pkgname].candidateVersion
+ if cachever != None:
+ cmp = apt_pkg.VersionCompare(cachever,debver)
+ self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
+ if cmp == 0:
+ return VERSION_SAME
+ elif cmp < 0:
+ return VERSION_NEWER
+ elif cmp > 0:
+ return VERSION_OUTDATED
+ return VERSION_NONE
+
+ def checkDeb(self):
+ self._dbg(3,"checkDepends")
+
+ # check arch
+ arch = self._sections["Architecture"]
+ if arch != "all" and arch != apt_pkg.Config.Find("APT::Architecture"):
+ self._dbg(1,"ERROR: Wrong architecture dude!")
+ self._failureString = _("Wrong architecture '%s'" % arch)
+ return False
+
+ # check version
+ res = self.compareToVersionInCache()
+ if res == VERSION_OUTDATED: # the deb is older than the installed
+ self._failureString = _("A later version is already installed")
+ return False
+
+ # FIXME: this sort of error handling sux
+ self._failureString = ""
+
+ # check conflicts
+ if not self.checkConflicts():
+ return False
+
+ # try to satisfy the dependencies
+ res = self._satisfyDepends(self.getDepends())
+ if not res:
+ return False
+
+ # check for conflicts again (this time with the packages that are
+ # makeed for install)
+ if not self.checkConflicts():
+ return False
+
+ if self._cache._depcache.BrokenCount > 0:
+ self._failureString = _("Failed to satisfy all dependencies (broken cache)")
+ # clean the cache again
+ self._cache.clear()
+ return False
+ return True
+
+ def satisfyDependsStr(self, dependsstr):
+ return self._satisfyDepends(apt_pkg.ParseDepends(dependsstr))
+
+ def _satisfyDepends(self, depends):
+ # turn off MarkAndSweep via a action group (if available)
+ try:
+ _actiongroup = apt_pkg.GetPkgActionGroup(self._cache._depcache)
+ except AttributeError, e:
+ pass
+ # check depends
+ for or_group in depends:
+ #print "or_group: %s" % or_group
+ #print "or_group satified: %s" % self._isOrGroupSatisfied(or_group)
+ if not self._isOrGroupSatisfied(or_group):
+ if not self._satisfyOrGroup(or_group):
+ return False
+ # now try it out in the cache
+ for pkg in self._needPkgs:
+ try:
+ self._cache[pkg].markInstall(fromUser=False)
+ except SystemError, e:
+ self._failureString = _("Cannot install '%s'" % pkg)
+ self._cache.clear()
+ return False
+ return True
+
+ def missingDeps(self):
+ self._dbg(1, "Installing: %s" % self._needPkgs)
+ if self._needPkgs == None:
+ self.checkDeb()
+ return self._needPkgs
+ missingDeps = property(missingDeps)
+
+ def requiredChanges(self):
+ """ gets the required changes to satisfy the depends.
+ returns a tuple with (install, remove, unauthenticated)
+ """
+ install = []
+ remove = []
+ unauthenticated = []
+ for pkg in self._cache:
+ if pkg.markedInstall or pkg.markedUpgrade:
+ install.append(pkg.name)
+ # check authentication, one authenticated origin is enough
+ # libapt will skip non-authenticated origins then
+ authenticated = False
+ for origin in pkg.candidateOrigin:
+ authenticated |= origin.trusted
+ if not authenticated:
+ unauthenticated.append(pkg.name)
+ if pkg.markedDelete:
+ remove.append(pkg.name)
+ return (install,remove, unauthenticated)
+ requiredChanges = property(requiredChanges)
+
+ def _dbg(self, level, msg):
+ """Write debugging output to sys.stderr.
+ """
+ if level <= self.debug:
+ print >> sys.stderr, msg
+
+ def install(self, installProgress=None):
+ """ Install the package """
+ if installProgress == None:
+ res = os.system("/usr/sbin/dpkg -i %s" % self.filename)
+ else:
+ installProgress.startUpdate()
+ res = installProgress.run(self.filename)
+ installProgress.finishUpdate()
+ return res
+
+class DscSrcPackage(DebPackage):
+ def __init__(self, filename=None, cache=None):
+ DebPackage.__init__(self, filename, cache)
+ self.depends = []
+ self.conflicts = []
+ self.binaries = []
+ if filename != None:
+ self.open(filename)
+ def getConflicts(self):
+ return self.conflicts
+ def getDepends(self):
+ return self.depends
+ def open(self, file):
+ depends_tags = ["Build-Depends:", "Build-Depends-Indep:"]
+ conflicts_tags = ["Build-Conflicts:", "Build-Conflicts-Indep:"]
+ for line in open(file):
+ # check b-d and b-c
+ for tag in depends_tags:
+ if line.startswith(tag):
+ key = line[len(tag):].strip()
+ self.depends.extend(apt_pkg.ParseSrcDepends(key))
+ for tag in conflicts_tags:
+ if line.startswith(tag):
+ key = line[len(tag):].strip()
+ self.conflicts.extend(apt_pkg.ParseSrcDepends(key))
+ # check binary and source and version
+ if line.startswith("Source:"):
+ self.pkgName = line[len("Source:"):].strip()
+ if line.startswith("Binary:"):
+ self.binaries = [pkg.strip() for pkg in line[len("Binary:"):].split(",")]
+ if line.startswith("Version:"):
+ self._sections["Version"] = line[len("Version:"):].strip()
+ # we are at the end
+ if line.startswith("-----BEGIN PGP SIGNATURE-"):
+ break
+ s = _("Install Build-Dependencies for "
+ "source package '%s' that builds %s\n"
+ ) % (self.pkgName, " ".join(self.binaries))
+ self._sections["Description"] = s
+
+ def checkDeb(self):
+ if not self.checkConflicts():
+ for pkgname in self._installedConflicts:
+ if self._cache[pkgname]._pkg.Essential:
+ raise Exception, _("A essential package would be removed")
+ self._cache[pkgname].markDelete()
+ # FIXME: a additional run of the checkConflicts()
+ # after _satisfyDepends() should probably be done
+ return self._satisfyDepends(self.depends)
if __name__ == "__main__":
- import sys
-
- d = DebPackage(sys.argv[1])
- print d["Section"]
- print d["Maintainer"]
- print "Files:"
- print "\n".join(d.filelist)
+ from cache import Cache
+ from progress import DpkgInstallProgress
+
+ cache = Cache()
+
+ vp = "www-browser"
+ print "%s virtual: %s" % (vp,cache.isVirtualPackage(vp))
+ providers = cache.getProvidingPackages(vp)
+ print "Providers for %s :" % vp
+ for pkg in providers:
+ print " %s" % pkg.name
+ d = DebPackage(sys.argv[1], cache)
+ print "Deb: %s" % d.pkgname
+ if not d.checkDeb():
+ print "can't be satified"
+ print d._failureString
+ print "missing deps: %s" % d.missingDeps
+ print d.requiredChanges
+
+ print "Installing ..."
+ ret = d.install(DpkgInstallProgress())
+ print ret
+
+ #s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc")
+ #s.checkDep()
+ #print "Missing deps: ",s.missingDeps
+ #print "Print required changes: ", s.requiredChanges
+
+ s = DscSrcPackage(cache=cache)
+ d = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
+ print s._satisfyDepends(apt_pkg.ParseDepends(d))
diff --git a/apt/gtk/__init__.py b/apt/gtk/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/apt/gtk/__init__.py
diff --git a/apt/gtk/widgets.py b/apt/gtk/widgets.py
new file mode 100644
index 00000000..3a15258f
--- /dev/null
+++ b/apt/gtk/widgets.py
@@ -0,0 +1,394 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+widgets - GTK widgets to show the progress and status of apt
+
+Copyright (c) 2004,2005 Canonical Ltd.
+
+Authors: Michael Vogt <mvo@ubuntu.com>
+ Sebastian Heinlein <glatzor@ubuntu.com>
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License as
+published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version.
+
+his program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+USA
+"""
+
+from gettext import gettext as _
+import os
+import time
+
+import pygtk
+pygtk.require('2.0')
+import gtk
+import glib
+import gobject
+import pango
+import vte
+
+import apt
+import apt_pkg
+
+class GOpProgress(gobject.GObject, apt.progress.OpProgress):
+
+ __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE,
+ (gobject.TYPE_STRING, gobject.TYPE_INT)),
+ "status-started":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-finished":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ())}
+
+ def __init__(self):
+ apt.progress.OpProgress.__init__(self)
+ gobject.GObject.__init__(self)
+ self._context = glib.main_context_default()
+
+ def update(self, percent):
+ self.emit("status-changed", self.op, percent)
+ while self._context.pending():
+ self._context.iteration()
+
+ def done(self):
+ self.emit("status-finished")
+
+class GInstallProgress(gobject.GObject, apt.progress.InstallProgress):
+
+ # Seconds until a maintainer script will be regarded as hanging
+ INSTALL_TIMEOUT = 5 * 60
+
+ __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE,
+ (gobject.TYPE_STRING, gobject.TYPE_INT)),
+ "status-started":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-timeout":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-error":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-conffile":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-finished":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ())}
+
+ def __init__(self, term):
+ apt.progress.InstallProgress.__init__(self)
+ gobject.GObject.__init__(self)
+ self.finished = False
+ self.time_last_update = time.time()
+ self.term = term
+ reaper = vte.reaper_get()
+ reaper.connect("child-exited", self.childExited)
+ self.env = ["VTE_PTY_KEEP_FD=%s"% self.writefd,
+ "DEBIAN_FRONTEND=gnome",
+ "APT_LISTCHANGES_FRONTEND=gtk"]
+ self._context = glib.main_context_default()
+
+ def childExited(self, term, pid, status):
+ self.apt_status = os.WEXITSTATUS(status)
+ self.finished = True
+
+ def error(self, pkg, errormsg):
+ self.emit("status-error")
+
+ def conffile(self, current, new):
+ self.emit("status-conffile")
+
+ def startUpdate(self):
+ self.emit("status-started")
+
+ def finishUpdate(self):
+ self.emit("status-finished")
+
+ def statusChange(self, pkg, percent, status):
+ self.time_last_update = time.time()
+ self.emit("status-changed", status, percent)
+
+ def updateInterface(self):
+ apt.progress.InstallProgress.updateInterface(self)
+ while self._context.pending():
+ self._context.iteration()
+ if self.time_last_update + self.INSTALL_TIMEOUT < time.time():
+ self.emit("status-timeout")
+
+ def fork(self):
+ return self.term.forkpty(envv=self.env)
+
+ def waitChild(self):
+ while not self.finished:
+ self.updateInterface()
+ return self.apt_status
+
+
+class GDpkgInstallProgress(apt.progress.DpkgInstallProgress,GInstallProgress):
+
+ def run(self, debfile):
+ apt.progress.DpkgInstallProgress.run(self, debfile)
+
+ def updateInterface(self):
+ apt.progress.DpkgInstallProgress.updateInterface(self)
+ if self.time_last_update + self.INSTALL_TIMEOUT < time.time():
+ self.emit("status-timeout")
+
+
+class GFetchProgress(gobject.GObject, apt.progress.FetchProgress):
+
+ __gsignals__ = {"status-changed":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE,
+ (gobject.TYPE_STRING, gobject.TYPE_INT)),
+ "status-started":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ()),
+ "status-finished":(gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ())}
+
+ def __init__(self):
+ apt.progress.FetchProgress.__init__(self)
+ gobject.GObject.__init__(self)
+ self._continue = True
+ self._context = glib.main_context_default()
+
+ def start(self):
+ self.emit("status-started")
+
+ def stop(self):
+ self.emit("status-finished")
+
+ def cancel(self):
+ self._continue = False
+
+ def pulse(self):
+ apt.progress.FetchProgress.pulse(self)
+ currentItem = self.currentItems + 1
+ if currentItem > self.totalItems:
+ currentItem = self.totalItems
+ if self.currentCPS > 0:
+ text = (_("Downloading file %(current)li of %(total)li with "
+ "%(speed)s/s") % \
+ {"current" : currentItem,
+ "total" : self.totalItems,
+ "speed" : humanize_size(self.currentCPS)})
+ else:
+ text = (_("Downloading file %(current)li of %(total)li") % \
+ {"current" : currentItem,
+ "total" : self.totalItems })
+ self.emit("status-changed", text, self.percent)
+ while self._context.pending():
+ self._context.iteration()
+ return self._continue
+
+
+class GtkAptProgress(gtk.VBox):
+ """
+ This widget provides a progress bar, a terminal and a status bar for
+ showing the progress of package manipulation tasks.
+
+ A simple example code snippet to install/remove a package:
+
+ import pygtk
+ pygtk.require('2.0')
+ import gtk
+
+ import apt.widgets
+
+ win = gtk.Window()
+ progress = apt.widgets.GtkAptProgress()
+ win.set_title("GtkAptProgress Demo")
+ win.add(progress)
+ progress.show()
+ win.show()
+
+ cache = apt.cache.Cache(progress.open))
+ cache["xterm"].markDelete()
+ progress.show_terminal(expanded=True)
+ cache.commit(progress.fetch),
+ progress.install)
+
+ gtk.main()
+ """
+ def __init__(self):
+ gtk.VBox.__init__(self)
+ self.set_spacing(6)
+ # Setup some child widgets
+ self._expander = gtk.Expander(_("Details"))
+ self._terminal = vte.Terminal()
+ #self._terminal.set_font_from_string("monospace 10")
+ self._expander.add(self._terminal)
+ self._progressbar = gtk.ProgressBar()
+ # Setup the always italic status label
+ self._label = gtk.Label()
+ attr_list = pango.AttrList()
+ attr_list.insert(pango.AttrStyle(pango.STYLE_ITALIC, 0, -1))
+ self._label.set_attributes(attr_list)
+ self._label.set_ellipsize(pango.ELLIPSIZE_END)
+ self._label.set_alignment(0, 0)
+ # add child widgets
+ self.pack_start(self._progressbar, False)
+ self.pack_start(self._label, False)
+ self.pack_start(self._expander, False)
+ # Setup the internal progress handlers
+ self._progress_open = GOpProgress()
+ self._progress_open.connect("status-changed", self._on_status_changed)
+ self._progress_open.connect("status-started", self._on_status_started)
+ self._progress_open.connect("status-finished", self._on_status_finished)
+ self._progress_fetch = GFetchProgress()
+ self._progress_fetch.connect("status-changed", self._on_status_changed)
+ self._progress_fetch.connect("status-started", self._on_status_started)
+ self._progress_fetch.connect("status-finished",
+ self._on_status_finished)
+ self._progress_install = GInstallProgress(self._terminal)
+ self._progress_install.connect("status-changed",
+ self._on_status_changed)
+ self._progress_install.connect("status-started",
+ self._on_status_started)
+ self._progress_install.connect("status-finished",
+ self._on_status_finished)
+ self._progress_install.connect("status-timeout",
+ self._on_status_timeout)
+ self._progress_install.connect("status-error",
+ self._on_status_timeout)
+ self._progress_install.connect("status-conffile",
+ self._on_status_timeout)
+ self._progress_dpkg_install = GDpkgInstallProgress(self._terminal)
+ self._progress_dpkg_install.connect("status-changed",
+ self._on_status_changed)
+ self._progress_dpkg_install.connect("status-started",
+ self._on_status_started)
+ self._progress_dpkg_install.connect("status-finished",
+ self._on_status_finished)
+ self._progress_dpkg_install.connect("status-timeout",
+ self._on_status_timeout)
+ self._progress_dpkg_install.connect("status-error",
+ self._on_status_timeout)
+ self._progress_dpkg_install.connect("status-conffile",
+ self._on_status_timeout)
+
+ def clear(self):
+ """
+ Reset all status information
+ """
+ self._label.set_label("")
+ self._progress.set_fraction(0)
+ self._expander.set_expanded(False)
+
+ @property
+ def open(self):
+ """
+ Return the cache opening progress handler.
+ """
+ return self._progress_open
+
+ @property
+ def install(self):
+ """
+ Return the install progress handler
+ """
+ return self._progress_install
+
+ @property
+ def dpkg_install(self):
+ """
+ Return the install progress handler for dpkg
+ """
+ return self._dpkg_progress_install
+
+ @property
+ def fetch(self):
+ """
+ Return the fetch progress handler
+ """
+ return self._progress_fetch
+
+ def _on_status_started(self, progress):
+ self._on_status_changed(progress, _("Starting..."), 0)
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def _on_status_finished(self, progress):
+ self._on_status_changed(progress, _("Complete"), 100)
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def _on_status_changed(self, progress, status, percent):
+ self._label.set_text(status)
+ if percent is None:
+ self._progressbar.pulse()
+ else:
+ self._progressbar.set_fraction(percent/100.0)
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def _on_status_timeout(self, progress):
+ selt._expander.set_expanded(True)
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def cancel_download(self):
+ """
+ Cancel a currently running download
+ """
+ self._progress_fetch.cancel()
+
+ def show_terminal(self, expanded=False):
+ """
+ Show an expander with a terminal widget which provides a way
+ to interact with dpkg
+ """
+ self._expander.show()
+ self._terminal.show()
+ self._expander.set_expanded(expanded)
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def hide_terminal(self):
+ """
+ Hide the expander with the terminal widget
+ """
+ self._expander.hide()
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+ def show(self):
+ gtk.HBox.show(self)
+ self._label.show()
+ self._progressbar.show()
+ while gtk.events_pending():
+ gtk.main_iteration()
+
+if __name__ == "__main__":
+ import sys
+ import debfile
+
+ win = gtk.Window()
+ apt_progress = GAptProgress()
+ win.set_title("GtkAptProgress Demo")
+ win.add(apt_progress)
+ apt_progress.show()
+ win.show()
+ cache = apt.cache.Cache(apt_progress.get_open_progress())
+ pkg = cache["xterm"]
+ if pkg.isInstalled:
+ pkg.markDelete()
+ else:
+ pkg.markInstall()
+ apt_progress.show_terminal(True)
+ try:
+ cache.commit(apt_progress.get_fetch_progress(),
+ apt_progress.get_install_progress())
+ except:
+ pass
+ if len(sys.argv) > 1:
+ deb = DebPackage(sys.argv[1], cache)
+ deb.install(apt_progress.get_dpkg_install_progress())
+ gtk.main()
+
+# vim: ts=4 et sts=4
diff --git a/apt/package.py b/apt/package.py
index c1c2b1e1..70ddbb1a 100644
--- a/apt/package.py
+++ b/apt/package.py
@@ -19,10 +19,18 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
-import apt_pkg
+import httplib
import sys
import random
+import re
+import socket
import string
+import urllib2
+
+import apt_pkg
+
+# Set a timeout for the changelog download
+socket.setdefaulttimeout(2)
#from gettext import gettext as _
import gettext
@@ -68,6 +76,7 @@ class Package(object):
self._pkg = pkgiter
self._list = sourcelist # sourcelist
self._pcache = pcache # python cache in cache.py
+ self._changelog = "" # Cached changelog
pass
# helper
@@ -235,8 +244,15 @@ class Package(object):
return self._records.ShortDesc
summary = property(summary)
- def description(self, format=True):
- """ Return the formated long description """
+ def description(self, format=True, useDots=False):
+ """
+ Return the formated long description according to the Debian policy
+ (Chapter 5.6.13).
+ See http://www.debian.org/doc/debian-policy/ch-controlfields.html
+ for more information.
+ """
+ if not format:
+ return self.rawDescription
if not self._lookupRecord():
return ""
# get the translated description
@@ -249,12 +265,36 @@ class Package(object):
except UnicodeDecodeError,e:
s = _("Invalid unicode in description for '%s' (%s). "
"Please report.") % (self.name,e)
- for line in string.split(s,"\n"):
- tmp = string.strip(line)
- if tmp == ".":
+ lines = string.split(s, "\n")
+ for i in range(len(lines)):
+ # Skip the first line, since its a duplication of the summary
+ if i == 0: continue
+ raw_line = lines[i]
+ if raw_line.strip() == ".":
+ # The line is just line break
+ if not desc.endswith("\n"):
desc += "\n"
+ continue
+ elif raw_line.startswith(" "):
+ # The line should be displayed verbatim without word wrapping
+ if not desc.endswith("\n"):
+ line = "\n%s\n" % raw_line[2:]
+ else:
+ line = "%s\n" % raw_line[2:]
+ elif raw_line.startswith(" "):
+ # The line is part of a paragraph.
+ if desc.endswith("\n") or desc == "":
+ # Skip the leading white space
+ line = raw_line[1:]
else:
- desc += tmp + "\n"
+ line = raw_line
+ else:
+ line = raw_line
+ # Use dots for lists
+ if useDots:
+ line = re.sub(r"^(\s*)(\*|0|o|-) ", ur"\1\u2022 ", line, 1)
+ # Add current line to the description
+ desc += line
return desc
description = property(description)
@@ -354,6 +394,156 @@ class Package(object):
return ver.InstalledSize
installedSize = property(installedSize)
+ def installedFiles(self):
+ """
+ Return the list of unicode names of the files which have
+ been installed by this package
+ """
+ path = "/var/lib/dpkg/info/%s.list" % self.name
+ try:
+ list = open(path)
+ files = list.read().decode().split("\n")
+ list.close()
+ except:
+ return []
+ return files
+ installedFiles = property(installedFiles)
+
+ def getChangelog(self, uri=None, cancel_lock=None):
+ """
+ Download the changelog of the package and return it as unicode
+ string
+
+ uri: Is the uri to the changelog file. The following named variables
+ will be substituted: src_section, prefix, src_pkg and src_ver
+ For example the Ubuntu changelog:
+ uri = "http://changelogs.ubuntu.com/changelogs/pool" \\
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+ cancel_lock: If this threading.Lock() is set, the download will be
+ canceled
+ """
+ # Return a cached changelog if available
+ if self._changelog != "":
+ return self._changelog
+
+ if uri == None:
+ if self.candidateOrigin[0].origin == "Debian":
+ uri = "http://packages.debian.org/changelogs/pool" \
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+ elif self.candidateOrigin[0].origin == "Ubuntu":
+ uri = "http://changelogs.ubuntu.com/changelogs/pool" \
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+ else:
+ return _("The list of changes is not available")
+
+ # get the src package name
+ src_pkg = self.sourcePackageName
+
+ # assume "main" section
+ src_section = "main"
+ # use the section of the candidate as a starting point
+ section = self._depcache.GetCandidateVer(self._pkg).Section
+
+ # get the source version, start with the binaries version
+ bin_ver = self.candidateVersion
+ src_ver = self.candidateVersion
+ #print "bin: %s" % binver
+ try:
+ # try to get the source version of the pkg, this differs
+ # for some (e.g. libnspr4 on ubuntu)
+ # this feature only works if the correct deb-src are in the
+ # sources.list
+ # otherwise we fall back to the binary version number
+ src_records = apt_pkg.GetPkgSrcRecords()
+ src_rec = src_records.Lookup(src_pkg)
+ if src_rec:
+ src_ver = src_records.Version
+ #if apt_pkg.VersionCompare(binver, srcver) > 0:
+ # srcver = binver
+ if not src_ver:
+ src_ver = bin_ver
+ #print "srcver: %s" % src_ver
+ section = src_records.Section
+ #print "srcsect: %s" % section
+ else:
+ # fail into the error handler
+ raise SystemError
+ except SystemError, e:
+ src_ver = bin_ver
+
+ l = section.split("/")
+ if len(l) > 1:
+ src_section = l[0]
+
+ # lib is handled special
+ prefix = src_pkg[0]
+ if src_pkg.startswith("lib"):
+ prefix = "lib" + src_pkg[3]
+
+ # stip epoch
+ l = string.split(src_ver,":")
+ if len(l) > 1:
+ src_ver = "".join(l[1:])
+
+ uri = uri % {"src_section" : src_section,
+ "prefix" : prefix,
+ "src_pkg" : src_pkg,
+ "src_ver" : src_ver}
+ try:
+ # Check if the download was canceled
+ if cancel_lock and cancel_lock.isSet(): return ""
+ changelog_file = urllib2.urlopen(uri)
+ # do only get the lines that are new
+ changelog = ""
+ regexp = "^%s \((.*)\)(.*)$" % (re.escape(src_pkg))
+
+ i=0
+ while True:
+ # Check if the download was canceled
+ if cancel_lock and cancel_lock.isSet(): return ""
+ # Read changelog line by line
+ line_raw = changelog_file.readline()
+ if line_raw == "":
+ break
+ # The changelog is encoded in utf-8, but since there isn't any
+ # http header, urllib2 seems to treat it as ascii
+ line = line_raw.decode("utf-8")
+
+ #print line.encode('utf-8')
+ match = re.match(regexp, line)
+ if match:
+ # strip epoch from installed version
+ # and from changelog too
+ installed = self.installedVersion
+ if installed and ":" in installed:
+ installed = installed.split(":",1)[1]
+ changelog_ver = match.group(1)
+ if changelog_ver and ":" in changelog_ver:
+ changelog_ver = changelog_ver.split(":", 1)[1]
+ if installed and \
+ apt_pkg.VersionCompare(changelog_ver, installed) <= 0:
+ break
+ # EOF (shouldn't really happen)
+ changelog += line
+
+ # Print an error if we failed to extract a changelog
+ if len(changelog) == 0:
+ changelog = _("The list of changes is not available")
+ self._changelog = changelog
+ except urllib2.HTTPError,e:
+ return _("The list of changes is not available yet.\n\n"
+ "Please use http://launchpad.net/ubuntu/+source/%s/%s/"
+ "+changelog\n"
+ "until the changes become available or try again "
+ "later.") % (srcpkg, srcver),
+ except IOError, httplib.BadStatusLine:
+ return _("Failed to download the list of changes. \nPlease "
+ "check your Internet connection.")
+ return self._changelog
+
# canidate origin
class Origin:
def __init__(self, pkg, VerFileIter):
diff --git a/apt/progress.py b/apt/progress.py
index bb1bce35..a8ab76b6 100644
--- a/apt/progress.py
+++ b/apt/progress.py
@@ -208,7 +208,7 @@ class InstallProgress(DumbInstallProgress):
(pid, res) = os.waitpid(self.child_pid,os.WNOHANG)
if pid == self.child_pid:
break
- return os.WEXITSTATUS(res)
+ return res
def run(self, pm):
pid = self.fork()
if pid == 0:
@@ -217,7 +217,7 @@ class InstallProgress(DumbInstallProgress):
os._exit(res)
self.child_pid = pid
res = self.waitChild()
- return res
+ return os.WEXITSTATUS(res)
class CdromProgress:
""" Report the cdrom add progress
@@ -233,6 +233,60 @@ class CdromProgress:
def changeCdrom(self):
pass
+
+class DpkgInstallProgress(InstallProgress):
+ """
+ Progress handler for a local Debian package installation
+ """
+ def run(self, debfile):
+ """
+ Start installing the given Debian package
+ """
+ self.debfile = debfile
+ self.debname = os.path.basename(debfile).split("_")[0]
+ pid = self.fork()
+ if pid == 0:
+ # child
+ res = os.system("/usr/bin/dpkg --status-fd %s -i %s" % \
+ (self.writefd, self.debfile))
+ os._exit(os.WEXITSTATUS(res))
+ self.child_pid = pid
+ res = self.waitChild()
+ return res
+
+ def updateInterface(self):
+ """
+ Process status messages from dpkg
+ """
+ if self.statusfd != None:
+ while True:
+ try:
+ self.read += os.read(self.statusfd.fileno(),1)
+ except OSError, (errno,errstr):
+ # resource temporarly unavailable is ignored
+ if errno != 11:
+ print errstr
+ break
+ if self.read.endswith("\n"):
+ statusl = string.split(self.read, ":")
+ if len(statusl) < 3:
+ print "got garbage from dpkg: '%s'" % read
+ self.read = ""
+ break
+ status = statusl[2].strip()
+ #print status
+ if status == "error":
+ self.error(self.debname, status)
+ elif status == "conffile-prompt":
+ # we get a string like this:
+ # 'current-conffile' 'new-conffile' useredited distedited
+ match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str)
+ if match:
+ self.conffile(match.group(1), match.group(2))
+ else:
+ self.status = status
+ self.read = ""
+
# module test code
if __name__ == "__main__":
import apt_pkg