Coverage for python/lsst/images/tests/_roundtrip.py: 29%

158 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-27 08:29 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("RoundtripFits", "RoundtripJson", "RoundtripNdf", "TemporaryButler") 

15 

16import tempfile 

17import unittest 

18import uuid 

19from abc import ABC, abstractmethod 

20from contextlib import ExitStack 

21from typing import TYPE_CHECKING, Any, Self, TypeVar 

22 

23import astropy.io.fits 

24from pydantic_core import from_json 

25 

26if TYPE_CHECKING: 

27 import h5py 

28 

29try: 

30 from lsst.daf.butler import Butler, DataCoordinate, DatasetProvenance, DatasetRef, DatasetType 

31 

32 HAVE_BUTLER = True 

33except ImportError: 

34 HAVE_BUTLER = False 

35 

36from .. import fits, json 

37from .._generalized_image import GeneralizedImage 

38from ..serialization import ArchiveTree, MetadataValue, ReadResult 

39 

40# We need an old-style TypeVar for Sphinx. 

41T = TypeVar("T") 

42 

43 

44class TemporaryButler: 

45 """Make a temporary butler repository. 

46 

47 Parameters 

48 ---------- 

49 run 

50 Name of a `~lsst.daf.butler.CollectionType.RUN` collection to 

51 register and use as the default run for the returned butler. 

52 **kwargs 

53 A mapping from a dataset type name to its storage class. For each 

54 entry, a dataset type will be registered with empty dimensions, and a 

55 `~lsst.daf.butler.DatasetRef` will be created and added as an 

56 attribute of this class. 

57 

58 Raises 

59 ------ 

60 unittest.SkipTest 

61 Raised when the context manager is entered if `lsst.daf.butler` could 

62 not be imported. This is typically handled by using this context 

63 manager within a `unittest.TestCase.subTest` context, which will skip 

64 just the butler-required tests in that context while allowing the rest 

65 of the test to continue. 

66 """ 

67 

68 def __init__(self, run: str = "test_run", **kwargs: str): 

69 self.run = run 

70 self._kwargs = kwargs 

71 self._exit_stack = ExitStack() 

72 

73 def __enter__(self) -> TemporaryButler: 

74 if not HAVE_BUTLER: 

75 raise unittest.SkipTest("lsst.daf.butler could not be imported.") 

76 self._exit_stack.__enter__() 

77 root = self._exit_stack.enter_context( 

78 tempfile.TemporaryDirectory(ignore_cleanup_errors=True, delete=True) 

79 ) 

80 butler_config = Butler.makeRepo(root) 

81 self.butler = self._exit_stack.enter_context(Butler.from_config(butler_config, run=self.run)) 

82 empty_data_id = DataCoordinate.make_empty(self.butler.dimensions) 

83 for name, storage_class in self._kwargs.items(): 

84 dataset_type = DatasetType(name, self.butler.dimensions.empty, storage_class) 

85 try: 

86 self.butler.registry.registerDatasetType(dataset_type) 

87 except KeyError as err: 

88 err.add_note( 

89 "Storage class not configured in butler defaults. " 

90 "A newer version of daf_butler may be needed." 

91 ) 

92 raise 

93 setattr(self, name, DatasetRef(dataset_type, empty_data_id, self.run)) 

94 return self 

95 

96 def __exit__(self, *args: Any) -> bool | None: 

97 return self._exit_stack.__exit__(*args) 

98 

99 # Just for typing, since this class uses dynamic attributes. 

100 def __getattr__(self, name: str) -> DatasetRef: 

101 raise AttributeError(name) 

102 

103 

104class RoundtripBase[T](ABC): 

105 """A context manager for testing serialization. 

106 

107 Parameters 

108 ---------- 

109 tc 

110 A test case object to used for internal checks. 

111 original 

112 The object to serialize. 

113 storage_class 

114 A butler storage class name to use. If not provided (or 

115 `lsst.daf.butler` cannot be imported), the roundtrip will just use 

116 a direct write to a temporary file. 

117 format 

118 Archive/file format to use when not using a butler (ignored when 

119 using a butler). 

120 

121 Notes 

122 ----- 

123 When entered, this context manager writes the object and reads it back in 

124 to the ``result`` attribute. When exited, any temporary files or 

125 directories are deleted, but the ``result`` attribute is still usable. 

126 In between the `inspect` and `get` methods can be used to perform other 

127 tests. 

128 

129 This helper internally tests that butler provenance and metadata are saved 

130 with any `.GeneralizedImage` object. 

131 """ 

