Coverage for python/lsst/images/_transforms/_camera_frame_set.py: 25%

103 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 08:40 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("CameraFrameSet", "CameraFrameSetSerializationModel") 

15 

16from typing import Any 

17 

18import astropy.units as u 

19import pydantic 

20 

21from .._geom import Bounds, Box 

22from ..serialization import ArchiveTree, InputArchive, InvalidParameterError, OutputArchive 

23from . import _ast as astshim 

24from . import _frames # use this import style to facilitate pattern matching 

25from ._frame_set import FrameLookupError, FrameSet 

26from ._transform import Transform 

27 

28 

29class CameraFrameSet(FrameSet): 

30 """A `FrameSet` that manages the coordinate systems of a camera. 

31 

32 The `CameraFrameSet` class constructor is considered a private 

33 implementation detail. At present, instances can only be obtained by 

34 loading them from storage (via 

35 `~CameraFrameSetSerializationModel.deserialize`) or converting a legacy 

36 `lsst.afw.cameraGeom` object (`from_legacy`). 

37 """ 

38 

39 # This constructor is kept private while we support both the astshim 

40 # and starlink-pyast AST wrappers. For now: 

41 # 'instrument': the short (butler dimension) name. 

42 # 'ast': an astshim.FrameSet as returned by 

43 # lsst.afw.cameraGeom.TransformMap.makeFrameSet. 

44 # Should have frames with Ident values FOCAL_PLANE, FIELD_ANGLE 

45 # and DETECTOR_${ID}, and the focal plane frame must know its 

46 # units. 

47 def __init__(self, instrument: str, ast: astshim.FrameSet): 

48 self._ast = ast 

49 self._focal_plane_frame_id: int = 0 

50 self._field_angle_frame_id: int = 0 

51 self._detector_frame_ids: dict[int, int] = {} 

52 for frame_id in range(1, self._ast.nFrame + 1): 

53 ast_frame = self._ast.getFrame(frame_id, copy=False) 

54 match ast_frame.ident: 

55 case "FOCAL_PLANE": 

56 self._focal_plane_frame_id = frame_id 

57 case "FIELD_ANGLE": 

58 self._field_angle_frame_id = frame_id 

59 case str(s) if s.startswith("DETECTOR_"): 

60 detector_id = int(s.removeprefix("DETECTOR_")) 

61 self._detector_frame_ids[detector_id] = frame_id 

62 case _: 

63 raise ValueError(f"Unexpected frame in camera AST FrameSet:\n{ast_frame.show()}.") 

64 if self._focal_plane_frame_id == 0: 

65 raise ValueError("No FOCAL_PLANE frame in camera AST FrameSet.") 

66 self._focal_plane_frame = _frames.FocalPlaneFrame( 

67 instrument=instrument, 

68 unit=u.Unit(self._ast.getFrame(self._focal_plane_frame_id, copy=False).getUnit(1)), 

69 ) 

70 self._field_angle_frame = _frames.FieldAngleFrame(instrument=instrument) 

71 if self._field_angle_frame_id == 0: 

72 raise ValueError("No FIELD_ANGLE frame in camera AST FrameSet.") 

73 

74 def __eq__(self, other: object) -> bool: 

75 if type(other) is not CameraFrameSet: 

76 return NotImplemented 

77 # Every cached attribute on this class is derived from ``_ast`` and 

78 # the instrument name. Compare the *simplified* AST serialisations 

79 # explicitly rather than relying on ``Object.__eq__``: what we care 

80 # about is that the two frame sets describe the same transforms, and 

81 # simplifying first makes the check about behaviour rather than the 

82 # particular intermediate representation. 

83 return ( 

84 self.instrument == other.instrument 

85 and self._ast.simplified().show() == other._ast.simplified().show() 

86 ) 

87 

88 __hash__ = None # type: ignore[assignment] 

89 

90 @property 

91 def instrument(self) -> str: 

92 """Name of the instrument (`str`).""" 

93 return self._focal_plane_frame.instrument 

94 

95 def focal_plane(self, visit: int | None = None) -> _frames.FocalPlaneFrame: 

96 """Return a focal plane frame for this instrument. 

97 

98 Parameters 

99 ---------- 

100 visit 

101 ID for the visit this frame will correspond to. This only needs 

102 to be provided in contexts where camera frames will be related to 

103 the sky via a `Projection`. 

104 """ 

105 if visit is None: 

106 return self._focal_plane_frame 

107 else: 

108 return self._focal_plane_frame.model_copy(update={"visit": visit}) 

109 

110 def field_angle(self, visit: int | None = None) -> _frames.FieldAngleFrame: 

111 """Return a field angle frame for this instrument. 

112 

113 Parameters 

114 ---------- 

115 visit 

116 ID for the visit this frame will correspond to. This only needs 

117 to be provided in contexts where camera frames will be related to 

118 the sky via a `Projection`. 

119 """ 

120 if visit is None: 

121 return self._field_angle_frame 

122 else: 

123 return self._field_angle_frame.model_copy(update={"visit": visit}) 

124 

125 def detector(self, detector: int, *, visit: int | None = None) -> _frames.DetectorFrame: 

