Coverage for python / lsst / images / _mask.py: 22%

389 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 01:57 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "Mask", 

16 "MaskPlane", 

17 "MaskPlaneBit", 

18 "MaskSchema", 

19 "MaskSerializationModel", 

20 "get_legacy_deep_coadd_mask_planes", 

21 "get_legacy_visit_image_mask_planes", 

22) 

23 

24import dataclasses 

25import math 

26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set 

27from types import EllipsisType 

28from typing import Any, ClassVar, cast 

29 

30import astropy.io.fits 

31import astropy.wcs 

32import numpy as np 

33import numpy.typing as npt 

34import pydantic 

35 

36from lsst.resources import ResourcePath, ResourcePathExpression 

37 

38from . import fits 

39from ._generalized_image import GeneralizedImage 

40from ._geom import YX, Box, NoOverlapError 

41from ._transforms import Frame, Projection, ProjectionSerializationModel 

42from .serialization import ( 

43 ArchiveReadError, 

44 ArchiveTree, 

45 ArrayReferenceModel, 

46 InlineArrayModel, 

47 InputArchive, 

48 IntegerType, 

49 InvalidParameterError, 

50 MetadataValue, 

51 NumberType, 

52 OutputArchive, 

53 is_integer, 

54 no_header_updates, 

55) 

56from .utils import is_none 

57 

58 

59@dataclasses.dataclass(frozen=True) 

60class MaskPlane: 

61 """Name and description of a single plane in a mask array.""" 

62 

63 name: str 

64 """Unique name for the mask plane (`str`).""" 

65 

66 description: str 

67 """Human-readable documentation for the mask plane (`str`).""" 

68 

69 @classmethod 

70 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]: 

71 """Read mask plane descriptions written by 

72 `lsst.afw.image.Mask.writeFits`. 

73 

74 Parameters 

75 ---------- 

76 header 

77 FITS header. 

78 

79 Returns 

80 ------- 

81 `dict` [`str`, `int`] 

82 A dictionary mapping mask plane name to integer bit index. 

83 """ 

84 result: dict[str, int] = {} 

85 for card in list(header.cards): 

86 if card.keyword.startswith("MP_"): 

87 result[card.keyword.removeprefix("MP_")] = card.value 

88 del header[card.keyword] 

89 return result 

90 

91 

92@dataclasses.dataclass(frozen=True) 

93class MaskPlaneBit: 

94 """The nested array index and mask value associated with a single mask 

95 plane. 

96 """ 

97 

98 index: int 

99 """Index into the last dimension of the mask array where this plane's bit 

100 is stored. 

101 """ 

102 

103 mask: np.integer 

104 """Bitmask that selects just this plane's bit from a mask array value 

105 (`numpy.integer`). 

106 """ 

107 

108 @classmethod 

109 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit: 

110 """Construct a `MaskPlaneBit` from the overall index of a plane in a 

111 `MaskSchema` and the stride (number of bits per mask array element). 

112 """ 

113 index, bit = divmod(overall_index, stride) 

114 return cls(index, mask_type(1 << bit)) 

115 

116 

117class MaskSchema: 

118 """A schema for a bit-packed mask array. 

119 

120 Parameters 

121 ---------- 

122 planes 

123 Iterable of `MaskPlane` instances that define the schema. `None` 

124 values may be included to reserve bits for future use. 

125 dtype 

126 The numpy data type of the mask arrays that use this schema. 

127 

128 Notes 

129 ----- 

130 A `MaskSchema` is a collection of mask planes, which each correspond to a 

131 single bit in a mask array. Mask schemas are immutable and associated with 

132 a particular array data type, allowing them to safely precompute the index 

133 and bitmask for each plane. 

134 

135 `MaskSchema` indexing is by integer (the overall index of a plane in the 

136 schema). The `descriptions` attribute may be indexed by plane name to get 

137 the description for that plane, and the `bitmask` method can be used to 

138 obtain an array that can be used to select one or more planes by name in 

139 a mask array that uses this schema. 

140 

141 If no mask planes are provided, a `None` placeholder is automatically 

142 added. 

143 """ 

144 

145 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8): 

146 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,) 

147 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype)) 

148 stride = self.bits_per_element(self._dtype) 

149 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None} 

150 self._mask_size = math.ceil(len(self._planes) / stride) 

151 self._bits: dict[str, MaskPlaneBit] = { 

152 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type) 

153 for n, plane in enumerate(self._planes) 

154 if plane is not None 

155 } 

156 

