Coverage for python/lsst/pipe/tasks/multiBand.py: 0%

348 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 02:19 -0700

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["DetectCoaddSourcesConfig", "DetectCoaddSourcesTask", 

23 "MeasureMergedCoaddSourcesConfig", "MeasureMergedCoaddSourcesTask", 

24 ] 

25 

26import numpy as np 

27 

28from lsst.geom import Extent2I 

29from lsst.pipe.base import ( 

30 AnnotatedPartialOutputsError, 

31 Struct, 

32 PipelineTask, 

33 PipelineTaskConfig, 

34 PipelineTaskConnections 

35) 

36import lsst.pipe.base.connectionTypes as cT 

37from lsst.pex.config import Field, ConfigurableField, ChoiceField 

38from lsst.meas.algorithms import ( 

39 DynamicDetectionTask, 

40 ExceedsMaxVarianceScaleError, 

41 InsufficientSourcesError, 

42 PsfGenerationError, 

43 ReferenceObjectLoader, 

44 ScaleVarianceTask, 

45 SetPrimaryFlagsTask, 

46 TooManyMaskedPixelsError, 

47 ZeroFootprintError, 

48) 

49from lsst.meas.base import ( 

50 SingleFrameMeasurementTask, 

51 ApplyApCorrTask, 

52 CatalogCalculationTask, 

53 SkyMapIdGeneratorConfig, 

54) 

55from lsst.meas.extensions.scarlet.io import updateCatalogFootprints 

56from lsst.meas.astrom import DirectMatchTask, denormalizeMatches 

57from lsst.pipe.tasks.propagateSourceFlags import PropagateSourceFlagsTask 

58import lsst.afw.image as afwImage 

59import lsst.afw.math as afwMath 

60import lsst.afw.table as afwTable 

61from lsst.daf.base import PropertyList 

62from lsst.skymap import BaseSkyMap 

63 

64# NOTE: these imports are a convenience so multiband users only have to import this file. 

65from .mergeDetections import MergeDetectionsConfig, MergeDetectionsTask # noqa: F401 

66from .mergeMeasurements import MergeMeasurementsConfig, MergeMeasurementsTask # noqa: F401 

67from .multiBandUtils import CullPeaksConfig # noqa: F401 

68from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiConfig # noqa: F401 

69from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiTask # noqa: F401 

70 

71 

72""" 

73New set types: 

74* deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter) 

75* deepCoadd_mergeDet: merged detections (tract, patch) 

76* deepCoadd_meas: measurements of merged detections (tract, patch, filter) 

77* deepCoadd_ref: reference sources (tract, patch) 

78All of these have associated *_schema catalogs that require no data ID and hold no records. 

79 

80In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in 

81the mergeDet, meas, and ref dataset Footprints: 

82* deepCoadd_peak_schema 

83""" 

84 

85 

86############################################################################################################## 

87class DetectCoaddSourcesConnections(PipelineTaskConnections, 

88 dimensions=("tract", "patch", "band", "skymap"), 

89 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}): 

90 detectionSchema = cT.InitOutput( 

91 doc="Schema of the detection catalog", 

92 name="{outputCoaddName}Coadd_det_schema", 

93 storageClass="SourceCatalog", 

94 ) 

95 exposure = cT.Input( 

96 doc="Exposure on which detections are to be performed. ", 

97 name="{inputCoaddName}Coadd", 

98 storageClass="ExposureF", 

99 dimensions=("tract", "patch", "band", "skymap") 

100 ) 

101 exposure_cells = cT.Input( 

102 doc="Exposure on which detections are to be performed. ", 

103 name="{inputCoaddName}CoaddCell", 

104 storageClass="MultipleCellCoadd", 

105 dimensions=("tract", "patch", "band", "skymap"), 

106 ) 

107 skyMap = cT.Input( 

108 doc="Description of the skymap's tracts and patches.", 

109 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

110 storageClass="SkyMap", 

111 dimensions=("skymap",), 

112 ) 

113 outputBackgrounds = cT.Output( 

114 doc="Output Backgrounds used in detection", 

115 name="{outputCoaddName}Coadd_calexp_background", 

116 storageClass="Background", 

117 dimensions=("tract", "patch", "band", "skymap") 

118 ) 

119 outputSources = cT.Output( 

120 doc="Detected sources catalog", 

121 name="{outputCoaddName}Coadd_det", 

122 storageClass="SourceCatalog", 

123 dimensions=("tract", "patch", "band", "skymap") 

124 ) 

125 outputExposure = cT.Output( 

126 doc="Exposure post detection", 

127 name="{outputCoaddName}Coadd_calexp", 

128 storageClass="ExposureF", 

129 dimensions=("tract", "patch", "band", "skymap") 

130 ) 

131 

132 def __init__(self, *, config=None): 

133 super().__init__(config=config) 

134 assert isinstance(config, DetectCoaddSourcesConfig) 

135 

136 if config.useCellCoadds: 

137 del self.exposure 

138 else: 

139 del self.exposure_cells 

140 

141 if not self.config.forceExactBinning: 

142 del self.skyMap 

143 if self.config.writeOnlyBackgrounds: 

144 del self.outputExposure 

145 del self.outputSources 

146 del self.detectionSchema 

147 

148 

149class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections): 

150 """Configuration parameters for the DetectCoaddSourcesTask 

151 """ 

152 

153 doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?") 

154 scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling") 

