Coverage for python/lsst/images/_mask.py: 22%

389 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 08:40 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "Mask", 

16 "MaskPlane", 

17 "MaskPlaneBit", 

18 "MaskSchema", 

19 "MaskSerializationModel", 

20 "get_legacy_deep_coadd_mask_planes", 

21 "get_legacy_visit_image_mask_planes", 

22) 

23 

24import dataclasses 

25import math 

26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set 

27from types import EllipsisType 

28from typing import TYPE_CHECKING, Any, ClassVar, cast 

29 

30import astropy.io.fits 

31import astropy.wcs 

32import numpy as np 

33import numpy.typing as npt 

34import pydantic 

35 

36from lsst.resources import ResourcePath, ResourcePathExpression 

37 

38from . import fits 

39from ._generalized_image import GeneralizedImage 

40from ._geom import YX, Box, NoOverlapError 

41from ._transforms import Frame, Projection, ProjectionSerializationModel 

42from .serialization import ( 

43 ArchiveReadError, 

44 ArchiveTree, 

45 ArrayReferenceModel, 

46 InlineArrayModel, 

47 InputArchive, 

48 IntegerType, 

49 InvalidParameterError, 

50 MetadataValue, 

51 NumberType, 

52 OutputArchive, 

53 is_integer, 

54 no_header_updates, 

55) 

56from .utils import is_none 

57 

58if TYPE_CHECKING: 

59 try: 

60 from lsst.afw.image import Mask as LegacyMask 

61 except ImportError: 

62 type LegacyMask = Any # type: ignore[no-redef] 

63 

64 

65@dataclasses.dataclass(frozen=True) 

66class MaskPlane: 

67 """Name and description of a single plane in a mask array.""" 

68 

69 name: str 

70 """Unique name for the mask plane (`str`).""" 

71 

72 description: str 

73 """Human-readable documentation for the mask plane (`str`).""" 

74 

75 @classmethod 

76 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]: 

77 """Read mask plane descriptions written by 

78 `lsst.afw.image.Mask.writeFits`. 

79 

80 Parameters 

81 ---------- 

82 header 

83 FITS header. 

84 

85 Returns 

86 ------- 

87 `dict` [`str`, `int`] 

88 A dictionary mapping mask plane name to integer bit index. 

89 """ 

90 result: dict[str, int] = {} 

91 for card in list(header.cards): 

92 if card.keyword.startswith("MP_"): 

93 result[card.keyword.removeprefix("MP_")] = card.value 

94 del header[card.keyword] 

95 return result 

96 

97 

98@dataclasses.dataclass(frozen=True) 

99class MaskPlaneBit: 

100 """The nested array index and mask value associated with a single mask 

101 plane. 

102 """ 

103 

104 index: int 

105 """Index into the last dimension of the mask array where this plane's bit 

106 is stored. 

107 """ 

108 

109 mask: np.integer 

110 """Bitmask that selects just this plane's bit from a mask array value 

111 (`numpy.integer`). 

112 """ 

113 

114 @classmethod 

115 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit: 

116 """Construct a `MaskPlaneBit` from the overall index of a plane in a 

117 `MaskSchema` and the stride (number of bits per mask array element). 

118 """ 

119 index, bit = divmod(overall_index, stride) 

120 return cls(index, mask_type(1 << bit)) 

121 

122 

123class MaskSchema: 

124 """A schema for a bit-packed mask array. 

125 

126 Parameters 

127 ---------- 

128 planes 

129 Iterable of `MaskPlane` instances that define the schema. `None` 

130 values may be included to reserve bits for future use. 

131 dtype 

132 The numpy data type of the mask arrays that use this schema. 

133 

134 Notes 

135 ----- 

136 A `MaskSchema` is a collection of mask planes, which each correspond to a 

137 single bit in a mask array. Mask schemas are immutable and associated with 

138 a particular array data type, allowing them to safely precompute the index 

139 and bitmask for each plane. 

140 

141 `MaskSchema` indexing is by integer (the overall index of a plane in the 

142 schema). The `descriptions` attribute may be indexed by plane name to get 

143 the description for that plane, and the `bitmask` method can be used to 

144 obtain an array that can be used to select one or more planes by name in 

145 a mask array that uses this schema. 

146 

147 If no mask planes are provided, a `None` placeholder is automatically 

148 added. 

149 """ 

150 

151 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8): 

152 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,) 

153 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype)) 

154 stride = self.bits_per_element(self._dtype) 

155 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None} 

156 self._mask_size = math.ceil(len(self._planes) / stride) 

