Coverage for python/lsst/images/fits/_output_archive.py: 21%

215 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:09 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("FitsOutputArchive", "write") 

15 

16import dataclasses 

17from collections import Counter 

18from collections.abc import Callable, Hashable, Iterator, Mapping 

19from contextlib import contextmanager 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import numpy as np 

25import pydantic 

26 

27from .._transforms import FrameSet 

28from ..serialization import ( 

29 ArchiveTree, 

30 ArrayReferenceModel, 

31 ButlerInfo, 

32 MetadataValue, 

33 NestedOutputArchive, 

34 NumberType, 

35 OutputArchive, 

36 TableColumnModel, 

37 TableModel, 

38 no_header_updates, 

39) 

40from ._common import ( 

41 JSON_COLUMN, 

42 JSON_EXTNAME, 

43 ExtensionHDU, 

44 ExtensionKey, 

45 FitsCompressionOptions, 

46 FitsOpaqueMetadata, 

47 PointerModel, 

48) 

49 

50_FITS_FORMAT_VERSION = 1 

51"""Container layout version for files written by `FitsOutputArchive`. 

52 

53Bumps when the on-disk FITS layout (HDU placement, INDX/JSON keyword schema) 

54changes. Independent of any data-model ``SCHEMA_VERSION``. 

55""" 

56 

57 

58def write( 

59 obj: Any, 

60 path: str, 

61 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

62 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

63 compression_seed: int | None = None, 

64 metadata: dict[str, MetadataValue] | None = None, 

65 butler_info: ButlerInfo | None = None, 

66) -> Any: 

67 """Write an object with a ``serialize`` method to a FITS file. 

68 

69 Parameters 

70 ---------- 

71 path 

72 Name of the file to write to. Must not already exist. 

73 compression_options 

74 Options for how to compress the FITS file, keyed by the name of 

75 the attribute (with JSON pointer ``/`` separators for nested 

76 attributes). 

77 update_header 

78 A callback that will be given the primary HDU FITS header and an 

79 opportunity to modify it. 

80 compression_seed 

81 A FITS tile compression seed to use whenever the configured 

82 compression seed is `None` or (for backwards compatibility) ``0``. 

83 This value is then incremented every time it is used. 

84 metadata 

85 Additional metadata to save with the object. This will override any 

86 flexible metadata carried by the object itself with the same keys. 

87 butler_info 

88 Butler information to store in the file. 

89 

90 Returns 

91 ------- 

92 `.serialization.ArchiveTree` 

93 The serialized representation of the object. 

94 """ 

95 opaque_metadata = getattr(obj, "_opaque_metadata", None) 

96 name = getattr(obj, "_archive_default_name", None) 

97 with FitsOutputArchive.open( 

98 path, 

99 compression_options=compression_options, 

100 opaque_metadata=opaque_metadata, 

101 update_header=update_header, 

102 compression_seed=compression_seed, 

103 ) as archive: 

104 tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive) 

105 if metadata is not None: 

106 tree.metadata.update(metadata) 

107 if butler_info is not None: 

108 tree.butler_info = butler_info 

109 archive.add_tree(tree) 

110 return tree 

111 

112 

113class FitsOutputArchive(OutputArchive[PointerModel]): 

114 """An implementation of the `.serialization.OutputArchive` interface that 

115 writes to FITS files. 

116 

117 Instances of this class should only be constructed via the `open` 

118 context manager. 

119 """ 

120 

121 def __init__( 

122 self, 

123 hdu_list: astropy.io.fits.HDUList, 

124 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

125 opaque_metadata: Any = None, 

126 compression_seed: int | None = None, 

127 ): 

128 # JSON blobs for objects we've saved as pointers: 

129 self._pointer_targets: list[bytes] = [] 

130 # Mapping from user provided key (e.g. id(some object)) to a table 

131 # pointer to where we actually saved it: 

132 self._pointers_by_key: dict[Hashable, PointerModel] = {} 