155 detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection") 

156 coaddName = Field(dtype=str, default="deep", doc="Name of coadd") 

157 useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?") 

158 hasFakes = Field( 

159 dtype=bool, 

160 default=False, 

161 doc="Should be set to True if fake sources have been inserted into the input data.", 

162 ) 

163 idGenerator = SkyMapIdGeneratorConfig.make_field() 

164 forceExactBinning = Field( 

165 dtype=bool, 

166 default=False, 

167 doc=( 

168 "Check that the background bin size evenly divides the patch inner region, and " 

169 "crop the outer region to an integer number of bins." 

170 ) 

171 ) 

172 writeOnlyBackgrounds = Field(dtype=bool, default=False, doc="If true, only save the background models.") 

173 writeEmptyBackgrounds = Field( 

174 dtype=bool, 

175 default=True, 

176 doc=( 

177 "If true, save a placeholder background with NaNs in all bins (but the right geometry) when " 

178 "there are no pixels to compute a background from. This can be useful if a later task combines " 

179 "backgrounds from multiple patches as input." 

180 ) 

181 ) 

182 

183 def setDefaults(self): 

184 super().setDefaults() 

185 self.detection.thresholdType = "pixel_stdev" 

186 self.detection.isotropicGrow = True 

187 # Coadds are made from background-subtracted CCDs, so any background subtraction should be very basic 

188 self.detection.reEstimateBackground = False 

189 self.detection.background.useApprox = False 

190 self.detection.background.binSize = 4096 

191 self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER' 

192 self.detection.doTempWideBackground = True # Suppress large footprints that overwhelm the deblender 

193 # Include band in packed data IDs that go into object IDs (None -> "as 

194 # many bands as are defined", rather than the default of zero). 

195 self.idGenerator.packer.n_bands = None 

196 

197 

198class DetectCoaddSourcesTask(PipelineTask): 

199 """Detect sources on a single filter coadd. 

200 

201 Coadding individual visits requires each exposure to be warped. This 

202 introduces covariance in the noise properties across pixels. Before 

203 detection, we correct the coadd variance by scaling the variance plane in 

204 the coadd to match the observed variance. This is an approximate 

205 approach -- strictly, we should propagate the full covariance matrix -- 

206 but it is simple and works well in practice. 

207 

208 After scaling the variance plane, we detect sources and generate footprints 

209 by delegating to the @ref SourceDetectionTask_ "detection" subtask. 

210 

211 DetectCoaddSourcesTask is meant to be run after assembling a coadded image 

212 in a given band. The purpose of the task is to update the background, 

213 detect all sources in a single band and generate a set of parent 

214 footprints. Subsequent tasks in the multi-band processing procedure will 

215 merge sources across bands and, eventually, perform forced photometry. 

216 

217 Parameters 

218 ---------- 

219 schema : `lsst.afw.table.Schema`, optional 

220 Initial schema for the output catalog, modified-in place to include all 

221 fields set by this task. If None, the source minimal schema will be used. 

222 **kwargs 

223 Additional keyword arguments. 

224 """ 

225 

226 _DefaultName = "detectCoaddSources" 

227 ConfigClass = DetectCoaddSourcesConfig 

228 

229 def __init__(self, schema=None, **kwargs): 

230 # N.B. Super is used here to handle the multiple inheritance of PipelineTasks, the init tree 

231 # call structure has been reviewed carefully to be sure super will work as intended. 

232 super().__init__(**kwargs) 

233 if schema is None: 

234 schema = afwTable.SourceTable.makeMinimalSchema() 

235 self.schema = schema 

236 self.makeSubtask("detection", schema=self.schema) 

237 if self.config.doScaleVariance: 

238 self.makeSubtask("scaleVariance") 

239 

240 self.detectionSchema = afwTable.SourceCatalog(self.schema) 

241 

242 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

243 inputs = butlerQC.get(inputRefs) 

244 idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId) 

245 

246 if self.config.useCellCoadds: 

247 multiple_cell_coadd = inputs.pop("exposure_cells") 

248 exposure = multiple_cell_coadd.stitch().asExposure() 

249 else: 

250 exposure = inputs.pop("exposure") 

251 

252 skyMap = inputs.pop("skyMap", None) 

253 if skyMap is not None: 

254 patchInfo = skyMap[butlerQC.quantum.dataId["tract"]][butlerQC.quantum.dataId["patch"]] 

255 else: 

256 patchInfo = None 

257 

258 assert not inputs, "runQuantum got more inputs than expected." 

259 try: 

260 outputs = self.run( 

261 exposure=exposure, 

262 idFactory=idGenerator.make_table_id_factory(), 

263 expId=idGenerator.catalog_id, 

264 patchInfo=patchInfo, 

265 ) 

266 except ( 

267 TooManyMaskedPixelsError, 

268 ExceedsMaxVarianceScaleError, 

269 InsufficientSourcesError, 

270 PsfGenerationError, 

271 ZeroFootprintError, 

272 ) as e: 

273 if self.config.writeEmptyBackgrounds: 

274 butlerQC.put(self._makeEmptyBackground(exposure, patchInfo), outputRefs.outputBackgrounds) 

275 # Detection failed, so clear any leftover the detected mask planes. 

276 for maskName in ["DETECTED", "DETECTED_NEGATIVE"]: 

277 if maskName in exposure.mask.getMaskPlaneDict().keys(): 

