Coverage for python / lsst / images / _masked_image.py: 34%
179 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-15 08:42 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-15 08:42 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import Box
34from ._image import Image, ImageSerializationModel
35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
36from ._transforms import Frame, Projection, ProjectionSerializationModel
37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive
38from .utils import is_none
41class MaskedImage(GeneralizedImage):
42 """A multi-plane image with data (image), mask, and variance planes.
44 Parameters
45 ----------
46 image
47 The main image plane. If this has a `Projection`, it will be used
48 for all planes unless a ``projection`` is passed separately.
49 mask
50 A bitmask image that annotates the main image plane. Must have the
51 same bounding box as ``image`` if provided. Any attached projection
52 is replaced (possibly by `None`).
53 variance
54 The per-pixel uncertainty of the main image as an image of variance
55 values. Must have the same bounding box as ``image`` if provided, and
56 its units must be the square of ``image.unit`` or `None`.
57 Values default to ``1.0``. Any attached projection is replaced
58 (possibly by `None`).
59 mask_schema
60 Schema for the mask plane. Must be provided if and only if ``mask`` is
61 not provided.
62 projection
63 Projection that maps the pixel grid to the sky.
64 obs_info
65 General information about the associated observation in standardized
66 form.
67 metadata
68 Arbitrary flexible metadata to associate with the image.
69 """
71 def __init__(
72 self,
73 image: Image,
74 *,
75 mask: Mask | None = None,
76 variance: Image | None = None,
77 mask_schema: MaskSchema | None = None,
78 projection: Projection | None = None,
79 obs_info: ObservationInfo | None = None,
80 metadata: dict[str, MetadataValue] | None = None,
81 ):
82 super().__init__(metadata)
83 if projection is None:
84 projection = image.projection
85 else:
86 image = image.view(projection=projection)
87 if obs_info is None:
88 obs_info = image.obs_info
89 else:
90 image = image.view(obs_info=obs_info)
91 if mask is None:
92 if mask_schema is None:
93 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info)
95 elif mask_schema is not None:
96 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
97 else:
98 if image.bbox != mask.bbox:
99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
100 mask = mask.view(projection=projection, obs_info=obs_info)
101 if variance is None:
102 variance = Image(
103 1.0,
104 dtype=np.float32,
105 bbox=image.bbox,
106 unit=None if image.unit is None else image.unit**2,
107 projection=projection,
108 obs_info=obs_info,
109 )
110 else:
111 if image.bbox != variance.bbox:
112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
113 variance = variance.view(projection=projection, obs_info=obs_info)
114 if image.unit is None:
115 if variance.unit is not None:
116 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
117 elif variance.unit is None:
118 variance = variance.view(unit=image.unit**2)
119 elif variance.unit != image.unit**2:
120 raise ValueError(
121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
122 )
123 self._image = image
124 self._mask = mask
125 self._variance = variance
127 @property
128 def image(self) -> Image:
129 """The main image plane (`~lsst.images.Image`)."""
130 return self._image
132 @property
133 def mask(self) -> Mask:
134 """The mask plane (`~lsst.images.Mask`)."""
135 return self._mask
137 @property
138 def variance(self) -> Image:
139 """The variance plane (`~lsst.images.Image`)."""
140 return self._variance
142 @property
143 def bbox(self) -> Box:
144 """The bounding box shared by all three image planes
145 (`~lsst.images.Box`).
146 """
147 return self._image.bbox
149 @property
150 def unit(self) -> astropy.units.UnitBase | None:
151 """The units of the image plane (`astropy.units.Unit` | `None`)."""
152 return self._image.unit
154 @property
155 def projection(self) -> Projection[Any] | None:
156 """The projection that maps the pixel grid to the sky
157 (`~lsst.images.Projection` | `None`).
158 """
159 return self._image.projection
161 @property
162 def obs_info(self) -> ObservationInfo | None:
163 """General information about the associated observation in standard
164 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
165 """
166 return self._image.obs_info
168 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
169 if bbox is ...:
170 return self
171 super().__getitem__(bbox)
172 return self._transfer_metadata(
173 MaskedImage(
174 # Projection and obs_info propagate from the image.
175 self.image[bbox],
176 mask=self.mask[bbox],
177 variance=self.variance[bbox],
178 ),
179 bbox=bbox,
180 )
182 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
183 self._image[bbox] = value.image
184 self._mask[bbox] = value.mask
185 self._variance[bbox] = value.variance
187 def __str__(self) -> str:
188 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
190 def __repr__(self) -> str:
191 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
193 def copy(self) -> MaskedImage:
194 """Deep-copy the masked image and metadata."""
195 return self._transfer_metadata(
196 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
197 copy=True,
198 )
200 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
201 """Serialize the masked image to an output archive.
203 Parameters
204 ----------
205 archive
206 Archive to write to.
207 """
208 serialized_image = archive.serialize_direct(
209 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False)
210 )
211 serialized_mask = archive.serialize_direct(
212 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False)
213 )
214 serialized_variance = archive.serialize_direct(
215 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False)
216 )
217 serialized_projection = (
218 archive.serialize_direct("projection", self.projection.serialize)
219 if self.projection is not None
220 else None
221 )
222 return MaskedImageSerializationModel(
223 image=serialized_image,
224 mask=serialized_mask,
225 variance=serialized_variance,
226 projection=serialized_projection,
227 obs_info=self.obs_info,
228 metadata=self.metadata,
229 )
231 @staticmethod
232 def _get_archive_tree_type[P: pydantic.BaseModel](
233 pointer_type: type[P],
234 ) -> type[MaskedImageSerializationModel[P]]:
235 """Return the serialization model type for this object for an archive
236 type that uses the given pointer type.
237 """
238 return MaskedImageSerializationModel[pointer_type] # type: ignore
240 def write_fits(
241 self,
242 filename: str,
243 *,
244 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
245 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
246 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
247 compression_seed: int | None = None,
248 ) -> None:
249 """Write the image to a FITS file.
251 Parameters
252 ----------
253 filename
254 Name of the file to write to. Must be a local file.
255 image_compression
256 Compression options for the `image` plane.
257 mask_compression
258 Compression options for the `mask` plane.
259 variance_compression
260 Compression options for the `variance` plane.
261 compression_seed
262 A FITS tile compression seed to use whenever the configured
263 compression seed is `None` or (for backwards compatibility) ``0``.
264 This value is then incremented every time it is used.
265 """
266 compression_options = {}
267 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
268 compression_options["image"] = image_compression
269 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
270 compression_options["mask"] = mask_compression
271 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
272 compression_options["variance"] = variance_compression
273 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
275 @classmethod
276 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
277 """Read an image from a FITS file.
279 Parameters
280 ----------
281 url
282 URL of the file to read; may be any type supported by
283 `lsst.resources.ResourcePath`.
284 bbox
285 Bounding box of a subimage to read instead.
286 """
287 return fits.read(cls, url, bbox=bbox).deserialized
289 @staticmethod
290 def from_legacy(
291 legacy: Any,
292 *,
293 unit: astropy.units.Unit | None = None,
294 plane_map: Mapping[str, MaskPlane] | None = None,
295 ) -> MaskedImage:
296 """Convert from an `lsst.afw.image.MaskedImage` instance.
298 Parameters
299 ----------
300 legacy
301 An `lsst.afw.image.MaskedImage` instance that will share image and
302 variance (but not mask) pixel data with the returned object.
303 unit
304 Units of the image.
305 plane_map
306 A mapping from legacy mask plane name to the new plane name and
307 description.
308 """
309 return MaskedImage(
310 image=Image.from_legacy(legacy.getImage(), unit),
311 mask=Mask.from_legacy(legacy.getMask(), plane_map),
312 variance=Image.from_legacy(legacy.getVariance()),
313 )
315 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
316 """Convert to an `lsst.afw.image.MaskedImage` instance.
318 Parameters
319 ----------
320 copy
321 If `True`, always copy the image and variance pixel data.
322 If `False`, return a view, and raise `TypeError` if the pixel data
323 is read-only (this is not supported by afw). If `None`, onyl if
324 the pixel data is read-only. Mask pixel data is always copied.
325 plane_map
326 A mapping from legacy mask plane name to the new plane name and
327 description.
328 """
329 import lsst.afw.image
331 return lsst.afw.image.MaskedImage(
332 self.image.to_legacy(copy=copy),
333 mask=self.mask.to_legacy(plane_map),
334 variance=self.variance.to_legacy(copy=copy),
335 dtype=self.image.array.dtype,
336 )
338 @overload
339 @staticmethod
340 def read_legacy( 340 ↛ exitline 340 didn't return from function 'read_legacy' because
341 uri: ResourcePathExpression,
342 *,
343 preserve_quantization: bool = False,
344 component: Literal["image"],
345 fits_wcs_frame: Frame | None = None,
346 ) -> Image: ...
348 @overload
349 @staticmethod
350 def read_legacy( 350 ↛ exitline 350 didn't return from function 'read_legacy' because
351 uri: ResourcePathExpression,
352 *,
353 plane_map: Mapping[str, MaskPlane] | None = None,
354 component: Literal["mask"],
355 fits_wcs_frame: Frame | None = None,
356 ) -> Mask: ...
358 @overload
359 @staticmethod
360 def read_legacy( 360 ↛ exitline 360 didn't return from function 'read_legacy' because
361 uri: ResourcePathExpression,
362 *,
363 preserve_quantization: bool = False,
364 component: Literal["variance"],
365 fits_wcs_frame: Frame | None = None,
366 ) -> Image: ...
368 @overload
369 @staticmethod
370 def read_legacy( 370 ↛ exitline 370 didn't return from function 'read_legacy' because
371 uri: ResourcePathExpression,
372 *,
373 preserve_quantization: bool = False,
374 plane_map: Mapping[str, MaskPlane] | None = None,
375 component: None = None,
376 fits_wcs_frame: Frame | None = None,
377 ) -> MaskedImage: ...
379 @staticmethod
380 def read_legacy(
381 uri: ResourcePathExpression,
382 *,
383 preserve_quantization: bool = False,
384 plane_map: Mapping[str, MaskPlane] | None = None,
385 component: Literal["image", "mask", "variance"] | None = None,
386 fits_wcs_frame: Frame | None = None,
387 ) -> Any:
388 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
390 Parameters
391 ----------
392 uri
393 URI or file name.
394 preserve_quantization
395 If `True`, ensure that writing the masked image back out again will
396 exactly preserve quantization-compressed pixel values. This causes
397 the image and variance plane arrays to be marked as read-only and
398 stores the original binary table data for those planes in memory.
399 If the `~lsst.images.MaskedImage` is copied, the precompressed
400 pixel values are not transferred to the copy.
401 plane_map
402 A mapping from legacy mask plane name to the new plane name and
403 description.
404 component
405 A component to read instead of the full image.
406 fits_wcs_frame
407 If not `None` and the HDU containing the image plane has a FITS
408 WCS, attach a `~lsst.images.Projection` to the returned masked
409 image by converting that WCS. When ``component`` is one of
410 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the
411 component HDU is used instead (all three should have the same WCS).
412 """
413 fs, fspath = ResourcePath(uri).to_fsspec()
414 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
415 return MaskedImage._read_legacy_hdus(
416 hdu_list,
417 uri,
418 preserve_quantization=preserve_quantization,
419 plane_map=plane_map,
420 component=component,
421 fits_wcs_frame=fits_wcs_frame,
422 )
424 @staticmethod
425 def _read_legacy_hdus(
426 hdu_list: astropy.io.fits.HDUList,
427 uri: ResourcePathExpression,
428 *,
429 preserve_quantization: bool = False,
430 plane_map: Mapping[str, MaskPlane] | None = None,
431 component: Literal["image", "mask", "variance"] | None,
432 fits_wcs_frame: Frame | None = None,
433 ) -> Any:
434 opaque_metadata = fits.FitsOpaqueMetadata()
435 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
436 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
437 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
438 result: Any
439 with ExitStack() as exit_stack:
440 if preserve_quantization:
441 fs, fspath = ResourcePath(uri).to_fsspec()
442 bintable_stream = exit_stack.enter_context(fs.open(fspath))
443 bintable_hdu_list = exit_stack.enter_context(
444 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
445 )
446 image_bintable_hdu = bintable_hdu_list[1]
447 variance_bintable_hdu = bintable_hdu_list[3]
448 if component is None or component == "image":
449 image = Image._read_legacy_hdu(
450 hdu_list[1],
451 opaque_metadata,
452 preserve_bintable=image_bintable_hdu,
453 fits_wcs_frame=fits_wcs_frame,
454 )
455 if component == "image":
456 result = image
457 if component is None or component == "mask":
458 mask = Mask._read_legacy_hdu(
459 hdu_list[2],
460 opaque_metadata,
461 plane_map=plane_map,
462 fits_wcs_frame=fits_wcs_frame if component is not None else None,
463 )
464 if component == "mask":
465 result = mask
466 if component is None or component == "variance":
467 variance = Image._read_legacy_hdu(
468 hdu_list[3],
469 opaque_metadata,
470 preserve_bintable=variance_bintable_hdu,
471 fits_wcs_frame=fits_wcs_frame if component is not None else None,
472 )
473 if component == "variance":
474 result = variance
475 if component is None:
476 result = MaskedImage(image, mask=mask, variance=variance)
477 result._opaque_metadata = opaque_metadata
478 return result
481class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
482 """A Pydantic model used to represent a serialized `MaskedImage`."""
484 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
485 mask: MaskSerializationModel[P] = pydantic.Field(
486 description="Bitmask that annotates the main image's pixels."
487 )
488 variance: ImageSerializationModel[P] = pydantic.Field(
489 description="Per-pixel variance estimates for the main image."
490 )
491 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
492 default=None,
493 exclude_if=is_none,
494 description="Projection that maps the pixel grid to the sky.",
495 )
496 obs_info: ObservationInfo | None = pydantic.Field(
497 default=None,
498 exclude_if=is_none,
499 description="Standardized description of image metadata",
500 )
502 @property
503 def bbox(self) -> Box:
504 """The bounding box of the image."""
505 return self.image.bbox
507 def deserialize(self, archive: InputArchive[Any], *, bbox: Box | None = None) -> MaskedImage:
508 """Deserialize an image from an input archive.
510 Parameters
511 ----------
512 archive
513 Archive to read from.
514 bbox
515 Bounding box of a subimage to read instead.
516 """
517 image = self.image.deserialize(archive, bbox=bbox)
518 mask = self.mask.deserialize(archive, bbox=bbox)
519 variance = self.variance.deserialize(archive, bbox=bbox)
520 projection = self.projection.deserialize(archive) if self.projection is not None else None
521 return MaskedImage(
522 image, mask=mask, variance=variance, projection=projection, obs_info=self.obs_info
523 )._finish_deserialize(self)