133 self._hdu_list = hdu_list 

134 self._primary_hdu = astropy.io.fits.PrimaryHDU() 

135 self._primary_hdu.header.set("FMTVER", _FITS_FORMAT_VERSION, "FITS container layout version.") 

136 self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.") 

137 self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.") 

138 self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.") 

139 self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.") 

140 self._hdus_by_name = Counter[str]() 

141 self._compression_options = dict(compression_options) if compression_options is not None else {} 

142 self._compression_seed = compression_seed 

143 self._opaque_metadata = ( 

144 opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata() 

145 ) 

146 if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None: 

147 self._primary_hdu.header.extend(opaque_primary_header) 

148 self._hdu_list.append(self._primary_hdu) 

149 self._json_hdu_added: bool = False 

150 self._frame_sets: list[tuple[FrameSet, PointerModel]] = [] 

151 

152 @classmethod 

153 @contextmanager 

154 def open( 

155 cls, 

156 filename: str, 

157 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

158 opaque_metadata: Any = None, 

159 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

160 compression_seed: int | None = None, 

161 ) -> Iterator[Self]: 

162 """Create an output archive that writes to the given file. 

163 

164 Parameters 

165 ---------- 

166 filename 

167 Name of the file to write to. Must not already exist. 

168 compression_options 

169 Options for how to compress the FITS file, keyed by the name of 

170 the attribute (with JSON pointer ``/`` separators for nested 

171 attributes). 

172 opaque_metadata 

173 Metadata read from an input archive along with the object being 

174 written now. Ignored if the metadata is not from a FITS archive. 

175 update_header 

176 A callback that will be given the primary HDU FITS header and an 

177 opportunity to modify it. 

178 compression_seed 

179 A FITS tile compression seed to use whenever the configured 

180 compression seed is `None` or (for backwards compatibility) ``0``. 

181 This value is then incremented every time it is used. 

182 

183 Returns 

184 ------- 

185 `contextlib.AbstractContextManager` [`FitsOutputArchive`] 

186 A context manager that returns a `FitsOutputArchive` when entered. 

187 """ 

188 with astropy.io.fits.open(filename, mode="append") as hdu_list: 

189 if hdu_list: 

190 raise OSError(f"File {filename!r} already exists.") 

191 archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed) 

192 update_header(hdu_list[0].header) 

193 yield archive 

194 if not archive._json_hdu_added: 

195 raise RuntimeError("Write context exited without 'add_tree' being called.") 

196 hdu_list.flush() 

197 # This multi-open dance is necessary to get Astropy to tell us the 

198 # byte addresses of the HDUs. Hopefully we can get an upstream change 

199 # make this unnecessary at some point. 

200 with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list: 

201 index_hdu = cls._make_index_table(hdu_list) 

202 with astropy.io.fits.open(filename, mode="append") as hdu_list: 

203 hdu_list.append(index_hdu) 

204 json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1]) 

205 index_bytes = _HDUBytes.from_write_hdu(index_hdu) 

206 # Update the primary HDU with the address and size of the index and 

207 # JSON HDUs, and rewrite just that. We do this write manually, since 

208 # astropy's docs on its 'update' mode are scarce and it's not obvious 

209 # whether we can guarantee it won't rewrite the whole file if we edit 

210 # the primary header. 

211 archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address 

212 archive._primary_hdu.header["INDXSIZE"] = index_bytes.size 

213 archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address 

214 archive._primary_hdu.header["JSONSIZE"] = json_bytes.size 

215 with open(filename, "r+b") as stream: 

216 stream.write(archive._primary_hdu.header.tostring().encode()) 

217 

218 def serialize_direct[T: pydantic.BaseModel]( 

219 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T] 

220 ) -> T: 

221 nested = NestedOutputArchive[PointerModel](name, self) 

222 return serializer(nested) 

223 

224 def serialize_pointer[T: ArchiveTree]( 

225 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable 

226 ) -> PointerModel: 

