Coverage for python/lsst/ap/association/diaPipe.py: 17%

454 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 08:16 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008-2016 AURA/LSST. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23"""PipelineTask for associating DiaSources with previous DiaObjects. 

24 

25Additionally performs forced photometry on the calibrated and difference 

26images at the updated locations of DiaObjects. 

27""" 

28 

29__all__ = ("DiaPipelineConfig", 

30 "DiaPipelineTask", 

31 "DiaPipelineConnections") 

32 

33 

34import lsst.dax.apdb as daxApdb 

35import lsst.pex.config as pexConfig 

36import lsst.pipe.base as pipeBase 

37import lsst.pipe.base.connectionTypes as connTypes 

38import lsst.sphgeom 

39 

40from astropy.table import Table 

41import numpy as np 

42import pandas as pd 

43from lsst.ap.association import ( 

44 AssociationTask, 

45 DiaForcedSourceTask, 

46 PackageAlertsTask) 

47 

48from lsst.ap.association.utils import makeEmptyForcedSourceTable, getRegion, paddedRegion, readSchemaFromApdb 

49from lsst.daf.base import DateTime 

50from lsst.meas.base import DetectorVisitIdGeneratorConfig, \ 

51 DiaObjectCalculationTask 

52from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, checkSdmSchemaColumns, \ 

53 dropEmptyColumns, make_empty_catalog 

54from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask 

55from lsst.utils.timer import timeMethod, duration_from_timeMethod 

56 

57 

58class TooManyDiaObjectsError(pipeBase.AlgorithmError): 

59 """Raised if there are an unusually large number of unassociated DiaSources. 

60 This is usually indicative of an image subtraction error, and needs to be 

61 caught before updating the APDB with a large number of spurious entries. 

62 """ 

63 def __init__(self, *, nNewDiaObjects, threshold): 

64 msg = ("Aborting processing before writing to the APDB." 

65 f" {nNewDiaObjects} new DiaObjects would be created, which exceeds the" 

66 f" configured maximum of {threshold}") 

67 super().__init__(msg) 

68 self.nNewDiaObjects = nNewDiaObjects 

69 self.threshold = threshold 

70 

71 @property 

72 def metadata(self): 

73 return {"nNewDiaObjects": self.nNewDiaObjects, 

74 "threshold": self.threshold 

75 } 

76 

77 

78class PostApdbUpdateError(pipeBase.AlgorithmError): 

79 """Raised for any error that occurs after the APDB has been updated. 

80 This allows partial outputs to be written, and signals that processing 

81 can't be retried. 

82 """ 

83 def __init__(self, *, errorMsg): 

84 msg = ("Aborting processing after writing to the APDB. " 

85 "This image cannot be retried! " 

86 "The original error was:\n" 

87 f"{errorMsg}") 

88 super().__init__(msg) 

89 

90 @property 

91 def metadata(self): 

92 return {} 

93 

94 

95class DiaPipelineConnections( 

96 pipeBase.PipelineTaskConnections, 

97 dimensions=("instrument", "visit", "detector"), 

98 defaultTemplates={"coaddName": "deep", "fakesType": ""}): 

99 """Butler connections for DiaPipelineTask. 

100 """ 

101 diaSourceTable = connTypes.Input( 

102 doc="Catalog of calibrated DiaSources.", 

103 name="{fakesType}{coaddName}Diff_diaSrcTable", 

104 storageClass="DataFrame", 

105 dimensions=("instrument", "visit", "detector"), 

106 ) 

107 solarSystemObjectTable = connTypes.Input( 

108 doc="Catalog of SolarSolarSystem objects expected to be observable in " 

109 "this detectorVisit.", 

110 name="preloaded_SsObjects", 

111 storageClass="ArrowAstropy", 

112 dimensions=("instrument", "group", "detector"), 

113 minimum=0, 

114 ) 

115 diffIm = connTypes.Input( 

116 doc="Difference image on which the DiaSources were detected.", 

117 name="{fakesType}{coaddName}Diff_differenceExp", 

118 storageClass="ExposureF", 

119 dimensions=("instrument", "visit", "detector"), 

120 ) 

121 exposure = connTypes.Input( 

122 doc="Calibrated exposure differenced with a template image during " 

123 "image differencing.", 

124 name="{fakesType}calexp", 

125 storageClass="ExposureF", 

126 dimensions=("instrument", "visit", "detector"), 

127 ) 

128 template = connTypes.Input( 

129 doc="Warped template used to create `subtractedExposure`. Not PSF " 

130 "matched.", 

131 dimensions=("instrument", "visit", "detector"), 

132 storageClass="ExposureF", 

133 name="{fakesType}{coaddName}Diff_templateExp", 

134 ) 

135 preloadedDiaObjects = connTypes.Input( 

136 doc="DiaObjects preloaded from the APDB.", 

137 name="preloaded_diaObjects", 

138 storageClass="DataFrame", 

139 dimensions=("instrument", "group", "detector"), 

140 ) 

141 preloadedDiaSources = connTypes.Input( 

142 doc="DiaSources preloaded from the APDB.", 

143 name="preloaded_diaSources", 

144 storageClass="DataFrame", 

145 dimensions=("instrument", "group", "detector"), 

146 ) 

147 preloadedDiaForcedSources = connTypes.Input( 

148 doc="DiaForcedSources preloaded from the APDB.", 

149 name="preloaded_diaForcedSources", 

150 storageClass="DataFrame", 

151 dimensions=("instrument", "group", "detector"), 

152 ) 

153 apdbMarker = connTypes.Output( 

154 doc="Marker dataset storing the configuration of the Apdb for each " 

155 "visit/detector. Used to signal the completion of the pipeline.", 

156 name="apdb_marker", 

157 storageClass="Config", 

158 dimensions=("instrument", "visit", "detector"), 

159 ) 

160 associatedDiaSources = connTypes.Output( 

161 doc="Optional output storing the DiaSource catalog after matching, " 

162 "calibration, and standardization for insertion into the Apdb.", 

163 name="{fakesType}{coaddName}Diff_assocDiaSrc", 

164 storageClass="ArrowAstropy", 

165 dimensions=("instrument", "visit", "detector"), 

166 ) 

167 associatedSsSources = connTypes.Output( 

168 doc="Optional output storing ssSource data computed during association.", 

169 name="{fakesType}{coaddName}Diff_associatedSsSources", 

170 storageClass="ArrowAstropy", 

171 dimensions=("instrument", "visit", "detector"), 

172 ) 

173 unassociatedSsObjects = connTypes.Output( 

174 doc="Expected locations of an ssObject with no source", 

175 name="ssUnassociatedObjects", 

176 storageClass="ArrowAstropy", 

177 dimensions=("instrument", "visit", "detector"), 

178 ) 

179 

180 diaForcedSources = connTypes.Output( 

181 doc="Optional output storing the forced sources computed at the diaObject positions.", 

182 name="{fakesType}{coaddName}Diff_diaForcedSrc", 

183 storageClass="ArrowAstropy", 

184 dimensions=("instrument", "visit", "detector"), 

185 ) 

186 diaObjects = connTypes.Output( 

187 doc="Optional output storing the updated diaObjects associated to these sources.", 

188 name="{fakesType}{coaddName}Diff_diaObject", 

189 storageClass="ArrowAstropy", 

190 dimensions=("instrument", "visit", "detector"), 

191 ) 

192 newDiaSources = connTypes.Output( 

193 doc="New diaSources not associated with an existing diaObject that" 

194 " were used to create a new diaObject", 

195 name="{fakesType}new_dia_source", 

196 storageClass="ArrowAstropy", 

197 dimensions=("instrument", "visit", "detector"), 

198 ) 

199 marginalDiaSources = connTypes.Output( 

200 doc="Low SNR diaSources not associated with an existing diaObject that" 

201 " were rejected instead of creating a new diaObject", 

202 name="{fakesType}marginal_new_dia_source", 

203 storageClass="ArrowAstropy", 

204 dimensions=("instrument", "visit", "detector"), 

205 ) 

206 

207 def __init__(self, *, config=None): 

208 super().__init__(config=config) 

209 

210 if not config.doWriteAssociatedSources: 

211 self.outputs.remove("associatedDiaSources") 

212 self.outputs.remove("diaForcedSources") 

213 self.outputs.remove("diaObjects") 

214 self.outputs.remove("newDiaSources") 

215 self.outputs.remove("marginalDiaSources") 

216 else: 

217 if not config.doRunForcedMeasurement: 

218 self.outputs.remove("diaForcedSources") 

219 if not config.filterUnAssociatedSources: 

220 self.outputs.remove("newDiaSources") 

221 self.outputs.remove("marginalDiaSources") 

