Coverage for python/lsst/images/_image.py: 22%

223 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 08:40 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import TYPE_CHECKING, Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from . import fits 

31from ._generalized_image import GeneralizedImage 

32from ._geom import YX, Box 

33from ._transforms import Frame, Projection, ProjectionSerializationModel 

34from .serialization import ( 

35 ArchiveTree, 

36 ArrayReferenceModel, 

37 ArrayReferenceQuantityModel, 

38 InlineArrayModel, 

39 InlineArrayQuantityModel, 

40 InputArchive, 

41 InvalidParameterError, 

42 MetadataValue, 

43 OutputArchive, 

44 no_header_updates, 

45) 

46from .utils import is_none 

47 

48if TYPE_CHECKING: 

49 try: 

50 from lsst.afw.image import Image as LegacyImage 

51 except ImportError: 

52 type LegacyImage = Any # type: ignore[no-redef] 

53 

54 

55@final 

56class Image(GeneralizedImage): 

57 """A 2-d array that may be augmented with units and a nonzero origin. 

58 

59 Parameters 

60 ---------- 

61 array_or_fill 

62 Array or fill value for the image. If a fill value, ``bbox`` or 

63 ``shape`` must be provided. 

64 bbox 

65 Bounding box for the image. 

66 start 

67 Logical coordinates of the first pixel in the array, ordered ``y``, 

68 ``x`` (unless an `XY` instance is passed). Ignored if 

69 ``bbox`` is provided. Defaults to zeros. 

70 shape 

71 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

72 instance is passed). Only needed if ``array_or_fill`` is not an 

73 array and ``bbox`` is not provided. Like the bbox, this does not 

74 include the last dimension of the array. 

75 dtype 

76 Pixel data type override. 

77 unit 

78 Units for the image's pixel values. 

79 projection 

80 Projection that maps the pixel grid to the sky. 

81 metadata 

82 Arbitrary flexible metadata to associate with the image. 

83 

84 Notes 

85 ----- 

86 Indexing the `array` attribute of an `Image` does not take into account its 

87 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

88 `Box` does, and the `bbox` of the subimage is set to match its location 

89 within the original image. 

90 

91 Indexed assignment to a subimage requires consistency between the 

92 coordinate systems and units of both operands, but it will automatically 

93 select a subimage of the right-hand side and convert compatible units when 

94 possible. In other words:: 

95 

96 a[box] = b 

97 

98 is a shortcut for 

99 

100 a[box].quantity = b[box].quantity 

101 

102 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

103 image. 

104 """ 

105 

106 def __init__( 

107 self, 

108 array_or_fill: np.ndarray | int | float = 0, 

109 /, 

110 *, 

111 bbox: Box | None = None, 

112 start: Sequence[int] | None = None, 

113 shape: Sequence[int] | None = None, 

114 dtype: npt.DTypeLike | None = None, 

115 unit: astropy.units.UnitBase | None = None, 

116 projection: Projection[Any] | None = None, 

117 metadata: dict[str, MetadataValue] | None = None, 

118 ): 

119 super().__init__(metadata) 

120 if isinstance(array_or_fill, np.ndarray): 

121 if dtype is not None: 

122 array = np.array(array_or_fill, dtype=dtype, copy=None) 

123 else: 

124 array = array_or_fill 

125 if bbox is None: 

126 bbox = Box.from_shape(array.shape, start=start) 

127 elif bbox.shape != array.shape: 

128 raise ValueError( 

129 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

130 ) 

131 if shape is not None and shape != array.shape: 

132 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

133 else: 

134 if bbox is None: 

135 if shape is None: 

136 raise TypeError("No bbox, shape, or array provided.") 

137 bbox = Box.from_shape(shape, start=start) 

138 elif shape is not None and shape != bbox.shape: 

139 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

140 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

141 self._array: np.ndarray = array 

142 self._bbox: Box = bbox 

143 self._unit = unit 

144 self._projection = projection 

145 

146 @property 

147 def array(self) -> np.ndarray: 

148 """The low-level array (`numpy.ndarray`). 

149 

150 Assigning to this attribute modifies the existing array in place; the 

151 bounding box and underlying data pointer are never changed. 

152 """ 

153 return self._array 

154 

155 @array.setter 

156 def array(self, value: np.ndarray | int | float) -> None: 

157 self._array[...] = value 

158 

159 @property 

160 def quantity(self) -> astropy.units.Quantity: 

161 """The low-level array with units (`astropy.units.Quantity`). 

162 

163 Assigning to this attribute modifies the existing array in place; the 

164 bounding box and underlying data pointer are never changed. 

165 """ 

166 return astropy.units.Quantity(self._array, self._unit, copy=False) 

