# aptsource.py.in - parse sources.list # # Copyright (c) 2004-2006 Canonical # 2004 Michiel Sikkes # 2006 Sebastian Heinlein # # Author: Michiel Sikkes # Michael Vogt # Sebastian Heinlein # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA import string import gettext import re import apt_pkg import glob import shutil import time import os.path import sys #import pdb #from UpdateManager.Common.DistInfo import DistInfo from DistInfo import DistInfo # some global helpers def is_mirror(master_uri, compare_uri): """check if the given add_url is idential or a mirror of orig_uri e.g. master_uri = archive.ubuntu.com compare_uri = de.archive.ubuntu.com -> True """ # remove traling spaces and "/" compare_uri = compare_uri.rstrip("/ ") master_uri = master_uri.rstrip("/ ") # uri is identical if compare_uri == master_uri: #print "Identical" return True # add uri is a master site and orig_uri has the from "XX.mastersite" # (e.g. de.archive.ubuntu.com) try: compare_srv = compare_uri.split("//")[1] master_srv = master_uri.split("//")[1] #print "%s == %s " % (add_srv, orig_srv) except IndexError: # ok, somethings wrong here #print "IndexError" return False # remove the leading "." (if any) and see if that helps if "." in compare_srv and \ compare_srv[compare_srv.index(".")+1:] == master_srv: #print "Mirror" return True return False def uniq(s): """ simple and efficient way to return uniq list """ return list(set(s)) class SourceEntry: """ single sources.list entry """ def __init__(self, line,file=None): self.invalid = False # is the source entry valid self.disabled = False # is it disabled ('#' in front) self.type = "" # what type (deb, deb-src) self.uri = "" # base-uri self.dist = "" # distribution (dapper, edgy, etc) self.comps = [] # list of available componetns (may empty) self.comment = "" # (optional) comment self.line = line # the original sources.list line if file == None: file = apt_pkg.Config.FindDir("Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist") self.file = file # the file that the entry is located in self.parse(line) self.template = None # type DistInfo.Suite self.children = [] def __eq__(self, other): """ equal operator for two sources.list entries """ return (self.disabled == other.disabled and self.type == other.type and self.uri == other.uri and self.dist == other.dist and self.comps == other.comps) def mysplit(self, line): """ a split() implementation that understands the sources.list format better and takes [] into account (for e.g. cdroms) """ line = string.strip(line) pieces = [] tmp = "" # we are inside a [..] block p_found = False space_found = False for i in range(len(line)): if line[i] == "[": p_found=True tmp += line[i] elif line[i] == "]": p_found=False tmp += line[i] elif space_found and not line[i].isspace(): # we skip one or more space space_found = False pieces.append(tmp) tmp = line[i] elif line[i].isspace() and not p_found: # found a whitespace space_found = True else: tmp += line[i] # append last piece if len(tmp) > 0: pieces.append(tmp) return pieces def parse(self,line): """ parse a given sources.list (textual) line and break it up into the field we have """ line = string.strip(self.line) #print line # check if the source is enabled/disabled if line == "" or line == "#": # empty line self.invalid = True return if line[0] == "#": self.disabled = True pieces = string.split(line[1:]) # if it looks not like a disabled deb line return if not (pieces[0] == "deb" or pieces[0] == "deb-src"): self.invalid = True return else: line = line[1:] # check for another "#" in the line (this is treated as a comment) i = line.find("#") if i > 0: self.comment = line[i+1:] line = line[:i] # source is ok, split it and see what we have pieces = self.mysplit(line) # Sanity check if len(pieces) < 3: self.invalid = True return # Type, deb or deb-src self.type = string.strip(pieces[0]) # Sanity check if self.type not in ("deb", "deb-src"): self.invalid = True return # URI self.uri = string.strip(pieces[1]) if len(self.uri) < 1: self.invalid = True # distro and components (optional) # Directory or distro self.dist = string.strip(pieces[2]) if len(pieces) > 3: # List of components self.comps = pieces[3:] else: self.comps = [] def set_enabled(self, new_value): """ set a line to enabled or disabled """ self.disabled = not new_value # enable, remove all "#" from the start of the line if new_value == True: i=0 self.line = string.lstrip(self.line) while self.line[i] == "#": i += 1 self.line = self.line[i:] else: # disabled, add a "#" if string.strip(self.line)[0] != "#": self.line = "#" + self.line def __str__(self): """ debug helper """ return self.str().strip() def str(self): """ return the current line as string """ if self.invalid: return self.line line = "" if self.disabled: line = "# " line += "%s %s %s" % (self.type, self.uri, self.dist) if len(self.comps) > 0: line += " " + " ".join(self.comps) if self.comment != "": line += " #"+self.comment line += "\n" return line class NullMatcher(object): """ a Matcher that does nothing """ def match(self, s): return True class SourcesList: """ represents the full sources.list + sources.list.d file """ def __init__(self, withMatcher=True, matcherPath="/usr/share/update-manager/channels/"): self.list = [] # the actual SourceEntries Type if withMatcher: self.matcher = SourceEntryMatcher(matcherPath) else: self.matcher = NullMatcher() self.refresh() def refresh(self): """ update the list of known entries """ self.list = [] # read sources.list dir = apt_pkg.Config.FindDir("Dir::Etc") file = apt_pkg.Config.Find("Dir::Etc::sourcelist") self.load(dir+file) # read sources.list.d partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") for file in glob.glob("%s/*.list" % partsdir): self.load(file) # check if the source item fits a predefined template for source in self.list: if source.invalid == False: self.matcher.match(source) def __iter__(self): """ simple iterator to go over self.list, returns SourceEntry types """ for entry in self.list: yield entry raise StopIteration def add(self, type, uri, dist, comps, comment="", pos=-1, file=None): """ Add a new source to the sources.list. The method will search for existing matching repos and will try to reuse them as far as possible """ # check if we have this source already in the sources.list for source in self.list: if source.disabled == False and source.invalid == False and \ source.type == type and uri == source.uri and \ source.dist == dist: for new_comp in comps: if new_comp in source.comps: # we have this component already, delete it from the new_comps # list del comps[comps.index(new_comp)] if len(comps) == 0: return source for source in self.list: # if there is a repo with the same (type, uri, dist) just add the # components if source.disabled == False and source.invalid == False and \ source.type == type and uri == source.uri and \ source.dist == dist: comps = uniq(source.comps + comps) source.comps = comps return source # if there is a corresponding repo which is disabled, enable it elif source.disabled == True and source.invalid == False and \ source.type == type and uri == source.uri and \ source.dist == dist and \ len(set(source.comps) & set(comps)) == len(comps): source.disabled = False return source # there isn't any matching source, so create a new line and parse it line = "%s %s %s" % (type,uri,dist) for c in comps: line = line + " " + c; if comment != "": line = "%s #%s\n" %(line,comment) line = line + "\n" new_entry = SourceEntry(line) if file != None: new_entry.file = file self.matcher.match(new_entry) self.list.insert(pos, new_entry) return new_entry def remove(self, source_entry): """ remove the specified entry from the sources.list """ self.list.remove(source_entry) def restoreBackup(self, backup_ext): " restore sources.list files based on the backup extension " dir = apt_pkg.Config.FindDir("Dir::Etc") file = apt_pkg.Config.Find("Dir::Etc::sourcelist") if os.path.exists(dir+file+backup_ext): shutil.copy(dir+file+backup_ext,dir+file) # now sources.list.d partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts") for file in glob.glob("%s/*.list" % partsdir): if os.path.exists(file+backup_ext): shutil.copy(file+backup_ext,file) def backup(self, backup_ext=None): """ make a backup of the current source files, if no backup extension is given, the current date/time is used (and returned) """ already_backuped = set() if backup_ext == None: backup_ext = time.strftime("%y%m%d.%H%M") for source in self.list: if not source.file in already_backuped: shutil.copy(source.file,"%s%s" % (source.file,backup_ext)) return backup_ext def load(self,file): """ (re)load the current sources """ try: f = open(file, "r") lines = f.readlines() for line in lines: source = SourceEntry(line,file) self.list.append(source) except: print "could not open file '%s'" % file else: f.close() def save(self): """ save the current sources """ files = {} for source in self.list: if not files.has_key(source.file): files[source.file]=open(source.file,"w") files[source.file].write(source.str()) for f in files: files[f].close() def check_for_relations(self, sources_list): """get all parent and child channels in the sources list""" parents = [] used_child_templates = {} for source in sources_list: # try to avoid checking uninterressting sources if source.template == None: continue # set up a dict with all used child templates and corresponding # source entries if source.template.child == True: key = source.template if not used_child_templates.has_key(key): used_child_templates[key] = [] temp = used_child_templates[key] temp.append(source) else: # store each source with children aka. a parent :) if len(source.template.children) > 0: parents.append(source) #print self.used_child_templates #print self.parents return (parents, used_child_templates) # matcher class to make a source entry look nice # lots of predefined matchers to make it i18n/gettext friendly class SourceEntryMatcher: class MatchType: def __init__(self, a_type,a_descr): self.type = a_type self.description = a_descr class MatchDist: def __init__(self,a_uri,a_dist, a_descr,l_comps, l_comps_descr): self.uri = a_uri self.dist = a_dist self.description = a_descr self.comps = l_comps self.comps_descriptions = l_comps_descr def __init__(self, matcherPath): self.templates = [] # Get the human readable channel and comp names from the channel .infos spec_files = glob.glob("%s/*.info" % matcherPath) for f in spec_files: f = os.path.basename(f) i = f.find(".info") f = f[0:i] dist = DistInfo(f,base_dir=matcherPath) for suite in dist.suites: if suite.match_uri != None: self.templates.append(suite) return def match(self, source): """Add a matching template to the source""" _ = gettext.gettext found = False for template in self.templates: if (re.search(template.match_uri, source.uri) and re.match(template.match_name, source.dist)): found = True source.template = template break for mirror in template.valid_mirrors: if (is_mirror(mirror,source.uri) and re.match(template.match_name, source.dist)): found = True source.template = template break return found class Distribution: def __init__(self): """ Container for distribution specific informations """ # LSB information self.id = "" self.codename = "" self.description = "" self.release = "" # get the LSB information lsb_info = [] for lsb_option in ["-i", "-c", "-d", "-r"]: pipe = os.popen("lsb_release %s -s" % lsb_option) lsb_info.append(pipe.read().strip()) del pipe (self.id, self.codename, self.description, self.release) = lsb_info # get a list of country codes and real names self.countries = {} try: f = open("/usr/share/iso-codes/iso_3166.tab", "r") lines = f.readlines() for line in lines: parts = line.split("\t") self.countries[parts[0].lower()] = parts[1] except: print "could not open file '%s'" % file else: f.close() def get_sources(self, sources_list): """ Find the corresponding template, main and child sources for the distribution """ # corresponding sources self.source_template = None self.child_sources = [] self.main_sources = [] self.disabled_sources = [] self.cdrom_sources = [] self.download_comps = [] self.enabled_comps = [] self.cdrom_comps = [] self.used_media = [] self.get_source_code = False self.source_code_sources = [] # location of the sources self.default_server = "" self.main_server = "" self.nearest_server = "" self.used_servers = [] # find the distro template for template in sources_list.matcher.templates: if template.name == self.codename and\ template.distribution == self.id: #print "yeah! found a template for %s" % self.description #print template.description, template.base_uri, template.components self.source_template = template break if self.source_template == None: print "Error: could not find a distribution template" # FIXME: will go away - only for debugging issues sys.exit(1) # find main and child sources media = [] comps = [] cdrom_comps = [] enabled_comps = [] source_code = [] for source in sources_list.list: if source.invalid == False and\ source.dist == self.codename and\ source.template and\ source.template.name == self.codename: #print "yeah! found a distro repo: %s" % source.line # cdroms need do be handled differently if source.uri.startswith("cdrom:") and \ source.disabled == False: self.cdrom_sources.append(source) cdrom_comps.extend(source.comps) elif source.uri.startswith("cdrom:") and \ source.disabled == True: self.cdrom_sources.append(source) elif source.type == "deb" and source.disabled == False: self.main_sources.append(source) comps.extend(source.comps) media.append(source.uri) elif source.type == "deb" and source.disabled == True: self.disabled_sources.append(source) elif source.type.endswith("-src") and source.disabled == False: self.source_code_sources.append(source) elif source.type.endswith("-src") and source.disabled == True: self.disabled_sources.append(source) if source.invalid == False and\ source.template in self.source_template.children: if source.disabled == False and source.type == "deb": self.child_sources.append(source) elif source.disabled == False and source.type == "deb-src": self.source_code_sources.append(source) else: self.disabled_sources.append(source) self.download_comps = set(comps) self.cdrom_comps = set(cdrom_comps) enabled_comps.extend(comps) enabled_comps.extend(cdrom_comps) self.enabled_comps = set(enabled_comps) self.used_media = set(media) self.get_mirrors() def get_mirrors(self): """ Provide a set of mirrors where you can get the distribution from """ # the main server is stored in the template self.main_server = self.source_template.base_uri # try to guess the nearest mirror from the locale # FIXME: for debian we need something different if self.id == "Ubuntu": locale = os.getenv("LANG", default="en.UK") a = locale.find("_") z = locale.find(".") if z == -1: z = len(locale) country_code = locale[a+1:z].lower() self.nearest_server = "http://%s.archive.ubuntu.com/ubuntu/" % \ country_code if self.countries.has_key(country_code): self.country = self.countries[country_code] else: self.country = None # other used servers for medium in self.used_media: if not medium.startswith("cdrom:"): # seems to be a network source self.used_servers.append(medium) if len(self.main_sources) == 0: self.default_server = self.main_server else: self.default_server = self.main_sources[0].uri def add_source(self, sources_list, type=None, uri=None, dist=None, comps=None, comment=""): """ Add distribution specific sources """ if uri == None: # FIXME: Add support for the server selector uri = self.default_server if dist == None: dist = self.codename if comps == None: comps = list(self.enabled_comps) if type == None: type = "deb" if comment == "": comment == "Added by software-properties" new_source = sources_list.add(type, uri, dist, comps, comment) # if source code is enabled add a deb-src line after the new # source if self.get_source_code == True and not type.endswith("-src"): sources_list.add("%s-src" % type, uri, dist, comps, comment, file=new_source.file, pos=sources_list.list.index(new_source)+1) def enable_component(self, sourceslist, comp): """ Enable a component in all main, child and source code sources (excluding cdrom based sources) sourceslist: an aptsource.sources_list comp: the component that should be enabled """ def add_component_only_once(source, comps_per_dist): """ Check if we already added the component to the repository, since a repository could be splitted into different apt lines. If not add the component """ if comp in comps_per_dist[source.dist]: return source.comps.append(comp) comps_per_dist[source.dist].add(comp) sources = [] sources.extend(self.main_sources) sources.extend(self.child_sources) sources.extend(self.source_code_sources) # store what comps are enabled already per distro (where distro is # e.g. "dapper", "dapper-updates") comps_per_dist = {} for s in sources: if s.type != "deb": continue if not comps_per_dist.has_key(s.dist): comps_per_dist[s.dist] = set() map(comps_per_dist[s.dist].add, s.comps) # check if there is a main source at all if len(self.main_sources) < 1: # create a new main source self.add_source(sourceslist, comps=["%s"%comp]) else: # add the comp to all main, child and source code sources for source in sources: add_component_only_once(source, comps_per_dist) # now do the same for source dists if self.get_source_code == True: comps_per_dist = {} for s in self.source_code_sources: if s.type != "deb-src": continue if not comps_per_dist.has_key(s.dist): comps_per_dist[s.dist] = set() map(comps_per_dist[s.dist].add, s.comps) for source in self.source_code_sources: if comp not in source.comps: add_component_only_once(source, comps_per_dist) def disable_component(self, sourceslist, comp): """ Disable a component in all main, child and source code sources (excluding cdrom based sources) """ sources = [] sources.extend(self.main_sources) sources.extend(self.child_sources) sources.extend(self.source_code_sources) if comp in self.cdrom_comps: sources = [] sources.extend(self.main_sources) for source in sources: if comp in source.comps: source.comps.remove(comp) if len(source.comps) < 1: sourceslist.remove(source) # some simple tests if __name__ == "__main__": apt_pkg.InitConfig() sources = SourcesList() for entry in sources: print entry.str() #print entry.uri mirror = is_mirror("http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu/") print "is_mirror(): %s" % mirror print is_mirror("http://archive.ubuntu.com/ubuntu", "http://de.archive.ubuntu.com/ubuntu/") print is_mirror("http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu")