⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.144
Server IP:
157.245.143.252
Server:
Linux www 6.11.0-9-generic #9-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct 14 13:19:59 UTC 2024 x86_64
Server Software:
nginx/1.26.0
PHP Version:
8.3.11
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
bin
/
View File Name :
add-apt-repository
#!/usr/bin/python3
"""Command-line tool to add, remove and inspect APT repositories."""

import apt_pkg
import io
import os
import re
import sys
import gettext
import locale
import argparse
import subprocess
import tempfile

from softwareproperties.shortcuthandler import ShortcutException
from softwareproperties.shortcuts import shortcut_handler
from softwareproperties.ppa import PPAShortcutHandler
from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
from softwareproperties.sourceslist import SourcesListShortcutHandler
from softwareproperties.uri import URIShortcutHandler

from softwareproperties.sourceutils import *

from aptsources.sourceslist import SourcesList, SourceEntry
from aptsources.distro import get_distro

from copy import copy

from gettext import gettext as _, ngettext

apt_pkg.init()

# Prefer the per-distro deb822 file in the sources parts directory; fall back
# to the classic sources list file when that file does not exist.
SOURCESLIST = apt_pkg.config.find_dir("Dir::Etc::sourceparts") + get_distro().id.lower() + ".sources"
if not os.path.exists(SOURCESLIST):
    SOURCESLIST = apt_pkg.config.find_file("Dir::Etc::sourcelist")

# Directory scanned for legacy per-repository key files.
TRUSTED_PARTS_DIR = "/etc/apt/trusted.gpg.d"

PGP_BLOCK_MARKER = "-----BEGIN PGP PUBLIC KEY BLOCK-----"


