Coverage for python / lsst / images / _masked_image.py: 33%

178 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:47 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from . import fits 

31from ._generalized_image import GeneralizedImage 

32from ._geom import Box 

33from ._image import Image, ImageSerializationModel 

34from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

35from ._transforms import Frame, Projection, ProjectionSerializationModel 

36from .serialization import ArchiveTree, InputArchive, InvalidParameterError, MetadataValue, OutputArchive 

37from .utils import is_none 

38 

39 

40class MaskedImage(GeneralizedImage): 

41 """A multi-plane image with data (image), mask, and variance planes. 

42 

43 Parameters 

44 ---------- 

45 image 

46 The main image plane. If this has a `Projection`, it will be used 

47 for all planes unless a ``projection`` is passed separately. 

48 mask 

49 A bitmask image that annotates the main image plane. Must have the 

50 same bounding box as ``image`` if provided. Any attached projection 

51 is replaced (possibly by `None`). 

52 variance 

53 The per-pixel uncertainty of the main image as an image of variance 

54 values. Must have the same bounding box as ``image`` if provided, and 

55 its units must be the square of ``image.unit`` or `None`. 

56 Values default to ``1.0``. Any attached projection is replaced 

57 (possibly by `None`). 

58 mask_schema 

59 Schema for the mask plane. Must be provided if and only if ``mask`` is 

60 not provided. 

61 projection 

62 Projection that maps the pixel grid to the sky. 

63 metadata 

64 Arbitrary flexible metadata to associate with the image. 

65 """ 

66 

67 def __init__( 

68 self, 

69 image: Image, 

70 *, 

71 mask: Mask | None = None, 

72 variance: Image | None = None, 

73 mask_schema: MaskSchema | None = None, 

74 projection: Projection | None = None, 

75 metadata: dict[str, MetadataValue] | None = None, 

76 ): 

77 super().__init__(metadata) 

78 if projection is None: 

79 projection = image.projection 

80 else: 

81 image = image.view(projection=projection) 

82 if mask is None: 

83 if mask_schema is None: 

84 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

85 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection) 

86 elif mask_schema is not None: 

87 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

88 else: 

89 if image.bbox != mask.bbox: 

90 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

91 mask = mask.view(projection=projection) 

92 if variance is None: 

93 variance = Image( 

94 1.0, 

95 dtype=np.float32, 

96 bbox=image.bbox, 

97 unit=None if image.unit is None else image.unit**2, 

98 projection=projection, 

99 ) 

100 else: 

101 if image.bbox != variance.bbox: 

102 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

103 variance = variance.view(projection=projection) 

104 if image.unit is None: 

105 if variance.unit is not None: 

106 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

107 elif variance.unit is None: 

108 variance = variance.view(unit=image.unit**2) 

109 elif variance.unit != image.unit**2: 

110 raise ValueError( 

111 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

112 ) 

113 self._image = image 

114 self._mask = mask 

115 self._variance = variance 

116 

117 @property 

118 def image(self) -> Image: 

119 """The main image plane (`~lsst.images.Image`).""" 

120 return self._image 

121 

122 @property 

123 def mask(self) -> Mask: 

124 """The mask plane (`~lsst.images.Mask`).""" 

125 return self._mask 

126 

127 @property 

128 def variance(self) -> Image: 

129 """The variance plane (`~lsst.images.Image`).""" 

130 return self._variance 

131 

132 @property 

133 def bbox(self) -> Box: 

134 """The bounding box shared by all three image planes 

135 (`~lsst.images.Box`). 

136 """ 

137 return self._image.bbox 

138 

139 @property 

140 def unit(self) -> astropy.units.UnitBase | None: 

141 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

142 return self._image.unit 

143 

144 @property 

145 def projection(self) -> Projection[Any] | None: 

146 """The projection that maps the pixel grid to the sky 

147 (`~lsst.images.Projection` | `None`). 

148 """ 

149 return self._image.projection 

150 

151 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

152 if bbox is ...: 

153 return self 

154 super().__getitem__(bbox) 

