Coverage for python / lsst / images / _mask.py: 23%

391 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 00:52 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "Mask", 

16 "MaskPlane", 

17 "MaskPlaneBit", 

18 "MaskSchema", 

19 "MaskSerializationModel", 

20 "get_legacy_deep_coadd_mask_planes", 

21 "get_legacy_visit_image_mask_planes", 

22) 

23 

24import dataclasses 

25import math 

26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set 

27from types import EllipsisType 

28from typing import Any, ClassVar, cast 

29 

30import astropy.io.fits 

31import astropy.wcs 

32import numpy as np 

33import numpy.typing as npt 

34import pydantic 

35from astro_metadata_translator import ObservationInfo 

36 

37from lsst.resources import ResourcePath, ResourcePathExpression 

38 

39from . import fits 

40from ._generalized_image import GeneralizedImage 

41from ._geom import YX, Box, NoOverlapError 

42from ._transforms import Frame, Projection, ProjectionSerializationModel 

43from .serialization import ( 

44 ArchiveReadError, 

45 ArchiveTree, 

46 ArrayReferenceModel, 

47 InlineArrayModel, 

48 InputArchive, 

49 IntegerType, 

50 MetadataValue, 

51 NumberType, 

52 OutputArchive, 

53 is_integer, 

54 no_header_updates, 

55) 

56from .utils import is_none 

57 

58 

59@dataclasses.dataclass(frozen=True) 

60class MaskPlane: 

61 """Name and description of a single plane in a mask array.""" 

62 

63 name: str 

64 """Unique name for the mask plane (`str`).""" 

65 

66 description: str 

67 """Human-readable documentation for the mask plane (`str`).""" 

68 

69 @classmethod 

70 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]: 

71 """Read mask plane descriptions written by 

72 `lsst.afw.image.Mask.writeFits`. 

73 

74 Parameters 

75 ---------- 

76 header 

77 FITS header. 

78 

79 Returns 

80 ------- 

81 `dict` [`str`, `int`] 

82 A dictionary mapping mask plane name to integer bit index. 

83 """ 

84 result: dict[str, int] = {} 

85 for card in list(header.cards): 

86 if card.keyword.startswith("MP_"): 

87 result[card.keyword.removeprefix("MP_")] = card.value 

88 del header[card.keyword] 

89 return result 

90 

91 

92@dataclasses.dataclass(frozen=True) 

93class MaskPlaneBit: 

94 """The nested array index and mask value associated with a single mask 

95 plane. 

96 """ 

97 

98 index: int 

99 """Index into the last dimension of the mask array where this plane's bit 

100 is stored. 

101 """ 

102 

103 mask: np.integer 

104 """Bitmask that selects just this plane's bit from a mask array value 

105 (`numpy.integer`). 

106 """ 

107 

108 @classmethod 

109 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit: 

110 """Construct a `MaskPlaneBit` from the overall index of a plane in a 

111 `MaskSchema` and the stride (number of bits per mask array element). 

112 """ 

113 index, bit = divmod(overall_index, stride) 

114 return cls(index, mask_type(1 << bit)) 

115 

116 

117class MaskSchema: 

118 """A schema for a bit-packed mask array. 

119 

120 Parameters 

121 ---------- 

122 planes 

123 Iterable of `MaskPlane` instances that define the schema. `None` 

124 values may be included to reserve bits for future use. 

125 dtype 

126 The numpy data type of the mask arrays that use this schema. 

127 

128 Notes 

129 ----- 

130 A `MaskSchema` is a collection of mask planes, which each correspond to a 

131 single bit in a mask array. Mask schemas are immutable and associated with 

132 a particular array data type, allowing them to safely precompute the index 

133 and bitmask for each plane. 

134 

135 `MaskSchema` indexing is by integer (the overall index of a plane in the 

136 schema). The `descriptions` attribute may be indexed by plane name to get 

137 the description for that plane, and the `bitmask` method can be used to 

138 obtain an array that can be used to select one or more planes by name in 

139 a mask array that uses this schema. 

140 

141 If no mask planes are provided, a `None` placeholder is automatically 

142 added. 

143 """ 

144 

145 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8): 

146 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,) 

147 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype)) 

148 stride = self.bits_per_element(self._dtype) 

149 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None} 

150 self._mask_size = math.ceil(len(self._planes) / stride) 

151 self._bits: dict[str, MaskPlaneBit] = { 

152 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type) 

153 for n, plane in enumerate(self._planes) 

154 if plane is not None 

155 } 

156 

157 @staticmethod 

158 def bits_per_element(dtype: npt.DTypeLike) -> int: 

