Coverage for python/lsst/images/_image.py: 23%

226 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:10 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import TYPE_CHECKING, Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from . import fits 

31from ._generalized_image import GeneralizedImage 

32from ._geom import YX, Box 

33from ._transforms import Frame, Projection, ProjectionSerializationModel 

34from .serialization import ( 

35 ArchiveTree, 

36 ArrayReferenceModel, 

37 ArrayReferenceQuantityModel, 

38 InlineArrayModel, 

39 InlineArrayQuantityModel, 

40 InputArchive, 

41 InvalidParameterError, 

42 MetadataValue, 

43 OutputArchive, 

44 no_header_updates, 

45) 

46from .utils import is_none 

47 

48if TYPE_CHECKING: 

49 try: 

50 from lsst.afw.image import Image as LegacyImage 

51 except ImportError: 

52 type LegacyImage = Any # type: ignore[no-redef] 

53 

54 

55@final 

56class Image(GeneralizedImage): 

57 """A 2-d array that may be augmented with units and a nonzero origin. 

58 

59 Parameters 

60 ---------- 

61 array_or_fill 

62 Array or fill value for the image. If a fill value, ``bbox`` or 

63 ``shape`` must be provided. 

64 bbox 

65 Bounding box for the image. 

66 start 

67 Logical coordinates of the first pixel in the array, ordered ``y``, 

68 ``x`` (unless an `XY` instance is passed). Ignored if 

69 ``bbox`` is provided. Defaults to zeros. 

70 shape 

71 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

72 instance is passed). Only needed if ``array_or_fill`` is not an 

73 array and ``bbox`` is not provided. Like the bbox, this does not 

74 include the last dimension of the array. 

75 dtype 

76 Pixel data type override. 

77 unit 

78 Units for the image's pixel values. 

79 projection 

80 Projection that maps the pixel grid to the sky. 

81 metadata 

82 Arbitrary flexible metadata to associate with the image. 

83 

84 Notes 

85 ----- 

86 Indexing the `array` attribute of an `Image` does not take into account its 

87 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

88 `Box` does, and the `bbox` of the subimage is set to match its location 

89 within the original image. 

90 

91 Indexed assignment to a subimage requires consistency between the 

92 coordinate systems and units of both operands, but it will automatically 

93 select a subimage of the right-hand side and convert compatible units when 

94 possible. In other words:: 

95 

96 a[box] = b 

97 

98 is a shortcut for 

99 

100 a[box].quantity = b[box].quantity 

101 

102 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

103 image. 

104 """ 

105 

106 def __init__( 

107 self, 

108 array_or_fill: np.ndarray | int | float = 0, 

109 /, 

110 *, 

111 bbox: Box | None = None, 

112 start: Sequence[int] | None = None, 

113 shape: Sequence[int] | None = None, 

114 dtype: npt.DTypeLike | None = None, 

115 unit: astropy.units.UnitBase | None = None, 

116 projection: Projection[Any] | None = None, 

117 metadata: dict[str, MetadataValue] | None = None, 

118 ): 

119 super().__init__(metadata) 

120 if isinstance(array_or_fill, np.ndarray): 

121 if dtype is not None: 

122 array = np.array(array_or_fill, dtype=dtype, copy=None) 

123 else: 

124 array = array_or_fill 

125 if bbox is None: 

126 bbox = Box.from_shape(array.shape, start=start) 

127 elif bbox.shape != array.shape: 

128 raise ValueError( 

129 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

130 ) 

131 if shape is not None and shape != array.shape: 

132 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

133 else: 

134 if bbox is None: 

135 if shape is None: 

136 raise TypeError("No bbox, shape, or array provided.") 

137 bbox = Box.from_shape(shape, start=start) 

138 elif shape is not None and shape != bbox.shape: 

139 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

140 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

141 self._array: np.ndarray = array 

142 self._bbox: Box = bbox 

143 self._unit = unit 

144 self._projection = projection 

145 

146 @property 

147 def array(self) -> np.ndarray: 

148 """The low-level array (`numpy.ndarray`). 

149 

150 Assigning to this attribute modifies the existing array in place; the 

151 bounding box and underlying data pointer are never changed. 

152 """ 

153 return self._array 

154 

155 @array.setter 

156 def array(self, value: np.ndarray | int | float) -> None: 

157 self._array[...] = value 

158 

159 @property 

160 def quantity(self) -> astropy.units.Quantity: 

161 """The low-level array with units (`astropy.units.Quantity`). 

162 

163 Assigning to this attribute modifies the existing array in place; the 

164 bounding box and underlying data pointer are never changed. 

165 """ 

166 return astropy.units.Quantity(self._array, self._unit, copy=False) 

167 

168 @quantity.setter 

