Coverage for python / lsst / images / tests / extract_legacy_test_data.py: 0%
95 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:56 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:56 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ()
16import os
18import numpy as np
# Everything below is only available in a full Rubin stack.  The imports are
# grouped in one ``try`` so that a failure of any of them gets the same
# explanatory note before the original ImportError is re-raised.
try:
    import click

    from lsst.afw.fits import (
        CompressionAlgorithm,
        CompressionOptions,
        DitherAlgorithm,
        QuantizationOptions,
        ScalingAlgorithm,
    )
    from lsst.cell_coadds import MultipleCellCoadd
    from lsst.daf.butler import Butler, DatasetRef
    from lsst.geom import Box2I, Extent2I, Point2I
    from lsst.utils import getPackageDir
except ImportError as err:
    err.add_note(
        "Updating the test data requires a full Rubin development environment with at least "
        "'click', 'afw', 'obs_base', 'meas_extensions_psfex', 'meas_extensions_piff' and 'cell_coadds' "
        "importable.  This is not necessary for just running the tests."
    )
    raise
43from ._data_ids import DP2_COADD_DATA_ID, DP2_COADD_MISSING_CELL, DP2_VISIT_DETECTOR_DATA_ID
def extract_visit_image(
    butler: Butler,
    output_path: str,
    dataset_ref: DatasetRef,
    shuffle: bool,
) -> None:
    """Load a subimage of a processed visit image from a butler repository
    and save it to testdata_images.

    Parameters
    ----------
    butler
        Butler used to read the dataset.
    output_path
        Path of the FITS file to write.  Its parent directory is created if
        it does not exist.
    dataset_ref
        Reference to the visit image dataset to read.
    shuffle
        If `True`, apply one random permutation to the pixels of the image,
        mask and variance planes before writing.
    """
    bbox = Box2I(Point2I(5, 4), Extent2I(256, 250))
    visit_image = butler.get(dataset_ref, parameters={"bbox": bbox})
    if shuffle:
        # One permutation is shared by all three planes, so every pixel value
        # keeps its own mask and variance after the shuffle.
        indices = np.arange(visit_image.image.array.size, dtype=int)
        np.random.default_rng().shuffle(indices)
        for plane in (visit_image.image, visit_image.mask, visit_image.variance):
            # Indexing ``flat`` with an index array yields a copy, so writing
            # it back into the same array is safe.  The plane's own shape is
            # used instead of repeating the bbox extent as literals.
            plane.array[:, :] = plane.array.flat[indices].reshape(plane.array.shape)
    float_compression = CompressionOptions(
        algorithm=CompressionAlgorithm.RICE_1,
        tile_height=50,
        tile_width=64,
        quantization=QuantizationOptions(
            dither=DitherAlgorithm.SUBTRACTIVE_DITHER_2,
            scaling=ScalingAlgorithm.STDEV_MASKED,
            level=16,
            seed=747,
        ),
    )
    # The mask plane is written without quantization.
    mask_compression = CompressionOptions(
        algorithm=CompressionAlgorithm.GZIP_2,
        tile_height=50,
        tile_width=64,
        quantization=None,
    )
    # ``os.makedirs("")`` raises, so skip it when the path has no directory
    # component (i.e. the file goes into the current directory).
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    visit_image.writeFits(
        output_path,
        imageOptions=float_compression,
        maskOptions=mask_compression,
        varianceOptions=float_compression,
    )
def extract_visit_image_background(
    butler: Butler,
    output_path: str,
    dataset_ref: DatasetRef,
) -> None:
    """Load the background model of a processed visit image from a butler
    repository and save it to testdata_images.

    Parameters
    ----------
    butler
        Butler used to read the dataset.
    output_path
        Path of the FITS file to write.  Its parent directory is created if
        it does not exist.
    dataset_ref
        Reference to the background dataset to read.
    """
    visit_image_background = butler.get(dataset_ref)
    # Create the output directory here, like the other extractors do, so this
    # function does not depend on another one having run first.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    visit_image_background.writeFits(output_path)
