Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f86971a
Kill dangling subprocesses
ivanpauno Jul 25, 2022
d268600
Address peer review comments
ivanpauno Sep 28, 2022
b5a925f
Fix docstring
ivanpauno Oct 6, 2022
e2b1a3c
Remove leftover comments
ivanpauno Oct 6, 2022
8466b49
Undo unneeded change
ivanpauno Oct 6, 2022
2356223
Cleanup subprocesses before respawning
ivanpauno Oct 6, 2022
ac6afd3
please flake8
ivanpauno Oct 6, 2022
7ffee9a
Fix issue detected in tests
ivanpauno Oct 6, 2022
6432d76
Improve error handling
ivanpauno Oct 13, 2022
6284c82
Address peer review comments
ivanpauno Dec 1, 2022
514c149
Remove unnecessary argument in __get_shutdown_timer_actions()
ivanpauno Dec 1, 2022
5b43cdc
Add test case
ivanpauno Dec 5, 2022
4b796cc
Fix flake8 failures
ivanpauno Dec 14, 2022
58cd661
More flake8 failures
ivanpauno Dec 14, 2022
5ff2f05
Seems to fix CI issues...
ivanpauno Dec 20, 2022
b5ba458
Second try
ivanpauno Dec 20, 2022
fc97730
another try
ivanpauno Dec 20, 2022
caf75e3
Merge remote-tracking branch 'origin/rolling' into ivanpauno/kill-dan…
ahcorde Nov 28, 2025
5636416
Merge commit '082e64087b97decf8e66f77f96b0fe10c195e354' into ivanpaun…
anton-matosov Dec 25, 2025
989f6d4
missing casts
anton-matosov Dec 26, 2025
b25cb57
format
anton-matosov Dec 26, 2025
bdd7410
move cast to processing of substitution
anton-matosov Dec 26, 2025
72e28a2
fix transport missing type
anton-matosov Dec 26, 2025
0dc7b09
fix all mypy issues
anton-matosov Dec 26, 2025
4b232db
Merge branch 'rolling' of github.com:ros2/launch into ivanpauno/kill-…
anton-matosov Feb 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 107 additions & 27 deletions launch/launch/actions/execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from osrf_pycommon.process_utils import async_execute_process # type: ignore
from osrf_pycommon.process_utils import AsyncSubprocessProtocol

import psutil

