Coverage for python / lsst / images / _image.py: 25%

218 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-13 01:50 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import YX, Box 

34from ._transforms import Frame, Projection, ProjectionSerializationModel 

35from .serialization import ( 

36 ArchiveTree, 

37 ArrayReferenceModel, 

38 ArrayReferenceQuantityModel, 

39 InlineArrayModel, 

40 InlineArrayQuantityModel, 

41 InputArchive, 

42 MetadataValue, 

43 OutputArchive, 

44 no_header_updates, 

45) 

46from .utils import is_none 

47 

48 

49@final 

50class Image(GeneralizedImage): 

51 """A 2-d array that may be augmented with units and a nonzero origin. 

52 

53 Parameters 

54 ---------- 

55 array_or_fill 

56 Array or fill value for the image. If a fill value, ``bbox`` or 

57 ``shape`` must be provided. 

58 bbox 

59 Bounding box for the image. 

60 start 

61 Logical coordinates of the first pixel in the array, ordered ``y``, 

62 ``x`` (unless an `XY` instance is passed). Ignored if 

63 ``bbox`` is provided. Defaults to zeros. 

64 shape 

65 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

66 instance is passed). Only needed if ``array_or_fill`` is not an 

67 array and ``bbox`` is not provided. Like the bbox, this does not 

68 include the last dimension of the array. 

69 dtype 

70 Pixel data type override. 

71 unit 

72 Units for the image's pixel values. 

73 projection 

74 Projection that maps the pixel grid to the sky. 

75 obs_info 

76 General information about the associated observation in standardized 

77 form. 

78 metadata 

79 Arbitrary flexible metadata to associate with the image. 

80 

81 Notes 

82 ----- 

83 Indexing the `array` attribute of an `Image` does not take into account its 

84 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

85 `Box` does, and the `bbox` of the subimage is set to match its location 

86 within the original image. 

87 

88 Indexed assignment to a subimage requires consistency between the 

89 coordinate systems and units of both operands, but it will automatically 

90 select a subimage of the right-hand side and convert compatible units when 

91 possible. In other words:: 

92 

93 a[box] = b 

94 

95 is a shortcut for 

96 

97 a[box].quantity = b[box].quantity 

98 

99 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

100 image. 

101 """ 

102 

103 def __init__( 

104 self, 

105 array_or_fill: np.ndarray | int | float = 0, 

106 /, 

107 *, 

108 bbox: Box | None = None, 

109 start: Sequence[int] | None = None, 

110 shape: Sequence[int] | None = None, 

111 dtype: npt.DTypeLike | None = None, 

112 unit: astropy.units.UnitBase | None = None, 

113 projection: Projection[Any] | None = None, 

114 obs_info: ObservationInfo | None = None, 

115 metadata: dict[str, MetadataValue] | None = None, 

116 ): 

117 super().__init__(metadata) 

118 if isinstance(array_or_fill, np.ndarray): 

119 if dtype is not None: 

120 array = np.array(array_or_fill, dtype=dtype) 

121 else: 

122 array = array_or_fill 

123 if bbox is None: 

124 bbox = Box.from_shape(array.shape, start=start) 

125 elif bbox.shape != array.shape: 

126 raise ValueError( 

127 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

128 ) 

129 if shape is not None and shape != array.shape: 

130 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

131 else: 

132 if bbox is None: 

133 if shape is None: 

134 raise TypeError("No bbox, shape, or array provided.") 

135 bbox = Box.from_shape(shape, start=start) 

136 elif shape is not None and shape != bbox.shape: 

137 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

138 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

139 self._array: np.ndarray = array 

140 self._bbox: Box = bbox 

141 self._unit = unit 

142 self._projection = projection 

143 self._obs_info = obs_info 

144 

145 @property 

146 def array(self) -> np.ndarray: 

147 """The low-level array (`numpy.ndarray`). 

148 

149 Assigning to this attribute modifies the existing array in place; the 

150 bounding box and underlying data pointer are never changed. 

151 """ 

152 return self._array 

153 

154 @array.setter 

155 def array(self, value: np.ndarray | int | float) -> None: 

156 self._array[...] = value 

157 

158 @property 

159 def quantity(self) -> astropy.units.Quantity: 

160 """The low-level array with units (`astropy.units.Quantity`). 

161 

162 Assigning to this attribute modifies the existing array in place; the 

163 bounding box and underlying data pointer are never changed. 

164 """ 

165 return astropy.units.Quantity(self._array, self._unit, copy=False) 

166 

167 @quantity.setter 

168 def quantity(self, value: astropy.units.Quantity) -> None: 