278 detectedMask = exposure.mask.getMaskPlane(maskName) 

279 exposure.mask.clearMaskPlane(detectedMask) 

280 butlerQC.put(exposure, outputRefs.outputExposure) 

281 error = AnnotatedPartialOutputsError.annotate( 

282 e, 

283 self, 

284 exposure, 

285 log=self.log, 

286 ) 

287 raise error from e 

288 

289 butlerQC.put(outputs, outputRefs) 

290 

291 def run(self, exposure, idFactory, expId, patchInfo=None): 

292 """Run detection on an exposure. 

293 

294 First scale the variance plane to match the observed variance 

295 using ``ScaleVarianceTask``. Then invoke the ``SourceDetectionTask_`` "detection" subtask to 

296 detect sources. 

297 

298 Parameters 

299 ---------- 

300 exposure : `lsst.afw.image.Exposure` 

301 Exposure on which to detect (may be background-subtracted and scaled, 

302 depending on configuration). 

303 idFactory : `lsst.afw.table.IdFactory` 

304 IdFactory to set source identifiers. 

305 expId : `int` 

306 Exposure identifier (integer) for RNG seed. 

307 patchInfo : `lsst.skymap.PatchInfo`, optional 

308 Description of the patch geometry. Only needed if 

309 `~DetectCoaddSourceConfig.forceExactBinning` is `True`. 

310 

311 Returns 

312 ------- 

313 result : `lsst.pipe.base.Struct` 

314 Results as a struct with attributes: 

315 

316 ``sources`` 

317 Catalog of detections (`lsst.afw.table.SourceCatalog`). 

318 ``backgrounds`` 

319 List of backgrounds (`list`). 

320 """ 

321 if self.config.forceExactBinning: 

322 exposure = self._cropToExactBinning(exposure, patchInfo) 

323 if self.config.doScaleVariance: 

324 varScale = self.scaleVariance.run(exposure.maskedImage) 

325 exposure.getMetadata().add("VARIANCE_SCALE", varScale) 

326 backgrounds = afwMath.BackgroundList() 

327 table = afwTable.SourceTable.make(self.schema, idFactory) 

328 detections = self.detection.run(table, exposure, expId=expId) 

329 sources = detections.sources 

330 if hasattr(detections, "background") and detections.background: 

331 for bg in detections.background: 

332 backgrounds.append(bg) 

333 if len(backgrounds) == 0: 

334 # Persist a constant background with value of NaN to get around 

335 # inability to persist empty BackgroundList. 

336 emptyBg = self._makeEmptyBackground(exposure, patchInfo) 

337 backgrounds.append(emptyBg) 

338 

339 return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure) 

340 

341 def _cropToExactBinning(self, exposure, patchInfo): 

342 """Crop a coadd `~lsst.afw.image.Exposure` instance to ensure exact 

343 background binning. 

344 

345 Parameters 

346 ---------- 

347 exposure : `lsst.afw.image.Exposure` 

348 Exposure to crop, assumed to cover the patch outer bounding box. 

349 patchInfo : `lsst.skymap.PatchInfo` 

350 Description of the patch geometry. 

351 

352 Returns 

353 ------- 

354 cropped : `lsst.afw.image.Exposure` 

355 View of ``exposure`` with background bins that evenly divide both 

356 the full cropped image and the patch inner region. The bounding 

357 box is guaranteed to contain the patch inner bounding box and be 

358 contained by the patch outer bounding box. 

359 

360 Raises 

361 ------ 

362 ValueError 

363 Raised if the patch inner region width or height is not a multiple 

364 of the background bin size. 

365 """ 

366 bbox = patchInfo.getInnerBBox() 

367 if bbox.width % self.detection.background.binSizeX: 

368 raise ValueError( 

369 f"Patch inner width {bbox.width} does not evenly " 

370 f"divide bin width {self.detection.background.binSizeX}." 

371 ) 

372 if bbox.height % self.detection.background.binSizeY: 

373 raise ValueError( 

374 f"Patch inner height {bbox.height} does not evenly " 

375 f"divide bin height {self.detection.background.binSizeY}." 

376 ) 

377 outer_bbox = patchInfo.getOuterBBox() 

378 n_bins_grow_x = (bbox.x.begin - outer_bbox.x.begin) // self.detection.background.binSizeX 

379 n_bins_grow_y = (bbox.y.begin - outer_bbox.y.begin) // self.detection.background.binSizeY 

380 bbox.grow( 

381 Extent2I( 

382 n_bins_grow_x*self.detection.background.binSizeX, 

383 n_bins_grow_y*self.detection.background.binSizeY, 

384 ) 

385 ) 

386 assert outer_bbox.contains(bbox) 

387 assert bbox.contains(patchInfo.getInnerBBox()) 

388 assert bbox.width % self.detection.background.binSizeX == 0 

389 assert bbox.height % self.detection.background.binSizeY == 0 

390 return exposure[bbox] 

391 

392 def _makeEmptyBackground(self, exposure, patchInfo=None): 

393 """Construct an empty `lsst.afw.math.BackgroundList` with NaN values. 

394 

395 Parameters 

396 ---------- 

397 exposure : `lsst.afw.image.Exposure` 

398 Exposure that the background should correspond to. 

399 patchInfo : `lsst.skymap.PatchInfo`, optional 

400 Description of the patch geometry. Only needed if 

401 `~DetectCoaddSourceConfig.forceExactBinning` is `True`. 

402 

403 Returns 

404 ------- 

405 background : `lsst.afw.math.BackgroundList` 

406 A background object with a single layer and the same bin geometry 

407 that a background for that exposure would have had if it had enough 

408 usable pixels. This object cannot actually be used for background 

409 subtraction. 

410 """ 

