Coverage for python / lsst / images / _image.py: 23%

220 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:32 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from . import fits 

31from ._generalized_image import GeneralizedImage 

32from ._geom import YX, Box 

33from ._transforms import Frame, Projection, ProjectionSerializationModel 

34from .serialization import ( 

35 ArchiveTree, 

36 ArrayReferenceModel, 

37 ArrayReferenceQuantityModel, 

38 InlineArrayModel, 

39 InlineArrayQuantityModel, 

40 InputArchive, 

41 InvalidParameterError, 

42 MetadataValue, 

43 OutputArchive, 

44 no_header_updates, 

45) 

46from .utils import is_none 

47 

48 

49@final 

50class Image(GeneralizedImage): 

51 """A 2-d array that may be augmented with units and a nonzero origin. 

52 

53 Parameters 

54 ---------- 

55 array_or_fill 

56 Array or fill value for the image. If a fill value, ``bbox`` or 

57 ``shape`` must be provided. 

58 bbox 

59 Bounding box for the image. 

60 start 

61 Logical coordinates of the first pixel in the array, ordered ``y``, 

62 ``x`` (unless an `XY` instance is passed). Ignored if 

63 ``bbox`` is provided. Defaults to zeros. 

64 shape 

65 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

66 instance is passed). Only needed if ``array_or_fill`` is not an 

67 array and ``bbox`` is not provided. Like the bbox, this does not 

68 include the last dimension of the array. 

69 dtype 

70 Pixel data type override. 

71 unit 

72 Units for the image's pixel values. 

73 projection 

74 Projection that maps the pixel grid to the sky. 

75 metadata 

76 Arbitrary flexible metadata to associate with the image. 

77 

78 Notes 

79 ----- 

80 Indexing the `array` attribute of an `Image` does not take into account its 

81 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

82 `Box` does, and the `bbox` of the subimage is set to match its location 

83 within the original image. 

84 

85 Indexed assignment to a subimage requires consistency between the 

86 coordinate systems and units of both operands, but it will automatically 

87 select a subimage of the right-hand side and convert compatible units when 

88 possible. In other words:: 

89 

90 a[box] = b 

91 

92 is a shortcut for 

93 

94 a[box].quantity = b[box].quantity 

95 

96 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

97 image. 

98 """ 

99 

100 def __init__( 

101 self, 

102 array_or_fill: np.ndarray | int | float = 0, 

103 /, 

104 *, 

105 bbox: Box | None = None, 

106 start: Sequence[int] | None = None, 

107 shape: Sequence[int] | None = None, 

108 dtype: npt.DTypeLike | None = None, 

109 unit: astropy.units.UnitBase | None = None, 

110 projection: Projection[Any] | None = None, 

111 metadata: dict[str, MetadataValue] | None = None, 

112 ): 

113 super().__init__(metadata) 

114 if isinstance(array_or_fill, np.ndarray): 

115 if dtype is not None: 

116 array = np.array(array_or_fill, dtype=dtype, copy=None) 

117 else: 

118 array = array_or_fill 

119 if bbox is None: 

120 bbox = Box.from_shape(array.shape, start=start) 

121 elif bbox.shape != array.shape: 

122 raise ValueError( 

123 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

124 ) 

125 if shape is not None and shape != array.shape: 

126 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

127 else: 

128 if bbox is None: 

129 if shape is None: 

130 raise TypeError("No bbox, shape, or array provided.") 

131 bbox = Box.from_shape(shape, start=start) 

132 elif shape is not None and shape != bbox.shape: 

133 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

134 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

135 self._array: np.ndarray = array 

136 self._bbox: Box = bbox 

137 self._unit = unit 

138 self._projection = projection 

139 

140 @property 

141 def array(self) -> np.ndarray: 

142 """The low-level array (`numpy.ndarray`). 

143 

144 Assigning to this attribute modifies the existing array in place; the 

145 bounding box and underlying data pointer are never changed. 

146 """ 

147 return self._array 

148 

149 @array.setter 

150 def array(self, value: np.ndarray | int | float) -> None: 

151 self._array[...] = value 

152 

153 @property 

154 def quantity(self) -> astropy.units.Quantity: 

155 """The low-level array with units (`astropy.units.Quantity`). 

156 

157 Assigning to this attribute modifies the existing array in place; the 

158 bounding box and underlying data pointer are never changed. 

159 """ 

160 return astropy.units.Quantity(self._array, self._unit, copy=False) 

161 

162 @quantity.setter 

163 def quantity(self, value: astropy.units.Quantity) -> None: 