157 self._bits: dict[str, MaskPlaneBit] = { 

158 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type) 

159 for n, plane in enumerate(self._planes) 

160 if plane is not None 

161 } 

162 

163 @staticmethod 

164 def bits_per_element(dtype: npt.DTypeLike) -> int: 

165 """Return the number of mask bits per array element for the given 

166 data type. 

167 """ 

168 dtype = np.dtype(dtype) 

169 match dtype.kind: 

170 case "u": 

171 return dtype.itemsize * 8 

172 case "i": 

173 return dtype.itemsize * 8 - 1 

174 case _: 

175 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.") 

176 

177 def __iter__(self) -> Iterator[MaskPlane | None]: 

178 return iter(self._planes) 

179 

180 def __len__(self) -> int: 

181 return len(self._planes) 

182 

183 def __getitem__(self, i: int) -> MaskPlane | None: 

184 return self._planes[i] 

185 

186 def __repr__(self) -> str: 

187 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})" 

188 

189 def __str__(self) -> str: 

190 return "\n".join( 

191 [ 

192 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}" 

193 for name, bit in self._bits.items() 

194 ] 

195 ) 

196 

197 def __eq__(self, other: object) -> bool: 

198 if isinstance(other, MaskSchema): 

199 return self._planes == other._planes and self._dtype == other._dtype 

200 return False 

201 

202 @property 

203 def dtype(self) -> np.dtype: 

204 """The numpy data type of the mask arrays that use this schema.""" 

205 return self._dtype 

206 

207 @property 

208 def mask_size(self) -> int: 

209 """The number of elements in the last dimension of any mask array that 

210 uses this schema. 

211 """ 

212 return self._mask_size 

213 

214 @property 

215 def names(self) -> Set[str]: 

216 """The names of the mask planes, in bit order.""" 

217 return self._bits.keys() 

218 

219 @property 

220 def descriptions(self) -> Mapping[str, str]: 

221 """A mapping from plane name to description.""" 

222 return self._descriptions 

223 

224 def bit(self, plane: str) -> MaskPlaneBit: 

225 """Return the last array index and mask for the given mask plane.""" 

226 return self._bits[plane] 

227 

228 def bitmask(self, *planes: str) -> np.ndarray: 

229 """Return a 1-d mask array that represents the union (i.e. bitwise OR) 

230 of the planes with the given names. 

231 

232 Parameters 

233 ---------- 

234 *planes 

235 Mask plane names. 

236 

237 Returns 

238 ------- 

239 numpy.ndarray 

240 A 1-d array with shape ``(mask_size,)``. 

241 """ 

242 result = np.zeros(self.mask_size, dtype=self._dtype) 

243 for plane in planes: 

244 bit = self._bits[plane] 

245 result[bit.index] |= bit.mask 

246 return result 

247 

248 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]: 

249 """Split the schema into an equivalent series of schemas that each 

250 have a `mask_size` of ``1``, dropping all `None` placeholders. 

251 

252 Parameters 

253 ---------- 

254 dtype 

255 Data type of the new mask pixels. 

256 

257 Returns 

258 ------- 

259 `list` [`MaskSchema`] 

260 A list of mask schemas that together include all planes in 

261 ``self`` and have `mask_size` equal to ``1``. If there are no 

262 mask planes (only `None` placeholders) in ``self``, a single mask 

263 schema with a `None` placeholder is returned; otherwise `None` 

264 placeholders are returned. 

265 """ 

266 dtype = np.dtype(dtype) 

267 planes: list[MaskPlane] = [] 

268 schemas: list[MaskSchema] = [] 

269 n_planes_per_schema = self.bits_per_element(dtype) 

270 for plane in self._planes: 

271 if plane is not None: 

272 planes.append(plane) 

273 if len(planes) == n_planes_per_schema: 

274 schemas.append(MaskSchema(planes, dtype=dtype)) 

275 planes.clear() 

276 if planes: 

277 schemas.append(MaskSchema(planes, dtype=dtype)) 

278 if not schemas: 

279 schemas.append(MaskSchema([None], dtype=dtype)) 

280 return schemas 

281 

282 def update_header(self, header: astropy.io.fits.Header) -> None: 

283 """Add a description of this mask schema to a FITS header.""" 

284 for n, plane in enumerate(self): 

285 if plane is not None: 

286 bit = self.bit(plane.name) 

287 if bit.index != 0: 

288 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.") 

289 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.") 

290 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.") 

291 # We don't add a comment to the description card, because it's 

