diff --git a/nextflow/build.gradle b/nextflow/build.gradle index bb072afd..155a6f66 100644 --- a/nextflow/build.gradle +++ b/nextflow/build.gradle @@ -12,4 +12,8 @@ dependencies { BuildUtils.addLabKeyDependency(project: project, config: "modules", depProjectPath: BuildUtils.getPlatformModuleProjectPath(project.gradle, "experiment"), depProjectConfig: "published", depExtension: "module") BuildUtils.addLabKeyDependency(project: project, config: "modules", depProjectPath: BuildUtils.getPlatformModuleProjectPath(project.gradle, "pipeline"), depProjectConfig: "published", depExtension: "module") + + BuildUtils.addLabKeyDependency(project: project, config: "implementation", depProjectPath: ":server:modules:targetedms", depProjectConfig: "apiJarFile") + BuildUtils.addLabKeyDependency(project: project, config: "jspImplementation", depProjectPath: ":server:modules:targetedms", depProjectConfig: "apiJarFile") + BuildUtils.addLabKeyDependency(project: project, config: "modules", depProjectPath: ":server:modules:targetedms", depProjectConfig: "published", depExtension: "module") } diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java index 40455d8c..6f31dafa 100644 --- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java +++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java @@ -2,17 +2,23 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; +import org.json.JSONObject; import org.labkey.api.data.Container; import org.labkey.api.files.FileContentService; import org.labkey.api.pipeline.ParamParser; import org.labkey.api.pipeline.PipeRoot; import org.labkey.api.pipeline.PipelineJobService; +import org.labkey.api.pipeline.PipelineService; +import org.labkey.api.pipeline.PipelineStatusFile; import org.labkey.api.pipeline.TaskId; import org.labkey.api.pipeline.TaskPipeline; import org.labkey.api.pipeline.file.AbstractFileAnalysisJob; import org.labkey.api.util.FileUtil; import org.labkey.api.util.PageFlowUtil; +import org.labkey.api.util.StringUtilsLabKey; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.view.ViewBackgroundInfo; import java.io.BufferedWriter; @@ -26,6 +32,8 @@ @Getter public class NextFlowPipelineJob extends AbstractFileAnalysisJob { + protected static final Logger LOG = LogHelper.getLogger(NextFlowPipelineJob.class, "NextFlow jobs"); + private Path config; @SuppressWarnings("unused") // For serialization @@ -51,6 +59,24 @@ public NextFlowPipelineJob(ViewBackgroundInfo info, @NotNull PipeRoot root, Path super(new NextFlowProtocol(), NextFlowPipelineProvider.NAME, info, root, config.getFileName().toString(), config, inputFiles, false, false); this.config = config; setLogFile(log); + LOG.info("NextFlow job queued: {}", getJsonJobInfo()); + } + + protected JSONObject getJsonJobInfo() + { + JSONObject result = new JSONObject(); + result.put("user", getUser().getEmail()); + result.put("container", getContainer().getPath()); + result.put("filePath", getLogFilePath().getParent().toString()); + result.put("runName", getNextFlowRunName()); + result.put("configFile", getConfig().getFileName().toString()); + return result; + } + + protected String getNextFlowRunName() + { + PipelineStatusFile file = PipelineService.get().getStatusFile(getJobGUID()); + return file == null ? "Unknown" : ("LabKeyJob" + file.getRowId()); } @Override @@ -70,9 +96,12 @@ private static Path createConfig(Path configTemplate, Path parentDir, Path jobDi String webdavUrl = FileContentService.get().getWebDavUrl(parentDir, container, FileContentService.PathType.full); webdavUrl = StringUtils.stripEnd(webdavUrl, "/"); - String substitutedContent = template.replace("${quant_spectra_dir}", "quant_spectra_dir = '" + webdavUrl + "'"); + String uploadUrl = FileContentService.get().getWebDavUrl(jobDir, container, FileContentService.PathType.full); + uploadUrl = StringUtils.stripEnd(uploadUrl, "/"); + substitutedContent = substitutedContent.replace("${panorama.upload_url}", "panorama.upload_url = '" + uploadUrl + "'"); + Path substitutedFile = jobDir.resolve(configTemplate.getFileName()); try (BufferedWriter writer = Files.newBufferedWriter(substitutedFile)) { @@ -84,7 +113,7 @@ private static Path createConfig(Path configTemplate, Path parentDir, Path jobDi @Override public String getDescription() { - return "NextFlow analysis using " + config.getFileName() + " of " + getInputFilePaths().size() + " files"; + return "NextFlow analysis of " + StringUtilsLabKey.pluralize(getInputFilePaths().size(), "file") + " using config: " + config.getFileName(); } @Override @@ -116,5 +145,4 @@ public File findOutputFile(String name) { return null; } - } diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowProtocol.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowProtocol.java index a2d7270c..2faf82b1 100644 --- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowProtocol.java +++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowProtocol.java @@ -16,7 +16,7 @@ public class NextFlowProtocol extends AbstractFileAnalysisProtocol { public static final List INPUT_TYPES = List.of( - new FileType(".RAW"), + new FileType(".raw"), new FileType(".mzML")); public NextFlowProtocol() diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java index 8954963e..35670120 100644 --- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java +++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java @@ -2,14 +2,17 @@ import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; +import org.labkey.api.exp.XarFormatException; import org.labkey.api.pipeline.AbstractTaskFactory; import org.labkey.api.pipeline.AbstractTaskFactorySettings; import org.labkey.api.pipeline.PipelineJob; import org.labkey.api.pipeline.PipelineJobException; +import org.labkey.api.pipeline.PipelineValidationException; import org.labkey.api.pipeline.RecordedAction; import org.labkey.api.pipeline.RecordedActionSet; import org.labkey.api.pipeline.WorkDirectoryTask; import org.labkey.api.security.SecurityManager; +import org.labkey.api.targetedms.TargetedMSService; import org.labkey.api.util.FileType; import org.labkey.nextflow.NextFlowConfiguration; import org.labkey.nextflow.NextFlowManager; @@ -39,14 +42,14 @@ public NextFlowRunTask(Factory factory, PipelineJob job) super(factory, job); } - - @Override public @NotNull RecordedActionSet run() throws PipelineJobException { Logger log = getJob().getLogger(); + NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo()); SecurityManager.TransformSession session = null; + boolean success = false; try { @@ -66,20 +69,23 @@ public NextFlowRunTask(Factory factory, PipelineJob job) // Need to pass to the main process directly in the future to allow concurrent execution for different users ProcessBuilder secretsPB = new ProcessBuilder("nextflow", "secrets", "set", "PANORAMA_API_KEY", apiKey); - log.info("Job Started"); + log.info("Setting secrets"); File dir = getJob().getLogFile().getParentFile(); getJob().runSubProcess(secretsPB, dir); ProcessBuilder executionPB = new ProcessBuilder(getArgs()); getJob().runSubProcess(executionPB, dir); log.info("Job Finished"); + NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo()); RecordedAction action = new RecordedAction(ACTION_NAME); for (Path inputFile : getJob().getInputFilePaths()) { action.addInput(inputFile.toFile(), SPECTRA_INPUT_ROLE); } - addOutputs(action, getJob().getLogFilePath().getParent().resolve("reports")); + addOutputs(action, getJob().getLogFilePath().getParent().resolve("reports"), log); + addOutputs(action, getJob().getLogFilePath().getParent().resolve("results"), log); + success = true; return new RecordedActionSet(action); } catch (IOException e) @@ -92,14 +98,31 @@ public NextFlowRunTask(Factory factory, PipelineJob job) { session.close(); } + if (!success) + { + NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo()); + } } } - private void addOutputs(RecordedAction action, Path path) throws IOException + private void addOutputs(RecordedAction action, Path path, Logger log) throws IOException { if (Files.isRegularFile(path)) { action.addOutput(path.toFile(), "Output", false); + if (path.toString().toLowerCase().endsWith(".sky.zip")) + { + try + { + log.info("Queueing import for {}", path); + // Make sure that the TargetedMS runs get wrapped with their experiment run counterparts + TargetedMSService.get().importSkylineDocument(getJob().getInfo(), path); + } + catch (XarFormatException | PipelineValidationException e) + { + log.error("Error queuing import of Skyline document", e); + } + } } else if (Files.isDirectory(path)) { @@ -107,7 +130,7 @@ else if (Files.isDirectory(path)) { for (Path child : listing.toList()) { - addOutputs(action, child); + addOutputs(action, child, log); } } } @@ -165,6 +188,8 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException } args.add("-c"); args.add(configFile.toAbsolutePath().toString()); + args.add("-name"); + args.add(getJob().getNextFlowRunName()); return args; } diff --git a/nextflow/webapp/WEB-INF/nextflow/nextflowContext.xml b/nextflow/webapp/WEB-INF/nextflow/nextflowContext.xml index eee32d08..605ec659 100644 --- a/nextflow/webapp/WEB-INF/nextflow/nextflowContext.xml +++ b/nextflow/webapp/WEB-INF/nextflow/nextflowContext.xml @@ -30,6 +30,8 @@ + +