227 if (pointer := self._pointers_by_key.get(key)) is not None: 

228 return pointer 

229 model = self.serialize_direct("", serializer) 

230 json_bytes = model.model_dump_json().encode() 

231 self._pointer_targets.append(json_bytes) 

232 pointer = PointerModel( 

233 column=TableColumnModel( 

234 name=JSON_COLUMN, 

235 data=ArrayReferenceModel( 

236 source=f"fits:{JSON_EXTNAME}[1]", 

237 shape=[len(json_bytes)], 

238 datatype=NumberType.uint8, 

239 ), 

240 ), 

241 row=len(self._pointer_targets), 

242 ) 

243 self._pointers_by_key[key] = pointer 

244 return pointer 

245 

246 def serialize_frame_set[T: ArchiveTree]( 

247 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

248 ) -> PointerModel: 

249 # Docstring inherited. 

250 pointer = self.serialize_pointer(name, serializer, key) 

251 self._frame_sets.append((frame_set, pointer)) 

252 return pointer 

253 

254 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]: 

255 return iter(self._frame_sets) 

256 

257 def add_array( 

258 self, 

259 array: np.ndarray, 

260 *, 

261 name: str | None = None, 

262 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

263 ) -> ArrayReferenceModel: 

264 if name is None: 

265 raise RuntimeError("Cannot save array with name=None unless it is nested.") 

266 extname = name.upper() 

267 hdu = self._opaque_metadata.maybe_use_precompressed(extname) 

268 if hdu is None: 

269 if (compression_options := self._get_compression_options(name)) is not None: 

270 hdu = compression_options.make_hdu(array, name=extname) 

271 else: 

272 hdu = astropy.io.fits.ImageHDU(array, name=extname) 

273 key = self._add_hdu(hdu, update_header) 

274 return ArrayReferenceModel( 

275 source=str(key), 

276 shape=list(array.shape), 

277 datatype=NumberType.from_numpy(array.dtype), 

278 ) 

279 

280 def add_table( 

281 self, 

282 table: astropy.table.Table, 

283 *, 

284 name: str | None = None, 

285 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

286 ) -> TableModel: 

287 if name is None: 

288 raise RuntimeError("Cannot save table with name=None unless it is nested.") 

289 extname = name.upper() 

290 hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname) 

291 # Extract column information directly from the input array, not the 

292 # data in the binary table HDU, because we want to assume as little as 

293 # possible about where Astropy does uint -> TZERO stuff. 

294 columns = TableColumnModel.from_table(table) 

295 key = self._add_hdu(hdu, update_header) 

296 for n, c in enumerate(columns, start=1): 

297 assert isinstance(c.data, ArrayReferenceModel) 

298 c.data.source = f"{key}[{n}]" 

299 return TableModel(columns=columns, meta=table.meta) 

300 

301 def add_structured_array( 

302 self, 

303 array: np.ndarray, 

304 *, 

305 name: str | None = None, 

306 units: Mapping[str, astropy.units.Unit] | None = None, 

307 descriptions: Mapping[str, str] | None = None, 

308 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

309 ) -> TableModel: 

310 if name is None: 

311 raise RuntimeError("Cannot save structured array with name=None unless it is nested.") 

312 extname = name.upper() 

313 # Extract column information directly from the input array, not the 

314 # data in the binary table HDU, because we want to assume as little as 

315 # possible about where Astropy does uint -> TZERO stuff. 

316 columns = TableColumnModel.from_record_dtype(array.dtype) 

317 hdu = astropy.io.fits.BinTableHDU(array, name=extname) 

318 if units is not None: 

319 for c in columns: 

320 c.unit = units.get(c.name) 

321 if descriptions is not None: 

322 for c in columns: 

323 c.description = descriptions.get(c.name, "") 

324 key = self._add_hdu(hdu, update_header) 

325 for n, c in enumerate(columns, start=1): 

326 assert isinstance(c.data, ArrayReferenceModel) 

