diff --git a/DESCRIPTION b/DESCRIPTION index ef2817ba..96a21b23 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -20,7 +20,7 @@ URL: https://processx.r-lib.org, https://github.com/r-lib/processx#readme BugReports: https://github.com/r-lib/processx/issues Depends: R (>= 3.4.0) Imports: - ps (>= 1.2.0), + ps (>= 1.7.5.9001), R6, utils Suggests: @@ -36,6 +36,8 @@ Suggests: testthat (>= 3.0.0), webfakes, withr +Remotes: + r-lib/ps@kill-tree-grace Encoding: UTF-8 RoxygenNote: 7.2.3 Roxygen: list(markdown = TRUE) diff --git a/NEWS.md b/NEWS.md index 11c04182..325564ac 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,10 @@ # processx (development version) +* Processes are now killed in parallel with a period of grace on + garbage collection (if `cleanup_tree` is `TRUE`) and on session + quit. The delay can be controlled with the new `cleanup_grace` + argument. + * The `grace` argument of the `kill()` method is now active on Unix platforms. processx first tries to kill with `SIGTERM` with a timeout of `grace` seconds. After the timeout, `SIGKILL` is sent as diff --git a/R/finalize.R b/R/finalize.R new file mode 100644 index 00000000..a13ea33e --- /dev/null +++ b/R/finalize.R @@ -0,0 +1,44 @@ +process_finalize <- function(private) { + ps <- process_cleanup_list(private) + ps::ps_kill_parallel(ps, private$cleanup_grace) +} + +process_cleanup_list <- function(private) { + ps <- list() + + if (private$cleanup) { + # Can't be created in advance because the ps finalizer might run first + handle <- ps::ps_handle(private$pid, as.POSIXct(private$starttime)) + ps <- c(ps, list(handle)) + } + + if (private$cleanup_tree) { + ps <- c(ps, ps::ps_find_tree(private$tree_id)) + } + + ps +} + +session_finalize <- function(node) { + ps <- list() + grace <- 0 + + while (!node_is_root(node)) { + private <- wref_key(node_value(node)) + ps <- c(ps, process_cleanup_list(private)) + + if (!is.null(private$cleanup_grace)) { + grace <- max(grace, private$cleanup_grace) + } + + node <- node_next(node) + } + + ps::ps_kill_parallel(ps, grace) +} + +wref_key <- function(x) .Call(c_processx__wref_key, x) +node_is_root <- function(x) is.null(node_next(x)) +node_prev <- function(x) x[[1]] +node_next <- function(x) x[[2]] +node_value <- function(x) x[[3]] diff --git a/R/initialize.R b/R/initialize.R index 673c97c3..f30c744c 100644 --- a/R/initialize.R +++ b/R/initialize.R @@ -141,7 +141,7 @@ process_initialize <- function(self, private, command, args, c_processx_exec, command, c(command, args), pty, pty_options, connections, env, windows_verbatim_args, windows_hide_window, - windows_detached_process, private, cleanup, cleanup_grace, wd, encoding, + windows_detached_process, private, cleanup, wd, encoding, paste0("PROCESSX_", private$tree_id, "=YES") ) @@ -155,6 +155,9 @@ process_initialize <- function(self, private, command, args, chain_call(c_processx__proc_start_time, private$status) if (private$starttime == 0) private$starttime <- Sys.time() + # Needed for cleaning up + private$pid <- self$get_pid() + ## Need to close this, otherwise the child's end of the pipe ## will not be closed when the child exits, and then we cannot ## poll it. diff --git a/R/process.R b/R/process.R index 94382ab6..efc1d1e5 100644 --- a/R/process.R +++ b/R/process.R @@ -233,11 +233,6 @@ process <- R6::R6Class( #' collected. If requested so in the process constructor, then it #' eliminates all processes in the process's subprocess tree. - finalize = function() { - if (!is.null(private$tree_id) && private$cleanup_tree && - ps::ps_is_supported()) self$kill_tree(grace = private$cleanup_grace) - }, - #' @description #' Terminate the process. It also terminate all of its child #' processes, except if they have created a new process group (on Unix), @@ -671,6 +666,7 @@ process <- R6::R6Class( windows_hide_window = NULL, status = NULL, # C file handle + pid = NULL, # pid for cleanup supervised = FALSE, # Whether process is tracked by supervisor diff --git a/man/process.Rd b/man/process.Rd index db35b11e..fd8d748e 100644 --- a/man/process.Rd +++ b/man/process.Rd @@ -91,7 +91,6 @@ p$is_alive() \subsection{Public methods}{ \itemize{ \item \href{#method-process-new}{\code{process$new()}} -\item \href{#method-process-finalize}{\code{process$finalize()}} \item \href{#method-process-kill}{\code{process$kill()}} \item \href{#method-process-kill_tree}{\code{process$kill_tree()}} \item \href{#method-process-signal}{\code{process$signal()}} @@ -320,21 +319,14 @@ R6 object representing the process. } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-process-finalize}{}}} -\subsection{Method \code{finalize()}}{ +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-process-kill}{}}} +\subsection{Method \code{kill()}}{ Cleanup method that is called when the \code{process} object is garbage collected. If requested so in the process constructor, then it eliminates all processes in the process's subprocess tree. -\subsection{Usage}{ -\if{html}{\out{
}}\preformatted{process$finalize()}\if{html}{\out{
}} -} -} -\if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-process-kill}{}}} -\subsection{Method \code{kill()}}{ + Terminate the process. It also terminate all of its child processes, except if they have created a new process group (on Unix), or job object (on Windows). It returns \code{TRUE} if the process diff --git a/src/Makevars b/src/Makevars index 0138c5d9..0724426d 100644 --- a/src/Makevars +++ b/src/Makevars @@ -4,7 +4,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \ processx-vector.o create-time.o base64.o \ unix/childlist.o unix/connection.o \ unix/processx.o unix/sigchld.o unix/utils.o \ - unix/named_pipe.o cleancall.o + unix/named_pipe.o cleancall.o utils.o .PHONY: all clean diff --git a/src/init.c b/src/init.c index 32f47aa3..11133a4c 100644 --- a/src/init.c +++ b/src/init.c @@ -12,10 +12,11 @@ SEXP run_testthat_tests(void); SEXP processx__echo_on(void); SEXP processx__echo_off(void); SEXP processx__set_boot_time(SEXP); +SEXP processx__wref_key(SEXP); static const R_CallMethodDef callMethods[] = { CLEANCALL_METHOD_RECORD, - { "processx_exec", (DL_FUNC) &processx_exec, 15 }, + { "processx_exec", (DL_FUNC) &processx_exec, 14 }, { "processx_wait", (DL_FUNC) &processx_wait, 3 }, { "processx_is_alive", (DL_FUNC) &processx_is_alive, 2 }, { "processx_get_exit_status", (DL_FUNC) &processx_get_exit_status, 2 }, @@ -33,6 +34,7 @@ static const R_CallMethodDef callMethods[] = { { "processx_write_named_pipe", (DL_FUNC) &processx_write_named_pipe, 2 }, { "processx__proc_start_time", (DL_FUNC) &processx__proc_start_time, 1 }, { "processx__set_boot_time", (DL_FUNC) &processx__set_boot_time, 1 }, + { "processx__wref_key", (DL_FUNC) &processx__wref_key, 1 }, { "processx_connection_create", (DL_FUNC) &processx_connection_create, 2 }, { "processx_connection_read_chars", (DL_FUNC) &processx_connection_read_chars, 2 }, diff --git a/src/processx.h b/src/processx.h index 28272171..3f80497a 100644 --- a/src/processx.h +++ b/src/processx.h @@ -45,8 +45,8 @@ extern "C" { SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, SEXP connections, SEXP env, SEXP windows_verbatim_args, SEXP windows_hide_window, SEXP windows_detached_process, - SEXP private_, SEXP cleanup, SEXP cleanup_signal, - SEXP wd, SEXP encoding, SEXP tree_id); + SEXP private_, SEXP cleanup, SEXP wd, SEXP encoding, + SEXP tree_id); SEXP processx_wait(SEXP status, SEXP timeout, SEXP name); SEXP processx_is_alive(SEXP status, SEXP name); SEXP processx_get_exit_status(SEXP status, SEXP name); @@ -117,4 +117,10 @@ typedef struct { } #endif +#define r_no_return __attribute__ ((noreturn)) + +r_no_return void r_unwind(SEXP x); +SEXP r_unwind_protect(void (*fn)(void *data), void *data); +SEXP r_safe_eval(SEXP expr, SEXP env, SEXP *out); + #endif diff --git a/src/unix/processx-unix.h b/src/unix/processx-unix.h index fa481051..e59817bb 100644 --- a/src/unix/processx-unix.h +++ b/src/unix/processx-unix.h @@ -23,9 +23,9 @@ typedef struct processx_handle_s { int fd2; /* readable */ int waitpipe[2]; /* use it for wait() with timeout */ int cleanup; - int cleanup_signal; - double cleanup_grace; + SEXP r6_private; double create_time; + SEXP finalizer_node; processx_connection_t *pipes[3]; int ptyfd; } processx_handle_t; diff --git a/src/unix/processx.c b/src/unix/processx.c index cb8df2b3..e75f8bbf 100644 --- a/src/unix/processx.c +++ b/src/unix/processx.c @@ -21,7 +21,7 @@ static void processx__child_init(processx_handle_t *handle, SEXP connections, processx_options_t *options, const char *tree_id); -static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace); +static SEXP processx__make_handle(SEXP private, int cleanup); static void processx__handle_destroy(processx_handle_t *handle); void processx__create_connections(processx_handle_t *handle, SEXP private, const char *encoding); @@ -336,6 +336,40 @@ SEXP c_processx_kill_data(void *payload) { return R_NilValue; } +static +SEXP finalizer_call(SEXP private) { + static SEXP finalize_fn = NULL; + if (!finalize_fn) { + finalize_fn = lang3(install(":::"), + install("processx"), + install("process_finalize")); + R_PreserveObject(finalize_fn); + } + + return lang2(finalize_fn, private); +} + +static SEXP session_finalizer_list = NULL; + +// These need to be macros to be considered protectors by rchk +#define node_poke_prev(NODE, PREV) SET_VECTOR_ELT((NODE), 0, (PREV)) +#define node_poke_next(NODE, NEXT) SET_VECTOR_ELT((NODE), 1, (NEXT)) +#define node_poke_value(NODE, VALUE) SET_VECTOR_ELT((NODE), 2, (VALUE)) + +static SEXP node_prev(SEXP node) { return VECTOR_ELT(node, 0); } +static SEXP node_next(SEXP node) { return VECTOR_ELT(node, 1); } + +static +SEXP new_node(SEXP prev, SEXP next, SEXP value) { + SEXP out = allocVector(VECSXP, 3); + + node_poke_prev(out, prev); + node_poke_next(out, next); + node_poke_value(out, value); + + return out; +} + void processx__finalizer(SEXP status) { processx_handle_t *handle = (processx_handle_t*) R_ExternalPtrAddr(status); @@ -348,14 +382,21 @@ void processx__finalizer(SEXP status) { if (!handle) return; - // FIXME: Do we need cleancall here? - if (handle->cleanup) { - struct cleanup_kill_data data = { - .status = status, - .grace = handle->cleanup_grace, - .name = R_NilValue - }; - r_with_cleanup_context(c_processx_kill_data, &data); + SEXP call = PROTECT(finalizer_call(handle->r6_private)); + SEXP err = r_safe_eval(call, R_BaseEnv, NULL); + UNPROTECT(1); + + /* Remove node from session finalizer list */ + SEXP node = handle->finalizer_node; + SEXP prev = node_prev(node); + SEXP next = node_next(node); + + if (prev != R_NilValue) { + node_poke_next(prev, next); + } + node_poke_prev(next, prev); + if (node == session_finalizer_list) { + session_finalizer_list = next; } /* Note: if no cleanup is requested, then we still have a sigchld @@ -365,21 +406,64 @@ void processx__finalizer(SEXP status) { /* Deallocate memory */ R_ClearExternalPtr(status); processx__handle_destroy(handle); + + if (err) { + r_unwind(err); + } +} + +static +void processx__session_finalizer(SEXP _) { + static SEXP finalize_fn = NULL; + if (!finalize_fn) { + finalize_fn = lang3(install(":::"), + install("processx"), + install("session_finalize")); + R_PreserveObject(finalize_fn); + } + + SEXP call = PROTECT(lang2(finalize_fn, session_finalizer_list)); + eval(call, R_BaseEnv); + UNPROTECT(1); } -static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace) { +static +void processx__register_finalizer(SEXP status, processx_handle_t *handle) { + if (!session_finalizer_list) { + // This root node is never popped and protects the rest of the list + session_finalizer_list = new_node(R_NilValue, R_NilValue, R_NilValue); + R_PreserveObject(session_finalizer_list); + R_RegisterCFinalizerEx(R_BaseEnv, &processx__session_finalizer, 1); + } + + // GC finalizer + R_RegisterCFinalizerEx(status, &processx__finalizer, 0); + + // Session finalizer + SEXP private_weakref = R_MakeWeakRef(handle->r6_private, R_NilValue, R_NilValue, 0); + PROTECT(private_weakref); + + SEXP node = new_node(R_NilValue, session_finalizer_list, private_weakref); + node_poke_prev(session_finalizer_list, node); + session_finalizer_list = node; + handle->finalizer_node = node; + + UNPROTECT(1); +} + +static SEXP processx__make_handle(SEXP private, int cleanup) { processx_handle_t * handle; SEXP result; handle = (processx_handle_t*) malloc(sizeof(processx_handle_t)); if (!handle) { R_THROW_ERROR("Cannot make processx handle, out of memory"); } memset(handle, 0, sizeof(processx_handle_t)); + handle->waitpipe[0] = handle->waitpipe[1] = -1; + handle->r6_private = private; result = PROTECT(R_MakeExternalPtr(handle, private, R_NilValue)); - R_RegisterCFinalizerEx(result, processx__finalizer, 1); - handle->cleanup = cleanup; - handle->cleanup_grace = cleanup_grace; + processx__register_finalizer(result, handle); UNPROTECT(1); return result; @@ -426,14 +510,12 @@ void processx__make_socketpair(int pipe[2], const char *exe) { SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, SEXP connections, SEXP env, SEXP windows_verbatim_args, SEXP windows_hide_window, SEXP windows_detached_process, - SEXP private, SEXP cleanup, SEXP cleanup_grace, SEXP wd, - SEXP encoding, SEXP tree_id) { + SEXP private, SEXP cleanup, SEXP wd, SEXP encoding, + SEXP tree_id) { char *ccommand = processx__tmp_string(command, 0); char **cargs = processx__tmp_character(args); char **cenv = isNull(env) ? 0 : processx__tmp_character(env); - int ccleanup = INTEGER(cleanup)[0]; - double ccleanup_grace = REAL(cleanup_grace)[0]; const int cpty = LOGICAL(pty)[0]; const char *cencoding = CHAR(STRING_ELT(encoding, 0)); @@ -468,7 +550,8 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, processx__setup_sigchld(); - result = PROTECT(processx__make_handle(private, ccleanup, ccleanup_grace)); + int ccleanup = LOGICAL(cleanup)[0]; + result = PROTECT(processx__make_handle(private, ccleanup)); handle = R_ExternalPtrAddr(result); if (cpty) { diff --git a/src/utils.c b/src/utils.c new file mode 100644 index 00000000..bd3189b8 --- /dev/null +++ b/src/utils.c @@ -0,0 +1,82 @@ +#include +#include "processx.h" + +// Need to jump out of the `R_UnwindProtect()` context +#include + +r_no_return +void r_unwind(SEXP x) { + if (inherits(x, "error")) { + SEXP call = PROTECT(lang2(install("stop"), x)); + eval(call, R_BaseEnv); + } else { + R_ContinueUnwind(x); + } + error("Unreachable"); +} + +static +void unwind_cleanup(void *payload, Rboolean jump) { + if (jump) { + jmp_buf *env = (jmp_buf *) payload; + longjmp(*env, 1); + } +} + +// Conversion of SEXP-returning callback to a void-returning one +struct callback_compat { + void (*fn)(void *data); + void *data; +}; + +static +SEXP callback_compat(void *payload) { + struct callback_compat *data = (struct callback_compat *) payload; + data->fn(data->data); + return R_NilValue; +} + +SEXP r_unwind_protect(void (*fn)(void *data), void *data) { + SEXP cont = PROTECT(R_MakeUnwindCont()); + jmp_buf env; + + struct callback_compat compat_data = { + .fn = fn, + .data = data + }; + + if (setjmp(env)) { + UNPROTECT(1); + return cont; + } + + R_UnwindProtect(&callback_compat, &compat_data, &unwind_cleanup, &env, cont); + + UNPROTECT(1); + return NULL; +} + +struct safe_eval { + SEXP expr; + SEXP env; + SEXP *out; +}; + +static +void safe_eval_callback(void *payload) { + struct safe_eval *data = (struct safe_eval *) payload; + SEXP out = eval(data->expr, data->env); + + if (data->out) { + *data->out = out; + } +} + +SEXP r_safe_eval(SEXP expr, SEXP env, SEXP *out) { + struct safe_eval data = { .expr = expr, .env = env, .out = out }; + return r_unwind_protect(&safe_eval_callback, &data); +} + +SEXP processx__wref_key(SEXP x) { + return R_WeakRefKey(x); +} diff --git a/src/win/processx-win.h b/src/win/processx-win.h index b17c8537..78bd246f 100644 --- a/src/win/processx-win.h +++ b/src/win/processx-win.h @@ -12,7 +12,7 @@ typedef struct processx_handle_s { BYTE *child_stdio_buffer; HANDLE waitObject; processx_connection_t *pipes[3]; - int cleanup; + SEXP cleanup; double create_time; } processx_handle_t; diff --git a/src/win/processx.c b/src/win/processx.c index c423438d..9162ecd8 100644 --- a/src/win/processx.c +++ b/src/win/processx.c @@ -843,7 +843,7 @@ void processx__finalizer(SEXP status) { processx__handle_destroy(handle); } -SEXP processx__make_handle(SEXP private, int cleanup) { +SEXP processx__make_handle(SEXP private, SEXP cleanup) { processx_handle_t * handle; SEXP result; @@ -851,7 +851,7 @@ SEXP processx__make_handle(SEXP private, int cleanup) { if (!handle) { R_THROW_ERROR("Out of memory when creating subprocess"); } memset(handle, 0, sizeof(processx_handle_t)); - result = PROTECT(R_MakeExternalPtr(handle, private, R_NilValue)); + result = PROTECT(R_MakeExternalPtr(handle, private, cleanup)); R_RegisterCFinalizerEx(result, processx__finalizer, 1); handle->cleanup = cleanup; @@ -868,8 +868,8 @@ void processx__handle_destroy(processx_handle_t *handle) { SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, SEXP connections, SEXP env, SEXP windows_verbatim_args, SEXP windows_hide, SEXP windows_detached_process, - SEXP private, SEXP cleanup, SEXP _cleanup_grace, SEXP wd, - SEXP encoding, SEXP tree_id) { + SEXP private, SEXP cleanup, SEXP wd, SEXP encoding, + SEXP tree_id) { const char *ccommand = CHAR(STRING_ELT(command, 0)); const char *cencoding = CHAR(STRING_ELT(encoding, 0)); @@ -886,7 +886,6 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, DWORD process_flags; processx_handle_t *handle; - int ccleanup = INTEGER(cleanup)[0]; SEXP result; DWORD dwerr; @@ -954,9 +953,11 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options, } } - result = PROTECT(processx__make_handle(private, ccleanup)); + result = PROTECT(processx__make_handle(private, cleanup)); handle = R_ExternalPtrAddr(result); + int ccleanup = LOGICAL(result)[0]; + int inherit_std = 0; err = processx__stdio_create(handle, connections, &handle->child_stdio_buffer, private, diff --git a/tests/testthat/test-process.R b/tests/testthat/test-process.R index 4bf8f219..7b5f9d36 100644 --- a/tests/testthat/test-process.R +++ b/tests/testthat/test-process.R @@ -324,3 +324,53 @@ test_that("can kill with SIGTERM when ignored", { Sys.sleep(0.05) expect_true(p$is_alive()) }) + +test_that("clean up in parallel", { + opts <- callr::r_process_options(extra = list( + cleanup_tree = TRUE, + cleanup_grace = 0.2 + )) + + fn <- function(load_sigtermignore, cleanup) { + sub_opts <- callr::r_process_options(extra = list( + cleanup = cleanup + )) + s <- callr::r_session$new(sub_opts) + + # Ensure grace delay kicks in + s$run(load_sigtermignore) + + # Make sure sessions are not closed by stdin EOF + s$call(function() Sys.sleep(60)) + + # Keep alive through options + sessions <- c(getOption("sessions"), list(s)) + options(sessions = sessions) + } + + # On GC + s <- callr::r_session$new(opts) + for (i in 1:5) s$run(fn, list(load_sigtermignore, cleanup = FALSE)) + + tree <- ps::ps_find_tree(s$.__enclos_env__$private$tree_id) + expect_true(all(sapply(tree, ps::ps_is_running))) + + rm(s) + gc() + + # Why is this needed? + Sys.sleep(0.05) + + expect_false(any(sapply(tree, ps::ps_is_running))) + + # On session quit + s <- callr::r_session$new(opts) + for (i in 1:5) s$run(fn, list(load_sigtermignore, cleanup = TRUE)) + + tree <- ps::ps_find_tree(s$.__enclos_env__$private$tree_id) + expect_true(all(sapply(tree, ps::ps_is_running))) + + s$run(function() q()) + + expect_false(any(sapply(tree, ps::ps_is_running))) +})