Coverage for python / lsst / images / formatters.py: 0%

103 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-27 01:31 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Unified butler formatter for lsst.images. 

13 

14This formatter dispatches on a write-time ``format`` parameter and on the 

15file extension at read time, replacing the three per-format 

16(`lsst.images.fits.formatters`, `lsst.images.json.formatters`, 

17`lsst.images.ndf.formatters`) hierarchies that previously duplicated almost 

18all of their logic. 

19""" 

20 

21from __future__ import annotations 

22 

23__all__ = ("GenericFormatter",) 

24 

25import hashlib 

26import json as _stdlib_json # disambiguates from .json subpackage 

27from collections.abc import Callable, Iterator 

28from contextlib import contextmanager 

29from typing import Any, ClassVar 

30 

31import astropy.io.fits 

32 

33from lsst.daf.butler import DatasetProvenance, FormatterV2 

34from lsst.resources import ResourcePath 

35 

36from . import fits as _fits 

37from . import json as _json 

38from .serialization import ArchiveTree, ButlerInfo, InputArchive, JsonRef 

39 

40 

41class GenericFormatter(FormatterV2): 

42 """Unified butler formatter for any lsst.images type. 

43 

44 The on-disk format is selected by the ``format`` write parameter 

45 (``fits``, ``json``, ``sdf``) at write time and by the file 

46 extension at read time. The default format is taken from 

47 ``self.default_extension`` (``.fits`` for the base class). 

48 

49 Notes 

50 ----- 

51 Subclasses (`ImageFormatter` and below) add component-level read 

52 support. This base class forwards any read parameters straight to 

53 the underlying ``read`` function. 

