diff options
| -rw-r--r-- | apt/progress.py | 8 | ||||
| -rw-r--r-- | aptsources/distro.py | 734 | ||||
| -rw-r--r-- | aptsources/sourceslist.py | 762 | ||||
| -rw-r--r-- | doc/examples/acquire.py | 38 | ||||
| -rwxr-xr-x | doc/examples/build-deps.py | 72 | ||||
| -rwxr-xr-x | doc/examples/checkstate.py | 34 | ||||
| -rwxr-xr-x | doc/examples/config.py | 16 | ||||
| -rwxr-xr-x | doc/examples/configisc.py | 24 | ||||
| -rwxr-xr-x | doc/examples/dependant-pkgs.py | 34 | ||||
| -rwxr-xr-x | doc/examples/gui-inst.py | 48 | ||||
| -rw-r--r-- | doc/examples/inst.py | 36 | ||||
| -rwxr-xr-x | doc/examples/print_uris.py | 16 | ||||
| -rw-r--r-- | doc/examples/progress.py | 4 | ||||
| -rwxr-xr-x | doc/examples/recommends.py | 38 | ||||
| -rwxr-xr-x | doc/examples/records.py | 10 | ||||
| -rw-r--r-- | doc/examples/sources.py | 6 | ||||
| -rwxr-xr-x | doc/examples/tagfile.py | 4 | ||||
| -rwxr-xr-x | doc/examples/versiontest.py | 40 | ||||
| -rw-r--r-- | tests/cache.py | 74 | ||||
| -rw-r--r-- | tests/depcache.py | 76 | ||||
| -rw-r--r-- | tests/lock.py | 6 | ||||
| -rwxr-xr-x | tests/memleak.py | 54 | ||||
| -rw-r--r-- | tests/pkgproblemresolver.py | 110 | ||||
| -rw-r--r-- | tests/pkgrecords.py | 48 | ||||
| -rw-r--r-- | tests/pkgsrcrecords.py | 26 | ||||
| -rwxr-xr-x | utils/get_ubuntu_mirrors.py | 8 |
26 files changed, 1163 insertions, 1163 deletions
diff --git a/apt/progress.py b/apt/progress.py index b50b2915..6f72197e 100644 --- a/apt/progress.py +++ b/apt/progress.py @@ -188,14 +188,14 @@ class InstallProgress(DumbInstallProgress): pass def statusChange(self, pkg, percent, status): - " called when the status changed " - pass + " called when the status changed " + pass def updateInterface(self): if self.statusfd != None: try: - while not self.read.endswith("\n"): - self.read += os.read(self.statusfd.fileno(),1) + while not self.read.endswith("\n"): + self.read += os.read(self.statusfd.fileno(),1) except OSError, (errno,errstr): # resource temporarly unavailable is ignored if errno != EAGAIN and errnor != EWOULDBLOCK: diff --git a/aptsources/distro.py b/aptsources/distro.py index faccc271..962c57bc 100644 --- a/aptsources/distro.py +++ b/aptsources/distro.py @@ -34,396 +34,396 @@ def _(s): return gettext.dgettext("python-apt", s) class NoDistroTemplateException(Exception): - pass + pass class Distribution: - def __init__(self, id, codename, description, release): - """ Container for distribution specific informations """ - # LSB information - self.id = id - self.codename = codename - self.description = description - self.release = release - - self.binary_type = "deb" - self.source_type = "deb-src" - - def get_sources(self, sourceslist): - """ - Find the corresponding template, main and child sources - for the distribution - """ - - self.sourceslist = sourceslist - # corresponding sources - self.source_template = None - self.child_sources = [] - self.main_sources = [] - self.disabled_sources = [] - self.cdrom_sources = [] - self.download_comps = [] - self.enabled_comps = [] - self.cdrom_comps = [] - self.used_media = [] - self.get_source_code = False - self.source_code_sources = [] - - # location of the sources - self.default_server = "" - self.main_server = "" - self.nearest_server = "" - self.used_servers = [] - - # find the distro template - for template in self.sourceslist.matcher.templates: - if self.is_codename(template.name) and\ - template.distribution == self.id: - #print "yeah! found a template for %s" % self.description - #print template.description, template.base_uri, template.components - self.source_template = template - break - if self.source_template == None: - raise (NoDistroTemplateException, - "Error: could not find a distribution template") - - # find main and child sources - media = [] - comps = [] - cdrom_comps = [] - enabled_comps = [] - source_code = [] - for source in self.sourceslist.list: - if source.invalid == False and\ - self.is_codename(source.dist) and\ - source.template and\ - self.is_codename(source.template.name): - #print "yeah! found a distro repo: %s" % source.line - # cdroms need do be handled differently - if source.uri.startswith("cdrom:") and \ - source.disabled == False: - self.cdrom_sources.append(source) - cdrom_comps.extend(source.comps) - elif source.uri.startswith("cdrom:") and \ - source.disabled == True: - self.cdrom_sources.append(source) - elif source.type == self.binary_type and \ - source.disabled == False: - self.main_sources.append(source) - comps.extend(source.comps) - media.append(source.uri) - elif source.type == self.binary_type and \ - source.disabled == True: - self.disabled_sources.append(source) - elif source.type == self.source_type and source.disabled == False: - self.source_code_sources.append(source) - elif source.type == self.source_type and source.disabled == True: - self.disabled_sources.append(source) - if source.invalid == False and\ - source.template in self.source_template.children: - if source.disabled == False and source.type == self.binary_type: - self.child_sources.append(source) - elif source.disabled == False and source.type == self.source_type: - self.source_code_sources.append(source) - else: - self.disabled_sources.append(source) - self.download_comps = set(comps) - self.cdrom_comps = set(cdrom_comps) - enabled_comps.extend(comps) - enabled_comps.extend(cdrom_comps) - self.enabled_comps = set(enabled_comps) - self.used_media = set(media) - - self.get_mirrors() - - def get_mirrors(self, mirror_template=None): - """ - Provide a set of mirrors where you can get the distribution from - """ - # the main server is stored in the template - self.main_server = self.source_template.base_uri - - # other used servers - for medium in self.used_media: - if not medium.startswith("cdrom:"): - # seems to be a network source - self.used_servers.append(medium) - - if len(self.main_sources) == 0: - self.default_server = self.main_server - else: - self.default_server = self.main_sources[0].uri - - # get a list of country codes and real names - self.countries = {} - try: - f = open("/usr/share/iso-codes/iso_3166.tab", "r") - lines = f.readlines() - for line in lines: - parts = line.split("\t") - self.countries[parts[0].lower()] = parts[1].strip() - except: - print "could not open file '%s'" % file - else: - f.close() - - # try to guess the nearest mirror from the locale - self.country = None - self.country_code = None - locale = os.getenv("LANG", default="en.UK") - a = locale.find("_") - z = locale.find(".") - if z == -1: - z = len(locale) - country_code = locale[a+1:z].lower() - - if mirror_template: - self.nearest_server = mirror_template % country_code - - if self.countries.has_key(country_code): - self.country = self.countries[country_code] - self.country_code = country_code - - def _get_mirror_name(self, server): - ''' Try to get a human readable name for the main mirror of a country - Customize for different distributions ''' - country = None - i = server.find("://") - l = server.find(".archive.ubuntu.com") - if i != -1 and l != -1: - country = server[i+len("://"):l] - if self.countries.has_key(country): - # TRANSLATORS: %s is a country - return _("Server for %s") % \ - gettext.dgettext("iso_3166", - self.countries[country].rstrip()).rstrip() - else: - return("%s" % server.rstrip("/ ")) - - def get_server_list(self): - ''' Return a list of used and suggested servers ''' - - def compare_mirrors(mir1, mir2): - '''Helper function that handles comaprision of mirror urls - that could contain trailing slashes''' - return re.match(mir1.strip("/ "), mir2.rstrip("/ ")) - - # Store all available servers: - # Name, URI, active - mirrors = [] - if len(self.used_servers) < 1 or \ - (len(self.used_servers) == 1 and \ - compare_mirrors(self.used_servers[0], self.main_server)): - mirrors.append([_("Main server"), self.main_server, True]) - mirrors.append([self._get_mirror_name(self.nearest_server), - self.nearest_server, False]) - elif len(self.used_servers) == 1 and not \ - compare_mirrors(self.used_servers[0], self.main_server): - mirrors.append([_("Main server"), self.main_server, False]) - # Only one server is used - server = self.used_servers[0] - - # Append the nearest server if it's not already used - if not compare_mirrors(server, self.nearest_server): + def __init__(self, id, codename, description, release): + """ Container for distribution specific informations """ + # LSB information + self.id = id + self.codename = codename + self.description = description + self.release = release + + self.binary_type = "deb" + self.source_type = "deb-src" + + def get_sources(self, sourceslist): + """ + Find the corresponding template, main and child sources + for the distribution + """ + + self.sourceslist = sourceslist + # corresponding sources + self.source_template = None + self.child_sources = [] + self.main_sources = [] + self.disabled_sources = [] + self.cdrom_sources = [] + self.download_comps = [] + self.enabled_comps = [] + self.cdrom_comps = [] + self.used_media = [] + self.get_source_code = False + self.source_code_sources = [] + + # location of the sources + self.default_server = "" + self.main_server = "" + self.nearest_server = "" + self.used_servers = [] + + # find the distro template + for template in self.sourceslist.matcher.templates: + if self.is_codename(template.name) and\ + template.distribution == self.id: + #print "yeah! found a template for %s" % self.description + #print template.description, template.base_uri, template.components + self.source_template = template + break + if self.source_template == None: + raise (NoDistroTemplateException, + "Error: could not find a distribution template") + + # find main and child sources + media = [] + comps = [] + cdrom_comps = [] + enabled_comps = [] + source_code = [] + for source in self.sourceslist.list: + if source.invalid == False and\ + self.is_codename(source.dist) and\ + source.template and\ + self.is_codename(source.template.name): + #print "yeah! found a distro repo: %s" % source.line + # cdroms need do be handled differently + if source.uri.startswith("cdrom:") and \ + source.disabled == False: + self.cdrom_sources.append(source) + cdrom_comps.extend(source.comps) + elif source.uri.startswith("cdrom:") and \ + source.disabled == True: + self.cdrom_sources.append(source) + elif source.type == self.binary_type and \ + source.disabled == False: + self.main_sources.append(source) + comps.extend(source.comps) + media.append(source.uri) + elif source.type == self.binary_type and \ + source.disabled == True: + self.disabled_sources.append(source) + elif source.type == self.source_type and source.disabled == False: + self.source_code_sources.append(source) + elif source.type == self.source_type and source.disabled == True: + self.disabled_sources.append(source) + if source.invalid == False and\ + source.template in self.source_template.children: + if source.disabled == False and source.type == self.binary_type: + self.child_sources.append(source) + elif source.disabled == False and source.type == self.source_type: + self.source_code_sources.append(source) + else: + self.disabled_sources.append(source) + self.download_comps = set(comps) + self.cdrom_comps = set(cdrom_comps) + enabled_comps.extend(comps) + enabled_comps.extend(cdrom_comps) + self.enabled_comps = set(enabled_comps) + self.used_media = set(media) + + self.get_mirrors() + + def get_mirrors(self, mirror_template=None): + """ + Provide a set of mirrors where you can get the distribution from + """ + # the main server is stored in the template + self.main_server = self.source_template.base_uri + + # other used servers + for medium in self.used_media: + if not medium.startswith("cdrom:"): + # seems to be a network source + self.used_servers.append(medium) + + if len(self.main_sources) == 0: + self.default_server = self.main_server + else: + self.default_server = self.main_sources[0].uri + + # get a list of country codes and real names + self.countries = {} + try: + f = open("/usr/share/iso-codes/iso_3166.tab", "r") + lines = f.readlines() + for line in lines: + parts = line.split("\t") + self.countries[parts[0].lower()] = parts[1].strip() + except: + print "could not open file '%s'" % file + else: + f.close() + + # try to guess the nearest mirror from the locale + self.country = None + self.country_code = None + locale = os.getenv("LANG", default="en.UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a+1:z].lower() + + if mirror_template: + self.nearest_server = mirror_template % country_code + + if self.countries.has_key(country_code): + self.country = self.countries[country_code] + self.country_code = country_code + + def _get_mirror_name(self, server): + ''' Try to get a human readable name for the main mirror of a country + Customize for different distributions ''' + country = None + i = server.find("://") + l = server.find(".archive.ubuntu.com") + if i != -1 and l != -1: + country = server[i+len("://"):l] + if self.countries.has_key(country): + # TRANSLATORS: %s is a country + return _("Server for %s") % \ + gettext.dgettext("iso_3166", + self.countries[country].rstrip()).rstrip() + else: + return("%s" % server.rstrip("/ ")) + + def get_server_list(self): + ''' Return a list of used and suggested servers ''' + + def compare_mirrors(mir1, mir2): + ''' Helper function that handles comaprision of mirror urls + that could contain trailing slashes''' + return re.match(mir1.strip("/ "), mir2.rstrip("/ ")) + + # Store all available servers: + # Name, URI, active + mirrors = [] + if len(self.used_servers) < 1 or \ + (len(self.used_servers) == 1 and \ + compare_mirrors(self.used_servers[0], self.main_server)): + mirrors.append([_("Main server"), self.main_server, True]) mirrors.append([self._get_mirror_name(self.nearest_server), self.nearest_server, False]) - mirrors.append([self._get_mirror_name(server), server, True]) - - elif len(self.used_servers) > 1: - # More than one server is used. Since we don't handle this case - # in the user interface we set "custom servers" to true and - # append a list of all used servers - mirrors.append([_("Main server"), self.main_server, False]) - mirrors.append([self._get_mirror_name(self.nearest_server), - self.nearest_server, False]) - mirrors.append([_("Custom servers"), None, True]) - for server in self.used_servers: - if compare_mirrors(server, self.nearest_server) or\ - compare_mirrors(server, self.main_server): - continue - elif not [self._get_mirror_name(server), server, False] in mirrors: - mirrors.append([self._get_mirror_name(server), server, False]) - - return mirrors - - def add_source(self, type=None, + elif len(self.used_servers) == 1 and not \ + compare_mirrors(self.used_servers[0], self.main_server): + mirrors.append([_("Main server"), self.main_server, False]) + # Only one server is used + server = self.used_servers[0] + + # Append the nearest server if it's not already used + if not compare_mirrors(server, self.nearest_server): + mirrors.append([self._get_mirror_name(self.nearest_server), + self.nearest_server, False]) + mirrors.append([self._get_mirror_name(server), server, True]) + + elif len(self.used_servers) > 1: + # More than one server is used. Since we don't handle this case + # in the user interface we set "custom servers" to true and + # append a list of all used servers + mirrors.append([_("Main server"), self.main_server, False]) + mirrors.append([self._get_mirror_name(self.nearest_server), + self.nearest_server, False]) + mirrors.append([_("Custom servers"), None, True]) + for server in self.used_servers: + if compare_mirrors(server, self.nearest_server) or\ + compare_mirrors(server, self.main_server): + continue + elif not [self._get_mirror_name(server), server, False] in mirrors: + mirrors.append([self._get_mirror_name(server), server, False]) + + return mirrors + + def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""): - """ - Add distribution specific sources - """ - if uri == None: - # FIXME: Add support for the server selector - uri = self.default_server - if dist == None: - dist = self.codename - if comps == None: - comps = list(self.enabled_comps) - if type == None: - type = self.binary_type - new_source = self.sourceslist.add(type, uri, dist, comps, comment) - # if source code is enabled add a deb-src line after the new - # source - if self.get_source_code == True and type == self.binary_type: - self.sourceslist.add(self.source_type, uri, dist, comps, comment, - file=new_source.file, - pos=self.sourceslist.list.index(new_source)+1) - - def enable_component(self, comp): - """ - Enable a component in all main, child and source code sources - (excluding cdrom based sources) - - comp: the component that should be enabled - """ - - def add_component_only_once(source, comps_per_dist): """ - Check if we already added the component to the repository, since - a repository could be splitted into different apt lines. If not - add the component + Add distribution specific sources """ - # if we don't that distro, just reutnr (can happen for e.g. - # dapper-update only in deb-src - if not comps_per_dist.has_key(source.dist): - return - # if we have seen this component already for this distro, - # return (nothing to do - if comp in comps_per_dist[source.dist]: - return - # add it - source.comps.append(comp) - comps_per_dist[source.dist].add(comp) - - sources = [] - sources.extend(self.main_sources) - sources.extend(self.child_sources) - # store what comps are enabled already per distro (where distro is - # e.g. "dapper", "dapper-updates") - comps_per_dist = {} - comps_per_sdist = {} - for s in sources: - if s.type == self.binary_type: - if not comps_per_dist.has_key(s.dist): - comps_per_dist[s.dist] = set() - map(comps_per_dist[s.dist].add, s.comps) - for s in self.source_code_sources: - if s.type == self.source_type: - if not comps_per_sdist.has_key(s.dist): - comps_per_sdist[s.dist] = set() - map(comps_per_sdist[s.dist].add, s.comps) - - # check if there is a main source at all - if len(self.main_sources) < 1: - # create a new main source - self.add_source(comps=["%s"%comp]) - else: - # add the comp to all main, child and source code sources - for source in sources: - add_component_only_once(source, comps_per_dist) + if uri == None: + # FIXME: Add support for the server selector + uri = self.default_server + if dist == None: + dist = self.codename + if comps == None: + comps = list(self.enabled_comps) + if type == None: + type = self.binary_type + new_source = self.sourceslist.add(type, uri, dist, comps, comment) + # if source code is enabled add a deb-src line after the new + # source + if self.get_source_code == True and type == self.binary_type: + self.sourceslist.add(self.source_type, uri, dist, comps, comment, + file=new_source.file, + pos=self.sourceslist.list.index(new_source)+1) + + def enable_component(self, comp): + """ + Enable a component in all main, child and source code sources + (excluding cdrom based sources) - # check if there is a main source code source at all - if self.get_source_code == True: - if len(self.source_code_sources) < 1: - # create a new main source - self.add_source(type=self.source_type, comps=["%s"%comp]) - else: - # add the comp to all main, child and source code sources - for source in self.source_code_sources: - add_component_only_once(source, comps_per_sdist) - - def disable_component(self, comp): - """ - Disable a component in all main, child and source code sources - (excluding cdrom based sources) - """ - sources = [] - sources.extend(self.main_sources) - sources.extend(self.child_sources) - sources.extend(self.source_code_sources) - if comp in self.cdrom_comps: + comp: the component that should be enabled + """ + + def add_component_only_once(source, comps_per_dist): + """ + Check if we already added the component to the repository, since + a repository could be splitted into different apt lines. If not + add the component + """ + # if we don't that distro, just reutnr (can happen for e.g. + # dapper-update only in deb-src + if not comps_per_dist.has_key(source.dist): + return + # if we have seen this component already for this distro, + # return (nothing to do + if comp in comps_per_dist[source.dist]: + return + # add it + source.comps.append(comp) + comps_per_dist[source.dist].add(comp) + + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + # store what comps are enabled already per distro (where distro is + # e.g. "dapper", "dapper-updates") + comps_per_dist = {} + comps_per_sdist = {} + for s in sources: + if s.type == self.binary_type: + if not comps_per_dist.has_key(s.dist): + comps_per_dist[s.dist] = set() + map(comps_per_dist[s.dist].add, s.comps) + for s in self.source_code_sources: + if s.type == self.source_type: + if not comps_per_sdist.has_key(s.dist): + comps_per_sdist[s.dist] = set() + map(comps_per_sdist[s.dist].add, s.comps) + + # check if there is a main source at all + if len(self.main_sources) < 1: + # create a new main source + self.add_source(comps=["%s"%comp]) + else: + # add the comp to all main, child and source code sources + for source in sources: + add_component_only_once(source, comps_per_dist) + + # check if there is a main source code source at all + if self.get_source_code == True: + if len(self.source_code_sources) < 1: + # create a new main source + self.add_source(type=self.source_type, comps=["%s"%comp]) + else: + # add the comp to all main, child and source code sources + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + def disable_component(self, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + """ sources = [] sources.extend(self.main_sources) - for source in sources: - if comp in source.comps: - source.comps.remove(comp) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + if comp in self.cdrom_comps: + sources = [] + sources.extend(self.main_sources) + for source in sources: + if comp in source.comps: + source.comps.remove(comp) + if len(source.comps) < 1: + self.sourceslist.remove(source) + + def change_server(self, uri): + ''' Change the server of all distro specific sources to + a given host ''' + + def change_server_of_source(source, uri, seen): + # Avoid creating duplicate entries + source.uri = uri + for comp in source.comps: + if [source.uri, source.dist, comp] in seen: + source.comps.remove(comp) + else: + seen.append([source.uri, source.dist, comp]) if len(source.comps) < 1: - self.sourceslist.remove(source) - - def change_server(self, uri): - ''' Change the server of all distro specific sources to - a given host ''' + self.sourceslist.remove(source) - def change_server_of_source(source, uri, seen): - # Avoid creating duplicate entries - source.uri = uri - for comp in source.comps: - if [source.uri, source.dist, comp] in seen: - source.comps.remove(comp) - else: - seen.append([source.uri, source.dist, comp]) - if len(source.comps) < 1: - self.sourceslist.remove(source) - - seen_binary = [] - seen_source = [] - self.default_server = uri - for source in self.main_sources: - change_server_of_source(source, uri, seen_binary) - for source in self.child_sources: - # Do not change the forces server of a child source - if source.template.base_uri == None or \ - source.template.base_uri != source.uri: + seen_binary = [] + seen_source = [] + self.default_server = uri + for source in self.main_sources: change_server_of_source(source, uri, seen_binary) - for source in self.source_code_sources: - change_server_of_source(source, uri, seen_source) - - def is_codename(self, name): - ''' Compare a given name with the release codename. ''' - if name == self.codename: - return True - else: - return False + for source in self.child_sources: + # Do not change the forces server of a child source + if source.template.base_uri == None or \ + source.template.base_uri != source.uri: + change_server_of_source(source, uri, seen_binary) + for source in self.source_code_sources: + change_server_of_source(source, uri, seen_source) + + def is_codename(self, name): + ''' Compare a given name with the release codename. ''' + if name == self.codename: + return True + else: + return False class DebianDistribution(Distribution): - ''' Class to support specific Debian features ''' + ''' Class to support specific Debian features ''' - def is_codename(self, name): - ''' Compare a given name with the release codename and check if - if it can be used as a synonym for a development releases ''' - if name == self.codename or self.release in ("testing", "unstable"): - return True - else: - return False - - def _get_mirror_name(self, server): - ''' Try to get a human readable name for the main mirror of a country - Debian specific ''' - country = None - i = server.find("://ftp.") - l = server.find(".debian.org") - if i != -1 and l != -1: - country = server[i+len("://ftp."):l] - if self.countries.has_key(country): - # TRANSLATORS: %s is a country - return _("Server for %s") % \ - gettext.dgettext("iso_3166", - self.countries[country].rstrip()).rstrip() - else: - return("%s" % server.rstrip("/ ")) - - def get_mirrors(self): - Distribution.get_mirrors(self, - mirror_template="http://ftp.%s.debian.org/debian/") + def is_codename(self, name): + ''' Compare a given name with the release codename and check if + if it can be used as a synonym for a development releases ''' + if name == self.codename or self.release in ("testing", "unstable"): + return True + else: + return False + + def _get_mirror_name(self, server): + ''' Try to get a human readable name for the main mirror of a country + Debian specific ''' + country = None + i = server.find("://ftp.") + l = server.find(".debian.org") + if i != -1 and l != -1: + country = server[i+len("://ftp."):l] + if self.countries.has_key(country): + # TRANSLATORS: %s is a country + return _("Server for %s") % \ + gettext.dgettext("iso_3166", + self.countries[country].rstrip()).rstrip() + else: + return("%s" % server.rstrip("/ ")) + + def get_mirrors(self): + Distribution.get_mirrors(self, + mirror_template="http://ftp.%s.debian.org/debian/") class UbuntuDistribution(Distribution): - ''' Class to support specific Ubuntu features ''' + ''' Class to support specific Ubuntu features ''' - def get_mirrors(self): - Distribution.get_mirrors(self, - mirror_template="http://%s.archive.ubuntu.com/ubuntu/") + def get_mirrors(self): + Distribution.get_mirrors(self, + mirror_template="http://%s.archive.ubuntu.com/ubuntu/") def get_distro(): diff --git a/aptsources/sourceslist.py b/aptsources/sourceslist.py index a8a772a0..57be2a68 100644 --- a/aptsources/sourceslist.py +++ b/aptsources/sourceslist.py @@ -41,409 +41,409 @@ from distinfo import DistInfo def is_mirror(master_uri, compare_uri): - """check if the given add_url is idential or a mirror of orig_uri - e.g. master_uri = archive.ubuntu.com - compare_uri = de.archive.ubuntu.com - -> True - """ - # remove traling spaces and "/" - compare_uri = compare_uri.rstrip("/ ") - master_uri = master_uri.rstrip("/ ") - # uri is identical - if compare_uri == master_uri: - #print "Identical" - return True - # add uri is a master site and orig_uri has the from "XX.mastersite" - # (e.g. de.archive.ubuntu.com) - try: - compare_srv = compare_uri.split("//")[1] - master_srv = master_uri.split("//")[1] - #print "%s == %s " % (add_srv, orig_srv) - except IndexError: # ok, somethings wrong here - #print "IndexError" + """ check if the given add_url is idential or a mirror of orig_uri + e.g. master_uri = archive.ubuntu.com + compare_uri = de.archive.ubuntu.com + -> True + """ + # remove traling spaces and "/" + compare_uri = compare_uri.rstrip("/ ") + master_uri = master_uri.rstrip("/ ") + # uri is identical + if compare_uri == master_uri: + #print "Identical" + return True + # add uri is a master site and orig_uri has the from "XX.mastersite" + # (e.g. de.archive.ubuntu.com) + try: + compare_srv = compare_uri.split("//")[1] + master_srv = master_uri.split("//")[1] + #print "%s == %s " % (add_srv, orig_srv) + except IndexError: # ok, somethings wrong here + #print "IndexError" + return False + # remove the leading "<country>." (if any) and see if that helps + if "." in compare_srv and \ + compare_srv[compare_srv.index(".")+1:] == master_srv: + #print "Mirror" + return True return False - # remove the leading "<country>." (if any) and see if that helps - if "." in compare_srv and \ - compare_srv[compare_srv.index(".")+1:] == master_srv: - #print "Mirror" - return True - return False def uniq(s): - """ simple and efficient way to return uniq list """ - return list(set(s)) + """ simple and efficient way to return uniq list """ + return list(set(s)) class SourceEntry: - """ single sources.list entry """ - - def __init__(self, line,file=None): - self.invalid = False # is the source entry valid - self.disabled = False # is it disabled ('#' in front) - self.type = "" # what type (deb, deb-src) - self.uri = "" # base-uri - self.dist = "" # distribution (dapper, edgy, etc) - self.comps = [] # list of available componetns (may empty) - self.comment = "" # (optional) comment - self.line = line # the original sources.list line - if file == None: - file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist") - self.file = file # the file that the entry is located in - self.parse(line) - self.template = None # type DistInfo.Suite - self.children = [] - - def __eq__(self, other): - """ equal operator for two sources.list entries """ - return (self.disabled == other.disabled and - self.type == other.type and - self.uri == other.uri and - self.dist == other.dist and - self.comps == other.comps) - - def mysplit(self, line): - """ a split() implementation that understands the sources.list - format better and takes [] into account (for e.g. cdroms) """ - line = string.strip(line) - pieces = [] - tmp = "" - # we are inside a [..] block - p_found = False - space_found = False - for i in range(len(line)): - if line[i] == "[": - p_found=True - tmp += line[i] - elif line[i] == "]": - p_found=False - tmp += line[i] - elif space_found and not line[i].isspace(): # we skip one or more space + """ single sources.list entry """ + + def __init__(self, line,file=None): + self.invalid = False # is the source entry valid + self.disabled = False # is it disabled ('#' in front) + self.type = "" # what type (deb, deb-src) + self.uri = "" # base-uri + self.dist = "" # distribution (dapper, edgy, etc) + self.comps = [] # list of available componetns (may empty) + self.comment = "" # (optional) comment + self.line = line # the original sources.list line + if file == None: + file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist") + self.file = file # the file that the entry is located in + self.parse(line) + self.template = None # type DistInfo.Suite + self.children = [] + + def __eq__(self, other): + """ equal operator for two sources.list entries """ + return (self.disabled == other.disabled and + self.type == other.type and + self.uri == other.uri and + self.dist == other.dist and + self.comps == other.comps) + + def mysplit(self, line): + """ a split() implementation that understands the sources.list + format better and takes [] into account (for e.g. cdroms) """ + line = string.strip(line) + pieces = [] + tmp = "" + # we are inside a [..] block + p_found = False space_found = False - pieces.append(tmp) - tmp = line[i] - elif line[i].isspace() and not p_found: # found a whitespace - space_found = True - else: - tmp += line[i] - # append last piece - if len(tmp) > 0: - pieces.append(tmp) - return pieces - - def parse(self,line): - """ parse a given sources.list (textual) line and break it up - into the field we have """ - line = string.strip(self.line) - #print line - # check if the source is enabled/disabled - if line == "" or line == "#": # empty line - self.invalid = True - return - if line[0] == "#": - self.disabled = True - pieces = string.split(line[1:]) - # if it looks not like a disabled deb line return - if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"): - self.invalid = True - return - else: - line = line[1:] - # check for another "#" in the line (this is treated as a comment) - i = line.find("#") - if i > 0: - self.comment = line[i+1:] - line = line[:i] - # source is ok, split it and see what we have - pieces = self.mysplit(line) - # Sanity check - if len(pieces) < 3: - self.invalid = True - return - # Type, deb or deb-src - self.type = string.strip(pieces[0]) - # Sanity check - if self.type not in ("deb", "deb-src", "rpm", "rpm-src"): - self.invalid = True - return - # URI - self.uri = string.strip(pieces[1]) - if len(self.uri) < 1: - self.invalid = True - # distro and components (optional) - # Directory or distro - self.dist = string.strip(pieces[2]) - if len(pieces) > 3: - # List of components - self.comps = pieces[3:] - else: - self.comps = [] - - def set_enabled(self, new_value): - """ set a line to enabled or disabled """ - self.disabled = not new_value - # enable, remove all "#" from the start of the line - if new_value == True: - i=0 - self.line = string.lstrip(self.line) - while self.line[i] == "#": - i += 1 - self.line = self.line[i:] - else: - # disabled, add a "#" - if string.strip(self.line)[0] != "#": - self.line = "#" + self.line - - def __str__(self): - """ debug helper """ - return self.str().strip() - - def str(self): - """ return the current line as string """ - if self.invalid: - return self.line - line = "" - if self.disabled: - line = "# " - line += "%s %s %s" % (self.type, self.uri, self.dist) - if len(self.comps) > 0: - line += " " + " ".join(self.comps) - if self.comment != "": - line += " #"+self.comment - line += "\n" - return line + for i in range(len(line)): + if line[i] == "[": + p_found=True + tmp += line[i] + elif line[i] == "]": + p_found=False + tmp += line[i] + elif space_found and not line[i].isspace(): # we skip one or more space + space_found = False + pieces.append(tmp) + tmp = line[i] + elif line[i].isspace() and not p_found: # found a whitespace + space_found = True + else: + tmp += line[i] + # append last piece + if len(tmp) > 0: + pieces.append(tmp) + return pieces + + def parse(self,line): + """ parse a given sources.list (textual) line and break it up + into the field we have """ + line = string.strip(self.line) + #print line + # check if the source is enabled/disabled + if line == "" or line == "#": # empty line + self.invalid = True + return + if line[0] == "#": + self.disabled = True + pieces = string.split(line[1:]) + # if it looks not like a disabled deb line return + if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"): + self.invalid = True + return + else: + line = line[1:] + # check for another "#" in the line (this is treated as a comment) + i = line.find("#") + if i > 0: + self.comment = line[i+1:] + line = line[:i] + # source is ok, split it and see what we have + pieces = self.mysplit(line) + # Sanity check + if len(pieces) < 3: + self.invalid = True + return + # Type, deb or deb-src + self.type = string.strip(pieces[0]) + # Sanity check + if self.type not in ("deb", "deb-src", "rpm", "rpm-src"): + self.invalid = True + return + # URI + self.uri = string.strip(pieces[1]) + if len(self.uri) < 1: + self.invalid = True + # distro and components (optional) + # Directory or distro + self.dist = string.strip(pieces[2]) + if len(pieces) > 3: + # List of components + self.comps = pieces[3:] + else: + self.comps = [] + + def set_enabled(self, new_value): + """ set a line to enabled or disabled """ + self.disabled = not new_value + # enable, remove all "#" from the start of the line + if new_value == True: + i=0 + self.line = string.lstrip(self.line) + while self.line[i] == "#": + i += 1 + self.line = self.line[i:] + else: + # disabled, add a "#" + if string.strip(self.line)[0] != "#": + self.line = "#" + self.line + + def __str__(self): + """ debug helper """ + return self.str().strip() + + def str(self): + """ return the current line as string """ + if self.invalid: + return self.line + line = "" + if self.disabled: + line = "# " + line += "%s %s %s" % (self.type, self.uri, self.dist) + if len(self.comps) > 0: + line += " " + " ".join(self.comps) + if self.comment != "": + line += " #"+self.comment + line += "\n" + return line class NullMatcher(object): - """ a Matcher that does nothing """ + """ a Matcher that does nothing """ - def match(self, s): - return True + def match(self, s): + return True class SourcesList: - """ represents the full sources.list + sources.list.d file """ - - def __init__(self, - withMatcher=True, - matcherPath="/usr/share/python-apt/templates/"): - self.list = [] # the actual SourceEntries Type - if withMatcher: - self.matcher = SourceEntryMatcher(matcherPath) - else: - self.matcher = NullMatcher() - self.refresh() - - def refresh(self): - """ update the list of known entries """ - self.list = [] - # read sources.list - dir = apt_pkg.Config.FindDir("Dir::Etc") - file = apt_pkg.Config.Find("Dir::Etc::sourcelist") - self.load(dir+file) - # read sources.list.d - partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") - for file in glob.glob("%s/*.list" % partsdir): - self.load(file) - # check if the source item fits a predefined template - for source in self.list: - if source.invalid == False: - self.matcher.match(source) - - def __iter__(self): - """ simple iterator to go over self.list, returns SourceEntry - types """ - for entry in self.list: - yield entry - raise StopIteration - - def add(self, type, uri, dist, orig_comps, comment="", pos=-1, file=None): - """ - Add a new source to the sources.list. - The method will search for existing matching repos and will try to - reuse them as far as possible - """ - # create a working copy of the component list so that - # we can modify it later - comps = orig_comps[:] - # check if we have this source already in the sources.list - for source in self.list: - if source.disabled == False and source.invalid == False and \ - source.type == type and uri == source.uri and \ - source.dist == dist: - for new_comp in comps: - if new_comp in source.comps: - # we have this component already, delete it from the new_comps - # list - del comps[comps.index(new_comp)] - if len(comps) == 0: - return source - for source in self.list: - # if there is a repo with the same (type, uri, dist) just add the - # components - if source.disabled == False and source.invalid == False and \ - source.type == type and uri == source.uri and \ - source.dist == dist: - comps = uniq(source.comps + comps) - source.comps = comps - return source - # if there is a corresponding repo which is disabled, enable it - elif source.disabled == True and source.invalid == False and \ - source.type == type and uri == source.uri and \ - source.dist == dist and \ - len(set(source.comps) & set(comps)) == len(comps): - source.disabled = False - return source - # there isn't any matching source, so create a new line and parse it - line = "%s %s %s" % (type,uri,dist) - for c in comps: - line = line + " " + c; - if comment != "": - line = "%s #%s\n" %(line,comment) - line = line + "\n" - new_entry = SourceEntry(line) - if file != None: - new_entry.file = file - self.matcher.match(new_entry) - self.list.insert(pos, new_entry) - return new_entry - - def remove(self, source_entry): - """ remove the specified entry from the sources.list """ - self.list.remove(source_entry) - - def restoreBackup(self, backup_ext): - " restore sources.list files based on the backup extension " - dir = apt_pkg.Config.FindDir("Dir::Etc") - file = apt_pkg.Config.Find("Dir::Etc::sourcelist") - if os.path.exists(dir+file+backup_ext) and \ - os.path.exists(dir+file): - shutil.copy(dir+file+backup_ext,dir+file) - # now sources.list.d - partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") - for file in glob.glob("%s/*.list" % partsdir): - if os.path.exists(file+backup_ext): - shutil.copy(file+backup_ext,file) - - def backup(self, backup_ext=None): - """ make a backup of the current source files, if no backup extension - is given, the current date/time is used (and returned) """ - already_backuped = set() - if backup_ext == None: - backup_ext = time.strftime("%y%m%d.%H%M") - for source in self.list: - if not source.file in already_backuped and os.path.exists(source.file): - shutil.copy(source.file,"%s%s" % (source.file,backup_ext)) - return backup_ext - - def load(self,file): - """ (re)load the current sources """ - try: - f = open(file, "r") - lines = f.readlines() - for line in lines: - source = SourceEntry(line,file) - self.list.append(source) - except: - print "could not open file '%s'" % file - else: - f.close() - - def save(self): - """ save the current sources """ - files = {} - # write an empty default config file if there aren't any sources - if len(self.list) == 0: - path = "%s%s" % (apt_pkg.Config.FindDir("Dir::Etc"), - apt_pkg.Config.Find("Dir::Etc::sourcelist")) - header = ("## See sources.list(5) for more information, especialy\n" - "# Remember that you can only use http, ftp or file URIs\n" - "# CDROMs are managed through the apt-cdrom tool.\n") - open(path,"w").write(header) - return - for source in self.list: - if not files.has_key(source.file): - files[source.file]=open(source.file,"w") - files[source.file].write(source.str()) - for f in files: - files[f].close() - - def check_for_relations(self, sources_list): - """get all parent and child channels in the sources list""" - parents = [] - used_child_templates = {} - for source in sources_list: - # try to avoid checking uninterressting sources - if source.template == None: - continue - # set up a dict with all used child templates and corresponding - # source entries - if source.template.child == True: - key = source.template - if not used_child_templates.has_key(key): - used_child_templates[key] = [] - temp = used_child_templates[key] - temp.append(source) - else: - # store each source with children aka. a parent :) - if len(source.template.children) > 0: - parents.append(source) - #print self.used_child_templates - #print self.parents - return (parents, used_child_templates) + """ represents the full sources.list + sources.list.d file """ + + def __init__(self, + withMatcher=True, + matcherPath="/usr/share/python-apt/templates/"): + self.list = [] # the actual SourceEntries Type + if withMatcher: + self.matcher = SourceEntryMatcher(matcherPath) + else: + self.matcher = NullMatcher() + self.refresh() + + def refresh(self): + """ update the list of known entries """ + self.list = [] + # read sources.list + dir = apt_pkg.Config.FindDir("Dir::Etc") + file = apt_pkg.Config.Find("Dir::Etc::sourcelist") + self.load(dir+file) + # read sources.list.d + partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") + for file in glob.glob("%s/*.list" % partsdir): + self.load(file) + # check if the source item fits a predefined template + for source in self.list: + if source.invalid == False: + self.matcher.match(source) + + def __iter__(self): + """ simple iterator to go over self.list, returns SourceEntry + types """ + for entry in self.list: + yield entry + raise StopIteration + + def add(self, type, uri, dist, orig_comps, comment="", pos=-1, file=None): + """ + Add a new source to the sources.list. + The method will search for existing matching repos and will try to + reuse them as far as possible + """ + # create a working copy of the component list so that + # we can modify it later + comps = orig_comps[:] + # check if we have this source already in the sources.list + for source in self.list: + if source.disabled == False and source.invalid == False and \ + source.type == type and uri == source.uri and \ + source.dist == dist: + for new_comp in comps: + if new_comp in source.comps: + # we have this component already, delete it from the new_comps + # list + del comps[comps.index(new_comp)] + if len(comps) == 0: + return source + for source in self.list: + # if there is a repo with the same (type, uri, dist) just add the + # components + if source.disabled == False and source.invalid == False and \ + source.type == type and uri == source.uri and \ + source.dist == dist: + comps = uniq(source.comps + comps) + source.comps = comps + return source + # if there is a corresponding repo which is disabled, enable it + elif source.disabled == True and source.invalid == False and \ + source.type == type and uri == source.uri and \ + source.dist == dist and \ + len(set(source.comps) & set(comps)) == len(comps): + source.disabled = False + return source + # there isn't any matching source, so create a new line and parse it + line = "%s %s %s" % (type,uri,dist) + for c in comps: + line = line + " " + c; + if comment != "": + line = "%s #%s\n" %(line,comment) + line = line + "\n" + new_entry = SourceEntry(line) + if file != None: + new_entry.file = file + self.matcher.match(new_entry) + self.list.insert(pos, new_entry) + return new_entry + + def remove(self, source_entry): + """ remove the specified entry from the sources.list """ + self.list.remove(source_entry) + + def restoreBackup(self, backup_ext): + " restore sources.list files based on the backup extension " + dir = apt_pkg.Config.FindDir("Dir::Etc") + file = apt_pkg.Config.Find("Dir::Etc::sourcelist") + if os.path.exists(dir+file+backup_ext) and \ + os.path.exists(dir+file): + shutil.copy(dir+file+backup_ext,dir+file) + # now sources.list.d + partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") + for file in glob.glob("%s/*.list" % partsdir): + if os.path.exists(file+backup_ext): + shutil.copy(file+backup_ext,file) + + def backup(self, backup_ext=None): + """ make a backup of the current source files, if no backup extension + is given, the current date/time is used (and returned) """ + already_backuped = set() + if backup_ext == None: + backup_ext = time.strftime("%y%m%d.%H%M") + for source in self.list: + if not source.file in already_backuped and os.path.exists(source.file): + shutil.copy(source.file,"%s%s" % (source.file,backup_ext)) + return backup_ext + + def load(self,file): + """ (re)load the current sources """ + try: + f = open(file, "r") + lines = f.readlines() + for line in lines: + source = SourceEntry(line,file) + self.list.append(source) + except: + print "could not open file '%s'" % file + else: + f.close() + + def save(self): + """ save the current sources """ + files = {} + # write an empty default config file if there aren't any sources + if len(self.list) == 0: + path = "%s%s" % (apt_pkg.Config.FindDir("Dir::Etc"), + apt_pkg.Config.Find("Dir::Etc::sourcelist")) + header = ("## See sources.list(5) for more information, especialy\n" + "# Remember that you can only use http, ftp or file URIs\n" + "# CDROMs are managed through the apt-cdrom tool.\n") + open(path,"w").write(header) + return + for source in self.list: + if not files.has_key(source.file): + files[source.file]=open(source.file,"w") + files[source.file].write(source.str()) + for f in files: + files[f].close() + + def check_for_relations(self, sources_list): + """get all parent and child channels in the sources list""" + parents = [] + used_child_templates = {} + for source in sources_list: + # try to avoid checking uninterressting sources + if source.template == None: + continue + # set up a dict with all used child templates and corresponding + # source entries + if source.template.child == True: + key = source.template + if not used_child_templates.has_key(key): + used_child_templates[key] = [] + temp = used_child_templates[key] + temp.append(source) + else: + # store each source with children aka. a parent :) + if len(source.template.children) > 0: + parents.append(source) + #print self.used_child_templates + #print self.parents + return (parents, used_child_templates) class SourceEntryMatcher: - """ matcher class to make a source entry look nice - lots of predefined matchers to make it i18n/gettext friendly - """ - - def __init__(self, matcherPath): - self.templates = [] - # Get the human readable channel and comp names from the channel .infos - spec_files = glob.glob("%s/*.info" % matcherPath) - for f in spec_files: - f = os.path.basename(f) - i = f.find(".info") - f = f[0:i] - dist = DistInfo(f,base_dir=matcherPath) - for template in dist.templates: - if template.match_uri != None: - self.templates.append(template) - return - - def match(self, source): - """Add a matching template to the source""" - _ = gettext.gettext - found = False - for template in self.templates: - if (re.search(template.match_uri, source.uri) and - re.match(template.match_name, source.dist)): - found = True - source.template = template - break - elif (template.is_mirror(source.uri) and - re.match(template.match_name, source.dist)): - found = True - source.template = template - break - return found + """ matcher class to make a source entry look nice + lots of predefined matchers to make it i18n/gettext friendly + """ + + def __init__(self, matcherPath): + self.templates = [] + # Get the human readable channel and comp names from the channel .infos + spec_files = glob.glob("%s/*.info" % matcherPath) + for f in spec_files: + f = os.path.basename(f) + i = f.find(".info") + f = f[0:i] + dist = DistInfo(f,base_dir=matcherPath) + for template in dist.templates: + if template.match_uri != None: + self.templates.append(template) + return + + def match(self, source): + """Add a matching template to the source""" + _ = gettext.gettext + found = False + for template in self.templates: + if (re.search(template.match_uri, source.uri) and + re.match(template.match_name, source.dist)): + found = True + source.template = template + break + elif (template.is_mirror(source.uri) and + re.match(template.match_name, source.dist)): + found = True + source.template = template + break + return found # some simple tests if __name__ == "__main__": - apt_pkg.InitConfig() - sources = SourcesList() + apt_pkg.InitConfig() + sources = SourcesList() - for entry in sources: - print entry.str() - #print entry.uri + for entry in sources: + print entry.str() + #print entry.uri - mirror = is_mirror("http://archive.ubuntu.com/ubuntu/", - "http://de.archive.ubuntu.com/ubuntu/") - print "is_mirror(): %s" % mirror + mirror = is_mirror("http://archive.ubuntu.com/ubuntu/", + "http://de.archive.ubuntu.com/ubuntu/") + print "is_mirror(): %s" % mirror - print is_mirror("http://archive.ubuntu.com/ubuntu", - "http://de.archive.ubuntu.com/ubuntu/") - print is_mirror("http://archive.ubuntu.com/ubuntu/", - "http://de.archive.ubuntu.com/ubuntu") + print is_mirror("http://archive.ubuntu.com/ubuntu", + "http://de.archive.ubuntu.com/ubuntu/") + print is_mirror("http://archive.ubuntu.com/ubuntu/", + "http://de.archive.ubuntu.com/ubuntu") diff --git a/doc/examples/acquire.py b/doc/examples/acquire.py index 07f8da0e..7bf7d646 100644 --- a/doc/examples/acquire.py +++ b/doc/examples/acquire.py @@ -6,25 +6,25 @@ import tempfile def get_file(fetcher, uri, destFile): - cwd = os.getcwd() - # create a temp dir - dir = tempfile.mkdtemp() - os.chdir(dir) - # get the file - af = apt_pkg.GetPkgAcqFile(fetcher, - uri=uri, - descr="sample descr") - res = fetcher.Run() - if res != fetcher.ResultContinue: - os.rmdir(dir) - os.chdir(cwd) - return False - filename = os.path.basename(uri) - os.rename(dir+"/"+filename,destFile) - # cleanup - os.rmdir(dir) - os.chdir(cwd) - return True + cwd = os.getcwd() + # create a temp dir + dir = tempfile.mkdtemp() + os.chdir(dir) + # get the file + af = apt_pkg.GetPkgAcqFile(fetcher, + uri=uri, + descr="sample descr") + res = fetcher.Run() + if res != fetcher.ResultContinue: + os.rmdir(dir) + os.chdir(cwd) + return False + filename = os.path.basename(uri) + os.rename(dir+"/"+filename,destFile) + # cleanup + os.rmdir(dir) + os.chdir(cwd) + return True apt_pkg.init() diff --git a/doc/examples/build-deps.py b/doc/examples/build-deps.py index 2d83a54f..dc1a6f4e 100755 --- a/doc/examples/build-deps.py +++ b/doc/examples/build-deps.py @@ -7,17 +7,17 @@ import sets # only needed for python2.3, python2.4 supports this naively def get_source_pkg(pkg, records, depcache): - """ get the source package name of a given package """ - version = depcache.GetCandidateVer(pkg) - if not version: - return None - file, index = version.FileList.pop(0) - records.Lookup((file, index)) - if records.SourcePkg != "": - srcpkg = records.SourcePkg - else: - srcpkg = pkg.Name - return srcpkg + """ get the source package name of a given package """ + version = depcache.GetCandidateVer(pkg) + if not version: + return None + file, index = version.FileList.pop(0) + records.Lookup((file, index)) + if records.SourcePkg != "": + srcpkg = records.SourcePkg + else: + srcpkg = pkg.Name + return srcpkg # main @@ -30,45 +30,45 @@ srcrecords = apt_pkg.GetPkgSrcRecords() # base package that we use for build-depends calculation if len(sys.argv) < 2: - print "need a package name as argument" - sys.exit(1) + print "need a package name as argument" + sys.exit(1) try: - base = cache[sys.argv[1]] + base = cache[sys.argv[1]] except KeyError: - print "No package %s found" % sys.argv[1] - sys.exit(1) + print "No package %s found" % sys.argv[1] + sys.exit(1) all_build_depends = sets.Set() # get the build depdends for the package itself srcpkg_name = get_source_pkg(base, records, depcache) print "srcpkg_name: %s " % srcpkg_name if not srcpkg_name: - print "Can't find source package for '%s'" % pkg.Name + print "Can't find source package for '%s'" % pkg.Name srcrec = srcrecords.Lookup(srcpkg_name) if srcrec: - print "Files:" - print srcrecords.Files - bd = srcrecords.BuildDepends - print "build-depends of the package: %s " % bd - for b in bd: - all_build_depends.add(b[0]) + print "Files:" + print srcrecords.Files + bd = srcrecords.BuildDepends + print "build-depends of the package: %s " % bd + for b in bd: + all_build_depends.add(b[0]) # calculate the build depends for all dependencies depends = depcache.GetCandidateVer(base).DependsList for dep in depends["Depends"]: # FIXME: do we need to consider PreDepends? - pkg = dep[0].TargetPkg - srcpkg_name = get_source_pkg(pkg, records, depcache) - if not srcpkg_name: - print "Can't find source package for '%s'" % pkg.Name - continue - srcrec = srcrecords.Lookup(srcpkg_name) - if srcrec: - #print srcrecords.Package - #print srcrecords.Binaries - bd = srcrecords.BuildDepends - #print "%s: %s " % (srcpkg_name, bd) - for b in bd: - all_build_depends.add(b[0]) + pkg = dep[0].TargetPkg + srcpkg_name = get_source_pkg(pkg, records, depcache) + if not srcpkg_name: + print "Can't find source package for '%s'" % pkg.Name + continue + srcrec = srcrecords.Lookup(srcpkg_name) + if srcrec: + #print srcrecords.Package + #print srcrecords.Binaries + bd = srcrecords.BuildDepends + #print "%s: %s " % (srcpkg_name, bd) + for b in bd: + all_build_depends.add(b[0]) print "\n".join(all_build_depends) diff --git a/doc/examples/checkstate.py b/doc/examples/checkstate.py index 2986872f..3368d500 100755 --- a/doc/examples/checkstate.py +++ b/doc/examples/checkstate.py @@ -14,23 +14,23 @@ packages = cache.Packages uninstalled, updated, upgradable = {}, {}, {} for package in packages: - versions = package.VersionList - if not versions: - continue - version = versions[0] - for other_version in versions: - if apt_pkg.VersionCompare(version.VerStr, other_version.VerStr)<0: - version = other_version - if package.CurrentVer: - current = package.CurrentVer - if apt_pkg.VersionCompare(current.VerStr, version.VerStr)<0: - upgradable[package.Name] = version - break - else: - updated[package.Name] = current - else: - uninstalled[package.Name] = version + versions = package.VersionList + if not versions: + continue + version = versions[0] + for other_version in versions: + if apt_pkg.VersionCompare(version.VerStr, other_version.VerStr)<0: + version = other_version + if package.CurrentVer: + current = package.CurrentVer + if apt_pkg.VersionCompare(current.VerStr, version.VerStr)<0: + upgradable[package.Name] = version + break + else: + updated[package.Name] = current + else: + uninstalled[package.Name] = version for l in (uninstalled, updated, upgradable): - print l.items()[0] + print l.items()[0] diff --git a/doc/examples/config.py b/doc/examples/config.py index 222c1331..337899f0 100755 --- a/doc/examples/config.py +++ b/doc/examples/config.py @@ -25,31 +25,31 @@ print "Command line is",sys.argv # Load the default configuration file, InitConfig() does this better.. Cnf.Set("config-file","/etc/apt/apt.conf"); # or Cnf["config-file"] = ".."; if posixpath.exists(Cnf.FindFile("config-file")): - apt_pkg.ReadConfigFile(Cnf,"/etc/apt/apt.conf"); + apt_pkg.ReadConfigFile(Cnf,"/etc/apt/apt.conf"); # Merge the command line arguments into the configuration space Arguments = [('h',"help","help"), ('v',"version","version"), ('q',"quiet","quiet","IntLevel"), ('c',"config-file","","ConfigFile"), - ('o',"option","","ArbItem")] + ('o',"option","","ArbItem")] print "FileNames",apt_pkg.ParseCommandLine(Cnf,Arguments,sys.argv); print "Quiet level selected is",Cnf.FindI("quiet",0); # Do some stuff with it if Cnf.FindB("version",0) == 1: - print "Version selected - 1.1"; + print "Version selected - 1.1"; if Cnf.FindB("help",0) == 1: - print apt_pkg.Package,apt_pkg.Version,"for",apt_pkg.Architecture, \ - "compiled on",apt_pkg.Date,apt_pkg.Time; - print "Hi, I am the help text for this program"; - sys.exit(0); + print apt_pkg.Package,apt_pkg.Version,"for",apt_pkg.Architecture, \ + "compiled on",apt_pkg.Date,apt_pkg.Time; + print "Hi, I am the help text for this program"; + sys.exit(0); print "No help for you, try -h"; # Print the configuration space print "The Configuration space looks like:"; for I in Cnf.keys(): - print "%s \"%s\";"%(I,Cnf[I]); + print "%s \"%s\";"%(I,Cnf[I]); diff --git a/doc/examples/configisc.py b/doc/examples/configisc.py index 1773a919..45a9c7f6 100755 --- a/doc/examples/configisc.py +++ b/doc/examples/configisc.py @@ -13,8 +13,8 @@ import apt_pkg,sys,posixpath; ConfigFile = apt_pkg.ParseCommandLine(apt_pkg.Config,[],sys.argv); if len(ConfigFile) != 1: - print "Must have exactly 1 file name"; - sys.exit(0); + print "Must have exactly 1 file name"; + sys.exit(0); Cnf = apt_pkg.newConfiguration(); apt_pkg.ReadConfigFileISC(Cnf,ConfigFile[0]); @@ -26,14 +26,14 @@ apt_pkg.ReadConfigFileISC(Cnf,ConfigFile[0]); # bind8 config file.. if Cnf.has_key("Zone"): - print "Zones: ",Cnf.SubTree("zone").List(); - for I in Cnf.List("zone"): - SubCnf = Cnf.SubTree(I); - if SubCnf.Find("type") == "slave": - print "Masters for %s: %s"%(SubCnf.MyTag(),SubCnf.ValueList("masters")); + print "Zones: ",Cnf.SubTree("zone").List(); + for I in Cnf.List("zone"): + SubCnf = Cnf.SubTree(I); + if SubCnf.Find("type") == "slave": + print "Masters for %s: %s"%(SubCnf.MyTag(),SubCnf.ValueList("masters")); else: - print "Tree definitions:"; - for I in Cnf.List("tree"): - SubCnf = Cnf.SubTree(I); - # This could use Find which would eliminate the possibility of exceptions. - print "Subtree %s with sections '%s' and architectures '%s'"%(SubCnf.MyTag(),SubCnf["Sections"],SubCnf["Architectures"]); + print "Tree definitions:"; + for I in Cnf.List("tree"): + SubCnf = Cnf.SubTree(I); + # This could use Find which would eliminate the possibility of exceptions. + print "Subtree %s with sections '%s' and architectures '%s'"%(SubCnf.MyTag(),SubCnf["Sections"],SubCnf["Architectures"]); diff --git a/doc/examples/dependant-pkgs.py b/doc/examples/dependant-pkgs.py index 2de420a5..bb10ce70 100755 --- a/doc/examples/dependant-pkgs.py +++ b/doc/examples/dependant-pkgs.py @@ -6,27 +6,27 @@ import sys pkgs = set() cache = apt.Cache() for pkg in cache: - candver = cache._depcache.GetCandidateVer(pkg._pkg) - if candver == None: - continue - dependslist = candver.DependsList - for dep in dependslist.keys(): - # get the list of each dependency object - for depVerList in dependslist[dep]: - for z in depVerList: - # get all TargetVersions of - # the dependency object - for tpkg in z.AllTargets(): - if sys.argv[1] == tpkg.ParentPkg.Name: - pkgs.add(pkg.name) + candver = cache._depcache.GetCandidateVer(pkg._pkg) + if candver == None: + continue + dependslist = candver.DependsList + for dep in dependslist.keys(): + # get the list of each dependency object + for depVerList in dependslist[dep]: + for z in depVerList: + # get all TargetVersions of + # the dependency object + for tpkg in z.AllTargets(): + if sys.argv[1] == tpkg.ParentPkg.Name: + pkgs.add(pkg.name) main = set() universe = set() for pkg in pkgs: - if "universe" in cache[pkg].section: - universe.add(cache[pkg].sourcePackageName) - else: - main.add(cache[pkg].sourcePackageName) + if "universe" in cache[pkg].section: + universe.add(cache[pkg].sourcePackageName) + else: + main.add(cache[pkg].sourcePackageName) print "main:" print "\n".join(main) diff --git a/doc/examples/gui-inst.py b/doc/examples/gui-inst.py index 28887d34..edaec455 100755 --- a/doc/examples/gui-inst.py +++ b/doc/examples/gui-inst.py @@ -22,51 +22,51 @@ from apt.progress import OpProgress, FetchProgress, InstallProgress class GuiFetchProgress(gtk.Window, FetchProgress): def __init__(self): - gtk.Window.__init__(self) - self.vbox = gtk.VBox() - self.vbox.show() - self.add(self.vbox) - self.progress = gtk.ProgressBar() - self.progress.show() - self.label = gtk.Label() - self.label.show() - self.vbox.pack_start(self.progress) - self.vbox.pack_start(self.label) - self.resize(300,100) + gtk.Window.__init__(self) + self.vbox = gtk.VBox() + self.vbox.show() + self.add(self.vbox) + self.progress = gtk.ProgressBar() + self.progress.show() + self.label = gtk.Label() + self.label.show() + self.vbox.pack_start(self.progress) + self.vbox.pack_start(self.label) + self.resize(300,100) def start(self): print "start" - self.progress.set_fraction(0.0) + self.progress.set_fraction(0.0) self.show() def stop(self): - self.hide() + self.hide() def pulse(self): FetchProgress.pulse(self) self.label.set_text("Speed: %s/s" % apt_pkg.SizeToStr(self.currentCPS)) - #self.progressbar.set_fraction(self.currentBytes/self.totalBytes) - while gtk.events_pending(): - gtk.main_iteration() + #self.progressbar.set_fraction(self.currentBytes/self.totalBytes) + while gtk.events_pending(): + gtk.main_iteration() return True class TermInstallProgress(InstallProgress, gtk.Window): def __init__(self): - gtk.Window.__init__(self) + gtk.Window.__init__(self) InstallProgress.__init__(self) - self.show() + self.show() box = gtk.VBox() box.show() self.add(box) - self.term = vte.Terminal() - self.term.show() + self.term = vte.Terminal() + self.term.show() # check for the child self.reaper = vte.reaper_get() self.reaper.connect("child-exited",self.child_exited) self.finished = False - box.pack_start(self.term) + box.pack_start(self.term) self.progressbar = gtk.ProgressBar() self.progressbar.show() box.pack_start(self.progressbar) @@ -115,16 +115,16 @@ iprogress = TermInstallProgress() # show the interface while gtk.events_pending(): - gtk.main_iteration() + gtk.main_iteration() pkg = cache["3dchess"] print "\n%s"%pkg.name # install or remove, the importend thing is to keep us busy :) if pkg.isInstalled: - pkg.markDelete() + pkg.markDelete() else: - pkg.markInstall() + pkg.markInstall() cache.commit(fprogress, iprogress) print "Exiting" diff --git a/doc/examples/inst.py b/doc/examples/inst.py index d1e2ff99..b37ab4cd 100644 --- a/doc/examples/inst.py +++ b/doc/examples/inst.py @@ -11,23 +11,23 @@ from apt.progress import InstallProgress class TextInstallProgress(InstallProgress): - def __init__(self): - apt.progress.InstallProgress.__init__(self) - self.last = 0.0 + def __init__(self): + apt.progress.InstallProgress.__init__(self) + self.last = 0.0 - def updateInterface(self): - InstallProgress.updateInterface(self) - if self.last >= self.percent: - return - sys.stdout.write("\r[%s] %s\n" %(self.percent, self.status)) - sys.stdout.flush() - self.last = self.percent + def updateInterface(self): + InstallProgress.updateInterface(self) + if self.last >= self.percent: + return + sys.stdout.write("\r[%s] %s\n" %(self.percent, self.status)) + sys.stdout.flush() + self.last = self.percent - def conffile(self,current,new): - print "conffile prompt: %s %s" % (current,new) + def conffile(self,current,new): + print "conffile prompt: %s %s" % (current,new) - def error(self, errorstr): - print "got dpkg error: '%s'" % errorstr + def error(self, errorstr): + print "got dpkg error: '%s'" % errorstr cache = apt.Cache(apt.progress.OpTextProgress()) @@ -39,11 +39,11 @@ pkg = cache["3dchess"] # install or remove, the importend thing is to keep us busy :) if pkg.isInstalled: - print "Going to delete %s" % pkg.name - pkg.markDelete() + print "Going to delete %s" % pkg.name + pkg.markDelete() else: - print "Going to install %s" % pkg.name - pkg.markInstall() + print "Going to install %s" % pkg.name + pkg.markInstall() res = cache.commit(fprogress, iprogress) print res diff --git a/doc/examples/print_uris.py b/doc/examples/print_uris.py index c8a64223..3c93a668 100755 --- a/doc/examples/print_uris.py +++ b/doc/examples/print_uris.py @@ -12,11 +12,11 @@ upgradable = filter(lambda p: p.isUpgradable, cache) for pkg in upgradable: - pkg._lookupRecord(True) - path = apt_pkg.ParseSection(pkg._records.Record)["Filename"] - cand = pkg._depcache.GetCandidateVer(pkg._pkg) - for (packagefile,i) in cand.FileList: - indexfile = cache._list.FindIndex(packagefile) - if indexfile: - uri = indexfile.ArchiveURI(path) - print uri + pkg._lookupRecord(True) + path = apt_pkg.ParseSection(pkg._records.Record)["Filename"] + cand = pkg._depcache.GetCandidateVer(pkg._pkg) + for (packagefile,i) in cand.FileList: + indexfile = cache._list.FindIndex(packagefile) + if indexfile: + uri = indexfile.ArchiveURI(path) + print uri diff --git a/doc/examples/progress.py b/doc/examples/progress.py index 39e73e70..d8f00a52 100644 --- a/doc/examples/progress.py +++ b/doc/examples/progress.py @@ -41,8 +41,8 @@ class TextFetchProgress(apt.FetchProgress): return True def mediaChange(self, medium, drive): - print "Please insert medium %s in drive %s" % (medium, drive) - sys.stdin.readline() + print "Please insert medium %s in drive %s" % (medium, drive) + sys.stdin.readline() #return False diff --git a/doc/examples/recommends.py b/doc/examples/recommends.py index 03d46068..f0b3b1be 100755 --- a/doc/examples/recommends.py +++ b/doc/examples/recommends.py @@ -8,32 +8,32 @@ cache = apt_pkg.GetCache() class Wanted: - def __init__(self, name): - self.name = name - self.recommended = [] - self.suggested = [] + def __init__(self, name): + self.name = name + self.recommended = [] + self.suggested = [] wanted = {} for package in cache.Packages: - current = package.CurrentVer - if not current: - continue - depends = current.DependsList - for (key, attr) in (('Suggests', 'suggested'), - ('Recommends', 'recommended')): - list = depends.get(key, []) - for dependency in list: - name = dependency[0].TargetPkg.Name - dep = cache[name] - if dep.CurrentVer: - continue - getattr(wanted.setdefault(name, Wanted(name)), - attr).append(package.Name) + current = package.CurrentVer + if not current: + continue + depends = current.DependsList + for (key, attr) in (('Suggests', 'suggested'), + ('Recommends', 'recommended')): + list = depends.get(key, []) + for dependency in list: + name = dependency[0].TargetPkg.Name + dep = cache[name] + if dep.CurrentVer: + continue + getattr(wanted.setdefault(name, Wanted(name)), + attr).append(package.Name) ks = wanted.keys() ks.sort() for want in ks: - print want, wanted[want].recommended, wanted[want].suggested + print want, wanted[want].recommended, wanted[want].suggested diff --git a/doc/examples/records.py b/doc/examples/records.py index ef04b555..9dfc460b 100755 --- a/doc/examples/records.py +++ b/doc/examples/records.py @@ -5,8 +5,8 @@ import apt cache = apt.Cache() for pkg in cache: - if not pkg.candidateRecord: - continue - if pkg.candidateRecord.has_key("Task"): - print "Pkg %s is part of '%s'" % (pkg.name, pkg.candidateRecord["Task"].split()) - #print pkg.candidateRecord + if not pkg.candidateRecord: + continue + if pkg.candidateRecord.has_key("Task"): + print "Pkg %s is part of '%s'" % (pkg.name, pkg.candidateRecord["Task"].split()) + #print pkg.candidateRecord diff --git a/doc/examples/sources.py b/doc/examples/sources.py index b48c0ba5..8d807f4c 100644 --- a/doc/examples/sources.py +++ b/doc/examples/sources.py @@ -10,6 +10,6 @@ apt_pkg.init() sources = apt_pkg.GetPkgSrcRecords() sources.Restart() while sources.Lookup('hello'): - print sources.Package, sources.Version, sources.Maintainer, sources.Section, `sources.Binaries` - print sources.Files - print sources.Index.ArchiveURI(sources.Files[0][2]) + print sources.Package, sources.Version, sources.Maintainer, sources.Section, `sources.Binaries` + print sources.Files + print sources.Index.ArchiveURI(sources.Files[0][2]) diff --git a/doc/examples/tagfile.py b/doc/examples/tagfile.py index 653c0a71..aeba34d2 100755 --- a/doc/examples/tagfile.py +++ b/doc/examples/tagfile.py @@ -4,5 +4,5 @@ import apt_pkg Parse = apt_pkg.ParseTagFile(open("/var/lib/dpkg/status","r")); while Parse.Step() == 1: - print Parse.Section.get("Package"); - print apt_pkg.ParseDepends(Parse.Section.get("Depends","")); + print Parse.Section.get("Package"); + print apt_pkg.ParseDepends(Parse.Section.get("Depends","")); diff --git a/doc/examples/versiontest.py b/doc/examples/versiontest.py index c4e5f44d..8f18f6c8 100755 --- a/doc/examples/versiontest.py +++ b/doc/examples/versiontest.py @@ -7,30 +7,30 @@ apt_pkg.InitSystem(); TestFile = apt_pkg.ParseCommandLine(apt_pkg.Config,[],sys.argv); if len(TestFile) != 1: - print "Must have exactly 1 file name"; - sys.exit(0); + print "Must have exactly 1 file name"; + sys.exit(0); # Go over the file.. List = open(TestFile[0],"r"); CurLine = 0; while(1): - Line = List.readline(); - CurLine = CurLine + 1; - if Line == "": - break; - Line = string.strip(Line); - if len(Line) == 0 or Line[0] == '#': - continue; + Line = List.readline(); + CurLine = CurLine + 1; + if Line == "": + break; + Line = string.strip(Line); + if len(Line) == 0 or Line[0] == '#': + continue; - Split = re.split("[ \n]",Line); + Split = re.split("[ \n]",Line); - # Check forward - if apt_pkg.VersionCompare(Split[0],Split[1]) != int(Split[2]): - print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine, - Split[0],Split[1],apt_pkg.VersionCompare(Split[0],Split[1]), - int(Split[2])); - # Check reverse - if apt_pkg.VersionCompare(Split[1],Split[0]) != -1*int(Split[2]): - print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine, - Split[1],Split[0],apt_pkg.VersionCompare(Split[1],Split[0]), - -1*int(Split[2])); + # Check forward + if apt_pkg.VersionCompare(Split[0],Split[1]) != int(Split[2]): + print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine, + Split[0],Split[1],apt_pkg.VersionCompare(Split[0],Split[1]), + int(Split[2])); + # Check reverse + if apt_pkg.VersionCompare(Split[1],Split[0]) != -1*int(Split[2]): + print "Comparision failed on line %u. '%s' ? '%s' %i != %i"%(CurLine, + Split[1],Split[0],apt_pkg.VersionCompare(Split[1],Split[0]), + -1*int(Split[2])); diff --git a/tests/cache.py b/tests/cache.py index 34535e68..24732578 100644 --- a/tests/cache.py +++ b/tests/cache.py @@ -8,44 +8,44 @@ import sys def main(): - apt_pkg.init() - cache = apt_pkg.GetCache() - depcache = apt_pkg.GetDepCache(cache) - depcache.Init() - i=0 - all=cache.PackageCount - print "Running Cache test on all packages:" - # first, get all pkgs - for pkg in cache.Packages: - i += 1 - x = pkg.Name - # then get each version - for ver in pkg.VersionList: - # get some version information - a = ver.FileList - b = ver.VerStr - c = ver.Arch - d = ver.DependsListStr - dl = ver.DependsList - # get all dependencies (a dict of string->list, - # e.g. "depends:" -> [ver1,ver2,..] - for dep in dl.keys(): - # get the list of each dependency object - for depVerList in dl[dep]: - for z in depVerList: - # get all TargetVersions of - # the dependency object - for j in z.AllTargets(): - f = j.FileList - g = ver.VerStr - h = ver.Arch - k = ver.DependsListStr - j = ver.DependsList - pass + apt_pkg.init() + cache = apt_pkg.GetCache() + depcache = apt_pkg.GetDepCache(cache) + depcache.Init() + i=0 + all=cache.PackageCount + print "Running Cache test on all packages:" + # first, get all pkgs + for pkg in cache.Packages: + i += 1 + x = pkg.Name + # then get each version + for ver in pkg.VersionList: + # get some version information + a = ver.FileList + b = ver.VerStr + c = ver.Arch + d = ver.DependsListStr + dl = ver.DependsList + # get all dependencies (a dict of string->list, + # e.g. "depends:" -> [ver1,ver2,..] + for dep in dl.keys(): + # get the list of each dependency object + for depVerList in dl[dep]: + for z in depVerList: + # get all TargetVersions of + # the dependency object + for j in z.AllTargets(): + f = j.FileList + g = ver.VerStr + h = ver.Arch + k = ver.DependsListStr + j = ver.DependsList + pass - print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), + print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), if __name__ == "__main__": - main() - sys.exit(0) + main() + sys.exit(0) diff --git a/tests/depcache.py b/tests/depcache.py index 635fea14..32c510f0 100644 --- a/tests/depcache.py +++ b/tests/depcache.py @@ -8,46 +8,46 @@ import sys def main(): - apt_pkg.init() - cache = apt_pkg.GetCache() - depcache = apt_pkg.GetDepCache(cache) - depcache.Init() - i=0 - all=cache.PackageCount - print "Running DepCache test on all packages" - print "(trying to install each and then mark it keep again):" - # first, get all pkgs - for pkg in cache.Packages: - i += 1 - x = pkg.Name - # then get each version - ver =depcache.GetCandidateVer(pkg) - if ver != None: - depcache.MarkInstall(pkg) - if depcache.InstCount == 0: - if depcache.IsUpgradable(pkg): - print "Error marking %s for install" % x - for p in cache.Packages: - if depcache.MarkedInstall(p): - depcache.MarkKeep(p) - if depcache.InstCount != 0: - print "Error undoing the selection for %s (InstCount: %s)" % (x,depcache.InstCount) - print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), + apt_pkg.init() + cache = apt_pkg.GetCache() + depcache = apt_pkg.GetDepCache(cache) + depcache.Init() + i=0 + all=cache.PackageCount + print "Running DepCache test on all packages" + print "(trying to install each and then mark it keep again):" + # first, get all pkgs + for pkg in cache.Packages: + i += 1 + x = pkg.Name + # then get each version + ver =depcache.GetCandidateVer(pkg) + if ver != None: + depcache.MarkInstall(pkg) + if depcache.InstCount == 0: + if depcache.IsUpgradable(pkg): + print "Error marking %s for install" % x + for p in cache.Packages: + if depcache.MarkedInstall(p): + depcache.MarkKeep(p) + if depcache.InstCount != 0: + print "Error undoing the selection for %s (InstCount: %s)" % (x,depcache.InstCount) + print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), - print - print "Trying Upgrade:" - depcache.Upgrade() - print "To install: %s " % depcache.InstCount - print "To remove: %s " % depcache.DelCount - print "Kept back: %s " % depcache.KeepCount + print + print "Trying Upgrade:" + depcache.Upgrade() + print "To install: %s " % depcache.InstCount + print "To remove: %s " % depcache.DelCount + print "Kept back: %s " % depcache.KeepCount - print "Trying DistUpgrade:" - depcache.Upgrade(True) - print "To install: %s " % depcache.InstCount - print "To remove: %s " % depcache.DelCount - print "Kept back: %s " % depcache.KeepCount + print "Trying DistUpgrade:" + depcache.Upgrade(True) + print "To install: %s " % depcache.InstCount + print "To remove: %s " % depcache.DelCount + print "Kept back: %s " % depcache.KeepCount if __name__ == "__main__": - main() - sys.exit(0) + main() + sys.exit(0) diff --git a/tests/lock.py b/tests/lock.py index a3d962d7..aeab804b 100644 --- a/tests/lock.py +++ b/tests/lock.py @@ -21,7 +21,7 @@ if __name__ == "__main__": apt_pkg.PkgSystemLock() except SystemError, s: print "Can't get lock: (error text:\n%s)" % s - sys.exit(0) + sys.exit(0) apt_pkg.PkgSystemUnLock() @@ -35,7 +35,7 @@ if __name__ == "__main__": # child fd = apt_pkg.GetLock(lock,False) print "Lockfile fd (child): %s" % fd - sys.exit(0) + sys.exit(0) # try to get lock with error flag pid = os.fork() @@ -43,4 +43,4 @@ if __name__ == "__main__": # child fd = apt_pkg.GetLock(lock,True) print "Lockfile fd (child): %s" % fd - sys.exit(0) + sys.exit(0) diff --git a/tests/memleak.py b/tests/memleak.py index 58a2f886..6eece9d4 100755 --- a/tests/memleak.py +++ b/tests/memleak.py @@ -11,36 +11,36 @@ cache = apt.Cache() # memleak for i in range(100): - cache.open(None) - print cache["apt"].name - time.sleep(1) - gc.collect() - f = open("%s" % i,"w") - for obj in gc.get_objects(): - f.write("%s\n" % str(obj)) - f.close() + cache.open(None) + print cache["apt"].name + time.sleep(1) + gc.collect() + f = open("%s" % i,"w") + for obj in gc.get_objects(): + f.write("%s\n" % str(obj)) + f.close() # memleak #for i in range(100): -# cache = apt.Cache() -# time.sleep(1) -# cache = None -# gc.collect() +# cache = apt.Cache() +# time.sleep(1) +# cache = None +# gc.collect() # no memleak, but more or less the apt.Cache.open() code for i in range(100): - cache = apt_pkg.GetCache() - depcache = apt_pkg.GetDepCache(cache) - records = apt_pkg.GetPkgRecords(cache) - list = apt_pkg.GetPkgSourceList() - list.ReadMainList() - dict = {} - for pkg in cache.Packages: - if len(pkg.VersionList) > 0: - dict[pkg.Name] = apt.Package(cache,depcache, - records, list, None, pkg) - - print cache["apt"] - time.sleep(1) - - gc.collect() + cache = apt_pkg.GetCache() + depcache = apt_pkg.GetDepCache(cache) + records = apt_pkg.GetPkgRecords(cache) + list = apt_pkg.GetPkgSourceList() + list.ReadMainList() + dict = {} + for pkg in cache.Packages: + if len(pkg.VersionList) > 0: + dict[pkg.Name] = apt.Package(cache,depcache, + records, list, None, pkg) + + print cache["apt"] + time.sleep(1) + + gc.collect() diff --git a/tests/pkgproblemresolver.py b/tests/pkgproblemresolver.py index 546e2f16..82186d40 100644 --- a/tests/pkgproblemresolver.py +++ b/tests/pkgproblemresolver.py @@ -8,63 +8,63 @@ import sys def main(): - apt_pkg.init() - cache = apt_pkg.GetCache() - depcache = apt_pkg.GetDepCache(cache) - depcache.Init() - i=0 - all=cache.PackageCount - print "Running DepCache test on all packages" - print "(trying to install each and then mark it keep again):" - # first, get all pkgs - for pkg in cache.Packages: - i += 1 - x = pkg.Name - # then get each version - ver =depcache.GetCandidateVer(pkg) - if ver != None: - depcache.MarkInstall(pkg) - if depcache.BrokenCount > 0: - fixer = apt_pkg.GetPkgProblemResolver(depcache) - fixer.Clear(pkg) - fixer.Protect(pkg) - # we first try to resolve the problem - # with the package that should be installed - # protected - try: - fixer.Resolve(True) - except SystemError: - # the pkg seems to be broken, the - # returns a exception - fixer.Clear(pkg) - fixer.Resolve(True) - if not depcache.MarkedInstall(pkg): - print "broken in archive: %s " % pkg.Name - fixer = None - if depcache.InstCount == 0: - if depcache.IsUpgradable(pkg): - print "Error marking %s for install" % x - for p in cache.Packages: - if depcache.MarkedInstall(p) or depcache.MarkedUpgrade(p): - depcache.MarkKeep(p) - if depcache.InstCount != 0: - print "Error undoing the selection for %s" % x - print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), + apt_pkg.init() + cache = apt_pkg.GetCache() + depcache = apt_pkg.GetDepCache(cache) + depcache.Init() + i=0 + all=cache.PackageCount + print "Running DepCache test on all packages" + print "(trying to install each and then mark it keep again):" + # first, get all pkgs + for pkg in cache.Packages: + i += 1 + x = pkg.Name + # then get each version + ver =depcache.GetCandidateVer(pkg) + if ver != None: + depcache.MarkInstall(pkg) + if depcache.BrokenCount > 0: + fixer = apt_pkg.GetPkgProblemResolver(depcache) + fixer.Clear(pkg) + fixer.Protect(pkg) + # we first try to resolve the problem + # with the package that should be installed + # protected + try: + fixer.Resolve(True) + except SystemError: + # the pkg seems to be broken, the + # returns a exception + fixer.Clear(pkg) + fixer.Resolve(True) + if not depcache.MarkedInstall(pkg): + print "broken in archive: %s " % pkg.Name + fixer = None + if depcache.InstCount == 0: + if depcache.IsUpgradable(pkg): + print "Error marking %s for install" % x + for p in cache.Packages: + if depcache.MarkedInstall(p) or depcache.MarkedUpgrade(p): + depcache.MarkKeep(p) + if depcache.InstCount != 0: + print "Error undoing the selection for %s" % x + print "\r%i/%i=%.3f%% " % (i,all,(float(i)/float(all)*100)), - print - print "Trying Upgrade:" - depcache.Upgrade() - print "To install: %s " % depcache.InstCount - print "To remove: %s " % depcache.DelCount - print "Kept back: %s " % depcache.KeepCount + print + print "Trying Upgrade:" + depcache.Upgrade() + print "To install: %s " % depcache.InstCount + print "To remove: %s " % depcache.DelCount + print "Kept back: %s " % depcache.KeepCount - print "Trying DistUpgrade:" - depcache.Upgrade(True) - print "To install: %s " % depcache.InstCount - print "To remove: %s " % depcache.DelCount - print "Kept back: %s " % depcache.KeepCount + print "Trying DistUpgrade:" + depcache.Upgrade(True) + print "To install: %s " % depcache.InstCount + print "To remove: %s " % depcache.DelCount + print "Kept back: %s " % depcache.KeepCount if __name__ == "__main__": - main() - sys.exit(0) + main() + sys.exit(0) diff --git a/tests/pkgrecords.py b/tests/pkgrecords.py index 308505f7..72096463 100644 --- a/tests/pkgrecords.py +++ b/tests/pkgrecords.py @@ -9,30 +9,30 @@ import sys def main(): - apt_pkg.init() - cache = apt_pkg.GetCache() - depcache = apt_pkg.GetDepCache(cache) - depcache.Init() - i=0 - print "Running PkgRecords test on all packages:" - for pkg in cache.Packages: - i += 1 - records = apt_pkg.GetPkgRecords(cache) - if len(pkg.VersionList) == 0: - #print "no available version, cruft" - continue - version = depcache.GetCandidateVer(pkg) - if not version: - continue - file, index = version.FileList.pop(0) - if records.Lookup((file,index)): - #print records.FileName - x = records.FileName - y = records.LongDesc - pass - print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)), + apt_pkg.init() + cache = apt_pkg.GetCache() + depcache = apt_pkg.GetDepCache(cache) + depcache.Init() + i=0 + print "Running PkgRecords test on all packages:" + for pkg in cache.Packages: + i += 1 + records = apt_pkg.GetPkgRecords(cache) + if len(pkg.VersionList) == 0: + #print "no available version, cruft" + continue + version = depcache.GetCandidateVer(pkg) + if not version: + continue + file, index = version.FileList.pop(0) + if records.Lookup((file,index)): + #print records.FileName + x = records.FileName + y = records.LongDesc + pass + print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)), if __name__ == "__main__": - main() - sys.exit(0) + main() + sys.exit(0) diff --git a/tests/pkgsrcrecords.py b/tests/pkgsrcrecords.py index 410140c8..3eb0fcab 100644 --- a/tests/pkgsrcrecords.py +++ b/tests/pkgsrcrecords.py @@ -9,19 +9,19 @@ import sys def main(): - apt_pkg.init() - cache = apt_pkg.GetCache() - i=0 - print "Running PkgSrcRecords test on all packages:" - for x in cache.Packages: - i += 1 - src = apt_pkg.GetPkgSrcRecords() - if src.Lookup(x.Name): - #print src.Package - pass - print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)), + apt_pkg.init() + cache = apt_pkg.GetCache() + i=0 + print "Running PkgSrcRecords test on all packages:" + for x in cache.Packages: + i += 1 + src = apt_pkg.GetPkgSrcRecords() + if src.Lookup(x.Name): + #print src.Package + pass + print "\r%i/%i=%.3f%% " % (i,cache.PackageCount, (float(i)/float(cache.PackageCount)*100)), if __name__ == "__main__": - main() - sys.exit(0) + main() + sys.exit(0) diff --git a/utils/get_ubuntu_mirrors.py b/utils/get_ubuntu_mirrors.py index 8482615d..ddd1e369 100755 --- a/utils/get_ubuntu_mirrors.py +++ b/utils/get_ubuntu_mirrors.py @@ -42,10 +42,10 @@ try: uri=urllib2.urlopen(req) p = re.compile('^.*((http|ftp):\/\/[A-Za-z0-9-.:\/_]+).*\n*$') for line in uri.readlines(): - if r"[[Anchor(dvd-images)]]" in line: - break - if "http://" in line or "ftp://" in line: - mirrors.append(p.sub(r"\1", line)) + if r"[[Anchor(dvd-images)]]" in line: + break + if "http://" in line or "ftp://" in line: + mirrors.append(p.sub(r"\1", line)) uri.close() except: print "Failed to download or extract the mirrors list!" |