155 return self._transfer_metadata( 

156 MaskedImage( 

157 # Projection and obs_info propagate from the image. 

158 self.image[bbox], 

159 mask=self.mask[bbox], 

160 variance=self.variance[bbox], 

161 ), 

162 bbox=bbox, 

163 ) 

164 

165 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

166 self._image[bbox] = value.image 

167 self._mask[bbox] = value.mask 

168 self._variance[bbox] = value.variance 

169 

170 def __str__(self) -> str: 

171 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

172 

173 def __repr__(self) -> str: 

174 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

175 

176 def copy(self) -> MaskedImage: 

177 """Deep-copy the masked image and metadata.""" 

178 return self._transfer_metadata( 

179 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

180 copy=True, 

181 ) 

182 

183 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

184 """Serialize the masked image to an output archive. 

185 

186 Parameters 

187 ---------- 

188 archive 

189 Archive to write to. 

190 """ 

191 serialized_image = archive.serialize_direct( 

192 "image", functools.partial(self.image.serialize, save_projection=False) 

193 ) 

194 serialized_mask = archive.serialize_direct( 

195 "mask", functools.partial(self.mask.serialize, save_projection=False) 

196 ) 

197 serialized_variance = archive.serialize_direct( 

198 "variance", functools.partial(self.variance.serialize, save_projection=False) 

199 ) 

200 serialized_projection = ( 

201 archive.serialize_direct("projection", self.projection.serialize) 

202 if self.projection is not None 

203 else None 

204 ) 

205 return MaskedImageSerializationModel( 

206 image=serialized_image, 

207 mask=serialized_mask, 

208 variance=serialized_variance, 

209 projection=serialized_projection, 

210 metadata=self.metadata, 

211 ) 

212 

213 @staticmethod 

214 def _get_archive_tree_type[P: pydantic.BaseModel]( 

215 pointer_type: type[P], 

216 ) -> type[MaskedImageSerializationModel[P]]: 

217 """Return the serialization model type for this object for an archive 

218 type that uses the given pointer type. 

219 """ 

220 return MaskedImageSerializationModel[pointer_type] # type: ignore 

221 

222 def write_fits( 

223 self, 

224 filename: str, 

225 *, 

226 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

227 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

228 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

229 compression_seed: int | None = None, 

230 ) -> None: 

231 """Write the image to a FITS file. 

232 

233 Parameters 

234 ---------- 

235 filename 

236 Name of the file to write to. Must be a local file. 

237 image_compression 

238 Compression options for the `image` plane. 

239 mask_compression 

240 Compression options for the `mask` plane. 

241 variance_compression 

242 Compression options for the `variance` plane. 

243 compression_seed 

244 A FITS tile compression seed to use whenever the configured 

245 compression seed is `None` or (for backwards compatibility) ``0``. 

246 This value is then incremented every time it is used. 

247 """ 

248 compression_options = {} 

249 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

250 compression_options["image"] = image_compression 

251 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

252 compression_options["mask"] = mask_compression 

253 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

254 compression_options["variance"] = variance_compression 

255 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

256 

257 @classmethod 

258 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

259 """Read an image from a FITS file. 

260 

261 Parameters 

262 ---------- 

263 url 

264 URL of the file to read; may be any type supported by 

265 `lsst.resources.ResourcePath`. 

266 bbox 

267 Bounding box of a subimage to read instead. 

268 """ 

269 return fits.read(cls, url, bbox=bbox).deserialized 

270 

271 @staticmethod 

272 def from_legacy( 

273 legacy: Any, 

274 *, 

275 unit: astropy.units.UnitBase | None = None, 

276 plane_map: Mapping[str, MaskPlane] | None = None, 

277 ) -> MaskedImage: 

278 """Convert from an `lsst.afw.image.MaskedImage` instance. 

279 

280 Parameters 

281 ---------- 

282 legacy 

283 An `lsst.afw.image.MaskedImage` instance that will share image and 

284 variance (but not mask) pixel data with the returned object. 

285 unit 

286 Units of the image. 

287 plane_map 

288 A mapping from legacy mask plane name to the new plane name and 

289 description. 

290 """ 