327 c.data.source = f"{key}[{n}]" 

328 return TableModel(columns=columns) 

329 

330 def _add_hdu( 

331 self, 

332 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

333 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

334 ) -> ExtensionKey: 

335 n_hdus = self._hdus_by_name.get(hdu.name, 0) 

336 key = ExtensionKey(hdu.name, n_hdus + 1) 

337 key.check() 

338 if n_hdus: 

339 hdu.header["EXTVER"] = key.ver 

340 self._hdus_by_name[hdu.name] += 1 

341 update_header(hdu.header) 

342 if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None: 

343 hdu.header.extend(opaque_headers) 

344 self._hdu_list.append(hdu) 

345 return key 

346 

347 def add_tree(self, tree: ArchiveTree) -> None: 

348 """Write the JSON tree to the archive. 

349 

350 This method must be called exactly once, just before the `open` context 

351 is exited. 

352 

353 Parameters 

354 ---------- 

355 tree 

356 Pydantic model that represents the tree. 

357 """ 

358 self._primary_hdu.header.set("DATAMODL", tree.schema_url, "Schema URL.") 

359 json_hdu = astropy.io.fits.BinTableHDU.from_columns( 

360 [astropy.io.fits.Column(JSON_COLUMN, "PB")], 

361 nrows=len(self._pointer_targets) + 1, 

362 name=JSON_EXTNAME, 

363 ) 

364 json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte) 

365 for n, json_target_data in enumerate(self._pointer_targets): 

366 json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte) 

367 self._hdu_list.append(json_hdu) 

368 self._json_hdu_added = True 

369 

370 def _get_compression_options(self, name: str) -> FitsCompressionOptions | None: 

371 result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT) 

372 if result is None or result.quantization is None: 

373 return result 

374 if self._compression_seed is not None and not result.quantization.seed: 

375 result = result.model_copy( 

376 update={ 

377 "quantization": result.quantization.model_copy(update={"seed": self._compression_seed}) 

378 } 

379 ) 

380 self._compression_seed += 1 

381 if self._compression_seed > 10000: 

382 self._compression_seed = 1 

383 # MyPy can tell that result.quantization is not None in the 'if', but 

384 # forgets that by this 'else': 

385 elif result.quantization.seed is None: # type: ignore[union-attr] 

386 raise RuntimeError("No quantization seed provided.") 

387 return result 

388 

389 @staticmethod 

390 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU: 

391 # We use a fixed-length string for the EXTNAME column; it might be 

392 # better to use a variable-length array, but I have not been able to 

393 # figure out how to get astropy to accept a string for the the 

394 # character (TFORM='A') variant of that. And that's only better if the 

395 # EXTNAMEs get super long, which is not likely (but maybe something to 

396 # guard against). 

397 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list) 

398 index_hdu = astropy.io.fits.BinTableHDU.from_columns( 

399 [ 

400 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"), 

401 astropy.io.fits.Column("EXTVER", "J"), 

402 astropy.io.fits.Column("XTENSION", "A8"), 

403 astropy.io.fits.Column("ZIMAGE", "L"), 

404 ] 

405 + _HDUBytes.get_index_hdu_columns(), 

406 nrows=len(hdu_list), 

407 name="INDEX", 

408 ) 

409 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU 

410 for n, hdu in enumerate(hdu_list): 

411 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "") 

412 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1) 

413 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE") 

414 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False) 

415 bytes = _HDUBytes.from_read_hdu(hdu) 

416 bytes.update_index_row(index_hdu.data[n]) 

417 return index_hdu 

418 

419 

420@dataclasses.dataclass 

421class _HDUBytes: 

422 """A struct that records the byte offsets into a FITS HDU.""" 

423 

424 @classmethod 

425 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

426 """Construct from an Astropy HDU instance that has just been written. 

427 

428 Parameters 

429 ---------- 

430 hdu 

431 An Astropy HDU object. 

432 

433 Returns 

434 ------- 

435 hdu_bytes 

436 Struct with byte offsets. 

437 

438 Notes 

439 ----- 

440 This method relies on internal Astropy attributes and does not work on 

441 CompImageHDU objects. 

442 """ 