157 @staticmethod 

158 def bits_per_element(dtype: npt.DTypeLike) -> int: 

159 """Return the number of mask bits per array element for the given 

160 data type. 

161 """ 

162 dtype = np.dtype(dtype) 

163 match dtype.kind: 

164 case "u": 

165 return dtype.itemsize * 8 

166 case "i": 

167 return dtype.itemsize * 8 - 1 

168 case _: 

169 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.") 

170 

171 def __iter__(self) -> Iterator[MaskPlane | None]: 

172 return iter(self._planes) 

173 

174 def __len__(self) -> int: 

175 return len(self._planes) 

176 

177 def __getitem__(self, i: int) -> MaskPlane | None: 

178 return self._planes[i] 

179 

180 def __repr__(self) -> str: 

181 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})" 

182 

183 def __str__(self) -> str: 

184 return "\n".join( 

185 [ 

186 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}" 

187 for name, bit in self._bits.items() 

188 ] 

189 ) 

190 

191 def __eq__(self, other: object) -> bool: 

192 if isinstance(other, MaskSchema): 

193 return self._planes == other._planes and self._dtype == other._dtype 

194 return False 

195 

196 @property 

197 def dtype(self) -> np.dtype: 

198 """The numpy data type of the mask arrays that use this schema.""" 

199 return self._dtype 

200 

201 @property 

202 def mask_size(self) -> int: 

203 """The number of elements in the last dimension of any mask array that 

204 uses this schema. 

205 """ 

206 return self._mask_size 

207 

208 @property 

209 def names(self) -> Set[str]: 

210 """The names of the mask planes, in bit order.""" 

211 return self._bits.keys() 

212 

213 @property 

214 def descriptions(self) -> Mapping[str, str]: 

215 """A mapping from plane name to description.""" 

216 return self._descriptions 

217 

218 def bit(self, plane: str) -> MaskPlaneBit: 

219 """Return the last array index and mask for the given mask plane.""" 

220 return self._bits[plane] 

221 

222 def bitmask(self, *planes: str) -> np.ndarray: 

223 """Return a 1-d mask array that represents the union (i.e. bitwise OR) 

224 of the planes with the given names. 

225 

226 Parameters 

227 ---------- 

228 *planes 

229 Mask plane names. 

230 

231 Returns 

232 ------- 

233 numpy.ndarray 

234 A 1-d array with shape ``(mask_size,)``. 

235 """ 

236 result = np.zeros(self.mask_size, dtype=self._dtype) 

237 for plane in planes: 

238 bit = self._bits[plane] 

239 result[bit.index] |= bit.mask 

240 return result 

241 

242 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]: 

243 """Split the schema into an equivalent series of schemas that each 

244 have a `mask_size` of ``1``, dropping all `None` placeholders. 

245 

246 Parameters 

247 ---------- 

248 dtype 

249 Data type of the new mask pixels. 

250 

251 Returns 

252 ------- 

253 `list` [`MaskSchema`] 

254 A list of mask schemas that together include all planes in 

255 ``self`` and have `mask_size` equal to ``1``. If there are no 

256 mask planes (only `None` placeholders) in ``self``, a single mask 

257 schema with a `None` placeholder is returned; otherwise `None` 

258 placeholders are returned. 

259 """ 

260 dtype = np.dtype(dtype) 

261 planes: list[MaskPlane] = [] 

262 schemas: list[MaskSchema] = [] 

263 n_planes_per_schema = self.bits_per_element(dtype) 

264 for plane in self._planes: 

265 if plane is not None: 

266 planes.append(plane) 

267 if len(planes) == n_planes_per_schema: 

268 schemas.append(MaskSchema(planes, dtype=dtype)) 

269 planes.clear() 

270 if planes: 

271 schemas.append(MaskSchema(planes, dtype=dtype)) 

272 if not schemas: 

273 schemas.append(MaskSchema([None], dtype=dtype)) 

274 return schemas 

275 

276 def update_header(self, header: astropy.io.fits.Header) -> None: 

277 """Add a description of this mask schema to a FITS header.""" 

278 for n, plane in enumerate(self): 

279 if plane is not None: 

280 bit = self.bit(plane.name) 

281 if bit.index != 0: 

282 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.") 

283 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.") 

284 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.") 

285 # We don't add a comment to the description card, because it's 

286 # likely to overrun a single card and get the CONTINUE 

287 # treatment. That will cause Astropy to warn about the comment 

288 # being truncated and that's worse than just leaving it 

