Coverage for python / lsst / images / fits / _output_archive.py: 21%

211 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 01:57 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("FitsOutputArchive", "write") 

15 

16import dataclasses 

17from collections import Counter 

18from collections.abc import Callable, Hashable, Iterator, Mapping 

19from contextlib import contextmanager 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import numpy as np 

25import pydantic 

26 

27from .._transforms import FrameSet 

28from ..serialization import ( 

29 ArchiveTree, 

30 ArrayReferenceModel, 

31 ButlerInfo, 

32 MetadataValue, 

33 NestedOutputArchive, 

34 NumberType, 

35 OutputArchive, 

36 TableColumnModel, 

37 TableModel, 

38 no_header_updates, 

39) 

40from ._common import ( 

41 JSON_COLUMN, 

42 JSON_EXTNAME, 

43 ExtensionHDU, 

44 ExtensionKey, 

45 FitsCompressionOptions, 

46 FitsOpaqueMetadata, 

47 PointerModel, 

48) 

49 

50 

51def write( 

52 obj: Any, 

53 path: str, 

54 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

55 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

56 compression_seed: int | None = None, 

57 metadata: dict[str, MetadataValue] | None = None, 

58 butler_info: ButlerInfo | None = None, 

59) -> Any: 

60 """Write an object with a ``serialize`` method to a FITS file. 

61 

62 Parameters 

63 ---------- 

64 path 

65 Name of the file to write to. Must not already exist. 

66 compression_options 

67 Options for how to compress the FITS file, keyed by the name of 

68 the attribute (with JSON pointer ``/`` separators for nested 

69 attributes). 

70 update_header 

71 A callback that will be given the primary HDU FITS header and an 

72 opportunity to modify it. 

73 compression_seed 

74 A FITS tile compression seed to use whenever the configured 

75 compression seed is `None` or (for backwards compatibility) ``0``. 

76 This value is then incremented every time it is used. 

77 metadata 

78 Additional metadata to save with the object. This will override any 

79 flexible metadata carried by the object itself with the same keys. 

80 butler_info 

81 Butler information to store in the file. 

82 

83 Returns 

84 ------- 

85 `.serialization.ArchiveTree` 

86 The serialized representation of the object. 

87 """ 

88 opaque_metadata = getattr(obj, "_opaque_metadata", None) 

89 name = getattr(obj, "_archive_default_name", None) 

90 with FitsOutputArchive.open( 

91 path, 

92 compression_options=compression_options, 

93 opaque_metadata=opaque_metadata, 

94 update_header=update_header, 

95 compression_seed=compression_seed, 

96 ) as archive: 

97 tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive) 

98 if metadata is not None: 

99 tree.metadata.update(metadata) 

100 if butler_info is not None: 

101 tree.butler_info = butler_info 

102 archive.add_tree(tree) 

103 return tree 

104 

105 

106class FitsOutputArchive(OutputArchive[PointerModel]): 

107 """An implementation of the `.serialization.OutputArchive` interface that 

108 writes to FITS files. 

109 

110 Instances of this class should only be constructed via the `open` 

111 context manager. 

112 """ 

113 

114 def __init__( 

115 self, 

116 hdu_list: astropy.io.fits.HDUList, 

117 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

118 opaque_metadata: Any = None, 

119 compression_seed: int | None = None, 

120 ): 

121 # JSON blobs for objects we've saved as pointers: 

122 self._pointer_targets: list[bytes] = [] 

123 # Mapping from user provided key (e.g. id(some object)) to a table 

124 # pointer to where we actually saved it: 

125 self._pointers_by_key: dict[Hashable, PointerModel] = {} 

126 self._hdu_list = hdu_list 

127 self._primary_hdu = astropy.io.fits.PrimaryHDU() 

128 # TODO: add subformat description and version to primary HDU. 

129 self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.") 

130 self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.") 

131 self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.") 

132 self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.") 

133 self._hdus_by_name = Counter[str]() 

134 self._compression_options = dict(compression_options) if compression_options is not None else {} 

135 self._compression_seed = compression_seed 

136 self._opaque_metadata = ( 

137 opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata() 

138 ) 

139 if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None: 

140 self._primary_hdu.header.extend(opaque_primary_header) 

141 self._hdu_list.append(self._primary_hdu) 

142 self._json_hdu_added: bool = False 

143 self._frame_sets: list[tuple[FrameSet, PointerModel]] = [] 

144 

145 @classmethod 

146 @contextmanager 

147 def open( 

148 cls, 

149 filename: str, 

150 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None, 

151 opaque_metadata: Any = None, 

152 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

153 compression_seed: int | None = None, 

154 ) -> Iterator[Self]: 