def extract_visit_summary(
    butler: Butler,
    output_path: str,
    dataset_ref: DatasetRef,
) -> None:
    """Load a visit_summary dataset and save it to testdata_images.

    Parameters
    ----------
    butler
        Butler used to read the dataset.
    output_path
        Path of the FITS file to write.  Its parent directory is created if
        it does not exist.
    dataset_ref
        Reference to the visit_summary dataset to read.
    """
    visit_summary = butler.get(dataset_ref)
    # Create the output directory here, like the other extractors do, so this
    # function does not depend on another one having run first.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    visit_summary.writeFits(output_path)
def extract_cell_coadd(
    butler: Butler,
    output_path: str,
    dataset_ref: DatasetRef,
    shuffle: bool,
) -> None:
    """Load a subimage of a cell coadd from a butler repository and save it
    to testdata_images.

    Parameters
    ----------
    butler
        Butler used to read the dataset.
    output_path
        Path of the FITS file to write.  Its parent directory is created if
        it does not exist.
    dataset_ref
        Reference to the cell coadd dataset to read.
    shuffle
        If `True`, randomly permute the pixels of every plane of every
        retained cell before writing.  All planes of one cell share the same
        permutation.
    """
    full_cell_coadd = butler.get(dataset_ref)
    # Keep only a small window of cells, leaving out the one identified by
    # DP2_COADD_MISSING_CELL.
    cell_coadd = MultipleCellCoadd(
        [
            full_cell_coadd.cells[x, y]
            for y in range(7, 11)
            for x in range(5, 8)
            if {"i": y, "j": x} != DP2_COADD_MISSING_CELL
        ],
        grid=full_cell_coadd.grid,
        outer_cell_size=full_cell_coadd.outer_cell_size,
        psf_image_size=full_cell_coadd.psf_image_size,
        common=full_cell_coadd.common,
    )
    if shuffle:
        rng = np.random.default_rng()
        for cell in cell_coadd.cells.values():
            outer = cell.outer
            # A fresh permutation per cell, shared by all of its planes.
            indices = np.arange(outer.image.array.size, dtype=int)
            rng.shuffle(indices)
            planes = [outer.image, outer.mask, outer.variance, *outer.noise_realizations]
            if outer.mask_fractions is not None:
                planes.append(outer.mask_fractions)
            for plane in planes:
                # Indexing ``flat`` with an index array yields a copy, so
                # writing it back into the same array is safe.  The plane's
                # own shape is used instead of a hard-coded cell size.
                plane.array[:, :] = plane.array.flat[indices].reshape(plane.array.shape)
    # ``os.makedirs("")`` raises, so skip it when the path has no directory
    # component.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    cell_coadd.writeFits(output_path)
