summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Heinlein <sebi@glatzor.de>2008-08-22 06:14:02 +0200
committerSebastian Heinlein <sebi@glatzor.de>2008-08-22 06:14:02 +0200
commitf59dec995d909d7f32c0cb8422e1554a82e2657d (patch)
tree731b7f691decfc413f38b97d46c0e2b53bd35ab4
parent0b0b062b72b721e5f5b3387b6c0fb4de6f6a4e61 (diff)
parenta39a102e437702e7cd1c3f314e507d6b0d466eb5 (diff)
downloadpython-apt-f59dec995d909d7f32c0cb8422e1554a82e2657d.tar.gz
Merge local dpkg installation from the gdebi branch.
-rw-r--r--apt/cache.py49
-rw-r--r--apt/dpkg.py492
-rw-r--r--apt/progress.py56
3 files changed, 596 insertions, 1 deletions
diff --git a/apt/cache.py b/apt/cache.py
index bbf2165b..3fd1e996 100644
--- a/apt/cache.py
+++ b/apt/cache.py
@@ -182,6 +182,51 @@ class Cache(object):
finally:
os.close(lock)
+ def isVirtualPackage(self, name):
+ """
+ Return True if the package of the given name is a virtual package
+ """
+ try:
+ virtual_pkg = self._cache[name]
+ except KeyError:
+ return False
+ if len(virtual_pkg.VersionList) == 0:
+ return True
+ return False
+
+ def getProvidingPackages(self, virtual):
+ """
+ Return a list of packages which provide the virtual package of the
+ specified name
+ """
+ providers = []
+ try:
+ vp = self._cache[virtual]
+ if len(vp.VersionList) != 0:
+ return providers
+ except KeyError:
+ return providers
+ for pkg in self:
+ v = self._depcache.GetCandidateVer(pkg._pkg)
+ if v == None:
+ continue
+ for p in v.ProvidesList:
+ #print virtual
+ #print p[0]
+ if virtual == p[0]:
+ # we found a pkg that provides this virtual
+ # pkg, check if the proivdes is any good
+ providers.append(pkg)
+ #cand = self._cache[pkg.name]
+ #candver = self._cache._depcache.GetCandidateVer(cand._pkg)
+ #instver = cand._pkg.CurrentVer
+ #res = apt_pkg.CheckDep(candver.VerStr,oper,ver)
+ #if res == True:
+ # self._dbg(1,"we can use %s" % pkg.name)
+ # or_found = True
+ # break
+ return providers
+
def update(self, fetchProgress=None):
" run the equivalent of apt-get update "
lockfile = apt_pkg.Config.FindDir("Dir::State::Lists") + "lock"
@@ -232,6 +277,10 @@ class Cache(object):
fetcher.Shutdown()
return (res == pm.ResultCompleted)
+ def clear(self):
+ """ Unmark all changes """
+ self._depcache.Init()
+
# cache changes
def cachePostChange(self):
" called internally if the cache has changed, emit a signal then "
diff --git a/apt/dpkg.py b/apt/dpkg.py
new file mode 100644
index 00000000..7ebc551f
--- /dev/null
+++ b/apt/dpkg.py
@@ -0,0 +1,492 @@
+# Copyright (c) 2005-2007 Canonical
+#
+# AUTHOR:
+# Michael Vogt <mvo@ubuntu.com>
+#
+# This file is part of GDebi
+#
+# GDebi is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; either version 2 of the License, or (at
+# your option) any later version.
+#
+# GDebi is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GDebi; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import warnings
+from warnings import warn
+warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
+import apt_inst, apt_pkg
+import apt
+import sys
+import os
+from gettext import gettext as _
+from cache import Cache
+from progress import DpkgInstallProgress
+
+class DebPackage(object):
+ debug = 0
+
+ def __init__(self, cache, file=None):
+ cache.clear()
+ self._cache = cache
+ self.file = file
+ self._needPkgs = []
+ self._sections = {}
+ self._installedConflicts = set()
+ self._failureString = ""
+ if file != None:
+ self.open(file)
+
+ def open(self, file):
+ """ read a deb """
+ control = apt_inst.debExtractControl(open(file))
+ self._sections = apt_pkg.ParseSection(control)
+ self.pkgName = self._sections["Package"]
+
+ def _isOrGroupSatisfied(self, or_group):
+ """ this function gets a 'or_group' and analyzes if
+ at least one dependency of this group is already satisfied """
+ self._dbg(2,"_checkOrGroup(): %s " % (or_group))
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # check for virtual pkgs
+ if not self._cache.has_key(depname):
+ if self._cache.isVirtualPackage(depname):
+ self._dbg(3,"_isOrGroupSatisfied(): %s is virtual dep" % depname)
+ for pkg in self._cache.getProvidingPackages(depname):
+ if pkg.isInstalled:
+ return True
+ continue
+
+ inst = self._cache[depname]
+ instver = inst.installedVersion
+ if instver != None and apt_pkg.CheckDep(instver,oper,ver) == True:
+ return True
+ return False
+
+
+ def _satisfyOrGroup(self, or_group):
+ """ try to satisfy the or_group """
+
+ or_found = False
+ virtual_pkg = None
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # if we don't have it in the cache, it may be virtual
+ if not self._cache.has_key(depname):
+ if not self._cache.isVirtualPackage(depname):
+ continue
+ providers = self._cache.getProvidingPackages(depname)
+ # if a package just has a single virtual provider, we
+ # just pick that (just like apt)
+ if len(providers) != 1:
+ continue
+ depname = providers[0].name
+
+ # now check if we can satisfy the deps with the candidate(s)
+ # in the cache
+ cand = self._cache[depname]
+ candver = self._cache._depcache.GetCandidateVer(cand._pkg)
+ if not candver:
+ continue
+ if not apt_pkg.CheckDep(candver.VerStr,oper,ver):
+ continue
+
+ # check if we need to install it
+ self._dbg(2,"Need to get: %s" % depname)
+ self._needPkgs.append(depname)
+ return True
+
+ # if we reach this point, we failed
+ or_str = ""
+ for dep in or_group:
+ or_str += dep[0]
+ if dep != or_group[len(or_group)-1]:
+ or_str += "|"
+ self._failureString += _("Dependency is not satisfiable: %s\n" % or_str)
+ return False
+
+ def _checkSinglePkgConflict(self, pkgname, ver, oper):
+ """ returns true if a pkg conflicts with a real installed/marked
+ pkg """
+ # FIXME: deal with conflicts against its own provides
+ # (e.g. Provides: ftp-server, Conflicts: ftp-server)
+ self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % (pkgname, ver, oper))
+ pkgver = None
+ cand = self._cache[pkgname]
+ if cand.isInstalled:
+ pkgver = cand.installedVersion
+ elif cand.markedInstall:
+ pkgver = cand.candidateVersion
+ #print "pkg: %s" % pkgname
+ #print "ver: %s" % ver
+ #print "pkgver: %s " % pkgver
+ #print "oper: %s " % oper
+ if (pkgver and apt_pkg.CheckDep(pkgver,oper,ver) and
+ not self.replacesRealPkg(pkgname, oper, ver)):
+ self._failureString += _("Conflicts with the installed package '%s'" % cand.name)
+ return True
+ return False
+
+ def _checkConflictsOrGroup(self, or_group):
+ """ check the or-group for conflicts with installed pkgs """
+ self._dbg(2,"_checkConflictsOrGroup(): %s " % (or_group))
+
+ or_found = False
+ virtual_pkg = None
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # check conflicts with virtual pkgs
+ if not self._cache.has_key(depname):
+ # FIXME: we have to check for virtual replaces here as
+ # well (to pass tests/gdebi-test8.deb)
+ if self._cache.isVirtualPackage(depname):
+ for pkg in self._cache.getProvidingPackages(depname):
+ self._dbg(3, "conflicts virtual check: %s" % pkg.name)
+ # P/C/R on virtal pkg, e.g. ftpd
+ if self.pkgName == pkg.name:
+ self._dbg(3, "conflict on self, ignoring")
+ continue
+ if self._checkSinglePkgConflict(pkg.name,ver,oper):
+ self._installedConflicts.add(pkg.name)
+ continue
+ if self._checkSinglePkgConflict(depname,ver,oper):
+ self._installedConflicts.add(depname)
+ return len(self._installedConflicts) != 0
+
+ def getConflicts(self):
+ conflicts = []
+ key = "Conflicts"
+ if self._sections.has_key(key):
+ conflicts = apt_pkg.ParseDepends(self._sections[key])
+ return conflicts
+
+ def getDepends(self):
+ depends = []
+ # find depends
+ for key in ["Depends","PreDepends"]:
+ if self._sections.has_key(key):
+ depends.extend(apt_pkg.ParseDepends(self._sections[key]))
+ return depends
+
+ def getProvides(self):
+ provides = []
+ key = "Provides"
+ if self._sections.has_key(key):
+ provides = apt_pkg.ParseDepends(self._sections[key])
+ return provides
+
+ def getReplaces(self):
+ replaces = []
+ key = "Replaces"
+ if self._sections.has_key(key):
+ replaces = apt_pkg.ParseDepends(self._sections[key])
+ return replaces
+
+ def replacesRealPkg(self, pkgname, oper, ver):
+ """
+ return True if the deb packages replaces a real (not virtual)
+ packages named pkgname, oper, ver
+ """
+ self._dbg(3, "replacesPkg() %s %s %s" % (pkgname,oper,ver))
+ pkgver = None
+ cand = self._cache[pkgname]
+ if cand.isInstalled:
+ pkgver = cand.installedVersion
+ elif cand.markedInstall:
+ pkgver = cand.candidateVersion
+ for or_group in self.getReplaces():
+ for (name, ver, oper) in or_group:
+ if (name == pkgname and
+ apt_pkg.CheckDep(pkgver,oper,ver)):
+ self._dbg(3, "we have a replaces in our package for the conflict against '%s'" % (pkgname))
+ return True
+ return False
+
+ def checkConflicts(self):
+ """ check if the pkg conflicts with a existing or to be installed
+ package. Return True if the pkg is ok """
+ res = True
+ for or_group in self.getConflicts():
+ if self._checkConflictsOrGroup(or_group):
+ #print "Conflicts with a exisiting pkg!"
+ #self._failureString = "Conflicts with a exisiting pkg!"
+ res = False
+ return res
+
+ # some constants
+ (NO_VERSION,
+ VERSION_OUTDATED,
+ VERSION_SAME,
+ VERSION_IS_NEWER) = range(4)
+
+ def compareToVersionInCache(self, useInstalled=True):
+ """ checks if the pkg is already installed or availabe in the cache
+ and if so in what version, returns if the version of the deb
+ is not available,older,same,newer
+ """
+ self._dbg(3,"compareToVersionInCache")
+ pkgname = self._sections["Package"]
+ debver = self._sections["Version"]
+ self._dbg(1,"debver: %s" % debver)
+ if self._cache.has_key(pkgname):
+ if useInstalled:
+ cachever = self._cache[pkgname].installedVersion
+ else:
+ cachever = self._cache[pkgname].candidateVersion
+ if cachever != None:
+ cmp = apt_pkg.VersionCompare(cachever,debver)
+ self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
+ if cmp == 0:
+ return self.VERSION_SAME
+ elif cmp < 0:
+ return self.VERSION_IS_NEWER
+ elif cmp > 0:
+ return self.VERSION_OUTDATED
+ return self.NO_VERSION
+
+ def checkDeb(self):
+ self._dbg(3,"checkDepends")
+
+ # check arch
+ arch = self._sections["Architecture"]
+ if arch != "all" and arch != apt_pkg.Config.Find("APT::Architecture"):
+ self._dbg(1,"ERROR: Wrong architecture dude!")
+ self._failureString = _("Wrong architecture '%s'" % arch)
+ return False
+
+ # check version
+ res = self.compareToVersionInCache()
+ if res == self.VERSION_OUTDATED: # the deb is older than the installed
+ self._failureString = _("A later version is already installed")
+ return False
+
+ # FIXME: this sort of error handling sux
+ self._failureString = ""
+
+ # check conflicts
+ if not self.checkConflicts():
+ return False
+
+ # try to satisfy the dependencies
+ res = self._satisfyDepends(self.getDepends())
+ if not res:
+ return False
+
+ # check for conflicts again (this time with the packages that are
+ # makeed for install)
+ if not self.checkConflicts():
+ return False
+
+ if self._cache._depcache.BrokenCount > 0:
+ self._failureString = _("Failed to satisfy all dependencies (broken cache)")
+ # clean the cache again
+ self._cache.clear()
+ return False
+ return True
+
+ def satisfyDependsStr(self, dependsstr):
+ return self._satisfyDepends(apt_pkg.ParseDepends(dependsstr))
+
+ def _satisfyDepends(self, depends):
+ # turn off MarkAndSweep via a action group (if available)
+ try:
+ _actiongroup = apt_pkg.GetPkgActionGroup(self._cache._depcache)
+ except AttributeError, e:
+ pass
+ # check depends
+ for or_group in depends:
+ #print "or_group: %s" % or_group
+ #print "or_group satified: %s" % self._isOrGroupSatisfied(or_group)
+ if not self._isOrGroupSatisfied(or_group):
+ if not self._satisfyOrGroup(or_group):
+ return False
+ # now try it out in the cache
+ for pkg in self._needPkgs:
+ try:
+ self._cache[pkg].markInstall(fromUser=False)
+ except SystemError, e:
+ self._failureString = _("Cannot install '%s'" % pkg)
+ self._cache.clear()
+ return False
+ return True
+
+ def missingDeps(self):
+ self._dbg(1, "Installing: %s" % self._needPkgs)
+ if self._needPkgs == None:
+ self.checkDeb()
+ return self._needPkgs
+ missingDeps = property(missingDeps)
+
+ def requiredChanges(self):
+ """ gets the required changes to satisfy the depends.
+ returns a tuple with (install, remove, unauthenticated)
+ """
+ install = []
+ remove = []
+ unauthenticated = []
+ for pkg in self._cache:
+ if pkg.markedInstall or pkg.markedUpgrade:
+ install.append(pkg.name)
+ # check authentication, one authenticated origin is enough
+ # libapt will skip non-authenticated origins then
+ authenticated = False
+ for origin in pkg.candidateOrigin:
+ authenticated |= origin.trusted
+ if not authenticated:
+ unauthenticated.append(pkg.name)
+ if pkg.markedDelete:
+ remove.append(pkg.name)
+ return (install,remove, unauthenticated)
+ requiredChanges = property(requiredChanges)
+
+ def filelist(self):
+ """ return the list of files in the deb """
+ files = []
+ def extract_cb(What,Name,Link,Mode,UID,GID,Size,MTime,Major,Minor):
+ #print "%s '%s','%s',%u,%u,%u,%u,%u,%u,%u"\
+ # % (What,Name,Link,Mode,UID,GID,Size, MTime, Major, Minor)
+ files.append(Name)
+ try:
+ try:
+ apt_inst.debExtract(open(self.file), extract_cb, "data.tar.gz")
+ except SystemError, e:
+ try:
+ apt_inst.debExtract(open(self.file), extract_cb, "data.tar.bz2")
+ except SystemError, e:
+ try:
+ apt_inst.debExtract(open(self.file), extract_cb, "data.tar.lzma")
+ except SystemError, e:
+ return [_("List of files could not be read, please report this as a bug")]
+ # IOError may happen because of gvfs madness (LP: #211822)
+ except IOError, e:
+ return [_("IOError during filelist read: %s" % e)]
+ return files
+ filelist = property(filelist)
+
+ # properties
+ def __getitem__(self,item):
+ if not self._sections.has_key(item):
+ # Translators: it's for missing entries in the deb package,
+ # e.g. a missing "Maintainer" field
+ return _("%s is not available" % item)
+ return self._sections[item]
+
+ def _dbg(self, level, msg):
+ """Write debugging output to sys.stderr.
+ """
+ if level <= self.debug:
+ print >> sys.stderr, msg
+
+ def install(self, installProgress=None):
+ """ Install the package """
+ if installProgress == None:
+ res = os.system("/usr/sbin/dpkg -i %s" % self.file)
+ else:
+ res = installProgress.run(self.file)
+ return res
+
+class DscSrcPackage(DebPackage):
+ def __init__(self, cache, file=None):
+ DebPackage.__init__(self, cache)
+ self.file = file
+ self.depends = []
+ self.conflicts = []
+ self.binaries = []
+ if file != None:
+ self.open(file)
+ def getConflicts(self):
+ return self.conflicts
+ def getDepends(self):
+ return self.depends
+ def open(self, file):
+ depends_tags = ["Build-Depends:", "Build-Depends-Indep:"]
+ conflicts_tags = ["Build-Conflicts:", "Build-Conflicts-Indep:"]
+ for line in open(file):
+ # check b-d and b-c
+ for tag in depends_tags:
+ if line.startswith(tag):
+ key = line[len(tag):].strip()
+ self.depends.extend(apt_pkg.ParseSrcDepends(key))
+ for tag in conflicts_tags:
+ if line.startswith(tag):
+ key = line[len(tag):].strip()
+ self.conflicts.extend(apt_pkg.ParseSrcDepends(key))
+ # check binary and source and version
+ if line.startswith("Source:"):
+ self.pkgName = line[len("Source:"):].strip()
+ if line.startswith("Binary:"):
+ self.binaries = [pkg.strip() for pkg in line[len("Binary:"):].split(",")]
+ if line.startswith("Version:"):
+ self._sections["Version"] = line[len("Version:"):].strip()
+ # we are at the end
+ if line.startswith("-----BEGIN PGP SIGNATURE-"):
+ break
+ s = _("Install Build-Dependencies for "
+ "source package '%s' that builds %s\n"
+ ) % (self.pkgName, " ".join(self.binaries))
+ self._sections["Description"] = s
+
+ def checkDeb(self):
+ if not self.checkConflicts():
+ for pkgname in self._installedConflicts:
+ if self._cache[pkgname]._pkg.Essential:
+ raise Exception, _("A essential package would be removed")
+ self._cache[pkgname].markDelete()
+ # FIXME: a additional run of the checkConflicts()
+ # after _satisfyDepends() should probably be done
+ return self._satisfyDepends(self.depends)
+
+if __name__ == "__main__":
+
+ cache = Cache()
+
+ vp = "www-browser"
+ print "%s virtual: %s" % (vp,cache.isVirtualPackage(vp))
+ providers = cache.getProvidingPackages(vp)
+ print "Providers for %s :" % vp
+ for pkg in providers:
+ print " %s" % pkg.name
+
+ d = DebPackage(cache, sys.argv[1])
+ print "Deb: %s" % d.pkgName
+ if not d.checkDeb():
+ print "can't be satified"
+ print d._failureString
+ print "missing deps: %s" % d.missingDeps
+ print d.requiredChanges
+
+ print "Installing ..."
+ ret = d.install(DpkgInstallProgress())
+ print ret
+
+ #s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc")
+ #s.checkDep()
+ #print "Missing deps: ",s.missingDeps
+ #print "Print required changes: ", s.requiredChanges
+
+ s = DscSrcPackage(cache)
+ d = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
+ print s._satisfyDepends(apt_pkg.ParseDepends(d))
+
+
diff --git a/apt/progress.py b/apt/progress.py
index bb1bce35..2ef100a8 100644
--- a/apt/progress.py
+++ b/apt/progress.py
@@ -214,7 +214,7 @@ class InstallProgress(DumbInstallProgress):
if pid == 0:
# child
res = pm.DoInstall(self.writefd)
- os._exit(res)
+ os.exit(res)
self.child_pid = pid
res = self.waitChild()
return res
@@ -233,6 +233,60 @@ class CdromProgress:
def changeCdrom(self):
pass
+
+class DpkgInstallProgress(InstallProgress):
+ """
+ Progress handler for a local Debian package installation
+ """
+ def run(self, debfile):
+ """
+ Start installing the given Debian package
+ """
+ self.debfile = debfile
+ self.debname = os.path.basename(debfile).split("_")[0]
+ pid = self.fork()
+ if pid == 0:
+ # child
+ res = os.system("/usr/bin/dpkg --status-fd %s -i %s" % \
+ (self.writefd, self.debfile))
+ os.exit(res)
+ self.child_pid = pid
+ res = self.waitChild()
+ return res
+
+ def updateInterface(self):
+ """
+ Process status messages from dpkg
+ """
+ if self.statusfd != None:
+ while True:
+ try:
+ self.read += os.read(self.statusfd.fileno(),1)
+ except OSError, (errno,errstr):
+ # resource temporarly unavailable is ignored
+ if errno != 11:
+ print errstr
+ break
+ if self.read.endswith("\n"):
+ statusl = string.split(self.read, ":")
+ if len(statusl) < 3:
+ print "got garbage from dpkg: '%s'" % read
+ self.read = ""
+ break
+ status = statusl[2].strip()
+ #print status
+ if status == "error":
+ self.error(self.debname, status)
+ elif status == "conffile-prompt":
+ # we get a string like this:
+ # 'current-conffile' 'new-conffile' useredited distedited
+ match = re.compile("\s*\'(.*)\'\s*\'(.*)\'.*").match(status_str)
+ if match:
+ self.conffile(match.group(1), match.group(2))
+ else:
+ self.status = status
+ self.read = ""
+
# module test code
if __name__ == "__main__":
import apt_pkg