164 self.quantity[...] = value 

165 

166 @property 

167 def bbox(self) -> Box: 

168 """Bounding box for the image (`Box`).""" 

169 return self._bbox 

170 

171 @property 

172 def unit(self) -> astropy.units.UnitBase | None: 

173 """Units for the image's pixel values (`astropy.units.Unit` or 

174 `None`). 

175 """ 

176 return self._unit 

177 

178 @property 

179 def projection(self) -> Projection[Any] | None: 

180 """The projection that maps this image's pixel grid to the sky 

181 (`Projection` | `None`). 

182 

183 Notes 

184 ----- 

185 The pixel coordinates used by this projection account for the bounding 

186 box ``start``; they are not just array indices. 

187 """ 

188 return self._projection 

189 

190 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

191 if bbox is ...: 

192 return self 

193 super().__getitem__(bbox) 

194 indices = bbox.slice_within(self._bbox) 

195 return self._transfer_metadata( 

196 Image(self._array[indices], bbox=bbox, unit=self._unit, projection=self._projection), 

197 bbox=bbox, 

198 ) 

199 

200 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

201 self[bbox].quantity[...] = value.quantity 

202 

203 def __str__(self) -> str: 

204 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

205 

206 def __repr__(self) -> str: 

207 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

208 

209 def __eq__(self, other: object) -> bool: 

210 if not isinstance(other, Image): 

211 return NotImplemented 

212 return ( 

213 self._bbox == other._bbox 

214 and self._unit == other._unit 

215 and np.array_equal(self._array, other._array, equal_nan=True) 

216 ) 

217 

218 def copy(self) -> Image: 

219 return self._transfer_metadata( 

220 Image(self._array.copy(), bbox=self._bbox, unit=self._unit, projection=self._projection), 

221 copy=True, 

222 ) 

223 

224 def view( 

225 self, 

226 *, 

227 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

228 projection: Projection | None | EllipsisType = ..., 

229 start: Sequence[int] | EllipsisType = ..., 

230 ) -> Image: 

231 """Make a view of the image, with optional updates.""" 

232 if unit is ...: 

233 unit = self._unit 

234 if projection is ...: 

235 projection = self._projection 

236 if start is ...: 

237 start = self._bbox.start 

238 return self._transfer_metadata(Image(self._array, start=start, unit=unit, projection=projection)) 

239 

240 def serialize[P: pydantic.BaseModel]( 

241 self, 

242 archive: OutputArchive[P], 

243 *, 

244 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

245 save_projection: bool = True, 

246 add_offset_wcs: str | None = "A", 

247 ) -> ImageSerializationModel[P]: 

248 """Serialize the image to an output archive. 

249 

250 Parameters 

251 ---------- 

252 archive 

253 Archive to write to. 

254 update_header 

255 A callback that will be given the FITS header for the HDU 

256 containing this image in order to add keys to it. This callback 

257 may be provided but will not be called if the output format is not 

258 FITS. 

259 save_projection 

260 If `True`, save the `Projection` attached to the image, if there 

261 is one. This does not affect whether a FITS WCS corresponding to 

262 the projection is written (it always is, if available, and if 

263 ``add_offset_wcs`` is not ``" "``). 

264 add_offset_wcs 

265 A FITS WCS single-character suffix to use when adding a linear 

266 WCS that maps the FITS array to the logical pixel coordinates 

267 defined by ``bbox.start``. Set to `None` to not write this WCS. 

268 If this is set to ``" "``, it will prevent the `Projection` from 

269 being saved as a FITS WCS. 

270 """ 

271 

272 def _update_header(header: astropy.io.fits.Header) -> None: 

273 update_header(header) 

274 if self.unit is not None: 

275 try: 

276 header["BUNIT"] = self.unit.to_string(format="fits") 

277 except ValueError: 

278 # Units not supported by FITS. 

279 pass 

280 if self.projection is not None and add_offset_wcs != " ": 

281 if self.fits_wcs: 

282 header.update(self.fits_wcs.to_header(relax=True)) 

283 if add_offset_wcs is not None: 

284 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

285 

286 array_model = archive.add_array(self.array, update_header=_update_header) 

287 serialized_projection: ProjectionSerializationModel[P] | None = None 

288 if save_projection and self.projection is not None: 

289 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

290 data = array_model if self.unit is None else array_model.with_units(self.unit) 

291 return ImageSerializationModel.model_construct( 

292 data=data, 

293 start=list(self.bbox.start), 

294 projection=serialized_projection, 

295 metadata=self.metadata, 

296 ) 