411 # Create a backgroundList with one entry whose "stats image" is NaNs 

412 # and has all pixels set as NO_DATA. 

413 if self.config.forceExactBinning: 

414 exposure = self._cropToExactBinning(exposure, patchInfo).clone() 

415 

416 bgLevel = np.nan 

417 bgStats = afwImage.MaskedImageF(1, 1) 

418 bgStats.set(bgLevel, 0, bgLevel) 

419 bg = afwMath.BackgroundMI(exposure.getBBox(), bgStats) 

420 bgData = (bg, afwMath.Interpolate.LINEAR, afwMath.REDUCE_INTERP_ORDER, 

421 afwMath.ApproximateControl.UNKNOWN, 0, 0, False) 

422 background = afwMath.BackgroundList() 

423 background.append(bgData) 

424 for bg, *_ in background: 

425 stats = bg.getStatsImage() 

426 stats.mask.array[:, :] = stats.mask.getPlaneBitMask("NO_DATA") 

427 stats.variance.array[:, :] = 0.0 

428 return background 

429 

430 

431class MeasureMergedCoaddSourcesConnections( 

432 PipelineTaskConnections, 

433 dimensions=("tract", "patch", "band", "skymap"), 

434 defaultTemplates={ 

435 "inputCoaddName": "deep", 

436 "outputCoaddName": "deep", 

437 "deblendedCatalog": "deblendedFlux", 

438 }, 

439 deprecatedTemplates={ 

440 # TODO[DM-47797]: remove this deprecated connection template. 

441 "deblendedCatalog": "Support for old deblender outputs will be removed after v29." 

442 }, 

443): 

444 inputSchema = cT.InitInput( 

445 doc="Input schema for measure merged task produced by a deblender or detection task", 

446 name="{inputCoaddName}Coadd_deblendedFlux_schema", 

447 storageClass="SourceCatalog" 

448 ) 

449 outputSchema = cT.InitOutput( 

450 doc="Output schema after all new fields are added by task", 

451 name="{inputCoaddName}Coadd_meas_schema", 

452 storageClass="SourceCatalog" 

453 ) 

454 # TODO[DM-47797]: remove this deprecated connection. 

455 refCat = cT.PrerequisiteInput( 

456 doc="Reference catalog used to match measured sources against known sources", 

457 name="ref_cat", 

458 storageClass="SimpleCatalog", 

459 dimensions=("skypix",), 

460 deferLoad=True, 

461 multiple=True, 

462 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

463 ) 

464 exposure = cT.Input( 

465 doc="Input non-cell-based coadd image", 

466 name="{inputCoaddName}Coadd_calexp", 

467 storageClass="ExposureF", 

468 dimensions=("tract", "patch", "band", "skymap") 

469 ) 

470 exposure_cells = cT.Input( 

471 doc="Input cell-based coadd image", 

472 name="{inputCoaddName}CoaddCell", 

473 storageClass="MultipleCellCoadd", 

474 dimensions=("tract", "patch", "band", "skymap"), 

475 ) 

476 background = cT.Input( 

477 doc="Background to subtract from cell-based coadd image", 

478 name="{inputCoaddName}Coadd_calexp_background", 

479 storageClass="Background", 

480 dimensions=("tract", "patch", "band", "skymap") 

481 ) 

482 skyMap = cT.Input( 

483 doc="SkyMap to use in processing", 

484 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

485 storageClass="SkyMap", 

486 dimensions=("skymap",), 

487 ) 

488 # TODO[DM-47424]: remove this deprecated connection. 

489 visitCatalogs = cT.Input( 

490 doc="Deprecated and unused.", 

491 name="src", 

492 dimensions=("instrument", "visit", "detector"), 

493 storageClass="SourceCatalog", 

494 multiple=True, 

495 deprecated="Deprecated and unused. Will be removed after v29.", 

496 ) 

497 sourceTableHandles = cT.Input( 

498 doc=("Source tables that are derived from the ``CalibrateTask`` sources. " 

499 "These tables contain astrometry and photometry flags, and optionally " 

500 "PSF flags."), 

501 name="sourceTable_visit", 

502 storageClass="ArrowAstropy", 

503 dimensions=("instrument", "visit"), 

504 multiple=True, 

505 deferLoad=True, 

506 ) 

507 finalizedSourceTableHandles = cT.Input( 

508 doc=("Finalized source tables from ``FinalizeCalibrationTask``. These " 

509 "tables contain PSF flags from the finalized PSF estimation."), 

510 name="finalized_src_table", 

511 storageClass="ArrowAstropy", 

512 dimensions=("instrument", "visit"), 

513 multiple=True, 

514 deferLoad=True, 

515 ) 

516 finalVisitSummaryHandles = cT.Input( 

517 doc="Final visit summary table", 

518 name="finalVisitSummary", 

519 storageClass="ExposureCatalog", 

520 dimensions=("instrument", "visit"), 

521 multiple=True, 

522 deferLoad=True, 

523 ) 

524 # TODO[DM-47797]: remove this deprecated connection. 

