Coverage for python / lsst / images / _masked_image.py: 34%

179 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 07:54 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import Box 

34from ._image import Image, ImageSerializationModel 

35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

36from ._transforms import Frame, Projection, ProjectionSerializationModel 

37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive 

38from .utils import is_none 

39 

40 

41class MaskedImage(GeneralizedImage): 

42 """A multi-plane image with data (image), mask, and variance planes. 

43 

44 Parameters 

45 ---------- 

46 image 

47 The main image plane. If this has a `Projection`, it will be used 

48 for all planes unless a ``projection`` is passed separately. 

49 mask 

50 A bitmask image that annotates the main image plane. Must have the 

51 same bounding box as ``image`` if provided. Any attached projection 

52 is replaced (possibly by `None`). 

53 variance 

54 The per-pixel uncertainty of the main image as an image of variance 

55 values. Must have the same bounding box as ``image`` if provided, and 

56 its units must be the square of ``image.unit`` or `None`. 

57 Values default to ``1.0``. Any attached projection is replaced 

58 (possibly by `None`). 

59 mask_schema 

60 Schema for the mask plane. Must be provided if and only if ``mask`` is 

61 not provided. 

62 projection 

63 Projection that maps the pixel grid to the sky. 

64 obs_info 

65 General information about the associated observation in standardized 

66 form. 

67 metadata 

68 Arbitrary flexible metadata to associate with the image. 

69 """ 

70 

71 def __init__( 

72 self, 

73 image: Image, 

74 *, 

75 mask: Mask | None = None, 

76 variance: Image | None = None, 

77 mask_schema: MaskSchema | None = None, 

78 projection: Projection | None = None, 

79 obs_info: ObservationInfo | None = None, 

80 metadata: dict[str, MetadataValue] | None = None, 

81 ): 

82 super().__init__(metadata) 

83 if projection is None: 

84 projection = image.projection 

85 else: 

86 image = image.view(projection=projection) 

87 if obs_info is None: 

88 obs_info = image.obs_info 

89 else: 

90 image = image.view(obs_info=obs_info) 

91 if mask is None: 

92 if mask_schema is None: 

93 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info) 

95 elif mask_schema is not None: 

96 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

97 else: 

98 if image.bbox != mask.bbox: 

99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

100 mask = mask.view(projection=projection, obs_info=obs_info) 

101 if variance is None: 

102 variance = Image( 

103 1.0, 

104 dtype=np.float32, 

105 bbox=image.bbox, 

106 unit=None if image.unit is None else image.unit**2, 

107 projection=projection, 

108 obs_info=obs_info, 

109 ) 

110 else: 

111 if image.bbox != variance.bbox: 

112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

113 variance = variance.view(projection=projection, obs_info=obs_info) 

114 if image.unit is None: 

115 if variance.unit is not None: 

116 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

117 elif variance.unit is None: 

118 variance = variance.view(unit=image.unit**2) 

119 elif variance.unit != image.unit**2: 

120 raise ValueError( 

121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

122 ) 

123 self._image = image 

124 self._mask = mask 

125 self._variance = variance 

126 

127 @property 

128 def image(self) -> Image: 

129 """The main image plane (`~lsst.images.Image`).""" 

130 return self._image 

131 

132 @property 

133 def mask(self) -> Mask: 

134 """The mask plane (`~lsst.images.Mask`).""" 

135 return self._mask 

136 

137 @property 

138 def variance(self) -> Image: 

139 """The variance plane (`~lsst.images.Image`).""" 

140 return self._variance 

141 

142 @property 

143 def bbox(self) -> Box: 

144 """The bounding box shared by all three image planes 

145 (`~lsst.images.Box`). 

146 """ 

147 return self._image.bbox 

148 

149 @property 

150 def unit(self) -> astropy.units.UnitBase | None: 

151 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

152 return self._image.unit 

153 

154 @property 

155 def projection(self) -> Projection[Any] | None: 

156 """The projection that maps the pixel grid to the sky 

157 (`~lsst.images.Projection` | `None`). 

158 """ 