292 # likely to overrun a single card and get the CONTINUE 

293 # treatment. That will cause Astropy to warn about the comment 

294 # being truncated and that's worse than just leaving it 

295 # unexplained; it's pretty obvious from context what it is. 

296 header.set(f"MSKD{n:04d}", plane.description) 

297 

298 def strip_header(self, header: astropy.io.fits.Header) -> None: 

299 """Remove all header cards added by `update_header`.""" 

300 for n, plane in enumerate(self): 

301 if plane is not None: 

302 header.remove(f"MSKN{n:04d}", ignore_missing=True) 

303 header.remove(f"MSKM{n:04d}", ignore_missing=True) 

304 header.remove(f"MSKD{n:04d}", ignore_missing=True) 

305 

306 

307class Mask(GeneralizedImage): 

308 """A 2-d bitmask image backed by a 3-d byte array. 

309 

310 Parameters 

311 ---------- 

312 array_or_fill 

313 Array or fill value for the mask. If a fill value, ``bbox`` or 

314 ``shape`` must be provided. 

315 schema 

316 Schema that defines the planes and their bit assignments. 

317 bbox 

318 Bounding box for the mask. This sets the shape of the first two 

319 dimensions of the array. 

320 start 

321 Logical coordinates of the first pixel in the array, ordered ``y``, 

322 ``x`` (unless an `XY` instance is passed). Ignored if 

323 ``bbox`` is provided. Defaults to zeros. 

324 shape 

325 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

326 instance is passed). Only needed if ``array_or_fill`` is not an 

327 array and ``bbox`` is not provided. Like the bbox, this does not 

328 include the last dimension of the array. 

329 projection 

330 Projection that maps the pixel grid to the sky. 

331 metadata 

332 Arbitrary flexible metadata to associate with the mask. 

333 

334 Notes 

335 ----- 

336 Indexing the `array` attribute of a `Mask` does not take into account its 

337 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with 

338 a `Box` does, and the `bbox` of the subimage is set to match its location 

339 within the original mask. 

340 

341 A mask's ``bbox`` corresponds to the leading dimensions of its backing 

342 `numpy.ndarray`, while the last dimension's size is always equal to the 

343 `~MaskSchema.mask_size` of its schema, since a schema can in general 

344 require multiple array elements to represent all of its planes. 

345 """ 

346 

347 def __init__( 

348 self, 

349 array_or_fill: np.ndarray | int = 0, 

350 /, 

351 *, 

352 schema: MaskSchema, 

353 bbox: Box | None = None, 

354 start: Sequence[int] | None = None, 

355 shape: Sequence[int] | None = None, 

356 projection: Projection | None = None, 

357 metadata: dict[str, MetadataValue] | None = None, 

358 ): 

359 super().__init__(metadata) 

360 if shape is not None: 

361 shape = tuple(shape) 

362 if start is not None: 

363 start = tuple(start) 

364 if isinstance(array_or_fill, np.ndarray): 

365 array = np.array(array_or_fill, dtype=schema.dtype, copy=None) 

366 if array.ndim != 3: 

367 raise ValueError("Mask array must be 3-d.") 

368 if bbox is None: 

369 bbox = Box.from_shape(array.shape[:-1], start=start) 

370 elif bbox.shape + (schema.mask_size,) != array.shape: 

371 raise ValueError( 

372 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not " 

373 f"match array with shape {array.shape}." 

374 ) 

375 if shape is not None and shape + (schema.mask_size,) != array.shape: 

376 raise ValueError( 

377 f"Explicit shape {shape} and schema of size {schema.mask_size} do " 

378 f"not match array with shape {array.shape}." 

379 ) 

380 

381 else: 

382 if bbox is None: 

383 if shape is None: 

384 raise TypeError("No bbox, size, or array provided.") 

385 bbox = Box.from_shape(shape, start=start) 

386 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype) 

387 self._array = array 

388 self._bbox: Box = bbox 

389 self._schema: MaskSchema = schema 

390 self._projection = projection 

391 

392 @property 

393 def array(self) -> np.ndarray: 

394 """The low-level array (`numpy.ndarray`). 

395 

396 Assigning to this attribute modifies the existing array in place; the 

397 bounding box and underlying data pointer are never changed. 

398 """ 

399 return self._array 

400 

401 @array.setter 

402 def array(self, value: np.ndarray | int) -> None: 

403 self._array[:, :] = value 

404 

405 @property 

406 def schema(self) -> MaskSchema: 

