Coverage for python/lsst/images/_transforms/_ast.py: 4%
198 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 01:09 -0700
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 01:09 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14from collections.abc import Iterable
15from typing import TYPE_CHECKING, Any, ClassVar, Self
17import numpy as np
19__all__ = (
20 "USING_STARLINK_PYAST",
21 "Channel",
22 "CmpFrame",
23 "CmpMap",
24 "FitsChan",
25 "Frame",
26 "FrameSet",
27 "Mapping",
28 "PolyMap",
29 "ShiftMap",
30 "SkyFrame",
31 "StringStream",
32 "UnitMap",
33 "ZoomMap",
34)
36if TYPE_CHECKING:
37 import starlink.Ast
39 USING_STARLINK_PYAST = True
40else:
41 try:
42 from astshim import (
43 Channel,
44 CmpFrame,
45 CmpMap,
46 FitsChan,
47 Frame,
48 FrameDict,
49 FrameSet,
50 Mapping,
51 Object,
52 PolyMap,
53 ShiftMap,
54 SkyFrame,
55 StringStream,
56 UnitMap,
57 ZoomMap,
58 )
59 except ImportError:
60 import starlink.Ast
62 USING_STARLINK_PYAST = True
63 else:
64 USING_STARLINK_PYAST = False
67if USING_STARLINK_PYAST: 67 ↛ 69line 67 didn't jump to line 69 because the condition on line 67 was never true
69 class StringStream:
70 """A bridge object that mimics both astshim.StringStream and the
71 interface expected by starlink.Ast.Channel.
73 Notes
74 -----
75 This object can be constructed like an `astshim.StringStream`, but its
76 `astsink` and `astsource` methods actually correspond to the
77 `starlink.Ast` interface, so we can use `starlink.Ast` objects to
78 implement the `FitsChan` classes in this module
79 """
81 def __init__(self, text: str = ""):
82 if "\n" in text or "\r" in text:
83 self._lines = text.splitlines()
84 elif text and len(text) % 80 == 0:
85 # Astropy WCS.to_header_string() yields a single concatenated
86 # FITS header block; FitsChan expects one card per source line.
87 self._lines = [text[n : n + 80] for n in range(0, len(text), 80)]
88 else:
89 self._lines = text.splitlines()
91 def astsource(self) -> str | None:
92 if not self._lines:
93 return None
94 return self._lines.pop(0)
96 def astsink(self, line: str) -> None:
97 self._lines.append(line)
99 def to_string(self) -> str:
100 if not self._lines:
101 return ""
102 return "\n".join(self._lines) + "\n"
104 def getSinkData(self) -> str:
105 return self.to_string()
107 class Object:
108 """Bridge class that exposes the `astshim.Object` interface while
109 being backed by an `astshim.Ast.Object`.
110 """
112 def __init__(self, impl: starlink.Ast.Object):
113 if not isinstance(impl, self._IMPL_TYPE):
114 raise TypeError(f"{type(self).__name__} cannot wrap {type(impl).__name__}.")
115 self._impl = impl
117 _IMPL_TYPE: ClassVar[type[starlink.Ast.Object]] = starlink.Ast.Object
119 def show(self, showComments: bool = True) -> str:
120 sink = StringStream()
121 options = "" if showComments else "Comment=0"
122 chan = starlink.Ast.Channel(None, sink, options=options)
123 chan.write(self._impl)
124 return sink.to_string()
126 def __eq__(self, other: object) -> bool:
127 if not isinstance(other, Object) or type(self) is not type(other):
128 return NotImplemented
129 # ``astshim.Object`` ships a structural ``__eq__``; mirror that on
130 # the starlink-pyast wrapper by comparing the AST channel
131 # serialisation, which is the canonical content-faithful
132 # representation for AST objects. Strip comments so cosmetic
133 # changes between equivalent objects do not break equality.
134 return self.show(showComments=False) == other.show(showComments=False)
136 __hash__ = None # type: ignore[assignment]
138 @classmethod
139 def fromString(cls, serialized: str) -> Self:
140 source = StringStream(serialized)
141 channel = starlink.Ast.Channel(source)
142 return cls._wrap(channel.read())
144 @classmethod
145 def _wrap(cls, impl: starlink.Ast.Object) -> Self:
146 subcls = cls._most_derived_type(impl)
147 result = object.__new__(subcls)
148 Object.__init__(result, impl)
149 return result
151 @classmethod
152 def _most_derived_type(cls, impl: starlink.Ast.Object) -> type[Self]:
153 for subcls in cls.__subclasses__():
154 if isinstance(impl, subcls._IMPL_TYPE):
155 return subcls._most_derived_type(impl)
156 return cls
158 class Mapping(Object):
159 _IMPL_TYPE: ClassVar[type[starlink.Ast.Mapping]] = starlink.Ast.Mapping
161 def simplified(self) -> Mapping:
162 return Mapping._wrap(self._impl.simplify())
164 def applyForward(self, xy: Any) -> Any:
165 return self._impl.tran(xy, True)
167 def applyInverse(self, xy: Any) -> Any:
168 return self._impl.tran(xy, False)
170 def then(self, other: Mapping) -> Mapping:
171 return Mapping._wrap(starlink.Ast.CmpMap(self._impl, other._impl, True))
173 def inverted(self) -> Mapping:
174 copy = self._impl.copy()
175 copy.invert()
176 return Mapping._wrap(copy)
178 def linearApprox(self, lbnd: Any, ubnd: Any, tol: float) -> np.ndarray:
179 """Best linear approximation to this mapping over a hyper-box.
181 Parameters
182 ----------
183 lbnd, ubnd
184 Per-axis lower / upper input-coordinate bounds of the
185 box over which the approximation is required.
186 tol
187 Maximum permitted deviation from linearity, expressed
188 as a positive Cartesian displacement in the output
189 coordinate system.
191 Returns
192 -------
193 fit : `numpy.ndarray`
194 A ``(1 + Nout, Nin)`` array whose first row holds the
195 per-output constant offsets and whose remaining rows hold
196 the Jacobian (``J[i][j] = ∂out_i/∂in_j``).
198 Raises
199 ------
200 RuntimeError
201 Raised if no linear approximation within ``tol`` exists
202 over the requested box.
204 Notes
205 -----
206 This matches ``astshim.Mapping.linearApprox``. starlink-pyast
207 instead returns ``(success_flag, flat_coeffs)`` with the
208 coefficients in the same row-major-by-output ordering as the
209 astshim flat buffer, so reshaping recovers astshim's layout.
210 """
211 success, coeffs = self._impl.linearapprox(lbnd, ubnd, tol)
212 if not success:
213 raise RuntimeError("Mapping not sufficiently linear")
214 nin = self._impl.Nin
215 nout = self._impl.Nout
216 return np.asarray(coeffs, dtype=float).reshape(1 + nout, nin)
218 class UnitMap(Mapping):
219 def __init__(self, n_coord: int):
220 super().__init__(starlink.Ast.UnitMap(n_coord))
222 _IMPL_TYPE: ClassVar[type[starlink.Ast.UnitMap]] = starlink.Ast.UnitMap
224 class ShiftMap(Mapping):
225 def __init__(self, shift: Iterable[float]):
226 super().__init__(starlink.Ast.ShiftMap(list(shift)))
228 _IMPL_TYPE: ClassVar[type[starlink.Ast.ShiftMap]] = starlink.Ast.ShiftMap
230 class CmpMap(Mapping):
231 def __init__(self, map_a: Mapping, map_b: Mapping, series: bool):
232 super().__init__(starlink.Ast.CmpMap(map_a._impl, map_b._impl, series))
234 _IMPL_TYPE: ClassVar[type[starlink.Ast.CmpMap]] = starlink.Ast.CmpMap
236 class ZoomMap(Mapping):
237 def __init__(self, n_coord: int, zoom: float):
238 super().__init__(starlink.Ast.ZoomMap(n_coord, zoom))
240 _IMPL_TYPE: ClassVar[type[starlink.Ast.ZoomMap]] = starlink.Ast.ZoomMap
242 class PolyMap(Mapping):
243 def __init__(self, coeff_f: Any, coeff_i_or_nout: Any, options: str = ""):
244 # astshim's PolyMap takes ``nout`` as the second positional;
245 # starlink.Ast.PolyMap requires an explicit inverse-coefficient
246 # array. Adapt to both by synthesizing an empty inverse when
247 # an integer ``nout`` is supplied.
248 coeff_f_arr = np.asarray(coeff_f, dtype=float)
249 if isinstance(coeff_i_or_nout, int):
250 nin = coeff_f_arr.shape[1] - 2
251 coeff_i = np.zeros((0, 2 + nin), dtype=float)
252 else:
253 coeff_i = np.asarray(coeff_i_or_nout, dtype=float)
254 super().__init__(starlink.Ast.PolyMap(coeff_f_arr, coeff_i, options))
256 _IMPL_TYPE: ClassVar[type[starlink.Ast.PolyMap]] = starlink.Ast.PolyMap
258 class Frame(Mapping):
259 def __init__(self, n_axes: int, options: str = ""):
260 super().__init__(starlink.Ast.Frame(n_axes, options))
262 _IMPL_TYPE: ClassVar[type[starlink.Ast.Frame]] = starlink.Ast.Frame
264 @property
265 def ident(self) -> str:
266 return self._impl.Ident
268 @property
269 def domain(self) -> str:
270 return self._impl.Domain
272 @domain.setter
273 def domain(self, value: str) -> None:
274 self._impl.Domain = value
276 def setUnit(self, axis: int, unit: str) -> None:
277 setattr(self._impl, f"Unit_{axis}", unit)
279 def getUnit(self, axis: int) -> str:
280 return getattr(self._impl, f"Unit_{axis}")
282 def setLabel(self, axis: int, label: str) -> None:
283 setattr(self._impl, f"Label_{axis}", label)
285 def getBottom(self, axis: int) -> float:
286 return getattr(self._impl, f"Bottom_{axis}")
288 def getTop(self, axis: int) -> float:
289 return getattr(self._impl, f"Top_{axis}")
291 class SkyFrame(Frame):
292 def __init__(self, options: str = ""):
293 Object.__init__(self, starlink.Ast.SkyFrame(options))
295 _IMPL_TYPE: ClassVar[type[starlink.Ast.SkyFrame]] = starlink.Ast.SkyFrame
297 class CmpFrame(Frame):
298 def __init__(self, frame_a: Frame, frame_b: Frame):
299 Object.__init__(self, starlink.Ast.CmpFrame(frame_a._impl, frame_b._impl, ""))
301 _IMPL_TYPE: ClassVar[type[starlink.Ast.CmpFrame]] = starlink.Ast.CmpFrame
303 class FrameSet(Frame):
304 def __init__(self, base_frame: Frame):
305 Object.__init__(self, starlink.Ast.FrameSet(base_frame._impl))
307 BASE: ClassVar[int] = 1
308 _IMPL_TYPE: ClassVar[type[starlink.Ast.FrameSet]] = starlink.Ast.FrameSet
310 @property
311 def nFrame(self) -> int:
312 return self._impl.Nframe
314 @property
315 def base(self) -> int:
316 return self._impl.Base
318 @base.setter
319 def base(self, value: int) -> None:
320 self._impl.Base = value
322 @property
323 def current(self) -> int:
324 return self._impl.Current
326 @current.setter
327 def current(self, value: int) -> None:
328 self._impl.Current = value
330 def addFrame(self, iframe: int, mapping: Mapping, frame: Frame) -> None:
331 self._impl.addframe(iframe, mapping._impl, frame._impl)
333 def getFrame(self, iframe: int, copy: bool = True) -> Frame:
334 result = self._impl.getframe(iframe)
335 if copy:
336 result = result.copy()
337 return Frame._wrap(result)
339 def getMapping(self, iframe1: int | None = None, iframe2: int | None = None) -> Mapping:
340 if iframe1 is None:
341 iframe1 = self.base
342 if iframe2 is None:
343 iframe2 = self.current
344 return Mapping._wrap(self._impl.getmapping(iframe1, iframe2))
346 class FrameDict(FrameSet):
347 def __init__(self, obj: Object):
348 Object.__init__(self, obj._impl)
350 class FitsChan(Object):
351 def __init__(self, stream: StringStream | None = None, options: str = ""):
352 source = stream if stream is not None else None
353 sink = stream if stream is not None else None
354 super().__init__(starlink.Ast.FitsChan(source, sink, options))
356 _IMPL_TYPE: ClassVar[type[starlink.Ast.FitsChan]] = starlink.Ast.FitsChan
358 def read(self) -> Any:
359 return Object._wrap(self._impl.read())
361 def write(self, obj: Any) -> int:
362 return self._impl.write(obj._impl)
364 def setFitsI(self, keyword: str, value: int) -> None:
365 self._impl.setfitsI(keyword, value, "", 1)
367 def __iter__(self) -> Any:
368 return iter(self._impl)
370 class Channel(Object):
371 def __init__(self, stream: StringStream, options: str = ""):
372 super().__init__(starlink.Ast.Channel(None, stream, options))
374 _IMPL_TYPE: ClassVar[type[starlink.Ast.Channel]] = starlink.Ast.Channel
376 def write(self, obj: Object) -> int:
377 return self._impl.write(obj._impl)