159 """Return the number of mask bits per array element for the given 

160 data type. 

161 """ 

162 dtype = np.dtype(dtype) 

163 match dtype.kind: 

164 case "u": 

165 return dtype.itemsize * 8 

166 case "i": 

167 return dtype.itemsize * 8 - 1 

168 case _: 

169 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.") 

170 

171 def __iter__(self) -> Iterator[MaskPlane | None]: 

172 return iter(self._planes) 

173 

174 def __len__(self) -> int: 

175 return len(self._planes) 

176 

177 def __getitem__(self, i: int) -> MaskPlane | None: 

178 return self._planes[i] 

179 

180 def __repr__(self) -> str: 

181 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})" 

182 

183 def __str__(self) -> str: 

184 return "\n".join( 

185 [ 

186 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}" 

187 for name, bit in self._bits.items() 

188 ] 

189 ) 

190 

191 def __eq__(self, other: object) -> bool: 

192 if isinstance(other, MaskSchema): 

193 return self._planes == other._planes and self._dtype == other._dtype 

194 return False 

195 

196 @property 

197 def dtype(self) -> np.dtype: 

198 """The numpy data type of the mask arrays that use this schema.""" 

199 return self._dtype 

200 

201 @property 

202 def mask_size(self) -> int: 

203 """The number of elements in the last dimension of any mask array that 

204 uses this schema. 

205 """ 

206 return self._mask_size 

207 

208 @property 

209 def names(self) -> Set[str]: 

210 """The names of the mask planes, in bit order.""" 

211 return self._bits.keys() 

212 

213 @property 

214 def descriptions(self) -> Mapping[str, str]: 

215 """A mapping from plane name to description.""" 

216 return self._descriptions 

217 

218 def bit(self, plane: str) -> MaskPlaneBit: 

219 """Return the last array index and mask for the given mask plane.""" 

220 return self._bits[plane] 

221 

222 def bitmask(self, *planes: str) -> np.ndarray: 

223 """Return a 1-d mask array that represents the union (i.e. bitwise OR) 

224 of the planes with the given names. 

225 

226 Parameters 

227 ---------- 

228 *planes 

229 Mask plane names. 

230 

231 Returns 

232 ------- 

233 numpy.ndarray 

234 A 1-d array with shape ``(mask_size,)``. 

235 """ 

236 result = np.zeros(self.mask_size, dtype=self._dtype) 

237 for plane in planes: 

238 bit = self._bits[plane] 

239 result[bit.index] |= bit.mask 

240 return result 

241 

242 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]: 

243 """Split the schema into an equivalent series of schemas that each 

244 have a `mask_size` of ``1``, dropping all `None` placeholders. 

245 

246 Parameters 

247 ---------- 

248 dtype 

249 Data type of the new mask pixels. 

250 

251 Returns 

252 ------- 

253 `list` [`MaskSchema`] 

254 A list of mask schemas that together include all planes in 

255 ``self`` and have `mask_size` equal to ``1``. If there are no 

256 mask planes (only `None` placeholders) in ``self``, a single mask 

257 schema with a `None` placeholder is returned; otherwise `None` 

258 placeholders are returned. 

259 """ 

260 dtype = np.dtype(dtype) 

261 planes: list[MaskPlane] = [] 

262 schemas: list[MaskSchema] = [] 

263 n_planes_per_schema = self.bits_per_element(dtype) 

264 for plane in self._planes: 

265 if plane is not None: 

266 planes.append(plane) 

267 if len(planes) == n_planes_per_schema: 

268 schemas.append(MaskSchema(planes, dtype=dtype)) 

269 planes.clear() 

270 if planes: 

271 schemas.append(MaskSchema(planes, dtype=dtype)) 

272 if not schemas: 

273 schemas.append(MaskSchema([None], dtype=dtype)) 

274 return schemas 

275 

276 def update_header(self, header: astropy.io.fits.Header) -> None: 

277 """Add a description of this mask schema to a FITS header.""" 

278 for n, plane in enumerate(self): 

279 if plane is not None: 

280 bit = self.bit(plane.name) 

281 if bit.index != 0: 

282 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.") 

283 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.") 

284 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.") 

285 # We don't add a comment to the description card, because it's 

286 # likely to overrun a single card and get the CONTINUE 

287 # treatment. That will cause Astropy to warn about the comment 

288 # being truncated and that's worse than just leaving it 

289 # unexplained; it's pretty obvious from context what it is. 

290 header.set(f"MSKD{n:04d}", plane.description) 

291 

