Coverage for python/lsst/images/_transforms/_ast.py: 4%

198 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:09 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14from collections.abc import Iterable 

15from typing import TYPE_CHECKING, Any, ClassVar, Self 

16 

17import numpy as np 

18 

19__all__ = ( 

20 "USING_STARLINK_PYAST", 

21 "Channel", 

22 "CmpFrame", 

23 "CmpMap", 

24 "FitsChan", 

25 "Frame", 

26 "FrameSet", 

27 "Mapping", 

28 "PolyMap", 

29 "ShiftMap", 

30 "SkyFrame", 

31 "StringStream", 

32 "UnitMap", 

33 "ZoomMap", 

34) 

35 

36if TYPE_CHECKING: 

37 import starlink.Ast 

38 

39 USING_STARLINK_PYAST = True 

40else: 

41 try: 

42 from astshim import ( 

43 Channel, 

44 CmpFrame, 

45 CmpMap, 

46 FitsChan, 

47 Frame, 

48 FrameDict, 

49 FrameSet, 

50 Mapping, 

51 Object, 

52 PolyMap, 

53 ShiftMap, 

54 SkyFrame, 

55 StringStream, 

56 UnitMap, 

57 ZoomMap, 

58 ) 

59 except ImportError: 

60 import starlink.Ast 

61 

62 USING_STARLINK_PYAST = True 

63 else: 

64 USING_STARLINK_PYAST = False 

65 

66 

67if USING_STARLINK_PYAST: 67 ↛ 69line 67 didn't jump to line 69 because the condition on line 67 was never true

68 

69 class StringStream: 

70 """A bridge object that mimics both astshim.StringStream and the 

71 interface expected by starlink.Ast.Channel. 

72 

73 Notes 

74 ----- 

75 This object can be constructed like an `astshim.StringStream`, but its 

76 `astsink` and `astsource` methods actually correspond to the 

77 `starlink.Ast` interface, so we can use `starlink.Ast` objects to 

78 implement the `FitsChan` classes in this module 

79 """ 

80 

81 def __init__(self, text: str = ""): 

82 if "\n" in text or "\r" in text: 

83 self._lines = text.splitlines() 

84 elif text and len(text) % 80 == 0: 

85 # Astropy WCS.to_header_string() yields a single concatenated 

86 # FITS header block; FitsChan expects one card per source line. 

87 self._lines = [text[n : n + 80] for n in range(0, len(text), 80)] 

88 else: 

89 self._lines = text.splitlines() 

90 

91 def astsource(self) -> str | None: 

92 if not self._lines: 

93 return None 

94 return self._lines.pop(0) 

95 

96 def astsink(self, line: str) -> None: 

97 self._lines.append(line) 

98 

99 def to_string(self) -> str: 

100 if not self._lines: 

101 return "" 

102 return "\n".join(self._lines) + "\n" 

103 

104 def getSinkData(self) -> str: 

105 return self.to_string() 

106 

107 class Object: 

108 """Bridge class that exposes the `astshim.Object` interface while 

109 being backed by an `astshim.Ast.Object`. 

110 """ 

111 

112 def __init__(self, impl: starlink.Ast.Object): 

113 if not isinstance(impl, self._IMPL_TYPE): 

114 raise TypeError(f"{type(self).__name__} cannot wrap {type(impl).__name__}.") 

115 self._impl = impl 

116 

117 _IMPL_TYPE: ClassVar[type[starlink.Ast.Object]] = starlink.Ast.Object 

118 

119 def show(self, showComments: bool = True) -> str: 

120 sink = StringStream() 

121 options = "" if showComments else "Comment=0" 

122 chan = starlink.Ast.Channel(None, sink, options=options) 

123 chan.write(self._impl) 

124 return sink.to_string() 

125 

126 def __eq__(self, other: object) -> bool: 

127 if not isinstance(other, Object) or type(self) is not type(other): 

128 return NotImplemented 

129 # ``astshim.Object`` ships a structural ``__eq__``; mirror that on 

130 # the starlink-pyast wrapper by comparing the AST channel 

131 # serialisation, which is the canonical content-faithful 

132 # representation for AST objects. Strip comments so cosmetic 

133 # changes between equivalent objects do not break equality. 

134 return self.show(showComments=False) == other.show(showComments=False) 

135 

136 __hash__ = None # type: ignore[assignment] 

137 

138 @classmethod 

139 def fromString(cls, serialized: str) -> Self: 

140 source = StringStream(serialized) 

141 channel = starlink.Ast.Channel(source) 

142 return cls._wrap(channel.read()) 

143 

144 @classmethod 

145 def _wrap(cls, impl: starlink.Ast.Object) -> Self: 

146 subcls = cls._most_derived_type(impl) 

147 result = object.__new__(subcls) 

148 Object.__init__(result, impl) 

149 return result 

150 

151 @classmethod 

152 def _most_derived_type(cls, impl: starlink.Ast.Object) -> type[Self]: 

