Coverage for python / lsst / images / fits / _common.py: 37%

194 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-23 01:30 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12 

13from __future__ import annotations 

14 

15__all__ = ( 

16 "FITS_SOURCE_REGEX", 

17 "JSON_COLUMN", 

18 "JSON_EXTNAME", 

19 "ExtensionHDU", 

20 "ExtensionKey", 

21 "FitsCompressionAlgorithm", 

22 "FitsCompressionOptions", 

23 "FitsDitherAlgorithm", 

24 "FitsOpaqueMetadata", 

25 "FitsQuantizationOptions", 

26 "InvalidFitsArchiveError", 

27 "PointerModel", 

28 "PrecompressedImage", 

29 "add_offset_wcs", 

30 "strip_butler_cards", 

31 "strip_legacy_exposure_cards", 

32 "strip_wcs_cards", 

33) 

34 

35import dataclasses 

36import enum 

37import itertools 

38import re 

39import string 

40from typing import Any, ClassVar, Self, final 

41 

42import astropy.io.fits 

43import numpy as np 

44import pydantic 

45 

46from .._geom import Box 

47from ..serialization import ArchiveReadError, OpaqueArchiveMetadata, TableColumnModel 

48 

49type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU 

50 

51FITS_SOURCE_REGEX = re.compile(r"fits:(?P<extname>[\w/\-]+)(,(?P<extver>\d+))?(\[\d+\])?") 

52 

53JSON_EXTNAME: str = "JSON" 

54JSON_COLUMN: str = "JSON" 

55 

56 

57@dataclasses.dataclass(frozen=True) 

58class ExtensionKey: 

59 """The identifiers for a single FITS extension HDU within a file.""" 

60 

61 name: str = "" 

62 """Value of the EXTNAME keyword, or an empty string for the primary HDU.""" 

63 

64 ver: int = 1 

65 """Value of the EXTVER keyword (which may be absent if it is ``1``).""" 

66 

67 @classmethod 

68 def from_index_row(cls, row: np.ndarray) -> ExtensionKey: 

69 """Construct from a row of the index binary table appended to the 

70 FITS files written by the package. 

71 """ 

72 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"])) 

73 

74 @classmethod 

75 def from_str(cls, source: str) -> ExtensionKey: 

76 """Construct from the `str` coercion of this type, which is used 

77 as the 'source' field in various Pydantic models that serve as 

78 references to other HDUs. 

79 """ 

80 if (m := FITS_SOURCE_REGEX.fullmatch(source)) is None: 

81 raise ArchiveReadError(f"Bad 'source' string for FITS: {source!r}.") 

82 ver = 1 

83 if m.group("extver") is not None: 

84 ver = int(m.group("extver")) 

85 return cls(m.group("extname"), ver) 

86 

87 def check(self) -> None: 

88 if not FITS_SOURCE_REGEX.match(str(self)): 

89 raise ValueError( 

90 f"Invalid source key: '{str(self)}'; name characters must be alphanumeric, '-', '_', or '/'." 

91 ) 

92 

93 def __str__(self) -> str: 

94 if self.ver > 1: 

95 return f"fits:{self.name},{self.ver}" 

96 else: 

97 return f"fits:{self.name}" 

98 

99 

100class PointerModel(pydantic.BaseModel): 

101 column: TableColumnModel = pydantic.Field(description="Table column for this cell this pointer targets.") 

102 row: int = pydantic.Field(description="Zero-indexed row for cell this pointer targets.") 

103 

104 

105class InvalidFitsArchiveError(RuntimeError): 

106 """The error type raised when the content of a FITS file presumed to have 

107 been written by FitsOutputArchive is not self-consistent. 

108 """ 

109 

110 

111class FitsCompressionAlgorithm(enum.StrEnum): 

112 """FITS compression algorithms supported by this package. 

113 

114 See the FITS standard for definitions. 

115 """ 

116 

117 GZIP_1 = "GZIP_1" 

118 GZIP_2 = "GZIP_2" 

119 RICE_1 = "RICE_1" 

120 

121 

122class FitsDitherAlgorithm(enum.StrEnum): 

123 """FITS quantization dither algorithms supported by this package. 

124 

125 See the FITS standard for definitions. 

126 """ 

127 

128 NO_DITHER = "NO_DITHER" 

129 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1" 

130 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2" 

131 

132 def to_astropy_quantize_method(self) -> int: 

133 """Convert to the integer code used by Astropy.""" 

134 match self: 