292 def strip_header(self, header: astropy.io.fits.Header) -> None: 

293 """Remove all header cards added by `update_header`.""" 

294 for n, plane in enumerate(self): 

295 if plane is not None: 

296 header.remove(f"MSKN{n:04d}", ignore_missing=True) 

297 header.remove(f"MSKM{n:04d}", ignore_missing=True) 

298 header.remove(f"MSKD{n:04d}", ignore_missing=True) 

299 

300 

301class Mask(GeneralizedImage): 

302 """A 2-d bitmask image backed by a 3-d byte array. 

303 

304 Parameters 

305 ---------- 

306 array_or_fill 

307 Array or fill value for the mask. If a fill value, ``bbox`` or 

308 ``shape`` must be provided. 

309 schema 

310 Schema that defines the planes and their bit assignments. 

311 bbox 

312 Bounding box for the mask. This sets the shape of the first two 

313 dimensions of the array. 

314 start 

315 Logical coordinates of the first pixel in the array, ordered ``y``, 

316 ``x`` (unless an `XY` instance is passed). Ignored if 

317 ``bbox`` is provided. Defaults to zeros. 

318 shape 

319 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

320 instance is passed). Only needed if ``array_or_fill`` is not an 

321 array and ``bbox`` is not provided. Like the bbox, this does not 

322 include the last dimension of the array. 

323 projection 

324 Projection that maps the pixel grid to the sky. 

325 obs_info 

326 General information about the associated observation in standardized 

327 form. 

328 metadata 

329 Arbitrary flexible metadata to associate with the mask. 

330 

331 Notes 

332 ----- 

333 Indexing the `array` attribute of a `Mask` does not take into account its 

334 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with 

335 a `Box` does, and the `bbox` of the subimage is set to match its location 

336 within the original mask. 

337 

338 A mask's ``bbox`` corresponds to the leading dimensions of its backing 

339 `numpy.ndarray`, while the last dimension's size is always equal to the 

340 `~MaskSchema.mask_size` of its schema, since a schema can in general 

341 require multiple array elements to represent all of its planes. 

342 """ 

343 

344 def __init__( 

345 self, 

346 array_or_fill: np.ndarray | int = 0, 

347 /, 

348 *, 

349 schema: MaskSchema, 

350 bbox: Box | None = None, 

351 start: Sequence[int] | None = None, 

352 shape: Sequence[int] | None = None, 

353 projection: Projection | None = None, 

354 obs_info: ObservationInfo | None = None, 

355 metadata: dict[str, MetadataValue] | None = None, 

356 ): 

357 super().__init__(metadata) 

358 if shape is not None: 

359 shape = tuple(shape) 

360 if start is not None: 

361 start = tuple(start) 

362 if isinstance(array_or_fill, np.ndarray): 

363 array = np.array(array_or_fill, dtype=schema.dtype) 

364 if array.ndim != 3: 

365 raise ValueError("Mask array must be 3-d.") 

366 if bbox is None: 

367 bbox = Box.from_shape(array.shape[:-1], start=start) 

368 elif bbox.shape + (schema.mask_size,) != array.shape: 

369 raise ValueError( 

370 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not " 

371 f"match array with shape {array.shape}." 

372 ) 

373 if shape is not None and shape + (schema.mask_size,) != array.shape: 

374 raise ValueError( 

375 f"Explicit shape {shape} and schema of size {schema.mask_size} do " 

376 f"not match array with shape {array.shape}." 

377 ) 

378 

379 else: 

380 if bbox is None: 

381 if shape is None: 

382 raise TypeError("No bbox, size, or array provided.") 

383 bbox = Box.from_shape(shape, start=start) 

384 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype) 

385 self._array = array 

386 self._bbox: Box = bbox 

387 self._schema: MaskSchema = schema 

388 self._projection = projection 

389 self._obs_info = obs_info 

390 

391 @property 

392 def array(self) -> np.ndarray: 

393 """The low-level array (`numpy.ndarray`). 

394 

395 Assigning to this attribute modifies the existing array in place; the 

396 bounding box and underlying data pointer are never changed. 

397 """ 

398 return self._array 

399 

400 @array.setter 

401 def array(self, value: np.ndarray | int) -> None: 

402 self._array[:, :] = value 

403 

404 @property 

405 def schema(self) -> MaskSchema: 

406 """Schema that defines the planes and their bit assignments 

407 (`MaskSchema`). 

408 """ 

409 return self._schema 

410 

411 @property 

412 def bbox(self) -> Box: 

413 """2-d bounding box of the mask (`Box`). 

414 

415 This sets the shape of the first two dimensions of the array. 

416 """ 