169 self.quantity[...] = value 

170 

171 @property 

172 def bbox(self) -> Box: 

173 """Bounding box for the image (`Box`).""" 

174 return self._bbox 

175 

176 @property 

177 def unit(self) -> astropy.units.UnitBase | None: 

178 """Units for the image's pixel values (`astropy.units.Unit` or 

179 `None`). 

180 """ 

181 return self._unit 

182 

183 @property 

184 def projection(self) -> Projection[Any] | None: 

185 """The projection that maps this image's pixel grid to the sky 

186 (`Projection` | `None`). 

187 

188 Notes 

189 ----- 

190 The pixel coordinates used by this projection account for the bounding 

191 box ``start``; they are not just array indices. 

192 """ 

193 return self._projection 

194 

195 @property 

196 def obs_info(self) -> ObservationInfo | None: 

197 """General information about the associated observation in standard 

198 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

199 """ 

200 return self._obs_info 

201 

202 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

203 if bbox is ...: 

204 return self 

205 super().__getitem__(bbox) 

206 indices = bbox.slice_within(self._bbox) 

207 return self._transfer_metadata( 

208 Image( 

209 self._array[indices], 

210 bbox=bbox, 

211 unit=self._unit, 

212 projection=self._projection, 

213 obs_info=self._obs_info, 

214 ), 

215 bbox=bbox, 

216 ) 

217 

218 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

219 self[bbox].quantity[...] = value.quantity 

220 

221 def __str__(self) -> str: 

222 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

223 

224 def __repr__(self) -> str: 

225 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

226 

227 def __eq__(self, other: object) -> bool: 

228 if not isinstance(other, Image): 

229 return NotImplemented 

230 return ( 

231 self._bbox == other._bbox 

232 and self._unit == other._unit 

233 and np.array_equal(self._array, other._array, equal_nan=True) 

234 ) 

235 

236 def copy(self) -> Image: 

237 return self._transfer_metadata( 

238 Image( 

239 self._array.copy(), 

240 bbox=self._bbox, 

241 unit=self._unit, 

242 projection=self._projection, 

243 obs_info=self._obs_info, 

244 ), 

245 copy=True, 

246 ) 

247 

248 def view( 

249 self, 

250 *, 

251 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

252 projection: Projection | None | EllipsisType = ..., 

253 start: Sequence[int] | EllipsisType = ..., 

254 obs_info: ObservationInfo | None | EllipsisType = ..., 

255 ) -> Image: 

256 """Make a view of the image, with optional updates.""" 

257 if unit is ...: 

258 unit = self._unit 

259 if projection is ...: 

260 projection = self._projection 

261 if start is ...: 

262 start = self._bbox.start 

263 if obs_info is ...: 

264 obs_info = self._obs_info 

265 return self._transfer_metadata( 

266 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info) 

267 ) 

268 

269 def serialize[P: pydantic.BaseModel]( 

270 self, 

271 archive: OutputArchive[P], 

272 *, 

273 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

274 save_projection: bool = True, 

275 save_obs_info: bool = True, 

276 add_offset_wcs: str | None = "A", 

277 ) -> ImageSerializationModel[P]: 

278 """Serialize the image to an output archive. 

279 

280 Parameters 

281 ---------- 

282 archive 

283 Archive to write to. 

284 update_header 

285 A callback that will be given the FITS header for the HDU 

286 containing this image in order to add keys to it. This callback 

287 may be provided but will not be called if the output format is not 

288 FITS. 

289 save_projection 

290 If `True`, save the `Projection` attached to the image, if there 

291 is one. This does not affect whether a FITS WCS corresponding to 

292 the projection is written (it always is, if available, and if 

293 ``add_offset_wcs`` is not ``" "``). 

294 save_obs_info 

295 If `True`, save the 

296 `~astro_metadata_translator.ObservationInfo` attached to the 

297 image, if there is one. 

298 add_offset_wcs 

299 A FITS WCS single-character suffix to use when adding a linear 

300 WCS that maps the FITS array to the logical pixel coordinates 

301 defined by ``bbox.start``. Set to `None` to not write this WCS. 

302 If this is set to ``" "``, it will prevent the `Projection` from 

303 being saved as a FITS WCS. 

304 """ 

305 

306 def _update_header(header: astropy.io.fits.Header) -> None: 

307 update_header(header) 

308 if self.unit is not None: 

309 try: 

310 header["BUNIT"] = self.unit.to_string(format="fits") 

311 except ValueError: 

312 # Units not supported by FITS. 

313 pass 