291 return MaskedImage( 

292 image=Image.from_legacy(legacy.getImage(), unit), 

293 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

294 variance=Image.from_legacy(legacy.getVariance()), 

295 ) 

296 

297 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

298 """Convert to an `lsst.afw.image.MaskedImage` instance. 

299 

300 Parameters 

301 ---------- 

302 copy 

303 If `True`, always copy the image and variance pixel data. 

304 If `False`, return a view, and raise `TypeError` if the pixel data 

305 is read-only (this is not supported by afw). If `None`, onyl if 

306 the pixel data is read-only. Mask pixel data is always copied. 

307 plane_map 

308 A mapping from legacy mask plane name to the new plane name and 

309 description. 

310 """ 

311 import lsst.afw.image 

312 

313 return lsst.afw.image.MaskedImage( 

314 self.image.to_legacy(copy=copy), 

315 mask=self.mask.to_legacy(plane_map), 

316 variance=self.variance.to_legacy(copy=copy), 

317 dtype=self.image.array.dtype, 

318 ) 

319 

320 @overload 

321 @staticmethod 

322 def read_legacy( 322 ↛ exitline 322 didn't return from function 'read_legacy' because

323 uri: ResourcePathExpression, 

324 *, 

325 preserve_quantization: bool = False, 

326 component: Literal["image"], 

327 fits_wcs_frame: Frame | None = None, 

328 ) -> Image: ... 

329 

330 @overload 

331 @staticmethod 

332 def read_legacy( 332 ↛ exitline 332 didn't return from function 'read_legacy' because

333 uri: ResourcePathExpression, 

334 *, 

335 plane_map: Mapping[str, MaskPlane] | None = None, 

336 component: Literal["mask"], 

337 fits_wcs_frame: Frame | None = None, 

338 ) -> Mask: ... 

339 

340 @overload 

341 @staticmethod 

342 def read_legacy( 342 ↛ exitline 342 didn't return from function 'read_legacy' because

343 uri: ResourcePathExpression, 

344 *, 

345 preserve_quantization: bool = False, 

346 component: Literal["variance"], 

347 fits_wcs_frame: Frame | None = None, 

348 ) -> Image: ... 

349 

350 @overload 

351 @staticmethod 

352 def read_legacy( 352 ↛ exitline 352 didn't return from function 'read_legacy' because

353 uri: ResourcePathExpression, 

354 *, 

355 preserve_quantization: bool = False, 

356 plane_map: Mapping[str, MaskPlane] | None = None, 

357 component: None = None, 

358 fits_wcs_frame: Frame | None = None, 

359 ) -> MaskedImage: ... 

360 

361 @staticmethod 

362 def read_legacy( 

363 uri: ResourcePathExpression, 

364 *, 

365 preserve_quantization: bool = False, 

366 plane_map: Mapping[str, MaskPlane] | None = None, 

367 component: Literal["image", "mask", "variance"] | None = None, 

368 fits_wcs_frame: Frame | None = None, 

369 ) -> Any: 

370 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

371 

372 Parameters 

373 ---------- 

374 uri 

375 URI or file name. 

376 preserve_quantization 

377 If `True`, ensure that writing the masked image back out again will 

378 exactly preserve quantization-compressed pixel values. This causes 

379 the image and variance plane arrays to be marked as read-only and 

380 stores the original binary table data for those planes in memory. 

381 If the `~lsst.images.MaskedImage` is copied, the precompressed 

382 pixel values are not transferred to the copy. 

383 plane_map 

384 A mapping from legacy mask plane name to the new plane name and 

385 description. 

386 component 

387 A component to read instead of the full image. 

388 fits_wcs_frame 

389 If not `None` and the HDU containing the image plane has a FITS 

390 WCS, attach a `~lsst.images.Projection` to the returned masked 

391 image by converting that WCS. When ``component`` is one of 

392 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the 

393 component HDU is used instead (all three should have the same WCS). 

