Coverage for python / lsst / images / _mask.py: 22%
389 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 08:26 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "Mask",
16 "MaskPlane",
17 "MaskPlaneBit",
18 "MaskSchema",
19 "MaskSerializationModel",
20 "get_legacy_deep_coadd_mask_planes",
21 "get_legacy_visit_image_mask_planes",
22)
24import dataclasses
25import math
26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
27from types import EllipsisType
28from typing import Any, ClassVar, cast
30import astropy.io.fits
31import astropy.wcs
32import numpy as np
33import numpy.typing as npt
34import pydantic
36from lsst.resources import ResourcePath, ResourcePathExpression
38from . import fits
39from ._generalized_image import GeneralizedImage
40from ._geom import YX, Box, NoOverlapError
41from ._transforms import Frame, Projection, ProjectionSerializationModel
42from .serialization import (
43 ArchiveReadError,
44 ArchiveTree,
45 ArrayReferenceModel,
46 InlineArrayModel,
47 InputArchive,
48 IntegerType,
49 InvalidParameterError,
50 MetadataValue,
51 NumberType,
52 OutputArchive,
53 is_integer,
54 no_header_updates,
55)
56from .utils import is_none
59@dataclasses.dataclass(frozen=True)
60class MaskPlane:
61 """Name and description of a single plane in a mask array."""
63 name: str
64 """Unique name for the mask plane (`str`)."""
66 description: str
67 """Human-readable documentation for the mask plane (`str`)."""
69 @classmethod
70 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]:
71 """Read mask plane descriptions written by
72 `lsst.afw.image.Mask.writeFits`.
74 Parameters
75 ----------
76 header
77 FITS header.
79 Returns
80 -------
81 `dict` [`str`, `int`]
82 A dictionary mapping mask plane name to integer bit index.
83 """
84 result: dict[str, int] = {}
85 for card in list(header.cards):
86 if card.keyword.startswith("MP_"):
87 result[card.keyword.removeprefix("MP_")] = card.value
88 del header[card.keyword]
89 return result
92@dataclasses.dataclass(frozen=True)
93class MaskPlaneBit:
94 """The nested array index and mask value associated with a single mask
95 plane.
96 """
98 index: int
99 """Index into the last dimension of the mask array where this plane's bit
100 is stored.
101 """
103 mask: np.integer
104 """Bitmask that selects just this plane's bit from a mask array value
105 (`numpy.integer`).
106 """
108 @classmethod
109 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit:
110 """Construct a `MaskPlaneBit` from the overall index of a plane in a
111 `MaskSchema` and the stride (number of bits per mask array element).
112 """
113 index, bit = divmod(overall_index, stride)
114 return cls(index, mask_type(1 << bit))
117class MaskSchema:
118 """A schema for a bit-packed mask array.
120 Parameters
121 ----------
122 planes
123 Iterable of `MaskPlane` instances that define the schema. `None`
124 values may be included to reserve bits for future use.
125 dtype
126 The numpy data type of the mask arrays that use this schema.
128 Notes
129 -----
130 A `MaskSchema` is a collection of mask planes, which each correspond to a
131 single bit in a mask array. Mask schemas are immutable and associated with
132 a particular array data type, allowing them to safely precompute the index
133 and bitmask for each plane.
135 `MaskSchema` indexing is by integer (the overall index of a plane in the
136 schema). The `descriptions` attribute may be indexed by plane name to get
137 the description for that plane, and the `bitmask` method can be used to
138 obtain an array that can be used to select one or more planes by name in
139 a mask array that uses this schema.
141 If no mask planes are provided, a `None` placeholder is automatically
142 added.
143 """
145 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8):
146 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,)
147 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype))
148 stride = self.bits_per_element(self._dtype)
149 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None}
150 self._mask_size = math.ceil(len(self._planes) / stride)
151 self._bits: dict[str, MaskPlaneBit] = {
152 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type)
153 for n, plane in enumerate(self._planes)
154 if plane is not None
155 }
157 @staticmethod
158 def bits_per_element(dtype: npt.DTypeLike) -> int:
159 """Return the number of mask bits per array element for the given
160 data type.
161 """
162 dtype = np.dtype(dtype)
163 match dtype.kind:
164 case "u":
165 return dtype.itemsize * 8
166 case "i":
167 return dtype.itemsize * 8 - 1
168 case _:
169 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.")
171 def __iter__(self) -> Iterator[MaskPlane | None]:
172 return iter(self._planes)
174 def __len__(self) -> int:
175 return len(self._planes)
177 def __getitem__(self, i: int) -> MaskPlane | None:
178 return self._planes[i]
180 def __repr__(self) -> str:
181 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})"
183 def __str__(self) -> str:
184 return "\n".join(
185 [
186 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}"
187 for name, bit in self._bits.items()
188 ]
189 )
191 def __eq__(self, other: object) -> bool:
192 if isinstance(other, MaskSchema):
193 return self._planes == other._planes and self._dtype == other._dtype
194 return False
196 @property
197 def dtype(self) -> np.dtype:
198 """The numpy data type of the mask arrays that use this schema."""
199 return self._dtype
201 @property
202 def mask_size(self) -> int:
203 """The number of elements in the last dimension of any mask array that
204 uses this schema.
205 """
206 return self._mask_size
208 @property
209 def names(self) -> Set[str]:
210 """The names of the mask planes, in bit order."""
211 return self._bits.keys()
213 @property
214 def descriptions(self) -> Mapping[str, str]:
215 """A mapping from plane name to description."""
216 return self._descriptions
218 def bit(self, plane: str) -> MaskPlaneBit:
219 """Return the last array index and mask for the given mask plane."""
220 return self._bits[plane]
222 def bitmask(self, *planes: str) -> np.ndarray:
223 """Return a 1-d mask array that represents the union (i.e. bitwise OR)
224 of the planes with the given names.
226 Parameters
227 ----------
228 *planes
229 Mask plane names.
231 Returns
232 -------
233 numpy.ndarray
234 A 1-d array with shape ``(mask_size,)``.
235 """
236 result = np.zeros(self.mask_size, dtype=self._dtype)
237 for plane in planes:
238 bit = self._bits[plane]
239 result[bit.index] |= bit.mask
240 return result
242 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]:
243 """Split the schema into an equivalent series of schemas that each
244 have a `mask_size` of ``1``, dropping all `None` placeholders.
246 Parameters
247 ----------
248 dtype
249 Data type of the new mask pixels.
251 Returns
252 -------
253 `list` [`MaskSchema`]
254 A list of mask schemas that together include all planes in
255 ``self`` and have `mask_size` equal to ``1``. If there are no
256 mask planes (only `None` placeholders) in ``self``, a single mask
257 schema with a `None` placeholder is returned; otherwise `None`
258 placeholders are returned.
259 """
260 dtype = np.dtype(dtype)
261 planes: list[MaskPlane] = []
262 schemas: list[MaskSchema] = []
263 n_planes_per_schema = self.bits_per_element(dtype)
264 for plane in self._planes:
265 if plane is not None:
266 planes.append(plane)
267 if len(planes) == n_planes_per_schema:
268 schemas.append(MaskSchema(planes, dtype=dtype))
269 planes.clear()
270 if planes:
271 schemas.append(MaskSchema(planes, dtype=dtype))
272 if not schemas:
273 schemas.append(MaskSchema([None], dtype=dtype))
274 return schemas
276 def update_header(self, header: astropy.io.fits.Header) -> None:
277 """Add a description of this mask schema to a FITS header."""
278 for n, plane in enumerate(self):
279 if plane is not None:
280 bit = self.bit(plane.name)
281 if bit.index != 0:
282 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.")
283 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.")
284 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.")
285 # We don't add a comment to the description card, because it's
286 # likely to overrun a single card and get the CONTINUE
287 # treatment. That will cause Astropy to warn about the comment
288 # being truncated and that's worse than just leaving it
289 # unexplained; it's pretty obvious from context what it is.
290 header.set(f"MSKD{n:04d}", plane.description)
292 def strip_header(self, header: astropy.io.fits.Header) -> None:
293 """Remove all header cards added by `update_header`."""
294 for n, plane in enumerate(self):
295 if plane is not None:
296 header.remove(f"MSKN{n:04d}", ignore_missing=True)
297 header.remove(f"MSKM{n:04d}", ignore_missing=True)
298 header.remove(f"MSKD{n:04d}", ignore_missing=True)
301class Mask(GeneralizedImage):
302 """A 2-d bitmask image backed by a 3-d byte array.
304 Parameters
305 ----------
306 array_or_fill
307 Array or fill value for the mask. If a fill value, ``bbox`` or
308 ``shape`` must be provided.
309 schema
310 Schema that defines the planes and their bit assignments.
311 bbox
312 Bounding box for the mask. This sets the shape of the first two
313 dimensions of the array.
314 start
315 Logical coordinates of the first pixel in the array, ordered ``y``,
316 ``x`` (unless an `XY` instance is passed). Ignored if
317 ``bbox`` is provided. Defaults to zeros.
318 shape
319 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
320 instance is passed). Only needed if ``array_or_fill`` is not an
321 array and ``bbox`` is not provided. Like the bbox, this does not
322 include the last dimension of the array.
323 projection
324 Projection that maps the pixel grid to the sky.
325 metadata
326 Arbitrary flexible metadata to associate with the mask.
328 Notes
329 -----
330 Indexing the `array` attribute of a `Mask` does not take into account its
331 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with
332 a `Box` does, and the `bbox` of the subimage is set to match its location
333 within the original mask.
335 A mask's ``bbox`` corresponds to the leading dimensions of its backing
336 `numpy.ndarray`, while the last dimension's size is always equal to the
337 `~MaskSchema.mask_size` of its schema, since a schema can in general
338 require multiple array elements to represent all of its planes.
339 """
341 def __init__(
342 self,
343 array_or_fill: np.ndarray | int = 0,
344 /,
345 *,
346 schema: MaskSchema,
347 bbox: Box | None = None,
348 start: Sequence[int] | None = None,
349 shape: Sequence[int] | None = None,
350 projection: Projection | None = None,
351 metadata: dict[str, MetadataValue] | None = None,
352 ):
353 super().__init__(metadata)
354 if shape is not None:
355 shape = tuple(shape)
356 if start is not None:
357 start = tuple(start)
358 if isinstance(array_or_fill, np.ndarray):
359 array = np.array(array_or_fill, dtype=schema.dtype, copy=None)
360 if array.ndim != 3:
361 raise ValueError("Mask array must be 3-d.")
362 if bbox is None:
363 bbox = Box.from_shape(array.shape[:-1], start=start)
364 elif bbox.shape + (schema.mask_size,) != array.shape:
365 raise ValueError(
366 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not "
367 f"match array with shape {array.shape}."
368 )
369 if shape is not None and shape + (schema.mask_size,) != array.shape:
370 raise ValueError(
371 f"Explicit shape {shape} and schema of size {schema.mask_size} do "
372 f"not match array with shape {array.shape}."
373 )
375 else:
376 if bbox is None:
377 if shape is None:
378 raise TypeError("No bbox, size, or array provided.")
379 bbox = Box.from_shape(shape, start=start)
380 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype)
381 self._array = array
382 self._bbox: Box = bbox
383 self._schema: MaskSchema = schema
384 self._projection = projection
386 @property
387 def array(self) -> np.ndarray:
388 """The low-level array (`numpy.ndarray`).
390 Assigning to this attribute modifies the existing array in place; the
391 bounding box and underlying data pointer are never changed.
392 """
393 return self._array
395 @array.setter
396 def array(self, value: np.ndarray | int) -> None:
397 self._array[:, :] = value
399 @property
400 def schema(self) -> MaskSchema:
401 """Schema that defines the planes and their bit assignments
402 (`MaskSchema`).
403 """
404 return self._schema
406 @property
407 def bbox(self) -> Box:
408 """2-d bounding box of the mask (`Box`).
410 This sets the shape of the first two dimensions of the array.
411 """
412 return self._bbox
414 @property
415 def projection(self) -> Projection[Any] | None:
416 """The projection that maps this mask's pixel grid to the sky
417 (`Projection` | `None`).
419 Notes
420 -----
421 The pixel coordinates used by this projection account for the bounding
422 box ``start``; they are not just array indices.
423 """
424 return self._projection
426 def __getitem__(self, bbox: Box | EllipsisType) -> Mask:
427 if bbox is ...:
428 return self
429 super().__getitem__(bbox)
430 return self._transfer_metadata(
431 Mask(
432 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :],
433 bbox=bbox,
434 schema=self.schema,
435 projection=self._projection,
436 ),
437 bbox=bbox,
438 )
440 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None:
441 subview = self[bbox]
442 subview.clear()
443 subview.update(value)
445 def __str__(self) -> str:
446 return f"Mask({self.bbox!s}, {list(self.schema.names)})"
448 def __repr__(self) -> str:
449 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})"
451 def __eq__(self, other: object) -> bool:
452 if not isinstance(other, Mask):
453 return NotImplemented
454 return (
455 self._bbox == other._bbox
456 and self._schema == other._schema
457 and np.array_equal(self._array, other._array, equal_nan=True)
458 )
460 def copy(self) -> Mask:
461 """Deep-copy the mask and metadata."""
462 return self._transfer_metadata(
463 Mask(self._array.copy(), bbox=self._bbox, schema=self._schema, projection=self._projection),
464 copy=True,
465 )
467 def view(
468 self,
469 *,
470 schema: MaskSchema | EllipsisType = ...,
471 projection: Projection | None | EllipsisType = ...,
472 start: Sequence[int] | EllipsisType = ...,
473 ) -> Mask:
474 """Make a view of the mask, with optional updates.
476 Notes
477 -----
478 This can only be used to make changes to schema descriptions; plane
479 names must remain the same (in the same order).
480 """
481 if schema is ...:
482 schema = self._schema
483 else:
484 if list(schema.names) != list(self.schema.names):
485 raise ValueError("Cannot create a mask view with a schema with different names.")
486 if projection is ...:
487 projection = self._projection
488 if start is ...:
489 start = self._bbox.start
490 return self._transfer_metadata(Mask(self._array, start=start, schema=schema, projection=projection))
492 def update(self, other: Mask) -> None:
493 """Update ``self`` to include all common mask values set in ``other``.
495 Notes
496 -----
497 This only operates on the intersection of the two mask bounding boxes
498 and the mask planes that are present in both. Mask bits are only set,
499 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments).
500 """
501 lhs = self
502 rhs = other
503 if other.bbox != self.bbox:
504 try:
505 bbox = self.bbox.intersection(other.bbox)
506 except NoOverlapError:
507 return
508 lhs = self[bbox]
509 rhs = other[bbox]
510 for name in self.schema.names & other.schema.names:
511 lhs.set(name, rhs.get(name))
513 def get(self, plane: str) -> np.ndarray:
514 """Return a 2-d boolean array for the given mask plane.
516 Parameters
517 ----------
518 plane
519 Name of the mask plane.
521 Returns
522 -------
523 numpy.ndarray
524 A 2-d boolean array with the same shape as `bbox` that is `True`
525 where the bit for ``plane`` is set and `False` elsewhere.
526 """
527 bit = self.schema.bit(plane)
528 return (self._array[..., bit.index] & bit.mask).astype(bool)
530 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
531 """Set a mask plane.
533 Parameters
534 ----------
535 plane
536 Name of the mask plane to set
537 boolean_mask
538 A 2-d boolean array with the same shape as `bbox` that is `True`
539 where the bit for ``plane`` should be set and `False` where it
540 should be left unchanged (*not* set to zero). May be ``...`` to
541 set the bit everywhere.
542 """
543 bit = self.schema.bit(plane)
544 if boolean_mask is not ...:
545 boolean_mask = boolean_mask.astype(bool)
546 self._array[boolean_mask, bit.index] |= bit.mask
548 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
549 """Clear one or more mask planes.
551 Parameters
552 ----------
553 plane
554 Name of the mask plane to set. If `None` all mask planes are
555 cleared.
556 boolean_mask
557 A 2-d boolean array with the same shape as `bbox` that is `True`
558 where the bit for ``plane`` should be cleared and `False` where it
559 should be left unchanged. May be ``...`` to clear the bit
560 everywhere.
561 """
562 if boolean_mask is not ...:
563 boolean_mask = boolean_mask.astype(bool)
564 if plane is None:
565 self._array[boolean_mask, :] = 0
566 else:
567 bit = self.schema.bit(plane)
568 self._array[boolean_mask, bit.index] &= ~bit.mask
570 def serialize[P: pydantic.BaseModel](
571 self,
572 archive: OutputArchive[P],
573 *,
574 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
575 save_projection: bool = True,
576 add_offset_wcs: str | None = "A",
577 ) -> MaskSerializationModel[P]:
578 """Serialize the mask to an output archive.
580 Parameters
581 ----------
582 archive
583 Archive to write to.
584 update_header
585 A callback that will be given the FITS header for the HDU
586 containing this mask in order to add keys to it. This callback
587 may be provided but will not be called if the output format is not
588 FITS. As multiple HDUs may be added, this function may be called
589 multiple times.
590 save_projection
591 If `True`, save the `Projection` attached to the image, if there
592 is one. This does not affect whether a FITS WCS corresponding to
593 the projection is written (it always is, if available, and if
594 ``add_offset_wcs`` is not ``" "``).
595 add_offset_wcs
596 A FITS WCS single-character suffix to use when adding a linear
597 WCS that maps the FITS array to the logical pixel coordinates
598 defined by ``bbox.start``. Set to `None` to not write this WCS.
599 If this is set to ``" "``, it will prevent the `Projection` from
600 being saved as a FITS WCS.
601 """
602 if _archive_prefers_native_mask_arrays(archive):
603 # HDS presents array dimensions in Fortran order, which is the
604 # reverse of the h5py dataset shape. Store the in-memory trailing
605 # mask-byte axis first in HDF5 so Starlink tools see HDS axes
606 # (x, y, byte), without changing the bit packing within a pixel.
607 array_model = archive.add_array(np.moveaxis(self._array, -1, 0), update_header=update_header)
608 if not isinstance(array_model, ArrayReferenceModel):
609 raise RuntimeError("Native mask arrays require reference array storage.")
610 array_model.shape = list(self._array.shape)
611 data: list[ArrayReferenceModel | InlineArrayModel] = [array_model]
612 else:
613 data = []
614 for schema_2d in self.schema.split(np.int32):
615 mask_2d = Mask(0, bbox=self.bbox, schema=schema_2d, projection=self._projection)
616 mask_2d.update(self)
617 data.append(
618 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs)
619 )
620 serialized_projection: ProjectionSerializationModel[P] | None = None
621 if save_projection and self.projection is not None:
622 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
623 serialized_dtype = NumberType.from_numpy(self.schema.dtype)
624 assert is_integer(serialized_dtype), "Mask dtypes should always be integers."
625 return MaskSerializationModel.model_construct(
626 data=data,
627 start=list(self.bbox.start),
628 planes=list(self.schema),
629 dtype=serialized_dtype,
630 projection=serialized_projection,
631 metadata=self.metadata,
632 )
634 def _serialize_2d[P: pydantic.BaseModel](
635 self,
636 archive: OutputArchive[P],
637 *,
638 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
639 add_offset_wcs: str | None = "A",
640 ) -> ArrayReferenceModel | InlineArrayModel:
641 def _update_header(header: astropy.io.fits.Header) -> None:
642 update_header(header)
643 self.schema.update_header(header)
644 if self.projection is not None and add_offset_wcs != " ":
645 if self.fits_wcs:
646 header.update(self.fits_wcs.to_header(relax=True))
647 if add_offset_wcs is not None:
648 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
650 assert self.array.shape[2] == 1, "Mask should be split before calling this method."
651 return archive.add_array(self._array[:, :, 0], update_header=_update_header)
653 @staticmethod
654 def _get_archive_tree_type[P: pydantic.BaseModel](
655 pointer_type: type[P],
656 ) -> type[MaskSerializationModel[P]]:
657 """Return the serialization model type for this object for an archive
658 type that uses the given pointer type.
659 """
660 return MaskSerializationModel[pointer_type] # type: ignore
662 _archive_default_name: ClassVar[str] = "mask"
663 """The name this object should be serialized with when written as the
664 top-level object.
665 """
667 def write_fits(
668 self,
669 filename: str,
670 *,
671 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
672 ) -> None:
673 """Write the mask to a FITS file.
675 Parameters
676 ----------
677 filename
678 Name of the file to write to. Must be a local file.
679 compression
680 Compression options.
681 """
682 compression_options = {}
683 if compression is not fits.FitsCompressionOptions.DEFAULT:
684 compression_options[self._archive_default_name] = compression
685 fits.write(self, filename, compression_options)
687 @staticmethod
688 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask:
689 """Read an image from a FITS file.
691 Parameters
692 ----------
693 url
694 URL of the file to read; may be any type supported by
695 `lsst.resources.ResourcePath`.
696 bbox
697 Bounding box of a subimage to read instead.
698 """
699 return fits.read(Mask, url, bbox=bbox).deserialized
701 @staticmethod
702 def from_legacy(
703 legacy: Any,
704 plane_map: Mapping[str, MaskPlane] | None = None,
705 ) -> Mask:
706 """Convert from an `lsst.afw.image.Mask` instance.
708 Parameters
709 ----------
710 legacy
711 An `lsst.afw.image.Mask` instance. This will not share pixel
712 data with the new object.
713 plane_map
714 A mapping from legacy mask plane name to the new plane name and
715 description.
716 """
717 return Mask._from_legacy_array(
718 legacy.array,
719 legacy.getMaskPlaneDict(),
720 start=YX(y=legacy.getY0(), x=legacy.getX0()),
721 plane_map=plane_map,
722 )
724 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
725 """Convert to an `lsst.afw.image.Mask` instance.
727 The pixel data will not be shared between the two objects.
729 Parameters
730 ----------
731 plane_map
732 A mapping from legacy mask plane name to the new plane name and
733 description.
734 """
735 import lsst.afw.image
736 import lsst.geom
738 result = lsst.afw.image.Mask(self.bbox.to_legacy())
739 if plane_map is None:
740 plane_map = {plane.name: plane for plane in self.schema if plane is not None}
741 for old_name, new_plane in plane_map.items():
742 old_bit = result.addMaskPlane(old_name)
743 old_bitmask = 1 << old_bit
744 result.array[self.get(new_plane.name)] |= old_bitmask
745 return result
747 @staticmethod
748 def _from_legacy_array(
749 array2d: np.ndarray,
750 old_planes: Mapping[str, int],
751 *,
752 start: YX[int],
753 plane_map: Mapping[str, MaskPlane] | None = None,
754 projection: Projection | None = None,
755 ) -> Mask:
756 planes: list[MaskPlane] = []
757 new_name_to_old_bitmask: dict[str, int] = {}
758 for old_name, old_bit in old_planes.items():
759 old_bitmask = 1 << old_bit
760 if plane_map is not None:
761 if new_plane := plane_map.get(old_name):
762 planes.append(new_plane)
763 new_name_to_old_bitmask[new_plane.name] = old_bitmask
764 else:
765 if n_orphaned := np.count_nonzero(array2d & old_bitmask):
766 raise RuntimeError(
767 f"Legacy mask plane {old_name!r} is not remapped, "
768 f"but {n_orphaned} pixels have this bit set."
769 )
770 else:
771 planes.append(MaskPlane(old_name, ""))
772 new_name_to_old_bitmask[old_name] = old_bitmask
773 schema = MaskSchema(planes)
774 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection)
775 for new_name, old_bitmask in new_name_to_old_bitmask.items():
776 mask.set(new_name, array2d & old_bitmask)
777 return mask
779 @staticmethod
780 def read_legacy(
781 uri: ResourcePathExpression,
782 *,
783 plane_map: Mapping[str, MaskPlane] | None = None,
784 ext: str | int = 1,
785 fits_wcs_frame: Frame | None = None,
786 ) -> Mask:
787 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`.
789 Parameters
790 ----------
791 uri
792 URI or file name.
793 plane_map
794 A mapping from legacy mask plane name to the new plane name and
795 description.
796 ext
797 Name or index of the FITS HDU to read.
798 fits_wcs_frame
799 If not `None` and the HDU containing the mask has a FITS WCS,
800 attach a `Projection` to the returned mask by converting that WCS.
801 """
802 opaque_metadata = fits.FitsOpaqueMetadata()
803 fs, fspath = ResourcePath(uri).to_fsspec()
804 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
805 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
806 result = Mask._read_legacy_hdu(
807 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame
808 )
809 result._opaque_metadata = opaque_metadata
810 return result
812 @staticmethod
813 def _read_legacy_hdu(
814 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
815 opaque_metadata: fits.FitsOpaqueMetadata,
816 plane_map: Mapping[str, MaskPlane] | None = None,
817 fits_wcs_frame: Frame | None = None,
818 ) -> Mask:
819 if isinstance(hdu, astropy.io.fits.BinTableHDU):
820 hdu = astropy.io.fits.CompImageHDU(bintable=hdu)
821 dx: int = hdu.header.pop("LTV1")
822 dy: int = hdu.header.pop("LTV2")
823 start = YX(y=-dy, x=-dx)
824 old_planes = MaskPlane.read_legacy(hdu.header)
825 projection: Projection | None = None
826 if fits_wcs_frame is not None:
827 try:
828 fits_wcs = astropy.wcs.WCS(hdu.header)
829 except KeyError:
830 pass
831 else:
832 projection = Projection.from_fits_wcs(
833 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
834 )
835 mask = Mask._from_legacy_array(
836 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection
837 )
838 fits.strip_wcs_cards(hdu.header)
839 hdu.header.strip()
840 hdu.header.remove("EXTTYPE", ignore_missing=True)
841 hdu.header.remove("INHERIT", ignore_missing=True)
842 # afw set BUNIT on masks because of limitations in how FITS
843 # metadata is handled there.
844 hdu.header.remove("BUNIT", ignore_missing=True)
845 opaque_metadata.add_header(hdu.header)
846 return mask
849class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree):
850 """Pydantic model used to represent the serialized form of a `.Mask`."""
852 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field(
853 description="References to pixel data."
854 )
855 start: list[int] = pydantic.Field(
856 description="Coordinate of the first pixels in the array, ordered (y, x)."
857 )
858 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.")
859 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.")
860 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
861 default=None,
862 exclude_if=is_none,
863 description="Projection that maps the logical pixel grid onto the sky.",
864 )
866 @property
867 def bbox(self) -> Box:
868 """The 2-d bounding box of the mask."""
869 shape = self.data[0].shape
870 if len(shape) == 3:
871 shape = shape[:2]
872 return Box.from_shape(shape, start=self.start)
874 def deserialize(
875 self,
876 archive: InputArchive[Any],
877 *,
878 bbox: Box | None = None,
879 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
880 **kwargs: Any,
881 ) -> Mask:
882 """Deserialize a mask from an input archive.
884 Parameters
885 ----------
886 archive
887 Archive to read from.
888 bbox
889 Bounding box of a subimage to read instead.
890 strip_header
891 A callable that strips out any FITS header cards added by the
892 ``update_header`` argument in the corresponding call to
893 `Mask.serialize`.
894 **kwargs
895 Unsupported keyword arguments are accepted only to provide better
896 error messages (raising `serialization.InvalidParameterError`).
897 """
898 if kwargs:
899 raise InvalidParameterError(f"Unrecognized parameters for Mask: {set(kwargs.keys())}.")
900 slices: tuple[slice, ...] | EllipsisType = ...
901 if bbox is not None:
902 slices = bbox.slice_within(self.bbox)
903 else:
904 bbox = self.bbox
905 if not is_integer(self.dtype):
906 raise ArchiveReadError(f"Mask array has a non-integer dtype: {self.dtype}.")
907 schema = MaskSchema(self.planes, dtype=self.dtype.to_numpy())
908 projection = self.projection.deserialize(archive) if self.projection is not None else None
909 if len(self.data) == 1 and tuple(self.data[0].shape) == tuple(self.bbox.shape) + (schema.mask_size,):
910 storage_slices = slices if slices is ... else (slice(None),) + slices
911 array = archive.get_array(self.data[0], strip_header=strip_header, slices=storage_slices)
912 array = np.moveaxis(array, 0, -1)
913 return Mask(array, schema=schema, bbox=bbox, projection=projection)._finish_deserialize(self)
914 result = Mask(0, schema=schema, bbox=bbox, projection=projection)
915 schemas_2d = schema.split(np.int32)
916 if len(schemas_2d) != len(self.data):
917 raise ArchiveReadError(
918 f"Number of mask arrays ({len(self.data)}) does not match expectation ({len(schemas_2d)})."
919 )
920 for array_model, schema_2d in zip(self.data, schemas_2d):
921 mask_2d = self._deserialize_2d(
922 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices
923 )
924 result.update(mask_2d)
925 return result._finish_deserialize(self)
927 @staticmethod
928 def _deserialize_2d(
929 ref: ArrayReferenceModel | InlineArrayModel,
930 schema_2d: MaskSchema,
931 start: Sequence[int],
932 archive: InputArchive[Any],
933 *,
934 slices: tuple[slice, ...] | EllipsisType = ...,
935 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
936 ) -> Mask:
937 def _strip_header(header: astropy.io.fits.Header) -> None:
938 strip_header(header)
939 schema_2d.strip_header(header)
940 fits.strip_wcs_cards(header)
942 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices)
943 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start)
945 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any:
946 if kwargs:
947 raise InvalidParameterError(f"Unsupported parameters for Mask components: {set(kwargs.keys())}.")
948 return super().deserialize_component(component, archive)
951def _archive_prefers_native_mask_arrays(archive: OutputArchive[Any]) -> bool:
952 """Return whether an archive wants masks in their native 3-D layout."""
953 current: Any = archive
954 while current is not None:
955 if getattr(current, "_prefer_native_mask_arrays", False):
956 return True
957 current = getattr(current, "_parent", None)
958 return False
961def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]:
962 """Return a mapping from legacy mask plane name to `MaskPlane` instance
963 for LSST visit images, c. DP2.
964 """
965 return {
966 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."),
967 "SAT": MaskPlane(
968 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel."
969 ),
970 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."),
971 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."),
972 "EDGE": MaskPlane(
973 "DETECTION_EDGE",
974 "Pixel was too close to the edge to be considered for detection, "
975 "due to the finite size of the detection kernel.",
976 ),
977 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
978 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "),
979 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
980 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."),
981 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."),
982 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."),
983 "ITL_DIP": MaskPlane(
984 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD."
985 ),
986 "NOT_DEBLENDED": MaskPlane(
987 "NOT_DEBLENDED",
988 "Pixel belonged to a detection that was not deblended, usually due to size limits.",
989 ),
990 "SPIKE": MaskPlane(
991 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star."
992 ),
993 }
996def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]:
997 """Return a mapping from legacy mask plane name to `MaskPlane` instance
998 for LSST deep coadds, c. DP2.
999 """
1000 return {
1001 # TODO: reconcile this with counts from the DP2 coadds.
1002 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully
1003 # rejected from (cell) coadds with no propagation.
1004 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
1005 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."),
1006 "CR": MaskPlane(
1007 "COSMIC_RAY",
1008 "A cosmic ray affected this pixel on at least one input image (and was interpolated).",
1009 ),
1010 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."),
1011 "EDGE": MaskPlane(
1012 "DETECTION_EDGE",
1013 "Pixel was too close to the edge to be considered for detection, "
1014 "due to the finite size of the detection kernel.",
1015 ),
1016 "REJECTED": MaskPlane(
1017 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking."
1018 ),
1019 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
1020 "INEXACT_PSF": MaskPlane(
1021 "INEXACT_PSF",
1022 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.",
1023 ),
1024 }