167 

168 @quantity.setter 

169 def quantity(self, value: astropy.units.Quantity) -> None: 

170 self.quantity[...] = value 

171 

172 @property 

173 def bbox(self) -> Box: 

174 """Bounding box for the image (`Box`).""" 

175 return self._bbox 

176 

177 @property 

178 def unit(self) -> astropy.units.UnitBase | None: 

179 """Units for the image's pixel values (`astropy.units.Unit` or 

180 `None`). 

181 """ 

182 return self._unit 

183 

184 @property 

185 def projection(self) -> Projection[Any] | None: 

186 """The projection that maps this image's pixel grid to the sky 

187 (`Projection` | `None`). 

188 

189 Notes 

190 ----- 

191 The pixel coordinates used by this projection account for the bounding 

192 box ``start``; they are not just array indices. 

193 """ 

194 return self._projection 

195 

196 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

197 if bbox is ...: 

198 return self 

199 super().__getitem__(bbox) 

200 indices = bbox.slice_within(self._bbox) 

201 return self._transfer_metadata( 

202 Image(self._array[indices], bbox=bbox, unit=self._unit, projection=self._projection), 

203 bbox=bbox, 

204 ) 

205 

206 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

207 self[bbox].quantity[...] = value.quantity 

208 

209 def __str__(self) -> str: 

210 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

211 

212 def __repr__(self) -> str: 

213 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

214 

215 def __eq__(self, other: object) -> bool: 

216 if not isinstance(other, Image): 

217 return NotImplemented 

218 return ( 

219 self._bbox == other._bbox 

220 and self._unit == other._unit 

221 and np.array_equal(self._array, other._array, equal_nan=True) 

222 ) 

223 

224 def copy(self) -> Image: 

225 return self._transfer_metadata( 

226 Image(self._array.copy(), bbox=self._bbox, unit=self._unit, projection=self._projection), 

227 copy=True, 

228 ) 

229 

230 def view( 

231 self, 

232 *, 

233 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

234 projection: Projection | None | EllipsisType = ..., 

235 start: Sequence[int] | EllipsisType = ..., 

236 ) -> Image: 

237 """Make a view of the image, with optional updates.""" 

238 if unit is ...: 

239 unit = self._unit 

240 if projection is ...: 

241 projection = self._projection 

242 if start is ...: 

243 start = self._bbox.start 

244 return self._transfer_metadata(Image(self._array, start=start, unit=unit, projection=projection)) 

245 

246 def serialize[P: pydantic.BaseModel]( 

247 self, 

248 archive: OutputArchive[P], 

249 *, 

250 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

251 save_projection: bool = True, 

252 add_offset_wcs: str | None = "A", 

253 ) -> ImageSerializationModel[P]: 

254 """Serialize the image to an output archive. 

255 

256 Parameters 

257 ---------- 

258 archive 

259 Archive to write to. 

260 update_header 

261 A callback that will be given the FITS header for the HDU 

262 containing this image in order to add keys to it. This callback 

263 may be provided but will not be called if the output format is not 

264 FITS. 

265 save_projection 

266 If `True`, save the `Projection` attached to the image, if there 

267 is one. This does not affect whether a FITS WCS corresponding to 

268 the projection is written (it always is, if available, and if 

269 ``add_offset_wcs`` is not ``" "``). 

270 add_offset_wcs 

271 A FITS WCS single-character suffix to use when adding a linear 

272 WCS that maps the FITS array to the logical pixel coordinates 

273 defined by ``bbox.start``. Set to `None` to not write this WCS. 

274 If this is set to ``" "``, it will prevent the `Projection` from 

275 being saved as a FITS WCS. 

276 """ 

277 

278 def _update_header(header: astropy.io.fits.Header) -> None: 

279 update_header(header) 

280 if self.unit is not None: 

281 try: 

282 header["BUNIT"] = self.unit.to_string(format="fits") 

283 except ValueError: 

284 # Units not supported by FITS; write it anyway because 

285 # the accepted units are just a recommendation in the 

286 # standard. 

287 header["BUNIT"] = self.unit.to_string() 

288 if self.projection is not None and add_offset_wcs != " ": 

289 if self.fits_wcs: 

290 header.update(self.fits_wcs.to_header(relax=True)) 

291 if add_offset_wcs is not None: 

292 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

293 

294 array_model = archive.add_array(self.array, update_header=_update_header) 

295 serialized_projection: ProjectionSerializationModel[P] | None = None 

296 if save_projection and self.projection is not None: 

297 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

298 data = array_model if self.unit is None else array_model.with_units(self.unit) 