525 inputCatalog = cT.Input( 

526 doc=("Name of the input catalog to use." 

527 "If the single band deblender was used this should be 'deblendedFlux." 

528 "If the multi-band deblender was used this should be 'deblendedModel, " 

529 "or deblendedFlux if the multiband deblender was configured to output " 

530 "deblended flux catalogs. If no deblending was performed this should " 

531 "be 'mergeDet'"), 

532 name="{inputCoaddName}Coadd_{deblendedCatalog}", 

533 storageClass="SourceCatalog", 

534 deprecated="Support for old deblender outputs will be removed after v29.", 

535 dimensions=("tract", "patch", "band", "skymap"), 

536 ) 

537 scarletCatalog = cT.Input( 

538 doc="Catalogs produced by multiband deblending", 

539 name="{inputCoaddName}Coadd_deblendedCatalog", 

540 storageClass="SourceCatalog", 

541 dimensions=("tract", "patch", "skymap"), 

542 ) 

543 scarletModels = cT.Input( 

544 doc="Multiband scarlet models produced by the deblender", 

545 name="{inputCoaddName}Coadd_scarletModelData", 

546 storageClass="LsstScarletModelData", 

547 dimensions=("tract", "patch", "skymap"), 

548 ) 

549 outputSources = cT.Output( 

550 doc="Source catalog containing all the measurement information generated in this task", 

551 name="{outputCoaddName}Coadd_meas", 

552 dimensions=("tract", "patch", "band", "skymap"), 

553 storageClass="SourceCatalog", 

554 ) 

555 # TODO[DM-47797]: remove this deprecated connection. 

556 matchResult = cT.Output( 

557 doc="Match catalog produced by configured matcher, optional on doMatchSources", 

558 name="{outputCoaddName}Coadd_measMatch", 

559 dimensions=("tract", "patch", "band", "skymap"), 

560 storageClass="Catalog", 

561 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

562 ) 

563 # TODO[DM-47797]: remove this deprecated connection. 

564 denormMatches = cT.Output( 

565 doc="Denormalized Match catalog produced by configured matcher, optional on " 

566 "doWriteMatchesDenormalized", 

567 name="{outputCoaddName}Coadd_measMatchFull", 

568 dimensions=("tract", "patch", "band", "skymap"), 

569 storageClass="Catalog", 

570 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

571 ) 

572 

573 def __init__(self, *, config=None): 

574 super().__init__(config=config) 

575 del self.visitCatalogs 

576 if not config.doPropagateFlags: 

577 del self.sourceTableHandles 

578 del self.finalizedSourceTableHandles 

579 else: 

580 # Check for types of flags required. 

581 if not config.propagateFlags.source_flags: 

582 del self.sourceTableHandles 

583 if not config.propagateFlags.finalized_source_flags: 

584 del self.finalizedSourceTableHandles 

585 # TODO[DM-47797]: only the 'if' block contents here should survive. 

586 if config.inputCatalog == "deblendedCatalog": 

587 del self.inputCatalog 

588 if not config.doAddFootprints: 

589 del self.scarletModels 

590 else: 

591 del self.deblendedCatalog 

592 del self.scarletModels 

593 

594 # TODO[DM-47797]: delete the conditionals below. 

595 if not config.doMatchSources: 

596 del self.refCat 

597 del self.matchResult 

598 

599 if not config.doWriteMatchesDenormalized: 

600 del self.denormMatches 

601 

602 if config.useCellCoadds: 

603 del self.exposure 

604 else: 

605 del self.exposure_cells 

606 del self.background 

607 

608 

609class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig, 

610 pipelineConnections=MeasureMergedCoaddSourcesConnections): 

611 """Configuration parameters for the MeasureMergedCoaddSourcesTask 

612 """ 

613 inputCatalog = ChoiceField( 

614 dtype=str, 

615 default="deblendedCatalog", 

616 allowed={ 

617 "deblendedCatalog": "Output catalog from ScarletDeblendTask", 

618 "deblendedFlux": "Output catalog from SourceDeblendTask", 

619 "mergeDet": "The merged detections before deblending." 

620 }, 

621 doc="The name of the input catalog.", 

622 # TODO[DM-47797]: remove this config option and anything using it. 

623 deprecated="Support for old deblender outputs will be removed after v29.", 

624 ) 

625 doAddFootprints = Field(dtype=bool, 

626 default=True, 

627 doc="Whether or not to add footprints to the input catalog from scarlet models. " 

628 "This should be true whenever using the multi-band deblender, " 

629 "otherwise this should be False.") 

630 doConserveFlux = Field(dtype=bool, default=True, 

631 doc="Whether to use the deblender models as templates to re-distribute the flux " 

632 "from the 'exposure' (True), or to perform measurements on the deblender " 

633 "model footprints.") 

634 doStripFootprints = Field(dtype=bool, default=True, 

635 doc="Whether to strip footprints from the output catalog before " 

636 "saving to disk. " 

637 "This is usually done when using scarlet models to save disk space.") 

638 useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?") 

639 measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement") 

640 setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch") 

641 doPropagateFlags = Field( 

642 dtype=bool, default=True, 

643 doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)" 

644 ) 

645 propagateFlags = ConfigurableField(target=PropagateSourceFlagsTask, doc="Propagate source flags to coadd") 

646 doMatchSources = Field( 

647 dtype=bool, 

648 default=False, 

649 doc="Match sources to reference catalog?", 

650 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

651 ) 

