Coverage for python / lsst / ap / association / diaPipe.py: 17%
454 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-22 00:50 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-22 00:50 -0700
1#
2# LSST Data Management System
3# Copyright 2008-2016 AURA/LSST.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
23"""PipelineTask for associating DiaSources with previous DiaObjects.
25Additionally performs forced photometry on the calibrated and difference
26images at the updated locations of DiaObjects.
27"""
29__all__ = ("DiaPipelineConfig",
30 "DiaPipelineTask",
31 "DiaPipelineConnections")
34import lsst.dax.apdb as daxApdb
35import lsst.pex.config as pexConfig
36import lsst.pipe.base as pipeBase
37import lsst.pipe.base.connectionTypes as connTypes
38import lsst.sphgeom
40from astropy.table import Table
41import numpy as np
42import pandas as pd
43from lsst.ap.association import (
44 AssociationTask,
45 DiaForcedSourceTask,
46 PackageAlertsTask)
48from lsst.ap.association.utils import makeEmptyForcedSourceTable, getRegion, paddedRegion, readSchemaFromApdb
49from lsst.daf.base import DateTime
50from lsst.meas.base import DetectorVisitIdGeneratorConfig, \
51 DiaObjectCalculationTask
52from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, checkSdmSchemaColumns, \
53 dropEmptyColumns, make_empty_catalog
54from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask
55from lsst.utils.timer import timeMethod, duration_from_timeMethod
58class TooManyDiaObjectsError(pipeBase.AlgorithmError):
59 """Raised if there are an unusually large number of unassociated DiaSources.
60 This is usually indicative of an image subtraction error, and needs to be
61 caught before updating the APDB with a large number of spurious entries.
62 """
63 def __init__(self, *, nNewDiaObjects, threshold):
64 msg = ("Aborting processing before writing to the APDB."
65 f" {nNewDiaObjects} new DiaObjects would be created, which exceeds the"
66 f" configured maximum of {threshold}")
67 super().__init__(msg)
68 self.nNewDiaObjects = nNewDiaObjects
69 self.threshold = threshold
71 @property
72 def metadata(self):
73 return {"nNewDiaObjects": self.nNewDiaObjects,
74 "threshold": self.threshold
75 }
78class PostApdbUpdateError(pipeBase.AlgorithmError):
79 """Raised for any error that occurs after the APDB has been updated.
80 This allows partial outputs to be written, and signals that processing
81 can't be retried.
82 """
83 def __init__(self, *, errorMsg):
84 msg = ("Aborting processing after writing to the APDB. "
85 "This image cannot be retried! "
86 "The original error was:\n"
87 f"{errorMsg}")
88 super().__init__(msg)
90 @property
91 def metadata(self):
92 return {}
95class DiaPipelineConnections(
96 pipeBase.PipelineTaskConnections,
97 dimensions=("instrument", "visit", "detector"),
98 defaultTemplates={"coaddName": "deep", "fakesType": ""}):
99 """Butler connections for DiaPipelineTask.
100 """
101 diaSourceTable = connTypes.Input(
102 doc="Catalog of calibrated DiaSources.",
103 name="{fakesType}{coaddName}Diff_diaSrcTable",
104 storageClass="DataFrame",
105 dimensions=("instrument", "visit", "detector"),
106 )
107 solarSystemObjectTable = connTypes.Input(
108 doc="Catalog of SolarSolarSystem objects expected to be observable in "
109 "this detectorVisit.",
110 name="preloaded_SsObjects",
111 storageClass="ArrowAstropy",
112 dimensions=("instrument", "group", "detector"),
113 minimum=0,
114 )
115 diffIm = connTypes.Input(
116 doc="Difference image on which the DiaSources were detected.",
117 name="{fakesType}{coaddName}Diff_differenceExp",
118 storageClass="ExposureF",
119 dimensions=("instrument", "visit", "detector"),
120 )
121 exposure = connTypes.Input(
122 doc="Calibrated exposure differenced with a template image during "
123 "image differencing.",
124 name="{fakesType}calexp",
125 storageClass="ExposureF",
126 dimensions=("instrument", "visit", "detector"),
127 )
128 template = connTypes.Input(
129 doc="Warped template used to create `subtractedExposure`. Not PSF "
130 "matched.",
131 dimensions=("instrument", "visit", "detector"),
132 storageClass="ExposureF",
133 name="{fakesType}{coaddName}Diff_templateExp",
134 )
135 preloadedDiaObjects = connTypes.Input(
136 doc="DiaObjects preloaded from the APDB.",
137 name="preloaded_diaObjects",
138 storageClass="DataFrame",
139 dimensions=("instrument", "group", "detector"),
140 )
141 preloadedDiaSources = connTypes.Input(
142 doc="DiaSources preloaded from the APDB.",
143 name="preloaded_diaSources",
144 storageClass="DataFrame",
145 dimensions=("instrument", "group", "detector"),
146 )
147 preloadedDiaForcedSources = connTypes.Input(
148 doc="DiaForcedSources preloaded from the APDB.",
149 name="preloaded_diaForcedSources",
150 storageClass="DataFrame",
151 dimensions=("instrument", "group", "detector"),
152 )
153 apdbMarker = connTypes.Output(
154 doc="Marker dataset storing the configuration of the Apdb for each "
155 "visit/detector. Used to signal the completion of the pipeline.",
156 name="apdb_marker",
157 storageClass="Config",
158 dimensions=("instrument", "visit", "detector"),
159 )
160 associatedDiaSources = connTypes.Output(
161 doc="Optional output storing the DiaSource catalog after matching, "
162 "calibration, and standardization for insertion into the Apdb.",
163 name="{fakesType}{coaddName}Diff_assocDiaSrc",
164 storageClass="ArrowAstropy",
165 dimensions=("instrument", "visit", "detector"),
166 )
167 associatedSsSources = connTypes.Output(
168 doc="Optional output storing ssSource data computed during association.",
169 name="{fakesType}{coaddName}Diff_associatedSsSources",
170 storageClass="ArrowAstropy",
171 dimensions=("instrument", "visit", "detector"),
172 )
173 unassociatedSsObjects = connTypes.Output(
174 doc="Expected locations of an ssObject with no source",
175 name="ssUnassociatedObjects",
176 storageClass="ArrowAstropy",
177 dimensions=("instrument", "visit", "detector"),
178 )
180 diaForcedSources = connTypes.Output(
181 doc="Optional output storing the forced sources computed at the diaObject positions.",
182 name="{fakesType}{coaddName}Diff_diaForcedSrc",
183 storageClass="ArrowAstropy",
184 dimensions=("instrument", "visit", "detector"),
185 )
186 diaObjects = connTypes.Output(
187 doc="Optional output storing the updated diaObjects associated to these sources.",
188 name="{fakesType}{coaddName}Diff_diaObject",
189 storageClass="ArrowAstropy",
190 dimensions=("instrument", "visit", "detector"),
191 )
192 newDiaSources = connTypes.Output(
193 doc="New diaSources not associated with an existing diaObject that"
194 " were used to create a new diaObject",
195 name="{fakesType}new_dia_source",
196 storageClass="ArrowAstropy",
197 dimensions=("instrument", "visit", "detector"),
198 )
199 marginalDiaSources = connTypes.Output(
200 doc="Low SNR diaSources not associated with an existing diaObject that"
201 " were rejected instead of creating a new diaObject",
202 name="{fakesType}marginal_new_dia_source",
203 storageClass="ArrowAstropy",
204 dimensions=("instrument", "visit", "detector"),
205 )
207 def __init__(self, *, config=None):
208 super().__init__(config=config)
210 if not config.doWriteAssociatedSources:
211 self.outputs.remove("associatedDiaSources")
212 self.outputs.remove("diaForcedSources")
213 self.outputs.remove("diaObjects")
214 self.outputs.remove("newDiaSources")
215 self.outputs.remove("marginalDiaSources")
216 else:
217 if not config.doRunForcedMeasurement:
218 self.outputs.remove("diaForcedSources")
219 if not config.filterUnAssociatedSources:
220 self.outputs.remove("newDiaSources")
221 self.outputs.remove("marginalDiaSources")
222 if not config.doSolarSystemAssociation:
223 self.inputs.remove("solarSystemObjectTable")
224 if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation):
225 self.outputs.remove("associatedSsSources")
226 self.outputs.remove("unassociatedSsObjects")
228 def adjustQuantum(self, inputs, outputs, label, dataId):
229 """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
230 in the `lsst.daf.butler.core.Quantum` during the graph generation stage
231 of the activator.
233 This implementation checks to make sure that the filters in the dataset
234 are compatible with AP processing as set by the Apdb/DPDD schema.
236 Parameters
237 ----------
238 inputs : `dict`
239 Dictionary whose keys are an input (regular or prerequisite)
240 connection name and whose values are a tuple of the connection
241 instance and a collection of associated `DatasetRef` objects.
242 The exact type of the nested collections is unspecified; it can be
243 assumed to be multi-pass iterable and support `len` and ``in``, but
244 it should not be mutated in place. In contrast, the outer
245 dictionaries are guaranteed to be temporary copies that are true
246 `dict` instances, and hence may be modified and even returned; this
247 is especially useful for delegating to `super` (see notes below).
248 outputs : `dict`
249 Dict of output datasets, with the same structure as ``inputs``.
250 label : `str`
251 Label for this task in the pipeline (should be used in all
252 diagnostic messages).
253 data_id : `lsst.daf.butler.DataCoordinate`
254 Data ID for this quantum in the pipeline (should be used in all
255 diagnostic messages).
257 Returns
258 -------
259 adjusted_inputs : `dict`
260 Dict of the same form as ``inputs`` with updated containers of
261 input `DatasetRef` objects. Connections that are not changed
262 should not be returned at all. Datasets may only be removed, not
263 added. Nested collections may be of any multi-pass iterable type,
264 and the order of iteration will set the order of iteration within
265 `PipelineTask.runQuantum`.
266 adjusted_outputs : `dict`
267 Dict of updated output datasets, with the same structure and
268 interpretation as ``adjusted_inputs``.
270 Raises
271 ------
272 ScalarError
273 Raised if any `Input` or `PrerequisiteInput` connection has
274 ``multiple`` set to `False`, but multiple datasets.
275 NoWorkFound
276 Raised to indicate that this quantum should not be run; not enough
277 datasets were found for a regular `Input` connection, and the
278 quantum should be pruned or skipped.
279 FileNotFoundError
280 Raised to cause QuantumGraph generation to fail (with the message
281 included in this exception); not enough datasets were found for a
282 `PrerequisiteInput` connection.
283 """
284 _, refs = inputs["diffIm"]
285 for ref in refs:
286 if ref.dataId["band"] not in self.config.validBands:
287 raise ValueError(
288 f"Requested '{ref.dataId['band']}' not in "
289 "DiaPipelineConfig.validBands. To process bands not in "
290 "the standard Rubin set (ugrizy) you must add the band to "
291 "the validBands list in DiaPipelineConfig and add the "
292 "appropriate columns to the Apdb schema.")
293 return super().adjustQuantum(inputs, outputs, label, dataId)
296class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
297 pipelineConnections=DiaPipelineConnections):
298 """Config for DiaPipelineTask.
299 """
300 coaddName = pexConfig.Field(
301 doc="coadd name: typically one of deep, goodSeeing, or dcr",
302 dtype=str,
303 default="deep",
304 )
305 apdb_config_url = pexConfig.Field(
306 dtype=str,
307 default=None,
308 optional=False,
309 doc="A config file specifying the APDB and its connection parameters, "
310 "typically written by the apdb-cli command-line utility. "
311 "The database must already be initialized.",
312 )
313 validBands = pexConfig.ListField(
314 dtype=str,
315 default=["u", "g", "r", "i", "z", "y"],
316 doc="List of bands that are valid for AP processing. To process a "
317 "band not on this list, the appropriate band specific columns "
318 "must be added to the Apdb schema in dax_apdb.",
319 )
320 associator = pexConfig.ConfigurableField(
321 target=AssociationTask,
322 doc="Task used to associate DiaSources with DiaObjects.",
323 )
324 doSolarSystemAssociation = pexConfig.Field(
325 dtype=bool,
326 default=True,
327 doc="Process SolarSystem objects through the pipeline.",
328 )
329 solarSystemAssociator = pexConfig.ConfigurableField(
330 target=SolarSystemAssociationTask,
331 doc="Task used to associate DiaSources with SolarSystemObjects.",
332 )
333 diaCalculation = pexConfig.ConfigurableField(
334 target=DiaObjectCalculationTask,
335 doc="Task to compute summary statistics for DiaObjects.",
336 )
337 doReloadDiaObjects = pexConfig.Field(
338 dtype=bool,
339 default=True,
340 doc="Drop preloaded DiaObjects and reload them from the APDB?"
341 "Used in production when the very latest objects from the APDB "
342 "are needed.",
343 )
344 angleMargin = pexConfig.RangeField(
345 doc="Padding to add when loading diaObjects if `doReloadDiaObjects=True`",
346 dtype=float,
347 default=2,
348 min=0,
349 )
350 doRunForcedMeasurement = pexConfig.Field(
351 dtype=bool,
352 default=True,
353 deprecated="Added to allow disabling forced sources for performance "
354 "reasons during the ops rehearsal. "
355 "It is expected to be removed.",
356 doc="Run forced measurement on all of the diaObjects? "
357 "This should only be turned off for debugging purposes.",
358 )
359 diaForcedSource = pexConfig.ConfigurableField(
360 target=DiaForcedSourceTask,
361 doc="Task used for force photometer DiaObject locations in direct and "
362 "difference images.",
363 )
364 forcedReliabilityThreshold = pexConfig.Field(
365 dtype=float,
366 default=0.5,
367 doc="Minimum reliability score of constituent diaSources to use when "
368 "determining whether to calculate forced photometry for a "
369 "diaObject.",
370 )
371 forcedTrailLengthThreshold = pexConfig.Field(
372 dtype=float,
373 default=1.4,
374 doc="Maximum trail length (arseconds) of constituent diaSources to use "
375 "when determining whether to calculate forced photometry for a "
376 "diaObject.",
377 )
378 forcedBadFlags = pexConfig.ListField(
379 dtype=str,
380 default=["glint_trail", "isDipole"],
381 doc="Flags of constituent diaSources to exclude when determining "
382 "whether to calculate forced photometry for a diaObject.",
383 )
384 alertPackager = pexConfig.ConfigurableField(
385 target=PackageAlertsTask,
386 doc="Subtask for packaging Ap data into alerts.",
387 )
388 doPackageAlerts = pexConfig.Field(
389 dtype=bool,
390 default=False,
391 doc="Package Dia-data into serialized alerts for distribution and "
392 "write them to disk.",
393 )
394 doWriteAssociatedSources = pexConfig.Field(
395 dtype=bool,
396 default=True,
397 doc="Write out associated DiaSources, DiaForcedSources, and DiaObjects, "
398 "formatted following the Science Data Model.",
399 )
400 imagePixelMargin = pexConfig.RangeField(
401 dtype=int,
402 default=10,
403 min=0,
404 doc="Pad the image by this many pixels before removing off-image "
405 "diaObjects for association.",
406 )
407 filterUnAssociatedSources = pexConfig.Field(
408 dtype=bool,
409 default=True,
410 doc="Check unassociated diaSources for quality before creating new ."
411 "diaObjects. DiaSources that are associated with existing diaObjects "
412 "will not be affected."
413 )
414 newObjectBadFlags = pexConfig.ListField(
415 dtype=str,
416 default=("centroid_flag",
417 "pixelFlags_crCenter",
418 "pixelFlags_nodataCenter",
419 "pixelFlags_interpolatedCenter",
420 "pixelFlags_saturatedCenter",
421 "pixelFlags_suspectCenter",
422 "pixelFlags_streakCenter",
423 "glint_trail"),
424 doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
425 "these flags set before creating new diaObjects.",
426 )
427 maxNewDiaObjects = pexConfig.RangeField(
428 dtype=float,
429 default=0,
430 min=0,
431 doc="Maximum number of new DiaObjects to create before raising an error."
432 "Set to zero to disable.",
433 )
434 newObjectSnrThreshold = pexConfig.RangeField(
435 dtype=float,
436 default=0,
437 min=0,
438 doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
439 "Abs(flux/fluxErr) less than this threshold before creating new "
440 "diaObjects."
441 "Set to zero to disable.",
442 )
443 newObjectLowReliabilitySnrThreshold = pexConfig.RangeField(
444 dtype=float,
445 default=0,
446 min=0,
447 doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
448 "signal-to-noise ratios less than this threshold if they have"
449 " low reliability scores before creating new diaObjects."
450 "Set to zero to disable.",
451 )
452 newObjectReliabilityThreshold = pexConfig.RangeField(
453 dtype=float,
454 default=0.1,
455 min=0,
456 max=1,
457 doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
458 "reliability scores less than this threshold before creating new "
459 "diaObjects."
460 "Set to zero to disable.",
461 )
462 newObjectLowSnrReliabilityThreshold = pexConfig.RangeField(
463 dtype=float,
464 default=0.1,
465 min=0,
466 max=1,
467 doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
468 "low signal-to-noise and reliability scores below this threshold "
469 "before creating new diaObjects."
470 "Set to zero to disable.",
471 )
472 newObjectFluxField = pexConfig.Field(
473 dtype=str,
474 default="apFlux",
475 doc="Name of the flux field in the standardized diaSource catalog to "
476 "use for checking the signal-to-noise before creating new diaObjects.",
477 )
478 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
480 def setDefaults(self):
481 self.diaCalculation.plugins = ["ap_meanPosition",
482 "ap_nDiaSources",
483 "ap_meanFlux",
484 "ap_sigmaFlux",
485 "ap_minMaxFlux",
486 "ap_maxSlopeFlux",
487 "ap_meanErrFlux",
488 "ap_meanTotFlux"]
491class DiaPipelineTask(pipeBase.PipelineTask):
492 """Task for loading, associating and storing Difference Image Analysis
493 (DIA) Objects and Sources.
494 """
495 ConfigClass = DiaPipelineConfig
496 _DefaultName = "diaPipe"
498 def __init__(self, initInputs=None, **kwargs):
499 super().__init__(**kwargs)
500 self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)
501 self.schema = readSchemaFromApdb(self.apdb)
502 self.makeSubtask("associator")
503 self.makeSubtask("diaCalculation")
504 if self.config.doRunForcedMeasurement:
505 self.makeSubtask("diaForcedSource")
506 if self.config.doPackageAlerts:
507 self.makeSubtask("alertPackager")
508 if self.config.doSolarSystemAssociation:
509 self.makeSubtask("solarSystemAssociator")
510 if self.config.filterUnAssociatedSources:
511 columns = [self.config.newObjectFluxField,
512 self.config.newObjectFluxField + "Err",
513 "reliability"]
514 columns += self.config.newObjectBadFlags
516 missing = checkSdmSchemaColumns(self.schema, columns, "DiaSource")
517 if missing:
518 raise pipeBase.InvalidQuantumError("Field %s not in the DiaSource schema" % missing)
520 def runQuantum(self, butlerQC, inputRefs, outputRefs):
521 inputs = butlerQC.get(inputRefs)
522 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
523 inputs["band"] = butlerQC.quantum.dataId["band"]
524 inputs["legacySolarSystemTable"] = None
525 if not self.config.doSolarSystemAssociation:
526 inputs["solarSystemObjectTable"] = None
528 associationResults = pipeBase.Struct(
529 apdbMarker=None,
530 associatedDiaSources=None,
531 diaForcedSources=None,
532 diaObjects=None,
533 associatedSsSources=None,
534 unassociatedSsObjects=None,
535 newDiaSources=None,
536 marginalDiaSources=None,
537 )
538 try:
539 self.run(**inputs, associationResults=associationResults)
540 except pipeBase.AlgorithmError as e:
541 error = pipeBase.AnnotatedPartialOutputsError.annotate(
542 e,
543 self,
544 log=self.log
545 )
546 butlerQC.put(associationResults, outputRefs)
547 raise error from e
548 butlerQC.put(associationResults, outputRefs)
550 @timeMethod
551 def run(self,
552 diaSourceTable,
553 legacySolarSystemTable,
554 diffIm,
555 exposure,
556 template,
557 preloadedDiaObjects,
558 preloadedDiaSources,
559 preloadedDiaForcedSources,
560 band,
561 idGenerator,
562 solarSystemObjectTable=None,
563 associationResults=None):
564 """Process DiaSources and DiaObjects.
566 Load previous DiaObjects and their DiaSource history. Calibrate the
567 values in the diaSourceCat. Associate new DiaSources with previous
568 DiaObjects. Run forced photometry at the updated DiaObject locations.
569 Store the results in the Alert Production Database (Apdb).
571 Parameters
572 ----------
573 diaSourceTable : `pandas.DataFrame`
574 Newly detected DiaSources.
575 legacySolarSystemTable : `pandas.DataFrame`
576 Not used
577 diffIm : `lsst.afw.image.ExposureF`
578 Difference image exposure in which the sources in ``diaSourceCat``
579 were detected.
580 exposure : `lsst.afw.image.ExposureF`
581 Calibrated exposure differenced with a template to create
582 ``diffIm``.
583 template : `lsst.afw.image.ExposureF`
584 Template exposure used to create diffIm.
585 preloadedDiaObjects : `pandas.DataFrame`
586 Previously detected DiaObjects, loaded from the APDB.
587 preloadedDiaSources : `pandas.DataFrame`
588 Previously detected DiaSources, loaded from the APDB.
589 preloadedDiaForcedSources : `pandas.DataFrame`
590 Catalog of previously detected forced DiaSources, from the APDB
591 band : `str`
592 The band in which the new DiaSources were detected.
593 idGenerator : `lsst.meas.base.IdGenerator`
594 Object that generates source IDs and random number generator seeds.
595 solarSystemObjectTable : `astropy.table.Table`
596 Preloaded Solar System objects expected to be visible in the image.
597 associationResults : `lsst.pipe.base.Struct`, optional
598 Result struct that is modified to allow saving of partial outputs
599 for some failure conditions. If the task completes successfully,
600 this is also returned.
602 Returns
603 -------
604 associationResults : `lsst.pipe.base.Struct`
605 Results struct with components.
607 - ``apdbMarker`` : Marker dataset to store in the Butler indicating
608 that this ccdVisit has completed successfully.
609 (`lsst.dax.apdb.ApdbConfig`)
610 - ``associatedDiaSources`` : Catalog of newly associated
611 DiaSources. (`pandas.DataFrame`)
612 - ``diaForcedSources`` : Catalog of new and previously detected
613 forced DiaSources. (`pandas.DataFrame`)
614 - ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`)
615 - ``associatedSsSources`` : Catalog of ssSource records.
616 (`pandas.DataFrame`)
618 Raises
619 ------
620 RuntimeError
621 Raised if duplicate DiaObjects or duplicate DiaSources are found.
622 """
623 if associationResults is None:
624 associationResults = pipeBase.Struct()
625 self._validateExposure(exposure, "Science")
626 self._validateExposure(diffIm, "Difference")
627 self._validateExposure(template, "Template")
629 # Accept either legacySolarSystemTable or optional solarSystemObjectTable.
630 if legacySolarSystemTable is not None and solarSystemObjectTable is None:
631 solarSystemObjectTable = Table.from_pandas(legacySolarSystemTable)
632 if self.config.doReloadDiaObjects:
633 try:
634 preloadedDiaObjects = self.loadRefreshedDiaObjects(getRegion(exposure), preloadedDiaObjects)
635 except Exception as e:
636 self.log.warning("Error encountered while attempting to load "
637 "the latest diaObjects from the APDB. Processing "
638 "will continue with only the diaObjects from "
639 "preload.", exc_info=e)
640 finally:
641 self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod(
642 self.metadata, "loadRefreshedDiaObjects", clock="Utc"
643 )
644 self.log.verbose("Re-loading DiaObjects: Took %.4f seconds",
645 self.metadata["loadDiaObjectsDuration"])
647 else:
648 self.metadata["loadDiaObjectsDuration"] = -1
650 self.checkTableIndex(preloadedDiaSources, index=["diaObjectId", "band", "diaSourceId"])
651 self.checkTableIndex(preloadedDiaObjects, index="diaObjectId")
652 self.checkTableIndex(preloadedDiaForcedSources, index=["diaObjectId", "diaForcedSourceId"])
654 if not preloadedDiaObjects.empty:
655 # Include a small buffer outside the image so that we can associate sources near the edge
656 diaObjects, _ = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects,
657 buffer=self.config.imagePixelMargin)
658 else:
659 self.log.info("Preloaded DiaObject table is empty.")
660 diaObjects = preloadedDiaObjects
662 # Associate DiaSources with DiaObjects
663 assocResults = self.associateDiaSources(diaSourceTable, solarSystemObjectTable, diffIm, diaObjects)
665 # Set unassociated diaObjectIds and ssObjectIds to NULL, and convert to SDM schema format
666 standardizedAssociatedDiaSources = self.standardizeDataFrame(
667 assocResults.associatedDiaSources,
668 "DiaSource",
669 nullColumns=["diaObjectId", "ssObjectId"]
670 )
672 # Merge associated diaSources
673 mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs(
674 preloadedDiaSources,
675 assocResults.associatedDiaSources,
676 diaObjects,
677 assocResults.newDiaObjects,
678 diffIm
679 )
681 # Compute DiaObject Summary statistics from their full DiaSource
682 # history.
683 diaCalResult = self.diaCalculation.run(
684 mergedDiaObjects,
685 mergedDiaSourceHistory,
686 updatedDiaObjectIds,
687 self.config.validBands)
688 updatedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaCalResult.updatedDiaObjects,
689 tableName="DiaObject", skipIndex=True)
691 # Test for duplication in the updated DiaObjects.
692 if self.testDataFrameIndex(diaCalResult.diaObjectCat):
693 raise RuntimeError(
694 "Duplicate DiaObjects (loaded + updated) created after "
695 "DiaCalculation. This is unexpected behavior and should be "
696 "reported. Exiting.")
697 if self.testDataFrameIndex(updatedDiaObjects):
698 raise RuntimeError(
699 "Duplicate DiaObjects (updated) created after "
700 "DiaCalculation. This is unexpected behavior and should be "
701 "reported. Exiting.")
703 # Forced source measurement
704 if self.config.doRunForcedMeasurement:
705 diaObjectsForced = self._selectGoodDiaObjects(diaCalResult.diaObjectCat, mergedDiaSourceHistory)
706 diaForcedSources = self.runForcedMeasurement(
707 diaObjectsForced, updatedDiaObjects, exposure, diffIm, idGenerator
708 )
709 forcedSourceHistoryThreshold = self.diaForcedSource.config.historyThreshold
710 else:
711 # alertPackager needs correct columns
712 diaForcedSources = makeEmptyForcedSourceTable(self.schema)
713 forcedSourceHistoryThreshold = 0
715 # Write results to Alert Production Database (APDB)
716 try:
717 validityStart = self.writeToApdb(updatedDiaObjects, standardizedAssociatedDiaSources,
718 diaForcedSources)
719 finally:
720 self.metadata["writeToApdbDuration"] = duration_from_timeMethod(self.metadata, "writeToApdb",
721 clock="Utc")
722 # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
723 self.log.verbose("writeToApdb: Took %.4f seconds", self.metadata["writeToApdbDuration"])
725 # For historical reasons, apdbMarker is a Config even if it's not meant to be read.
726 # A default Config is the cheapest way to satisfy the storage class.
727 associationResults.apdbMarker = pexConfig.Config()
728 associationResults.associatedDiaSources = assocResults.associatedDiaSources
729 associationResults.diaForcedSources = diaForcedSources
730 associationResults.diaObjects = diaCalResult.diaObjectCat
731 associationResults.unassociatedSsObjects = assocResults.unassociatedSsObjects
732 associationResults.newDiaSources = assocResults.newDiaSources
733 associationResults.marginalDiaSources = assocResults.marginalDiaSources
735 # Catch *any* error after we have updated the APDB
736 try:
737 associatedSsSources = assocResults.associatedSsSources
738 associatedSsSourcesPlusMpcorb = None
739 if associatedSsSources is not None:
740 mpcorbColumns = [col for col in associatedSsSources.columns if col[:7] == 'MPCORB_']
741 associatedSsSourceMpcorb = associatedSsSources[mpcorbColumns].copy()
742 associatedSsSources = self.standardizeTable(associatedSsSources, "SSSource", nullColumns=[])
743 associatedSsSourcesPlusMpcorb = associatedSsSources.copy()
744 for mpcorbColumn in mpcorbColumns:
745 associatedSsSourcesPlusMpcorb[mpcorbColumn] = associatedSsSourceMpcorb[mpcorbColumn]
746 associationResults.associatedSsSources = associatedSsSources
748 # patch the otherwise-empty validityStart field for the alerts
749 updatedDiaObjects['validityStartMjdTai'] = validityStart.get(system=DateTime.MJD,
750 scale=DateTime.TAI)
752 # Package alerts
753 if self.config.doPackageAlerts:
754 # Append new forced sources to the full history
755 diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources,
756 tableName="DiaForcedSource")
757 if self.testDataFrameIndex(diaForcedSourcesFull):
758 self.log.warning(
759 "Duplicate DiaForcedSources created after merge with "
760 "history and new sources. This may cause downstream "
761 "problems. Dropping duplicates.")
762 # Drop duplicates via index and keep the first appearance.
763 diaForcedSourcesFull = diaForcedSourcesFull[
764 ~diaForcedSourcesFull.index.duplicated(keep="first")]
766 self.alertPackager.run(assocResults.associatedDiaSources,
767 updatedDiaObjects,
768 preloadedDiaSources,
769 diaForcedSourcesFull,
770 diffIm,
771 exposure,
772 template,
773 ssSrc=associatedSsSourcesPlusMpcorb,
774 doRunForcedMeasurement=self.config.doRunForcedMeasurement,
775 forcedSourceHistoryThreshold=forcedSourceHistoryThreshold,
776 )
777 except Exception as e:
778 # Catch *any* error after we have updated the APDB
779 raise PostApdbUpdateError(errorMsg=repr(e)) from e
781 return associationResults
783 def _validateExposure(self, exposure, label):
784 """Validate exposure metadata early to avoid failures after updating
785 the APDB.
787 Parameters
788 ----------
789 exposure : `lsst.afw.image.ExposureF`
790 Calibrated science image that was subtracted. Corresponds to the
791 ``exposure`` Butler connection (``{fakesType}calexp``).
792 label : `str`
793 Specification of the image being validated, for logging messages.
795 Raises
796 ------
797 ValueError
798 Raised when any required metadata field is missing or invalid.
799 """
800 errors = []
802 photo_calib = exposure.getPhotoCalib()
803 if photo_calib is None:
804 errors.append(
805 f"{label}: PhotoCalib is None; flux-to-nJy calibration "
806 "in alert packaging will fail."
807 )
808 else:
809 mean = photo_calib.getCalibrationMean()
810 if not np.isfinite(mean) or mean <= 0.0:
811 errors.append(
812 f"{label}: PhotoCalib calibration mean is {mean!r}; "
813 "must be a finite, positive value."
814 )
816 if exposure.getWcs() is None:
817 errors.append(
818 f"{label}: WCS is None; sky-to-pixel projection and "
819 "DIA Object association will fail."
820 )
822 # Skip visitInfo checks for the template
823 if label != "Template":
824 visit_info = exposure.getInfo().getVisitInfo()
825 if visit_info is None:
826 errors.append(
827 f"{label}: VisitInfo is None; observation time, pointing, "
828 "rotation angle, and exposure duration are unavailable."
829 )
830 else:
831 obs_date = visit_info.getDate()
832 if not obs_date.isValid():
833 errors.append(
834 f"{label}: VisitInfo.date is invalid."
835 )
837 # boresightRaDec — written to APDB rows and alert payloads.
838 boresight = visit_info.getBoresightRaDec()
839 ra_deg = boresight.getRa().asDegrees()
840 dec_deg = boresight.getDec().asDegrees()
841 if not np.isfinite(ra_deg) or not np.isfinite(dec_deg):
842 errors.append(
843 f"{label}: VisitInfo.boresightRaDec contains NaN "
844 f"(RA={ra_deg}, Dec={dec_deg}); pointing metadata "
845 "is missing or corrupt."
846 )
848 rot_deg = visit_info.getBoresightRotAngle().asDegrees()
849 if not np.isfinite(rot_deg):
850 errors.append(
851 f"{label}: VisitInfo.boresightRotAngle is NaN; the "
852 "ROTPA keyword will be corrupt in all alert cutout "
853 "FITS headers."
854 )
856 filter_label = exposure.getFilter()
857 if filter_label is None:
858 errors.append(
859 f"{label}: filter label is None."
860 )
861 else:
862 band = filter_label.bandLabel if filter_label.hasBandLabel() else ""
863 if not band:
864 errors.append(
865 f"{label}: filter label carries no band name."
866 )
868 bbox = exposure.getBBox()
869 if bbox.getWidth() <= 0 or bbox.getHeight() <= 0:
870 errors.append(
871 f"{label}: bounding box is empty "
872 f"({bbox.getWidth()}×{bbox.getHeight()} px); the "
873 "readout or ISR step may have failed."
874 )
876 if exposure.getPsf() is None:
877 errors.append(
878 f"{label}: PSF model is None."
879 )
881 if errors:
882 detail = "\n ".join(errors)
883 raise ValueError(
884 "Input exposure metadata is invalid; aborting before any "
885 "APDB writes to prevent non-recoverable data corruption:\n "
886 + detail
887 )
889 def _selectGoodDiaObjects(self, diaObjects, diaSources):
890 """
891 Extract a subset of diaObjects with multiple associated diaSources that
892 pass selection requirements.
893 """
894 n_history = self.diaForcedSource.config.historyThreshold
895 required_columns = ["reliability", "trailLength"]
896 for col in required_columns:
897 if col not in diaSources.columns:
898 raise RuntimeError("Required column '%s' missing from diaSource table." % col)
900 rel_ok = diaSources["reliability"] >= self.config.forcedReliabilityThreshold
901 trailed_ok = diaSources["trailLength"] <= self.config.forcedTrailLengthThreshold
902 badFlags = [f for f in self.config.forcedBadFlags if f in diaSources.columns]
903 if badFlags:
904 self.log.info("Excluding diaSources with %s flags set when calculating"
905 " diaObject history for forced photometry." % badFlags)
906 # Exclude rows where ANY bad flag is True.
907 # If bad flag columns can contain NA, treat NA as True (i.e., bad) via fillna(True).
908 bad_any = diaSources[badFlags].fillna(True).any(axis=1)
909 mask = rel_ok & trailed_ok & ~bad_any
910 ids = diaSources.index.get_level_values("diaObjectId")[mask]
912 # Count occurrences in diaSource
913 counts = pd.Series(ids, copy=False).value_counts()
915 # IDs that exceed threshold
916 keep_ids = counts[counts >= n_history].index
918 # Since diaObject index is unique, direct index filtering is safe and efficient
919 return diaObjects.loc[diaObjects.index.intersection(keep_ids)]
921 def createNewDiaObjects(self, unassociatedDiaSources):
922 """Loop through the set of DiaSources and create new DiaObjects
923 for unassociated DiaSources.
925 Parameters
926 ----------
927 unassociatedDiaSources : `pandas.DataFrame`
928 Set of DiaSources to create new DiaObjects from.
930 Returns
931 -------
932 results : `lsst.pipe.base.Struct`
933 Results struct containing:
935 - diaSources : `pandas.DataFrame`
936 DiaSource catalog with updated DiaObject ids.
937 - newDiaObjects : `pandas.DataFrame`
938 Newly created DiaObjects from the unassociated DiaSources.
939 - nNewDiaObjects : `int`
940 Number of newly created diaObjects.
941 - marginalDiaSources : `pandas.DataFrame`
942 Unassociated diaSources with low signal-to-noise and/or
943 reliability, which are excluded from the new DiaObjects.
944 """
945 marginalDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
946 if len(unassociatedDiaSources) == 0:
947 newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject")
948 else:
949 if self.config.filterUnAssociatedSources:
950 results = self.filterSources(unassociatedDiaSources)
951 unassociatedDiaSources = results.goodSources
952 marginalDiaSources = results.badSources
953 unassociatedDiaSources["diaObjectId"] = unassociatedDiaSources["diaSourceId"]
954 newDiaObjects = convertDataFrameToSdmSchema(self.schema, unassociatedDiaSources,
955 tableName="DiaObject", skipIndex=True)
956 self.metadata["nRejectedNewDiaObjects"] = len(marginalDiaSources)
957 return pipeBase.Struct(diaSources=unassociatedDiaSources,
958 newDiaObjects=newDiaObjects,
959 nNewDiaObjects=len(newDiaObjects),
960 marginalDiaSources=marginalDiaSources)
962 def filterSources(self, sources, snrThreshold=None, lowReliabilitySnrThreshold=None,
963 reliabilityThreshold=None, lowSnrReliabilityThreshold=None, badFlags=None):
964 """Select good sources out of a catalog.
966 Parameters
967 ----------
968 sources : `pandas.DataFrame`
969 Set of DiaSources to check.
970 snrThreshold : `float`, optional
971 The minimum signal to noise diaSource to make a new diaObject.
972 Included for unit tests. Uses the task config value if not set.
973 lowReliabilitySnrThreshold : `float`, optional
974 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for
975 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold``
976 Included for unit tests. Uses the task config value if not set.
977 reliabilityThreshold : `float`, optional
978 The minimum reliability score diaSource to make a new diaObject
979 Included for unit tests. Uses the task config value if not set.
980 lowSnrReliabilityThreshold : `float`, optional
981 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for
982 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold``
983 Included for unit tests. Uses the task config value if not set.
984 badFlags : `list` of `str`, optional
985 Do not create new diaObjects for any diaSource with any of these
986 flags set.
987 Included for unit tests. Uses the task config value if not set.
989 Returns
990 -------
991 results : `lsst.pipe.base.Struct`
992 Results struct containing:
994 - goodSources : `pandas.DataFrame`
995 Subset of the input sources that pass all checks.
996 - badSources : `pandas.DataFrame`
997 Subset of the input sources that fail any check.
998 """
999 if snrThreshold is None:
1000 snrThreshold = self.config.newObjectSnrThreshold
1001 if lowReliabilitySnrThreshold is None:
1002 lowReliabilitySnrThreshold = self.config.newObjectLowReliabilitySnrThreshold
1003 if reliabilityThreshold is None:
1004 reliabilityThreshold = self.config.newObjectReliabilityThreshold
1005 if lowSnrReliabilityThreshold is None:
1006 lowSnrReliabilityThreshold = self.config.newObjectLowSnrReliabilityThreshold
1007 if badFlags is None:
1008 badFlags = self.config.newObjectBadFlags
1009 flagged = sources[badFlags].fillna(False).any(axis=1)
1010 fluxField = self.config.newObjectFluxField
1011 fluxErrField = fluxField + "Err"
1012 signalToNoise = np.abs(np.array(sources[fluxField]/sources[fluxErrField]))
1013 reliability = np.array(sources['reliability'])
1014 nFlagged = np.count_nonzero(flagged)
1015 if nFlagged > 0:
1016 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to flags", nFlagged)
1018 if snrThreshold > 0:
1019 snr_flag = signalToNoise < snrThreshold
1020 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux"
1021 " signal to noise < %f",
1022 np.sum(snr_flag), fluxField, snrThreshold)
1023 flagged |= snr_flag
1024 if reliabilityThreshold > 0:
1025 reliability_flag = reliability < reliabilityThreshold
1026 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to "
1027 "reliability<%f",
1028 np.sum(reliability_flag), reliabilityThreshold)
1029 flagged |= reliability_flag
1030 if min(lowReliabilitySnrThreshold, lowSnrReliabilityThreshold) > 0:
1031 # Only run the combined test if both thresholds are greater than zero
1032 lowSnrReliability_flag = ((signalToNoise < lowReliabilitySnrThreshold)
1033 & (reliability < lowSnrReliabilityThreshold))
1034 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux"
1035 " signal to noise < %f combined with reliability< %f",
1036 np.sum(lowSnrReliability_flag),
1037 fluxField,
1038 lowReliabilitySnrThreshold,
1039 lowSnrReliabilityThreshold)
1040 flagged |= lowSnrReliability_flag
1042 if np.count_nonzero(~flagged) > 0:
1043 goodSources = sources[~flagged].copy(deep=True)
1044 else:
1045 goodSources = make_empty_catalog(self.schema, tableName="DiaSource")
1046 if np.count_nonzero(flagged) > 0:
1047 badSources = sources[flagged].copy(deep=True)
1048 else:
1049 badSources = make_empty_catalog(self.schema, tableName="DiaSource")
1050 return pipeBase.Struct(goodSources=goodSources,
1051 badSources=badSources
1052 )
1054 @timeMethod
1055 def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects):
1056 """Associate DiaSources with DiaObjects.
1058 Associate new DiaSources with existing DiaObjects. Create new
1059 DiaObjects fron unassociated DiaSources. Index DiaSource catalogue
1060 after associations. Append new DiaObjects and DiaSources to their
1061 previous history. Test for DiaSource and DiaObject duplications.
1062 Compute DiaObject Summary statistics from their full DiaSource
1063 history. Test for duplication in the updated DiaObjects.
1065 Parameters
1066 ----------
1067 diaSourceTable : `pandas.DataFrame`
1068 Newly detected DiaSources.
1069 solarSystemObjectTable : `astropy.table.Table`
1070 Preloaded Solar System objects expected to be visible in the image.
1071 diffIm : `lsst.afw.image.ExposureF`
1072 Difference image exposure in which the sources in ``diaSourceCat``
1073 were detected.
1074 diaObjects : `pandas.DataFrame`
1075 Table of DiaObjects from preloaded DiaObjects.
1077 Returns
1078 -------
1079 results : `lsst.pipe.base.Struct`
1080 Results struct containing:
1082 - associatedDiaSources : `pandas.DataFrame`
1083 Associated DiaSources with DiaObjects.
1084 - newDiaObjects : `pandas.DataFrame`
1085 Table of new DiaObjects after association.
1086 - associatedSsSources : `astropy.table.Table`
1087 Table of new ssSources after association.
1088 - unassociatedSsObjects : `astropy.table.Table`
1089 Table of expected ssSources that were not associated with a
1090 diaSource.
1091 - newDiaSources : `pandas.DataFrame`
1092 Subset of `associatedDiaSources` consisting of only the
1093 unassociated diaSources that were added as new diaObjects.
1094 - marginalDiaSources : `pandas.DataFrame`
1095 Unassociated diaSources with marginal detections, which were
1096 removed from `associatedDiaSources` and were not added as new
1097 diaObjects.
1098 """
1099 associatedCatalogs = []
1100 # First associate diaSources with known asteroids
1101 if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
1102 ssoAssocResult = self.solarSystemAssociator.run(
1103 Table.from_pandas(diaSourceTable),
1104 solarSystemObjectTable,
1105 diffIm.visitInfo,
1106 diffIm.getBBox(),
1107 diffIm.wcs
1108 )
1109 nTotalSsObjects = ssoAssocResult.nTotalSsObjects
1110 nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects
1111 associatedSsSources = ssoAssocResult.associatedSsSources
1112 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects
1113 if len(ssoAssocResult.ssoAssocDiaSources) > 0:
1114 associatedCatalogs.append(ssoAssocResult.ssoAssocDiaSources.to_pandas())
1115 if len(ssoAssocResult.unAssocDiaSources) > 0:
1116 # If the table is empty then converting time fields to pandas
1117 # will raise an error. Pass in an empty Dataframe in that case.
1118 unAssocSSDiaSources = ssoAssocResult.unAssocDiaSources.to_pandas()
1119 else:
1120 unAssocSSDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
1121 else:
1122 unAssocSSDiaSources = diaSourceTable
1123 nTotalSsObjects = 0
1124 nAssociatedSsObjects = 0
1125 associatedSsSources = None
1126 unassociatedSsObjects = None
1127 # Associate new DiaSources with existing DiaObjects.
1128 assocResults = self.associator.run(unAssocSSDiaSources, diaObjects)
1129 createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources)
1131 if not assocResults.matchedDiaSources.empty:
1132 associatedCatalogs.append(assocResults.matchedDiaSources)
1133 if not createResults.diaSources.empty:
1134 associatedCatalogs.append(createResults.diaSources)
1135 if len(associatedCatalogs) == 0:
1136 associatedDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
1137 else:
1138 # Standardize each component to the schema before concatinating, so
1139 # that type mis-matches are fixed as early as possible.
1140 associatedDiaSources = pd.concat(
1141 [convertDataFrameToSdmSchema(self.schema, df, tableName="DiaSource", skipIndex=True)
1142 for df in associatedCatalogs])
1144 self._add_association_meta_data(assocResults.nUpdatedDiaObjects,
1145 assocResults.nUnassociatedDiaObjects,
1146 createResults.nNewDiaObjects,
1147 nTotalSsObjects,
1148 nAssociatedSsObjects)
1149 self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects"
1150 " and dropping %i marginal diaSources.",
1151 assocResults.nUpdatedDiaObjects,
1152 assocResults.nUnassociatedDiaObjects,
1153 createResults.nNewDiaObjects,
1154 len(createResults.marginalDiaSources),
1155 )
1156 if createResults.nNewDiaObjects > self.config.maxNewDiaObjects > 0:
1157 raise TooManyDiaObjectsError(nNewDiaObjects=createResults.nNewDiaObjects,
1158 threshold=self.config.maxNewDiaObjects)
1159 return pipeBase.Struct(associatedDiaSources=associatedDiaSources,
1160 newDiaObjects=createResults.newDiaObjects,
1161 associatedSsSources=associatedSsSources,
1162 unassociatedSsObjects=unassociatedSsObjects,
1163 newDiaSources=createResults.diaSources,
1164 marginalDiaSources=createResults.marginalDiaSources
1165 )
1167 def standardizeDataFrame(self, dataFrame, tableName, nullColumns=None):
1168 """Convert a catalog to SDM schema format with NULL ID values
1170 Parameters
1171 ----------
1172 dataFrame : `pandas.DataFrame`
1173 The catalog to standardize/convert
1174 tableName : `str`
1175 Schema name of table to which this dataFrame should be standardized
1176 nullColumns : `list` of `str`, optional
1177 List of column names to check for default values of 0 that should be
1178 replaced with `NULL`.
1181 Returns
1182 -------
1183 standardizedAssociatedDiaSources : pandas.DataFrame
1184 The standardized DiaSource catalog
1185 """
1186 standardizedDataFrame = convertDataFrameToSdmSchema(self.schema,
1187 dataFrame,
1188 tableName=tableName,
1189 skipIndex=True)
1191 def _setNullColumn(dataframe, colName):
1192 """Set specified columns with default values of 0 to NULL."""
1193 dataframe.loc[dataframe[colName] == 0, colName] = pd.NA
1195 if nullColumns is not None:
1196 for colName in nullColumns:
1197 _setNullColumn(standardizedDataFrame, colName)
1198 return standardizedDataFrame
1200 def standardizeTable(self, table, tableName, nullColumns=None):
1201 """Convert a catalog to SDM schema format with NULL ID values
1203 Parameters
1204 ----------
1205 table : `astropy.table.Table`
1206 The catalog to standardize/convert
1207 tableName : `str`
1208 Schema name of table to which this dataFrame should be standardized
1209 nullColumns : `list` of `str`, optional
1210 List of column names to check for default values of 0 that should be
1211 replaced with `NULL`.
1214 Returns
1215 -------
1216 standardizedAssociatedDiaSources : pandas.DataFrame
1217 The standardized DiaSource catalog
1218 """
1219 dataFrame = table.to_pandas()
1220 standardizedDataFrame = self.standardizeDataFrame(dataFrame, tableName, nullColumns=nullColumns)
1221 return standardizedDataFrame
1223 @timeMethod
1224 def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects,
1225 diffIm):
1226 """Merge the associated diaSource and diaObjects to their previous history.
1228 Parameters
1229 ----------
1230 preloadedDiaSources : `pandas.DataFrame`
1231 Previously detected DiaSources, loaded from the APDB.
1232 associatedDiaSources : `pandas.DataFrame`
1233 Associated DiaSources with DiaObjects.
1234 diaObjects : `pandas.DataFrame`
1235 Table of DiaObjects from preloaded DiaObjects.
1236 newDiaObjects : `pandas.DataFrame`
1237 Table of new DiaObjects after association.
1239 Raises
1240 ------
1241 RuntimeError
1242 Raised if duplicate DiaObjects or duplicate DiaSources are found.
1244 Returns
1245 -------
1246 mergedDiaSourceHistory : `pandas.DataFrame`
1247 The combined catalog, with all of the rows from preloadedDiaSources
1248 catalog ordered before the rows of associatedDiaSources catalog.
1249 mergedDiaObjects : `pandas.DataFrame`
1250 Table of new DiaObjects merged with their history.
1251 updatedDiaObjectIds : `numpy.Array`
1252 Object Id's from associated diaSources.
1253 """
1254 # Index the DiaSource catalog for this visit after all associations
1255 # have been made.
1256 updatedDiaObjectIds = associatedDiaSources["diaObjectId"][
1257 associatedDiaSources["diaObjectId"] != 0].to_numpy()
1258 associatedDiaSources.set_index(["diaObjectId",
1259 "band",
1260 "diaSourceId"],
1261 drop=False,
1262 inplace=True)
1264 # Append new DiaObjects and DiaSources to their previous history.
1265 if diaObjects.empty:
1266 mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False)
1267 elif not newDiaObjects.empty:
1268 mergedDiaObjects = pd.concat(
1269 [diaObjects,
1270 newDiaObjects.set_index("diaObjectId", drop=False)],
1271 sort=True)
1272 else:
1273 mergedDiaObjects = diaObjects
1275 # Exclude any objects that are off the image after association.
1276 mergedDiaObjects, updatedDiaObjectIds = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(),
1277 mergedDiaObjects,
1278 diaObjectIds=updatedDiaObjectIds,
1279 buffer=-1)
1280 if self.testDataFrameIndex(mergedDiaObjects):
1281 raise RuntimeError(
1282 "Duplicate DiaObjects created after association. This is "
1283 "likely due to re-running data with an already populated "
1284 "Apdb. If this was not the case then there was an unexpected "
1285 "failure in Association while matching and creating new "
1286 "DiaObjects and should be reported. Exiting.")
1288 mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources,
1289 tableName="DiaSource")
1291 # Test for DiaSource duplication first. If duplicates are found,
1292 # this likely means this is duplicate data being processed and sent
1293 # to the Apdb.
1294 if self.testDataFrameIndex(mergedDiaSourceHistory):
1295 raise RuntimeError(
1296 "Duplicate DiaSources found after association and merging "
1297 "with history. This is likely due to re-running data with an "
1298 "already populated Apdb. If this was not the case then there "
1299 "was an unexpected failure in Association while matching "
1300 "sources to objects, and should be reported. Exiting.")
1301 # Finally, update the diaObject table with the number of associated diaSources
1302 mergedUpdatedDiaObjects = self.updateObjectTable(mergedDiaObjects, mergedDiaSourceHistory)
1303 return (mergedDiaSourceHistory, mergedUpdatedDiaObjects, updatedDiaObjectIds)
1305 @timeMethod
1306 def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator):
1307 """Forced Source Measurement
1309 Forced photometry on the difference and calibrated exposures using the
1310 new and updated DiaObject locations.
1312 Parameters
1313 ----------
1314 diaObjects : `pandas.DataFrame`
1315 Catalog of DiaObjects.
1316 updatedDiaObjects : `pandas.DataFrame`
1317 Catalog of updated DiaObjects.
1318 exposure : `lsst.afw.image.ExposureF`
1319 Calibrated exposure differenced with a template to create
1320 ``diffIm``.
1321 diffIm : `lsst.afw.image.ExposureF`
1322 Difference image exposure in which the sources in ``diaSourceCat``
1323 were detected.
1324 idGenerator : `lsst.meas.base.IdGenerator`
1325 Object that generates source IDs and random number generator seeds.
1327 Returns
1328 -------
1329 diaForcedSources : `pandas.DataFrame`
1330 Catalog of calibrated forced photometered fluxes on both the
1331 difference and direct images at DiaObject locations.
1332 """
1333 # Force photometer on the Difference and Calibrated exposures using
1334 # the new and updated DiaObject locations.
1335 diaForcedSources = self.diaForcedSource.run(
1336 diaObjects,
1337 updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(),
1338 exposure,
1339 diffIm,
1340 idGenerator=idGenerator)
1341 self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB")
1342 diaForcedSources = convertDataFrameToSdmSchema(self.schema, diaForcedSources,
1343 tableName="DiaForcedSource", skipIndex=True)
1344 return diaForcedSources
1346 @timeMethod
1347 def loadRefreshedDiaObjects(self, region, preloadedDiaObjects):
1348 """Load DiaObjects from the Apdb based on their HTM location.
1350 Parameters
1351 ----------
1352 region : `sphgeom.Region`
1353 Region containing the current exposure to load diaObjects from the
1354 APDB.
1355 preloadedDiaObjects : `pandas.DataFrame`
1356 Previously detected DiaObjects, loaded from the APDB.
1358 Returns
1359 -------
1360 diaObjects : `pandas.DataFrame`
1361 DiaObjects loaded from the Apdb that are within the area defined
1362 by ``pixelRanges``.
1363 """
1364 angleMargin = lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.)
1365 diaObjects = self.apdb.getDiaObjects(paddedRegion(region, angleMargin))
1367 diaObjects.set_index("diaObjectId", drop=False, inplace=True)
1368 if diaObjects.index.has_duplicates:
1369 self.log.warning(
1370 "Duplicate DiaObjects loaded from the Apdb. This may cause "
1371 "downstream pipeline issues. Dropping duplicated rows")
1372 # Drop duplicates via index and keep the first appearance.
1373 diaObjects = diaObjects[~diaObjects.index.duplicated(keep="first")]
1374 self.log.info("Loaded %d DiaObjects", len(diaObjects))
1375 refreshedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName="DiaObject",
1376 skipIndex=True)
1378 refreshedIsInPreloaded = refreshedDiaObjects.index.isin(preloadedDiaObjects.index)
1379 preloadedIsInRefreshed = preloadedDiaObjects.index.isin(refreshedDiaObjects.index)
1380 nUniqueRefreshed = (~refreshedIsInPreloaded).sum()
1381 nUniquePreloaded = (~preloadedIsInRefreshed).sum()
1382 if nUniqueRefreshed > 0:
1383 self.log.info("Reloading the diaObject table during association yielded "
1384 "an additional %d objects over the %d preloaded diaObjects.",
1385 nUniqueRefreshed, len(preloadedDiaObjects))
1386 if nUniquePreloaded == 0:
1387 return refreshedDiaObjects
1388 # ap_verify CI datasets ship a preloaded diaObject catalog
1389 # alongside an empty APDB, so some preloaded objects may not
1390 # appear in the refresh. Combine the two, giving precedence to
1391 # refreshed entries on overlap so that updated column values are
1392 # not silently discarded when the refresh adds no new ids.
1393 return pd.concat([refreshedDiaObjects, preloadedDiaObjects.loc[~preloadedIsInRefreshed]])
1395 @timeMethod
1396 def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources):
1397 """Write to the Alert Production Database (Apdb).
1399 Store DiaSources, updated DiaObjects, and DiaForcedSources in the
1400 Alert Production Database (Apdb).
1402 Parameters
1403 ----------
1404 updatedDiaObjects : `pandas.DataFrame`
1405 Catalog of updated DiaObjects.
1406 associatedDiaSources : `pandas.DataFrame`
1407 Associated DiaSources with DiaObjects.
1408 diaForcedSources : `pandas.DataFrame`
1409 Catalog of calibrated forced photometered fluxes on both the
1410 difference and direct images at DiaObject locations.
1412 Returns
1413 -------
1414 validityStart : `lsst.daf.base.DateTime`
1415 Time at which the APDB was updated.
1416 """
1417 # Store DiaSources, updated DiaObjects, and DiaForcedSources in the
1418 # Apdb.
1419 # Drop empty columns that are nullable in the APDB.
1420 diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
1421 diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
1422 diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")
1424 validityStart = DateTime.now()
1425 self.apdb.store(
1426 validityStart.toAstropy(),
1427 diaObjectStore,
1428 diaSourceStore,
1429 diaForcedSourceStore)
1430 self.log.info("APDB updated.")
1432 return validityStart
1434 def testDataFrameIndex(self, df):
1435 """Test the sorted DataFrame index for duplicates.
1437 Wrapped as a separate function to allow for mocking of the this task
1438 in unittesting. Default of a mock return for this test is True.
1440 Parameters
1441 ----------
1442 df : `pandas.DataFrame`
1443 DataFrame to text.
1445 Returns
1446 -------
1447 `bool`
1448 True if DataFrame contains duplicate rows.
1449 """
1450 return df.index.has_duplicates
1452 def _add_association_meta_data(self,
1453 nUpdatedDiaObjects,
1454 nUnassociatedDiaObjects,
1455 nNewDiaObjects,
1456 nTotalSsObjects,
1457 nAssociatedSsObjects):
1458 """Store summaries of the association step in the task metadata.
1460 Parameters
1461 ----------
1462 nUpdatedDiaObjects : `int`
1463 Number of previous DiaObjects associated and updated in this
1464 ccdVisit.
1465 nUnassociatedDiaObjects : `int`
1466 Number of previous DiaObjects that were not associated or updated
1467 in this ccdVisit.
1468 nNewDiaObjects : `int`
1469 Number of newly created DiaObjects for this ccdVisit.
1470 nTotalSsObjects : `int`
1471 Number of SolarSystemObjects within the observable detector
1472 area.
1473 nAssociatedSsObjects : `int`
1474 Number of successfully associated SolarSystemObjects.
1475 """
1476 self.metadata['numUpdatedDiaObjects'] = nUpdatedDiaObjects
1477 self.metadata['numUnassociatedDiaObjects'] = nUnassociatedDiaObjects
1478 self.metadata['numNewDiaObjects'] = nNewDiaObjects
1479 self.metadata['numTotalSolarSystemObjects'] = nTotalSsObjects
1480 self.metadata['numAssociatedSsObjects'] = nAssociatedSsObjects
1482 def purgeDiaObjects(self, bbox, wcs, diaObjCat, diaObjectIds=None, buffer=0):
1483 """Drop diaObjects that are outside the exposure bounding box.
1485 Parameters
1486 ----------
1487 bbox : `lsst.geom.Box2I`
1488 Bounding box of the exposure.
1489 wcs : `lsst.afw.geom.SkyWcs`
1490 Coordinate system definition (wcs) for the exposure.
1491 diaObjCat : `pandas.DataFrame`
1492 DiaObjects loaded from the Apdb.
1493 buffer : `int`, optional
1494 Width, in pixels, to pad the exposure bounding box.
1496 Returns
1497 -------
1498 diaObjCat : `pandas.DataFrame`
1499 DiaObjects loaded from the Apdb, restricted to the exposure
1500 bounding box.
1501 """
1502 # Copy the bbox so the caller's box is not mutated by grow().
1503 bbox = type(bbox)(bbox)
1504 try:
1505 bbox.grow(buffer)
1506 raVals = diaObjCat.ra.to_numpy()
1507 decVals = diaObjCat.dec.to_numpy()
1508 xVals, yVals = wcs.skyToPixelArray(raVals, decVals, degrees=True)
1509 selector = bbox.contains(xVals, yVals)
1510 nPurged = np.sum(~selector)
1511 if nPurged > 0:
1512 if diaObjectIds is not None:
1513 # We also need to drop any of the associated IDs if this runs after association
1514 purgedIds = diaObjCat[~selector].diaObjectId
1515 diaObjectIds = diaObjectIds[~np.isin(diaObjectIds, purgedIds)]
1516 self.log.info("Dropped %i diaObjects that were outside the bbox "
1517 "after association, leaving %i in the catalog",
1518 nPurged, len(diaObjCat) - nPurged)
1519 else:
1520 self.log.info("Dropped %i diaObjects that were outside the padded bbox "
1521 "before association, leaving %i in the catalog",
1522 nPurged, len(diaObjCat) - nPurged)
1523 diaObjCat = diaObjCat[selector].copy()
1524 except (AttributeError, KeyError, ValueError) as e:
1525 self.log.warning("Error attempting to check diaObject history: %s", e, exc_info=e)
1526 return diaObjCat, diaObjectIds
1528 def mergeCatalogs(self, originalCatalog, newCatalog, tableName):
1529 """Combine two catalogs, ensuring that the new catalog conforms to the schema.
1531 Parameters
1532 ----------
1533 originalCatalog : `pandas.DataFrame`
1534 The original catalog to be added to.
1535 newCatalog : `pandas.DataFrame`
1536 The new catalog to append to `originalCatalog`
1537 tableName : `str`
1538 Name of the table in the schema to use.
1540 Returns
1541 -------
1542 mergedCatalog : `pandas.DataFrame`
1543 The combined catalog, with all of the rows from ``originalCatalog``
1544 ordered before the rows of ``newCatalog``
1545 """
1546 if len(newCatalog) > 0:
1547 catalog = convertDataFrameToSdmSchema(self.schema, newCatalog,
1548 tableName=tableName, skipIndex=True)
1550 mergedCatalog = pd.concat([originalCatalog, catalog], sort=True)
1551 else:
1552 mergedCatalog = pd.concat([originalCatalog], sort=True)
1553 return mergedCatalog.loc[:, originalCatalog.columns]
1555 def updateObjectTable(self, diaObjects, diaSources):
1556 """Update the diaObject table with the new diaSource records.
1558 Parameters
1559 ----------
1560 diaObjects : `pandas.DataFrame`
1561 Table of new DiaObjects merged with their history.
1562 diaSources : `pandas.DataFrame`
1563 The combined preloaded and associated diaSource catalog.
1565 Returns
1566 -------
1567 updatedDiaObjects : `pandas.DataFrame`
1568 Table of DiaObjects updated with the number of associated DiaSources
1569 """
1570 # Group on the index level explicitly since diaObjectId is both an
1571 # index and a (duplicated) column
1572 nDiaSources = diaSources.groupby(level="diaObjectId").size().rename("nDiaSources")
1573 diaObjects = diaObjects.drop(columns="nDiaSources", errors="ignore")
1574 updatedDiaObjects = diaObjects.join(nDiaSources, how="left")
1575 return updatedDiaObjects
1577 @staticmethod
1578 def checkTableIndex(dataFrame, index):
1579 if dataFrame.index.name is None:
1580 # The expected index may or may not be set, depending on whether
1581 # the table was written originally as a DataFrame or something else
1582 # Parquet-friendly.
1583 dataFrame.set_index(index, drop=False, inplace=True)