Coverage for python/lsst/images/_masked_image.py: 33%

178 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 09:08 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import TYPE_CHECKING, Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from . import fits 

31from ._generalized_image import GeneralizedImage 

32from ._geom import Box 

33from ._image import Image, ImageSerializationModel 

34from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

35from ._transforms import Frame, Projection, ProjectionSerializationModel 

36from .serialization import ArchiveTree, InputArchive, InvalidParameterError, MetadataValue, OutputArchive 

37from .utils import is_none 

38 

39if TYPE_CHECKING: 

40 try: 

41 from lsst.afw.image import MaskedImage as LegacyMaskedImage 

42 except ImportError: 

43 type LegacyMaskedImage = Any # type: ignore[no-redef] 

44 

45 

46class MaskedImage(GeneralizedImage): 

47 """A multi-plane image with data (image), mask, and variance planes. 

48 

49 Parameters 

50 ---------- 

51 image 

52 The main image plane. If this has a `Projection`, it will be used 

53 for all planes unless a ``projection`` is passed separately. 

54 mask 

55 A bitmask image that annotates the main image plane. Must have the 

56 same bounding box as ``image`` if provided. Any attached projection 

57 is replaced (possibly by `None`). 

58 variance 

59 The per-pixel uncertainty of the main image as an image of variance 

60 values. Must have the same bounding box as ``image`` if provided, and 

61 its units must be the square of ``image.unit`` or `None`. 

62 Values default to ``1.0``. Any attached projection is replaced 

63 (possibly by `None`). 

64 mask_schema 

65 Schema for the mask plane. Must be provided if and only if ``mask`` is 

66 not provided. 

67 projection 

68 Projection that maps the pixel grid to the sky. 

69 metadata 

70 Arbitrary flexible metadata to associate with the image. 

71 """ 

72 

73 def __init__( 

74 self, 

75 image: Image, 

76 *, 

77 mask: Mask | None = None, 

78 variance: Image | None = None, 

79 mask_schema: MaskSchema | None = None, 

80 projection: Projection | None = None, 

81 metadata: dict[str, MetadataValue] | None = None, 

82 ): 

83 super().__init__(metadata) 

84 if projection is None: 

85 projection = image.projection 

86 else: 

87 image = image.view(projection=projection) 

88 if mask is None: 

89 if mask_schema is None: 

90 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

91 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection) 

92 elif mask_schema is not None: 

93 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

94 else: 

95 if image.bbox != mask.bbox: 

96 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

97 mask = mask.view(projection=projection) 

98 if variance is None: 

99 variance = Image( 

100 1.0, 

101 dtype=np.float32, 

102 bbox=image.bbox, 

103 unit=None if image.unit is None else image.unit**2, 

104 projection=projection, 

105 ) 

106 else: 

107 if image.bbox != variance.bbox: 

108 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

109 variance = variance.view(projection=projection) 

110 if image.unit is None: 

111 if variance.unit is not None: 

112 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

113 elif variance.unit is None: 

114 variance = variance.view(unit=image.unit**2) 

115 elif variance.unit != image.unit**2: 

116 raise ValueError( 

117 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

118 ) 

119 self._image = image 

120 self._mask = mask 

121 self._variance = variance 

122 

123 @property 

124 def image(self) -> Image: 

125 """The main image plane (`~lsst.images.Image`).""" 

126 return self._image 

127 

128 @property 

129 def mask(self) -> Mask: 

130 """The mask plane (`~lsst.images.Mask`).""" 

131 return self._mask 

132 

133 @property 

134 def variance(self) -> Image: 

135 """The variance plane (`~lsst.images.Image`).""" 

136 return self._variance 

137 

138 @property 

139 def bbox(self) -> Box: 

140 """The bounding box shared by all three image planes 

141 (`~lsst.images.Box`). 

142 """ 

143 return self._image.bbox 

144 

145 @property 

146 def unit(self) -> astropy.units.UnitBase | None: 

147 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

148 return self._image.unit 

149 

150 @property 

151 def projection(self) -> Projection[Any] | None: 

152 """The projection that maps the pixel grid to the sky 

153 (`~lsst.images.Projection` | `None`). 

154 """ 