155 """Create an output archive that writes to the given file. 

156 

157 Parameters 

158 ---------- 

159 filename 

160 Name of the file to write to. Must not already exist. 

161 compression_options 

162 Options for how to compress the FITS file, keyed by the name of 

163 the attribute (with JSON pointer ``/`` separators for nested 

164 attributes). 

165 opaque_metadata 

166 Metadata read from an input archive along with the object being 

167 written now. Ignored if the metadata is not from a FITS archive. 

168 update_header 

169 A callback that will be given the primary HDU FITS header and an 

170 opportunity to modify it. 

171 compression_seed 

172 A FITS tile compression seed to use whenever the configured 

173 compression seed is `None` or (for backwards compatibility) ``0``. 

174 This value is then incremented every time it is used. 

175 

176 Returns 

177 ------- 

178 `contextlib.AbstractContextManager` [`FitsOutputArchive`] 

179 A context manager that returns a `FitsOutputArchive` when entered. 

180 """ 

181 with astropy.io.fits.open(filename, mode="append") as hdu_list: 

182 if hdu_list: 

183 raise OSError(f"File {filename!r} already exists.") 

184 archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed) 

185 update_header(hdu_list[0].header) 

186 yield archive 

187 if not archive._json_hdu_added: 

188 raise RuntimeError("Write context exited without 'add_tree' being called.") 

189 hdu_list.flush() 

190 # This multi-open dance is necessary to get Astropy to tell us the 

191 # byte addresses of the HDUs. Hopefully we can get an upstream change 

192 # make this unnecessary at some point. 

193 with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list: 

194 index_hdu = cls._make_index_table(hdu_list) 

195 with astropy.io.fits.open(filename, mode="append") as hdu_list: 

196 hdu_list.append(index_hdu) 

197 json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1]) 

198 index_bytes = _HDUBytes.from_write_hdu(index_hdu) 

199 # Update the primary HDU with the address and size of the index and 

200 # JSON HDUs, and rewrite just that. We do this write manually, since 

201 # astropy's docs on its 'update' mode are scarce and it's not obvious 

202 # whether we can guarantee it won't rewrite the whole file if we edit 

203 # the primary header. 

204 archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address 

205 archive._primary_hdu.header["INDXSIZE"] = index_bytes.size 

206 archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address 

207 archive._primary_hdu.header["JSONSIZE"] = json_bytes.size 

208 with open(filename, "r+b") as stream: 

209 stream.write(archive._primary_hdu.header.tostring().encode()) 

210 

211 def serialize_direct[T: pydantic.BaseModel]( 

212 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T] 

213 ) -> T: 

214 nested = NestedOutputArchive[PointerModel](name, self) 

215 return serializer(nested) 

216 

217 def serialize_pointer[T: ArchiveTree]( 

218 self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable 

219 ) -> PointerModel: 

220 if (pointer := self._pointers_by_key.get(key)) is not None: 

221 return pointer 

222 model = self.serialize_direct("", serializer) 

223 json_bytes = model.model_dump_json().encode() 

224 self._pointer_targets.append(json_bytes) 

225 pointer = PointerModel( 

226 column=TableColumnModel( 

227 name=JSON_COLUMN, 

228 data=ArrayReferenceModel( 

229 source=f"fits:{JSON_EXTNAME}[1]", 

230 shape=[len(json_bytes)], 

231 datatype=NumberType.uint8, 

232 ), 

233 ), 

234 row=len(self._pointer_targets), 

235 ) 

236 self._pointers_by_key[key] = pointer 

237 return pointer 

238 

239 def serialize_frame_set[T: ArchiveTree]( 

240 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

241 ) -> PointerModel: 

242 # Docstring inherited. 

243 pointer = self.serialize_pointer(name, serializer, key) 

244 self._frame_sets.append((frame_set, pointer)) 

245 return pointer 

246 

247 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]: 

248 return iter(self._frame_sets) 

249 

250 def add_array( 

251 self, 

252 array: np.ndarray, 

253 *, 

254 name: str | None = None, 

255 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

256 ) -> ArrayReferenceModel: 

257 if name is None: 

258 raise RuntimeError("Cannot save array with name=None unless it is nested.") 

259 extname = name.upper() 

260 hdu = self._opaque_metadata.maybe_use_precompressed(extname) 

261 if hdu is None: 

262 if (compression_options := self._get_compression_options(name)) is not None: 

263 hdu = compression_options.make_hdu(array, name=extname) 

264 else: 

265 hdu = astropy.io.fits.ImageHDU(array, name=extname) 

266 key = self._add_hdu(hdu, update_header) 

267 return ArrayReferenceModel( 

268 source=str(key), 

269 shape=list(array.shape), 

270 datatype=NumberType.from_numpy(array.dtype), 

271 ) 

272 

273 def add_table( 

274 self, 

275 table: astropy.table.Table, 

276 *, 

277 name: str | None = None, 

278 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

279 ) -> TableModel: 

