Coverage for tests/test_single_quantum_executor.py: 14%
97 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:23 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:23 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

import os
import time
import unittest
from typing import Any

import lsst.pipe.base.automatic_connection_constants as acc
from lsst.pipe.base.gc_metrics import GcMetrics
from lsst.pipe.base.resource_usage import QuantumResourceUsage
from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor
from lsst.pipe.base.tests.mocks import InMemoryRepo

# Absolute path of the directory that contains this test module.
TESTDIR = os.path.dirname(os.path.abspath(__file__))
class SingleQuantumExecutorTestCase(unittest.TestCase):
    """Tests for SingleQuantumExecutor implementation."""

    def _make_single_quantum(self) -> tuple[Any, Any, Any]:
        """Build a one-task in-memory repo and its single-quantum graph.

        The repo helper is entered as a context on this test case, so its
        exit is registered as a cleanup for the running test.

        Returns
        -------
        executor
            Executor obtained from ``make_single_quantum_executor``.
        butler
            Butler obtained alongside the executor.
        node
            The only node of the quantum graph that was built.
        """
        helper = InMemoryRepo("base.yaml")
        self.enterContext(helper)
        helper.add_task()
        qgraph = helper.make_quantum_graph_builder().build(attach_datastore_records=False)
        executor, butler = helper.make_single_quantum_executor()
        nodes = list(qgraph)
        # Every test in this class relies on the graph holding one quantum.
        self.assertEqual(len(nodes), 1)
        return executor, butler, nodes[0]

    def _pop_only_output(self, butler: Any) -> tuple[Any, Any]:
        """Return the single ``dataset_auto1`` entry held by ``butler``.

        Asserts that exactly one such dataset exists before popping it.

        Parameters
        ----------
        butler
            Butler to query with ``get_datasets``.

        Returns
        -------
        ref
            Key of the only entry returned by ``get_datasets``.
        obj
            Value stored under that key.
        """
        outputs = butler.get_datasets("dataset_auto1")
        self.assertEqual(len(outputs), 1)
        return outputs.popitem()

    def test_simple_execute(self) -> None:
        """Run execute() method in simplest setup."""
        executor, butler, node = self._make_single_quantum()
        # Bracket the execution with wall-clock timestamps so the reported
        # total time can be bounded below.
        t1 = time.time()
        executor.execute(node.task_node, node.quantum)
        t2 = time.time()
        # There must be one dataset of task's output connection
        self.assertEqual(len(butler.get_datasets("dataset_auto1")), 1)
        # Test that we can construct resource usage information from the
        # metadata.
        (md,) = butler.get_datasets(acc.METADATA_OUTPUT_TEMPLATE.format(label="task_auto1")).values()
        ru = QuantumResourceUsage.from_task_metadata(md)
        self.assertIsNotNone(ru)
        self.assertGreater(ru.memory, 0)
        self.assertGreater(ru.prep_time, 0)
        self.assertGreater(ru.init_time, 0)
        self.assertGreater(ru.run_time, 0)
        self.assertGreater(ru.run_time_cpu, 0)
        self.assertGreater(ru.total_time, 0)
        # The reported total must fit inside the interval measured around
        # the execute() call.
        self.assertLess(ru.total_time, t2 - t1)

        # Check that GC metrics are filled.
        gc_metrics = GcMetrics.from_task_metadata(md)
        self.assertIsNotNone(gc_metrics)
        self.assertTrue(gc_metrics.start_isenabled)
        self.assertTrue(gc_metrics.end_isenabled)
        self.assertEqual(len(gc_metrics.start_threshold), 3)
        self.assertEqual(len(gc_metrics.end_threshold), 3)
        self.assertEqual(len(gc_metrics.start_count), 3)
        self.assertEqual(len(gc_metrics.end_count), 3)
        self.assertEqual(set(gc_metrics.start_stats), {"collections", "collected", "uncollectable"})
        self.assertEqual(set(gc_metrics.end_stats), {"collections", "collected", "uncollectable"})

    def test_skip_existing_execute(self) -> None:
        """Run execute() method twice, with skip_existing."""
        executor, butler, node = self._make_single_quantum()
        executor.execute(node.task_node, node.quantum)

        ref1, obj1 = self._pop_only_output(butler)

        # Re-run it with skip_existing, it should not run. Note that if it did
        # run (and called 'butler.put') that would raise an exception.
        executor = SingleQuantumExecutor(limited_butler_factory=butler.factory, skip_existing=True)
        executor.execute(node.task_node, node.quantum)

        ref2, obj2 = self._pop_only_output(butler)
        self.assertEqual(ref1, ref2)
        # Objects should be the same (but not identities, because the butler
        # will copy them).
        self.assertEqual(obj1, obj2)

    def test_clobber_outputs_execute(self) -> None:
        """Run execute() method twice, with clobber_outputs."""
        executor, butler, node = self._make_single_quantum()
        executor.execute(node.task_node, node.quantum)

        ref1, obj1 = self._pop_only_output(butler)

        # Remove the dataset ourself, and replace it with something
        # different so we can check later whether it got replaced.
        butler.pruneDatasets([ref1], disassociate=False, unstore=True, purge=False)
        obj1.quantum = None
        butler.put(obj1, ref1)

        # Re-run it with clobber_outputs and skip_existing, it should not
        # clobber but should skip instead.
        executor = SingleQuantumExecutor(
            limited_butler_factory=butler.factory, skip_existing=True, clobber_outputs=True
        )
        executor.execute(node.task_node, node.quantum)
        ref2, obj2 = self._pop_only_output(butler)
        self.assertEqual(ref1, ref2)
        self.assertEqual(obj1, obj2)

        # Re-run it with clobber_outputs but without skip_existing, it
        # should clobber.
        executor = SingleQuantumExecutor(limited_butler_factory=butler.factory, clobber_outputs=True)
        executor.execute(node.task_node, node.quantum)
        ref3, obj3 = self._pop_only_output(butler)
        self.assertEqual(ref1, ref3)
        self.assertNotEqual(obj1, obj3)
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()