222 if not config.doSolarSystemAssociation: 

223 self.inputs.remove("solarSystemObjectTable") 

224 if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation): 

225 self.outputs.remove("associatedSsSources") 

226 self.outputs.remove("unassociatedSsObjects") 

227 

228 def adjustQuantum(self, inputs, outputs, label, dataId): 

229 """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects 

230 in the `lsst.daf.butler.core.Quantum` during the graph generation stage 

231 of the activator. 

232 

233 This implementation checks to make sure that the filters in the dataset 

234 are compatible with AP processing as set by the Apdb/DPDD schema. 

235 

236 Parameters 

237 ---------- 

238 inputs : `dict` 

239 Dictionary whose keys are an input (regular or prerequisite) 

240 connection name and whose values are a tuple of the connection 

241 instance and a collection of associated `DatasetRef` objects. 

242 The exact type of the nested collections is unspecified; it can be 

243 assumed to be multi-pass iterable and support `len` and ``in``, but 

244 it should not be mutated in place. In contrast, the outer 

245 dictionaries are guaranteed to be temporary copies that are true 

246 `dict` instances, and hence may be modified and even returned; this 

247 is especially useful for delegating to `super` (see notes below). 

248 outputs : `dict` 

249 Dict of output datasets, with the same structure as ``inputs``. 

250 label : `str` 

251 Label for this task in the pipeline (should be used in all 

252 diagnostic messages). 

253 data_id : `lsst.daf.butler.DataCoordinate` 

254 Data ID for this quantum in the pipeline (should be used in all 

255 diagnostic messages). 

256 

257 Returns 

258 ------- 

259 adjusted_inputs : `dict` 

260 Dict of the same form as ``inputs`` with updated containers of 

261 input `DatasetRef` objects. Connections that are not changed 

262 should not be returned at all. Datasets may only be removed, not 

263 added. Nested collections may be of any multi-pass iterable type, 

264 and the order of iteration will set the order of iteration within 

265 `PipelineTask.runQuantum`. 

266 adjusted_outputs : `dict` 

267 Dict of updated output datasets, with the same structure and 

268 interpretation as ``adjusted_inputs``. 

269 

270 Raises 

271 ------ 

272 ScalarError 

273 Raised if any `Input` or `PrerequisiteInput` connection has 

274 ``multiple`` set to `False`, but multiple datasets. 

275 NoWorkFound 

276 Raised to indicate that this quantum should not be run; not enough 

277 datasets were found for a regular `Input` connection, and the 

278 quantum should be pruned or skipped. 

279 FileNotFoundError 

280 Raised to cause QuantumGraph generation to fail (with the message 

281 included in this exception); not enough datasets were found for a 

282 `PrerequisiteInput` connection. 

283 """ 

284 _, refs = inputs["diffIm"] 

285 for ref in refs: 

286 if ref.dataId["band"] not in self.config.validBands: 

287 raise ValueError( 

288 f"Requested '{ref.dataId['band']}' not in " 

289 "DiaPipelineConfig.validBands. To process bands not in " 

290 "the standard Rubin set (ugrizy) you must add the band to " 

291 "the validBands list in DiaPipelineConfig and add the " 

292 "appropriate columns to the Apdb schema.") 

293 return super().adjustQuantum(inputs, outputs, label, dataId) 

294 

295 

296class DiaPipelineConfig(pipeBase.PipelineTaskConfig, 

297 pipelineConnections=DiaPipelineConnections): 

298 """Config for DiaPipelineTask. 

299 """ 

300 coaddName = pexConfig.Field( 

301 doc="coadd name: typically one of deep, goodSeeing, or dcr", 

302 dtype=str, 

303 default="deep", 

304 ) 

305 apdb_config_url = pexConfig.Field( 

306 dtype=str, 

307 default=None, 

308 optional=False, 

309 doc="A config file specifying the APDB and its connection parameters, " 

310 "typically written by the apdb-cli command-line utility. " 

311 "The database must already be initialized.", 

312 ) 

313 validBands = pexConfig.ListField( 

314 dtype=str, 

315 default=["u", "g", "r", "i", "z", "y"], 

316 doc="List of bands that are valid for AP processing. To process a " 

317 "band not on this list, the appropriate band specific columns " 

318 "must be added to the Apdb schema in dax_apdb.", 

319 ) 

320 associator = pexConfig.ConfigurableField( 

321 target=AssociationTask, 

322 doc="Task used to associate DiaSources with DiaObjects.", 

323 ) 

324 doSolarSystemAssociation = pexConfig.Field( 

325 dtype=bool, 

326 default=True, 

327 doc="Process SolarSystem objects through the pipeline.", 

328 ) 

329 solarSystemAssociator = pexConfig.ConfigurableField( 

330 target=SolarSystemAssociationTask, 

331 doc="Task used to associate DiaSources with SolarSystemObjects.", 

332 ) 

333 diaCalculation = pexConfig.ConfigurableField( 

334 target=DiaObjectCalculationTask, 

335 doc="Task to compute summary statistics for DiaObjects.", 

336 ) 

337 doReloadDiaObjects = pexConfig.Field( 

338 dtype=bool, 

339 default=True, 

340 doc="Drop preloaded DiaObjects and reload them from the APDB?" 

341 "Used in production when the very latest objects from the APDB " 

342 "are needed.", 

343 ) 

344 angleMargin = pexConfig.RangeField( 

345 doc="Padding to add when loading diaObjects if `doReloadDiaObjects=True`", 

346 dtype=float, 

347 default=2, 

348 min=0, 

349 ) 

350 doRunForcedMeasurement = pexConfig.Field( 

351 dtype=bool, 

352 default=True, 

353 deprecated="Added to allow disabling forced sources for performance " 

354 "reasons during the ops rehearsal. " 

355 "It is expected to be removed.", 

356 doc="Run forced measurement on all of the diaObjects? " 

357 "This should only be turned off for debugging purposes.", 

358 ) 

359 diaForcedSource = pexConfig.ConfigurableField( 

360 target=DiaForcedSourceTask, 

361 doc="Task used for force photometer DiaObject locations in direct and " 

362 "difference images.", 

363 ) 

364 forcedReliabilityThreshold = pexConfig.Field( 

365 dtype=float, 

366 default=0.5, 

367 doc="Minimum reliability score of constituent diaSources to use when " 

368 "determining whether to calculate forced photometry for a " 

369 "diaObject.", 

370 ) 

371 forcedTrailLengthThreshold = pexConfig.Field( 

372 dtype=float, 

373 default=1.4, 

374 doc="Maximum trail length (arseconds) of constituent diaSources to use " 

375 "when determining whether to calculate forced photometry for a " 

376 "diaObject.", 

377 ) 

378 forcedBadFlags = pexConfig.ListField( 

379 dtype=str, 

380 default=["glint_trail", "isDipole"], 

381 doc="Flags of constituent diaSources to exclude when determining " 

382 "whether to calculate forced photometry for a diaObject.", 

383 ) 

384 alertPackager = pexConfig.ConfigurableField( 

385 target=PackageAlertsTask, 

386 doc="Subtask for packaging Ap data into alerts.", 

387 ) 

388 doPackageAlerts = pexConfig.Field( 

389 dtype=bool, 

390 default=False, 

391 doc="Package Dia-data into serialized alerts for distribution and " 

392 "write them to disk.", 

393 ) 

394 doWriteAssociatedSources = pexConfig.Field( 

395 dtype=bool, 

396 default=True, 

397 doc="Write out associated DiaSources, DiaForcedSources, and DiaObjects, " 

398 "formatted following the Science Data Model.", 

399 ) 

400 imagePixelMargin = pexConfig.RangeField( 

401 dtype=int, 

402 default=10, 

403 min=0, 

404 doc="Pad the image by this many pixels before removing off-image " 

405 "diaObjects for association.", 

406 ) 

407 filterUnAssociatedSources = pexConfig.Field( 

408 dtype=bool, 

409 default=True, 

410 doc="Check unassociated diaSources for quality before creating new ." 

411 "diaObjects. DiaSources that are associated with existing diaObjects " 

412 "will not be affected." 

413 ) 

414 newObjectBadFlags = pexConfig.ListField( 

415 dtype=str, 

416 default=("centroid_flag", 

417 "pixelFlags_crCenter", 

418 "pixelFlags_nodataCenter", 

419 "pixelFlags_interpolatedCenter", 

420 "pixelFlags_saturatedCenter", 

421 "pixelFlags_suspectCenter", 

422 "pixelFlags_streakCenter", 

423 "glint_trail"), 

424 doc="If `filterUnAssociatedSources` is set, exclude diaSources with " 

425 "these flags set before creating new diaObjects.", 

426 ) 

