Coverage for tests/test_image_fits.py: 18%
228 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:21 -0700
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:21 -0700
1# This file is part of afw.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import platform
23import unittest
24from collections.abc import Iterator, Mapping, Sequence
25from contextlib import contextmanager
27import astropy.io.fits
28import numpy as np
30import lsst.afw.image.testUtils # for TestCase monkey-patch side-effects. # noqa: F401
31from lsst.afw.fits import DitherAlgorithm
32from lsst.afw.image import (
33 ExposureF,
34 Image,
35 ImageFitsReader,
36 MaskedImage,
37 MaskedImageFitsReader,
38 makeMaskedImageFromArrays,
39)
40from lsst.geom import Box2I, Extent2I, Point2I
41from lsst.utils.tests import TestCase, getTempFilePath
44class ImageFitsTestCase(TestCase):
45 """Tests for serializing Image, Mask, and MaskedImage to FITS."""
47 def setUp(self) -> None:
48 self.rng = np.random.default_rng(500)
50 def sprinkle(self, masked_image: MaskedImage, value: float | int, count: int, mask_plane: str) -> None:
51 """set a small number of randomly-selected pixels to the given value.
53 Parameters
54 ----------
55 masked_image : `MaskedImage`
56 Masked image to modify in place. The image plane and mask plane
57 are modified; the variance is not.
58 value : `int` or `float`
59 Value of the modified pixels.
60 count : `int`
61 Number of pixels to modify.
62 mask_plane : `str`
63 Name of a mask plane to add and set.
64 """
65 bit = masked_image.mask.addMaskPlane(mask_plane)
66 if not count:
67 return
68 x = self.rng.integers(0, masked_image.width, count, dtype=int)
69 y = self.rng.integers(0, masked_image.height, count, dtype=int)
70 masked_image.image.array[y, x] = value
71 masked_image.mask.array[y, x] = 1 << bit
73 def set_nontrivial_xy0(self, image: MaskedImage) -> MaskedImage:
74 """Set the origin of the given `Image` or `MaskedImage` to something
75 other than (0, 0).
76 """
77 image.setXY0(Point2I(image.width // 3, image.height // 2))
78 return image
80 def make_int_image(
81 self,
82 dtype: np.typing.DTypeLike,
83 noise_min: int,
84 noise_max: int,
85 *,
86 hot_value: int = 0,
87 hot_count: int = 50,
88 cold_value: int = 0,
89 cold_count: int = 50,
90 shape: tuple[int, int] = (256, 225),
91 variance: float | None = None,
92 ) -> MaskedImage:
93 """Make an integer test image.
95 Parameters
96 ----------
97 dtype : `numpy.dtype`
98 Numpy type object for the image's pixels.
99 noise_min : `int`
100 Minimum value (inclusive) for the discrete uniform noise that fills
101 most of the image.
102 noise_max : `int`
103 Maximum value (inclusive) for the discrete uniform noise that fills
104 most of the image.
105 hot_value : `int`, optional
106 Value for hot pixels to `sprinkle` into the image.
107 hot_count : `int`, optional
108 Number of hot pixels to `sprinkle` into the image.
109 cold_value : `int`, optional
110 Value for cold pixels to `sprinkle` into the image.
111 cold_count : `int`, optional
112 Number of cold pixels to `sprinkle` into the image.
113 shape : `tuple`, optional
114 Dimensions of the image (y, x).
115 variance : `float`, optional
116 Constant value for the variance plane. Default is to use the
117 actual variance of the added noise.
119 Returns
120 -------
121 masked_image : `MaskedImage`
122 Test masked image. Sprinkled pixels are masked as "HOT" or "COLD".
123 """
124 array = self.rng.integers(noise_min, noise_max, shape, dtype, endpoint=True)
125 masked_image = makeMaskedImageFromArrays(array)
126 masked_image.variance.array = (
127 variance if variance is not None else (noise_max - noise_min) ** 2 / 12.0
128 )
129 self.sprinkle(masked_image, hot_value, hot_count, mask_plane="HOT")
130 self.sprinkle(masked_image, cold_value, cold_count, mask_plane="COLD")
131 return self.set_nontrivial_xy0(masked_image)
133 def make_float_image(
134 self,
135 dtype: np.typing.DTypeLike,
136 noise_sigma: float = 1.0,
137 noise_mean: float = 0.0,
138 *,
139 hot_value: float = 20000.0,
140 hot_count: int = 50,
141 cold_value: float = -20000.0,
142 cold_count: int = 50,
143 nan_count: int = 50,
144 shape: tuple[int, int] = (256, 256),
145 ) -> MaskedImage:
146 """Make a floating-point test image.
148 Parameters
149 ----------
150 dtype : `numpy.dtype`
151 Numpy type object for the image's pixels.
152 noise_sigma : `float`
153 Standard deviation of the Gaussian noise distribution that fills
154 most of the image.
155 noise_mean : `int`
156 Mean of the Gaussian noise distribution that fills most of the
157 image.
158 hot_value : `float`, optional
159 Value for hot pixels to `sprinkle` into the image.
160 hot_count : `float`, optional
161 Number of hot pixels to `sprinkle` into the image.
162 cold_value : `float`, optional
163 Value for cold pixels to `sprinkle` into the image.
164 cold_count : `float`, optional
165 Number of cold pixels to `sprinkle` into the image.
166 nan_count : `float`, optional
167 Number of NaN pixels to `sprinkle` into the image.
168 shape : `tuple`, optional
169 Dimensions of the image (y, x).
171 Returns
172 -------
173 masked_image : `MaskedImage`
174 Test masked image. Sprinkled pixels are masked as "HOT", "COLD",
175 or `NaN`, and the variance plane is set to a constant
176 ``nan_sigma**2``.
177 """
178 array = self.rng.normal(noise_mean, noise_sigma, shape).astype(dtype)
179 masked_image = makeMaskedImageFromArrays(array)
180 masked_image.variance.array = noise_sigma**2
181 self.sprinkle(masked_image, hot_value, hot_count, mask_plane="HOT")
182 self.sprinkle(masked_image, cold_value, cold_count, mask_plane="COLD")
183 self.sprinkle(masked_image, np.nan, nan_count, mask_plane="NAN")
184 return self.set_nontrivial_xy0(masked_image)
186 @contextmanager
187 def roundtrip_image_reader(
188 self, image: Image, compression: Mapping[str, object] | None = None, original_fits: bool = False
189 ) -> Iterator[tuple[ImageFitsReader, astropy.io.fits.HDUList]]:
190 """Return a context manager that writes and reads a test image.
192 Parameters
193 ----------
194 image : `Image`
195 Image to write and then read.
196 compression : `collections.abc.Mapping` or `None`, optional
197 How to compress the image (`None` for no compression), as a
198 mapping compatible with `Image.writeFitsWithOptions`.
199 original_fits : `bool`, optional
200 If `True`, open the astropy FITS object with
201 ``disable_image_compression=True``, interpreting all compressed
202 image HDUs as binary tables and leaving their headers unchanged.
204 Returns
205 -------
206 context : `contextlib.AbstractContextManager
207 A context manager that, when entered, returns:
209 - reader (`ImageFitsReader`): a reader for the saved image;
210 - fits (`astropy.io.fits.HDUList`): an Astropy FITS object.
211 """
212 with getTempFilePath(".fits") as filename:
213 image.writeFitsWithOptions(filename, compression)
214 with astropy.io.fits.open(filename, disable_image_compression=original_fits) as fits:
215 yield ImageFitsReader(filename), fits
217 @contextmanager
218 def roundtrip_masked_image_reader(
219 self,
220 masked_image: MaskedImage,
221 compression: Mapping[str, object] | None = None,
222 original_fits: bool = False,
223 ) -> Iterator[tuple[MaskedImageFitsReader, astropy.io.fits.HDUList]]:
224 """Return a context manager that writes and reads a test masked image.
226 Parameters
227 ----------
228 image : `MaskedImage`
229 Masked image to write and then read.
230 compression : `collections.abc.Mapping` or `None`, optional
231 How to compress the image (`None` for no compression), as a
232 mapping compatible with `MaskedImage.writeFitsWithOptions`.
233 original_fits : `bool`, optional
234 If `True`, open the astropy FITS object with
235 ``disable_image_compression=True``, interpreting all compressed
236 image HDUs as binary tables and leaving their headers unchanged.
238 Returns
239 -------
240 context : `contextlib.AbstractContextManager
241 A context manager that, when entered, returns:
243 - reader (`MaskedImageFitsReader`): a reader for the saved image;
244 - fits (`astropy.io.fits.HDUList`): an Astropy FITS object.
245 """
246 with getTempFilePath(".fits") as filename:
247 masked_image.writeFitsWithOptions(filename, compression)
248 with astropy.io.fits.open(filename, disable_image_compression=original_fits) as fits:
249 yield MaskedImageFitsReader(filename), fits
251 @contextmanager
252 def check_roundtrip_image_invariants(
253 self,
254 image: Image,
255 compression: Mapping[str, object] | None = None,
256 safe_dtypes: Sequence[np.typing.DTypeLike] = (),
257 *,
258 rtol: float = 0.0,
259 atol: float = 0.0,
260 ) -> Iterator[Image]:
261 """Return a context manager that writes and reads a test image and then
262 runs a battery of tests.
264 Parameters
265 ----------
266 image : `Image`
267 Image to write and then read.
268 compression : `collections.abc.Mapping` or `None`, optional
269 How to compress the image (`None` for no compression), as a
270 mapping compatible with `Image.writeFitsWithOptions`.
271 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional
272 Other pixel types that the image can be safely read back in as
273 without any loss of precision or range.
274 rtol : `float`, optional
275 Relative tolerance for floating-point comparisons between our
276 reader and astropy. Defaults to zero.
277 atol : `float`, optional
278 Absolute tolerance for floating-point comparisons between our
279 reader and astropy. Defaults to zero.
281 Returns
282 -------
283 context : `contextlib.AbstractContextManager
284 A context manager that returns the roundtripped `Image` when
285 entered.
287 Notes
288 -----
289 This method tests:
291 - reading just the bounding box;
292 - reading the pixel type;
293 - reading a subimage;
294 - consistency with reading with astropy (by comparing to the
295 roundtripped image, not the original);
296 - consistency with reading into other pixel types (``safe_dtypes``).
298 It does not test that the roundtripped image matches the original (that
299 is for calling code to do).
300 """
301 hdu = 0 if compression is None else 1
302 with self.subTest(compression=compression):
303 with self.roundtrip_image_reader(image, compression) as (reader, fits):
304 self.assertEqual(reader.readDType(), image.array.dtype)
305 self.assertEqual(reader.readBBox(), image.getBBox())
306 roundtripped = reader.read()
307 np.testing.assert_allclose(roundtripped.array, fits[hdu].data, atol=atol, rtol=rtol)
308 subbox = Box2I(
309 image.getXY0() + Extent2I(image.width // 5, image.height // 6),
310 image.getXY0() + Extent2I(4 * image.width // 5, 5 * image.height // 6),
311 )
312 subrt = reader.read(bbox=subbox)
313 self.assertEqual(subrt.getBBox(), subbox)
314 self.assertImagesEqual(subrt, roundtripped[subbox])
315 for dtype in [image.dtype, *safe_dtypes]:
316 np.testing.assert_array_equal(
317 reader.readArray(dtype=np.dtype(dtype)), roundtripped.array.astype(dtype)
318 )
319 for dtype in [image.dtype, *safe_dtypes]:
320 np.testing.assert_array_equal(
321 reader.readArray(dtype=np.dtype(dtype)), roundtripped.array.astype(dtype)
322 )
323 yield roundtripped
325 @contextmanager
326 def check_roundtrip_masked_image_invariants(
327 self,
328 masked_image: MaskedImage,
329 compression: Mapping[str, object] | None = None,
330 safe_dtypes: Sequence[np.typing.DTypeLike] = (),
331 *,
332 rtol: float = 0.0,
333 atol: float = 0.0,
334 ) -> Iterator[MaskedImage]:
335 """Return a context manager that writes and reads a test masked image
336 and then runs a battery of tests.
338 Parameters
339 ----------
340 masked_image : `MaskedImage`
341 Masked image to write and then read.
342 compression : `collections.abc.Mapping` or `None`, optional
343 How to compress the image (`None` for no compression), as a
344 mapping compatible with `Image.writeFitsWithOptions`.
345 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional
346 Other image plane pixel types that the image can be safely read
347 back in as without any loss of precision or range.
348 rtol : `float`, optional
349 Relative tolerance for floating-point comparisons between our
350 reader and astropy. Defaults to zero.
351 atol : `float`, optional
352 Absolute tolerance for floating-point comparisons between our
353 reader and astropy. Defaults to zero.
355 Returns
356 -------
357 context : `contextlib.AbstractContextManager
358 A context manager that returns the roundtripped `Image` when
359 entered.
361 Notes
362 -----
363 This method tests:
365 - reading just the bounding box;
366 - reading the pixel type;
367 - reading a subimage;
368 - consistency with reading with astropy (by comparing to the
369 roundtripped image, not the original);
370 - consistency with reading into other pixel types (``safe_dtypes``).
372 It does not test that the roundtripped image matches the original (that
373 is for calling code to do).
374 """
375 with self.subTest(compression=compression):
376 with self.roundtrip_masked_image_reader(masked_image, compression) as (reader, fits):
377 self.assertEqual(reader.readImageDType(), masked_image.image.array.dtype)
378 self.assertEqual(reader.readMaskDType(), masked_image.mask.array.dtype)
379 self.assertEqual(reader.readVarianceDType(), masked_image.variance.array.dtype)
380 self.assertEqual(reader.readBBox(), masked_image.getBBox())
381 roundtripped = reader.read()
382 np.testing.assert_allclose(roundtripped.image.array, fits["IMAGE"].data, rtol=rtol, atol=atol)
383 np.testing.assert_allclose(roundtripped.mask.array, fits["MASK"].data, rtol=rtol, atol=atol)
384 np.testing.assert_allclose(
385 roundtripped.variance.array, fits["VARIANCE"].data, rtol=rtol, atol=atol
386 )
387 subbox = Box2I(
388 masked_image.getXY0() + Extent2I(masked_image.width // 5, masked_image.height // 6),
389 masked_image.getXY0()
390 + Extent2I(4 * masked_image.width // 5, 5 * masked_image.height // 6),
391 )
392 subrt = reader.read(bbox=subbox)
393 self.assertEqual(subrt.getBBox(), subbox)
394 self.assertMaskedImagesEqual(subrt, roundtripped[subbox])
395 for dtype in [masked_image.image.dtype, *safe_dtypes]:
396 np.testing.assert_array_equal(
397 reader.readImageArray(dtype=np.dtype(dtype)), roundtripped.image.array.astype(dtype)
398 )
399 for dtype in [masked_image.image.dtype, *safe_dtypes]:
400 np.testing.assert_array_equal(
401 reader.readImageArray(dtype=np.dtype(dtype)),
402 roundtripped.image.array.astype(dtype),
403 )
404 yield roundtripped
406 def check_exact_roundtrip(
407 self,
408 masked_image: MaskedImage,
409 compression: Mapping[str, object] | None = None,
410 *,
411 safe_dtypes: Sequence[np.typing.DTypeLike] = (),
412 ) -> None:
413 """Test that a masked image round-trips through serialization exactly.
415 Parameters
416 ----------
417 masked_image : `MaskedImage`
418 Masked image to write and then read. Roundtripping just the image
419 plane on its own is also tested.
420 compression : `collections.abc.Mapping` or `None`, optional
421 How to compress the image (`None` for no compression), as a
422 mapping compatible with `MaskedImage.writeFitsWithOptions`.
423 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional
424 Other image plane pixel types that the image can be safely read
425 back in as without any loss of precision or range.
426 """
427 with self.check_roundtrip_image_invariants(masked_image.image, compression, safe_dtypes) as image_rt:
428 self.assertImagesEqual(image_rt, masked_image.image)
429 with self.check_roundtrip_masked_image_invariants(
430 masked_image, compression, safe_dtypes=safe_dtypes
431 ) as masked_image_rt:
432 self.assertMaskedImagesEqual(masked_image_rt, masked_image)
434 def check_quantized_roundtrip(
435 self,
436 masked_image: MaskedImage,
437 *,
438 compression: Mapping[str, object],
439 roundtrip_atol: float,
440 safe_dtypes: Sequence[np.typing.DTypeLike] = (),
441 ) -> None:
442 """Test that a masked image round-trips through serialization with some
443 loss of precision expected.
445 Parameters
446 ----------
447 masked_image : `MaskedImage`
448 Masked image to write and then read. Roundtripping just the image
449 plane on its own is also tested.
450 compression : `collections.abc.Mapping`
451 How to compress the image, as a mapping compatible with
452 `MaskedImage.writeFitsWithOptions`.
453 roundtrip_atol : `float`
454 Absolute tolerance for comparisons between the input and output
455 image.
456 safe_dtypes : `~collections.abc.Sequence` [`numpy.dtype`], optional
457 Other image plane pixel types that the image can be safely read
458 back in as without any loss of precision or range.
459 """
460 atol = 0.0
461 rtol = 0.0
462 if platform.machine() in ("aarch64", "arm64"):
463 # CFITSIO on Arm64 (both Linux and macOS reads quantized images
464 # back slightly but significantly differently from CFITSIO on
465 # x86_64, Astropy on x86_64, and Astropy on aarch64 (which all
466 # agree). This has been reported upstream. The difference is
467 # typically smaller than the change due to quantization but larger
468 # than expected from floating-point round-off error from just
469 # switching around the order of operations in applying
470 # ZZERO/ZSCALE, especially for float64.
471 atol = 1E-4
472 rtol = 1E-6
473 with self.check_roundtrip_image_invariants(
474 masked_image.image, compression, safe_dtypes, atol=atol, rtol=rtol
475 ) as image_rt:
476 self.assertImagesAlmostEqual(image_rt, masked_image.image, atol=roundtrip_atol)
477 with self.check_roundtrip_masked_image_invariants(
478 masked_image, compression, safe_dtypes=safe_dtypes, atol=atol, rtol=rtol
479 ) as masked_image_rt:
480 self.assertMaskedImagesAlmostEqual(masked_image_rt, masked_image, atol=roundtrip_atol)
482 def test_u16(self) -> None:
483 """Test FITS serialization (including compression) for uint16."""
484 masked_image = self.make_int_image(
485 np.uint16, noise_min=256, noise_max=512, cold_value=1, hot_value=32750
486 )
487 safe_dtypes = (np.int32, np.float32, np.float64)
488 # uint64 is only safe when not decompressing; otherwise CFITSIO can't
489 # handle it.
490 self.check_exact_roundtrip(masked_image, safe_dtypes=safe_dtypes + (np.uint64,))
491 self.check_exact_roundtrip(
492 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes
493 )
494 self.check_exact_roundtrip(
495 masked_image,
496 {
497 "image": {"algorithm": "GZIP_1"},
498 "mask": {"algorithm": "GZIP_1"},
499 "variance": {"algorithm": "GZIP_1"},
500 },
501 safe_dtypes=safe_dtypes,
502 )
503 self.check_exact_roundtrip(
504 masked_image,
505 {
506 "image": {"algorithm": "GZIP_2"},
507 "mask": {"algorithm": "GZIP_2"},
508 "variance": {"algorithm": "GZIP_2"},
509 },
510 safe_dtypes=safe_dtypes,
511 )
512 self.check_exact_roundtrip(
513 masked_image,
514 {
515 "image": {"algorithm": "RICE_1"},
516 "mask": {"algorithm": "RICE_1"},
517 "variance": {},
518 },
519 safe_dtypes=safe_dtypes,
520 )
522 def test_i32(self) -> None:
523 """Test FITS serialization (including compression) for int32."""
524 masked_image = self.make_int_image(
525 np.int32, noise_min=-(1 << 21), noise_max=(1 << 22), cold_value=-(1 << 24), hot_value=(1 << 25)
526 )
527 safe_dtypes = (np.float64,)
528 self.check_exact_roundtrip(masked_image, safe_dtypes=safe_dtypes)
529 self.check_exact_roundtrip(
530 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes
531 )
532 self.check_exact_roundtrip(
533 masked_image,
534 {
535 "image": {"algorithm": "GZIP_1"},
536 "mask": {"algorithm": "GZIP_1"},
537 "variance": {"algorithm": "GZIP_1"},
538 },
539 safe_dtypes=safe_dtypes,
540 )
541 self.check_exact_roundtrip(
542 masked_image,
543 {
544 "image": {"algorithm": "GZIP_2"},
545 "mask": {"algorithm": "GZIP_2"},
546 "variance": {"algorithm": "GZIP_2"},
547 },
548 safe_dtypes=safe_dtypes,
549 )
550 self.check_exact_roundtrip(
551 masked_image,
552 {
553 "image": {"algorithm": "RICE_1"},
554 "mask": {"algorithm": "RICE_1"},
555 "variance": {},
556 },
557 safe_dtypes=safe_dtypes,
558 )
560 def test_u64(self) -> None:
561 """Test FITS serialization for uint64."""
562 masked_image = self.make_int_image(
563 np.uint64,
564 noise_min=256,
565 noise_max=(1 << 42),
566 cold_value=1,
567 hot_value=(1 << 50),
568 # We need to lie about the variance because the actual variance
569 # of this uniform distribution is too big to be represented as
570 # float32.
571 variance=50.0,
572 )
573 self.check_exact_roundtrip(masked_image)
574 # CFITSIO does not support compressing uint64, so neither do we.
576 def test_f32_lossless(self) -> None:
577 """Test uncompressed FITS serialization and lossless compression for
578 float32.
579 """
580 masked_image = self.make_float_image(
581 np.float32, noise_mean=100.0, noise_sigma=15.0, cold_value=-(1 << 24), hot_value=(1 << 25)
582 )
583 safe_dtypes = (np.float64,)
584 self.check_exact_roundtrip(masked_image, compression=None, safe_dtypes=safe_dtypes)
585 self.check_exact_roundtrip(
586 masked_image, {"image": {}, "mask": {}, "variance": {}}, safe_dtypes=safe_dtypes
587 )
588 self.check_exact_roundtrip(
589 masked_image,
590 {
591 "image": {"algorithm": "GZIP_1"},
592 "mask": {"algorithm": "GZIP_1"},
593 "variance": {"algorithm": "GZIP_1"},
594 },
595 safe_dtypes=safe_dtypes,
596 )
597 self.check_exact_roundtrip(
598 masked_image,
599 {
600 "image": {"algorithm": "GZIP_2"},
601 "mask": {"algorithm": "GZIP_2"},
602 "variance": {"algorithm": "GZIP_2"},
603 },
604 safe_dtypes=safe_dtypes,
605 )
607 def test_f32_lossy_no_mask(self) -> None:
608 """Test quantized compression of float32 with no bad pixels other than
609 NaNs.
610 """
611 masked_image = self.make_float_image(
612 # Don't add hot or cold pixels because we won't be using a mask to
613 # keep them out of the stdev estimation.
614 np.float32,
615 noise_mean=100.0,
616 noise_sigma=15.0,
617 cold_count=0,
618 hot_count=0,
619 )
620 lossy = {"algorithm": "RICE_1", "quantization": {"seed": 1}}
621 lossless = {"algorithm": "GZIP_2"}
622 for dither in DitherAlgorithm.__members__.keys():
623 lossy["quantization"]["dither"] = dither
624 lossy["quantization"]["level"] = 30.0
625 lossy["quantization"]["scaling"] = "STDEV_CFITSIO"
626 self.check_quantized_roundtrip(
627 masked_image,
628 compression={"image": lossy, "mask": lossless, "variance": lossy},
629 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15.
630 )
631 lossy["quantization"]["scaling"] = "MANUAL"
632 lossy["quantization"]["level"] = 0.25
633 self.check_quantized_roundtrip(
634 masked_image,
635 compression={"image": lossy, "mask": lossless, "variance": lossy},
636 roundtrip_atol=0.25,
637 )
639 def test_f32_lossy_masked(self) -> None:
640 """Test quantized compression of float 32 with hot and cold pixels."""
641 masked_image = self.make_float_image(
642 np.float32, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4
643 )
644 lossy = {
645 "algorithm": "RICE_1",
646 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1},
647 }
648 lossless = {"algorithm": "GZIP_2"}
649 for dither in DitherAlgorithm.__members__.keys():
650 lossy["quantization"]["dither"] = dither
651 lossy["quantization"]["scaling"] = "STDEV_MASKED"
652 self.check_quantized_roundtrip(
653 masked_image,
654 compression={"image": lossy, "mask": lossless, "variance": lossy},
655 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15.
656 )
657 lossy["quantization"]["scaling"] = "RANGE"
658 with self.subTest(dither=dither, scaling="RANGE"):
659 self.check_quantized_roundtrip(
660 masked_image,
661 compression={"image": lossy, "mask": lossless, "variance": lossy},
662 roundtrip_atol=1e-7, # since the dynamic range is tiny, this is actually barely lossy
663 )
665 def test_f64_lossless(self) -> None:
666 """Test uncompressed FITS serialization and lossless compression for
667 float36.
668 """
669 masked_image = self.make_float_image(
670 np.float64, noise_mean=100.0, noise_sigma=15.0, cold_value=-(1 << 24), hot_value=(1 << 25)
671 )
672 self.check_exact_roundtrip(masked_image, compression=None)
673 self.check_exact_roundtrip(masked_image, {"image": {}, "mask": {}, "variance": {}})
674 self.check_exact_roundtrip(
675 masked_image,
676 {
677 "image": {"algorithm": "GZIP_1"},
678 "mask": {"algorithm": "GZIP_1"},
679 "variance": {"algorithm": "GZIP_1"},
680 },
681 )
682 self.check_exact_roundtrip(
683 masked_image,
684 {
685 "image": {"algorithm": "GZIP_2"},
686 "mask": {"algorithm": "GZIP_2"},
687 "variance": {"algorithm": "GZIP_2"},
688 },
689 )
691 def test_f64_lossy_no_mask(self) -> None:
692 """Test quantized compression of float64 with no bad pixels other than
693 NaNs.
694 """
695 masked_image = self.make_float_image(
696 # Don't add hot or cold pixels because we won't be using a mask to
697 # keep them out of the stdev estimation.
698 np.float64,
699 noise_mean=100.0,
700 noise_sigma=15.0,
701 cold_count=0,
702 hot_count=0,
703 )
704 lossy = {"algorithm": "RICE_1", "quantization": {"seed": 1}}
705 lossless = {"algorithm": "GZIP_2"}
706 for dither in DitherAlgorithm.__members__.keys():
707 lossy["quantization"]["dither"] = dither
708 lossy["quantization"]["level"] = 30.0
709 lossy["quantization"]["scaling"] = "STDEV_CFITSIO"
710 self.check_quantized_roundtrip(
711 masked_image,
712 compression={"image": lossy, "mask": lossless, "variance": lossy},
713 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15.
714 )
715 lossy["quantization"]["scaling"] = "MANUAL"
716 lossy["quantization"]["level"] = 0.25
717 self.check_quantized_roundtrip(
718 masked_image,
719 compression={"image": lossy, "mask": lossless, "variance": lossy},
720 roundtrip_atol=0.25,
721 )
723 def test_f64_lossy_masked(self) -> None:
724 """Test quantized compression of float64 with hot and cold pixels."""
725 masked_image = self.make_float_image(
726 np.float64, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4
727 )
728 lossy = {
729 "algorithm": "RICE_1",
730 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1},
731 }
732 lossless = {"algorithm": "GZIP_2"}
733 for dither in DitherAlgorithm.__members__.keys():
734 lossy["quantization"]["dither"] = dither
735 lossy["quantization"]["scaling"] = "STDEV_MASKED"
736 self.check_quantized_roundtrip(
737 masked_image,
738 compression={"image": lossy, "mask": lossless, "variance": lossy},
739 roundtrip_atol=0.5, # target is 30 steps per sigma, and sigma=15.
740 )
741 lossy["quantization"]["scaling"] = "RANGE"
742 with self.subTest(dither=dither, scaling="RANGE"):
743 self.check_quantized_roundtrip(
744 masked_image,
745 compression={"image": lossy, "mask": lossless, "variance": lossy},
746 roundtrip_atol=1e-7, # since the dynamic range is tiny, this is actually barely lossy
747 )
749 def check_tile_shape(
750 self, image: Image, compression: Mapping[str, object], ztile1: int, ztile2: int
751 ) -> None:
752 with self.roundtrip_image_reader(image, compression=compression, original_fits=True) as (_, fits):
753 hdu = fits[1]
754 self.assertEqual(hdu.header["ZTILE1"], ztile1)
755 self.assertEqual(hdu.header["ZTILE2"], ztile2)
757 def test_tile_shapes(self) -> None:
758 """Test customizing the tile size in FITS compression."""
759 masked_image = self.make_float_image(
760 # Don't add hot or cold pixels because we won't be using a mask to
761 # keep them out of the stdev estimation.
762 np.float32,
763 noise_mean=100.0,
764 noise_sigma=15.0,
765 cold_count=0,
766 hot_count=0,
767 )
768 lossy = {
769 "image": {
770 "algorithm": "RICE_1",
771 "quantization": {
772 "dither": "NO_DITHER",
773 "scaling": "STDEV_CFITSIO",
774 "level": 30.0,
775 "seed": 1,
776 },
777 }
778 }
779 self.check_tile_shape(masked_image.image, lossy, ztile1=masked_image.width, ztile2=1)
780 lossy["image"]["tile_width"] = 1
781 lossy["image"]["tile_height"] = 0
782 self.check_tile_shape(masked_image.image, lossy, ztile1=1, ztile2=masked_image.height)
783 lossy["image"]["tile_width"] = 25
784 lossy["image"]["tile_height"] = 17
785 self.check_tile_shape(masked_image.image, lossy, ztile1=25, ztile2=17)
787 def test_uzscale_header(self) -> None:
788 """Test that we write the UZSCALE header card into the header when
789 ZSCALE is universal.
790 """
791 masked_image = self.make_float_image(
792 # Don't add hot or cold pixels because we won't be using a mask to
793 # keep them out of the stdev estimation.
794 np.float32,
795 noise_mean=100.0,
796 noise_sigma=15.0,
797 cold_count=0,
798 hot_count=0,
799 )
800 lossy = {
801 "algorithm": "RICE_1",
802 "quantization": {"level": 30.0, "mask_planes": ["HOT", "COLD", "NAN"], "seed": 1},
803 }
804 lossy = {
805 "image": {
806 "algorithm": "RICE_1",
807 "quantization": {
808 "dither": "NO_DITHER",
809 "scaling": "STDEV_MASKED",
810 "level": 30.0,
811 "seed": 1,
812 },
813 }
814 }
815 with self.roundtrip_image_reader(
816 masked_image.image, compression=lossy, original_fits=True
817 ) as (_, fits):
818 self.assertFloatsAlmostEqual(fits[1].header["UZSCALE"], 0.5, rtol=1E-2)
820 # I can't get CFITSIO to make the correction described by this test; if we
821 # update the header key after asking it to write the image, it gets into
822 # a weird state that prevents (at least) appending binary table HDUs.
823 @unittest.expectedFailure
824 def test_no_rice_one(self) -> None:
825 """Test that we've corrected CFITSIO's non-standard writing of RICE_ONE
826 instead of RICE_1 when the dither mode is SUBTRACTIVE_DITHER_2.
828 RICE_ONE is an old pre-standard key that is already in enough images
829 that it's reasonable for readers to accept it, but it is not correct
830 for CFITSIO to be writing it.
831 """
832 masked_image = self.make_float_image(
833 # Don't add hot or cold pixels because we won't be using a mask to
834 # keep them out of the stdev estimation.
835 np.float32,
836 noise_mean=100.0,
837 noise_sigma=15.0,
838 cold_count=0,
839 hot_count=0,
840 )
841 lossy = {
842 "image": {
843 "algorithm": "RICE_1",
844 "quantization": {
845 "dither": "SUBTRACTIVE_DITHER_2",
846 "scaling": "STDEV_CFITSIO",
847 "level": 30.0,
848 "seed": 1,
849 },
850 },
851 "mask": {
852 "algorithm": "GZIP_2",
853 },
854 "variance": {
855 "algorithm": "RICE_1",
856 "quantization": {
857 "dither": "SUBTRACTIVE_DITHER_2",
858 "scaling": "STDEV_CFITSIO",
859 "level": 30.0,
860 "seed": 1,
861 },
862 },
863 }
864 with self.roundtrip_image_reader(masked_image.image, lossy, original_fits=True) as (_, fits):
865 self.assertEqual(len(fits), 2)
866 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
867 with self.roundtrip_masked_image_reader(masked_image, lossy, original_fits=True) as (_, fits):
868 self.assertEqual(len(fits), 4)
869 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
870 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
871 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
873 def test_compressed_exposure(self) -> None:
874 masked_image = self.make_float_image(
875 np.float32, noise_mean=100.0, noise_sigma=15.0, hot_value=1e5, cold_value=-1e4
876 )
877 exposure = ExposureF(masked_image)
878 lossy = {
879 "algorithm": "RICE_1",
880 "quantization": {
881 "dither": "SUBTRACTIVE_DITHER_2",
882 "scaling": "STDEV_MASKED",
883 "mask_planes": ["HOT", "COLD", "NAN"],
884 "level": 30.0,
885 "seed": 1,
886 },
887 }
888 lossless = {"algorithm": "GZIP_2"}
889 with getTempFilePath(".fits") as filename:
890 exposure.writeFitsWithOptions(
891 filename,
892 {"image": lossy, "mask": lossless, "variance": lossy},
893 )
894 roundtripped = ExposureF(filename)
895 self.assertMaskedImagesAlmostEqual(masked_image, roundtripped.maskedImage, atol=0.25)
898if __name__ == "__main__": 898 ↛ 899line 898 didn't jump to line 899 because the condition on line 898 was never true
899 unittest.main()