132 

133 def __init__( 

134 self, 

135 tc: unittest.TestCase, 

136 original: T, 

137 storage_class: str | None = None, 

138 ): 

139 self._original = original 

140 self._storage_class = storage_class 

141 self._serialized: Any = None 

142 self._exit_stack = ExitStack() 

143 self._filename: str | None = None 

144 self._tc = tc 

145 self.result: Any 

146 self.butler: Butler | None = None 

147 self.ref: DatasetRef | None = None 

148 self._test_metadata: dict[str, MetadataValue] = { 

149 "roundtrip_test_1": 1, 

150 "roundtrip_test_2": 2.5, 

151 "roundtrip_test_3": "three", 

152 "roundtrip_test_4": True, 

153 "roundtrip_test_5": None, 

154 } 

155 

156 def __enter__(self) -> Self: 

157 self._exit_stack.__enter__() 

158 if isinstance(self._original, GeneralizedImage): 

159 self._original.metadata.update(self._test_metadata) 

160 if HAVE_BUTLER and self._storage_class is not None: 

161 self._run_with_butler() 

162 else: 

163 self._run_without_butler() 

164 if isinstance(self._original, GeneralizedImage): 

165 assert isinstance(self.result, GeneralizedImage) 

166 for k in self._test_metadata: 

167 self._tc.assertEqual(self.result.metadata[k], self._test_metadata[k]) 

168 del self._original.metadata[k] 

169 del self.result.metadata[k] 

170 return self 

171 

172 def __exit__(self, *args: Any) -> bool | None: 

173 return self._exit_stack.__exit__(*args) 

174 

175 @property 

176 def filename(self) -> str: 

177 """The name of the file the object was written to.""" 

178 if self._filename is None: 

179 assert self.butler is not None and self.ref is not None 

180 self._filename = self.butler.getURI(self.ref).ospath 

181 return self._filename 

182 

183 @property 

184 def serialized(self) -> Any: 

185 """The serialization model for this object 

186 (`.serialization.ArchiveTree`). 

187 """ 

188 if self._serialized is None: 

189 # The butler code path doesn't give us a way to inspect the 

190 # serialized model, so we have to save it again directly to another 

191 # file (which we then discard). 

192 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

193 tmp.close() 

194 self._serialized = fits.write(self._original, tmp.name) 

195 return self._serialized 

196 

197 def get(self, component: str | None = None, storageClass: str | None = None, **kwargs: Any) -> Any: 

198 """Perform a partial read. 

199 

200 Parameters 

201 ---------- 

202 component 

203 Component to read instead of the main object. This requires the 

204 roundtrip to use a butler, raising `unittest.SkipTest` otherwise; 

205 this generally means these tests should be nested within a 

206 `~unittest.TestCase.subTest` context. 

207 storageClass 

208 Override storage class name to affect the type returned by 

209 the get. Only used if a butler is active. 

210 **kwargs 

211 Keyword arguments either passed directly to `.fits.read` or used 

212 as ``parameters`` for a `~lsst.daf.butler.Butler.get`. 

213 

214 Return 

215 ------ 

216 object 

217 Result of the partial read. 

218 """ 

219 if self.butler is None: 

220 if component is not None: 

221 raise unittest.SkipTest("Cannot test component reads without a butler.") 

222 if storageClass is not None: 

223 raise unittest.SkipTest("Cannot test storage class override without a butler") 

224 result = fits.read(type(self._original), self.filename, **kwargs).deserialized 

225 else: 

226 assert self.ref is not None, "butler and ref should be None or not together" 

227 ref = self.ref 

228 if component is not None: 

229 ref = ref.makeComponentRef(component) 

