Coverage for tests / test_image_fits.py: 18%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 02:05 -0700

1# This file is part of afw. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import platform 

23import unittest 

24from collections.abc import Iterator, Mapping, Sequence 

25from contextlib import contextmanager 

26 

27import astropy.io.fits 

28import numpy as np 

29 

30import lsst.afw.image.testUtils # for TestCase monkey-patch side-effects. # noqa: F401 

31from lsst.afw.fits import DitherAlgorithm 

32from lsst.afw.image import ( 

33 ExposureF, 

34 Image, 

35 ImageFitsReader, 

36 MaskedImage, 

37 MaskedImageFitsReader, 

38 makeMaskedImageFromArrays, 

39) 

40from lsst.geom import Box2I, Extent2I, Point2I 

41from lsst.utils.tests import TestCase, getTempFilePath 

42 

43 

44class ImageFitsTestCase(TestCase): 

45 """Tests for serializing Image, Mask, and MaskedImage to FITS.""" 

46 

47 def setUp(self) -> None: 

48 self.rng = np.random.default_rng(500) 

49 

50 def sprinkle(self, masked_image: MaskedImage, value: float | int, count: int, mask_plane: str) -> None: 

51 """set a small number of randomly-selected pixels to the given value. 

52 

53 Parameters 

54 ---------- 

55 masked_image : `MaskedImage` 

56 Masked image to modify in place. The image plane and mask plane 

57 are modified; the variance is not. 

58 value : `int` or `float` 

59 Value of the modified pixels. 

60 count : `int` 

61 Number of pixels to modify. 

62 mask_plane : `str` 

63 Name of a mask plane to add and set. 

64 """ 

65 bit = masked_image.mask.addMaskPlane(mask_plane) 

66 if not count: 

67 return 

68 x = self.rng.integers(0, masked_image.width, count, dtype=int) 

69 y = self.rng.integers(0, masked_image.height, count, dtype=int) 

70 masked_image.image.array[y, x] = value 

71 masked_image.mask.array[y, x] = 1 << bit 

72 

73 def set_nontrivial_xy0(self, image: MaskedImage) -> MaskedImage: 

74 """Set the origin of the given `Image` or `MaskedImage` to something 

75 other than (0, 0). 

76 """ 

