Coverage for python / lsst / images / _masked_image.py: 33%
178 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-27 01:31 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-27 01:31 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import TYPE_CHECKING, Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
28from lsst.resources import ResourcePath, ResourcePathExpression
30from . import fits
31from ._generalized_image import GeneralizedImage
32from ._geom import Box
33from ._image import Image, ImageSerializationModel
34from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
35from ._transforms import Frame, Projection, ProjectionSerializationModel
36from .serialization import ArchiveTree, InputArchive, InvalidParameterError, MetadataValue, OutputArchive
37from .utils import is_none
39if TYPE_CHECKING:
40 try:
41 from lsst.afw.image import MaskedImage as LegacyMaskedImage
42 except ImportError:
43 type LegacyMaskedImage = Any # type: ignore[no-redef]
46class MaskedImage(GeneralizedImage):
47 """A multi-plane image with data (image), mask, and variance planes.
49 Parameters
50 ----------
51 image
52 The main image plane. If this has a `Projection`, it will be used
53 for all planes unless a ``projection`` is passed separately.
54 mask
55 A bitmask image that annotates the main image plane. Must have the
56 same bounding box as ``image`` if provided. Any attached projection
57 is replaced (possibly by `None`).
58 variance
59 The per-pixel uncertainty of the main image as an image of variance
60 values. Must have the same bounding box as ``image`` if provided, and
61 its units must be the square of ``image.unit`` or `None`.
62 Values default to ``1.0``. Any attached projection is replaced
63 (possibly by `None`).
64 mask_schema
65 Schema for the mask plane. Must be provided if and only if ``mask`` is
66 not provided.
67 projection
68 Projection that maps the pixel grid to the sky.
69 metadata
70 Arbitrary flexible metadata to associate with the image.
71 """
73 def __init__(
74 self,
75 image: Image,
76 *,
77 mask: Mask | None = None,
78 variance: Image | None = None,
79 mask_schema: MaskSchema | None = None,
80 projection: Projection | None = None,
81 metadata: dict[str, MetadataValue] | None = None,
82 ):
83 super().__init__(metadata)
84 if projection is None:
85 projection = image.projection
86 else:
87 image = image.view(projection=projection)
88 if mask is None:
89 if mask_schema is None:
90 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
91 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection)
92 elif mask_schema is not None:
93 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
94 else:
95 if image.bbox != mask.bbox:
96 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
97 mask = mask.view(projection=projection)
98 if variance is None:
99 variance = Image(
100 1.0,
101 dtype=np.float32,
102 bbox=image.bbox,
103 unit=None if image.unit is None else image.unit**2,
104 projection=projection,
105 )
106 else:
107 if image.bbox != variance.bbox:
108 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
109 variance = variance.view(projection=projection)
110 if image.unit is None:
111 if variance.unit is not None:
112 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
113 elif variance.unit is None:
114 variance = variance.view(unit=image.unit**2)
115 elif variance.unit != image.unit**2:
116 raise ValueError(
117 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
118 )
119 self._image = image
120 self._mask = mask
121 self._variance = variance
123 @property
124 def image(self) -> Image:
125 """The main image plane (`~lsst.images.Image`)."""
126 return self._image
128 @property
129 def mask(self) -> Mask:
130 """The mask plane (`~lsst.images.Mask`)."""
131 return self._mask
133 @property
134 def variance(self) -> Image:
135 """The variance plane (`~lsst.images.Image`)."""
136 return self._variance
138 @property
139 def bbox(self) -> Box:
140 """The bounding box shared by all three image planes
141 (`~lsst.images.Box`).
142 """
143 return self._image.bbox
145 @property
146 def unit(self) -> astropy.units.UnitBase | None:
147 """The units of the image plane (`astropy.units.Unit` | `None`)."""
148 return self._image.unit
150 @property
151 def projection(self) -> Projection[Any] | None:
152 """The projection that maps the pixel grid to the sky
153 (`~lsst.images.Projection` | `None`).
154 """
155 return self._image.projection
157 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
158 if bbox is ...:
159 return self
160 super().__getitem__(bbox)
161 return self._transfer_metadata(
162 MaskedImage(
163 # Projection and obs_info propagate from the image.
164 self.image[bbox],
165 mask=self.mask[bbox],
166 variance=self.variance[bbox],
167 ),
168 bbox=bbox,
169 )
171 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
172 self._image[bbox] = value.image
173 self._mask[bbox] = value.mask
174 self._variance[bbox] = value.variance
176 def __str__(self) -> str:
177 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
179 def __repr__(self) -> str:
180 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
182 def copy(self) -> MaskedImage:
183 """Deep-copy the masked image and metadata."""
184 return self._transfer_metadata(
185 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
186 copy=True,
187 )
189 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
190 """Serialize the masked image to an output archive.
192 Parameters
193 ----------
194 archive
195 Archive to write to.
196 """
197 serialized_image = archive.serialize_direct(
198 "image", functools.partial(self.image.serialize, save_projection=False)
199 )
200 serialized_mask = archive.serialize_direct(
201 "mask", functools.partial(self.mask.serialize, save_projection=False)
202 )
203 serialized_variance = archive.serialize_direct(
204 "variance", functools.partial(self.variance.serialize, save_projection=False)
205 )
206 serialized_projection = (
207 archive.serialize_direct("projection", self.projection.serialize)
208 if self.projection is not None
209 else None
210 )
211 return MaskedImageSerializationModel(
212 image=serialized_image,
213 mask=serialized_mask,
214 variance=serialized_variance,
215 projection=serialized_projection,
216 metadata=self.metadata,
217 )
219 @staticmethod
220 def _get_archive_tree_type[P: pydantic.BaseModel](
221 pointer_type: type[P],
222 ) -> type[MaskedImageSerializationModel[P]]:
223 """Return the serialization model type for this object for an archive
224 type that uses the given pointer type.
225 """
226 return MaskedImageSerializationModel[pointer_type] # type: ignore
228 def write_fits(
229 self,
230 filename: str,
231 *,
232 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
233 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
234 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
235 compression_seed: int | None = None,
236 ) -> None:
237 """Write the image to a FITS file.
239 Parameters
240 ----------
241 filename
242 Name of the file to write to. Must be a local file.
243 image_compression
244 Compression options for the `image` plane.
245 mask_compression
246 Compression options for the `mask` plane.
247 variance_compression
248 Compression options for the `variance` plane.
249 compression_seed
250 A FITS tile compression seed to use whenever the configured
251 compression seed is `None` or (for backwards compatibility) ``0``.
252 This value is then incremented every time it is used.
253 """
254 compression_options = {}
255 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
256 compression_options["image"] = image_compression
257 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
258 compression_options["mask"] = mask_compression
259 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
260 compression_options["variance"] = variance_compression
261 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
263 @classmethod
264 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
265 """Read an image from a FITS file.
267 Parameters
268 ----------
269 url
270 URL of the file to read; may be any type supported by
271 `lsst.resources.ResourcePath`.
272 bbox
273 Bounding box of a subimage to read instead.
274 """
275 return fits.read(cls, url, bbox=bbox).deserialized
277 @staticmethod
278 def from_legacy(
279 legacy: LegacyMaskedImage,
280 *,
281 unit: astropy.units.UnitBase | None = None,
282 plane_map: Mapping[str, MaskPlane] | None = None,
283 ) -> MaskedImage:
284 """Convert from an `lsst.afw.image.MaskedImage` instance.
286 Parameters
287 ----------
288 legacy
289 An `lsst.afw.image.MaskedImage` instance that will share image and
290 variance (but not mask) pixel data with the returned object.
291 unit
292 Units of the image.
293 plane_map
294 A mapping from legacy mask plane name to the new plane name and
295 description.
296 """
297 return MaskedImage(
298 image=Image.from_legacy(legacy.getImage(), unit),
299 mask=Mask.from_legacy(legacy.getMask(), plane_map),
300 variance=Image.from_legacy(legacy.getVariance()),
301 )
303 def to_legacy(
304 self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None
305 ) -> LegacyMaskedImage:
306 """Convert to an `lsst.afw.image.MaskedImage` instance.
308 Parameters
309 ----------
310 copy
311 If `True`, always copy the image and variance pixel data.
312 If `False`, return a view, and raise `TypeError` if the pixel data
313 is read-only (this is not supported by afw). If `None`, only copy
314 if the pixel data is read-only. Mask pixel data is always copied.
315 plane_map
316 A mapping from legacy mask plane name to the new plane name and
317 description.
318 """
319 import lsst.afw.image
321 return lsst.afw.image.MaskedImage(
322 self.image.to_legacy(copy=copy),
323 mask=self.mask.to_legacy(plane_map),
324 variance=self.variance.to_legacy(copy=copy),
325 dtype=self.image.array.dtype,
326 )
328 @overload
329 @staticmethod
330 def read_legacy( 330 ↛ exitline 330 didn't return from function 'read_legacy' because
331 uri: ResourcePathExpression,
332 *,
333 preserve_quantization: bool = False,
334 component: Literal["image"],
335 fits_wcs_frame: Frame | None = None,
336 ) -> Image: ...
338 @overload
339 @staticmethod
340 def read_legacy( 340 ↛ exitline 340 didn't return from function 'read_legacy' because
341 uri: ResourcePathExpression,
342 *,
343 plane_map: Mapping[str, MaskPlane] | None = None,
344 component: Literal["mask"],
345 fits_wcs_frame: Frame | None = None,
346 ) -> Mask: ...
348 @overload
349 @staticmethod
350 def read_legacy( 350 ↛ exitline 350 didn't return from function 'read_legacy' because
351 uri: ResourcePathExpression,
352 *,
353 preserve_quantization: bool = False,
354 component: Literal["variance"],
355 fits_wcs_frame: Frame | None = None,
356 ) -> Image: ...
358 @overload
359 @staticmethod
360 def read_legacy( 360 ↛ exitline 360 didn't return from function 'read_legacy' because
361 uri: ResourcePathExpression,
362 *,
363 preserve_quantization: bool = False,
364 plane_map: Mapping[str, MaskPlane] | None = None,
365 component: None = None,
366 fits_wcs_frame: Frame | None = None,
367 ) -> MaskedImage: ...
369 @staticmethod
370 def read_legacy(
371 uri: ResourcePathExpression,
372 *,
373 preserve_quantization: bool = False,
374 plane_map: Mapping[str, MaskPlane] | None = None,
375 component: Literal["image", "mask", "variance"] | None = None,
376 fits_wcs_frame: Frame | None = None,
377 ) -> Any:
378 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
380 Parameters
381 ----------
382 uri
383 URI or file name.
384 preserve_quantization
385 If `True`, ensure that writing the masked image back out again will
386 exactly preserve quantization-compressed pixel values. This causes
387 the image and variance plane arrays to be marked as read-only and
388 stores the original binary table data for those planes in memory.
389 If the `~lsst.images.MaskedImage` is copied, the precompressed
390 pixel values are not transferred to the copy.
391 plane_map
392 A mapping from legacy mask plane name to the new plane name and
393 description.
394 component
395 A component to read instead of the full image.
396 fits_wcs_frame
397 If not `None` and the HDU containing the image plane has a FITS
398 WCS, attach a `~lsst.images.Projection` to the returned masked
399 image by converting that WCS. When ``component`` is one of
400 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the
401 component HDU is used instead (all three should have the same WCS).
402 """
403 fs, fspath = ResourcePath(uri).to_fsspec()
404 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
405 return MaskedImage._read_legacy_hdus(
406 hdu_list,
407 uri,
408 preserve_quantization=preserve_quantization,
409 plane_map=plane_map,
410 component=component,
411 fits_wcs_frame=fits_wcs_frame,
412 )
414 @staticmethod
415 def _read_legacy_hdus(
416 hdu_list: astropy.io.fits.HDUList,
417 uri: ResourcePathExpression,
418 *,
419 opaque_metadata: fits.FitsOpaqueMetadata | None = None,
420 preserve_quantization: bool = False,
421 plane_map: Mapping[str, MaskPlane] | None = None,
422 component: Literal["image", "mask", "variance"] | None,
423 fits_wcs_frame: Frame | None = None,
424 ) -> Any:
425 if opaque_metadata is None:
426 opaque_metadata = fits.FitsOpaqueMetadata()
427 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
428 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
429 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
430 result: Any
431 with ExitStack() as exit_stack:
432 if preserve_quantization:
433 fs, fspath = ResourcePath(uri).to_fsspec()
434 bintable_stream = exit_stack.enter_context(fs.open(fspath))
435 bintable_hdu_list = exit_stack.enter_context(
436 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
437 )
438 image_bintable_hdu = bintable_hdu_list[1]
439 variance_bintable_hdu = bintable_hdu_list[3]
440 if component is None or component == "image":
441 image = Image._read_legacy_hdu(
442 hdu_list[1],
443 opaque_metadata,
444 preserve_bintable=image_bintable_hdu,
445 fits_wcs_frame=fits_wcs_frame,
446 )
447 if component == "image":
448 result = image
449 if component is None or component == "mask":
450 mask = Mask._read_legacy_hdu(
451 hdu_list[2],
452 opaque_metadata,
453 plane_map=plane_map,
454 fits_wcs_frame=fits_wcs_frame if component is not None else None,
455 )
456 if component == "mask":
457 result = mask
458 if component is None or component == "variance":
459 variance = Image._read_legacy_hdu(
460 hdu_list[3],
461 opaque_metadata,
462 preserve_bintable=variance_bintable_hdu,
463 fits_wcs_frame=fits_wcs_frame if component is not None else None,
464 )
465 if component == "variance":
466 result = variance
467 if component is None:
468 result = MaskedImage(image, mask=mask, variance=variance)
469 result._opaque_metadata = opaque_metadata
470 return result
473class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
474 """A Pydantic model used to represent a serialized `MaskedImage`."""
476 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
477 mask: MaskSerializationModel[P] = pydantic.Field(
478 description="Bitmask that annotates the main image's pixels."
479 )
480 variance: ImageSerializationModel[P] = pydantic.Field(
481 description="Per-pixel variance estimates for the main image."
482 )
483 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
484 default=None,
485 exclude_if=is_none,
486 description="Projection that maps the pixel grid to the sky.",
487 )
489 @property
490 def bbox(self) -> Box:
491 """The bounding box of the image."""
492 return self.image.bbox
494 def deserialize(
495 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any
496 ) -> MaskedImage:
497 """Deserialize an image from an input archive.
499 Parameters
500 ----------
501 archive
502 Archive to read from.
503 bbox
504 Bounding box of a subimage to read instead.
505 **kwargs
506 Unsupported keyword arguments are accepted only to provide better
507 error messages (raising `serialization.InvalidParameterError`).
508 """
509 if kwargs:
510 raise InvalidParameterError(f"Unrecognized parameters for MaskedImage: {set(kwargs.keys())}.")
511 image = self.image.deserialize(archive, bbox=bbox)
512 mask = self.mask.deserialize(archive, bbox=bbox)
513 variance = self.variance.deserialize(archive, bbox=bbox)
514 projection = self.projection.deserialize(archive) if self.projection is not None else None
515 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize(
516 self
517 )
519 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
520 if component == "bbox" and kwargs:
521 raise InvalidParameterError(
522 f"Unrecognized parameters for MaskedImage.bbox: {set(kwargs.keys())}."
523 )
524 return super().deserialize_component(component, archive, **kwargs)