Coverage for python / lsst / images / formatters.py: 24%

217 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 00:52 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Unified butler formatter for lsst.images. 

13 

14This formatter dispatches on a write-time ``format`` parameter and on the 

15file extension at read time, replacing the three per-format 

16(`lsst.images.fits.formatters`, `lsst.images.json.formatters`, 

17`lsst.images.ndf.formatters`) hierarchies that previously duplicated almost 

18all of their logic. 

19""" 

20 

21from __future__ import annotations 

22 

23__all__ = ( 

24 "CellCoaddFormatter", 

25 "ComponentSentinel", 

26 "GenericFormatter", 

27 "ImageFormatter", 

28 "MaskedImageFormatter", 

29 "VisitImageFormatter", 

30) 

31 

32import enum 

33import hashlib 

34import json as _stdlib_json # disambiguates from .json subpackage 

35from collections.abc import Callable 

36from dataclasses import dataclass 

37from typing import Any, ClassVar 

38 

39import astropy.io.fits 

40from astro_metadata_translator import ObservationInfo 

41 

42from lsst.daf.butler import DatasetProvenance, FormatterV2 

43from lsst.resources import ResourcePath 

44 

45from . import fits as _fits 

46from . import json as _json 

47from ._geom import Box 

48from ._masked_image import MaskedImageSerializationModel 

49from ._transforms import ProjectionSerializationModel 

50from ._visit_image import VisitImageSerializationModel 

51from .fits._common import FitsCompressionOptions 

52from .fits._common import PointerModel as _FitsPointerModel 

53from .fits._input_archive import FitsInputArchive as _FitsInputArchive 

54from .serialization import ButlerInfo 

55 

56try: 

57 from . import ndf as _ndf 

58 from .ndf._common import NdfPointerModel as _NdfPointerModel 

59 from .ndf._input_archive import NdfInputArchive as _NdfInputArchive 

60 

61 _HAVE_NDF = True 

62except ImportError: # h5py is optional; see ndf/__init__.py 

63 _ndf = None # type: ignore[assignment] 

64 _NdfPointerModel = None # type: ignore[assignment,misc] 

65 _NdfInputArchive = None # type: ignore[assignment,misc] 

66 _HAVE_NDF = False 

67 

68 

69@dataclass(frozen=True) 

70class _Backend: 

71 """One row of the extension-to-backend lookup table.""" 

72 

73 read: Callable[..., Any] 

74 write: Callable[..., Any] 

75 input_archive: type | None 

76 pointer_model: type | None 

77 

78 

# Lookup table from file extension (with leading dot) to the backend that
# reads and writes it.  Rows are added one at a time so that the optional
# one follows the same pattern as the mandatory ones.
_BACKENDS: dict[str, _Backend] = {}

_BACKENDS[".fits"] = _Backend(
    read=_fits.read,
    write=_fits.write,
    input_archive=_FitsInputArchive,
    pointer_model=_FitsPointerModel,
)

# The JSON row carries neither an input archive nor a pointer model.
_BACKENDS[".json"] = _Backend(
    read=_json.read,
    write=_json.write,
    input_archive=None,
    pointer_model=None,
)

# The ".sdf" row exists only when the optional ndf imports succeeded.
if _HAVE_NDF:
    _BACKENDS[".sdf"] = _Backend(
        read=_ndf.read,
        write=_ndf.write,
        input_archive=_NdfInputArchive,
        pointer_model=_NdfPointerModel,
    )

100 

101 

class GenericFormatter(FormatterV2):
    """Unified butler formatter for any lsst.images type.

    The on-disk format is selected by the ``format`` write parameter
    (``fits``, ``json``, ``sdf``) at write time and by the file
    extension at read time.  The default format is taken from
    ``self.default_extension`` (``.fits`` for the base class).

    Notes
    -----
    Subclasses (`ImageFormatter` and below) add component-level read
    support.  This base class forwards any read parameters straight to
    the underlying ``read`` function.

    An extension can be listed in ``supported_extensions`` without having
    a row in the module-level backend table (the ``.sdf`` row is only
    registered when the optional ``ndf`` subpackage imports).  Both
    `get_write_extension` and `_extension_from_uri` therefore raise
    `RuntimeError` for such an extension rather than letting a later
    table lookup fail with `KeyError`.
    """

    default_extension: ClassVar[str] = ".fits"
    supported_extensions: ClassVar[frozenset[str]] = frozenset({".fits", ".sdf", ".json"})
    supported_write_parameters: ClassVar[frozenset[str]] = frozenset({"format", "recipe"})
    can_read_from_uri: ClassVar[bool] = True

    # Provenance stashed by `add_provenance` for use when writing.
    butler_provenance: DatasetProvenance | None = None

    # --- Backend lookup ------------------------------------------------------

    def _require_backend(self, ext: str) -> _Backend:
        """Return the backend registered for ``ext``.

        Parameters
        ----------
        ext : `str`
            File extension, including the leading dot.

        Returns
        -------
        backend : `_Backend`
            The matching row of the backend table.

        Raises
        ------
        RuntimeError
            Raised if no backend is registered for ``ext``.
        """
        try:
            return _BACKENDS[ext]
        except KeyError:
            raise RuntimeError(
                f"No backend is registered for extension {ext!r} "
                "(an optional dependency may be missing)."
            ) from None

    # --- Write parameter handling -------------------------------------------

    def get_write_extension(self) -> str:
        """Return the extension selected by the ``format`` write parameter.

        Returns
        -------
        ext : `str`
            Extension including the leading dot.  When no ``format`` write
            parameter is given, ``self.default_extension`` is used.

        Raises
        ------
        RuntimeError
            Raised if the requested format is not in
            ``self.supported_extensions`` or has no registered backend.
        """
        default_fmt = self.default_extension.lstrip(".")
        fmt = self.write_parameters.get("format", default_fmt)
        ext = f".{fmt}"
        if ext not in self.supported_extensions:
            # Derive the list from the class attribute so that subclasses
            # overriding ``supported_extensions`` get an accurate message.
            expected = ", ".join(sorted(e.lstrip(".") for e in self.supported_extensions))
            raise RuntimeError(
                f"Requested format {fmt!r} is not supported; expected one of {{{expected}}}."
            )
        # Fail here, with a clear message, if the backend is unavailable.
        self._require_backend(ext)
        return ext

    def _validate_write_parameters(self) -> None:
        """Check that the write parameters are mutually consistent.

        Raises
        ------
        RuntimeError
            Raised if ``recipe`` is given for a non-FITS output format, or
            if the requested format itself is invalid.
        """
        ext = self.get_write_extension()
        if ext != ".fits" and "recipe" in self.write_parameters:
            raise RuntimeError("The 'recipe' write parameter is only valid for FITS output.")

    # --- Write path ---------------------------------------------------------

    def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
        """Write the dataset to a local file using the selected backend.

        Parameters
        ----------
        in_memory_dataset : `typing.Any`
            Object to write; passed unchanged to the backend ``write``
            function.
        uri : `lsst.resources.ResourcePath`
            Destination; its ``ospath`` is passed to the backend.

        Raises
        ------
        RuntimeError
            Raised if the write parameters are invalid.
        """
        self._validate_write_parameters()
        ext = self.get_write_extension()
        backend = self._require_backend(ext)
        butler_info = ButlerInfo(
            dataset=self.dataset_ref.to_simple(),
            # Fall back to an empty provenance object when none was stashed.
            provenance=self.butler_provenance if self.butler_provenance is not None else DatasetProvenance(),
        )
        kwargs: dict[str, Any] = {"butler_info": butler_info}
        if ext == ".fits":
            # These keyword arguments are only passed to the FITS writer.
            kwargs["update_header"] = self._update_header
            kwargs["compression_options"] = self._get_compression_options()
            kwargs["compression_seed"] = self._get_compression_seed()
        backend.write(in_memory_dataset, uri.ospath, **kwargs)

    def add_provenance(
        self,
        in_memory_dataset: Any,
        /,
        *,
        provenance: DatasetProvenance | None = None,
    ) -> Any:
        """Record provenance on the formatter and return the dataset as-is.

        Parameters
        ----------
        in_memory_dataset : `typing.Any`
            Dataset about to be written; it is not modified.
        provenance : `lsst.daf.butler.DatasetProvenance` or `None`, optional
            Provenance to use when the dataset is written.

        Returns
        -------
        in_memory_dataset : `typing.Any`
            The same object that was passed in.
        """
        # A FormatterV2 instance is used once; stash provenance on self
        # rather than mutating the dataset.
        self.butler_provenance = provenance
        return in_memory_dataset

    # --- FITS-specific helpers (kept verbatim from fits/formatters.py) ----

    def _get_compression_seed(self) -> int:
        """Return a deterministic compression seed derived from the data ID.

        Returns
        -------
        seed : `int`
            Integer in the range ``[1, 9999]``.
        """
        # Set the seed based on data ID (all logic here duplicated from
        # obs_base).  We can't just use 'hash', since like 'set' that's not
        # deterministic.  And we can't rely on a DimensionPacker because those
        # are only defined for certain combinations of dimensions.  Doing an
        # MD5 of the JSON feels like overkill but I don't really see anything
        # much simpler.
        hash_bytes = hashlib.md5(
            _stdlib_json.dumps(list(self.data_id.required_values)).encode(),
            usedforsecurity=False,
        ).digest()
        # And it *really* feels like overkill when we squash that into the
        # range allowed by FITS.  The expression below yields [1, 9999]; the
        # modulus is left unchanged so existing seeds stay the same.  The
        # byte order is spelled out ("big" is the default) because the
        # default only exists on newer Python versions.
        return 1 + int.from_bytes(hash_bytes, byteorder="big") % 9999

    def _get_compression_options(self) -> dict[str, FitsCompressionOptions]:
        """Return the compression options for the selected write recipe.

        Returns
        -------
        options : `dict` [`str`, `FitsCompressionOptions`]
            Validated options keyed as in the recipe configuration; empty
            when the ``default`` recipe is requested but not configured.

        Raises
        ------
        RuntimeError
            Raised if a non-default recipe is requested but not configured.
        """
        recipe = self.write_parameters.get("recipe", "default")
        try:
            config = self.write_recipes[recipe]
        except KeyError:
            if recipe == "default":
                # If there's no default recipe just use the software defaults.
                return {}
            raise RuntimeError(f"Invalid recipe for GenericFormatter: {recipe!r}.") from None
        return {k: FitsCompressionOptions.model_validate(v) for k, v in config.items()}

    def _update_header(self, header: astropy.io.fits.Header) -> None:
        """Replace any butler provenance cards in a FITS header, in place.

        Parameters
        ----------
        header : `astropy.io.fits.Header`
            Header to modify.  Existing ``LSST BUTLER`` cards are removed,
            then new ones are added if provenance has been recorded.
        """
        # Logic here largely lifted from lsst.obs.base.utils, which we
        # can't use directly for dependency and maybe mapping-type
        # (PropertyList vs. astropy) reasons.  We assume we can always add
        # long cards (astropy will CONTINUE them) but not comments
        # (astropy will truncate and warn on long cards).
        #
        # Iterate over a de-duplicated snapshot of the keywords: deleting by
        # keyword removes every card with that keyword, so visiting a
        # repeated keyword a second time would raise KeyError.
        for key in dict.fromkeys(header):
            if key.startswith("LSST BUTLER"):
                del header[key]
        if self.butler_provenance is not None:
            for key, value in self.butler_provenance.to_flat_dict(
                self.dataset_ref,
                prefix="HIERARCH LSST BUTLER",
                sep=" ",
                simple_types=True,
                max_inputs=3_000,
            ).items():
                header.set(key, value)

    # --- Read path ---------------------------------------------------------

    def _extension_from_uri(self, uri: ResourcePath) -> str:
        """Return the extension of ``uri`` after checking it can be read.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.

        Returns
        -------
        ext : `str`
            Extension as reported by ``uri.getExtension()``.

        Raises
        ------
        RuntimeError
            Raised if the extension is not in ``self.supported_extensions``
            or has no registered backend.
        """
        ext = uri.getExtension()
        if ext not in self.supported_extensions:
            raise RuntimeError(f"Cannot read {uri}: unsupported extension {ext!r}.")
        # Callers index the backend table with the returned value, so make
        # sure that lookup cannot fail.
        self._require_backend(ext)
        return ext

    def read_from_uri(
        self,
        uri: ResourcePath,
        component: str | None = None,
        expected_size: int = -1,
    ) -> Any:
        """Read the full dataset from ``uri`` with the matching backend.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.
        component : `str` or `None`, optional
            Not used by this implementation.
        expected_size : `int`, optional
            Not used by this implementation.

        Returns
        -------
        dataset : `typing.Any`
            The ``deserialized`` attribute of the backend's read result.

        Raises
        ------
        RuntimeError
            Raised if the file extension cannot be handled.
        """
        pytype = self.dataset_ref.datasetType.storageClass.pytype
        ext = self._extension_from_uri(uri)
        backend = self._require_backend(ext)
        # All read parameters are forwarded as keyword arguments.
        kwargs = self.file_descriptor.parameters or {}
        return backend.read(pytype, uri, **kwargs).deserialized

236 

237 

class ComponentSentinel(enum.Enum):
    """Sentinel results that `ImageFormatter.read_component` may return in
    place of a component value.
    """

    UNRECOGNIZED_COMPONENT = enum.auto()
    """The component name was not recognised here; a subclass might still
    recognise it.
    """

    INVALID_COMPONENT_MODEL = enum.auto()
    """The component name is known, but the corresponding model attribute
    is missing or has the wrong type.
    """

248 

249 

class ImageFormatter(GenericFormatter):
    """Formatter for image-like types that can read individual components.

    This class itself understands the ``projection``, ``bbox`` and
    ``obs_info`` components.  Subclasses override `read_component` to
    handle more (image/mask/variance for MaskedImage;
    psf/summary_stats/etc. for VisitImage).
    """

    def read_from_uri(
        self,
        uri: ResourcePath,
        component: str | None = None,
        expected_size: int = -1,
    ) -> Any:
        """Read the full dataset, or a single component of it, from ``uri``.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.
        component : `str` or `None`, optional
            Name of the component to read; `None` reads the full dataset.
        expected_size : `int`, optional
            Not used by this implementation.

        Returns
        -------
        result : `typing.Any`
            The full dataset or the requested component.

        Raises
        ------
        RuntimeError
            Raised if read parameters remain unconsumed after the read.
        NotImplementedError
            Raised if the requested component cannot be read.
        """
        pytype: Any = self.file_descriptor.storageClass.pytype
        extension = self._extension_from_uri(uri)
        backend = _BACKENDS[extension]
        if component is not None:
            loaded = self._read_component_from_uri(component, uri)
        else:
            bbox = self.pop_bbox_from_parameters()
            loaded = backend.read(pytype, uri, bbox=bbox).deserialized
        # Any parameter nobody consumed is an error.
        self.check_unhandled_parameters()
        return loaded

    def _read_component_from_uri(self, component: str, uri: ResourcePath) -> Any:
        """Read one named component from the file at ``uri``.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        uri : `lsst.resources.ResourcePath`
            Location of the file to read.

        Returns
        -------
        result : `typing.Any`
            The component value.

        Raises
        ------
        NotImplementedError
            Raised if the component is not recognised or if its
            serialization model is invalid.
        """
        extension = self._extension_from_uri(uri)
        backend = _BACKENDS[extension]
        pytype: Any = self.file_descriptor.storageClass.pytype

        if extension == ".json":
            # JSON path: read the whole object and pick the attribute off it.
            whole = backend.read(pytype, uri).deserialized
            try:
                return getattr(whole, component)
            except AttributeError as exc:
                raise NotImplementedError(f"Unrecognized component {component!r} for JSON read.") from exc

        # FITS/NDF archive path.  The backend's input_archive and
        # pointer_model are typed as `type | None` so that the JSON row can
        # opt out; on this path they are expected to be populated.
        archive_cls: Any = backend.input_archive
        pointer_model: Any = backend.pointer_model
        assert archive_cls is not None
        assert pointer_model is not None

        # FitsInputArchive uses partial=True for component reads; NDF has
        # no such kwarg.
        open_kwargs: dict[str, Any] = {}
        if extension == ".fits":
            open_kwargs["partial"] = True

        with archive_cls.open(uri, **open_kwargs) as archive:
            tree_type = pytype._get_archive_tree_type(pointer_model)
            tree = archive.get_tree(tree_type)
            outcome = self.read_component(component, tree, archive)

        # Translate sentinel results into errors for the caller.
        if outcome is ComponentSentinel.UNRECOGNIZED_COMPONENT:
            raise NotImplementedError(f"Unrecognized component {component!r} for {type(self).__name__}.")
        if outcome is ComponentSentinel.INVALID_COMPONENT_MODEL:
            raise NotImplementedError(
                f"Invalid serialization model for component {component!r} for {type(self).__name__}."
            )
        return outcome

    def pop_bbox_from_parameters(self) -> Box | None:
        """Remove and return the ``bbox`` read parameter, if present.

        Returns
        -------
        bbox : `Box` or `None`
            The value stored under ``"bbox"``, or `None` when there are no
            parameters or no such entry.
        """
        params = self.file_descriptor.parameters
        if not params:
            return None
        return params.pop("bbox", None)

    def check_unhandled_parameters(self) -> None:
        """Raise if any read parameters are still present.

        Raises
        ------
        RuntimeError
            Raised if ``self.file_descriptor.parameters`` is non-empty.
        """
        leftover = self.file_descriptor.parameters
        if not leftover:
            return
        raise RuntimeError(f"Parameters {list(leftover.keys())} not recognized.")

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Extract one component from a serialization tree.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `typing.Any`
            Serialization tree obtained from the archive.
        archive : `typing.Any`
            Open input archive, used to deserialize the projection.

        Returns
        -------
        result : `typing.Any`
            The component value, or a `ComponentSentinel` member when the
            component is not recognised here or its model attribute is
            missing or of the wrong type.
        """
        if component == "projection":
            projection = getattr(tree, "projection", None)
            if not isinstance(projection, ProjectionSerializationModel):
                return ComponentSentinel.INVALID_COMPONENT_MODEL
            return projection.deserialize(archive)
        if component == "bbox":
            bbox = getattr(tree, "bbox", None)
            if not isinstance(bbox, Box):
                return ComponentSentinel.INVALID_COMPONENT_MODEL
            return bbox
        if component == "obs_info":
            obs_info = getattr(tree, "obs_info", None)
            if not isinstance(obs_info, ObservationInfo):
                return ComponentSentinel.INVALID_COMPONENT_MODEL
            return obs_info
        return ComponentSentinel.UNRECOGNIZED_COMPONENT

