Coverage for python/lsst/pipe/base/single_quantum_executor.py: 11%
233 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:23 -0700
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:23 -0700
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Names exported by ``from ... import *``; the executor class is the only
# name this module declares as public.
__all__ = ["SingleQuantumExecutor"]
30import logging
31import time
32import uuid
33from collections import defaultdict
34from collections.abc import Callable, Mapping
35from itertools import chain
36from typing import Any
38from lsst.daf.butler import (
39 Butler,
40 DatasetRef,
41 DatasetType,
42 LimitedButler,
43 NamedKeyDict,
44 Quantum,
45)
46from lsst.daf.butler.logging import ButlerLogRecords
47from lsst.utils.introspection import get_full_type_name
48from lsst.utils.timer import logInfo
50from ._quantumContext import ExecutionResources, QuantumContext
51from ._status import (
52 AnnotatedPartialOutputsError,
53 ExceptionInfo,
54 InvalidQuantumError,
55 NoWorkFound,
56 QuantumSuccessCaveats,
57)
58from .connections import AdjustQuantumHelper
59from .gc_metrics import GcMetrics
60from .log_capture import LogCapture, _ExecutionLogRecordsExtra
61from .pipeline_graph import TaskNode
62from .pipelineTask import PipelineTask
63from .quantum_graph_executor import QuantumExecutionResult, QuantumExecutor
64from .quantum_reports import QuantumReport
65from .task import _TASK_FULL_METADATA_TYPE, _TASK_METADATA_TYPE
66from .taskFactory import TaskFactory
# Module-level logger used for the executor's progress, skip, and error
# messages.
_LOG = logging.getLogger(__name__)
class SingleQuantumExecutor(QuantumExecutor):
    """Executor class which runs one Quantum at a time.

    Parameters
    ----------
    butler : `~lsst.daf.butler.LimitedButler` or `None`, optional
        Data butler used for dataset I/O. If this is a full
        `~lsst.daf.butler.Butler`, its registry is refreshed before each
        quantum and ``skip_existing_in`` is resolved against it. `None` means
        that ``limited_butler_factory`` is used to create a butler for each
        quantum instead.
    task_factory : `.TaskFactory`, optional
        Instance of a task factory. Defaults to a new instance of
        `lsst.pipe.base.TaskFactory`.
    skip_existing_in : `str` or `~collections.abc.Iterable` [ `str` ], optional
        A collection name or list of collections to search for the existing
        outputs of quanta, which indicates that those quanta should be skipped.
        This is only consulted when ``butler`` is a full
        `~lsst.daf.butler.Butler`, and only to check whether the butler's
        output run is among these collections (with chains flattened). If it
        is, ``skip_existing`` is turned on. `None` or an empty
        string/sequence disables this check.
    clobber_outputs : `bool`, optional
        If `True`, outputs of a quantum that already exist are removed before
        the quantum is executed. When ``skip_existing`` is also in effect,
        quanta with complete outputs are skipped instead, so only partial
        outputs are removed. When `False`, finding existing outputs that are
        not skipped is an error (unless ``assume_no_existing_outputs`` is
        set, in which case no check is made).
    enable_lsst_debug : `bool`, optional
        Enable debugging with ``lsstDebug`` facility for a task. The
        ``debug`` module is imported, if it can be found, before each task
        is constructed.
    limited_butler_factory : `~collections.abc.Callable`, optional
        A callable that creates a `~lsst.daf.butler.LimitedButler` instance
        for a given Quantum. It is called once per quantum, and the butler it
        returns is closed when that quantum finishes. This parameter must be
        provided if ``butler`` is `None`. If ``butler`` is not `None` then
        this parameter is ignored.
    resources : `.ExecutionResources`, optional
        The resources available to this quantum when executing.
    skip_existing : `bool`, optional
        If `True`, skip quanta whose metadata datasets are already stored.
        Unlike ``skip_existing_in``, this works with limited butlers as well as
        full butlers. Always set to `True` if ``skip_existing_in`` matches
        ``butler.run``.
    assume_no_existing_outputs : `bool`, optional
        If `True`, assume preexisting outputs are impossible (e.g. because this
        is known by higher-level code to be a new ``RUN`` collection), and do
        not look for them. This causes the ``skip_existing`` and
        ``clobber_outputs`` options to be ignored, but unlike just setting both
        of those to `False`, it also avoids all dataset existence checks.
    raise_on_partial_outputs : `bool`, optional
        If `True` (the default), raise exceptions chained by
        `.AnnotatedPartialOutputsError` immediately. If `False`, log them and
        treat the partial result as a qualified success, so that downstream
        tasks continue to run.
    job_metadata : `~collections.abc.Mapping` or `None`, optional
        Mapping with extra metadata to embed within the quantum metadata under
        the "job" key. This is intended to correspond to information common to
        all quanta being executed in a single process, such as the time taken
        to load the quantum graph in a BPS job.

    Raises
    ------
    ValueError
        Raised if ``butler`` and ``limited_butler_factory`` are both `None`.
    """