314 if self.projection is not None and add_offset_wcs != " ": 

315 if self.fits_wcs: 

316 header.update(self.fits_wcs.to_header(relax=True)) 

317 if add_offset_wcs is not None: 

318 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

319 

320 array_model = archive.add_array(self.array, update_header=_update_header) 

321 serialized_projection: ProjectionSerializationModel[P] | None = None 

322 if save_projection and self.projection is not None: 

323 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

324 data = array_model if self.unit is None else array_model.with_units(self.unit) 

325 return ImageSerializationModel.model_construct( 

326 data=data, 

327 start=list(self.bbox.start), 

328 projection=serialized_projection, 

329 obs_info=self._obs_info if save_obs_info else None, 

330 metadata=self.metadata, 

331 ) 

332 

333 @staticmethod 

334 def deserialize( 

335 model: ImageSerializationModel[Any], 

336 archive: InputArchive[Any], 

337 *, 

338 bbox: Box | None = None, 

339 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

340 ) -> Image: 

341 """Deserialize an image from an input archive. 

342 

343 Parameters 

344 ---------- 

345 model 

346 A Pydantic model representation of the image, holding references 

347 to data stored in the archive. 

348 archive 

349 Archive to read from. 

350 bbox 

351 Bounding box of a subimage to read instead. 

352 strip_header 

353 A callable that strips out any FITS header cards added by the 

354 ``update_header`` argument in the corresponding call to 

355 `serialize`. 

356 """ 

357 array_model: ArrayReferenceModel | InlineArrayModel 

358 unit: astropy.units.UnitBase | None = None 

359 if isinstance(model.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel): 

360 array_model = model.data.value 

361 unit = model.data.unit 

362 else: 

363 array_model = model.data 

364 

365 def _strip_header(header: astropy.io.fits.Header) -> None: 

366 if unit is not None: 

367 header.pop("BUNIT", None) 

368 fits.strip_wcs_cards(header) 

369 strip_header(header) 

370 

371 slices = bbox.slice_within(model.bbox) if bbox is not None else ... 

372 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices) 

373 projection = ( 

374 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

375 ) 

376 return Image( 

377 array, 

378 start=model.start if bbox is None else bbox.start, 

379 unit=unit, 

380 projection=projection, 

381 obs_info=model.obs_info, 

382 )._finish_deserialize(model) 

383 

384 @staticmethod 

385 def _get_archive_tree_type[P: pydantic.BaseModel]( 

386 pointer_type: type[P], 

387 ) -> type[ImageSerializationModel[P]]: 

388 """Return the serialization model type for this object for an archive 

389 type that uses the given pointer type. 

390 """ 

391 return ImageSerializationModel[pointer_type] # type: ignore 

392 

393 _archive_default_name: ClassVar[str] = "image" 

394 """The name this object should be serialized with when written as the 

395 top-level object. 

396 """ 

397 

398 def write_fits( 

399 self, 

400 filename: str, 

401 *, 

402 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

403 compression_seed: int | None = None, 

404 ) -> None: 

405 """Write the image to a FITS file. 

406 

407 Parameters 

408 ---------- 

409 filename 

410 Name of the file to write to. Must be a local file. 

411 compression 

412 Compression options. 

413 compression_seed 

414 A FITS tile compression seed to use whenever the configured 

415 compression seed is `None` or (for backwards compatibility) ``0``. 

416 """ 

417 compression_options = {} 

418 if compression is not fits.FitsCompressionOptions.DEFAULT: 

419 compression_options[self._archive_default_name] = compression 

420 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

421 

422 @staticmethod 

423 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

424 """Read an image from a FITS file. 

425 

426 Parameters 

427 ---------- 

428 url 

429 URL of the file to read; may be any type supported by 

430 `lsst.resources.ResourcePath`. 

431 bbox 

432 Bounding box of a subimage to read instead. 

433 """ 

434 return fits.read(Image, url, bbox=bbox).deserialized 

435 

436 @staticmethod 

437 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image: 

438 """Convert from an `lsst.afw.image.Image` instance. 

439 

440 Parameters 

441 ---------- 

442 legacy 

443 An `lsst.afw.image.Image` instance that will share pixel data with 

444 the returned object. 

445 unit 

446 Units of the image. 

447 """ 

448 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

449 

450 def to_legacy(self, *, copy: bool | None = None) -> Any: 

451 """Convert to an `lsst.afw.image.Image` instance. 

452 

453 Parameters 

454 ---------- 

455 copy 

456 If `True`, always copy the pixel data. If `False`, return a view, 

457 and raise `TypeError` if the pixel data is read-only (this is not 

458 supported by afw). If `None`, onyl if the pixel data is 

459 read-only. 

460 """ 

