diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 379aacee..efdf57ef 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -59,7 +59,7 @@ jobs: - name: Run tests shell: bash -l {0} run: | - pytest -r a -v -n 3 --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch \ + pytest -Wd -r a -v -n 3 --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch \ --junitxml=junit.xml -o junit_family=legacy - name: Upload coverage to codecov uses: codecov/codecov-action@v5 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f998ab01..cc4a88cd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: # Ruff version. rev: v0.13.3 hooks: - - id: ruff + - id: ruff-check args: [--fix] - id: ruff-format - repo: https://github.com/numpy/numpydoc diff --git a/python/lsst/ctrl/mpexec/cli/butler_factory.py b/python/lsst/ctrl/mpexec/cli/butler_factory.py index 593d3a62..a78ab048 100644 --- a/python/lsst/ctrl/mpexec/cli/butler_factory.py +++ b/python/lsst/ctrl/mpexec/cli/butler_factory.py @@ -311,96 +311,39 @@ def _make_read_parts( of ``args``. 
""" butler = Butler.from_config(butler_config, writeable=False) - self = cls( - butler, - output=output, - output_run=output_run, - inputs=inputs, - extend_run=extend_run, - rebase=rebase, - writeable=False, - ) - self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced) - if self.output and self.output.exists: - if replace_run: - replaced = self.output.chain[0] - inputs = list(self.output.chain[1:]) - _LOG.debug( - "Simulating collection search in '%s' after removing '%s'.", self.output.name, replaced - ) + try: + self = cls( + butler, + output=output, + output_run=output_run, + inputs=inputs, + extend_run=extend_run, + rebase=rebase, + writeable=False, + ) + self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced) + if self.output and self.output.exists: + if replace_run: + replaced = self.output.chain[0] + inputs = list(self.output.chain[1:]) + _LOG.debug( + "Simulating collection search in '%s' after removing '%s'.", + self.output.name, + replaced, + ) + else: + inputs = [self.output.name] else: - inputs = [self.output.name] - else: - inputs = list(self.inputs) - if extend_run: - assert self.output_run is not None, "Output collection has to be specified." - inputs.insert(0, self.output_run.name) - collSearch = CollectionWildcard.from_expression(inputs).require_ordered() + inputs = list(self.inputs) + if extend_run: + assert self.output_run is not None, "Output collection has to be specified." 
+ inputs.insert(0, self.output_run.name) + collSearch = CollectionWildcard.from_expression(inputs).require_ordered() + except Exception: + butler.close() + raise return butler, collSearch, self - @classmethod - def make_read_butler( - cls, - butler_config: ResourcePathExpression, - *, - output: str | None, - output_run: str | None, - inputs: str | Iterable[str], - extend_run: bool = False, - rebase: bool = False, - replace_run: bool, - prune_replaced: str | None = None, - ) -> Butler: - """Construct a read-only butler according to the given command-line - arguments. - - Parameters - ---------- - butler_config : convertible to `lsst.resources.ResourcePath` - Path to configuration for the butler. - output : `str` or `None` - The name of a `~lsst.daf.butler.CollectionType.CHAINED` - input/output collection. - output_run : `str` or `None` - The name of a `~lsst.daf.butler.CollectionType.RUN` input/output - collection. - inputs : `str` or `~collections.abc.Iterable` [`str`] - Input collection name or iterable of collection names. - extend_run : `bool` - A boolean indicating whether ``output_run`` should already exist - and be extended. - rebase : `bool` - A boolean indicating whether to force the ``output`` collection to - be consistent with ``inputs`` and ``output`` run such that the - ``output`` collection has output run collections first (i.e. those - that start with the same prefix), then the new inputs, then any - original inputs not included in the new inputs. - replace_run : `bool` - Whether the ``output_run`` should be replaced in the ``output`` - chain. - prune_replaced : `str` or `None` - If ``replace_run=True``, whether/how datasets in the old run should - be removed. Options are ``"purge"``, ``"unstore"``, and `None`. - - Returns - ------- - butler : `lsst.daf.butler.Butler` - A read-only butler initialized with the given collections. - """ - cls.define_datastore_cache() # Ensure that this butler can use a shared cache. 
- butler, inputs, _ = cls._make_read_parts( - butler_config, - output=output, - output_run=output_run, - inputs=inputs, - extend_run=extend_run, - rebase=rebase, - replace_run=replace_run, - prune_replaced=prune_replaced, - ) - _LOG.debug("Preparing butler to read from %s.", inputs) - return Butler.from_config(butler=butler, collections=inputs) - @classmethod def make_butler_and_collections( cls, diff --git a/python/lsst/ctrl/mpexec/cli/script/build.py b/python/lsst/ctrl/mpexec/cli/script/build.py index 8d034dfb..78bfb7f0 100644 --- a/python/lsst/ctrl/mpexec/cli/script/build.py +++ b/python/lsst/ctrl/mpexec/cli/script/build.py @@ -138,7 +138,11 @@ def build( if butler_config: butler = Butler.from_config(butler_config, writeable=False) - pipeline_graph_factory = PipelineGraphFactory(pipeline, butler, select_tasks) + try: + pipeline_graph_factory = PipelineGraphFactory(pipeline, butler, select_tasks) + finally: + if butler is not None: + butler.close() if pipeline_dot: with open(pipeline_dot, "w") as stream: diff --git a/python/lsst/ctrl/mpexec/cli/script/cleanup.py b/python/lsst/ctrl/mpexec/cli/script/cleanup.py index d84169f4..cf691a3a 100644 --- a/python/lsst/ctrl/mpexec/cli/script/cleanup.py +++ b/python/lsst/ctrl/mpexec/cli/script/cleanup.py @@ -95,8 +95,7 @@ def describe(self, will: bool) -> str: return msg def on_confirmation(self) -> None: - butler = Butler.from_config(self.butler_config, writeable=True) - with butler.transaction(): + with Butler.from_config(self.butler_config, writeable=True) as butler, butler.transaction(): for collection in self.others_to_remove: butler.registry.removeCollection(collection) butler.removeRuns(self.runs_to_remove) @@ -128,25 +127,25 @@ def cleanup( collection : str The name of the chained collection. 
""" - butler = Butler.from_config(butler_config) - result = CleanupResult(butler_config) - try: - to_keep = set(butler.registry.getCollectionChain(collection)) - except MissingCollectionError: - result.failure = NoSuchCollectionFailure(collection) - return result - except CollectionTypeError: - result.failure = NotChainedCollectionFailure( - collection, butler.registry.getCollectionType(collection).name - ) - return result - to_keep.add(collection) - glob = collection + "*" - to_consider = set(butler.registry.queryCollections(glob)) - to_remove = to_consider - to_keep - for r in to_remove: - if butler.registry.getCollectionType(r) == CollectionType.RUN: - result.runs_to_remove.append(r) - else: - result.others_to_remove.append(r) + with Butler.from_config(butler_config) as butler: + result = CleanupResult(butler_config) + try: + to_keep = set(butler.registry.getCollectionChain(collection)) + except MissingCollectionError: + result.failure = NoSuchCollectionFailure(collection) + return result + except CollectionTypeError: + result.failure = NotChainedCollectionFailure( + collection, butler.registry.getCollectionType(collection).name + ) + return result + to_keep.add(collection) + glob = collection + "*" + to_consider = set(butler.registry.queryCollections(glob)) + to_remove = to_consider - to_keep + for r in to_remove: + if butler.registry.getCollectionType(r) == CollectionType.RUN: + result.runs_to_remove.append(r) + else: + result.others_to_remove.append(r) return result diff --git a/python/lsst/ctrl/mpexec/cli/script/pre_exec_init_qbb.py b/python/lsst/ctrl/mpexec/cli/script/pre_exec_init_qbb.py index 7e9cbae1..46129609 100644 --- a/python/lsst/ctrl/mpexec/cli/script/pre_exec_init_qbb.py +++ b/python/lsst/ctrl/mpexec/cli/script/pre_exec_init_qbb.py @@ -94,7 +94,7 @@ def pre_exec_init_qbb( # Make QBB. 
_LOG.verbose("Initializing quantum-backed butler.") - butler = qg.make_init_qbb(butler_config, config_search_paths=config_search_path) - # Save all InitOutputs, configs, etc. - _LOG.verbose("Instantiating tasks and saving init-outputs.") - qg.init_output_run(butler) + with qg.make_init_qbb(butler_config, config_search_paths=config_search_path) as butler: + # Save all InitOutputs, configs, etc. + _LOG.verbose("Instantiating tasks and saving init-outputs.") + qg.init_output_run(butler) diff --git a/python/lsst/ctrl/mpexec/cli/script/purge.py b/python/lsst/ctrl/mpexec/cli/script/purge.py index 9e600137..2f2be5c1 100644 --- a/python/lsst/ctrl/mpexec/cli/script/purge.py +++ b/python/lsst/ctrl/mpexec/cli/script/purge.py @@ -156,8 +156,7 @@ def on_confirmation(self) -> None: if self.failure: # This should not happen, it is a logic error. raise RuntimeError("Can not purge, there were errors preparing collections.") - butler = Butler.from_config(self.butler_config, writeable=True) - with butler.transaction(): + with Butler.from_config(self.butler_config, writeable=True) as butler, butler.transaction(): for c in itertools.chain(self.others_to_remove, self.chains_to_remove): butler.registry.removeCollection(c) butler.removeRuns(self.runs_to_remove) @@ -290,24 +289,23 @@ def purge( to remove the datasets after confirmation, if needed. 
""" result = PurgeResult(butler_config) - butler = Butler.from_config(butler_config) - - try: - collection_type = butler.registry.getCollectionType(collection) - except MissingCollectionError: - result.fail(TopCollectionNotFoundFailure(collection)) - return result - - if collection_type != CollectionType.CHAINED: - result.fail(TopCollectionIsNotChainedFailure(collection, collection_type)) - elif parents := check_parents(butler, collection, []): - result.fail(TopCollectionHasParentsFailure(collection, parents)) - else: - prepare_to_remove( - top_collection=collection, - parent_collection=collection, - purge_result=result, - butler=butler, - recursive=recursive, - ) + with Butler.from_config(butler_config) as butler: + try: + collection_type = butler.registry.getCollectionType(collection) + except MissingCollectionError: + result.fail(TopCollectionNotFoundFailure(collection)) + return result + + if collection_type != CollectionType.CHAINED: + result.fail(TopCollectionIsNotChainedFailure(collection, collection_type)) + elif parents := check_parents(butler, collection, []): + result.fail(TopCollectionHasParentsFailure(collection, parents)) + else: + prepare_to_remove( + top_collection=collection, + parent_collection=collection, + purge_result=result, + butler=butler, + recursive=recursive, + ) return result diff --git a/python/lsst/ctrl/mpexec/cli/script/qgraph.py b/python/lsst/ctrl/mpexec/cli/script/qgraph.py index d379639a..0215ddce 100644 --- a/python/lsst/ctrl/mpexec/cli/script/qgraph.py +++ b/python/lsst/ctrl/mpexec/cli/script/qgraph.py @@ -220,107 +220,115 @@ def qgraph( replace_run=replace_run, prune_replaced=prune_replaced, ) + with butler: + if skip_existing and run: + skip_existing_in += (run,) - if skip_existing and run: - skip_existing_in += (run,) - - qgc: PredictedQuantumGraphComponents - if qgraph is not None: - # click passes empty tuple as default value for qgraph_node_id - quantum_ids = ( - {uuid.UUID(q) if not isinstance(q, uuid.UUID) else q for q in 
qgraph_node_id} - if qgraph_node_id - else None - ) - qgraph = ResourcePath(qgraph) - match qgraph.getExtension(): - case ".qgraph": - qgc = PredictedQuantumGraphComponents.from_old_quantum_graph( - QuantumGraph.loadUri( - qgraph, - butler.dimensions, - nodes=quantum_ids, - graphID=BuildId(qgraph_id) if qgraph_id is not None else None, + qgc: PredictedQuantumGraphComponents + if qgraph is not None: + # click passes empty tuple as default value for qgraph_node_id + quantum_ids = ( + {uuid.UUID(q) if not isinstance(q, uuid.UUID) else q for q in qgraph_node_id} + if qgraph_node_id + else None + ) + qgraph = ResourcePath(qgraph) + match qgraph.getExtension(): + case ".qgraph": + qgc = PredictedQuantumGraphComponents.from_old_quantum_graph( + QuantumGraph.loadUri( + qgraph, + butler.dimensions, + nodes=quantum_ids, + graphID=BuildId(qgraph_id) if qgraph_id is not None else None, + ) ) - ) - case ".qg": - if qgraph_id is not None: - _LOG.warning("--qgraph-id is ignored when loading new '.qg' files.") - if for_execution or for_init_output_run or save_qgraph or show.needs_full_qg: - import_mode = TaskImportMode.ASSUME_CONSISTENT_EDGES - else: - import_mode = TaskImportMode.DO_NOT_IMPORT - with PredictedQuantumGraph.open(qgraph, import_mode=import_mode) as reader: - if for_execution or qgraph_dot or qgraph_mermaid or show.needs_full_qg or qgraph_node_id: - # This reads everything for the given quanta. 
- reader.read_execution_quanta(quantum_ids) - elif for_init_output_run: - reader.read_init_quanta() + case ".qg": + if qgraph_id is not None: + _LOG.warning("--qgraph-id is ignored when loading new '.qg' files.") + if for_execution or for_init_output_run or save_qgraph or show.needs_full_qg: + import_mode = TaskImportMode.ASSUME_CONSISTENT_EDGES else: - reader.read_thin_graph() - qgc = reader.components - case ext: - raise ValueError(f"Unrecognized extension for quantum graph: {ext!r}") + import_mode = TaskImportMode.DO_NOT_IMPORT + with PredictedQuantumGraph.open(qgraph, import_mode=import_mode) as reader: + if ( + for_execution + or qgraph_dot + or qgraph_mermaid + or show.needs_full_qg + or qgraph_node_id + ): + # This reads everything for the given quanta. + reader.read_execution_quanta(quantum_ids) + elif for_init_output_run: + reader.read_init_quanta() + else: + reader.read_thin_graph() + qgc = reader.components + case ext: + raise ValueError(f"Unrecognized extension for quantum graph: {ext!r}") - # pipeline can not be provided in this case - if pipeline_graph_factory: - raise ValueError( - "Pipeline must not be given when quantum graph is read from " - f"file: {bool(pipeline_graph_factory)}" - ) - else: - if pipeline_graph_factory is None: - raise ValueError("Pipeline must be given when quantum graph is not read from file.") - # We can't resolve the pipeline graph if we're mocking until after - # we've done the mocking (and the QG build will resolve on its own - # anyway). 
- pipeline_graph = pipeline_graph_factory(resolve=False) - if mock: - from lsst.pipe.base.tests.mocks import mock_pipeline_graph + # pipeline can not be provided in this case + if pipeline_graph_factory: + raise ValueError( + "Pipeline must not be given when quantum graph is read from " + f"file: {bool(pipeline_graph_factory)}" + ) + else: + if pipeline_graph_factory is None: + raise ValueError("Pipeline must be given when quantum graph is not read from file.") + # We can't resolve the pipeline graph if we're mocking until after + # we've done the mocking (and the QG build will resolve on its own + # anyway). + pipeline_graph = pipeline_graph_factory(resolve=False) + if mock: + from lsst.pipe.base.tests.mocks import mock_pipeline_graph - pipeline_graph = mock_pipeline_graph( + pipeline_graph = mock_pipeline_graph( + pipeline_graph, + unmocked_dataset_types=unmocked_dataset_types, + force_failures=mock_failure, + ) + data_id_tables = [] + for table_file in data_id_table: + with ResourcePath(table_file).as_local() as local_path: + table = Table.read(local_path.ospath) + # Add the filename to the metadata for more logging + # information down in the QG builder. + table.meta["filename"] = table_file + data_id_tables.append(table) + # make execution plan (a.k.a. DAG) for pipeline + graph_builder = AllDimensionsQuantumGraphBuilder( pipeline_graph, - unmocked_dataset_types=unmocked_dataset_types, - force_failures=mock_failure, + butler, + where=data_query, + skip_existing_in=skip_existing_in, + clobber=clobber_outputs, + dataset_query_constraint=DatasetQueryConstraintVariant.fromExpression( + dataset_query_constraint + ), + input_collections=collections, + output_run=run, + data_id_tables=data_id_tables, + ) + # Accumulate metadata (QB builder adds some of its own). 
+ metadata = { + "butler_argument": str(butler_config), + "extend_run": extend_run, + "skip_existing_in": skip_existing_in, + "skip_existing": skip_existing, + "data_query": data_query, + } + assert run is not None, "Butler output run collection must be defined" + qgc = graph_builder.finish( + output, metadata=metadata, attach_datastore_records=qgraph_datastore_records ) - data_id_tables = [] - for table_file in data_id_table: - with ResourcePath(table_file).as_local() as local_path: - table = Table.read(local_path.ospath) - # Add the filename to the metadata for more logging - # information down in the QG builder. - table.meta["filename"] = table_file - data_id_tables.append(table) - # make execution plan (a.k.a. DAG) for pipeline - graph_builder = AllDimensionsQuantumGraphBuilder( - pipeline_graph, - butler, - where=data_query, - skip_existing_in=skip_existing_in, - clobber=clobber_outputs, - dataset_query_constraint=DatasetQueryConstraintVariant.fromExpression(dataset_query_constraint), - input_collections=collections, - output_run=run, - data_id_tables=data_id_tables, - ) - # Accumulate metadata (QB builder adds some of its own). 
- metadata = { - "butler_argument": str(butler_config), - "extend_run": extend_run, - "skip_existing_in": skip_existing_in, - "skip_existing": skip_existing, - "data_query": data_query, - } - assert run is not None, "Butler output run collection must be defined" - qgc = graph_builder.finish( - output, metadata=metadata, attach_datastore_records=qgraph_datastore_records - ) - if save_qgraph: - _LOG.verbose("Writing quantum graph to %r.", save_qgraph) - qgc.write(save_qgraph) + if save_qgraph: + _LOG.verbose("Writing quantum graph to %r.", save_qgraph) + qgc.write(save_qgraph) - qg = qgc.assemble() + qg = qgc.assemble() if not summarize_quantum_graph(qg): return None diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index dce4adfb..16c71b5a 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -66,54 +66,54 @@ def report( List only the counts (or data_ids if number of failures < 5). This option is good for those who just want to see totals. 
""" - butler = Butler.from_config(butler_config, writeable=False) qgraph = QuantumGraph.loadUri(qgraph_uri) - report = QuantumGraphExecutionReport.make_reports(butler, qgraph) - if not full_output_filename: - # this is the option to print to the command-line - summary_dict = report.to_summary_dict(butler, logs, human_readable=True) - dataset_table_rows = [] - data_products = [] - quanta_summary = [] - error_summary = [] - for task in summary_dict.keys(): - for data_product in summary_dict[task]["outputs"]: - dataset_table_rows.append(summary_dict[task]["outputs"][data_product]) - data_products.append(data_product) + with Butler.from_config(butler_config, writeable=False) as butler: + report = QuantumGraphExecutionReport.make_reports(butler, qgraph) + if not full_output_filename: + # this is the option to print to the command-line + summary_dict = report.to_summary_dict(butler, logs, human_readable=True) + dataset_table_rows = [] + data_products = [] + quanta_summary = [] + error_summary = [] + for task in summary_dict.keys(): + for data_product in summary_dict[task]["outputs"]: + dataset_table_rows.append(summary_dict[task]["outputs"][data_product]) + data_products.append(data_product) - if len(summary_dict[task]["failed_quanta"]) > 5: - quanta_summary.append( - { - "Task": task, - "Failed": len(summary_dict[task]["failed_quanta"]), - "Blocked": summary_dict[task]["n_quanta_blocked"], - "Succeeded": summary_dict[task]["n_succeeded"], - "Expected": summary_dict[task]["n_expected"], - } - ) - else: - quanta_summary.append( - { - "Task": task, - "Failed": summary_dict[task]["failed_quanta"], - "Blocked": summary_dict[task]["n_quanta_blocked"], - "Succeeded": summary_dict[task]["n_succeeded"], - "Expected": summary_dict[task]["n_expected"], - } - ) - if "errors" in summary_dict[task].keys(): - error_summary.append({task: summary_dict[task]["errors"]}) - quanta = Table(quanta_summary) - datasets = Table(dataset_table_rows) - datasets.add_column(data_products, index=0, 
name="DatasetType") - quanta.pprint_all() - print("\n") - if not brief: - pprint.pprint(error_summary) + if len(summary_dict[task]["failed_quanta"]) > 5: + quanta_summary.append( + { + "Task": task, + "Failed": len(summary_dict[task]["failed_quanta"]), + "Blocked": summary_dict[task]["n_quanta_blocked"], + "Succeeded": summary_dict[task]["n_succeeded"], + "Expected": summary_dict[task]["n_expected"], + } + ) + else: + quanta_summary.append( + { + "Task": task, + "Failed": summary_dict[task]["failed_quanta"], + "Blocked": summary_dict[task]["n_quanta_blocked"], + "Succeeded": summary_dict[task]["n_succeeded"], + "Expected": summary_dict[task]["n_expected"], + } + ) + if "errors" in summary_dict[task].keys(): + error_summary.append({task: summary_dict[task]["errors"]}) + quanta = Table(quanta_summary) + datasets = Table(dataset_table_rows) + datasets.add_column(data_products, index=0, name="DatasetType") + quanta.pprint_all() print("\n") - datasets.pprint_all() - else: - report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs) + if not brief: + pprint.pprint(error_summary) + print("\n") + datasets.pprint_all() + else: + report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs) def report_v2( @@ -190,18 +190,18 @@ def report_v2( the flow of quanta and datasets through the graph and to identify where problems may be occurring. 
""" - butler = Butler.from_config(butler_config, writeable=False) - qpg = QuantumProvenanceGraph( - butler, - qgraph_uris, - collections=collections, - where=where, - curse_failed_logs=curse_failed_logs, - read_caveats=read_caveats, - use_qbb=use_qbb, - n_cores=n_cores, - ) - summary = qpg.to_summary(butler, do_store_logs=logs) + with Butler.from_config(butler_config, writeable=False) as butler: + qpg = QuantumProvenanceGraph( + butler, + qgraph_uris, + collections=collections, + where=where, + curse_failed_logs=curse_failed_logs, + read_caveats=read_caveats, + use_qbb=use_qbb, + n_cores=n_cores, + ) + summary = qpg.to_summary(butler, do_store_logs=logs) if view_graph: from lsst.pipe.base.pipeline_graph.visualization import ( diff --git a/python/lsst/ctrl/mpexec/cli/script/run.py b/python/lsst/ctrl/mpexec/cli/script/run.py index dacc3147..d087f859 100644 --- a/python/lsst/ctrl/mpexec/cli/script/run.py +++ b/python/lsst/ctrl/mpexec/cli/script/run.py @@ -235,7 +235,7 @@ def run( # Make butler instance. QuantumGraph should have an output run defined, # but we ignore it here and let command line decide actual output run. - butler = ButlerFactory.make_write_butler( + with ButlerFactory.make_write_butler( butler_config, qg.pipeline_graph, output=output, @@ -245,74 +245,73 @@ def run( rebase=rebase, replace_run=replace_run, prune_replaced=prune_replaced, - ) + ) as butler: + assert butler.run is not None, "Guaranteed by make_write_butler." + if skip_existing: + skip_existing_in += (butler.run,) - assert butler.run is not None, "Guaranteed by make_write_butler." - if skip_existing: - skip_existing_in += (butler.run,) + # Enable lsstDebug debugging. Note that this is done once in the + # main process before PreExecInit and it is also repeated before + # running each task in SingleQuantumExecutor (which may not be + # needed if `multiprocessing` always uses fork start method). 
+ if enable_lsst_debug: + try: + _LOG.debug("Will try to import debug.py") + import debug # type: ignore # noqa: F401 + except ImportError: + _LOG.warning("No 'debug' module found.") - # Enable lsstDebug debugging. Note that this is done once in the - # main process before PreExecInit and it is also repeated before - # running each task in SingleQuantumExecutor (which may not be - # needed if `multiprocessing` always uses fork start method). - if enable_lsst_debug: - try: - _LOG.debug("Will try to import debug.py") - import debug # type: ignore # noqa: F401 - except ImportError: - _LOG.warning("No 'debug' module found.") - - # Save all InitOutputs, configs, etc. - if register_dataset_types: - qg.pipeline_graph.register_dataset_types(butler, include_packages=not no_versions) - if not skip_init_writes: - qg.write_init_outputs(butler, skip_existing=skip_existing) - qg.write_configs(butler, compare_existing=extend_run) - if not no_versions: - qg.write_packages(butler, compare_existing=extend_run) + # Save all InitOutputs, configs, etc. 
+ if register_dataset_types: + qg.pipeline_graph.register_dataset_types(butler, include_packages=not no_versions) + if not skip_init_writes: + qg.write_init_outputs(butler, skip_existing=skip_existing) + qg.write_configs(butler, compare_existing=extend_run) + if not no_versions: + qg.write_packages(butler, compare_existing=extend_run) - if init_only: - return + if init_only: + return - if task_factory is None: - task_factory = TaskFactory() - resources = ExecutionResources( - num_cores=cores_per_quantum, max_mem=memory_per_quantum, default_mem_units=u.MB - ) - quantum_executor = SingleQuantumExecutor( - butler=butler, - task_factory=task_factory, - skip_existing_in=skip_existing_in, - clobber_outputs=clobber_outputs, - enable_lsst_debug=enable_lsst_debug, - resources=resources, - raise_on_partial_outputs=raise_on_partial_outputs, - ) + if task_factory is None: + task_factory = TaskFactory() + resources = ExecutionResources( + num_cores=cores_per_quantum, max_mem=memory_per_quantum, default_mem_units=u.MB + ) + quantum_executor = SingleQuantumExecutor( + butler=butler, + task_factory=task_factory, + skip_existing_in=skip_existing_in, + clobber_outputs=clobber_outputs, + enable_lsst_debug=enable_lsst_debug, + resources=resources, + raise_on_partial_outputs=raise_on_partial_outputs, + ) - if timeout is None: - timeout = MP_TIMEOUT - executor = MPGraphExecutor( - num_proc=processes, - timeout=timeout, - start_method=start_method, - quantum_executor=quantum_executor, - fail_fast=fail_fast, - pdb=pdb, - execution_graph_fixup=_import_graph_fixup(graph_fixup), - ) - # Have to reset connection pool to avoid sharing connections with - # forked processes. - butler.registry.resetConnectionPool() - try: - with lsst.utils.timer.profile(profile, _LOG): - executor.execute(qg) - finally: - if summary: - report = executor.getReport() - if report: - with ResourcePath(summary).open("w") as out: - # Do not save fields that are not set. 
- out.write(report.model_dump_json(exclude_none=True, indent=2)) + if timeout is None: + timeout = MP_TIMEOUT + executor = MPGraphExecutor( + num_proc=processes, + timeout=timeout, + start_method=start_method, + quantum_executor=quantum_executor, + fail_fast=fail_fast, + pdb=pdb, + execution_graph_fixup=_import_graph_fixup(graph_fixup), + ) + # Have to reset connection pool to avoid sharing connections with + # forked processes. + butler.registry.resetConnectionPool() + try: + with lsst.utils.timer.profile(profile, _LOG): + executor.execute(qg) + finally: + if summary: + report = executor.getReport() + if report: + with ResourcePath(summary).open("w") as out: + # Do not save fields that are not set. + out.write(report.model_dump_json(exclude_none=True, indent=2)) def _import_graph_fixup(graph_fixup: str) -> ExecutionGraphFixup | None: diff --git a/python/lsst/ctrl/mpexec/showInfo.py b/python/lsst/ctrl/mpexec/showInfo.py index 015b1256..ecb3f82d 100644 --- a/python/lsst/ctrl/mpexec/showInfo.py +++ b/python/lsst/ctrl/mpexec/showInfo.py @@ -389,7 +389,7 @@ def _showUri(self, qg: PredictedQuantumGraph, butler_config: ResourcePathExpress Path to configuration for the butler. 
""" - def dumpURIs(thisRef: DatasetRef) -> None: + def dumpURIs(butler: Butler, thisRef: DatasetRef) -> None: primary, components = butler.getURIs(thisRef, predict=True, run="TBD") if primary: print(f" {primary}", file=self.stream) @@ -398,17 +398,19 @@ def dumpURIs(thisRef: DatasetRef) -> None: for compName, compUri in components.items(): print(f" {compName}: {compUri}", file=self.stream) - butler = Butler.from_config(butler_config) - xgraph = qg.quantum_only_xgraph - execution_quanta = qg.build_execution_quanta() - for quantum_id, quantum_data in xgraph.nodes.items(): - print(f"Quantum {quantum_id}: {quantum_data['pipeline_node'].task_class_name}", file=self.stream) - print(" inputs:", file=self.stream) - execution_quantum = execution_quanta[quantum_id] - for refs in execution_quantum.inputs.values(): - for ref in refs: - dumpURIs(ref) - print(" outputs:", file=self.stream) - for refs in execution_quantum.outputs.values(): - for ref in refs: - dumpURIs(ref) + with Butler.from_config(butler_config) as butler: + xgraph = qg.quantum_only_xgraph + execution_quanta = qg.build_execution_quanta() + for quantum_id, quantum_data in xgraph.nodes.items(): + print( + f"Quantum {quantum_id}: {quantum_data['pipeline_node'].task_class_name}", file=self.stream + ) + print(" inputs:", file=self.stream) + execution_quantum = execution_quanta[quantum_id] + for refs in execution_quantum.inputs.values(): + for ref in refs: + dumpURIs(butler, ref) + print(" outputs:", file=self.stream) + for refs in execution_quantum.outputs.values(): + for ref in refs: + dumpURIs(butler, ref) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index be1edf1a..b1b51344 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -40,6 +40,7 @@ if TYPE_CHECKING: from lsst.daf.butler import Butler, ButlerMetrics, LimitedButler, Quantum from lsst.pipe.base import ExecutionResources, 
PipelineTask, QuantumSuccessCaveats, TaskFactory + from lsst.pipe.base.log_capture import _ExecutionLogRecordsExtra from lsst.pipe.base.pipeline_graph import TaskNode @@ -136,9 +137,16 @@ def __init__( ) def checkExistingOutputs( - self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler + self, + quantum: Quantum, + task_node: TaskNode, + /, + limited_butler: LimitedButler, + log_extra: _ExecutionLogRecordsExtra, ) -> bool: - return super()._check_existing_outputs(quantum, task_node, limited_butler=limited_butler) + return super()._check_existing_outputs( + quantum, task_node, limited_butler=limited_butler, log_extra=log_extra + ) def updatedQuantumInputs( self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler diff --git a/tests/test_cliCmdCleanup.py b/tests/test_cliCmdCleanup.py index 12181c45..5a327bf8 100644 --- a/tests/test_cliCmdCleanup.py +++ b/tests/test_cliCmdCleanup.py @@ -55,6 +55,7 @@ def setUp(self): self.root, configFile=os.path.join(TESTDIR, "config/metricTestRepoButler.yaml"), ) + self.enterContext(self.testRepo.butler) def tearDown(self): removeTestTempDir(self.root) @@ -82,8 +83,8 @@ def test_cleanup_yesNo(self): self.assertIn("Will remove:\n runs: \n others: ingest\n", result.output) self.assertIn("Done.", result.output) - butler = Butler.from_config(self.root) - self.assertEqual(set(butler.registry.queryCollections()), {"in", "ingest/run"}) + with Butler.from_config(self.root) as butler: + self.assertEqual(set(butler.registry.queryCollections()), {"in", "ingest/run"}) def test_nonExistantCollection(self): """Test running cleanup on a collection that has never existed.""" diff --git a/tests/test_cliCmdPurge.py b/tests/test_cliCmdPurge.py index 3441100c..00c90173 100644 --- a/tests/test_cliCmdPurge.py +++ b/tests/test_cliCmdPurge.py @@ -54,6 +54,7 @@ def setUp(self): self.root, configFile=os.path.join(TESTDIR, "config/metricTestRepoButler.yaml"), ) + self.enterContext(self.testRepo.butler) def 
tearDown(self): removeTestTempDir(self.root) diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index c2d90d82..4c7f0909 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -87,6 +87,7 @@ def test_report(self): root=self.root, metadata=metadata, ) + butler.close() # Check that we can get the proper run collection from the qgraph self.assertEqual(check_output_run(qgraph, "run"), []) @@ -272,6 +273,7 @@ def test_aggregate_reports(self): root=self.root, metadata=metadata, ) + butler.close() # Check that we can get the proper run collection from the qgraph self.assertEqual(check_output_run(qgraph1, "run"), []) diff --git a/tests/test_cliCmdUpdateGraphRun.py b/tests/test_cliCmdUpdateGraphRun.py index c4a2e156..4af9361c 100644 --- a/tests/test_cliCmdUpdateGraphRun.py +++ b/tests/test_cliCmdUpdateGraphRun.py @@ -54,6 +54,7 @@ def tearDown(self) -> None: def test_update(self): """Test for updating output run in a graph.""" helper = InMemoryRepo() + self.enterContext(helper) helper.add_task() helper.add_task() qgc = helper.make_quantum_graph_builder().finish(attach_datastore_records=False) diff --git a/tests/test_preExecInit.py b/tests/test_preExecInit.py index 94d6b3a3..550fe837 100644 --- a/tests/test_preExecInit.py +++ b/tests/test_preExecInit.py @@ -62,6 +62,7 @@ def test_saveInitOutputs(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=taskFactory, extendRun=extendRun) preExecInit.saveInitOutputs(qgraph) @@ -71,6 +72,7 @@ def test_saveInitOutputs_twice(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=taskFactory, extendRun=extendRun) preExecInit.saveInitOutputs(qgraph) if extendRun: @@ -86,6 +88,7 @@ def 
test_saveConfigs(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=None, extendRun=extendRun) preExecInit.saveConfigs(qgraph) @@ -94,6 +97,7 @@ def test_saveConfigs_twice(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=None, extendRun=extendRun) preExecInit.saveConfigs(qgraph) if extendRun: @@ -109,6 +113,7 @@ def test_savePackageVersions(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=None, extendRun=extendRun) preExecInit.savePackageVersions(qgraph) @@ -117,6 +122,7 @@ def test_savePackageVersions_twice(self): with self.subTest(extendRun=extendRun): with temporaryDirectory() as tmpdir: butler, qgraph = makeSimpleQGraph(root=tmpdir) + self.enterContext(butler) preExecInit = PreExecInit(butler=butler, taskFactory=None, extendRun=extendRun) preExecInit.savePackageVersions(qgraph) if extendRun: diff --git a/tests/test_run.py b/tests/test_run.py index 8588f900..22dc9a25 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -40,6 +40,7 @@ from lsst.ctrl.mpexec.cli.utils import collect_pipeline_actions from lsst.ctrl.mpexec.showInfo import ShowInfo from lsst.daf.butler import CollectionType, MissingCollectionError +from lsst.daf.butler.cli.utils import LogCliRunner from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError from lsst.pipe.base.script import transfer_from_graph from lsst.pipe.base.tests.mocks import DirectButlerRepo, DynamicTestPipelineTaskConfig @@ -60,7 +61,9 @@ def fake_run(ctx: click.Context, **kwargs: object): kwargs = collect_pipeline_actions(ctx, **kwargs) 
mock(**kwargs) - runner = click.testing.CliRunner() + # At least one test requires that we enable INFO logging so use + # the specialist runner. runner = LogCliRunner() result = runner.invoke(fake_run, args, catch_exceptions=False) if result.exit_code != 0: raise RuntimeError(f"Failure getting default args for 'run': {result}")