153 for subcls in cls.__subclasses__(): 

154 if isinstance(impl, subcls._IMPL_TYPE): 

155 return subcls._most_derived_type(impl) 

156 return cls 

157 

158 class Mapping(Object): 

159 _IMPL_TYPE: ClassVar[type[starlink.Ast.Mapping]] = starlink.Ast.Mapping 

160 

161 def simplified(self) -> Mapping: 

162 return Mapping._wrap(self._impl.simplify()) 

163 

164 def applyForward(self, xy: Any) -> Any: 

165 return self._impl.tran(xy, True) 

166 

167 def applyInverse(self, xy: Any) -> Any: 

168 return self._impl.tran(xy, False) 

169 

170 def then(self, other: Mapping) -> Mapping: 

171 return Mapping._wrap(starlink.Ast.CmpMap(self._impl, other._impl, True)) 

172 

173 def inverted(self) -> Mapping: 

174 copy = self._impl.copy() 

175 copy.invert() 

176 return Mapping._wrap(copy) 

177 

178 def linearApprox(self, lbnd: Any, ubnd: Any, tol: float) -> np.ndarray: 

179 """Best linear approximation to this mapping over a hyper-box. 

180 

181 Parameters 

182 ---------- 

183 lbnd, ubnd 

184 Per-axis lower / upper input-coordinate bounds of the 

185 box over which the approximation is required. 

186 tol 

187 Maximum permitted deviation from linearity, expressed 

188 as a positive Cartesian displacement in the output 

189 coordinate system. 

190 

191 Returns 

192 ------- 

193 fit : `numpy.ndarray` 

194 A ``(1 + Nout, Nin)`` array whose first row holds the 

195 per-output constant offsets and whose remaining rows hold 

196 the Jacobian (``J[i][j] = ∂out_i/∂in_j``). 

197 

198 Raises 

199 ------ 

200 RuntimeError 

201 Raised if no linear approximation within ``tol`` exists 

202 over the requested box. 

203 

204 Notes 

205 ----- 

206 This matches ``astshim.Mapping.linearApprox``. starlink-pyast 

207 instead returns ``(success_flag, flat_coeffs)`` with the 

208 coefficients in the same row-major-by-output ordering as the 

209 astshim flat buffer, so reshaping recovers astshim's layout. 

210 """ 

211 success, coeffs = self._impl.linearapprox(lbnd, ubnd, tol) 

212 if not success: 

213 raise RuntimeError("Mapping not sufficiently linear") 

214 nin = self._impl.Nin 

215 nout = self._impl.Nout 

216 return np.asarray(coeffs, dtype=float).reshape(1 + nout, nin) 

217 

218 class UnitMap(Mapping): 

219 def __init__(self, n_coord: int): 

220 super().__init__(starlink.Ast.UnitMap(n_coord)) 

221 

222 _IMPL_TYPE: ClassVar[type[starlink.Ast.UnitMap]] = starlink.Ast.UnitMap 

223 

224 class ShiftMap(Mapping): 

225 def __init__(self, shift: Iterable[float]): 

226 super().__init__(starlink.Ast.ShiftMap(list(shift))) 

227 

228 _IMPL_TYPE: ClassVar[type[starlink.Ast.ShiftMap]] = starlink.Ast.ShiftMap 

229 

230 class CmpMap(Mapping): 

231 def __init__(self, map_a: Mapping, map_b: Mapping, series: bool): 

232 super().__init__(starlink.Ast.CmpMap(map_a._impl, map_b._impl, series)) 

233 

234 _IMPL_TYPE: ClassVar[type[starlink.Ast.CmpMap]] = starlink.Ast.CmpMap 

235 

236 class ZoomMap(Mapping): 

237 def __init__(self, n_coord: int, zoom: float): 

238 super().__init__(starlink.Ast.ZoomMap(n_coord, zoom)) 

239 

240 _IMPL_TYPE: ClassVar[type[starlink.Ast.ZoomMap]] = starlink.Ast.ZoomMap 

241 

242 class PolyMap(Mapping): 

243 def __init__(self, coeff_f: Any, coeff_i_or_nout: Any, options: str = ""): 

244 # astshim's PolyMap takes ``nout`` as the second positional; 

245 # starlink.Ast.PolyMap requires an explicit inverse-coefficient 

246 # array. Adapt to both by synthesizing an empty inverse when 

247 # an integer ``nout`` is supplied. 

248 coeff_f_arr = np.asarray(coeff_f, dtype=float) 

249 if isinstance(coeff_i_or_nout, int): 

250 nin = coeff_f_arr.shape[1] - 2 

251 coeff_i = np.zeros((0, 2 + nin), dtype=float) 

252 else: 

253 coeff_i = np.asarray(coeff_i_or_nout, dtype=float) 

254 super().__init__(starlink.Ast.PolyMap(coeff_f_arr, coeff_i, options)) 

255 

256 _IMPL_TYPE: ClassVar[type[starlink.Ast.PolyMap]] = starlink.Ast.PolyMap 

