summaryrefslogtreecommitdiff
path: root/SoftwareProperties/aptsources.py
diff options
context:
space:
mode:
Diffstat (limited to 'SoftwareProperties/aptsources.py')
-rw-r--r--SoftwareProperties/aptsources.py550
1 files changed, 309 insertions, 241 deletions
diff --git a/SoftwareProperties/aptsources.py b/SoftwareProperties/aptsources.py
index 5da04b23..40b55dfa 100644
--- a/SoftwareProperties/aptsources.py
+++ b/SoftwareProperties/aptsources.py
@@ -30,6 +30,8 @@ import shutil
import time
import os.path
+#import pdb
+
from UpdateManager.Common.DistInfo import DistInfo
@@ -64,12 +66,8 @@ def is_mirror(master_uri, compare_uri):
return False
def uniq(s):
- """ simple (and not efficient) way to return uniq list """
- u = []
- for x in s:
- if x not in u:
- u.append(x)
- return u
+ """ simple and efficient way to return uniq list """
+ return list(set(s))
@@ -89,6 +87,8 @@ class SourceEntry:
file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist")
self.file = file
self.parse(line)
+ self.template = None
+ self.children = []
# works mostely like split but takes [] into account
def mysplit(self, line):
@@ -207,6 +207,7 @@ class SourceEntry:
class SourcesList:
def __init__(self):
self.list = [] # of Type SourceEntries
+ self.matcher = SourceEntryMatcher()
self.refresh()
def refresh(self):
@@ -219,29 +220,51 @@ class SourcesList:
partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
for file in glob.glob("%s/*.list" % partsdir):
self.load(file)
+ # check if the source item fits a predefined template
+ for source in self.list:
+ if source.invalid == False:
+ self.matcher.match(source)
def __iter__(self):
for entry in self.list:
yield entry
raise StopIteration
- def add(self, type, uri, dist, comps, comment="", pos=-1):
- # if there is a repo with the same (type, uri, dist) just add the
- # components
- for i in self.list:
- if i.type == type and is_mirror(uri,i.uri) and i.dist == dist:
- comps = uniq(i.comps + comps)
- # set to the old position and preserve comment
- comment = i.comment
- pos = self.list.index(i)
- self.list.remove(i)
+ def add(self, type, uri, dist, comps, comment="", pos=-1, file=None):
+ """
+ Add a new source to the sources.list.
+ The method will search for existing matching repos and will try to
+ reuse them as far as possible
+ """
+ for source in self.list:
+ # if there is a repo with the same (type, uri, dist) just add the
+ # components
+ if source.disabled == False and source.invalid == False and \
+ source.type == type and uri == source.uri and \
+ source.dist == dist:
+ comps = uniq(source.comps + comps)
+ source.comps = comps
+ return source
+ # if there is a corresponding repo which is disabled, enable it
+ elif source.disabled == True and source.invalid == False and \
+ source.type == type and uri == source.uri and \
+ source.dist == dist and \
+ len(set(source.comps) & set(comps)) == len(comps):
+ source.disabled = False
+ return source
+ # there isn't any matching source, so create a new line and parse it
line = "%s %s %s" % (type,uri,dist)
for c in comps:
line = line + " " + c;
if comment != "":
line = "%s #%s\n" %(line,comment)
line = line + "\n"
- self.list.insert(pos, SourceEntry(line))
+ new_entry = SourceEntry(line)
+ if file != None:
+ new_entry.file = file
+ self.matcher.match(new_entry)
+ self.list.insert(pos, new_entry)
+ return new_entry
def remove(self, source_entry):
self.list.remove(source_entry)
@@ -271,12 +294,16 @@ class SourcesList:
def load(self,file):
""" (re)load the current sources """
- f = open(file, "r")
- lines = f.readlines()
- for line in lines:
- source = SourceEntry(line,file)
- self.list.append(source)
- f.close()
+ try:
+ f = open(file, "r")
+ lines = f.readlines()
+ for line in lines:
+ source = SourceEntry(line,file)
+ self.list.append(source)
+ except:
+ print "could not open file '%s'" % file
+ else:
+ f.close()
def save(self):
""" save the current sources """
@@ -288,6 +315,30 @@ class SourcesList:
for f in files:
files[f].close()
+ def check_for_relations(self, sources_list):
+ """get all parent and child channels in the sources list"""
+ parents = []
+ used_child_templates = {}
+ for source in sources_list:
+ # try to avoid checking uninterressting sources
+ if source.template == None:
+ continue
+ # set up a dict with all used child templates and corresponding
+ # source entries
+ if source.template.child == True:
+ key = source.template
+ if not used_child_templates.has_key(key):
+ used_child_templates[key] = []
+ temp = used_child_templates[key]
+ temp.append(source)
+ else:
+ # store each source with children aka. a parent :)
+ if len(source.template.children) > 0:
+ parents.append(source)
+ #print self.used_child_templates
+ #print self.parents
+ return (parents, used_child_templates)
+
# templates for the add dialog
class SourceEntryTemplate(SourceEntry):
def __init__(self,a_type,uri,dist,description,comps):
@@ -354,227 +405,244 @@ class SourceEntryMatcher:
self.comps_descriptions = l_comps_descr
def __init__(self):
+ self.templates = []
+ # Get the human readable channel and comp names from the channel .infos
+ spec_files = glob.glob("/usr/share/update-manager/channels/*.info")
+ for f in spec_files:
+ f = os.path.basename(f)
+ i = f.find(".info")
+ f = f[0:i]
+ dist = DistInfo(f)
+ for suite in dist.suites:
+ if suite.match_uri != None:
+ self.templates.append(suite)
+ return
+
+ def match(self, source):
+ """Add a matching template to the source"""
_ = gettext.gettext
- self.type_list = []
- self.type_list.append(self.MatchType("^deb$",_("Binary")))
- self.type_list.append(self.MatchType("^deb-src$",_("Source")))
-
- self.dist_list = []
-
- ubuntu_comps = ["^main$","^restricted$","^universe$","^multiverse$"]
- ubuntu_comps_descr = [_("Officially supported"),
- _("Restricted copyright"),
- _("Community maintained (Universe)"),
- _("Non-free (Multiverse)")]
- # CDs
- self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*6.06",
- ".*",
- _("CD disk with Ubuntu 6.06 LTS"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*5.10",
- ".*",
- _("CD disk with Ubuntu 5.10"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*5.04",
- ".*",
- _("CD disk with Ubuntu 5.04"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist("cdrom:\[Ubuntu.*4.10",
- ".*",
- _("CD disk with Ubuntu 4.10"),
- ubuntu_comps, ubuntu_comps_descr))
- # URIs
- # Warty
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^warty$",
- "Ubuntu 4.10",
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu",
- "^warty-security$",
- _("Ubuntu 4.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^warty-security$",
- _("Ubuntu 4.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^warty-updates$",
- _("Ubuntu 4.10 Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^warty-backports$",
- _("Ubuntu 4.10 Backports"),
- ubuntu_comps, ubuntu_comps_descr))
- # Hoary
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^hoary-security$",
- _("Ubuntu 5.04 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu",
- "^hoary-security$",
- _("Ubuntu 5.04 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^hoary$",
- "Ubuntu 5.04",
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^hoary-updates$",
- _("Ubuntu 5.04 Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^hoary-backports$",
- _("Ubuntu 5.04 Backports"),
- ubuntu_comps, ubuntu_comps_descr))
- # Breezy
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^breezy-security$",
- _("Ubuntu 5.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu",
- "^breezy-security$",
- _("Ubuntu 5.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^breezy$",
- "Ubuntu 5.10",
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^breezy-backports$",
- _("Ubuntu 5.10 Backports"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^breezy-updates$",
- _("Ubuntu 5.10 Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- # dapper
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^dapper-security$",
- _("Ubuntu 6.06 LTS Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu",
- "^dapper-security$",
- _("Ubuntu 6.06 LTS Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^dapper$",
- "Ubuntu 6.06 LTS",
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^dapper-backports$",
- _("Ubuntu 6.06 LTS Backports"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^dapper-updates$",
- _("Ubuntu 6.06 LTS Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- # edgy
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^edgy-security$",
- _("Ubuntu 6.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*security.ubuntu.com/ubuntu",
- "^edgy-security$",
- _("Ubuntu 6.10 Security Updates"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^edgy$",
- "Ubuntu 6.10",
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^edgy-backports$",
- _("Ubuntu 6.10 Backports"),
- ubuntu_comps, ubuntu_comps_descr))
- self.dist_list.append(self.MatchDist(".*archive.ubuntu.com/ubuntu",
- "^edgy-updates$",
- _("Ubuntu 6.10 Updates"),
- ubuntu_comps, ubuntu_comps_descr))
-
-
- # DEBIAN
- debian_comps = ["^main$","^contrib$","^non-free$","^non-US$"]
- debian_comps_descr = [_("Officially supported"),
- _("Contributed software"),
- _("Non-free software"),
- _("US export restricted software")
- ]
-
- # dists by name
- self.dist_list.append(self.MatchDist(".*debian.org/debian",
- "^sarge$",
- _("Debian 3.1 \"Sarge\""),
- debian_comps, debian_comps_descr))
- self.dist_list.append(self.MatchDist(".*debian.org/debian",
- "^woody$",
- _("Debian 3.0 \"Woody\""),
- debian_comps, debian_comps_descr))
- # securtiy
- self.dist_list.append(self.MatchDist(".*security.debian.org",
- "^stable.*$",
- _("Debian Stable Security Updates"),
- debian_comps, debian_comps_descr))
- # dists by status
- self.dist_list.append(self.MatchDist(".*debian.org/debian",
- "^stable$",
- _("Debian Stable"),
- debian_comps, debian_comps_descr))
- self.dist_list.append(self.MatchDist(".*debian.org/debian",
- "^testing$",
- _("Debian Testing"),
- debian_comps, debian_comps_descr))
- self.dist_list.append(self.MatchDist(".*debian.org/debian",
- "^unstable$",
- _("Debian Unstable \"Sid\""),
- debian_comps, debian_comps_descr))
-
- # non-us
- self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US",
- "^stable.*$",
- _("Debian Non-US (Stable)"),
- debian_comps, debian_comps_descr))
- self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US",
- "^testing.*$",
- _("Debian Non-US (Testing)"),
- debian_comps, debian_comps_descr))
- self.dist_list.append(self.MatchDist(".*debian.org/debian-non-US",
- "^unstable.*$",
- _("Debian Non-US (Unstable)"),
- debian_comps, debian_comps_descr))
-
-
-
-
- def match(self,source):
- _ = gettext.gettext
- # some sane defaults first
- type_description = source.type
- dist_description = source.uri + " " + source.dist
- comp_description = ""
- for c in source.comps:
- comp_description = comp_description + " " + c
-
- for t in self.type_list:
- if re.match(t.type, source.type):
- type_description = _(t.description)
- break
-
- for d in self.dist_list:
+ found = False
+ for template in self.templates:
#print "'%s'" %source.uri
- if re.match(d.uri, source.uri) and re.match(d.dist,source.dist):
- dist_description = d.description
- comp_description = ""
- for c in source.comps:
- found = False
- for i in range(len(d.comps)):
- if re.match(d.comps[i], c):
- comp_description = comp_description+"\n"+d.comps_descriptions[i]
- found = True
- if found == False:
- comp_description = comp_description+" "+c
+ if re.search(template.match_uri, source.uri) and \
+ re.match(template.match_name, source.dist):
+ found = True
+ source.template = template
break
-
-
- return (type_description,dist_description,comp_description)
+ return found
+
+class Distribution:
+ def __init__(self):
+ """"
+ Container for distribution specific informations
+ """
+ # LSB information
+ self.id = ""
+ self.codename = ""
+ self.description = ""
+ self.release = ""
+
+ # get the LSB information
+ lsb_info = []
+ for lsb_option in ["-i", "-c", "-d", "-r"]:
+ pipe = os.popen("lsb_release %s | cut -d : -f 2-" % lsb_option)
+ lsb_info.append(pipe.read().strip())
+ del pipe
+ (self.id, self.codename, self.description, self.release) = lsb_info
+
+ # get a list of country codes and real names
+ self.countries = {}
+ try:
+ f = open("/usr/share/iso-codes/iso_3166.tab", "r")
+ lines = f.readlines()
+ for line in lines:
+ parts = line.split("\t")
+ self.countries[parts[0].lower()] = parts[1]
+ except:
+ print "could not open file '%s'" % file
+ else:
+ f.close()
+
+ def get_sources(self, sources_list):
+ """
+ Find the corresponding template, main and child sources
+ for the distribution
+ """
+ # corresponding sources
+ self.source_template = None
+ self.child_sources = []
+ self.main_sources = []
+ self.disabled_sources = []
+ self.cdrom_sources = []
+ self.download_comps = []
+ self.enabled_comps = []
+ self.cdrom_comps = []
+ self.used_media = []
+ self.get_source_code = False
+ self.source_code_sources = []
+
+ # location of the sources
+ self.default_server = ""
+ self.main_server = ""
+ self.nearest_server = ""
+ self.used_servers = []
+
+ # find the distro template
+ for template in sources_list.matcher.templates:
+ if template.name == self.codename and\
+ template.distribution == self.id:
+ #print "yeah! found a template for %s" % self.description
+ #print template.description, template.base_uri, template.components
+ self.source_template = template
+ break
+ if self.source_template == None:
+ print "Error: could not find a distribution template"
+ # FIXME: will go away - only for debugging issues
+ sys.exit(1)
+
+ # find main and child sources
+ media = []
+ comps = []
+ cdrom_comps = []
+ enabled_comps = []
+ source_code = []
+ for source in sources_list.list:
+ if source.invalid == False and\
+ source.dist == self.codename and\
+ source.template and\
+ source.template.name == self.codename:
+ #print "yeah! found a distro repo: %s" % source.line
+ # cdroms need do be handled differently
+ if source.uri.startswith("cdrom:") and \
+ source.disabled == False:
+ self.cdrom_sources.append(source)
+ cdrom_comps.extend(source.comps)
+ elif source.uri.startswith("cdrom:") and \
+ source.disabled == True:
+ self.cdrom_sources.append(source)
+ elif source.type == "deb" and source.disabled == False:
+ self.main_sources.append(source)
+ comps.extend(source.comps)
+ media.append(source.uri)
+ elif source.type == "deb" and source.disabled == True:
+ self.disabled_sources.append(source)
+ elif source.type.endswith("-src") and source.disabled == False:
+ self.source_code_sources.append(source)
+ elif source.type.endswith("-src") and source.disabled == True:
+ self.disabled_sources.append(source)
+ if source.template in self.source_template.children:
+ #print "yeah! child found: %s" % source.template.name
+ if source.type == "deb":
+ self.child_sources.append(source)
+ elif source.type == "deb-src":
+ self.source_code_sources.append(source)
+ self.download_comps = set(comps)
+ self.cdrom_comps = set(cdrom_comps)
+ enabled_comps.extend(comps)
+ enabled_comps.extend(cdrom_comps)
+ self.enabled_comps = set(enabled_comps)
+ self.used_media = set(media)
+
+ self.get_mirrors()
+
+ def get_mirrors(self):
+ """
+ Provide a set of mirrors where you can get the distribution from
+ """
+ # the main server is stored in the template
+ self.main_server = self.source_template.base_uri
+
+ # try to guess the nearest mirror from the locale
+ # FIXME: for debian we need something different
+ if self.id == "Ubuntu":
+ locale = os.getenv("LANG", default="en.UK")
+ a = locale.find("_")
+ z = locale.find(".")
+ if z == -1:
+ z = len(locale)
+ country_code = locale[a+1:z].lower()
+ self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \
+ country_code
+ self.country = self.countries[country_code]
+
+ # other used servers
+ for medium in self.used_media:
+ if not medium.startswith("cdrom:"):
+ # seems to be a network source
+ self.used_servers.append(medium)
+
+ if len(self.main_sources) == 0:
+ self.default_server = self.main_server
+ else:
+ self.default_server = self.main_sources[0].uri
+
+ def add_source(self, sources_list, type=None,
+ uri=None, dist=None, comps=None, comment=""):
+ """
+ Add distribution specific sources
+ """
+ if uri == None:
+ # FIXME: Add support for the server selector
+ uri = self.default_server
+ if dist == None:
+ dist = self.codename
+ if comps == None:
+ comps = list(self.enabled_comps)
+ if type == None:
+ type = "deb"
+ if comment == "":
+ comment == "Added by software-properties"
+ new_source = sources_list.add(type, uri, dist, comps, comment)
+ # if source code is enabled add a deb-src line after the new
+ # source
+ if self.get_source_code == True and not type.endswith("-src"):
+ sources_list.add("%s-src" % type, uri, dist, comps, comment,
+ file=new_source.file,
+ pos=sources_list.list.index(new_source)+1)
+
+ def enable_component(self, sourceslist, comp):
+ """
+ Disable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+
+ sourceslist: an aptsource.sources_list
+ comp: the component that should be enabled
+ """
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ sources.extend(self.source_code_sources)
+ # check if there is a main source at all
+ if len(self.main_sources) < 1:
+ # create a new main source
+ self.add_source(sourceslist, comps=["%s"%comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in sources:
+ if comp not in source.comps:
+ source.comps.append(comp)
+ if self.get_source_code == True:
+ for source in self.source_code_sources:
+ if comp not in source.comps: source.comps.append(comp)
+
+ def disable_component(self, sourceslist, comp):
+ """
+ Disable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+ """
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ sources.extend(self.source_code_sources)
+ if comp in self.cdrom_comps:
+ sources = []
+ sources.extend(self.main_sources)
+
+ for source in sources:
+ if comp in source.comps:
+ source.comps.remove(comp)
+ if len(source.comps) < 1:
+ sourceslist.remove(source)
# some simple tests