Coverage for python/lsst/pipe/base/single_quantum_executor.py: 11%

233 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 01:48 -0700

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

# Names exported by ``from ... import *``; only the executor class is public.
__all__ = ["SingleQuantumExecutor"]

29 

30import logging 

31import time 

32import uuid 

33from collections import defaultdict 

34from collections.abc import Callable, Mapping 

35from itertools import chain 

36from typing import Any 

37 

38from lsst.daf.butler import ( 

39 Butler, 

40 DatasetRef, 

41 DatasetType, 

42 LimitedButler, 

43 NamedKeyDict, 

44 Quantum, 

45) 

46from lsst.daf.butler.logging import ButlerLogRecords 

47from lsst.utils.introspection import get_full_type_name 

48from lsst.utils.timer import logInfo 

49 

50from ._quantumContext import ExecutionResources, QuantumContext 

51from ._status import ( 

52 AnnotatedPartialOutputsError, 

53 ExceptionInfo, 

54 InvalidQuantumError, 

55 NoWorkFound, 

56 QuantumSuccessCaveats, 

57) 

58from .connections import AdjustQuantumHelper 

59from .gc_metrics import GcMetrics 

60from .log_capture import LogCapture, _ExecutionLogRecordsExtra 

61from .pipeline_graph import TaskNode 

62from .pipelineTask import PipelineTask 

63from .quantum_graph_executor import QuantumExecutionResult, QuantumExecutor 

64from .quantum_reports import QuantumReport 

65from .task import _TASK_FULL_METADATA_TYPE, _TASK_METADATA_TYPE 

66from .taskFactory import TaskFactory 

67 

# Module-level logger, named after this module's import path.
_LOG = logging.getLogger(__name__)

69 

70 

71class SingleQuantumExecutor(QuantumExecutor): 

72 """Executor class which runs one Quantum at a time. 

73 

74 Parameters 

75 ---------- 

76 butler : `~lsst.daf.butler.LimitedButler` or `None`, optional 

77 Data butler; `None` means that ``limited_butler_factory`` should be 

78 used instead. 

79 task_factory : `.TaskFactory`, optional 

80 Instance of a task factory. Defaults to a new instance of 

81 `lsst.pipe.base.TaskFactory`. 

82 skip_existing_in : `str` or `~collections.abc.Iterable` [ `str` ] 

83 A collection name or list of collections to search for the existing 

84 outputs of quanta, which indicates that those quanta should be skipped. 

85 This class only checks for the presence of butler output run in the 

86 list of collections. If the output run is present in the list then the 

87 quanta whose complete outputs exist in the output run will be skipped. 

88 `None` or empty string/sequence disables skipping. 

89 clobber_outputs : `bool`, optional 

90 If `True`, then outputs from a quantum that exist in output run 

91 collection will be removed prior to executing a quantum. If 

92 ``skip_existing_in`` contains output run, then only partial outputs 

93 from a quantum will be removed. Only used when ``butler`` is not 

94 `None`. 

95 enable_lsst_debug : `bool`, optional 

96 Enable debugging with ``lsstDebug`` facility for a task. 

97 limited_butler_factory : `~collections.abc.Callable`, optional 

98 A method that creates a `~lsst.daf.butler.LimitedButler` instance for a 

99 given Quantum. This parameter must be provided if ``butler`` is 

100 `None`. If ``butler`` is not `None` then this parameter is ignored. 

101 resources : `.ExecutionResources`, optional 

102 The resources available to this quantum when executing. 

103 skip_existing : `bool`, optional 

104 If `True`, skip quanta whose metadata datasets are already stored. 

105 Unlike ``skip_existing_in``, this works with limited butlers as well as 

106 full butlers. Always set to `True` if ``skip_existing_in`` matches 

107 ``butler.run``. 

108 assume_no_existing_outputs : `bool`, optional 

109 If `True`, assume preexisting outputs are impossible (e.g. because this 

110 is known by higher-level code to be a new ``RUN`` collection), and do 

111 not look for them. This causes the ``skip_existing`` and 

112 ``clobber_outputs`` options to be ignored, but unlike just setting both 

113 of those to `False`, it also avoids all dataset existence checks. 

114 raise_on_partial_outputs : `bool`, optional 

115 If `True` raise exceptions chained by `.AnnotatedPartialOutputsError` 

116 immediately, instead of considering the partial result a success and 

117 continuing to run downstream tasks. 

118 job_metadata : `~collections.abc.Mapping` 

119 Mapping with extra metadata to embed within the quantum metadata under 

120 the "job" key. This is intended to correspond to information common to 

121 all quanta being executed in a single process, such as the time taken 

122 to load the quantum graph in a BPS job. 

123 """ 

