Skip to content

Commit

Permalink
checkinstalledkernels: detect kernel-rt and improve comparisons (deps
Browse files Browse the repository at this point in the history
changed)

RHEL Real Time (RT) systems boot using kernel-rt rpm. But still,
even a kernel rpm can be installed. So now, detect whether we are
booted via kernel-rt or not and do a comparison only in regarding
the expected package name.

Unfortunately, this make comparisons between releases more difficult
as we need to check the whole release, including alphanumeric
characters in release, e.g.:
        kernel-rt-3.10.0-1160.2.1.rt56.1133.el7.x86_64

Previously we worked just with numbers. To make the check valid and
more robust, let's use rpm.labelCompare(). We are sure this library
is on every RHEL 7 system as dnf requires it (and probably yum too).
Let's use it as it resolves all our current troubles right now and
makes whole comparison more reliable.

In this implementation I am skipping comparison with epoch as
there is no (and will not be) kernel with bumped epoch.

Add 'rpm' into requirements
  • Loading branch information
pirat89 committed Oct 19, 2020
1 parent 0c30120 commit f17a36e
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 146 deletions.
Original file line number Diff line number Diff line change
@@ -1,77 +1,115 @@
try:
from rpm import labelCompare
except ImportError:
# this can happen only on non-rpm based systems or with Python3 on RHEL 7
# based OSs, as the rpm python module is available here just for Python2
# - and vice versa on F31+
# This will not happen in real run, just in case of unit-tests..
def labelCompare(*args):
raise NotImplementedError(
"The labelCompare function is not implemented for the python"
" you are using."
)

from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common.config import architecture
from leapp.libraries.stdlib import api
from leapp.models import InstalledRedHatSignedRPM


def _normalize_version(version):
if len(version) != 3:
if len(version) > 3:
api.current_logger().debug('Version {} has more than three components, trimming'.format(version))
del version[3:]
elif len(version) < 3:
api.current_logger().debug('Version {} has less than three components, padding'.format(version))
while len(version) < 3:
version.append(0)
api.current_logger().debug('Normalised version to {}'.format(version))
return version # could be omitted but it's useful for nesting calls
def get_current_kernel_version():
"""
Get the version of the running kernel as a string.
"""
return api.current_actor().configuration.kernel.split('-')[0]


def get_current_kernel_version():
def get_current_kernel_release():
"""
Get the version of the running kernel as a tuple of three integers.
Get the release of the current kernel as a string.
"""
kernel = api.current_actor().configuration.kernel
return tuple(_normalize_version(list(
map(int, kernel.split('-')[0].split('.'))
)))
return api.current_actor().configuration.kernel.split('-')[1]


def get_kernel_rpm_version(rpm):
def get_current_kernel_evr():
"""
Get the version of a kernel RPM as a tuple of three integers.
Get a 3-tuple (EVR) of the current booted kernel.
:param rpm: An instance of an RPM derived model.
Epoch in this case is always empty string. In case of kernel, epoch is
never expected to be set.
"""
return tuple(_normalize_version(list(
map(int, rpm.version.split('.'))
)))
return ('', get_current_kernel_version(), get_current_kernel_release())


def get_current_kernel_release():
def get_pkgs(pkg_name):
"""
Get all installed packages of the given name signed by Red Hat.
"""
rpms = next(api.consume(InstalledRedHatSignedRPM), InstalledRedHatSignedRPM()).items
return [pkg for pkg in rpms if pkg.name == pkg_name]


def get_EVR(pkg):
"""
Get the release of the running kernel as an integer.
Return 3-tuple EVR (_epoch_, version, release) of the given RPM.
Epoch is always set as an empty string as in case of kernel epoch is not
expected to be set - ever.
The release includes an architecture as well.
"""
kernel = api.current_actor().configuration.kernel
return int(kernel.split('-')[1].split('.')[0])
return ('', pkg.version, '{}.{}'.format(pkg.release, pkg.arch))


def get_kernel_rpm_release(rpm):
def _get_pkgs_evr(pkgs):
"""
Return 3-tuples (EVR) of the given packages.
"""
Get the release of a kernel RPM as an integer.
return [get_EVR(pkg) for pkg in pkgs]

:param rpm: An instance of an RPM derived model.

def get_newest_evr(pkgs):
"""
return int(rpm.release.split('.')[0])
Return the 3-tuple (EVR) of the newest package from the given list.
Return None if the given list is empty. It's expected that all given
packages have same name.
"""
if not pkgs:
return None
rpms_evr = _get_pkgs_evr(pkgs)

def get_kernel_rpms():
newest_evr = rpms_evr.pop()
for pkg in rpms_evr:
if labelCompare(newest_evr, pkg) < 0:
newest_evr = pkg
return newest_evr


def is_kernel_rt_booted():
"""
Get all installed kernel packages ordered first by version, then release number (ascending).
Return True if kernel-rt is booted.
We need to check what kernel is actually booted. In case or RHEL Real Time
systems, the booted kernel should be represented by the kernel-rt rpm.
The kernel-rt rpm contains always '.rt' substring in the release.
"""
rpms = next(api.consume(InstalledRedHatSignedRPM), InstalledRedHatSignedRPM())
return sorted([pkg for pkg in rpms.items if pkg.name == 'kernel'],
key=lambda k: (get_kernel_rpm_version(k), get_kernel_rpm_release(k)))
return '.rt' in get_current_kernel_release()


def process():
pkgs = get_kernel_rpms()
kernel_name = 'kernel'
if is_kernel_rt_booted():
api.current_logger().info('The Real Time kernel boot detected.')
kernel_name = 'kernel-rt'

pkgs = get_pkgs(kernel_name)
if not pkgs:
# Hypothatical, user is not allowed to install any kernel that is not signed by RH
# In case we would like to be cautious, we could check whether there are no other
# kernels installed as well.
api.current_logger().log.error('Cannot find any installed kernel signed by Red Hat.')
api.current_logger().error('Cannot find any installed kernel signed by Red Hat.')
raise StopActorExecutionError('Cannot find any installed kernel signed by Red Hat.')

if len(pkgs) > 1 and architecture.matches_architecture(architecture.ARCH_S390X):
Expand All @@ -92,15 +130,13 @@ def process():
reporting.RelatedResource('package', 'kernel')
])

newest = pkgs[-1]
newest_release = get_kernel_rpm_release(newest)
newest_version = get_kernel_rpm_version(newest)
current_release = get_current_kernel_release()
current_version = get_current_kernel_version()
api.current_logger().debug('Current kernel: V {}, R {}'.format(current_version, current_release))
api.current_logger().debug('Newest kernel: V {}, R {}'.format(newest_version, newest_release))
current_evr = get_current_kernel_evr()
newest_evr = get_newest_evr(pkgs)

api.current_logger().debug('Current kernel EVR: {}'.format(current_evr))
api.current_logger().debug('Newest kernel EVR: {}'.format(newest_evr))

if newest_release != current_release or newest_version != current_version:
if current_evr != newest_evr:
title = 'Newest installed kernel not in use'
summary = ('To ensure a stable upgrade, the machine needs to be'
' booted into the latest installed kernel.')
Expand Down
Loading

0 comments on commit f17a36e

Please sign in to comment.