289 # unexplained; it's pretty obvious from context what it is. 

290 header.set(f"MSKD{n:04d}", plane.description) 

291 

292 def strip_header(self, header: astropy.io.fits.Header) -> None: 

293 """Remove all header cards added by `update_header`.""" 

294 for n, plane in enumerate(self): 

295 if plane is not None: 

296 header.remove(f"MSKN{n:04d}", ignore_missing=True) 

297 header.remove(f"MSKM{n:04d}", ignore_missing=True) 

298 header.remove(f"MSKD{n:04d}", ignore_missing=True) 

299 

300 

301class Mask(GeneralizedImage): 

302 """A 2-d bitmask image backed by a 3-d byte array. 

303 

304 Parameters 

305 ---------- 

306 array_or_fill 

307 Array or fill value for the mask. If a fill value, ``bbox`` or 

308 ``shape`` must be provided. 

309 schema 

310 Schema that defines the planes and their bit assignments. 

311 bbox 

312 Bounding box for the mask. This sets the shape of the first two 

313 dimensions of the array. 

314 start 

315 Logical coordinates of the first pixel in the array, ordered ``y``, 

316 ``x`` (unless an `XY` instance is passed). Ignored if 

317 ``bbox`` is provided. Defaults to zeros. 

318 shape 

319 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

320 instance is passed). Only needed if ``array_or_fill`` is not an 

321 array and ``bbox`` is not provided. Like the bbox, this does not 

322 include the last dimension of the array. 

323 projection 

324 Projection that maps the pixel grid to the sky. 

325 metadata 

326 Arbitrary flexible metadata to associate with the mask. 

327 

328 Notes 

329 ----- 

330 Indexing the `array` attribute of a `Mask` does not take into account its 

331 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with 

332 a `Box` does, and the `bbox` of the subimage is set to match its location 

333 within the original mask. 

334 

335 A mask's ``bbox`` corresponds to the leading dimensions of its backing 

336 `numpy.ndarray`, while the last dimension's size is always equal to the 

337 `~MaskSchema.mask_size` of its schema, since a schema can in general 

338 require multiple array elements to represent all of its planes. 

339 """ 

340 

341 def __init__( 

342 self, 

343 array_or_fill: np.ndarray | int = 0, 

344 /, 

345 *, 

346 schema: MaskSchema, 

347 bbox: Box | None = None, 

348 start: Sequence[int] | None = None, 

349 shape: Sequence[int] | None = None, 

350 projection: Projection | None = None, 

351 metadata: dict[str, MetadataValue] | None = None, 

352 ): 

353 super().__init__(metadata) 

354 if shape is not None: 

355 shape = tuple(shape) 

356 if start is not None: 

357 start = tuple(start) 

358 if isinstance(array_or_fill, np.ndarray): 

359 array = np.array(array_or_fill, dtype=schema.dtype, copy=None) 

360 if array.ndim != 3: 

361 raise ValueError("Mask array must be 3-d.") 

362 if bbox is None: 

363 bbox = Box.from_shape(array.shape[:-1], start=start) 

364 elif bbox.shape + (schema.mask_size,) != array.shape: 

365 raise ValueError( 

366 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not " 

367 f"match array with shape {array.shape}." 

368 ) 

369 if shape is not None and shape + (schema.mask_size,) != array.shape: 

370 raise ValueError( 

371 f"Explicit shape {shape} and schema of size {schema.mask_size} do " 

372 f"not match array with shape {array.shape}." 

373 ) 

374 

375 else: 

376 if bbox is None: 

377 if shape is None: 

378 raise TypeError("No bbox, size, or array provided.") 

379 bbox = Box.from_shape(shape, start=start) 

380 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype) 

381 self._array = array 

382 self._bbox: Box = bbox 

383 self._schema: MaskSchema = schema 

384 self._projection = projection 

385 

386 @property 

387 def array(self) -> np.ndarray: 

388 """The low-level array (`numpy.ndarray`). 

389 

390 Assigning to this attribute modifies the existing array in place; the 

391 bounding box and underlying data pointer are never changed. 

392 """ 

393 return self._array 

394 

395 @array.setter 

396 def array(self, value: np.ndarray | int) -> None: 

397 self._array[:, :] = value 

398 

399 @property 

400 def schema(self) -> MaskSchema: 

401 """Schema that defines the planes and their bit assignments 

402 (`MaskSchema`). 

403 """ 

404 return self._schema 

405 

406 @property 

407 def bbox(self) -> Box: 