class AddAptRepository(object):
    """Implements the add-apt-repository command line interface."""

    def __init__(self):
        """Load the distro description and the currently configured sources."""
        gettext.textdomain("software-properties")
        self.distro = get_distro()
        self.sourceslist = SourcesList(deb822=True)
        self.distro.get_sources(self.sourceslist)

    def parse_args(self, args):
        """Parse *args* and store the parser and result on the instance.

        Sets ``self.parser`` and ``self.options``.
        """
        description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified"

        parser = argparse.ArgumentParser(description=description)
        parser.add_argument("-d", "--debug", action="store_true",
                            help=_("Print debug"))
        parser.add_argument("-r", "--remove", action="store_true",
                            help=_("Disable repository"))
        parser.add_argument("-s", "--enable-source", action="count", default=0,
                            help=_("Allow downloading of the source packages from the repository"))
        parser.add_argument("-c", "--component", action="append", default=[],
                            help=_("Components to use with the repository"))
        parser.add_argument("-p", "--pocket",
                            help=_("Add entry for this pocket"))
        parser.add_argument("-y", "--yes", action="store_true",
                            help=_("Assume yes to all queries"))
        # -n and -u share the same destination; updating is the default.
        parser.add_argument("-n", "--no-update", dest="update", action="store_false",
                            help=_("Do not update package cache after adding"))
        parser.add_argument("-u", "--update", action="store_true", default=True,
                            help=argparse.SUPPRESS)
        parser.add_argument("-l", "--login", action="store_true",
                            help=_("Login to Launchpad."))
        parser.add_argument("--dry-run", action="store_true",
                            help=_("Don't actually make any changes."))

        group = parser.add_mutually_exclusive_group()
        group.add_argument("--refresh-keys", action="store_true",
                           help=_("Refresh signing keys for currently configured PPAs"))
        group.add_argument("-L", "--list", action="store_true",
                           help=_("List currently configured repositories"))
        group.add_argument("-P", "--ppa",
                           help=_("PPA to add"))
        group.add_argument("-C", "--cloud",
                           help=_("Cloud Archive to add"))
        group.add_argument("-U", "--uri",
                           help=_("Archive URI to add"))
        group.add_argument("-S", "--sourceslist", nargs='+', default=[],
                           help=_("Full sources.list entry line to add"))
        group.add_argument("line", nargs='*', default=[],
                           help=_("sources.list line to add (deprecated)"))

        self.parser = parser
        self.options = self.parser.parse_args(args)

    @property
    def dry_run(self):
        """True when --dry-run was given."""
        return self.options.dry_run

    @property
    def enable_source(self):
        """True when -s/--enable-source was given at least once."""
        return self.options.enable_source > 0

    @property
    def components(self):
        """List of components collected from -c/--component."""
        return self.options.component

    @property
    def pocket(self):
        """Lower-cased value of -p/--pocket, or None when not given."""
        if self.options.pocket:
            return self.options.pocket.lower()
        return None

    @property
    def source_type(self):
        """Entry type the distro uses for source-package entries."""
        return self.distro.source_type

    @property
    def binary_type(self):
        """Entry type the distro uses for binary-package entries."""
        return self.distro.binary_type

    def is_components(self, comps):
        """Return True if every word in the string *comps* is a component
        name known to the distro's source template.

        An empty string is never considered a component list.
        """
        if not comps:
            return False
        known = {comp.name for comp in self.distro.source_template.components}
        return set(comps.split()) <= known

    def apt_update(self):
        """Run ``apt-get update`` unless disabled by -n or --dry-run."""
        if self.options.update and not self.dry_run:
            # We prefer to run apt-get update here. The built-in update support
            # does not have any progress, and only works for shortcuts. Moving
            # it to something like save() and using apt.progress.text would
            # solve the problem, but the new errors might cause problems with
            # the dbus server or other users of the API. Also, it's unclear
            # how good the text progress is or how to pass it best.
            subprocess.run(['apt-get', 'update'])

    def prompt_user(self):
        """Ask for confirmation before changing anything.

        In dry-run mode only a notice is printed. The prompt is skipped with
        -y, when stdin is not a terminal, or when FORCE_ADD_APT_REPOSITORY is
        set in the environment. Ctrl-c at the prompt exits with status 1.
        """
        if self.dry_run:
            print(_("DRY-RUN mode: no modifications will be made"))
            return

        if not self.options.yes and sys.stdin.isatty() and "FORCE_ADD_APT_REPOSITORY" not in os.environ:
            try:
                input(_("Press [ENTER] to continue or Ctrl-c to cancel."))
            except KeyboardInterrupt:
                print(_("Aborted."))
                sys.exit(1)

    def prompt_user_shortcut(self, shortcut):
        '''Display more information about the shortcut / ppa info'''
        print(_("Repository: '%s'") % shortcut.SourceEntry().line)
        if shortcut.description:
            # strip ANSI escape sequences
            description = re.sub(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]",
                                 "", shortcut.description)
            print(_("Description:"))
            print(description)
        if shortcut.web_link:
            print(_("More info: %s") % shortcut.web_link)
        if self.options.remove:
            print(_("Removing repository."))
        else:
            print(_("Adding repository."))
        self.prompt_user()

    def _add_components(self, sources):
        """Add the requested components to every (uri, dist, type) in *sources*."""
        # Collect the components already present for each repository.
        components = {}
        for entry in sources:
            key = (entry.uri, entry.dist, entry.type)
            if key in components:
                components[key] += entry.comps
            else:
                components[key] = copy(entry.comps)

        for (uri, dist, entry_type), comps in components.items():
            added = set(self.components) - set(comps)
            if added:
                comps = list(set(comps) | set(self.components))
                entry = self.sourceslist.add(
                    type=entry_type,
                    dist=dist,
                    uri=uri,
                    orig_comps=comps,
                    file=SOURCESLIST,
                    comment="Added by software-properties",
                )
                print(_("Added %s to: %s") % (' '.join(added), str(entry)))

    def _remove_components(self, sources):
        """Remove the requested components from each entry in *sources*.

        An entry left without any component is removed from the sources list.
        """
        for entry in sources:
            removed = set(entry.comps) & set(self.components)
            if removed:
                entry.comps = list(set(entry.comps) - set(self.components))
                print(_("Removed %s from: %s") % (' '.join(removed), str(entry)))
                if not entry.comps:
                    self.sourceslist.remove(entry)

    def change_components(self):
        """Add or remove (with -r) components on the entries in SOURCESLIST."""
        sources = [s for s in self.sourceslist.exploded_list()
                   if not s.invalid and not s.disabled and s.file == SOURCESLIST]
        if self.options.remove:
            self._remove_components(sources)
        else:
            self._add_components(sources)
        if not self.dry_run:
            self.sourceslist.save()

    def _add_pocket(self, sources):
        """Add an entry for the requested pocket next to each binary entry."""
        # Group the enabled binary entries of SOURCESLIST by (uri, suite).
        binary_entries = {}
        for s in sources:
            if s.invalid or s.disabled:
                continue
            if s.type != self.binary_type or s.file != SOURCESLIST:
                continue
            suite = get_source_entry_suite(s)
            binary_entries.setdefault((s.uri, suite), []).append(s)

        for (uri, suite), entries in binary_entries.items():
            have_pocket = False
            have_release = False
            for e in entries:
                p = get_source_entry_pocket(e)
                if p == self.pocket:
                    print(_("Existing: %s") % str(e))
                    have_pocket = True
                elif p == 'release':
                    have_release = True

            if have_pocket:
                continue

            if uri.startswith("http://security.ubuntu.com"):
                continue

            if have_release:
                # Reuse the components of the release pocket entries.
                comps = []
                for e in entries:
                    if get_source_entry_pocket(e) == 'release':
                        comps += e.comps
                comps = list(set(comps))
            else:
                comps = ['main', 'restricted']

            entry = self.sourceslist.add(
                type=self.binary_type,
                uri=uri,
                dist='{}-{}'.format(suite, self.pocket),
                orig_comps=comps,
                file=SOURCESLIST,
                comment="Added by software-properties",
                parent=entries[0] if entries else None,
            )
            print(_("Adding: %s") % str(entry))

    def _remove_pocket(self, sources):
        """Remove every enabled SOURCESLIST entry of the requested pocket."""
        for entry in sources:
            if entry.invalid or entry.disabled or entry.file != SOURCESLIST:
                continue
            if get_source_entry_pocket(entry) != self.pocket:
                continue
            self.sourceslist.remove(entry)
            print(_("Removed: %s") % str(entry))

    def change_pocket(self):
        """Add or remove (with -r) the requested pocket, then save."""
        exploded = self.sourceslist.exploded_list()
        if self.options.remove:
            self._remove_pocket(exploded)
        else:
            self._add_pocket(exploded)
        if not self.dry_run:
            self.sourceslist.save()

    def _enable_source(self):
        """Enable disabled source entries that have a matching binary entry.

        Afterwards every enabled binary entry is checked for a matching
        source entry; with -s given more than once the missing source entries
        are added, otherwise a warning is printed.
        """
        # Check each disabled deb-src line, enable if matching deb line exists
        for s in self.sourceslist.exploded_list():
            if s.invalid or not s.disabled or s.type != self.source_type:
                continue

            b_entry = None
            for b in self.sourceslist:
                if b.type != self.binary_type or b.disabled:
                    continue
                if (b.uri, b.dist, b.comps) != (s.uri, s.dist, s.comps):
                    continue
                b_entry = b
                break

            if not b_entry:
                # no matching binary lines, leave the source line disabled
                continue

            disabled_comps = list(set(s.comps) - set(b_entry.comps))
            enabled_comps = list(set(s.comps) & set(b_entry.comps))

            if not enabled_comps:
                # we can't enable any of the line
                continue

            if disabled_comps:
                # The match above requires identical comps, so this split is
                # only reached if that match is ever relaxed. Locate the entry
                # in the real list; fall back to appending at the end when the
                # exploded entry is not itself a member of the list.
                source_entries = self.sourceslist.list
                try:
                    index = source_entries.index(s)
                except ValueError:
                    index = len(source_entries) - 1
                tmp = replace_source_entry(s, comps=disabled_comps)
                source_entries.insert(index + 1, tmp)

            s.comps = enabled_comps
            s.set_enabled(True)
            print(_("Enabled: %s") % str(s).strip())

        # Check each enabled deb line, to warn about missing deb-src lines, or add one if -ss
        # Iterate over a snapshot because entries may be added inside the loop.
        for b in list(self.sourceslist):
            if b.invalid or b.disabled or b.type != self.binary_type:
                continue

            s = replace_source_entry(b, type=self.source_type)
            s_entry = get_source_entry_from_list(self.sourceslist, s)
            scomps = set(s_entry.comps if s_entry else [])
            missing_comps = list(set(b.comps) - scomps)
            if not missing_comps:
                continue

            s.comps = missing_comps
            if self.options.enable_source > 1:
                # with multiple -s, add new deb-src entries if needed for all deb entries
                self.sourceslist.add(
                    type=s.type,
                    uri=s.uri,
                    dist=s.dist,
                    orig_comps=s.comps,
                    file=s.file,
                    comment="Added by software-properties",
                )
                print(_("Added: %s") % str(s).strip())
            else:
                # if only one -s used, don't add missing deb-src, just notify
                print(_("Warning, missing deb-src for: %s") % str(s).strip())

    def _disable_source(self):
        """Disable every enabled, valid source entry."""
        for s in self.sourceslist:
            if s.invalid or s.disabled or s.type != self.source_type:
                continue
            s.set_enabled(False)
            print(_("Disabled: %s") % str(s).strip())

    def change_source(self):
        """Enable or disable (with -r) source entries, then save."""
        if self.options.remove:
            self._disable_source()
        else:
            self._enable_source()
        if not self.dry_run:
            self.sourceslist.save()

    def global_change(self):
        """Apply component, pocket and source changes to all repositories.

        Prints a summary of what is about to happen, asks for confirmation,
        then performs each requested change.
        """
        if self.components:
            if self.options.remove:
                print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components))
            else:
                print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components))
        if self.pocket:
            if self.options.remove:
                print(_("Removing pocket %s for all repositories.") % self.pocket)
            else:
                print(_("Adding pocket %s for all repositories.") % self.pocket)
        if self.enable_source:
            if self.options.remove:
                print(_("Disabling %s for all repositories.") % self.source_type)
            else:
                print(_("Enabling %s for all repositories.") % self.source_type)
        self.prompt_user()
        if self.components:
            self.change_components()
        if self.pocket:
            self.change_pocket()
        if self.enable_source:
            self.change_source()

    def show_list(self):
        """Print the enabled repositories, merging entries that share
        (type, uri, dist).

        Source entries are only listed when -s was given.
        """
        merged = {}
        for s in self.sourceslist:
            if s.invalid or s.disabled:
                continue
            if not self.enable_source and s.type == self.source_type:
                continue
            k = (s.type, s.uri, s.dist)
            if k in merged:
                merged[k].comps += list(set(s.comps) - set(merged[k].comps))
            else:
                merged[k] = s
        for s in merged.values():
            print(s, '\n')

    def keyalgos(self, keys, filter_caps="sS"):
        """Returns a list of key algorithms used.

        This is formatted as either the curve name or the algorithm name
        followed by key size; for example, ed25519 or rsa2048. The list is
        deduplicated.

        It is possible to filter by capabilities, keys matching any of the
        capabilities specified in 'filter_caps' are returned. By default,
        we filter for signing keys only.

        'keys' may be str or bytes. An empty list is returned (after printing
        a warning) when gpg fails or cannot be executed.
        """
        # An argv list rather than a split string, so a temporary directory
        # path containing whitespace cannot break the command line.
        cmd = [
            "gpg", "--show-keys", "-q", "--no-options", "--no-keyring",
            "--batch", "--with-colons",
            # yes, --with-fingerprint twice, to print subkey fingerprints
            "--with-fingerprint", "--with-fingerprint",
        ]
        if not isinstance(keys, bytes):
            keys = keys.encode()

        try:
            with tempfile.TemporaryDirectory() as homedir:
                result = subprocess.run(cmd + ["--homedir", homedir], check=True,
                                        input=keys, stdout=subprocess.PIPE)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable gpg binary.
            print(_("Warning: gpg error while processing keys:\n%s") % e)
            return []

        # Only fixed fields are parsed below, so undecodable bytes elsewhere
        # in the output are replaced instead of aborting.
        stdout = result.stdout.decode(errors="replace")

        out = set()
        # Non-curve algorithms
        algos = {"1": "rsa", "2": "rsa", "3": "rsa", "16": "ElGamal", "17": "dsa"}
        for line in stdout.splitlines():
            fields = line.split(":")
            if fields[0] not in ("pub", "sub"):
                continue

            try:
                length, algo = fields[2], fields[3]
            except IndexError:
                # A record too short to carry key length and algorithm.
                print(_("Warning: invalid gpg output:\n%s") % stdout)
                continue
            caps = fields[11] if len(fields) > 11 else ""
            curve = fields[16] if len(fields) > 16 else None

            if caps and filter_caps and not any(c in caps for c in filter_caps):
                continue

            if curve:
                out.add(curve)
            elif algo in algos:
                out.add(algos[algo] + str(length))
            else:
                out.add("unknown-algorithm:" + algo)

        return sorted(out)

    def _signed_by(self, s):
        """Return the stripped Signed-By value of entry *s*, or "" if none."""
        if hasattr(s, "section"):
            return (s.section.get("Signed-By") or "").strip()
        if re.search(r"\[.*signed-by.*\]", s.line):
            # Stop at whitespace or the closing bracket of the option block.
            option = re.search(r"signed-by=([^\s\]]*)", s.line)
            if option:
                return option[1].strip()
        return ""

    def _describe_unknown_repo(self, s, signed_by):
        """Return the report line for a non-PPA entry with a Signed-By key."""
        algos = None
        if PGP_BLOCK_MARKER in signed_by:
            # Embedded key: a line holding only "." stands for an empty line.
            keys = "\n".join(line if line.strip() != "." else ""
                             for line in signed_by.split("\n")).strip()
            algos = self.keyalgos(keys)
        elif os.path.exists(signed_by):
            with open(signed_by, "rb") as keyfile:
                algos = self.keyalgos(keyfile.read())

        if algos:
            return f"{s.uri} ({', '.join(algos)})"
        return s.uri

    def refresh_signing_keys(self):
        """Check the signing keys of all configured Launchpad PPAs and
        refresh the ones that differ from what the PPA handler reports.

        Repositories and trusted.gpg.d key files that cannot be refreshed
        automatically are listed. Nothing is written in dry-run mode or when
        there is nothing to refresh or remove.
        """
        ppa_matcher = re.compile(r"https?://ppa\.launchpad(content)?\.net/(.*)/(.*)/ubuntu.*")
        refresh = []
        # Handlers to refresh key files for or remove key files from
        to_refresh = []
        to_remove = []
        known_parts = set()
        unknown_repos = []
        # Cache the launchpad object so we don't send multiple login requests
        cached_lp = None

        for s in self.sourceslist:
            # Reset per entry so an entry without URIs cannot reuse the match
            # of the previous entry.
            match = None
            for uri in s.uris:
                match = ppa_matcher.match(uri)
                if match:
                    break

            if not match:
                signed_by = self._signed_by(s)
                if not signed_by or signed_by.startswith("/usr/share/keyrings/ubuntu-"):
                    continue
                unknown_repos.append(self._describe_unknown_repo(s, signed_by))
                continue

            shortcut = f"ppa:{match[2]}/{match[3]}"
            print(_("Checking signing key for %s...") % shortcut)

            try:
                handler = PPAShortcutHandler(shortcut, login=self.options.login, lp=cached_lp)
                cached_lp = handler.lp
                known_parts.add(os.path.basename(handler.trustedparts_file))
                if hasattr(s, "section") and PGP_BLOCK_MARKER in (s.section.get("Signed-By") or ""):
                    # Encoding copied from softwareproperties/shortcuthandler.py
                    lines = handler.trustedparts_content.splitlines()
                    lines = [' ' + (key_line if key_line.strip() else '.') for key_line in lines]
                    new = '\n' + '\n'.join(lines)

                    if new.strip() != s.section["Signed-By"].strip():
                        refresh.append(shortcut)
                        s.section["Signed-By"] = new

                    # A legacy trusted.gpg.d key exists, remove it
                    if os.path.exists(handler.trustedparts_file):
                        to_remove.append(handler)
                elif os.path.exists(handler.trustedparts_file):
                    # Disable deb822 processing, we are storing the key in trusted.gpg.d
                    handler.deb822 = False
                    with open(handler.trustedparts_file, "rb") as file:
                        current = file.read()
                    if handler.fingerprints(handler.trustedparts_content) != handler.fingerprints(current):
                        refresh.append(shortcut)
                        to_refresh.append(handler)
                else:
                    unknown_repos.append(s.uri)
            except Exception as e:
                # Best effort: one failing PPA must not stop the other checks.
                print(_("Error: Could not check signing key: %s") % e)

        print()
        if unknown_repos:
            print(_("The following repositories with Signed-By cannot be refreshed automatically:"))
            for descriptor in unknown_repos:
                print(" -", descriptor)
            print()

        # The legacy key directory may not exist at all.
        try:
            parts = os.listdir(TRUSTED_PARTS_DIR)
        except FileNotFoundError:
            parts = []
        unknown_parts = {
            f for f in parts
            if f.endswith((".asc", ".gpg"))
            and not f.startswith(("ubuntu-pro", "ubuntu-keyring"))
        } - known_parts
        if unknown_parts:
            print(_("The following trusted.gpg.d keys cannot be refreshed automatically:"))
            for file in sorted(unknown_parts):
                with open(os.path.join(TRUSTED_PARTS_DIR, file), "rb") as keyfile:
                    algos = self.keyalgos(keyfile.read())
                if algos:
                    print(" -", file, f"({', '.join(algos)})")
                else:
                    print(" -", file)
            print()

        if not refresh:
            print(_("All Launchpad PPA signing keys are up-to-date"))
        else:
            print(ngettext("Refreshing signing key for %d Launchpad PPA:",
                           "Refreshing signing keys for %d Launchpad PPAs:",
                           len(refresh)) % len(refresh))
            for r in refresh:
                print(" -", r)

        if to_remove:
            print(_("The following legacy signing keys will be disabled:"))
            for handler in to_remove:
                print(" -", handler.trustedparts_file)

        if not self.dry_run and (refresh or to_remove):
            self.prompt_user()
            self.sourceslist.backup(".save")
            # Old key files are kept next to the originals with a .save suffix.
            for handler in to_remove:
                print("Removing", handler.trustedparts_file)
                os.rename(handler.trustedparts_file, handler.trustedparts_file + ".save")
            for handler in to_refresh:
                print("Replacing", handler.trustedparts_file)
                os.rename(handler.trustedparts_file, handler.trustedparts_file + ".save")
                handler.add_key()
            self.sourceslist.save()

    def main(self, args=None):
        """Run the tool with *args* (defaults to ``sys.argv[1:]``).

        Returns True on success and False on error.
        """
        if args is None:
            # Read at call time, not once at class definition time.
            args = sys.argv[1:]
        self.parse_args(args)

        # Only --dry-run and --list may be used without root privileges.
        if not any((self.dry_run, self.options.list, os.geteuid() == 0)):
            print(_("Error: must run as root"))
            return False

        line = ' '.join(self.options.line)
        if line == '-':
            line = sys.stdin.readline().strip()

        # if 'line' is only (valid) components, handle as if only -c was used with no line
        if self.is_components(line):
            self.options.component += line.split()
            line = ''

        if self.options.ppa:
            source = self.options.ppa
            if ':' not in source:
                source = 'ppa:' + source
            handler = PPAShortcutHandler
        elif self.options.cloud:
            source = self.options.cloud
            if ':' not in source:
                source = 'uca:' + source
            handler = CloudArchiveShortcutHandler
        elif self.options.uri:
            source = self.options.uri
            handler = URIShortcutHandler
        elif self.options.sourceslist:
            source = ' '.join(self.options.sourceslist)
            handler = SourcesListShortcutHandler
        elif line:
            source = line
            handler = shortcut_handler
        elif self.options.list:
            self.show_list()
            return True
        elif self.options.refresh_keys:
            self.refresh_signing_keys()
            return True
        elif any((self.enable_source, self.components, self.pocket)):
            self.global_change()
            self.apt_update()
            return True
        else:
            print(_("Error: no actions requested."))
            self.parser.print_help()
            return False

        try:
            shortcut_params = {
                'login': self.options.login,
                'enable_source': self.enable_source,
                'dry_run': self.dry_run,
                'components': self.components,
                'pocket': self.pocket,
            }
            shortcut = handler(source, **shortcut_params)
        except ShortcutException as e:
            print(e)
            return False

        self.prompt_user_shortcut(shortcut)

        if self.options.remove:
            shortcut.remove()
        else:
            shortcut.add()

        self.apt_update()
        return True


if __name__ == '__main__':
    addaptrepo = AddAptRepository()
    sys.exit(0 if addaptrepo.main() else 1)