169 def quantity(self, value: astropy.units.Quantity) -> None: 

170 self.quantity[...] = value 

171 

172 @property 

173 def bbox(self) -> Box: 

174 """Bounding box for the image (`Box`).""" 

175 return self._bbox 

176 

177 @property 

178 def unit(self) -> astropy.units.UnitBase | None: 

179 """Units for the image's pixel values (`astropy.units.Unit` or 

180 `None`). 

181 """ 

182 return self._unit 

183 

184 @property 

185 def projection(self) -> Projection[Any] | None: 

186 """The projection that maps this image's pixel grid to the sky 

187 (`Projection` | `None`). 

188 

189 Notes 

190 ----- 

191 The pixel coordinates used by this projection account for the bounding 

192 box ``start``; they are not just array indices. 

193 """ 

194 return self._projection 

195 

196 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

197 if bbox is ...: 

198 return self 

199 super().__getitem__(bbox) 

200 indices = bbox.slice_within(self._bbox) 

201 return self._transfer_metadata( 

202 Image(self._array[indices], bbox=bbox, unit=self._unit, projection=self._projection), 

203 bbox=bbox, 

204 ) 

205 

206 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

207 self[bbox].quantity[...] = value.quantity 

208 

209 def __str__(self) -> str: 

210 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

211 

212 def __repr__(self) -> str: 

213 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

214 

215 def __eq__(self, other: object) -> bool: 

216 if not isinstance(other, Image): 

217 return NotImplemented 

218 return ( 

219 self._bbox == other._bbox 

220 and self._unit == other._unit 

221 and np.array_equal(self._array, other._array, equal_nan=True) 

222 ) 

223 

224 def copy(self) -> Image: 

225 return self._transfer_metadata( 

226 Image(self._array.copy(), bbox=self._bbox, unit=self._unit, projection=self._projection), 

227 copy=True, 

228 ) 

229 

230 def view( 

231 self, 

232 *, 

233 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

234 projection: Projection | None | EllipsisType = ..., 

235 start: Sequence[int] | EllipsisType = ..., 

236 ) -> Image: 

237 """Make a view of the image, with optional updates.""" 

238 if unit is ...: 

239 unit = self._unit 

240 if projection is ...: 

241 projection = self._projection 

242 if start is ...: 

243 start = self._bbox.start 

244 return self._transfer_metadata(Image(self._array, start=start, unit=unit, projection=projection)) 

245 

246 def serialize[P: pydantic.BaseModel]( 

247 self, 

248 archive: OutputArchive[P], 

249 *, 

250 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

251 save_projection: bool = True, 

252 add_offset_wcs: str | None = "A", 

253 ) -> ImageSerializationModel[P]: 

254 """Serialize the image to an output archive. 

255 

256 Parameters 

257 ---------- 

258 archive 

259 Archive to write to. 

260 update_header 

261 A callback that will be given the FITS header for the HDU 

262 containing this image in order to add keys to it. This callback 

263 may be provided but will not be called if the output format is not 

264 FITS. 

265 save_projection 

266 If `True`, save the `Projection` attached to the image, if there 

267 is one. This does not affect whether a FITS WCS corresponding to 

268 the projection is written (it always is, if available, and if 

269 ``add_offset_wcs`` is not ``" "``). 

270 add_offset_wcs 

271 A FITS WCS single-character suffix to use when adding a linear 

272 WCS that maps the FITS array to the logical pixel coordinates 

273 defined by ``bbox.start``. Set to `None` to not write this WCS. 

274 If this is set to ``" "``, it will prevent the `Projection` from 

275 being saved as a FITS WCS. 

276 """ 

277 

278 def _update_header(header: astropy.io.fits.Header) -> None: 

279 update_header(header) 

280 if self.unit is not None: 

281 try: 

282 header["BUNIT"] = self.unit.to_string(format="fits") 

283 except ValueError: 

284 # Units not supported by FITS; write it anyway because 

285 # the accepted units are just a recommendation in the 

286 # standard. 

287 header["BUNIT"] = self.unit.to_string() 

288 if self.projection is not None and add_offset_wcs != " ": 

289 if self.fits_wcs: 

290 header.update(self.fits_wcs.to_header(relax=True)) 

291 if add_offset_wcs is not None: 

292 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

293 

294 array_model = archive.add_array(self.array, update_header=_update_header) 

295 serialized_projection: ProjectionSerializationModel[P] | None = None 

296 if save_projection and self.projection is not None: 

297 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

298 data = array_model if self.unit is None else array_model.with_units(self.unit) 

299 return ImageSerializationModel.model_construct( 

300 data=data, 

301 start=list(self.bbox.start), 

302 projection=serialized_projection, 

303 metadata=self.metadata, 

304 ) 

