Coverage for python / lsst / images / _transforms / _projection.py: 46%
164 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:30 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:30 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("Projection", "ProjectionAstropyView", "ProjectionSerializationModel")
16import functools
17from typing import TYPE_CHECKING, Any, Self, TypeVar, final
19import astropy.units as u
20import astropy.wcs
21import numpy as np
22import pydantic
23from astropy.coordinates import ICRS, Latitude, Longitude, SkyCoord
24from astropy.wcs.wcsapi import BaseLowLevelWCS, HighLevelWCSMixin
26from .._geom import XY, YX, Bounds, Box
27from ..serialization import ArchiveTree, InputArchive, InvalidParameterError, OutputArchive
28from ..utils import is_none
29from . import _ast as astshim
30from ._frames import Frame, SkyFrame
31from ._transform import Transform, TransformSerializationModel, _ast_apply
33if TYPE_CHECKING:
34 try:
35 from lsst.afw.geom import SkyWcs as LegacySkyWcs
36 except ImportError:
37 type LegacySkyWcs = Any # type: ignore[no-redef]
40# This pre-python-3.12 declaration is needed by Sphinx (probably the
41# autodoc-typehints plugin.
42F = TypeVar("F", bound=Frame)
43P = TypeVar("P", bound=pydantic.BaseModel)
46def _set_ast_skyframe_system(frame: astshim.SkyFrame, system: str) -> None:
47 """Set an AST SkyFrame coordinate system across supported wrappers."""
48 if hasattr(frame, "_impl"):
49 frame._impl.System = system
50 else:
51 setattr(frame, "system", system)
54@final
55class Projection[F: Frame]:
56 """A transform from pixel coordinates to sky coordinates.
58 Parameters
59 ----------
60 pixel_to_sky
61 A low-level transform that maps pixel coordinates to sky coordinates.
62 fits_approximation
63 An approximation to ``pixel_to_sky`` that is guaranteed to have a
64 `~Transform.as_fits_wcs` method that does not return `None`. This
65 should not be provided if ``pixel_to_sky`` is itself representable
66 as a FITS WCS.
68 Notes
69 -----
70 `Transform` is conceptually immutable (the internal AST Mapping should
71 never be modified in-place after construction), and hence does not need to
72 be copied when any object that holds it is copied.
73 """
75 def __init__(
76 self, pixel_to_sky: Transform[F, SkyFrame], fits_approximation: Transform[F, SkyFrame] | None = None
77 ):
78 self._pixel_to_sky = pixel_to_sky
79 if pixel_to_sky.in_frame.unit != u.pix:
80 raise ValueError("Transform is not a mapping from pixel coordinates.")
81 if pixel_to_sky.out_frame != SkyFrame.ICRS:
82 raise ValueError("Transform is not a mapping to ICRS.")
83 self._fits_approximation = fits_approximation
85 @staticmethod
86 def from_fits_wcs(
87 fits_wcs: astropy.wcs.WCS,
88 pixel_frame: F,
89 pixel_bounds: Bounds | None = None,
90 x0: int = 0,
91 y0: int = 0,
92 ) -> Projection[F]:
93 """Construct a transform from a FITS WCS.
95 Parameters
96 ----------
97 fits_wcs
98 FITS WCS to convert.
99 pixel_frame
100 Coordinate frame for the pixel grid.
101 pixel_bounds
102 The region that bounds valid pixels for this transform.
103 x0
104 Logical coordinate of the first column in the array this WCS
105 relates to world coordinates.
106 y0
107 Logical coordinate of the first column in the array this WCS
108 relates to world coordinates.
110 Notes
111 -----
112 The ``x0`` and ``y0`` parameters reflect the fact that for FITS, the
113 first row and column are always labeled ``(1, 1)``, while in Astropy
114 and most other Python libraries, they are ``(0, 0)``. The `types` in
115 this package (e.g. `Image`, `Mask`) allow them to be any pair of
116 integers.
118 See Also
119 --------
120 Transform.from_fits_wcs
121 """
122 return Projection(
123 Transform.from_fits_wcs(
124 fits_wcs, pixel_frame, SkyFrame.ICRS, in_bounds=pixel_bounds, x0=x0, y0=y0
125 )
126 )
128 @staticmethod
129 def from_ast_frame_set(
130 ast_frame_set: astshim.FrameSet,
131 pixel_frame: F,
132 pixel_bounds: Bounds | None = None,
133 ) -> Projection[F]:
134 """Construct a projection from an AST FrameSet.
136 The current frame of the FrameSet must be an AST SkyFrame. Its
137 coordinate system is forced to ICRS (AST adjusts the mapping
138 automatically) so the resulting Projection is always in ICRS
139 regardless of the original sky system.
141 Parameters
142 ----------
143 ast_frame_set
144 An AST FrameSet whose base frame is pixel coordinates and
145 whose current frame is a SkyFrame (in any supported sky
146 coordinate system).
147 pixel_frame
148 Coordinate frame for the pixel grid.
149 pixel_bounds
150 The region that bounds valid pixels for this transform.
152 Raises
153 ------
154 ValueError
155 If the current frame of the FrameSet is not a SkyFrame.
156 """
157 current_frame = ast_frame_set.getFrame(ast_frame_set.current, copy=False)
158 if not isinstance(current_frame, astshim.SkyFrame):
159 raise ValueError(
160 "The current frame of the AST FrameSet is not a SkyFrame "
161 f"(got {type(current_frame).__name__})."
162 )
163 _set_ast_skyframe_system(current_frame, "ICRS")
164 return Projection(Transform(pixel_frame, SkyFrame.ICRS, ast_frame_set, in_bounds=pixel_bounds))
166 @property
167 def pixel_frame(self) -> F:
168 """Coordinate frame for the pixel grid."""
169 return self._pixel_to_sky.in_frame
171 @property
172 def sky_frame(self) -> SkyFrame:
173 """Coordinate frame for the sky."""
174 return self._pixel_to_sky.out_frame
176 @property
177 def pixel_bounds(self) -> Bounds | None:
178 """The region that bounds valid pixel points (`Bounds` | `None`)."""
179 return self._pixel_to_sky.in_bounds
181 @property
182 def pixel_to_sky_transform(self) -> Transform[F, SkyFrame]:
183 """Low-level transform from pixel to sky coordinates (`Transform`)."""
184 return self._pixel_to_sky
186 @property
187 def sky_to_pixel_transform(self) -> Transform[SkyFrame, F]:
188 """Low-level transform from sky to pixel coordinates (`Transform`)."""
189 return self._pixel_to_sky.inverted()
191 @property
192 def fits_approximation(self) -> Projection[F] | None:
193 """An approximation to this projection that is guaranteed to have an
194 `as_fits_wcs` method that does not return `None`.
195 """
196 return Projection(self._fits_approximation) if self._fits_approximation is not None else None
198 def show(self, simplified: bool = False, comments: bool = False) -> str:
199 """Return the AST native representation of the transform.
201 Parameters
202 ----------
203 simplified
204 Whether to ask AST to simplify the mapping before showing it.
205 This will make it much more likely that two equivalent transforms
206 have the same `show` result.
207 comments
208 Whether to include descriptive comments.
209 """
210 return self._pixel_to_sky.show(simplified=simplified, comments=comments)
212 def pixel_to_sky[T: np.ndarray | float](self, *, x: T, y: T) -> SkyCoord:
213 """Transform one or more pixel points to sky coordinates.
215 Parameters
216 ----------
217 x : `numpy.ndarray` | `float`
218 ``x`` values of the pixel points to transform.
219 y : `numpy.ndarray` | `float`
220 ``y`` values of the pixel points to transform.
222 Returns
223 -------
224 astropy.coordinates.SkyCoord
225 Transformed sky coordinates.
226 """
227 sky_rad = self._pixel_to_sky.apply_forward(x=x, y=y)
228 return SkyCoord(ra=sky_rad.x, dec=sky_rad.y, unit=u.rad)
230 def sky_to_pixel(self, sky: SkyCoord) -> XY[np.ndarray | float]:
231 """Transform one or more sky coordinates to pixels
233 Parameters
234 ----------
235 sky
236 Sky coordinates to transform.
238 Returns
239 -------
240 `XY` [`numpy.ndarray` | `float`]
241 Transformed pixel coordinates.
242 """
243 if sky.frame.name != "icrs":
244 sky = sky.transform_to("icrs")
245 ra: Longitude = sky.ra
246 dec: Latitude = sky.dec
247 return self._pixel_to_sky.apply_inverse(
248 x=ra.to_value(u.rad),
249 y=dec.to_value(u.rad),
250 )
252 def as_astropy(self, bbox: Box | None = None) -> ProjectionAstropyView:
253 """Return an `astropy.wcs` view of this `Projection`.
255 Parameters
256 ----------
257 bbox
258 Bounding box of the array the view will describe. This
259 projection object is assumed to work on the same coordinate system
260 in which ``bbox`` is defined, while the Astropy view will consider
261 the first row and column in that box to be ``(0, 0)``.
263 Notes
264 -----
265 This returns an object that satisfies the
266 `astropy.wcs.wcsapi.BaseHighLevelWCS` and
267 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces while evaluating the
268 underlying `Projection` itself. It is *not* an `astropy.wcs.WCS`
269 instance, which is a type that also satisfies those interfaces but
270 only supports FITS WCS representations (see `as_fits_wcs`).
271 """
272 return ProjectionAstropyView(self._pixel_to_sky._ast_mapping, bbox)
274 def as_fits_wcs(self, bbox: Box, allow_approximation: bool = False) -> astropy.wcs.WCS | None:
275 """Return a FITS WCS representation of this projection, if possible.
277 Parameters
278 ----------
279 bbox
280 Bounding box of the array the FITS WCS will describe. This
281 transform object is assumed to work on the same coordinate system
282 in which ``bbox`` is defined, while the FITS WCS will consider the
283 first row and column in that box to be ``(0, 0)`` (in Astropy
284 interfaces) or ``(1, 1)`` (in the FITS representation itself).
285 allow_approximation
286 If `True` and this `Projection` holds a FITS approximation to
287 itself, return that approximation.
288 """
289 if allow_approximation and self._fits_approximation:
290 return self._fits_approximation.as_fits_wcs(bbox)
291 return self._pixel_to_sky.as_fits_wcs(bbox)
293 def serialize[P: pydantic.BaseModel](
294 self, archive: OutputArchive[P], *, use_frame_sets: bool = False
295 ) -> ProjectionSerializationModel[P]:
296 """Serialize a projection to an archive.
298 Parameters
299 ----------
300 archive
301 Archive to serialize to.
302 use_frame_sets
303 If `True`, decompose the underlying transform and try to reference
304 component mappings that were already serialized into a `FrameSet`
305 in the archive. The FITS approximation transform is never
306 decomposed.
308 Returns
309 -------
310 `ProjectionSerializationModel`
311 Serialized form of the projection.
312 """
313 pixel_to_sky = archive.serialize_direct(
314 "pixel_to_sky", functools.partial(self._pixel_to_sky.serialize, use_frame_sets=use_frame_sets)
315 )
316 fits_approximation = (
317 archive.serialize_direct("fits_approximation", self._fits_approximation.serialize)
318 if self._fits_approximation is not None
319 else None
320 )
321 return ProjectionSerializationModel(pixel_to_sky=pixel_to_sky, fits_approximation=fits_approximation)
323 @staticmethod
324 def _get_archive_tree_type[P: pydantic.BaseModel](
325 pointer_type: type[P],
326 ) -> type[ProjectionSerializationModel[P]]:
327 """Return the serialization model type for this object for an archive
328 type that uses the given pointer type.
329 """
330 return ProjectionSerializationModel[pointer_type] # type: ignore
332 @staticmethod
333 def from_legacy(
334 sky_wcs: LegacySkyWcs, pixel_frame: F, pixel_bounds: Bounds | None = None
335 ) -> Projection[F]:
336 """Construct a transform from a legacy `lsst.afw.geom.SkyWcs`.
338 Parameters
339 ----------
340 sky_wcs : `lsst.afw.geom.SkyWcs`
341 Legacy WCS object.
342 pixel_frame
343 Coordinate frame for the pixel grid.
344 pixel_bounds
345 The region that bounds valid pixels for this transform.
346 """
347 fits_approximation: Transform[F, SkyFrame] | None = None
348 if (legacy_fits_approximation := sky_wcs.getFitsApproximation()) is not None:
349 fits_approximation = Transform(
350 pixel_frame,
351 SkyFrame.ICRS,
352 legacy_fits_approximation.getFrameDict(),
353 pixel_bounds,
354 )
355 return Projection(
356 Transform(pixel_frame, SkyFrame.ICRS, sky_wcs.getFrameDict(), pixel_bounds),
357 fits_approximation=fits_approximation,
358 )
360 def to_legacy(self) -> LegacySkyWcs:
361 """Convert to a legacy `lsst.afw.geom.SkyWcs` instance."""
362 from lsst.afw.geom import SkyWcs as LegacySkyWcs
364 try:
365 ast_mapping = astshim.FrameDict(self._pixel_to_sky._ast_mapping)
366 except TypeError as err:
367 err.add_note(
368 "Only Projections created by from_legacy and from_fits_wcs "
369 "are guaranteed to be convertible to SkyWcs."
370 )
371 raise
372 legacy_wcs = LegacySkyWcs(ast_mapping)
373 if self.fits_approximation is not None:
374 legacy_wcs = legacy_wcs.copyWithFitsApproximation(self.fits_approximation.to_legacy())
375 return legacy_wcs
378class ProjectionAstropyView(BaseLowLevelWCS, HighLevelWCSMixin):
379 """An Astropy-interface view of a `Projection`.
381 Notes
382 -----
383 The constructor of this classe is considered a private implementation
384 detail; use `Projection.as_astropy` instead.
386 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
387 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces while evaluating the
388 underlying `Projection` itself. It is *not* an `astropy.wcs.WCS`
389 subclass, which is a type that also satisfies those interfaces but
390 only supports FITS WCS representations (see `Projection.as_fits_wcs`).
391 """
393 def __init__(self, ast_pixel_to_sky: astshim.Mapping, bbox: Box | None):
394 self._bbox = bbox
395 if bbox is not None:
396 ast_pixel_to_sky = astshim.ShiftMap(list(bbox.start.xy)).then(ast_pixel_to_sky)
397 self._ast_pixel_to_sky = ast_pixel_to_sky
399 @property
400 def low_level_wcs(self) -> Self:
401 return self
403 @property
404 def array_shape(self) -> YX[int] | None:
405 return self._bbox.shape if self._bbox is not None else None
407 @property
408 def axis_correlation_matrix(self) -> np.ndarray:
409 return np.array([[True, True], [True, True]])
411 @property
412 def pixel_axis_names(self) -> XY[str]:
413 return XY("x", "y")
415 @property
416 def pixel_bounds(self) -> XY[tuple[int, int]] | None:
417 if self._bbox is None:
418 return None
419 return XY((self._bbox.x.min, self._bbox.x.max), (self._bbox.y.min, self._bbox.y.max))
421 @property
422 def pixel_n_dim(self) -> int:
423 return 2
425 @property
426 def pixel_shape(self) -> XY[int] | None:
427 array_shape = self.array_shape
428 return array_shape.xy if array_shape is not None else None
430 @property
431 def serialized_classes(self) -> bool:
432 return False
434 @property
435 def world_axis_names(self) -> tuple[str, str]:
436 return ("ra", "dec")
438 @property
439 def world_axis_object_classes(self) -> dict[str, tuple[type[SkyCoord], tuple[()], dict[str, Any]]]:
440 return {"celestial": (SkyCoord, (), {"frame": ICRS, "unit": (u.rad, u.rad)})}
442 @property
443 def world_axis_object_components(self) -> list[tuple[str, int, str]]:
444 return [("celestial", 0, "spherical.lon.radian"), ("celestial", 1, "spherical.lat.radian")]
446 @property
447 def world_axis_physical_types(self) -> tuple[str, str]:
448 return ("pos.eq.ra", "pos.eq.dec")
450 @property
451 def world_axis_units(self) -> tuple[str, str]:
452 return ("rad", "rad")
454 @property
455 def world_n_dim(self) -> int:
456 return 2
458 def pixel_to_world_values(self, x: np.ndarray, y: np.ndarray) -> XY[np.ndarray]:
459 return _ast_apply(self._ast_pixel_to_sky.applyForward, x=x, y=y)
461 def world_to_pixel_values(self, ra: np.ndarray, dec: np.ndarray) -> XY[np.ndarray]:
462 return _ast_apply(self._ast_pixel_to_sky.applyInverse, x=ra, y=dec)
465class ProjectionSerializationModel[P: pydantic.BaseModel](ArchiveTree):
466 """Serialization model for projetions."""
468 pixel_to_sky: TransformSerializationModel[P] = pydantic.Field(
469 description="The transform that maps pixel coordinates to the sky."
470 )
471 fits_approximation: TransformSerializationModel[P] | None = pydantic.Field(
472 default=None,
473 description=(
474 "An approximation of the pixel-to-sky transform that is exactly representable as a FITS WCS."
475 ),
476 exclude_if=is_none,
477 )
479 def deserialize(self, archive: InputArchive[P], **kwargs: Any) -> Projection[Any]:
480 """Deserialize a projection from an archive.
482 Parameters
483 ----------
484 archive
485 Archive to read from.
486 **kwargs
487 Unsupported keyword arguments are accepted only to provide better
488 error messages (raising `serialization.InvalidParameterError`).
489 """
490 if kwargs:
491 raise InvalidParameterError(f"Unrecognized parameters for Projection: {set(kwargs.keys())}.")
492 pixel_to_sky = self.pixel_to_sky.deserialize(archive)
493 fits_approximation = (
494 self.fits_approximation.deserialize(archive) if self.fits_approximation is not None else None
495 )
496 return Projection(pixel_to_sky, fits_approximation=fits_approximation)