Coverage for tests / test_pipelines.py: 14%
122 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-22 08:08 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import itertools
23import tempfile
24import unittest
26import lsst.daf.butler.tests as butlerTests
27import lsst.pipe.base
28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name
29import lsst.utils
30import lsst.utils.tests
32from lsst.resources import ResourcePath
class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.

    Every test discovers pipeline YAML files under the ``pipelines/``
    directory of ``ap_pipe`` and runs its checks once per file inside a
    ``subTest``, so that a failure in one file does not hide the others.
    """

    def setUp(self):
        """Record the pipelines directory and the file-to-subset synonyms."""
        self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {"ApPipe.yaml": "apPipe",
                         "ApPipeWithIsrTaskLSST.yaml": "apPipe",
                         "ApPipeWithPreconvolution.yaml": "apPipe",
                         "ApPipeWithFakes.yaml": "apPipe",
                         "SingleFrame.yaml": "singleFrame",
                         "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
                         "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
                         "RunIsrForCrosstalkSources.yaml": "runOverscan",
                         }

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            if "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Run `PipelineStepTester` on each pipeline in ``_ingredients``.

        Each pipeline is checked as a whole (no step suffixes) against the
        input dataset types listed below; outputs are deliberately not
        checked. Pipelines containing an ``injectVisit`` task are additionally
        expected to take ``injection_catalog`` as an input.
        """
        files = ResourcePath.findFileResources(
            [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
        )
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            if "injection/" in file.path:
                # The source-injection post-processing ingredient is a partial
                # pipeline merged into full AP pipelines at build time;
                # it is validated separately by test_injection_ingredient.
                continue
            with self.subTest(file=str(file)):
                # Built afresh for every file because it may be extended
                # below for source-injection pipelines.
                expected_inputs = {
                    # ISR
                    "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
                    "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
                    "opticsTransmission", "filterTransmission", "atmosphereTransmission",
                    "illumMaskedImage", "deferredChargeCalib",
                    # ISR-LSST
                    "bfk", "cti", "dnlLUT", "gain_correction",
                    # Everything else
                    "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
                    "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
                }
                # Detect source-injection pipelines by task label rather than
                # relying on filename conventions.
                temp_pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                temp_pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                if "injectVisit" in temp_pipeline.task_labels:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
                                           ],
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
                                      },
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            elif "injection/" in file.path:
                # PostInjectedTasksApPipe is not actually an AP pipeline
                continue
            elif "ApdbDeduplication" in file.path:
                # The task to export catalogs from the APDB and re-run
                # association is not intended to be part of Prompt Processing
                # or batch AP pipeline runs.
                continue
            elif "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Files without a registered synonym fall through to a
                # placeholder name, so the comparison below fails loudly.
                subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
                self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.

        Each ApPipe-like pipeline must define all of ``required_subsets``,
        those subsets must be pairwise disjoint, and together they must cover
        every task except the deliberate exceptions in ``no_subset_wanted``.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in files:
            if "injection/" in file.path:
                # PostInjectedTasksApPipe is not actually an AP pipeline
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                            f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step? Disjointness is
                # symmetric, so each unordered pair only needs checking once.
                for set1, set2 in itertools.combinations(required_subsets, 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                        f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_injection_ingredient(self):
        """Test the source-injection post-processing ingredient pipeline.

        PostInjectedTasksApPipe is a partial pipeline merged into full AP
        pipelines at build time by make_injection_pipeline. This test
        validates that it can be loaded and contains the expected tasks.
        """
        ingredient = self.path.join("_ingredients").join("injection").join("PostInjectedTasksApPipe.yaml")
        with self.subTest(file=str(ingredient)):
            pipeline = lsst.pipe.base.Pipeline.from_uri(ingredient)
            expected_tasks = {
                "injectedMatchDiaSrc",
                "injectedMatchAssocDiaSrc",
                "consolidateMatchDiaSrc",
                "consolidateMatchAssocDiaSrc",
            }
            # Superset check: extra tasks in the ingredient are allowed.
            self.assertGreaterEqual(
                set(pipeline.task_labels),
                expected_tasks,
                msg="Source-injection post-processing ingredient is missing expected tasks.",
            )

    def test_generated_pipeline_readiness(self):
        """Test that the generated ApPipeWithFakes ingredient exists and is valid.

        pipelines/_ingredients/ApPipeWithFakes.yaml is generated at build time
        by make_injection_pipeline (invoked via scons). This test verifies that
        generation occurred before pipeline tests run, and that the generated
        pipeline includes the source-injection task. If this test fails because
        the file is missing, run 'scons' in the ap_pipe root directory first.
        """
        generated = self.path.join("_ingredients/ApPipeWithFakes.yaml")
        if not generated.exists():
            # fail the test with a message that explains how to fix the problem,
            # rather than silently skipping it
            self.fail(
                f"{generated} has not been generated yet. "
                "Run 'scons' in the ap_pipe root directory to generate it."
            )
        with self.subTest(file=str(generated)):
            pipeline = lsst.pipe.base.Pipeline.from_uri(generated)
            pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
            self.assertIn(
                "injectVisit",
                pipeline.task_labels,
                msg="Generated ApPipeWithFakes.yaml is missing the 'injectVisit' task.",
            )

    def test_preconvolution_isr_matches_ap_pipe(self):
        """Test that, for each instrument, ApPipeWithPreconvolution defines
        the same isr task (class and config) as the corresponding ApPipe.

        Preconvolution changes only image subtraction and DIA-source
        detection; instrument signature removal must be unaffected.
        """
        files = [
            f for f in ResourcePath.findFileResources(
                [self.path], file_filter=r"^ApPipeWithPreconvolution\.yaml$"
            )
            if "_ingredients" not in f.path
        ]
        # Sanity-check that this test actually has cameras to compare.
        self.assertGreater(len(files), 0,
                           msg="No camera-specific ApPipeWithPreconvolution.yaml files found.")

        for precon_file in files:
            with self.subTest(file=str(precon_file)):
                base_file = precon_file.dirname().join("ApPipe.yaml")
                self.assertTrue(base_file.exists(),
                                msg=f"Expected sibling ApPipe.yaml next to {precon_file}: "
                                    f"{base_file} does not exist.")

                precon = lsst.pipe.base.Pipeline.from_uri(precon_file)
                base = lsst.pipe.base.Pipeline.from_uri(base_file)
                # apdb_config has no default and must be set before to_graph().
                precon.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                base.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")

                precon_isr = precon.to_graph().tasks["isr"]
                base_isr = base.to_graph().tasks["isr"]

                self.assertEqual(precon_isr.task_class_name, base_isr.task_class_name,
                                 msg="isr task class differs between ApPipe.yaml and "
                                     f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}.")
                # Can't just do `assertEqual(precon_isr, base_isr)` since
                # Task nodes are intentionally not equality comparable.
                self.assertTrue(
                    base_isr.config.compare(precon_isr.config, shortcut=False),
                    msg="isr task config differs between ApPipe.yaml and "
                        f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}."
                )

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            if "_ingredients" not in f.path
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                # The generic counterpart is the same-named file in _ingredients.
                generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
                if not generic.exists():
                    # No generic pipeline of this name, so nothing to compare.
                    continue
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                            f"{generic_subsets - special_subsets}.")
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Test case inherited unchanged from `lsst.utils.tests.MemoryTestCase`."""
def setup_module(module):
    """Module-level setup hook; calls `lsst.utils.tests.init`.

    Parameters
    ----------
    module
        Passed in by the caller of this hook; not used here.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct-execution entry point: call lsst.utils.tests.init() first, then
    # hand control to unittest's command-line runner.
    lsst.utils.tests.init()
    unittest.main()