124 

125 def __init__( 

126 self, 

127 *, 

128 butler: LimitedButler | None = None, 

129 task_factory: TaskFactory | None = None, 

130 skip_existing_in: Any = None, 

131 clobber_outputs: bool = False, 

132 enable_lsst_debug: bool = False, 

133 limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, 

134 resources: ExecutionResources | None = None, 

135 skip_existing: bool = False, 

136 assume_no_existing_outputs: bool = False, 

137 raise_on_partial_outputs: bool = True, 

138 job_metadata: Mapping[str, int | str | float] | None = None, 

139 ): 

140 self._butler: Butler | None = None 

141 self._limited_butler: LimitedButler | None = None 

142 match butler: 

143 case Butler(): 

144 self._butler = butler 

145 self._limited_butler = butler 

146 case LimitedButler(): 

147 self._limited_butler = butler 

148 case None: 

149 if limited_butler_factory is None: 

150 raise ValueError("limited_butler_factory is needed when butler is None") 

151 self._task_factory = task_factory if task_factory is not None else TaskFactory() 

152 self._clobber_outputs = clobber_outputs 

153 self._enable_lsst_debug = enable_lsst_debug 

154 self._limited_butler_factory = limited_butler_factory 

155 self._resources = resources 

156 self._assume_no_existing_outputs = assume_no_existing_outputs 

157 self._raise_on_partial_outputs = raise_on_partial_outputs 

158 self._job_metadata = job_metadata 

159 # Find whether output run is in skip_existing_in. 

160 self._skip_existing = skip_existing 

161 if self._butler is not None and skip_existing_in and not self._skip_existing: 

162 self._skip_existing = self._butler.run in self._butler.collections.query( 

163 skip_existing_in, flatten_chains=True 

164 ) 

165 self._previous_process_quanta: list[uuid.UUID] = [] 

166 

167 def execute( 

168 self, 

169 task_node: TaskNode, 

170 /, 

171 quantum: Quantum, 

172 quantum_id: uuid.UUID | None = None, 

173 *, 

174 log_records: ButlerLogRecords | None = None, 

175 ) -> QuantumExecutionResult: 

176 # Docstring inherited from QuantumExecutor.execute 

177 if self._butler is not None: 

178 self._butler.registry.refresh() 

179 return self._execute(task_node, quantum, quantum_id=quantum_id, log_records=log_records) 

180 

181 def _execute( 

182 self, 

183 task_node: TaskNode, 

184 /, 

185 quantum: Quantum, 

186 quantum_id: uuid.UUID | None = None, 

187 *, 

188 log_records: ButlerLogRecords | None = None, 

189 ) -> QuantumExecutionResult: 

190 """Execute the quantum. 

191 

192 Internal implementation of `execute()`. 

193 """ 

194 # Make a limited butler instance if needed. 

195 limited_butler: LimitedButler 

196 used_butler_factory = False 

197 if self._butler is not None: 

198 limited_butler = self._butler 

199 else: 

200 # We check this in constructor, but mypy needs this check here. 

201 if self._limited_butler is not None: 

202 limited_butler = self._limited_butler 

203 else: 

204 assert self._limited_butler_factory is not None 

205 limited_butler = self._limited_butler_factory(quantum) 

206 used_butler_factory = True 

207 

208 try: 

209 return self._execute_with_limited_butler( 

210 task_node, limited_butler, quantum=quantum, quantum_id=quantum_id, log_records=log_records 

211 ) 

212 finally: 

213 if used_butler_factory: 

214 limited_butler.close() 

215 

216 def _execute_with_limited_butler( 

217 self, 

218 task_node: TaskNode, 

219 limited_butler: LimitedButler, 

220 /, 

221 quantum: Quantum, 

222 quantum_id: uuid.UUID | None = None, 

223 *, 

224 log_records: ButlerLogRecords | None = None, 

225 ) -> QuantumExecutionResult: 

226 startTime = time.time() 

227 assert quantum.dataId is not None, "Quantum DataId cannot be None" 

228 report = QuantumReport(quantumId=quantum_id, dataId=quantum.dataId, taskLabel=task_node.label) 

229 if self._butler is not None: 

230 log_capture = LogCapture.from_full(self._butler) 

231 else: 

232 log_capture = LogCapture.from_limited(limited_butler) 

