diff --git a/DESCRIPTION b/DESCRIPTION
index ef2817ba..96a21b23 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -20,7 +20,7 @@ URL: https://processx.r-lib.org, https://github.com/r-lib/processx#readme
BugReports: https://github.com/r-lib/processx/issues
Depends: R (>= 3.4.0)
Imports:
- ps (>= 1.2.0),
+ ps (>= 1.7.5.9001),
R6,
utils
Suggests:
@@ -36,6 +36,8 @@ Suggests:
testthat (>= 3.0.0),
webfakes,
withr
+Remotes:
+ r-lib/ps@kill-tree-grace
Encoding: UTF-8
RoxygenNote: 7.2.3
Roxygen: list(markdown = TRUE)
diff --git a/NEWS.md b/NEWS.md
index 11c04182..325564ac 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,10 @@
# processx (development version)
+* Processes are now killed in parallel with a period of grace on
+ garbage collection (if `cleanup_tree` is `TRUE`) and on session
+ quit. The delay can be controlled with the new `cleanup_grace`
+ argument.
+
* The `grace` argument of the `kill()` method is now active on Unix
platforms. processx first tries to kill with `SIGTERM` with a
timeout of `grace` seconds. After the timeout, `SIGKILL` is sent as
diff --git a/R/finalize.R b/R/finalize.R
new file mode 100644
index 00000000..a13ea33e
--- /dev/null
+++ b/R/finalize.R
@@ -0,0 +1,44 @@
+process_finalize <- function(private) {
+ ps <- process_cleanup_list(private)
+ ps::ps_kill_parallel(ps, private$cleanup_grace)
+}
+
+process_cleanup_list <- function(private) {
+ ps <- list()
+
+ if (private$cleanup) {
+ # Can't be created in advance because the ps finalizer might run first
+ handle <- ps::ps_handle(private$pid, as.POSIXct(private$starttime))
+ ps <- c(ps, list(handle))
+ }
+
+ if (private$cleanup_tree) {
+ ps <- c(ps, ps::ps_find_tree(private$tree_id))
+ }
+
+ ps
+}
+
+session_finalize <- function(node) {
+ ps <- list()
+ grace <- 0
+
+ while (!node_is_root(node)) {
+ private <- wref_key(node_value(node))
+ ps <- c(ps, process_cleanup_list(private))
+
+ if (!is.null(private$cleanup_grace)) {
+ grace <- max(grace, private$cleanup_grace)
+ }
+
+ node <- node_next(node)
+ }
+
+ ps::ps_kill_parallel(ps, grace)
+}
+
+wref_key <- function(x) .Call(c_processx__wref_key, x)
+node_is_root <- function(x) is.null(node_next(x))
+node_prev <- function(x) x[[1]]
+node_next <- function(x) x[[2]]
+node_value <- function(x) x[[3]]
diff --git a/R/initialize.R b/R/initialize.R
index 673c97c3..f30c744c 100644
--- a/R/initialize.R
+++ b/R/initialize.R
@@ -141,7 +141,7 @@ process_initialize <- function(self, private, command, args,
c_processx_exec,
command, c(command, args), pty, pty_options,
connections, env, windows_verbatim_args, windows_hide_window,
- windows_detached_process, private, cleanup, cleanup_grace, wd, encoding,
+ windows_detached_process, private, cleanup, wd, encoding,
paste0("PROCESSX_", private$tree_id, "=YES")
)
@@ -155,6 +155,9 @@ process_initialize <- function(self, private, command, args,
chain_call(c_processx__proc_start_time, private$status)
if (private$starttime == 0) private$starttime <- Sys.time()
+ # Needed for cleaning up
+ private$pid <- self$get_pid()
+
## Need to close this, otherwise the child's end of the pipe
## will not be closed when the child exits, and then we cannot
## poll it.
diff --git a/R/process.R b/R/process.R
index 94382ab6..efc1d1e5 100644
--- a/R/process.R
+++ b/R/process.R
@@ -233,11 +233,6 @@ process <- R6::R6Class(
#' collected. If requested so in the process constructor, then it
#' eliminates all processes in the process's subprocess tree.
- finalize = function() {
- if (!is.null(private$tree_id) && private$cleanup_tree &&
- ps::ps_is_supported()) self$kill_tree(grace = private$cleanup_grace)
- },
-
#' @description
#' Terminate the process. It also terminate all of its child
#' processes, except if they have created a new process group (on Unix),
@@ -671,6 +666,7 @@ process <- R6::R6Class(
windows_hide_window = NULL,
status = NULL, # C file handle
+ pid = NULL, # pid for cleanup
supervised = FALSE, # Whether process is tracked by supervisor
diff --git a/man/process.Rd b/man/process.Rd
index db35b11e..fd8d748e 100644
--- a/man/process.Rd
+++ b/man/process.Rd
@@ -91,7 +91,6 @@ p$is_alive()
\subsection{Public methods}{
\itemize{
\item \href{#method-process-new}{\code{process$new()}}
-\item \href{#method-process-finalize}{\code{process$finalize()}}
\item \href{#method-process-kill}{\code{process$kill()}}
\item \href{#method-process-kill_tree}{\code{process$kill_tree()}}
\item \href{#method-process-signal}{\code{process$signal()}}
@@ -320,21 +319,14 @@ R6 object representing the process.
}
}
\if{html}{\out{
}}
-\if{html}{\out{}}
-\if{latex}{\out{\hypertarget{method-process-finalize}{}}}
-\subsection{Method \code{finalize()}}{
+\if{html}{\out{}}
+\if{latex}{\out{\hypertarget{method-process-kill}{}}}
+\subsection{Method \code{kill()}}{
Cleanup method that is called when the \code{process} object is garbage
collected. If requested so in the process constructor, then it
eliminates all processes in the process's subprocess tree.
-\subsection{Usage}{
-\if{html}{\out{}}\preformatted{process$finalize()}\if{html}{\out{
}}
-}
-}
-\if{html}{\out{
}}
-\if{html}{\out{}}
-\if{latex}{\out{\hypertarget{method-process-kill}{}}}
-\subsection{Method \code{kill()}}{
+
Terminate the process. It also terminate all of its child
processes, except if they have created a new process group (on Unix),
or job object (on Windows). It returns \code{TRUE} if the process
diff --git a/src/Makevars b/src/Makevars
index 0138c5d9..0724426d 100644
--- a/src/Makevars
+++ b/src/Makevars
@@ -4,7 +4,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \
processx-vector.o create-time.o base64.o \
unix/childlist.o unix/connection.o \
unix/processx.o unix/sigchld.o unix/utils.o \
- unix/named_pipe.o cleancall.o
+ unix/named_pipe.o cleancall.o utils.o
.PHONY: all clean
diff --git a/src/init.c b/src/init.c
index 32f47aa3..11133a4c 100644
--- a/src/init.c
+++ b/src/init.c
@@ -12,10 +12,11 @@ SEXP run_testthat_tests(void);
SEXP processx__echo_on(void);
SEXP processx__echo_off(void);
SEXP processx__set_boot_time(SEXP);
+SEXP processx__wref_key(SEXP);
static const R_CallMethodDef callMethods[] = {
CLEANCALL_METHOD_RECORD,
- { "processx_exec", (DL_FUNC) &processx_exec, 15 },
+ { "processx_exec", (DL_FUNC) &processx_exec, 14 },
{ "processx_wait", (DL_FUNC) &processx_wait, 3 },
{ "processx_is_alive", (DL_FUNC) &processx_is_alive, 2 },
{ "processx_get_exit_status", (DL_FUNC) &processx_get_exit_status, 2 },
@@ -33,6 +34,7 @@ static const R_CallMethodDef callMethods[] = {
{ "processx_write_named_pipe", (DL_FUNC) &processx_write_named_pipe, 2 },
{ "processx__proc_start_time", (DL_FUNC) &processx__proc_start_time, 1 },
{ "processx__set_boot_time", (DL_FUNC) &processx__set_boot_time, 1 },
+ { "processx__wref_key", (DL_FUNC) &processx__wref_key, 1 },
{ "processx_connection_create", (DL_FUNC) &processx_connection_create, 2 },
{ "processx_connection_read_chars", (DL_FUNC) &processx_connection_read_chars, 2 },
diff --git a/src/processx.h b/src/processx.h
index 28272171..3f80497a 100644
--- a/src/processx.h
+++ b/src/processx.h
@@ -45,8 +45,8 @@ extern "C" {
SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
SEXP connections, SEXP env, SEXP windows_verbatim_args,
SEXP windows_hide_window, SEXP windows_detached_process,
- SEXP private_, SEXP cleanup, SEXP cleanup_signal,
- SEXP wd, SEXP encoding, SEXP tree_id);
+ SEXP private_, SEXP cleanup, SEXP wd, SEXP encoding,
+ SEXP tree_id);
SEXP processx_wait(SEXP status, SEXP timeout, SEXP name);
SEXP processx_is_alive(SEXP status, SEXP name);
SEXP processx_get_exit_status(SEXP status, SEXP name);
@@ -117,4 +117,10 @@ typedef struct {
}
#endif
+#define r_no_return __attribute__ ((noreturn))
+
+r_no_return void r_unwind(SEXP x);
+SEXP r_unwind_protect(void (*fn)(void *data), void *data);
+SEXP r_safe_eval(SEXP expr, SEXP env, SEXP *out);
+
#endif
diff --git a/src/unix/processx-unix.h b/src/unix/processx-unix.h
index fa481051..e59817bb 100644
--- a/src/unix/processx-unix.h
+++ b/src/unix/processx-unix.h
@@ -23,9 +23,9 @@ typedef struct processx_handle_s {
int fd2; /* readable */
int waitpipe[2]; /* use it for wait() with timeout */
int cleanup;
- int cleanup_signal;
- double cleanup_grace;
+ SEXP r6_private;
double create_time;
+ SEXP finalizer_node;
processx_connection_t *pipes[3];
int ptyfd;
} processx_handle_t;
diff --git a/src/unix/processx.c b/src/unix/processx.c
index cb8df2b3..e75f8bbf 100644
--- a/src/unix/processx.c
+++ b/src/unix/processx.c
@@ -21,7 +21,7 @@ static void processx__child_init(processx_handle_t *handle, SEXP connections,
processx_options_t *options,
const char *tree_id);
-static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace);
+static SEXP processx__make_handle(SEXP private, int cleanup);
static void processx__handle_destroy(processx_handle_t *handle);
void processx__create_connections(processx_handle_t *handle, SEXP private,
const char *encoding);
@@ -336,6 +336,40 @@ SEXP c_processx_kill_data(void *payload) {
return R_NilValue;
}
+static
+SEXP finalizer_call(SEXP private) {
+ static SEXP finalize_fn = NULL;
+ if (!finalize_fn) {
+ finalize_fn = lang3(install(":::"),
+ install("processx"),
+ install("process_finalize"));
+ R_PreserveObject(finalize_fn);
+ }
+
+ return lang2(finalize_fn, private);
+}
+
+static SEXP session_finalizer_list = NULL;
+
+// These need to be macros to be considered protectors by rchk
+#define node_poke_prev(NODE, PREV) SET_VECTOR_ELT((NODE), 0, (PREV))
+#define node_poke_next(NODE, NEXT) SET_VECTOR_ELT((NODE), 1, (NEXT))
+#define node_poke_value(NODE, VALUE) SET_VECTOR_ELT((NODE), 2, (VALUE))
+
+static SEXP node_prev(SEXP node) { return VECTOR_ELT(node, 0); }
+static SEXP node_next(SEXP node) { return VECTOR_ELT(node, 1); }
+
+static
+SEXP new_node(SEXP prev, SEXP next, SEXP value) {
+ SEXP out = allocVector(VECSXP, 3);
+
+ node_poke_prev(out, prev);
+ node_poke_next(out, next);
+ node_poke_value(out, value);
+
+ return out;
+}
+
void processx__finalizer(SEXP status) {
processx_handle_t *handle = (processx_handle_t*) R_ExternalPtrAddr(status);
@@ -348,14 +382,21 @@ void processx__finalizer(SEXP status) {
if (!handle)
return;
- // FIXME: Do we need cleancall here?
- if (handle->cleanup) {
- struct cleanup_kill_data data = {
- .status = status,
- .grace = handle->cleanup_grace,
- .name = R_NilValue
- };
- r_with_cleanup_context(c_processx_kill_data, &data);
+ SEXP call = PROTECT(finalizer_call(handle->r6_private));
+ SEXP err = r_safe_eval(call, R_BaseEnv, NULL);
+ UNPROTECT(1);
+
+ /* Remove node from session finalizer list */
+ SEXP node = handle->finalizer_node;
+ SEXP prev = node_prev(node);
+ SEXP next = node_next(node);
+
+ if (prev != R_NilValue) {
+ node_poke_next(prev, next);
+ }
+ node_poke_prev(next, prev);
+ if (node == session_finalizer_list) {
+ session_finalizer_list = next;
}
/* Note: if no cleanup is requested, then we still have a sigchld
@@ -365,21 +406,64 @@ void processx__finalizer(SEXP status) {
/* Deallocate memory */
R_ClearExternalPtr(status);
processx__handle_destroy(handle);
+
+ if (err) {
+ r_unwind(err);
+ }
+}
+
+static
+void processx__session_finalizer(SEXP _) {
+ static SEXP finalize_fn = NULL;
+ if (!finalize_fn) {
+ finalize_fn = lang3(install(":::"),
+ install("processx"),
+ install("session_finalize"));
+ R_PreserveObject(finalize_fn);
+ }
+
+ SEXP call = PROTECT(lang2(finalize_fn, session_finalizer_list));
+ eval(call, R_BaseEnv);
+ UNPROTECT(1);
}
-static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace) {
+static
+void processx__register_finalizer(SEXP status, processx_handle_t *handle) {
+ if (!session_finalizer_list) {
+ // This root node is never popped and protects the rest of the list
+ session_finalizer_list = new_node(R_NilValue, R_NilValue, R_NilValue);
+ R_PreserveObject(session_finalizer_list);
+ R_RegisterCFinalizerEx(R_BaseEnv, &processx__session_finalizer, 1);
+ }
+
+ // GC finalizer
+ R_RegisterCFinalizerEx(status, &processx__finalizer, 0);
+
+ // Session finalizer
+ SEXP private_weakref = R_MakeWeakRef(handle->r6_private, R_NilValue, R_NilValue, 0);
+ PROTECT(private_weakref);
+
+ SEXP node = new_node(R_NilValue, session_finalizer_list, private_weakref);
+ node_poke_prev(session_finalizer_list, node);
+ session_finalizer_list = node;
+ handle->finalizer_node = node;
+
+ UNPROTECT(1);
+}
+
+static SEXP processx__make_handle(SEXP private, int cleanup) {
processx_handle_t * handle;
SEXP result;
handle = (processx_handle_t*) malloc(sizeof(processx_handle_t));
if (!handle) { R_THROW_ERROR("Cannot make processx handle, out of memory"); }
memset(handle, 0, sizeof(processx_handle_t));
+
handle->waitpipe[0] = handle->waitpipe[1] = -1;
+ handle->r6_private = private;
result = PROTECT(R_MakeExternalPtr(handle, private, R_NilValue));
- R_RegisterCFinalizerEx(result, processx__finalizer, 1);
- handle->cleanup = cleanup;
- handle->cleanup_grace = cleanup_grace;
+ processx__register_finalizer(result, handle);
UNPROTECT(1);
return result;
@@ -426,14 +510,12 @@ void processx__make_socketpair(int pipe[2], const char *exe) {
SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
SEXP connections, SEXP env, SEXP windows_verbatim_args,
SEXP windows_hide_window, SEXP windows_detached_process,
- SEXP private, SEXP cleanup, SEXP cleanup_grace, SEXP wd,
- SEXP encoding, SEXP tree_id) {
+ SEXP private, SEXP cleanup, SEXP wd, SEXP encoding,
+ SEXP tree_id) {
char *ccommand = processx__tmp_string(command, 0);
char **cargs = processx__tmp_character(args);
char **cenv = isNull(env) ? 0 : processx__tmp_character(env);
- int ccleanup = INTEGER(cleanup)[0];
- double ccleanup_grace = REAL(cleanup_grace)[0];
const int cpty = LOGICAL(pty)[0];
const char *cencoding = CHAR(STRING_ELT(encoding, 0));
@@ -468,7 +550,8 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
processx__setup_sigchld();
- result = PROTECT(processx__make_handle(private, ccleanup, ccleanup_grace));
+ int ccleanup = LOGICAL(cleanup)[0];
+ result = PROTECT(processx__make_handle(private, ccleanup));
handle = R_ExternalPtrAddr(result);
if (cpty) {
diff --git a/src/utils.c b/src/utils.c
new file mode 100644
index 00000000..bd3189b8
--- /dev/null
+++ b/src/utils.c
@@ -0,0 +1,82 @@
+#include
+#include "processx.h"
+
+// Need to jump out of the `R_UnwindProtect()` context
+#include
+
+r_no_return
+void r_unwind(SEXP x) {
+ if (inherits(x, "error")) {
+ SEXP call = PROTECT(lang2(install("stop"), x));
+ eval(call, R_BaseEnv);
+ } else {
+ R_ContinueUnwind(x);
+ }
+ error("Unreachable");
+}
+
+static
+void unwind_cleanup(void *payload, Rboolean jump) {
+ if (jump) {
+ jmp_buf *env = (jmp_buf *) payload;
+ longjmp(*env, 1);
+ }
+}
+
+// Conversion of SEXP-returning callback to a void-returning one
+struct callback_compat {
+ void (*fn)(void *data);
+ void *data;
+};
+
+static
+SEXP callback_compat(void *payload) {
+ struct callback_compat *data = (struct callback_compat *) payload;
+ data->fn(data->data);
+ return R_NilValue;
+}
+
+SEXP r_unwind_protect(void (*fn)(void *data), void *data) {
+ SEXP cont = PROTECT(R_MakeUnwindCont());
+ jmp_buf env;
+
+ struct callback_compat compat_data = {
+ .fn = fn,
+ .data = data
+ };
+
+ if (setjmp(env)) {
+ UNPROTECT(1);
+ return cont;
+ }
+
+ R_UnwindProtect(&callback_compat, &compat_data, &unwind_cleanup, &env, cont);
+
+ UNPROTECT(1);
+ return NULL;
+}
+
+struct safe_eval {
+ SEXP expr;
+ SEXP env;
+ SEXP *out;
+};
+
+static
+void safe_eval_callback(void *payload) {
+ struct safe_eval *data = (struct safe_eval *) payload;
+ SEXP out = eval(data->expr, data->env);
+
+ if (data->out) {
+ *data->out = out;
+ }
+}
+
+SEXP r_safe_eval(SEXP expr, SEXP env, SEXP *out) {
+ struct safe_eval data = { .expr = expr, .env = env, .out = out };
+ return r_unwind_protect(&safe_eval_callback, &data);
+}
+
+SEXP processx__wref_key(SEXP x) {
+ return R_WeakRefKey(x);
+}
diff --git a/src/win/processx-win.h b/src/win/processx-win.h
index b17c8537..78bd246f 100644
--- a/src/win/processx-win.h
+++ b/src/win/processx-win.h
@@ -12,7 +12,7 @@ typedef struct processx_handle_s {
BYTE *child_stdio_buffer;
HANDLE waitObject;
processx_connection_t *pipes[3];
- int cleanup;
+ SEXP cleanup;
double create_time;
} processx_handle_t;
diff --git a/src/win/processx.c b/src/win/processx.c
index c423438d..9162ecd8 100644
--- a/src/win/processx.c
+++ b/src/win/processx.c
@@ -843,7 +843,7 @@ void processx__finalizer(SEXP status) {
processx__handle_destroy(handle);
}
-SEXP processx__make_handle(SEXP private, int cleanup) {
+SEXP processx__make_handle(SEXP private, SEXP cleanup) {
processx_handle_t * handle;
SEXP result;
@@ -851,7 +851,7 @@ SEXP processx__make_handle(SEXP private, int cleanup) {
if (!handle) { R_THROW_ERROR("Out of memory when creating subprocess"); }
memset(handle, 0, sizeof(processx_handle_t));
- result = PROTECT(R_MakeExternalPtr(handle, private, R_NilValue));
+ result = PROTECT(R_MakeExternalPtr(handle, private, cleanup));
R_RegisterCFinalizerEx(result, processx__finalizer, 1);
handle->cleanup = cleanup;
@@ -868,8 +868,8 @@ void processx__handle_destroy(processx_handle_t *handle) {
SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
SEXP connections, SEXP env, SEXP windows_verbatim_args,
SEXP windows_hide, SEXP windows_detached_process,
- SEXP private, SEXP cleanup, SEXP _cleanup_grace, SEXP wd,
- SEXP encoding, SEXP tree_id) {
+ SEXP private, SEXP cleanup, SEXP wd, SEXP encoding,
+ SEXP tree_id) {
const char *ccommand = CHAR(STRING_ELT(command, 0));
const char *cencoding = CHAR(STRING_ELT(encoding, 0));
@@ -886,7 +886,6 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
DWORD process_flags;
processx_handle_t *handle;
- int ccleanup = INTEGER(cleanup)[0];
SEXP result;
DWORD dwerr;
@@ -954,9 +953,11 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
}
}
- result = PROTECT(processx__make_handle(private, ccleanup));
+ result = PROTECT(processx__make_handle(private, cleanup));
handle = R_ExternalPtrAddr(result);
+ int ccleanup = LOGICAL(result)[0];
+
int inherit_std = 0;
err = processx__stdio_create(handle, connections,
&handle->child_stdio_buffer, private,
diff --git a/tests/testthat/test-process.R b/tests/testthat/test-process.R
index 4bf8f219..7b5f9d36 100644
--- a/tests/testthat/test-process.R
+++ b/tests/testthat/test-process.R
@@ -324,3 +324,53 @@ test_that("can kill with SIGTERM when ignored", {
Sys.sleep(0.05)
expect_true(p$is_alive())
})
+
+test_that("clean up in parallel", {
+ opts <- callr::r_process_options(extra = list(
+ cleanup_tree = TRUE,
+ cleanup_grace = 0.2
+ ))
+
+ fn <- function(load_sigtermignore, cleanup) {
+ sub_opts <- callr::r_process_options(extra = list(
+ cleanup = cleanup
+ ))
+ s <- callr::r_session$new(sub_opts)
+
+ # Ensure grace delay kicks in
+ s$run(load_sigtermignore)
+
+ # Make sure sessions are not closed by stdin EOF
+ s$call(function() Sys.sleep(60))
+
+ # Keep alive through options
+ sessions <- c(getOption("sessions"), list(s))
+ options(sessions = sessions)
+ }
+
+ # On GC
+ s <- callr::r_session$new(opts)
+ for (i in 1:5) s$run(fn, list(load_sigtermignore, cleanup = FALSE))
+
+ tree <- ps::ps_find_tree(s$.__enclos_env__$private$tree_id)
+ expect_true(all(sapply(tree, ps::ps_is_running)))
+
+ rm(s)
+ gc()
+
+ # Why is this needed?
+ Sys.sleep(0.05)
+
+ expect_false(any(sapply(tree, ps::ps_is_running)))
+
+ # On session quit
+ s <- callr::r_session$new(opts)
+ for (i in 1:5) s$run(fn, list(load_sigtermignore, cleanup = TRUE))
+
+ tree <- ps::ps_find_tree(s$.__enclos_env__$private$tree_id)
+ expect_true(all(sapply(tree, ps::ps_is_running)))
+
+ s$run(function() q())
+
+ expect_false(any(sapply(tree, ps::ps_is_running)))
+})