Coverage for python/lsst/images/_image.py: 22%
223 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:43 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("Image", "ImageSerializationModel")
16from collections.abc import Callable, Sequence
17from contextlib import ExitStack
18from types import EllipsisType
19from typing import TYPE_CHECKING, Any, ClassVar, final
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import numpy as np
25import numpy.typing as npt
26import pydantic
28from lsst.resources import ResourcePath, ResourcePathExpression
30from . import fits
31from ._generalized_image import GeneralizedImage
32from ._geom import YX, Box
33from ._transforms import Frame, Projection, ProjectionSerializationModel
34from .serialization import (
35 ArchiveTree,
36 ArrayReferenceModel,
37 ArrayReferenceQuantityModel,
38 InlineArrayModel,
39 InlineArrayQuantityModel,
40 InputArchive,
41 InvalidParameterError,
42 MetadataValue,
43 OutputArchive,
44 no_header_updates,
45)
46from .utils import is_none
48if TYPE_CHECKING:
49 try:
50 from lsst.afw.image import Image as LegacyImage
51 except ImportError:
52 type LegacyImage = Any # type: ignore[no-redef]
55@final
56class Image(GeneralizedImage):
57 """A 2-d array that may be augmented with units and a nonzero origin.
59 Parameters
60 ----------
61 array_or_fill
62 Array or fill value for the image. If a fill value, ``bbox`` or
63 ``shape`` must be provided.
64 bbox
65 Bounding box for the image.
66 start
67 Logical coordinates of the first pixel in the array, ordered ``y``,
68 ``x`` (unless an `XY` instance is passed). Ignored if
69 ``bbox`` is provided. Defaults to zeros.
70 shape
71 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
72 instance is passed). Only needed if ``array_or_fill`` is not an
73 array and ``bbox`` is not provided. Like the bbox, this does not
74 include the last dimension of the array.
75 dtype
76 Pixel data type override.
77 unit
78 Units for the image's pixel values.
79 projection
80 Projection that maps the pixel grid to the sky.
81 metadata
82 Arbitrary flexible metadata to associate with the image.
84 Notes
85 -----
86 Indexing the `array` attribute of an `Image` does not take into account its
87 ``start`` offset, but accessing a subimage by indexing an `Image` with a
88 `Box` does, and the `bbox` of the subimage is set to match its location
89 within the original image.
91 Indexed assignment to a subimage requires consistency between the
92 coordinate systems and units of both operands, but it will automatically
93 select a subimage of the right-hand side and convert compatible units when
94 possible. In other words::
96 a[box] = b
98 is a shortcut for
100 a[box].quantity = b[box].quantity
102 An ellipsis (``...``) can be used instead of a `Box` to assign to the full
103 image.
104 """
106 def __init__(
107 self,
108 array_or_fill: np.ndarray | int | float = 0,
109 /,
110 *,
111 bbox: Box | None = None,
112 start: Sequence[int] | None = None,
113 shape: Sequence[int] | None = None,
114 dtype: npt.DTypeLike | None = None,
115 unit: astropy.units.UnitBase | None = None,
116 projection: Projection[Any] | None = None,
117 metadata: dict[str, MetadataValue] | None = None,
118 ):
119 super().__init__(metadata)
120 if isinstance(array_or_fill, np.ndarray):
121 if dtype is not None:
122 array = np.array(array_or_fill, dtype=dtype, copy=None)
123 else:
124 array = array_or_fill
125 if bbox is None:
126 bbox = Box.from_shape(array.shape, start=start)
127 elif bbox.shape != array.shape:
128 raise ValueError(
129 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}."
130 )
131 if shape is not None and shape != array.shape:
132 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.")
133 else:
134 if bbox is None:
135 if shape is None:
136 raise TypeError("No bbox, shape, or array provided.")
137 bbox = Box.from_shape(shape, start=start)
138 elif shape is not None and shape != bbox.shape:
139 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.")
140 array = np.full(bbox.shape, array_or_fill, dtype=dtype)
141 self._array: np.ndarray = array
142 self._bbox: Box = bbox
143 self._unit = unit
144 self._projection = projection
146 @property
147 def array(self) -> np.ndarray:
148 """The low-level array (`numpy.ndarray`).
150 Assigning to this attribute modifies the existing array in place; the
151 bounding box and underlying data pointer are never changed.
152 """
153 return self._array
155 @array.setter
156 def array(self, value: np.ndarray | int | float) -> None:
157 self._array[...] = value
159 @property
160 def quantity(self) -> astropy.units.Quantity:
161 """The low-level array with units (`astropy.units.Quantity`).
163 Assigning to this attribute modifies the existing array in place; the
164 bounding box and underlying data pointer are never changed.
165 """
166 return astropy.units.Quantity(self._array, self._unit, copy=False)
168 @quantity.setter
169 def quantity(self, value: astropy.units.Quantity) -> None:
170 self.quantity[...] = value
172 @property
173 def bbox(self) -> Box:
174 """Bounding box for the image (`Box`)."""
175 return self._bbox
177 @property
178 def unit(self) -> astropy.units.UnitBase | None:
179 """Units for the image's pixel values (`astropy.units.Unit` or
180 `None`).
181 """
182 return self._unit
184 @property
185 def projection(self) -> Projection[Any] | None:
186 """The projection that maps this image's pixel grid to the sky
187 (`Projection` | `None`).
189 Notes
190 -----
191 The pixel coordinates used by this projection account for the bounding
192 box ``start``; they are not just array indices.
193 """
194 return self._projection
196 def __getitem__(self, bbox: Box | EllipsisType) -> Image:
197 if bbox is ...:
198 return self
199 super().__getitem__(bbox)
200 indices = bbox.slice_within(self._bbox)
201 return self._transfer_metadata(
202 Image(self._array[indices], bbox=bbox, unit=self._unit, projection=self._projection),
203 bbox=bbox,
204 )
206 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None:
207 self[bbox].quantity[...] = value.quantity
209 def __str__(self) -> str:
210 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})"
212 def __repr__(self) -> str:
213 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})"
215 def __eq__(self, other: object) -> bool:
216 if not isinstance(other, Image):
217 return NotImplemented
218 return (
219 self._bbox == other._bbox
220 and self._unit == other._unit
221 and np.array_equal(self._array, other._array, equal_nan=True)
222 )
224 def copy(self) -> Image:
225 return self._transfer_metadata(
226 Image(self._array.copy(), bbox=self._bbox, unit=self._unit, projection=self._projection),
227 copy=True,
228 )
230 def view(
231 self,
232 *,
233 unit: astropy.units.UnitBase | None | EllipsisType = ...,
234 projection: Projection | None | EllipsisType = ...,
235 start: Sequence[int] | EllipsisType = ...,
236 ) -> Image:
237 """Make a view of the image, with optional updates."""
238 if unit is ...:
239 unit = self._unit
240 if projection is ...:
241 projection = self._projection
242 if start is ...:
243 start = self._bbox.start
244 return self._transfer_metadata(Image(self._array, start=start, unit=unit, projection=projection))
246 def serialize[P: pydantic.BaseModel](
247 self,
248 archive: OutputArchive[P],
249 *,
250 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
251 save_projection: bool = True,
252 add_offset_wcs: str | None = "A",
253 ) -> ImageSerializationModel[P]:
254 """Serialize the image to an output archive.
256 Parameters
257 ----------
258 archive
259 Archive to write to.
260 update_header
261 A callback that will be given the FITS header for the HDU
262 containing this image in order to add keys to it. This callback
263 may be provided but will not be called if the output format is not
264 FITS.
265 save_projection
266 If `True`, save the `Projection` attached to the image, if there
267 is one. This does not affect whether a FITS WCS corresponding to
268 the projection is written (it always is, if available, and if
269 ``add_offset_wcs`` is not ``" "``).
270 add_offset_wcs
271 A FITS WCS single-character suffix to use when adding a linear
272 WCS that maps the FITS array to the logical pixel coordinates
273 defined by ``bbox.start``. Set to `None` to not write this WCS.
274 If this is set to ``" "``, it will prevent the `Projection` from
275 being saved as a FITS WCS.
276 """
278 def _update_header(header: astropy.io.fits.Header) -> None:
279 update_header(header)
280 if self.unit is not None:
281 try:
282 header["BUNIT"] = self.unit.to_string(format="fits")
283 except ValueError:
284 # Units not supported by FITS; write it anyway because
285 # the accepted units are just a recommendation in the
286 # standard.
287 header["BUNIT"] = self.unit.to_string()
288 if self.projection is not None and add_offset_wcs != " ":
289 if self.fits_wcs:
290 header.update(self.fits_wcs.to_header(relax=True))
291 if add_offset_wcs is not None:
292 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
294 array_model = archive.add_array(self.array, update_header=_update_header)
295 serialized_projection: ProjectionSerializationModel[P] | None = None
296 if save_projection and self.projection is not None:
297 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
298 data = array_model if self.unit is None else array_model.with_units(self.unit)
299 return ImageSerializationModel.model_construct(
300 data=data,
301 start=list(self.bbox.start),
302 projection=serialized_projection,
303 metadata=self.metadata,
304 )
306 @staticmethod
307 def _get_archive_tree_type[P: pydantic.BaseModel](
308 pointer_type: type[P],
309 ) -> type[ImageSerializationModel[P]]:
310 """Return the serialization model type for this object for an archive
311 type that uses the given pointer type.
312 """
313 return ImageSerializationModel[pointer_type] # type: ignore
315 _archive_default_name: ClassVar[str] = "image"
316 """The name this object should be serialized with when written as the
317 top-level object.
318 """
320 def write_fits(
321 self,
322 filename: str,
323 *,
324 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
325 compression_seed: int | None = None,
326 ) -> None:
327 """Write the image to a FITS file.
329 Parameters
330 ----------
331 filename
332 Name of the file to write to. Must be a local file.
333 compression
334 Compression options.
335 compression_seed
336 A FITS tile compression seed to use whenever the configured
337 compression seed is `None` or (for backwards compatibility) ``0``.
338 """
339 compression_options = {}
340 if compression is not fits.FitsCompressionOptions.DEFAULT:
341 compression_options[self._archive_default_name] = compression
342 fits.write(self, filename, compression_options, compression_seed=compression_seed)
344 @staticmethod
345 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image:
346 """Read an image from a FITS file.
348 Parameters
349 ----------
350 url
351 URL of the file to read; may be any type supported by
352 `lsst.resources.ResourcePath`.
353 bbox
354 Bounding box of a subimage to read instead.
355 """
356 return fits.read(Image, url, bbox=bbox).deserialized
358 @staticmethod
359 def from_legacy(legacy: LegacyImage, unit: astropy.units.UnitBase | None = None) -> Image:
360 """Convert from an `lsst.afw.image.Image` instance.
362 Parameters
363 ----------
364 legacy
365 An `lsst.afw.image.Image` instance that will share pixel data with
366 the returned object.
367 unit
368 Units of the image.
369 """
370 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit)
372 def to_legacy(self, *, copy: bool | None = None) -> LegacyImage:
373 """Convert to an `lsst.afw.image.Image` instance.
375 Parameters
376 ----------
377 copy
378 If `True`, always copy the pixel data. If `False`, return a view,
379 and raise `TypeError` if the pixel data is read-only (this is not
380 supported by afw). If `None`, onyl if the pixel data is
381 read-only.
382 """
383 import lsst.afw.image
384 import lsst.geom
386 array = self._array
387 if copy:
388 array = array.copy()
389 elif not self._array.flags.writeable:
390 if copy is None:
391 array = array.copy()
392 else:
393 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.")
395 return lsst.afw.image.Image(
396 array,
397 deep=False,
398 dtype=array.dtype.type,
399 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min),
400 )
402 @staticmethod
403 def read_legacy(
404 uri: ResourcePathExpression,
405 *,
406 preserve_quantization: bool = False,
407 ext: str | int = 1,
408 fits_wcs_frame: Frame | None = None,
409 ) -> Image:
410 """Read a FITS file written by `lsst.afw.image.Image.writeFits`.
412 Parameters
413 ----------
414 uri
415 URI or file name.
416 preserve_quantization
417 If `True`, ensure that writing the image back out again will
418 exactly preserve quantization-compressed pixel values. This causes
419 the arrays to be marked as read-only and stores the original binary
420 table data for those planes in memory. If the `Image` is copied,
421 the precompressed pixel values are not transferred to the copy.
422 ext
423 Name or index of the FITS HDU to read.
424 fits_wcs_frame
425 If not `None` and the HDU containing the image has a FITS WCS,
426 attach a `Projection` to the returned image by converting that
427 WCS.
428 """
429 opaque_metadata = fits.FitsOpaqueMetadata()
430 with ExitStack() as exit_stack:
431 fs, fspath = ResourcePath(uri).to_fsspec()
432 stream = exit_stack.enter_context(fs.open(fspath))
433 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream))
434 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
435 bintable_hdu: astropy.io.fits.BinTableHDU | None = None
436 if preserve_quantization:
437 bintable_stream = exit_stack.enter_context(fs.open(fspath))
438 bintable_hdu_list = exit_stack.enter_context(
439 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
440 )
441 bintable_hdu = bintable_hdu_list[ext]
442 result = Image._read_legacy_hdu(
443 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame
444 )
445 result._opaque_metadata = opaque_metadata
446 return result
448 @staticmethod
449 def _read_legacy_hdu(
450 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU,
451 opaque_metadata: fits.FitsOpaqueMetadata,
452 *,
453 preserve_bintable: astropy.io.fits.BinTableHDU | None,
454 fits_wcs_frame: Frame | None = None,
455 ) -> Image:
456 unit: astropy.units.UnitBase | None = None
457 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None:
458 try:
459 unit = astropy.units.Unit(fits_unit, format="fits")
460 except ValueError:
461 # Accept non-FITS units by assuming Astropy can still figure
462 # them out if we don't specify the format.
463 unit = astropy.units.Unit(fits_unit)
464 if opaque_metadata.get_instrumental_unit() == astropy.units.electron:
465 # Fix incorrect BUNIT='adu' in LSST preliminary_visit_image.
466 if unit == astropy.units.adu:
467 unit = astropy.units.electron
468 if unit == astropy.units.adu**2:
469 unit = astropy.units.electron**2
470 dx: int = hdu.header.pop("LTV1")
471 dy: int = hdu.header.pop("LTV2")
472 start = YX(y=-dy, x=-dx)
473 read_only: bool = False
474 if preserve_bintable is not None:
475 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable)
476 read_only = True
477 projection: Projection | None = None
478 if fits_wcs_frame is not None:
479 try:
480 fits_wcs = astropy.wcs.WCS(hdu.header)
481 except KeyError:
482 pass
483 else:
484 projection = Projection.from_fits_wcs(
485 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
486 )
487 image = Image(hdu.data, start=start, unit=unit, projection=projection)
488 if read_only:
489 image._array.flags["WRITEABLE"] = False
490 fits.strip_wcs_cards(hdu.header)
491 hdu.header.strip()
492 hdu.header.remove("EXTTYPE", ignore_missing=True)
493 hdu.header.remove("INHERIT", ignore_missing=True)
494 hdu.header.remove("UZSCALE", ignore_missing=True)
495 opaque_metadata.add_header(hdu.header)
496 return image
499class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
500 """Pydantic model used to represent the serialized form of an `.Image`."""
502 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = (
503 pydantic.Field(description="Reference to pixel data.")
504 )
505 start: list[int] = pydantic.Field(
506 description="Coordinate of the first pixels in the array, ordered (y, x)."
507 )
508 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
509 default=None,
510 exclude_if=is_none,
511 description="Projection that maps the logical pixel grid onto the sky.",
512 )
514 @property
515 def bbox(self) -> Box:
516 """The bounding box of the image."""
517 match self.data:
518 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel():
519 shape = self.data.value.shape
520 case ArrayReferenceModel() | InlineArrayModel():
521 shape = self.data.shape
522 return Box.from_shape(shape, self.start)
524 def deserialize(
525 self,
526 archive: InputArchive[Any],
527 *,
528 bbox: Box | None = None,
529 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
530 **kwargs: Any,
531 ) -> Image:
532 """Deserialize an image from an input archive.
534 Parameters
535 ----------
536 archive
537 Archive to read from.
538 bbox
539 Bounding box of a subimage to read instead.
540 strip_header
541 A callable that strips out any FITS header cards added by the
542 ``update_header`` argument in the corresponding call to
543 `Image.serialize`.
544 **kwargs
545 Unsupported keyword arguments are accepted only to provide better
546 error messages (raising `serialization.InvalidParameterError`).
547 """
548 if kwargs:
549 raise InvalidParameterError(f"Unrecognized parameters for Image: {set(kwargs.keys())}.")
550 array_model: ArrayReferenceModel | InlineArrayModel
551 unit: astropy.units.UnitBase | None = None
552 if isinstance(self.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel):
553 array_model = self.data.value
554 unit = self.data.unit
555 else:
556 array_model = self.data
558 def _strip_header(header: astropy.io.fits.Header) -> None:
559 if unit is not None:
560 header.pop("BUNIT", None)
561 fits.strip_wcs_cards(header)
562 strip_header(header)
564 slices = bbox.slice_within(self.bbox) if bbox is not None else ...
565 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices)
566 projection = self.projection.deserialize(archive) if self.projection is not None else None
567 return Image(
568 array,
569 start=self.start if bbox is None else bbox.start,
570 unit=unit,
571 projection=projection,
572 )._finish_deserialize(self)
574 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
575 if kwargs:
576 raise InvalidParameterError(f"Unsupported parameters for Image components: {set(kwargs.keys())}.")
577 return super().deserialize_component(component, archive)