233 with log_capture.capture_logging(task_node, quantum, records=log_records) as captureLog: 

234 # Save detailed resource usage before task start to metadata. 

235 quantumMetadata = _TASK_METADATA_TYPE() 

236 logInfo(None, "prep", metadata=quantumMetadata) # type: ignore[arg-type] 

237 

238 _LOG.info( 

239 "Preparing execution of quantum for label=%s dataId=%s.", task_node.label, quantum.dataId 

240 ) 

241 

242 # check whether to skip or delete old outputs, if it returns True 

243 # or raises an exception do not try to store logs, as they may be 

244 # already in butler. 

245 captureLog.store = False 

246 if self._check_existing_outputs(quantum, task_node, limited_butler, captureLog.extra): 

247 _LOG.info( 

248 "Skipping already-successful quantum for label=%s dataId=%s.", 

249 task_node.label, 

250 quantum.dataId, 

251 ) 

252 return QuantumExecutionResult(quantum, report, skipped_existing=True, adjusted_no_work=False) 

253 captureLog.store = True 

254 

255 captureLog.extra.previous_process_quanta.extend(self._previous_process_quanta) 

256 if quantum_id is not None: 

257 self._previous_process_quanta.append(quantum_id) 

258 try: 

259 quantum = self._updated_quantum_inputs(quantum, task_node, limited_butler) 

260 except NoWorkFound as exc: 

261 _LOG.info( 

262 "Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s", 

263 task_node.label, 

264 quantum.dataId, 

265 str(exc), 

266 ) 

267 quantumMetadata["caveats"] = QuantumSuccessCaveats.from_adjust_quantum_no_work().value 

268 quantumMetadata["outputs"] = [] 

269 # Make empty metadata that looks something like what a 

270 # do-nothing task would write (but we don't bother with empty 

271 # nested PropertySets for subtasks). This is slightly 

272 # duplicative with logic in pipe_base that we can't easily call 

273 # from here; we'll fix this on DM-29761. 

274 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type] 

275 fullMetadata = _TASK_FULL_METADATA_TYPE() 

276 fullMetadata[task_node.label] = _TASK_METADATA_TYPE() 

277 fullMetadata["quantum"] = quantumMetadata 

278 if self._job_metadata is not None: 

279 fullMetadata["job"] = self._job_metadata 

280 self._write_metadata(quantum, fullMetadata, task_node, limited_butler) 

281 return QuantumExecutionResult( 

282 quantum, 

283 report, 

284 skipped_existing=False, 

285 adjusted_no_work=True, 

286 task_metadata=fullMetadata, 

287 ) 

288 

289 # enable lsstDebug debugging 

290 if self._enable_lsst_debug: 

291 try: 

292 _LOG.debug("Will try to import debug.py") 

293 import debug # type: ignore # noqa:F401 

294 except ImportError: 

295 _LOG.warning("No 'debug' module found.") 

296 

297 # Ensure that we are executing a frozen config 

298 task_node.config.freeze() 

299 logInfo(None, "init", metadata=quantumMetadata) # type: ignore[arg-type] 

300 init_input_refs = list(quantum.initInputs.values()) 

301 

302 _LOG.info( 

303 "Constructing task and executing quantum for label=%s dataId=%s.", 

304 task_node.label, 

305 quantum.dataId, 

306 ) 

307 try: 

308 task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs) 

309 logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type] 

310 outputs_put: list[uuid.UUID] = [] 

311 with limited_butler.record_metrics() as butler_metrics, GcMetrics() as gc_metrics: 

312 caveats = self._run_quantum( 

313 task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put 

314 ) 

315 except Exception as e: 

316 _LOG.error( 

317 "Execution of task '%s' on quantum %s failed. Exception %s: %s", 

318 task_node.label, 

319 quantum.dataId, 

320 e.__class__.__name__, 

321 str(e), 

322 ) 

323 captureLog.extra.exception = ExceptionInfo( 

324 type_name=get_full_type_name(e), 

325 message=str(e), 

326 metadata={}, 

327 ) 

328 raise 

329 else: 

330 quantumMetadata["butler_metrics"] = butler_metrics.model_dump() 

331 quantumMetadata["gc_metrics"] = gc_metrics.model_dump() 

332 quantumMetadata["caveats"] = caveats.value 

333 # Stringify the UUID for easier compatibility with 

334 # PropertyList. 

335 finally: 

336 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type] 

337 fullMetadata = task.getFullMetadata() 