408 """2-d bounding box of the mask (`Box`). 

409 

410 This sets the shape of the first two dimensions of the array. 

411 """ 

412 return self._bbox 

413 

414 @property 

415 def projection(self) -> Projection[Any] | None: 

416 """The projection that maps this mask's pixel grid to the sky 

417 (`Projection` | `None`). 

418 

419 Notes 

420 ----- 

421 The pixel coordinates used by this projection account for the bounding 

422 box ``start``; they are not just array indices. 

423 """ 

424 return self._projection 

425 

426 def __getitem__(self, bbox: Box | EllipsisType) -> Mask: 

427 if bbox is ...: 

428 return self 

429 super().__getitem__(bbox) 

430 return self._transfer_metadata( 

431 Mask( 

432 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :], 

433 bbox=bbox, 

434 schema=self.schema, 

435 projection=self._projection, 

436 ), 

437 bbox=bbox, 

438 ) 

439 

440 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None: 

441 subview = self[bbox] 

442 subview.clear() 

443 subview.update(value) 

444 

445 def __str__(self) -> str: 

446 return f"Mask({self.bbox!s}, {list(self.schema.names)})" 

447 

448 def __repr__(self) -> str: 

449 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})" 

450 

451 def __eq__(self, other: object) -> bool: 

452 if not isinstance(other, Mask): 

453 return NotImplemented 

454 return ( 

455 self._bbox == other._bbox 

456 and self._schema == other._schema 

457 and np.array_equal(self._array, other._array, equal_nan=True) 

458 ) 

459 

460 def copy(self) -> Mask: 

461 """Deep-copy the mask and metadata.""" 

462 return self._transfer_metadata( 

463 Mask(self._array.copy(), bbox=self._bbox, schema=self._schema, projection=self._projection), 

464 copy=True, 

465 ) 

466 

467 def view( 

468 self, 

469 *, 

470 schema: MaskSchema | EllipsisType = ..., 

471 projection: Projection | None | EllipsisType = ..., 

472 start: Sequence[int] | EllipsisType = ..., 

473 ) -> Mask: 

474 """Make a view of the mask, with optional updates. 

475 

476 Notes 

477 ----- 

478 This can only be used to make changes to schema descriptions; plane 

479 names must remain the same (in the same order). 

480 """ 

481 if schema is ...: 

482 schema = self._schema 

483 else: 

484 if list(schema.names) != list(self.schema.names): 

485 raise ValueError("Cannot create a mask view with a schema with different names.") 

486 if projection is ...: 

487 projection = self._projection 

488 if start is ...: 

489 start = self._bbox.start 

490 return self._transfer_metadata(Mask(self._array, start=start, schema=schema, projection=projection)) 

491 

492 def update(self, other: Mask) -> None: 

493 """Update ``self`` to include all common mask values set in ``other``. 

494 

495 Notes 

496 ----- 

497 This only operates on the intersection of the two mask bounding boxes 

498 and the mask planes that are present in both. Mask bits are only set, 

499 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments). 

500 """ 

501 lhs = self 

502 rhs = other 

503 if other.bbox != self.bbox: 

504 try: 

505 bbox = self.bbox.intersection(other.bbox) 

506 except NoOverlapError: 

507 return 

508 lhs = self[bbox] 

509 rhs = other[bbox] 

510 for name in self.schema.names & other.schema.names: 

511 lhs.set(name, rhs.get(name)) 

512 

513 def get(self, plane: str) -> np.ndarray: 

514 """Return a 2-d boolean array for the given mask plane. 

515 

516 Parameters 

517 ---------- 

518 plane 

519 Name of the mask plane. 

520 

521 Returns 

522 ------- 

523 numpy.ndarray 

524 A 2-d boolean array with the same shape as `bbox` that is `True` 

525 where the bit for ``plane`` is set and `False` elsewhere. 

526 """ 

527 bit = self.schema.bit(plane) 

528 return (self._array[..., bit.index] & bit.mask).astype(bool) 

529 

530 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

531 """Set a mask plane. 

532 

533 Parameters 

534 ---------- 

535 plane 

536 Name of the mask plane to set 

537 boolean_mask 

538 A 2-d boolean array with the same shape as `bbox` that is `True` 

539 where the bit for ``plane`` should be set and `False` where it 

540 should be left unchanged (*not* set to zero). May be ``...`` to 

541 set the bit everywhere. 

542 """ 

543 bit = self.schema.bit(plane) 

544 if boolean_mask is not ...: 