305 

306 @staticmethod 

307 def _get_archive_tree_type[P: pydantic.BaseModel]( 

308 pointer_type: type[P], 

309 ) -> type[ImageSerializationModel[P]]: 

310 """Return the serialization model type for this object for an archive 

311 type that uses the given pointer type. 

312 """ 

313 return ImageSerializationModel[pointer_type] # type: ignore 

314 

315 _archive_default_name: ClassVar[str] = "image" 

316 """The name this object should be serialized with when written as the 

317 top-level object. 

318 """ 

319 

320 def write_fits( 

321 self, 

322 filename: str, 

323 *, 

324 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

325 compression_seed: int | None = None, 

326 ) -> None: 

327 """Write the image to a FITS file. 

328 

329 Parameters 

330 ---------- 

331 filename 

332 Name of the file to write to. Must be a local file. 

333 compression 

334 Compression options. 

335 compression_seed 

336 A FITS tile compression seed to use whenever the configured 

337 compression seed is `None` or (for backwards compatibility) ``0``. 

338 """ 

339 compression_options = {} 

340 if compression is not fits.FitsCompressionOptions.DEFAULT: 

341 compression_options[self._archive_default_name] = compression 

342 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

343 

344 @staticmethod 

345 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

346 """Read an image from a FITS file. 

347 

348 Parameters 

349 ---------- 

350 url 

351 URL of the file to read; may be any type supported by 

352 `lsst.resources.ResourcePath`. 

353 bbox 

354 Bounding box of a subimage to read instead. 

355 """ 

356 return fits.read(Image, url, bbox=bbox).deserialized 

357 

358 @staticmethod 

359 def from_legacy(legacy: LegacyImage, unit: astropy.units.UnitBase | None = None) -> Image: 

360 """Convert from an `lsst.afw.image.Image` instance. 

361 

362 Parameters 

363 ---------- 

364 legacy 

365 An `lsst.afw.image.Image` instance that will share pixel data with 

366 the returned object. 

367 unit 

368 Units of the image. 

369 """ 

370 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

371 

372 def to_legacy(self, *, copy: bool | None = None) -> LegacyImage: 

373 """Convert to an `lsst.afw.image.Image` instance. 

374 

375 Parameters 

376 ---------- 

377 copy 

378 If `True`, always copy the pixel data. If `False`, return a view, 

379 and raise `TypeError` if the pixel data is read-only (this is not 

380 supported by afw). If `None`, onyl if the pixel data is 

381 read-only. 

382 """ 

383 import lsst.afw.image 

384 import lsst.geom 

385 

386 array = self._array 

387 if copy: 

388 array = array.copy() 

389 elif not self._array.flags.writeable: 

390 if copy is None: 

391 array = array.copy() 

392 else: 

393 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

394 

395 return lsst.afw.image.Image( 

396 array, 

397 deep=False, 

398 dtype=array.dtype.type, 

399 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

400 ) 

401 

402 @staticmethod 

403 def read_legacy( 

404 uri: ResourcePathExpression, 

405 *, 

406 preserve_quantization: bool = False, 

407 ext: str | int = 1, 

408 fits_wcs_frame: Frame | None = None, 

409 ) -> Image: 

410 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

411 

412 Parameters 

413 ---------- 

414 uri 

415 URI or file name. 

416 preserve_quantization 

417 If `True`, ensure that writing the image back out again will 

418 exactly preserve quantization-compressed pixel values. This causes 

419 the arrays to be marked as read-only and stores the original binary 

420 table data for those planes in memory. If the `Image` is copied, 

421 the precompressed pixel values are not transferred to the copy. 

422 ext 

423 Name or index of the FITS HDU to read. 

424 fits_wcs_frame 

425 If not `None` and the HDU containing the image has a FITS WCS, 

426 attach a `Projection` to the returned image by converting that 

427 WCS. 