338 quantumMetadata["outputs"] = [str(output) for output in outputs_put] 

339 fullMetadata["quantum"] = quantumMetadata 

340 if self._job_metadata is not None: 

341 fullMetadata["job"] = self._job_metadata 

342 captureLog.extra.metadata = fullMetadata 

343 self._write_metadata(quantum, fullMetadata, task_node, limited_butler) 

344 stopTime = time.time() 

345 _LOG.info( 

346 "Execution of task '%s' on quantum %s took %.3f seconds", 

347 task_node.label, 

348 quantum.dataId, 

349 stopTime - startTime, 

350 ) 

351 return QuantumExecutionResult( 

352 quantum, 

353 report, 

354 skipped_existing=False, 

355 adjusted_no_work=False, 

356 task_metadata=fullMetadata, 

357 ) 

358 

359 def _check_existing_outputs( 

360 self, 

361 quantum: Quantum, 

362 task_node: TaskNode, 

363 /, 

364 limited_butler: LimitedButler, 

365 log_extra: _ExecutionLogRecordsExtra, 

366 ) -> bool: 

367 """Decide whether this quantum needs to be executed. 

368 

369 If only partial outputs exist then they are removed if 

370 ``clobberOutputs`` is True, otherwise an exception is raised. 

371 

372 The ``LimitedButler`` is used for everything, and should be set to 

373 ``self.butler`` if no separate ``LimitedButler`` is available. 

374 

375 Parameters 

376 ---------- 

377 quantum : `~lsst.daf.butler.Quantum` 

378 Quantum to check for existing outputs. 

379 task_node : `~.pipeline_graph.TaskNode` 

380 Task definition structure. 

381 limited_butler : `~lsst.daf.butler.LimitedButler` 

382 Butler to use for querying and clobbering. 

383 log_extra : `.log_capture.TaskLogRecordsExtra` 

384 Extra information to attach to log records. 

385 

386 Returns 

387 ------- 

388 exist : `bool` 

389 `True` if ``self.skipExisting`` is defined, and a previous 

390 execution of this quanta appears to have completed successfully 

391 (either because metadata was written or all datasets were written). 

392 `False` otherwise. 

393 

394 Raises 

395 ------ 

396 RuntimeError 

397 Raised if some outputs exist and some not. 

398 """ 

399 if self._assume_no_existing_outputs: 

400 return False 

401 

402 if self._skip_existing: 

403 _LOG.debug( 

404 "Checking existence of metadata from previous execution of label=%s dataId=%s.", 

405 task_node.label, 

406 quantum.dataId, 

407 ) 

408 # Metadata output exists; this is sufficient to assume the previous 

409 # run was successful and should be skipped. 

410 [metadata_ref] = quantum.outputs[task_node.metadata_output.dataset_type_name] 

411 if metadata_ref is not None: 

412 if limited_butler.stored(metadata_ref): 

413 return True 

414 

415 # Find and prune (partial) outputs if `self.clobberOutputs` is set. 

416 _LOG.debug( 

417 "Looking for existing outputs in the way for label=%s dataId=%s.", task_node.label, quantum.dataId 

418 ) 

419 ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values())) 

420 if task_node.log_output is not None: 

421 (log_ref,) = quantum.outputs[task_node.log_output.dataset_type_name] 

422 if ref_dict[log_ref]: 

423 _LOG.debug( 

424 "Attaching logs from previous attempt on label=%s dataId=%s.", 

425 task_node.label, 

426 quantum.dataId, 

427 ) 

428 log_extra.attach_previous_attempt(limited_butler.get(log_ref)) 

429 existingRefs = [ref for ref, exists in ref_dict.items() if exists] 

430 missingRefs = [ref for ref, exists in ref_dict.items() if not exists] 

431 if existingRefs: 

432 if not missingRefs: 

433 # Full outputs exist. 

434 if self._skip_existing: 

435 return True 

436 elif self._clobber_outputs: 

437 _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs) 

438 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) 

439 else: 

440 raise RuntimeError( 

441 f"Complete outputs exists for a quantum {quantum} " 

442 "and neither clobberOutputs nor skipExisting is set: " 

443 f"existingRefs={existingRefs}" 

444 ) 

445 else: 

446 # Partial outputs from a failed quantum. 

447 _LOG.debug( 

448 "Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s", 

449 quantum, 

450 existingRefs, 

451 missingRefs, 

452 ) 

453 if self._clobber_outputs: 

454 # only prune 

455 _LOG.info("Removing partial outputs for task %s: %s", task_node.label, existingRefs) 