427 maxNewDiaObjects = pexConfig.RangeField( 

428 dtype=float, 

429 default=0, 

430 min=0, 

431 doc="Maximum number of new DiaObjects to create before raising an error." 

432 "Set to zero to disable.", 

433 ) 

434 newObjectSnrThreshold = pexConfig.RangeField( 

435 dtype=float, 

436 default=0, 

437 min=0, 

438 doc="If `filterUnAssociatedSources` is set, exclude diaSources with " 

439 "Abs(flux/fluxErr) less than this threshold before creating new " 

440 "diaObjects." 

441 "Set to zero to disable.", 

442 ) 

443 newObjectLowReliabilitySnrThreshold = pexConfig.RangeField( 

444 dtype=float, 

445 default=0, 

446 min=0, 

447 doc="If `filterUnAssociatedSources` is set, exclude diaSources with " 

448 "signal-to-noise ratios less than this threshold if they have" 

449 " low reliability scores before creating new diaObjects." 

450 "Set to zero to disable.", 

451 ) 

452 newObjectReliabilityThreshold = pexConfig.RangeField( 

453 dtype=float, 

454 default=0.1, 

455 min=0, 

456 max=1, 

457 doc="If `filterUnAssociatedSources` is set, exclude diaSources with " 

458 "reliability scores less than this threshold before creating new " 

459 "diaObjects." 

460 "Set to zero to disable.", 

461 ) 

462 newObjectLowSnrReliabilityThreshold = pexConfig.RangeField( 

463 dtype=float, 

464 default=0.1, 

465 min=0, 

466 max=1, 

467 doc="If `filterUnAssociatedSources` is set, exclude diaSources with " 

468 "low signal-to-noise and reliability scores below this threshold " 

469 "before creating new diaObjects." 

470 "Set to zero to disable.", 

471 ) 

472 newObjectFluxField = pexConfig.Field( 

473 dtype=str, 

474 default="apFlux", 

475 doc="Name of the flux field in the standardized diaSource catalog to " 

476 "use for checking the signal-to-noise before creating new diaObjects.", 

477 ) 

478 idGenerator = DetectorVisitIdGeneratorConfig.make_field() 

479 

480 def setDefaults(self): 

481 self.diaCalculation.plugins = ["ap_meanPosition", 

482 "ap_nDiaSources", 

483 "ap_meanFlux", 

484 "ap_sigmaFlux", 

485 "ap_minMaxFlux", 

486 "ap_maxSlopeFlux", 

487 "ap_meanErrFlux", 

488 "ap_meanTotFlux"] 

489 

490 

491class DiaPipelineTask(pipeBase.PipelineTask): 

492 """Task for loading, associating and storing Difference Image Analysis 

493 (DIA) Objects and Sources. 

494 """ 

495 ConfigClass = DiaPipelineConfig 

496 _DefaultName = "diaPipe" 

497 

498 def __init__(self, initInputs=None, **kwargs): 

499 super().__init__(**kwargs) 

500 self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url) 

501 self.schema = readSchemaFromApdb(self.apdb) 

502 self.makeSubtask("associator") 

503 self.makeSubtask("diaCalculation") 

504 if self.config.doRunForcedMeasurement: 

505 self.makeSubtask("diaForcedSource") 

506 if self.config.doPackageAlerts: 

507 self.makeSubtask("alertPackager") 

508 if self.config.doSolarSystemAssociation: 

509 self.makeSubtask("solarSystemAssociator") 

510 if self.config.filterUnAssociatedSources: 

511 columns = [self.config.newObjectFluxField, 

512 self.config.newObjectFluxField + "Err", 

513 "reliability"] 

514 columns += self.config.newObjectBadFlags 

515 

516 missing = checkSdmSchemaColumns(self.schema, columns, "DiaSource") 

517 if missing: 

518 raise pipeBase.InvalidQuantumError("Field %s not in the DiaSource schema" % missing) 

519 

520 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

521 inputs = butlerQC.get(inputRefs) 

522 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId) 

523 inputs["band"] = butlerQC.quantum.dataId["band"] 

524 inputs["legacySolarSystemTable"] = None 

525 if not self.config.doSolarSystemAssociation: 

526 inputs["solarSystemObjectTable"] = None 

527 

528 associationResults = pipeBase.Struct( 

529 apdbMarker=None, 

530 associatedDiaSources=None, 

531 diaForcedSources=None, 

532 diaObjects=None, 

533 associatedSsSources=None, 

534 unassociatedSsObjects=None, 

535 newDiaSources=None, 

536 marginalDiaSources=None, 

537 ) 

538 try: 

539 self.run(**inputs, associationResults=associationResults) 

540 except pipeBase.AlgorithmError as e: 

541 error = pipeBase.AnnotatedPartialOutputsError.annotate( 

542 e, 

543 self, 

544 log=self.log 

545 ) 

546 butlerQC.put(associationResults, outputRefs) 

547 raise error from e 

548 butlerQC.put(associationResults, outputRefs) 

549 

550 @timeMethod 

551 def run(self, 

552 diaSourceTable, 

553 legacySolarSystemTable, 

554 diffIm, 

555 exposure, 

556 template, 

557 preloadedDiaObjects, 

558 preloadedDiaSources, 

559 preloadedDiaForcedSources, 

560 band, 

561 idGenerator, 

562 solarSystemObjectTable=None, 

563 associationResults=None): 

564 """Process DiaSources and DiaObjects. 

565 

566 Load previous DiaObjects and their DiaSource history. Calibrate the 

567 values in the diaSourceCat. Associate new DiaSources with previous 

568 DiaObjects. Run forced photometry at the updated DiaObject locations. 

569 Store the results in the Alert Production Database (Apdb). 

570 

571 Parameters 

572 ---------- 

573 diaSourceTable : `pandas.DataFrame` 

574 Newly detected DiaSources. 

575 legacySolarSystemTable : `pandas.DataFrame` 

576 Not used 

577 diffIm : `lsst.afw.image.ExposureF` 

578 Difference image exposure in which the sources in ``diaSourceCat`` 

579 were detected. 

580 exposure : `lsst.afw.image.ExposureF` 

581 Calibrated exposure differenced with a template to create 

582 ``diffIm``. 

583 template : `lsst.afw.image.ExposureF` 

584 Template exposure used to create diffIm. 

585 preloadedDiaObjects : `pandas.DataFrame` 

586 Previously detected DiaObjects, loaded from the APDB. 

587 preloadedDiaSources : `pandas.DataFrame` 

588 Previously detected DiaSources, loaded from the APDB. 

589 preloadedDiaForcedSources : `pandas.DataFrame` 

590 Catalog of previously detected forced DiaSources, from the APDB 

591 band : `str` 

592 The band in which the new DiaSources were detected. 

593 idGenerator : `lsst.meas.base.IdGenerator` 

594 Object that generates source IDs and random number generator seeds. 

595 solarSystemObjectTable : `astropy.table.Table` 

596 Preloaded Solar System objects expected to be visible in the image. 

597 associationResults : `lsst.pipe.base.Struct`, optional 

598 Result struct that is modified to allow saving of partial outputs 

599 for some failure conditions. If the task completes successfully, 

600 this is also returned. 

601 

602 Returns 

603 ------- 

604 associationResults : `lsst.pipe.base.Struct` 

605 Results struct with components. 

606 

607 - ``apdbMarker`` : Marker dataset to store in the Butler indicating 

608 that this ccdVisit has completed successfully. 

609 (`lsst.dax.apdb.ApdbConfig`) 

610 - ``associatedDiaSources`` : Catalog of newly associated 

611 DiaSources. (`pandas.DataFrame`) 

612 - ``diaForcedSources`` : Catalog of new and previously detected 

613 forced DiaSources. (`pandas.DataFrame`) 

614 - ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`) 

615 - ``associatedSsSources`` : Catalog of ssSource records. 

616 (`pandas.DataFrame`) 

617 

618 Raises 

619 ------ 

620 RuntimeError 

621 Raised if duplicate DiaObjects or duplicate DiaSources are found. 

622 """ 

623 if associationResults is None: 

624 associationResults = pipeBase.Struct() 

625 self._validateExposure(exposure, "Science") 

626 self._validateExposure(diffIm, "Difference") 

627 self._validateExposure(template, "Template") 

628 

629 # Accept either legacySolarSystemTable or optional solarSystemObjectTable. 

630 if legacySolarSystemTable is not None and solarSystemObjectTable is None: 

631 solarSystemObjectTable = Table.from_pandas(legacySolarSystemTable) 

632 if self.config.doReloadDiaObjects: 

633 try: 

634 preloadedDiaObjects = self.loadRefreshedDiaObjects(getRegion(exposure), preloadedDiaObjects) 

