Coverage for python / lsst / images / tests / _roundtrip.py: 29%
158 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:52 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:52 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("RoundtripFits", "RoundtripJson", "RoundtripNdf", "TemporaryButler")
16import tempfile
17import unittest
18import uuid
19from abc import ABC, abstractmethod
20from contextlib import ExitStack
21from typing import TYPE_CHECKING, Any, Self, TypeVar
23import astropy.io.fits
24from pydantic_core import from_json
26if TYPE_CHECKING:
27 import h5py
29try:
30 from lsst.daf.butler import Butler, DataCoordinate, DatasetProvenance, DatasetRef, DatasetType
32 HAVE_BUTLER = True
33except ImportError:
34 HAVE_BUTLER = False
36from .. import fits, json
37from .._generalized_image import GeneralizedImage
38from ..serialization import ArchiveTree, MetadataValue, ReadResult
40# We need an old-style TypeVar for Sphinx.
41T = TypeVar("T")
class TemporaryButler:
    """Make a temporary butler repository.

    Parameters
    ----------
    run
        Name of a `~lsst.daf.butler.CollectionType.RUN` collection to
        register and use as the default run for the returned butler.
    **kwargs
        A mapping from a dataset type name to its storage class.  For each
        entry, a dataset type will be registered with empty dimensions, and a
        `~lsst.daf.butler.DatasetRef` will be created and added as an
        attribute of this object (under the dataset type name).

    Raises
    ------
    unittest.SkipTest
        Raised when the context manager is entered if `lsst.daf.butler` could
        not be imported.  This is typically handled by using this context
        manager within a `unittest.TestCase.subTest` context, which will skip
        just the butler-required tests in that context while allowing the rest
        of the test to continue.

    Notes
    -----
    The ``butler`` attribute and the per-dataset-type attributes only exist
    after the context manager has been entered.
    """

    def __init__(self, run: str = "test_run", **kwargs: str):
        self.run = run
        self._kwargs = kwargs
        self._exit_stack = ExitStack()

    def __enter__(self) -> TemporaryButler:
        if not HAVE_BUTLER:
            raise unittest.SkipTest("lsst.daf.butler could not be imported.")
        self._exit_stack.__enter__()
        try:
            root = self._exit_stack.enter_context(
                tempfile.TemporaryDirectory(ignore_cleanup_errors=True, delete=True)
            )
            butler_config = Butler.makeRepo(root)
            self.butler = self._exit_stack.enter_context(Butler.from_config(butler_config, run=self.run))
            empty_data_id = DataCoordinate.make_empty(self.butler.dimensions)
            for name, storage_class in self._kwargs.items():
                dataset_type = DatasetType(name, self.butler.dimensions.empty, storage_class)
                try:
                    self.butler.registry.registerDatasetType(dataset_type)
                except KeyError as err:
                    err.add_note(
                        "Storage class not configured in butler defaults. "
                        "A newer version of daf_butler may be needed."
                    )
                    raise
                setattr(self, name, DatasetRef(dataset_type, empty_data_id, self.run))
        except BaseException:
            # ``__exit__`` is never invoked when ``__enter__`` raises, so
            # anything already pushed onto the stack (the temporary directory
            # and possibly the butler) has to be unwound here.
            self._exit_stack.close()
            raise
        return self

    def __exit__(self, *args: Any) -> bool | None:
        # Delegate to the exit stack, which tears down the butler and the
        # temporary directory in reverse order of creation.
        return self._exit_stack.__exit__(*args)

    # Just for typing, since this class uses dynamic attributes.
    def __getattr__(self, name: str) -> DatasetRef:
        raise AttributeError(name)
