diff options
| author | Sebastian Heinlein <sebi@sebi-laptop> | 2006-12-04 06:13:09 +0100 |
|---|---|---|
| committer | Sebastian Heinlein <sebi@sebi-laptop> | 2006-12-04 06:13:09 +0100 |
| commit | 4ed8672b734132995f16ccb6fa99fe105dd56a48 (patch) | |
| tree | f2782fea6b9832e9e485de2d00f8851757aa9bb5 /AptSources/aptsources.py | |
| parent | 21a2e05e903ace1ba6ac977811074146973fb4f4 (diff) | |
| parent | 359db816dd1d9aecce9800742fdc1ed45846ee7e (diff) | |
| download | python-apt-4ed8672b734132995f16ccb6fa99fe105dd56a48.tar.gz | |
* merge with ubuntu
Diffstat (limited to 'AptSources/aptsources.py')
| -rw-r--r-- | AptSources/aptsources.py | 316 |
1 files changed, 224 insertions, 92 deletions
diff --git a/AptSources/aptsources.py b/AptSources/aptsources.py index e29114e1..45441cdd 100644 --- a/AptSources/aptsources.py +++ b/AptSources/aptsources.py @@ -32,7 +32,7 @@ import shutil import time import os.path import sys - +from gettext import gettext as _ #import pdb #from UpdateManager.Common.DistInfo import DistInfo @@ -141,7 +141,7 @@ class SourceEntry: self.disabled = True pieces = string.split(line[1:]) # if it looks not like a disabled deb line return - if not (pieces[0] == "deb" or pieces[0] == "deb-src"): + if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"): self.invalid = True return else: @@ -160,7 +160,7 @@ class SourceEntry: # Type, deb or deb-src self.type = string.strip(pieces[0]) # Sanity check - if self.type not in ("deb", "deb-src"): + if self.type not in ("deb", "deb-src", "rpm", "rpm-src"): self.invalid = True return # URI @@ -306,7 +306,8 @@ class SourcesList: " restore sources.list files based on the backup extension " dir = apt_pkg.Config.FindDir("Dir::Etc") file = apt_pkg.Config.Find("Dir::Etc::sourcelist") - if os.path.exists(dir+file+backup_ext): + if os.path.exists(dir+file+backup_ext) and \ + os.path.exists(dir+file): shutil.copy(dir+file+backup_ext,dir+file) # now sources.list.d partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") @@ -321,7 +322,7 @@ class SourcesList: if backup_ext == None: backup_ext = time.strftime("%y%m%d.%H%M") for source in self.list: - if not source.file in already_backuped: + if not source.file in already_backuped and os.path.exists(source.file): shutil.copy(source.file,"%s%s" % (source.file,backup_ext)) return backup_ext @@ -421,40 +422,24 @@ class SourceEntryMatcher: return found class Distribution: - def __init__(self): + def __init__(self, id, codename, description, release): """ Container for distribution specific informations """ # LSB information - self.id = "" - self.codename = "" - self.description = "" - self.release = "" - - # get the LSB information - lsb_info = [] - for lsb_option in ["-i", "-c", "-d", "-r"]: - pipe = os.popen("lsb_release %s -s" % lsb_option) - lsb_info.append(pipe.read().strip()) - del pipe - (self.id, self.codename, self.description, self.release) = lsb_info + self.id = id + self.codename = codename + self.description = description + self.release = release - # get a list of country codes and real names - self.countries = {} - try: - f = open("/usr/share/iso-codes/iso_3166.tab", "r") - lines = f.readlines() - for line in lines: - parts = line.split("\t") - self.countries[parts[0].lower()] = parts[1] - except: - print "could not open file '%s'" % file - else: - f.close() + self.binary_type = "deb" + self.source_type = "deb-src" - def get_sources(self, sources_list): + def get_sources(self, sourceslist): """ Find the corresponding template, main and child sources for the distribution """ + + self.sourceslist = sourceslist # corresponding sources self.source_template = None self.child_sources = [] @@ -475,8 +460,8 @@ class Distribution: self.used_servers = [] # find the distro template - for template in sources_list.matcher.templates: - if template.name == self.codename and\ + for template in self.sourceslist.matcher.templates: + if self.is_codename(template.name) and\ template.distribution == self.id: #print "yeah! found a template for %s" % self.description #print template.description, template.base_uri, template.components @@ -493,11 +478,11 @@ class Distribution: cdrom_comps = [] enabled_comps = [] source_code = [] - for source in sources_list.list: + for source in self.sourceslist.list: if source.invalid == False and\ - source.dist == self.codename and\ + self.is_codename(source.dist) and\ source.template and\ - source.template.name == self.codename: + self.is_codename(source.template.name): #print "yeah! found a distro repo: %s" % source.line # cdroms need do be handled differently if source.uri.startswith("cdrom:") and \ @@ -507,21 +492,23 @@ class Distribution: elif source.uri.startswith("cdrom:") and \ source.disabled == True: self.cdrom_sources.append(source) - elif source.type == "deb" and source.disabled == False: + elif source.type == self.binary_type and \ + source.disabled == False: self.main_sources.append(source) comps.extend(source.comps) media.append(source.uri) - elif source.type == "deb" and source.disabled == True: + elif source.type == self.binary_type and \ + source.disabled == True: self.disabled_sources.append(source) - elif source.type.endswith("-src") and source.disabled == False: + elif source.type == self.source_type and source.disabled == False: self.source_code_sources.append(source) - elif source.type.endswith("-src") and source.disabled == True: + elif source.type == self.source_type and source.disabled == True: self.disabled_sources.append(source) if source.invalid == False and\ source.template in self.source_template.children: - if source.disabled == False and source.type == "deb": + if source.disabled == False and source.type == self.binary_type: self.child_sources.append(source) - elif source.disabled == False and source.type == "deb-src": + elif source.disabled == False and source.type == self.source_type: self.source_code_sources.append(source) else: self.disabled_sources.append(source) @@ -541,21 +528,6 @@ class Distribution: # the main server is stored in the template self.main_server = self.source_template.base_uri - # try to guess the nearest mirror from the locale - # FIXME: for debian we need something different - self.country = None - if self.id == "Ubuntu": - locale = os.getenv("LANG", default="en.UK") - a = locale.find("_") - z = locale.find(".") - if z == -1: - z = len(locale) - country_code = locale[a+1:z].lower() - self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \ - country_code - if self.countries.has_key(country_code): - self.country = self.countries[country_code] - # other used servers for medium in self.used_media: if not medium.startswith("cdrom:"): @@ -567,7 +539,7 @@ class Distribution: else: self.default_server = self.main_sources[0].uri - def add_source(self, sources_list, type=None, + def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""): """ Add distribution specific sources @@ -580,18 +552,16 @@ class Distribution: if comps == None: comps = list(self.enabled_comps) if type == None: - type = "deb" - if comment == "": - comment == "Added by software-properties" - new_source = sources_list.add(type, uri, dist, comps, comment) + type = self.binary_type + new_source = self.sourceslist.add(type, uri, dist, comps, comment) # if source code is enabled add a deb-src line after the new # source - if self.get_source_code == True and not type.endswith("-src"): - sources_list.add("%s-src" % type, uri, dist, comps, comment, - file=new_source.file, - pos=sources_list.list.index(new_source)+1) + if self.get_source_code == True and tpye == self.binary_type: + self.sourceslist.add(self.source_type, uri, dist, comps, comment, + file=new_source.file, + pos=self.sourceslist.list.index(new_source)+1) - def enable_component(self, sourceslist, comp): + def enable_component(self, comp): """ Enable a component in all main, child and source code sources (excluding cdrom based sources) @@ -620,40 +590,41 @@ class Distribution: sources = [] sources.extend(self.main_sources) sources.extend(self.child_sources) - sources.extend(self.source_code_sources) # store what comps are enabled already per distro (where distro is # e.g. "dapper", "dapper-updates") comps_per_dist = {} + comps_per_sdist = {} for s in sources: - if s.type != "deb": - continue - if not comps_per_dist.has_key(s.dist): - comps_per_dist[s.dist] = set() - map(comps_per_dist[s.dist].add, s.comps) + if s.type == self.binary_type: + if not comps_per_dist.has_key(s.dist): + comps_per_dist[s.dist] = set() + map(comps_per_dist[s.dist].add, s.comps) + for s in self.source_code_sources: + if s.type == self.source_type: + if not comps_per_sdist.has_key(s.dist): + comps_per_sdist[s.dist] = set() + map(comps_per_sdist[s.dist].add, s.comps) + # check if there is a main source at all if len(self.main_sources) < 1: # create a new main source - self.add_source(sourceslist, comps=["%s"%comp]) + self.add_source(comps=["%s"%comp]) else: # add the comp to all main, child and source code sources for source in sources: add_component_only_once(source, comps_per_dist) - # now do the same for source dists + # check if there is a main source code source at all if self.get_source_code == True: - comps_per_dist = {} - for s in self.source_code_sources: - if s.type != "deb-src": - continue - if not comps_per_dist.has_key(s.dist): - comps_per_dist[s.dist] = set() - map(comps_per_dist[s.dist].add, s.comps) - for source in self.source_code_sources: - if comp not in source.comps: - add_component_only_once(source, comps_per_dist) - - - def disable_component(self, sourceslist, comp): + if len(self.source_code_sources) < 1: + # create a new main source + self.add_source(type=self.source_type, comps=["%s"%comp]) + else: + # add the comp to all main, child and source code sources + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + def disable_component(self, comp): """ Disable a component in all main, child and source code sources (excluding cdrom based sources) @@ -670,19 +641,180 @@ class Distribution: if comp in source.comps: source.comps.remove(comp) if len(source.comps) < 1: - sourceslist.remove(source) + self.sourceslist.remove(source) def change_server(self, uri): ''' Change the server of all distro specific sources to a given host ''' sources = [] + seen = [] sources.extend(self.main_sources) sources.extend(self.child_sources) sources.extend(self.source_code_sources) for source in sources: - # FIXME: ugly - if not "security.ubuntu.com" in source.uri: - source.uri = uri + # Avoid creating duplicate entries + source.uri = uri + for comp in source.comps: + if [source.uri, source.dist, comp] in seen: + source.comps.remove(comp) + else: + seen.append([source.uri, source.dist, comp]) + if len(source.comps) < 1: + self.sourceslist.remove(source) + + def is_codename(self, name): + ''' Compare a given name with the release codename. ''' + if name == self.codename: + return True + else: + return False + + def get_server_list(self): + ''' Return a list of used and suggested servers ''' + + # Store all available servers: + # Name, URI, active + mirrors = [] + + mirrors.append([_("Main server"), self.main_server, + len(self.used_servers) == 1 and + self.used_servers[0] == self.main_server]) + + if len(self.used_servers) == 1 and not re.match(self.used_servers[0], + self.main_server): + # Only one server is used + server = self.used_servers[0] + mirrors.append([server, server, True]) + elif len(self.used_servers) > 1: + # More than one server is used. Since we don't handle this case + # in the user interface we set "custom servers" to true and + # append a list of all used servers + mirrors.append([_("Custom servers"), None, True]) + for server in self.used_servers: + if not [server, server, False] in mirrors: + mirrors.append([server, server, False]) + + return mirrors + + if len(self.used_servers) == 1 and not is_single_server(self.main_server): + mirrors.append(["%s" % self.used_servers[0], + self.used_servers[0], True]) + elif len(self.used_servers) > 1 or not is_single_server(self.main_server): + mirrors.append([_("Custom servers"), None, True]) + + return mirrors + + +class DebianDistribution(Distribution): + ''' Class to support specific Debian features ''' + + def is_codename(self, name): + ''' Compare a given name with the release codename and check if + if it can be used as a synonym for a development releases ''' + if name == self.codename or self.release in ("testing", "unstable"): + return True + else: + return False + +class UbuntuDistribution(Distribution): + ''' Class to support specific Ubuntu features ''' + def get_mirrors(self): + Distribution.get_mirrors(self) + # get a list of country codes and real names + self.countries = {} + try: + f = open("/usr/share/iso-codes/iso_3166.tab", "r") + lines = f.readlines() + for line in lines: + parts = line.split("\t") + self.countries[parts[0].lower()] = parts[1] + except: + print "could not open file '%s'" % file + else: + f.close() + + # try to guess the nearest mirror from the locale + self.country = None + locale = os.getenv("LANG", default="en.UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a+1:z].lower() + self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \ + country_code + if self.countries.has_key(country_code): + self.country = self.countries[country_code] + + def get_server_list(self): + ''' Return a list of used and suggested servers ''' + + def get_mirror_name(server): + ''' Try to get a human readable name for the main mirror of a country''' + i = server.find("://") + l = server.find(".archive.ubuntu.com") + if i != -1 and l != -1: + country = server[i+len("://"):l] + if self.countries.has_key(country): + # TRANSLATORS: %s is a country + return _("Server for %s") % \ + gettext.dgettext("iso-3166", + self.countries[country].rstrip()).rstrip() + else: + return("%s" % server) + + # Store all available servers: + # Name, URI, active + mirrors = [] + if len(self.used_servers) == 1 and self.used_servers[0] == self.main_server: + mirrors.append([_("Main server"), self.main_server, True]) + mirrors.append([get_mirror_name(self.nearest_server), + self.nearest_server, False]) + elif len(self.used_servers) == 1 and not re.match(self.used_servers[0], + self.main_server): + mirrors.append([_("Main server"), self.main_server, False]) + # Only one server is used + server = self.used_servers[0] + + # Append the nearest server if it's not already used + if not re.match(server, self.nearest_server): + mirrors.append([get_mirror_name(self.nearest_server), + self.nearest_server, False]) + mirrors.append([get_mirror_name(server), server, True]) + + elif len(self.used_servers) > 1: + # More than one server is used. Since we don't handle this case + # in the user interface we set "custom servers" to true and + # append a list of all used servers + mirrors.append([_("Main server"), self.main_server, False]) + mirrors.append([get_mirror_name(self.nearest_server), + self.nearest_server, False]) + mirrors.append([_("Custom servers"), None, True]) + for server in self.used_servers: + if re.match(server, self.nearest_server): + continue + elif not [get_mirror_name(server), server, False] in mirrors: + mirrors.append([get_mirror_name(server), server, False]) + + return mirrors + +def get_distro(): + ''' Check the currently used distribution and return the corresponding + distriubtion class that supports distro specific features. ''' + lsb_info = [] + for lsb_option in ["-i", "-c", "-d", "-r"]: + pipe = os.popen("lsb_release %s -s" % lsb_option) + lsb_info.append(pipe.read().strip()) + del pipe + (id, codename, description, release) = lsb_info + if id == "Ubuntu": + return UbuntuDistribution(id, codename, description, + release) + elif id == "Debian": + return UbuntuDistribution(id, codename, description, + release) + else: + return Distribution(id, codename, description, relase) # some simple tests if __name__ == "__main__": |