456 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) 

457 return False 

458 else: 

459 raise RuntimeError( 

460 "Registry inconsistency while checking for existing quantum outputs:" 

461 f" quantum={quantum} existingRefs={existingRefs}" 

462 f" missingRefs={missingRefs}" 

463 ) 

464 

465 # By default always execute. 

466 return False 

467 

468 def _updated_quantum_inputs( 

469 self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler 

470 ) -> Quantum: 

471 """Update quantum with extra information, returns a new updated 

472 Quantum. 

473 

474 Some methods may require input DatasetRefs to have non-None 

475 ``dataset_id``, but in case of intermediate dataset it may not be 

476 filled during QuantumGraph construction. This method will retrieve 

477 missing info from registry. 

478 

479 Parameters 

480 ---------- 

481 quantum : `~lsst.daf.butler.Quantum` 

482 Single Quantum instance. 

483 task_node : `~.pipeline_graph.TaskNode` 

484 Task definition structure. 

485 limited_butler : `~lsst.daf.butler.LimitedButler` 

486 Butler to use for querying. 

487 

488 Returns 

489 ------- 

490 update : `~lsst.daf.butler.Quantum` 

491 Updated Quantum instance. 

492 """ 

493 anyChanges = False 

494 updatedInputs: defaultdict[DatasetType, list] = defaultdict(list) 

495 for key, refsForDatasetType in quantum.inputs.items(): 

496 _LOG.debug( 

497 "Checking existence of input '%s' for label=%s dataId=%s.", 

498 key.name, 

499 task_node.label, 

500 quantum.dataId, 

501 ) 

502 toCheck = [] 

503 newRefsForDatasetType = updatedInputs[key] 

504 for ref in refsForDatasetType: 

505 if self._should_assume_exists(quantum, ref): 

506 newRefsForDatasetType.append(ref) 

507 else: 

508 toCheck.append(ref) 

509 if not toCheck: 

510 _LOG.debug( 

511 "Assuming overall input '%s' is present without checks for label=%s dataId=%s.", 

512 key.name, 

513 task_node.label, 

514 quantum.dataId, 

515 ) 

516 continue 

517 stored = limited_butler.stored_many(toCheck) 

518 for ref in toCheck: 

519 if stored[ref]: 

520 newRefsForDatasetType.append(ref) 

521 else: 

522 # This should only happen if a predicted intermediate was 

523 # not actually produced upstream, but 

524 # datastore misconfigurations can unfortunately also land 

525 # us here. 

526 _LOG.info("No dataset artifact found for %s", ref) 

527 continue 

528 if len(newRefsForDatasetType) != len(refsForDatasetType): 

529 anyChanges = True 

530 # If we removed any input datasets, let the task check if it has enough 

531 # to proceed and/or prune related datasets that it also doesn't 

532 # need/produce anymore. It will raise NoWorkFound if it can't run, 

533 # which we'll let propagate up. This is exactly what we run during QG 

534 # generation, because a task shouldn't care whether an input is missing 

535 # because some previous task didn't produce it, or because it just 

536 # wasn't there during QG generation. 

537 namedUpdatedInputs = NamedKeyDict[DatasetType, list[DatasetRef]](updatedInputs.items()) 

538 helper = AdjustQuantumHelper(namedUpdatedInputs, quantum.outputs) 

539 if anyChanges: 

540 _LOG.debug("Running adjustQuantum for label=%s dataId=%s.", task_node.label, quantum.dataId) 

541 assert quantum.dataId is not None, "Quantum DataId cannot be None" 

542 helper.adjust_in_place(task_node.get_connections(), label=task_node.label, data_id=quantum.dataId) 

543 return Quantum( 

544 taskName=quantum.taskName, 

545 taskClass=quantum.taskClass, 

546 dataId=quantum.dataId, 

547 initInputs=quantum.initInputs, 

548 inputs=helper.inputs, 

549 outputs=helper.outputs, 

550 ) 

551 

552 def _run_quantum( 

553 self, 

554 task: PipelineTask, 

555 quantum: Quantum, 

556 task_node: TaskNode, 

557 /, 

558 limited_butler: LimitedButler, 

559 quantum_id: uuid.UUID | None, 

560 ids_put: list[uuid.UUID], 

561 ) -> QuantumSuccessCaveats: 