333 

334 

class MaskedImageFormatter(ImageFormatter):
    """Formatter that also reads the image, mask and variance components."""

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Extract one component, trying the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `typing.Any`
            Serialization tree obtained from the archive.
        archive : `typing.Any`
            Open input archive passed to ``deserialize``.

        Returns
        -------
        result : `typing.Any`
            The component value, or a `ComponentSentinel` member when the
            component cannot be provided here.
        """
        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            # The parent class produced a real value.
            return inherited
        if not isinstance(tree, MaskedImageSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL
        if component not in ("image", "mask", "variance"):
            return ComponentSentinel.UNRECOGNIZED_COMPONENT
        # Each plane lives in the tree attribute named after the component
        # and accepts an optional bounding box.
        plane = getattr(tree, component)
        return plane.deserialize(archive, bbox=self.pop_bbox_from_parameters())

354 

355 

class VisitImageFormatter(MaskedImageFormatter):
    """Formatter that also reads the psf, summary_stats, detector and
    aperture_corrections components.
    """

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Extract one component, trying the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `typing.Any`
            Serialization tree obtained from the archive.
        archive : `typing.Any`
            Open input archive passed to the ``deserialize`` calls.

        Returns
        -------
        result : `typing.Any`
            The component value, or a `ComponentSentinel` member when the
            component cannot be provided here.
        """
        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            # The parent class produced a real value.
            return inherited
        if not isinstance(tree, VisitImageSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL

        if component == "psf":
            # The FITS path uses tree.psf.deserialize; the NDF tree
            # exposes deserialize_psf for the same effect.
            if hasattr(tree, "deserialize_psf"):
                return tree.deserialize_psf(archive)
            return tree.psf.deserialize(archive)
        if component == "summary_stats":
            return tree.summary_stats
        if component == "detector":
            if getattr(tree, "detector", None) is None:
                return ComponentSentinel.INVALID_COMPONENT_MODEL
            return tree.detector.deserialize(archive)
        if component == "aperture_corrections":
            return tree.aperture_corrections.deserialize(archive)
        return ComponentSentinel.UNRECOGNIZED_COMPONENT

383 

384 

class CellCoaddFormatter(MaskedImageFormatter):
    """Formatter that also reads the CellCoadd psf and provenance
    components.
    """

    def read_component(self, component: str, tree: Any, archive: Any) -> Any:
        """Extract one component, trying the parent class first.

        Parameters
        ----------
        component : `str`
            Name of the component to read.
        tree : `typing.Any`
            Serialization tree obtained from the archive.
        archive : `typing.Any`
            Open input archive passed to the tree's deserialize methods.

        Returns
        -------
        result : `typing.Any`
            The component value, or a `ComponentSentinel` member when the
            component cannot be provided here.
        """
        # Imported here rather than at module level to avoid cycles.
        from .cells import CellCoaddSerializationModel

        inherited = super().read_component(component, tree, archive)
        if not isinstance(inherited, ComponentSentinel):
            # The parent class produced a real value.
            return inherited
        if not isinstance(tree, CellCoaddSerializationModel):
            return ComponentSentinel.INVALID_COMPONENT_MODEL

        if component == "psf":
            # The PSF read honours an optional bounding-box parameter.
            region = self.pop_bbox_from_parameters()
            return tree.deserialize_psf(archive, bbox=region)
        if component == "provenance":
            return tree.deserialize_provenance(archive)
        return ComponentSentinel.UNRECOGNIZED_COMPONENT