428 """ 

429 opaque_metadata = fits.FitsOpaqueMetadata() 

430 with ExitStack() as exit_stack: 

431 fs, fspath = ResourcePath(uri).to_fsspec() 

432 stream = exit_stack.enter_context(fs.open(fspath)) 

433 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

434 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

435 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

436 if preserve_quantization: 

437 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

438 bintable_hdu_list = exit_stack.enter_context( 

439 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

440 ) 

441 bintable_hdu = bintable_hdu_list[ext] 

442 result = Image._read_legacy_hdu( 

443 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

444 ) 

445 result._opaque_metadata = opaque_metadata 

446 return result 

447 

448 @staticmethod 

449 def _read_legacy_hdu( 

450 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

451 opaque_metadata: fits.FitsOpaqueMetadata, 

452 *, 

453 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

454 fits_wcs_frame: Frame | None = None, 

455 ) -> Image: 

456 unit: astropy.units.UnitBase | None = None 

457 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

458 try: 

459 unit = astropy.units.Unit(fits_unit, format="fits") 

460 except ValueError: 

461 # Accept non-FITS units by assuming Astropy can still figure 

462 # them out if we don't specify the format. 

463 unit = astropy.units.Unit(fits_unit) 

464 if opaque_metadata.get_instrumental_unit() == astropy.units.electron: 

465 # Fix incorrect BUNIT='adu' in LSST preliminary_visit_image. 

466 if unit == astropy.units.adu: 

467 unit = astropy.units.electron 

468 if unit == astropy.units.adu**2: 

469 unit = astropy.units.electron**2 

470 dx: int = hdu.header.pop("LTV1") 

471 dy: int = hdu.header.pop("LTV2") 

472 start = YX(y=-dy, x=-dx) 

473 read_only: bool = False 

474 if preserve_bintable is not None: 

475 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

476 read_only = True 

477 projection: Projection | None = None 

478 if fits_wcs_frame is not None: 

479 try: 

480 fits_wcs = astropy.wcs.WCS(hdu.header) 

481 except KeyError: 

482 pass 

483 else: 

484 projection = Projection.from_fits_wcs( 

485 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

486 ) 

487 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

488 if read_only: 

489 image._array.flags["WRITEABLE"] = False 

490 fits.strip_wcs_cards(hdu.header) 

491 hdu.header.strip() 

492 hdu.header.remove("EXTTYPE", ignore_missing=True) 

493 hdu.header.remove("INHERIT", ignore_missing=True) 

494 hdu.header.remove("UZSCALE", ignore_missing=True) 

495 opaque_metadata.add_header(hdu.header) 

496 return image 

497 

498 

499class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

500 """Pydantic model used to represent the serialized form of an `.Image`.""" 

501 

502 SCHEMA_NAME: ClassVar[str] = "image" 

503 SCHEMA_VERSION: ClassVar[str] = "1.0.0" 

504 MIN_READ_VERSION: ClassVar[int] = 1 

505 

506 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = ( 

507 pydantic.Field(description="Reference to pixel data.") 

508 ) 

509 start: list[int] = pydantic.Field( 

510 description="Coordinate of the first pixels in the array, ordered (y, x)." 

511 ) 

512 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

513 default=None, 

514 exclude_if=is_none, 

515 description="Projection that maps the logical pixel grid onto the sky.", 

516 ) 

517 

518 @property 

519 def bbox(self) -> Box: 

520 """The bounding box of the image.""" 

521 match self.data: 

522 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel(): 

523 shape = self.data.value.shape 

524 case ArrayReferenceModel() | InlineArrayModel(): 

525 shape = self.data.shape 

526 return Box.from_shape(shape, self.start) 

527 

528 def deserialize( 

529 self, 

530 archive: InputArchive[Any], 

531 *, 

532 bbox: Box | None = None, 

533 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

534 **kwargs: Any, 

535 ) -> Image: 

536 """Deserialize an image from an input archive. 

537 

538 Parameters 

539 ---------- 

540 archive 

541 Archive to read from. 

542 bbox 

543 Bounding box of a subimage to read instead. 

544 strip_header 

545 A callable that strips out any FITS header cards added by the 

546 ``update_header`` argument in the corresponding call to 

547 `Image.serialize`. 

548 **kwargs 

549 Unsupported keyword arguments are accepted only to provide better 

550 error messages (raising `serialization.InvalidParameterError`). 

551 """ 

552 if kwargs: 

553 raise InvalidParameterError(f"Unrecognized parameters for Image: {set(kwargs.keys())}.") 

554 array_model: ArrayReferenceModel | InlineArrayModel 

555 unit: astropy.units.UnitBase | None = None 

556 if isinstance(self.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel): 

557 array_model = self.data.value 

558 unit = self.data.unit 

559 else: 

560 array_model = self.data 

561 

562 def _strip_header(header: astropy.io.fits.Header) -> None: 

563 if unit is not None: 

564 header.pop("BUNIT", None) 

565 fits.strip_wcs_cards(header) 

566 strip_header(header) 

567 

568 slices = bbox.slice_within(self.bbox) if bbox is not None else ... 

569 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices) 

570 projection = self.projection.deserialize(archive) if self.projection is not None else None 

571 return Image( 

572 array, 

573 start=self.start if bbox is None else bbox.start, 

574 unit=unit, 

575 projection=projection, 

576 )._finish_deserialize(self) 

577 

578 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

579 if kwargs: 

580 raise InvalidParameterError(f"Unsupported parameters for Image components: {set(kwargs.keys())}.") 

581 return super().deserialize_component(component, archive)