Coverage for python / lsst / images / ndf / _model.py: 20%

297 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-27 01:31 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Python intermediate representation for NDF/HDS content.""" 

13 

14from __future__ import annotations 

15 

16__all__ = ( 

17 "HdsExtension", 

18 "HdsPrimitive", 

19 "HdsStructure", 

20 "Ndf", 

21 "NdfArray", 

22 "NdfContainer", 

23 "NdfDocument", 

24 "NdfQuality", 

25 "NdfWcs", 

26) 

27 

28from collections.abc import Iterable, Mapping, Sequence 

29from dataclasses import dataclass, field 

30from typing import Any, Self, cast 

31 

32import h5py 

33import numpy as np 

34 

35from . import _hds 

36 

37 

38def _decode_ascii_attr(value: Any) -> str | None: 

39 """Decode an HDS ASCII attribute value from h5py.""" 

40 if value is None: 

41 return None 

42 if isinstance(value, bytes): 

43 return value.decode("ascii") 

44 if isinstance(value, np.bytes_): 

45 return bytes(value).decode("ascii") 

46 if isinstance(value, str): 

47 return value 

48 return str(value) 

49 

50 

51@dataclass 

52class HdsPrimitive: 

53 """An HDS primitive component. 

54 

55 Parameters 

56 ---------- 

57 data 

58 Numeric/logical array data, or an open HDF5 dataset when this 

59 primitive was read lazily from an existing file. 

60 char_lines 

61 Character data for ``_CHAR*N`` primitives. A scalar string is 

62 represented as a one-element list with ``char_scalar=True``. 

63 char_width 

64 Fixed HDS character width for ``char_lines``. 

65 is_char_scalar 

66 If `True`, write ``char_lines`` as a scalar ``_CHAR*N`` primitive. 

67 Otherwise write them as a one-dimensional character array. 

68 compression_options 

69 Optional h5py compression options for numeric array data. 

