Coverage for python / lsst / images / _transforms / _projection.py: 46%

164 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:32 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Projection", "ProjectionAstropyView", "ProjectionSerializationModel") 

15 

16import functools 

17from typing import TYPE_CHECKING, Any, Self, TypeVar, final 

18 

19import astropy.units as u 

20import astropy.wcs 

21import numpy as np 

22import pydantic 

23from astropy.coordinates import ICRS, Latitude, Longitude, SkyCoord 

24from astropy.wcs.wcsapi import BaseLowLevelWCS, HighLevelWCSMixin 

25 

26from .._geom import XY, YX, Bounds, Box 

27from ..serialization import ArchiveTree, InputArchive, InvalidParameterError, OutputArchive 

28from ..utils import is_none 

29from . import _ast as astshim 

30from ._frames import Frame, SkyFrame 

31from ._transform import Transform, TransformSerializationModel, _ast_apply 

32 

33if TYPE_CHECKING: 

34 try: 

35 from lsst.afw.geom import SkyWcs as LegacySkyWcs 

36 except ImportError: 

37 type LegacySkyWcs = Any # type: ignore[no-redef] 

38 

39 

40# This pre-python-3.12 declaration is needed by Sphinx (probably the 

41# autodoc-typehints plugin. 

42F = TypeVar("F", bound=Frame) 

43P = TypeVar("P", bound=pydantic.BaseModel) 

44 

45 

46def _set_ast_skyframe_system(frame: astshim.SkyFrame, system: str) -> None: 

47 """Set an AST SkyFrame coordinate system across supported wrappers.""" 

48 if hasattr(frame, "_impl"): 

49 frame._impl.System = system 

50 else: 

51 setattr(frame, "system", system) 

52 

53 

54@final 

55class Projection[F: Frame]: 

56 """A transform from pixel coordinates to sky coordinates. 

57 

58 Parameters 

59 ---------- 

60 pixel_to_sky 

61 A low-level transform that maps pixel coordinates to sky coordinates. 

62 fits_approximation 

63 An approximation to ``pixel_to_sky`` that is guaranteed to have a 

64 `~Transform.as_fits_wcs` method that does not return `None`. This 

65 should not be provided if ``pixel_to_sky`` is itself representable 

66 as a FITS WCS. 

67 

68 Notes 

69 ----- 

70 `Transform` is conceptually immutable (the internal AST Mapping should 

71 never be modified in-place after construction), and hence does not need to 

72 be copied when any object that holds it is copied. 

73 """ 

74 

75 def __init__( 

76 self, pixel_to_sky: Transform[F, SkyFrame], fits_approximation: Transform[F, SkyFrame] | None = None 

77 ): 

78 self._pixel_to_sky = pixel_to_sky 

79 if pixel_to_sky.in_frame.unit != u.pix: 

80 raise ValueError("Transform is not a mapping from pixel coordinates.") 

81 if pixel_to_sky.out_frame != SkyFrame.ICRS: 

82 raise ValueError("Transform is not a mapping to ICRS.") 

83 self._fits_approximation = fits_approximation 

84 

85 @staticmethod 

86 def from_fits_wcs( 

87 fits_wcs: astropy.wcs.WCS, 

88 pixel_frame: F, 

89 pixel_bounds: Bounds | None = None, 

90 x0: int = 0, 

91 y0: int = 0, 

92 ) -> Projection[F]: 

93 """Construct a transform from a FITS WCS. 

94 

95 Parameters 

96 ---------- 

97 fits_wcs 

98 FITS WCS to convert. 

99 pixel_frame 

100 Coordinate frame for the pixel grid. 

101 pixel_bounds 

102 The region that bounds valid pixels for this transform. 

103 x0 

104 Logical coordinate of the first column in the array this WCS 

105 relates to world coordinates. 

106 y0 

107 Logical coordinate of the first column in the array this WCS 

108 relates to world coordinates. 

109 

110 Notes 

111 ----- 

112 The ``x0`` and ``y0`` parameters reflect the fact that for FITS, the 

113 first row and column are always labeled ``(1, 1)``, while in Astropy 

114 and most other Python libraries, they are ``(0, 0)``. The `types` in 

115 this package (e.g. `Image`, `Mask`) allow them to be any pair of 

116 integers. 

117 

118 See Also 

119 -------- 

120 Transform.from_fits_wcs 

121 """ 