280 if name is None: 

281 raise RuntimeError("Cannot save table with name=None unless it is nested.") 

282 extname = name.upper() 

283 hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname) 

284 # Extract column information directly from the input array, not the 

285 # data in the binary table HDU, because we want to assume as little as 

286 # possible about where Astropy does uint -> TZERO stuff. 

287 columns = TableColumnModel.from_table(table) 

288 key = self._add_hdu(hdu, update_header) 

289 for n, c in enumerate(columns, start=1): 

290 assert isinstance(c.data, ArrayReferenceModel) 

291 c.data.source = f"{key}[{n}]" 

292 return TableModel(columns=columns, meta=table.meta) 

293 

294 def add_structured_array( 

295 self, 

296 array: np.ndarray, 

297 *, 

298 name: str | None = None, 

299 units: Mapping[str, astropy.units.Unit] | None = None, 

300 descriptions: Mapping[str, str] | None = None, 

301 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

302 ) -> TableModel: 

303 if name is None: 

304 raise RuntimeError("Cannot save structured array with name=None unless it is nested.") 

305 extname = name.upper() 

306 # Extract column information directly from the input array, not the 

307 # data in the binary table HDU, because we want to assume as little as 

308 # possible about where Astropy does uint -> TZERO stuff. 

309 columns = TableColumnModel.from_record_dtype(array.dtype) 

310 hdu = astropy.io.fits.BinTableHDU(array, name=extname) 

311 if units is not None: 

312 for c in columns: 

313 c.unit = units.get(c.name) 

314 if descriptions is not None: 

315 for c in columns: 

316 c.description = descriptions.get(c.name, "") 

317 key = self._add_hdu(hdu, update_header) 

318 for n, c in enumerate(columns, start=1): 

319 assert isinstance(c.data, ArrayReferenceModel) 

320 c.data.source = f"{key}[{n}]" 

321 return TableModel(columns=columns) 

322 

323 def _add_hdu( 

324 self, 

325 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

326 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

327 ) -> ExtensionKey: 

328 n_hdus = self._hdus_by_name.get(hdu.name, 0) 

329 key = ExtensionKey(hdu.name, n_hdus + 1) 

330 key.check() 

331 if n_hdus: 

332 hdu.header["EXTVER"] = key.ver 

333 self._hdus_by_name[hdu.name] += 1 

334 update_header(hdu.header) 

335 if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None: 

336 hdu.header.extend(opaque_headers) 

337 self._hdu_list.append(hdu) 

338 return key 

339 

340 def add_tree(self, tree: ArchiveTree) -> None: 

341 """Write the JSON tree to the archive. 

342 

343 This method must be called exactly once, just before the `open` context 

344 is exited. 

345 

346 Parameters 

347 ---------- 

348 tree 

349 Pydantic model that represents the tree. 

350 """ 

351 json_hdu = astropy.io.fits.BinTableHDU.from_columns( 

352 [astropy.io.fits.Column(JSON_COLUMN, "PB")], 

353 nrows=len(self._pointer_targets) + 1, 

354 name=JSON_EXTNAME, 

355 ) 

356 json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte) 

357 for n, json_target_data in enumerate(self._pointer_targets): 

358 json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte) 

359 self._hdu_list.append(json_hdu) 

360 self._json_hdu_added = True 

361 

362 def _get_compression_options(self, name: str) -> FitsCompressionOptions | None: 

363 result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT) 

364 if result is None or result.quantization is None: 

365 return result 

366 if self._compression_seed is not None and not result.quantization.seed: 

367 result = result.model_copy( 

368 update={ 

369 "quantization": result.quantization.model_copy(update={"seed": self._compression_seed}) 

370 } 

371 ) 

372 self._compression_seed += 1 

373 if self._compression_seed > 10000: 

374 self._compression_seed = 1 

375 # MyPy can tell that result.quantization is not None in the 'if', but 

376 # forgets that by this 'else': 

377 elif result.quantization.seed is None: # type: ignore[union-attr] 

378 raise RuntimeError("No quantization seed provided.") 

379 return result 

380 

381 @staticmethod 

382 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU: 

383 # We use a fixed-length string for the EXTNAME column; it might be 

384 # better to use a variable-length array, but I have not been able to 

385 # figure out how to get astropy to accept a string for the the 

386 # character (TFORM='A') variant of that. And that's only better if the 

387 # EXTNAMEs get super long, which is not likely (but maybe something to 

388 # guard against). 

389 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list) 

390 index_hdu = astropy.io.fits.BinTableHDU.from_columns( 

391 [ 

392 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"), 

393 astropy.io.fits.Column("EXTVER", "J"), 

394 astropy.io.fits.Column("XTENSION", "A8"), 

395 astropy.io.fits.Column("ZIMAGE", "L"), 

396 ] 

397 + _HDUBytes.get_index_hdu_columns(), 

398 nrows=len(hdu_list), 

399 name="INDEX", 

400 ) 