155 return self._image.projection 

156 

157 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

158 if bbox is ...: 

159 return self 

160 super().__getitem__(bbox) 

161 return self._transfer_metadata( 

162 MaskedImage( 

163 # Projection and obs_info propagate from the image. 

164 self.image[bbox], 

165 mask=self.mask[bbox], 

166 variance=self.variance[bbox], 

167 ), 

168 bbox=bbox, 

169 ) 

170 

171 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

172 self._image[bbox] = value.image 

173 self._mask[bbox] = value.mask 

174 self._variance[bbox] = value.variance 

175 

176 def __str__(self) -> str: 

177 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

178 

179 def __repr__(self) -> str: 

180 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

181 

182 def copy(self) -> MaskedImage: 

183 """Deep-copy the masked image and metadata.""" 

184 return self._transfer_metadata( 

185 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

186 copy=True, 

187 ) 

188 

189 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

190 """Serialize the masked image to an output archive. 

191 

192 Parameters 

193 ---------- 

194 archive 

195 Archive to write to. 

196 """ 

197 serialized_image = archive.serialize_direct( 

198 "image", functools.partial(self.image.serialize, save_projection=False) 

199 ) 

200 serialized_mask = archive.serialize_direct( 

201 "mask", functools.partial(self.mask.serialize, save_projection=False) 

202 ) 

203 serialized_variance = archive.serialize_direct( 

204 "variance", functools.partial(self.variance.serialize, save_projection=False) 

205 ) 

206 serialized_projection = ( 

207 archive.serialize_direct("projection", self.projection.serialize) 

208 if self.projection is not None 

209 else None 

210 ) 

211 return MaskedImageSerializationModel( 

212 image=serialized_image, 

213 mask=serialized_mask, 

214 variance=serialized_variance, 

215 projection=serialized_projection, 

216 metadata=self.metadata, 

217 ) 

218 

219 @staticmethod 

220 def _get_archive_tree_type[P: pydantic.BaseModel]( 

221 pointer_type: type[P], 

222 ) -> type[MaskedImageSerializationModel[P]]: 

223 """Return the serialization model type for this object for an archive 

224 type that uses the given pointer type. 

225 """ 

226 return MaskedImageSerializationModel[pointer_type] # type: ignore 

227 

228 def write_fits( 

229 self, 

230 filename: str, 

231 *, 

232 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

233 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

234 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

235 compression_seed: int | None = None, 

236 ) -> None: 

237 """Write the image to a FITS file. 

238 

239 Parameters 

240 ---------- 

241 filename 

242 Name of the file to write to. Must be a local file. 

243 image_compression 

244 Compression options for the `image` plane. 

245 mask_compression 

246 Compression options for the `mask` plane. 

247 variance_compression 

248 Compression options for the `variance` plane. 

249 compression_seed 

250 A FITS tile compression seed to use whenever the configured 

251 compression seed is `None` or (for backwards compatibility) ``0``. 

252 This value is then incremented every time it is used. 

253 """ 

254 compression_options = {} 

255 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

256 compression_options["image"] = image_compression 

257 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

258 compression_options["mask"] = mask_compression 

259 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

260 compression_options["variance"] = variance_compression 

261 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

262 

263 @classmethod 

264 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

265 """Read an image from a FITS file. 

266 

267 Parameters 

268 ---------- 

269 url 

270 URL of the file to read; may be any type supported by 

271 `lsst.resources.ResourcePath`. 

272 bbox 

273 Bounding box of a subimage to read instead. 

274 """ 

275 return fits.read(cls, url, bbox=bbox).deserialized 

276 

277 @staticmethod 

278 def from_legacy( 

279 legacy: LegacyMaskedImage, 

280 *, 

281 unit: astropy.units.UnitBase | None = None, 

282 plane_map: Mapping[str, MaskPlane] | None = None, 

283 ) -> MaskedImage: 

284 """Convert from an `lsst.afw.image.MaskedImage` instance. 

285 

286 Parameters 

287 ---------- 

288 legacy 

289 An `lsst.afw.image.MaskedImage` instance that will share image and 

290 variance (but not mask) pixel data with the returned object. 

291 unit 

292 Units of the image. 

293 plane_map 

294 A mapping from legacy mask plane name to the new plane name and 

295 description. 

296 """ 