635 except Exception as e: 

636 self.log.warning("Error encountered while attempting to load " 

637 "the latest diaObjects from the APDB. Processing " 

638 "will continue with only the diaObjects from " 

639 "preload.", exc_info=e) 

640 finally: 

641 self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod( 

642 self.metadata, "loadRefreshedDiaObjects", clock="Utc" 

643 ) 

644 self.log.verbose("Re-loading DiaObjects: Took %.4f seconds", 

645 self.metadata["loadDiaObjectsDuration"]) 

646 

647 else: 

648 self.metadata["loadDiaObjectsDuration"] = -1 

649 

650 self.checkTableIndex(preloadedDiaSources, index=["diaObjectId", "band", "diaSourceId"]) 

651 self.checkTableIndex(preloadedDiaObjects, index="diaObjectId") 

652 self.checkTableIndex(preloadedDiaForcedSources, index=["diaObjectId", "diaForcedSourceId"]) 

653 

654 if not preloadedDiaObjects.empty: 

655 # Include a small buffer outside the image so that we can associate sources near the edge 

656 diaObjects, _ = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects, 

657 buffer=self.config.imagePixelMargin) 

658 else: 

659 self.log.info("Preloaded DiaObject table is empty.") 

660 diaObjects = preloadedDiaObjects 

661 

662 # Associate DiaSources with DiaObjects 

663 assocResults = self.associateDiaSources(diaSourceTable, solarSystemObjectTable, diffIm, diaObjects) 

664 

665 # Set unassociated diaObjectIds and ssObjectIds to NULL, and convert to SDM schema format 

666 standardizedAssociatedDiaSources = self.standardizeDataFrame( 

667 assocResults.associatedDiaSources, 

668 "DiaSource", 

669 nullColumns=["diaObjectId", "ssObjectId"] 

670 ) 

671 

672 # Merge associated diaSources 

673 mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs( 

674 preloadedDiaSources, 

675 assocResults.associatedDiaSources, 

676 diaObjects, 

677 assocResults.newDiaObjects, 

678 diffIm 

679 ) 

680 

681 # Compute DiaObject Summary statistics from their full DiaSource 

682 # history. 

683 diaCalResult = self.diaCalculation.run( 

684 mergedDiaObjects, 

685 mergedDiaSourceHistory, 

686 updatedDiaObjectIds, 

687 self.config.validBands) 

688 updatedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaCalResult.updatedDiaObjects, 

689 tableName="DiaObject", skipIndex=True) 

690 

691 # Test for duplication in the updated DiaObjects. 

692 if self.testDataFrameIndex(diaCalResult.diaObjectCat): 

693 raise RuntimeError( 

694 "Duplicate DiaObjects (loaded + updated) created after " 

695 "DiaCalculation. This is unexpected behavior and should be " 

696 "reported. Exiting.") 

697 if self.testDataFrameIndex(updatedDiaObjects): 

698 raise RuntimeError( 

699 "Duplicate DiaObjects (updated) created after " 

700 "DiaCalculation. This is unexpected behavior and should be " 

701 "reported. Exiting.") 

702 

703 # Forced source measurement 

704 if self.config.doRunForcedMeasurement: 

705 diaObjectsForced = self._selectGoodDiaObjects(diaCalResult.diaObjectCat, mergedDiaSourceHistory) 

706 diaForcedSources = self.runForcedMeasurement( 

707 diaObjectsForced, updatedDiaObjects, exposure, diffIm, idGenerator 

708 ) 

709 forcedSourceHistoryThreshold = self.diaForcedSource.config.historyThreshold 

710 else: 

711 # alertPackager needs correct columns 

712 diaForcedSources = makeEmptyForcedSourceTable(self.schema) 

713 forcedSourceHistoryThreshold = 0 

714 

715 # Write results to Alert Production Database (APDB) 

716 try: 

717 validityStart = self.writeToApdb(updatedDiaObjects, standardizedAssociatedDiaSources, 

718 diaForcedSources) 

719 finally: 

720 self.metadata["writeToApdbDuration"] = duration_from_timeMethod(self.metadata, "writeToApdb", 

721 clock="Utc") 

722 # A single log message is easier for Loki to parse than timeMethod's start+end pairs. 

723 self.log.verbose("writeToApdb: Took %.4f seconds", self.metadata["writeToApdbDuration"]) 

724 

725 # For historical reasons, apdbMarker is a Config even if it's not meant to be read. 

726 # A default Config is the cheapest way to satisfy the storage class. 

727 associationResults.apdbMarker = pexConfig.Config() 

728 associationResults.associatedDiaSources = assocResults.associatedDiaSources 

729 associationResults.diaForcedSources = diaForcedSources 

730 associationResults.diaObjects = diaCalResult.diaObjectCat 

731 associationResults.unassociatedSsObjects = assocResults.unassociatedSsObjects 

732 associationResults.newDiaSources = assocResults.newDiaSources 

733 associationResults.marginalDiaSources = assocResults.marginalDiaSources 

734 

735 # Catch *any* error after we have updated the APDB 

736 try: 

737 associatedSsSources = assocResults.associatedSsSources 

738 associatedSsSourcesPlusMpcorb = None 

739 if associatedSsSources is not None: 

740 mpcorbColumns = [col for col in associatedSsSources.columns if col[:7] == 'MPCORB_'] 

741 associatedSsSourceMpcorb = associatedSsSources[mpcorbColumns].copy() 

742 associatedSsSources = self.standardizeTable(associatedSsSources, "SSSource", nullColumns=[]) 

743 associatedSsSourcesPlusMpcorb = associatedSsSources.copy() 

744 for mpcorbColumn in mpcorbColumns: 

745 associatedSsSourcesPlusMpcorb[mpcorbColumn] = associatedSsSourceMpcorb[mpcorbColumn] 

746 associationResults.associatedSsSources = associatedSsSources 

747 

748 # patch the otherwise-empty validityStart field for the alerts 

749 updatedDiaObjects['validityStartMjdTai'] = validityStart.get(system=DateTime.MJD, 

750 scale=DateTime.TAI) 

751 

752 # Package alerts 

753 if self.config.doPackageAlerts: 

754 # Append new forced sources to the full history 

755 diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources, 

756 tableName="DiaForcedSource") 

757 if self.testDataFrameIndex(diaForcedSourcesFull): 

758 self.log.warning( 

759 "Duplicate DiaForcedSources created after merge with " 

760 "history and new sources. This may cause downstream " 

761 "problems. Dropping duplicates.") 

762 # Drop duplicates via index and keep the first appearance. 

763 diaForcedSourcesFull = diaForcedSourcesFull[ 

764 ~diaForcedSourcesFull.index.duplicated(keep="first")] 

765 

766 self.alertPackager.run(assocResults.associatedDiaSources, 

767 updatedDiaObjects, 

768 preloadedDiaSources, 

769 diaForcedSourcesFull, 

770 diffIm, 

771 exposure, 

772 template, 

773 ssSrc=associatedSsSourcesPlusMpcorb, 

774 doRunForcedMeasurement=self.config.doRunForcedMeasurement, 

775 forcedSourceHistoryThreshold=forcedSourceHistoryThreshold, 

776 ) 

777 except Exception as e: 

778 # Catch *any* error after we have updated the APDB 

779 raise PostApdbUpdateError(errorMsg=repr(e)) from e 

780 

781 return associationResults 

782 

783 def _validateExposure(self, exposure, label): 

784 """Validate exposure metadata early to avoid failures after updating 

785 the APDB. 

786 

787 Parameters 

788 ---------- 

789 exposure : `lsst.afw.image.ExposureF` 

790 Calibrated science image that was subtracted. Corresponds to the 

791 ``exposure`` Butler connection (``{fakesType}calexp``). 

792 label : `str` 

793 Specification of the image being validated, for logging messages. 

794 

795 Raises 

796 ------ 

797 ValueError 

798 Raised when any required metadata field is missing or invalid. 

799 """ 

800 errors = [] 

801 

802 photo_calib = exposure.getPhotoCalib() 

803 if photo_calib is None: 

804 errors.append( 

805 f"{label}: PhotoCalib is None; flux-to-nJy calibration " 

806 "in alert packaging will fail." 

807 ) 

808 else: 

809 mean = photo_calib.getCalibrationMean() 

810 if not np.isfinite(mean) or mean <= 0.0: 

811 errors.append( 

812 f"{label}: PhotoCalib calibration mean is {mean!r}; " 

813 "must be a finite, positive value." 

814 ) 

815 

816 if exposure.getWcs() is None: 

817 errors.append( 

818 f"{label}: WCS is None; sky-to-pixel projection and " 

819 "DIA Object association will fail." 

820 ) 

