summary | refs | log | tree | commit | diff
path: root/pkgtools/distlint/files/distlint.py
diff options
context:
space:
mode:
Diffstat (limited to 'pkgtools/distlint/files/distlint.py')
-rw-r--r-- pkgtools/distlint/files/distlint.py 259
1 files changed, 259 insertions, 0 deletions
diff --git a/pkgtools/distlint/files/distlint.py b/pkgtools/distlint/files/distlint.py
new file mode 100644
index 00000000000..bacb93dccd3
--- /dev/null
+++ b/pkgtools/distlint/files/distlint.py
@@ -0,0 +1,259 @@
+#! /usr/bin/env python
+# $NetBSD: distlint.py,v 1.1 2022/09/04 21:38:48 rillig Exp $
+
+import argparse
+import os
+import re
+from pathlib import Path
+from typing import NamedTuple, List, Optional, Set, Dict
+from logging import debug
+
+
# Path of the pkg_info program that is run to query binary packages.
# NOTE(review): '@PREFIX@' looks like a placeholder that is substituted when
# the script is installed -- confirm against the package Makefile.
pkg_info: str = '@PREFIX@/sbin/pkg_info'
+
+
class DistinfoEntry(NamedTuple):
    """
    One distfile from a distinfo file, including its hashes.

    The tuple itself is immutable, but the 'hashes' dictionary is shared and
    is filled in place while the distinfo file is being parsed.
    """

    # Name of the distfile, as written in the distinfo file.
    distfile: Path
    # Maps the first word of each matching distinfo line (the algorithm
    # name) to the value recorded for this distfile.
    hashes: Dict[str, str]

    # Where the entry was found: the pkgsrc tree and the distinfo file in it.
    loc_pkgsrc_dir: Path
    loc_distinfo: Path