77 image.setXY0(Point2I(image.width // 3, image.height // 2)) 

78 return image 

79 

80 def make_int_image( 

81 self, 

82 dtype: np.typing.DTypeLike, 

83 noise_min: int, 

84 noise_max: int, 

85 *, 

86 hot_value: int = 0, 

87 hot_count: int = 50, 

88 cold_value: int = 0, 

89 cold_count: int = 50, 

90 shape: tuple[int, int] = (256, 225), 

91 variance: float | None = None, 

92 ) -> MaskedImage: 

93 """Make an integer test image. 

94 

95 Parameters 

96 ---------- 

97 dtype : `numpy.dtype` 

98 Numpy type object for the image's pixels. 

99 noise_min : `int` 

100 Minimum value (inclusive) for the discrete uniform noise that fills 

101 most of the image. 

102 noise_max : `int` 

103 Maximum value (inclusive) for the discrete uniform noise that fills 

104 most of the image. 

105 hot_value : `int`, optional 

106 Value for hot pixels to `sprinkle` into the image. 

107 hot_count : `int`, optional 

108 Number of hot pixels to `sprinkle` into the image. 

109 cold_value : `int`, optional 

110 Value for cold pixels to `sprinkle` into the image. 

111 cold_count : `int`, optional 

112 Number of cold pixels to `sprinkle` into the image. 

113 shape : `tuple`, optional 

114 Dimensions of the image (y, x). 

115 variance : `float`, optional 

116 Constant value for the variance plane. Default is to use the 

117 actual variance of the added noise. 

118 

119 Returns 

120 ------- 

121 masked_image : `MaskedImage` 

122 Test masked image. Sprinkled pixels are masked as "HOT" or "COLD". 

123 """ 

124 array = self.rng.integers(noise_min, noise_max, shape, dtype, endpoint=True) 

125 masked_image = makeMaskedImageFromArrays(array) 

126 masked_image.variance.array = ( 

127 variance if variance is not None else (noise_max - noise_min) ** 2 / 12.0 

128 ) 

129 self.sprinkle(masked_image, hot_value, hot_count, mask_plane="HOT") 

130 self.sprinkle(masked_image, cold_value, cold_count, mask_plane="COLD") 

131 return self.set_nontrivial_xy0(masked_image) 

132 

133 def make_float_image( 

134 self, 

135 dtype: np.typing.DTypeLike, 

136 noise_sigma: float = 1.0, 

137 noise_mean: float = 0.0, 

138 *, 

139 hot_value: float = 20000.0, 

140 hot_count: int = 50, 

141 cold_value: float = -20000.0, 

142 cold_count: int = 50, 

143 nan_count: int = 50, 

144 shape: tuple[int, int] = (256, 256), 

145 ) -> MaskedImage: 

146 """Make a floating-point test image. 

147 

148 Parameters 

149 ---------- 

150 dtype : `numpy.dtype` 

151 Numpy type object for the image's pixels. 

152 noise_sigma : `float` 

153 Standard deviation of the Gaussian noise distribution that fills 

154 most of the image. 

155 noise_mean : `int` 

156 Mean of the Gaussian noise distribution that fills most of the 

157 image. 

158 hot_value : `float`, optional 

159 Value for hot pixels to `sprinkle` into the image. 

160 hot_count : `float`, optional 

161 Number of hot pixels to `sprinkle` into the image. 

162 cold_value : `float`, optional 

163 Value for cold pixels to `sprinkle` into the image. 

164 cold_count : `float`, optional 

165 Number of cold pixels to `sprinkle` into the image. 

166 nan_count : `float`, optional 

167 Number of NaN pixels to `sprinkle` into the image. 

168 shape : `tuple`, optional 

169 Dimensions of the image (y, x). 

170 

171 Returns 

172 ------- 

173 masked_image : `MaskedImage` 

174 Test masked image. Sprinkled pixels are masked as "HOT", "COLD", 

175 or `NaN`, and the variance plane is set to a constant 

176 ``nan_sigma**2``. 

177 """ 

178 array = self.rng.normal(noise_mean, noise_sigma, shape).astype(dtype) 

179 masked_image = makeMaskedImageFromArrays(array) 

180 masked_image.variance.array = noise_sigma**2 

181 self.sprinkle(masked_image, hot_value, hot_count, mask_plane="HOT") 

182 self.sprinkle(masked_image, cold_value, cold_count, mask_plane="COLD") 

183 self.sprinkle(masked_image, np.nan, nan_count, mask_plane="NAN") 

184 return self.set_nontrivial_xy0(masked_image) 

185 

186 @contextmanager 

187 def roundtrip_image_reader( 

188 self, image: Image, compression: Mapping[str, object] | None = None, original_fits: bool = False 

189 ) -> Iterator[tuple[ImageFitsReader, astropy.io.fits.HDUList]]: 

190 """Return a context manager that writes and reads a test image. 

191 

192 Parameters 

193 ---------- 

194 image : `Image` 

195 Image to write and then read. 

196 compression : `collections.abc.Mapping` or `None`, optional 

197 How to compress the image (`None` for no compression), as a 

198 mapping compatible with `Image.writeFitsWithOptions`. 

199 original_fits : `bool`, optional 

200 If `True`, open the astropy FITS object with 

201 ``disable_image_compression=True``, interpreting all compressed 

202 image HDUs as binary tables and leaving their headers unchanged. 

203 

204 Returns 

205 ------- 

206 context : `contextlib.AbstractContextManager 

207 A context manager that, when entered, returns: 

208 

209 - reader (`ImageFitsReader`): a reader for the saved image; 

210 - fits (`astropy.io.fits.HDUList`): an Astropy FITS object. 

211 """ 

212 with getTempFilePath(".fits") as filename: 

213 image.writeFitsWithOptions(filename, compression) 

214 with astropy.io.fits.open(filename, disable_image_compression=original_fits) as fits: 

215 yield ImageFitsReader(filename), fits 

216 

217 @contextmanager 

218 def roundtrip_masked_image_reader( 

219 self, 

220 masked_image: MaskedImage, 

221 compression: Mapping[str, object] | None = None, 

222 original_fits: bool = False, 

223 ) -> Iterator[tuple[MaskedImageFitsReader, astropy.io.fits.HDUList]]: 

224 """Return a context manager that writes and reads a test masked image. 

225 

226 Parameters 

227 ---------- 

228 image : `MaskedImage` 

229 Masked image to write and then read. 

230 compression : `collections.abc.Mapping` or `None`, optional 

231 How to compress the image (`None` for no compression), as a 

232 mapping compatible with `MaskedImage.writeFitsWithOptions`. 

233 original_fits : `bool`, optional 

234 If `True`, open the astropy FITS object with 

235 ``disable_image_compression=True``, interpreting all compressed 

236 image HDUs as binary tables and leaving their headers unchanged. 

237 

238 Returns 

239 ------- 

240 context : `contextlib.AbstractContextManager 

241 A context manager that, when entered, returns: 

242 

243 - reader (`MaskedImageFitsReader`): a reader for the saved image; 

244 - fits (`astropy.io.fits.HDUList`): an Astropy FITS object. 

245 """ 

246 with getTempFilePath(".fits") as filename: 

247 masked_image.writeFitsWithOptions(filename, compression) 

248 with astropy.io.fits.open(filename, disable_image_compression=original_fits) as fits: 

249 yield MaskedImageFitsReader(filename), fits 

250 

251 @contextmanager 

252 def check_roundtrip_image_invariants( 

253 self, 

254 image: Image, 

255 compression: Mapping[str, object] | None = None, 

256 safe_dtypes: Sequence[np.typing.DTypeLike] = (), 

257 *, 

258 rtol: float = 0.0, 

259 atol: float = 0.0, 

260 ) -> Iterator[Image]: 

261 """Return a context manager that writes and reads a test image and then 

262 runs a battery of tests. 

263 

264 Parameters 

265 ---------- 

266 image : `Image` 

267 Image to write and then read. 

268 compression : `collections.abc.Mapping` or `None`, optional 

269 How to compress the image (`None` for no compression), as a 

270 mapping compatible with `Image.writeFitsWithOptions`. 

271 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional 

272 Other pixel types that the image can be safely read back in as 

273 without any loss of precision or range. 

274 rtol : `float`, optional 

275 Relative tolerance for floating-point comparisons between our 

276 reader and astropy. Defaults to zero. 

277 atol : `float`, optional 

278 Absolute tolerance for floating-point comparisons between our 

279 reader and astropy. Defaults to zero. 

280 

281 Returns 

282 ------- 

283 context : `contextlib.AbstractContextManager 

284 A context manager that returns the roundtripped `Image` when 

285 entered. 

286 

287 Notes 

288 ----- 

289 This method tests: 

290 

291 - reading just the bounding box; 

292 - reading the pixel type; 

293 - reading a subimage; 

294 - consistency with reading with astropy (by comparing to the 

295 roundtripped image, not the original); 

296 - consistency with reading into other pixel types (``safe_dtypes``). 

297 

298 It does not test that the roundtripped image matches the original (that 

299 is for calling code to do). 

300 """ 

301 hdu = 0 if compression is None else 1 

302 with self.subTest(compression=compression): 

303 with self.roundtrip_image_reader(image, compression) as (reader, fits): 

304 self.assertEqual(reader.readDType(), image.array.dtype) 

305 self.assertEqual(reader.readBBox(), image.getBBox()) 

306 roundtripped = reader.read() 

307 np.testing.assert_allclose(roundtripped.array, fits[hdu].data, atol=atol, rtol=rtol) 

308 subbox = Box2I( 

309 image.getXY0() + Extent2I(image.width // 5, image.height // 6), 

310 image.getXY0() + Extent2I(4 * image.width // 5, 5 * image.height // 6), 

311 ) 

312 subrt = reader.read(bbox=subbox) 

313 self.assertEqual(subrt.getBBox(), subbox) 

314 self.assertImagesEqual(subrt, roundtripped[subbox]) 

315 for dtype in [image.dtype, *safe_dtypes]: 

316 np.testing.assert_array_equal( 

317 reader.readArray(dtype=np.dtype(dtype)), roundtripped.array.astype(dtype) 

318 ) 

319 for dtype in [image.dtype, *safe_dtypes]: 

320 np.testing.assert_array_equal( 

321 reader.readArray(dtype=np.dtype(dtype)), roundtripped.array.astype(dtype) 

322 ) 

323 yield roundtripped 

324 

325 @contextmanager 

326 def check_roundtrip_masked_image_invariants( 

327 self, 

328 masked_image: MaskedImage, 

329 compression: Mapping[str, object] | None = None, 

330 safe_dtypes: Sequence[np.typing.DTypeLike] = (), 

331 *, 

332 rtol: float = 0.0, 

333 atol: float = 0.0, 

334 ) -> Iterator[MaskedImage]: 

335 """Return a context manager that writes and reads a test masked image 

336 and then runs a battery of tests. 

337 

338 Parameters 

339 ---------- 

340 masked_image : `MaskedImage` 

341 Masked image to write and then read. 

342 compression : `collections.abc.Mapping` or `None`, optional 

343 How to compress the image (`None` for no compression), as a 

344 mapping compatible with `Image.writeFitsWithOptions`. 

345 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional 

346 Other image plane pixel types that the image can be safely read 

347 back in as without any loss of precision or range. 

348 rtol : `float`, optional 

349 Relative tolerance for floating-point comparisons between our 

350 reader and astropy. Defaults to zero. 

351 atol : `float`, optional 

352 Absolute tolerance for floating-point comparisons between our 

353 reader and astropy. Defaults to zero. 

354 

355 Returns 

356 ------- 

357 context : `contextlib.AbstractContextManager 

358 A context manager that returns the roundtripped `Image` when 

359 entered. 

360 

361 Notes 

362 ----- 

363 This method tests: 

364 

365 - reading just the bounding box; 

366 - reading the pixel type; 

367 - reading a subimage; 

368 - consistency with reading with astropy (by comparing to the 

369 roundtripped image, not the original); 

370 - consistency with reading into other pixel types (``safe_dtypes``). 

371 

372 It does not test that the roundtripped image matches the original (that 

373 is for calling code to do). 

374 """ 

375 with self.subTest(compression=compression): 

376 with self.roundtrip_masked_image_reader(masked_image, compression) as (reader, fits): 

377 self.assertEqual(reader.readImageDType(), masked_image.image.array.dtype) 

378 self.assertEqual(reader.readMaskDType(), masked_image.mask.array.dtype) 

379 self.assertEqual(reader.readVarianceDType(), masked_image.variance.array.dtype) 

380 self.assertEqual(reader.readBBox(), masked_image.getBBox()) 

381 roundtripped = reader.read() 

382 np.testing.assert_allclose(roundtripped.image.array, fits["IMAGE"].data, rtol=rtol, atol=atol) 

383 np.testing.assert_allclose(roundtripped.mask.array, fits["MASK"].data, rtol=rtol, atol=atol) 

384 np.testing.assert_allclose( 

385 roundtripped.variance.array, fits["VARIANCE"].data, rtol=rtol, atol=atol 

386 ) 

387 subbox = Box2I( 

388 masked_image.getXY0() + Extent2I(masked_image.width // 5, masked_image.height // 6), 

389 masked_image.getXY0() 

390 + Extent2I(4 * masked_image.width // 5, 5 * masked_image.height // 6), 

391 ) 

392 subrt = reader.read(bbox=subbox) 

393 self.assertEqual(subrt.getBBox(), subbox) 

394 self.assertMaskedImagesEqual(subrt, roundtripped[subbox]) 

395 for dtype in [masked_image.image.dtype, *safe_dtypes]: 

396 np.testing.assert_array_equal( 

397 reader.readImageArray(dtype=np.dtype(dtype)), roundtripped.image.array.astype(dtype) 

398 ) 

399 for dtype in [masked_image.image.dtype, *safe_dtypes]: 

400 np.testing.assert_array_equal( 

401 reader.readImageArray(dtype=np.dtype(dtype)), 

402 roundtripped.image.array.astype(dtype), 

403 ) 

404 yield roundtripped 

405 

406 def check_exact_roundtrip( 

407 self, 

408 masked_image: MaskedImage, 

409 compression: Mapping[str, object] | None = None, 

410 *, 

411 safe_dtypes: Sequence[np.typing.DTypeLike] = (), 

412 ) -> None: 

413 """Test that a masked image round-trips through serialization exactly. 

414 

415 Parameters 

416 ---------- 

417 masked_image : `MaskedImage` 

418 Masked image to write and then read. Roundtripping just the image 

419 plane on its own is also tested. 

420 compression : `collections.abc.Mapping` or `None`, optional 

421 How to compress the image (`None` for no compression), as a 

422 mapping compatible with `MaskedImage.writeFitsWithOptions`. 

423 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional 

424 Other image plane pixel types that the image can be safely read 

425 back in as without any loss of precision or range. 

426 """ 

427 with self.check_roundtrip_image_invariants(masked_image.image, compression, safe_dtypes) as image_rt: 

428 self.assertImagesEqual(image_rt, masked_image.image) 

429 with self.check_roundtrip_masked_image_invariants( 

430 masked_image, compression, safe_dtypes=safe_dtypes 

431 ) as masked_image_rt: 

432 self.assertMaskedImagesEqual(masked_image_rt, masked_image) 

433 

434 def check_quantized_roundtrip( 

435 self, 

436 masked_image: MaskedImage, 

437 *, 

438 compression: Mapping[str, object], 

439 roundtrip_atol: float, 

440 safe_dtypes: Sequence[np.typing.DTypeLike] = (), 

441 ) -> None: 

442 """Test that a masked image round-trips through serialization with some 

443 loss of precision expected. 

444 

445 Parameters 

446 ---------- 

447 masked_image : `MaskedImage` 

448 Masked image to write and then read. Roundtripping just the image 

449 plane on its own is also tested. 

450 compression : `collections.abc.Mapping` 

451 How to compress the image, as a mapping compatible with 

452 `MaskedImage.writeFitsWithOptions`. 

453 roundtrip_atol : `float` 

454 Absolute tolerance for comparisons between the input and output 

455 image. 

456 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional 

457 Other image plane pixel types that the image can be safely read 

458 back in as without any loss of precision or range. 

459 """ 

460 atol = 0.0 

461 rtol = 0.0 

462 if platform.machine() in ("aarch64", "arm64"): 

463 # CFITSIO on Arm64 (both Linux and macOS reads quantized images 

464 # back slightly but significantly differently from CFITSIO on 

465 # x86_64, Astropy on x86_64, and Astropy on aarch64 (which all 

466 # agree). This has been reported upstream. The difference is 

467 # typically smaller than the change due to quantization but larger 

468 # than expected from floating-point round-off error from just 

469 # switching around the order of operations in applying 

470 # ZZERO/ZSCALE, especially for float64. 

471 atol = 1E-4 

472 rtol = 1E-6 

473 with self.check_roundtrip_image_invariants( 

474 masked_image.image, compression, safe_dtypes, atol=atol, rtol=rtol 

475 ) as image_rt: 

476 self.assertImagesAlmostEqual(image_rt, masked_image.image, atol=roundtrip_atol) 

477 with self.check_roundtrip_masked_image_invariants( 

478 masked_image, compression, safe_dtypes=safe_dtypes, atol=atol, rtol=rtol 

479 ) as masked_image_rt: 

480 self.assertMaskedImagesAlmostEqual(masked_image_rt, masked_image, atol=roundtrip_atol) 

481 

482 def test_u16(self) -> None: 

483 """Test FITS serialization (including compression) for uint16.""" 

484 masked_image = self.make_int_image( 

485 np.uint16, noise_min=256, noise_max=512, cold_value=1, hot_value=32750 

486 ) 

487 safe_dtypes = (np.int32, np.float32, np.float64) 

488 # uint64 is only safe when not decompressing; otherwise CFITSIO can't 

489 # handle it. 

490 self.check_exact_roundtrip(masked_image, safe_dtypes=safe_dtypes + (np.uint64,)) 

491 self.check_exact_roundtrip( 

492 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes 

493 ) 

494 self.check_exact_roundtrip( 

495 masked_image, 

496 { 

497 "image": {"algorithm": "GZIP_1"}, 

498 "mask": {"algorithm": "GZIP_1"}, 

499 "variance": {"algorithm": "GZIP_1"}, 

500 }, 

501 safe_dtypes=safe_dtypes, 

502 ) 

503 self.check_exact_roundtrip( 

504 masked_image, 

505 { 

506 "image": {"algorithm": "GZIP_2"}, 

507 "mask": {"algorithm": "GZIP_2"}, 

508 "variance": {"algorithm": "GZIP_2"}, 

509 }, 

510 safe_dtypes=safe_dtypes, 

511 ) 

512 self.check_exact_roundtrip( 

513 masked_image, 

514 { 

515 "image": {"algorithm": "RICE_1"}, 

516 "mask": {"algorithm": "RICE_1"}, 

517 "variance": {}, 

518 }, 

519 safe_dtypes=safe_dtypes, 

520 ) 

521 

522 def test_i32(self) -> None: 

523 """Test FITS serialization (including compression) for int32.""" 

524 masked_image = self.make_int_image( 

525 np.int32, noise_min=-(1 << 21), noise_max=(1 << 22), cold_value=-(1 << 24), hot_value=(1 << 25) 

526 ) 

527 safe_dtypes = (np.float64,) 

528 self.check_exact_roundtrip(masked_image, safe_dtypes=safe_dtypes) 

529 self.check_exact_roundtrip( 

530 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes 

531 ) 

532 self.check_exact_roundtrip( 

533 masked_image, 

534 { 

535 "image": {"algorithm": "GZIP_1"}, 

536 "mask": {"algorithm": "GZIP_1"}, 

537 "variance": {"algorithm": "GZIP_1"}, 

538 }, 

539 safe_dtypes=safe_dtypes, 

540 ) 

541 self.check_exact_roundtrip( 

542 masked_image, 

543 { 

544 "image": {"algorithm": "GZIP_2"}, 

545 "mask": {"algorithm": "GZIP_2"}, 

546 "variance": {"algorithm": "GZIP_2"}, 

547 }, 

548 safe_dtypes=safe_dtypes, 

549 ) 

550 self.check_exact_roundtrip( 

551 masked_image, 

552 { 

553 "image": {"algorithm": "RICE_1"}, 

554 "mask": {"algorithm": "RICE_1"}, 

555 "variance": {}, 

556 }, 

557 safe_dtypes=safe_dtypes, 

558 ) 

559 

560 def test_u64(self) -> None: 

561 """Test FITS serialization for uint64.""" 

562 masked_image = self.make_int_image( 

563 np.uint64, 

564 noise_min=256, 

565 noise_max=(1 << 42), 

566 cold_value=1, 

567 hot_value=(1 << 50), 

568 # We need to lie about the variance because the actual variance 

569 # of this uniform distribution is too big to be represented as 

570 # float32. 

571 variance=50.0, 

572 ) 

573 self.check_exact_roundtrip(masked_image) 

574 # CFITSIO does not support compressing uint64, so neither do we. 

575 

576 def test_f32_lossless(self) -> None: 

577 """Test uncompressed FITS serialization and lossless compression for 

578 float32. 

579 """ 

580 masked_image = self.make_float_image( 

581 np.float32, noise_mean=100.0, noise_sigma=15.0, cold_value=-(1 << 24), hot_value=(1 << 25) 

582 ) 

583 safe_dtypes = (np.float64,) 

584 self.check_exact_roundtrip(masked_image, compression=None, safe_dtypes=safe_dtypes) 

585 self.check_exact_roundtrip( 

586 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes 

587 ) 

588 self.check_exact_roundtrip( 

589 masked_image, 

590 { 

591 "image": {"algorithm": "GZIP_1"}, 

592 "mask": {"algorithm": "GZIP_1"}, 

593 "variance": {"algorithm": "GZIP_1"}, 

594 }, 

595 safe_dtypes=safe_dtypes, 

596 ) 

597 self.check_exact_roundtrip( 

598 masked_image, 

599 { 

600 "image": {"algorithm": "GZIP_2"}, 

601 "mask": {"algorithm": "GZIP_2"}, 

602 "variance": {"algorithm": "GZIP_2"}, 

603 }, 

604 safe_dtypes=safe_dtypes, 

605 ) 

606 

607 def test_f32_lossy_no_mask(self) -> None: 

608 """Test quantized compression of float32 with no bad pixels other than 

609 NaNs. 

610 """ 

611 masked_image = self.make_float_image( 

612 # Don't add hot or cold pixels because we won't be using a mask to 

613 # keep them out of the stdev estimation. 

614 np.float32, 

615 noise_mean=100.0, 

616 noise_sigma=15.0, 

617 cold_count=0, 

618 hot_count=0, 

619 ) 

620 lossy = {"algorithm": "RICE_1", "quantization": {"seed": 1}} 

621 lossless = {"algorithm": "GZIP_2"} 

622 for dither in DitherAlgorithm.__members__.keys(): 

623 lossy["quantization"]["dither"] = dither 

624 lossy["quantization"]["level"] = 30.0 

625 lossy["quantization"]["scaling"] = "STDEV_CFITSIO" 

626 self.check_quantized_roundtrip( 

627 masked_image, 

628 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

629 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15. 

630 ) 

631 lossy["quantization"]["scaling"] = "MANUAL" 

632 lossy["quantization"]["level"] = 0.25 

633 self.check_quantized_roundtrip( 

634 masked_image, 

635 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

636 roundtrip_atol=0.25, 

637 ) 

638 

639 def test_f32_lossy_masked(self) -> None: 

640 """Test quantized compression of float 32 with hot and cold pixels.""" 

641 masked_image = self.make_float_image( 

642 np.float32, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4 

643 ) 

644 lossy = { 

645 "algorithm": "RICE_1", 

646 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1}, 

647 } 

648 lossless = {"algorithm": "GZIP_2"} 

649 for dither in DitherAlgorithm.__members__.keys(): 

650 lossy["quantization"]["dither"] = dither 

651 lossy["quantization"]["scaling"] = "STDEV_MASKED" 

652 self.check_quantized_roundtrip( 

653 masked_image, 

654 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

655 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15. 

656 ) 

657 lossy["quantization"]["scaling"] = "RANGE" 

658 with self.subTest(dither=dither, scaling="RANGE"): 

659 self.check_quantized_roundtrip( 

660 masked_image, 

661 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

662 roundtrip_atol=1e-7, # since the dynamic range is tiny, this is actually barely lossy 

663 ) 

664 

665 def test_f64_lossless(self) -> None: 

666 """Test uncompressed FITS serialization and lossless compression for 

667 float36. 

668 """ 

669 masked_image = self.make_float_image( 

670 np.float64, noise_mean=100.0, noise_sigma=15.0, cold_value=-(1 << 24), hot_value=(1 << 25) 

671 ) 

672 self.check_exact_roundtrip(masked_image, compression=None) 

673 self.check_exact_roundtrip(masked_image, {"image": {}, "mask": {}, "variance": {}}) 

674 self.check_exact_roundtrip( 

675 masked_image, 

676 { 

677 "image": {"algorithm": "GZIP_1"}, 

678 "mask": {"algorithm": "GZIP_1"}, 

679 "variance": {"algorithm": "GZIP_1"}, 

680 }, 

681 ) 

682 self.check_exact_roundtrip( 

683 masked_image, 

684 { 

685 "image": {"algorithm": "GZIP_2"}, 

686 "mask": {"algorithm": "GZIP_2"}, 

687 "variance": {"algorithm": "GZIP_2"}, 

688 }, 

689 ) 

690 

691 def test_f64_lossy_no_mask(self) -> None: 

692 """Test quantized compression of float64 with no bad pixels other than 

693 NaNs. 

694 """ 

695 masked_image = self.make_float_image( 

696 # Don't add hot or cold pixels because we won't be using a mask to 

697 # keep them out of the stdev estimation. 

698 np.float64, 

699 noise_mean=100.0, 

700 noise_sigma=15.0, 

701 cold_count=0, 

702 hot_count=0, 

703 ) 

704 lossy = {"algorithm": "RICE_1", "quantization": {"seed": 1}} 

705 lossless = {"algorithm": "GZIP_2"} 

706 for dither in DitherAlgorithm.__members__.keys(): 

707 lossy["quantization"]["dither"] = dither 

708 lossy["quantization"]["level"] = 30.0 

709 lossy["quantization"]["scaling"] = "STDEV_CFITSIO" 

710 self.check_quantized_roundtrip( 

711 masked_image, 

712 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

713 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15. 

714 ) 

715 lossy["quantization"]["scaling"] = "MANUAL" 

716 lossy["quantization"]["level"] = 0.25 

717 self.check_quantized_roundtrip( 

718 masked_image, 

719 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

720 roundtrip_atol=0.25, 

721 ) 

722 

723 def test_f64_lossy_masked(self) -> None: 

724 """Test quantized compression of float64 with hot and cold pixels.""" 

725 masked_image = self.make_float_image( 

726 np.float64, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4 

727 ) 

728 lossy = { 

729 "algorithm": "RICE_1", 

730 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1}, 

731 } 

732 lossless = {"algorithm": "GZIP_2"} 

733 for dither in DitherAlgorithm.__members__.keys(): 

734 lossy["quantization"]["dither"] = dither 

735 lossy["quantization"]["scaling"] = "STDEV_MASKED" 

736 self.check_quantized_roundtrip( 

737 masked_image, 

738 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

739 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15. 

740 ) 

741 lossy["quantization"]["scaling"] = "RANGE" 

742 with self.subTest(dither=dither, scaling="RANGE"): 

743 self.check_quantized_roundtrip( 

744 masked_image, 

745 compression={"image": lossy, "mask": lossless, "variance": lossy}, 

746 roundtrip_atol=1e-7, # since the dynamic range is tiny, this is actually barely lossy 

747 ) 

748 

749 def check_tile_shape( 

750 self, image: Image, compression: Mapping[str, object], ztile1: int, ztile2: int 

751 ) -> None: 

752 with self.roundtrip_image_reader(image, compression=compression, original_fits=True) as (_, fits): 

753 hdu = fits[1] 

754 self.assertEqual(hdu.header["ZTILE1"], ztile1) 

755 self.assertEqual(hdu.header["ZTILE2"], ztile2) 

756 

757 def test_tile_shapes(self) -> None: 

758 """Test customizing the tile size in FITS compression.""" 

759 masked_image = self.make_float_image( 

760 # Don't add hot or cold pixels because we won't be using a mask to 

761 # keep them out of the stdev estimation. 

762 np.float32, 

763 noise_mean=100.0, 

764 noise_sigma=15.0, 

765 cold_count=0, 

766 hot_count=0, 

767 ) 

768 lossy = { 

769 "image": { 

770 "algorithm": "RICE_1", 

771 "quantization": { 

772 "dither": "NO_DITHER", 

773 "scaling": "STDEV_CFITSIO", 

774 "level": 30.0, 

775 "seed": 1, 

776 }, 

777 } 

778 } 

779 self.check_tile_shape(masked_image.image, lossy, ztile1=masked_image.width, ztile2=1) 

780 lossy["image"]["tile_width"] = 1 

781 lossy["image"]["tile_height"] = 0 

782 self.check_tile_shape(masked_image.image, lossy, ztile1=1, ztile2=masked_image.height) 

783 lossy["image"]["tile_width"] = 25 

784 lossy["image"]["tile_height"] = 17 

785 self.check_tile_shape(masked_image.image, lossy, ztile1=25, ztile2=17) 

786 

787 def test_uzscale_header(self) -> None: 

788 """Test that we write the UZSCALE header card into the header when 

789 ZSCALE is universal. 

790 """ 

791 masked_image = self.make_float_image( 

792 # Don't add hot or cold pixels because we won't be using a mask to 

793 # keep them out of the stdev estimation. 

794 np.float32, 

795 noise_mean=100.0, 

796 noise_sigma=15.0, 

797 cold_count=0, 

798 hot_count=0, 

799 ) 

800 lossy = { 

801 "algorithm": "RICE_1", 

802 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1}, 

803 } 

804 lossy = { 

805 "image": { 

806 "algorithm": "RICE_1", 

807 "quantization": { 

808 "dither": "NO_DITHER", 

809 "scaling": "STDEV_MASKED", 

810 "level": 30.0, 

811 "seed": 1, 

812 }, 

813 } 

814 } 

815 with self.roundtrip_image_reader( 

816 masked_image.image, compression=lossy, original_fits=True 

817 ) as (_, fits): 

818 self.assertFloatsAlmostEqual(fits[1].header["UZSCALE"], 0.5, rtol=1E-2) 

819 

820 # I can't get CFITSIO to make the correction described by this test; if we 

821 # update the header key after asking it to write the image, it gets into 

822 # a weird state that prevents (at least) appending binary table HDUs. 

823 @unittest.expectedFailure 

824 def test_no_rice_one(self) -> None: 

825 """Test that we've corrected CFITSIO's non-standard writing of RICE_ONE 

826 instead of RICE_1 when the dither mode is SUBTRACTIVE_DITHER_2. 

827 

828 RICE_ONE is an old pre-standard key that is already in enough images 

829 that it's reasonable for readers to accept it, but it is not correct 

830 for CFITSIO to be writing it. 

831 """ 

832 masked_image = self.make_float_image( 

833 # Don't add hot or cold pixels because we won't be using a mask to 

834 # keep them out of the stdev estimation. 

835 np.float32, 

836 noise_mean=100.0, 

837 noise_sigma=15.0, 

838 cold_count=0, 

839 hot_count=0, 

840 ) 

841 lossy = { 

842 "image": { 

843 "algorithm": "RICE_1", 

844 "quantization": { 

845 "dither": "SUBTRACTIVE_DITHER_2", 

846 "scaling": "STDEV_CFITSIO", 

847 "level": 30.0, 

848 "seed": 1, 

849 }, 

850 }, 

851 "mask": { 

852 "algorithm": "GZIP_2", 

853 }, 

854 "variance": { 

855 "algorithm": "RICE_1", 

856 "quantization": { 

857 "dither": "SUBTRACTIVE_DITHER_2", 

858 "scaling": "STDEV_CFITSIO", 

859 "level": 30.0, 

860 "seed": 1, 

861 }, 

862 }, 

863 } 

864 with self.roundtrip_image_reader(masked_image.image, lossy, original_fits=True) as (_, fits): 

865 self.assertEqual(len(fits), 2) 

866 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

867 with self.roundtrip_masked_image_reader(masked_image, lossy, original_fits=True) as (_, fits): 

868 self.assertEqual(len(fits), 4) 

869 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

870 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

871 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

872 

873 def test_compressed_exposure(self) -> None: 

874 masked_image = self.make_float_image( 

875 np.float32, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4 

876 ) 

877 exposure = ExposureF(masked_image) 

878 lossy = { 

879 "algorithm": "RICE_1", 

880 "quantization": { 

881 "dither": "SUBTRACTIVE_DITHER_2", 

882 "scaling": "STDEV_MASKED", 

883 "mask_planes": ["HOT", "COLD", "NAN"], 

884 "level": 30.0, 

885 "seed": 1, 

886 }, 

887 } 

888 lossless = {"algorithm": "GZIP_2"} 

889 with getTempFilePath(".fits") as filename: 

890 exposure.writeFitsWithOptions( 

891 filename, 

892 {"image": lossy, "mask": lossless, "variance": lossy}, 

893 ) 

894 roundtripped = ExposureF(filename) 

895 self.assertMaskedImagesAlmostEqual(masked_image, roundtripped.maskedImage, atol=0.25) 

896 

897 

898if __name__ == "__main__": 898 ↛ 899line 898 didn't jump to line 899 because the condition on line 898 was never true

899 unittest.main()