summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apt/progress.py8
-rw-r--r--aptsources/distro.py734
-rw-r--r--aptsources/sourceslist.py762
-rw-r--r--doc/examples/acquire.py38
-rwxr-xr-xdoc/examples/build-deps.py72
-rwxr-xr-xdoc/examples/checkstate.py34
-rwxr-xr-xdoc/examples/config.py16
-rwxr-xr-xdoc/examples/configisc.py24
-rwxr-xr-xdoc/examples/dependant-pkgs.py34
-rwxr-xr-xdoc/examples/gui-inst.py48
-rw-r--r--doc/examples/inst.py36
-rwxr-xr-xdoc/examples/print_uris.py16
-rw-r--r--doc/examples/progress.py4
-rwxr-xr-xdoc/examples/recommends.py38
-rwxr-xr-xdoc/examples/records.py10
-rw-r--r--doc/examples/sources.py6
-rwxr-xr-xdoc/examples/tagfile.py4
-rwxr-xr-xdoc/examples/versiontest.py40
-rw-r--r--tests/cache.py74
-rw-r--r--tests/depcache.py76
-rw-r--r--tests/lock.py6
-rwxr-xr-xtests/memleak.py54
-rw-r--r--tests/pkgproblemresolver.py110
-rw-r--r--tests/pkgrecords.py48
-rw-r--r--tests/pkgsrcrecords.py26
-rwxr-xr-xutils/get_ubuntu_mirrors.py8
26 files changed, 1163 insertions, 1163 deletions
diff --git a/apt/progress.py b/apt/progress.py
index b50b2915..6f72197e 100644
--- a/apt/progress.py
+++ b/apt/progress.py
@@ -188,14 +188,14 @@ class InstallProgress(DumbInstallProgress):
pass
def statusChange(self, pkg, percent, status):
- " called when the status changed "
- pass
+ " called when the status changed "
+ pass
def updateInterface(self):
if self.statusfd != None:
try:
- while not self.read.endswith("\n"):
- self.read += os.read(self.statusfd.fileno(),1)
+ while not self.read.endswith("\n"):
+ self.read += os.read(self.statusfd.fileno(),1)
except OSError, (errno,errstr):
# resource temporarly unavailable is ignored
if errno != EAGAIN and errnor != EWOULDBLOCK:
diff --git a/aptsources/distro.py b/aptsources/distro.py
index faccc271..962c57bc 100644
--- a/aptsources/distro.py
+++ b/aptsources/distro.py
@@ -34,396 +34,396 @@ def _(s): return gettext.dgettext("python-apt", s)
class NoDistroTemplateException(Exception):
- pass
+ pass
class Distribution:
- def __init__(self, id, codename, description, release):
- """ Container for distribution specific informations """
- # LSB information
- self.id = id
- self.codename = codename
- self.description = description
- self.release = release
-
- self.binary_type = "deb"
- self.source_type = "deb-src"
-
- def get_sources(self, sourceslist):
- """
- Find the corresponding template, main and child sources
- for the distribution
- """
-
- self.sourceslist = sourceslist
- # corresponding sources
- self.source_template = None
- self.child_sources = []
- self.main_sources = []
- self.disabled_sources = []
- self.cdrom_sources = []
- self.download_comps = []
- self.enabled_comps = []
- self.cdrom_comps = []
- self.used_media = []
- self.get_source_code = False
- self.source_code_sources = []
-
- # location of the sources
- self.default_server = ""
- self.main_server = ""
- self.nearest_server = ""
- self.used_servers = []
-
- # find the distro template
- for template in self.sourceslist.matcher.templates:
- if self.is_codename(template.name) and\
- template.distribution == self.id:
- #print "yeah! found a template for %s" % self.description
- #print template.description, template.base_uri, template.components
- self.source_template = template
- break
- if self.source_template == None:
- raise (NoDistroTemplateException,
- "Error: could not find a distribution template")
-
- # find main and child sources
- media = []
- comps = []
- cdrom_comps = []
- enabled_comps = []
- source_code = []
- for source in self.sourceslist.list:
- if source.invalid == False and\
- self.is_codename(source.dist) and\
- source.template and\
- self.is_codename(source.template.name):
- #print "yeah! found a distro repo: %s" % source.line
- # cdroms need do be handled differently
- if source.uri.startswith("cdrom:") and \
- source.disabled == False:
- self.cdrom_sources.append(source)
- cdrom_comps.extend(source.comps)
- elif source.uri.startswith("cdrom:") and \
- source.disabled == True:
- self.cdrom_sources.append(source)
- elif source.type == self.binary_type and \
- source.disabled == False:
- self.main_sources.append(source)
- comps.extend(source.comps)
- media.append(source.uri)
- elif source.type == self.binary_type and \
- source.disabled == True:
- self.disabled_sources.append(source)
- elif source.type == self.source_type and source.disabled == False:
- self.source_code_sources.append(source)
- elif source.type == self.source_type and source.disabled == True:
- self.disabled_sources.append(source)
- if source.invalid == False and\
- source.template in self.source_template.children:
- if source.disabled == False and source.type == self.binary_type:
- self.child_sources.append(source)
- elif source.disabled == False and source.type == self.source_type:
- self.source_code_sources.append(source)
- else:
- self.disabled_sources.append(source)
- self.download_comps = set(comps)
- self.cdrom_comps = set(cdrom_comps)
- enabled_comps.extend(comps)
- enabled_comps.extend(cdrom_comps)
- self.enabled_comps = set(enabled_comps)
- self.used_media = set(media)
-
- self.get_mirrors()
-
- def get_mirrors(self, mirror_template=None):
- """
- Provide a set of mirrors where you can get the distribution from
- """
- # the main server is stored in the template
- self.main_server = self.source_template.base_uri
-
- # other used servers
- for medium in self.used_media:
- if not medium.startswith("cdrom:"):
- # seems to be a network source
- self.used_servers.append(medium)
-
- if len(self.main_sources) == 0:
- self.default_server = self.main_server
- else:
- self.default_server = self.main_sources[0].uri
-
- # get a list of country codes and real names
- self.countries = {}
- try:
- f = open("/usr/share/iso-codes/iso_3166.tab", "r")
- lines = f.readlines()
- for line in lines:
- parts = line.split("\t")
- self.countries[parts[0].lower()] = parts[1].strip()
- except:
- print "could not open file '%s'" % file
- else:
- f.close()
-
- # try to guess the nearest mirror from the locale
- self.country = None
- self.country_code = None
- locale = os.getenv("LANG", default="en.UK")
- a = locale.find("_")
- z = locale.find(".")
- if z == -1:
- z = len(locale)
- country_code = locale[a+1:z].lower()
-
- if mirror_template:
- self.nearest_server = mirror_template % country_code
-
- if self.countries.has_key(country_code):
- self.country = self.countries[country_code]
- self.country_code = country_code
-
- def _get_mirror_name(self, server):
- ''' Try to get a human readable name for the main mirror of a country
- Customize for different distributions '''
- country = None
- i = server.find("://")
- l = server.find(".archive.ubuntu.com")
- if i != -1 and l != -1:
- country = server[i+len("://"):l]
- if self.countries.has_key(country):
- # TRANSLATORS: %s is a country
- return _("Server for %s") % \
- gettext.dgettext("iso_3166",
- self.countries[country].rstrip()).rstrip()
- else:
- return("%s" % server.rstrip("/ "))
-
- def get_server_list(self):
- ''' Return a list of used and suggested servers '''
-
- def compare_mirrors(mir1, mir2):
- '''Helper function that handles comaprision of mirror urls
- that could contain trailing slashes'''
- return re.match(mir1.strip("/ "), mir2.rstrip("/ "))
-
- # Store all available servers:
- # Name, URI, active
- mirrors = []
- if len(self.used_servers) < 1 or \
- (len(self.used_servers) == 1 and \
- compare_mirrors(self.used_servers[0], self.main_server)):
- mirrors.append([_("Main server"), self.main_server, True])
- mirrors.append([self._get_mirror_name(self.nearest_server),
- self.nearest_server, False])
- elif len(self.used_servers) == 1 and not \
- compare_mirrors(self.used_servers[0], self.main_server):
- mirrors.append([_("Main server"), self.main_server, False])
- # Only one server is used
- server = self.used_servers[0]
-
- # Append the nearest server if it's not already used
- if not compare_mirrors(server, self.nearest_server):
+ def __init__(self, id, codename, description, release):
+ """ Container for distribution specific informations """
+ # LSB information
+ self.id = id
+ self.codename = codename
+ self.description = description
+ self.release = release
+
+ self.binary_type = "deb"
+ self.source_type = "deb-src"
+
+ def get_sources(self, sourceslist):
+ """
+ Find the corresponding template, main and child sources
+ for the distribution
+ """
+
+ self.sourceslist = sourceslist
+ # corresponding sources
+ self.source_template = None
+ self.child_sources = []
+ self.main_sources = []
+ self.disabled_sources = []
+ self.cdrom_sources = []
+ self.download_comps = []
+ self.enabled_comps = []
+ self.cdrom_comps = []
+ self.used_media = []
+ self.get_source_code = False
+ self.source_code_sources = []
+
+ # location of the sources
+ self.default_server = ""
+ self.main_server = ""
+ self.nearest_server = ""
+ self.used_servers = []
+
+ # find the distro template
+ for template in self.sourceslist.matcher.templates:
+ if self.is_codename(template.name) and\
+ template.distribution == self.id:
+ #print "yeah! found a template for %s" % self.description
+ #print template.description, template.base_uri, template.components
+ self.source_template = template
+ break
+ if self.source_template == None:
+ raise (NoDistroTemplateException,
+ "Error: could not find a distribution template")
+
+ # find main and child sources
+ media = []
+ comps = []
+ cdrom_comps = []
+ enabled_comps = []
+ source_code = []
+ for source in self.sourceslist.list:
+ if source.invalid == False and\
+ self.is_codename(source.dist) and\
+ source.template and\
+ self.is_codename(source.template.name):
+ #print "yeah! found a distro repo: %s" % source.line
+ # cdroms need do be handled differently
+ if source.uri.startswith("cdrom:") and \
+ source.disabled == False:
+ self.cdrom_sources.append(source)
+ cdrom_comps.extend(source.comps)
+ elif source.uri.startswith("cdrom:") and \
+ source.disabled == True:
+ self.cdrom_sources.append(source)
+ elif source.type == self.binary_type and \
+ source.disabled == False:
+ self.main_sources.append(source)
+ comps.extend(source.comps)
+ media.append(source.uri)
+ elif source.type == self.binary_type and \
+ source.disabled == True:
+ self.disabled_sources.append(source)
+ elif source.type == self.source_type and source.disabled == False:
+ self.source_code_sources.append(source)
+ elif source.type == self.source_type and source.disabled == True:
+ self.disabled_sources.append(source)
+ if source.invalid == False and\
+ source.template in self.source_template.children:
+ if source.disabled == False and source.type == self.binary_type:
+ self.child_sources.append(source)
+ elif source.disabled == False and source.type == self.source_type:
+ self.source_code_sources.append(source)
+ else:
+ self.disabled_sources.append(source)
+ self.download_comps = set(comps)
+ self.cdrom_comps = set(cdrom_comps)
+ enabled_comps.extend(comps)
+ enabled_comps.extend(cdrom_comps)
+ self.enabled_comps = set(enabled_comps)
+ self.used_media = set(media)
+
+ self.get_mirrors()
+
+ def get_mirrors(self, mirror_template=None):
+ """
+ Provide a set of mirrors where you can get the distribution from
+ """
+ # the main server is stored in the template
+ self.main_server = self.source_template.base_uri
+
+ # other used servers
+ for medium in self.used_media:
+ if not medium.startswith("cdrom:"):
+ # seems to be a network source
+ self.used_servers.append(medium)
+
+ if len(self.main_sources) == 0:
+ self.default_server = self.main_server
+ else:
+ self.default_server = self.main_sources[0].uri
+
+ # get a list of country codes and real names
+ self.countries = {}
+ try:
+ f = open("/usr/share/iso-codes/iso_3166.tab", "r")
+ lines = f.readlines()
+ for line in lines:
+ parts = line.split("\t")
+ self.countries[parts[0].lower()] = parts[1].strip()
+ except:
+ print "could not open file '%s'" % file
+ else:
+ f.close()
+
+ # try to guess the nearest mirror from the locale
+ self.country = None
+ self.country_code = None
+ locale = os.getenv("LANG", default="en.UK")
+ a = locale.find("_")
+ z = locale.find(".")
+ if z == -1:
+ z = len(locale)
+ country_code = locale[a+1:z].lower()
+
+ if mirror_template:
+ self.nearest_server = mirror_template % country_code
+
+ if self.countries.has_key(country_code):
+ self.country = self.countries[country_code]
+ self.country_code = country_code
+
+ def _get_mirror_name(self, server):
+ ''' Try to get a human readable name for the main mirror of a country
+ Customize for different distributions '''
+ country = None
+ i = server.find("://")
+ l = server.find(".archive.ubuntu.com")
+ if i != -1 and l != -1:
+ country = server[i+len("://"):l]
+ if self.countries.has_key(country):
+ # TRANSLATORS: %s is a country
+ return _("Server for %s") % \
+ gettext.dgettext("iso_3166",
+ self.countries[country].rstrip()).rstrip()
+ else:
+ return("%s" % server.rstrip("/ "))
+
+ def get_server_list(self):
+ ''' Return a list of used and suggested servers '''
+
+ def compare_mirrors(mir1, mir2):
+ ''' Helper function that handles comaprision of mirror urls
+ that could contain trailing slashes'''
+ return re.match(mir1.strip("/ "), mir2.rstrip("/ "))
+
+ # Store all available servers:
+ # Name, URI, active
+ mirrors = []
+ if len(self.used_servers) < 1 or \
+ (len(self.used_servers) == 1 and \
+ compare_mirrors(self.used_servers[0], self.main_server)):
+ mirrors.append([_("Main server"), self.main_server, True])
mirrors.append([self._get_mirror_name(self.nearest_server),
self.nearest_server, False])
- mirrors.append([self._get_mirror_name(server), server, True])
-
- elif len(self.used_servers) > 1:
- # More than one server is used. Since we don't handle this case
- # in the user interface we set "custom servers" to true and
- # append a list of all used servers
- mirrors.append([_("Main server"), self.main_server, False])
- mirrors.append([self._get_mirror_name(self.nearest_server),
- self.nearest_server, False])
- mirrors.append([_("Custom servers"), None, True])
- for server in self.used_servers:
- if compare_mirrors(server, self.nearest_server) or\
- compare_mirrors(server, self.main_server):
- continue
- elif not [self._get_mirror_name(server), server, False] in mirrors:
- mirrors.append([self._get_mirror_name(server), server, False])
-
- return mirrors
-
- def add_source(self, type=None,
+ elif len(self.used_servers) == 1 and not \
+ compare_mirrors(self.used_servers[0], self.main_server):
+ mirrors.append([_("Main server"), self.main_server, False])
+ # Only one server is used
+ server = self.used_servers[0]
+
+ # Append the nearest server if it's not already used
+ if not compare_mirrors(server, self.nearest_server):
+ mirrors.append([self._get_mirror_name(self.nearest_server),
+ self.nearest_server, False])
+ mirrors.append([self._get_mirror_name(server), server, True])
+
+ elif len(self.used_servers) > 1:
+ # More than one server is used. Since we don't handle this case
+ # in the user interface we set "custom servers" to true and
+ # append a list of all used servers
+ mirrors.append([_("Main server"), self.main_server, False])
+ mirrors.append([self._get_mirror_name(self.nearest_server),
+ self.nearest_server, False])
+ mirrors.append([_("Custom servers"), None, True])
+ for server in self.used_servers:
+ if compare_mirrors(server, self.nearest_server) or\
+ compare_mirrors(server, self.main_server):
+ continue
+ elif not [self._get_mirror_name(server), server, False] in mirrors:
+ mirrors.append([self._get_mirror_name(server), server, False])
+
+ return mirrors
+
+ def add_source(self, type=None,
uri=None, dist=None, comps=None, comment=""):
- """
- Add distribution specific sources
- """
- if uri == None:
- # FIXME: Add support for the server selector
- uri = self.default_server
- if dist == None:
- dist = self.codename
- if comps == None:
- comps = list(self.enabled_comps)
- if type == None:
- type = self.binary_type
- new_source = self.sourceslist.add(type, uri, dist, comps, comment)
- # if source code is enabled add a deb-src line after the new
- # source
- if self.get_source_code == True and type == self.binary_type:
- self.sourceslist.add(self.source_type, uri, dist, comps, comment,
- file=new_source.file,
- pos=self.sourceslist.list.index(new_source)+1)
-
- def enable_component(self, comp):
- """
- Enable a component in all main, child and source code sources
- (excluding cdrom based sources)
-
- comp: the component that should be enabled
- """
-
- def add_component_only_once(source, comps_per_dist):
"""
- Check if we already added the component to the repository, since
- a repository could be splitted into different apt lines. If not
- add the component
+ Add distribution specific sources
"""
- # if we don't that distro, just reutnr (can happen for e.g.
- # dapper-update only in deb-src
- if not comps_per_dist.has_key(source.dist):
- return
- # if we have seen this component already for this distro,
- # return (nothing to do
- if comp in comps_per_dist[source.dist]:
- return
- # add it
- source.comps.append(comp)
- comps_per_dist[source.dist].add(comp)
-
- sources = []
- sources.extend(self.main_sources)
- sources.extend(self.child_sources)
- # store what comps are enabled already per distro (where distro is
- # e.g. "dapper", "dapper-updates")
- comps_per_dist = {}
- comps_per_sdist = {}
- for s in sources:
- if s.type == self.binary_type:
- if not comps_per_dist.has_key(s.dist):
- comps_per_dist[s.dist] = set()
- map(comps_per_dist[s.dist].add, s.comps)
- for s in self.source_code_sources:
- if s.type == self.source_type:
- if not comps_per_sdist.has_key(s.dist):
- comps_per_sdist[s.dist] = set()
- map(comps_per_sdist[s.dist].add, s.comps)
-
- # check if there is a main source at all
- if len(self.main_sources) < 1:
- # create a new main source
- self.add_source(comps=["%s"%comp])
- else:
- # add the comp to all main, child and source code sources
- for source in sources:
- add_component_only_once(source, comps_per_dist)
+ if uri == None:
+ # FIXME: Add support for the server selector
+ uri = self.default_server
+ if dist == None:
+ dist = self.codename
+ if comps == None:
+ comps = list(self.enabled_comps)
+ if type == None:
+ type = self.binary_type
+ new_source = self.sourceslist.add(type, uri, dist, comps, comment)
+ # if source code is enabled add a deb-src line after the new
+ # source
+ if self.get_source_code == True and type == self.binary_type:
+ self.sourceslist.add(self.source_type, uri, dist, comps, comment,
+ file=new_source.file,
+ pos=self.sourceslist.list.index(new_source)+1)
+
+ def enable_component(self, comp):
+ """
+ Enable a component in all main, child and source code sources
+ (excluding cdrom based sources)
- # check if there is a main source code source at all
- if self.get_source_code == True:
- if len(self.source_code_sources) < 1:
- # create a new main source
- self.add_source(type=self.source_type, comps=["%s"%comp])
- else:
- # add the comp to all main, child and source code sources
- for source in self.source_code_sources:
- add_component_only_once(source, comps_per_sdist)
-
- def disable_component(self, comp):
- """
- Disable a component in all main, child and source code sources
- (excluding cdrom based sources)
- """
- sources = []
- sources.extend(self.main_sources)
- sources.extend(self.child_sources)
- sources.extend(self.source_code_sources)
- if comp in self.cdrom_comps:
+ comp: the component that should be enabled
+ """
+
+ def add_component_only_once(source, comps_per_dist):
+ """
+ Check if we already added the component to the repository, since
+ a repository could be splitted into different apt lines. If not
+ add the component
+ """
+ # if we don't that distro, just reutnr (can happen for e.g.
+ # dapper-update only in deb-src
+ if not comps_per_dist.has_key(source.dist):
+ return
+ # if we have seen this component already for this distro,
+ # return (nothing to do
+ if comp in comps_per_dist[source.dist]:
+ return
+ # add it
+ source.comps.append(comp)
+ comps_per_dist[source.dist].add(comp)
+
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ # store what comps are enabled already per distro (where distro is
+ # e.g. "dapper", "dapper-updates")
+ comps_per_dist = {}
+ comps_per_sdist = {}
+ for s in sources:
+ if s.type == self.binary_type:
+ if not comps_per_dist.has_key(s.dist):
+ comps_per_dist[s.dist] = set()
+ map(comps_per_dist[s.dist].add, s.comps)
+ for s in self.source_code_sources:
+ if s.type == self.source_type:
+ if not comps_per_sdist.has_key(s.dist):
+ comps_per_sdist[s.dist] = set()
+ map(comps_per_sdist[s.dist].add, s.comps)
+
+ # check if there is a main source at all
+ if len(self.main_sources) < 1:
+ # create a new main source
+ self.add_source(comps=["%s"%comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in sources:
+ add_component_only_once(source, comps_per_dist)
+
+ # check if there is a main source code source at all
+ if self.get_source_code == True:
+ if len(self.source_code_sources) < 1:
+ # create a new main source
+ self.add_source(type=self.source_type, comps=["%s"%comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ def disable_component(self, comp):
+ """
+ Disable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+ """
sources = []
sources.extend(self.main_sources)
- for source in sources:
- if comp in source.comps:
- source.comps.remove(comp)
+ sources.extend(self.child_sources)
+ sources.extend(self.source_code_sources)
+ if comp in self.cdrom_comps:
+ sources = []
+ sources.extend(self.main_sources)
+ for source in sources:
+ if comp in source.comps:
+ source.comps.remove(comp)
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ def change_server(self, uri):
+ ''' Change the server of all distro specific sources to
+ a given host '''
+
+ def change_server_of_source(source, uri, seen):
+ # Avoid creating duplicate entries
+ source.uri = uri
+ for comp in source.comps:
+ if [source.uri, source.dist, comp] in seen:
+ source.comps.remove(comp)
+ else:
+ seen.append([source.uri, source.dist, comp])
if len(source.comps) < 1:
- self.sourceslist.remove(source)
-
- def change_server(self, uri):
- ''' Change the server of all distro specific sources to
- a given host '''
+ self.sourceslist.remove(source)
- def change_server_of_source(source, uri, seen):
- # Avoid creating duplicate entries
- source.uri = uri
- for comp in source.comps:
- if [source.uri, source.dist, comp] in seen:
- source.comps.remove(comp)
- else:
- seen.append([source.uri, source.dist, comp])
- if len(source.comps) < 1:
- self.sourceslist.remove(source)
-
- seen_binary = []
- seen_source = []
- self.default_server = uri
- for source in self.main_sources:
- change_server_of_source(source, uri, seen_binary)
- for source in self.child_sources:
- # Do not change the forces server of a child source
- if source.template.base_uri == None or \
- source.template.base_uri != source.uri:
+ seen_binary = []
+ seen_source = []
+ self.default_server = uri
+ for source in self.main_sources:
change_server_of_source(source, uri, seen_binary)
- for source in self.source_code_sources:
- change_server_of_source(source, uri, seen_source)
-
- def is_codename(self, name):
- ''' Compare a given name with the release codename. '''
- if name == self.codename:
- return True
- else:
- return False
+ for source in self.child_sources:
+ # Do not change the forces server of a child source
+ if source.template.base_uri == None or \
+ source.template.base_uri != source.uri:
+ change_server_of_source(source, uri, seen_binary)
+ for source in self.source_code_sources:
+ change_server_of_source(source, uri, seen_source)
+
+ def is_codename(self, name):
+ ''' Compare a given name with the release codename. '''
+ if name == self.codename:
+ return True
+ else:
+ return False
class DebianDistribution(Distribution):
- ''' Class to support specific Debian features '''
+ ''' Class to support specific Debian features '''
- def is_codename(self, name):
- ''' Compare a given name with the release codename and check if
- if it can be used as a synonym for a development releases '''
- if name == self.codename or self.release in ("testing", "unstable"):
- return True
- else:
- return False
-
- def _get_mirror_name(self, server):
- ''' Try to get a human readable name for the main mirror of a country
- Debian specific '''
- country = None
- i = server.find("://ftp.")
- l = server.find(".debian.org")
- if i != -1 and l != -1:
- country = server[i+len("://ftp."):l]
- if self.countries.has_key(country):
- # TRANSLATORS: %s is a country
- return _("Server for %s") % \
- gettext.dgettext("iso_3166",
- self.countries[country].rstrip()).rstrip()
- else:
- return("%s" % server.rstrip("/ "))
-
- def get_mirrors(self):
- Distribution.get_mirrors(self,
- mirror_template="http://ftp.%s.debian.org/debian/")
+ def is_codename(self, name):
+ ''' Compare a given name with the release codename and check if
+ if it can be used as a synonym for a development releases '''
+ if name == self.codename or self.release in ("testing", "unstable"):
+ return True
+ else:
+ return False
+
+ def _get_mirror_name(self, server):
+ ''' Try to get a human readable name for the main mirror of a country
+ Debian specific '''
+ country = None
+ i = server.find("://ftp.")
+ l = server.find(".debian.org")
+ if i != -1 and l != -1:
+ country = server[i+len("://ftp."):l]
+ if self.countries.has_key(country):
+ # TRANSLATORS: %s is a country
+ return _("Server for %s") % \
+ gettext.dgettext("iso_3166",
+ self.countries[country].rstrip()).rstrip()
+ else:
+ return("%s" % server.rstrip("/ "))
+
+ def get_mirrors(self):
+ Distribution.get_mirrors(self,
+ mirror_template="http://ftp.%s.debian.org/debian/")
class UbuntuDistribution(Distribution):
- ''' Class to support specific Ubuntu features '''
+ ''' Class to support specific Ubuntu features '''
- def get_mirrors(self):
- Distribution.get_mirrors(self,
- mirror_template="http://%s.archive.ubuntu.com/ubuntu/")
+ def get_mirrors(self):
+ Distribution.get_mirrors(self,
+ mirror_template="http://%s.archive.ubuntu.com/ubuntu/")
def get_distro():
diff --git a/aptsources/sourceslist.py b/aptsources/sourceslist.py
index a8a772a0..57be2a68 100644
--- a/aptsources/sourceslist.py
+++ b/aptsources/sourceslist.py
@@ -41,409 +41,409 @@ from distinfo import DistInfo
def is_mirror(master_uri, compare_uri):
- """check if the given add_url is idential or a mirror of orig_uri
- e.g. master_uri = archive.ubuntu.com
- compare_uri = de.archive.ubuntu.com
- -> True
- """
- # remove traling spaces and "/"
- compare_uri = compare_uri.rstrip("/ ")
- master_uri = master_uri.rstrip("/ ")
- # uri is identical
- if compare_uri == master_uri:
- #print "Identical"
- return True
- # add uri is a master site and orig_uri has the from "XX.mastersite"
- # (e.g. de.archive.ubuntu.com)
- try:
- compare_srv = compare_uri.split("//")[1]
- master_srv = master_uri.split("//")[1]
- #print "%s == %s " % (add_srv, orig_srv)
- except IndexError: # ok, somethings wrong here
- #print "IndexError"
+ """ check if the given add_url is idential or a mirror of orig_uri
+ e.g. master_uri = archive.ubuntu.com
+ compare_uri = de.archive.ubuntu.com
+ -> True
+ """
+ # remove traling spaces and "/"
+ compare_uri = compare_uri.rstrip("/ ")
+ master_uri = master_uri.rstrip("/ ")
+ # uri is identical
+ if compare_uri == master_uri:
+ #print "Identical"
+ return True
+ # add uri is a master site and orig_uri has the from "XX.mastersite"
+ # (e.g. de.archive.ubuntu.com)
+ try:
+ compare_srv = compare_uri.split("//")[1]
+ master_srv = master_uri.split("//")[1]
+ #print "%s == %s " % (add_srv, orig_srv)
+ except IndexError: # ok, somethings wrong here
+ #print "IndexError"
+ return False
+ # remove the leading "<country>." (if any) and see if that helps
+ if "." in compare_srv and \
+ compare_srv[compare_srv.index(".")+1:] == master_srv:
+ #print "Mirror"
+ return True
return False
- # remove the leading "<country>." (if any) and see if that helps
- if "." in compare_srv and \
- compare_srv[compare_srv.index(".")+1:] == master_srv:
- #print "Mirror"
- return True
- return False
def uniq(s):
- """ simple and efficient way to return uniq list """
- return list(set(s))
+ """ simple and efficient way to return uniq list """
+ return list(set(s))
class SourceEntry:
- """ single sources.list entry """
-
- def __init__(self, line,file=None):
- self.invalid = False # is the source entry valid
- self.disabled = False # is it disabled ('#' in front)
- self.type = "" # what type (deb, deb-src)
- self.uri = "" # base-uri
- self.dist = "" # distribution (dapper, edgy, etc)
- self.comps = [] # list of available componetns (may empty)
- self.comment = "" # (optional) comment
- self.line = line # the original sources.list line
- if file == None:
- file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist")
- self.file = file # the file that the entry is located in
- self.parse(line)
- self.template = None # type DistInfo.Suite
- self.children = []
-
- def __eq__(self, other):
- """ equal operator for two sources.list entries """
- return (self.disabled == other.disabled and
- self.type == other.type and
- self.uri == other.uri and
- self.dist == other.dist and
- self.comps == other.comps)
-
- def mysplit(self, line):
- """ a split() implementation that understands the sources.list
- format better and takes [] into account (for e.g. cdroms) """
- line = string.strip(line)
- pieces = []
- tmp = ""
- # we are inside a [..] block
- p_found = False
- space_found = False
- for i in range(len(line)):
- if line[i] == "[":
- p_found=True
- tmp += line[i]
- elif line[i] == "]":
- p_found=False
- tmp += line[i]
- elif space_found and not line[i].isspace(): # we skip one or more space
+ """ single sources.list entry """
+
+ def __init__(self, line,file=None):
+ self.invalid = False # is the source entry valid
+ self.disabled = False # is it disabled ('#' in front)
+ self.type = "" # what type (deb, deb-src)
+ self.uri = "" # base-uri
+ self.dist = "" # distribution (dapper, edgy, etc)
+ self.comps = [] # list of available components (may be empty)
+ self.comment = "" # (optional) comment
+ self.line = line # the original sources.list line
+ if file == None:
+ file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist")
+ self.file = file # the file that the entry is located in
+ self.parse(line)
+ self.template = None # type DistInfo.Suite
+ self.children = []
+
+ def __eq__(self, other):
+ """ equal operator for two sources.list entries """
+ return (self.disabled == other.disabled and
+ self.type == other.type and
+ self.uri == other.uri and
+ self.dist == other.dist and
+ self.comps == other.comps)
+
+ def mysplit(self, line):
+ """ a split() implementation that understands the sources.list
+ format better and takes [] into account (for e.g. cdroms) """
+ line = string.strip(line)
+ pieces = []
+ tmp = ""
+ # we are inside a [..] block
+ p_found = False
space_found = False
- pieces.append(tmp)
- tmp = line[i]
- elif line[i].isspace() and not p_found: # found a whitespace
- space_found = True
- else:
- tmp += line[i]
- # append last piece
- if len(tmp) > 0:
- pieces.append(tmp)
- return pieces
-
- def parse(self,line):
- """ parse a given sources.list (textual) line and break it up
- into the field we have """
- line = string.strip(self.line)
- #print line
- # check if the source is enabled/disabled
- if line == "" or line == "#": # empty line
- self.invalid = True
- return
- if line[0] == "#":
- self.disabled = True
- pieces = string.split(line[1:])
- # if it looks not like a disabled deb line return
- if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"):
- self.invalid = True
- return
- else:
- line = line[1:]
- # check for another "#" in the line (this is treated as a comment)
- i = line.find("#")
- if i > 0:
- self.comment = line[i+1:]
- line = line[:i]
- # source is ok, split it and see what we have
- pieces = self.mysplit(line)
- # Sanity check
- if len(pieces) < 3:
- self.invalid = True
- return
- # Type, deb or deb-src
- self.type = string.strip(pieces[0])
- # Sanity check
- if self.type not in ("deb", "deb-src", "rpm", "rpm-src"):
- self.invalid = True
- return
- # URI
- self.uri = string.strip(pieces[1])
- if len(self.uri) < 1:
- self.invalid = True
- # distro and components (optional)
- # Directory or distro
- self.dist = string.strip(pieces[2])
- if len(pieces) > 3:
- # List of components
- self.comps = pieces[3:]
- else:
- self.comps = []
-
- def set_enabled(self, new_value):
- """ set a line to enabled or disabled """
- self.disabled = not new_value
- # enable, remove all "#" from the start of the line
- if new_value == True:
- i=0
- self.line = string.lstrip(self.line)
- while self.line[i] == "#":
- i += 1
- self.line = self.line[i:]
- else:
- # disabled, add a "#"
- if string.strip(self.line)[0] != "#":
- self.line = "#" + self.line
-
- def __str__(self):
- """ debug helper """
- return self.str().strip()
-
- def str(self):
- """ return the current line as string """
- if self.invalid:
- return self.line
- line = ""
- if self.disabled:
- line = "# "
- line += "%s %s %s" % (self.type, self.uri, self.dist)
- if len(self.comps) > 0:
- line += " " + " ".join(self.comps)
- if self.comment != "":
- line += " #"+self.comment
- line += "\n"
- return line
+ for i in range(len(line)):
+ if line[i] == "[":
+ p_found=True
+ tmp += line[i]
+ elif line[i] == "]":
+ p_found=False
+ tmp += line[i]
+ elif space_found and not line[i].isspace(): # we skip one or more space
+ space_found = False
+ pieces.append(tmp)
+ tmp = line[i]
+ elif line[i].isspace() and not p_found: # found a whitespace
+ space_found = True
+ else:
+ tmp += line[i]
+ # append last piece
+ if len(tmp) > 0:
+ pieces.append(tmp)
+ return pieces
+
+ def parse(self,line):
+ """ parse a given sources.list (textual) line and break it up
+ into the fields we have """
+ line = string.strip(self.line)
+ #print line
+ # check if the source is enabled/disabled
+ if line == "" or line == "#": # empty line
+ self.invalid = True
+ return
+ if line[0] == "#":
+ self.disabled = True
+ pieces = string.split(line[1:])
+ # if it looks not like a disabled deb line return
+ if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"):
+ self.invalid = True
+ return
+ else:
+ line = line[1:]
+ # check for another "#" in the line (this is treated as a comment)
+ i = line.find("#")
+ if i > 0:
+ self.comment = line[i+1:]
+ line = line[:i]
+ # source is ok, split it and see what we have
+ pieces = self.mysplit(line)
+ # Sanity check
+ if len(pieces) < 3:
+ self.invalid = True
+ return
+ # Type, deb or deb-src
+ self.type = string.strip(pieces[0])
+ # Sanity check
+ if self.type not in ("deb", "deb-src", "rpm", "rpm-src"):
+ self.invalid = True
+ return
+ # URI
+ self.uri = string.strip(pieces[1])
+ if len(self.uri) < 1:
+ self.invalid = True
+ # distro and components (optional)
+ # Directory or distro
+ self.dist = string.strip(pieces[2])
+ if len(pieces) > 3:
+ # List of components
+ self.comps = pieces[3:]
+ else:
+ self.comps = []
+
+ def set_enabled(self, new_value):
+ """ set a line to enabled or disabled """
+ self.disabled = not new_value
+ # enable, remove all "#" from the start of the line
+ if new_value == True:
+ i=0
+ self.line = string.lstrip(self.line)
+ while self.line[i] == "#":
+ i += 1
+ self.line = self.line[i:]
+ else:
+ # disabled, add a "#"
+ if string.strip(self.line)[0] != "#":
+ self.line = "#" + self.line
+
+ def __str__(self):
+ """ debug helper """
+ return self.str().strip()
+
+ def str(self):
+ """ return the current line as string """
+ if self.invalid:
+ return self.line
+ line = ""
+ if self.disabled:
+ line = "# "
+ line += "%s %s %s" % (self.type, self.uri, self.dist)
+ if len(self.comps) > 0:
+ line += " " + " ".join(self.comps)
+ if self.comment != "":
+ line += " #"+self.comment
+ line += "\n"
+ return line
class NullMatcher(object):
- """ a Matcher that does nothing """
+ """ a Matcher that does nothing """
- def match(self, s):
- return True
+ def match(self, s):
+ return True
class SourcesList:
- """ represents the full sources.list + sources.list.d file """
-
- def __init__(self,
- withMatcher=True,
- matcherPath="/usr/share/python-apt/templates/"):
- self.list = [] # the actual SourceEntries Type
- if withMatcher:
- self.matcher = SourceEntryMatcher(matcherPath)
- else:
- self.matcher = NullMatcher()
- self.refresh()
-
- def refresh(self):
- """ update the list of known entries """
- self.list = []
- # read sources.list
- dir = apt_pkg.Config.FindDir("Dir::Etc")
- file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
- self.load(dir+file)
- # read sources.list.d
- partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
- for file in glob.glob("%s/*.list" % partsdir):
- self.load(file)
- # check if the source item fits a predefined template
- for source in self.list:
- if source.invalid == False:
- self.matcher.match(source)
-
- def __iter__(self):
- """ simple iterator to go over self.list, returns SourceEntry
- types """
- for entry in self.list:
- yield entry
- raise StopIteration
-
- def add(self, type, uri, dist, orig_comps, comment="", pos=-1, file=None):
- """
- Add a new source to the sources.list.
- The method will search for existing matching repos and will try to
- reuse them as far as possible
- """
- # create a working copy of the component list so that
- # we can modify it later
- comps = orig_comps[:]
- # check if we have this source already in the sources.list
- for source in self.list:
- if source.disabled == False and source.invalid == False and \
- source.type == type and uri == source.uri and \
- source.dist == dist:
- for new_comp in comps:
- if new_comp in source.comps:
- # we have this component already, delete it from the new_comps
- # list
- del comps[comps.index(new_comp)]
- if len(comps) == 0:
- return source
- for source in self.list:
- # if there is a repo with the same (type, uri, dist) just add the
- # components
- if source.disabled == False and source.invalid == False and \
- source.type == type and uri == source.uri and \
- source.dist == dist:
- comps = uniq(source.comps + comps)
- source.comps = comps
- return source
- # if there is a corresponding repo which is disabled, enable it
- elif source.disabled == True and source.invalid == False and \
- source.type == type and uri == source.uri and \
- source.dist == dist and \
- len(set(source.comps) & set(comps)) == len(comps):
- source.disabled = False
- return source
- # there isn't any matching source, so create a new line and parse it
- line = "%s %s %s" % (type,uri,dist)
- for c in comps:
- line = line + " " + c;
- if comment != "":
- line = "%s #%s\n" %(line,comment)
- line = line + "\n"
- new_entry = SourceEntry(line)
- if file != None:
- new_entry.file = file
- self.matcher.match(new_entry)
- self.list.insert(pos, new_entry)
- return new_entry
-
- def remove(self, source_entry):
- """ remove the specified entry from the sources.list """
- self.list.remove(source_entry)
-
- def restoreBackup(self, backup_ext):
- " restore sources.list files based on the backup extension "
- dir = apt_pkg.Config.FindDir("Dir::Etc")
- file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
- if os.path.exists(dir+file+backup_ext) and \
- os.path.exists(dir+file):
- shutil.copy(dir+file+backup_ext,dir+file)
- # now sources.list.d
- partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
- for file in glob.glob("%s/*.list" % partsdir):
- if os.path.exists(file+backup_ext):
- shutil.copy(file+backup_ext,file)
-
- def backup(self, backup_ext=None):
- """ make a backup of the current source files, if no backup extension
- is given, the current date/time is used (and returned) """
- already_backuped = set()
- if backup_ext == None:
- backup_ext = time.strftime("%y%m%d.%H%M")
- for source in self.list:
- if not source.file in already_backuped and os.path.exists(source.file):
- shutil.copy(source.file,"%s%s" % (source.file,backup_ext))
- return backup_ext
-
- def load(self,file):
- """ (re)load the current sources """
- try:
- f = open(file, "r")
- lines = f.readlines()
- for line in lines:
- source = SourceEntry(line,file)
- self.list.append(source)
- except:
- print "could not open file '%s'" % file
- else:
- f.close()
-
- def save(self):
- """ save the current sources """
- files = {}
- # write an empty default config file if there aren't any sources
- if len(self.list) == 0:
- path = "%s%s" % (apt_pkg.Config.FindDir("Dir::Etc"),
- apt_pkg.Config.Find("Dir::Etc::sourcelist"))
- header = ("## See sources.list(5) for more information, especialy\n"
- "# Remember that you can only use http, ftp or file URIs\n"
- "# CDROMs are managed through the apt-cdrom tool.\n")
- open(path,"w").write(header)
- return
- for source in self.list:
- if not files.has_key(source.file):
- files[source.file]=open(source.file,"w")
- files[source.file].write(source.str())
- for f in files:
- files[f].close()
-
- def check_for_relations(self, sources_list):
- """get all parent and child channels in the sources list"""
- parents = []
- used_child_templates = {}
- for source in sources_list:
- # try to avoid checking uninterressting sources
- if source.template == None:
- continue
- # set up a dict with all used child templates and corresponding
- # source entries
- if source.template.child == True:
- key = source.template
- if not used_child_templates.has_key(key):
- used_child_templates[key] = []
- temp = used_child_templates[key]
- temp.append(source)
- else:
- # store each source with children aka. a parent :)
- if len(source.template.children) > 0:
- parents.append(source)
- #print self.used_child_templates
- #print self.parents
- return (parents, used_child_templates)
+ """ represents the full sources.list + sources.list.d file """
+
+ def __init__(self,
+ withMatcher=True,
+ matcherPath="/usr/share/python-apt/templates/"):
+ self.list = [] # the actual SourceEntries Type
+ if withMatcher:
+ self.matcher = SourceEntryMatcher(matcherPath)
+ else:
+ self.matcher = NullMatcher()
+ self.refresh()
+
+ def refresh(self):
+ """ update the list of known entries """
+ self.list = []
+ # read sources.list
+ dir = apt_pkg.Config.FindDir("Dir::Etc")
+ file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
+ self.load(dir+file)
+ # read sources.list.d
+ partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
+ for file in glob.glob("%s/*.list" % partsdir):
+ self.load(file)
+ # check if the source item fits a predefined template
+ for source in self.list:
+ if source.invalid == False:
+ self.matcher.match(source)
+
+ def __iter__(self):
+ """ simple iterator to go over self.list, returns SourceEntry
+ types """
+ for entry in self.list:
+ yield entry
+ raise StopIteration
+
+ def add(self, type, uri, dist, orig_comps, comment="", pos=-1, file=None):
+ """
+ Add a new source to the sources.list.
+ The method will search for existing matching repos and will try to
+ reuse them as far as possible
+ """
+ # create a working copy of the component list so that
+ # we can modify it later
+ comps = orig_comps[:]
+ # check if we have this source already in the sources.list
+ for source in self.list:
+ if source.disabled == False and source.invalid == False and \
+ source.type == type and uri == source.uri and \
+ source.dist == dist:
+ for new_comp in comps:
+ if new_comp in source.comps:
+ # we have this component already, delete it from the new_comps
+ # list
+ del comps[comps.index(new_comp)]
+ if len(comps) == 0:
+ return source
+ for source in self.list:
+ # if there is a repo with the same (type, uri, dist) just add the
+ # components
+ if source.disabled == False and source.invalid == False and \
+ source.type == type and uri == source.uri and \
+ source.dist == dist:
+ comps = uniq(source.comps + comps)
+ source.comps = comps
+ return source
+ # if there is a corresponding repo which is disabled, enable it
+ elif source.disabled == True and source.invalid == False and \
+ source.type == type and uri == source.uri and \
+ source.dist == dist and \
+ len(set(source.comps) & set(comps)) == len(comps):
+ source.disabled = False
+ return source
+ # there isn't any matching source, so create a new line and parse it
+ line = "%s %s %s" % (type,uri,dist)
+ for c in comps:
+ line = line + " " + c;
+ if comment != "":
+ line = "%s #%s\n" %(line,comment)
+ line = line + "\n"
+ new_entry = SourceEntry(line)
+ if file != None:
+ new_entry.file = file
+ self.matcher.match(new_entry)
+ self.list.insert(pos, new_entry)
+ return new_entry
+
+ def remove(self, source_entry):
+ """ remove the specified entry from the sources.list """
+ self.list.remove(source_entry)
+
+ def restoreBackup(self, backup_ext):
+ " restore sources.list files based on the backup extension "
+ dir = apt_pkg.Config.FindDir("Dir::Etc")
+ file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
+ if os.path.exists(dir+file+backup_ext) and \
+ os.path.exists(dir+file):
+ shutil.copy(dir+file+backup_ext,dir+file)
+ # now sources.list.d
+ partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
+ for file in glob.glob("%s/*.list" % partsdir):
+ if os.path.exists(file+backup_ext):
+ shutil.copy(file+backup_ext,file)
+
+ def backup(self, backup_ext=None):
+ """ make a backup of the current source files, if no backup extension
+ is given, the current date/time is used (and returned) """
+ already_backuped = set()
+ if backup_ext == None:
+ backup_ext = time.strftime("%y%m%d.%H%M")
+ for source in self.list:
+ if not source.file in already_backuped and os.path.exists(source.file):
+ shutil.copy(source.file,"%s%s" % (source.file,backup_ext))
+ return backup_ext
+
+ def load(self,file):
+ """ (re)load the current sources """
+ try:
+ f = open(file, "r")
+ lines = f.readlines()
+ for line in lines:
+ source = SourceEntry(line,file)
+ self.list.append(source)
+ except:
+ print "could not open file '%s'" % file
+ else:
+ f.close()
+
+ def save(self):
+ """ save the current sources """
+ files = {}
+ # write an empty default config file if there aren't any sources
+ if len(self.list) == 0:
+ path = "%s%s" % (apt_pkg.Config.FindDir("Dir::Etc"),
+ apt_pkg.Config.Find("Dir::Etc::sourcelist"))
+ header = ("## See sources.list(5) for more information, especialy\n"
+ "# Remember that you can only use http, ftp or file URIs\n"
+ "# CDROMs are managed through the apt-cdrom tool.\n")
+ open(path,"w").write(header)
+ return
+ for source in self.list:
+ if not files.has_key(source.file):
+ files[source.file]=open(source.file,"w")
+ files[source.file].write(source.str())
+ for f in files:
+ files[f].close()
+
+ def check_for_relations(self, sources_list):
+ """get all parent and child channels in the sources list"""
+ parents = []
+ used_child_templates = {}
+ for source in sources_list:
+ # try to avoid checking uninteresting sources
+ if source.template == None:
+ continue
+ # set up a dict with all used child templates and corresponding
+ # source entries
+ if source.template.child == True:
+ key = source.template
+ if not used_child_templates.has_key(key):
+ used_child_templates[key] = []
+ temp = used_child_templates[key]
+ temp.append(source)
+ else:
+ # store each source with children aka. a parent :)
+ if len(source.template.children) > 0:
+ parents.append(source)
+ #print self.used_child_templates
+ #print self.parents
+ return (parents, used_child_templates)
class SourceEntryMatcher:
- """ matcher class to make a source entry look nice
- lots of predefined matchers to make it i18n/gettext friendly
- """
-
- def __init__(self, matcherPath):
- self.templates = []
- # Get the human readable channel and comp names from the channel .infos
- spec_files = glob.glob("%s/*.info" % matcherPath)
- for f in spec_files:
- f = os.path.basename(f)
- i = f.find(".info")
- f = f[0:i]
- dist = DistInfo(f,base_dir=matcherPath)
- for template in dist.templates:
- if template.match_uri != None:
- self.templates.append(template)
- return
-
- def match(self, source):
- """Add a matching template to the source"""
- _ = gettext.gettext
- found = False
- for template in self.templates:
- if (re.search(template.match_uri, source.uri) and
- re.match(template.match_name, source.dist)):
- found = True
- source.template = template
- break
- elif (template.is_mirror(source.uri) and
- re.match(template.match_name, source.dist)):
- found = True
- source.template = template
- break
- return found
+ """ matcher class to make a source entry look nice
+ lots of predefined matchers to make it i18n/gettext friendly
+ """
+
+ def __init__(self, matcherPath):
+ self.templates = []
+ # Get the human readable channel and comp names from the channel .infos
+ spec_files = glob.glob("%s/*.info" % matcherPath)
+ for f in spec_files:
+ f = os.path.basename(f)
+ i = f.find(".info")
+ f = f[0:i]
+ dist = DistInfo(f,base_dir=matcherPath)
+ for template in dist.templates:
+ if template.match_uri != None:
+ self.templates.append(template)
+ return
+
+ def match(self, source):
+ """Add a matching template to the source"""
+ _ = gettext.gettext
+ found = False
+ for template in self.templates:
+ if (re.search(template.match_uri, source.uri) and
+ re.match(template.match_name, source.dist)):
+ found = True
+ source.template = template
+ break
+ elif (template.is_mirror(source.uri) and
+ re.match(template.match_name, source.dist)):
+ found = True
+ source.template = template
+ break
+ return found
# some simple tests
if __name__ == "__main__":
- apt_pkg.InitConfig()
- sources = SourcesList()
+ apt_pkg.InitConfig()
+ sources = SourcesList()
- for entry in sources:
- print entry.str()
- #print entry.uri
+ for entry in sources:
+ print entry.str()
+ #print entry.uri
- mirror = is_mirror("http://archive.ubuntu.com/ubuntu/",
- "http://de.archive.ubuntu.com/ubuntu/")
- print "is_mirror(): %s" % mirror
+ mirror = is_mirror("http://archive.ubuntu.com/ubuntu/",
+ "http://de.archive.ubuntu.com/ubuntu/")
+ print "is_mirror(): %s" % mirror
- print is_mirror("http://archive.ubuntu.com/ubuntu",
- "http://de.archive.ubuntu.com/ubuntu/")
- print is_mirror("http://archive.ubuntu.com/ubuntu/",
- "http://de.archive.ubuntu.com/ubuntu")
+ print is_mirror("http://archive.ubuntu.com/ubuntu",
+ "http://de.archive.ubuntu.com/ubuntu/")
+ print is_mirror("http://archive.ubuntu.com/ubuntu/",
+ "http://de.archive.ubuntu.com/ubuntu")
diff --git a/doc/examples/acquire.py b/doc/examples/acquire.py
index 07f8da0e..7bf7d646 100644
--- a/doc/examples/acquire.py
+++ b/doc/examples/acquire.py
@@ -6,25 +6,25 @@ import tempfile
def get_file(fetcher, uri, destFile):
- cwd = os.getcwd()
- # create a temp dir
- dir = tempfile.mkdtemp()
- os.chdir(dir)
- # get the file
- af = apt_pkg.GetPkgAcqFile(fetcher,
- uri=uri,
- descr="sample descr")
- res = fetcher.Run()
- if res != fetcher.ResultContinue:
- os.rmdir(dir)
- os.chdir(cwd)
- return False
- filename = os.path.basename(uri)
- os.rename(dir+"/"+filename,destFile)
- # cleanup
- os.rmdir(dir)
- os.chdir(cwd)
- return True
+ cwd = os.getcwd()
+ # create a temp dir
+ dir = tempfile.mkdtemp()
+ os.chdir(dir)
+ # get the file
+ af = apt_pkg.GetPkgAcqFile(fetcher,
+ uri=uri,
+ descr="sample descr")
+ res = fetcher.Run()
+ if res != fetcher.ResultContinue:
+ os.rmdir(dir)
+ os.chdir(cwd)
+ return False
+ filename = os.path.basename(uri)
+ os.rename(dir+"/"+filename,destFile)
+ # cleanup
+ os.rmdir(dir)
+ os.chdir(cwd)
+ return True
apt_pkg.init()
diff --git a/doc/examples/build-deps.py b/doc/examples/build-deps.py
index 2d83a54f..dc1a6f4e 100755
--- a/doc/examples/build-deps.py
+++ b/doc/examples/build-deps.py
@@ -7,17 +7,17 @@ import sets # only needed for python2.3, python2.4 supports this naively
def get_source_pkg(pkg, records, depcache):
- """ get the source package name of a given package """
- version = depcache.GetCandidateVer(pkg)
- if not version:
- return None
- file, index = version.FileList.pop(0)
- records.Lookup((file, index))
- if records.SourcePkg != "":
- srcpkg = records.SourcePkg
- else:
- srcpkg = pkg.Name
- return srcpkg
+ """ get the source package name of a given package """
+ version = depcache.GetCandidateVer(pkg)
+ if not version:
+ return None
+ file, index = version.FileList.pop(0)
+ records.Lookup((file, index))
+ if records.SourcePkg != "":
+ srcpkg = records.SourcePkg
+ else:
+ srcpkg = pkg.Name
+ return srcpkg
# main
@@ -30,45 +30,45 @@ srcrecords = apt_pkg.GetPkgSrcRecords()
# base package that we use for build-depends calculation
if len(sys.argv) < 2:
- print "need a package name as argument"
- sys.exit(1)
+ print "need a package name as argument"
+ sys.exit(1)
try:
- base = cache[sys.argv[1]]
+ base = cache[sys.argv[1]]
except KeyError:
- print "No package %s found" % sys.argv[1]
- sys.exit(1)
+ print "No package %s found" % sys.argv[1]
+ sys.exit(1)
all_build_depends = sets.Set()
# get the build depdends for the package itself
srcpkg_name = get_source_pkg(base, records, depcache)
print "srcpkg_name: %s " % srcpkg_name
if not srcpkg_name:
- print "Can't find source package for '%s'" % pkg.Name
+ print "Can't find source package for '%s'" % pkg.Name
srcrec = srcrecords.Lookup(srcpkg_name)
if srcrec:
- print "Files:"
- print srcrecords.Files
- bd = srcrecords.BuildDepends
- print "build-depends of the package: %s " % bd
- for b in bd:
- all_build_depends.add(b[0])
+ print "Files:"
+ print srcrecords.Files
+ bd = srcrecords.BuildDepends
+ print "build-depends of the package: %s " % bd
+ for b in bd:
+ all_build_depends.add(b[0])
# calculate the build depends for all dependencies
depends = depcache.GetCandidateVer(base).DependsList
for dep in depends["Depends"]: # FIXME: do we need to consider PreDepends?
- pkg = dep[0].TargetPkg
- srcpkg_name = get_source_pkg(pkg, records, depcache)
- if not srcpkg_name:
- print "Can't find source package for '%s'" % pkg.Name
- continue
- srcrec = srcrecords.Lookup(srcpkg_name)
- if srcrec:
- #print srcrecords.Package
- #print srcrecords.Binaries
- bd = srcrecords.BuildDepends
- #print "%s: %s " % (srcpkg_name, bd)
- for b in bd:
- all_build_depends.add(b[0])
+ pkg = dep[0].TargetPkg
+ srcpkg_name = get_source_pkg(pkg, records, depcache)
+ if not srcpkg_name:
+ print "Can't find source package for '%s'" % pkg.Name
+ continue
+ srcrec = srcrecords.Lookup(srcpkg_name)
+ if srcrec:
+ #print srcrecords.Package
+ #print srcrecords.Binaries
+ bd = srcrecords.BuildDepends
+ #print "%s: %s " % (srcpkg_name, bd)
+ for b in bd:
+ all_build_depends.add(b[0])
print "\n".join(all_build_depends)
diff --git a/doc/examples/checkstate.py b/doc/examples/checkstate.py
index 2986872f..3368d500 100755
--- a/doc/examples/checkstate.py
+++ b/doc/examples/checkstate.py
@@ -14,23 +14,23 @@ packages = cache.Packages
uninstalled, updated, upgradable = {}, {}, {}
for package in packages:
- versions = package.VersionList
- if not versions:
- continue
- version = versions[0]
- for other_version in versions:
- if apt_pkg.VersionCompare(version.VerStr, other_version.VerStr)<0:
- version = other_version
- if package.CurrentVer:
- current = package.CurrentVer
- if apt_pkg.VersionCompare(current.VerStr, version.VerStr)<0:
- upgradable[package.Name] = version
- break
- else:
- updated[package.Name] = current
- else:
- uninstalled[package.Name] = version
+ versions = package.VersionList
+ if not versions:
+ continue
+ version = versions[0]
+ for other_version in versions:
+ if apt_pkg.VersionCompare(version.VerStr, other_version.VerStr)<0:
+ version = other_version
+ if package.CurrentVer:
+ current = package.CurrentVer
+ if apt_pkg.VersionCompare(current.VerStr, version.VerStr)<0:
+ upgradable[package.Name] = version
+ break
+ else:
+ updated[package.Name] = current
+ else:
+ uninstalled[package.Name] = version
for l in (uninstalled, updated, upgradable):
- print l.items()[0]
+ print l.items()[0]
diff --git a/doc/examples/config.py b/doc/examples/config.py
index 222c1331..337899f0 100755
--- a/doc/examples/config.py
+++ b/doc/examples/config.py
@@ -25,31 +25,31 @@ print "Command line is",sys.argv
# Load the default configuration file, InitConfig() does this better..
Cnf.Set("config-file","/etc/apt/apt.conf"); # or Cnf["config-file"] = "..";
if posixpath.exists(Cnf.FindFile("config-file")):
- apt_pkg.ReadConfigFile(Cnf,"/etc/apt/apt.conf");
+ apt_pkg.ReadConfigFile(Cnf,"/etc/apt/apt.conf");
# Merge the command line arguments into the configuration space
Arguments = [('h',"help","help"),
('v',"version","version"),
('q',"quiet","quiet","IntLevel"),
('c',"config-file","","ConfigFile"),
- ('o',"option","","ArbItem")]
+ ('o',"option","","ArbItem")]
print "FileNames",apt_pkg.ParseCommandLine(Cnf,Arguments,sys.argv);
print "Quiet level selected is",Cnf.FindI("quiet",0);
# Do some stuff with it
if Cnf.FindB("version",0) == 1:
- print "Version selected - 1.1";
+ print "Version selected - 1.1";
if Cnf.FindB("help",0) == 1:
- print apt_pkg.Package,apt_pkg.Version,"for",apt_pkg.Architecture, \
- "compiled on",apt_pkg.Date,apt_pkg.Time;
- print "Hi, I am the help text for this program";
- sys.exit(0);
+ print apt_pkg.Package,apt_pkg.Version,"for",apt_pkg.Architecture, \
+ "compiled on",apt_pkg.Date,apt_pkg.Time;
+ print "Hi, I am the help text for this program";
+ sys.exit(0);
print "No help for you, try -h";
# Print the configuration space
print "The Configuration space looks like:";
for I in Cnf.keys():
- print "%s \"%s\";"%(I,Cnf[I]);
+ print "%s \"%s\";"%(I,Cnf[I]);
diff --git a/doc/examples/configisc.py b/doc/examples/configisc.py
index 1773a919..45a9c7f6 100755
--- a/doc/examples/configisc.py
+++ b/doc/examples/configisc.py
@@ -13,8 +13,8 @@ import apt_pkg,sys,posixpath;
ConfigFile = apt_pkg.ParseCommandLine(apt_pkg.Config,[],sys.argv);
if len(ConfigFile) != 1:
- print "Must have exactly 1 file name";
- sys.exit(0);
+ print "Must have exactly 1 file name";
+ sys.exit(0);
Cnf = apt_pkg.newConfiguration();
apt_pkg.ReadConfigFileISC(Cnf,ConfigFile[0]);
@@ -26,14 +26,14 @@ apt_pkg.ReadConfigFileISC(Cnf,ConfigFile[0]);
# bind8 config file..
if Cnf.has_key("Zone"):
- print "Zones: ",Cnf.SubTree("zone").List();
- for I in Cnf.List("zone"):
- SubCnf = Cnf.SubTree(I);
- if SubCnf.Find("type") == "slave":
- print "Masters for %s: %s"%(SubCnf.MyTag(),SubCnf.ValueList("masters"));
+ print "Zones: ",Cnf.SubTree("zone").List();
+ for I in Cnf.List("zone"):
+ SubCnf = Cnf.SubTree(I);
+ if SubCnf.Find("type") == "slave":
+ print "Masters for %s: %s"%(SubCnf.MyTag(),SubCnf.ValueList("masters"));
else:
- print "Tree definitions:";
- for I in Cnf.List("tree"):
- SubCnf = Cnf.SubTree(I);
- # This could use Find which would eliminate the possibility of exceptions.
- print "Subtree %s with sections '%s' and architectures '%s'"%(SubCnf.MyTag(),SubCnf["Sections"],SubCnf["Architectures"]);
+ print "Tree definitions:";
+ for I in Cnf.List("tree"):
+ SubCnf = Cnf.SubTree(I);
+ # This could use Find which would eliminate the possibility of exceptions.
+ print "Subtree %s with sections '%s' and architectures '%s'"%(SubCnf.MyTag(),SubCnf["Sections"],SubCnf["Architectures"]);
diff --git a/doc/examples/dependant-pkgs.py b/doc/examples/dependant-pkgs.py
index 2de420a5..bb10ce70 100755
--- a/doc/examples/dependant-pkgs.py
+++ b/doc/examples/dependant-pkgs.py
@@ -6,27 +6,27 @@ import sys
pkgs = set()
cache = apt.Cache()
for pkg in cache:
- candver = cache._depcache.GetCandidateVer(pkg._pkg)
- if candver == None:
- continue
- dependslist = candver.DependsList
- for dep in dependslist.keys():
- # get the list of each dependency object
- for depVerList in dependslist[dep]:
- for z in depVerList:
- # get all TargetVersions of
- # the dependency object
- for tpkg in z.AllTargets():
- if sys.argv[1] == tpkg.ParentPkg.Name:
- pkgs.add(pkg.name)
+ candver = cache._depcache.GetCandidateVer(pkg._pkg)
+ if candver == None:
+ continue
+ dependslist = candver.DependsList
+ for dep in dependslist.keys():
+ # get the list of each dependency object
+ for depVerList in dependslist[dep]:
+ for z in depVerList:
+ # get all TargetVersions of
+ # the dependency object
+ for tpkg in z.AllTargets():
+ if sys.argv[1] == tpkg.ParentPkg.Name:
+ pkgs.add(pkg.name)
main = set()
universe = set()
for pkg in pkgs:
- if "universe" in cache[pkg].section:
- universe.add(cache[pkg].sourcePackageName)
- else:
- main.add(cache[pkg].sourcePackageName)
+ if "universe" in cache[pkg].section:
+ universe.add(cache[pkg].sourcePackageName)
+ else:
+ main.add(cache[pkg].sourcePackageName)
print "main:"
print "\n".join(main)
diff --git a/doc/examples/gui-inst.py b/doc/examples/gui-inst.py
index 28887d34..edaec455 100755
--- a/doc/examples/gui-inst.py
+++ b/doc/examples/gui-inst.py
@@ -22,51 +22,51 @@ from apt.progress import OpProgress, FetchProgress, InstallProgress
class GuiFetchProgress(gtk.Window, FetchProgress):
def __init__(self):
- gtk.Window.__init__(self)
- self.vbox = gtk.VBox()
- self.vbox.show()
- self.add(self.vbox)
- self.progress = gtk.ProgressBar()
- self.progress.show()
- self.label = gtk.Label()
- self.label.show()
- self.vbox.pack_start(self.progress)
- self.vbox.pack_start(self.label)
- self.resize(300,100)
+ gtk.Window.__init__(self)
+ self.vbox = gtk.VBox()
+ self.vbox.show()
+ self.add(self.vbox)
+ self.progress = gtk.ProgressBar()
+ self.progress.show()
+ self.label = gtk.Label()
+ self.label.show()
+ self.vbox.pack_start(self.progress)
+ self.vbox.pack_start(self.label)
+ self.resize(300,100)
def start(self):
print "start"
- self.progress.set_fraction(0.0)
+ self.progress.set_fraction(0.0)
self.show()
def stop(self):
- self.hide()
+ self.hide()
def pulse(self):
FetchProgress.pulse(self)
self.label.set_text("Speed: %s/s" % apt_pkg.SizeToStr(self.currentCPS))
- #self.progressbar.set_fraction(self.currentBytes/self.totalBytes)
- while gtk.events_pending():
- gtk.main_iteration()
+ #self.progressbar.set_fraction(self.currentBytes/self.totalBytes)
+ while gtk.events_pending():
+ gtk.main_iteration()
return True
class TermInstallProgress(InstallProgress, gtk.Window):
def __init__(self):
- gtk.Window.__init__(self)
+ gtk.Window.__init__(self)
InstallProgress.__init__(self)
- self.show()
+ self.show()
box = gtk.VBox()
box.show()
self.add(box)
- self.term = vte.Terminal()
- self.term.show()
+ self.term = vte.Terminal()
+ self.term.show()
# check for the child
self.reaper = vte.reaper_get()
self.reaper.connect("child-exited",self.child_exited)
self.finished = False
- box.pack_start(self.term)
+ box.pack_start(self.term)
self.progressbar = gtk.ProgressBar()
self.progressbar.show()
box.pack_start(self.progressbar)
@@ -115,16 +115,16 @@ iprogress = TermInstallProgress()
# show the interface
while gtk.events_pending():
- gtk.main_iteration()
+ gtk.main_iteration()
pkg = cache["3dchess"]
print "\n%s"%pkg.name
# install or remove, the importend thing is to keep us busy :)
if pkg.isInstalled:
- pkg.markDelete()
+ pkg.markDelete()
else:
- pkg.markInstall()
+ pkg.markInstall()
cache.commit(fprogress, iprogress)
print "Exiting"
diff --git a/doc/examples/inst.py b/doc/examples/inst.py
index d1e2ff99..b37ab4cd 100644
--- a/doc/examples/inst.py
+++ b/doc/examples/inst.py
@@ -11,23 +11,23 @@ from apt.progress import InstallProgress
class TextInstallProgress(InstallProgress):
- def __init__(self):
- apt.progress.InstallProgress.__init__(self)
- self.last = 0.0
+ def __init__(self):
+ apt.progress.InstallProgress.__init__(self)
+ self.last = 0.0
- def updateInterface(self):
- InstallProgress.updateInterface(self)
- if self.last >= self.percent:
- return
- sys.stdout.write("\r[%s] %s\n" %(self.percent, self.status))
- sys.stdout.flush()
- self.last = self.percent
+ def updateInterface(self):
+ InstallProgress.updateInterface(self)
+ if self.last >= self.percent:
+ return
+ sys.stdout.write("\r[%s] %s\n" %(self.percent, self.status))
+ sys.stdout.flush()
+ self.last = self.percent
- def conffile(self,current,new):
- print "conffile prompt: %s %s" % (current,new)
+ def conffile(self,current,new):
+ print "conffile prompt: %s %s" % (current,new)
- def error(self, errorstr):
- print "got dpkg error: '%s'" % errorstr
+ def error(self, errorstr):
+ print "got dpkg error: '%s'" % errorstr
cache = apt.Cache(apt.progress.OpTextProgress())
@@ -39,11 +39,11 @@ pkg = cache["3dchess"]
# install or remove, the importend thing is to keep us busy :)
if pkg.isInstalled:
- print "Going to delete %s" % pkg.name
- pkg.markDelete()
+ print "Going to delete %s" % pkg.name
+ pkg.markDelete()
else:
- print "Going to install %s" % pkg.name
- pkg.markInstall()
+ print "Going to install %s" % pkg.name
+ pkg.markInstall()
res = cache.commit(fprogress, iprogress)
print res
diff --git a/doc/examples/print_uris.py b/doc/examples/print_uris.py
index c8a64223..3c93a668 100755
--- a/doc/examples/print_uris.py
+++ b/doc/examples/print_uris.py
@@ -12,11 +12,11 @@ upgradable = filter(lambda p: p.isUpgradable, cache)
for pkg in upgradable:
- pkg._lookupRecord(True)
- path = apt_pkg.ParseSection(pkg._records.Record)["Filename"]
- cand = pkg._depcache.GetCandidateVer(pkg._pkg)
- for (packagefile,i) in cand.FileList:
- indexfile = cache._list.FindIndex(packagefile)
- if indexfile:
- uri = indexfile.ArchiveURI(path)
- print uri
+ pkg._lookupRecord(True)
+ path = apt_pkg.ParseSection(pkg._records.Record)["Filename"]
+ cand = pkg._depcache.GetCandidateVer(pkg._pkg)
+ for (packagefile,i) in cand.FileList:
+ indexfile = cache._list.FindIndex(packagefile)
+ if indexfile:
+ uri = indexfile.ArchiveURI(path)
+ print uri
diff --git a/doc/examples/progress.py b/doc/examples/progress.py
index 39e73e70..d8f00a52 100644
--- a/doc/examples/progress.py
+++ b/doc/examples/progress.py
@@ -41,8 +41,8 @@ class TextFetchProgress(apt.FetchProgress):
return True
def mediaChange(self, medium, drive):
- print "Please insert medium %s in drive %s" % (medium, drive)
- sys.stdin.readline()
+ print "Please insert medium %s in drive %s" % (medium, drive)
+ sys.stdin.readline()
#return False
diff --git a/doc/examples/recommends.py b/doc/examples/recommends.py
index 03d46068..f0b3b1be 100755
--- a/doc/examples/recommends.py
+++ b/doc/examples/recommends.py
@@ -8,32 +8,32 @@ cache = apt_pkg.GetCache()
class Wanted:
- def __init__(self, name):
- self.name = name
- self.recommended = []
- self.suggested = []
+ def __init__(self, name):
+ self.name = name
+ self.recommended = []
+ self.suggested = []
wanted = {}
for package in cache.Packages:
- current = package.CurrentVer
- if not current:
- continue
- depends = current.DependsList
- for (key, attr) in (('Suggests', 'suggested'),
- ('Recommends', 'recommended')):
- list = depends.get(key, [])
- for dependency in list:
- name = dependency[0].TargetPkg.Name
- dep = cache[name]
- if dep.CurrentVer:
- continue
- getattr(wanted.setdefault(name, Wanted(name)),
- attr).append(package.Name)
+ current = package.CurrentVer
+ if not current:
+ continue
+ depends = current.DependsList
+ for (key, attr) in (('Suggests', 'suggested'),
+ ('Recommends', 'recommended')):
+ list = depends.get(key, [])
+ for dependency in list:
+ name = dependency[0].TargetPkg.Name
+ dep = cache[name]
+ if dep.CurrentVer:
+ continue
+ getattr(wanted.setdefault(name, Wanted(name)),
+ attr).append(package.Name)
ks = wanted.keys()
ks.sort()
for want in ks:
- print want, wanted[want].recommended, wanted[want].suggested
+ print want, wanted[want].recommended, wanted[want].suggested
diff --git a/doc/examples/records.py b/doc/examples/records.py
index ef04b555..9dfc460b 100755
--- a/doc/examples/records.py
+++ b/doc/examples/records.py
@@ -5,8 +5,8 @@ import apt
cache = apt.Cache()
for pkg in cache:
- if not pkg.candidateRecord:
- continue
- if pkg.candidateRecord.has_key("Task"):
- print "Pkg %s is part of '%s'" % (pkg.name, pkg.candidateRecord["Task"].split())
- #print pkg.candidateRecord
+ if not pkg.candidateRecord:
+ continue
+ if pkg.candidateRecord.has_key("Task"):
+ print "Pkg %s is part of '%s'" % (pkg.name, pkg.candidateRecord["Task"].split())
+ #print pkg.candidateRecord
diff --git a/doc/examples/sources.py b/doc/examples/sources.py
index b48c0ba5..8d807f4c 100644
--- a/doc/examples/sources.py
+++ b/doc/examples/sources.py
@@ -10,6 +10,6 @@ apt_pkg.init()
sources = apt_pkg.GetPkgSrcRecords()
sources.Restart()
while sources.Lookup('hello'):
- print sources.Package, sources.Version, sources.Maintainer, sources.Section, `sources.Binaries`
- print sources.Files
- print sources.Index.ArchiveURI(sources.Files[0][2])
+ print sources.Package, sources.Version, sources.Maintainer, sources.Section, `sources.Binaries`
+ print sources.Files
+ print sources.Index.ArchiveURI(sources.Files[0][2])
diff --git a/doc/examples/tagfile.py b/doc/examples/tagfile.py
index 653c0a71..aeba34d2 100755
--- a/doc/examples/tagfile.py
+++ b/doc/examples/tagfile.py
@@ -4,5 +4,5 @@ import apt_pkg
Parse = apt_pkg.ParseTagFile(open("/var/lib/dpkg/status","r"));
while Parse.Step() == 1:
- print Parse.Section.get("Package");
- print apt_pkg.ParseDepends(Parse.Section.get("Depends",""));
+ print Parse.Section.get("Package");
+ print apt_pkg.ParseDepends(Parse.Section.get("Depends",""));
diff --git a/doc/examples/versiontest.py b/doc/examples/versiontest.py
index c4e5f44d..8f18f6c8 100755
--- a/doc/examples/versiontest.py
+++ b/doc/examples/versiontest.py
@@ -7,30 +7,30 @@ apt_pkg.InitSystem();
TestFile = apt_pkg.ParseCommandLine(apt_pkg.Config,[],sys.argv);
if len(TestFile) != 1:
- print "Must have exactly 1 file name";
- sys.exit(0);
+ print "Must have exactly 1 file name";
+ sys.exit(0);
# Go over the file..
List = open(TestFile[0],"r");
CurLine = 0;
while(1):
- Line = List.readline();
- CurLine = CurLine + 1;
- if Line == "":
- break;
- Line = string.strip(Line);
- if len(Line) == 0 or Line[0] == '#':
- continue;
+ Line = List.readline();
+ CurLine = CurLine + 1;
+ if Line == "":
+ break;
+ Line = string.strip(Line);
+ if len(Line) == 0 or Line[0] == '#':
+ continue;
- Split = re.split("[ \n]",Line);
+ Split = re.split("[ \n]",Line);
- # Check forward
- if apt_pkg.VersionCompare(Split[0],Split[1]) != int(Split[2]):
- print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine,
- Split[0],Split[1],apt_pkg.VersionCompare(Split[0],Split[1]),
- int(Split[2]));
- # Check reverse
- if apt_pkg.VersionCompare(Split[1],Split[0]) != -1*int(Split[2]):
- print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine,
- Split[1],Split[0],apt_pkg.VersionCompare(Split[1],Split[0]),
- -1*int(Split[2]));
+ # Check forward
+ if apt_pkg.VersionCompare(Split[0],Split[1]) != int(Split[2]):
+ print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine,
+ Split[0],Split[1],apt_pkg.VersionCompare(Split[0],Split[1]),
+ int(Split[2]));
+ # Check reverse
+ if apt_pkg.VersionCompare(Split[1],Split[0]) != -1*int(Split[2]):
+ print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine,
+ Split[1],Split[0],apt_pkg.VersionCompare(Split[1],Split[0]),
+ -1*int(Split[2]));
diff --git a/tests/cache.py b/tests/cache.py
index 34535e68..24732578 100644
--- a/tests/cache.py
+++ b/tests/cache.py
@@ -8,44 +8,44 @@ import sys
def main():
- apt_pkg.init()
- cache = apt_pkg.GetCache()
- depcache = apt_pkg.GetDepCache(cache)
- depcache.Init()
- i=0
- all=cache.PackageCount
- print "Running Cache test on all packages:"
- # first, get all pkgs
- for pkg in cache.Packages:
- i += 1
- x = pkg.Name
- # then get each version
- for ver in pkg.VersionList:
- # get some version information
- a = ver.FileList
- b = ver.VerStr
- c = ver.Arch
- d = ver.DependsListStr
- dl = ver.DependsList
- # get all dependencies (a dict of string->list,
- # e.g. "depends:" -> [ver1,ver2,..]
- for dep in dl.keys():
- # get the list of each dependency object
- for depVerList in dl[dep]:
- for z in depVerList:
- # get all TargetVersions of
- # the dependency object
- for j in z.AllTargets():
- f = j.FileList
- g = ver.VerStr
- h = ver.Arch
- k = ver.DependsListStr
- j = ver.DependsList
- pass
+ apt_pkg.init()
+ cache = apt_pkg.GetCache()
+ depcache = apt_pkg.GetDepCache(cache)
+ depcache.Init()
+ i=0
+ all=cache.PackageCount
+ print "Running Cache test on all packages:"
+ # first, get all pkgs
+ for pkg in cache.Packages:
+ i += 1
+ x = pkg.Name
+ # then get each version
+ for ver in pkg.VersionList:
+ # get some version information
+ a = ver.FileList
+ b = ver.VerStr
+ c = ver.Arch
+ d = ver.DependsListStr
+ dl = ver.DependsList
+ # get all dependencies (a dict of string->list,
+ # e.g. "depends:" -> [ver1,ver2,..]
+ for dep in dl.keys():
+ # get the list of each dependency object
+ for depVerList in dl[dep]:
+ for z in depVerList:
+ # get all TargetVersions of
+ # the dependency object
+ for j in z.AllTargets():
+ f = j.FileList
+ g = ver.VerStr
+ h = ver.Arch
+ k = ver.DependsListStr
+ j = ver.DependsList
+ pass
- print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
+ print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
if __name__ == "__main__":
- main()
- sys.exit(0)
+ main()
+ sys.exit(0)
diff --git a/tests/depcache.py b/tests/depcache.py
index 635fea14..32c510f0 100644
--- a/tests/depcache.py
+++ b/tests/depcache.py
@@ -8,46 +8,46 @@ import sys
def main():
- apt_pkg.init()
- cache = apt_pkg.GetCache()
- depcache = apt_pkg.GetDepCache(cache)
- depcache.Init()
- i=0
- all=cache.PackageCount
- print "Running DepCache test on all packages"
- print "(trying to install each and then mark it keep again):"
- # first, get all pkgs
- for pkg in cache.Packages:
- i += 1
- x = pkg.Name
- # then get each version
- ver =depcache.GetCandidateVer(pkg)
- if ver != None:
- depcache.MarkInstall(pkg)
- if depcache.InstCount == 0:
- if depcache.IsUpgradable(pkg):
- print "Error marking %s for install" % x
- for p in cache.Packages:
- if depcache.MarkedInstall(p):
- depcache.MarkKeep(p)
- if depcache.InstCount != 0:
- print "Error undoing the selection for %s (InstCount: %s)" % (x,depcache.InstCount)
- print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
+ apt_pkg.init()
+ cache = apt_pkg.GetCache()
+ depcache = apt_pkg.GetDepCache(cache)
+ depcache.Init()
+ i=0
+ all=cache.PackageCount
+ print "Running DepCache test on all packages"
+ print "(trying to install each and then mark it keep again):"
+ # first, get all pkgs
+ for pkg in cache.Packages:
+ i += 1
+ x = pkg.Name
+ # then get each version
+ ver =depcache.GetCandidateVer(pkg)
+ if ver != None:
+ depcache.MarkInstall(pkg)
+ if depcache.InstCount == 0:
+ if depcache.IsUpgradable(pkg):
+ print "Error marking %s for install" % x
+ for p in cache.Packages:
+ if depcache.MarkedInstall(p):
+ depcache.MarkKeep(p)
+ if depcache.InstCount != 0:
+ print "Error undoing the selection for %s (InstCount: %s)" % (x,depcache.InstCount)
+ print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
- print
- print "Trying Upgrade:"
- depcache.Upgrade()
- print "To install: %s " % depcache.InstCount
- print "To remove: %s " % depcache.DelCount
- print "Kept back: %s " % depcache.KeepCount
+ print
+ print "Trying Upgrade:"
+ depcache.Upgrade()
+ print "To install: %s " % depcache.InstCount
+ print "To remove: %s " % depcache.DelCount
+ print "Kept back: %s " % depcache.KeepCount
- print "Trying DistUpgrade:"
- depcache.Upgrade(True)
- print "To install: %s " % depcache.InstCount
- print "To remove: %s " % depcache.DelCount
- print "Kept back: %s " % depcache.KeepCount
+ print "Trying DistUpgrade:"
+ depcache.Upgrade(True)
+ print "To install: %s " % depcache.InstCount
+ print "To remove: %s " % depcache.DelCount
+ print "Kept back: %s " % depcache.KeepCount
if __name__ == "__main__":
- main()
- sys.exit(0)
+ main()
+ sys.exit(0)
diff --git a/tests/lock.py b/tests/lock.py
index a3d962d7..aeab804b 100644
--- a/tests/lock.py
+++ b/tests/lock.py
@@ -21,7 +21,7 @@ if __name__ == "__main__":
apt_pkg.PkgSystemLock()
except SystemError, s:
print "Can't get lock: (error text:\n%s)" % s
- sys.exit(0)
+ sys.exit(0)
apt_pkg.PkgSystemUnLock()
@@ -35,7 +35,7 @@ if __name__ == "__main__":
# child
fd = apt_pkg.GetLock(lock,False)
print "Lockfile fd (child): %s" % fd
- sys.exit(0)
+ sys.exit(0)
# try to get lock with error flag
pid = os.fork()
@@ -43,4 +43,4 @@ if __name__ == "__main__":
# child
fd = apt_pkg.GetLock(lock,True)
print "Lockfile fd (child): %s" % fd
- sys.exit(0)
+ sys.exit(0)
diff --git a/tests/memleak.py b/tests/memleak.py
index 58a2f886..6eece9d4 100755
--- a/tests/memleak.py
+++ b/tests/memleak.py
@@ -11,36 +11,36 @@ cache = apt.Cache()
# memleak
for i in range(100):
- cache.open(None)
- print cache["apt"].name
- time.sleep(1)
- gc.collect()
- f = open("%s" % i,"w")
- for obj in gc.get_objects():
- f.write("%s\n" % str(obj))
- f.close()
+ cache.open(None)
+ print cache["apt"].name
+ time.sleep(1)
+ gc.collect()
+ f = open("%s" % i,"w")
+ for obj in gc.get_objects():
+ f.write("%s\n" % str(obj))
+ f.close()
# memleak
#for i in range(100):
-# cache = apt.Cache()
-# time.sleep(1)
-# cache = None
-# gc.collect()
+# cache = apt.Cache()
+# time.sleep(1)
+# cache = None
+# gc.collect()
# no memleak, but more or less the apt.Cache.open() code
for i in range(100):
- cache = apt_pkg.GetCache()
- depcache = apt_pkg.GetDepCache(cache)
- records = apt_pkg.GetPkgRecords(cache)
- list = apt_pkg.GetPkgSourceList()
- list.ReadMainList()
- dict = {}
- for pkg in cache.Packages:
- if len(pkg.VersionList) > 0:
- dict[pkg.Name] = apt.Package(cache,depcache,
- records, list, None, pkg)
-
- print cache["apt"]
- time.sleep(1)
-
- gc.collect()
+ cache = apt_pkg.GetCache()
+ depcache = apt_pkg.GetDepCache(cache)
+ records = apt_pkg.GetPkgRecords(cache)
+ list = apt_pkg.GetPkgSourceList()
+ list.ReadMainList()
+ dict = {}
+ for pkg in cache.Packages:
+ if len(pkg.VersionList) > 0:
+ dict[pkg.Name] = apt.Package(cache,depcache,
+ records, list, None, pkg)
+
+ print cache["apt"]
+ time.sleep(1)
+
+ gc.collect()
diff --git a/tests/pkgproblemresolver.py b/tests/pkgproblemresolver.py
index 546e2f16..82186d40 100644
--- a/tests/pkgproblemresolver.py
+++ b/tests/pkgproblemresolver.py
@@ -8,63 +8,63 @@ import sys
def main():
- apt_pkg.init()
- cache = apt_pkg.GetCache()
- depcache = apt_pkg.GetDepCache(cache)
- depcache.Init()
- i=0
- all=cache.PackageCount
- print "Running DepCache test on all packages"
- print "(trying to install each and then mark it keep again):"
- # first, get all pkgs
- for pkg in cache.Packages:
- i += 1
- x = pkg.Name
- # then get each version
- ver =depcache.GetCandidateVer(pkg)
- if ver != None:
- depcache.MarkInstall(pkg)
- if depcache.BrokenCount > 0:
- fixer = apt_pkg.GetPkgProblemResolver(depcache)
- fixer.Clear(pkg)
- fixer.Protect(pkg)
- # we first try to resolve the problem
- # with the package that should be installed
- # protected
- try:
- fixer.Resolve(True)
- except SystemError:
- # the pkg seems to be broken, the
- # returns a exception
- fixer.Clear(pkg)
- fixer.Resolve(True)
- if not depcache.MarkedInstall(pkg):
- print "broken in archive: %s " % pkg.Name
- fixer = None
- if depcache.InstCount == 0:
- if depcache.IsUpgradable(pkg):
- print "Error marking %s for install" % x
- for p in cache.Packages:
- if depcache.MarkedInstall(p) or depcache.MarkedUpgrade(p):
- depcache.MarkKeep(p)
- if depcache.InstCount != 0:
- print "Error undoing the selection for %s" % x
- print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
+ apt_pkg.init()
+ cache = apt_pkg.GetCache()
+ depcache = apt_pkg.GetDepCache(cache)
+ depcache.Init()
+ i=0
+ all=cache.PackageCount
+ print "Running DepCache test on all packages"
+ print "(trying to install each and then mark it keep again):"
+ # first, get all pkgs
+ for pkg in cache.Packages:
+ i += 1
+ x = pkg.Name
+ # then get each version
+ ver =depcache.GetCandidateVer(pkg)
+ if ver != None:
+ depcache.MarkInstall(pkg)
+ if depcache.BrokenCount > 0:
+ fixer = apt_pkg.GetPkgProblemResolver(depcache)
+ fixer.Clear(pkg)
+ fixer.Protect(pkg)
+ # we first try to resolve the problem
+ # with the package that should be installed
+ # protected
+ try:
+ fixer.Resolve(True)
+ except SystemError:
+ # the pkg seems to be broken, the
+ # returns a exception
+ fixer.Clear(pkg)
+ fixer.Resolve(True)
+ if not depcache.MarkedInstall(pkg):
+ print "broken in archive: %s " % pkg.Name
+ fixer = None
+ if depcache.InstCount == 0:
+ if depcache.IsUpgradable(pkg):
+ print "Error marking %s for install" % x
+ for p in cache.Packages:
+ if depcache.MarkedInstall(p) or depcache.MarkedUpgrade(p):
+ depcache.MarkKeep(p)
+ if depcache.InstCount != 0:
+ print "Error undoing the selection for %s" % x
+ print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)),
- print
- print "Trying Upgrade:"
- depcache.Upgrade()
- print "To install: %s " % depcache.InstCount
- print "To remove: %s " % depcache.DelCount
- print "Kept back: %s " % depcache.KeepCount
+ print
+ print "Trying Upgrade:"
+ depcache.Upgrade()
+ print "To install: %s " % depcache.InstCount
+ print "To remove: %s " % depcache.DelCount
+ print "Kept back: %s " % depcache.KeepCount
- print "Trying DistUpgrade:"
- depcache.Upgrade(True)
- print "To install: %s " % depcache.InstCount
- print "To remove: %s " % depcache.DelCount
- print "Kept back: %s " % depcache.KeepCount
+ print "Trying DistUpgrade:"
+ depcache.Upgrade(True)
+ print "To install: %s " % depcache.InstCount
+ print "To remove: %s " % depcache.DelCount
+ print "Kept back: %s " % depcache.KeepCount
if __name__ == "__main__":
- main()
- sys.exit(0)
+ main()
+ sys.exit(0)
diff --git a/tests/pkgrecords.py b/tests/pkgrecords.py
index 308505f7..72096463 100644
--- a/tests/pkgrecords.py
+++ b/tests/pkgrecords.py
@@ -9,30 +9,30 @@ import sys
def main():
- apt_pkg.init()
- cache = apt_pkg.GetCache()
- depcache = apt_pkg.GetDepCache(cache)
- depcache.Init()
- i=0
- print "Running PkgRecords test on all packages:"
- for pkg in cache.Packages:
- i += 1
- records = apt_pkg.GetPkgRecords(cache)
- if len(pkg.VersionList) == 0:
- #print "no available version, cruft"
- continue
- version = depcache.GetCandidateVer(pkg)
- if not version:
- continue
- file, index = version.FileList.pop(0)
- if records.Lookup((file,index)):
- #print records.FileName
- x = records.FileName
- y = records.LongDesc
- pass
- print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)),
+ apt_pkg.init()
+ cache = apt_pkg.GetCache()
+ depcache = apt_pkg.GetDepCache(cache)
+ depcache.Init()
+ i=0
+ print "Running PkgRecords test on all packages:"
+ for pkg in cache.Packages:
+ i += 1
+ records = apt_pkg.GetPkgRecords(cache)
+ if len(pkg.VersionList) == 0:
+ #print "no available version, cruft"
+ continue
+ version = depcache.GetCandidateVer(pkg)
+ if not version:
+ continue
+ file, index = version.FileList.pop(0)
+ if records.Lookup((file,index)):
+ #print records.FileName
+ x = records.FileName
+ y = records.LongDesc
+ pass
+ print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)),
if __name__ == "__main__":
- main()
- sys.exit(0)
+ main()
+ sys.exit(0)
diff --git a/tests/pkgsrcrecords.py b/tests/pkgsrcrecords.py
index 410140c8..3eb0fcab 100644
--- a/tests/pkgsrcrecords.py
+++ b/tests/pkgsrcrecords.py
@@ -9,19 +9,19 @@ import sys
def main():
- apt_pkg.init()
- cache = apt_pkg.GetCache()
- i=0
- print "Running PkgSrcRecords test on all packages:"
- for x in cache.Packages:
- i += 1
- src = apt_pkg.GetPkgSrcRecords()
- if src.Lookup(x.Name):
- #print src.Package
- pass
- print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)),
+ apt_pkg.init()
+ cache = apt_pkg.GetCache()
+ i=0
+ print "Running PkgSrcRecords test on all packages:"
+ for x in cache.Packages:
+ i += 1
+ src = apt_pkg.GetPkgSrcRecords()
+ if src.Lookup(x.Name):
+ #print src.Package
+ pass
+ print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)),
if __name__ == "__main__":
- main()
- sys.exit(0)
+ main()
+ sys.exit(0)
diff --git a/utils/get_ubuntu_mirrors.py b/utils/get_ubuntu_mirrors.py
index 8482615d..ddd1e369 100755
--- a/utils/get_ubuntu_mirrors.py
+++ b/utils/get_ubuntu_mirrors.py
@@ -42,10 +42,10 @@ try:
uri=urllib2.urlopen(req)
p = re.compile('^.*((http|ftp):\/\/[A-Za-z0-9-.:\/_]+).*\n*$')
for line in uri.readlines():
- if r"[[Anchor(dvd-images)]]" in line:
- break
- if "http://" in line or "ftp://" in line:
- mirrors.append(p.sub(r"\1", line))
+ if r"[[Anchor(dvd-images)]]" in line:
+ break
+ if "http://" in line or "ftp://" in line:
+ mirrors.append(p.sub(r"\1", line))
uri.close()
except:
print "Failed to download or extract the mirrors list!"