407 """Schema that defines the planes and their bit assignments 

408 (`MaskSchema`). 

409 """ 

410 return self._schema 

411 

412 @property 

413 def bbox(self) -> Box: 

414 """2-d bounding box of the mask (`Box`). 

415 

416 This sets the shape of the first two dimensions of the array. 

417 """ 

418 return self._bbox 

419 

420 @property 

421 def projection(self) -> Projection[Any] | None: 

422 """The projection that maps this mask's pixel grid to the sky 

423 (`Projection` | `None`). 

424 

425 Notes 

426 ----- 

427 The pixel coordinates used by this projection account for the bounding 

428 box ``start``; they are not just array indices. 

429 """ 

430 return self._projection 

431 

432 def __getitem__(self, bbox: Box | EllipsisType) -> Mask: 

433 if bbox is ...: 

434 return self 

435 super().__getitem__(bbox) 

436 return self._transfer_metadata( 

437 Mask( 

438 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :], 

439 bbox=bbox, 

440 schema=self.schema, 

441 projection=self._projection, 

442 ), 

443 bbox=bbox, 

444 ) 

445 

446 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None: 

447 subview = self[bbox] 

448 subview.clear() 

449 subview.update(value) 

450 

451 def __str__(self) -> str: 

452 return f"Mask({self.bbox!s}, {list(self.schema.names)})" 

453 

454 def __repr__(self) -> str: 

455 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})" 

456 

457 def __eq__(self, other: object) -> bool: 

458 if not isinstance(other, Mask): 

459 return NotImplemented 

460 return ( 

461 self._bbox == other._bbox 

462 and self._schema == other._schema 

463 and np.array_equal(self._array, other._array, equal_nan=True) 

464 ) 

465 

466 def copy(self) -> Mask: 

467 """Deep-copy the mask and metadata.""" 

468 return self._transfer_metadata( 

469 Mask(self._array.copy(), bbox=self._bbox, schema=self._schema, projection=self._projection), 

470 copy=True, 

471 ) 

472 

473 def view( 

474 self, 

475 *, 

476 schema: MaskSchema | EllipsisType = ..., 

477 projection: Projection | None | EllipsisType = ..., 

478 start: Sequence[int] | EllipsisType = ..., 

479 ) -> Mask: 

480 """Make a view of the mask, with optional updates. 

481 

482 Notes 

483 ----- 

484 This can only be used to make changes to schema descriptions; plane 

485 names must remain the same (in the same order). 

486 """ 

487 if schema is ...: 

488 schema = self._schema 

489 else: 

490 if list(schema.names) != list(self.schema.names): 

491 raise ValueError("Cannot create a mask view with a schema with different names.") 

492 if projection is ...: 

493 projection = self._projection 

494 if start is ...: 

495 start = self._bbox.start 

496 return self._transfer_metadata(Mask(self._array, start=start, schema=schema, projection=projection)) 

497 

498 def update(self, other: Mask) -> None: 

499 """Update ``self`` to include all common mask values set in ``other``. 

500 

501 Notes 

502 ----- 

503 This only operates on the intersection of the two mask bounding boxes 

504 and the mask planes that are present in both. Mask bits are only set, 

505 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments). 

506 """ 

507 lhs = self 

508 rhs = other 

509 if other.bbox != self.bbox: 

510 try: 

511 bbox = self.bbox.intersection(other.bbox) 

512 except NoOverlapError: 

513 return 

514 lhs = self[bbox] 

515 rhs = other[bbox] 

516 for name in self.schema.names & other.schema.names: 

517 lhs.set(name, rhs.get(name)) 

518 

519 def get(self, plane: str) -> np.ndarray: 

520 """Return a 2-d boolean array for the given mask plane. 

521 

522 Parameters 

523 ---------- 

524 plane 

525 Name of the mask plane. 

526 

527 Returns 

528 ------- 

529 numpy.ndarray 

530 A 2-d boolean array with the same shape as `bbox` that is `True` 

531 where the bit for ``plane`` is set and `False` elsewhere. 

532 """ 

533 bit = self.schema.bit(plane) 

534 return (self._array[..., bit.index] & bit.mask).astype(bool) 

535 

536 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

537 """Set a mask plane. 

538 

539 Parameters 

540 ---------- 

541 plane 

542 Name of the mask plane to set 

543 boolean_mask 

544 A 2-d boolean array with the same shape as `bbox` that is `True` 

545 where the bit for ``plane`` should be set and `False` where it 

546 should be left unchanged (*not* set to zero). May be ``...`` to 

547 set the bit everywhere. 

548 """ 