122 return Projection( 

123 Transform.from_fits_wcs( 

124 fits_wcs, pixel_frame, SkyFrame.ICRS, in_bounds=pixel_bounds, x0=x0, y0=y0 

125 ) 

126 ) 

127 

128 @staticmethod 

129 def from_ast_frame_set( 

130 ast_frame_set: astshim.FrameSet, 

131 pixel_frame: F, 

132 pixel_bounds: Bounds | None = None, 

133 ) -> Projection[F]: 

134 """Construct a projection from an AST FrameSet. 

135 

136 The current frame of the FrameSet must be an AST SkyFrame. Its 

137 coordinate system is forced to ICRS (AST adjusts the mapping 

138 automatically) so the resulting Projection is always in ICRS 

139 regardless of the original sky system. 

140 

141 Parameters 

142 ---------- 

143 ast_frame_set 

144 An AST FrameSet whose base frame is pixel coordinates and 

145 whose current frame is a SkyFrame (in any supported sky 

146 coordinate system). 

147 pixel_frame 

148 Coordinate frame for the pixel grid. 

149 pixel_bounds 

150 The region that bounds valid pixels for this transform. 

151 

152 Raises 

153 ------ 

154 ValueError 

155 If the current frame of the FrameSet is not a SkyFrame. 

156 """ 

157 current_frame = ast_frame_set.getFrame(ast_frame_set.current, copy=False) 

158 if not isinstance(current_frame, astshim.SkyFrame): 

159 raise ValueError( 

160 "The current frame of the AST FrameSet is not a SkyFrame " 

161 f"(got {type(current_frame).__name__})." 

162 ) 

163 _set_ast_skyframe_system(current_frame, "ICRS") 

164 return Projection(Transform(pixel_frame, SkyFrame.ICRS, ast_frame_set, in_bounds=pixel_bounds)) 

165 

166 @property 

167 def pixel_frame(self) -> F: 

168 """Coordinate frame for the pixel grid.""" 

169 return self._pixel_to_sky.in_frame 

170 

171 @property 

172 def sky_frame(self) -> SkyFrame: 

173 """Coordinate frame for the sky.""" 

174 return self._pixel_to_sky.out_frame 

175 

176 @property 

177 def pixel_bounds(self) -> Bounds | None: 

178 """The region that bounds valid pixel points (`Bounds` | `None`).""" 

179 return self._pixel_to_sky.in_bounds 

180 

181 @property 

182 def pixel_to_sky_transform(self) -> Transform[F, SkyFrame]: 

183 """Low-level transform from pixel to sky coordinates (`Transform`).""" 

184 return self._pixel_to_sky 

185 

186 @property 

187 def sky_to_pixel_transform(self) -> Transform[SkyFrame, F]: 

188 """Low-level transform from sky to pixel coordinates (`Transform`).""" 

189 return self._pixel_to_sky.inverted() 

190 

191 @property 

192 def fits_approximation(self) -> Projection[F] | None: 

193 """An approximation to this projection that is guaranteed to have an 

194 `as_fits_wcs` method that does not return `None`. 

195 """ 

196 return Projection(self._fits_approximation) if self._fits_approximation is not None else None 

197 

198 def show(self, simplified: bool = False, comments: bool = False) -> str: 

199 """Return the AST native representation of the transform. 

200 

201 Parameters 

202 ---------- 

203 simplified 

204 Whether to ask AST to simplify the mapping before showing it. 

205 This will make it much more likely that two equivalent transforms 

206 have the same `show` result. 

207 comments 

208 Whether to include descriptive comments. 

209 """ 

210 return self._pixel_to_sky.show(simplified=simplified, comments=comments) 