821 

822 # Skip visitInfo checks for the template 

823 if label != "Template": 

824 visit_info = exposure.getInfo().getVisitInfo() 

825 if visit_info is None: 

826 errors.append( 

827 f"{label}: VisitInfo is None; observation time, pointing, " 

828 "rotation angle, and exposure duration are unavailable." 

829 ) 

830 else: 

831 obs_date = visit_info.getDate() 

832 if not obs_date.isValid(): 

833 errors.append( 

834 f"{label}: VisitInfo.date is invalid." 

835 ) 

836 

837 # boresightRaDec — written to APDB rows and alert payloads. 

838 boresight = visit_info.getBoresightRaDec() 

839 ra_deg = boresight.getRa().asDegrees() 

840 dec_deg = boresight.getDec().asDegrees() 

841 if not np.isfinite(ra_deg) or not np.isfinite(dec_deg): 

842 errors.append( 

843 f"{label}: VisitInfo.boresightRaDec contains NaN " 

844 f"(RA={ra_deg}, Dec={dec_deg}); pointing metadata " 

845 "is missing or corrupt." 

846 ) 

847 

848 rot_deg = visit_info.getBoresightRotAngle().asDegrees() 

849 if not np.isfinite(rot_deg): 

850 errors.append( 

851 f"{label}: VisitInfo.boresightRotAngle is NaN; the " 

852 "ROTPA keyword will be corrupt in all alert cutout " 

853 "FITS headers." 

854 ) 

855 

856 filter_label = exposure.getFilter() 

857 if filter_label is None: 

858 errors.append( 

859 f"{label}: filter label is None." 

860 ) 

861 else: 

862 band = filter_label.bandLabel if filter_label.hasBandLabel() else "" 

863 if not band: 

864 errors.append( 

865 f"{label}: filter label carries no band name." 

866 ) 

867 

868 bbox = exposure.getBBox() 

869 if bbox.getWidth() <= 0 or bbox.getHeight() <= 0: 

870 errors.append( 

871 f"{label}: bounding box is empty " 

872 f"({bbox.getWidth()}×{bbox.getHeight()} px); the " 

873 "readout or ISR step may have failed." 

874 ) 

875 

876 if exposure.getPsf() is None: 

877 errors.append( 

878 f"{label}: PSF model is None." 

879 ) 

880 

881 if errors: 

882 detail = "\n ".join(errors) 

883 raise ValueError( 

884 "Input exposure metadata is invalid; aborting before any " 

885 "APDB writes to prevent non-recoverable data corruption:\n " 

886 + detail 

887 ) 

888 

889 def _selectGoodDiaObjects(self, diaObjects, diaSources): 

890 """ 

891 Extract a subset of diaObjects with multiple associated diaSources that 

892 pass selection requirements. 

893 """ 

894 n_history = self.diaForcedSource.config.historyThreshold 

895 required_columns = ["reliability", "trailLength"] 

896 for col in required_columns: 

897 if col not in diaSources.columns: 

898 raise RuntimeError("Required column '%s' missing from diaSource table." % col) 

899 

900 rel_ok = diaSources["reliability"] >= self.config.forcedReliabilityThreshold 

901 trailed_ok = diaSources["trailLength"] <= self.config.forcedTrailLengthThreshold 

902 badFlags = [f for f in self.config.forcedBadFlags if f in diaSources.columns] 

903 if badFlags: 

904 self.log.info("Excluding diaSources with %s flags set when calculating" 

905 " diaObject history for forced photometry." % badFlags) 

906 # Exclude rows where ANY bad flag is True. 

907 # If bad flag columns can contain NA, treat NA as True (i.e., bad) via fillna(True). 

908 bad_any = diaSources[badFlags].fillna(True).any(axis=1) 

909 mask = rel_ok & trailed_ok & ~bad_any 

910 ids = diaSources.index.get_level_values("diaObjectId")[mask] 

911 

912 # Count occurrences in diaSource 

913 counts = pd.Series(ids, copy=False).value_counts() 

914 

915 # IDs that exceed threshold 

916 keep_ids = counts[counts >= n_history].index 

917 

918 # Since diaObject index is unique, direct index filtering is safe and efficient 

919 return diaObjects.loc[diaObjects.index.intersection(keep_ids)] 

920 

921 def createNewDiaObjects(self, unassociatedDiaSources): 

922 """Loop through the set of DiaSources and create new DiaObjects 

923 for unassociated DiaSources. 

924 

925 Parameters 

926 ---------- 

927 unassociatedDiaSources : `pandas.DataFrame` 

928 Set of DiaSources to create new DiaObjects from. 

929 

930 Returns 

931 ------- 

932 results : `lsst.pipe.base.Struct` 

933 Results struct containing: 

934 

935 - diaSources : `pandas.DataFrame` 

936 DiaSource catalog with updated DiaObject ids. 

937 - newDiaObjects : `pandas.DataFrame` 

938 Newly created DiaObjects from the unassociated DiaSources. 

939 - nNewDiaObjects : `int` 

940 Number of newly created diaObjects. 

941 - marginalDiaSources : `pandas.DataFrame` 

942 Unassociated diaSources with low signal-to-noise and/or 

943 reliability, which are excluded from the new DiaObjects. 

944 """ 

945 marginalDiaSources = make_empty_catalog(self.schema, tableName="DiaSource") 

946 if len(unassociatedDiaSources) == 0: 

947 newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject") 

948 else: 

949 if self.config.filterUnAssociatedSources: 

950 results = self.filterSources(unassociatedDiaSources) 

951 unassociatedDiaSources = results.goodSources 

952 marginalDiaSources = results.badSources 

953 unassociatedDiaSources["diaObjectId"] = unassociatedDiaSources["diaSourceId"] 

954 newDiaObjects = convertDataFrameToSdmSchema(self.schema, unassociatedDiaSources, 

955 tableName="DiaObject", skipIndex=True) 

956 self.metadata["nRejectedNewDiaObjects"] = len(marginalDiaSources) 

957 return pipeBase.Struct(diaSources=unassociatedDiaSources, 

958 newDiaObjects=newDiaObjects, 

959 nNewDiaObjects=len(newDiaObjects), 

960 marginalDiaSources=marginalDiaSources) 

961 

962 def filterSources(self, sources, snrThreshold=None, lowReliabilitySnrThreshold=None, 

963 reliabilityThreshold=None, lowSnrReliabilityThreshold=None, badFlags=None): 

964 """Select good sources out of a catalog. 

965 

966 Parameters 

967 ---------- 

968 sources : `pandas.DataFrame` 

969 Set of DiaSources to check. 

970 snrThreshold : `float`, optional 

971 The minimum signal to noise diaSource to make a new diaObject. 

972 Included for unit tests. Uses the task config value if not set. 

973 lowReliabilitySnrThreshold : `float`, optional 

974 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

975 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

976 Included for unit tests. Uses the task config value if not set. 

977 reliabilityThreshold : `float`, optional 

978 The minimum reliability score diaSource to make a new diaObject 

979 Included for unit tests. Uses the task config value if not set. 

980 lowSnrReliabilityThreshold : `float`, optional 

981 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

982 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

983 Included for unit tests. Uses the task config value if not set. 

984 badFlags : `list` of `str`, optional 

985 Do not create new diaObjects for any diaSource with any of these 

986 flags set. 

987 Included for unit tests. Uses the task config value if not set. 

988 

989 Returns 

990 ------- 

991 results : `lsst.pipe.base.Struct` 

992 Results struct containing: 

993 

994 - goodSources : `pandas.DataFrame` 

995 Subset of the input sources that pass all checks. 

996 - badSources : `pandas.DataFrame` 

997 Subset of the input sources that fail any check. 

998 """ 

999 if snrThreshold is None: 

1000 snrThreshold = self.config.newObjectSnrThreshold 

1001 if lowReliabilitySnrThreshold is None: 

1002 lowReliabilitySnrThreshold = self.config.newObjectLowReliabilitySnrThreshold 

1003 if reliabilityThreshold is None: 

1004 reliabilityThreshold = self.config.newObjectReliabilityThreshold 

1005 if lowSnrReliabilityThreshold is None: 

1006 lowSnrReliabilityThreshold = self.config.newObjectLowSnrReliabilityThreshold 

1007 if badFlags is None: 

1008 badFlags = self.config.newObjectBadFlags 

1009 flagged = sources[badFlags].fillna(False).any(axis=1) 

1010 fluxField = self.config.newObjectFluxField 

1011 fluxErrField = fluxField + "Err" 

1012 signalToNoise = np.abs(np.array(sources[fluxField]/sources[fluxErrField])) 

1013 reliability = np.array(sources['reliability']) 