549 bit = self.schema.bit(plane) 

550 if boolean_mask is not ...: 

551 boolean_mask = boolean_mask.astype(bool) 

552 self._array[boolean_mask, bit.index] |= bit.mask 

553 

554 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

555 """Clear one or more mask planes. 

556 

557 Parameters 

558 ---------- 

559 plane 

560 Name of the mask plane to set. If `None` all mask planes are 

561 cleared. 

562 boolean_mask 

563 A 2-d boolean array with the same shape as `bbox` that is `True` 

564 where the bit for ``plane`` should be cleared and `False` where it 

565 should be left unchanged. May be ``...`` to clear the bit 

566 everywhere. 

567 """ 

568 if boolean_mask is not ...: 

569 boolean_mask = boolean_mask.astype(bool) 

570 if plane is None: 

571 self._array[boolean_mask, :] = 0 

572 else: 

573 bit = self.schema.bit(plane) 

574 self._array[boolean_mask, bit.index] &= ~bit.mask 

575 

576 def serialize[P: pydantic.BaseModel]( 

577 self, 

578 archive: OutputArchive[P], 

579 *, 

580 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

581 save_projection: bool = True, 

582 add_offset_wcs: str | None = "A", 

583 ) -> MaskSerializationModel[P]: 

584 """Serialize the mask to an output archive. 

585 

586 Parameters 

587 ---------- 

588 archive 

589 Archive to write to. 

590 update_header 

591 A callback that will be given the FITS header for the HDU 

592 containing this mask in order to add keys to it. This callback 

593 may be provided but will not be called if the output format is not 

594 FITS. As multiple HDUs may be added, this function may be called 

595 multiple times. 

596 save_projection 

597 If `True`, save the `Projection` attached to the image, if there 

598 is one. This does not affect whether a FITS WCS corresponding to 

599 the projection is written (it always is, if available, and if 

600 ``add_offset_wcs`` is not ``" "``). 

601 add_offset_wcs 

602 A FITS WCS single-character suffix to use when adding a linear 

603 WCS that maps the FITS array to the logical pixel coordinates 

604 defined by ``bbox.start``. Set to `None` to not write this WCS. 

605 If this is set to ``" "``, it will prevent the `Projection` from 

606 being saved as a FITS WCS. 

607 """ 

608 if _archive_prefers_native_mask_arrays(archive): 

609 # HDS presents array dimensions in Fortran order, which is the 

610 # reverse of the h5py dataset shape. Store the in-memory trailing 

611 # mask-byte axis first in HDF5 so Starlink tools see HDS axes 

612 # (x, y, byte), without changing the bit packing within a pixel. 

613 array_model = archive.add_array(np.moveaxis(self._array, -1, 0), update_header=update_header) 

614 if not isinstance(array_model, ArrayReferenceModel): 

615 raise RuntimeError("Native mask arrays require reference array storage.") 

616 array_model.shape = list(self._array.shape) 

617 data: list[ArrayReferenceModel | InlineArrayModel] = [array_model] 

618 else: 

619 data = [] 

620 for schema_2d in self.schema.split(np.int32): 

621 mask_2d = Mask(0, bbox=self.bbox, schema=schema_2d, projection=self._projection) 

622 mask_2d.update(self) 

623 data.append( 

624 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs) 

625 ) 

626 serialized_projection: ProjectionSerializationModel[P] | None = None 

627 if save_projection and self.projection is not None: 

628 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

629 serialized_dtype = NumberType.from_numpy(self.schema.dtype) 

630 assert is_integer(serialized_dtype), "Mask dtypes should always be integers." 

631 return MaskSerializationModel.model_construct( 

632 data=data, 

633 start=list(self.bbox.start), 

634 planes=list(self.schema), 

635 dtype=serialized_dtype, 

636 projection=serialized_projection, 

637 metadata=self.metadata, 

638 ) 

639 

640 def _serialize_2d[P: pydantic.BaseModel]( 

641 self, 

642 archive: OutputArchive[P], 

643 *, 

644 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

645 add_offset_wcs: str | None = "A", 

646 ) -> ArrayReferenceModel | InlineArrayModel: 

647 def _update_header(header: astropy.io.fits.Header) -> None: 

648 update_header(header) 

649 self.schema.update_header(header) 

650 if self.projection is not None and add_offset_wcs != " ": 

651 if self.fits_wcs: 

652 header.update(self.fits_wcs.to_header(relax=True)) 

653 if add_offset_wcs is not None: 

654 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

655 