545 boolean_mask = boolean_mask.astype(bool) 

546 self._array[boolean_mask, bit.index] |= bit.mask 

547 

548 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

549 """Clear one or more mask planes. 

550 

551 Parameters 

552 ---------- 

553 plane 

554 Name of the mask plane to set. If `None` all mask planes are 

555 cleared. 

556 boolean_mask 

557 A 2-d boolean array with the same shape as `bbox` that is `True` 

558 where the bit for ``plane`` should be cleared and `False` where it 

559 should be left unchanged. May be ``...`` to clear the bit 

560 everywhere. 

561 """ 

562 if boolean_mask is not ...: 

563 boolean_mask = boolean_mask.astype(bool) 

564 if plane is None: 

565 self._array[boolean_mask, :] = 0 

566 else: 

567 bit = self.schema.bit(plane) 

568 self._array[boolean_mask, bit.index] &= ~bit.mask 

569 

570 def serialize[P: pydantic.BaseModel]( 

571 self, 

572 archive: OutputArchive[P], 

573 *, 

574 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

575 save_projection: bool = True, 

576 add_offset_wcs: str | None = "A", 

577 ) -> MaskSerializationModel[P]: 

578 """Serialize the mask to an output archive. 

579 

580 Parameters 

581 ---------- 

582 archive 

583 Archive to write to. 

584 update_header 

585 A callback that will be given the FITS header for the HDU 

586 containing this mask in order to add keys to it. This callback 

587 may be provided but will not be called if the output format is not 

588 FITS. As multiple HDUs may be added, this function may be called 

589 multiple times. 

590 save_projection 

591 If `True`, save the `Projection` attached to the image, if there 

592 is one. This does not affect whether a FITS WCS corresponding to 

593 the projection is written (it always is, if available, and if 

594 ``add_offset_wcs`` is not ``" "``). 

595 add_offset_wcs 

596 A FITS WCS single-character suffix to use when adding a linear 

597 WCS that maps the FITS array to the logical pixel coordinates 

598 defined by ``bbox.start``. Set to `None` to not write this WCS. 

599 If this is set to ``" "``, it will prevent the `Projection` from 

600 being saved as a FITS WCS. 

601 """ 

602 if _archive_prefers_native_mask_arrays(archive): 

603 # HDS presents array dimensions in Fortran order, which is the 

604 # reverse of the h5py dataset shape. Store the in-memory trailing 

605 # mask-byte axis first in HDF5 so Starlink tools see HDS axes 

606 # (x, y, byte), without changing the bit packing within a pixel. 

607 array_model = archive.add_array(np.moveaxis(self._array, -1, 0), update_header=update_header) 

608 if not isinstance(array_model, ArrayReferenceModel): 

609 raise RuntimeError("Native mask arrays require reference array storage.") 

610 array_model.shape = list(self._array.shape) 

611 data: list[ArrayReferenceModel | InlineArrayModel] = [array_model] 

612 else: 

613 data = [] 

614 for schema_2d in self.schema.split(np.int32): 

615 mask_2d = Mask(0, bbox=self.bbox, schema=schema_2d, projection=self._projection) 

616 mask_2d.update(self) 

617 data.append( 

618 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs) 

619 ) 

620 serialized_projection: ProjectionSerializationModel[P] | None = None 

621 if save_projection and self.projection is not None: 

622 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

623 serialized_dtype = NumberType.from_numpy(self.schema.dtype) 

624 assert is_integer(serialized_dtype), "Mask dtypes should always be integers." 

625 return MaskSerializationModel.model_construct( 

626 data=data, 

627 start=list(self.bbox.start), 

628 planes=list(self.schema), 

629 dtype=serialized_dtype, 

630 projection=serialized_projection, 

631 metadata=self.metadata, 

632 ) 

633 

634 def _serialize_2d[P: pydantic.BaseModel]( 

635 self, 

636 archive: OutputArchive[P], 

637 *, 

638 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

639 add_offset_wcs: str | None = "A", 

640 ) -> ArrayReferenceModel | InlineArrayModel: 

641 def _update_header(header: astropy.io.fits.Header) -> None: 

642 update_header(header) 

643 self.schema.update_header(header) 

644 if self.projection is not None and add_offset_wcs != " ": 

645 if self.fits_wcs: 

646 header.update(self.fits_wcs.to_header(relax=True)) 

647 if add_offset_wcs is not None: 

648 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

649 

650 assert self.array.shape[2] == 1, "Mask should be split before calling this method." 