299 return ImageSerializationModel.model_construct( 

300 data=data, 

301 start=list(self.bbox.start), 

302 projection=serialized_projection, 

303 metadata=self.metadata, 

304 ) 

305 

306 @staticmethod 

307 def _get_archive_tree_type[P: pydantic.BaseModel]( 

308 pointer_type: type[P], 

309 ) -> type[ImageSerializationModel[P]]: 

310 """Return the serialization model type for this object for an archive 

311 type that uses the given pointer type. 

312 """ 

313 return ImageSerializationModel[pointer_type] # type: ignore 

314 

315 _archive_default_name: ClassVar[str] = "image" 

316 """The name this object should be serialized with when written as the 

317 top-level object. 

318 """ 

319 

320 def write_fits( 

321 self, 

322 filename: str, 

323 *, 

324 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

325 compression_seed: int | None = None, 

326 ) -> None: 

327 """Write the image to a FITS file. 

328 

329 Parameters 

330 ---------- 

331 filename 

332 Name of the file to write to. Must be a local file. 

333 compression 

334 Compression options. 

335 compression_seed 

336 A FITS tile compression seed to use whenever the configured 

337 compression seed is `None` or (for backwards compatibility) ``0``. 

338 """ 

339 compression_options = {} 

340 if compression is not fits.FitsCompressionOptions.DEFAULT: 

341 compression_options[self._archive_default_name] = compression 

342 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

343 

344 @staticmethod 

345 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

346 """Read an image from a FITS file. 

347 

348 Parameters 

349 ---------- 

350 url 

351 URL of the file to read; may be any type supported by 

352 `lsst.resources.ResourcePath`. 

353 bbox 

354 Bounding box of a subimage to read instead. 

355 """ 

356 return fits.read(Image, url, bbox=bbox).deserialized 

357 

358 @staticmethod 

359 def from_legacy(legacy: LegacyImage, unit: astropy.units.UnitBase | None = None) -> Image: 

360 """Convert from an `lsst.afw.image.Image` instance. 

361 

362 Parameters 

363 ---------- 

364 legacy 

365 An `lsst.afw.image.Image` instance that will share pixel data with 

366 the returned object. 

367 unit 

368 Units of the image. 

369 """ 

370 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

371 

372 def to_legacy(self, *, copy: bool | None = None) -> LegacyImage: 

373 """Convert to an `lsst.afw.image.Image` instance. 

374 

375 Parameters 

376 ---------- 

377 copy 

378 If `True`, always copy the pixel data. If `False`, return a view, 

379 and raise `TypeError` if the pixel data is read-only (this is not 

380 supported by afw). If `None`, onyl if the pixel data is 

381 read-only. 

382 """ 

383 import lsst.afw.image 

384 import lsst.geom 

385 

386 array = self._array 

387 if copy: 

388 array = array.copy() 

389 elif not self._array.flags.writeable: 

390 if copy is None: 

391 array = array.copy() 

392 else: 

393 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

394 

395 return lsst.afw.image.Image( 

396 array, 

397 deep=False, 

398 dtype=array.dtype.type, 

399 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

400 ) 

401 

402 @staticmethod 

403 def read_legacy( 

404 uri: ResourcePathExpression, 

405 *, 

406 preserve_quantization: bool = False, 

407 ext: str | int = 1, 

408 fits_wcs_frame: Frame | None = None, 

409 ) -> Image: 

410 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

411 

412 Parameters 

413 ---------- 

414 uri 

415 URI or file name. 

416 preserve_quantization 

417 If `True`, ensure that writing the image back out again will 

418 exactly preserve quantization-compressed pixel values. This causes 

419 the arrays to be marked as read-only and stores the original binary 

420 table data for those planes in memory. If the `Image` is copied, 

421 the precompressed pixel values are not transferred to the copy. 

422 ext 

423 Name or index of the FITS HDU to read. 

424 fits_wcs_frame 

425 If not `None` and the HDU containing the image has a FITS WCS, 

426 attach a `Projection` to the returned image by converting that 

427 WCS. 