297 return MaskedImage( 

298 image=Image.from_legacy(legacy.getImage(), unit), 

299 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

300 variance=Image.from_legacy(legacy.getVariance()), 

301 ) 

302 

303 def to_legacy( 

304 self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None 

305 ) -> LegacyMaskedImage: 

306 """Convert to an `lsst.afw.image.MaskedImage` instance. 

307 

308 Parameters 

309 ---------- 

310 copy 

311 If `True`, always copy the image and variance pixel data. 

312 If `False`, return a view, and raise `TypeError` if the pixel data 

313 is read-only (this is not supported by afw). If `None`, only copy 

314 if the pixel data is read-only. Mask pixel data is always copied. 

315 plane_map 

316 A mapping from legacy mask plane name to the new plane name and 

317 description. 

318 """ 

319 import lsst.afw.image 

320 

321 return lsst.afw.image.MaskedImage( 

322 self.image.to_legacy(copy=copy), 

323 mask=self.mask.to_legacy(plane_map), 

324 variance=self.variance.to_legacy(copy=copy), 

325 dtype=self.image.array.dtype, 

326 ) 

327 

328 @overload 

329 @staticmethod 

330 def read_legacy( 330 ↛ exitline 330 didn't return from function 'read_legacy' because

331 uri: ResourcePathExpression, 

332 *, 

333 preserve_quantization: bool = False, 

334 component: Literal["image"], 

335 fits_wcs_frame: Frame | None = None, 

336 ) -> Image: ... 

337 

338 @overload 

339 @staticmethod 

340 def read_legacy( 340 ↛ exitline 340 didn't return from function 'read_legacy' because

341 uri: ResourcePathExpression, 

342 *, 

343 plane_map: Mapping[str, MaskPlane] | None = None, 

344 component: Literal["mask"], 

345 fits_wcs_frame: Frame | None = None, 

346 ) -> Mask: ... 

347 

348 @overload 

349 @staticmethod 

350 def read_legacy( 350 ↛ exitline 350 didn't return from function 'read_legacy' because

351 uri: ResourcePathExpression, 

352 *, 

353 preserve_quantization: bool = False, 

354 component: Literal["variance"], 

355 fits_wcs_frame: Frame | None = None, 

356 ) -> Image: ... 

357 

358 @overload 

359 @staticmethod 

360 def read_legacy( 360 ↛ exitline 360 didn't return from function 'read_legacy' because

361 uri: ResourcePathExpression, 

362 *, 

363 preserve_quantization: bool = False, 

364 plane_map: Mapping[str, MaskPlane] | None = None, 

365 component: None = None, 

366 fits_wcs_frame: Frame | None = None, 

367 ) -> MaskedImage: ... 

368 

369 @staticmethod 

370 def read_legacy( 

371 uri: ResourcePathExpression, 

372 *, 

373 preserve_quantization: bool = False, 

374 plane_map: Mapping[str, MaskPlane] | None = None, 

375 component: Literal["image", "mask", "variance"] | None = None, 

376 fits_wcs_frame: Frame | None = None, 

377 ) -> Any: 

378 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

379 

380 Parameters 

381 ---------- 

382 uri 

383 URI or file name. 

384 preserve_quantization 

385 If `True`, ensure that writing the masked image back out again will 

386 exactly preserve quantization-compressed pixel values. This causes 

387 the image and variance plane arrays to be marked as read-only and 

388 stores the original binary table data for those planes in memory. 

389 If the `~lsst.images.MaskedImage` is copied, the precompressed 

390 pixel values are not transferred to the copy. 

391 plane_map 

392 A mapping from legacy mask plane name to the new plane name and 

393 description. 

394 component 

395 A component to read instead of the full image. 

396 fits_wcs_frame 

397 If not `None` and the HDU containing the image plane has a FITS 

398 WCS, attach a `~lsst.images.Projection` to the returned masked 

399 image by converting that WCS. When ``component`` is one of 

400 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the 

401 component HDU is used instead (all three should have the same WCS). 