135 case self.NO_DITHER: 

136 return -1 

137 case self.SUBTRACTIVE_DITHER_1: 

138 return 1 

139 case self.SUBTRACTIVE_DITHER_2: 

140 return 2 

141 raise AssertionError("Invalid enum value.") 

142 

143 

144class FitsQuantizationOptions(pydantic.BaseModel, frozen=True): 

145 """Quantization options for FITS compression.""" 

146 

147 dither: FitsDitherAlgorithm 

148 """How to add random noise during quantization to reduce biases.""" 

149 

150 level: float 

151 """Quantization level. 

152 

153 When positive, this is the fraction of the measured standard deviation that 

154 corresponds to an integer step. When negative, it is ``-ZSCALE``, the 

155 scaling to apply directly to the original pixels before quantization. 

156 """ 

157 

158 seed: int | None = None 

159 """Random number seed to use for dithering. 

160 

161 Values between 1 and 10000 (inclusive) are used directly. ``0`` will 

162 generate a value from the current time, and ``-1`` will generate a value 

163 from the checksum of the image. 

164 

165 If `None`, the ``compression_seed`` parameter must be passed to 

166 `FitsOutputArchive.open` if any quantized compression is configured. 

167 """ 

168 

169 

170class FitsCompressionOptions(pydantic.BaseModel, frozen=True): 

171 """Configuration options for FITS compression.""" 

172 

173 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2 

174 """Compression algorithm to use.""" 

175 

176 tile_shape: tuple[int, ...] | None = None 

177 """Shape ``(..., y, x)`` of independently compressed tiles. 

178 

179 The default of `None` compresses each row separately. 

180 """ 

181 

182 quantization: FitsQuantizationOptions | None = None 

183 """Quantization to apply before compression, if any.""" 

184 

185 DEFAULT: ClassVar[FitsCompressionOptions | None] 

186 """Default compression options (lossless ``GZIP_2``).""" 

187 

188 LOSSY: ClassVar[FitsCompressionOptions] 

189 """Default lossy compression options.""" 

190 

191 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU: 

192 """Make an `astropy.io.fits.CompImageHDU` object from these options.""" 

193 if self.quantization is not None: 

194 return astropy.io.fits.CompImageHDU( 

195 data, 

196 name=name, 

197 compression_type=self.algorithm.value, 

198 tile_shape=self.tile_shape, 

199 quantize_method=self.quantization.dither.to_astropy_quantize_method(), 

200 quantize_level=self.quantization.level, 

201 dither_seed=self.quantization.seed, 

202 ) 

203 else: 

204 return astropy.io.fits.CompImageHDU( 

205 data, 

206 name=name, 

207 compression_type=self.algorithm.value, 

208 tile_shape=self.tile_shape, 

209 quantize_level=0.0, 

210 ) 

211 

212 

213FitsCompressionOptions.DEFAULT = FitsCompressionOptions() 

214FitsCompressionOptions.LOSSY = FitsCompressionOptions( 

215 algorithm=FitsCompressionAlgorithm.RICE_1, 

216 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0), 

217) 

218 

219 

220_COMPRESSION_KEYS = frozenset( 

221 ( 

222 "ZIMAGE", 

223 "ZCMPTYPE", 

224 "ZBITPIX", 

225 "ZNAXIS", 

226 "ZMASKCMP", 

227 "ZQUANTIZ", 

228 "ZDITHER0", 

229 "ZCHECKSUM", 

230 "ZDATASUM", 

231 ) 

232) 

233_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL") 

234 

235 

236@dataclasses.dataclass 

237class PrecompressedImage: 

238 """Already-compressed FITS HDUs that are attached to high-level objects 

239 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be 

240 round-tripped exactly. 

241 """ 

242 

243 header: astropy.io.fits.Header 

244 """Header for the HDU. 

245 

246 This contains only FITS tile-compression keywords. 

247 """ 

248 

249 data: astropy.io.fits.FITS_rec 

250 """FITS binary table data that serves as the low-level representation of a 

251 tile-compressed image HDU. 

252 """ 

253 

254 @classmethod 

255 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self: 

256 """Construct from a binary table HDU. 

257 

258 Parameters 

259 ---------- 

260 hdu 

261 Binary table HDU, typically read from a FITS file opened with 

262 ``disable_image_compression=True``. 

263 

264 Returns 

265 ------- 

266 PrecompressedImage 

267 A `PrecompressedImage` instance. 

268 """ 