+
+
class BinaryPackage(NamedTuple):
    """
    A file containing a binary package for a particular platform.

    The 'build_info' and 'distinfo_entries' containers start out empty and
    are filled in place by the load_* methods on first use, so pkg_info is
    only run for packages that are actually inspected.
    """

    file: Path
    pkgname: str
    # Cache for load_build_info: maps variable names to their values.
    build_info: Dict[str, str]
    # Cache for load_distinfo_entries.
    distinfo_entries: List[DistinfoEntry]

    def _pkg_info_lines(self, option: str) -> List[str]:
        """
        Run 'pkg_info -q <option>' on the package file and return the lines
        it printed, without line terminators.

        The program is started directly from an argument list instead of
        through a shell, so characters such as quotes, '$' or backticks in
        the file name are passed through literally.  If the program cannot
        be started, a warning is printed and no lines are returned.
        """
        # Imported locally so that this class does not depend on additional
        # file-level imports.
        import subprocess
        import sys

        try:
            proc = subprocess.run(
                # as_posix() gives forward slashes on every platform.
                [pkg_info, '-q', option, self.file.as_posix()],
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            print(f"warning: cannot run '{pkg_info}': {e}", file=sys.stderr)
            return []
        return proc.stdout.splitlines()

    def load_build_info(self) -> Dict[str, str]:
        """
        Return the build information of the package as a dictionary.

        The information is taken from the 'KEY=value' lines printed by
        'pkg_info -q -B'.  Blank lines and lines without '=' are skipped.
        A non-empty result is cached in 'build_info'.
        """
        if self.build_info:
            return self.build_info

        for line in self._pkg_info_lines('-B'):
            key, separator, value = line.rstrip().partition('=')
            if separator == '':
                # Not a 'KEY=value' pair (this includes blank lines).
                continue
            self.build_info[key] = value

        return self.build_info

    def no_src_on_ftp(self) -> str:
        """
        Return the NO_SRC_ON_FTP build variable, or '' if it is not set.
        """
        return self.load_build_info().get('NO_SRC_ON_FTP', '')

    def license(self) -> str:
        """
        Return the LICENSE build variable, or '' if it is not set.
        """
        return self.load_build_info().get('LICENSE', '')

    def load_distinfo_entries(self, pkgsrc_dirs: List[Path]) -> List[DistinfoEntry]:
        """
        Return the distinfo entries of all distinfo files that the package
        was built from.

        The distinfo files are those listed by 'pkg_info -q -b'.  Each of
        them is looked up in every directory of 'pkgsrc_dirs', and the
        entries of every copy that exists are collected.  A non-empty result
        is cached in 'distinfo_entries'.
        """
        if self.distinfo_entries:
            return self.distinfo_entries

        distinfo_files = []
        for line in self._pkg_info_lines('-b'):
            m = re.match(r'(?x) ([^\s:]+): \t \$NetBSD: .*?,v \s+ ([0-9.]+)', line)
            # The './' prefix makes a bare 'distinfo' match as well.
            if m and f'./{m.group(1)}'.endswith('/distinfo'):
                distinfo_files.append(Path(m.group(1)))

        for distinfo_file in distinfo_files:
            for pkgsrc_dir in pkgsrc_dirs:
                if (pkgsrc_dir / distinfo_file).exists():
                    # load_distinfo joins pkgsrc_dir and the file name
                    # itself, so it must get the path relative to the tree;
                    # an already joined path breaks for relative trees.
                    self.distinfo_entries.extend(
                        Distdir.load_distinfo(pkgsrc_dir, distinfo_file))

        return self.distinfo_entries
+
+
class Distdir(NamedTuple):
    """
    A directory containing distribution files, such as /pub/pkgsrc/distfiles.

    Each such directory is connected to one or more directories containing
    binary packages, such as:
        /pub/pkgsrc/packages/NetBSD/amd64/2022Q4/All
        /pub/pkgsrc/packages/Linux/sparc/HEAD/All

    Each such directory is connected to one or more pkgsrc trees, such as:
        /usr/pkgsrc-trunk
        /usr/pkgsrc-2022Q4
    """

    distdir: Path
    # None when the configuration has no 'database' line.  No method of this
    # class reads it.
    database: Optional[Path]
    packages_dirs: List[Path]
    pkgsrc_dirs: List[Path]

    def validate(self) -> None:
        """
        Run all checks on this distfiles directory and print the findings.
        """
        distfiles = self.load_distfiles()
        # XXX: should be self.load_pkgsrc(); until then the loop over the
        # distinfo entries below has nothing to log.
        distinfo_entries: Dict[Path, List[DistinfoEntry]] = {}
        binary_packages = self.load_binary_packages()

        for distfile in distfiles:
            debug('distfile %s', distfile)
        for binary_package in binary_packages:
            debug('%s', binary_package)
        for distinfo_entry in sorted(distinfo_entries.values(), key=lambda e: e[0].distfile):
            debug('%s', distinfo_entry)

        self.validate_no_src_on_ftp(distfiles, binary_packages)

        print("TODO: validate that GPL files are still there even after removing the binary package")

    def validate_no_src_on_ftp(self, distfiles: List[Path], binary_packages: List[BinaryPackage]) -> None:
        """
        Print an error for each distfile in the distfiles directory that
        belongs to a package having NO_SRC_ON_FTP set.

        The 'distfiles' parameter is currently unused; the check looks at
        the file system directly.
        """
        for binary_package in binary_packages:
            restriction = binary_package.no_src_on_ftp()
            if restriction == '':
                # Unrestricted package; skipping it here avoids loading its
                # distinfo entries for nothing.
                continue
            for distinfo_entry in binary_package.load_distinfo_entries(self.pkgsrc_dirs):
                if (self.distdir / distinfo_entry.distfile).exists():
                    print(f"error: '{distinfo_entry.distfile}' must not be in '{self.distdir}' because the package '{binary_package.pkgname}' has NO_SRC_ON_FTP '{restriction}'.")

    def load_distfiles(self) -> List[Path]:
        """
        Return the sorted paths of all files below the distfiles directory,
        relative to that directory.
        """
        distfiles: Set[Path] = set()
        for root, _, files in os.walk(self.distdir):
            for file in files:
                distfiles.add(Path(root, file).relative_to(self.distdir))
        return sorted(distfiles)

    def load_binary_packages(self) -> List[BinaryPackage]:
        """
        Return all binary packages ('*.tgz') found below the packages
        directories, sorted by package name.
        """
        binary_package_files: Set[Path] = set()
        for packages_dir in self.packages_dirs:
            for root, _, files in os.walk(packages_dir):
                for file in files:
                    if file.endswith('.tgz'):
                        binary_package_files.add(Path(root, file))

        binary_packages = []
        for binary_package_file in binary_package_files:
            binary_package = self.load_binary_package(binary_package_file)
            if binary_package is not None:
                binary_packages.append(binary_package)

        return sorted(binary_packages, key=lambda p: p.pkgname)

    @staticmethod
    def load_binary_package(file: Path) -> Optional[BinaryPackage]:
        """
        Create the binary package for a '*.tgz' file.

        The package name is the file name without the '.tgz' suffix.  The
        build information and the distinfo entries are left empty.
        """
        return BinaryPackage(file, file.name[:-len('.tgz')], {}, [])

    @staticmethod
    def list_distinfo_files(pkgsrc_dir: Path):
        """
        Generate the paths of all files named 'distinfo' below the pkgsrc
        tree, relative to that tree.
        """
        for root, _, files in os.walk(pkgsrc_dir):
            for file in files:
                if file == 'distinfo':
                    yield Path(root, file).relative_to(pkgsrc_dir)

    def load_pkgsrc(self) -> Dict[Path, List[DistinfoEntry]]:
        """
        Load the distinfo files of all pkgsrc trees.

        Returns a dictionary that maps each distfile to its distinfo entries
        from all trees.
        """
        info: Dict[Path, List[DistinfoEntry]] = {}

        for pkgsrc_dir in self.pkgsrc_dirs:
            for file in self.list_distinfo_files(pkgsrc_dir):
                for entry in self.load_distinfo(pkgsrc_dir, file):
                    info.setdefault(entry.distfile, []).append(entry)

        return info

    @staticmethod
    def load_distinfo(pkgsrc_dir: Path, distinfo: Path) -> List[DistinfoEntry]:
        """
        Parse a single distinfo file.

        The file name 'distinfo' is interpreted relative to 'pkgsrc_dir'.
        Lines of the form 'algorithm (path) = value' are grouped by path,
        all other lines are ignored.  Returns one entry per path, in the
        order of first occurrence.
        """
        by_path: Dict[Path, DistinfoEntry] = {}

        with open(Path(pkgsrc_dir, distinfo), 'r') as f:
            for line in f:
                m = re.match(r'(?x) (\S+) \s \( (\S+) \) \s = \s (\S+)', line)
                if m:
                    algorithm, path, value = m.groups()
                    path = Path(path)
                    if path not in by_path:
                        by_path[path] = DistinfoEntry(path, {}, pkgsrc_dir, distinfo)
                    by_path[path].hashes[algorithm] = value
        return list(by_path.values())
+
+
def _require_directory(file: str, lineno: int, arg: str) -> Path:
    """
    Return 'arg' as a path, or raise ValueError if it is not a directory.
    """
    path = Path(arg)
    if not path.is_dir():
        raise ValueError(f"{file}:{lineno}: '{path}' must be a directory")
    return path


def load_configuration(file: str) -> List[Distdir]:
    """
    Load the distfiles directories that are described in a configuration
    file.

    Blank lines and lines starting with '#' are ignored.  Every other line
    consists of exactly two words, a command and its argument:

        distdir <dir>    starts the description of a distfiles directory
        database <file>  sets the database of the current distdir
        packages <dir>   adds a directory containing binary packages
        pkgsrc <dir>     adds a pkgsrc tree

    All commands except 'distdir' apply to the most recent 'distdir' line.

    Raises ValueError with a message of the form 'file:lineno: details' if
    a line is malformed, if a command other than 'distdir' occurs before
    the first 'distdir', or if a directory argument is not an existing
    directory.
    """
    distdirs: List[Distdir] = []

    with open(file) as f:
        distdir: Optional[Path] = None
        database: Optional[Path] = None
        packages_dirs: List[Path] = []
        pkgsrc_dirs: List[Path] = []

        for lineno, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if line == '' or line.startswith('#'):
                continue

            words = line.split()
            if len(words) != 2:
                raise ValueError(f'{file}:{lineno}: invalid line')
            cmd, arg = words

            if cmd == 'distdir':
                if distdir is not None:
                    # Finish the previous distdir before starting a new one.
                    distdirs.append(Distdir(distdir, database, packages_dirs, pkgsrc_dirs))
                    database = None
                    packages_dirs = []
                    pkgsrc_dirs = []
                distdir = _require_directory(file, lineno, arg)
                continue

            if distdir is None:
                raise ValueError(f"{file}:{lineno}: '{cmd}' requires a preceding 'distdir'")

            if cmd == 'database':
                database = Path(arg)
            elif cmd == 'packages':
                packages_dirs.append(_require_directory(file, lineno, arg))
            elif cmd == 'pkgsrc':
                pkgsrc_dirs.append(_require_directory(file, lineno, arg))
            else:
                raise ValueError(f'{file}:{lineno}: {line}')

        if distdir is not None:
            distdirs.append(Distdir(distdir, database, packages_dirs, pkgsrc_dirs))

    return distdirs
+
+
def main() -> None:
    """
    Validate the distfiles directories that are described in the
    configuration files given on the command line.

    All configuration files are loaded before the first directory is
    validated.  If a file is malformed or cannot be read, the program exits
    with an 'error: ...' message instead of a traceback.
    """
    p = argparse.ArgumentParser(description='Validate distfiles directory')
    p.add_argument('file', nargs='+', help='configuration file')
    args = p.parse_args()

    distdirs: List[Distdir] = []
    for file in args.file:
        try:
            distdirs.extend(load_configuration(file))
        except (OSError, ValueError) as e:
            # str(e) equals e.args[0] for the errors raised by
            # load_configuration, and unlike e.args[0] it is also readable
            # for OSError and UnicodeDecodeError.
            raise SystemExit(f'error: {e}')

    for distdir in distdirs:
        distdir.validate()


if __name__ == '__main__':
    main()