125 def __init__(
126 self,
127 *,
128 butler: LimitedButler | None = None,
129 task_factory: TaskFactory | None = None,
130 skip_existing_in: Any = None,
131 clobber_outputs: bool = False,
132 enable_lsst_debug: bool = False,
133 limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
134 resources: ExecutionResources | None = None,
135 skip_existing: bool = False,
136 assume_no_existing_outputs: bool = False,
137 raise_on_partial_outputs: bool = True,
138 job_metadata: Mapping[str, int | str | float] | None = None,
139 ):
140 self._butler: Butler | None = None
141 self._limited_butler: LimitedButler | None = None
142 match butler:
143 case Butler():
144 self._butler = butler
145 self._limited_butler = butler
146 case LimitedButler():
147 self._limited_butler = butler
148 case None:
149 if limited_butler_factory is None:
150 raise ValueError("limited_butler_factory is needed when butler is None")
151 self._task_factory = task_factory if task_factory is not None else TaskFactory()
152 self._clobber_outputs = clobber_outputs
153 self._enable_lsst_debug = enable_lsst_debug
154 self._limited_butler_factory = limited_butler_factory
155 self._resources = resources
156 self._assume_no_existing_outputs = assume_no_existing_outputs
157 self._raise_on_partial_outputs = raise_on_partial_outputs
158 self._job_metadata = job_metadata
159 # Find whether output run is in skip_existing_in.
160 self._skip_existing = skip_existing
161 if self._butler is not None and skip_existing_in and not self._skip_existing:
162 self._skip_existing = self._butler.run in self._butler.collections.query(
163 skip_existing_in, flatten_chains=True
164 )
165 self._previous_process_quanta: list[uuid.UUID] = []
167 def execute(
168 self,
169 task_node: TaskNode,
170 /,
171 quantum: Quantum,
172 quantum_id: uuid.UUID | None = None,
173 *,
174 log_records: ButlerLogRecords | None = None,
175 ) -> QuantumExecutionResult:
176 # Docstring inherited from QuantumExecutor.execute
177 if self._butler is not None:
178 self._butler.registry.refresh()
179 return self._execute(task_node, quantum, quantum_id=quantum_id, log_records=log_records)
181 def _execute(
182 self,
183 task_node: TaskNode,
184 /,
185 quantum: Quantum,
186 quantum_id: uuid.UUID | None = None,
187 *,
188 log_records: ButlerLogRecords | None = None,
189 ) -> QuantumExecutionResult:
190 """Execute the quantum.
192 Internal implementation of `execute()`.
193 """
194 # Make a limited butler instance if needed.
195 limited_butler: LimitedButler
196 used_butler_factory = False
197 if self._butler is not None:
198 limited_butler = self._butler
199 else:
200 # We check this in constructor, but mypy needs this check here.
201 if self._limited_butler is not None:
202 limited_butler = self._limited_butler
203 else:
204 assert self._limited_butler_factory is not None
205 limited_butler = self._limited_butler_factory(quantum)
206 used_butler_factory = True
208 try:
209 return self._execute_with_limited_butler(
210 task_node, limited_butler, quantum=quantum, quantum_id=quantum_id, log_records=log_records
211 )
212 finally:
213 if used_butler_factory:
214 limited_butler.close()
216 def _execute_with_limited_butler(
217 self,
218 task_node: TaskNode,
219 limited_butler: LimitedButler,
220 /,
221 quantum: Quantum,
222 quantum_id: uuid.UUID | None = None,
223 *,
224 log_records: ButlerLogRecords | None = None,
225 ) -> QuantumExecutionResult:
226 startTime = time.time()
227 assert quantum.dataId is not None, "Quantum DataId cannot be None"
228 report = QuantumReport(quantumId=quantum_id, dataId=quantum.dataId, taskLabel=task_node.label)
229 if self._butler is not None:
230 log_capture = LogCapture.from_full(self._butler)
231 else:
232 log_capture = LogCapture.from_limited(limited_butler)
233 with log_capture.capture_logging(task_node, quantum, records=log_records) as captureLog:
234 # Save detailed resource usage before task start to metadata.
235 quantumMetadata = _TASK_METADATA_TYPE()
236 logInfo(None, "prep", metadata=quantumMetadata) # type: ignore[arg-type]
238 _LOG.info(
239 "Preparing execution of quantum for label=%s dataId=%s.", task_node.label, quantum.dataId
240 )
242 # check whether to skip or delete old outputs, if it returns True
243 # or raises an exception do not try to store logs, as they may be
244 # already in butler.
245 captureLog.store = False
246 if self._check_existing_outputs(quantum, task_node, limited_butler, captureLog.extra):
247 _LOG.info(
248 "Skipping already-successful quantum for label=%s dataId=%s.",
249 task_node.label,
250 quantum.dataId,
251 )
252 return QuantumExecutionResult(quantum, report, skipped_existing=True, adjusted_no_work=False)
253 captureLog.store = True
255 captureLog.extra.previous_process_quanta.extend(self._previous_process_quanta)
256 if quantum_id is not None:
257 self._previous_process_quanta.append(quantum_id)
258 try:
259 quantum = self._updated_quantum_inputs(quantum, task_node, limited_butler)
260 except NoWorkFound as exc:
261 _LOG.info(
262 "Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s",
263 task_node.label,
264 quantum.dataId,
265 str(exc),
266 )
267 quantumMetadata["caveats"] = QuantumSuccessCaveats.from_adjust_quantum_no_work().value
268 quantumMetadata["outputs"] = []
269 # Make empty metadata that looks something like what a
270 # do-nothing task would write (but we don't bother with empty
271 # nested PropertySets for subtasks). This is slightly
272 # duplicative with logic in pipe_base that we can't easily call
273 # from here; we'll fix this on DM-29761.
274 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type]
275 fullMetadata = _TASK_FULL_METADATA_TYPE()
276 fullMetadata[task_node.label] = _TASK_METADATA_TYPE()
277 fullMetadata["quantum"] = quantumMetadata
278 if self._job_metadata is not None:
279 fullMetadata["job"] = self._job_metadata
280 self._write_metadata(quantum, fullMetadata, task_node, limited_butler)
281 return QuantumExecutionResult(
282 quantum,
283 report,
284 skipped_existing=False,
285 adjusted_no_work=True,
286 task_metadata=fullMetadata,
287 )
289 # enable lsstDebug debugging
290 if self._enable_lsst_debug:
291 try:
292 _LOG.debug("Will try to import debug.py")
293 import debug # type: ignore # noqa:F401
294 except ImportError:
295 _LOG.warning("No 'debug' module found.")
297 # Ensure that we are executing a frozen config
298 task_node.config.freeze()
299 logInfo(None, "init", metadata=quantumMetadata) # type: ignore[arg-type]
300 init_input_refs = list(quantum.initInputs.values())
302 _LOG.info(
303 "Constructing task and executing quantum for label=%s dataId=%s.",
304 task_node.label,
305 quantum.dataId,
306 )
307 try:
308 task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs)
309 logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
310 outputs_put: list[uuid.UUID] = []
311 with limited_butler.record_metrics() as butler_metrics, GcMetrics() as gc_metrics:
312 caveats = self._run_quantum(
313 task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put
314 )
315 except Exception as e:
316 _LOG.error(
317 "Execution of task '%s' on quantum %s failed. Exception %s: %s",
318 task_node.label,
319 quantum.dataId,
320 e.__class__.__name__,
321 str(e),
322 )
323 captureLog.extra.exception = ExceptionInfo(
324 type_name=get_full_type_name(e),
325 message=str(e),
326 metadata={},
327 )
328 raise
329 else:
330 quantumMetadata["butler_metrics"] = butler_metrics.model_dump()
331 quantumMetadata["gc_metrics"] = gc_metrics.model_dump()
332 quantumMetadata["caveats"] = caveats.value
333 # Stringify the UUID for easier compatibility with
334 # PropertyList.
335 finally:
336 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type]
337 fullMetadata = task.getFullMetadata()
338 quantumMetadata["outputs"] = [str(output) for output in outputs_put]
339 fullMetadata["quantum"] = quantumMetadata
340 if self._job_metadata is not None:
341 fullMetadata["job"] = self._job_metadata
342 captureLog.extra.metadata = fullMetadata
343 self._write_metadata(quantum, fullMetadata, task_node, limited_butler)
344 stopTime = time.time()
345 _LOG.info(
346 "Execution of task '%s' on quantum %s took %.3f seconds",
347 task_node.label,
348 quantum.dataId,
349 stopTime - startTime,
350 )
351 return QuantumExecutionResult(
352 quantum,
353 report,
354 skipped_existing=False,
355 adjusted_no_work=False,
356 task_metadata=fullMetadata,
357 )
359 def _check_existing_outputs(
360 self,
361 quantum: Quantum,
362 task_node: TaskNode,
363 /,
364 limited_butler: LimitedButler,
365 log_extra: _ExecutionLogRecordsExtra,
366 ) -> bool:
367 """Decide whether this quantum needs to be executed.
369 If only partial outputs exist then they are removed if
370 ``clobberOutputs`` is True, otherwise an exception is raised.
372 The ``LimitedButler`` is used for everything, and should be set to
373 ``self.butler`` if no separate ``LimitedButler`` is available.
375 Parameters
376 ----------
377 quantum : `~lsst.daf.butler.Quantum`
378 Quantum to check for existing outputs.
379 task_node : `~.pipeline_graph.TaskNode`
380 Task definition structure.
381 limited_butler : `~lsst.daf.butler.LimitedButler`
382 Butler to use for querying and clobbering.
383 log_extra : `.log_capture.TaskLogRecordsExtra`
384 Extra information to attach to log records.
386 Returns
387 -------
388 exist : `bool`
389 `True` if ``self.skipExisting`` is defined, and a previous
390 execution of this quanta appears to have completed successfully
391 (either because metadata was written or all datasets were written).
392 `False` otherwise.
394 Raises
395 ------
396 RuntimeError
397 Raised if some outputs exist and some not.
398 """
399 if self._assume_no_existing_outputs:
400 return False
402 if self._skip_existing:
403 _LOG.debug(
404 "Checking existence of metadata from previous execution of label=%s dataId=%s.",
405 task_node.label,
406 quantum.dataId,
407 )
408 # Metadata output exists; this is sufficient to assume the previous
409 # run was successful and should be skipped.
410 [metadata_ref] = quantum.outputs[task_node.metadata_output.dataset_type_name]
411 if metadata_ref is not None:
412 if limited_butler.stored(metadata_ref):
413 return True
415 # Find and prune (partial) outputs if `self.clobberOutputs` is set.
416 _LOG.debug(
417 "Looking for existing outputs in the way for label=%s dataId=%s.", task_node.label, quantum.dataId
418 )
419 ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
420 if task_node.log_output is not None:
421 (log_ref,) = quantum.outputs[task_node.log_output.dataset_type_name]
422 if ref_dict[log_ref]:
423 _LOG.debug(
424 "Attaching logs from previous attempt on label=%s dataId=%s.",
425 task_node.label,
426 quantum.dataId,
427 )
428 log_extra.attach_previous_attempt(limited_butler.get(log_ref))
429 existingRefs = [ref for ref, exists in ref_dict.items() if exists]
430 missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
431 if existingRefs:
432 if not missingRefs:
433 # Full outputs exist.
434 if self._skip_existing:
435 return True
436 elif self._clobber_outputs:
437 _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
438 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
439 else:
440 raise RuntimeError(
441 f"Complete outputs exists for a quantum {quantum} "
442 "and neither clobberOutputs nor skipExisting is set: "
443 f"existingRefs={existingRefs}"
444 )
445 else:
446 # Partial outputs from a failed quantum.
447 _LOG.debug(
448 "Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s",
449 quantum,
450 existingRefs,
451 missingRefs,
452 )
453 if self._clobber_outputs:
454 # only prune
455 _LOG.info("Removing partial outputs for task %s: %s", task_node.label, existingRefs)
456 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
457 return False
458 else:
459 raise RuntimeError(
460 "Registry inconsistency while checking for existing quantum outputs:"
461 f" quantum={quantum} existingRefs={existingRefs}"
462 f" missingRefs={missingRefs}"
463 )
465 # By default always execute.
466 return False
468 def _updated_quantum_inputs(
469 self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler
470 ) -> Quantum:
471 """Update quantum with extra information, returns a new updated
472 Quantum.
474 Some methods may require input DatasetRefs to have non-None
475 ``dataset_id``, but in case of intermediate dataset it may not be
476 filled during QuantumGraph construction. This method will retrieve
477 missing info from registry.
479 Parameters
480 ----------
481 quantum : `~lsst.daf.butler.Quantum`
482 Single Quantum instance.
483 task_node : `~.pipeline_graph.TaskNode`
484 Task definition structure.
485 limited_butler : `~lsst.daf.butler.LimitedButler`
486 Butler to use for querying.
488 Returns
489 -------
490 update : `~lsst.daf.butler.Quantum`
491 Updated Quantum instance.
492 """
493 anyChanges = False
494 updatedInputs: defaultdict[DatasetType, list] = defaultdict(list)
495 for key, refsForDatasetType in quantum.inputs.items():
496 _LOG.debug(
497 "Checking existence of input '%s' for label=%s dataId=%s.",
498 key.name,
499 task_node.label,
500 quantum.dataId,
501 )
502 toCheck = []
503 newRefsForDatasetType = updatedInputs[key]
504 for ref in refsForDatasetType:
505 if self._should_assume_exists(quantum, ref):
506 newRefsForDatasetType.append(ref)
507 else:
508 toCheck.append(ref)
509 if not toCheck:
510 _LOG.debug(
511 "Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
512 key.name,
513 task_node.label,
514 quantum.dataId,
515 )
516 continue
517 stored = limited_butler.stored_many(toCheck)
518 for ref in toCheck:
519 if stored[ref]:
520 newRefsForDatasetType.append(ref)
521 else:
522 # This should only happen if a predicted intermediate was
523 # not actually produced upstream, but
524 # datastore misconfigurations can unfortunately also land
525 # us here.
526 _LOG.info("No dataset artifact found for %s", ref)
527 continue
528 if len(newRefsForDatasetType) != len(refsForDatasetType):
529 anyChanges = True
530 # If we removed any input datasets, let the task check if it has enough
531 # to proceed and/or prune related datasets that it also doesn't
532 # need/produce anymore. It will raise NoWorkFound if it can't run,
533 # which we'll let propagate up. This is exactly what we run during QG
534 # generation, because a task shouldn't care whether an input is missing
535 # because some previous task didn't produce it, or because it just
536 # wasn't there during QG generation.
537 namedUpdatedInputs = NamedKeyDict[DatasetType, list[DatasetRef]](updatedInputs.items())
538 helper = AdjustQuantumHelper(namedUpdatedInputs, quantum.outputs)
539 if anyChanges:
540 _LOG.debug("Running adjustQuantum for label=%s dataId=%s.", task_node.label, quantum.dataId)
541 assert quantum.dataId is not None, "Quantum DataId cannot be None"
542 helper.adjust_in_place(task_node.get_connections(), label=task_node.label, data_id=quantum.dataId)
543 return Quantum(
544 taskName=quantum.taskName,
545 taskClass=quantum.taskClass,
546 dataId=quantum.dataId,
547 initInputs=quantum.initInputs,
548 inputs=helper.inputs,
549 outputs=helper.outputs,
550 )
552 def _run_quantum(
553 self,
554 task: PipelineTask,
555 quantum: Quantum,
556 task_node: TaskNode,
557 /,
558 limited_butler: LimitedButler,
559 quantum_id: uuid.UUID | None,
560 ids_put: list[uuid.UUID],
561 ) -> QuantumSuccessCaveats:
562 """Execute task on a single quantum.
564 Parameters
565 ----------
566 task : `PipelineTask`
567 Task object.
568 quantum : `~lsst.daf.butler.Quantum`
569 Single Quantum instance.
570 task_node : `~.pipeline_graph.TaskNode`
571 Task definition structure.
572 limited_butler : `~lsst.daf.butler.LimitedButler`
573 Butler to use for dataset I/O.
574 quantum_id : `uuid.UUID` or `None`
575 ID of the quantum being executed.
576 ids_put : list[ `uuid.UUID` ]
577 List to be populated with the dataset IDs that were written by this
578 quantum. This is an output parameter in order to allow it to be
579 populated even when an exception is raised.
581 Returns
582 -------
583 flags : `QuantumSuccessCaveats`
584 Flags that describe qualified successes.
585 """
586 flags = QuantumSuccessCaveats.NO_CAVEATS
588 # Create a butler that operates in the context of a quantum
589 butlerQC = QuantumContext(limited_butler, quantum, resources=self._resources, quantum_id=quantum_id)
591 # Get the input and output references for the task
592 inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)
594 # Call task runQuantum() method.
595 try:
596 task.runQuantum(butlerQC, inputRefs, outputRefs)
597 except NoWorkFound as err:
598 # Not an error, just an early exit.
599 _LOG.info(
600 "Task '%s' on quantum %s exited early with no work found: %s.",
601 task_node.label,
602 quantum.dataId,
603 str(err),
604 )
605 flags |= err.FLAGS
606 except AnnotatedPartialOutputsError as caught:
607 error: BaseException
608 if caught.__cause__ is None:
609 _LOG.error(
610 "Incorrect use of AnnotatedPartialOutputsError: no chained exception found.",
611 task_node.label,
612 quantum.dataId,
613 )
614 error = caught
615 else:
616 error = caught.__cause__
617 if self._raise_on_partial_outputs:
618 # Note: this is a real edge case that required some
619 # experimentation: without 'from None' below, this raise would
620 # produce a "while one exception was being handled, another was
621 # raised" traceback involving AnnotatedPartialOutputsError.
622 # With the 'from None', we get just the error chained to it, as
623 # desired.
624 raise error from None
625 else:
626 _LOG.error(
627 "Task '%s' on quantum %s exited with partial outputs; "
628 "considering this a qualified success and proceeding.",
629 task_node.label,
630 quantum.dataId,
631 )
632 _LOG.error(error, exc_info=error)
633 flags |= caught.FLAGS
634 finally:
635 ids_put.extend(output[2] for output in butlerQC.outputsPut)
636 if not butlerQC.outputsPut:
637 flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
638 if not butlerQC.outputsPut == butlerQC.allOutputs:
639 flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
640 return flags
642 def _write_metadata(
643 self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
644 ) -> None:
645 # DatasetRef has to be in the Quantum outputs, can lookup by name
646 try:
647 [ref] = quantum.outputs[task_node.metadata_output.dataset_type_name]
648 except LookupError as exc:
649 raise InvalidQuantumError(
650 "Quantum outputs is missing metadata dataset type "
651 f"{task_node.metadata_output.dataset_type_name};"
652 " this could happen due to inconsistent options between QuantumGraph generation"
653 " and execution"
654 ) from exc
655 limited_butler.put(metadata, ref)
657 def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
658 """Report whether the given dataset can be assumed to exist because
659 some previous check reported that it did.
661 If this is `True` for a dataset does not in fact exist anymore, that's
662 an unexpected problem that we want to raise as an exception, and
663 definitely not a case where some predicted output just wasn't produced.
664 We can't always tell the difference, but in this case we can.
666 Parameters
667 ----------
668 quantum : `Quantum`
669 Quantum being processed.
670 ref : `lsst.daf.butler.DatasetRef`
671 Reference to the input dataset.
673 Returns
674 -------
675 exists : `bool` or `None`
676 `True` if this dataset is definitely an overall input, `False` if
677 some other quantum in the graph is expected to produce it, and
678 `None` if the answer could not be determined.
679 """
680 if quantum.datastore_records:
681 for datastore_record_data in quantum.datastore_records.values():
682 if ref.id in datastore_record_data.records:
683 return True
684 return False
685 return None