diff --git a/src/check_systemd_units.py b/src/check_systemd_units.py index 3295613..b08991f 100755 --- a/src/check_systemd_units.py +++ b/src/check_systemd_units.py @@ -33,11 +33,17 @@ import collections import datetime import logging +import re import subprocess import sys import time import typing +# Compiled regex for transient user/session units +TRANSIENT_UNIT_RE = re.compile( + r'^(session-.*\.scope|user[@-].*\.(service|slice))$' +) + CheckResult = collections.namedtuple('CheckResult', ['code', 'msg']) logging.basicConfig( @@ -61,7 +67,7 @@ def parse_args(): action='append', default=[], help='Units to return critical when failed. Checking a timer unit' - ' will implicitly check the related service unit as well', + ' will implicitly check the related service unit as well', ) parser.add_argument( '-i', @@ -73,14 +79,14 @@ def parse_args(): parser.add_argument( '-w', '--timer-warn', - default=3., + default=3.0, type=float, help='Warning threshold of timer (inactivity/min_monotonic_interval)', ) parser.add_argument( '-c', '--timer-crit', - default=7., + default=7.0, type=float, help='Critical threshold of timer (inactivity/min_monotonic_interval)', ) @@ -118,8 +124,8 @@ def run_checks(args: argparse.Namespace) -> CheckResult: units = parse_units(raw_units) results = process(args, units) - logger.info('Criticals are: {}'.format(results[2])) - logger.info('Warnings are: {}'.format(results[1])) + logger.info(f'Criticals are: {results[2]}') + logger.info(f'Warnings are: {results[1]}') return gen_output(results) @@ -147,9 +153,7 @@ def process( if res_code == Codes.OK: continue - logger.info('Problem for {} is: {} - {}'.format( - unit_id, res_code, res.msg, - )) + logger.info(f'Problem for {unit_id} is: {res_code} - {res.msg}') results[res_code].append((unit_id, res.msg)) return results @@ -175,7 +179,7 @@ def gen_output( else: problems[problem] = unit - logger.info('Problems are: {}'.format(problems)) + logger.info(f'Problems are: {problems}') message += '; '.join([': '.join(i) for i in problems.items()]) return exit_code, message @@ -282,21 +286,30 @@ def check_unit(self, unit_id: str, timer: bool = False) -> CheckResult: ) ) - # Fast state checks first + # Ignore transient user/session units that are managed by logind. + # These come and go with user logins and are arguably harmless: + # * session-*.scope + # * user@.service + # * user-.slice + if TRANSIENT_UNIT_RE.match(unit_id): + logger.debug(f'Ignoring transient user/session unit {unit_id}') + return CheckResult(Codes.OK, '') + + # Fast state checks if unit['LoadState'] != 'loaded' and unit['ActiveState'] != 'inactive': return CheckResult( Codes.CRITICAL, 'the unit is not loaded but not inactive', ) - elif unit['ActiveState'] == 'failed': + if unit['ActiveState'] == 'failed': return CheckResult(Codes.CRITICAL, 'the unit is failed') - elif unit['LoadState'] != 'loaded': + if unit['LoadState'] != 'loaded': return CheckResult(Codes.WARNING, 'the unit is not loaded') # Check the specifics about the different unit types if unit_id.endswith('.timer'): return self.check_timer(unit_id) - elif unit_id.endswith('.service'): + if unit_id.endswith('.service'): return self.check_service(unit_id, timer) return CheckResult(Codes.OK, '') @@ -397,10 +410,11 @@ def check_intervals(self, unit_id: str) -> CheckResult: # We can check only monotonic triggers for regular execution checked_intervals = ['OnUnitActiveUSec', 'OnUnitInactiveUSec'] intervals = [ - (p[0], p[1]) for p in unit['TimersMonotonic'] + (p[0], p[1]) + for p in unit['TimersMonotonic'] if p[0] in checked_intervals ] - logger.debug('Monotonic timers are: {}'.format(intervals)) + logger.debug(f'Monotonic timers are: {intervals}') if not intervals: return CheckResult(Codes.OK, '') @@ -462,7 +476,7 @@ def _check_interval( return CheckResult( code, f"the timer hasn't been launched since {last_trigger_human}, " - f"look at {service_unit['Id']}" + f'look at {service_unit["Id"]}', ) return CheckResult(Codes.OK, '')