230 result = self.butler.get(ref, parameters=kwargs, storageClass=storageClass) 

231 if isinstance(result, GeneralizedImage): 

232 # The metadata the RoundtripFits object added for the test may or 

233 # may not be present; strip it if it does so comparisons to the 

234 # original are not messed up. 

235 for k in self._test_metadata: 

236 result.metadata.pop(k, None) 

237 return result 

238 

239 def _run_with_butler(self) -> None: 

240 assert self._storage_class is not None, "Should not use butler if no storage class" 

241 butler_helper = self._exit_stack.enter_context(TemporaryButler(test_dataset=self._storage_class)) 

242 self.butler = butler_helper.butler 

243 quantum_id = uuid.uuid4() 

244 self.ref = self.butler.put( 

245 self._original, butler_helper.test_dataset, provenance=DatasetProvenance(quantum_id=quantum_id) 

246 ) 

247 self.result = self.butler.get(self.ref) 

248 if isinstance(self._original, GeneralizedImage): 

249 self._tc.assertEqual( 

250 DatasetRef.from_simple(self.result.butler_dataset, universe=self.butler.dimensions), self.ref 

251 ) 

252 self._tc.assertEqual(self.result.butler_provenance.quantum_id, quantum_id) 

253 self._tc.assertTrue(self.filename.endswith(self._get_extension())) 

254 

255 def _run_without_butler(self) -> None: 

256 tmp = self._exit_stack.enter_context( 

257 tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) 

258 ) 

259 tmp.close() 

260 self._filename = tmp.name 

261 self._serialized = self._write(self._original, tmp.name) 

262 read_result = self._read(type(self._original), tmp.name) 

263 self._tc.assertIsNone(read_result.butler_info) 

264 self.result = read_result.deserialized 

265 

266 @abstractmethod 

267 def _get_extension(self) -> str: 

268 raise NotImplementedError() 

269 

270 @abstractmethod 

271 def _write(self, obj: Any, filename: str) -> ArchiveTree: 

272 raise NotImplementedError() 

273 

274 @abstractmethod 

275 def _read(self, obj_type: Any, filename: str) -> ReadResult: 

276 raise NotImplementedError() 

277 

278 

279class RoundtripFits[T](RoundtripBase[T]): 

280 def inspect(self) -> astropy.io.fits.HDUList: 

281 """Open the FITS file with Astropy.""" 

282 return self._exit_stack.enter_context( 

283 astropy.io.fits.open(self.filename, disable_image_compression=True) 

284 ) 

285 

286 def _get_extension(self) -> str: 

287 return ".fits" 

288 

289 def _write(self, obj: Any, filename: str) -> ArchiveTree: 

290 return fits.write(obj, filename) 

291 

292 def _read(self, obj_type: Any, filename: str) -> ReadResult: 

293 return fits.read(obj_type, filename) 

294 

295 

296class RoundtripJson[T](RoundtripBase[T]): 

297 def inspect(self) -> dict[str, Any]: 

298 """Read the JSON file as a dictionary.""" 

299 with open(self.filename, "rb") as stream: 

300 return from_json(stream.read()) 

301 

302 def _get_extension(self) -> str: 

303 return ".json" 

304 

305 def _write(self, obj: Any, filename: str) -> ArchiveTree: 

306 return json.write(obj, filename) 

307 

308 def _read(self, obj_type: Any, filename: str) -> ReadResult: 

309 return json.read(obj_type, filename) 

310 

311 

312class RoundtripNdf[T](RoundtripBase[T]): 

313 def inspect(self) -> h5py.File: 

314 """Open the NDF file with h5py.""" 

315 import h5py 

316 

317 return self._exit_stack.enter_context(h5py.File(self.filename, "r")) 

318 

319 def _get_extension(self) -> str: 

320 return ".sdf" 

321 

322 def _write(self, obj: Any, filename: str) -> ArchiveTree: 

323 from .. import ndf 

324 

325 return ndf.write(obj, filename) 

326 

327 def _read(self, obj_type: Any, filename: str) -> ReadResult: 

328 from .. import ndf 

329 

330 return ndf.read(obj_type, filename)