diff options
Diffstat (limited to 'SoftwareProperties/aptsources.py')
| -rw-r--r-- | SoftwareProperties/aptsources.py | 550 |
1 files changed, 309 insertions, 241 deletions
diff --git a/SoftwareProperties/aptsources.py b/SoftwareProperties/aptsources.py index 5da04b23..40b55dfa 100644 --- a/SoftwareProperties/aptsources.py +++ b/SoftwareProperties/aptsources.py @@ -30,6 +30,8 @@ import shutil import time import os.path +#import pdb + from UpdateManager.Common.DistInfo import DistInfo @@ -64,12 +66,8 @@ def is_mirror(master_uri, compare_uri): return False def uniq(s): - """ simple (and not efficient) way to return uniq list """ - u = [] - for x in s: - if x not in u: - u.append(x) - return u + """ simple and efficient way to return uniq list """ + return list(set(s)) @@ -89,6 +87,8 @@ class SourceEntry: file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist") self.file = file self.parse(line) + self.template = None + self.children = [] # works mostely like split but takes [] into account def mysplit(self, line): @@ -207,6 +207,7 @@ class SourceEntry: class SourcesList: def __init__(self): self.list = [] # of Type SourceEntries + self.matcher = SourceEntryMatcher() self.refresh() def refresh(self): @@ -219,29 +220,51 @@ class SourcesList: partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") for file in glob.glob("%s/*.list" % partsdir): self.load(file) + # check if the source item fits a predefined template + for source in self.list: + if source.invalid == False: + self.matcher.match(source) def __iter__(self): for entry in self.list: yield entry raise StopIteration - def add(self, type, uri, dist, comps, comment="", pos=-1): - # if there is a repo with the same (type, uri, dist) just add the - # components - for i in self.list: - if i.type == type and is_mirror(uri,i.uri) and i.dist == dist: - comps = uniq(i.comps + comps) - # set to the old position and preserve comment - comment = i.comment - pos = self.list.index(i) - self.list.remove(i) + def add(self, type, uri, dist, comps, comment="", pos=-1, file=None): + """ + Add a new source to the sources.list. + The method will search for existing matching repos and will try to + reuse them as far as possible + """ + for source in self.list: + # if there is a repo with the same (type, uri, dist) just add the + # components + if source.disabled == False and source.invalid == False and \ + source.type == type and uri == source.uri and \ + source.dist == dist: + comps = uniq(source.comps + comps) + source.comps = comps + return source + # if there is a corresponding repo which is disabled, enable it + elif source.disabled == True and source.invalid == False and \ + source.type == type and uri == source.uri and \ + source.dist == dist and \ + len(set(source.comps) & set(comps)) == len(comps): + source.disabled = False + return source + # there isn't any matching source, so create a new line and parse it line = "%s %s %s" % (type,uri,dist) for c in comps: line = line + " " + c; if comment != "": line = "%s #%s\n" %(line,comment) line = line + "\n" - self.list.insert(pos, SourceEntry(line)) + new_entry = SourceEntry(line) + if file != None: + new_entry.file = file + self.matcher.match(new_entry) + self.list.insert(pos, new_entry) + return new_entry def remove(self, source_entry): self.list.remove(source_entry) @@ -271,12 +294,16 @@ class SourcesList: def load(self,file): """ (re)load the current sources """ - f = open(file, "r") - lines = f.readlines() - for line in lines: - source = SourceEntry(line,file) - self.list.append(source) - f.close() + try: + f = open(file, "r") + lines = f.readlines() + for line in lines: + source = SourceEntry(line,file) + self.list.append(source) + except: + print "could not open file '%s'" % file + else: + f.close() def save(self): """ save the current sources """ @@ -288,6 +315,30 @@ class SourcesList: for f in files: files[f].close() + def check_for_relations(self, sources_list): + """get all parent and child channels in the sources list""" + parents = [] + used_child_templates = {} + for source in sources_list: + # try to avoid checking uninterressting sources + if source.template == None: + continue + # set up a dict with all used child templates and corresponding + # source entries + if source.template.child == True: + key = source.template + if not used_child_templates.has_key(key): + used_child_templates[key] = [] + temp = used_child_templates[key] + temp.append(source) + else: + # store each source with children aka. a parent :) + if len(source.template.children) > 0: + parents.append(source) + #print self.used_child_templates + #print self.parents + return (parents, used_child_templates) + # templates for the add dialog class SourceEntryTemplate(SourceEntry): def __init__(self,a_type,uri,dist,description,comps): @@ -354,227 +405,244 @@ class SourceEntryMatcher: self.comps_descriptions = l_comps_descr def __init__(self): + self.templates = [] + # Get the human readable channel and comp names from the channel .infos + spec_files = glob.glob("/usr/share/update-manager/channels/*.info") + for f in spec_files: + f = os.path.basename(f) + i = f.find(".info") + f = f[0:i] + dist = DistInfo(f) + for suite in dist.suites: + if suite.match_uri != None: + self.templates.append(suite) + return + + def match(self, source): + """Add a matching template to the source""" _ = gettext.gettext - self.type_list = [] - self.type_list.append(self.MatchType("^deb$",_("Binary"))) - self.type_list.append(self.MatchType("^deb-src$",_("Source"))) - - self.dist_list = [] - - ubuntu_comps = ["^main$","^restricted$","^universe$","^multiverse$"] - ubuntu_comps_descr = [_("Officially supported"), - _("Restricted copyright"), - _("Community maintained (Universe)"), - _("Non-free (Multiverse)")] - # CDs - self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*6.06", - ".*", - _("CD disk with Ubuntu 6.06 LTS"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*5.10", - ".*", - _("CD disk with Ubuntu 5.10"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*5.04", - ".*", - _("CD disk with Ubuntu 5.04"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*4.10", - ".*", - _("CD disk with Ubuntu 4.10"), - ubuntu_comps, ubuntu_comps_descr)) - # URIs - # Warty - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^warty$", - "Ubuntu 4.10", - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu", - "^warty-security$", - _("Ubuntu 4.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^warty-security$", - _("Ubuntu 4.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^warty-updates$", - _("Ubuntu 4.10 Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^warty-backports$", - _("Ubuntu 4.10 Backports"), - ubuntu_comps, ubuntu_comps_descr)) - # Hoary - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^hoary-security$", - _("Ubuntu 5.04 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu", - "^hoary-security$", - _("Ubuntu 5.04 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^hoary$", - "Ubuntu 5.04", - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^hoary-updates$", - _("Ubuntu 5.04 Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^hoary-backports$", - _("Ubuntu 5.04 Backports"), - ubuntu_comps, ubuntu_comps_descr)) - # Breezy - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^breezy-security$", - _("Ubuntu 5.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu", - "^breezy-security$", - _("Ubuntu 5.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^breezy$", - "Ubuntu 5.10", - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^breezy-backports$", - _("Ubuntu 5.10 Backports"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^breezy-updates$", - _("Ubuntu 5.10 Updates"), - ubuntu_comps, ubuntu_comps_descr)) - # dapper - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^dapper-security$", - _("Ubuntu 6.06 LTS Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu", - "^dapper-security$", - _("Ubuntu 6.06 LTS Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^dapper$", - "Ubuntu 6.06 LTS", - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^dapper-backports$", - _("Ubuntu 6.06 LTS Backports"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^dapper-updates$", - _("Ubuntu 6.06 LTS Updates"), - ubuntu_comps, ubuntu_comps_descr)) - # edgy - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^edgy-security$", - _("Ubuntu 6.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu", - "^edgy-security$", - _("Ubuntu 6.10 Security Updates"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^edgy$", - "Ubuntu 6.10", - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^edgy-backports$", - _("Ubuntu 6.10 Backports"), - ubuntu_comps, ubuntu_comps_descr)) - self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu", - "^edgy-updates$", - _("Ubuntu 6.10 Updates"), - ubuntu_comps, ubuntu_comps_descr)) - - - # DEBIAN - debian_comps = ["^main$","^contrib$","^non-free$","^non-US$"] - debian_comps_descr = [_("Officially supported"), - _("Contributed software"), - _("Non-free software"), - _("US export restricted software") - ] - - # dists by name - self.dist_list.append(self.MatchDist(".*debian.org/debian", - "^sarge$", - _("Debian 3.1 \"Sarge\""), - debian_comps, debian_comps_descr)) - self.dist_list.append(self.MatchDist(".*debian.org/debian", - "^woody$", - _("Debian 3.0 \"Woody\""), - debian_comps, debian_comps_descr)) - # securtiy - self.dist_list.append(self.MatchDist(".*security.debian.org", - "^stable.*$", - _("Debian Stable Security Updates"), - debian_comps, debian_comps_descr)) - # dists by status - self.dist_list.append(self.MatchDist(".*debian.org/debian", - "^stable$", - _("Debian Stable"), - debian_comps, debian_comps_descr)) - self.dist_list.append(self.MatchDist(".*debian.org/debian", - "^testing$", - _("Debian Testing"), - debian_comps, debian_comps_descr)) - self.dist_list.append(self.MatchDist(".*debian.org/debian", - "^unstable$", - _("Debian Unstable \"Sid\""), - debian_comps, debian_comps_descr)) - - # non-us - self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US", - "^stable.*$", - _("Debian Non-US (Stable)"), - debian_comps, debian_comps_descr)) - self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US", - "^testing.*$", - _("Debian Non-US (Testing)"), - debian_comps, debian_comps_descr)) - self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US", - "^unstable.*$", - _("Debian Non-US (Unstable)"), - debian_comps, debian_comps_descr)) - - - - - def match(self,source): - _ = gettext.gettext - # some sane defaults first - type_description = source.type - dist_description = source.uri + " " + source.dist - comp_description = "" - for c in source.comps: - comp_description = comp_description + " " + c - - for t in self.type_list: - if re.match(t.type, source.type): - type_description = _(t.description) - break - - for d in self.dist_list: + found = False + for template in self.templates: #print "'%s'" %source.uri - if re.match(d.uri, source.uri) and re.match(d.dist,source.dist): - dist_description = d.description - comp_description = "" - for c in source.comps: - found = False - for i in range(len(d.comps)): - if re.match(d.comps[i], c): - comp_description = comp_description+"\n"+d.comps_descriptions[i] - found = True - if found == False: - comp_description = comp_description+" "+c + if re.search(template.match_uri, source.uri) and \ + re.match(template.match_name, source.dist): + found = True + source.template = template break - - - return (type_description,dist_description,comp_description) + return found + +class Distribution: + def __init__(self): + """" + Container for distribution specific informations + """ + # LSB information + self.id = "" + self.codename = "" + self.description = "" + self.release = "" + + # get the LSB information + lsb_info = [] + for lsb_option in ["-i", "-c", "-d", "-r"]: + pipe = os.popen("lsb_release %s | cut -d : -f 2-" % lsb_option) + lsb_info.append(pipe.read().strip()) + del pipe + (self.id, self.codename, self.description, self.release) = lsb_info + + # get a list of country codes and real names + self.countries = {} + try: + f = open("/usr/share/iso-codes/iso_3166.tab", "r") + lines = f.readlines() + for line in lines: + parts = line.split("\t") + self.countries[parts[0].lower()] = parts[1] + except: + print "could not open file '%s'" % file + else: + f.close() + + def get_sources(self, sources_list): + """ + Find the corresponding template, main and child sources + for the distribution + """ + # corresponding sources + self.source_template = None + self.child_sources = [] + self.main_sources = [] + self.disabled_sources = [] + self.cdrom_sources = [] + self.download_comps = [] + self.enabled_comps = [] + self.cdrom_comps = [] + self.used_media = [] + self.get_source_code = False + self.source_code_sources = [] + + # location of the sources + self.default_server = "" + self.main_server = "" + self.nearest_server = "" + self.used_servers = [] + + # find the distro template + for template in sources_list.matcher.templates: + if template.name == self.codename and\ + template.distribution == self.id: + #print "yeah! found a template for %s" % self.description + #print template.description, template.base_uri, template.components + self.source_template = template + break + if self.source_template == None: + print "Error: could not find a distribution template" + # FIXME: will go away - only for debugging issues + sys.exit(1) + + # find main and child sources + media = [] + comps = [] + cdrom_comps = [] + enabled_comps = [] + source_code = [] + for source in sources_list.list: + if source.invalid == False and\ + source.dist == self.codename and\ + source.template and\ + source.template.name == self.codename: + #print "yeah! found a distro repo: %s" % source.line + # cdroms need do be handled differently + if source.uri.startswith("cdrom:") and \ + source.disabled == False: + self.cdrom_sources.append(source) + cdrom_comps.extend(source.comps) + elif source.uri.startswith("cdrom:") and \ + source.disabled == True: + self.cdrom_sources.append(source) + elif source.type == "deb" and source.disabled == False: + self.main_sources.append(source) + comps.extend(source.comps) + media.append(source.uri) + elif source.type == "deb" and source.disabled == True: + self.disabled_sources.append(source) + elif source.type.endswith("-src") and source.disabled == False: + self.source_code_sources.append(source) + elif source.type.endswith("-src") and source.disabled == True: + self.disabled_sources.append(source) + if source.template in self.source_template.children: + #print "yeah! child found: %s" % source.template.name + if source.type == "deb": + self.child_sources.append(source) + elif source.type == "deb-src": + self.source_code_sources.append(source) + self.download_comps = set(comps) + self.cdrom_comps = set(cdrom_comps) + enabled_comps.extend(comps) + enabled_comps.extend(cdrom_comps) + self.enabled_comps = set(enabled_comps) + self.used_media = set(media) + + self.get_mirrors() + + def get_mirrors(self): + """ + Provide a set of mirrors where you can get the distribution from + """ + # the main server is stored in the template + self.main_server = self.source_template.base_uri + + # try to guess the nearest mirror from the locale + # FIXME: for debian we need something different + if self.id == "Ubuntu": + locale = os.getenv("LANG", default="en.UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a+1:z].lower() + self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \ + country_code + self.country = self.countries[country_code] + + # other used servers + for medium in self.used_media: + if not medium.startswith("cdrom:"): + # seems to be a network source + self.used_servers.append(medium) + + if len(self.main_sources) == 0: + self.default_server = self.main_server + else: + self.default_server = self.main_sources[0].uri + + def add_source(self, sources_list, type=None, + uri=None, dist=None, comps=None, comment=""): + """ + Add distribution specific sources + """ + if uri == None: + # FIXME: Add support for the server selector + uri = self.default_server + if dist == None: + dist = self.codename + if comps == None: + comps = list(self.enabled_comps) + if type == None: + type = "deb" + if comment == "": + comment == "Added by software-properties" + new_source = sources_list.add(type, uri, dist, comps, comment) + # if source code is enabled add a deb-src line after the new + # source + if self.get_source_code == True and not type.endswith("-src"): + sources_list.add("%s-src" % type, uri, dist, comps, comment, + file=new_source.file, + pos=sources_list.list.index(new_source)+1) + + def enable_component(self, sourceslist, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + + sourceslist: an aptsource.sources_list + comp: the component that should be enabled + """ + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + # check if there is a main source at all + if len(self.main_sources) < 1: + # create a new main source + self.add_source(sourceslist, comps=["%s"%comp]) + else: + # add the comp to all main, child and source code sources + for source in sources: + if comp not in source.comps: + source.comps.append(comp) + if self.get_source_code == True: + for source in self.source_code_sources: + if comp not in source.comps: source.comps.append(comp) + + def disable_component(self, sourceslist, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + """ + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + if comp in self.cdrom_comps: + sources = [] + sources.extend(self.main_sources) + + for source in sources: + if comp in source.comps: + source.comps.remove(comp) + if len(source.comps) < 1: + sourceslist.remove(source) # some simple tests |