from .emit_event import EmitEvent
from .opaque_function import OpaqueFunction
from .timer_action import TimerAction
Expand Down Expand Up @@ -64,9 +66,8 @@
from ..launch_description_entity import LaunchDescriptionEntity
from ..some_entities_type import SomeEntitiesType
from ..some_substitutions_type import SomeSubstitutionsType
from ..substitution import Substitution # noqa: F401
from ..substitution import Substitution
from ..substitutions import LaunchConfiguration
from ..substitutions import PythonExpression
from ..utilities import is_a_subclass
from ..utilities import normalize_to_list_of_substitutions
from ..utilities import perform_substitutions
Expand All @@ -86,6 +87,8 @@ def __init__(
'sigterm_timeout', default=5),
sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration(
'sigkill_timeout', default=5),
signal_lingering_subprocesses: SomeSubstitutionsType = LaunchConfiguration(
'signal_lingering_subprocesses', default=True),
emulate_tty: bool = False,
output: SomeSubstitutionsType = 'log',
output_format: Text = '[{this.process_description.final_name}] {line}',
Expand Down Expand Up @@ -158,6 +161,11 @@ def __init__(
as a string or a list of strings and Substitutions to be resolved
at runtime, defaults to the LaunchConfiguration called
'sigkill_timeout'
:param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process
will be signaled to make sure they finish.
The sequence of signals used is the same SIGINT/SIGTERM/SIGKILL sequence
used to kill the main process.
Subprocesses start being signaled when the main process completes.
:param: emulate_tty emulate a tty (terminal), defaults to False, but can
be overridden with the LaunchConfiguration called 'emulate_tty',
the value of which is evaluated as true or false according to
Expand Down Expand Up @@ -190,6 +198,8 @@ def __init__(
self.__shell = shell
self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout)
self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout)
self.__signal_lingering_subprocesses = normalize_to_list_of_substitutions(
signal_lingering_subprocesses)
self.__emulate_tty = emulate_tty
# Note: we need to use a temporary here so that we don't assign values with different types
# to the same variable
Expand All @@ -214,11 +224,12 @@ def __init__(

self.__process_event_args = None # type: Optional[Dict[Text, Any]]
self._subprocess_protocol = None # type: Optional[Any]
self._subprocess_transport = None
self._subprocess_transport = None # type: Optional[Any]
self.__completed_future = None # type: Optional[asyncio.Future[None]]
self.__shutdown_future = None # type: Optional[asyncio.Future[None]]
self.__sigterm_timer = None # type: Optional[TimerAction]
self.__sigkill_timer = None # type: Optional[TimerAction]
self.__children: List[psutil.Process] = []
self.__stdout_buffer = io.StringIO()
self.__stderr_buffer = io.StringIO()

Expand Down Expand Up @@ -292,7 +303,11 @@ def _shutdown_process(self, context: LaunchContext, *, send_sigint: bool
self.__shutdown_future.set_result(None)

# Otherwise process is still running, start the shutdown procedures.
context.extend_locals({'process_name': self.process_details['name']})
context.extend_locals(
{
'process_name': self.process_details['name'],
'process_pid': self.process_details['pid'],
})
actions_to_return = self.__get_shutdown_timer_actions()
if send_sigint:
actions_to_return.append(self.__get_sigint_event())
Expand Down Expand Up @@ -320,7 +335,7 @@ def __on_signal_process_event(
raise RuntimeError('Signal event received before execution.')
if self._subprocess_transport is None:
raise RuntimeError('Signal event received before subprocess transport available.')
if self._subprocess_protocol.complete.done():
if self._subprocess_protocol and self._subprocess_protocol.complete.done():
# the process is done or is cleaning up, no need to signal
self.__logger.debug(
"signal '{}' not set to '{}' because it is already closing".format(
Expand All @@ -344,13 +359,13 @@ def __on_signal_process_event(
self._subprocess_transport.kill() # works on both Windows and POSIX
return None
self._subprocess_transport.send_signal(typed_event.signal)
return None
except ProcessLookupError:
self.__logger.debug(
"signal '{}' not sent to '{}' because it has closed already".format(
typed_event.signal_name, self.process_details['name']
)
)
return None

def __on_process_stdin(
self,
Expand Down Expand Up @@ -453,46 +468,52 @@ def __get_shutdown_timer_actions(self) -> List[Action]:
base_msg = \
"process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'"

def printer(context, msg, timeout_substitutions):
self.__logger.error(msg.format(
context.locals.process_name,
perform_substitutions(context, timeout_substitutions),
))
def printer(context, msg):
self.__logger.error(msg.format(context.locals.process_name))

sigterm_timeout = self.__sigterm_timeout
sigkill_timeout = [PythonExpression(
('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')')
)]
# Setup a timer to send us a SIGTERM if we don't shutdown quickly.
sigterm_timeout = self.__sigterm_timeout_value
self.__sigterm_timer = TimerAction(
period=sigterm_timeout,
actions=[
OpaqueFunction(
function=printer,
args=(base_msg.format('{}', '{}', 'SIGINT', 'SIGTERM'), sigterm_timeout)
args=(base_msg.format('{}', sigterm_timeout, 'SIGINT', 'SIGTERM'),),
),
EmitEvent(
event=SignalProcess(
signal_number=signal.SIGTERM, process_matcher=matches_action(self)
)
),
EmitEvent(event=SignalProcess(
signal_number=signal.SIGTERM,
process_matcher=matches_action(self)
)),
],
cancel_on_shutdown=False,
)
sigkill_timeout = sigterm_timeout + self.__sigkill_timeout_value
# Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM.
self.__sigkill_timer = TimerAction(
period=sigkill_timeout,
actions=[
OpaqueFunction(
function=printer,
args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout)
args=(base_msg.format('{}', sigkill_timeout, 'SIGTERM', 'SIGKILL'),),
),
EmitEvent(
event=SignalProcess(
signal_number='SIGKILL', process_matcher=matches_action(self)
)
),
EmitEvent(event=SignalProcess(
signal_number='SIGKILL',
process_matcher=matches_action(self)
))
],
cancel_on_shutdown=False,
)
self.__children = []
pid = None
if self._subprocess_transport is not None:
pid = self._subprocess_transport.get_pid()
if pid is not None:
try:
self.__children = psutil.Process(pid).children(recursive=True)
except psutil.NoSuchProcess:
pass
return [
cast(Action, self.__sigterm_timer),
cast(Action, self.__sigkill_timer),
Expand All @@ -504,12 +525,15 @@ def __get_sigint_event(self) -> EmitEvent:
process_matcher=matches_action(self),
))

def __cleanup(self) -> None:
# Cancel any pending timers we started.
def __cleanup_timers(self):
if self.__sigterm_timer is not None:
self.__sigterm_timer.cancel()
if self.__sigkill_timer is not None:
self.__sigkill_timer.cancel()

def __cleanup(self):
# Cancel any pending timers we started.
self.__cleanup_timers()
# Close subprocess transport if any.
if self._subprocess_transport is not None:
self._subprocess_transport.close()
Expand Down Expand Up @@ -543,6 +567,48 @@ def on_stdout_received(self, data: bytes) -> None:
def on_stderr_received(self, data: bytes) -> None:
self.__context.emit_event_sync(ProcessStderr(text=data, **self.__process_event_args))

async def _signal_subprocesses(self, context):
to_signal = self.__children
signaled = []
sig = signal.SIGINT
start_time = context.asyncio_loop.time()
sigterm_timeout = self.__sigterm_timeout_value
sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
process_pid = self.process_details['pid']
process_name = self.process_details['name']
log_prefix_format = (
'subprocess[pid={}] of process['
f'{process_name}, pid={process_pid}]: ')
next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout)))
while True:
for p in to_signal:
try:
p.send_signal(sig)
except psutil.NoSuchProcess:
continue
log_prefix = log_prefix_format.format(p.pid)
self.__logger.info(
f'{log_prefix}sending {sig.name} to subprocess directly.'
)
signaled.append(p)
try:
sig, timeout = next(next_signals)
except StopIteration:
return
current_time = context.asyncio_loop.time()
while current_time < start_time + timeout:
await asyncio.sleep(min(0.5, start_time + timeout - current_time))
for p in list(signaled):
if not p.is_running():
log_prefix = log_prefix_format.format(p.pid)
self.__logger.info(f'{log_prefix}exited')
signaled.remove(p)
if not signaled:
return
current_time = context.asyncio_loop.time()
to_signal = signaled
signaled = []

async def __execute_process(self, context: LaunchContext) -> None:
process_event_args = self.__process_event_args
if process_event_args is None:
Expand Down Expand Up @@ -619,8 +685,13 @@ async def __execute_process(self, context: LaunchContext) -> None:
timeout=self.__respawn_delay
)
if not self.__shutdown_future.done():
if self.__signal_lingering_subprocesses_value:
await self._signal_subprocesses(context)
context.asyncio_loop.create_task(self.__execute_process(context))
return
self.__cleanup_timers()
if self.__signal_lingering_subprocesses_value:
await self._signal_subprocesses(context)
self.__cleanup()

def prepare(self, context: LaunchContext) -> None:
Expand Down Expand Up @@ -703,6 +774,15 @@ def execute(self, context: LaunchContext) -> None:
]
for event_handler in event_handlers:
context.register_event_handler(event_handler)
self.__sigterm_timeout_value = cast(
float, perform_typed_substitution(context, self.__sigterm_timeout, float)
)
self.__sigkill_timeout_value = cast(
float, perform_typed_substitution(context, self.__sigkill_timeout, float)
)
self.__signal_lingering_subprocesses_value = perform_typed_substitution(
context, self.__signal_lingering_subprocesses, bool
)