652 match = ConfigurableField( 

653 target=DirectMatchTask, 

654 doc="Matching to reference catalog", 

655 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

656 ) 

657 doWriteMatchesDenormalized = Field( 

658 dtype=bool, 

659 default=False, 

660 doc=("Write reference matches in denormalized format? " 

661 "This format uses more disk space, but is more convenient to read."), 

662 deprecated="Reference matching in measureCoaddSources will be removed after v29.", 

663 ) 

664 coaddName = Field(dtype=str, default="deep", doc="Name of coadd") 

665 psfCache = Field(dtype=int, default=100, doc="Size of psfCache") 

666 checkUnitsParseStrict = Field( 

667 doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'", 

668 dtype=str, 

669 default="raise", 

670 ) 

671 doApCorr = Field( 

672 dtype=bool, 

673 default=True, 

674 doc="Apply aperture corrections" 

675 ) 

676 applyApCorr = ConfigurableField( 

677 target=ApplyApCorrTask, 

678 doc="Subtask to apply aperture corrections" 

679 ) 

680 doRunCatalogCalculation = Field( 

681 dtype=bool, 

682 default=True, 

683 doc='Run catalogCalculation task' 

684 ) 

685 catalogCalculation = ConfigurableField( 

686 target=CatalogCalculationTask, 

687 doc="Subtask to run catalogCalculation plugins on catalog" 

688 ) 

689 

690 hasFakes = Field( 

691 dtype=bool, 

692 default=False, 

693 doc="Should be set to True if fake sources have been inserted into the input data." 

694 ) 

695 idGenerator = SkyMapIdGeneratorConfig.make_field() 

696 

697 @property 

698 def refObjLoader(self): 

699 return self.match.refObjLoader 

700 

701 def setDefaults(self): 

702 super().setDefaults() 

703 self.measurement.plugins.names |= ['base_InputCount', 

704 'base_Variance', 

705 'base_LocalPhotoCalib', 

706 'base_LocalWcs'] 

707 

708 # TODO: Remove STREAK in DM-44658, streak masking to happen only in 

709 # ip_diffim; if we can propagate the streak mask from diffim, we can 

710 # still set flags with it here. 

711 self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE', 

712 'INEXACT_PSF'] 

713 self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE', 

714 'INEXACT_PSF'] 

715 

716 def validate(self): 

717 super().validate() 

718 

719 if not self.doMatchSources and self.doWriteMatchesDenormalized: 

720 raise ValueError("Cannot set doWriteMatchesDenormalized if doMatchSources is False.") 

721 

722 

723class MeasureMergedCoaddSourcesTask(PipelineTask): 

724 """Deblend sources from main catalog in each coadd seperately and measure. 

725 

726 Use peaks and footprints from a master catalog to perform deblending and 

727 measurement in each coadd. 

728 

729 Given a master input catalog of sources (peaks and footprints) or deblender 

730 outputs(including a HeavyFootprint in each band), measure each source on 

731 the coadd. Repeating this procedure with the same master catalog across 

732 multiple coadds will generate a consistent set of child sources. 

733 

734 The deblender retains all peaks and deblends any missing peaks (dropouts in 

735 that band) as PSFs. Source properties are measured and the @c is-primary 

736 flag (indicating sources with no children) is set. Visit flags are 

737 propagated to the coadd sources. 

738 

739 Optionally, we can match the coadd sources to an external reference 

740 catalog. 

741 

742 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we 

743 have a set of per-band catalogs. The next stage in the multi-band 

744 processing procedure will merge these measurements into a suitable catalog 

745 for driving forced photometry. 

746 

747 Parameters 

748 ---------- 

749 schema : ``lsst.afw.table.Schema`, optional 

750 The schema of the merged detection catalog used as input to this one. 

751 peakSchema : ``lsst.afw.table.Schema`, optional 

752 The schema of the PeakRecords in the Footprints in the merged detection catalog. 

753 refObjLoader : `lsst.meas.algorithms.ReferenceObjectLoader`, optional 

754 An instance of ReferenceObjectLoader that supplies an external reference 

755 catalog. May be None if the loader can be constructed from the butler argument or all steps 

756 requiring a reference catalog are disabled. 

757 initInputs : `dict`, optional 

758 Dictionary that can contain a key ``inputSchema`` containing the 

759 input schema. If present will override the value of ``schema``. 

760 **kwargs 

761 Additional keyword arguments. 

762 """ 

763 

764 _DefaultName = "measureCoaddSources" 

765 ConfigClass = MeasureMergedCoaddSourcesConfig 

766 

767 def __init__(self, schema=None, peakSchema=None, refObjLoader=None, initInputs=None, 

768 **kwargs): 

769 super().__init__(**kwargs) 

770 self.deblended = self.config.inputCatalog.startswith("deblended") 

771 self.inputCatalog = "Coadd_" + self.config.inputCatalog 

772 if initInputs is not None: 

773 schema = initInputs['inputSchema'].schema 

774 if schema is None: 

775 raise ValueError("Schema must be defined.") 

776 self.schemaMapper = afwTable.SchemaMapper(schema) 

777 self.schemaMapper.addMinimalSchema(schema) 

778 self.schema = self.schemaMapper.getOutputSchema() 

779 self.algMetadata = PropertyList() 

780 self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata) 

781 self.makeSubtask("setPrimaryFlags", schema=self.schema) 

782 # TODO[DM-47797]: remove match subtask 