651 return archive.add_array(self._array[:, :, 0], update_header=_update_header) 

652 

653 @staticmethod 

654 def _get_archive_tree_type[P: pydantic.BaseModel]( 

655 pointer_type: type[P], 

656 ) -> type[MaskSerializationModel[P]]: 

657 """Return the serialization model type for this object for an archive 

658 type that uses the given pointer type. 

659 """ 

660 return MaskSerializationModel[pointer_type] # type: ignore 

661 

662 _archive_default_name: ClassVar[str] = "mask" 

663 """The name this object should be serialized with when written as the 

664 top-level object. 

665 """ 

666 

667 def write_fits( 

668 self, 

669 filename: str, 

670 *, 

671 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

672 ) -> None: 

673 """Write the mask to a FITS file. 

674 

675 Parameters 

676 ---------- 

677 filename 

678 Name of the file to write to. Must be a local file. 

679 compression 

680 Compression options. 

681 """ 

682 compression_options = {} 

683 if compression is not fits.FitsCompressionOptions.DEFAULT: 

684 compression_options[self._archive_default_name] = compression 

685 fits.write(self, filename, compression_options) 

686 

687 @staticmethod 

688 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask: 

689 """Read an image from a FITS file. 

690 

691 Parameters 

692 ---------- 

693 url 

694 URL of the file to read; may be any type supported by 

695 `lsst.resources.ResourcePath`. 

696 bbox 

697 Bounding box of a subimage to read instead. 

698 """ 

699 return fits.read(Mask, url, bbox=bbox).deserialized 

700 

701 @staticmethod 

702 def from_legacy( 

703 legacy: Any, 

704 plane_map: Mapping[str, MaskPlane] | None = None, 

705 ) -> Mask: 

706 """Convert from an `lsst.afw.image.Mask` instance. 

707 

708 Parameters 

709 ---------- 

710 legacy 

711 An `lsst.afw.image.Mask` instance. This will not share pixel 

712 data with the new object. 

713 plane_map 

714 A mapping from legacy mask plane name to the new plane name and 

715 description. 

716 """ 

717 return Mask._from_legacy_array( 

718 legacy.array, 

719 legacy.getMaskPlaneDict(), 

720 start=YX(y=legacy.getY0(), x=legacy.getX0()), 

721 plane_map=plane_map, 

722 ) 

723 

724 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

725 """Convert to an `lsst.afw.image.Mask` instance. 

726 

727 The pixel data will not be shared between the two objects. 

728 

729 Parameters 

730 ---------- 

731 plane_map 

732 A mapping from legacy mask plane name to the new plane name and 

733 description. 

734 """ 

735 import lsst.afw.image 

736 import lsst.geom 

737 

738 result = lsst.afw.image.Mask(self.bbox.to_legacy()) 

739 if plane_map is None: 

740 plane_map = {plane.name: plane for plane in self.schema if plane is not None} 

741 for old_name, new_plane in plane_map.items(): 

742 old_bit = result.addMaskPlane(old_name) 

743 old_bitmask = 1 << old_bit 

744 result.array[self.get(new_plane.name)] |= old_bitmask 

745 return result 

746 

747 @staticmethod 

748 def _from_legacy_array( 

749 array2d: np.ndarray, 

750 old_planes: Mapping[str, int], 

751 *, 

752 start: YX[int], 

753 plane_map: Mapping[str, MaskPlane] | None = None, 

754 projection: Projection | None = None, 

755 ) -> Mask: 

756 planes: list[MaskPlane] = [] 

757 new_name_to_old_bitmask: dict[str, int] = {} 

758 for old_name, old_bit in old_planes.items(): 

759 old_bitmask = 1 << old_bit 

760 if plane_map is not None: 

761 if new_plane := plane_map.get(old_name): 

762 planes.append(new_plane) 

763 new_name_to_old_bitmask[new_plane.name] = old_bitmask 

764 else: 

765 if n_orphaned := np.count_nonzero(array2d & old_bitmask): 

766 raise RuntimeError( 

767 f"Legacy mask plane {old_name!r} is not remapped, " 

768 f"but {n_orphaned} pixels have this bit set." 

769 ) 

770 else: 

771 planes.append(MaskPlane(old_name, "")) 

772 new_name_to_old_bitmask[old_name] = old_bitmask 

773 schema = MaskSchema(planes) 

774 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection) 

775 for new_name, old_bitmask in new_name_to_old_bitmask.items(): 

776 mask.set(new_name, array2d & old_bitmask) 

