Coverage for python / lsst / images / _masked_image.py: 33%
178 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
28from lsst.resources import ResourcePath, ResourcePathExpression
30from . import fits
31from ._generalized_image import GeneralizedImage
32from ._geom import Box
33from ._image import Image, ImageSerializationModel
34from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
35from ._transforms import Frame, Projection, ProjectionSerializationModel
36from .serialization import ArchiveTree, InputArchive, InvalidParameterError, MetadataValue, OutputArchive
37from .utils import is_none
40class MaskedImage(GeneralizedImage):
41 """A multi-plane image with data (image), mask, and variance planes.
43 Parameters
44 ----------
45 image
46 The main image plane. If this has a `Projection`, it will be used
47 for all planes unless a ``projection`` is passed separately.
48 mask
49 A bitmask image that annotates the main image plane. Must have the
50 same bounding box as ``image`` if provided. Any attached projection
51 is replaced (possibly by `None`).
52 variance
53 The per-pixel uncertainty of the main image as an image of variance
54 values. Must have the same bounding box as ``image`` if provided, and
55 its units must be the square of ``image.unit`` or `None`.
56 Values default to ``1.0``. Any attached projection is replaced
57 (possibly by `None`).
58 mask_schema
59 Schema for the mask plane. Must be provided if and only if ``mask`` is
60 not provided.
61 projection
62 Projection that maps the pixel grid to the sky.
63 metadata
64 Arbitrary flexible metadata to associate with the image.
65 """
67 def __init__(
68 self,
69 image: Image,
70 *,
71 mask: Mask | None = None,
72 variance: Image | None = None,
73 mask_schema: MaskSchema | None = None,
74 projection: Projection | None = None,
75 metadata: dict[str, MetadataValue] | None = None,
76 ):
77 super().__init__(metadata)
78 if projection is None:
79 projection = image.projection
80 else:
81 image = image.view(projection=projection)
82 if mask is None:
83 if mask_schema is None:
84 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
85 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection)
86 elif mask_schema is not None:
87 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
88 else:
89 if image.bbox != mask.bbox:
90 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
91 mask = mask.view(projection=projection)
92 if variance is None:
93 variance = Image(
94 1.0,
95 dtype=np.float32,
96 bbox=image.bbox,
97 unit=None if image.unit is None else image.unit**2,
98 projection=projection,
99 )
100 else:
101 if image.bbox != variance.bbox:
102 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
103 variance = variance.view(projection=projection)
104 if image.unit is None:
105 if variance.unit is not None:
106 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
107 elif variance.unit is None:
108 variance = variance.view(unit=image.unit**2)
109 elif variance.unit != image.unit**2:
110 raise ValueError(
111 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
112 )
113 self._image = image
114 self._mask = mask
115 self._variance = variance
117 @property
118 def image(self) -> Image:
119 """The main image plane (`~lsst.images.Image`)."""
120 return self._image
122 @property
123 def mask(self) -> Mask:
124 """The mask plane (`~lsst.images.Mask`)."""
125 return self._mask
127 @property
128 def variance(self) -> Image:
129 """The variance plane (`~lsst.images.Image`)."""
130 return self._variance
132 @property
133 def bbox(self) -> Box:
134 """The bounding box shared by all three image planes
135 (`~lsst.images.Box`).
136 """
137 return self._image.bbox
139 @property
140 def unit(self) -> astropy.units.UnitBase | None:
141 """The units of the image plane (`astropy.units.Unit` | `None`)."""
142 return self._image.unit
144 @property
145 def projection(self) -> Projection[Any] | None:
146 """The projection that maps the pixel grid to the sky
147 (`~lsst.images.Projection` | `None`).
148 """
149 return self._image.projection
151 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
152 if bbox is ...:
153 return self
154 super().__getitem__(bbox)
155 return self._transfer_metadata(
156 MaskedImage(
157 # Projection and obs_info propagate from the image.
158 self.image[bbox],
159 mask=self.mask[bbox],
160 variance=self.variance[bbox],
161 ),
162 bbox=bbox,
163 )
165 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
166 self._image[bbox] = value.image
167 self._mask[bbox] = value.mask
168 self._variance[bbox] = value.variance
170 def __str__(self) -> str:
171 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
173 def __repr__(self) -> str:
174 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
176 def copy(self) -> MaskedImage:
177 """Deep-copy the masked image and metadata."""
178 return self._transfer_metadata(
179 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
180 copy=True,
181 )
183 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
184 """Serialize the masked image to an output archive.
186 Parameters
187 ----------
188 archive
189 Archive to write to.
190 """
191 serialized_image = archive.serialize_direct(
192 "image", functools.partial(self.image.serialize, save_projection=False)
193 )
194 serialized_mask = archive.serialize_direct(
195 "mask", functools.partial(self.mask.serialize, save_projection=False)
196 )
197 serialized_variance = archive.serialize_direct(
198 "variance", functools.partial(self.variance.serialize, save_projection=False)
199 )
200 serialized_projection = (
201 archive.serialize_direct("projection", self.projection.serialize)
202 if self.projection is not None
203 else None
204 )
205 return MaskedImageSerializationModel(
206 image=serialized_image,
207 mask=serialized_mask,
208 variance=serialized_variance,
209 projection=serialized_projection,
210 metadata=self.metadata,
211 )
213 @staticmethod
214 def _get_archive_tree_type[P: pydantic.BaseModel](
215 pointer_type: type[P],
216 ) -> type[MaskedImageSerializationModel[P]]:
217 """Return the serialization model type for this object for an archive
218 type that uses the given pointer type.
219 """
220 return MaskedImageSerializationModel[pointer_type] # type: ignore
222 def write_fits(
223 self,
224 filename: str,
225 *,
226 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
227 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
228 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
229 compression_seed: int | None = None,
230 ) -> None:
231 """Write the image to a FITS file.
233 Parameters
234 ----------
235 filename
236 Name of the file to write to. Must be a local file.
237 image_compression
238 Compression options for the `image` plane.
239 mask_compression
240 Compression options for the `mask` plane.
241 variance_compression
242 Compression options for the `variance` plane.
243 compression_seed
244 A FITS tile compression seed to use whenever the configured
245 compression seed is `None` or (for backwards compatibility) ``0``.
246 This value is then incremented every time it is used.
247 """
248 compression_options = {}
249 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
250 compression_options["image"] = image_compression
251 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
252 compression_options["mask"] = mask_compression
253 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
254 compression_options["variance"] = variance_compression
255 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
257 @classmethod
258 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
259 """Read an image from a FITS file.
261 Parameters
262 ----------
263 url
264 URL of the file to read; may be any type supported by
265 `lsst.resources.ResourcePath`.
266 bbox
267 Bounding box of a subimage to read instead.
268 """
269 return fits.read(cls, url, bbox=bbox).deserialized
271 @staticmethod
272 def from_legacy(
273 legacy: Any,
274 *,
275 unit: astropy.units.UnitBase | None = None,
276 plane_map: Mapping[str, MaskPlane] | None = None,
277 ) -> MaskedImage:
278 """Convert from an `lsst.afw.image.MaskedImage` instance.
280 Parameters
281 ----------
282 legacy
283 An `lsst.afw.image.MaskedImage` instance that will share image and
284 variance (but not mask) pixel data with the returned object.
285 unit
286 Units of the image.
287 plane_map
288 A mapping from legacy mask plane name to the new plane name and
289 description.
290 """
291 return MaskedImage(
292 image=Image.from_legacy(legacy.getImage(), unit),
293 mask=Mask.from_legacy(legacy.getMask(), plane_map),
294 variance=Image.from_legacy(legacy.getVariance()),
295 )
297 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
298 """Convert to an `lsst.afw.image.MaskedImage` instance.
300 Parameters
301 ----------
302 copy
303 If `True`, always copy the image and variance pixel data.
304 If `False`, return a view, and raise `TypeError` if the pixel data
305 is read-only (this is not supported by afw). If `None`, onyl if
306 the pixel data is read-only. Mask pixel data is always copied.
307 plane_map
308 A mapping from legacy mask plane name to the new plane name and
309 description.
310 """
311 import lsst.afw.image
313 return lsst.afw.image.MaskedImage(
314 self.image.to_legacy(copy=copy),
315 mask=self.mask.to_legacy(plane_map),
316 variance=self.variance.to_legacy(copy=copy),
317 dtype=self.image.array.dtype,
318 )
320 @overload
321 @staticmethod
322 def read_legacy( 322 ↛ exitline 322 didn't return from function 'read_legacy' because
323 uri: ResourcePathExpression,
324 *,
325 preserve_quantization: bool = False,
326 component: Literal["image"],
327 fits_wcs_frame: Frame | None = None,
328 ) -> Image: ...
330 @overload
331 @staticmethod
332 def read_legacy( 332 ↛ exitline 332 didn't return from function 'read_legacy' because
333 uri: ResourcePathExpression,
334 *,
335 plane_map: Mapping[str, MaskPlane] | None = None,
336 component: Literal["mask"],
337 fits_wcs_frame: Frame | None = None,
338 ) -> Mask: ...
340 @overload
341 @staticmethod
342 def read_legacy( 342 ↛ exitline 342 didn't return from function 'read_legacy' because
343 uri: ResourcePathExpression,
344 *,
345 preserve_quantization: bool = False,
346 component: Literal["variance"],
347 fits_wcs_frame: Frame | None = None,
348 ) -> Image: ...
350 @overload
351 @staticmethod
352 def read_legacy( 352 ↛ exitline 352 didn't return from function 'read_legacy' because
353 uri: ResourcePathExpression,
354 *,
355 preserve_quantization: bool = False,
356 plane_map: Mapping[str, MaskPlane] | None = None,
357 component: None = None,
358 fits_wcs_frame: Frame | None = None,
359 ) -> MaskedImage: ...
361 @staticmethod
362 def read_legacy(
363 uri: ResourcePathExpression,
364 *,
365 preserve_quantization: bool = False,
366 plane_map: Mapping[str, MaskPlane] | None = None,
367 component: Literal["image", "mask", "variance"] | None = None,
368 fits_wcs_frame: Frame | None = None,
369 ) -> Any:
370 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
372 Parameters
373 ----------
374 uri
375 URI or file name.
376 preserve_quantization
377 If `True`, ensure that writing the masked image back out again will
378 exactly preserve quantization-compressed pixel values. This causes
379 the image and variance plane arrays to be marked as read-only and
380 stores the original binary table data for those planes in memory.
381 If the `~lsst.images.MaskedImage` is copied, the precompressed
382 pixel values are not transferred to the copy.
383 plane_map
384 A mapping from legacy mask plane name to the new plane name and
385 description.
386 component
387 A component to read instead of the full image.
388 fits_wcs_frame
389 If not `None` and the HDU containing the image plane has a FITS
390 WCS, attach a `~lsst.images.Projection` to the returned masked
391 image by converting that WCS. When ``component`` is one of
392 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the
393 component HDU is used instead (all three should have the same WCS).
394 """
395 fs, fspath = ResourcePath(uri).to_fsspec()
396 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
397 return MaskedImage._read_legacy_hdus(
398 hdu_list,
399 uri,
400 preserve_quantization=preserve_quantization,
401 plane_map=plane_map,
402 component=component,
403 fits_wcs_frame=fits_wcs_frame,
404 )
406 @staticmethod
407 def _read_legacy_hdus(
408 hdu_list: astropy.io.fits.HDUList,
409 uri: ResourcePathExpression,
410 *,
411 opaque_metadata: fits.FitsOpaqueMetadata | None = None,
412 preserve_quantization: bool = False,
413 plane_map: Mapping[str, MaskPlane] | None = None,
414 component: Literal["image", "mask", "variance"] | None,
415 fits_wcs_frame: Frame | None = None,
416 ) -> Any:
417 if opaque_metadata is None:
418 opaque_metadata = fits.FitsOpaqueMetadata()
419 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
420 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
421 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
422 result: Any
423 with ExitStack() as exit_stack:
424 if preserve_quantization:
425 fs, fspath = ResourcePath(uri).to_fsspec()
426 bintable_stream = exit_stack.enter_context(fs.open(fspath))
427 bintable_hdu_list = exit_stack.enter_context(
428 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
429 )
430 image_bintable_hdu = bintable_hdu_list[1]
431 variance_bintable_hdu = bintable_hdu_list[3]
432 if component is None or component == "image":
433 image = Image._read_legacy_hdu(
434 hdu_list[1],
435 opaque_metadata,
436 preserve_bintable=image_bintable_hdu,
437 fits_wcs_frame=fits_wcs_frame,
438 )
439 if component == "image":
440 result = image
441 if component is None or component == "mask":
442 mask = Mask._read_legacy_hdu(
443 hdu_list[2],
444 opaque_metadata,
445 plane_map=plane_map,
446 fits_wcs_frame=fits_wcs_frame if component is not None else None,
447 )
448 if component == "mask":
449 result = mask
450 if component is None or component == "variance":
451 variance = Image._read_legacy_hdu(
452 hdu_list[3],
453 opaque_metadata,
454 preserve_bintable=variance_bintable_hdu,
455 fits_wcs_frame=fits_wcs_frame if component is not None else None,
456 )
457 if component == "variance":
458 result = variance
459 if component is None:
460 result = MaskedImage(image, mask=mask, variance=variance)
461 result._opaque_metadata = opaque_metadata
462 return result
465class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
466 """A Pydantic model used to represent a serialized `MaskedImage`."""
468 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
469 mask: MaskSerializationModel[P] = pydantic.Field(
470 description="Bitmask that annotates the main image's pixels."
471 )
472 variance: ImageSerializationModel[P] = pydantic.Field(
473 description="Per-pixel variance estimates for the main image."
474 )
475 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
476 default=None,
477 exclude_if=is_none,
478 description="Projection that maps the pixel grid to the sky.",
479 )
481 @property
482 def bbox(self) -> Box:
483 """The bounding box of the image."""
484 return self.image.bbox
486 def deserialize(
487 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any
488 ) -> MaskedImage:
489 """Deserialize an image from an input archive.
491 Parameters
492 ----------
493 archive
494 Archive to read from.
495 bbox
496 Bounding box of a subimage to read instead.
497 **kwargs
498 Unsupported keyword arguments are accepted only to provide better
499 error messages (raising `serialization.InvalidParameterError`).
500 """
501 if kwargs:
502 raise InvalidParameterError(f"Unrecognized parameters for MaskedImage: {set(kwargs.keys())}.")
503 image = self.image.deserialize(archive, bbox=bbox)
504 mask = self.mask.deserialize(archive, bbox=bbox)
505 variance = self.variance.deserialize(archive, bbox=bbox)
506 projection = self.projection.deserialize(archive) if self.projection is not None else None
507 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize(
508 self
509 )
511 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
512 if component == "bbox" and kwargs:
513 raise InvalidParameterError(
514 f"Unrecognized parameters for MaskedImage.bbox: {set(kwargs.keys())}."
515 )
516 return super().deserialize_component(component, archive, **kwargs)