159 return self._image.projection 

160 

161 @property 

162 def obs_info(self) -> ObservationInfo | None: 

163 """General information about the associated observation in standard 

164 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

165 """ 

166 return self._image.obs_info 

167 

168 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

169 if bbox is ...: 

170 return self 

171 super().__getitem__(bbox) 

172 return self._transfer_metadata( 

173 MaskedImage( 

174 # Projection and obs_info propagate from the image. 

175 self.image[bbox], 

176 mask=self.mask[bbox], 

177 variance=self.variance[bbox], 

178 ), 

179 bbox=bbox, 

180 ) 

181 

182 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

183 self._image[bbox] = value.image 

184 self._mask[bbox] = value.mask 

185 self._variance[bbox] = value.variance 

186 

187 def __str__(self) -> str: 

188 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

189 

190 def __repr__(self) -> str: 

191 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

192 

193 def copy(self) -> MaskedImage: 

194 """Deep-copy the masked image and metadata.""" 

195 return self._transfer_metadata( 

196 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

197 copy=True, 

198 ) 

199 

200 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

201 """Serialize the masked image to an output archive. 

202 

203 Parameters 

204 ---------- 

205 archive 

206 Archive to write to. 

207 """ 

208 serialized_image = archive.serialize_direct( 

209 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False) 

210 ) 

211 serialized_mask = archive.serialize_direct( 

212 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False) 

213 ) 

214 serialized_variance = archive.serialize_direct( 

215 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False) 

216 ) 

217 serialized_projection = ( 

218 archive.serialize_direct("projection", self.projection.serialize) 

219 if self.projection is not None 

220 else None 

221 ) 

222 return MaskedImageSerializationModel( 

223 image=serialized_image, 

224 mask=serialized_mask, 

225 variance=serialized_variance, 

226 projection=serialized_projection, 

227 obs_info=self.obs_info, 

228 metadata=self.metadata, 

229 ) 

230 

231 @staticmethod 

232 def _get_archive_tree_type[P: pydantic.BaseModel]( 

233 pointer_type: type[P], 

234 ) -> type[MaskedImageSerializationModel[P]]: 

235 """Return the serialization model type for this object for an archive 

236 type that uses the given pointer type. 

237 """ 

238 return MaskedImageSerializationModel[pointer_type] # type: ignore 

239 

240 def write_fits( 

241 self, 

242 filename: str, 

243 *, 

244 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

245 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

246 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

247 compression_seed: int | None = None, 

248 ) -> None: 

249 """Write the image to a FITS file. 

250 

251 Parameters 

252 ---------- 

253 filename 

254 Name of the file to write to. Must be a local file. 

255 image_compression 

256 Compression options for the `image` plane. 

257 mask_compression 

258 Compression options for the `mask` plane. 

259 variance_compression 

260 Compression options for the `variance` plane. 

261 compression_seed 

262 A FITS tile compression seed to use whenever the configured 

263 compression seed is `None` or (for backwards compatibility) ``0``. 

264 This value is then incremented every time it is used. 

265 """ 

266 compression_options = {} 

267 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

268 compression_options["image"] = image_compression 

269 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

270 compression_options["mask"] = mask_compression 

271 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

272 compression_options["variance"] = variance_compression 

273 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

274 

275 @classmethod 

276 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

277 """Read an image from a FITS file. 

278 

279 Parameters 

280 ---------- 

281 url 

282 URL of the file to read; may be any type supported by 

283 `lsst.resources.ResourcePath`. 

284 bbox 

285 Bounding box of a subimage to read instead. 

286 """ 

287 return fits.read(cls, url, bbox=bbox).deserialized 

288 

289 @staticmethod 

290 def from_legacy( 

291 legacy: Any, 

292 *, 

293 unit: astropy.units.Unit | None = None, 

294 plane_map: Mapping[str, MaskPlane] | None = None, 

295 ) -> MaskedImage: 