417 return self._bbox 

418 

419 @property 

420 def projection(self) -> Projection[Any] | None: 

421 """The projection that maps this mask's pixel grid to the sky 

422 (`Projection` | `None`). 

423 

424 Notes 

425 ----- 

426 The pixel coordinates used by this projection account for the bounding 

427 box ``start``; they are not just array indices. 

428 """ 

429 return self._projection 

430 

431 @property 

432 def obs_info(self) -> ObservationInfo | None: 

433 """General information about the associated observation in standard 

434 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

435 """ 

436 return self._obs_info 

437 

438 def __getitem__(self, bbox: Box | EllipsisType) -> Mask: 

439 if bbox is ...: 

440 return self 

441 super().__getitem__(bbox) 

442 return self._transfer_metadata( 

443 Mask( 

444 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :], 

445 bbox=bbox, 

446 schema=self.schema, 

447 ), 

448 bbox=bbox, 

449 ) 

450 

451 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None: 

452 subview = self[bbox] 

453 subview.clear() 

454 subview.update(value) 

455 

456 def __str__(self) -> str: 

457 return f"Mask({self.bbox!s}, {list(self.schema.names)})" 

458 

459 def __repr__(self) -> str: 

460 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})" 

461 

462 def __eq__(self, other: object) -> bool: 

463 if not isinstance(other, Mask): 

464 return NotImplemented 

465 return ( 

466 self._bbox == other._bbox 

467 and self._schema == other._schema 

468 and np.array_equal(self._array, other._array, equal_nan=True) 

469 ) 

470 

471 def copy(self) -> Mask: 

472 """Deep-copy the mask and metadata.""" 

473 return self._transfer_metadata( 

474 Mask( 

475 self._array.copy(), 

476 bbox=self._bbox, 

477 schema=self._schema, 

478 projection=self._projection, 

479 obs_info=self._obs_info, 

480 ), 

481 copy=True, 

482 ) 

483 

484 def view( 

485 self, 

486 *, 

487 schema: MaskSchema | EllipsisType = ..., 

488 projection: Projection | None | EllipsisType = ..., 

489 start: Sequence[int] | EllipsisType = ..., 

490 obs_info: ObservationInfo | None | EllipsisType = ..., 

491 ) -> Mask: 

492 """Make a view of the mask, with optional updates. 

493 

494 Notes 

495 ----- 

496 This can only be used to make changes to schema descriptions; plane 

497 names must remain the same (in the same order). 

498 """ 

499 if schema is ...: 

500 schema = self._schema 

501 else: 

502 if list(schema.names) != list(self.schema.names): 

503 raise ValueError("Cannot create a mask view with a schema with different names.") 

504 if projection is ...: 

505 projection = self._projection 

506 if start is ...: 

507 start = self._bbox.start 

508 if obs_info is ...: 

509 obs_info = self._obs_info 

510 return self._transfer_metadata( 

511 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info) 

512 ) 

513 

514 def update(self, other: Mask) -> None: 

515 """Update ``self`` to include all common mask values set in ``other``. 

516 

517 Notes 

518 ----- 

519 This only operates on the intersection of the two mask bounding boxes 

520 and the mask planes that are present in both. Mask bits are only set, 

521 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments). 

522 """ 

523 lhs = self 

524 rhs = other 

525 if other.bbox != self.bbox: 

526 try: 

527 bbox = self.bbox.intersection(other.bbox) 

528 except NoOverlapError: 

529 return 

530 lhs = self[bbox] 

531 rhs = other[bbox] 

532 for name in self.schema.names & other.schema.names: 

533 lhs.set(name, rhs.get(name)) 

534 

535 def get(self, plane: str) -> np.ndarray: 

536 """Return a 2-d boolean array for the given mask plane. 

537 

538 Parameters 

539 ---------- 

540 plane 

541 Name of the mask plane. 

542 

543 Returns 

544 ------- 

545 numpy.ndarray 

546 A 2-d boolean array with the same shape as `bbox` that is `True` 

547 where the bit for ``plane`` is set and `False` elsewhere. 

548 """ 

549 bit = self.schema.bit(plane) 

550 return (self._array[..., bit.index] & bit.mask).astype(bool) 

551 

552 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

553 """Set a mask plane. 

554 

555 Parameters 

556 ---------- 

557 plane 

558 Name of the mask plane to set 

559 boolean_mask 

560 A 2-d boolean array with the same shape as `bbox` that is `True` 

561 where the bit for ``plane`` should be set and `False` where it 

562 should be left unchanged (*not* set to zero). May be ``...`` to 

563 set the bit everywhere. 

564 """ 