402 """ 

403 fs, fspath = ResourcePath(uri).to_fsspec() 

404 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

405 return MaskedImage._read_legacy_hdus( 

406 hdu_list, 

407 uri, 

408 preserve_quantization=preserve_quantization, 

409 plane_map=plane_map, 

410 component=component, 

411 fits_wcs_frame=fits_wcs_frame, 

412 ) 

413 

414 @staticmethod 

415 def _read_legacy_hdus( 

416 hdu_list: astropy.io.fits.HDUList, 

417 uri: ResourcePathExpression, 

418 *, 

419 opaque_metadata: fits.FitsOpaqueMetadata | None = None, 

420 preserve_quantization: bool = False, 

421 plane_map: Mapping[str, MaskPlane] | None = None, 

422 component: Literal["image", "mask", "variance"] | None, 

423 fits_wcs_frame: Frame | None = None, 

424 ) -> Any: 

425 if opaque_metadata is None: 

426 opaque_metadata = fits.FitsOpaqueMetadata() 

427 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

428 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

429 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

430 result: Any 

431 with ExitStack() as exit_stack: 

432 if preserve_quantization: 

433 fs, fspath = ResourcePath(uri).to_fsspec() 

434 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

435 bintable_hdu_list = exit_stack.enter_context( 

436 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

437 ) 

438 image_bintable_hdu = bintable_hdu_list[1] 

439 variance_bintable_hdu = bintable_hdu_list[3] 

440 if component is None or component == "image": 

441 image = Image._read_legacy_hdu( 

442 hdu_list[1], 

443 opaque_metadata, 

444 preserve_bintable=image_bintable_hdu, 

445 fits_wcs_frame=fits_wcs_frame, 

446 ) 

447 if component == "image": 

448 result = image 

449 if component is None or component == "mask": 

450 mask = Mask._read_legacy_hdu( 

451 hdu_list[2], 

452 opaque_metadata, 

453 plane_map=plane_map, 

454 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

455 ) 

456 if component == "mask": 

457 result = mask 

458 if component is None or component == "variance": 

459 variance = Image._read_legacy_hdu( 

460 hdu_list[3], 

461 opaque_metadata, 

462 preserve_bintable=variance_bintable_hdu, 

463 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

464 ) 

465 if component == "variance": 

466 result = variance 

467 if component is None: 

468 result = MaskedImage(image, mask=mask, variance=variance) 

469 result._opaque_metadata = opaque_metadata 

470 return result 

471 

472 

473class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

474 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

475 

476 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

477 mask: MaskSerializationModel[P] = pydantic.Field( 

478 description="Bitmask that annotates the main image's pixels." 

479 ) 

480 variance: ImageSerializationModel[P] = pydantic.Field( 

481 description="Per-pixel variance estimates for the main image." 

482 ) 

483 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

484 default=None, 

485 exclude_if=is_none, 

486 description="Projection that maps the pixel grid to the sky.", 

487 ) 

488 

489 @property 

490 def bbox(self) -> Box: 

491 """The bounding box of the image.""" 

492 return self.image.bbox 

493 

494 def deserialize( 

495 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any 

496 ) -> MaskedImage: 

497 """Deserialize an image from an input archive. 

498 

499 Parameters 

500 ---------- 

501 archive 

502 Archive to read from. 

503 bbox 

504 Bounding box of a subimage to read instead. 

505 **kwargs 

506 Unsupported keyword arguments are accepted only to provide better 

507 error messages (raising `serialization.InvalidParameterError`). 

508 """ 

509 if kwargs: 

510 raise InvalidParameterError(f"Unrecognized parameters for MaskedImage: {set(kwargs.keys())}.") 

511 image = self.image.deserialize(archive, bbox=bbox) 

512 mask = self.mask.deserialize(archive, bbox=bbox) 

513 variance = self.variance.deserialize(archive, bbox=bbox) 

514 projection = self.projection.deserialize(archive) if self.projection is not None else None 

515 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize( 

516 self 

517 ) 

518 

519 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

520 if component == "bbox" and kwargs: 

521 raise InvalidParameterError( 

522 f"Unrecognized parameters for MaskedImage.bbox: {set(kwargs.keys())}." 

523 ) 

524 return super().deserialize_component(component, archive, **kwargs)