try:
self.__completed_future = context.asyncio_loop.create_future()
Expand Down
1 change: 1 addition & 0 deletions launch/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<exec_depend>ament_index_python</exec_depend>
<exec_depend>python3-lark-parser</exec_depend>
<exec_depend>python3-osrf-pycommon</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>python3-yaml</exec_depend>
<exec_depend>python3-typing-extensions</exec_depend>

Expand Down
41 changes: 41 additions & 0 deletions launch/test/launch/test_execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

"""Tests for the ExecuteLocal Action."""

import asyncio
import os
import signal
import sys
import time

from launch import LaunchDescription
from launch import LaunchService
Expand All @@ -28,6 +31,8 @@
from launch.actions import TimerAction
from launch.descriptions import Executable

import psutil

import pytest


Expand Down Expand Up @@ -178,3 +183,39 @@ def test_execute_process_with_output_dictionary():
ls = LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()


PYTHON_SCRIPT = """\
import time

while 1:
time.sleep(0.5)
"""


def test_kill_subprocesses():
"""Test launching a process with an environment variable."""
executable = ExecuteLocal(
process_description=Executable(
cmd=['python3', '-c', f'"{PYTHON_SCRIPT}"'],
),
shell=True,
output='screen',
)
ld = LaunchDescription([executable])
ls = LaunchService()
ls.include_launch_description(ld)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
run_async_task = loop.create_task(ls.run_async())

async def wait_for_subprocesses():
start = time.time()
while len(psutil.Process().children(recursive=True)) != 2:
await asyncio.sleep(0.5)
assert time.time() < start + 5., 'timed out waiting for processes to setup'
wait_for_subprocesses_task = loop.create_task(wait_for_subprocesses())
loop.run_until_complete(wait_for_subprocesses_task)
os.kill(executable.process_details['pid'], signal.SIGTERM)
loop.run_until_complete(run_async_task)
assert len(psutil.Process().children(recursive=True)) == 0