565 bit = self.schema.bit(plane) 

566 if boolean_mask is not ...: 

567 boolean_mask = boolean_mask.astype(bool) 

568 self._array[boolean_mask, bit.index] |= bit.mask 

569 

570 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

571 """Clear one or more mask planes. 

572 

573 Parameters 

574 ---------- 

575 plane 

576 Name of the mask plane to set. If `None` all mask planes are 

577 cleared. 

578 boolean_mask 

579 A 2-d boolean array with the same shape as `bbox` that is `True` 

580 where the bit for ``plane`` should be cleared and `False` where it 

581 should be left unchanged. May be ``...`` to clear the bit 

582 everywhere. 

583 """ 

584 if boolean_mask is not ...: 

585 boolean_mask = boolean_mask.astype(bool) 

586 if plane is None: 

587 self._array[boolean_mask, :] = 0 

588 else: 

589 bit = self.schema.bit(plane) 

590 self._array[boolean_mask, bit.index] &= ~bit.mask 

591 

592 def serialize[P: pydantic.BaseModel]( 

593 self, 

594 archive: OutputArchive[P], 

595 *, 

596 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

597 save_projection: bool = True, 

598 save_obs_info: bool = True, 

599 add_offset_wcs: str | None = "A", 

600 ) -> MaskSerializationModel[P]: 

601 """Serialize the mask to an output archive. 

602 

603 Parameters 

604 ---------- 

605 archive 

606 Archive to write to. 

607 update_header 

608 A callback that will be given the FITS header for the HDU 

609 containing this mask in order to add keys to it. This callback 

610 may be provided but will not be called if the output format is not 

611 FITS. As multiple HDUs may be added, this function may be called 

612 multiple times. 

613 save_projection 

614 If `True`, save the `Projection` attached to the image, if there 

615 is one. This does not affect whether a FITS WCS corresponding to 

616 the projection is written (it always is, if available, and if 

617 ``add_offset_wcs`` is not ``" "``). 

618 save_obs_info 

619 If `True`, save the 

620 `~astro_metadata_translator.ObservationInfo` attached to the 

621 image, if there is one. 

622 add_offset_wcs 

623 A FITS WCS single-character suffix to use when adding a linear 

624 WCS that maps the FITS array to the logical pixel coordinates 

625 defined by ``bbox.start``. Set to `None` to not write this WCS. 

626 If this is set to ``" "``, it will prevent the `Projection` from 

627 being saved as a FITS WCS. 

628 """ 

629 if _archive_prefers_native_mask_arrays(archive): 

630 # HDS presents array dimensions in Fortran order, which is the 

631 # reverse of the h5py dataset shape. Store the in-memory trailing 

632 # mask-byte axis first in HDF5 so Starlink tools see HDS axes 

633 # (x, y, byte), without changing the bit packing within a pixel. 

634 array_model = archive.add_array(np.moveaxis(self._array, -1, 0), update_header=update_header) 

635 if not isinstance(array_model, ArrayReferenceModel): 

636 raise RuntimeError("Native mask arrays require reference array storage.") 

637 array_model.shape = list(self._array.shape) 

638 data: list[ArrayReferenceModel | InlineArrayModel] = [array_model] 

639 else: 

640 data = [] 

641 for schema_2d in self.schema.split(np.int32): 

642 mask_2d = Mask( 

643 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info 

644 ) 

645 mask_2d.update(self) 

646 data.append( 

647 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs) 

648 ) 

649 serialized_projection: ProjectionSerializationModel[P] | None = None 

650 if save_projection and self.projection is not None: 

651 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

652 serialized_dtype = NumberType.from_numpy(self.schema.dtype) 

653 assert is_integer(serialized_dtype), "Mask dtypes should always be integers." 

654 return MaskSerializationModel.model_construct( 

655 data=data, 

656 start=list(self.bbox.start), 

657 planes=list(self.schema), 

658 dtype=serialized_dtype, 

659 projection=serialized_projection, 

660 obs_info=self._obs_info if save_obs_info else None, 

661 metadata=self.metadata, 

662 ) 

663 

664 def _serialize_2d[P: pydantic.BaseModel]( 

665 self, 

666 archive: OutputArchive[P], 

667 *, 

668 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

669 add_offset_wcs: str | None = "A", 

670 ) -> ArrayReferenceModel | InlineArrayModel: 

671 def _update_header(header: astropy.io.fits.Header) -> None: 

672 update_header(header) 

673 self.schema.update_header(header) 

