Coverage for tests/test_gbdesAstrometricFit.py: 9%
627 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 09:08 +0000
1# This file is part of drp_tasks
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os.path
23import unittest
24from copy import copy
26import astropy.table
27import astropy.time
28import astropy.units as u
29import numpy as np
30import pandas as pd
31import wcsfit
32import yaml
33from astropy.coordinates import Distance, SkyCoord
34from smatch.matcher import Matcher
36import lsst.afw.geom as afwgeom
37import lsst.afw.table as afwTable
38import lsst.geom
39import lsst.utils
40from lsst import sphgeom
41from lsst.daf.base import PropertyList
42from lsst.drp.tasks.gbdesAstrometricFit import (
43 GbdesAstrometricFitConfig,
44 GbdesAstrometricFitTask,
45 GbdesGlobalAstrometricFitConfig,
46 GbdesGlobalAstrometricFitTask,
47 calculate_apparent_motion,
48)
49from lsst.meas.algorithms import ReferenceObjectLoader
50from lsst.meas.algorithms.testUtils import MockRefcatDataId
51from lsst.pipe.base import InMemoryDatasetHandle
53TESTDIR = os.path.abspath(os.path.dirname(__file__))
56class TestGbdesAstrometricFit(lsst.utils.tests.TestCase):
57 """This class tests `GbdesAstrometricFit` using real `visitSummaryTable`s
58 from HSC RC2 processing, with simulated sources for those visits.
59 """
61 @classmethod
62 def setUpClass(cls):
63 # Set random seed
64 np.random.seed(1234)
66 # Make fake data
67 cls.datadir = os.path.join(TESTDIR, "data")
69 cls.fieldNumber = 0
70 cls.nFields = 1
71 cls.instrumentName = "HSC"
72 cls.refEpoch = 57205.5
74 # Make test inputVisitSummary. VisitSummaryTables are taken from
75 # collection HSC/runs/RC2/w_2022_20/DM-34794
76 cls.testVisits = [1176, 17900, 17930, 17934]
77 cls.inputVisitSummary = []
78 for testVisit in cls.testVisits:
79 visSum = afwTable.ExposureCatalog.readFits(
80 os.path.join(cls.datadir, f"visitSummary_{testVisit}.fits")
81 )
82 cls.inputVisitSummary.append(visSum)
84 cls.config = GbdesAstrometricFitConfig()
85 cls.config.systematicError = 0
86 cls.config.devicePolyOrder = 4
87 cls.config.exposurePolyOrder = 6
88 cls.config.fitReserveFraction = 0
89 cls.config.fitReserveRandomSeed = 1234
90 cls.config.saveModelParams = True
91 cls.config.allowSelfMatches = True
92 cls.config.saveCameraModel = True
93 cls.config.applyRefCatProperMotion = False
94 cls.task = GbdesAstrometricFitTask(config=cls.config)
96 (
97 cls.exposureInfo,
98 cls.exposuresHelper,
99 cls.extensionInfo,
100 cls.instruments,
101 ) = cls.task._get_exposure_info(cls.inputVisitSummary, refEpoch=cls.refEpoch)
103 cls.fields, cls.fieldCenter, cls.fieldRadius = cls.task._prep_sky(
104 cls.inputVisitSummary, cls.exposureInfo.medianEpoch
105 )
107 # Bounding box of observations:
108 raMins, raMaxs = [], []
109 decMins, decMaxs = [], []
110 for visSum in cls.inputVisitSummary:
111 raMins.append(visSum["raCorners"].min())
112 raMaxs.append(visSum["raCorners"].max())
113 decMins.append(visSum["decCorners"].min())
114 decMaxs.append(visSum["decCorners"].max())
115 raMin = min(raMins)
116 raMax = max(raMaxs)
117 decMin = min(decMins)
118 decMax = max(decMaxs)
120 corners = [
121 lsst.geom.SpherePoint(raMin, decMin, lsst.geom.degrees).getVector(),
122 lsst.geom.SpherePoint(raMax, decMin, lsst.geom.degrees).getVector(),
123 lsst.geom.SpherePoint(raMax, decMax, lsst.geom.degrees).getVector(),
124 lsst.geom.SpherePoint(raMin, decMax, lsst.geom.degrees).getVector(),
125 ]
126 cls.boundingPolygon = sphgeom.ConvexPolygon(corners)
128 # Make random set of data in a bounding box determined by input visits
129 # Make wcs objects for the "true" model
130 cls.nStars, starIds, starRAs, starDecs = cls._make_simulated_stars(
131 raMin, raMax, decMin, decMax, cls.config.matchRadius
132 )
134 # Fraction of simulated stars in the reference catalog and science
135 # exposures
136 inReferenceFraction = 1
137 inScienceFraction = 1
139 # Make a reference catalog and load it into ReferenceObjectLoader
140 refDataId, deferredRefCat = cls._make_refCat(
141 starIds, starRAs, starDecs, inReferenceFraction, cls.boundingPolygon
142 )
143 cls.refObjectLoader = ReferenceObjectLoader([refDataId], [deferredRefCat])
144 cls.refObjectLoader.config.requireProperMotion = False
145 cls.refObjectLoader.config.anyFilterMapsToThis = "test_filter"
147 cls.task.refObjectLoader = cls.refObjectLoader
149 # Get True WCS for stars:
150 with open(os.path.join(cls.datadir, "sample_wcs.yaml"), "r") as f:
151 cls.trueModel = yaml.load(f, Loader=yaml.Loader)
153 trueWCSs = cls._make_wcs(cls.trueModel, cls.inputVisitSummary)
155 cls.isolatedStarCatalogs, cls.isolatedStarSources = cls._make_isolatedStars(
156 [starIds], [starRAs], [starDecs], trueWCSs, inScienceFraction
157 )
159 # Make source catalogs:
160 cls.inputCatalogRefs = cls._make_sourceCat(starIds, starRAs, starDecs, trueWCSs, inScienceFraction)
162 cls.colorCatalog = cls._make_colors(starRAs, starDecs)
164 cls.outputs = cls.task.run(
165 cls.inputCatalogRefs,
166 cls.inputVisitSummary,
167 instrumentName=cls.instrumentName,
168 refEpoch=cls.refEpoch,
169 refObjectLoader=cls.refObjectLoader,
170 )
172 @staticmethod
173 def _make_simulated_stars(raMin, raMax, decMin, decMax, matchRadius, nStars=10000):
174 """Generate random positions for "stars" in an RA/Dec box.
176 Parameters
177 ----------
178 raMin : `float`
179 Minimum RA for simulated stars.
180 raMax : `float`
181 Maximum RA for simulated stars.
182 decMin : `float`
183 Minimum Dec for simulated stars.
184 decMax : `float`
185 Maximum Dec for simulated stars.
186 matchRadius : `float`
187 Minimum allowed distance in arcsec between stars.
188 nStars : `int`, optional
189 Number of stars to simulate. Final number will be lower if any
190 too-close stars are dropped.
192 Returns
193 -------
194 nStars : `int`
195 Number of stars simulated.
196 starIds: `np.ndarray`
197 Unique identification number for stars.
198 starRAs: `np.ndarray`
199 Simulated Right Ascensions.
200 starDecs: `np.ndarray`
201 Simulated Declination.
202 """
203 starIds = np.arange(nStars)
204 starRAs = np.random.random(nStars) * (raMax - raMin) + raMin
205 starDecs = np.random.random(nStars) * (decMax - decMin) + decMin
206 # Remove neighbors:
207 with Matcher(starRAs, starDecs) as matcher:
208 idx = matcher.query_groups(matchRadius / 3600.0, min_match=2)
209 if len(idx) > 0:
210 neighbors = np.unique(np.concatenate(idx))
211 starRAs = np.delete(starRAs, neighbors)
212 starDecs = np.delete(starDecs, neighbors)
213 nStars = len(starRAs)
214 starIds = np.arange(nStars)
216 return nStars, starIds, starRAs, starDecs
218 @classmethod
219 def _make_isolatedStars(cls, allStarIds, allStarRAs, allStarDecs, trueWCSs, inScienceFraction):
220 """Given a subset of the simulated data, make source catalogs and star
221 catalogs following the isolated star association format.
223 This takes the true WCSs to go from the RA and Decs of the simulated
224 stars to pixel coordinates for a given visit and detector. If those
225 pixel coordinates are within the bounding box of the detector, the
226 source and visit information is put in the corresponding catalog of
227 isolated star sources.
229 Parameters
230 ----------
231 allStarIds : `np.ndarray` [`int`]
232 Source ids for the simulated stars
233 allStarRas : `np.ndarray` [`float`]
234 RAs of the simulated stars
235 allStarDecs : `np.ndarray` [`float`]
236 Decs of the simulated stars
237 trueWCSs : `list` [`lsst.afw.geom.SkyWcs`]
238 WCS with which to simulate the source pixel coordinates
239 inReferenceFraction : float
240 Percentage of simulated stars to include in reference catalog
242 Returns
243 -------
244 isolatedStarCatalogRefs : `list`
245 [`lsst.pipe.base.InMemoryDatasetHandle`]
246 List of references to isolated star catalogs.
247 isolatedStarSourceRefs : `list`
248 [`lsst.pipe.base.InMemoryDatasetHandle`]
249 List of references to isolated star sources.
250 """
251 bbox = lsst.geom.BoxD(
252 lsst.geom.Point2D(
253 cls.inputVisitSummary[0][0]["bbox_min_x"], cls.inputVisitSummary[0][0]["bbox_min_y"]
254 ),
255 lsst.geom.Point2D(
256 cls.inputVisitSummary[0][0]["bbox_max_x"], cls.inputVisitSummary[0][0]["bbox_max_y"]
257 ),
258 )
259 bboxCorners = bbox.getCorners()
261 isolatedStarCatalogRefs = []
262 isolatedStarSourceRefs = []
263 for i in range(len(allStarIds)):
264 starIds = allStarIds[i]
265 starRAs = allStarRAs[i]
266 starDecs = allStarDecs[i]
267 isolatedStarCatalog = pd.DataFrame({"ra": starRAs, "dec": starDecs}, index=starIds)
268 isolatedStarCatalogRefs.append(
269 InMemoryDatasetHandle(isolatedStarCatalog, storageClass="DataFrame", dataId={"tract": 0})
270 )
271 sourceCats = []
272 for v, visit in enumerate(cls.testVisits):
273 nVisStars = int(cls.nStars * inScienceFraction)
274 visitStarIndices = np.random.choice(cls.nStars, nVisStars, replace=False)
275 visitStarIds = starIds[visitStarIndices]
276 visitStarRas = starRAs[visitStarIndices]
277 visitStarDecs = starDecs[visitStarIndices]
278 for detector in trueWCSs[visit]:
279 detWcs = detector.getWcs()
280 detectorId = detector["id"]
281 radecCorners = detWcs.pixelToSky(bboxCorners)
282 detectorFootprint = sphgeom.ConvexPolygon([rd.getVector() for rd in radecCorners])
283 detectorIndices = detectorFootprint.contains(
284 (visitStarRas * u.degree).to(u.radian), (visitStarDecs * u.degree).to(u.radian)
285 )
286 nDetectorStars = detectorIndices.sum()
287 detectorArray = np.ones(nDetectorStars, dtype=int) * detector["id"]
288 visitArray = np.ones(nDetectorStars, dtype=int) * visit
290 ones_like = np.ones(nDetectorStars)
291 zeros_like = np.zeros(nDetectorStars, dtype=bool)
293 x, y = detWcs.skyToPixelArray(
294 visitStarRas[detectorIndices], visitStarDecs[detectorIndices], degrees=True
295 )
297 origWcs = (cls.inputVisitSummary[v][cls.inputVisitSummary[v]["id"] == detectorId])[
298 0
299 ].getWcs()
300 inputRa, inputDec = origWcs.pixelToSkyArray(x, y, degrees=True)
302 sourceDict = {}
303 sourceDict["detector"] = detectorArray
304 sourceDict["visit"] = visitArray
305 sourceDict["obj_index"] = visitStarIds[detectorIndices]
306 sourceDict["x"] = x
307 sourceDict["y"] = y
308 sourceDict["xErr"] = 1e-3 * ones_like
309 sourceDict["yErr"] = 1e-3 * ones_like
310 sourceDict["inputRA"] = inputRa
311 sourceDict["inputDec"] = inputDec
312 sourceDict["trueRA"] = visitStarRas[detectorIndices]
313 sourceDict["trueDec"] = visitStarDecs[detectorIndices]
314 for key in ["apFlux_12_0_flux", "apFlux_12_0_instFlux", "ixx", "iyy"]:
315 sourceDict[key] = ones_like
316 for key in [
317 "pixelFlags_edge",
318 "pixelFlags_saturated",
319 "pixelFlags_interpolatedCenter",
320 "pixelFlags_interpolated",
321 "pixelFlags_crCenter",
322 "pixelFlags_bad",
323 "hsmPsfMoments_flag",
324 "apFlux_12_0_flag",
325 "extendedness",
326 "parentSourceId",
327 "deblend_nChild",
328 "ixy",
329 ]:
330 sourceDict[key] = zeros_like
331 sourceDict["apFlux_12_0_instFluxErr"] = 1e-2 * ones_like
332 sourceDict["detect_isPrimary"] = ones_like.astype(bool)
334 sourceCat = pd.DataFrame(sourceDict)
335 sourceCats.append(sourceCat)
337 isolatedStarSourceTable = pd.concat(sourceCats, ignore_index=True)
338 isolatedStarSourceTable = isolatedStarSourceTable.sort_values(by=["obj_index"])
339 isolatedStarSourceRefs.append(
340 InMemoryDatasetHandle(isolatedStarSourceTable, storageClass="DataFrame", dataId={"tract": 0})
341 )
343 return isolatedStarCatalogRefs, isolatedStarSourceRefs
345 @classmethod
346 def _make_refCat(cls, starIds, starRas, starDecs, inReferenceFraction, bounds):
347 """Make reference catalog from a subset of the simulated data
349 Parameters
350 ----------
351 starIds : `np.ndarray` [`int`]
352 Source ids for the simulated stars
353 starRas : `np.ndarray` [`float`]
354 RAs of the simulated stars
355 starDecs : `np.ndarray` [`float`]
356 Decs of the simulated stars
357 inReferenceFraction : float
358 Percentage of simulated stars to include in reference catalog
359 bounds : `lsst.sphgeom.ConvexPolygon`
360 Boundary of the reference catalog region
362 Returns
363 -------
364 refDataId : `lsst.meas.algorithms.testUtils.MockRefcatDataId`
365 Object that replicates the functionality of a dataId.
366 deferredRefCat : `lsst.pipe.base.InMemoryDatasetHandle`
367 Dataset handle for reference catalog.
368 """
369 nRefs = int(cls.nStars * inReferenceFraction)
370 refStarIndices = np.random.choice(cls.nStars, nRefs, replace=False)
371 # Make simpleCatalog to hold data, create datasetRef with `region`
372 # determined by bounding box used in above simulate.
373 refSchema = afwTable.SimpleTable.makeMinimalSchema()
374 idKey = refSchema.addField("sourceId", type="I")
375 fluxKey = refSchema.addField("test_filter_flux", units="nJy", type=np.float64)
376 raErrKey = refSchema.addField("coord_raErr", units="rad", type=np.float64)
377 decErrKey = refSchema.addField("coord_decErr", units="rad", type=np.float64)
378 pmraErrKey = refSchema.addField("pm_raErr", units="rad2 / yr", type=np.float64)
379 pmdecErrKey = refSchema.addField("pm_decErr", units="rad2 / yr", type=np.float64)
380 refCat = afwTable.SimpleCatalog(refSchema)
381 ref_md = PropertyList()
382 ref_md.set("REFCAT_FORMAT_VERSION", 1)
383 refCat.table.setMetadata(ref_md)
384 for i in refStarIndices:
385 record = refCat.addNew()
386 record.set(idKey, starIds[i])
387 record.setRa(lsst.geom.Angle(starRas[i], lsst.geom.degrees))
388 record.setDec(lsst.geom.Angle(starDecs[i], lsst.geom.degrees))
389 record.set(fluxKey, 1)
390 record.set(raErrKey, 1e-8)
391 record.set(decErrKey, 1e-8)
392 record.set(pmraErrKey, 1e-9)
393 record.set(pmdecErrKey, 1e-9)
394 refDataId = MockRefcatDataId(bounds)
395 deferredRefCat = InMemoryDatasetHandle(refCat, storageClass="SourceCatalog", htm7="mockRefCat")
397 return refDataId, deferredRefCat
399 @classmethod
400 def _make_sourceCat(cls, starIds, starRas, starDecs, trueWCSs, inScienceFraction):
401 """Make a `pd.DataFrame` catalog with the columns needed for the
402 object selector.
404 Parameters
405 ----------
406 starIds : `np.ndarray` [`int`]
407 Source ids for the simulated stars
408 starRas : `np.ndarray` [`float`]
409 RAs of the simulated stars
410 starDecs : `np.ndarray` [`float`]
411 Decs of the simulated stars
412 trueWCSs : `list` [`lsst.afw.geom.SkyWcs`]
413 WCS with which to simulate the source pixel coordinates
414 inReferenceFraction : float
415 Percentage of simulated stars to include in reference catalog
417 Returns
418 -------
419 sourceCat : `list` [`lsst.pipe.base.InMemoryDatasetHandle`]
420 List of reference to source catalogs.
421 """
422 inputCatalogRefs = []
423 # Take a subset of the simulated data
424 # Use true wcs objects to put simulated data into ccds
425 bbox = lsst.geom.BoxD(
426 lsst.geom.Point2D(
427 cls.inputVisitSummary[0][0]["bbox_min_x"], cls.inputVisitSummary[0][0]["bbox_min_y"]
428 ),
429 lsst.geom.Point2D(
430 cls.inputVisitSummary[0][0]["bbox_max_x"], cls.inputVisitSummary[0][0]["bbox_max_y"]
431 ),
432 )
433 bboxCorners = bbox.getCorners()
434 cls.inputCatalogRefs = []
435 for v, visit in enumerate(cls.testVisits):
436 nVisStars = int(cls.nStars * inScienceFraction)
437 visitStarIndices = np.random.choice(cls.nStars, nVisStars, replace=False)
438 visitStarIds = starIds[visitStarIndices]
439 visitStarRas = starRas[visitStarIndices]
440 visitStarDecs = starDecs[visitStarIndices]
441 sourceCats = []
442 for detector in trueWCSs[visit]:
443 detWcs = detector.getWcs()
444 detectorId = detector["id"]
445 radecCorners = detWcs.pixelToSky(bboxCorners)
446 detectorFootprint = sphgeom.ConvexPolygon([rd.getVector() for rd in radecCorners])
447 detectorIndices = detectorFootprint.contains(
448 (visitStarRas * u.degree).to(u.radian), (visitStarDecs * u.degree).to(u.radian)
449 )
450 nDetectorStars = detectorIndices.sum()
451 detectorArray = np.ones(nDetectorStars, dtype=bool) * detector["id"]
453 ones_like = np.ones(nDetectorStars)
454 zeros_like = np.zeros(nDetectorStars, dtype=bool)
456 x, y = detWcs.skyToPixelArray(
457 visitStarRas[detectorIndices], visitStarDecs[detectorIndices], degrees=True
458 )
460 origWcs = (cls.inputVisitSummary[v][cls.inputVisitSummary[v]["id"] == detectorId])[0].getWcs()
461 inputRa, inputDec = origWcs.pixelToSkyArray(x, y, degrees=True)
463 sourceDict = {}
464 sourceDict["detector"] = detectorArray
465 sourceDict["sourceId"] = visitStarIds[detectorIndices]
466 sourceDict["x"] = x
467 sourceDict["y"] = y
468 sourceDict["xErr"] = 1e-3 * ones_like
469 sourceDict["yErr"] = 1e-3 * ones_like
470 sourceDict["inputRA"] = inputRa
471 sourceDict["inputDec"] = inputDec
472 sourceDict["trueRA"] = visitStarRas[detectorIndices]
473 sourceDict["trueDec"] = visitStarDecs[detectorIndices]
474 for key in ["apFlux_12_0_flux", "apFlux_12_0_instFlux", "ixx", "iyy"]:
475 sourceDict[key] = ones_like
476 for key in [
477 "pixelFlags_edge",
478 "pixelFlags_saturated",
479 "pixelFlags_interpolatedCenter",
480 "pixelFlags_interpolated",
481 "pixelFlags_crCenter",
482 "pixelFlags_bad",
483 "hsmPsfMoments_flag",
484 "apFlux_12_0_flag",
485 "extendedness",
486 "sizeExtendedness",
487 "parentSourceId",
488 "deblend_nChild",
489 "ixy",
490 ]:
491 sourceDict[key] = zeros_like
492 sourceDict["apFlux_12_0_instFluxErr"] = 1e-2 * ones_like
493 sourceDict["detect_isPrimary"] = ones_like.astype(bool)
495 sourceCat = pd.DataFrame(sourceDict)
496 sourceCats.append(sourceCat)
498 visitSourceTable = pd.concat(sourceCats)
500 inputCatalogRef = InMemoryDatasetHandle(
501 visitSourceTable, storageClass="DataFrame", dataId={"visit": visit}
502 )
504 inputCatalogRefs.append(inputCatalogRef)
506 return inputCatalogRefs
508 @classmethod
509 def _make_wcs(cls, model, inputVisitSummaries):
510 """Make a `lsst.afw.geom.SkyWcs` from given model parameters
512 Parameters
513 ----------
514 model : `dict`
515 Dictionary with WCS model parameters
516 inputVisitSummaries : `list` [`lsst.afw.table.ExposureCatalog`]
517 Visit summary catalogs
518 Returns
519 -------
520 catalogs : `dict` [`int`, `lsst.afw.table.ExposureCatalog`]
521 Visit summary catalogs with WCS set to input model
522 """
524 # Pixels will need to be rescaled before going into the mappings
525 xscale = inputVisitSummaries[0][0]["bbox_max_x"] - inputVisitSummaries[0][0]["bbox_min_x"]
526 yscale = inputVisitSummaries[0][0]["bbox_max_y"] - inputVisitSummaries[0][0]["bbox_min_y"]
528 catalogs = {}
529 schema = lsst.afw.table.ExposureTable.makeMinimalSchema()
530 schema.addField("visit", type="L", doc="Visit number")
531 for visitSum in inputVisitSummaries:
532 visit = visitSum[0]["visit"]
533 visitMapName = f"{visit}/poly"
534 visitModel = model[visitMapName]
536 catalog = lsst.afw.table.ExposureCatalog(schema)
537 catalog.resize(len(visitSum))
538 catalog["visit"] = visit
540 raDec = visitSum[0].getVisitInfo().getBoresightRaDec()
542 visitMapType = visitModel["Type"]
543 visitDict = {"Type": visitMapType}
544 if visitMapType == "Poly":
545 mapCoefficients = visitModel["XPoly"]["Coefficients"] + visitModel["YPoly"]["Coefficients"]
546 visitDict["Coefficients"] = mapCoefficients
548 for d, detector in enumerate(visitSum):
549 detectorId = detector["id"]
550 filterName = detector["physical_filter"]
551 detectorMapName = f"{filterName}/{detectorId}/poly"
552 detectorModel = model[detectorMapName]
554 detectorMapType = detectorModel["Type"]
555 mapDict = {detectorMapName: {"Type": detectorMapType}, visitMapName: visitDict}
556 if detectorMapType == "Poly":
557 mapCoefficients = (
558 detectorModel["XPoly"]["Coefficients"] + detectorModel["YPoly"]["Coefficients"]
559 )
560 mapDict[detectorMapName]["Coefficients"] = mapCoefficients
562 outWCS = cls.task._make_afw_wcs(
563 mapDict,
564 raDec.getRa(),
565 raDec.getDec(),
566 doNormalizePixels=True,
567 xScale=xscale,
568 yScale=yscale,
569 )
570 catalog[d].setId(detectorId)
571 catalog[d].setWcs(outWCS)
573 catalog.sort()
574 catalogs[visit] = catalog
576 return catalogs
578 @classmethod
579 def _make_colors(cls, starRas, starDecs):
580 """Make a catalog with the star magnitudes.
582 Parameters
583 ----------
584 starRas : `np.ndarray` [`float`]
585 RAs of the simulated stars
586 starDecs : `np.ndarray` [`float`]
587 Decs of the simulated stars
589 Returns
590 -------
591 colorCatalog : `lsst.afw.table.SimpleCatalog`
592 Catalog with star magnitudes.
593 """
594 bands = ["g", "r", "i", "z", "y"]
595 nStars = len(starRas)
597 # Make a catalog following what is done in `fgcmCal`.
598 schema = afwTable.SimpleTable.makeMinimalSchema()
599 schema.addField(
600 "mag_std_noabs",
601 type="ArrayF",
602 doc="Standard magnitude (no absolute calibration)",
603 size=len(bands),
604 )
605 colorCatalog = afwTable.SimpleCatalog(schema)
606 colorCatalog.resize(len(starRas))
607 colorCatalog["coord_ra"] = (starRas * u.degree + 10 * np.random.randn(nStars) * u.mas).to(u.radian)
608 colorCatalog["coord_dec"] = (starDecs * u.degree + 10 * np.random.randn(nStars) * u.mas).to(u.radian)
610 magMin = 19
611 magMax = 23
612 for i in range(len(bands)):
613 colorCatalog["mag_std_noabs"][:, i] = np.random.random(nStars) * (magMax - magMin) + magMin
615 md = PropertyList()
616 md.set("BANDS", bands)
617 colorCatalog.setMetadata(md)
618 return colorCatalog
620 def test_get_exposure_info(self):
621 """Test that information for input exposures is as expected and that
622 the WCS in the class object gives approximately the same results as the
623 input `lsst.afw.geom.SkyWcs`.
624 """
626 # The total number of extensions is the number of detectors for each
627 # visit plus one for the reference catalog for each field.
628 totalExtensions = sum([len(visSum) for visSum in self.inputVisitSummary]) + self.nFields
630 self.assertEqual(totalExtensions, len(self.extensionInfo.visit))
632 taskVisits = set(self.extensionInfo.visit)
633 refVisits = np.arange(0, -1 * self.nFields, -1).tolist()
634 self.assertEqual(taskVisits, set(self.testVisits + refVisits))
636 xx = np.linspace(0, 2000, 3)
637 yy = np.linspace(0, 4000, 6)
638 xgrid, ygrid = np.meshgrid(xx, yy)
639 for visSum in self.inputVisitSummary:
640 visit = visSum[0]["visit"]
641 for detectorInfo in visSum:
642 detector = detectorInfo["id"]
643 extensionIndex = np.flatnonzero(
644 (self.extensionInfo.visit == visit) & (self.extensionInfo.detector == detector)
645 )[0]
646 fitWcs = self.extensionInfo.wcs[extensionIndex]
647 calexpWcs = detectorInfo.getWcs()
649 tanPlaneXY = np.array([fitWcs.toWorld(x, y) for (x, y) in zip(xgrid.ravel(), ygrid.ravel())])
651 calexpra, calexpdec = calexpWcs.pixelToSkyArray(xgrid.ravel(), ygrid.ravel(), degrees=True)
653 # The pixel origin maps to a position slightly off from the
654 # tangent plane origin, so we want to use this value for the
655 # tangent-plane-to-sky part of the mapping.
656 tangentPlaneCenter = fitWcs.toWorld(
657 calexpWcs.getPixelOrigin().getX(), calexpWcs.getPixelOrigin().getY()
658 )
659 tangentPlaneOrigin = lsst.geom.Point2D(tangentPlaneCenter)
660 skyOrigin = calexpWcs.pixelToSky(
661 calexpWcs.getPixelOrigin().getX(), calexpWcs.getPixelOrigin().getY()
662 )
663 cdMatrix = afwgeom.makeCdMatrix(1.0 * lsst.geom.degrees, 0.0 * lsst.geom.degrees, True)
664 iwcToSkyWcs = afwgeom.makeSkyWcs(tangentPlaneOrigin, skyOrigin, cdMatrix)
665 newRAdeg, newDecdeg = iwcToSkyWcs.pixelToSkyArray(
666 tanPlaneXY[:, 0], tanPlaneXY[:, 1], degrees=True
667 )
669 np.testing.assert_allclose(calexpra, newRAdeg)
670 np.testing.assert_allclose(calexpdec, newDecdeg)
672 def test_refCatLoader(self):
673 """Test that we can load objects from refCat"""
675 tmpAssociations = wcsfit.FoFClass(
676 self.fields,
677 self.instruments,
678 self.exposuresHelper,
679 [self.fieldRadius.asDegrees()],
680 (self.task.config.matchRadius * u.arcsec).to(u.degree).value,
681 )
683 self.task._load_refcat(
684 self.refObjectLoader,
685 self.extensionInfo,
686 associations=tmpAssociations,
687 center=self.fieldCenter,
688 radius=self.fieldRadius,
689 epoch=self.exposureInfo.medianEpoch,
690 )
692 # We have only loaded one catalog, so getting the 'matches' should just
693 # return the same objects we put in, except some random objects that
694 # are too close together.
695 tmpAssociations.sortMatches(self.fieldNumber, minMatches=1)
697 nMatches = (np.array(tmpAssociations.sequence) == 0).sum()
699 self.assertLessEqual(nMatches, self.nStars)
700 self.assertGreater(nMatches, self.nStars * 0.9)
702 def test_loading_and_association(self):
703 """Test that objects can be loaded and correctly associated."""
704 # Running `_load_catalogs_and_associate` changes the input WCSs, so
705 # recalculate them here so that the variables shared among tests are
706 # not affected.
707 _, exposuresHelper, extensionInfo, instruments = self.task._get_exposure_info(
708 self.inputVisitSummary, refEpoch=self.refEpoch
709 )
711 tmpAssociations = wcsfit.FoFClass(
712 self.fields,
713 self.instruments,
714 exposuresHelper,
715 [self.fieldRadius.asDegrees()],
716 (self.task.config.matchRadius * u.arcsec).to(u.degree).value,
717 )
718 self.task._load_catalogs_and_associate(tmpAssociations, self.inputCatalogRefs, extensionInfo)
720 tmpAssociations.sortMatches(self.fieldNumber, minMatches=2)
722 matchIds = []
723 correctMatches = []
724 for s, e, o in zip(tmpAssociations.sequence, tmpAssociations.extn, tmpAssociations.obj):
725 objVisitInd = extensionInfo.visitIndex[e]
726 objDet = extensionInfo.detector[e]
727 extnInds = self.inputCatalogRefs[objVisitInd].get()["detector"] == objDet
728 objInfo = self.inputCatalogRefs[objVisitInd].get()[extnInds].iloc[o]
729 if s == 0:
730 if len(matchIds) > 0:
731 correctMatches.append(len(set(matchIds)) == 1)
732 matchIds = []
734 matchIds.append(objInfo["sourceId"])
736 # A few matches may incorrectly associate sources because of the random
737 # positions
738 self.assertGreater(sum(correctMatches), len(correctMatches) * 0.95)
740 def test_make_outputs(self):
741 """Test that the run method recovers the input model parameters."""
742 for v, visit in enumerate(self.testVisits):
743 visitSummary = self.inputVisitSummary[v]
744 outputWcsCatalog = self.outputs.outputWcss[visit]
745 visitSources = self.inputCatalogRefs[v].get()
746 for d, detectorRow in enumerate(visitSummary):
747 detectorId = detectorRow["id"]
748 fitwcs = outputWcsCatalog[d].getWcs()
749 detSources = visitSources[visitSources["detector"] == detectorId]
750 fitRA, fitDec = fitwcs.pixelToSkyArray(detSources["x"], detSources["y"], degrees=True)
751 dRA = fitRA - detSources["trueRA"]
752 dDec = fitDec - detSources["trueDec"]
753 # Check that input coordinates match the output coordinates
754 self.assertAlmostEqual(np.mean(dRA), 0)
755 self.assertAlmostEqual(np.std(dRA), 0)
756 self.assertAlmostEqual(np.mean(dDec), 0)
757 self.assertAlmostEqual(np.std(dDec), 0)
759 def test_compute_model_params(self):
760 """Test the optional model parameters and covariance output."""
761 modelParams = pd.DataFrame(self.outputs.modelParams)
762 # Check that DataFrame is the expected size.
763 shape = modelParams.shape
764 self.assertEqual(shape[0] + 4, shape[1])
765 # Check that covariance matrix is symmetric.
766 covariance = (modelParams.iloc[:, 4:]).to_numpy()
767 np.testing.assert_allclose(covariance, covariance.T, atol=1e-18)
769 def test_run(self):
770 """Test that run method recovers the input model parameters"""
771 outputMaps = self.outputs.fitModel.mapCollection.getParamDict()
773 for v, visit in enumerate(self.testVisits):
774 visitSummary = self.inputVisitSummary[v]
775 visitMapName = f"{visit}/poly"
777 origModel = self.trueModel[visitMapName]
778 if origModel["Type"] != "Identity":
779 fitModel = outputMaps[visitMapName]
780 origXPoly = origModel["XPoly"]["Coefficients"]
781 origYPoly = origModel["YPoly"]["Coefficients"]
782 fitXPoly = fitModel[: len(origXPoly)]
783 fitYPoly = fitModel[len(origXPoly) :]
785 absDiffX = abs(fitXPoly - origXPoly)
786 absDiffY = abs(fitYPoly - origYPoly)
787 # Check that input visit model matches fit
788 np.testing.assert_array_less(absDiffX, 1e-6)
789 np.testing.assert_array_less(absDiffY, 1e-6)
790 for d, detectorRow in enumerate(visitSummary):
791 detectorId = detectorRow["id"]
792 filterName = detectorRow["physical_filter"]
793 detectorMapName = f"{filterName}/{detectorId}/poly"
794 origModel = self.trueModel[detectorMapName]
795 if (origModel["Type"] != "Identity") and (v == 0):
796 fitModel = outputMaps[detectorMapName]
797 origXPoly = origModel["XPoly"]["Coefficients"]
798 origYPoly = origModel["YPoly"]["Coefficients"]
799 fitXPoly = fitModel[: len(origXPoly)]
800 fitYPoly = fitModel[len(origXPoly) :]
801 absDiffX = abs(fitXPoly - origXPoly)
802 absDiffY = abs(fitYPoly - origYPoly)
803 # Check that input detector model matches fit
804 np.testing.assert_array_less(absDiffX, 1e-7)
805 np.testing.assert_array_less(absDiffY, 1e-7)
807 def test_missingWcs(self):
808 """Test that task does not fail when the input WCS is None for one
809 extension and that the fit WCS for that extension returns a finite
810 result.
811 """
812 # Need to copy the catalogs since they are modified later.
813 inputVisitSummary = [cat.copy(deep=True) for cat in self.inputVisitSummary]
814 # Set one WCS to be None
815 testVisit = 0
816 testDetector = 20
817 inputVisitSummary[testVisit][testDetector].setWcs(None)
819 outputs = self.task.run(
820 self.inputCatalogRefs,
821 inputVisitSummary,
822 instrumentName=self.instrumentName,
823 refEpoch=self.refEpoch,
824 refObjectLoader=self.refObjectLoader,
825 )
827 # Check that the fit WCS for the extension with input WCS=None returns
828 # finite sky values.
829 testWcs = outputs.outputWcss[self.testVisits[testVisit]][testDetector].getWcs()
830 testSky = testWcs.pixelToSky(0, 0)
831 self.assertTrue(testSky.isFinite())
833 def test_inputCameraModel(self):
834 """Test running task with an input camera model, and check that true
835 object coordinates are recovered.
836 """
837 config = copy(self.config)
838 config.saveCameraModel = False
839 config.useInputCameraModel = True
840 task = GbdesAstrometricFitTask(config=config)
841 with open(os.path.join(self.datadir, "sample_camera_model.yaml"), "r") as f:
842 cameraModel = yaml.load(f, Loader=yaml.Loader)
844 outputs = task.run(
845 self.inputCatalogRefs,
846 self.inputVisitSummary,
847 instrumentName=self.instrumentName,
848 refObjectLoader=self.refObjectLoader,
849 inputCameraModel=cameraModel,
850 )
852 # Check that the output WCS is close (not necessarily exactly equal) to
853 # the result when fitting the full model.
854 for v, visit in enumerate(self.testVisits):
855 visitSummary = self.inputVisitSummary[v]
856 outputWcsCatalog = outputs.outputWcss[visit]
857 visitSources = self.inputCatalogRefs[v].get()
858 for d, detectorRow in enumerate(visitSummary):
859 detectorId = detectorRow["id"]
860 fitwcs = outputWcsCatalog[d].getWcs()
861 detSources = visitSources[visitSources["detector"] == detectorId]
862 fitRA, fitDec = fitwcs.pixelToSkyArray(detSources["x"], detSources["y"], degrees=True)
863 dRA = fitRA - detSources["trueRA"]
864 dDec = fitDec - detSources["trueDec"]
865 # Check that input coordinates match the output coordinates
866 self.assertAlmostEqual(np.mean(dRA), 0)
867 self.assertAlmostEqual(np.std(dRA), 0)
868 self.assertAlmostEqual(np.mean(dDec), 0)
869 self.assertAlmostEqual(np.std(dDec), 0)
871 def test_useColor(self):
872 """Test running task with color catalog and DCR fitting."""
873 config = copy(self.config)
874 config.useColor = True
876 task = GbdesAstrometricFitTask(config=config)
877 outputs = task.run(
878 self.inputCatalogRefs,
879 self.inputVisitSummary,
880 instrumentName=self.instrumentName,
881 refObjectLoader=self.refObjectLoader,
882 colorCatalog=self.colorCatalog,
883 )
884 self.assertEqual(len(outputs.colorParams["visit"]), len(self.testVisits))
886 def test_useIsolatedStars(self):
887 """Test that the input star positions are recovered when isolated star
888 catalogs are used for the input."""
890 config = copy(self.config)
891 config.useIsolatedStars = True
893 task = GbdesAstrometricFitTask(config=config)
894 outputs = task.run(
895 None,
896 self.inputVisitSummary,
897 isolatedStarSources=self.isolatedStarSources,
898 isolatedStarCatalogs=self.isolatedStarCatalogs,
899 refObjectLoader=self.refObjectLoader,
900 )
901 for isolatedStarSourceRef in self.isolatedStarSources:
902 iss = isolatedStarSourceRef.get()
903 visits = np.unique(iss["visit"])
904 for v, visit in enumerate(visits):
905 outputWcsCatalog = outputs.outputWcss[visit]
906 visitSources = iss[iss["visit"] == visit]
907 detectors = outputWcsCatalog["id"]
908 for d, detectorId in enumerate(detectors):
909 fitwcs = outputWcsCatalog[d].getWcs()
910 detSources = visitSources[visitSources["detector"] == detectorId]
911 fitRA, fitDec = fitwcs.pixelToSkyArray(detSources["x"], detSources["y"], degrees=True)
912 dRA = fitRA - detSources["trueRA"]
913 dDec = fitDec - detSources["trueDec"]
914 # Check that input coordinates match the output coordinates
915 self.assertAlmostEqual(np.mean(dRA), 0)
916 self.assertAlmostEqual(np.std(dRA), 0)
917 self.assertAlmostEqual(np.mean(dDec), 0)
918 self.assertAlmostEqual(np.std(dDec), 0)
921class TestGbdesGlobalAstrometricFit(TestGbdesAstrometricFit):
922 @classmethod
923 def setUpClass(cls):
924 # Set random seed
925 np.random.seed(1234)
927 # Fraction of simulated stars in the reference catalog and science
928 # exposures
929 inReferenceFraction = 1
930 inScienceFraction = 1
932 # Make fake data
933 cls.datadir = os.path.join(TESTDIR, "data")
935 cls.nFields = 2
936 cls.instrumentName = "HSC"
937 cls.refEpoch = 57205.5
939 # Make test inputVisitSummary. VisitSummaryTables are taken from
940 # collection HSC/runs/RC2/w_2022_20/DM-34794
941 cls.testVisits = [1176, 17900, 17930, 17934, 36434, 36460, 36494, 36446]
942 cls.inputVisitSummary = []
943 for testVisit in cls.testVisits:
944 visSum = afwTable.ExposureCatalog.readFits(
945 os.path.join(cls.datadir, f"visitSummary_{testVisit}.fits")
946 )
947 cls.inputVisitSummary.append(visSum)
949 cls.config = GbdesGlobalAstrometricFitConfig()
950 cls.config.systematicError = 0
951 cls.config.devicePolyOrder = 4
952 cls.config.exposurePolyOrder = 6
953 cls.config.fitReserveFraction = 0
954 cls.config.fitReserveRandomSeed = 1234
955 cls.config.saveModelParams = True
956 cls.config.saveCameraModel = True
957 cls.config.applyRefCatProperMotion = False
958 cls.task = GbdesGlobalAstrometricFitTask(config=cls.config)
960 cls.fields, cls.fieldRegions = cls.task._prep_sky(cls.inputVisitSummary)
962 (
963 cls.exposureInfo,
964 cls.exposuresHelper,
965 cls.extensionInfo,
966 cls.instruments,
967 ) = cls.task._get_exposure_info(cls.inputVisitSummary, fieldRegions=cls.fieldRegions)
969 refDataIds, deferredRefCats = [], []
970 allStarIds = []
971 allStarRAs = []
972 allStarDecs = []
973 for region in cls.fieldRegions.values():
974 # Bounding box of observations:
975 bbox = region.getBoundingBox()
976 raMin = bbox.getLon().getA().asDegrees()
977 raMax = bbox.getLon().getB().asDegrees()
978 decMin = bbox.getLat().getA().asDegrees()
979 decMax = bbox.getLat().getB().asDegrees()
981 # Make random set of data in a bounding box determined by input
982 # visits
983 cls.nStars, starIds, starRAs, starDecs = cls._make_simulated_stars(
984 raMin, raMax, decMin, decMax, cls.config.matchRadius
985 )
987 allStarIds.append(starIds)
988 allStarRAs.append(starRAs)
989 allStarDecs.append(starDecs)
991 corners = [
992 lsst.geom.SpherePoint(ra, dec, lsst.geom.degrees).getVector()
993 for (ra, dec) in zip([raMin, raMax, raMax, raMin], [decMin, decMin, decMax, decMax])
994 ]
995 conv_box = lsst.sphgeom.ConvexPolygon.convexHull(corners)
996 # Make a reference catalog that will be loaded into
997 # ReferenceObjectLoader
998 refDataId, deferredRefCat = cls._make_refCat(
999 starIds, starRAs, starDecs, inReferenceFraction, conv_box
1000 )
1001 refDataIds.append(refDataId)
1002 deferredRefCats.append(deferredRefCat)
1004 cls.refObjectLoader = ReferenceObjectLoader(refDataIds, deferredRefCats)
1005 cls.refObjectLoader.config.requireProperMotion = False
1006 cls.refObjectLoader.config.anyFilterMapsToThis = "test_filter"
1008 cls.task.refObjectLoader = cls.refObjectLoader
1010 allRefObjects, allRefCovariances = {}, {}
1011 for f, fieldRegion in cls.fieldRegions.items():
1012 refObjects, refCovariance = cls.task._load_refcat(
1013 cls.refObjectLoader, cls.extensionInfo, epoch=cls.exposureInfo.medianEpoch, region=fieldRegion
1014 )
1015 allRefObjects[f] = refObjects
1016 allRefCovariances[f] = refCovariance
1017 cls.refObjects = allRefObjects
1019 # Get True WCS for stars:
1020 with open(os.path.join(cls.datadir, "sample_global_wcs.yaml"), "r") as f:
1021 cls.trueModel = yaml.load(f, Loader=yaml.Loader)
1023 cls.trueWCSs = cls._make_wcs(cls.trueModel, cls.inputVisitSummary)
1025 cls.isolatedStarCatalogs, cls.isolatedStarSources = cls._make_isolatedStars(
1026 allStarIds, allStarRAs, allStarDecs, cls.trueWCSs, inScienceFraction
1027 )
1029 cls.colorCatalog = cls._make_colors(np.concatenate(allStarRAs), np.concatenate(allStarDecs))
1031 cls.outputs = cls.task.run(
1032 cls.inputVisitSummary,
1033 cls.isolatedStarSources,
1034 cls.isolatedStarCatalogs,
1035 instrumentName=cls.instrumentName,
1036 refEpoch=cls.refEpoch,
1037 refObjectLoader=cls.refObjectLoader,
1038 )
1040 def test_loading_and_association(self):
1041 """Test that associated objects actually correspond to the same
1042 simulated object."""
1043 associations, sourceDict = self.task._associate_from_isolated_sources(
1044 self.isolatedStarSources, self.isolatedStarCatalogs, self.extensionInfo, self.refObjects
1045 )
1047 object_groups = np.flatnonzero(np.array(associations.sequence) == 0)
1048 for i in range(len(object_groups) - 1)[:10]:
1049 ras, decs = [], []
1050 for ind in np.arange(object_groups[i], object_groups[i + 1]):
1051 visit = self.extensionInfo.visit[associations.extn[ind]]
1052 detectorInd = self.extensionInfo.detectorIndex[associations.extn[ind]]
1053 detector = self.extensionInfo.detector[associations.extn[ind]]
1054 if detectorInd == -1:
1055 ra = self.refObjects[visit * -1]["ra"][associations.obj[ind]]
1056 dec = self.refObjects[visit * -1]["dec"][associations.obj[ind]]
1057 ras.append(ra)
1058 decs.append(dec)
1059 else:
1060 x = sourceDict[visit][detector]["x"][associations.obj[ind]]
1061 y = sourceDict[visit][detector]["y"][associations.obj[ind]]
1062 ra, dec = self.trueWCSs[visit][detectorInd].getWcs().pixelToSky(x, y)
1063 ras.append(ra.asDegrees())
1064 decs.append(dec.asDegrees())
1065 np.testing.assert_allclose(ras, ras[0])
1066 np.testing.assert_allclose(decs, decs[0])
1068 def test_refCatLoader(self):
1069 """Test loading objects from the refCat in each of the fields."""
1071 for region in self.fieldRegions.values():
1072 refCat, refCov = self.task._load_refcat(
1073 self.refObjectLoader, self.extensionInfo, region=region, epoch=self.exposureInfo.medianEpoch
1074 )
1075 assert len(refCat) > 0
1077 def test_make_outputs(self):
1078 """Test that the run method recovers the input model parameters."""
1079 for isolatedStarSourceRef in self.isolatedStarSources:
1080 iss = isolatedStarSourceRef.get()
1081 visits = np.unique(iss["visit"])
1082 for v, visit in enumerate(visits):
1083 outputWcsCatalog = self.outputs.outputWcss[visit]
1084 visitSources = iss[iss["visit"] == visit]
1085 detectors = outputWcsCatalog["id"]
1086 for d, detectorId in enumerate(detectors):
1087 fitwcs = outputWcsCatalog[d].getWcs()
1088 detSources = visitSources[visitSources["detector"] == detectorId]
1089 fitRA, fitDec = fitwcs.pixelToSkyArray(detSources["x"], detSources["y"], degrees=True)
1090 dRA = fitRA - detSources["trueRA"]
1091 dDec = fitDec - detSources["trueDec"]
1092 # Check that input coordinates match the output coordinates
1093 self.assertAlmostEqual(np.mean(dRA), 0)
1094 self.assertAlmostEqual(np.std(dRA), 0)
1095 self.assertAlmostEqual(np.mean(dDec), 0)
1096 self.assertAlmostEqual(np.std(dDec), 0)
1098 def test_missingWcs(self):
1099 """Test that task does not fail when the input WCS is None for one
1100 extension and that the fit WCS for that extension returns a finite
1101 result.
1102 """
1103 # Need to copy the catalogs since they are modified later.
1104 inputVisitSummary = [cat.copy(deep=True) for cat in self.inputVisitSummary]
1105 # Set one WCS to be None
1106 testVisit = 0
1107 testDetector = 20
1108 inputVisitSummary[testVisit][testDetector].setWcs(None)
1110 outputs = self.task.run(
1111 inputVisitSummary,
1112 self.isolatedStarSources,
1113 self.isolatedStarCatalogs,
1114 instrumentName=self.instrumentName,
1115 refEpoch=self.refEpoch,
1116 refObjectLoader=self.refObjectLoader,
1117 )
1119 # Check that the fit WCS for the extension with input WCS=None returns
1120 # finite sky values.
1121 testWcs = outputs.outputWcss[self.testVisits[testVisit]][testDetector].getWcs()
1122 testSky = testWcs.pixelToSky(0, 0)
1123 self.assertTrue(testSky.isFinite())
1125 def test_inputCameraModel(self):
1126 """Test running task with an input camera model, and check that true
1127 object coordinates are recovered.
1128 """
1129 config = copy(self.config)
1130 config.saveCameraModel = False
1131 config.useInputCameraModel = True
1132 task = GbdesGlobalAstrometricFitTask(config=config)
1133 with open(os.path.join(self.datadir, "sample_global_camera_model.yaml"), "r") as f:
1134 cameraModel = yaml.load(f, Loader=yaml.Loader)
1136 outputs = task.run(
1137 self.inputVisitSummary,
1138 self.isolatedStarSources,
1139 self.isolatedStarCatalogs,
1140 instrumentName=self.instrumentName,
1141 refObjectLoader=self.refObjectLoader,
1142 inputCameraModel=cameraModel,
1143 )
1145 # Check that the output WCS is close (not necessarily exactly equal) to
1146 # the result when fitting the full model.
1147 for isolatedStarSourceRef in self.isolatedStarSources:
1148 iss = isolatedStarSourceRef.get()
1149 visits = np.unique(iss["visit"])
1150 for v, visit in enumerate(visits):
1151 outputWcsCatalog = outputs.outputWcss[visit]
1152 visitSources = iss[iss["visit"] == visit]
1153 detectors = outputWcsCatalog["id"]
1154 for d, detectorId in enumerate(detectors):
1155 fitwcs = outputWcsCatalog[d].getWcs()
1156 detSources = visitSources[visitSources["detector"] == detectorId]
1157 fitRA, fitDec = fitwcs.pixelToSkyArray(detSources["x"], detSources["y"], degrees=True)
1158 dRA = fitRA - detSources["trueRA"]
1159 dDec = fitDec - detSources["trueDec"]
1160 # Check that input coordinates match the output coordinates
1161 self.assertAlmostEqual(np.mean(dRA), 0, places=6)
1162 self.assertAlmostEqual(np.std(dRA), 0)
1163 self.assertAlmostEqual(np.mean(dDec), 0, places=6)
1164 self.assertAlmostEqual(np.std(dDec), 0)
1166 def test_useColor(self):
1167 """Test running task with color catalog and DCR fitting."""
1168 config = copy(self.config)
1169 config.useColor = True
1171 task = GbdesGlobalAstrometricFitTask(config=config)
1173 outputs = task.run(
1174 self.inputVisitSummary,
1175 self.isolatedStarSources,
1176 self.isolatedStarCatalogs,
1177 instrumentName=self.instrumentName,
1178 refObjectLoader=self.refObjectLoader,
1179 colorCatalog=self.colorCatalog,
1180 )
1181 self.assertEqual(len(outputs.colorParams["visit"]), len(self.testVisits))
1184class TestApparentMotion(lsst.utils.tests.TestCase):
1185 """This class simulates an array of objects with random proper motions and
1186 parallaxes and compares the results of
1187 `lsst.drp.tasks.gbdesAstrometricFit.calculate_apparent_motion` against the
1188 results calculated by astropy."""
1190 def setUp(self):
1191 np.random.seed(12345)
1193 self.ras = (np.random.rand(10) + 150.0) * u.degree
1194 self.decs = (np.random.rand(10) + 2.5) * u.degree
1196 self.pmRAs = (np.random.random(10) * 10) * u.mas / u.yr
1197 self.pmDecs = (np.random.random(10) * 10) * u.mas / u.yr
1199 self.parallaxes = (abs(np.random.random(10) * 5)) * u.mas
1201 self.mjds = astropy.time.Time(np.random.rand(10) * 20 + 57000, format="mjd", scale="tai")
1203 self.refEpoch = astropy.time.Time(57100, format="mjd", scale="tai")
1205 def test_proper_motion(self):
1206 """Calculate the change in position due to proper motion only for a
1207 given time change, and compare the results against astropy.
1208 """
1209 # Set parallaxes to zero to test proper motion part by itself.
1210 parallaxes = np.zeros(len(self.ras)) * u.mas
1212 data = astropy.table.Table(
1213 {
1214 "ra": self.ras,
1215 "dec": self.decs,
1216 "pmRA": self.pmRAs,
1217 "pmDec": self.pmDecs,
1218 "parallax": parallaxes,
1219 "MJD": self.mjds,
1220 }
1221 )
1222 raCorrection, decCorrection = calculate_apparent_motion(data, self.refEpoch)
1224 pmRACosDec = self.pmRAs * np.cos(self.decs)
1225 coords = SkyCoord(
1226 ra=self.ras,
1227 dec=self.decs,
1228 pm_ra_cosdec=pmRACosDec,
1229 pm_dec=self.pmDecs,
1230 obstime=self.refEpoch,
1231 )
1233 newCoords = coords.apply_space_motion(new_obstime=self.mjds)
1234 astropyDRA = newCoords.ra.degree - coords.ra.degree
1235 astropyDDec = newCoords.dec.degree - coords.dec.degree
1237 # The proper motion values are on the order of 0-3 mas/yr and the
1238 # differences are on the scale of 1e-8 mas.
1239 self.assertFloatsAlmostEqual(astropyDRA, raCorrection.value, rtol=1e-6)
1240 self.assertFloatsAlmostEqual(astropyDDec, decCorrection.value, rtol=1e-6)
1242 def test_parallax(self):
1243 """Calculate the change in position due to parallax for a given time
1244 change and compare the results against astropy. Astropy will not give
1245 the parallax part separate from other sources of space motion, such as
1246 annual aberration, but it can be obtained by comparing the calculated
1247 positions of an object with a given parallax and one with a much
1248 further distance. The result is still only approximately the same, but
1249 is close enough for accuracy needed here.
1250 """
1251 # Set proper motions to zero to test parallax correction by itself.
1252 pms = np.zeros(len(self.ras)) * u.mas / u.yr
1254 data = astropy.table.Table(
1255 {
1256 "ra": self.ras,
1257 "dec": self.decs,
1258 "pmRA": pms,
1259 "pmDec": pms,
1260 "parallax": self.parallaxes,
1261 "MJD": self.mjds,
1262 }
1263 )
1264 raCorrection, decCorrection = calculate_apparent_motion(data, self.refEpoch)
1266 coords = SkyCoord(
1267 ra=self.ras,
1268 dec=self.decs,
1269 pm_ra_cosdec=pms,
1270 pm_dec=pms,
1271 distance=Distance(parallax=self.parallaxes),
1272 obstime=self.refEpoch,
1273 )
1274 # Make another set of objects at the same coordinates but a much
1275 # greater distant in order to isolate parallax when applying space
1276 # motion.
1277 refCoords = SkyCoord(
1278 ra=self.ras,
1279 dec=self.decs,
1280 pm_ra_cosdec=pms,
1281 pm_dec=pms,
1282 distance=1e10 * Distance(parallax=self.parallaxes),
1283 obstime=self.refEpoch,
1284 )
1286 newCoords = coords.apply_space_motion(new_obstime=self.mjds).transform_to("gcrs")
1287 refNewCoords = refCoords.apply_space_motion(new_obstime=self.mjds).transform_to("gcrs")
1288 astropyDRA = newCoords.ra.degree - refNewCoords.ra.degree
1289 astropyDDec = newCoords.dec.degree - refNewCoords.dec.degree
1291 # The parallax values are on the scale of 1-3 mas, and the difference
1292 # between the two calculation methods is ~0.05 mas.
1293 self.assertFloatsAlmostEqual(astropyDRA, raCorrection.value, rtol=2e-2)
1294 self.assertFloatsAlmostEqual(astropyDDec, decCorrection.value, rtol=2e-2)
1297def setup_module(module):
1298 lsst.utils.tests.init()
1301if __name__ == "__main__": 1301 ↛ 1302line 1301 didn't jump to line 1302 because the condition on line 1301 was never true
1302 lsst.utils.tests.init()
1303 unittest.main()