394 """ 

395 fs, fspath = ResourcePath(uri).to_fsspec() 

396 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

397 return MaskedImage._read_legacy_hdus( 

398 hdu_list, 

399 uri, 

400 preserve_quantization=preserve_quantization, 

401 plane_map=plane_map, 

402 component=component, 

403 fits_wcs_frame=fits_wcs_frame, 

404 ) 

405 

406 @staticmethod 

407 def _read_legacy_hdus( 

408 hdu_list: astropy.io.fits.HDUList, 

409 uri: ResourcePathExpression, 

410 *, 

411 opaque_metadata: fits.FitsOpaqueMetadata | None = None, 

412 preserve_quantization: bool = False, 

413 plane_map: Mapping[str, MaskPlane] | None = None, 

414 component: Literal["image", "mask", "variance"] | None, 

415 fits_wcs_frame: Frame | None = None, 

416 ) -> Any: 

417 if opaque_metadata is None: 

418 opaque_metadata = fits.FitsOpaqueMetadata() 

419 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

420 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

421 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

422 result: Any 

423 with ExitStack() as exit_stack: 

424 if preserve_quantization: 

425 fs, fspath = ResourcePath(uri).to_fsspec() 

426 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

427 bintable_hdu_list = exit_stack.enter_context( 

428 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

429 ) 

430 image_bintable_hdu = bintable_hdu_list[1] 

431 variance_bintable_hdu = bintable_hdu_list[3] 

432 if component is None or component == "image": 

433 image = Image._read_legacy_hdu( 

434 hdu_list[1], 

435 opaque_metadata, 

436 preserve_bintable=image_bintable_hdu, 

437 fits_wcs_frame=fits_wcs_frame, 

438 ) 

439 if component == "image": 

440 result = image 

441 if component is None or component == "mask": 

442 mask = Mask._read_legacy_hdu( 

443 hdu_list[2], 

444 opaque_metadata, 

445 plane_map=plane_map, 

446 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

447 ) 

448 if component == "mask": 

449 result = mask 

450 if component is None or component == "variance": 

451 variance = Image._read_legacy_hdu( 

452 hdu_list[3], 

453 opaque_metadata, 

454 preserve_bintable=variance_bintable_hdu, 

455 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

456 ) 

457 if component == "variance": 

458 result = variance 

459 if component is None: 

460 result = MaskedImage(image, mask=mask, variance=variance) 

461 result._opaque_metadata = opaque_metadata 

462 return result 

463 

464 

465class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

466 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

467 

468 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

469 mask: MaskSerializationModel[P] = pydantic.Field( 

470 description="Bitmask that annotates the main image's pixels." 

471 ) 

472 variance: ImageSerializationModel[P] = pydantic.Field( 

473 description="Per-pixel variance estimates for the main image." 

474 ) 

475 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

476 default=None, 

477 exclude_if=is_none, 

478 description="Projection that maps the pixel grid to the sky.", 

479 ) 

480 

481 @property 

482 def bbox(self) -> Box: 

483 """The bounding box of the image.""" 

484 return self.image.bbox 

485 

486 def deserialize( 

487 self, archive: InputArchive[Any], *, bbox: Box | None = None, **kwargs: Any 

488 ) -> MaskedImage: 

489 """Deserialize an image from an input archive. 

490 

491 Parameters 

492 ---------- 

493 archive 

494 Archive to read from. 

495 bbox 

496 Bounding box of a subimage to read instead. 

497 **kwargs 

498 Unsupported keyword arguments are accepted only to provide better 

499 error messages (raising `serialization.InvalidParameterError`). 

500 """ 

501 if kwargs: 

502 raise InvalidParameterError(f"Unrecognized parameters for MaskedImage: {set(kwargs.keys())}.") 

503 image = self.image.deserialize(archive, bbox=bbox) 

504 mask = self.mask.deserialize(archive, bbox=bbox) 

505 variance = self.variance.deserialize(archive, bbox=bbox) 

506 projection = self.projection.deserialize(archive) if self.projection is not None else None 

507 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize( 

508 self 

509 ) 

510 

511 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

512 if component == "bbox" and kwargs: 

513 raise InvalidParameterError( 

514 f"Unrecognized parameters for MaskedImage.bbox: {set(kwargs.keys())}." 

515 ) 

516 return super().deserialize_component(component, archive, **kwargs)