783 if self.config.doMatchSources: 

784 self.makeSubtask("match", refObjLoader=refObjLoader) 

785 if self.config.doPropagateFlags: 

786 self.makeSubtask("propagateFlags", schema=self.schema) 

787 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict) 

788 if self.config.doApCorr: 

789 self.makeSubtask("applyApCorr", schema=self.schema) 

790 if self.config.doRunCatalogCalculation: 

791 self.makeSubtask("catalogCalculation", schema=self.schema) 

792 

793 self.outputSchema = afwTable.SourceCatalog(self.schema) 

794 

795 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

796 inputs = butlerQC.get(inputRefs) 

797 

798 # TODO[DM-47797]: remove this block 

799 if self.config.doMatchSources: 

800 refObjLoader = ReferenceObjectLoader([ref.datasetRef.dataId for ref in inputRefs.refCat], 

801 inputs.pop('refCat'), 

802 name=self.config.connections.refCat, 

803 config=self.config.refObjLoader, 

804 log=self.log) 

805 self.match.setRefObjLoader(refObjLoader) 

806 

807 if self.config.useCellCoadds: 

808 multiple_cell_coadd = inputs.pop("exposure_cells") 

809 stitched_coadd = multiple_cell_coadd.stitch() 

810 exposure = stitched_coadd.asExposure() 

811 background = inputs.pop("background") 

812 exposure.image -= background.getImage() 

813 

814 ccdInputs = stitched_coadd.ccds 

815 apCorrMap = stitched_coadd.ap_corr_map 

816 band = inputRefs.exposure_cells.dataId["band"] 

817 else: 

818 exposure = inputs.pop("exposure") 

819 # Set psfcache 

820 # move this to run after gen2 deprecation 

821 exposure.getPsf().setCacheCapacity(self.config.psfCache) 

822 

823 ccdInputs = exposure.getInfo().getCoaddInputs().ccds 

824 apCorrMap = exposure.getInfo().getApCorrMap() 

825 band = inputRefs.exposure.dataId["band"] 

826 

827 # Get unique integer ID for IdFactory and RNG seeds; only the latter 

828 # should really be used as the IDs all come from the input catalog. 

829 idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId) 

830 

831 # Transform inputCatalog 

832 table = afwTable.SourceTable.make(self.schema, idGenerator.make_table_id_factory()) 

833 sources = afwTable.SourceCatalog(table) 

834 # Load the correct input catalog 

835 if "scarletCatalog" in inputs: 

836 inputCatalog = inputs.pop("scarletCatalog") 

837 catalogRef = inputRefs.scarletCatalog 

838 else: 

839 inputCatalog = inputs.pop("inputCatalog") 

840 catalogRef = inputRefs.inputCatalog 

841 sources.extend(inputCatalog, self.schemaMapper) 

842 del inputCatalog 

843 # Add the HeavyFootprints to the deblended sources 

844 if self.config.doAddFootprints: 

845 modelData = inputs.pop('scarletModels') 

846 if self.config.doConserveFlux: 

847 imageForRedistribution = exposure 

848 else: 

849 imageForRedistribution = None 

850 updateCatalogFootprints( 

851 modelData=modelData, 

852 catalog=sources, 

853 band=band, 

854 imageForRedistribution=imageForRedistribution, 

855 removeScarletData=True, 

856 updateFluxColumns=True, 

857 ) 

858 table = sources.getTable() 

859 table.setMetadata(self.algMetadata) # Capture algorithm metadata to write out to the source catalog. 

860 

861 skyMap = inputs.pop('skyMap') 

862 tractNumber = catalogRef.dataId['tract'] 

863 tractInfo = skyMap[tractNumber] 

864 patchInfo = tractInfo.getPatchInfo(catalogRef.dataId['patch']) 

865 skyInfo = Struct( 

866 skyMap=skyMap, 

867 tractInfo=tractInfo, 

868 patchInfo=patchInfo, 

869 wcs=tractInfo.getWcs(), 

870 bbox=patchInfo.getOuterBBox() 

871 ) 

872 

873 if self.config.doPropagateFlags: 

874 if "sourceTableHandles" in inputs: 

875 sourceTableHandles = inputs.pop("sourceTableHandles") 

876 sourceTableHandleDict = {handle.dataId["visit"]: handle for handle in sourceTableHandles} 

877 else: 

878 sourceTableHandleDict = None 

879 if "finalizedSourceTableHandles" in inputs: 

880 finalizedSourceTableHandles = inputs.pop("finalizedSourceTableHandles") 

881 finalizedSourceTableHandleDict = {handle.dataId["visit"]: handle 

882 for handle in finalizedSourceTableHandles} 

883 else: 

884 finalizedSourceTableHandleDict = None 

885 if "finalVisitSummaryHandles" in inputs: 

886 finalVisitSummaryHandles = inputs.pop("finalVisitSummaryHandles") 

887 finalVisitSummaryHandleDict = {handle.dataId["visit"]: handle 

888 for handle in finalVisitSummaryHandles} 

889 else: 

890 finalVisitSummaryHandleDict = None 

891 

892 assert not inputs, "runQuantum got more inputs than expected." 

893 outputs = self.run( 

894 exposure=exposure, 

895 sources=sources, 

896 skyInfo=skyInfo, 

897 exposureId=idGenerator.catalog_id, 

898 ccdInputs=ccdInputs, 

899 sourceTableHandleDict=sourceTableHandleDict, 

900 finalizedSourceTableHandleDict=finalizedSourceTableHandleDict, 

901 finalVisitSummaryHandleDict=finalVisitSummaryHandleDict, 

902 apCorrMap=apCorrMap, 

903 ) 

