⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.144
Server IP:
157.245.143.252
Server:
Linux www 6.11.0-9-generic #9-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct 14 13:19:59 UTC 2024 x86_64
Server Software:
nginx/1.26.0
PHP Version:
8.3.11
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
usr
/
lib
/
python3
/
dist-packages
/
DistUpgrade
/
View File Name :
DistUpgradeQuirks.py
# DistUpgradeQuirks.py # # Copyright (c) 2004-2010 Canonical # # Author: Michael Vogt
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

import apt
import atexit
import distro_info
import glob
import logging
import os
import pwd
import re
import subprocess
import pathlib

from subprocess import PIPE, Popen

from .utils import get_arch

from .DistUpgradeGettext import gettext as _


class DistUpgradeQuirks(object):
    """
    This class collects the various quirks handlers that can
    be hooked into to fix/work around issues that the individual
    releases have.

    The following handlers are supported:
    - PreCacheOpen: run *before* the apt cache is opened the first time
                    to set options that affect the cache
    - PostInitialUpdate: run *before* the sources.list is rewritten but
                         after an initial apt-get update
    - PreDistUpgradeCache: run *right before* the dist-upgrade is
                           calculated in the cache
    - PostDistUpgradeCache: run *after* the dist-upgrade was calculated
                            in the cache
    - StartUpgrade: before the first package gets installed (but the
                    download is finished)
    - PostUpgrade: run *after* the upgrade is finished successfully and
                   packages got installed
    - PostCleanup: run *after* the cleanup (orphaned etc) is finished

    Every handler is a no-op when the controller is in partialUpgrade mode.
    """

    def __init__(self, controller, config):
        """ Store the controller/config and initialise the quirk state.

        controller -- the upgrade controller; its ``_view``, ``cache``,
                      ``fromDist``/``toDist`` etc. are used by the quirks
        config -- the upgrader configuration object (only stored here)
        """
        self.controller = controller
        self._view = controller._view
        self.config = config
        # running kernel release as reported by "uname -r"
        self.uname = Popen(["uname", "-r"], stdout=PIPE,
                           universal_newlines=True).communicate()[0].strip()
        self.arch = get_arch()
        # bytes needed for to-be-installed snaps (summed from store replies)
        self.extra_snap_space = 0
        # Popen handle of the screensaver poke loop, if one was started
        self._poke = None
        self._snapstore_reachable = False
        # dict snap name -> action description, see
        # _prepare_snap_replacement_data()
        self._snap_list = None
        self._from_version = None
        self._to_version = None
        self._did_change_font = False

    def _skip_in_partial_upgrade(self):
        """ Return True (and log it) if quirks must not run at all. """
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return True
        return False

    # individual quirks handler that run *before* the cache is opened
    def PreCacheOpen(self):
        """ run before the apt cache is opened the first time """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PreCacheOpen")
        self._add_apport_ignore_list()

    # individual quirks handler that run *after* the cache is opened
    def PostInitialUpdate(self):
        """ run after the apt cache is opened the first time """
        # PreCacheOpen would be better but controller.abort fails terribly
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PostInitialUpdate")
        self._get_from_and_to_version()
        self._test_and_fail_on_tpm_fde()

        cache = self.controller.cache
        self._test_and_warn_if_ros_installed(cache)

        self._maybe_prevent_flatpak_auto_removal()

        if 'snapd' not in cache:
            logging.debug("package required for Quirk not in cache")
            return
        # only talk to snapd if it is installed and one of its sockets exists
        if cache['snapd'].is_installed and \
                (os.path.exists('/run/snapd.socket') or
                 os.path.exists('/run/snapd-snap.socket')):
            self._checkStoreConnectivity()
        # If the snap store is accessible, at the same time calculate the
        # extra size needed by to-be-installed snaps.  This also prepares
        # the snaps-to-install list for the actual upgrade.
        if self._snapstore_reachable:
            self._calculateSnapSizeRequirements()

    def PostUpgrade(self):
        """ run after the upgrade finished and the packages got installed """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PostUpgrade")
        cache = self.controller.cache
        # NOTE: without snapd in the cache the raspi quirks below are
        # skipped as well; this mirrors the existing behaviour.
        if 'snapd' not in cache:
            logging.debug("package required for Quirk not in cache")
            return
        if cache['snapd'].is_installed and \
                self._snap_list:
            self._replaceDebsAndSnaps()
        if 'ubuntu-desktop-raspi' in cache:
            if cache['ubuntu-desktop-raspi'].is_installed:
                self._replace_fkms_overlay()
        if 'ubuntu-server-raspi' in cache:
            if cache['ubuntu-server-raspi'].is_installed:
                self._add_kms_overlay()

    # individual quirks handler when the dpkg run is finished ---------
    def PostCleanup(self):
        """ run after cleanup """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PostCleanup")
        self._remove_apport_ignore_list()

    # run right before the first packages get installed
    def StartUpgrade(self):
        """ run right before the first packages get installed """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.StartUpgrade")
        cache = self.controller.cache
        self._removeOldApportCrashes()
        self._killUpdateNotifier()
        self._pokeScreensaver()
        self._set_generic_font()
        self._disable_kdump_tools_on_install(cache)

    # individual quirks handler that run *right before* the dist-upgrade
    # is calculated in the cache
    def PreDistUpgradeCache(self):
        """ run right before calculating the dist-upgrade """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PreDistUpgradeCache")
        self._maybe_remove_gpg_wks_server()

    # individual quirks handler that run *after* the dist-upgrade was
    # calculated in the cache
    def PostDistUpgradeCache(self):
        """ run after calculating the dist-upgrade """
        if self._skip_in_partial_upgrade():
            return
        logging.debug("running Quirks.PostDistUpgradeCache")
        self._install_linux_metapackage()

    # helpers
    def _get_from_and_to_version(self):
        """ Set self._from_version/_to_version from the controller's
        fromDist/toDist series names using distro-info.
        """
        di = distro_info.UbuntuDistroInfo()
        try:
            self._from_version = \
                di.version('%s' % self.controller.fromDist).split()[0]
            self._to_version = \
                di.version('%s' % self.controller.toDist).split()[0]
        # Ubuntu 18.04's python3-distro-info does not have version
        except AttributeError:
            # fall back to scanning all releases; if the series is unknown
            # the series name itself is used as the "version"
            self._from_version = next(
                (r.version for r in di.get_all("object")
                 if r.series == self.controller.fromDist),
                self.controller.fromDist).split()[0]
            self._to_version = next(
                (r.version for r in di.get_all("object")
                 if r.series == self.controller.toDist),
                self.controller.toDist).split()[0]

    def _test_and_warn_if_ros_installed(self, cache):
        """
        Test and warn if ROS is installed. A given ROS release only
        supports specific Ubuntu releases, and can cause the upgrade
        to fail in an overly-cryptic manner.

        Aborts the upgrade (via the controller) if the user declines.
        """

        # These are the root ROS 1 and 2 dependencies as of 07/27/2020
        ros_package_patterns = set()
        for package_name in (
                "catkin",
                "rosboost-cfg",
                "rosclean",
                "ros-environment",
                "ros-workspace"):
            ros_package_patterns.add(
                re.compile(r"ros-[^\-]+-%s" % package_name))

        ros_is_installed = False
        for pkg in cache:
            if ros_is_installed:
                break

            for pattern in ros_package_patterns:
                if pattern.match(pkg.name):
                    if pkg.is_installed or pkg.marked_install:
                        ros_is_installed = True
                    break

        if ros_is_installed:
            res = self._view.askYesNoQuestion(
                _("The Robot Operating System (ROS) is installed"),
                _("It appears that ROS is currently installed. Each ROS "
                  "release is very strict about the versions of Ubuntu "
                  "it supports, and Ubuntu upgrades can fail if that "
                  "guidance isn't followed. Before continuing, please "
                  "either uninstall ROS, or ensure the ROS release you "
                  "have installed supports the version of Ubuntu to "
                  "which you're upgrading.\n\n"
                  "For ROS 1 releases, refer to REP 3:\n"
                  "https://www.ros.org/reps/rep-0003.html\n\n"
                  "For ROS 2 releases, refer to REP 2000:\n"
                  "https://www.ros.org/reps/rep-2000.html\n\n"
                  "Are you sure you want to continue?"))
            if not res:
                self.controller.abort()

    def _maybe_prevent_flatpak_auto_removal(self):
        """
        If flatpak is installed, and there are either active remotes, or
        flatpak apps installed, prevent flatpak's auto-removal on upgrade.
        """
        prevent_auto_removal = False

        if "flatpak" not in self.controller.cache:
            return

        if not self.controller.cache["flatpak"].is_installed:
            return

        if not os.path.exists("/usr/bin/flatpak"):
            return

        # any output from "flatpak remotes" or "flatpak list" means that
        # flatpak is actually in use
        for subcmd in ["remotes", "list"]:
            r = subprocess.run(
                ["/usr/bin/flatpak", subcmd],
                stdout=subprocess.PIPE
            )
            if r.stdout.decode("utf-8").strip():
                prevent_auto_removal = True
                break

        logging.debug("flatpak will{}be marked as manually installed"
                      .format(" " if prevent_auto_removal else " NOT "))

        if not prevent_auto_removal:
            return

        self.controller.cache["flatpak"].mark_auto(auto=False)

        for pkg in ("plasma-discover-backend-flatpak",
                    "gnome-software-plugin-flatpak"):
            if pkg not in self.controller.cache:
                continue

            if not self.controller.cache[pkg].is_installed:
                continue

            logging.debug("{} will be marked as manually installed"
                          .format(pkg))
            self.controller.cache[pkg].mark_auto(auto=False)

        self.controller.cache.commit(
            self._view.getAcquireProgress(),
            self._view.getInstallProgress(self.controller.cache)
        )

    def _add_apport_ignore_list(self):
        """ Write the list of binaries apport should ignore during the
        upgrade; failures are only logged.
        """
        ignore_list = [
            '/usr/libexec/tracker-extract-3',  # LP: #2012638
            '/usr/sbin/update-apt-xapian-index',  # LP: #2058227
        ]

        path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'

        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'w') as f:
                for binary in ignore_list:
                    f.write(f'{binary}\n')
        except Exception as e:
            # best effort: a missing ignore list must not stop the upgrade
            logging.debug(f'Failed to create {path}: {e}')

    def _remove_apport_ignore_list(self):
        """ Remove the file written by _add_apport_ignore_list();
        failures are only logged.
        """
        path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'

        try:
            os.remove(path)
        except Exception as e:
            logging.debug(f'Failed to remove {path}: {e}')

    def _killUpdateNotifier(self):
        "kill update-notifier"
        # kill update-notifier now to suppress reboot required
        if os.path.exists("/usr/bin/killall"):
            logging.debug("killing update-notifier")
            subprocess.call(["killall", "-q", "update-notifier"])

    def _pokeScreensaver(self):
        """ Start a background shell loop that resets the screensaver
        every 30 seconds; it is stopped again at interpreter exit.
        """
        if (os.path.exists("/usr/bin/xdg-screensaver") and
                os.environ.get('DISPLAY')):
            logging.debug("setup poke timer for the screensaver")
            # constant command string, no external input is interpolated
            cmd = "while true;"
            cmd += " do /usr/bin/xdg-screensaver reset >/dev/null 2>&1;"
            cmd += " sleep 30; done"
            try:
                self._poke = subprocess.Popen(cmd, shell=True)
                atexit.register(self._stopPokeScreensaver)
            except (OSError, ValueError):
                logging.exception("failed to setup screensaver poke")

    def _stopPokeScreensaver(self):
        """ Terminate the screensaver poke loop.

        Returns the exit status of the loop process, or False if no loop
        was running or it could not be stopped.
        """
        res = False
        if self._poke is not None:
            try:
                self._poke.terminate()
                res = self._poke.wait()
            except OSError:
                logging.exception("failed to stop screensaver poke")
            self._poke = None
        return res

    def _removeOldApportCrashes(self):
        " remove old apport crash files and whoopsie control files "
        try:
            for ext in ['.crash', '.upload', '.uploaded']:
                for f in glob.glob("/var/crash/*%s" % ext):
                    logging.debug("removing old %s file '%s'" % (ext, f))
                    os.unlink(f)
        except Exception as e:
            logging.warning("error during unlink of old crash files (%s)" % e)

    def _checkStoreConnectivity(self):
        """ check for connectivity to the snap store to install snaps

        Sets self._snapstore_reachable on success.  In every other case
        the upgrade is aborted unless the user explicitly chooses to
        continue.
        """
        res = False
        snap_env = os.environ.copy()
        # the output is parsed below, so force an untranslated locale
        snap_env["LANG"] = "C.UTF-8"
        connected = Popen(["snap", "debug", "connectivity"], stdout=PIPE,
                          stderr=PIPE, env=snap_env,
                          universal_newlines=True).communicate()
        if re.search(r"^ \* PASS", connected[0], re.MULTILINE):
            self._snapstore_reachable = True
            return
        # can't connect
        elif re.search(r"^ \*.*unreachable", connected[0], re.MULTILINE):
            logging.error("No snap store connectivity")
            old_lxd_deb_installed = False
            cache = self.controller.cache
            if 'lxd' in cache:
                # epoch 1 is the transitional deb
                if cache['lxd'].is_installed and not \
                        cache['lxd'].candidate.version.startswith("1:"):
                    logging.error("lxd is installed")
                    old_lxd_deb_installed = True
            if old_lxd_deb_installed:
                summary = _("Connection to the Snap Store failed")
                msg = _("You have the package lxd installed but your "
                        "system is unable to reach the Snap Store. "
                        "lxd is now provided via a snap and the release "
                        "upgrade will fail if snapd is not functional. "
                        "Please make sure you're connected to the "
                        "Internet and update any firewall or proxy "
                        "settings as needed so that you can reach "
                        "api.snapcraft.io. If you are an enterprise "
                        "with a firewall setup you may want to configure "
                        "a Snap Store proxy."
                        )
                self._view.error(summary, msg)
                self.controller.abort()
            else:
                res = self._view.askYesNoQuestion(
                    _("Connection to Snap Store failed"),
                    _("Your system does not have a connection to the Snap "
                      "Store. For the best upgrade experience make sure "
                      "that your system can connect to api.snapcraft.io.\n"
                      "Do you still want to continue with the upgrade?")
                )
        # debug command not available
        elif 'error: unknown command' in connected[1]:
            logging.error("snap debug command not available")
            res = self._view.askYesNoQuestion(
                _("Outdated snapd package"),
                _("Your system does not have the latest version of snapd. "
                  "Please update the version of snapd on your system to "
                  "improve the upgrade experience.\n"
                  "Do you still want to continue with the upgrade?")
            )
        # not running as root
        elif 'error: access denied' in connected[1]:
            res = False
            logging.error("Not running as root!")
        else:
            logging.error("Unhandled error connecting to the snap store.")
        if not res:
            self.controller.abort()

    def _calculateSnapSizeRequirements(self):
        """ Sum the download size of all snaps that will be installed
        into self.extra_snap_space.

        Snaps whose size cannot be fetched or parsed are skipped.
        """
        import json
        import urllib.request
        from contextlib import closing
        from urllib.error import URLError

        # first fetch the list of snap-deb replacements that will be needed
        # and store them for future reference, along with other data we'll
        # need in the process
        self._prepare_snap_replacement_data()
        # now perform direct API calls to the store, requesting size
        # information for each of the snaps needing installation
        self._view.updateStatus(_("Calculating snap size requirements"))
        for snap, snap_object in self._snap_list.items():
            if snap_object['command'] != 'install':
                continue
            action = {
                "instance-key": "upgrade-size-check",
                "action": "download",
                "snap-id": snap_object['snap-id'],
                "channel": snap_object['channel'],
            }
            data = {
                "context": [],
                "actions": [action],
            }
            req = urllib.request.Request(
                url='https://api.snapcraft.io/v2/snaps/refresh',
                data=bytes(json.dumps(data), encoding='utf-8'))
            req.add_header('Snap-Device-Series', '16')
            req.add_header('Content-type', 'application/json')
            req.add_header('Snap-Device-Architecture', self.arch)
            try:
                # closing() makes sure the HTTP response is released even
                # if reading or parsing fails
                with closing(urllib.request.urlopen(req)) as response:
                    info = json.loads(response.read())
                size = int(info['results'][0]['snap']['download']['size'])
            except (IndexError, KeyError, TypeError, URLError, ValueError):
                # an empty "results" list or an unexpectedly shaped reply
                # is treated like any other fetch failure
                logging.debug("Failed fetching size of snap %s" % snap)
                continue
            self.extra_snap_space += size

    def _replaceDebsAndSnaps(self):
        """ install a snap and mark its corresponding package for removal """
        self._view.updateStatus(_("Processing snap replacements"))
        # _snap_list should be populated by the earlier
        # _calculateSnapSizeRequirements call.
        for snap, snap_object in self._snap_list.items():
            command = snap_object['command']
            # NOTE: the snap name is interpolated *after* the gettext
            # lookup so that the msgid is a constant string
            if command == 'switch':
                channel = snap_object['channel']
                self._view.updateStatus(
                    _("switching channel for snap %s") % snap
                )
                popenargs = ["snap", command, "--channel", channel, snap]
            elif command == 'remove':
                self._view.updateStatus(_("removing snap %s") % snap)
                popenargs = ["snap", command, snap]
            else:
                self._view.updateStatus(_("installing snap %s") % snap)
                popenargs = ["snap", command,
                             "--channel", snap_object['channel'], snap]
            try:
                self._view.processEvents()
                proc = subprocess.run(
                    popenargs,
                    stdout=subprocess.PIPE,
                    check=True)
                self._view.processEvents()
            except subprocess.CalledProcessError:
                logging.debug("%s of snap %s failed" % (command, snap))
                continue
            if proc.returncode == 0:
                logging.debug("%s of snap %s succeeded" % (command, snap))
            if command == 'install' and snap_object['deb']:
                # the deb is replaced by the snap that was just installed
                self.controller.forced_obsoletes.append(snap_object['deb'])

    def _is_greater_than(self, term1, term2):
        """ copied from ubuntu-drivers common

        Return True if version string term1 sorts after term2 according
        to apt's version comparison.
        """
        # We don't want to take into account
        # the flavour
        pattern = re.compile('(.+)-([0-9]+)-(.+)')
        match1 = pattern.match(term1)
        match2 = pattern.match(term2)
        if match1 and match2:
            term1 = '%s-%s' % (match1.group(1),
                               match1.group(2))
            term2 = '%s-%s' % (match2.group(1),
                               match2.group(2))

        logging.debug('Comparing %s with %s' % (term1, term2))
        return apt.apt_pkg.version_compare(term1, term2) > 0

    def _get_linux_metapackage(self, cache, headers):
        """ Get the linux headers or linux metapackage
            copied from ubuntu-drivers-common

        cache -- iterable apt cache
        headers -- if true, return the name of the headers metapackage

        Returns the metapackage name, or '' if no installed (or to be
        installed) kernel image matched.
        """
        suffix = '-headers' if headers else ''
        pattern = re.compile('linux-image-(.+)-([0-9]+)-(.+)')
        source_pattern = re.compile('linux-(.+)')

        metapackage = ''
        version = ''
        for pkg in cache:
            if ('linux-image' in pkg.name and 'extra' not in pkg.name and
                    (pkg.is_installed or pkg.marked_install)):
                match = pattern.match(pkg.name)
                # Here we filter out packages such as
                # linux-generic-lts-quantal
                if match:
                    source = pkg.candidate.record['Source']
                    current_version = '%s-%s' % (match.group(1),
                                                 match.group(2))
                    # See if the current version is greater than
                    # the greatest that we've found so far
                    if self._is_greater_than(current_version,
                                             version):
                        version = current_version
                        match_source = source_pattern.match(source)
                        # Set the linux-headers metapackage
                        if '-lts-' in source and match_source:
                            # This is the case of packages such as
                            # linux-image-3.5.0-18-generic which
                            # comes from linux-lts-quantal.
                            # Therefore the linux-headers-generic
                            # metapackage would be wrong here and
                            # we should use
                            # linux-headers-generic-lts-quantal
                            # instead
                            metapackage = 'linux%s-%s-%s' % (
                                suffix,
                                match.group(3),
                                match_source.group(1))
                        else:
                            # The scheme linux-headers-$flavour works
                            # well here
                            metapackage = 'linux%s-%s' % (
                                suffix,
                                match.group(3))
        return metapackage

    def _install_linux_metapackage(self):
        """ Ensure the linux metapackage is installed for the newest_kernel
            installed. (LP: #1509305)
        """
        cache = self.controller.cache
        linux_metapackage = self._get_linux_metapackage(cache, False)
        # Seen on errors.u.c with linux-rpi2 metapackage
        # https://errors.ubuntu.com/problem/994bf05fae85fbcd44f721495db6518f2d5a126d
        if linux_metapackage not in cache:
            logging.info("linux metapackage (%s) not available" %
                         linux_metapackage)
            return
        # install the package if it isn't installed
        if not cache[linux_metapackage].is_installed:
            logging.info("installing linux metapackage: %s" %
                         linux_metapackage)
            reason = "linux metapackage may have been accidentally uninstalled"
            cache.mark_install(linux_metapackage, reason)

    def ensure_recommends_are_installed_on_desktops(self):
        """ ensure that on a desktop install recommends are installed
            (LP: #759262)
        """
        if not self.controller.serverMode:
            if not apt.apt_pkg.config.find_b("Apt::Install-Recommends"):
                msg = "Apt::Install-Recommends was disabled,"
                msg += " enabling it just for the upgrade"
                logging.warning(msg)
                apt.apt_pkg.config.set("Apt::Install-Recommends", "1")

    def _is_deb2snap_metapkg_installed(self, deb2snap_entry):
        """ Helper function that checks if the given deb2snap entry
        has at least one metapkg which is installed on the system.

        The "metapkg" value may be a single name or a list of names.
        """
        metapkg_list = deb2snap_entry.get("metapkg", None)
        if not isinstance(metapkg_list, list):
            metapkg_list = [metapkg_list]

        for metapkg in metapkg_list:
            if metapkg not in self.controller.cache:
                continue
            if metapkg and \
                    self.controller.cache[metapkg].is_installed is False:
                continue

            return True

        return False

    def _prepare_snap_replacement_data(self):
        """ Helper function fetching all required info for the deb-to-snap
        migration: version strings for upgrade (from and to) and the list
        of snaps (with actions).

        Populates and returns self._snap_list, a dict mapping the snap
        name to a dict with a 'command' key ('install', 'switch' or
        'remove') and, depending on the command, 'channel', 'deb' and
        'snap-id'.
        """
        import json
        self._snap_list = {}

        from_channel = "stable/ubuntu-%s" % self._from_version
        to_channel = "stable/ubuntu-%s" % self._to_version
        seeded_snaps = {}
        unseeded_snaps = {}

        try:
            current_path = os.path.dirname(os.path.abspath(__file__))
            # "with" closes the file even when the JSON is malformed
            with open(current_path + '/deb2snap.json', 'r') as d2s_file:
                d2s = json.load(d2s_file)

            for snap in d2s["seeded"]:
                seed = d2s["seeded"][snap]
                if not self._is_deb2snap_metapkg_installed(seed):
                    continue
                deb = seed.get("deb", None)
                from_chan = seed.get("from_channel", from_channel)
                to_chan = seed.get("to_channel", to_channel)
                force_switch = seed.get("force_switch", False)
                seeded_snaps[snap] = (deb, from_chan, to_chan, force_switch)

            for snap in d2s["unseeded"]:
                unseed = d2s["unseeded"][snap]
                deb = unseed.get("deb", None)
                if not self._is_deb2snap_metapkg_installed(unseed):
                    continue
                from_chan = unseed.get("from_channel", from_channel)
                unseeded_snaps[snap] = (deb, from_chan)
        except Exception as e:
            logging.warning("error reading deb2snap.json file (%s)" % e)

        # list the installed snaps and add them to seeded ones
        snap_list = subprocess.Popen(["snap", "list"],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE).communicate()
        if snap_list:
            # first line of output is a header and the last line is empty
            snaps_installed = [line.split()[0]
                               for line in snap_list[0].split('\n')[1:-1]]

            for snap in snaps_installed:
                if snap in seeded_snaps or snap in unseeded_snaps:
                    continue
                else:
                    # installed snaps unknown to deb2snap.json follow the
                    # default release channels
                    seeded_snaps[snap] = (
                        None, from_channel, to_channel, False
                    )

        self._view.updateStatus(_("Checking for installed snaps"))
        for snap, props in seeded_snaps.items():
            # per-snap channels; named differently from the defaults above
            # so that those are not shadowed
            (deb, snap_from_chan, snap_to_chan, force_switch) = props
            snap_object = {}
            # check to see if the snap is already installed
            snap_info = subprocess.Popen(["snap", "info", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
            self._view.processEvents()
            if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)

                if not re.search(r"^tracking:.*%s" % snap_from_chan,
                                 snap_info[0], re.MULTILINE):
                    logging.debug("Snap %s is not tracking the release channel"
                                  % snap)

                    if not force_switch:
                        # It is not tracking the release channel, and we were
                        # not told to force so don't switch.
                        continue

                snap_object['command'] = 'switch'
            else:
                # Do not replace packages not installed
                cache = self.controller.cache
                if (deb and (deb not in cache or not cache[deb].is_installed)):
                    logging.debug("Deb package %s is not installed. Skipping "
                                  "snap package %s installation" % (deb, snap))
                    continue

                match = re.search(r"snap-id:\s*(\w*)", snap_info[0])
                if not match:
                    logging.debug("Could not parse snap-id for the %s snap"
                                  % snap)
                    continue
                snap_object['command'] = 'install'
                snap_object['deb'] = deb
                snap_object['snap-id'] = match[1]
            snap_object['channel'] = snap_to_chan
            self._snap_list[snap] = snap_object
        for snap, (deb, snap_from_chan) in unseeded_snaps.items():
            snap_object = {}
            # check to see if the snap is already installed
            snap_info = subprocess.Popen(["snap", "info", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
            self._view.processEvents()
            if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)
                # its not tracking the release channel so don't remove
                if not re.search(r"^tracking:.*%s" % snap_from_chan,
                                 snap_info[0], re.MULTILINE):
                    logging.debug("Snap %s is not tracking the release channel"
                                  % snap)
                    continue

                snap_object['command'] = 'remove'

                # check if this snap is being used by any other snaps
                conns = subprocess.Popen(["snap", "connections", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
                self._view.processEvents()

                for conn in conns[0].split('\n'):
                    conn_cols = conn.split()
                    # only rows with exactly four columns are connections
                    if len(conn_cols) != 4:
                        continue
                    plug = conn_cols[1]
                    slot = conn_cols[2]

                    if slot.startswith(snap + ':'):
                        plug_snap = plug.split(':')[0]
                        if plug_snap != '-' and \
                                plug_snap not in unseeded_snaps:
                            logging.debug("Snap %s is being used by %s. "
                                          "Switching it to stable track"
                                          % (snap, plug_snap))
                            snap_object['command'] = 'switch'
                            snap_object['channel'] = 'stable'
                            break

                self._snap_list[snap] = snap_object
        return self._snap_list

    def _replace_pi_boot_config(self, old_config, new_config,
                                boot_config_filename, failure_action):
        """ Back up old_config to "<boot_config_filename>.distUpgrade" and
        write new_config to boot_config_filename.

        The new configuration is not written if the backup fails.  Errors
        are logged together with the failure_action hint for the user.
        """
        try:
            boot_backup_filename = boot_config_filename + '.distUpgrade'
            with open(boot_backup_filename, 'w', encoding='utf-8') as f:
                f.write(old_config)
        except IOError as exc:
            logging.error("unable to write boot config backup to %s: %s; %s",
                          boot_backup_filename, exc, failure_action)
            return
        try:
            with open(boot_config_filename, 'w', encoding='utf-8') as f:
                f.write(new_config)
        except IOError as exc:
            logging.error("unable to write new boot config to %s: %s; %s",
                          boot_config_filename, exc, failure_action)

    def _replace_fkms_overlay(self, boot_dir='/boot/firmware'):
        """ Replace the vc4-fkms-v3d overlay with vc4-kms-v3d in
        <boot_dir>/config.txt and comment out the gpu_mem= and start_x=1
        lines (LP: #1923673).
        """
        failure_action = (
            "You may need to replace the vc4-fkms-v3d overlay with "
            "vc4-kms-v3d in config.txt on your boot partition")

        try:
            boot_config_filename = os.path.join(boot_dir, 'config.txt')
            with open(boot_config_filename, 'r', encoding='utf-8') as f:
                boot_config = f.read()
        except FileNotFoundError:
            logging.error("failed to open boot configuration in %s; %s",
                          boot_config_filename, failure_action)
            return

        new_config = ''.join(
            # startswith and replace used to cope with (and preserve) any
            # trailing d-t parameters, and any use of the -pi4 suffix
            '# changed by do-release-upgrade (LP: #1923673)\n#' + line +
            line.replace('dtoverlay=vc4-fkms-v3d', 'dtoverlay=vc4-kms-v3d')
            if line.startswith('dtoverlay=vc4-fkms-v3d') else
            # camera firmware disabled due to incompatibility with "full" kms
            # overlay; without the camera firmware active it's also better to
            # disable gpu_mem leaving the default (64MB) to allow as much as
            # possible for the KMS driver
            '# disabled by do-release-upgrade (LP: #1923673)\n#' + line
            if line.startswith('gpu_mem=') or line.rstrip() == 'start_x=1' else
            line
            for line in boot_config.splitlines(keepends=True)
        )

        if new_config == boot_config:
            logging.warning("no fkms overlay or camera firmware line found "
                            "in %s", boot_config_filename)
            return
        self._replace_pi_boot_config(
            boot_config, new_config, boot_config_filename, failure_action)

    def _add_kms_overlay(self, boot_dir='/boot/firmware'):
        """ Add the vc4-kms-v3d overlay lines to <boot_dir>/config.txt
        unless an [all] section already has that overlay (LP: #2065051).
        """
        failure_action = (
            "You may need to add dtoverlay=vc4-kms-v3d to an [all] section "
            "in config.txt on your boot partition")
        added_lines = [
            '# added by do-release-upgrade (LP: #2065051)',
            'dtoverlay=vc4-kms-v3d',
            'disable_fw_kms_setup=1',
            '',
            '[pi3+]',
            'dtoverlay=vc4-kms-v3d,cma-128',
            '',
            '[pi02]',
            'dtoverlay=vc4-kms-v3d,cma-128',
            '',
            '[all]',
        ]
        try:
            boot_config_filename = os.path.join(boot_dir, 'config.txt')
            with open(boot_config_filename, 'r', encoding='utf-8') as f:
                boot_config = f.read()
        except FileNotFoundError:
            logging.error("failed to open boot configuration in %s; %s",
                          boot_config_filename, failure_action)
            return

        def find_insertion_point(lines):
            # Returns the zero-based index of the dtoverlay=vc4-kms-v3d line in
            # an [all] section, if one exists, or the last line of the last
            # [all] section of the file, if one does not exist
            in_all = True
            last = 0
            for index, line in enumerate(lines):
                line = line.rstrip()
                if in_all:
                    last = index
                    # startswith used to cope with any trailing dtparams
                    if line.startswith('dtoverlay=vc4-kms-v3d'):
                        return last
                    elif line.startswith('[') and line.endswith(']'):
                        in_all = line == '[all]'
                    elif line.startswith('include '):
                        # [sections] are included from includes verbatim, hence
                        # (without reading the included file) we must assume
                        # we're no longer in an [all] section
                        in_all = False
                else:
                    in_all = line == '[all]'
            return last

        def add_kms_overlay(lines):
            # Inserts added_lines at the insertion point unless the overlay
            # line is already present there
            insert_point = find_insertion_point(lines)
            try:
                if lines[insert_point].startswith('dtoverlay=vc4-kms-v3d'):
                    return lines
            except IndexError:
                # Empty config, apparently!
                pass
            lines[insert_point:insert_point] = added_lines
            return lines

        lines = [line.rstrip() for line in boot_config.splitlines()]
        lines = add_kms_overlay(lines)
        new_config = ''.join(line + '\n' for line in lines)

        if new_config == boot_config:
            logging.warning("no addition of KMS overlay required in %s",
                            boot_config_filename)
            return
        self._replace_pi_boot_config(
            boot_config, new_config, boot_config_filename, failure_action)

    def _set_generic_font(self):
        """ Due to changes to the Ubuntu font we enable a generic font
        (in practice DejaVu or Noto) during the upgrade.
        See https://launchpad.net/bugs/2034986

        The original font is restored on the next login/boot through an
        autostart file or a systemd user unit written by this method.
        """
        temp_font = 'Sans'

        if self._did_change_font:
            return

        if os.getenv('XDG_SESSION_TYPE', '') in ('', 'tty'):
            # Avoid running this on server systems or when the upgrade
            # is done over ssh.
            return

        if not {'SUDO_UID', 'PKEXEC_UID'}.intersection(os.environ):
            logging.debug(
                'Cannot determine how root privileges were gained, will '
                'not change font'
            )
            return

        try:
            uid = int(os.getenv('SUDO_UID', os.getenv('PKEXEC_UID')))
            pwuid = pwd.getpwuid(uid)
        except (KeyError, ValueError):
            # ValueError: the variable is not a number
            # KeyError: pwd has no entry for that UID
            logging.debug(
                'Cannot determine non-root UID, will not change font'
            )
            return

        schema = 'org.gnome.desktop.interface'

        desktops = os.getenv('XDG_CURRENT_DESKTOP', '').split(':')

        # Some flavors use other schemas for the desktop font.
        if 'MATE' in desktops or 'UKUI' in desktops:
            schema = 'org.mate.interface'
        elif 'X-Cinnamon' in desktops:
            schema = 'org.cinnamon.desktop.interface'

        # Some flavors lack the systemd integration needed for a
        # user service, so we create an autostart file instead.
        use_autostart = bool(
            set(['Budgie', 'LXQt', 'MATE', 'UKUI', 'X-Cinnamon', 'XFCE'])
            & set(desktops)
        )

        r = subprocess.run(
            ['systemd-run', '--user', '-M', f'{pwuid.pw_name}@.host',
             '--wait', '--pipe', '-q', '--',
             '/usr/bin/gsettings', 'get',
             f'{schema}', 'font-name'],
            stdout=subprocess.PIPE,
            encoding='utf-8',
        )

        # gsettings prints e.g. 'Ubuntu 11'; split off the trailing size.
        # The separator is named explicitly so that the module level
        # gettext alias "_" is not shadowed.
        (font, _sep, size) = r.stdout.strip('\'\n').rpartition(' ')
        font = font or 'Ubuntu'
        try:
            int(size)
        except ValueError:
            size = '11'

        logging.debug(f'Setting generic font {temp_font} {size} during the '
                      f'upgrade. Original font is {font} {size}.')

        r = subprocess.run([
            'systemd-run', '--user', '-M', f'{pwuid.pw_name}@.host',
            '--wait', '--pipe', '-q', '--',
            '/usr/bin/gsettings', 'set',
            f'{schema}', 'font-name',
            f'"{temp_font} {size}"'
        ])
        if r.returncode != 0:
            logging.debug(f'Failed to change font to {temp_font} {size}')
            return

        self._did_change_font = True

        # Touch a file to indicate that the font should be restored on the
        # next boot.
        need_font_restore_file = os.path.join(
            pwuid.pw_dir, '.config/upgrade-need-font-restore'
        )
        os.makedirs(os.path.dirname(need_font_restore_file), exist_ok=True)
        pathlib.Path(need_font_restore_file).touch(mode=0o666)
        os.chown(
            need_font_restore_file,
            pwuid.pw_uid,
            pwuid.pw_gid,
        )

        if use_autostart:
            autostart_file = '/etc/xdg/autostart/upgrade-restore-font.desktop'
            os.makedirs(os.path.dirname(autostart_file), exist_ok=True)
            flag = '$HOME/.config/upgrade-need-font-restore'
            with open(autostart_file, 'w') as f:
                f.write(
                    '[Desktop Entry]\n'
                    'Name=Restore font after upgrade\n'
                    'Comment=Auto-generated by ubuntu-release-upgrader\n'
                    'Type=Application\n'
                    f'Exec=sh -c \'if [ -e "{flag}" ]; then gsettings set '
                    f'{schema} font-name "{font} {size}";'
                    f'rm -f "{flag}"; fi\'\n'
                    'NoDisplay=true\n'
                )
            return

        # If we set the font back to normal before a reboot, the font will
        # still get all messed up. To allow normal usage whether the user
        # reboots immediately or not, create a service that will run only if a
        # ~/.config/upgrade-need-font-restore exists, and then remove that file
        # in ExecStart. This has the effect of creating a one-time service on
        # the next boot.
        unit_file = '/usr/lib/systemd/user/upgrade-restore-font.service'
        os.makedirs(os.path.dirname(unit_file), exist_ok=True)

        with open(unit_file, 'w') as f:
            f.write(
                '# Auto-generated by ubuntu-release-upgrader\n'
                '[Unit]\n'
                'Description=Restore font after upgrade\n'
                'After=graphical-session.target dconf.service\n'
                'ConditionPathExists=%h/.config/upgrade-need-font-restore\n'
                '\n'
                '[Service]\n'
                'Type=oneshot\n'
                'ExecStart=/usr/bin/gsettings set '
                f'{schema} font-name \'{font} {size}\'\n'
                'ExecStart=/usr/bin/rm -f '
                '%h/.config/upgrade-need-font-restore\n'
                '\n'
                '[Install]\n'
                'WantedBy=graphical-session.target\n'
            )

        subprocess.run([
            'systemctl', '--user', '-M', f'{pwuid.pw_name}@.host',
            'daemon-reload'
        ])
        r = subprocess.run([
            'systemctl', '--user', '-M', f'{pwuid.pw_name}@.host',
            'enable', os.path.basename(unit_file)
        ])

        if r.returncode != 0:
            logging.debug(f'Failed to enable {os.path.basename(unit_file)}. '
                          'Font will not be restored on reboot')

    def _maybe_remove_gpg_wks_server(self):
        """
        Prevent postfix from being unnecessarily installed, and leading to a
        debconf prompt (LP: #2060578).
        """
        # We want to use attributes of the cache that are only exposed in
        # apt_pkg.Cache, not the higher level apt.Cache. Hence we operate on
        # apt.Cache._cache.
        cache = self.controller.cache._cache

        try:
            if not cache['gpg-wks-server'].current_ver:
                # Not installed, nothing to do.
                return

            provides_mta = cache['mail-transport-agent'].provides_list
            # each entry is a 3-tuple; only its last item (the providing
            # version) is needed here
            installed_mta = [
                ver for _name, _version, ver in provides_mta
                if ver.parent_pkg.current_ver
            ]
        except KeyError:
            # Something wasn't in the cache, ignore.
            return

        if not any(installed_mta):
            logging.info(
                'No mail-transport-agent installed, '
                'marking gpg-wks-server for removal'
            )

            self.controller.cache['gpg-wks-server'].mark_delete(auto_fix=False)
            apt.ProblemResolver(self.controller.cache).protect(
                self.controller.cache['gpg-wks-server']
            )

    def _test_and_fail_on_tpm_fde(self):
        """
        LP: #2065229

        Abort the upgrade with an error message if the pc-kernel snap and
        the ubuntu-desktop-minimal package are both installed, which is
        treated as a TPM FDE desktop system.
        """
        try:
            snap_list = subprocess.check_output(['snap', 'list'])
            # first column of every non-blank output line
            snaps = [
                fields[0]
                for fields in (
                    line.decode().split() for line in snap_list.splitlines()
                )
                if fields
            ]
        except FileNotFoundError:
            # snapd not installed?
            return

        if (
            'pc-kernel' in snaps and
            'ubuntu-desktop-minimal' in self.controller.cache and
            self.controller.cache['ubuntu-desktop-minimal'].is_installed
        ):
            logging.debug('Detected TPM FDE system')

            di = distro_info.UbuntuDistroInfo()
            version = di.version(self.controller.toDist) or 'next release'

            self._view.error(
                # the version is interpolated after the gettext lookup so
                # that the msgid is a constant string
                _(
                    'Sorry, cannot upgrade this system to %s'
                ) % version,
                _(
                    'Upgrades for desktop systems running TPM FDE are not '
                    'currently supported. '
                    'Please see https://launchpad.net/bugs/2065229 '
                    'for more information.'
                ),
            )
            self.controller.abort()

    def _disable_kdump_tools_on_install(self, cache):
        """Disable kdump-tools if installed during upgrade."""
        if 'kdump-tools' not in cache:
            # Not installed or requested, nothing to do.
            return

        pkg = cache['kdump-tools']

        if pkg.is_installed:
            logging.info("kdump-tools already installed. Not disabling.")
            return
        elif pkg.marked_install:
            logging.info("installing kdump-tools due to upgrade. Disabling.")
            # constant shell pipeline, no external input is interpolated
            proc = subprocess.run(
                (
                    'echo "kdump-tools kdump-tools/use_kdump boolean false"'
                    ' | debconf-set-selections'
                ),
                shell=True,
            )
            ret_code = proc.returncode
            if ret_code != 0:
                logging.debug(
                    (
                        "kdump-tools debconf-set-selections "
                        f"returned: {ret_code}"
                    )
                )