297 

298 @staticmethod 

299 def _get_archive_tree_type[P: pydantic.BaseModel]( 

300 pointer_type: type[P], 

301 ) -> type[ImageSerializationModel[P]]: 

302 """Return the serialization model type for this object for an archive 

303 type that uses the given pointer type. 

304 """ 

305 return ImageSerializationModel[pointer_type] # type: ignore 

306 

307 _archive_default_name: ClassVar[str] = "image" 

308 """The name this object should be serialized with when written as the 

309 top-level object. 

310 """ 

311 

312 def write_fits( 

313 self, 

314 filename: str, 

315 *, 

316 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

317 compression_seed: int | None = None, 

318 ) -> None: 

319 """Write the image to a FITS file. 

320 

321 Parameters 

322 ---------- 

323 filename 

324 Name of the file to write to. Must be a local file. 

325 compression 

326 Compression options. 

327 compression_seed 

328 A FITS tile compression seed to use whenever the configured 

329 compression seed is `None` or (for backwards compatibility) ``0``. 

330 """ 

331 compression_options = {} 

332 if compression is not fits.FitsCompressionOptions.DEFAULT: 

333 compression_options[self._archive_default_name] = compression 

334 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

335 

336 @staticmethod 

337 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

338 """Read an image from a FITS file. 

339 

340 Parameters 

341 ---------- 

342 url 

343 URL of the file to read; may be any type supported by 

344 `lsst.resources.ResourcePath`. 

345 bbox 

346 Bounding box of a subimage to read instead. 

347 """ 

348 return fits.read(Image, url, bbox=bbox).deserialized 

349 

350 @staticmethod 

351 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image: 

352 """Convert from an `lsst.afw.image.Image` instance. 

353 

354 Parameters 

355 ---------- 

356 legacy 

357 An `lsst.afw.image.Image` instance that will share pixel data with 

358 the returned object. 

359 unit 

360 Units of the image. 

361 """ 

362 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

363 

364 def to_legacy(self, *, copy: bool | None = None) -> Any: 

365 """Convert to an `lsst.afw.image.Image` instance. 

366 

367 Parameters 

368 ---------- 

369 copy 

370 If `True`, always copy the pixel data. If `False`, return a view, 

371 and raise `TypeError` if the pixel data is read-only (this is not 

372 supported by afw). If `None`, onyl if the pixel data is 

373 read-only. 

374 """ 

375 import lsst.afw.image 

376 import lsst.geom 

377 

378 array = self._array 

379 if copy: 

380 array = array.copy() 

381 elif not self._array.flags.writeable: 

382 if copy is None: 

383 array = array.copy() 

384 else: 

385 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

386 

387 return lsst.afw.image.Image( 

388 array, 

389 deep=False, 

390 dtype=array.dtype.type, 

391 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

392 ) 

393 

394 @staticmethod 

395 def read_legacy( 

396 uri: ResourcePathExpression, 

397 *, 

398 preserve_quantization: bool = False, 

399 ext: str | int = 1, 

400 fits_wcs_frame: Frame | None = None, 

401 ) -> Image: 

402 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

403 

404 Parameters 

405 ---------- 

406 uri 

407 URI or file name. 

408 preserve_quantization 

409 If `True`, ensure that writing the image back out again will 

410 exactly preserve quantization-compressed pixel values. This causes 

411 the arrays to be marked as read-only and stores the original binary 

412 table data for those planes in memory. If the `Image` is copied, 

413 the precompressed pixel values are not transferred to the copy. 

414 ext 

415 Name or index of the FITS HDU to read. 

416 fits_wcs_frame 

417 If not `None` and the HDU containing the image has a FITS WCS, 

418 attach a `Projection` to the returned image by converting that 

419 WCS. 

