summaryrefslogtreecommitdiff
path: root/AptSources/aptsources.py
diff options
context:
space:
mode:
authorSebastian Heinlein <sebi@sebi-laptop>2006-12-04 06:13:09 +0100
committerSebastian Heinlein <sebi@sebi-laptop>2006-12-04 06:13:09 +0100
commit4ed8672b734132995f16ccb6fa99fe105dd56a48 (patch)
treef2782fea6b9832e9e485de2d00f8851757aa9bb5 /AptSources/aptsources.py
parent21a2e05e903ace1ba6ac977811074146973fb4f4 (diff)
parent359db816dd1d9aecce9800742fdc1ed45846ee7e (diff)
downloadpython-apt-4ed8672b734132995f16ccb6fa99fe105dd56a48.tar.gz
* merge with ubuntu
Diffstat (limited to 'AptSources/aptsources.py')
-rw-r--r--AptSources/aptsources.py316
1 files changed, 224 insertions, 92 deletions
diff --git a/AptSources/aptsources.py b/AptSources/aptsources.py
index e29114e1..45441cdd 100644
--- a/AptSources/aptsources.py
+++ b/AptSources/aptsources.py
@@ -32,7 +32,7 @@ import shutil
import time
import os.path
import sys
-
+from gettext import gettext as _
#import pdb
#from UpdateManager.Common.DistInfo import DistInfo
@@ -141,7 +141,7 @@ class SourceEntry:
self.disabled = True
pieces = string.split(line[1:])
# if it looks not like a disabled deb line return
- if not (pieces[0] == "deb" or pieces[0] == "deb-src"):
+ if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"):
self.invalid = True
return
else:
@@ -160,7 +160,7 @@ class SourceEntry:
# Type, deb or deb-src
self.type = string.strip(pieces[0])
# Sanity check
- if self.type not in ("deb", "deb-src"):
+ if self.type not in ("deb", "deb-src", "rpm", "rpm-src"):
self.invalid = True
return
# URI
@@ -306,7 +306,8 @@ class SourcesList:
" restore sources.list files based on the backup extension "
dir = apt_pkg.Config.FindDir("Dir::Etc")
file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
- if os.path.exists(dir+file+backup_ext):
+ if os.path.exists(dir+file+backup_ext) and \
+ os.path.exists(dir+file):
shutil.copy(dir+file+backup_ext,dir+file)
# now sources.list.d
partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
@@ -321,7 +322,7 @@ class SourcesList:
if backup_ext == None:
backup_ext = time.strftime("%y%m%d.%H%M")
for source in self.list:
- if not source.file in already_backuped:
+ if not source.file in already_backuped and os.path.exists(source.file):
shutil.copy(source.file,"%s%s" % (source.file,backup_ext))
return backup_ext
@@ -421,40 +422,24 @@ class SourceEntryMatcher:
return found
class Distribution:
- def __init__(self):
+ def __init__(self, id, codename, description, release):
""" Container for distribution specific informations """
# LSB information
- self.id = ""
- self.codename = ""
- self.description = ""
- self.release = ""
-
- # get the LSB information
- lsb_info = []
- for lsb_option in ["-i", "-c", "-d", "-r"]:
- pipe = os.popen("lsb_release %s -s" % lsb_option)
- lsb_info.append(pipe.read().strip())
- del pipe
- (self.id, self.codename, self.description, self.release) = lsb_info
+ self.id = id
+ self.codename = codename
+ self.description = description
+ self.release = release
- # get a list of country codes and real names
- self.countries = {}
- try:
- f = open("/usr/share/iso-codes/iso_3166.tab", "r")
- lines = f.readlines()
- for line in lines:
- parts = line.split("\t")
- self.countries[parts[0].lower()] = parts[1]
- except:
- print "could not open file '%s'" % file
- else:
- f.close()
+ self.binary_type = "deb"
+ self.source_type = "deb-src"
- def get_sources(self, sources_list):
+ def get_sources(self, sourceslist):
"""
Find the corresponding template, main and child sources
for the distribution
"""
+
+ self.sourceslist = sourceslist
# corresponding sources
self.source_template = None
self.child_sources = []
@@ -475,8 +460,8 @@ class Distribution:
self.used_servers = []
# find the distro template
- for template in sources_list.matcher.templates:
- if template.name == self.codename and\
+ for template in self.sourceslist.matcher.templates:
+ if self.is_codename(template.name) and\
template.distribution == self.id:
#print "yeah! found a template for %s" % self.description
#print template.description, template.base_uri, template.components
@@ -493,11 +478,11 @@ class Distribution:
cdrom_comps = []
enabled_comps = []
source_code = []
- for source in sources_list.list:
+ for source in self.sourceslist.list:
if source.invalid == False and\
- source.dist == self.codename and\
+ self.is_codename(source.dist) and\
source.template and\
- source.template.name == self.codename:
+ self.is_codename(source.template.name):
#print "yeah! found a distro repo: %s" % source.line
# cdroms need do be handled differently
if source.uri.startswith("cdrom:") and \
@@ -507,21 +492,23 @@ class Distribution:
elif source.uri.startswith("cdrom:") and \
source.disabled == True:
self.cdrom_sources.append(source)
- elif source.type == "deb" and source.disabled == False:
+ elif source.type == self.binary_type and \
+ source.disabled == False:
self.main_sources.append(source)
comps.extend(source.comps)
media.append(source.uri)
- elif source.type == "deb" and source.disabled == True:
+ elif source.type == self.binary_type and \
+ source.disabled == True:
self.disabled_sources.append(source)
- elif source.type.endswith("-src") and source.disabled == False:
+ elif source.type == self.source_type and source.disabled == False:
self.source_code_sources.append(source)
- elif source.type.endswith("-src") and source.disabled == True:
+ elif source.type == self.source_type and source.disabled == True:
self.disabled_sources.append(source)
if source.invalid == False and\
source.template in self.source_template.children:
- if source.disabled == False and source.type == "deb":
+ if source.disabled == False and source.type == self.binary_type:
self.child_sources.append(source)
- elif source.disabled == False and source.type == "deb-src":
+ elif source.disabled == False and source.type == self.source_type:
self.source_code_sources.append(source)
else:
self.disabled_sources.append(source)
@@ -541,21 +528,6 @@ class Distribution:
# the main server is stored in the template
self.main_server = self.source_template.base_uri
- # try to guess the nearest mirror from the locale
- # FIXME: for debian we need something different
- self.country = None
- if self.id == "Ubuntu":
- locale = os.getenv("LANG", default="en.UK")
- a = locale.find("_")
- z = locale.find(".")
- if z == -1:
- z = len(locale)
- country_code = locale[a+1:z].lower()
- self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \
- country_code
- if self.countries.has_key(country_code):
- self.country = self.countries[country_code]
-
# other used servers
for medium in self.used_media:
if not medium.startswith("cdrom:"):
@@ -567,7 +539,7 @@ class Distribution:
else:
self.default_server = self.main_sources[0].uri
- def add_source(self, sources_list, type=None,
+ def add_source(self, type=None,
uri=None, dist=None, comps=None, comment=""):
"""
Add distribution specific sources
@@ -580,18 +552,16 @@ class Distribution:
if comps == None:
comps = list(self.enabled_comps)
if type == None:
- type = "deb"
- if comment == "":
- comment == "Added by software-properties"
- new_source = sources_list.add(type, uri, dist, comps, comment)
+ type = self.binary_type
+ new_source = self.sourceslist.add(type, uri, dist, comps, comment)
# if source code is enabled add a deb-src line after the new
# source
- if self.get_source_code == True and not type.endswith("-src"):
- sources_list.add("%s-src" % type, uri, dist, comps, comment,
- file=new_source.file,
- pos=sources_list.list.index(new_source)+1)
+ if self.get_source_code == True and tpye == self.binary_type:
+ self.sourceslist.add(self.source_type, uri, dist, comps, comment,
+ file=new_source.file,
+ pos=self.sourceslist.list.index(new_source)+1)
- def enable_component(self, sourceslist, comp):
+ def enable_component(self, comp):
"""
Enable a component in all main, child and source code sources
(excluding cdrom based sources)
@@ -620,40 +590,41 @@ class Distribution:
sources = []
sources.extend(self.main_sources)
sources.extend(self.child_sources)
- sources.extend(self.source_code_sources)
# store what comps are enabled already per distro (where distro is
# e.g. "dapper", "dapper-updates")
comps_per_dist = {}
+ comps_per_sdist = {}
for s in sources:
- if s.type != "deb":
- continue
- if not comps_per_dist.has_key(s.dist):
- comps_per_dist[s.dist] = set()
- map(comps_per_dist[s.dist].add, s.comps)
+ if s.type == self.binary_type:
+ if not comps_per_dist.has_key(s.dist):
+ comps_per_dist[s.dist] = set()
+ map(comps_per_dist[s.dist].add, s.comps)
+ for s in self.source_code_sources:
+ if s.type == self.source_type:
+ if not comps_per_sdist.has_key(s.dist):
+ comps_per_sdist[s.dist] = set()
+ map(comps_per_sdist[s.dist].add, s.comps)
+
# check if there is a main source at all
if len(self.main_sources) < 1:
# create a new main source
- self.add_source(sourceslist, comps=["%s"%comp])
+ self.add_source(comps=["%s"%comp])
else:
# add the comp to all main, child and source code sources
for source in sources:
add_component_only_once(source, comps_per_dist)
- # now do the same for source dists
+ # check if there is a main source code source at all
if self.get_source_code == True:
- comps_per_dist = {}
- for s in self.source_code_sources:
- if s.type != "deb-src":
- continue
- if not comps_per_dist.has_key(s.dist):
- comps_per_dist[s.dist] = set()
- map(comps_per_dist[s.dist].add, s.comps)
- for source in self.source_code_sources:
- if comp not in source.comps:
- add_component_only_once(source, comps_per_dist)
-
-
- def disable_component(self, sourceslist, comp):
+ if len(self.source_code_sources) < 1:
+ # create a new main source
+ self.add_source(type=self.source_type, comps=["%s"%comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ def disable_component(self, comp):
"""
Disable a component in all main, child and source code sources
(excluding cdrom based sources)
@@ -670,19 +641,180 @@ class Distribution:
if comp in source.comps:
source.comps.remove(comp)
if len(source.comps) < 1:
- sourceslist.remove(source)
+ self.sourceslist.remove(source)
def change_server(self, uri):
''' Change the server of all distro specific sources to
a given host '''
sources = []
+ seen = []
sources.extend(self.main_sources)
sources.extend(self.child_sources)
sources.extend(self.source_code_sources)
for source in sources:
- # FIXME: ugly
- if not "security.ubuntu.com" in source.uri:
- source.uri = uri
+ # Avoid creating duplicate entries
+ source.uri = uri
+ for comp in source.comps:
+ if [source.uri, source.dist, comp] in seen:
+ source.comps.remove(comp)
+ else:
+ seen.append([source.uri, source.dist, comp])
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ def is_codename(self, name):
+ ''' Compare a given name with the release codename. '''
+ if name == self.codename:
+ return True
+ else:
+ return False
+
+ def get_server_list(self):
+ ''' Return a list of used and suggested servers '''
+
+ # Store all available servers:
+ # Name, URI, active
+ mirrors = []
+
+ mirrors.append([_("Main server"), self.main_server,
+ len(self.used_servers) == 1 and
+ self.used_servers[0] == self.main_server])
+
+ if len(self.used_servers) == 1 and not re.match(self.used_servers[0],
+ self.main_server):
+ # Only one server is used
+ server = self.used_servers[0]
+ mirrors.append([server, server, True])
+ elif len(self.used_servers) > 1:
+ # More than one server is used. Since we don't handle this case
+ # in the user interface we set "custom servers" to true and
+ # append a list of all used servers
+ mirrors.append([_("Custom servers"), None, True])
+ for server in self.used_servers:
+ if not [server, server, False] in mirrors:
+ mirrors.append([server, server, False])
+
+ return mirrors
+
+ if len(self.used_servers) == 1 and not is_single_server(self.main_server):
+ mirrors.append(["%s" % self.used_servers[0],
+ self.used_servers[0], True])
+ elif len(self.used_servers) > 1 or not is_single_server(self.main_server):
+ mirrors.append([_("Custom servers"), None, True])
+
+ return mirrors
+
+
+class DebianDistribution(Distribution):
+ ''' Class to support specific Debian features '''
+
+ def is_codename(self, name):
+ ''' Compare a given name with the release codename and check if
+ if it can be used as a synonym for a development releases '''
+ if name == self.codename or self.release in ("testing", "unstable"):
+ return True
+ else:
+ return False
+
+class UbuntuDistribution(Distribution):
+ ''' Class to support specific Ubuntu features '''
+ def get_mirrors(self):
+ Distribution.get_mirrors(self)
+ # get a list of country codes and real names
+ self.countries = {}
+ try:
+ f = open("/usr/share/iso-codes/iso_3166.tab", "r")
+ lines = f.readlines()
+ for line in lines:
+ parts = line.split("\t")
+ self.countries[parts[0].lower()] = parts[1]
+ except:
+ print "could not open file '%s'" % file
+ else:
+ f.close()
+
+ # try to guess the nearest mirror from the locale
+ self.country = None
+ locale = os.getenv("LANG", default="en.UK")
+ a = locale.find("_")
+ z = locale.find(".")
+ if z == -1:
+ z = len(locale)
+ country_code = locale[a+1:z].lower()
+ self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \
+ country_code
+ if self.countries.has_key(country_code):
+ self.country = self.countries[country_code]
+
+ def get_server_list(self):
+ ''' Return a list of used and suggested servers '''
+
+ def get_mirror_name(server):
+ ''' Try to get a human readable name for the main mirror of a country'''
+ i = server.find("://")
+ l = server.find(".archive.ubuntu.com")
+ if i != -1 and l != -1:
+ country = server[i+len("://"):l]
+ if self.countries.has_key(country):
+ # TRANSLATORS: %s is a country
+ return _("Server for %s") % \
+ gettext.dgettext("iso-3166",
+ self.countries[country].rstrip()).rstrip()
+ else:
+ return("%s" % server)
+
+ # Store all available servers:
+ # Name, URI, active
+ mirrors = []
+ if len(self.used_servers) == 1 and self.used_servers[0] == self.main_server:
+ mirrors.append([_("Main server"), self.main_server, True])
+ mirrors.append([get_mirror_name(self.nearest_server),
+ self.nearest_server, False])
+ elif len(self.used_servers) == 1 and not re.match(self.used_servers[0],
+ self.main_server):
+ mirrors.append([_("Main server"), self.main_server, False])
+ # Only one server is used
+ server = self.used_servers[0]
+
+ # Append the nearest server if it's not already used
+ if not re.match(server, self.nearest_server):
+ mirrors.append([get_mirror_name(self.nearest_server),
+ self.nearest_server, False])
+ mirrors.append([get_mirror_name(server), server, True])
+
+ elif len(self.used_servers) > 1:
+ # More than one server is used. Since we don't handle this case
+ # in the user interface we set "custom servers" to true and
+ # append a list of all used servers
+ mirrors.append([_("Main server"), self.main_server, False])
+ mirrors.append([get_mirror_name(self.nearest_server),
+ self.nearest_server, False])
+ mirrors.append([_("Custom servers"), None, True])
+ for server in self.used_servers:
+ if re.match(server, self.nearest_server):
+ continue
+ elif not [get_mirror_name(server), server, False] in mirrors:
+ mirrors.append([get_mirror_name(server), server, False])
+
+ return mirrors
+
+def get_distro():
+ ''' Check the currently used distribution and return the corresponding
+ distriubtion class that supports distro specific features. '''
+ lsb_info = []
+ for lsb_option in ["-i", "-c", "-d", "-r"]:
+ pipe = os.popen("lsb_release %s -s" % lsb_option)
+ lsb_info.append(pipe.read().strip())
+ del pipe
+ (id, codename, description, release) = lsb_info
+ if id == "Ubuntu":
+ return UbuntuDistribution(id, codename, description,
+ release)
+ elif id == "Debian":
+ return UbuntuDistribution(id, codename, description,
+ release)
+ else:
+ return Distribution(id, codename, description, relase)
# some simple tests
if __name__ == "__main__":