def extract_camera(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
    """Fetch the camera geometry referenced by ``dataset_ref`` from the
    butler and write it as a FITS file in testdata_images.

    The parent directory of ``output_path`` is created if needed.
    """
    camera_geometry = butler.get(dataset_ref)
    parent_dir = os.path.dirname(output_path)
    os.makedirs(parent_dir, exist_ok=True)
    camera_geometry.writeFits(output_path)
def extract_skymap(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
    """Copy the stored skymap artifact out of a butler repository into
    testdata_images, under the name given by ``output_path``.

    The artifact is copied as-is into the directory of ``output_path`` and
    then renamed if the butler chose a different file name.
    """
    target_dir = os.path.dirname(output_path)
    os.makedirs(target_dir, exist_ok=True)
    retrieved = butler.retrieveArtifacts(
        [dataset_ref],
        destination=target_dir,
        transfer="copy",
        preserve_path=False,
        overwrite=True,
    )
    # Exactly one artifact is expected; unpacking fails loudly otherwise.
    (artifact,) = retrieved
    copied_path = artifact.ospath
    if copied_path == output_path:
        return
    os.rename(copied_path, output_path)
def find_dataset_or_raise(
    butler: Butler, dataset_type: str, *, collections: str | None = None, **kwargs
) -> DatasetRef:
    """Look up a single dataset via `lsst.daf.butler.Butler.find_dataset`,
    turning a `None` result into a `LookupError`.

    All keyword arguments other than ``collections`` are forwarded as the
    data ID.
    """
    found = butler.find_dataset(dataset_type, collections=collections, **kwargs)
    if found is not None:
        return found
    raise LookupError(f"Could not find dataset {dataset_type} with data ID {kwargs}.")
@click.group("extract_test_data")
def extract_test_data() -> None:
    """Extract test data for lsst-images from butler repositories."""
    # The group does no work itself; it only hosts the subcommands.  The
    # docstring above is what click displays as the group's ``--help`` text.
@extract_test_data.command("dp2")
@click.option("-b", "--butler-repo", help="Path to the butler repository.")
@click.option("-d", "--testdata-dir", help="Path to the testdata_images directory.")
@click.option(
    "-c",
    "--collection",
    default="LSSTCam/runs/DRP/DP2/v30_0_0/DM-53881/stage4",
    help="Collection to use for most data products.",
)
@click.option(
    "--wcs-collection",
    default="LSSTCam/runs/DRP/v30_0_0/DM-53877",
    help="Collection to search for visit_summary datasets used to update the WCS.",
)
@click.option(
    "--visit-images/--no-visit-images",
    default=True,
    help="Whether to extract [preliminary_]visit_image datasets.",
)
@click.option(
    "--coadds/--no-coadds",
    default=True,
    help="Whether to extract coadd datasets.",
)
@click.option(
    "--camera/--no-camera",
    default=True,
    help="Whether to extract the camera.",
)
@click.option(
    "--skymap/--no-skymap",
    default=True,
    help="Whether to extract the skymap.",
)
def extract_dp2(
    butler_repo: str | None,
    testdata_dir: str | None,
    collection: str,
    wcs_collection: str,
    *,
    visit_images: bool,
    coadds: bool,
    camera: bool,
    skymap: bool,
) -> None:
    """Extract test data from a butler repository."""
    # NOTE: click shows the docstring above as the command's help text, so
    # implementation details are kept in comments instead.
    #
    # butler_repo / testdata_dir fall back to "dp2_prep" and the
    # testdata_images package directory when not given on the command line.
    # Each boolean flag enables one family of outputs; every file is written
    # under <testdata_dir>/dp2/legacy.
    if butler_repo is None:
        butler_repo = "dp2_prep"
    if testdata_dir is None:
        testdata_dir = getPackageDir("testdata_images")
    butler = Butler.from_config(butler_repo, collections=[collection])
    legacy_dir = os.path.join(testdata_dir, "dp2", "legacy")
    if visit_images:
        extract_visit_image(
            butler,
            os.path.join(legacy_dir, "visit_image.fits"),
            find_dataset_or_raise(butler, "visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
            shuffle=True,
        )
        extract_visit_image_background(
            butler,
            os.path.join(legacy_dir, "visit_image_background.fits"),
            find_dataset_or_raise(butler, "visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
        )
        # --wcs-collection is documented as the place to search for
        # visit_summary datasets, so that lookup must use it rather than the
        # butler's default collection; previously the option was ignored.
        extract_visit_summary(
            butler,
            os.path.join(legacy_dir, "visit_summary.fits"),
            find_dataset_or_raise(
                butler,
                "visit_summary",
                collections=wcs_collection,
                **DP2_VISIT_DETECTOR_DATA_ID,
            ),
        )
        extract_visit_image(
            butler,
            os.path.join(legacy_dir, "preliminary_visit_image.fits"),
            find_dataset_or_raise(butler, "preliminary_visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
            shuffle=True,
        )
        extract_visit_image_background(
            butler,
            os.path.join(legacy_dir, "preliminary_visit_image_background.fits"),
            find_dataset_or_raise(butler, "preliminary_visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
        )
    if coadds:
        extract_cell_coadd(
            butler,
            os.path.join(legacy_dir, "deep_coadd_cell_predetection.fits"),
            find_dataset_or_raise(butler, "deep_coadd_cell_predetection", **DP2_COADD_DATA_ID),
            shuffle=True,
        )
    if skymap:
        extract_skymap(
            butler,
            os.path.join(legacy_dir, "skyMap.pickle"),
            find_dataset_or_raise(butler, "skyMap", skymap=DP2_COADD_DATA_ID["skymap"]),
        )
    if camera:
        extract_camera(
            butler,
            os.path.join(legacy_dir, "camera.fits"),
            find_dataset_or_raise(butler, "camera", instrument="LSSTCam"),
        )
# Run the click command group when this module is executed as a script.
if __name__ == "__main__":
    extract_test_data()