420 """ 

421 opaque_metadata = fits.FitsOpaqueMetadata() 

422 with ExitStack() as exit_stack: 

423 fs, fspath = ResourcePath(uri).to_fsspec() 

424 stream = exit_stack.enter_context(fs.open(fspath)) 

425 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

426 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

427 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

428 if preserve_quantization: 

429 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

430 bintable_hdu_list = exit_stack.enter_context( 

431 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

432 ) 

433 bintable_hdu = bintable_hdu_list[ext] 

434 result = Image._read_legacy_hdu( 

435 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

436 ) 

437 result._opaque_metadata = opaque_metadata 

438 return result 

439 

440 @staticmethod 

441 def _read_legacy_hdu( 

442 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

443 opaque_metadata: fits.FitsOpaqueMetadata, 

444 *, 

445 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

446 fits_wcs_frame: Frame | None = None, 

447 ) -> Image: 

448 unit: astropy.units.UnitBase | None = None 

449 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

450 unit = astropy.units.Unit(fits_unit, format="fits") 

451 if opaque_metadata.get_instrumental_unit() == astropy.units.electron: 

452 # Fix incorrect BUNIT='adu' in LSST preliminary_visit_image. 

453 if unit == astropy.units.adu: 

454 unit = astropy.units.electron 

455 if unit == astropy.units.adu**2: 

456 unit = astropy.units.electron**2 

457 dx: int = hdu.header.pop("LTV1") 

458 dy: int = hdu.header.pop("LTV2") 

459 start = YX(y=-dy, x=-dx) 

460 read_only: bool = False 

461 if preserve_bintable is not None: 

462 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

463 read_only = True 

464 projection: Projection | None = None 

465 if fits_wcs_frame is not None: 

466 try: 

467 fits_wcs = astropy.wcs.WCS(hdu.header) 

468 except KeyError: 

469 pass 

470 else: 

471 projection = Projection.from_fits_wcs( 

472 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

473 ) 

474 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

475 if read_only: 

476 image._array.flags["WRITEABLE"] = False 

477 fits.strip_wcs_cards(hdu.header) 

478 hdu.header.strip() 

479 hdu.header.remove("EXTTYPE", ignore_missing=True) 

480 hdu.header.remove("INHERIT", ignore_missing=True) 

481 hdu.header.remove("UZSCALE", ignore_missing=True) 

482 opaque_metadata.add_header(hdu.header) 

483 return image 

484 

485 

486class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

487 """Pydantic model used to represent the serialized form of an `.Image`.""" 

488 

489 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = ( 

490 pydantic.Field(description="Reference to pixel data.") 

491 ) 

492 start: list[int] = pydantic.Field( 

493 description="Coordinate of the first pixels in the array, ordered (y, x)." 

494 ) 

495 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

496 default=None, 

497 exclude_if=is_none, 

498 description="Projection that maps the logical pixel grid onto the sky.", 

499 ) 

500 

501 @property 

502 def bbox(self) -> Box: 

503 """The bounding box of the image.""" 

504 match self.data: 

505 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel(): 

506 shape = self.data.value.shape 

507 case ArrayReferenceModel() | InlineArrayModel(): 

508 shape = self.data.shape 

509 return Box.from_shape(shape, self.start) 

510 

511 def deserialize( 

512 self, 

513 archive: InputArchive[Any], 

514 *, 

515 bbox: Box | None = None, 

516 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

517 **kwargs: Any, 

518 ) -> Image: 

519 """Deserialize an image from an input archive. 

520 

521 Parameters 

522 ---------- 

523 archive 

524 Archive to read from. 

525 bbox 

526 Bounding box of a subimage to read instead. 

527 strip_header 

528 A callable that strips out any FITS header cards added by the 

529 ``update_header`` argument in the corresponding call to 

530 `Image.serialize`. 

531 **kwargs 

532 Unsupported keyword arguments are accepted only to provide better 

533 error messages (raising `serialization.InvalidParameterError`). 

534 """ 

535 if kwargs: 

536 raise InvalidParameterError(f"Unrecognized parameters for Image: {set(kwargs.keys())}.") 

537 array_model: ArrayReferenceModel | InlineArrayModel 

538 unit: astropy.units.UnitBase | None = None 

539 if isinstance(self.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel): 

540 array_model = self.data.value 

541 unit = self.data.unit 

542 else: 

543 array_model = self.data 

544 

545 def _strip_header(header: astropy.io.fits.Header) -> None: 

546 if unit is not None: 

547 header.pop("BUNIT", None) 

548 fits.strip_wcs_cards(header) 

549 strip_header(header) 

550 

551 slices = bbox.slice_within(self.bbox) if bbox is not None else ... 

552 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices) 

553 projection = self.projection.deserialize(archive) if self.projection is not None else None 

554 return Image( 

555 array, 

556 start=self.start if bbox is None else bbox.start, 

557 unit=unit, 

558 projection=projection, 

559 )._finish_deserialize(self) 

560 

561 def deserialize_component(self, component: str, archive: InputArchive[Any], **kwargs: Any) -> Any: 

562 if kwargs: 

563 raise InvalidParameterError(f"Unsupported parameters for Image components: {set(kwargs.keys())}.") 

564 return super().deserialize_component(component, archive)