777 return mask 

778 

779 @staticmethod 

780 def read_legacy( 

781 uri: ResourcePathExpression, 

782 *, 

783 plane_map: Mapping[str, MaskPlane] | None = None, 

784 ext: str | int = 1, 

785 fits_wcs_frame: Frame | None = None, 

786 ) -> Mask: 

787 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`. 

788 

789 Parameters 

790 ---------- 

791 uri 

792 URI or file name. 

793 plane_map 

794 A mapping from legacy mask plane name to the new plane name and 

795 description. 

796 ext 

797 Name or index of the FITS HDU to read. 

798 fits_wcs_frame 

799 If not `None` and the HDU containing the mask has a FITS WCS, 

800 attach a `Projection` to the returned mask by converting that WCS. 

801 """ 

802 opaque_metadata = fits.FitsOpaqueMetadata() 

803 fs, fspath = ResourcePath(uri).to_fsspec() 

804 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

805 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

806 result = Mask._read_legacy_hdu( 

807 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame 

808 ) 

809 result._opaque_metadata = opaque_metadata 

810 return result 

811 

812 @staticmethod 

813 def _read_legacy_hdu( 

814 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

815 opaque_metadata: fits.FitsOpaqueMetadata, 

816 plane_map: Mapping[str, MaskPlane] | None = None, 

817 fits_wcs_frame: Frame | None = None, 

818 ) -> Mask: 

819 if isinstance(hdu, astropy.io.fits.BinTableHDU): 

820 hdu = astropy.io.fits.CompImageHDU(bintable=hdu) 

821 dx: int = hdu.header.pop("LTV1") 

822 dy: int = hdu.header.pop("LTV2") 

823 start = YX(y=-dy, x=-dx) 

824 old_planes = MaskPlane.read_legacy(hdu.header) 

825 projection: Projection | None = None 

826 if fits_wcs_frame is not None: 

827 try: 

828 fits_wcs = astropy.wcs.WCS(hdu.header) 

829 except KeyError: 

830 pass 

831 else: 

832 projection = Projection.from_fits_wcs( 

833 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

834 ) 

835 mask = Mask._from_legacy_array( 

836 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection 

837 ) 

838 fits.strip_wcs_cards(hdu.header) 

839 hdu.header.strip() 

840 hdu.header.remove("EXTTYPE", ignore_missing=True) 

841 hdu.header.remove("INHERIT", ignore_missing=True) 

842 # afw set BUNIT on masks because of limitations in how FITS 

843 # metadata is handled there. 

844 hdu.header.remove("BUNIT", ignore_missing=True) 

845 opaque_metadata.add_header(hdu.header) 

846 return mask 

847 

848 

849class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

850 """Pydantic model used to represent the serialized form of a `.Mask`.""" 

851 

852 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field( 

853 description="References to pixel data." 

854 ) 

855 start: list[int] = pydantic.Field( 

856 description="Coordinate of the first pixels in the array, ordered (y, x)." 

857 ) 

858 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.") 

859 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.") 

860 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

861 default=None, 

862 exclude_if=is_none, 

863 description="Projection that maps the logical pixel grid onto the sky.", 

864 ) 

865 

866 @property 

867 def bbox(self) -> Box: 

868 """The 2-d bounding box of the mask.""" 

869 shape = self.data[0].shape 

870 if len(shape) == 3: 

871 shape = shape[:2] 

872 return Box.from_shape(shape, start=self.start) 

873 

874 def deserialize( 

875 self, 

876 archive: InputArchive[Any], 

877 *, 

878 bbox: Box | None = None, 

879 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

880 **kwargs: Any, 

881 ) -> Mask: 

882 """Deserialize a mask from an input archive. 

883 

884 Parameters 

885 ---------- 

886 archive 

887 Archive to read from. 

888 bbox 

889 Bounding box of a subimage to read instead. 

890 strip_header 

891 A callable that strips out any FITS header cards added by the 

892 ``update_header`` argument in the corresponding call to 

893 `Mask.serialize`. 

894 **kwargs 

895 Unsupported keyword arguments are accepted only to provide better 

896 error messages (raising `serialization.InvalidParameterError`). 

897 """ 

898 if kwargs: 

899 raise InvalidParameterError(f"Unrecognized parameters for Mask: {set(kwargs.keys())}.") 

900 slices: tuple[slice, ...] | EllipsisType = ... 

901 if bbox is not None: 

902 slices = bbox.slice_within(self.bbox) 

903 else: 

904 bbox = self.bbox 

905 if not is_integer(self.dtype): 

906 raise ArchiveReadError(f"Mask array has a non-integer dtype: {self.dtype}.") 

907 schema = MaskSchema(self.planes, dtype=self.dtype.to_numpy()) 

908 projection = self.projection.deserialize(archive) if self.projection is not None else None 

909 if len(self.data) == 1 and tuple(self.data[0].shape) == tuple(self.bbox.shape) + (schema.mask_size,): 

910 storage_slices = slices if slices is ... else (slice(None),) + slices 

911 array = archive.get_array(self.data[0], strip_header=strip_header, slices=storage_slices) 

912 array = np.moveaxis(array, 0, -1) 

913 return Mask(array, schema=schema, bbox=bbox, projection=projection)._finish_deserialize(self) 

914 result = Mask(0, schema=schema, bbox=bbox, projection=projection) 

915 schemas_2d = schema.split(np.int32) 

916 if len(schemas_2d) != len(self.data): 

917 raise ArchiveReadError( 

918 f"Number of mask arrays ({len(self.data)}) does not match expectation ({len(schemas_2d)})." 

919 ) 

920 for array_model, schema_2d in zip(self.data, schemas_2d): 

921 mask_2d = self._deserialize_2d( 

922 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices 

923 ) 

924 result.update(mask_2d) 

925 return result._finish_deserialize(self) 

926 

927 @staticmethod 

928 def _deserialize_2d( 

929 ref: ArrayReferenceModel | InlineArrayModel, 

930 schema_2d: MaskSchema, 

931 start: Sequence[int], 

932 archive: InputArchive[Any], 

933 *, 

934 slices: tuple[slice, ...] | EllipsisType = ..., 

935 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

936 ) -> Mask: 

937 def _strip_header(header: astropy.io.fits.Header) -> None: 

938 strip_header(header) 

939 schema_2d.strip_header(header) 

940 fits.strip_wcs_cards(header) 

941 

942 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

943 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start) 

944 

945 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

946 if kwargs: 

947 raise InvalidParameterError(f"Unsupported parameters for Mask components: {set(kwargs.keys())}.") 

948 return super().deserialize_component(component, archive) 

949 

950 

951def _archive_prefers_native_mask_arrays(archive: OutputArchive[Any]) -> bool: 

952 """Return whether an archive wants masks in their native 3-D layout.""" 

953 current: Any = archive 

954 while current is not None: 

955 if getattr(current, "_prefer_native_mask_arrays", False): 

956 return True 

957 current = getattr(current, "_parent", None) 

958 return False 

959 

960 

961def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]: 

962 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

963 for LSST visit images, c. DP2. 

964 """ 

965 return { 

966 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."), 

967 "SAT": MaskPlane( 

968 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel." 

969 ), 

970 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."), 

971 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."), 

972 "EDGE": MaskPlane( 

973 "DETECTION_EDGE", 

974 "Pixel was too close to the edge to be considered for detection, " 

975 "due to the finite size of the detection kernel.", 

976 ), 

977 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

978 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "), 

979 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

980 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."), 

981 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."), 

982 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."), 

983 "ITL_DIP": MaskPlane( 

984 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD." 

985 ), 

986 "NOT_DEBLENDED": MaskPlane( 

987 "NOT_DEBLENDED", 

988 "Pixel belonged to a detection that was not deblended, usually due to size limits.", 

989 ), 

990 "SPIKE": MaskPlane( 

991 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star." 

992 ), 

993 } 

994 

995 

996def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]: 

997 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

998 for LSST deep coadds, c. DP2. 

999 """ 

1000 return { 

1001 # TODO: reconcile this with counts from the DP2 coadds. 

1002 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully 

1003 # rejected from (cell) coadds with no propagation. 

1004 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

1005 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."), 

1006 "CR": MaskPlane( 

1007 "COSMIC_RAY", 

1008 "A cosmic ray affected this pixel on at least one input image (and was interpolated).", 

1009 ), 

1010 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."), 

1011 "EDGE": MaskPlane( 

1012 "DETECTION_EDGE", 

1013 "Pixel was too close to the edge to be considered for detection, " 

1014 "due to the finite size of the detection kernel.", 

1015 ), 

1016 "REJECTED": MaskPlane( 

1017 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking." 

1018 ), 

1019 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

1020 "INEXACT_PSF": MaskPlane( 

1021 "INEXACT_PSF", 

1022 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.", 

1023 ), 

1024 }