104class RoundtripBase[T](ABC):
105 """A context manager for testing serialization.
107 Parameters
108 ----------
109 tc
110 A test case object to used for internal checks.
111 original
112 The object to serialize.
113 storage_class
114 A butler storage class name to use. If not provided (or
115 `lsst.daf.butler` cannot be imported), the roundtrip will just use
116 a direct write to a temporary file.
117 format
118 Archive/file format to use when not using a butler (ignored when
119 using a butler).
121 Notes
122 -----
123 When entered, this context manager writes the object and reads it back in
124 to the ``result`` attribute. When exited, any temporary files or
125 directories are deleted, but the ``result`` attribute is still usable.
126 In between the `inspect` and `get` methods can be used to perform other
127 tests.
129 This helper internally tests that butler provenance and metadata are saved
130 with any `.GeneralizedImage` object.
131 """
133 def __init__(
134 self,
135 tc: unittest.TestCase,
136 original: T,
137 storage_class: str | None = None,
138 ):
139 self._original = original
140 self._storage_class = storage_class
141 self._serialized: Any = None
142 self._exit_stack = ExitStack()
143 self._filename: str | None = None
144 self._tc = tc
145 self.result: Any
146 self.butler: Butler | None = None
147 self.ref: DatasetRef | None = None
148 self._test_metadata: dict[str, MetadataValue] = {
149 "roundtrip_test_1": 1,
150 "roundtrip_test_2": 2.5,
151 "roundtrip_test_3": "three",
152 "roundtrip_test_4": True,
153 "roundtrip_test_5": None,
154 }
156 def __enter__(self) -> Self:
157 self._exit_stack.__enter__()
158 if isinstance(self._original, GeneralizedImage):
159 self._original.metadata.update(self._test_metadata)
160 if HAVE_BUTLER and self._storage_class is not None:
161 self._run_with_butler()
162 else:
163 self._run_without_butler()
164 if isinstance(self._original, GeneralizedImage):
165 assert isinstance(self.result, GeneralizedImage)
166 for k in self._test_metadata:
167 self._tc.assertEqual(self.result.metadata[k], self._test_metadata[k])
168 del self._original.metadata[k]
169 del self.result.metadata[k]
170 return self
172 def __exit__(self, *args: Any) -> bool | None:
173 return self._exit_stack.__exit__(*args)
175 @property
176 def filename(self) -> str:
177 """The name of the file the object was written to."""
178 if self._filename is None:
179 assert self.butler is not None and self.ref is not None
180 self._filename = self.butler.getURI(self.ref).ospath
181 return self._filename
183 @property
184 def serialized(self) -> Any:
185 """The serialization model for this object
186 (`.serialization.ArchiveTree`).
187 """
188 if self._serialized is None:
189 # The butler code path doesn't give us a way to inspect the
190 # serialized model, so we have to save it again directly to another
191 # file (which we then discard).
192 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
193 tmp.close()
194 self._serialized = fits.write(self._original, tmp.name)
195 return self._serialized
197 def get(self, component: str | None = None, storageClass: str | None = None, **kwargs: Any) -> Any:
198 """Perform a partial read.
200 Parameters
201 ----------
202 component
203 Component to read instead of the main object. This requires the
204 roundtrip to use a butler, raising `unittest.SkipTest` otherwise;
205 this generally means these tests should be nested within a
206 `~unittest.TestCase.subTest` context.
207 storageClass
208 Override storage class name to affect the type returned by
209 the get. Only used if a butler is active.
210 **kwargs
211 Keyword arguments either passed directly to `.fits.read` or used
212 as ``parameters`` for a `~lsst.daf.butler.Butler.get`.
214 Return
215 ------
216 object
217 Result of the partial read.
218 """
219 if self.butler is None:
220 if component is not None:
221 raise unittest.SkipTest("Cannot test component reads without a butler.")
222 if storageClass is not None:
223 raise unittest.SkipTest("Cannot test storage class override without a butler")
224 result = fits.read(type(self._original), self.filename, **kwargs).deserialized
225 else:
226 assert self.ref is not None, "butler and ref should be None or not together"
227 ref = self.ref
228 if component is not None:
229 ref = ref.makeComponentRef(component)
230 result = self.butler.get(ref, parameters=kwargs, storageClass=storageClass)
231 if isinstance(result, GeneralizedImage):
232 # The metadata the RoundtripFits object added for the test may or
233 # may not be present; strip it if it does so comparisons to the
234 # original are not messed up.
235 for k in self._test_metadata:
236 result.metadata.pop(k, None)
237 return result
239 def _run_with_butler(self) -> None:
240 assert self._storage_class is not None, "Should not use butler if no storage class"
241 butler_helper = self._exit_stack.enter_context(TemporaryButler(test_dataset=self._storage_class))
242 self.butler = butler_helper.butler
243 quantum_id = uuid.uuid4()
244 self.ref = self.butler.put(
245 self._original, butler_helper.test_dataset, provenance=DatasetProvenance(quantum_id=quantum_id)
246 )
247 self.result = self.butler.get(self.ref)
248 if isinstance(self._original, GeneralizedImage):
249 self._tc.assertEqual(
250 DatasetRef.from_simple(self.result.butler_dataset, universe=self.butler.dimensions), self.ref
251 )
252 self._tc.assertEqual(self.result.butler_provenance.quantum_id, quantum_id)
253 self._tc.assertTrue(self.filename.endswith(self._get_extension()))
255 def _run_without_butler(self) -> None:
256 tmp = self._exit_stack.enter_context(
257 tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True)
258 )
259 tmp.close()
260 self._filename = tmp.name
261 self._serialized = self._write(self._original, tmp.name)
262 read_result = self._read(type(self._original), tmp.name)
263 self._tc.assertIsNone(read_result.butler_info)
264 self.result = read_result.deserialized
266 @abstractmethod
267 def _get_extension(self) -> str:
268 raise NotImplementedError()
270 @abstractmethod
271 def _write(self, obj: Any, filename: str) -> ArchiveTree:
272 raise NotImplementedError()
274 @abstractmethod
275 def _read(self, obj_type: Any, filename: str) -> ReadResult:
276 raise NotImplementedError()
279class RoundtripFits[T](RoundtripBase[T]):
280 def inspect(self) -> astropy.io.fits.HDUList:
281 """Open the FITS file with Astropy."""
282 return self._exit_stack.enter_context(
283 astropy.io.fits.open(self.filename, disable_image_compression=True)
284 )
286 def _get_extension(self) -> str:
287 return ".fits"
289 def _write(self, obj: Any, filename: str) -> ArchiveTree:
290 return fits.write(obj, filename)
292 def _read(self, obj_type: Any, filename: str) -> ReadResult:
293 return fits.read(obj_type, filename)
296class RoundtripJson[T](RoundtripBase[T]):
297 def inspect(self) -> dict[str, Any]:
298 """Read the JSON file as a dictionary."""
299 with open(self.filename, "rb") as stream:
300 return from_json(stream.read())
302 def _get_extension(self) -> str:
303 return ".json"
305 def _write(self, obj: Any, filename: str) -> ArchiveTree:
306 return json.write(obj, filename)
308 def _read(self, obj_type: Any, filename: str) -> ReadResult:
309 return json.read(obj_type, filename)
312class RoundtripNdf[T](RoundtripBase[T]):
313 def inspect(self) -> h5py.File:
314 """Open the NDF file with h5py."""
315 import h5py
317 return self._exit_stack.enter_context(h5py.File(self.filename, "r"))
319 def _get_extension(self) -> str:
320 return ".sdf"
322 def _write(self, obj: Any, filename: str) -> ArchiveTree:
323 from .. import ndf
325 return ndf.write(obj, filename)
327 def _read(self, obj_type: Any, filename: str) -> ReadResult:
328 from .. import ndf
330 return ndf.read(obj_type, filename)