70 """ 

71 

72 data: np.ndarray | h5py.Dataset | None = None 

73 char_lines: list[str] | None = None 

74 char_width: int | None = None 

75 is_char_scalar: bool = False 

76 compression_options: dict[str, Any] = field(default_factory=dict) 

77 

78 @classmethod 

79 def array( 

80 cls, 

81 data: np.ndarray | h5py.Dataset, 

82 *, 

83 compression_options: Mapping[str, Any] | None = None, 

84 ) -> Self: 

85 """Create a numeric/logical HDS primitive.""" 

86 return cls( 

87 data=data if isinstance(data, h5py.Dataset) else np.asarray(data), 

88 compression_options=dict(compression_options) if compression_options else {}, 

89 ) 

90 

91 @classmethod 

92 def char_array(cls, lines: Sequence[str], *, width: int = 80) -> Self: 

93 """Create a one-dimensional HDS ``_CHAR*N`` primitive.""" 

94 return cls(char_lines=list(lines), char_width=width) 

95 

96 @classmethod 

97 def char_scalar(cls, text: str, *, width: int | None = None) -> Self: 

98 """Create a scalar HDS ``_CHAR*N`` primitive.""" 

99 encoded = text.encode("ascii") 

100 return cls(char_lines=[text], char_width=max(width or 0, len(encoded), 1), is_char_scalar=True) 

101 

102 @classmethod 

103 def from_hdf5(cls, dataset: h5py.Dataset) -> Self: 

104 """Build an HDS primitive model from an open HDF5 dataset.""" 

105 if dataset.dtype.kind == "S": 

106 if dataset.ndim == 0: 

107 raw = dataset[()] 

108 if isinstance(raw, np.bytes_): 

109 raw = bytes(raw) 

110 assert isinstance(raw, bytes) 

111 return cls( 

112 char_lines=[raw.decode("ascii").rstrip(" ")], 

113 char_width=dataset.dtype.itemsize, 

114 is_char_scalar=True, 

115 ) 

116 return cls( 

117 char_lines=_hds.read_char_array(dataset), 

118 char_width=dataset.dtype.itemsize, 

119 is_char_scalar=False, 

120 ) 

121 return cls(data=dataset) 

122 

123 def read_array(self) -> np.ndarray: 

124 """Return this primitive as a numpy array.""" 

125 if self.data is None: 

126 raise TypeError("Character HDS primitives cannot be read as numeric arrays.") 

127 if isinstance(self.data, h5py.Dataset): 

128 return _hds.read_array(self.data) 

129 return self.data 

130 

131 def read_char_array(self) -> list[str]: 

132 """Return this primitive as stripped ASCII strings.""" 

133 if self.char_lines is not None: 

134 return list(self.char_lines) 

135 if isinstance(self.data, h5py.Dataset) and self.data.dtype.kind == "S": 

136 if self.data.ndim == 0: 

137 raw = self.data[()] 

138 if isinstance(raw, np.bytes_): 

139 raw = bytes(raw) 

140 assert isinstance(raw, bytes) 

141 return [raw.decode("ascii").rstrip(" ")] 

142 return _hds.read_char_array(self.data) 

143 raise TypeError("Numeric HDS primitives cannot be read as character arrays.") 

144 

145 def write_to_hdf5(self, parent: h5py.Group, name: str) -> h5py.Dataset: 

146 """Write this primitive to an HDF5 group.""" 

147 if name in parent: 

148 del parent[name] 

149 if self.char_lines is not None: 

150 width = self.char_width 

151 if width is None: 

152 width = max((len(line.encode("ascii")) for line in self.char_lines), default=1) 

153 if self.is_char_scalar: 

154 if len(self.char_lines) != 1: 

155 raise ValueError("Scalar _CHAR*N primitives require exactly one string.") 

156 line = self.char_lines[0] 

157 encoded = line.encode("ascii") 

158 if len(encoded) > width: 

159 raise ValueError( 

160 f"Scalar _CHAR*N primitive {name!r} is {len(encoded)} bytes, " 

161 f"longer than width={width}." 

162 ) 

163 return parent.create_dataset(name, data=np.bytes_(encoded.ljust(width))) 

164 return _hds.write_char_array(parent, name, self.char_lines, width=width) 

165 data = self.read_array() 

166 return _hds.write_array( 

167 parent, 

168 name, 

169 data, 

170 compression=self.compression_options.get("compression"), 

171 compression_opts=self.compression_options.get("compression_opts"), 

172 ) 

173 

174 

175class HdsStructure: 

176 """An HDS structure component with named child components.""" 

177 

178 def __init__( 

179 self, 

180 hds_type: str, 

181 children: Mapping[str, HdsStructure | HdsPrimitive] | None = None, 

182 ) -> None: 

183 self.hds_type = hds_type 

184 self.children: dict[str, HdsStructure | HdsPrimitive] = dict(children) if children else {} 

185 

186 @classmethod 

187 def from_hdf5(cls, group: h5py.Group) -> HdsStructure: 

188 """Build a structure model from an open HDF5 group.""" 

189 hds_type = _decode_ascii_attr(group.attrs.get(_hds.ATTR_CLASS)) 

190 if hds_type is None: 

191 hds_type = _decode_ascii_attr(group.attrs.get("HDSTYPE")) or "EXT" 

192 structure = _new_structure(hds_type) 

193 for name, child in group.items(): 

194 if isinstance(child, h5py.Group): 

195 structure.children[name] = cls.from_hdf5(child) 

196 elif isinstance(child, h5py.Dataset): 

197 structure.children[name] = HdsPrimitive.from_hdf5(child) 

198 return structure 

199 

200 def __contains__(self, path: str) -> bool: 

201 try: 

202 self.get(path) 

203 except KeyError: 

204 return False 

205 return True 

206 

207 def __getitem__(self, path: str) -> HdsStructure | HdsPrimitive: 

208 return self.get(path) 

209 

210 def items(self) -> Iterable[tuple[str, HdsStructure | HdsPrimitive]]: 

211 """Iterate over direct child components.""" 

212 return self.children.items() 

213 

214 def get(self, path: str) -> HdsStructure | HdsPrimitive: 

215 """Return a child component by relative or absolute path.""" 

216 if path in ("", "/"): 

217 return self 

218 cursor: HdsStructure | HdsPrimitive = self 

219 for part in _split_path(path): 

220 if not isinstance(cursor, HdsStructure): 

221 raise KeyError(path) 

222 cursor = cursor.children[part] 

223 return cursor 

224 

225 def get_structure(self, path: str) -> HdsStructure: 

226 """Return a child structure by relative or absolute path.""" 

227 node = self.get(path) 

228 if not isinstance(node, HdsStructure): 

229 raise KeyError(f"{path!r} is an HDS primitive, not a structure.") 

230 return node 

231 

232 def set(self, path: str, node: HdsStructure | HdsPrimitive) -> None: 

233 """Set a child component by relative or absolute path.""" 

234 parts = _split_path(path) 

235 if not parts: 

236 raise ValueError("Cannot replace an HDS structure with itself.") 

237 parent = self.ensure_structure("/".join(parts[:-1]), "EXT") 

238 parent.children[parts[-1]] = node 

239 

240 def delete(self, path: str) -> None: 

241 """Delete a child component if it exists.""" 

242 parts = _split_path(path) 

243 if not parts: 

244 raise ValueError("Cannot delete an HDS structure root.") 

245 parent = self.get_structure("/".join(parts[:-1])) 

246 parent.children.pop(parts[-1], None) 

247 

248 def ensure_structure(self, path: str, hds_type: str | None = None) -> HdsStructure: 

249 """Return an existing structure or create it and its parents. 

