Coverage for python / lsst / images / _mask.py: 22%
389 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:27 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:27 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "Mask",
16 "MaskPlane",
17 "MaskPlaneBit",
18 "MaskSchema",
19 "MaskSerializationModel",
20 "get_legacy_deep_coadd_mask_planes",
21 "get_legacy_visit_image_mask_planes",
22)
24import dataclasses
25import math
26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
27from types import EllipsisType
28from typing import TYPE_CHECKING, Any, ClassVar, cast
30import astropy.io.fits
31import astropy.wcs
32import numpy as np
33import numpy.typing as npt
34import pydantic
36from lsst.resources import ResourcePath, ResourcePathExpression
38from . import fits
39from ._generalized_image import GeneralizedImage
40from ._geom import YX, Box, NoOverlapError
41from ._transforms import Frame, Projection, ProjectionSerializationModel
42from .serialization import (
43 ArchiveReadError,
44 ArchiveTree,
45 ArrayReferenceModel,
46 InlineArrayModel,
47 InputArchive,
48 IntegerType,
49 InvalidParameterError,
50 MetadataValue,
51 NumberType,
52 OutputArchive,
53 is_integer,
54 no_header_updates,
55)
56from .utils import is_none
58if TYPE_CHECKING:
59 try:
60 from lsst.afw.image import Mask as LegacyMask
61 except ImportError:
62 type LegacyMask = Any # type: ignore[no-redef]
65@dataclasses.dataclass(frozen=True)
66class MaskPlane:
67 """Name and description of a single plane in a mask array."""
69 name: str
70 """Unique name for the mask plane (`str`)."""
72 description: str
73 """Human-readable documentation for the mask plane (`str`)."""
75 @classmethod
76 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]:
77 """Read mask plane descriptions written by
78 `lsst.afw.image.Mask.writeFits`.
80 Parameters
81 ----------
82 header
83 FITS header.
85 Returns
86 -------
87 `dict` [`str`, `int`]
88 A dictionary mapping mask plane name to integer bit index.
89 """
90 result: dict[str, int] = {}
91 for card in list(header.cards):
92 if card.keyword.startswith("MP_"):
93 result[card.keyword.removeprefix("MP_")] = card.value
94 del header[card.keyword]
95 return result
98@dataclasses.dataclass(frozen=True)
99class MaskPlaneBit:
100 """The nested array index and mask value associated with a single mask
101 plane.
102 """
104 index: int
105 """Index into the last dimension of the mask array where this plane's bit
106 is stored.
107 """
109 mask: np.integer
110 """Bitmask that selects just this plane's bit from a mask array value
111 (`numpy.integer`).
112 """
114 @classmethod
115 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit:
116 """Construct a `MaskPlaneBit` from the overall index of a plane in a
117 `MaskSchema` and the stride (number of bits per mask array element).
118 """
119 index, bit = divmod(overall_index, stride)
120 return cls(index, mask_type(1 << bit))
123class MaskSchema:
124 """A schema for a bit-packed mask array.
126 Parameters
127 ----------
128 planes
129 Iterable of `MaskPlane` instances that define the schema. `None`
130 values may be included to reserve bits for future use.
131 dtype
132 The numpy data type of the mask arrays that use this schema.
134 Notes
135 -----
136 A `MaskSchema` is a collection of mask planes, which each correspond to a
137 single bit in a mask array. Mask schemas are immutable and associated with
138 a particular array data type, allowing them to safely precompute the index
139 and bitmask for each plane.
141 `MaskSchema` indexing is by integer (the overall index of a plane in the
142 schema). The `descriptions` attribute may be indexed by plane name to get
143 the description for that plane, and the `bitmask` method can be used to
144 obtain an array that can be used to select one or more planes by name in
145 a mask array that uses this schema.
147 If no mask planes are provided, a `None` placeholder is automatically
148 added.
149 """
151 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8):
152 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,)
153 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype))
154 stride = self.bits_per_element(self._dtype)
155 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None}
156 self._mask_size = math.ceil(len(self._planes) / stride)
157 self._bits: dict[str, MaskPlaneBit] = {
158 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type)
159 for n, plane in enumerate(self._planes)
160 if plane is not None
161 }
163 @staticmethod
164 def bits_per_element(dtype: npt.DTypeLike) -> int:
165 """Return the number of mask bits per array element for the given
166 data type.
167 """
168 dtype = np.dtype(dtype)
169 match dtype.kind:
170 case "u":
171 return dtype.itemsize * 8
172 case "i":
173 return dtype.itemsize * 8 - 1
174 case _:
175 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.")
177 def __iter__(self) -> Iterator[MaskPlane | None]:
178 return iter(self._planes)
180 def __len__(self) -> int:
181 return len(self._planes)
183 def __getitem__(self, i: int) -> MaskPlane | None:
184 return self._planes[i]
186 def __repr__(self) -> str:
187 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})"
189 def __str__(self) -> str:
190 return "\n".join(
191 [
192 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}"
193 for name, bit in self._bits.items()
194 ]
195 )
197 def __eq__(self, other: object) -> bool:
198 if isinstance(other, MaskSchema):
199 return self._planes == other._planes and self._dtype == other._dtype
200 return False
202 @property
203 def dtype(self) -> np.dtype:
204 """The numpy data type of the mask arrays that use this schema."""
205 return self._dtype
207 @property
208 def mask_size(self) -> int:
209 """The number of elements in the last dimension of any mask array that
210 uses this schema.
211 """
212 return self._mask_size
214 @property
215 def names(self) -> Set[str]:
216 """The names of the mask planes, in bit order."""
217 return self._bits.keys()
219 @property
220 def descriptions(self) -> Mapping[str, str]:
221 """A mapping from plane name to description."""
222 return self._descriptions
224 def bit(self, plane: str) -> MaskPlaneBit:
225 """Return the last array index and mask for the given mask plane."""
226 return self._bits[plane]
228 def bitmask(self, *planes: str) -> np.ndarray:
229 """Return a 1-d mask array that represents the union (i.e. bitwise OR)
230 of the planes with the given names.
232 Parameters
233 ----------
234 *planes
235 Mask plane names.
237 Returns
238 -------
239 numpy.ndarray
240 A 1-d array with shape ``(mask_size,)``.
241 """
242 result = np.zeros(self.mask_size, dtype=self._dtype)
243 for plane in planes:
244 bit = self._bits[plane]
245 result[bit.index] |= bit.mask
246 return result
248 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]:
249 """Split the schema into an equivalent series of schemas that each
250 have a `mask_size` of ``1``, dropping all `None` placeholders.
252 Parameters
253 ----------
254 dtype
255 Data type of the new mask pixels.
257 Returns
258 -------
259 `list` [`MaskSchema`]
260 A list of mask schemas that together include all planes in
261 ``self`` and have `mask_size` equal to ``1``. If there are no
262 mask planes (only `None` placeholders) in ``self``, a single mask
263 schema with a `None` placeholder is returned; otherwise `None`
264 placeholders are returned.
265 """
266 dtype = np.dtype(dtype)
267 planes: list[MaskPlane] = []
268 schemas: list[MaskSchema] = []
269 n_planes_per_schema = self.bits_per_element(dtype)
270 for plane in self._planes:
271 if plane is not None:
272 planes.append(plane)
273 if len(planes) == n_planes_per_schema:
274 schemas.append(MaskSchema(planes, dtype=dtype))
275 planes.clear()
276 if planes:
277 schemas.append(MaskSchema(planes, dtype=dtype))
278 if not schemas:
279 schemas.append(MaskSchema([None], dtype=dtype))
280 return schemas
282 def update_header(self, header: astropy.io.fits.Header) -> None:
283 """Add a description of this mask schema to a FITS header."""
284 for n, plane in enumerate(self):
285 if plane is not None:
286 bit = self.bit(plane.name)
287 if bit.index != 0:
288 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.")
289 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.")
290 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.")
291 # We don't add a comment to the description card, because it's
292 # likely to overrun a single card and get the CONTINUE
293 # treatment. That will cause Astropy to warn about the comment
294 # being truncated and that's worse than just leaving it
295 # unexplained; it's pretty obvious from context what it is.
296 header.set(f"MSKD{n:04d}", plane.description)
298 def strip_header(self, header: astropy.io.fits.Header) -> None:
299 """Remove all header cards added by `update_header`."""
300 for n, plane in enumerate(self):
301 if plane is not None:
302 header.remove(f"MSKN{n:04d}", ignore_missing=True)
303 header.remove(f"MSKM{n:04d}", ignore_missing=True)
304 header.remove(f"MSKD{n:04d}", ignore_missing=True)
307class Mask(GeneralizedImage):
308 """A 2-d bitmask image backed by a 3-d byte array.
310 Parameters
311 ----------
312 array_or_fill
313 Array or fill value for the mask. If a fill value, ``bbox`` or
314 ``shape`` must be provided.
315 schema
316 Schema that defines the planes and their bit assignments.
317 bbox
318 Bounding box for the mask. This sets the shape of the first two
319 dimensions of the array.
320 start
321 Logical coordinates of the first pixel in the array, ordered ``y``,
322 ``x`` (unless an `XY` instance is passed). Ignored if
323 ``bbox`` is provided. Defaults to zeros.
324 shape
325 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
326 instance is passed). Only needed if ``array_or_fill`` is not an
327 array and ``bbox`` is not provided. Like the bbox, this does not
328 include the last dimension of the array.
329 projection
330 Projection that maps the pixel grid to the sky.
331 metadata
332 Arbitrary flexible metadata to associate with the mask.
334 Notes
335 -----
336 Indexing the `array` attribute of a `Mask` does not take into account its
337 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with
338 a `Box` does, and the `bbox` of the subimage is set to match its location
339 within the original mask.
341 A mask's ``bbox`` corresponds to the leading dimensions of its backing
342 `numpy.ndarray`, while the last dimension's size is always equal to the
343 `~MaskSchema.mask_size` of its schema, since a schema can in general
344 require multiple array elements to represent all of its planes.
345 """
347 def __init__(
348 self,
349 array_or_fill: np.ndarray | int = 0,
350 /,
351 *,
352 schema: MaskSchema,
353 bbox: Box | None = None,
354 start: Sequence[int] | None = None,
355 shape: Sequence[int] | None = None,
356 projection: Projection | None = None,
357 metadata: dict[str, MetadataValue] | None = None,
358 ):
359 super().__init__(metadata)
360 if shape is not None:
361 shape = tuple(shape)
362 if start is not None:
363 start = tuple(start)
364 if isinstance(array_or_fill, np.ndarray):
365 array = np.array(array_or_fill, dtype=schema.dtype, copy=None)
366 if array.ndim != 3:
367 raise ValueError("Mask array must be 3-d.")
368 if bbox is None:
369 bbox = Box.from_shape(array.shape[:-1], start=start)
370 elif bbox.shape + (schema.mask_size,) != array.shape:
371 raise ValueError(
372 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not "
373 f"match array with shape {array.shape}."
374 )
375 if shape is not None and shape + (schema.mask_size,) != array.shape:
376 raise ValueError(
377 f"Explicit shape {shape} and schema of size {schema.mask_size} do "
378 f"not match array with shape {array.shape}."
379 )
381 else:
382 if bbox is None:
383 if shape is None:
384 raise TypeError("No bbox, size, or array provided.")
385 bbox = Box.from_shape(shape, start=start)
386 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype)
387 self._array = array
388 self._bbox: Box = bbox
389 self._schema: MaskSchema = schema
390 self._projection = projection
392 @property
393 def array(self) -> np.ndarray:
394 """The low-level array (`numpy.ndarray`).
396 Assigning to this attribute modifies the existing array in place; the
397 bounding box and underlying data pointer are never changed.
398 """
399 return self._array
401 @array.setter
402 def array(self, value: np.ndarray | int) -> None:
403 self._array[:, :] = value
405 @property
406 def schema(self) -> MaskSchema:
407 """Schema that defines the planes and their bit assignments
408 (`MaskSchema`).
409 """
410 return self._schema
412 @property
413 def bbox(self) -> Box:
414 """2-d bounding box of the mask (`Box`).
416 This sets the shape of the first two dimensions of the array.
417 """
418 return self._bbox
420 @property
421 def projection(self) -> Projection[Any] | None:
422 """The projection that maps this mask's pixel grid to the sky
423 (`Projection` | `None`).
425 Notes
426 -----
427 The pixel coordinates used by this projection account for the bounding
428 box ``start``; they are not just array indices.
429 """
430 return self._projection
432 def __getitem__(self, bbox: Box | EllipsisType) -> Mask:
433 if bbox is ...:
434 return self
435 super().__getitem__(bbox)
436 return self._transfer_metadata(
437 Mask(
438 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :],
439 bbox=bbox,
440 schema=self.schema,
441 projection=self._projection,
442 ),
443 bbox=bbox,
444 )
446 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None:
447 subview = self[bbox]
448 subview.clear()
449 subview.update(value)
451 def __str__(self) -> str:
452 return f"Mask({self.bbox!s}, {list(self.schema.names)})"
454 def __repr__(self) -> str:
455 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})"
457 def __eq__(self, other: object) -> bool:
458 if not isinstance(other, Mask):
459 return NotImplemented
460 return (
461 self._bbox == other._bbox
462 and self._schema == other._schema
463 and np.array_equal(self._array, other._array, equal_nan=True)
464 )
466 def copy(self) -> Mask:
467 """Deep-copy the mask and metadata."""
468 return self._transfer_metadata(
469 Mask(self._array.copy(), bbox=self._bbox, schema=self._schema, projection=self._projection),
470 copy=True,
471 )
473 def view(
474 self,
475 *,
476 schema: MaskSchema | EllipsisType = ...,
477 projection: Projection | None | EllipsisType = ...,
478 start: Sequence[int] | EllipsisType = ...,
479 ) -> Mask:
480 """Make a view of the mask, with optional updates.
482 Notes
483 -----
484 This can only be used to make changes to schema descriptions; plane
485 names must remain the same (in the same order).
486 """
487 if schema is ...:
488 schema = self._schema
489 else:
490 if list(schema.names) != list(self.schema.names):
491 raise ValueError("Cannot create a mask view with a schema with different names.")
492 if projection is ...:
493 projection = self._projection
494 if start is ...:
495 start = self._bbox.start
496 return self._transfer_metadata(Mask(self._array, start=start, schema=schema, projection=projection))
498 def update(self, other: Mask) -> None:
499 """Update ``self`` to include all common mask values set in ``other``.
501 Notes
502 -----
503 This only operates on the intersection of the two mask bounding boxes
504 and the mask planes that are present in both. Mask bits are only set,
505 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments).
506 """
507 lhs = self
508 rhs = other
509 if other.bbox != self.bbox:
510 try:
511 bbox = self.bbox.intersection(other.bbox)
512 except NoOverlapError:
513 return
514 lhs = self[bbox]
515 rhs = other[bbox]
516 for name in self.schema.names & other.schema.names:
517 lhs.set(name, rhs.get(name))
519 def get(self, plane: str) -> np.ndarray:
520 """Return a 2-d boolean array for the given mask plane.
522 Parameters
523 ----------
524 plane
525 Name of the mask plane.
527 Returns
528 -------
529 numpy.ndarray
530 A 2-d boolean array with the same shape as `bbox` that is `True`
531 where the bit for ``plane`` is set and `False` elsewhere.
532 """
533 bit = self.schema.bit(plane)
534 return (self._array[..., bit.index] & bit.mask).astype(bool)
536 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
537 """Set a mask plane.
539 Parameters
540 ----------
541 plane
542 Name of the mask plane to set
543 boolean_mask
544 A 2-d boolean array with the same shape as `bbox` that is `True`
545 where the bit for ``plane`` should be set and `False` where it
546 should be left unchanged (*not* set to zero). May be ``...`` to
547 set the bit everywhere.
548 """
549 bit = self.schema.bit(plane)
550 if boolean_mask is not ...:
551 boolean_mask = boolean_mask.astype(bool)
552 self._array[boolean_mask, bit.index] |= bit.mask
554 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
555 """Clear one or more mask planes.
557 Parameters
558 ----------
559 plane
560 Name of the mask plane to set. If `None` all mask planes are
561 cleared.
562 boolean_mask
563 A 2-d boolean array with the same shape as `bbox` that is `True`
564 where the bit for ``plane`` should be cleared and `False` where it
565 should be left unchanged. May be ``...`` to clear the bit
566 everywhere.
567 """
568 if boolean_mask is not ...:
569 boolean_mask = boolean_mask.astype(bool)
570 if plane is None:
571 self._array[boolean_mask, :] = 0
572 else:
573 bit = self.schema.bit(plane)
574 self._array[boolean_mask, bit.index] &= ~bit.mask
576 def serialize[P: pydantic.BaseModel](
577 self,
578 archive: OutputArchive[P],
579 *,
580 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
581 save_projection: bool = True,
582 add_offset_wcs: str | None = "A",
583 ) -> MaskSerializationModel[P]:
584 """Serialize the mask to an output archive.
586 Parameters
587 ----------
588 archive
589 Archive to write to.
590 update_header
591 A callback that will be given the FITS header for the HDU
592 containing this mask in order to add keys to it. This callback
593 may be provided but will not be called if the output format is not
594 FITS. As multiple HDUs may be added, this function may be called
595 multiple times.
596 save_projection
597 If `True`, save the `Projection` attached to the image, if there
598 is one. This does not affect whether a FITS WCS corresponding to
599 the projection is written (it always is, if available, and if
600 ``add_offset_wcs`` is not ``" "``).
601 add_offset_wcs
602 A FITS WCS single-character suffix to use when adding a linear
603 WCS that maps the FITS array to the logical pixel coordinates
604 defined by ``bbox.start``. Set to `None` to not write this WCS.
605 If this is set to ``" "``, it will prevent the `Projection` from
606 being saved as a FITS WCS.
607 """
608 if _archive_prefers_native_mask_arrays(archive):
609 # HDS presents array dimensions in Fortran order, which is the
610 # reverse of the h5py dataset shape. Store the in-memory trailing
611 # mask-byte axis first in HDF5 so Starlink tools see HDS axes
612 # (x, y, byte), without changing the bit packing within a pixel.
613 array_model = archive.add_array(np.moveaxis(self._array, -1, 0), update_header=update_header)
614 if not isinstance(array_model, ArrayReferenceModel):
615 raise RuntimeError("Native mask arrays require reference array storage.")
616 array_model.shape = list(self._array.shape)
617 data: list[ArrayReferenceModel | InlineArrayModel] = [array_model]
618 else:
619 data = []
620 for schema_2d in self.schema.split(np.int32):
621 mask_2d = Mask(0, bbox=self.bbox, schema=schema_2d, projection=self._projection)
622 mask_2d.update(self)
623 data.append(
624 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs)
625 )
626 serialized_projection: ProjectionSerializationModel[P] | None = None
627 if save_projection and self.projection is not None:
628 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
629 serialized_dtype = NumberType.from_numpy(self.schema.dtype)
630 assert is_integer(serialized_dtype), "Mask dtypes should always be integers."
631 return MaskSerializationModel.model_construct(
632 data=data,
633 start=list(self.bbox.start),
634 planes=list(self.schema),
635 dtype=serialized_dtype,
636 projection=serialized_projection,
637 metadata=self.metadata,
638 )
640 def _serialize_2d[P: pydantic.BaseModel](
641 self,
642 archive: OutputArchive[P],
643 *,
644 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
645 add_offset_wcs: str | None = "A",
646 ) -> ArrayReferenceModel | InlineArrayModel:
647 def _update_header(header: astropy.io.fits.Header) -> None:
648 update_header(header)
649 self.schema.update_header(header)
650 if self.projection is not None and add_offset_wcs != " ":
651 if self.fits_wcs:
652 header.update(self.fits_wcs.to_header(relax=True))
653 if add_offset_wcs is not None:
654 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
656 assert self.array.shape[2] == 1, "Mask should be split before calling this method."
657 return archive.add_array(self._array[:, :, 0], update_header=_update_header)
659 @staticmethod
660 def _get_archive_tree_type[P: pydantic.BaseModel](
661 pointer_type: type[P],
662 ) -> type[MaskSerializationModel[P]]:
663 """Return the serialization model type for this object for an archive
664 type that uses the given pointer type.
665 """
666 return MaskSerializationModel[pointer_type] # type: ignore
668 _archive_default_name: ClassVar[str] = "mask"
669 """The name this object should be serialized with when written as the
670 top-level object.
671 """
673 def write_fits(
674 self,
675 filename: str,
676 *,
677 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
678 ) -> None:
679 """Write the mask to a FITS file.
681 Parameters
682 ----------
683 filename
684 Name of the file to write to. Must be a local file.
685 compression
686 Compression options.
687 """
688 compression_options = {}
689 if compression is not fits.FitsCompressionOptions.DEFAULT:
690 compression_options[self._archive_default_name] = compression
691 fits.write(self, filename, compression_options)
693 @staticmethod
694 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask:
695 """Read an image from a FITS file.
697 Parameters
698 ----------
699 url
700 URL of the file to read; may be any type supported by
701 `lsst.resources.ResourcePath`.
702 bbox
703 Bounding box of a subimage to read instead.
704 """
705 return fits.read(Mask, url, bbox=bbox).deserialized
707 @staticmethod
708 def from_legacy(
709 legacy: Any,
710 plane_map: Mapping[str, MaskPlane] | None = None,
711 ) -> Mask:
712 """Convert from an `lsst.afw.image.Mask` instance.
714 Parameters
715 ----------
716 legacy
717 An `lsst.afw.image.Mask` instance. This will not share pixel
718 data with the new object.
719 plane_map
720 A mapping from legacy mask plane name to the new plane name and
721 description.
722 """
723 return Mask._from_legacy_array(
724 legacy.array,
725 legacy.getMaskPlaneDict(),
726 start=YX(y=legacy.getY0(), x=legacy.getX0()),
727 plane_map=plane_map,
728 )
730 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
731 """Convert to an `lsst.afw.image.Mask` instance.
733 The pixel data will not be shared between the two objects.
735 Parameters
736 ----------
737 plane_map
738 A mapping from legacy mask plane name to the new plane name and
739 description.
740 """
741 import lsst.afw.image
742 import lsst.geom
744 result = lsst.afw.image.Mask(self.bbox.to_legacy())
745 if plane_map is None:
746 plane_map = {plane.name: plane for plane in self.schema if plane is not None}
747 for old_name, new_plane in plane_map.items():
748 old_bit = result.addMaskPlane(old_name)
749 old_bitmask = 1 << old_bit
750 result.array[self.get(new_plane.name)] |= old_bitmask
751 return result
753 @staticmethod
754 def _from_legacy_array(
755 array2d: np.ndarray,
756 old_planes: Mapping[str, int],
757 *,
758 start: YX[int],
759 plane_map: Mapping[str, MaskPlane] | None = None,
760 projection: Projection | None = None,
761 ) -> Mask:
762 planes: list[MaskPlane] = []
763 new_name_to_old_bitmask: dict[str, int] = {}
764 for old_name, old_bit in old_planes.items():
765 old_bitmask = 1 << old_bit
766 if plane_map is not None:
767 if new_plane := plane_map.get(old_name):
768 planes.append(new_plane)
769 new_name_to_old_bitmask[new_plane.name] = old_bitmask
770 else:
771 if n_orphaned := np.count_nonzero(array2d & old_bitmask):
772 raise RuntimeError(
773 f"Legacy mask plane {old_name!r} is not remapped, "
774 f"but {n_orphaned} pixels have this bit set."
775 )
776 else:
777 planes.append(MaskPlane(old_name, ""))
778 new_name_to_old_bitmask[old_name] = old_bitmask
779 schema = MaskSchema(planes)
780 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection)
781 for new_name, old_bitmask in new_name_to_old_bitmask.items():
782 mask.set(new_name, array2d & old_bitmask)
783 return mask
785 @staticmethod
786 def read_legacy(
787 uri: ResourcePathExpression,
788 *,
789 plane_map: Mapping[str, MaskPlane] | None = None,
790 ext: str | int = 1,
791 fits_wcs_frame: Frame | None = None,
792 ) -> Mask:
793 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`.
795 Parameters
796 ----------
797 uri
798 URI or file name.
799 plane_map
800 A mapping from legacy mask plane name to the new plane name and
801 description.
802 ext
803 Name or index of the FITS HDU to read.
804 fits_wcs_frame
805 If not `None` and the HDU containing the mask has a FITS WCS,
806 attach a `Projection` to the returned mask by converting that WCS.
807 """
808 opaque_metadata = fits.FitsOpaqueMetadata()
809 fs, fspath = ResourcePath(uri).to_fsspec()
810 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
811 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
812 result = Mask._read_legacy_hdu(
813 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame
814 )
815 result._opaque_metadata = opaque_metadata
816 return result
818 @staticmethod
819 def _read_legacy_hdu(
820 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
821 opaque_metadata: fits.FitsOpaqueMetadata,
822 plane_map: Mapping[str, MaskPlane] | None = None,
823 fits_wcs_frame: Frame | None = None,
824 ) -> Mask:
825 if isinstance(hdu, astropy.io.fits.BinTableHDU):
826 hdu = astropy.io.fits.CompImageHDU(bintable=hdu)
827 dx: int = hdu.header.pop("LTV1")
828 dy: int = hdu.header.pop("LTV2")
829 start = YX(y=-dy, x=-dx)
830 old_planes = MaskPlane.read_legacy(hdu.header)
831 projection: Projection | None = None
832 if fits_wcs_frame is not None:
833 try:
834 fits_wcs = astropy.wcs.WCS(hdu.header)
835 except KeyError:
836 pass
837 else:
838 projection = Projection.from_fits_wcs(
839 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
840 )
841 mask = Mask._from_legacy_array(
842 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection
843 )
844 fits.strip_wcs_cards(hdu.header)
845 hdu.header.strip()
846 hdu.header.remove("EXTTYPE", ignore_missing=True)
847 hdu.header.remove("INHERIT", ignore_missing=True)
848 # afw set BUNIT on masks because of limitations in how FITS
849 # metadata is handled there.
850 hdu.header.remove("BUNIT", ignore_missing=True)
851 opaque_metadata.add_header(hdu.header)
852 return mask
855class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree):
856 """Pydantic model used to represent the serialized form of a `.Mask`."""
858 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field(
859 description="References to pixel data."
860 )
861 start: list[int] = pydantic.Field(
862 description="Coordinate of the first pixels in the array, ordered (y, x)."
863 )
864 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.")
865 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.")
866 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
867 default=None,
868 exclude_if=is_none,
869 description="Projection that maps the logical pixel grid onto the sky.",
870 )
872 @property
873 def bbox(self) -> Box:
874 """The 2-d bounding box of the mask."""
875 shape = self.data[0].shape
876 if len(shape) == 3:
877 shape = shape[:2]
878 return Box.from_shape(shape, start=self.start)
880 def deserialize(
881 self,
882 archive: InputArchive[Any],
883 *,
884 bbox: Box | None = None,
885 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
886 **kwargs: Any,
887 ) -> Mask:
888 """Deserialize a mask from an input archive.
890 Parameters
891 ----------
892 archive
893 Archive to read from.
894 bbox
895 Bounding box of a subimage to read instead.
896 strip_header
897 A callable that strips out any FITS header cards added by the
898 ``update_header`` argument in the corresponding call to
899 `Mask.serialize`.
900 **kwargs
901 Unsupported keyword arguments are accepted only to provide better
902 error messages (raising `serialization.InvalidParameterError`).
903 """
904 if kwargs:
905 raise InvalidParameterError(f"Unrecognized parameters for Mask: {set(kwargs.keys())}.")
906 slices: tuple[slice, ...] | EllipsisType = ...
907 if bbox is not None:
908 slices = bbox.slice_within(self.bbox)
909 else:
910 bbox = self.bbox
911 if not is_integer(self.dtype):
912 raise ArchiveReadError(f"Mask array has a non-integer dtype: {self.dtype}.")
913 schema = MaskSchema(self.planes, dtype=self.dtype.to_numpy())
914 projection = self.projection.deserialize(archive) if self.projection is not None else None
915 if len(self.data) == 1 and tuple(self.data[0].shape) == tuple(self.bbox.shape) + (schema.mask_size,):
916 storage_slices = slices if slices is ... else (slice(None),) + slices
917 array = archive.get_array(self.data[0], strip_header=strip_header, slices=storage_slices)
918 array = np.moveaxis(array, 0, -1)
919 return Mask(array, schema=schema, bbox=bbox, projection=projection)._finish_deserialize(self)
920 result = Mask(0, schema=schema, bbox=bbox, projection=projection)
921 schemas_2d = schema.split(np.int32)
922 if len(schemas_2d) != len(self.data):
923 raise ArchiveReadError(
924 f"Number of mask arrays ({len(self.data)}) does not match expectation ({len(schemas_2d)})."
925 )
926 for array_model, schema_2d in zip(self.data, schemas_2d):
927 mask_2d = self._deserialize_2d(
928 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices
929 )
930 result.update(mask_2d)
931 return result._finish_deserialize(self)
933 @staticmethod
934 def _deserialize_2d(
935 ref: ArrayReferenceModel | InlineArrayModel,
936 schema_2d: MaskSchema,
937 start: Sequence[int],
938 archive: InputArchive[Any],
939 *,
940 slices: tuple[slice, ...] | EllipsisType = ...,
941 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
942 ) -> Mask:
943 def _strip_header(header: astropy.io.fits.Header) -> None:
944 strip_header(header)
945 schema_2d.strip_header(header)
946 fits.strip_wcs_cards(header)
948 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices)
949 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start)
951 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
952 if kwargs:
953 raise InvalidParameterError(f"Unsupported parameters for Mask components: {set(kwargs.keys())}.")
954 return super().deserialize_component(component, archive)
957def _archive_prefers_native_mask_arrays(archive: OutputArchive[Any]) -> bool:
958 """Return whether an archive wants masks in their native 3-D layout."""
959 current: Any = archive
960 while current is not None:
961 if getattr(current, "_prefer_native_mask_arrays", False):
962 return True
963 current = getattr(current, "_parent", None)
964 return False
967def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]:
968 """Return a mapping from legacy mask plane name to `MaskPlane` instance
969 for LSST visit images, c. DP2.
970 """
971 return {
972 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."),
973 "SAT": MaskPlane(
974 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel."
975 ),
976 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."),
977 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."),
978 "EDGE": MaskPlane(
979 "DETECTION_EDGE",
980 "Pixel was too close to the edge to be considered for detection, "
981 "due to the finite size of the detection kernel.",
982 ),
983 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
984 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "),
985 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
986 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."),
987 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."),
988 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."),
989 "ITL_DIP": MaskPlane(
990 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD."
991 ),
992 "NOT_DEBLENDED": MaskPlane(
993 "NOT_DEBLENDED",
994 "Pixel belonged to a detection that was not deblended, usually due to size limits.",
995 ),
996 "SPIKE": MaskPlane(
997 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star."
998 ),
999 }
1002def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]:
1003 """Return a mapping from legacy mask plane name to `MaskPlane` instance
1004 for LSST deep coadds, c. DP2.
1005 """
1006 return {
1007 # TODO: reconcile this with counts from the DP2 coadds.
1008 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully
1009 # rejected from (cell) coadds with no propagation.
1010 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
1011 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."),
1012 "CR": MaskPlane(
1013 "COSMIC_RAY",
1014 "A cosmic ray affected this pixel on at least one input image (and was interpolated).",
1015 ),
1016 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."),
1017 "EDGE": MaskPlane(
1018 "DETECTION_EDGE",
1019 "Pixel was too close to the edge to be considered for detection, "
1020 "due to the finite size of the detection kernel.",
1021 ),
1022 "REJECTED": MaskPlane(
1023 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking."
1024 ),
1025 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
1026 "INEXACT_PSF": MaskPlane(
1027 "INEXACT_PSF",
1028 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.",
1029 ),
1030 }