562 """Execute task on a single quantum. 

563 

564 Parameters 

565 ---------- 

566 task : `PipelineTask` 

567 Task object. 

568 quantum : `~lsst.daf.butler.Quantum` 

569 Single Quantum instance. 

570 task_node : `~.pipeline_graph.TaskNode` 

571 Task definition structure. 

572 limited_butler : `~lsst.daf.butler.LimitedButler` 

573 Butler to use for dataset I/O. 

574 quantum_id : `uuid.UUID` or `None` 

575 ID of the quantum being executed. 

576 ids_put : list[ `uuid.UUID` ] 

577 List to be populated with the dataset IDs that were written by this 

578 quantum. This is an output parameter in order to allow it to be 

579 populated even when an exception is raised. 

580 

581 Returns 

582 ------- 

583 flags : `QuantumSuccessCaveats` 

584 Flags that describe qualified successes. 

585 """ 

586 flags = QuantumSuccessCaveats.NO_CAVEATS 

587 

588 # Create a butler that operates in the context of a quantum 

589 butlerQC = QuantumContext(limited_butler, quantum, resources=self._resources, quantum_id=quantum_id) 

590 

591 # Get the input and output references for the task 

592 inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum) 

593 

594 # Call task runQuantum() method. 

595 try: 

596 task.runQuantum(butlerQC, inputRefs, outputRefs) 

597 except NoWorkFound as err: 

598 # Not an error, just an early exit. 

599 _LOG.info( 

600 "Task '%s' on quantum %s exited early with no work found: %s.", 

601 task_node.label, 

602 quantum.dataId, 

603 str(err), 

604 ) 

605 flags |= err.FLAGS 

606 except AnnotatedPartialOutputsError as caught: 

607 error: BaseException 

608 if caught.__cause__ is None: 

609 _LOG.error( 

610 "Incorrect use of AnnotatedPartialOutputsError: no chained exception found.", 

611 task_node.label, 

612 quantum.dataId, 

613 ) 

614 error = caught 

615 else: 

616 error = caught.__cause__ 

617 if self._raise_on_partial_outputs: 

618 # Note: this is a real edge case that required some 

619 # experimentation: without 'from None' below, this raise would 

620 # produce a "while one exception was being handled, another was 

621 # raised" traceback involving AnnotatedPartialOutputsError. 

622 # With the 'from None', we get just the error chained to it, as 

623 # desired. 

624 raise error from None 

625 else: 

626 _LOG.error( 

627 "Task '%s' on quantum %s exited with partial outputs; " 

628 "considering this a qualified success and proceeding.", 

629 task_node.label, 

630 quantum.dataId, 

631 ) 

632 _LOG.error(error, exc_info=error) 

633 flags |= caught.FLAGS 

634 finally: 

635 ids_put.extend(output[2] for output in butlerQC.outputsPut) 

636 if not butlerQC.outputsPut: 

637 flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING 

638 if not butlerQC.outputsPut == butlerQC.allOutputs: 

639 flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING 

640 return flags 

641 

642 def _write_metadata( 

643 self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler 

644 ) -> None: 

645 # DatasetRef has to be in the Quantum outputs, can lookup by name 

646 try: 

647 [ref] = quantum.outputs[task_node.metadata_output.dataset_type_name] 

648 except LookupError as exc: 

649 raise InvalidQuantumError( 

650 "Quantum outputs is missing metadata dataset type " 

651 f"{task_node.metadata_output.dataset_type_name};" 

652 " this could happen due to inconsistent options between QuantumGraph generation" 

653 " and execution" 

654 ) from exc 

655 limited_butler.put(metadata, ref) 

656 

657 def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None: 

658 """Report whether the given dataset can be assumed to exist because 

659 some previous check reported that it did. 

660 

661 If this is `True` for a dataset does not in fact exist anymore, that's 

662 an unexpected problem that we want to raise as an exception, and 

663 definitely not a case where some predicted output just wasn't produced. 

664 We can't always tell the difference, but in this case we can. 

665 

666 Parameters 

667 ---------- 

668 quantum : `Quantum` 

669 Quantum being processed. 

670 ref : `lsst.daf.butler.DatasetRef` 

671 Reference to the input dataset. 

672 

673 Returns 

674 ------- 

675 exists : `bool` or `None` 

676 `True` if this dataset is definitely an overall input, `False` if 

677 some other quantum in the graph is expected to produce it, and 

678 `None` if the answer could not be determined. 

679 """ 

680 if quantum.datastore_records: 

681 for datastore_record_data in quantum.datastore_records.values(): 

682 if ref.id in datastore_record_data.records: 

683 return True 

684 return False 

685 return None