1014 nFlagged = np.count_nonzero(flagged) 

1015 if nFlagged > 0: 

1016 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to flags", nFlagged) 

1017 

1018 if snrThreshold > 0: 

1019 snr_flag = signalToNoise < snrThreshold 

1020 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

1021 " signal to noise < %f", 

1022 np.sum(snr_flag), fluxField, snrThreshold) 

1023 flagged |= snr_flag 

1024 if reliabilityThreshold > 0: 

1025 reliability_flag = reliability < reliabilityThreshold 

1026 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to " 

1027 "reliability<%f", 

1028 np.sum(reliability_flag), reliabilityThreshold) 

1029 flagged |= reliability_flag 

1030 if min(lowReliabilitySnrThreshold, lowSnrReliabilityThreshold) > 0: 

1031 # Only run the combined test if both thresholds are greater than zero 

1032 lowSnrReliability_flag = ((signalToNoise < lowReliabilitySnrThreshold) 

1033 & (reliability < lowSnrReliabilityThreshold)) 

1034 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

1035 " signal to noise < %f combined with reliability< %f", 

1036 np.sum(lowSnrReliability_flag), 

1037 fluxField, 

1038 lowReliabilitySnrThreshold, 

1039 lowSnrReliabilityThreshold) 

1040 flagged |= lowSnrReliability_flag 

1041 

1042 if np.count_nonzero(~flagged) > 0: 

1043 goodSources = sources[~flagged].copy(deep=True) 

1044 else: 

1045 goodSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1046 if np.count_nonzero(flagged) > 0: 

1047 badSources = sources[flagged].copy(deep=True) 

1048 else: 

1049 badSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1050 return pipeBase.Struct(goodSources=goodSources, 

1051 badSources=badSources 

1052 ) 

1053 

1054 @timeMethod 

1055 def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects): 

1056 """Associate DiaSources with DiaObjects. 

1057 

1058 Associate new DiaSources with existing DiaObjects. Create new 

1059 DiaObjects fron unassociated DiaSources. Index DiaSource catalogue 

1060 after associations. Append new DiaObjects and DiaSources to their 

1061 previous history. Test for DiaSource and DiaObject duplications. 

1062 Compute DiaObject Summary statistics from their full DiaSource 

1063 history. Test for duplication in the updated DiaObjects. 

1064 

1065 Parameters 

1066 ---------- 

1067 diaSourceTable : `pandas.DataFrame` 

1068 Newly detected DiaSources. 

1069 solarSystemObjectTable : `astropy.table.Table` 

1070 Preloaded Solar System objects expected to be visible in the image. 

1071 diffIm : `lsst.afw.image.ExposureF` 

1072 Difference image exposure in which the sources in ``diaSourceCat`` 

1073 were detected. 

1074 diaObjects : `pandas.DataFrame` 

1075 Table of DiaObjects from preloaded DiaObjects. 

1076 

1077 Returns 

1078 ------- 

1079 results : `lsst.pipe.base.Struct` 

1080 Results struct containing: 

1081 

1082 - associatedDiaSources : `pandas.DataFrame` 

1083 Associated DiaSources with DiaObjects. 

1084 - newDiaObjects : `pandas.DataFrame` 

1085 Table of new DiaObjects after association. 

1086 - associatedSsSources : `astropy.table.Table` 

1087 Table of new ssSources after association. 

1088 - unassociatedSsObjects : `astropy.table.Table` 

1089 Table of expected ssSources that were not associated with a 

1090 diaSource. 

1091 - newDiaSources : `pandas.DataFrame` 

1092 Subset of `associatedDiaSources` consisting of only the 

1093 unassociated diaSources that were added as new diaObjects. 

1094 - marginalDiaSources : `pandas.DataFrame` 

1095 Unassociated diaSources with marginal detections, which were 

1096 removed from `associatedDiaSources` and were not added as new 

1097 diaObjects. 

1098 """ 

1099 associatedCatalogs = [] 

1100 # First associate diaSources with known asteroids 

1101 if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None: 

1102 ssoAssocResult = self.solarSystemAssociator.run( 

1103 Table.from_pandas(diaSourceTable), 

1104 solarSystemObjectTable, 

1105 diffIm.visitInfo, 

1106 diffIm.getBBox(), 

1107 diffIm.wcs 

1108 ) 

1109 nTotalSsObjects = ssoAssocResult.nTotalSsObjects 

1110 nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects 

1111 associatedSsSources = ssoAssocResult.associatedSsSources 

1112 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects 

1113 if len(ssoAssocResult.ssoAssocDiaSources) > 0: 

1114 associatedCatalogs.append(ssoAssocResult.ssoAssocDiaSources.to_pandas()) 

1115 if len(ssoAssocResult.unAssocDiaSources) > 0: 

1116 # If the table is empty then converting time fields to pandas 

1117 # will raise an error. Pass in an empty Dataframe in that case. 

1118 unAssocSSDiaSources = ssoAssocResult.unAssocDiaSources.to_pandas() 

1119 else: 

1120 unAssocSSDiaSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1121 else: 

1122 unAssocSSDiaSources = diaSourceTable 

1123 nTotalSsObjects = 0 

1124 nAssociatedSsObjects = 0 

1125 associatedSsSources = None 

1126 unassociatedSsObjects = None 

1127 # Associate new DiaSources with existing DiaObjects. 

1128 assocResults = self.associator.run(unAssocSSDiaSources, diaObjects) 

1129 createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources) 

1130 

1131 if not assocResults.matchedDiaSources.empty: 

1132 associatedCatalogs.append(assocResults.matchedDiaSources) 

1133 if not createResults.diaSources.empty: 

1134 associatedCatalogs.append(createResults.diaSources) 

1135 if len(associatedCatalogs) == 0: 

1136 associatedDiaSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1137 else: 

1138 # Standardize each component to the schema before concatinating, so 

1139 # that type mis-matches are fixed as early as possible. 

1140 associatedDiaSources = pd.concat( 

1141 [convertDataFrameToSdmSchema(self.schema, df, tableName="DiaSource", skipIndex=True) 

1142 for df in associatedCatalogs]) 

1143 

1144 self._add_association_meta_data(assocResults.nUpdatedDiaObjects, 

1145 assocResults.nUnassociatedDiaObjects, 

1146 createResults.nNewDiaObjects, 

1147 nTotalSsObjects, 

1148 nAssociatedSsObjects) 

1149 self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects" 

1150 " and dropping %i marginal diaSources.", 

1151 assocResults.nUpdatedDiaObjects, 

1152 assocResults.nUnassociatedDiaObjects, 

1153 createResults.nNewDiaObjects, 

1154 len(createResults.marginalDiaSources), 

1155 ) 

1156 if createResults.nNewDiaObjects > self.config.maxNewDiaObjects > 0: 

1157 raise TooManyDiaObjectsError(nNewDiaObjects=createResults.nNewDiaObjects, 

1158 threshold=self.config.maxNewDiaObjects) 

1159 return pipeBase.Struct(associatedDiaSources=associatedDiaSources, 

1160 newDiaObjects=createResults.newDiaObjects, 

1161 associatedSsSources=associatedSsSources, 

1162 unassociatedSsObjects=unassociatedSsObjects, 

1163 newDiaSources=createResults.diaSources, 

1164 marginalDiaSources=createResults.marginalDiaSources 

1165 ) 

1166 

1167 def standardizeDataFrame(self, dataFrame, tableName, nullColumns=None): 

1168 """Convert a catalog to SDM schema format with NULL ID values 

1169 

1170 Parameters 

1171 ---------- 

1172 dataFrame : `pandas.DataFrame` 

1173 The catalog to standardize/convert 

1174 tableName : `str` 

1175 Schema name of table to which this dataFrame should be standardized 

1176 nullColumns : `list` of `str`, optional 

1177 List of column names to check for default values of 0 that should be 

1178 replaced with `NULL`. 

1179 

1180 

1181 Returns 

1182 ------- 

1183 standardizedAssociatedDiaSources : pandas.DataFrame 

1184 The standardized DiaSource catalog 

1185 """ 

1186 standardizedDataFrame = convertDataFrameToSdmSchema(self.schema, 

1187 dataFrame, 

1188 tableName=tableName, 

1189 skipIndex=True) 

1190 

1191 def _setNullColumn(dataframe, colName): 

1192 """Set specified columns with default values of 0 to NULL.""" 

1193 dataframe.loc[dataframe[colName] == 0, colName] = pd.NA 

1194 

1195 if nullColumns is not None: 

1196 for colName in nullColumns: 

1197 _setNullColumn(standardizedDataFrame, colName) 

1198 return standardizedDataFrame 

1199 