443 # This is implemented by accessing private Astropy attributes because 

444 # it turns out that's much more reliable than the public fileinfo() 

445 # method, which seems to always return a dict with `None` entries or 

446 # raise; it looks buggy, but docs are scarce enough that it's not clear 

447 # what the right behavior is supposed to be. 

448 if (header_address := getattr(hdu, "_header_offset", None)) is None: 

449 raise RuntimeError("Failed to get Astropy's _header_offset.") 

450 if (data_address := getattr(hdu, "_data_offset", None)) is None: 

451 raise RuntimeError("Failed to get Astropy's _data_offset.") 

452 if (data_size := getattr(hdu, "_data_size", None)) is None: 

453 raise RuntimeError("Failed to get Astropy's _data_size.") 

454 return cls(header_address, data_address, data_size) 

455 

456 @classmethod 

457 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

458 """Construct from an Astropy HDU instance that has just been read. 

459 

460 Parameters 

461 ---------- 

462 hdu 

463 An Astropy HDU object. 

464 

465 Returns 

466 ------- 

467 hdu_bytes 

468 Struct with byte offsets. 

469 """ 

470 info = hdu.fileinfo() 

471 header_address = info["hdrLoc"] 

472 data_address = info["datLoc"] 

473 data_size = info["datSpan"] 

474 return cls(header_address, data_address, data_size) 

475 

476 @classmethod 

477 def from_index_row(cls, row: np.void) -> Self: 

478 """Construct from a row of the index HDU. 

479 

480 Parameters 

481 ---------- 

482 row 

483 A Numpy struct-like scalar. 

484 

485 Returns 

486 ------- 

487 hdu_bytes 

488 Struct with byte offsets. 

489 """ 

490 return cls( 

491 header_address=int(row["HDRADDR"]), 

492 data_address=int(row["DATADDR"]), 

493 data_size=int(row["DATSIZE"]), 

494 ) 

495 

496 @staticmethod 

497 def get_index_hdu_columns() -> list[astropy.io.fits.Column]: 

498 """Return the definitions of the columns this class gets and sets 

499 from the index HDU. 

500 

501 Returns 

502 ------- 

503 columns 

504 A `list` of `astropy.io.fits.Column` objects that represent the 

505 header address, data address, and data size. 

506 """ 

507 return [ 

508 astropy.io.fits.Column("HDRADDR", "K"), 

509 astropy.io.fits.Column("DATADDR", "K"), 

510 astropy.io.fits.Column("DATSIZE", "K"), 

511 ] 

512 

513 header_address: int 

514 """Offset from the beginning of the start of the file to the header of this 

515 HDU, in bytes. 

516 """ 

517 

518 data_address: int 

519 """Offset from the beginning of the start of the file to the data section 

520 of this HDU, in bytes. 

521 """ 

522 

523 data_size: int 

524 """Size of the data section in bytes.""" 

525 

526 @property 

527 def header_size(self) -> int: 

528 """Size of the header in bytes.""" 

529 return self.data_address - self.header_address 

530 

531 @property 

532 def end_address(self) -> int: 

533 """Offset in bytes from the start of the file to the end of the HDU.""" 

534 return self.data_address + self.data_size 

535 

536 @property 

537 def size(self) -> int: 

538 """Total size of this HDU in bytes.""" 

539 return self.data_size + self.data_address - self.header_address 

540 

541 def update_index_row(self, row: np.void) -> None: 

542 """Set the values of a row of the index HDU from this strut. 

543 

544 Parameters 

545 ---------- 

546 row 

547 A Numpy struct-like scalar to modify in place. 

548 """ 

549 row["HDRADDR"] = self.header_address 

550 row["DATADDR"] = self.data_address 

551 row["DATSIZE"] = self.data_size