401 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU 

402 for n, hdu in enumerate(hdu_list): 

403 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "") 

404 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1) 

405 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE") 

406 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False) 

407 bytes = _HDUBytes.from_read_hdu(hdu) 

408 bytes.update_index_row(index_hdu.data[n]) 

409 return index_hdu 

410 

411 

412@dataclasses.dataclass 

413class _HDUBytes: 

414 """A struct that records the byte offsets into a FITS HDU.""" 

415 

416 @classmethod 

417 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

418 """Construct from an Astropy HDU instance that has just been written. 

419 

420 Parameters 

421 ---------- 

422 hdu 

423 An Astropy HDU object. 

424 

425 Returns 

426 ------- 

427 hdu_bytes 

428 Struct with byte offsets. 

429 

430 Notes 

431 ----- 

432 This method relies on internal Astropy attributes and does not work on 

433 CompImageHDU objects. 

434 """ 

435 # This is implemented by accessing private Astropy attributes because 

436 # it turns out that's much more reliable than the public fileinfo() 

437 # method, which seems to always return a dict with `None` entries or 

438 # raise; it looks buggy, but docs are scarce enough that it's not clear 

439 # what the right behavior is supposed to be. 

440 if (header_address := getattr(hdu, "_header_offset", None)) is None: 

441 raise RuntimeError("Failed to get Astropy's _header_offset.") 

442 if (data_address := getattr(hdu, "_data_offset", None)) is None: 

443 raise RuntimeError("Failed to get Astropy's _data_offset.") 

444 if (data_size := getattr(hdu, "_data_size", None)) is None: 

445 raise RuntimeError("Failed to get Astropy's _data_size.") 

446 return cls(header_address, data_address, data_size) 

447 

448 @classmethod 

449 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

450 """Construct from an Astropy HDU instance that has just been read. 

451 

452 Parameters 

453 ---------- 

454 hdu 

455 An Astropy HDU object. 

456 

457 Returns 

458 ------- 

459 hdu_bytes 

460 Struct with byte offsets. 

461 """ 

462 info = hdu.fileinfo() 

463 header_address = info["hdrLoc"] 

464 data_address = info["datLoc"] 

465 data_size = info["datSpan"] 

466 return cls(header_address, data_address, data_size) 

467 

468 @classmethod 

469 def from_index_row(cls, row: np.void) -> Self: 

470 """Construct from a row of the index HDU. 

471 

472 Parameters 

473 ---------- 

474 row 

475 A Numpy struct-like scalar. 

476 

477 Returns 

478 ------- 

479 hdu_bytes 

480 Struct with byte offsets. 

481 """ 

482 return cls( 

483 header_address=int(row["HDRADDR"]), 

484 data_address=int(row["DATADDR"]), 

485 data_size=int(row["DATSIZE"]), 

486 ) 

487 

488 @staticmethod 

489 def get_index_hdu_columns() -> list[astropy.io.fits.Column]: 

490 """Return the definitions of the columns this class gets and sets 

491 from the index HDU. 

492 

493 Returns 

494 ------- 

495 columns 

496 A `list` of `astropy.io.fits.Column` objects that represent the 

497 header address, data address, and data size. 

498 """ 

499 return [ 

500 astropy.io.fits.Column("HDRADDR", "K"), 

501 astropy.io.fits.Column("DATADDR", "K"), 

502 astropy.io.fits.Column("DATSIZE", "K"), 

503 ] 

504 

505 header_address: int 

506 """Offset from the beginning of the start of the file to the header of this 

507 HDU, in bytes. 

508 """ 

509 

510 data_address: int 

511 """Offset from the beginning of the start of the file to the data section 

512 of this HDU, in bytes. 

513 """ 

514 

515 data_size: int 

516 """Size of the data section in bytes.""" 

517 

518 @property 

519 def header_size(self) -> int: 

520 """Size of the header in bytes.""" 

521 return self.data_address - self.header_address 

522 

523 @property 

524 def end_address(self) -> int: 

525 """Offset in bytes from the start of the file to the end of the HDU.""" 

526 return self.data_address + self.data_size 

527 

528 @property 

529 def size(self) -> int: 

530 """Total size of this HDU in bytes.""" 

531 return self.data_size + self.data_address - self.header_address 

532 

533 def update_index_row(self, row: np.void) -> None: 

534 """Set the values of a row of the index HDU from this strut. 

535 

536 Parameters 

537 ---------- 

538 row 

539 A Numpy struct-like scalar to modify in place. 

540 """ 

541 row["HDRADDR"] = self.header_address 

542 row["DATADDR"] = self.data_address 

543 row["DATSIZE"] = self.data_size