1200 def standardizeTable(self, table, tableName, nullColumns=None): 

1201 """Convert a catalog to SDM schema format with NULL ID values 

1202 

1203 Parameters 

1204 ---------- 

1205 table : `astropy.table.Table` 

1206 The catalog to standardize/convert 

1207 tableName : `str` 

1208 Schema name of table to which this dataFrame should be standardized 

1209 nullColumns : `list` of `str`, optional 

1210 List of column names to check for default values of 0 that should be 

1211 replaced with `NULL`. 

1212 

1213 

1214 Returns 

1215 ------- 

1216 standardizedAssociatedDiaSources : pandas.DataFrame 

1217 The standardized DiaSource catalog 

1218 """ 

1219 dataFrame = table.to_pandas() 

1220 standardizedDataFrame = self.standardizeDataFrame(dataFrame, tableName, nullColumns=nullColumns) 

1221 return standardizedDataFrame 

1222 

1223 @timeMethod 

1224 def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects, 

1225 diffIm): 

1226 """Merge the associated diaSource and diaObjects to their previous history. 

1227 

1228 Parameters 

1229 ---------- 

1230 preloadedDiaSources : `pandas.DataFrame` 

1231 Previously detected DiaSources, loaded from the APDB. 

1232 associatedDiaSources : `pandas.DataFrame` 

1233 Associated DiaSources with DiaObjects. 

1234 diaObjects : `pandas.DataFrame` 

1235 Table of DiaObjects from preloaded DiaObjects. 

1236 newDiaObjects : `pandas.DataFrame` 

1237 Table of new DiaObjects after association. 

1238 

1239 Raises 

1240 ------ 

1241 RuntimeError 

1242 Raised if duplicate DiaObjects or duplicate DiaSources are found. 

1243 

1244 Returns 

1245 ------- 

1246 mergedDiaSourceHistory : `pandas.DataFrame` 

1247 The combined catalog, with all of the rows from preloadedDiaSources 

1248 catalog ordered before the rows of associatedDiaSources catalog. 

1249 mergedDiaObjects : `pandas.DataFrame` 

1250 Table of new DiaObjects merged with their history. 

1251 updatedDiaObjectIds : `numpy.Array` 

1252 Object Id's from associated diaSources. 

1253 """ 

1254 # Index the DiaSource catalog for this visit after all associations 

1255 # have been made. 

1256 updatedDiaObjectIds = associatedDiaSources["diaObjectId"][ 

1257 associatedDiaSources["diaObjectId"] != 0].to_numpy() 

1258 associatedDiaSources.set_index(["diaObjectId", 

1259 "band", 

1260 "diaSourceId"], 

1261 drop=False, 

1262 inplace=True) 

1263 

1264 # Append new DiaObjects and DiaSources to their previous history. 

1265 if diaObjects.empty: 

1266 mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False) 

1267 elif not newDiaObjects.empty: 

1268 mergedDiaObjects = pd.concat( 

1269 [diaObjects, 

1270 newDiaObjects.set_index("diaObjectId", drop=False)], 

1271 sort=True) 

1272 else: 

1273 mergedDiaObjects = diaObjects 

1274 

1275 # Exclude any objects that are off the image after association. 

1276 mergedDiaObjects, updatedDiaObjectIds = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), 

1277 mergedDiaObjects, 

1278 diaObjectIds=updatedDiaObjectIds, 

1279 buffer=-1) 

1280 if self.testDataFrameIndex(mergedDiaObjects): 

1281 raise RuntimeError( 

1282 "Duplicate DiaObjects created after association. This is " 

1283 "likely due to re-running data with an already populated " 

1284 "Apdb. If this was not the case then there was an unexpected " 

1285 "failure in Association while matching and creating new " 

1286 "DiaObjects and should be reported. Exiting.") 

1287 

1288 mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources, 

1289 tableName="DiaSource") 

1290 

1291 # Test for DiaSource duplication first. If duplicates are found, 

1292 # this likely means this is duplicate data being processed and sent 

1293 # to the Apdb. 

1294 if self.testDataFrameIndex(mergedDiaSourceHistory): 

1295 raise RuntimeError( 

1296 "Duplicate DiaSources found after association and merging " 

1297 "with history. This is likely due to re-running data with an " 

1298 "already populated Apdb. If this was not the case then there " 

1299 "was an unexpected failure in Association while matching " 

1300 "sources to objects, and should be reported. Exiting.") 

1301 # Finally, update the diaObject table with the number of associated diaSources 

1302 mergedUpdatedDiaObjects = self.updateObjectTable(mergedDiaObjects, mergedDiaSourceHistory) 

1303 return (mergedDiaSourceHistory, mergedUpdatedDiaObjects, updatedDiaObjectIds) 

1304 

1305 @timeMethod 

1306 def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator): 

1307 """Forced Source Measurement 

1308 

1309 Forced photometry on the difference and calibrated exposures using the 

1310 new and updated DiaObject locations. 

1311 

1312 Parameters 

1313 ---------- 

1314 diaObjects : `pandas.DataFrame` 

1315 Catalog of DiaObjects. 

1316 updatedDiaObjects : `pandas.DataFrame` 

1317 Catalog of updated DiaObjects. 

1318 exposure : `lsst.afw.image.ExposureF` 

1319 Calibrated exposure differenced with a template to create 

1320 ``diffIm``. 

1321 diffIm : `lsst.afw.image.ExposureF` 

1322 Difference image exposure in which the sources in ``diaSourceCat`` 

1323 were detected. 

1324 idGenerator : `lsst.meas.base.IdGenerator` 

1325 Object that generates source IDs and random number generator seeds. 

1326 

1327 Returns 

1328 ------- 

1329 diaForcedSources : `pandas.DataFrame` 

1330 Catalog of calibrated forced photometered fluxes on both the 

1331 difference and direct images at DiaObject locations. 

1332 """ 

1333 # Force photometer on the Difference and Calibrated exposures using 

1334 # the new and updated DiaObject locations. 

1335 diaForcedSources = self.diaForcedSource.run( 

1336 diaObjects, 

1337 updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(), 

1338 exposure, 

1339 diffIm, 

1340 idGenerator=idGenerator) 

1341 self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB") 

1342 diaForcedSources = convertDataFrameToSdmSchema(self.schema, diaForcedSources, 

1343 tableName="DiaForcedSource", skipIndex=True) 

1344 return diaForcedSources 

1345 

1346 @timeMethod 

1347 def loadRefreshedDiaObjects(self, region, preloadedDiaObjects): 

1348 """Load DiaObjects from the Apdb based on their HTM location. 

1349 

1350 Parameters 

1351 ---------- 

1352 region : `sphgeom.Region` 

1353 Region containing the current exposure to load diaObjects from the 

1354 APDB. 

1355 preloadedDiaObjects : `pandas.DataFrame` 

1356 Previously detected DiaObjects, loaded from the APDB. 

1357 

1358 Returns 

1359 ------- 

1360 diaObjects : `pandas.DataFrame` 

1361 DiaObjects loaded from the Apdb that are within the area defined 

1362 by ``pixelRanges``. 

1363 """ 

1364 angleMargin = lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.) 

1365 diaObjects = self.apdb.getDiaObjects(paddedRegion(region, angleMargin)) 

1366 

1367 diaObjects.set_index("diaObjectId", drop=False, inplace=True) 

1368 if diaObjects.index.has_duplicates: 

1369 self.log.warning( 

1370 "Duplicate DiaObjects loaded from the Apdb. This may cause " 

1371 "downstream pipeline issues. Dropping duplicated rows") 

1372 # Drop duplicates via index and keep the first appearance. 

1373 diaObjects = diaObjects[~diaObjects.index.duplicated(keep="first")] 

1374 self.log.info("Loaded %d DiaObjects", len(diaObjects)) 

1375 refreshedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName="DiaObject", 

1376 skipIndex=True) 

1377 

1378 refreshedIsInPreloaded = refreshedDiaObjects.index.isin(preloadedDiaObjects.index) 

1379 preloadedIsInRefreshed = preloadedDiaObjects.index.isin(refreshedDiaObjects.index) 

1380 nUniqueRefreshed = (~refreshedIsInPreloaded).sum() 

1381 nUniquePreloaded = (~preloadedIsInRefreshed).sum() 

1382 if nUniqueRefreshed > 0: 

1383 self.log.info("Reloading the diaObject table during association yielded " 

1384 "an additional %d objects over the %d preloaded diaObjects.", 

1385 nUniqueRefreshed, len(preloadedDiaObjects)) 

1386 if nUniquePreloaded == 0: 

1387 return refreshedDiaObjects 

1388 # ap_verify CI datasets ship a preloaded diaObject catalog 