211 

212 def pixel_to_sky[T: np.ndarray | float](self, *, x: T, y: T) -> SkyCoord: 

213 """Transform one or more pixel points to sky coordinates. 

214 

215 Parameters 

216 ---------- 

217 x : `numpy.ndarray` | `float` 

218 ``x`` values of the pixel points to transform. 

219 y : `numpy.ndarray` | `float` 

220 ``y`` values of the pixel points to transform. 

221 

222 Returns 

223 ------- 

224 astropy.coordinates.SkyCoord 

225 Transformed sky coordinates. 

226 """ 

227 sky_rad = self._pixel_to_sky.apply_forward(x=x, y=y) 

228 return SkyCoord(ra=sky_rad.x, dec=sky_rad.y, unit=u.rad) 

229 

230 def sky_to_pixel(self, sky: SkyCoord) -> XY[np.ndarray | float]: 

231 """Transform one or more sky coordinates to pixels 

232 

233 Parameters 

234 ---------- 

235 sky 

236 Sky coordinates to transform. 

237 

238 Returns 

239 ------- 

240 `XY` [`numpy.ndarray` | `float`] 

241 Transformed pixel coordinates. 

242 """ 

243 if sky.frame.name != "icrs": 

244 sky = sky.transform_to("icrs") 

245 ra: Longitude = sky.ra 

246 dec: Latitude = sky.dec 

247 return self._pixel_to_sky.apply_inverse( 

248 x=ra.to_value(u.rad), 

249 y=dec.to_value(u.rad), 

250 ) 

251 

252 def as_astropy(self, bbox: Box | None = None) -> ProjectionAstropyView: 

253 """Return an `astropy.wcs` view of this `Projection`. 

254 

255 Parameters 

256 ---------- 

257 bbox 

258 Bounding box of the array the view will describe. This 

259 projection object is assumed to work on the same coordinate system 

260 in which ``bbox`` is defined, while the Astropy view will consider 

261 the first row and column in that box to be ``(0, 0)``. 

262 

263 Notes 

264 ----- 

265 This returns an object that satisfies the 

266 `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

267 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces while evaluating the 

268 underlying `Projection` itself. It is *not* an `astropy.wcs.WCS` 

269 instance, which is a type that also satisfies those interfaces but 

270 only supports FITS WCS representations (see `as_fits_wcs`). 

271 """ 

272 return ProjectionAstropyView(self._pixel_to_sky._ast_mapping, bbox) 

273 

274 def as_fits_wcs(self, bbox: Box, allow_approximation: bool = False) -> astropy.wcs.WCS | None: 

275 """Return a FITS WCS representation of this projection, if possible. 

276 

277 Parameters 

278 ---------- 

279 bbox 

280 Bounding box of the array the FITS WCS will describe. This 

281 transform object is assumed to work on the same coordinate system 

282 in which ``bbox`` is defined, while the FITS WCS will consider the 

283 first row and column in that box to be ``(0, 0)`` (in Astropy 

284 interfaces) or ``(1, 1)`` (in the FITS representation itself). 

285 allow_approximation 

286 If `True` and this `Projection` holds a FITS approximation to 

287 itself, return that approximation. 

288 """ 

289 if allow_approximation and self._fits_approximation: 

290 return self._fits_approximation.as_fits_wcs(bbox) 

291 return self._pixel_to_sky.as_fits_wcs(bbox) 

292 

293 def serialize[P: pydantic.BaseModel]( 

294 self, archive: OutputArchive[P], *, use_frame_sets: bool = False 

295 ) -> ProjectionSerializationModel[P]: 

296 """Serialize a projection to an archive. 

297 

298 Parameters 

299 ---------- 

300 archive 

301 Archive to serialize to. 

302 use_frame_sets 

303 If `True`, decompose the underlying transform and try to reference 

304 component mappings that were already serialized into a `FrameSet` 

305 in the archive. The FITS approximation transform is never 

306 decomposed. 

307 

308 Returns 

309 ------- 

310 `ProjectionSerializationModel` 

311 Serialized form of the projection. 

312 """ 