54 """ 

55 

56 default_extension: ClassVar[str] = ".fits" 

57 supported_extensions: ClassVar[frozenset[str]] = frozenset({".fits", ".sdf", ".json"}) 

58 supported_write_parameters: ClassVar[frozenset[str]] = frozenset({"format", "recipe"}) 

59 can_read_from_uri: ClassVar[bool] = True 

60 

61 butler_provenance: DatasetProvenance | None = None 

62 

63 # --- Write parameter handling ------------------------------------------- 

64 

65 def get_write_extension(self) -> str: 

66 default_fmt = self.default_extension.lstrip(".") 

67 fmt = self.write_parameters.get("format", default_fmt) 

68 ext = "." + fmt 

69 if ext not in self.supported_extensions: 

70 raise RuntimeError( 

71 f"Requested format {fmt!r} is not supported; expected one of {{fits, json, sdf}}." 

72 ) 

73 return ext 

74 

75 def _validate_write_parameters(self) -> None: 

76 ext = self.get_write_extension() 

77 if ext != ".fits" and "recipe" in self.write_parameters: 

78 raise RuntimeError("The 'recipe' write parameter is only valid for FITS output.") 

79 

80 # --- Write path --------------------------------------------------------- 

81 

82 def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None: 

83 self._validate_write_parameters() 

84 ext = self.get_write_extension() 

85 butler_info = ButlerInfo( 

86 dataset=self.dataset_ref.to_simple(), 

87 provenance=self.butler_provenance if self.butler_provenance is not None else DatasetProvenance(), 

88 ) 

89 kwargs: dict[str, Any] = {"butler_info": butler_info} 

90 write_func: Callable[..., ArchiveTree] 

91 match ext: 

92 case ".fits": 

93 kwargs["update_header"] = self._update_header 

94 kwargs["compression_options"] = self._get_compression_options() 

95 kwargs["compression_seed"] = self._get_compression_seed() 

96 write_func = _fits.write 

97 case ".json": 

98 write_func = _json.write 

99 case ".sdf": 

100 from . import ndf as _ndf 

101 

102 write_func = _ndf.write 

103 write_func(in_memory_dataset, uri.ospath, **kwargs) 

104 

105 def add_provenance( 

106 self, 

107 in_memory_dataset: Any, 

108 /, 

109 *, 

110 provenance: DatasetProvenance | None = None, 

111 ) -> Any: 

112 # A FormatterV2 instance is used once; stash provenance on self 

113 # rather than mutating the dataset. 

114 self.butler_provenance = provenance 

115 return in_memory_dataset 

116 

117 # --- FITS-specific helpers (kept verbatim from fits/formatters.py) ---- 

118 

119 def _get_compression_seed(self) -> int: 

120 # Set the seed based on data ID (all logic here duplicated from 

121 # obs_base). We can't just use 'hash', since like 'set' that's not 

122 # deterministic. And we can't rely on a DimensionPacker because those 

123 # are only defined for certain combinations of dimensions. Doing an MD5 

124 # of the JSON feels like overkill but I don't really see anything much 

125 # simpler. 

126 hash_bytes = hashlib.md5( 

127 _stdlib_json.dumps(list(self.data_id.required_values)).encode(), 

128 usedforsecurity=False, 

129 ).digest() 

130 # And it *really* feels like overkill when we squash that into the [1, 

131 # 10000] range allowed by FITS. 

132 return 1 + int.from_bytes(hash_bytes) % 9999 

133 

134 def _get_compression_options(self) -> dict[str, _fits.FitsCompressionOptions]: 

135 recipe = self.write_parameters.get("recipe", "default") 

136 try: 

137 config = self.write_recipes[recipe] 

138 except KeyError: 

139 if recipe == "default": 

140 # If there's no default recipe just use the software defaults. 

141 return {} 

142 raise RuntimeError(f"Invalid recipe for GenericFormatter: {recipe!r}.") from None 

143 return {k: _fits.FitsCompressionOptions.model_validate(v) for k, v in config.items()} 

144 

145 def _update_header(self, header: astropy.io.fits.Header) -> None: 

146 # Logic here largely lifted from lsst.obs.base.utils, which we 

147 # can't use directly for dependency and maybe mapping-type 

148 # (PropertyList vs. astropy) reasons. We assume we can always add 

149 # long cards (astropy will CONTINUE them) but not comments 

150 # (astropy will truncate and warn on long cards). 

151 for key in list(header): 

152 if key.startswith("LSST BUTLER"): 

153 del header[key] 

154 if self.butler_provenance is not None: 

155 for key, value in self.butler_provenance.to_flat_dict( 

156 self.dataset_ref, 

157 prefix="HIERARCH LSST BUTLER", 

158 sep=" ", 

159 simple_types=True, 

160 max_inputs=3_000, 

161 ).items(): 

162 header.set(key, value) 

163 

164 # --- Read path --------------------------------------------------------- 

165 

166 def _extension_from_uri(self, uri: ResourcePath) -> str: 

167 ext = uri.getExtension() 

168 if ext not in self.supported_extensions: 

169 raise RuntimeError(f"Cannot read {uri}: unsupported extension {ext!r}.") 

170 return ext 

171 

172 @contextmanager 

173 def _open_archive_and_tree( 

174 self, uri: ResourcePath, partial: bool 

175 ) -> Iterator[tuple[InputArchive[Any], ArchiveTree]]: 

176 pytype: type[Any] = self.dataset_ref.datasetType.storageClass.pytype 

177 ext = self._extension_from_uri(uri) 

178 archive: InputArchive[Any] 

179 match ext: 

180 case ".fits": 

181 tree_type = pytype._get_archive_tree_type(_fits.PointerModel) 

182 with _fits.FitsInputArchive.open(uri, partial=partial) as archive: 

183 tree = archive.get_tree(tree_type) 

184 yield archive, tree 

185 case ".json": 

186 tree_type = pytype._get_archive_tree_type(JsonRef) 

187 tree = tree_type.model_validate_json(ResourcePath(uri).read()) 

188 archive = _json.JsonInputArchive(tree.indirect) 

189 yield archive, tree 

190 case ".sdf": 

191 from . import ndf as _ndf 

192 

193 tree_type = pytype._get_archive_tree_type(_ndf.NdfPointerModel) 

194 with _ndf.NdfInputArchive.open(uri) as archive: 

195 tree = archive.get_tree(tree_type) 

196 yield archive, tree 

197 

198 def read_from_uri( 

199 self, 

200 uri: ResourcePath, 

201 component: str | None = None, 

202 expected_size: int = -1, 

203 ) -> Any: 

204 kwargs = self.file_descriptor.parameters or {} 

205 with self._open_archive_and_tree(uri, partial=bool(kwargs or component)) as (archive, tree): 

206 if component is None: 

207 result = tree.deserialize(archive, **kwargs) 

208 result._opaque_metadata = archive.get_opaque_metadata() 

209 return result 

210 else: 

211 return tree.deserialize_component(component, archive, **kwargs)