269 header = astropy.io.fits.Header( 

270 [ 

271 card 

272 for card in hdu.header.cards 

273 if card.keyword in _COMPRESSION_KEYS 

274 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS) 

275 ] 

276 ) 

277 # This is an opportunity to fix CFITSIO's non-standard writing of the 

278 # old RICE_ONE value instead of RICE_1. 

279 if header["ZCMPTYPE"] == "RICE_ONE": 

280 header["ZCMPTYPE"] = "RICE_1" 

281 return cls(header=header, data=hdu.data) 

282 

283 

284@final 

285@dataclasses.dataclass 

286class FitsOpaqueMetadata(OpaqueArchiveMetadata): 

287 """Opaque metadata that may be carried around by a serializable type to 

288 propagate serialization options and opaque information without that type 

289 knowing how it was serialized. 

290 """ 

291 

292 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict) 

293 """FITS headers found (but not interpreted and stripped) when reading, to 

294 be propagated on write. 

295 

296 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header. 

297 Header information in opaque metadata is considered immutable, allowing it 

298 to be transferred by reference to copies and subsets of the object it is 

299 attached to. 

300 """ 

301 

302 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict) 

303 """FITS tile-compressed HDUs that should be written out directly instead 

304 of the in-memory data provided. 

305 

306 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation). 

307 Precompressed pixel values are never copied or transferred to subsets. 

308 """ 

309 

310 def add_header( 

311 self, 

312 header: astropy.io.fits.Header, 

313 name: str | None = None, 

314 ver: int | None = None, 

315 key: ExtensionKey | None = None, 

316 ) -> None: 

317 """Add a header to the opaque metadata if it is not already present, 

318 and strip EXTNAME and EXTVER if present. 

319 

320 Parameters 

321 ---------- 

322 header 

323 Header to add. May be modified in place. 

324 name 

325 EXTNAME (all caps). If not provided, the EXTNAME card must be 

326 present in the header. Use ``""`` for the primary header. 

327 ver 

328 EXTVER. If not provided and the EXTVER card is not present, 

329 defaults to 1. 

330 key 

331 Combination of EXTNAME and EXTVER; used instead of both ``name`` 

332 and ``ver`` if provided. 

333 """ 

334 if key is None: 

335 if name is None: 

336 name = header["EXTNAME"] 

337 if ver is None: 

338 ver = header.get("EXTVER", 1) 

339 key = ExtensionKey(name, ver) 

340 strip_butler_cards(header) 

341 if key not in self.headers: 

342 header.remove("EXTNAME", ignore_missing=True) 

343 header.remove("EXTVER", ignore_missing=True) 

344 self.headers[key] = header 

345 

346 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None: 

347 """Look up the given EXTNAME to see if there is a tile compressed image 

348 HDU that should be used directly, instead of requantizing. 

349 

350 Parameters 

351 ---------- 

352 name 

353 EXTNAME (all caps). 

354 

355 Returns 

356 ------- 

357 `astropy.io.fits.BinTableHDU` | `None` 

358 An already-compressed HDU, in binary table form, or `None` if there 

359 is no precompressed HDU for this EXTNAME. 

360 """ 

361 if (precompressed := self.precompressed.get(name)) is None: 

362 return None 

363 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name) 

364 

365 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> dict[str, Any]: 

366 """Update the opaque metadata with the header of the primary HDU 

367 of a legacy (`lsst.afw.image`) FITS file, stripping cards we know we 

368 don't need and extracting any ``LSST IMAGES ...`` cards into a 

369 dictionary we return. 

370 """ 

371 primary_header = header.copy(strip=True) 

372 # No idea what these spare TAN-SIP headers are doing in the afw 

373 # FITS files, but we'll strip them here: 

374 primary_header.remove("A_ORDER", ignore_missing=True) 

375 primary_header.remove("B_ORDER", ignore_missing=True) 

376 strip_legacy_exposure_cards(primary_header) 

377 strip_butler_cards(primary_header) 

378 metadata: dict[str, Any] = {} 

379 for n in itertools.count(): 

380 if (key := header.pop(f"LSST IMAGES KEY {n + 1}", ...)) is ...: 

381 break 

382 value = header.pop(f"LSST IMAGES VALUE {n + 1}") 

383 metadata[key] = value 

384 self.headers[ExtensionKey()] = primary_header 

385 return metadata 

386 

387 def copy(self) -> FitsOpaqueMetadata: 

388 # Docstring inherited. 