428 """ 

429 opaque_metadata = fits.FitsOpaqueMetadata() 

430 with ExitStack() as exit_stack: 

431 fs, fspath = ResourcePath(uri).to_fsspec() 

432 stream = exit_stack.enter_context(fs.open(fspath)) 

433 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

434 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

435 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

436 if preserve_quantization: 

437 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

438 bintable_hdu_list = exit_stack.enter_context( 

439 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

440 ) 

441 bintable_hdu = bintable_hdu_list[ext] 

442 result = Image._read_legacy_hdu( 

443 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

444 ) 

445 result._opaque_metadata = opaque_metadata 

446 return result 

447 

448 @staticmethod 

449 def _read_legacy_hdu( 

450 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

451 opaque_metadata: fits.FitsOpaqueMetadata, 

452 *, 

453 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

454 fits_wcs_frame: Frame | None = None, 

455 ) -> Image: 

456 unit: astropy.units.UnitBase | None = None 

457 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

458 try: 

459 unit = astropy.units.Unit(fits_unit, format="fits") 

460 except ValueError: 

461 # Accept non-FITS units by assuming Astropy can still figure 

462 # them out if we don't specify the format. 

463 unit = astropy.units.Unit(fits_unit) 

464 if opaque_metadata.get_instrumental_unit() == astropy.units.electron: 

465 # Fix incorrect BUNIT='adu' in LSST preliminary_visit_image. 

466 if unit == astropy.units.adu: 

467 unit = astropy.units.electron 

468 if unit == astropy.units.adu**2: 

469 unit = astropy.units.electron**2 

470 dx: int = hdu.header.pop("LTV1") 

471 dy: int = hdu.header.pop("LTV2") 

472 start = YX(y=-dy, x=-dx) 

473 read_only: bool = False 

474 if preserve_bintable is not None: 

475 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

476 read_only = True 

477 projection: Projection | None = None 

478 if fits_wcs_frame is not None: 

479 try: 

480 fits_wcs = astropy.wcs.WCS(hdu.header) 

481 except KeyError: 

482 pass 

483 else: 

484 projection = Projection.from_fits_wcs( 

485 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

486 ) 

487 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

488 if read_only: 

489 image._array.flags["WRITEABLE"] = False 

490 fits.strip_wcs_cards(hdu.header) 

491 hdu.header.strip() 

492 hdu.header.remove("EXTTYPE", ignore_missing=True) 

493 hdu.header.remove("INHERIT", ignore_missing=True) 

494 hdu.header.remove("UZSCALE", ignore_missing=True) 

495 opaque_metadata.add_header(hdu.header) 

496 return image 

497 

498 

499class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

500 """Pydantic model used to represent the serialized form of an `.Image`.""" 

501 

502 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = ( 

503 pydantic.Field(description="Reference to pixel data.") 

504 ) 

505 start: list[int] = pydantic.Field( 

506 description="Coordinate of the first pixels in the array, ordered (y, x)." 

507 ) 

508 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

509 default=None, 

510 exclude_if=is_none, 

511 description="Projection that maps the logical pixel grid onto the sky.", 

512 ) 

513 

514 @property 

515 def bbox(self) -> Box: 

516 """The bounding box of the image.""" 

517 match self.data: 

518 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel(): 

519 shape = self.data.value.shape 

520 case ArrayReferenceModel() | InlineArrayModel(): 

521 shape = self.data.shape 

522 return Box.from_shape(shape, self.start) 

523 

524 def deserialize( 

525 self, 

526 archive: InputArchive[Any], 

527 *, 

528 bbox: Box | None = None, 

529 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

530 **kwargs: Any, 

531 ) -> Image: 

532 """Deserialize an image from an input archive. 

533 

534 Parameters 

535 ---------- 

536 archive 

537 Archive to read from. 

538 bbox 

539 Bounding box of a subimage to read instead. 

540 strip_header 

541 A callable that strips out any FITS header cards added by the 

542 ``update_header`` argument in the corresponding call to 

543 `Image.serialize`. 

544 **kwargs 

545 Unsupported keyword arguments are accepted only to provide better 

546 error messages (raising `serialization.InvalidParameterError`). 

547 """ 

548 if kwargs: 

549 raise InvalidParameterError(f"Unrecognized parameters for Image: {set(kwargs.keys())}.") 

550 array_model: ArrayReferenceModel | InlineArrayModel 

551 unit: astropy.units.UnitBase | None = None 

552 if isinstance(self.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel): 

553 array_model = self.data.value 

554 unit = self.data.unit 

555 else: 

556 array_model = self.data 

557 

558 def _strip_header(header: astropy.io.fits.Header) -> None: 

559 if unit is not None: 

560 header.pop("BUNIT", None) 

561 fits.strip_wcs_cards(header) 

562 strip_header(header) 

563 

564 slices = bbox.slice_within(self.bbox) if bbox is not None else ... 

565 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices) 

566 projection = self.projection.deserialize(archive) if self.projection is not None else None 

567 return Image( 

568 array, 

569 start=self.start if bbox is None else bbox.start, 

570 unit=unit, 

571 projection=projection, 

572 )._finish_deserialize(self) 

573 

574 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

575 if kwargs: 

576 raise InvalidParameterError(f"Unsupported parameters for Image components: {set(kwargs.keys())}.") 

577 return super().deserialize_component(component, archive)