656 assert self.array.shape[2] == 1, "Mask should be split before calling this method." 

657 return archive.add_array(self._array[:, :, 0], update_header=_update_header) 

658 

659 @staticmethod 

660 def _get_archive_tree_type[P: pydantic.BaseModel]( 

661 pointer_type: type[P], 

662 ) -> type[MaskSerializationModel[P]]: 

663 """Return the serialization model type for this object for an archive 

664 type that uses the given pointer type. 

665 """ 

666 return MaskSerializationModel[pointer_type] # type: ignore 

667 

668 _archive_default_name: ClassVar[str] = "mask" 

669 """The name this object should be serialized with when written as the 

670 top-level object. 

671 """ 

672 

673 def write_fits( 

674 self, 

675 filename: str, 

676 *, 

677 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

678 ) -> None: 

679 """Write the mask to a FITS file. 

680 

681 Parameters 

682 ---------- 

683 filename 

684 Name of the file to write to. Must be a local file. 

685 compression 

686 Compression options. 

687 """ 

688 compression_options = {} 

689 if compression is not fits.FitsCompressionOptions.DEFAULT: 

690 compression_options[self._archive_default_name] = compression 

691 fits.write(self, filename, compression_options) 

692 

693 @staticmethod 

694 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask: 

695 """Read an image from a FITS file. 

696 

697 Parameters 

698 ---------- 

699 url 

700 URL of the file to read; may be any type supported by 

701 `lsst.resources.ResourcePath`. 

702 bbox 

703 Bounding box of a subimage to read instead. 

704 """ 

705 return fits.read(Mask, url, bbox=bbox).deserialized 

706 

707 @staticmethod 

708 def from_legacy( 

709 legacy: Any, 

710 plane_map: Mapping[str, MaskPlane] | None = None, 

711 ) -> Mask: 

712 """Convert from an `lsst.afw.image.Mask` instance. 

713 

714 Parameters 

715 ---------- 

716 legacy 

717 An `lsst.afw.image.Mask` instance. This will not share pixel 

718 data with the new object. 

719 plane_map 

720 A mapping from legacy mask plane name to the new plane name and 

721 description. 

722 """ 

723 return Mask._from_legacy_array( 

724 legacy.array, 

725 legacy.getMaskPlaneDict(), 

726 start=YX(y=legacy.getY0(), x=legacy.getX0()), 

727 plane_map=plane_map, 

728 ) 

729 

730 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

731 """Convert to an `lsst.afw.image.Mask` instance. 

732 

733 The pixel data will not be shared between the two objects. 

734 

735 Parameters 

736 ---------- 

737 plane_map 

738 A mapping from legacy mask plane name to the new plane name and 

739 description. 

740 """ 

741 import lsst.afw.image 

742 import lsst.geom 

743 

744 result = lsst.afw.image.Mask(self.bbox.to_legacy()) 

745 if plane_map is None: 

746 plane_map = {plane.name: plane for plane in self.schema if plane is not None} 

747 for old_name, new_plane in plane_map.items(): 

748 old_bit = result.addMaskPlane(old_name) 

749 old_bitmask = 1 << old_bit 

750 result.array[self.get(new_plane.name)] |= old_bitmask 

751 return result 

752 

753 @staticmethod 

754 def _from_legacy_array( 

755 array2d: np.ndarray, 

756 old_planes: Mapping[str, int], 

757 *, 

758 start: YX[int], 

759 plane_map: Mapping[str, MaskPlane] | None = None, 

760 projection: Projection | None = None, 

761 ) -> Mask: 

762 planes: list[MaskPlane] = [] 

763 new_name_to_old_bitmask: dict[str, int] = {} 

764 for old_name, old_bit in old_planes.items(): 

765 old_bitmask = 1 << old_bit 

766 if plane_map is not None: 

767 if new_plane := plane_map.get(old_name): 

768 planes.append(new_plane) 

769 new_name_to_old_bitmask[new_plane.name] = old_bitmask 

770 else: 

771 if n_orphaned := np.count_nonzero(array2d & old_bitmask): 

772 raise RuntimeError( 

773 f"Legacy mask plane {old_name!r} is not remapped, " 

774 f"but {n_orphaned} pixels have this bit set." 

775 ) 

776 else: 

777 planes.append(MaskPlane(old_name, "")) 

778 new_name_to_old_bitmask[old_name] = old_bitmask 

779 schema = MaskSchema(planes) 

780 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection) 

781 for new_name, old_bitmask in new_name_to_old_bitmask.items(): 