461 import lsst.afw.image 

462 import lsst.geom 

463 

464 array = self._array 

465 if copy: 

466 array = array.copy() 

467 elif not self._array.flags.writeable: 

468 if copy is None: 

469 array = array.copy() 

470 else: 

471 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

472 

473 return lsst.afw.image.Image( 

474 array, 

475 deep=False, 

476 dtype=array.dtype.type, 

477 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

478 ) 

479 

480 @staticmethod 

481 def read_legacy( 

482 uri: ResourcePathExpression, 

483 *, 

484 preserve_quantization: bool = False, 

485 ext: str | int = 1, 

486 fits_wcs_frame: Frame | None = None, 

487 ) -> Image: 

488 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

489 

490 Parameters 

491 ---------- 

492 uri 

493 URI or file name. 

494 preserve_quantization 

495 If `True`, ensure that writing the image back out again will 

496 exactly preserve quantization-compressed pixel values. This causes 

497 the arrays to be marked as read-only and stores the original binary 

498 table data for those planes in memory. If the `Image` is copied, 

499 the precompressed pixel values are not transferred to the copy. 

500 ext 

501 Name or index of the FITS HDU to read. 

502 fits_wcs_frame 

503 If not `None` and the HDU containing the image has a FITS WCS, 

504 attach a `Projection` to the returned image by converting that 

505 WCS. 

506 """ 

507 opaque_metadata = fits.FitsOpaqueMetadata() 

508 with ExitStack() as exit_stack: 

509 fs, fspath = ResourcePath(uri).to_fsspec() 

510 stream = exit_stack.enter_context(fs.open(fspath)) 

511 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

512 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

513 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

514 if preserve_quantization: 

515 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

516 bintable_hdu_list = exit_stack.enter_context( 

517 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

518 ) 

519 bintable_hdu = bintable_hdu_list[ext] 

520 result = Image._read_legacy_hdu( 

521 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

522 ) 

523 result._opaque_metadata = opaque_metadata 

524 return result 

525 

526 @staticmethod 

527 def _read_legacy_hdu( 

528 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

529 opaque_metadata: fits.FitsOpaqueMetadata, 

530 *, 

531 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

532 fits_wcs_frame: Frame | None = None, 

533 ) -> Image: 

534 unit: astropy.units.UnitBase | None = None 

535 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

536 unit = astropy.units.Unit(fits_unit, format="fits") 

537 dx: int = hdu.header.pop("LTV1") 

538 dy: int = hdu.header.pop("LTV2") 

539 start = YX(y=-dy, x=-dx) 

540 read_only: bool = False 

541 if preserve_bintable is not None: 

542 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

543 read_only = True 

544 projection: Projection | None = None 

545 if fits_wcs_frame is not None: 

546 try: 

547 fits_wcs = astropy.wcs.WCS(hdu.header) 

548 except KeyError: 

549 pass 

550 else: 

551 projection = Projection.from_fits_wcs( 

552 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

553 ) 

554 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

555 if read_only: 

556 image._array.flags["WRITEABLE"] = False 

557 fits.strip_wcs_cards(hdu.header) 

558 hdu.header.strip() 

559 hdu.header.remove("EXTTYPE", ignore_missing=True) 

560 hdu.header.remove("INHERIT", ignore_missing=True) 

561 hdu.header.remove("UZSCALE", ignore_missing=True) 

562 opaque_metadata.add_header(hdu.header) 

563 return image 

564 

565 

566class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

567 """Pydantic model used to represent the serialized form of an `.Image`.""" 

568 

569 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = ( 

570 pydantic.Field(description="Reference to pixel data.") 

571 ) 

572 start: list[int] = pydantic.Field( 

573 description="Coordinate of the first pixels in the array, ordered (y, x)." 

574 ) 

575 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

576 default=None, 

577 exclude_if=is_none, 

578 description="Projection that maps the logical pixel grid onto the sky.", 

579 ) 

580 obs_info: ObservationInfo | None = pydantic.Field( 

581 default=None, 

582 exclude_if=is_none, 

583 description="Standardized description of image metadata", 

584 ) 

585 

586 @property 

587 def bbox(self) -> Box: 

588 """The bounding box of the image.""" 

589 match self.data: 

590 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel(): 

591 shape = self.data.value.shape 

592 case ArrayReferenceModel() | InlineArrayModel(): 

593 shape = self.data.shape 

594 return Box.from_shape(shape, self.start)