296 """Convert from an `lsst.afw.image.MaskedImage` instance. 

297 

298 Parameters 

299 ---------- 

300 legacy 

301 An `lsst.afw.image.MaskedImage` instance that will share image and 

302 variance (but not mask) pixel data with the returned object. 

303 unit 

304 Units of the image. 

305 plane_map 

306 A mapping from legacy mask plane name to the new plane name and 

307 description. 

308 """ 

309 return MaskedImage( 

310 image=Image.from_legacy(legacy.getImage(), unit), 

311 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

312 variance=Image.from_legacy(legacy.getVariance()), 

313 ) 

314 

315 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

316 """Convert to an `lsst.afw.image.MaskedImage` instance. 

317 

318 Parameters 

319 ---------- 

320 copy 

321 If `True`, always copy the image and variance pixel data. 

322 If `False`, return a view, and raise `TypeError` if the pixel data 

323 is read-only (this is not supported by afw). If `None`, onyl if 

324 the pixel data is read-only. Mask pixel data is always copied. 

325 plane_map 

326 A mapping from legacy mask plane name to the new plane name and 

327 description. 

328 """ 

329 import lsst.afw.image 

330 

331 return lsst.afw.image.MaskedImage( 

332 self.image.to_legacy(copy=copy), 

333 mask=self.mask.to_legacy(plane_map), 

334 variance=self.variance.to_legacy(copy=copy), 

335 dtype=self.image.array.dtype, 

336 ) 

337 

338 @overload 

339 @staticmethod 

340 def read_legacy( 340 ↛ exitline 340 didn't return from function 'read_legacy' because

341 uri: ResourcePathExpression, 

342 *, 

343 preserve_quantization: bool = False, 

344 component: Literal["image"], 

345 fits_wcs_frame: Frame | None = None, 

346 ) -> Image: ... 

347 

348 @overload 

349 @staticmethod 

350 def read_legacy( 350 ↛ exitline 350 didn't return from function 'read_legacy' because

351 uri: ResourcePathExpression, 

352 *, 

353 plane_map: Mapping[str, MaskPlane] | None = None, 

354 component: Literal["mask"], 

355 fits_wcs_frame: Frame | None = None, 

356 ) -> Mask: ... 

357 

358 @overload 

359 @staticmethod 

360 def read_legacy( 360 ↛ exitline 360 didn't return from function 'read_legacy' because

361 uri: ResourcePathExpression, 

362 *, 

363 preserve_quantization: bool = False, 

364 component: Literal["variance"], 

365 fits_wcs_frame: Frame | None = None, 

366 ) -> Image: ... 

367 

368 @overload 

369 @staticmethod 

370 def read_legacy( 370 ↛ exitline 370 didn't return from function 'read_legacy' because

371 uri: ResourcePathExpression, 

372 *, 

373 preserve_quantization: bool = False, 

374 plane_map: Mapping[str, MaskPlane] | None = None, 

375 component: None = None, 

376 fits_wcs_frame: Frame | None = None, 

377 ) -> MaskedImage: ... 

378 

379 @staticmethod 

380 def read_legacy( 

381 uri: ResourcePathExpression, 

382 *, 

383 preserve_quantization: bool = False, 

384 plane_map: Mapping[str, MaskPlane] | None = None, 

385 component: Literal["image", "mask", "variance"] | None = None, 

386 fits_wcs_frame: Frame | None = None, 

387 ) -> Any: 

388 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

389 

390 Parameters 

391 ---------- 

392 uri 

393 URI or file name. 

394 preserve_quantization 

395 If `True`, ensure that writing the masked image back out again will 

396 exactly preserve quantization-compressed pixel values. This causes 

397 the image and variance plane arrays to be marked as read-only and 

398 stores the original binary table data for those planes in memory. 

399 If the `~lsst.images.MaskedImage` is copied, the precompressed 

400 pixel values are not transferred to the copy. 

401 plane_map 

402 A mapping from legacy mask plane name to the new plane name and 

403 description. 

404 component 

405 A component to read instead of the full image. 

406 fits_wcs_frame 

407 If not `None` and the HDU containing the image plane has a FITS 

408 WCS, attach a `~lsst.images.Projection` to the returned masked 