782 mask.set(new_name, array2d & old_bitmask) 

783 return mask 

784 

785 @staticmethod 

786 def read_legacy( 

787 uri: ResourcePathExpression, 

788 *, 

789 plane_map: Mapping[str, MaskPlane] | None = None, 

790 ext: str | int = 1, 

791 fits_wcs_frame: Frame | None = None, 

792 ) -> Mask: 

793 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`. 

794 

795 Parameters 

796 ---------- 

797 uri 

798 URI or file name. 

799 plane_map 

800 A mapping from legacy mask plane name to the new plane name and 

801 description. 

802 ext 

803 Name or index of the FITS HDU to read. 

804 fits_wcs_frame 

805 If not `None` and the HDU containing the mask has a FITS WCS, 

806 attach a `Projection` to the returned mask by converting that WCS. 

807 """ 

808 opaque_metadata = fits.FitsOpaqueMetadata() 

809 fs, fspath = ResourcePath(uri).to_fsspec() 

810 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

811 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

812 result = Mask._read_legacy_hdu( 

813 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame 

814 ) 

815 result._opaque_metadata = opaque_metadata 

816 return result 

817 

818 @staticmethod 

819 def _read_legacy_hdu( 

820 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

821 opaque_metadata: fits.FitsOpaqueMetadata, 

822 plane_map: Mapping[str, MaskPlane] | None = None, 

823 fits_wcs_frame: Frame | None = None, 

824 ) -> Mask: 

825 if isinstance(hdu, astropy.io.fits.BinTableHDU): 

826 hdu = astropy.io.fits.CompImageHDU(bintable=hdu) 

827 dx: int = hdu.header.pop("LTV1") 

828 dy: int = hdu.header.pop("LTV2") 

829 start = YX(y=-dy, x=-dx) 

830 old_planes = MaskPlane.read_legacy(hdu.header) 

831 projection: Projection | None = None 

832 if fits_wcs_frame is not None: 

833 try: 

834 fits_wcs = astropy.wcs.WCS(hdu.header) 

835 except KeyError: 

836 pass 

837 else: 

838 projection = Projection.from_fits_wcs( 

839 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

840 ) 

841 mask = Mask._from_legacy_array( 

842 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection 

843 ) 

844 fits.strip_wcs_cards(hdu.header) 

845 hdu.header.strip() 

846 hdu.header.remove("EXTTYPE", ignore_missing=True) 

847 hdu.header.remove("INHERIT", ignore_missing=True) 

848 # afw set BUNIT on masks because of limitations in how FITS 

849 # metadata is handled there. 

850 hdu.header.remove("BUNIT", ignore_missing=True) 

851 opaque_metadata.add_header(hdu.header) 

852 return mask 

853 

854 

855class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

856 """Pydantic model used to represent the serialized form of a `.Mask`.""" 

857 

858 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field( 

859 description="References to pixel data." 

860 ) 

861 start: list[int] = pydantic.Field( 

862 description="Coordinate of the first pixels in the array, ordered (y, x)." 

863 ) 

864 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.") 

865 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.") 

866 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

867 default=None, 

868 exclude_if=is_none, 

869 description="Projection that maps the logical pixel grid onto the sky.", 

870 ) 

871 

872 @property 

873 def bbox(self) -> Box: 

874 """The 2-d bounding box of the mask.""" 

875 shape = self.data[0].shape 

876 if len(shape) == 3: 

877 shape = shape[:2] 

878 return Box.from_shape(shape, start=self.start) 

879 

880 def deserialize( 

881 self, 

882 archive: InputArchive[Any], 

883 *, 

884 bbox: Box | None = None, 

885 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

886 **kwargs: Any, 

887 ) -> Mask: 

888 """Deserialize a mask from an input archive. 

889 

890 Parameters 

891 ---------- 

892 archive 

893 Archive to read from. 

894 bbox 

895 Bounding box of a subimage to read instead. 

896 strip_header 

897 A callable that strips out any FITS header cards added by the 

898 ``update_header`` argument in the corresponding call to 

899 `Mask.serialize`. 

900 **kwargs 

901 Unsupported keyword arguments are accepted only to provide better 

902 error messages (raising `serialization.InvalidParameterError`). 

903 """ 

904 if kwargs: 

905 raise InvalidParameterError(f"Unrecognized parameters for Mask: {set(kwargs.keys())}.") 

906 slices: tuple[slice, ...] | EllipsisType = ... 

907 if bbox is not None: 

908 slices = bbox.slice_within(self.bbox) 

909 else: 

910 bbox = self.bbox 

911 if not is_integer(self.dtype): 

912 raise ArchiveReadError(f"Mask array has a non-integer dtype: {self.dtype}.") 

913 schema = MaskSchema(self.planes, dtype=self.dtype.to_numpy()) 

914 projection = self.projection.deserialize(archive) if self.projection is not None else None 

915 if len(self.data) == 1 and tuple(self.data[0].shape) == tuple(self.bbox.shape) + (schema.mask_size,): 

916 storage_slices = slices if slices is ... else (slice(None),) + slices 

917 array = archive.get_array(self.data[0], strip_header=strip_header, slices=storage_slices) 

918 array = np.moveaxis(array, 0, -1) 

919 return Mask(array, schema=schema, bbox=bbox, projection=projection)._finish_deserialize(self) 

920 result = Mask(0, schema=schema, bbox=bbox, projection=projection) 

921 schemas_2d = schema.split(np.int32) 

922 if len(schemas_2d) != len(self.data): 

923 raise ArchiveReadError( 

924 f"Number of mask arrays ({len(self.data)}) does not match expectation ({len(schemas_2d)})." 

925 ) 

926 for array_model, schema_2d in zip(self.data, schemas_2d): 

927 mask_2d = self._deserialize_2d( 

928 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices 

929 ) 

930 result.update(mask_2d) 

931 return result._finish_deserialize(self) 

932 

933 @staticmethod 

934 def _deserialize_2d( 

935 ref: ArrayReferenceModel | InlineArrayModel, 

936 schema_2d: MaskSchema, 

937 start: Sequence[int], 

938 archive: InputArchive[Any], 

939 *, 

940 slices: tuple[slice, ...] | EllipsisType = ..., 

941 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

942 ) -> Mask: 

943 def _strip_header(header: astropy.io.fits.Header) -> None: 

944 strip_header(header) 

945 schema_2d.strip_header(header) 

946 fits.strip_wcs_cards(header) 

947 

948 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

949 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start) 

950 

951 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

952 if kwargs: 

953 raise InvalidParameterError(f"Unsupported parameters for Mask components: {set(kwargs.keys())}.") 

954 return super().deserialize_component(component, archive) 

955 

956 

957def _archive_prefers_native_mask_arrays(archive: OutputArchive[Any]) -> bool: 

958 """Return whether an archive wants masks in their native 3-D layout.""" 

959 current: Any = archive 

960 while current is not None: 

961 if getattr(current, "_prefer_native_mask_arrays", False): 

962 return True 

963 current = getattr(current, "_parent", None) 

964 return False 

965 

966 

967def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]: 

968 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

969 for LSST visit images, c. DP2. 

970 """ 

971 return { 

972 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."), 

973 "SAT": MaskPlane( 

974 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel." 

975 ), 

976 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."), 

977 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."), 

978 "EDGE": MaskPlane( 

979 "DETECTION_EDGE", 

980 "Pixel was too close to the edge to be considered for detection, " 

981 "due to the finite size of the detection kernel.", 

982 ), 

983 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

984 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "), 

985 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

986 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."), 

987 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."), 

988 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."), 

989 "ITL_DIP": MaskPlane( 

990 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD." 

991 ), 

992 "NOT_DEBLENDED": MaskPlane( 

993 "NOT_DEBLENDED", 

994 "Pixel belonged to a detection that was not deblended, usually due to size limits.", 

995 ), 

996 "SPIKE": MaskPlane( 

997 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star." 

998 ), 

999 } 

1000 

1001 

1002def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]: 

1003 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

1004 for LSST deep coadds, c. DP2. 

1005 """ 

1006 return { 

1007 # TODO: reconcile this with counts from the DP2 coadds. 

1008 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully 

1009 # rejected from (cell) coadds with no propagation. 

1010 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

1011 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."), 

1012 "CR": MaskPlane( 

1013 "COSMIC_RAY", 

1014 "A cosmic ray affected this pixel on at least one input image (and was interpolated).", 

1015 ), 

1016 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."), 

1017 "EDGE": MaskPlane( 

1018 "DETECTION_EDGE", 

1019 "Pixel was too close to the edge to be considered for detection, " 

1020 "due to the finite size of the detection kernel.", 

1021 ), 

1022 "REJECTED": MaskPlane( 

1023 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking." 

1024 ), 

1025 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

1026 "INEXACT_PSF": MaskPlane( 

1027 "INEXACT_PSF", 

1028 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.", 

1029 ), 

1030 }