257 

258 class Frame(Mapping): 

259 def __init__(self, n_axes: int, options: str = ""): 

260 super().__init__(starlink.Ast.Frame(n_axes, options)) 

261 

262 _IMPL_TYPE: ClassVar[type[starlink.Ast.Frame]] = starlink.Ast.Frame 

263 

264 @property 

265 def ident(self) -> str: 

266 return self._impl.Ident 

267 

268 @property 

269 def domain(self) -> str: 

270 return self._impl.Domain 

271 

272 @domain.setter 

273 def domain(self, value: str) -> None: 

274 self._impl.Domain = value 

275 

276 def setUnit(self, axis: int, unit: str) -> None: 

277 setattr(self._impl, f"Unit_{axis}", unit) 

278 

279 def getUnit(self, axis: int) -> str: 

280 return getattr(self._impl, f"Unit_{axis}") 

281 

282 def setLabel(self, axis: int, label: str) -> None: 

283 setattr(self._impl, f"Label_{axis}", label) 

284 

285 def getBottom(self, axis: int) -> float: 

286 return getattr(self._impl, f"Bottom_{axis}") 

287 

288 def getTop(self, axis: int) -> float: 

289 return getattr(self._impl, f"Top_{axis}") 

290 

291 class SkyFrame(Frame): 

292 def __init__(self, options: str = ""): 

293 Object.__init__(self, starlink.Ast.SkyFrame(options)) 

294 

295 _IMPL_TYPE: ClassVar[type[starlink.Ast.SkyFrame]] = starlink.Ast.SkyFrame 

296 

297 class CmpFrame(Frame): 

298 def __init__(self, frame_a: Frame, frame_b: Frame): 

299 Object.__init__(self, starlink.Ast.CmpFrame(frame_a._impl, frame_b._impl, "")) 

300 

301 _IMPL_TYPE: ClassVar[type[starlink.Ast.CmpFrame]] = starlink.Ast.CmpFrame 

302 

303 class FrameSet(Frame): 

304 def __init__(self, base_frame: Frame): 

305 Object.__init__(self, starlink.Ast.FrameSet(base_frame._impl)) 

306 

307 BASE: ClassVar[int] = 1 

308 _IMPL_TYPE: ClassVar[type[starlink.Ast.FrameSet]] = starlink.Ast.FrameSet 

309 

310 @property 

311 def nFrame(self) -> int: 

312 return self._impl.Nframe 

313 

314 @property 

315 def base(self) -> int: 

316 return self._impl.Base 

317 

318 @base.setter 

319 def base(self, value: int) -> None: 

320 self._impl.Base = value 

321 

322 @property 

323 def current(self) -> int: 

324 return self._impl.Current 

325 

326 @current.setter 

327 def current(self, value: int) -> None: 

328 self._impl.Current = value 

329 

330 def addFrame(self, iframe: int, mapping: Mapping, frame: Frame) -> None: 

331 self._impl.addframe(iframe, mapping._impl, frame._impl) 

332 

333 def getFrame(self, iframe: int, copy: bool = True) -> Frame: 

334 result = self._impl.getframe(iframe) 

335 if copy: 

336 result = result.copy() 

337 return Frame._wrap(result) 

338 

339 def getMapping(self, iframe1: int | None = None, iframe2: int | None = None) -> Mapping: 

340 if iframe1 is None: 

341 iframe1 = self.base 

342 if iframe2 is None: 

343 iframe2 = self.current 

344 return Mapping._wrap(self._impl.getmapping(iframe1, iframe2)) 

345 

346 class FrameDict(FrameSet): 

347 def __init__(self, obj: Object): 

348 Object.__init__(self, obj._impl) 

349 

350 class FitsChan(Object): 

351 def __init__(self, stream: StringStream | None = None, options: str = ""): 

352 source = stream if stream is not None else None 

353 sink = stream if stream is not None else None 

354 super().__init__(starlink.Ast.FitsChan(source, sink, options)) 

355 

356 _IMPL_TYPE: ClassVar[type[starlink.Ast.FitsChan]] = starlink.Ast.FitsChan 

357 

358 def read(self) -> Any: 

359 return Object._wrap(self._impl.read()) 

360 

361 def write(self, obj: Any) -> int: 

362 return self._impl.write(obj._impl) 

363 

364 def setFitsI(self, keyword: str, value: int) -> None: 

365 self._impl.setfitsI(keyword, value, "", 1) 

366 

367 def __iter__(self) -> Any: 

368 return iter(self._impl) 

369 

370 class Channel(Object): 

371 def __init__(self, stream: StringStream, options: str = ""): 

372 super().__init__(starlink.Ast.Channel(None, stream, options)) 

373 

374 _IMPL_TYPE: ClassVar[type[starlink.Ast.Channel]] = starlink.Ast.Channel 

375 

376 def write(self, obj: Object) -> int: 

377 return self._impl.write(obj._impl)