904 # Strip HeavyFootprints to save space on disk 

905 if self.config.doStripFootprints: 

906 sources = outputs.outputSources 

907 for source in sources[sources["parent"] != 0]: 

908 source.setFootprint(None) 

909 butlerQC.put(outputs, outputRefs) 

910 

911 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, 

912 sourceTableHandleDict=None, finalizedSourceTableHandleDict=None, finalVisitSummaryHandleDict=None, 

913 apCorrMap=None): 

914 """Run measurement algorithms on the input exposure, and optionally populate the 

915 resulting catalog with extra information. 

916 

917 Parameters 

918 ---------- 

919 exposure : `lsst.afw.exposure.Exposure` 

920 The input exposure on which measurements are to be performed. 

921 sources : `lsst.afw.table.SourceCatalog` 

922 A catalog built from the results of merged detections, or 

923 deblender outputs. 

924 parentCatalog : `lsst.afw.table.SourceCatalog` 

925 Catalog of parent sources corresponding to sources. 

926 skyInfo : `lsst.pipe.base.Struct` 

927 A struct containing information about the position of the input exposure within 

928 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box. 

929 exposureId : `int` or `bytes` 

930 Packed unique number or bytes unique to the input exposure. 

931 ccdInputs : `lsst.afw.table.ExposureCatalog`, optional 

932 Catalog containing information on the individual visits which went into making 

933 the coadd. 

934 sourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional 

935 Dict for sourceTable_visit handles (key is visit) for propagating flags. 

936 These tables contain astrometry and photometry flags, and optionally PSF flags. 

937 finalizedSourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional 

938 Dict for finalized_src_table handles (key is visit) for propagating flags. 

939 These tables contain PSF flags from the finalized PSF estimation. 

940 finalVisitSummaryHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional 

941 Dict for visit_summary handles (key is visit) for visit-level information. 

942 These tables contain the WCS information of the single-visit input images. 

943 apCorrMap : `lsst.afw.image.ApCorrMap`, optional 

944 Aperture correction map attached to the ``exposure``. If None, it 

945 will be read from the ``exposure``. 

946 

947 Returns 

948 ------- 

949 results : `lsst.pipe.base.Struct` 

950 Results of running measurement task. Will contain the catalog in the 

951 sources attribute. Optionally will have results of matching to a 

952 reference catalog in the matchResults attribute, and denormalized 

953 matches in the denormMatches attribute. 

954 """ 

955 if self.config.doPropagateFlags: 

956 # These mask planes may not be defined on the coadds always. 

957 # We add the mask planes, which is a no-op if already defined. 

958 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpAnywhere: 

959 exposure.mask.addMaskPlane(maskPlane) 

960 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpCenter: 

961 exposure.mask.addMaskPlane(maskPlane) 

962 

963 self.measurement.run(sources, exposure, exposureId=exposureId) 

964 

965 if self.config.doApCorr: 

966 if apCorrMap is None: 

967 apCorrMap = exposure.getInfo().getApCorrMap() 

968 self.applyApCorr.run( 

969 catalog=sources, 

970 apCorrMap=apCorrMap, 

971 ) 

972 

973 # TODO DM-11568: this contiguous check-and-copy could go away if we 

974 # reserve enough space during SourceDetection and/or SourceDeblend. 

975 # NOTE: sourceSelectors require contiguous catalogs, so ensure 

976 # contiguity now, so views are preserved from here on. 

977 if not sources.isContiguous(): 

978 sources = sources.copy(deep=True) 

979 

980 if self.config.doRunCatalogCalculation: 

981 self.catalogCalculation.run(sources) 

982 

983 self.setPrimaryFlags.run(sources, skyMap=skyInfo.skyMap, tractInfo=skyInfo.tractInfo, 

984 patchInfo=skyInfo.patchInfo) 

985 if self.config.doPropagateFlags: 

986 self.propagateFlags.run( 

987 sources, 

988 ccdInputs, 

989 sourceTableHandleDict, 

990 finalizedSourceTableHandleDict, 

991 finalVisitSummaryHandleDict, 

992 ) 

993 

994 results = Struct() 

995 

996 # TODO[DM-47797]: remove this block 

997 if self.config.doMatchSources: 

998 matchResult = self.match.run(sources, exposure.getInfo().getFilter().bandLabel) 

999 matches = afwTable.packMatches(matchResult.matches) 

1000 matches.table.setMetadata(matchResult.matchMeta) 

1001 results.matchResult = matches 

1002 if self.config.doWriteMatchesDenormalized: 

1003 if matchResult.matches: 

1004 denormMatches = denormalizeMatches(matchResult.matches, matchResult.matchMeta) 

1005 else: 

1006 self.log.warning("No matches, so generating dummy denormalized matches file") 

1007 denormMatches = afwTable.BaseCatalog(afwTable.Schema()) 

1008 denormMatches.setMetadata(PropertyList()) 

1009 denormMatches.getMetadata().add("COMMENT", 

1010 "This catalog is empty because no matches were found.") 

1011 results.denormMatches = denormMatches 

1012 results.denormMatches = denormMatches 

1013 

1014 results.outputSources = sources 

1015 return results