126 """Return a detector pixel-coordinate frame for this instrument. 

127 

128 Parameters 

129 ---------- 

130 detector 

131 ID of the detector. 

132 visit 

133 ID for the visit this frame will correspond to. This only needs 

134 to be provided in contexts where camera frames will be related to 

135 the sky via a `Projection`. 

136 """ 

137 try: 

138 frame_id = self._detector_frame_ids[detector] 

139 except KeyError: 

140 raise FrameLookupError( 

141 f"No frame for detector {detector!r} in camera for {self.instrument!r}." 

142 ) from None 

143 ast_frame = self._ast.getFrame(frame_id, copy=False) 

144 bbox = Box.factory[ 

145 int(ast_frame.getBottom(2)) : int(ast_frame.getTop(2)), 

146 int(ast_frame.getBottom(1)) : int(ast_frame.getTop(1)), 

147 ] 

148 return _frames.DetectorFrame(instrument=self.instrument, detector=detector, visit=visit, bbox=bbox) 

149 

150 def __contains__(self, frame: _frames.Frame) -> bool: 

151 try: 

152 self._parse_frame_arg(frame) 

153 return True 

154 except FrameLookupError: 

155 return False 

156 

157 def __getitem__[I: _frames.Frame, O: _frames.Frame](self, key: tuple[I, O]) -> Transform[I, O]: 

158 in_frame, out_frame = key 

159 in_frame_id, in_bounds = self._parse_frame_arg(in_frame) 

160 out_frame_id, out_bounds = self._parse_frame_arg(out_frame) 

161 return Transform( 

162 in_frame, 

163 out_frame, 

164 self._ast.getMapping(in_frame_id, out_frame_id), 

165 in_bounds=in_bounds, 

166 out_bounds=out_bounds, 

167 ) 

168 

169 def _parse_frame_arg(self, frame: _frames.Frame) -> tuple[int, Bounds | None]: 

170 bounds: Bounds | None = None 

171 match frame: 

172 case _frames.DetectorFrame(instrument=self.instrument, detector=detector_id): 

173 try: 

174 frame_id = self._detector_frame_ids[detector_id] 

175 except KeyError: 

176 raise FrameLookupError( 

177 f"No frame for detector {detector_id!r} in camera for {self.instrument!r}." 

178 ) from None 

179 bounds = frame.bbox 

180 case _frames.FocalPlaneFrame(instrument=self.instrument): 

181 frame_id = self._focal_plane_frame_id 

182 case _frames.FieldAngleFrame(instrument=self.instrument): 

183 frame_id = self._field_angle_frame_id 

184 case _: 

185 raise FrameLookupError(f"Invalid frame for camera {self.instrument}: {frame!r}.") 

186 return frame_id, bounds 

187 

188 def serialize(self, archive: OutputArchive[Any]) -> CameraFrameSetSerializationModel: 

189 """Serialize the frame set to an archive. 

190 

191 Parameters 

192 ---------- 

193 archive 

194 Archive to serialize to. 

195 

196 Returns 

197 ------- 

198 `CameraFrameSetSerializationModel` 

199 Serialized form of the frame set. 

200 """ 

201 return CameraFrameSetSerializationModel(instrument=self.instrument, ast=self._ast.show()) 

202 

203 @staticmethod 

204 def _get_archive_tree_type( 

205 pointer_type: type[pydantic.BaseModel], 

206 ) -> type[CameraFrameSetSerializationModel]: 

207 """Return the serialization model type for this object for an archive 

208 type that uses the given pointer type. 

209 """ 

210 return CameraFrameSetSerializationModel 

211 

212 @classmethod 

213 def from_legacy(cls, camera: Any) -> CameraFrameSet: 

214 """Construct a transform from a legacy `lsst.afw.cameraGeom.Camera`. 

215 

216 Parameters 

217 ---------- 

218 camera 

219 An `lsst.afw.cameraGeom.Camera` instance to convert. 

220 """ 

221 transform_map = camera.getTransformMap() 

222 ast_frame_set = transform_map.makeFrameSet(list(camera)) 

223 return CameraFrameSet("HSC", ast_frame_set) 

224 

225 

class CameraFrameSetSerializationModel(ArchiveTree):
    """Serialization model for `CameraFrameSet`."""

    # Short name of the instrument the frame set belongs to.
    instrument: str = pydantic.Field(description="Name of the instrument.")
    # Text dump of the AST frame set, as produced by ``FrameSet.show()``.
    ast: str = pydantic.Field(
        description="A serialized Starlink AST FrameSet, using the AST native encoding."
    )

    def deserialize(self, archive: InputArchive[Any], **kwargs: Any) -> CameraFrameSet:
        """Rebuild the `CameraFrameSet` described by this model.

        Parameters
        ----------
        archive
            Archive to read from.
        **kwargs
            No keyword arguments are supported; they are accepted only so
            that a clear `serialization.InvalidParameterError` can be raised
            when any are passed.

        Returns
        -------
        `CameraFrameSet`
            Frame set reconstructed from the stored instrument name and AST
            string.
        """
        if not kwargs:
            # Everything needed lives on the model itself: parse the AST text
            # and hand it to the (private) constructor.
            ast_frame_set = astshim.FrameSet.fromString(self.ast)
            return CameraFrameSet(self.instrument, ast_frame_set)
        raise InvalidParameterError(f"Unrecognized parameters for CameraFrameSet: {set(kwargs.keys())}.")