389 return FitsOpaqueMetadata(headers=self.headers) 

390 

391 def subset(self, bbox: Box) -> FitsOpaqueMetadata: 

392 # Docstring inherited. 

393 return FitsOpaqueMetadata(headers=self.headers) 

394 

395 def get_instrumental_unit(self) -> astropy.units.UnitBase | None: 

396 """Extract the ``LSST ISR UNIT`` key from the primary header (if it 

397 exists) and wrap it with Astropy. 

398 """ 

399 if (primary_header := self.headers.get(ExtensionKey())) is not None: 

400 if (instrumental_unit_str := primary_header.get("LSST ISR UNITS")) is not None: 

401 return astropy.units.Unit(instrumental_unit_str) 

402 return None 

403 

404 

405def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None: 

406 """Add a trivial FITS WCS to a header that applies the appropriate offset 

407 to map FITS array coordinates to a logical pixel grid. 

408 

409 Parameters 

410 ---------- 

411 header 

412 Header to update in-place. 

413 x 

414 Logical coordinate of the first column. 

415 y 

416 Logical coordinate of the first row. 

417 key 

418 Single-character suffix for this WCS. 

419 """ 

420 header.set(f"CTYPE1{key}", "LINEAR") 

421 header.set(f"CTYPE2{key}", "LINEAR") 

422 header.set(f"CRPIX1{key}", 1.0) 

423 header.set(f"CRPIX2{key}", 1.0) 

424 header.set(f"CRVAL1{key}", float(x)) 

425 header.set(f"CRVAL2{key}", float(y)) 

426 header.set(f"CUNIT1{key}", "PIXEL") 

427 header.set(f"CUNIT2{key}", "PIXEL") 

428 

429 

430_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT") 

431_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}") 

432 

433 

434def strip_wcs_cards(header: astropy.io.fits.Header) -> None: 

435 """Strip WCS cards from a FITS header. 

436 

437 This does *not* attempt to cover all possible FITS WCS forms; it focuses on 

438 the ones we actually plan to write (simple undistorted ones + TAN-SIP). 

439 """ 

440 wcsaxes = header.pop("WCSAXES", 2) 

441 for wcsname in [""] + list(string.ascii_uppercase): 

442 header.remove("RADESYS" + wcsname, ignore_missing=True) 

443 if "CTYPE1" + wcsname in header: 

444 ctype: str = "" # just for linters that can't figure out that the loop always executes 

445 for n in range(wcsaxes): 

446 suffix = f"{n + 1}{wcsname}" 

447 ctype = header.pop("CTYPE" + suffix) 

448 for key in _WCS_VECTOR_KEYS: 

449 header.remove(key + suffix, ignore_missing=True) 

450 for m in range(wcsaxes): 

451 for tmpl in _WCS_MATRIX_KEYS: 

452 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True) 

453 if ctype.endswith("-SIP"): 

454 _strip_sip_poly(header, wcsname, "A") 

455 _strip_sip_poly(header, wcsname, "B") 

456 _strip_sip_poly(header, wcsname, "AP") 

457 _strip_sip_poly(header, wcsname, "BP") 

458 header.remove("LONPOLE", ignore_missing=True) 

459 header.remove("LATPOLE", ignore_missing=True) 

460 header.remove("MJDREF", ignore_missing=True) 

461 header.remove("PV1_3", ignore_missing=True) 

462 header.remove("PV1_4", ignore_missing=True) 

463 

464 

465def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None: 

466 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None) 

467 if order is not None: 

468 for i, j in itertools.product(range(order + 1), range(order + 1)): 

469 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True) 

470 

471 

472def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None: 

473 """Strip header keywords added by lsst.afw.image.Exposure.""" 

474 header.remove("AR_HDU", ignore_missing=True) 

475 for name in ( 

476 "FILTER", 

477 "DETECTOR", 

478 "VALID_POLYGON", 

479 "SKYWCS", 

480 "PSF", 

481 "SUMMARYSTATS", 

482 "AP_CORR_MAP", 

483 "PHOTOCALIB", 

484 ): 

485 header.remove(f"{name}_ID", ignore_missing=True) 

486 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True) 

487 

488 

489def strip_butler_cards(header: astropy.io.fits.Header) -> None: 

490 """Strip header keywords added by butler provenance that would be 

491 incorrect if propagated to a downstream file. 

492 """ 

493 for key in list(header): 

494 if key.startswith("LSST BUTLER"): 

495 del header[key]