diff --git a/NAMESPACE b/NAMESPACE index 1690c4b..7ecb09d 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -37,6 +37,8 @@ importFrom(DT,datatable) importFrom(DT,renderDT) importFrom(DT,renderDataTable) importFrom(Hmisc,describe) +importFrom(MSstatsBig,bigDIANNtoMSstatsFormat) +importFrom(MSstatsBig,bigSpectronauttoMSstatsFormat) importFrom(MSstatsBioNet,annotateProteinInfoFromIndra) importFrom(MSstatsBioNet,generateCytoscapeConfig) importFrom(MSstatsBioNet,getSubnetworkFromIndra) diff --git a/R/MSstatsShiny.R b/R/MSstatsShiny.R index c378098..1861d5c 100644 --- a/R/MSstatsShiny.R +++ b/R/MSstatsShiny.R @@ -35,6 +35,7 @@ #' @importFrom dplyr `%>%` filter summarise n_distinct group_by ungroup select n mutate #' @importFrom tidyr unite #' @importFrom MSstatsConvert MSstatsLogsSettings +#' @importFrom MSstatsBig bigDIANNtoMSstatsFormat bigSpectronauttoMSstatsFormat #' @importFrom MSstatsPTM dataProcessPlotsPTM groupComparisonPlotsPTM MaxQtoMSstatsPTMFormat PDtoMSstatsPTMFormat FragPipetoMSstatsPTMFormat SkylinetoMSstatsPTMFormat SpectronauttoMSstatsPTMFormat #' @importFrom utils capture.output head packageVersion read.csv read.delim write.csv #' @importFrom stats aggregate diff --git a/R/module-loadpage-server.R b/R/module-loadpage-server.R index 975b6a6..ba8b707 100644 --- a/R/module-loadpage-server.R +++ b/R/module-loadpage-server.R @@ -38,7 +38,7 @@ loadpageServer <- function(id, parent_session, is_web_server = FALSE) { }) # Render just the filename for user feedback in the UI. 
- output$specdata_big_path <- renderPrint({ + output$big_file_path <- renderPrint({ req(nrow(local_file_info()) > 0) cat(local_file_info()$name) }) @@ -72,6 +72,52 @@ loadpageServer <- function(id, parent_session, is_web_server = FALSE) { tagList(ui_elements, create_separator_buttons(session$ns, "sep_specdata")) }) + output$diann_header_ui <- renderUI({ + req(input$filetype == 'diann', input$BIO != 'PTM') + create_diann_header() + }) + + output$diann_file_selection_ui <- renderUI({ + req(input$filetype == 'diann', input$BIO != 'PTM') + + ui_elements <- tagList() + + if (!is_web_server) { + ui_elements <- tagList(ui_elements, create_diann_mode_selector(session$ns, isTRUE(input$big_file_diann))) + + if (isTRUE(input$big_file_diann)) { + ui_elements <- tagList(ui_elements, create_diann_large_file_ui(session$ns)) + } else { + ui_elements <- tagList(ui_elements, create_diann_standard_ui(session$ns)) + } + } else { + ui_elements <- tagList(ui_elements, create_diann_standard_ui(session$ns)) + } + + tagList(ui_elements, create_separator_buttons(session$ns, "sep_dianndata")) + }) + + output$diann_options_ui <- renderUI({ + req(input$filetype == 'diann', input$BIO != 'PTM') + + if (!is_web_server && isTRUE(input$big_file_diann)) { + mbr_def <- if (is.null(input$diann_MBR)) TRUE else input$diann_MBR + quant_col_def <- if (is.null(input$diann_quantificationColumn)) "Fragment.Quant.Corrected" else input$diann_quantificationColumn + + max_feature_def <- if (is.null(input$max_feature_count)) 100 else input$max_feature_count + unique_peps_def <- if (is.null(input$filter_unique_peptides)) FALSE else input$filter_unique_peptides + agg_psms_def <- if (is.null(input$aggregate_psms)) FALSE else input$aggregate_psms + few_obs_def <- if (is.null(input$filter_few_obs)) FALSE else input$filter_few_obs + + tagList( + create_diann_large_filter_options(session$ns, mbr_def, quant_col_def), + create_diann_large_bottom_ui(session$ns, max_feature_def, unique_peps_def, agg_psms_def, few_obs_def) + 
) + } else { + NULL + } + }) + output$spectronaut_options_ui <- renderUI({ req(input$filetype == 'spec', input$BIO != 'PTM') @@ -195,7 +241,9 @@ loadpageServer <- function(id, parent_session, is_web_server = FALSE) { enable("proceed1") } } else if (input$filetype == "diann") { - if(!is.null(input$dianndata) && !is.null(input$sep_dianndata)) { # && !is.null(input$annot) + diann_regular_file_ok <- !isTRUE(input$big_file_diann) && !is.null(input$dianndata) + diann_big_file_ok <- isTRUE(input$big_file_diann) && length(local_big_file_path()) > 0 + if((diann_regular_file_ok || diann_big_file_ok) && !is.null(input$sep_dianndata)) { enable("proceed1") } } diff --git a/R/module-loadpage-ui.R b/R/module-loadpage-ui.R index 0d06d74..efae2bf 100644 --- a/R/module-loadpage-ui.R +++ b/R/module-loadpage-ui.R @@ -271,11 +271,10 @@ create_skyline_uploads <- function(ns) { #' Create DIANN file uploads #' @noRd create_diann_uploads <- function(ns) { - conditionalPanel( - condition = "input['loadpage-filetype'] == 'diann' && input['loadpage-BIO'] != 'PTM'", - h4("4. Upload MSstats report from DIANN"), - fileInput(ns('dianndata'), "", multiple = FALSE, accept = NULL), - create_separator_buttons(ns, "sep_dianndata") + tagList( + uiOutput(ns("diann_header_ui")), + uiOutput(ns("diann_file_selection_ui")), + uiOutput(ns("diann_options_ui")) ) } @@ -295,24 +294,51 @@ create_spectronaut_header <- function() { h4("4. Upload MSstats scheme output from Spectronaut") } +#' Create DIANN header +#' @noRd +create_diann_header <- function() { + h4("4. 
Upload MSstats report from DIANN") +} + #' Create Spectronaut mode selector (Local only) #' @noRd create_spectronaut_mode_selector <- function(ns, selected = FALSE) { checkboxInput(ns("big_file_spec"), "Large file mode", value = selected) } +#' Create DIANN mode selector (Local only) +#' @noRd +create_diann_mode_selector <- function(ns, selected = FALSE) { + checkboxInput(ns("big_file_diann"), "Large file mode", value = selected) +} + #' Create Spectronaut standard file input #' @noRd create_spectronaut_standard_ui <- function(ns) { fileInput(ns('specdata'), "", multiple = FALSE, accept = NULL) } +#' Create DIANN standard file input +#' @noRd +create_diann_standard_ui <- function(ns) { + fileInput(ns('dianndata'), "", multiple = FALSE, accept = NULL) +} + #' Create Spectronaut large file selection UI #' @noRd create_spectronaut_large_file_ui <- function(ns) { tagList( shinyFiles::shinyFilesButton(ns("big_file_browse"), "Browse for local file...", "Please select a file", multiple = FALSE), - verbatimTextOutput(ns("specdata_big_path")) + verbatimTextOutput(ns("big_file_path")) + ) +} + +#' Create DIANN large file selection UI +#' @noRd +create_diann_large_file_ui <- function(ns) { + tagList( + shinyFiles::shinyFilesButton(ns("big_file_browse"), "Browse for local file...", "Please select a file", multiple = FALSE), + verbatimTextOutput(ns("big_file_path")) ) } @@ -328,6 +354,17 @@ create_spectronaut_large_filter_options <- function(ns, excluded_def = FALSE, id ) } +#' Create DIANN large file filter options +#' @noRd +create_diann_large_filter_options <- function(ns, mbr_def = TRUE, quant_col_def = "Fragment.Quant.Corrected") { + tagList( + tags$hr(), + h4("Options for large file processing"), + checkboxInput(ns("diann_MBR"), "MBR Enabled", value = mbr_def), + textInput(ns("diann_quantificationColumn"), "Quantification Column", value = quant_col_def) + ) +} + #' Create Spectronaut Q-value cutoff input #' @noRd create_spectronaut_qvalue_cutoff_ui <- function(ns, 
cutoff_def = 0.01) { @@ -345,6 +382,17 @@ create_spectronaut_large_bottom_ui <- function(ns, max_feature_def = 20, unique_ ) } +#' Create DIANN large file options (Bottom part) +#' @noRd +create_diann_large_bottom_ui <- function(ns, max_feature_def = 100, unique_peps_def = FALSE, agg_psms_def = FALSE, few_obs_def = FALSE) { + tagList( + numericInput(ns("max_feature_count"), "Max feature count", value = max_feature_def, min = 1), + checkboxInput(ns("filter_unique_peptides"), "Use unique peptides", value = unique_peps_def), + checkboxInput(ns("aggregate_psms"), "Aggregate PSMs to peptides", value = agg_psms_def), + checkboxInput(ns("filter_few_obs"), "Filter features with few observations", value = few_obs_def) + ) +} + #' Create PTM FragPipe uploads #' @noRd create_ptm_fragpipe_uploads <- function(ns) { diff --git a/R/utils.R b/R/utils.R index e2dee6b..4aa0094 100644 --- a/R/utils.R +++ b/R/utils.R @@ -530,7 +530,7 @@ getData <- function(input) { shinybusy::update_modal_spinner(text = "Processing large Spectronaut file...") # Call the big file conversion function from MSstatsConvert - converted_data <- MSstatsBig::bigSpectronauttoMSstatsFormat( + converted_data <- bigSpectronauttoMSstatsFormat( input_file = local_big_file_path, output_file_name = "output_file.csv", backend = "arrow", @@ -592,10 +592,70 @@ getData <- function(input) { } } else if(input$filetype == 'diann') { + if (isTRUE(input$big_file_diann)) { + # Logic for big DIANN files + # Parse the file path from shinyFiles input + volumes <- shinyFiles::getVolumes()() + path_info <- shinyFiles::parseFilePaths(volumes, input$big_file_browse) + local_big_file_path <- if (nrow(path_info) > 0) path_info$datapath else NULL + + if (!is.numeric(input$max_feature_count) || is.na(input$max_feature_count) || input$max_feature_count <= 0) { + showNotification("Error: max_feature_count must be a positive number.", type = "error") + shinybusy::remove_modal_spinner() + return(NULL) + } + + if (is.null(local_big_file_path) 
|| !file.exists(local_big_file_path)) { + showNotification("Error: The selected file does not exist or is not readable.", type = "error") + shinybusy::remove_modal_spinner() + return(NULL) + } + + shinybusy::update_modal_spinner(text = "Processing large DIANN file...") + + # Call the big file conversion function from MSstatsBig + converted_data <- bigDIANNtoMSstatsFormat( + input_file = local_big_file_path, + annotation = getAnnot(input), + output_file_name = "output_file.csv", + backend = "arrow", + MBR = isTRUE(input$diann_MBR), + quantificationColumn = input$diann_quantificationColumn, + max_feature_count = input$max_feature_count, + filter_unique_peptides = input$filter_unique_peptides, + aggregate_psms = input$aggregate_psms, + filter_few_obs = input$filter_few_obs + ) + + # Attempt to load the data into memory. + mydata <- tryCatch({ + dplyr::collect(converted_data) + }, error = function(e) { + showNotification( + paste("Memory Error: The dataset is too large to process in-memory.", e$message), + type = "error", + duration = NULL + ) + return(NULL) + }) + + if (is.null(mydata)) { + shinybusy::remove_modal_spinner() + return(NULL) + } + } else { if (getFileExtension(input$dianndata$name) %in% c("parquet", "pq")) { data = read_parquet(input$dianndata$datapath) } else { - data = read.csv(input$dianndata$datapath, sep=input$sep_dianndata) + sep = input$sep_dianndata + if(is.null(sep)) { + sep = "\t" + } + if (sep == "\t") { + data = read.delim(input$dianndata$datapath) + } else { + data = read.csv(input$dianndata$datapath, sep = sep) + } } qvalue_cutoff = 0.01 @@ -620,6 +680,7 @@ getData <- function(input) { use_log_file = FALSE, quantificationColumn = quantificationColumn ) + } print("Mydata from mstats") print(mydata) } @@ -721,7 +782,8 @@ library(MSstatsTMT) library(MSstatsPTM)\n", sep = "") codes = paste(codes, "\n# Package versions\n# MSstats version ", packageVersion("MSstats"), "\n# MSstatsTMT version ", packageVersion("MSstatsTMT"), - "\n# MSstatsPTM 
version ", packageVersion("MSstatsPTM"), sep = "") + "\n# MSstatsPTM version ", packageVersion("MSstatsPTM"), + "\n# MSstatsBig version ", tryCatch(packageVersion("MSstatsBig"), error = function(e) "Not Installed"), sep = "") codes = paste(codes, "\n\n# Read data\n", sep = "") if(input$filetype == 'sample') { if(input$BIO != "PTM" && input$DDA_DIA =='LType' && input$LabelFreeType == "SRM_PRM") { @@ -843,27 +905,68 @@ library(MSstatsPTM)\n", sep = "") } else if(input$filetype == 'spec') { - codes = paste(codes, "data = read.csv(\"insert your MSstats scheme output from Spectronaut filepath\", header = TRUE, sep = ",input$sep_specdata,")\nannot_file = read.csv(\"insert your annotation filepath\", sep='\t')#Optional\n" - , sep = "") - - codes = paste(codes, "data = SpectronauttoMSstatsFormat(data, - annotation = annot_file #Optional, - filter_with_Qvalue = TRUE, ## same as default - qvalue_cutoff = 0.01, ## same as default - fewMeasurements=\"remove\", - removeProtein_with1Feature = TRUE, - use_log_file = FALSE)\n", sep = "") + if (isTRUE(input$big_file_spec)) { + codes = paste(codes, "library(MSstatsBig)\n", sep = "") + codes = paste(codes, "data = MSstatsBig::bigSpectronauttoMSstatsFormat(\n", sep = "") + codes = paste(codes, " input_file = \"insert your large Spectronaut file path\",\n", sep = "") + codes = paste(codes, " output_file_name = \"output_file.csv\",\n", sep = "") + codes = paste(codes, " backend = \"arrow\",\n", sep = "") + codes = paste(codes, " filter_by_excluded = ", input$filter_by_excluded, ",\n", sep = "") + codes = paste(codes, " filter_by_identified = ", input$filter_by_identified, ",\n", sep = "") + codes = paste(codes, " filter_by_qvalue = ", input$filter_by_qvalue, ",\n", sep = "") + codes = paste(codes, " qvalue_cutoff = ", input$qvalue_cutoff, ",\n", sep = "") + codes = paste(codes, " max_feature_count = ", input$max_feature_count, ",\n", sep = "") + codes = paste(codes, " filter_unique_peptides = ", input$filter_unique_peptides, ",\n", sep 
= "") + codes = paste(codes, " aggregate_psms = ", input$aggregate_psms, ",\n", sep = "") + codes = paste(codes, " filter_few_obs = ", input$filter_few_obs, "\n", sep = "") + codes = paste(codes, ")\n", sep = "") + codes = paste(codes, "data = dplyr::collect(data)\n", sep = "") + } else { + codes = paste(codes, "data = read.csv(\"insert your MSstats scheme output from Spectronaut filepath\", header = TRUE, sep = ",input$sep_specdata,")\nannot_file = read.csv(\"insert your annotation filepath\", sep='\t')#Optional\n" + , sep = "") + codes = paste(codes, "data = SpectronauttoMSstatsFormat(data, + annotation = annot_file #Optional, + filter_with_Qvalue = TRUE, ## same as default + qvalue_cutoff = 0.01, ## same as default + fewMeasurements=\"remove\", + removeProtein_with1Feature = TRUE, + use_log_file = FALSE)\n", sep = "") + } } else if(input$filetype == 'diann') { - codes = paste(codes, "data = read.csv(\"insert your MSstats scheme output from DIANN filepath\", header = TRUE, sep = '\\t')\nannot_file = read.csv(\"insert your annotation filepath\")#Optional\n" - , sep = "") - - codes = paste(codes, "data = DIANNtoMSstatsFormat(data, - annotation = annot_file, #Optional - qvalue_cutoff = 0.01, ## same as default - removeProtein_with1Feature = TRUE, - use_log_file = FALSE)\n", sep = "") + if (isTRUE(input$big_file_diann)) { + codes = paste(codes, "library(MSstatsBig)\n", sep = "") + codes = paste(codes, "data = MSstatsBig::bigDIANNtoMSstatsFormat(\n", sep = "") + codes = paste(codes, " input_file = \"insert your large DIANN file path\",\n", sep = "") + codes = paste(codes, " output_file_name = \"output_file.csv\",\n", sep = "") + codes = paste(codes, " backend = \"arrow\",\n", sep = "") + codes = paste(codes, " MBR = ", isTRUE(input$diann_MBR), ",\n", sep = "") + codes = paste(codes, " quantificationColumn = \"", input$diann_quantificationColumn, "\",\n", sep = "") + codes = paste(codes, " max_feature_count = ", input$max_feature_count, ",\n", sep = "") + codes = 
paste(codes, " filter_unique_peptides = ", input$filter_unique_peptides, ",\n", sep = "") + codes = paste(codes, " aggregate_psms = ", input$aggregate_psms, ",\n", sep = "") + codes = paste(codes, " filter_few_obs = ", input$filter_few_obs, "\n", sep = "") + codes = paste(codes, ")\n", sep = "") + codes = paste(codes, "data = dplyr::collect(data)\n", sep = "") + } else { + sep = input$sep_dianndata + if(is.null(sep)) { + sep = "\t" + } + + if (sep == "\t") { + codes = paste(codes, "data = read.delim(\"insert your MSstats scheme output from DIANN filepath\")\nannot_file = read.csv(\"insert your annotation filepath\")#Optional\n", sep = "") + } else { + codes = paste(codes, "data = read.csv(\"insert your MSstats scheme output from DIANN filepath\", header = TRUE, sep = '", sep, "')\nannot_file = read.csv(\"insert your annotation filepath\")#Optional\n", sep = "") + } + + codes = paste(codes, "data = DIANNtoMSstatsFormat(data, + annotation = annot_file, #Optional + qvalue_cutoff = 0.01, ## same as default + removeProtein_with1Feature = TRUE, + use_log_file = FALSE)\n", sep = "") + } } else if(input$filetype == 'open') { diff --git a/tests/testthat/test-utils.R b/tests/testthat/test-utils.R index 8d3d6ce..e72f98b 100644 --- a/tests/testthat/test-utils.R +++ b/tests/testthat/test-utils.R @@ -1527,3 +1527,78 @@ describe("getData for Big Spectronaut", { expect_null(res) }) }) + +describe("getData for Big DIANN", { + + # Common mock input for big diann + mock_input_big_diann <- list( + filetype = "diann", + big_file_diann = TRUE, + big_file_browse = list(files = list("file.csv")), + max_feature_count = 20, + diann_MBR = TRUE, + diann_quantificationColumn = "Fragment.Quant.Corrected", + filter_unique_peptides = TRUE, + aggregate_psms = TRUE, + filter_few_obs = TRUE, + BIO = "Protein", + DDA_DIA = "DIA" + ) + + # Mock data to return + mock_arrow_obj <- list(dummy = "arrow") + mock_df <- data.frame(ProteinName = "P1", Intensity = 100) + + test_that("Valid input returns data", 
{ + # Mocks + stub(getData, "shinyFiles::getVolumes", function() function() c(root = "/")) + stub(getData, "shinyFiles::parseFilePaths", function(...) data.frame(datapath = "test.csv")) + stub(getData, "file.exists", TRUE) + stub(getData, "bigDIANNtoMSstatsFormat", mock_arrow_obj) + stub(getData, "dplyr::collect", mock_df) + stub(getData, "showNotification", function(...) NULL) + stub(getData, "shinybusy::update_modal_spinner", function(...) NULL) + stub(getData, "shinybusy::remove_modal_spinner", function(...) NULL) + + res <- getData(mock_input_big_diann) + expect_equal(res, mock_df) + }) + + test_that("Invalid max_feature_count returns NULL", { + bad_input <- mock_input_big_diann + bad_input$max_feature_count <- 0 + + stub(getData, "shinyFiles::getVolumes", function() function() c(root = "/")) + stub(getData, "shinyFiles::parseFilePaths", function(...) data.frame(datapath = "test.csv")) + stub(getData, "showNotification", function(msg, ...) expect_match(msg, "max_feature_count")) + stub(getData, "shinybusy::remove_modal_spinner", function(...) NULL) + + res <- getData(bad_input) + expect_null(res) + }) + + test_that("File not found returns NULL", { + stub(getData, "shinyFiles::getVolumes", function() function() c(root = "/")) + stub(getData, "shinyFiles::parseFilePaths", function(...) data.frame(datapath = "nonexistent.csv")) + stub(getData, "file.exists", FALSE) + stub(getData, "showNotification", function(msg, ...) expect_match(msg, "does not exist")) + stub(getData, "shinybusy::remove_modal_spinner", function(...) NULL) + + res <- getData(mock_input_big_diann) + expect_null(res) + }) + + test_that("Memory error returns NULL", { + stub(getData, "shinyFiles::getVolumes", function() function() c(root = "/")) + stub(getData, "shinyFiles::parseFilePaths", function(...) 
data.frame(datapath = "test.csv")) + stub(getData, "file.exists", TRUE) + stub(getData, "bigDIANNtoMSstatsFormat", mock_arrow_obj) + stub(getData, "dplyr::collect", function(...) stop("Memory allocation failed")) + stub(getData, "showNotification", function(msg, ...) expect_match(msg, "Memory Error")) + stub(getData, "shinybusy::update_modal_spinner", function(...) NULL) + stub(getData, "shinybusy::remove_modal_spinner", function(...) NULL) + + res <- getData(mock_input_big_diann) + expect_null(res) + }) +}) diff --git a/tests/testthat/test_parallel_simulation.R b/tests/testthat/test_parallel_simulation.R new file mode 100644 index 0000000..166ef4b --- /dev/null +++ b/tests/testthat/test_parallel_simulation.R @@ -0,0 +1,159 @@ +library(data.table) +library(survival) +library(parallel) + +# ------------------------------------------------------------------------- +# 1. Mock Internal MSstats Functions +# ------------------------------------------------------------------------- + +.fitSurvival <- function(data, iterations) { + tryCatch({ + # Create a real survreg object to simulate memory usage + # Use a subset of data to ensure stability/speed, as we only care about object size + fit <- survreg(Surv(newABUNDANCE, !cen, type="left") ~ 1, + data = head(data, 500), dist = "gaussian") + # Artificially bloat the object to simulate a complex model (approx 100MB) + # Real MSstats models can be very large due to model frames and environments + fit$bloat <- numeric(12.5 * 1024 * 1024) + return(fit) + }, error = function(e) return(NULL)) +} + +.isSummarizable <- function(data, remove50missing) return(data) + +.runTukey <- function(data, is_labeled, censored_symbol, remove50missing) { + return(data.table(Protein = "TestProtein", LogIntensities = mean(data$newABUNDANCE, na.rm=TRUE))) +} + +# ------------------------------------------------------------------------- +# 2.
Define Functions with "Work Simulation" (Sleep) +# ------------------------------------------------------------------------- + +# LEAKY VERSION +MSstatsSummarizeSingleTMP_Leaky_Sim <- function (single_protein, impute, censored_symbol, remove50missing, aft_iterations = 90) { + # ... Setup ... + newABUNDANCE = n_obs = n_obs_run = RUN = FEATURE = LABEL = NULL + predicted = censored = NULL + cols = intersect(colnames(single_protein), c("newABUNDANCE", "cen", "RUN", "FEATURE", "ref")) + single_protein = single_protein[(n_obs > 1 & !is.na(n_obs)) & (n_obs_run > 0 & !is.na(n_obs_run))] + if (nrow(single_protein) == 0) return(list(NULL, NULL)) + single_protein[, `:=`(RUN, factor(RUN))] + single_protein[, `:=`(FEATURE, factor(FEATURE))] + + if (impute & any(single_protein[["censored"]])) { + converged = TRUE + survival_fit = withCallingHandlers({ + .fitSurvival(single_protein[LABEL == "L", cols, with = FALSE], aft_iterations) + }, warning = function(w) { if (grepl("converge", conditionMessage(w), ignore.case = TRUE)) converged <<- FALSE }) + + if (converged && !is.null(survival_fit)) { + single_protein[, `:=`(predicted, predict(survival_fit, newdata = .SD))] + } else { + single_protein[, `:=`(predicted, NA_real_)] + } + + # --- LEAK SIMULATION --- + # The object 'survival_fit' is still in memory here. + # We simulate "doing other work" (predictions, formatting) by sleeping. + Sys.sleep(1) + + # Report Memory Usage of this Worker + mem_used <- sum(gc()[,2]) + msg <- sprintf("[Worker %d] LEAKY State - Holding Memory: %.2f MB\n", Sys.getpid(), mem_used) + cat(msg) + cat(msg, file = "parallel_log.txt", append = TRUE) + + single_protein[, `:=`(predicted, ifelse(censored & (LABEL == "L"), predicted, NA))] + single_protein[, `:=`(newABUNDANCE, ifelse(censored & LABEL == "L", predicted, newABUNDANCE))] + survival = single_protein[, c(cols, "predicted"), with = FALSE] + } else { + survival = single_protein[, cols, with = FALSE] + survival[, `:=`(predicted, NA)] + } + # ... 
Finalize ... + single_protein = .isSummarizable(single_protein, remove50missing) + if (is.null(single_protein)) return(list(NULL, NULL)) + result = .runTukey(single_protein, TRUE, censored_symbol, remove50missing) + list(result, survival) +} + +# FIXED VERSION +MSstatsSummarizeSingleTMP_Fixed_Sim <- function (single_protein, impute, censored_symbol, remove50missing, aft_iterations = 90) { + # ... Setup ... + newABUNDANCE = n_obs = n_obs_run = RUN = FEATURE = LABEL = NULL + predicted = censored = NULL + cols = intersect(colnames(single_protein), c("newABUNDANCE", "cen", "RUN", "FEATURE", "ref")) + single_protein = single_protein[(n_obs > 1 & !is.na(n_obs)) & (n_obs_run > 0 & !is.na(n_obs_run))] + if (nrow(single_protein) == 0) return(list(NULL, NULL)) + single_protein[, `:=`(RUN, factor(RUN))] + single_protein[, `:=`(FEATURE, factor(FEATURE))] + + if (impute & any(single_protein[["censored"]])) { + converged = TRUE + survival_fit = withCallingHandlers({ + .fitSurvival(single_protein[LABEL == "L", cols, with = FALSE], aft_iterations) + }, warning = function(w) { if (grepl("converge", conditionMessage(w), ignore.case = TRUE)) converged <<- FALSE }) + + if (converged && !is.null(survival_fit)) { + single_protein[, `:=`(predicted, predict(survival_fit, newdata = .SD))] + } else { + single_protein[, `:=`(predicted, NA_real_)] + } + + # --- FIX APPLIED --- + rm(survival_fit) + + # --- FIXED SIMULATION --- + # We simulate "doing other work" by sleeping. 
+ Sys.sleep(1) + + # Report Memory Usage of this Worker + mem_used <- sum(gc()[,2]) + msg <- sprintf("[Worker %d] FIXED State - Holding Memory: %.2f MB\n", Sys.getpid(), mem_used) + cat(msg) + cat(msg, file = "parallel_log.txt", append = TRUE) + + single_protein[, `:=`(predicted, ifelse(censored & (LABEL == "L"), predicted, NA))] + single_protein[, `:=`(newABUNDANCE, ifelse(censored & LABEL == "L", predicted, newABUNDANCE))] + survival = single_protein[, c(cols, "predicted"), with = FALSE] + } else { + survival = single_protein[, cols, with = FALSE] + survival[, `:=`(predicted, NA)] + } + # ... Finalize ... + single_protein = .isSummarizable(single_protein, remove50missing) + if (is.null(single_protein)) return(list(NULL, NULL)) + result = .runTukey(single_protein, TRUE, censored_symbol, remove50missing) + list(result, survival) +} + +# ------------------------------------------------------------------------- +# 3. Run Simulation +# ------------------------------------------------------------------------- + +set.seed(123) +n_rows <- 20000 +dt <- data.table( + newABUNDANCE = rnorm(n_rows, 20, 5), + censored = sample(c(TRUE, FALSE), n_rows, replace=TRUE, prob=c(0.3, 0.7)), + LABEL = "L", RUN = sample(1:20, n_rows, replace=TRUE), FEATURE = sample(1:500, n_rows, replace=TRUE), + n_obs = 5, n_obs_run = 5, cen = FALSE, ref = "ref" +) +dt$cen <- dt$censored +dt$newABUNDANCE[dt$censored] <- dt$newABUNDANCE[dt$censored] - 5 + +# Clear log file +file.create("parallel_log.txt") + +cat("\n--- Simulating LEAKY Parallel Execution ---\n") +# We run 2 cores. Both will hit the 'sleep' at the same time. +# Both will report HIGH memory because they haven't cleaned up yet. +invisible(mclapply(1:2, function(i) MSstatsSummarizeSingleTMP_Leaky_Sim(copy(dt), TRUE, "NA", FALSE), mc.cores = 2)) + +cat("\n--- Simulating FIXED Parallel Execution ---\n") +# We run 2 cores. Both will hit the 'sleep' at the same time. +# Both will report LOW memory because they cleaned up BEFORE sleeping. 
+invisible(mclapply(1:2, function(i) MSstatsSummarizeSingleTMP_Fixed_Sim(copy(dt), TRUE, "NA", FALSE), mc.cores = 2)) + +cat("\n--- Log File Content ---\n") +cat(readLines("parallel_log.txt"), sep = "\n") \ No newline at end of file