diff options
author | rillig <rillig@pkgsrc.org> | 2022-09-04 21:38:47 +0000 |
---|---|---|
committer | rillig <rillig@pkgsrc.org> | 2022-09-04 21:38:47 +0000 |
commit | ce00b94e6a6b99f2a8b2317c71ddc39a7f8ea077 (patch) | |
tree | b2467d39baf146a318431929604300caf6e116d4 /pkgtools/distlint/files/distlint.py | |
parent | f2d2c701f3b3926ef381764408dd4cafdcb3aa26 (diff) | |
download | pkgsrc-ce00b94e6a6b99f2a8b2317c71ddc39a7f8ea077.tar.gz |
pkgtools/distlint: add early prototype implementation
Do not enable the package yet as it is still under active development
and lacking proper documentation.
Diffstat (limited to 'pkgtools/distlint/files/distlint.py')
-rw-r--r-- | pkgtools/distlint/files/distlint.py | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/pkgtools/distlint/files/distlint.py b/pkgtools/distlint/files/distlint.py new file mode 100644 index 00000000000..bacb93dccd3 --- /dev/null +++ b/pkgtools/distlint/files/distlint.py @@ -0,0 +1,259 @@ +#! /usr/bin/env python +# $NetBSD: distlint.py,v 1.1 2022/09/04 21:38:48 rillig Exp $ + +import argparse +import os +import re +from pathlib import Path +from typing import NamedTuple, List, Optional, Set, Dict +from logging import debug + + +pkg_info = '@PREFIX@/sbin/pkg_info' + + +class DistinfoEntry(NamedTuple): + """ + One distfile from a distinfo file, including its hashes. + """ + distfile: Path + hashes: Dict[str, str] + + loc_pkgsrc_dir: Path + loc_distinfo: Path + + +class BinaryPackage(NamedTuple): + """ + A file containing a binary package for a particular platform. + """ + + file: Path + pkgname: str + build_info: Dict[str, str] + distinfo_entries: List[DistinfoEntry] + + def load_build_info(self) -> Dict[str, str]: + if len(self.build_info) > 0: + return self.build_info + + slash_file = str(self.file).replace('\\', '/') + for line in os.popen(f'{pkg_info} -q -B "{slash_file}"').readlines(): + line = line.rstrip() + if line == '': + continue + key, value = line.split('=', 1) + self.build_info[key] = value + + return self.build_info + + def no_src_on_ftp(self) -> str: + return self.load_build_info().get('NO_SRC_ON_FTP', '') + + def license(self) -> str: + return self.load_build_info().get('LICENSE', '') + + def load_distinfo_entries(self, pkgsrc_dirs: List[Path]) -> List[DistinfoEntry]: + if len(self.distinfo_entries) > 0: + return self.distinfo_entries + + distinfo_files = [] + for line in os.popen(f'{pkg_info} -q -b "{self.file}"').readlines(): + m = re.match(r'(?x) ([^\s:]+): \t \$NetBSD: .*?,v \s+ ([0-9.]+)', line) + if m and f'./{m.group(1)}'.endswith('/distinfo'): + distinfo_files.append(Path(m.group(1))) + + #print(f'package {self.pkgname} has distinfo files: {distinfo_files}') + + for distinfo_file in distinfo_files: + for pkgsrc_dir in pkgsrc_dirs: + distinfo = pkgsrc_dir / distinfo_file + #print(f'distinfo {distinfo}') + if distinfo.exists(): + #print(f'exists') + for distinfo_entry in Distdir.load_distinfo(pkgsrc_dir, distinfo): + #print(f'entry {distinfo_entry}') + self.distinfo_entries.append(distinfo_entry) + + return self.distinfo_entries + + +class Distdir(NamedTuple): + """ + A directory containing distribution files, such as /pub/pkgsrc/distfiles. + + Each such directory is connected to one or more directories containing + binary packages, such as: + /pub/pkgsrc/packages/NetBSD/amd64/2022Q4/All + /pub/pkgsrc/packages/Linux/sparc/HEAD/All + + Each such directory is connected to one or more pkgsrc trees, such as: + /usr/pkgsrc-trunk + /usr/pkgsrc-2022Q4 + """ + + distdir: Path + database: Path + packages_dirs: List[Path] + pkgsrc_dirs: List[Path] + + def validate(self): + distfiles = self.load_distfiles() + distinfo_entries = {} # XXX: self.load_pkgsrc() + binary_packages = self.load_binary_packages() + + for distfile in distfiles: + debug('distfile %s', distfile) + for binary_package in binary_packages: + debug('%s', binary_package) + for distinfo_entry in sorted(distinfo_entries.values(), key=lambda e: e[0].distfile): + debug('%s', distinfo_entry) + + self.validate_no_src_on_ftp(distfiles, binary_packages) + + print("TODO: validate that GPL files are still there even after removing the binary package") + + def validate_no_src_on_ftp(self, distfiles: List[Path], binary_packages: List[BinaryPackage]): + for binary_package in binary_packages: + for distinfo_entry in binary_package.load_distinfo_entries(self.pkgsrc_dirs): + if binary_package.no_src_on_ftp() != '': + if (self.distdir / distinfo_entry.distfile).exists(): + print(f"error: '{distinfo_entry.distfile}' must not be in '{self.distdir}' because the package '{binary_package.pkgname}' has NO_SRC_ON_FTP '{binary_package.no_src_on_ftp()}'.") + + def load_distfiles(self): + distfiles: Set[Path] = set() + for root, dirs, files in os.walk(self.distdir): + for file in files: + distfiles.add(Path(root, file).relative_to(self.distdir)) + return sorted(distfiles) + + def load_binary_packages(self) -> List[BinaryPackage]: + binary_package_files: Set[Path] = set() + for packages_dir in self.packages_dirs: + for root, _, files in os.walk(packages_dir): + for file in files: + if file.endswith('.tgz'): + binary_package_files.add(Path(root, file)) + + binary_packages = [] + for binary_package_file in binary_package_files: + binary_package = self.load_binary_package(binary_package_file) + if binary_package: + binary_packages.append(binary_package) + + return sorted(binary_packages, key=lambda p: p.pkgname) + + @staticmethod + def load_binary_package(file: Path) -> Optional[BinaryPackage]: + return BinaryPackage(file, file.name[:-len('.tgz')], {}, []) + + @staticmethod + def list_distinfo_files(pkgsrc_dir: Path): + for root, _, files in os.walk(pkgsrc_dir): + for file in files: + if file == 'distinfo': + yield Path(root, file).relative_to(pkgsrc_dir) + + def load_pkgsrc(self) -> Dict[Path, List[DistinfoEntry]]: + info: Dict[Path, List[DistinfoEntry]] = {} + + for pkgsrc_dir in self.pkgsrc_dirs: + for file in self.list_distinfo_files(pkgsrc_dir): + for entry in self.load_distinfo(pkgsrc_dir, file): + if entry.distfile not in info: + info[entry.distfile] = list() + info[entry.distfile].append(entry) + + return info + + @staticmethod + def load_distinfo(pkgsrc_dir: Path, distinfo: Path) -> List[DistinfoEntry]: + by_path: Dict[Path, DistinfoEntry] = {} + + with open(Path(pkgsrc_dir, distinfo), 'r') as f: + for line in f: + m = re.match(r'(?x) (\S+) \s \( (\S+) \) \s = \s (\S+)', line) + if m: + algorithm, path, value = m.groups() + path = Path(path) + if path not in by_path: + by_path[path] = DistinfoEntry(path, {}, pkgsrc_dir, distinfo) + by_path[path].hashes[algorithm] = value + return list(by_path.values()) + + +def load_configuration(file: str) -> List[Distdir]: + distdirs: List[Distdir] = [] + + with open(file) as f: + distdir: Optional[Path] = None + database: Optional[Path] = None + packages_dirs = [] + pkgsrc_dirs = [] + + lineno = 0 + for line in f: + line = line.strip() + lineno += 1 + + if line == '' or line.startswith('#'): + continue + + words = line.split() + if len(words) != 2: + raise ValueError(f'{file}:{lineno}: invalid line') + cmd, arg = words + + if cmd == 'distdir': + if distdir is not None: + distdirs.append(Distdir(distdir, database, packages_dirs, pkgsrc_dirs)) + database = None + packages_dirs = [] + pkgsrc_dirs = [] + distdir = Path(words[1]) + if not distdir.exists(): + raise ValueError(f"{file}:{lineno}: '{distdir}' must be a directory") + continue + + if distdir is None: + raise ValueError(f"{file}:{lineno}: '{words[0]}' requires a preceding 'distdir'") + + if cmd == 'database': + database = Path(arg) + elif cmd == 'packages': + packages = Path(arg) + if not packages.exists(): + raise ValueError(f"{file}:{lineno}: '{packages}' must be a directory") + packages_dirs.append(packages) + elif cmd == 'pkgsrc': + pkgsrc = Path(arg) + if not pkgsrc.exists(): + raise ValueError(f"{file}:{lineno}: '{pkgsrc}' must be a directory") + pkgsrc_dirs.append(pkgsrc) + else: + raise ValueError(f'{file}:{lineno}: {line}') + + if distdir is not None: + distdirs.append(Distdir(distdir, database, packages_dirs, pkgsrc_dirs)) + + return distdirs + + +def main() -> None: + p = argparse.ArgumentParser(description='Validate distfiles directory') + p.add_argument('file', nargs='+', help='configuration file') + args = p.parse_args() + + distdirs: List[Distdir] = [] + for file in args.file: + try: + distdirs.extend(load_configuration(file)) + except ValueError as e: + raise SystemExit(f'error: {e.args[0]}') + + for distdir in distdirs: + distdir.validate() + + +if __name__ == '__main__': + main() |