409 image by converting that WCS. When ``component`` is one of 

410 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the 

411 component HDU is used instead (all three should have the same WCS). 

412 """ 

413 fs, fspath = ResourcePath(uri).to_fsspec() 

414 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

415 return MaskedImage._read_legacy_hdus( 

416 hdu_list, 

417 uri, 

418 preserve_quantization=preserve_quantization, 

419 plane_map=plane_map, 

420 component=component, 

421 fits_wcs_frame=fits_wcs_frame, 

422 ) 

423 

424 @staticmethod 

425 def _read_legacy_hdus( 

426 hdu_list: astropy.io.fits.HDUList, 

427 uri: ResourcePathExpression, 

428 *, 

429 preserve_quantization: bool = False, 

430 plane_map: Mapping[str, MaskPlane] | None = None, 

431 component: Literal["image", "mask", "variance"] | None, 

432 fits_wcs_frame: Frame | None = None, 

433 ) -> Any: 

434 opaque_metadata = fits.FitsOpaqueMetadata() 

435 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

436 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

437 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

438 result: Any 

439 with ExitStack() as exit_stack: 

440 if preserve_quantization: 

441 fs, fspath = ResourcePath(uri).to_fsspec() 

442 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

443 bintable_hdu_list = exit_stack.enter_context( 

444 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

445 ) 

446 image_bintable_hdu = bintable_hdu_list[1] 

447 variance_bintable_hdu = bintable_hdu_list[3] 

448 if component is None or component == "image": 

449 image = Image._read_legacy_hdu( 

450 hdu_list[1], 

451 opaque_metadata, 

452 preserve_bintable=image_bintable_hdu, 

453 fits_wcs_frame=fits_wcs_frame, 

454 ) 

455 if component == "image": 

456 result = image 

457 if component is None or component == "mask": 

458 mask = Mask._read_legacy_hdu( 

459 hdu_list[2], 

460 opaque_metadata, 

461 plane_map=plane_map, 

462 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

463 ) 

464 if component == "mask": 

465 result = mask 

466 if component is None or component == "variance": 

467 variance = Image._read_legacy_hdu( 

468 hdu_list[3], 

469 opaque_metadata, 

470 preserve_bintable=variance_bintable_hdu, 

471 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

472 ) 

473 if component == "variance": 

474 result = variance 

475 if component is None: 

476 result = MaskedImage(image, mask=mask, variance=variance) 

477 result._opaque_metadata = opaque_metadata 

478 return result 

479 

480 

481class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

482 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

483 

484 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

485 mask: MaskSerializationModel[P] = pydantic.Field( 

486 description="Bitmask that annotates the main image's pixels." 

487 ) 

488 variance: ImageSerializationModel[P] = pydantic.Field( 

489 description="Per-pixel variance estimates for the main image." 

490 ) 

491 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

492 default=None, 

493 exclude_if=is_none, 

494 description="Projection that maps the pixel grid to the sky.", 

495 ) 

496 obs_info: ObservationInfo | None = pydantic.Field( 

497 default=None, 

498 exclude_if=is_none, 

499 description="Standardized description of image metadata", 

500 ) 

501 

502 @property 

503 def bbox(self) -> Box: 

504 """The bounding box of the image.""" 

505 return self.image.bbox 

506 

507 def deserialize(self, archive: InputArchive[Any], *, bbox: Box | None = None) -> MaskedImage: 

508 """Deserialize an image from an input archive. 

509 

510 Parameters 

511 ---------- 

512 archive 

513 Archive to read from. 

514 bbox 

515 Bounding box of a subimage to read instead. 

516 """ 

517 image = self.image.deserialize(archive, bbox=bbox) 

518 mask = self.mask.deserialize(archive, bbox=bbox) 

519 variance = self.variance.deserialize(archive, bbox=bbox) 

520 projection = self.projection.deserialize(archive) if self.projection is not None else None 

521 return MaskedImage( 

522 image, mask=mask, variance=variance, projection=projection, obs_info=self.obs_info 

523 )._finish_deserialize(self)