313 pixel_to_sky = archive.serialize_direct( 

314 "pixel_to_sky", functools.partial(self._pixel_to_sky.serialize, use_frame_sets=use_frame_sets) 

315 ) 

316 fits_approximation = ( 

317 archive.serialize_direct("fits_approximation", self._fits_approximation.serialize) 

318 if self._fits_approximation is not None 

319 else None 

320 ) 

321 return ProjectionSerializationModel(pixel_to_sky=pixel_to_sky, fits_approximation=fits_approximation) 

322 

323 @staticmethod 

324 def _get_archive_tree_type[P: pydantic.BaseModel]( 

325 pointer_type: type[P], 

326 ) -> type[ProjectionSerializationModel[P]]: 

327 """Return the serialization model type for this object for an archive 

328 type that uses the given pointer type. 

329 """ 

330 return ProjectionSerializationModel[pointer_type] # type: ignore 

331 

332 @staticmethod 

333 def from_legacy( 

334 sky_wcs: LegacySkyWcs, pixel_frame: F, pixel_bounds: Bounds | None = None 

335 ) -> Projection[F]: 

336 """Construct a transform from a legacy `lsst.afw.geom.SkyWcs`. 

337 

338 Parameters 

339 ---------- 

340 sky_wcs : `lsst.afw.geom.SkyWcs` 

341 Legacy WCS object. 

342 pixel_frame 

343 Coordinate frame for the pixel grid. 

344 pixel_bounds 

345 The region that bounds valid pixels for this transform. 

346 """ 

347 fits_approximation: Transform[F, SkyFrame] | None = None 

348 if (legacy_fits_approximation := sky_wcs.getFitsApproximation()) is not None: 

349 fits_approximation = Transform( 

350 pixel_frame, 

351 SkyFrame.ICRS, 

352 legacy_fits_approximation.getFrameDict(), 

353 pixel_bounds, 

354 ) 

355 return Projection( 

356 Transform(pixel_frame, SkyFrame.ICRS, sky_wcs.getFrameDict(), pixel_bounds), 

357 fits_approximation=fits_approximation, 

358 ) 

359 

360 def to_legacy(self) -> LegacySkyWcs: 

361 """Convert to a legacy `lsst.afw.geom.SkyWcs` instance.""" 

362 from lsst.afw.geom import SkyWcs as LegacySkyWcs 

363 

364 try: 

365 ast_mapping = astshim.FrameDict(self._pixel_to_sky._ast_mapping) 

366 except TypeError as err: 

367 err.add_note( 

368 "Only Projections created by from_legacy and from_fits_wcs " 

369 "are guaranteed to be convertible to SkyWcs." 

370 ) 

371 raise 

372 legacy_wcs = LegacySkyWcs(ast_mapping) 

373 if self.fits_approximation is not None: 

374 legacy_wcs = legacy_wcs.copyWithFitsApproximation(self.fits_approximation.to_legacy()) 

375 return legacy_wcs 

376 

377 

378class ProjectionAstropyView(BaseLowLevelWCS, HighLevelWCSMixin): 

379 """An Astropy-interface view of a `Projection`. 

380 

381 Notes 

382 ----- 

383 The constructor of this classe is considered a private implementation 

384 detail; use `Projection.as_astropy` instead. 

385 

386 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

387 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces while evaluating the 

388 underlying `Projection` itself. It is *not* an `astropy.wcs.WCS` 

389 subclass, which is a type that also satisfies those interfaces but 

390 only supports FITS WCS representations (see `Projection.as_fits_wcs`). 

391 """ 

392 

393 def __init__(self, ast_pixel_to_sky: astshim.Mapping, bbox: Box | None): 

394 self._bbox = bbox 

395 if bbox is not None: 

396 ast_pixel_to_sky = astshim.ShiftMap(list(bbox.start.xy)).then(ast_pixel_to_sky) 