674 if self.projection is not None and add_offset_wcs != " ": 

675 if self.fits_wcs: 

676 header.update(self.fits_wcs.to_header(relax=True)) 

677 if add_offset_wcs is not None: 

678 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

679 

680 assert self.array.shape[2] == 1, "Mask should be split before calling this method." 

681 return archive.add_array(self._array[:, :, 0], update_header=_update_header) 

682 

683 @staticmethod 

684 def _get_archive_tree_type[P: pydantic.BaseModel]( 

685 pointer_type: type[P], 

686 ) -> type[MaskSerializationModel[P]]: 

687 """Return the serialization model type for this object for an archive 

688 type that uses the given pointer type. 

689 """ 

690 return MaskSerializationModel[pointer_type] # type: ignore 

691 

692 _archive_default_name: ClassVar[str] = "mask" 

693 """The name this object should be serialized with when written as the 

694 top-level object. 

695 """ 

696 

697 def write_fits( 

698 self, 

699 filename: str, 

700 *, 

701 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

702 ) -> None: 

703 """Write the mask to a FITS file. 

704 

705 Parameters 

706 ---------- 

707 filename 

708 Name of the file to write to. Must be a local file. 

709 compression 

710 Compression options. 

711 """ 

712 compression_options = {} 

713 if compression is not fits.FitsCompressionOptions.DEFAULT: 

714 compression_options[self._archive_default_name] = compression 

715 fits.write(self, filename, compression_options) 

716 

717 @staticmethod 

718 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask: 

719 """Read an image from a FITS file. 

720 

721 Parameters 

722 ---------- 

723 url 

724 URL of the file to read; may be any type supported by 

725 `lsst.resources.ResourcePath`. 

726 bbox 

727 Bounding box of a subimage to read instead. 

728 """ 

729 return fits.read(Mask, url, bbox=bbox).deserialized 

730 

731 @staticmethod 

732 def from_legacy( 

733 legacy: Any, 

734 plane_map: Mapping[str, MaskPlane] | None = None, 

735 ) -> Mask: 

736 """Convert from an `lsst.afw.image.Mask` instance. 

737 

738 Parameters 

739 ---------- 

740 legacy 

741 An `lsst.afw.image.Mask` instance. This will not share pixel 

742 data with the new object. 

743 plane_map 

744 A mapping from legacy mask plane name to the new plane name and 

745 description. 

746 """ 

747 return Mask._from_legacy_array( 

748 legacy.array, 

749 legacy.getMaskPlaneDict(), 

750 start=YX(y=legacy.getY0(), x=legacy.getX0()), 

751 plane_map=plane_map, 

752 ) 

753 

754 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

755 """Convert to an `lsst.afw.image.Mask` instance. 

756 

757 The pixel data will not be shared between the two objects. 

758 

759 Parameters 

760 ---------- 

761 plane_map 

762 A mapping from legacy mask plane name to the new plane name and 

763 description. 

764 """ 

765 import lsst.afw.image 

766 import lsst.geom 

767 

768 result = lsst.afw.image.Mask(self.bbox.to_legacy()) 

769 if plane_map is None: 

770 plane_map = {plane.name: plane for plane in self.schema if plane is not None} 

771 for old_name, new_plane in plane_map.items(): 

772 old_bit = result.addMaskPlane(old_name) 

773 old_bitmask = 1 << old_bit 

774 result.array[self.get(new_plane.name)] |= old_bitmask 

775 return result 

776 

777 @staticmethod 

778 def _from_legacy_array( 

779 array2d: np.ndarray, 

780 old_planes: Mapping[str, int], 

781 *, 

782 start: YX[int], 

783 plane_map: Mapping[str, MaskPlane] | None = None, 

784 projection: Projection | None = None, 

785 ) -> Mask: 

786 planes: list[MaskPlane] = [] 

787 new_name_to_old_bitmask: dict[str, int] = {} 

788 for old_name, old_bit in old_planes.items(): 

789 old_bitmask = 1 << old_bit 

790 if plane_map is not None: 

791 if new_plane := plane_map.get(old_name): 

792 planes.append(new_plane) 

793 new_name_to_old_bitmask[new_plane.name] = old_bitmask 

794 else: 

795 if n_orphaned := np.count_nonzero(array2d & old_bitmask): 

796 raise RuntimeError( 

797 f"Legacy mask plane {old_name!r} is not remapped, " 

798 f"but {n_orphaned} pixels have this bit set." 

799 ) 

800 else: 

801 planes.append(MaskPlane(old_name, "")) 