250 

251 In HDS each structure carries a type tag distinct from its name. 

252 Newly-created structures here default to having the part name as 

253 their type (e.g. ``/MORE/LSST/PSF`` → types ``EXT``, ``LSST``, 

254 ``PSF``). The well-known NDF extension container ``MORE`` is the 

255 sole exception and defaults to ``EXT``. Pass ``hds_type`` to 

256 override the type on the leaf component only; existing structures 

257 retain their type unless overridden. 

258 """ 

259 if path in ("", "/"): 

260 return self 

261 cursor: HdsStructure = self 

262 parts = _split_path(path) 

263 for index, part in enumerate(parts): 

264 existing = cursor.children.get(part) 

265 is_leaf = index == len(parts) - 1 

266 if existing is None: 

267 if is_leaf and hds_type is not None: 

268 child_type = hds_type 

269 else: 

270 child_type = _default_hds_type_for_name(part) 

271 existing = _new_structure(child_type) 

272 cursor.children[part] = existing 

273 if not isinstance(existing, HdsStructure): 

274 raise KeyError(f"{part!r} already exists as an HDS primitive.") 

275 cursor = existing 

276 if hds_type is not None and cursor.hds_type != hds_type: 

277 cursor.hds_type = hds_type 

278 return cursor 

279 

280 def write_to_hdf5(self, parent: h5py.Group, name: str | None = None) -> h5py.Group: 

281 """Write this structure to an HDF5 group.""" 

282 if name is None: 

283 group = parent 

284 _clear_hdf5_group(group) 

285 _hds.set_ascii_attr(group, _hds.ATTR_CLASS, self.hds_type) 

286 else: 

287 if name in parent: 

288 del parent[name] 

289 group = _hds.create_structure(parent, name, self.hds_type) 

290 for child_name, child in self.children.items(): 

291 if isinstance(child, HdsStructure): 

292 child.write_to_hdf5(group, child_name) 

293 else: 

294 child.write_to_hdf5(group, child_name) 

295 return group 

296 

297 

298class HdsExtension(HdsStructure): 

299 """A general-purpose HDS extension structure.""" 

300 

301 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None: 

302 super().__init__("EXT", children) 

303 

304 

305class NdfContainer(HdsStructure): 

306 """A top-level HDS container for multiple NDFs and shared metadata.""" 

307 

308 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None: 

309 super().__init__("EXT", children) 

310 

311 def ensure_ndf(self, path: str) -> Ndf: 

312 """Return or create a child NDF at ``path``.""" 

313 structure = self.ensure_structure(path, "NDF") 

314 if not isinstance(structure, Ndf): 

315 structure.__class__ = Ndf 

316 return cast(Ndf, structure) 

317 

318 

319@dataclass 

320class NdfArray: 

321 """An NDF ARRAY component.""" 

322 

323 data: np.ndarray | h5py.Dataset 

324 origin: np.ndarray | Sequence[int] | None = None 

325 bad_pixel: bool | None = None 

326 compression_options: dict[str, Any] = field(default_factory=dict) 

327 

328 def to_hds_structure(self) -> HdsStructure: 

329 """Convert this array component to an HDS ``ARRAY`` structure.""" 

330 structure = HdsStructure("ARRAY") 

331 structure.children["DATA"] = HdsPrimitive.array( 

332 self.data, 

333 compression_options=self.compression_options, 

334 ) 

335 if self.origin is not None: 

336 structure.children["ORIGIN"] = HdsPrimitive.array(np.asarray(self.origin)) 

337 if self.bad_pixel is not None: 

338 structure.children["BAD_PIXEL"] = HdsPrimitive.array(np.array(self.bad_pixel, dtype=np.bool_)) 

339 return structure 

340 

341 @classmethod 

342 def from_hds_structure(cls, structure: HdsStructure) -> Self: 

343 """Build an `NdfArray` facade from an HDS ``ARRAY`` structure.""" 

344 data = structure.children["DATA"] 

345 if not isinstance(data, HdsPrimitive): 

346 raise TypeError("ARRAY.DATA must be an HDS primitive.") 

347 origin: np.ndarray | None = None 

348 if (origin_node := structure.children.get("ORIGIN")) is not None: 

349 if not isinstance(origin_node, HdsPrimitive): 

350 raise TypeError("ARRAY.ORIGIN must be an HDS primitive.") 

351 origin = origin_node.read_array() 

352 bad_pixel: bool | None = None 

353 if (bad_pixel_node := structure.children.get("BAD_PIXEL")) is not None: 

354 if not isinstance(bad_pixel_node, HdsPrimitive): 

355 raise TypeError("ARRAY.BAD_PIXEL must be an HDS primitive.") 

356 bad_pixel = bool(np.asarray(bad_pixel_node.read_array()).reshape(-1)[0]) 

357 array_data = data.data if data.data is not None else data.read_array() 

358 return cls(data=array_data, origin=origin, bad_pixel=bad_pixel) 

359 

360 

361@dataclass 

362class NdfQuality: 

363 """An NDF QUALITY component.""" 

364 

365 quality: NdfArray 

366 badbits: int = 255 

367 

368 def to_hds_structure(self) -> HdsStructure: 

369 """Convert this quality component to an HDS ``QUALITY`` structure.""" 

370 structure = HdsStructure("QUALITY") 

371 structure.children["BADBITS"] = HdsPrimitive.array(np.array(self.badbits, dtype=np.uint8)) 

372 structure.children["QUALITY"] = self.quality.to_hds_structure() 

373 return structure 

374 

375 

376@dataclass 

377class NdfWcs: 

378 """An NDF WCS component represented by encoded AST channel records.""" 

379 

380 lines: list[str] 

381 width: int = _hds.NDF_AST_DATA_WIDTH 

382 

383 def to_hds_structure(self) -> HdsStructure: 

384 """Convert this WCS component to an HDS ``WCS`` structure.""" 

385 structure = HdsStructure("WCS") 

386 structure.children["DATA"] = HdsPrimitive.char_array(self.lines, width=self.width) 

387 return structure 

388 

389 

390class Ndf(HdsStructure): 

391 """An NDF structure with convenience accessors for standard components.""" 

392 

393 def __init__(self, children: Mapping[str, HdsStructure | HdsPrimitive] | None = None) -> None: 

394 super().__init__("NDF", children) 

395 

396 def set_array_component( 

397 self, 

398 name: str, 

399 data: np.ndarray | h5py.Dataset, 

400 *, 

401 origin: Sequence[int] | np.ndarray | None = None, 

402 bad_pixel: bool | None = None, 

403 compression_options: Mapping[str, Any] | None = None, 

404 ) -> None: 

405 """Set an NDF array-like component such as ``DATA_ARRAY``.""" 

406 self.children[name] = NdfArray( 

407 data, 

408 origin=origin, 

409 bad_pixel=bad_pixel, 

410 compression_options=dict(compression_options) if compression_options else {}, 

411 ).to_hds_structure() 

412 

413 def set_quality(self, quality: NdfQuality) -> None: 

414 """Set the NDF ``QUALITY`` component.""" 

415 self.children["QUALITY"] = quality.to_hds_structure() 

416 

417 def set_wcs(self, wcs: NdfWcs) -> None: 

418 """Set the NDF ``WCS`` component.""" 

419 self.children["WCS"] = wcs.to_hds_structure() 

420 

421 def set_units(self, units: str | None) -> None: 

422 """Set or remove the NDF ``UNITS`` component.""" 

423 if units is None: 

424 self.children.pop("UNITS", None) 

425 else: 

426 self.children["UNITS"] = HdsPrimitive.char_scalar(units) 

427 

428 def get_units(self) -> str | None: 

429 """Return the NDF ``UNITS`` component, if present.""" 

430 node = self.children.get("UNITS") 

431 if node is None: 

432 return None 

433 if not isinstance(node, HdsPrimitive): 

434 raise TypeError("NDF.UNITS must be an HDS primitive.") 

435 lines = node.read_char_array() 

436 return lines[0] if lines else "" 

437 

438 def ensure_lsst_extension(self, *, base_path: str = "MORE/LSST") -> HdsStructure: 

439 """Return or create the LSST extension structure for this NDF.""" 

440 return self.ensure_structure(base_path, "EXT") 

441 

442 

443@dataclass 

444class NdfDocument: 

445 """A complete HDS root object containing one NDF or an NDF container.""" 

446 

447 root: Ndf | NdfContainer = field(default_factory=Ndf) 

448 root_name: str | None = None 

449 

450 @classmethod 

451 def from_hdf5(cls, file: h5py.File) -> Self: 

452 """Read an NDF document model from an open HDF5 file.""" 

453 root = HdsStructure.from_hdf5(file["/"]) 

454 typed_root: Ndf | NdfContainer 

455 if isinstance(root, Ndf | NdfContainer): 

456 typed_root = root 

457 elif root.hds_type == "NDF": 

458 root.__class__ = Ndf 

459 typed_root = cast(Ndf, root) 

460 else: 

461 root.__class__ = NdfContainer 

462 typed_root = cast(NdfContainer, root) 

463 return cls(root=typed_root, root_name=_decode_ascii_attr(file["/"].attrs.get(_hds.ATTR_ROOT_NAME))) 

464 

465 def write_to_hdf5(self, file: h5py.File) -> None: 

466 """Write this document to an open HDF5 file.""" 

467 self.root.write_to_hdf5(file["/"]) 

468 if self.root_name is not None: 

469 _hds.set_root_name(file, self.root_name, self.root.hds_type) 

470 

471 def ensure_ndf(self, path: str = "/") -> Ndf: 

472 """Return or create an NDF at the requested absolute path.""" 

473 if path in ("", "/"): 

474 if not isinstance(self.root, Ndf): 

475 raise TypeError("The document root is an NDF container, not an NDF.") 

476 return self.root 

477 if isinstance(self.root, NdfContainer): 

478 return self.root.ensure_ndf(path) 

479 structure = self.root.ensure_structure(path, "NDF") 

480 if not isinstance(structure, Ndf): 

481 structure.__class__ = Ndf 

482 return cast(Ndf, structure) 

483 

484 def get(self, path: str) -> HdsStructure | HdsPrimitive: 

485 """Return a component by absolute path.""" 

486 return self.root.get(path) 

487 

488 

489def _new_structure(hds_type: str) -> HdsStructure: 

490 """Create a typed HDS structure model for an HDS type.""" 

491 if hds_type == "NDF": 

492 return Ndf() 

493 if hds_type == "EXT": 

494 return HdsExtension() 

495 return HdsStructure(hds_type) 

496 

497 

498def _default_hds_type_for_name(name: str) -> str: 

499 """Return the HDS type to use for a structure with the given name. 

500 

501 Mirrors the convention in real NDF files where each named structure 

502 carries a type tag describing what it is. ``MORE`` is the standard 

503 NDF extension container and is always ``EXT``; every other name is 

504 used verbatim as its own type. 

505 """ 

506 return "EXT" if name == "MORE" else name 

507 

508 

509def _split_path(path: str) -> list[str]: 

510 """Split an HDS/HDF5 path into component names.""" 

511 return [part for part in path.strip("/").split("/") if part] 

512 

513 

514def _clear_hdf5_group(group: h5py.Group) -> None: 

515 """Remove children and HDS root attributes before rewriting a group.""" 

516 for name in list(group.keys()): 

517 del group[name] 

518 for name in (_hds.ATTR_CLASS, _hds.ATTR_ROOT_NAME, _hds.ATTR_STRUCTURE_DIMS, "HDSTYPE"): 

519 if name in group.attrs: 

520 del group.attrs[name]