Coverage for python/lsst/images/fits/_common.py: 37%
194 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:00 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:00 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
13from __future__ import annotations
15__all__ = (
16 "FITS_SOURCE_REGEX",
17 "JSON_COLUMN",
18 "JSON_EXTNAME",
19 "ExtensionHDU",
20 "ExtensionKey",
21 "FitsCompressionAlgorithm",
22 "FitsCompressionOptions",
23 "FitsDitherAlgorithm",
24 "FitsOpaqueMetadata",
25 "FitsQuantizationOptions",
26 "InvalidFitsArchiveError",
27 "PointerModel",
28 "PrecompressedImage",
29 "add_offset_wcs",
30 "strip_butler_cards",
31 "strip_legacy_exposure_cards",
32 "strip_wcs_cards",
33)
35import dataclasses
36import enum
37import itertools
38import re
39import string
40from typing import Any, ClassVar, Self, final
42import astropy.io.fits
43import numpy as np
44import pydantic
46from .._geom import Box
47from ..serialization import ArchiveReadError, OpaqueArchiveMetadata, TableColumnModel
49type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU
51FITS_SOURCE_REGEX = re.compile(r"fits:(?P<extname>[\w/\-]+)(,(?P<extver>\d+))?(\[\d+\])?")
53JSON_EXTNAME: str = "JSON"
54JSON_COLUMN: str = "JSON"
57@dataclasses.dataclass(frozen=True)
58class ExtensionKey:
59 """The identifiers for a single FITS extension HDU within a file."""
61 name: str = ""
62 """Value of the EXTNAME keyword, or an empty string for the primary HDU."""
64 ver: int = 1
65 """Value of the EXTVER keyword (which may be absent if it is ``1``)."""
67 @classmethod
68 def from_index_row(cls, row: np.ndarray) -> ExtensionKey:
69 """Construct from a row of the index binary table appended to the
70 FITS files written by the package.
71 """
72 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"]))
74 @classmethod
75 def from_str(cls, source: str) -> ExtensionKey:
76 """Construct from the `str` coercion of this type, which is used
77 as the 'source' field in various Pydantic models that serve as
78 references to other HDUs.
79 """
80 if (m := FITS_SOURCE_REGEX.fullmatch(source)) is None:
81 raise ArchiveReadError(f"Bad 'source' string for FITS: {source!r}.")
82 ver = 1
83 if m.group("extver") is not None:
84 ver = int(m.group("extver"))
85 return cls(m.group("extname"), ver)
87 def check(self) -> None:
88 if not FITS_SOURCE_REGEX.match(str(self)):
89 raise ValueError(
90 f"Invalid source key: '{str(self)}'; name characters must be alphanumeric, '-', '_', or '/'."
91 )
93 def __str__(self) -> str:
94 if self.ver > 1:
95 return f"fits:{self.name},{self.ver}"
96 else:
97 return f"fits:{self.name}"
100class PointerModel(pydantic.BaseModel):
101 column: TableColumnModel = pydantic.Field(description="Table column for this cell this pointer targets.")
102 row: int = pydantic.Field(description="Zero-indexed row for cell this pointer targets.")
105class InvalidFitsArchiveError(RuntimeError):
106 """The error type raised when the content of a FITS file presumed to have
107 been written by FitsOutputArchive is not self-consistent.
108 """
111class FitsCompressionAlgorithm(enum.StrEnum):
112 """FITS compression algorithms supported by this package.
114 See the FITS standard for definitions.
115 """
117 GZIP_1 = "GZIP_1"
118 GZIP_2 = "GZIP_2"
119 RICE_1 = "RICE_1"
122class FitsDitherAlgorithm(enum.StrEnum):
123 """FITS quantization dither algorithms supported by this package.
125 See the FITS standard for definitions.
126 """
128 NO_DITHER = "NO_DITHER"
129 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1"
130 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2"
132 def to_astropy_quantize_method(self) -> int:
133 """Convert to the integer code used by Astropy."""
134 match self:
135 case self.NO_DITHER:
136 return -1
137 case self.SUBTRACTIVE_DITHER_1:
138 return 1
139 case self.SUBTRACTIVE_DITHER_2:
140 return 2
141 raise AssertionError("Invalid enum value.")
144class FitsQuantizationOptions(pydantic.BaseModel, frozen=True):
145 """Quantization options for FITS compression."""
147 dither: FitsDitherAlgorithm
148 """How to add random noise during quantization to reduce biases."""
150 level: float
151 """Quantization level.
153 When positive, this is the fraction of the measured standard deviation that
154 corresponds to an integer step. When negative, it is ``-ZSCALE``, the
155 scaling to apply directly to the original pixels before quantization.
156 """
158 seed: int | None = None
159 """Random number seed to use for dithering.
161 Values between 1 and 10000 (inclusive) are used directly. ``0`` will
162 generate a value from the current time, and ``-1`` will generate a value
163 from the checksum of the image.
165 If `None`, the ``compression_seed`` parameter must be passed to
166 `FitsOutputArchive.open` if any quantized compression is configured.
167 """
170class FitsCompressionOptions(pydantic.BaseModel, frozen=True):
171 """Configuration options for FITS compression."""
173 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2
174 """Compression algorithm to use."""
176 tile_shape: tuple[int, ...] | None = None
177 """Shape ``(..., y, x)`` of independently compressed tiles.
179 The default of `None` compresses each row separately.
180 """
182 quantization: FitsQuantizationOptions | None = None
183 """Quantization to apply before compression, if any."""
185 DEFAULT: ClassVar[FitsCompressionOptions | None]
186 """Default compression options (lossless ``GZIP_2``)."""
188 LOSSY: ClassVar[FitsCompressionOptions]
189 """Default lossy compression options."""
191 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU:
192 """Make an `astropy.io.fits.CompImageHDU` object from these options."""
193 if self.quantization is not None:
194 return astropy.io.fits.CompImageHDU(
195 data,
196 name=name,
197 compression_type=self.algorithm.value,
198 tile_shape=self.tile_shape,
199 quantize_method=self.quantization.dither.to_astropy_quantize_method(),
200 quantize_level=self.quantization.level,
201 dither_seed=self.quantization.seed,
202 )
203 else:
204 return astropy.io.fits.CompImageHDU(
205 data,
206 name=name,
207 compression_type=self.algorithm.value,
208 tile_shape=self.tile_shape,
209 quantize_level=0.0,
210 )
213FitsCompressionOptions.DEFAULT = FitsCompressionOptions()
214FitsCompressionOptions.LOSSY = FitsCompressionOptions(
215 algorithm=FitsCompressionAlgorithm.RICE_1,
216 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0),
217)
220_COMPRESSION_KEYS = frozenset(
221 (
222 "ZIMAGE",
223 "ZCMPTYPE",
224 "ZBITPIX",
225 "ZNAXIS",
226 "ZMASKCMP",
227 "ZQUANTIZ",
228 "ZDITHER0",
229 "ZCHECKSUM",
230 "ZDATASUM",
231 )
232)
233_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL")
236@dataclasses.dataclass
237class PrecompressedImage:
238 """Already-compressed FITS HDUs that are attached to high-level objects
239 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be
240 round-tripped exactly.
241 """
243 header: astropy.io.fits.Header
244 """Header for the HDU.
246 This contains only FITS tile-compression keywords.
247 """
249 data: astropy.io.fits.FITS_rec
250 """FITS binary table data that serves as the low-level representation of a
251 tile-compressed image HDU.
252 """
254 @classmethod
255 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self:
256 """Construct from a binary table HDU.
258 Parameters
259 ----------
260 hdu
261 Binary table HDU, typically read from a FITS file opened with
262 ``disable_image_compression=True``.
264 Returns
265 -------
266 PrecompressedImage
267 A `PrecompressedImage` instance.
268 """
269 header = astropy.io.fits.Header(
270 [
271 card
272 for card in hdu.header.cards
273 if card.keyword in _COMPRESSION_KEYS
274 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS)
275 ]
276 )
277 # This is an opportunity to fix CFITSIO's non-standard writing of the
278 # old RICE_ONE value instead of RICE_1.
279 if header["ZCMPTYPE"] == "RICE_ONE":
280 header["ZCMPTYPE"] = "RICE_1"
281 return cls(header=header, data=hdu.data)
284@final
285@dataclasses.dataclass
286class FitsOpaqueMetadata(OpaqueArchiveMetadata):
287 """Opaque metadata that may be carried around by a serializable type to
288 propagate serialization options and opaque information without that type
289 knowing how it was serialized.
290 """
292 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict)
293 """FITS headers found (but not interpreted and stripped) when reading, to
294 be propagated on write.
296 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header.
297 Header information in opaque metadata is considered immutable, allowing it
298 to be transferred by reference to copies and subsets of the object it is
299 attached to.
300 """
302 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict)
303 """FITS tile-compressed HDUs that should be written out directly instead
304 of the in-memory data provided.
306 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation).
307 Precompressed pixel values are never copied or transferred to subsets.
308 """
310 def add_header(
311 self,
312 header: astropy.io.fits.Header,
313 name: str | None = None,
314 ver: int | None = None,
315 key: ExtensionKey | None = None,
316 ) -> None:
317 """Add a header to the opaque metadata if it is not already present,
318 and strip EXTNAME and EXTVER if present.
320 Parameters
321 ----------
322 header
323 Header to add. May be modified in place.
324 name
325 EXTNAME (all caps). If not provided, the EXTNAME card must be
326 present in the header. Use ``""`` for the primary header.
327 ver
328 EXTVER. If not provided and the EXTVER card is not present,
329 defaults to 1.
330 key
331 Combination of EXTNAME and EXTVER; used instead of both ``name``
332 and ``ver`` if provided.
333 """
334 if key is None:
335 if name is None:
336 name = header["EXTNAME"]
337 if ver is None:
338 ver = header.get("EXTVER", 1)
339 key = ExtensionKey(name, ver)
340 strip_butler_cards(header)
341 if key not in self.headers:
342 header.remove("EXTNAME", ignore_missing=True)
343 header.remove("EXTVER", ignore_missing=True)
344 self.headers[key] = header
346 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None:
347 """Look up the given EXTNAME to see if there is a tile compressed image
348 HDU that should be used directly, instead of requantizing.
350 Parameters
351 ----------
352 name
353 EXTNAME (all caps).
355 Returns
356 -------
357 `astropy.io.fits.BinTableHDU` | `None`
358 An already-compressed HDU, in binary table form, or `None` if there
359 is no precompressed HDU for this EXTNAME.
360 """
361 if (precompressed := self.precompressed.get(name)) is None:
362 return None
363 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name)
365 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> dict[str, Any]:
366 """Update the opaque metadata with the header of the primary HDU
367 of a legacy (`lsst.afw.image`) FITS file, stripping cards we know we
368 don't need and extracting any ``LSST IMAGES ...`` cards into a
369 dictionary we return.
370 """
371 primary_header = header.copy(strip=True)
372 # No idea what these spare TAN-SIP headers are doing in the afw
373 # FITS files, but we'll strip them here:
374 primary_header.remove("A_ORDER", ignore_missing=True)
375 primary_header.remove("B_ORDER", ignore_missing=True)
376 strip_legacy_exposure_cards(primary_header)
377 strip_butler_cards(primary_header)
378 metadata: dict[str, Any] = {}
379 for n in itertools.count():
380 if (key := header.pop(f"LSST IMAGES KEY {n + 1}", ...)) is ...:
381 break
382 value = header.pop(f"LSST IMAGES VALUE {n + 1}")
383 metadata[key] = value
384 self.headers[ExtensionKey()] = primary_header
385 return metadata
387 def copy(self) -> FitsOpaqueMetadata:
388 # Docstring inherited.
389 return FitsOpaqueMetadata(headers=self.headers)
391 def subset(self, bbox: Box) -> FitsOpaqueMetadata:
392 # Docstring inherited.
393 return FitsOpaqueMetadata(headers=self.headers)
395 def get_instrumental_unit(self) -> astropy.units.UnitBase | None:
396 """Extract the ``LSST ISR UNIT`` key from the primary header (if it
397 exists) and wrap it with Astropy.
398 """
399 if (primary_header := self.headers.get(ExtensionKey())) is not None:
400 if (instrumental_unit_str := primary_header.get("LSST ISR UNITS")) is not None:
401 return astropy.units.Unit(instrumental_unit_str)
402 return None
405def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None:
406 """Add a trivial FITS WCS to a header that applies the appropriate offset
407 to map FITS array coordinates to a logical pixel grid.
409 Parameters
410 ----------
411 header
412 Header to update in-place.
413 x
414 Logical coordinate of the first column.
415 y
416 Logical coordinate of the first row.
417 key
418 Single-character suffix for this WCS.
419 """
420 header.set(f"CTYPE1{key}", "LINEAR")
421 header.set(f"CTYPE2{key}", "LINEAR")
422 header.set(f"CRPIX1{key}", 1.0)
423 header.set(f"CRPIX2{key}", 1.0)
424 header.set(f"CRVAL1{key}", float(x))
425 header.set(f"CRVAL2{key}", float(y))
426 header.set(f"CUNIT1{key}", "PIXEL")
427 header.set(f"CUNIT2{key}", "PIXEL")
430_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT")
431_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}")
434def strip_wcs_cards(header: astropy.io.fits.Header) -> None:
435 """Strip WCS cards from a FITS header.
437 This does *not* attempt to cover all possible FITS WCS forms; it focuses on
438 the ones we actually plan to write (simple undistorted ones + TAN-SIP).
439 """
440 wcsaxes = header.pop("WCSAXES", 2)
441 for wcsname in [""] + list(string.ascii_uppercase):
442 header.remove("RADESYS" + wcsname, ignore_missing=True)
443 if "CTYPE1" + wcsname in header:
444 ctype: str = "" # just for linters that can't figure out that the loop always executes
445 for n in range(wcsaxes):
446 suffix = f"{n + 1}{wcsname}"
447 ctype = header.pop("CTYPE" + suffix)
448 for key in _WCS_VECTOR_KEYS:
449 header.remove(key + suffix, ignore_missing=True)
450 for m in range(wcsaxes):
451 for tmpl in _WCS_MATRIX_KEYS:
452 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True)
453 if ctype.endswith("-SIP"):
454 _strip_sip_poly(header, wcsname, "A")
455 _strip_sip_poly(header, wcsname, "B")
456 _strip_sip_poly(header, wcsname, "AP")
457 _strip_sip_poly(header, wcsname, "BP")
458 header.remove("LONPOLE", ignore_missing=True)
459 header.remove("LATPOLE", ignore_missing=True)
460 header.remove("MJDREF", ignore_missing=True)
461 header.remove("PV1_3", ignore_missing=True)
462 header.remove("PV1_4", ignore_missing=True)
465def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None:
466 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None)
467 if order is not None:
468 for i, j in itertools.product(range(order + 1), range(order + 1)):
469 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True)
472def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None:
473 """Strip header keywords added by lsst.afw.image.Exposure."""
474 header.remove("AR_HDU", ignore_missing=True)
475 for name in (
476 "FILTER",
477 "DETECTOR",
478 "VALID_POLYGON",
479 "SKYWCS",
480 "PSF",
481 "SUMMARYSTATS",
482 "AP_CORR_MAP",
483 "PHOTOCALIB",
484 ):
485 header.remove(f"{name}_ID", ignore_missing=True)
486 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True)
489def strip_butler_cards(header: astropy.io.fits.Header) -> None:
490 """Strip header keywords added by butler provenance that would be
491 incorrect if propagated to a downstream file.
492 """
493 for key in list(header):
494 if key.startswith("LSST BUTLER"):
495 del header[key]