1389 # alongside an empty APDB, so some preloaded objects may not 

1390 # appear in the refresh. Combine the two, giving precedence to 

1391 # refreshed entries on overlap so that updated column values are 

1392 # not silently discarded when the refresh adds no new ids. 

1393 return pd.concat([refreshedDiaObjects, preloadedDiaObjects.loc[~preloadedIsInRefreshed]]) 

1394 

1395 @timeMethod 

1396 def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources): 

1397 """Write to the Alert Production Database (Apdb). 

1398 

1399 Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1400 Alert Production Database (Apdb). 

1401 

1402 Parameters 

1403 ---------- 

1404 updatedDiaObjects : `pandas.DataFrame` 

1405 Catalog of updated DiaObjects. 

1406 associatedDiaSources : `pandas.DataFrame` 

1407 Associated DiaSources with DiaObjects. 

1408 diaForcedSources : `pandas.DataFrame` 

1409 Catalog of calibrated forced photometered fluxes on both the 

1410 difference and direct images at DiaObject locations. 

1411 

1412 Returns 

1413 ------- 

1414 validityStart : `lsst.daf.base.DateTime` 

1415 Time at which the APDB was updated. 

1416 """ 

1417 # Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1418 # Apdb. 

1419 # Drop empty columns that are nullable in the APDB. 

1420 diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject") 

1421 diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource") 

1422 diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource") 

1423 

1424 validityStart = DateTime.now() 

1425 self.apdb.store( 

1426 validityStart.toAstropy(), 

1427 diaObjectStore, 

1428 diaSourceStore, 

1429 diaForcedSourceStore) 

1430 self.log.info("APDB updated.") 

1431 

1432 return validityStart 

1433 

1434 def testDataFrameIndex(self, df): 

1435 """Test the sorted DataFrame index for duplicates. 

1436 

1437 Wrapped as a separate function to allow for mocking of the this task 

1438 in unittesting. Default of a mock return for this test is True. 

1439 

1440 Parameters 

1441 ---------- 

1442 df : `pandas.DataFrame` 

1443 DataFrame to text. 

1444 

1445 Returns 

1446 ------- 

1447 `bool` 

1448 True if DataFrame contains duplicate rows. 

1449 """ 

1450 return df.index.has_duplicates 

1451 

1452 def _add_association_meta_data(self, 

1453 nUpdatedDiaObjects, 

1454 nUnassociatedDiaObjects, 

1455 nNewDiaObjects, 

1456 nTotalSsObjects, 

1457 nAssociatedSsObjects): 

1458 """Store summaries of the association step in the task metadata. 

1459 

1460 Parameters 

1461 ---------- 

1462 nUpdatedDiaObjects : `int` 

1463 Number of previous DiaObjects associated and updated in this 

1464 ccdVisit. 

1465 nUnassociatedDiaObjects : `int` 

1466 Number of previous DiaObjects that were not associated or updated 

1467 in this ccdVisit. 

1468 nNewDiaObjects : `int` 

1469 Number of newly created DiaObjects for this ccdVisit. 

1470 nTotalSsObjects : `int` 

1471 Number of SolarSystemObjects within the observable detector 

1472 area. 

1473 nAssociatedSsObjects : `int` 

1474 Number of successfully associated SolarSystemObjects. 

1475 """ 

1476 self.metadata['numUpdatedDiaObjects'] = nUpdatedDiaObjects 

1477 self.metadata['numUnassociatedDiaObjects'] = nUnassociatedDiaObjects 

1478 self.metadata['numNewDiaObjects'] = nNewDiaObjects 

1479 self.metadata['numTotalSolarSystemObjects'] = nTotalSsObjects 

1480 self.metadata['numAssociatedSsObjects'] = nAssociatedSsObjects 

1481 

1482 def purgeDiaObjects(self, bbox, wcs, diaObjCat, diaObjectIds=None, buffer=0): 

1483 """Drop diaObjects that are outside the exposure bounding box. 

1484 

1485 Parameters 

1486 ---------- 

1487 bbox : `lsst.geom.Box2I` 

1488 Bounding box of the exposure. 

1489 wcs : `lsst.afw.geom.SkyWcs` 

1490 Coordinate system definition (wcs) for the exposure. 

1491 diaObjCat : `pandas.DataFrame` 

1492 DiaObjects loaded from the Apdb. 

1493 buffer : `int`, optional 

1494 Width, in pixels, to pad the exposure bounding box. 

1495 

1496 Returns 

1497 ------- 

1498 diaObjCat : `pandas.DataFrame` 

1499 DiaObjects loaded from the Apdb, restricted to the exposure 

1500 bounding box. 

1501 """ 

1502 # Copy the bbox so the caller's box is not mutated by grow(). 

1503 bbox = type(bbox)(bbox) 

1504 try: 

1505 bbox.grow(buffer) 

1506 raVals = diaObjCat.ra.to_numpy() 

1507 decVals = diaObjCat.dec.to_numpy() 

1508 xVals, yVals = wcs.skyToPixelArray(raVals, decVals, degrees=True) 

1509 selector = bbox.contains(xVals, yVals) 

1510 nPurged = np.sum(~selector) 

1511 if nPurged > 0: 

1512 if diaObjectIds is not None: 

1513 # We also need to drop any of the associated IDs if this runs after association 

1514 purgedIds = diaObjCat[~selector].diaObjectId 

1515 diaObjectIds = diaObjectIds[~np.isin(diaObjectIds, purgedIds)] 

1516 self.log.info("Dropped %i diaObjects that were outside the bbox " 

1517 "after association, leaving %i in the catalog", 

1518 nPurged, len(diaObjCat) - nPurged) 

1519 else: 

1520 self.log.info("Dropped %i diaObjects that were outside the padded bbox " 

1521 "before association, leaving %i in the catalog", 

1522 nPurged, len(diaObjCat) - nPurged) 

1523 diaObjCat = diaObjCat[selector].copy() 

1524 except (AttributeError, KeyError, ValueError) as e: 

1525 self.log.warning("Error attempting to check diaObject history: %s", e, exc_info=e) 

1526 return diaObjCat, diaObjectIds 

1527 

1528 def mergeCatalogs(self, originalCatalog, newCatalog, tableName): 

1529 """Combine two catalogs, ensuring that the new catalog conforms to the schema. 

1530 

1531 Parameters 

1532 ---------- 

1533 originalCatalog : `pandas.DataFrame` 

1534 The original catalog to be added to. 

1535 newCatalog : `pandas.DataFrame` 

1536 The new catalog to append to `originalCatalog` 

1537 tableName : `str` 

1538 Name of the table in the schema to use. 

1539 

1540 Returns 

1541 ------- 

1542 mergedCatalog : `pandas.DataFrame` 

1543 The combined catalog, with all of the rows from ``originalCatalog`` 

1544 ordered before the rows of ``newCatalog`` 

1545 """ 

1546 if len(newCatalog) > 0: 

1547 catalog = convertDataFrameToSdmSchema(self.schema, newCatalog, 

1548 tableName=tableName, skipIndex=True) 

1549 

1550 mergedCatalog = pd.concat([originalCatalog, catalog], sort=True) 

1551 else: 

1552 mergedCatalog = pd.concat([originalCatalog], sort=True) 

1553 return mergedCatalog.loc[:, originalCatalog.columns] 

1554 

1555 def updateObjectTable(self, diaObjects, diaSources): 

1556 """Update the diaObject table with the new diaSource records. 

1557 

1558 Parameters 

1559 ---------- 

1560 diaObjects : `pandas.DataFrame` 

1561 Table of new DiaObjects merged with their history. 

1562 diaSources : `pandas.DataFrame` 

1563 The combined preloaded and associated diaSource catalog. 

1564 

1565 Returns 

1566 ------- 

1567 updatedDiaObjects : `pandas.DataFrame` 

1568 Table of DiaObjects updated with the number of associated DiaSources 

1569 """ 

1570 # Group on the index level explicitly since diaObjectId is both an 

1571 # index and a (duplicated) column 

1572 nDiaSources = diaSources.groupby(level="diaObjectId").size().rename("nDiaSources") 

1573 diaObjects = diaObjects.drop(columns="nDiaSources", errors="ignore") 

1574 updatedDiaObjects = diaObjects.join(nDiaSources, how="left") 

1575 return updatedDiaObjects 

1576 

1577 @staticmethod 

1578 def checkTableIndex(dataFrame, index): 

1579 if dataFrame.index.name is None: 

1580 # The expected index may or may not be set, depending on whether 

1581 # the table was written originally as a DataFrame or something else 

1582 # Parquet-friendly. 

1583 dataFrame.set_index(index, drop=False, inplace=True)