397 self._ast_pixel_to_sky = ast_pixel_to_sky 

398 

399 @property 

400 def low_level_wcs(self) -> Self: 

401 return self 

402 

403 @property 

404 def array_shape(self) -> YX[int] | None: 

405 return self._bbox.shape if self._bbox is not None else None 

406 

407 @property 

408 def axis_correlation_matrix(self) -> np.ndarray: 

409 return np.array([[True, True], [True, True]]) 

410 

411 @property 

412 def pixel_axis_names(self) -> XY[str]: 

413 return XY("x", "y") 

414 

415 @property 

416 def pixel_bounds(self) -> XY[tuple[int, int]] | None: 

417 if self._bbox is None: 

418 return None 

419 return XY((self._bbox.x.min, self._bbox.x.max), (self._bbox.y.min, self._bbox.y.max)) 

420 

421 @property 

422 def pixel_n_dim(self) -> int: 

423 return 2 

424 

425 @property 

426 def pixel_shape(self) -> XY[int] | None: 

427 array_shape = self.array_shape 

428 return array_shape.xy if array_shape is not None else None 

429 

430 @property 

431 def serialized_classes(self) -> bool: 

432 return False 

433 

434 @property 

435 def world_axis_names(self) -> tuple[str, str]: 

436 return ("ra", "dec") 

437 

438 @property 

439 def world_axis_object_classes(self) -> dict[str, tuple[type[SkyCoord], tuple[()], dict[str, Any]]]: 

440 return {"celestial": (SkyCoord, (), {"frame": ICRS, "unit": (u.rad, u.rad)})} 

441 

442 @property 

443 def world_axis_object_components(self) -> list[tuple[str, int, str]]: 

444 return [("celestial", 0, "spherical.lon.radian"), ("celestial", 1, "spherical.lat.radian")] 

445 

446 @property 

447 def world_axis_physical_types(self) -> tuple[str, str]: 

448 return ("pos.eq.ra", "pos.eq.dec") 

449 

450 @property 

451 def world_axis_units(self) -> tuple[str, str]: 

452 return ("rad", "rad") 

453 

454 @property 

455 def world_n_dim(self) -> int: 

456 return 2 

457 

458 def pixel_to_world_values(self, x: np.ndarray, y: np.ndarray) -> XY[np.ndarray]: 

459 return _ast_apply(self._ast_pixel_to_sky.applyForward, x=x, y=y) 

460 

461 def world_to_pixel_values(self, ra: np.ndarray, dec: np.ndarray) -> XY[np.ndarray]: 

462 return _ast_apply(self._ast_pixel_to_sky.applyInverse, x=ra, y=dec) 

463 

464 

465class ProjectionSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

466 """Serialization model for projetions.""" 

467 

468 pixel_to_sky: TransformSerializationModel[P] = pydantic.Field( 

469 description="The transform that maps pixel coordinates to the sky." 

470 ) 

471 fits_approximation: TransformSerializationModel[P] | None = pydantic.Field( 

472 default=None, 

473 description=( 

474 "An approximation of the pixel-to-sky transform that is exactly representable as a FITS WCS." 

475 ), 

476 exclude_if=is_none, 

477 ) 

478 

479 def deserialize(self, archive: InputArchive[P], **kwargs: Any) -> Projection[Any]: 

480 """Deserialize a projection from an archive. 

481 

482 Parameters 

483 ---------- 

484 archive 

485 Archive to read from. 

486 **kwargs 

487 Unsupported keyword arguments are accepted only to provide better 

488 error messages (raising `serialization.InvalidParameterError`). 

489 """ 

490 if kwargs: 

491 raise InvalidParameterError(f"Unrecognized parameters for Projection: {set(kwargs.keys())}.") 

492 pixel_to_sky = self.pixel_to_sky.deserialize(archive) 

493 fits_approximation = ( 

494 self.fits_approximation.deserialize(archive) if self.fits_approximation is not None else None 

495 ) 

496 return Projection(pixel_to_sky, fits_approximation=fits_approximation)