802 new_name_to_old_bitmask[old_name] = old_bitmask 

803 schema = MaskSchema(planes) 

804 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection) 

805 for new_name, old_bitmask in new_name_to_old_bitmask.items(): 

806 mask.set(new_name, array2d & old_bitmask) 

807 return mask 

808 

809 @staticmethod 

810 def read_legacy( 

811 uri: ResourcePathExpression, 

812 *, 

813 plane_map: Mapping[str, MaskPlane] | None = None, 

814 ext: str | int = 1, 

815 fits_wcs_frame: Frame | None = None, 

816 ) -> Mask: 

817 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`. 

818 

819 Parameters 

820 ---------- 

821 uri 

822 URI or file name. 

823 plane_map 

824 A mapping from legacy mask plane name to the new plane name and 

825 description. 

826 ext 

827 Name or index of the FITS HDU to read. 

828 fits_wcs_frame 

829 If not `None` and the HDU containing the mask has a FITS WCS, 

830 attach a `Projection` to the returned mask by converting that WCS. 

831 """ 

832 opaque_metadata = fits.FitsOpaqueMetadata() 

833 fs, fspath = ResourcePath(uri).to_fsspec() 

834 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

835 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

836 result = Mask._read_legacy_hdu( 

837 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame 

838 ) 

839 result._opaque_metadata = opaque_metadata 

840 return result 

841 

842 @staticmethod 

843 def _read_legacy_hdu( 

844 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

845 opaque_metadata: fits.FitsOpaqueMetadata, 

846 plane_map: Mapping[str, MaskPlane] | None = None, 

847 fits_wcs_frame: Frame | None = None, 

848 ) -> Mask: 

849 if isinstance(hdu, astropy.io.fits.BinTableHDU): 

850 hdu = astropy.io.fits.CompImageHDU(bintable=hdu) 

851 dx: int = hdu.header.pop("LTV1") 

852 dy: int = hdu.header.pop("LTV2") 

853 start = YX(y=-dy, x=-dx) 

854 old_planes = MaskPlane.read_legacy(hdu.header) 

855 projection: Projection | None = None 

856 if fits_wcs_frame is not None: 

857 try: 

858 fits_wcs = astropy.wcs.WCS(hdu.header) 

859 except KeyError: 

860 pass 

861 else: 

862 projection = Projection.from_fits_wcs( 

863 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

864 ) 

865 mask = Mask._from_legacy_array( 

866 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection 

867 ) 

868 fits.strip_wcs_cards(hdu.header) 

869 hdu.header.strip() 

870 hdu.header.remove("EXTTYPE", ignore_missing=True) 

871 hdu.header.remove("INHERIT", ignore_missing=True) 

872 # afw set BUNIT on masks because of limitations in how FITS 

873 # metadata is handled there. 

874 hdu.header.remove("BUNIT", ignore_missing=True) 

875 opaque_metadata.add_header(hdu.header) 

876 return mask 

877 

878 

879class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

880 """Pydantic model used to represent the serialized form of a `.Mask`.""" 

881 

882 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field( 

883 description="References to pixel data." 

884 ) 

885 start: list[int] = pydantic.Field( 

886 description="Coordinate of the first pixels in the array, ordered (y, x)." 

887 ) 

888 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.") 

889 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.") 

890 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

891 default=None, 

892 exclude_if=is_none, 

893 description="Projection that maps the logical pixel grid onto the sky.", 

894 ) 

895 obs_info: ObservationInfo | None = pydantic.Field( 

896 default=None, 

897 exclude_if=is_none, 

898 description="Standardized description of image metadata", 

899 ) 

900 

901 @property 

902 def bbox(self) -> Box: 

903 """The 2-d bounding box of the mask.""" 

904 shape = self.data[0].shape 

905 if len(shape) == 3: 

906 shape = shape[:2] 

907 return Box.from_shape(shape, start=self.start) 

908 

909 def deserialize( 

910 self, 

911 archive: InputArchive[Any], 

912 *, 

913 bbox: Box | None = None, 

914 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

915 ) -> Mask: 

916 """Deserialize a mask from an input archive. 

917 

918 Parameters 

919 ---------- 

920 archive 

921 Archive to read from. 

922 bbox 

923 Bounding box of a subimage to read instead. 

924 strip_header 

925 A callable that strips out any FITS header cards added by the 

926 ``update_header`` argument in the corresponding call to 

927 `Mask.serialize`. 

928 """ 

929 slices: tuple[slice, ...] | EllipsisType = ... 

930 if bbox is not None: 

931 slices = bbox.slice_within(self.bbox) 

932 else: 

933 bbox = self.bbox 

934 if not is_integer(self.dtype): 

935 raise ArchiveReadError(f"Mask array has a non-integer dtype: {self.dtype}.") 

936 schema = MaskSchema(self.planes, dtype=self.dtype.to_numpy()) 

937 projection = self.projection.deserialize(archive) if self.projection is not None else None 

938 if len(self.data) == 1 and tuple(self.data[0].shape) == tuple(self.bbox.shape) + (schema.mask_size,): 

939 storage_slices = slices if slices is ... else (slice(None),) + slices 

940 array = archive.get_array(self.data[0], strip_header=strip_header, slices=storage_slices) 

941 array = np.moveaxis(array, 0, -1) 

942 return Mask( 

943 array, 

944 schema=schema, 

945 bbox=bbox, 

946 projection=projection, 

947 obs_info=self.obs_info, 

948 )._finish_deserialize(self) 

949 result = Mask( 

950 0, 

951 schema=schema, 

952 bbox=bbox, 

953 projection=projection, 

954 obs_info=self.obs_info, 

955 ) 

956 schemas_2d = schema.split(np.int32) 

957 if len(schemas_2d) != len(self.data): 

958 raise ArchiveReadError( 

959 f"Number of mask arrays ({len(self.data)}) does not match expectation ({len(schemas_2d)})." 

960 ) 

961 for array_model, schema_2d in zip(self.data, schemas_2d): 

962 mask_2d = self._deserialize_2d( 

963 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices 

964 ) 

965 result.update(mask_2d) 

966 return result._finish_deserialize(self) 

967 

968 @staticmethod 

969 def _deserialize_2d( 

970 ref: ArrayReferenceModel | InlineArrayModel, 

971 schema_2d: MaskSchema, 

972 start: Sequence[int], 

973 archive: InputArchive[Any], 

974 *, 

975 slices: tuple[slice, ...] | EllipsisType = ..., 

976 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

977 ) -> Mask: 

978 def _strip_header(header: astropy.io.fits.Header) -> None: 

979 strip_header(header) 

980 schema_2d.strip_header(header) 

981 fits.strip_wcs_cards(header) 

982 

983 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

984 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start) 

985 

986 

987def _archive_prefers_native_mask_arrays(archive: OutputArchive[Any]) -> bool: 

988 """Return whether an archive wants masks in their native 3-D layout.""" 

989 current: Any = archive 

990 while current is not None: 

991 if getattr(current, "_prefer_native_mask_arrays", False): 

992 return True 

993 current = getattr(current, "_parent", None) 

994 return False 

995 

996 

997def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]: 

998 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

999 for LSST visit images, c. DP2. 

1000 """ 

1001 return { 

1002 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."), 

1003 "SAT": MaskPlane( 

1004 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel." 

1005 ), 

1006 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."), 

1007 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."), 

1008 "EDGE": MaskPlane( 

1009 "DETECTION_EDGE", 

1010 "Pixel was too close to the edge to be considered for detection, " 

1011 "due to the finite size of the detection kernel.", 

1012 ), 

1013 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

1014 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "), 

1015 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

1016 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."), 

1017 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."), 

1018 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."), 

1019 "ITL_DIP": MaskPlane( 

1020 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD." 

1021 ), 

1022 "NOT_DEBLENDED": MaskPlane( 

1023 "NOT_DEBLENDED", 

1024 "Pixel belonged to a detection that was not deblended, usually due to size limits.", 

1025 ), 

1026 "SPIKE": MaskPlane( 

1027 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star." 

1028 ), 

1029 } 

1030 

1031 

1032def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]: 

1033 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

1034 for LSST deep coadds, c. DP2. 

1035 """ 

1036 return { 

1037 # TODO: reconcile this with counts from the DP2 coadds. 

1038 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully 

1039 # rejected from (cell) coadds with no propagation. 

1040 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

1041 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."), 

1042 "CR": MaskPlane( 

1043 "COSMIC_RAY", 

1044 "A cosmic ray affected this pixel on at least one input image (and was interpolated).", 

1045 ), 

1046 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."), 

1047 "EDGE": MaskPlane( 

1048 "DETECTION_EDGE", 

1049 "Pixel was too close to the edge to be considered for detection, " 

1050 "due to the finite size of the detection kernel.", 

1051 ), 

1052 "REJECTED": MaskPlane( 

1053 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking." 

1054 ), 

1055 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

1056 "INEXACT_PSF": MaskPlane( 

1057 "INEXACT_PSF", 

1058 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.", 

1059 ), 

1060 }