lsst.pipe.tasks gef5401d743+4408856ac0
Loading...
Searching...
No Matches
multiBand.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["DetectCoaddSourcesConfig", "DetectCoaddSourcesTask",
23 "MeasureMergedCoaddSourcesConfig", "MeasureMergedCoaddSourcesTask",
24 ]
25
26import numpy as np
27
28from lsst.geom import Extent2I
29from lsst.pipe.base import (
30 AnnotatedPartialOutputsError,
31 Struct,
32 PipelineTask,
33 PipelineTaskConfig,
34 PipelineTaskConnections
35)
36import lsst.pipe.base.connectionTypes as cT
37from lsst.pex.config import Field, ConfigurableField, ChoiceField
38from lsst.meas.algorithms import (
39 DynamicDetectionTask,
40 ExceedsMaxVarianceScaleError,
41 InsufficientSourcesError,
42 PsfGenerationError,
43 ReferenceObjectLoader,
44 ScaleVarianceTask,
45 SetPrimaryFlagsTask,
46 TooManyMaskedPixelsError,
47 ZeroFootprintError,
48)
49from lsst.meas.base import (
50 SingleFrameMeasurementTask,
51 ApplyApCorrTask,
52 CatalogCalculationTask,
53 SkyMapIdGeneratorConfig,
54)
55from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
56from lsst.meas.astrom import DirectMatchTask, denormalizeMatches
57from lsst.pipe.tasks.propagateSourceFlags import PropagateSourceFlagsTask
58import lsst.afw.image as afwImage
59import lsst.afw.math as afwMath
60import lsst.afw.table as afwTable
61from lsst.daf.base import PropertyList
62from lsst.skymap import BaseSkyMap
63
64# NOTE: these imports are a convenience so multiband users only have to import this file.
65from .mergeDetections import MergeDetectionsConfig, MergeDetectionsTask # noqa: F401
66from .mergeMeasurements import MergeMeasurementsConfig, MergeMeasurementsTask # noqa: F401
67from .multiBandUtils import CullPeaksConfig # noqa: F401
68from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiConfig # noqa: F401
69from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiTask # noqa: F401
70
71
72"""
73New set types:
74* deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
75* deepCoadd_mergeDet: merged detections (tract, patch)
76* deepCoadd_meas: measurements of merged detections (tract, patch, filter)
77* deepCoadd_ref: reference sources (tract, patch)
78All of these have associated *_schema catalogs that require no data ID and hold no records.
79
80In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
81the mergeDet, meas, and ref dataset Footprints:
82* deepCoadd_peak_schema
83"""
84
85
86
87class DetectCoaddSourcesConnections(PipelineTaskConnections,
88 dimensions=("tract", "patch", "band", "skymap"),
89 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
90 detectionSchema = cT.InitOutput(
91 doc="Schema of the detection catalog",
92 name="{outputCoaddName}Coadd_det_schema",
93 storageClass="SourceCatalog",
94 )
95 exposure = cT.Input(
96 doc="Exposure on which detections are to be performed. ",
97 name="{inputCoaddName}Coadd",
98 storageClass="ExposureF",
99 dimensions=("tract", "patch", "band", "skymap")
100 )
101 exposure_cells = cT.Input(
102 doc="Exposure on which detections are to be performed. ",
103 name="{inputCoaddName}CoaddCell",
104 storageClass="MultipleCellCoadd",
105 dimensions=("tract", "patch", "band", "skymap"),
106 )
107 skyMap = cT.Input(
108 doc="Description of the skymap's tracts and patches.",
109 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
110 storageClass="SkyMap",
111 dimensions=("skymap",),
112 )
113 outputBackgrounds = cT.Output(
114 doc="Output Backgrounds used in detection",
115 name="{outputCoaddName}Coadd_calexp_background",
116 storageClass="Background",
117 dimensions=("tract", "patch", "band", "skymap")
118 )
119 outputSources = cT.Output(
120 doc="Detected sources catalog",
121 name="{outputCoaddName}Coadd_det",
122 storageClass="SourceCatalog",
123 dimensions=("tract", "patch", "band", "skymap")
124 )
125 outputExposure = cT.Output(
126 doc="Exposure post detection",
127 name="{outputCoaddName}Coadd_calexp",
128 storageClass="ExposureF",
129 dimensions=("tract", "patch", "band", "skymap")
130 )
131
132 def __init__(self, *, config=None):
133 super().__init__(config=config)
134 assert isinstance(config, DetectCoaddSourcesConfig)
135
136 if config.useCellCoadds:
137 del self.exposure
138 else:
139 del self.exposure_cells
140
141 if not self.config.forceExactBinning:
142 del self.skyMap
143 if self.config.writeOnlyBackgrounds:
144 del self.outputExposure
145 del self.outputSources
146 del self.detectionSchema
147
148
149class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
150 """Configuration parameters for the DetectCoaddSourcesTask
151 """
152
153 doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
154 scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling")
155 detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection")
156 coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
157 useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?")
158 hasFakes = Field(
159 dtype=bool,
160 default=False,
161 doc="Should be set to True if fake sources have been inserted into the input data.",
162 )
163 idGenerator = SkyMapIdGeneratorConfig.make_field()
164 forceExactBinning = Field(
165 dtype=bool,
166 default=False,
167 doc=(
168 "Check that the background bin size evenly divides the patch inner region, and "
169 "crop the outer region to an integer number of bins."
170 )
171 )
172 writeOnlyBackgrounds = Field(dtype=bool, default=False, doc="If true, only save the background models.")
173 writeEmptyBackgrounds = Field(
174 dtype=bool,
175 default=True,
176 doc=(
177 "If true, save a placeholder background with NaNs in all bins (but the right geometry) when "
178 "there are no pixels to compute a background from. This can be useful if a later task combines "
179 "backgrounds from multiple patches as input."
180 )
181 )
182
183 def setDefaults(self):
184 super().setDefaults()
185 self.detection.thresholdType = "pixel_stdev"
186 self.detection.isotropicGrow = True
187 # Coadds are made from background-subtracted CCDs, so any background subtraction should be very basic
188 self.detection.reEstimateBackground = False
189 self.detection.background.useApprox = False
190 self.detection.background.binSize = 4096
191 self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
192 self.detection.doTempWideBackground = True # Suppress large footprints that overwhelm the deblender
193 # Include band in packed data IDs that go into object IDs (None -> "as
194 # many bands as are defined", rather than the default of zero).
195 self.idGenerator.packer.n_bands = None
196
197
198class DetectCoaddSourcesTask(PipelineTask):
199 """Detect sources on a single filter coadd.
200
201 Coadding individual visits requires each exposure to be warped. This
202 introduces covariance in the noise properties across pixels. Before
203 detection, we correct the coadd variance by scaling the variance plane in
204 the coadd to match the observed variance. This is an approximate
205 approach -- strictly, we should propagate the full covariance matrix --
206 but it is simple and works well in practice.
207
208 After scaling the variance plane, we detect sources and generate footprints
209 by delegating to the @ref SourceDetectionTask_ "detection" subtask.
210
211 DetectCoaddSourcesTask is meant to be run after assembling a coadded image
212 in a given band. The purpose of the task is to update the background,
213 detect all sources in a single band and generate a set of parent
214 footprints. Subsequent tasks in the multi-band processing procedure will
215 merge sources across bands and, eventually, perform forced photometry.
216
217 Parameters
218 ----------
219 schema : `lsst.afw.table.Schema`, optional
220 Initial schema for the output catalog, modified-in place to include all
221 fields set by this task. If None, the source minimal schema will be used.
222 **kwargs
223 Additional keyword arguments.
224 """
225
226 _DefaultName = "detectCoaddSources"
227 ConfigClass = DetectCoaddSourcesConfig
228
229 def __init__(self, schema=None, **kwargs):
230 # N.B. Super is used here to handle the multiple inheritance of PipelineTasks, the init tree
231 # call structure has been reviewed carefully to be sure super will work as intended.
232 super().__init__(**kwargs)
233 if schema is None:
234 schema = afwTable.SourceTable.makeMinimalSchema()
235 self.schema = schema
236 self.makeSubtask("detection", schema=self.schema)
237 if self.config.doScaleVariance:
238 self.makeSubtask("scaleVariance")
239
240 self.detectionSchema = afwTable.SourceCatalog(self.schema)
241
242 def runQuantum(self, butlerQC, inputRefs, outputRefs):
243 inputs = butlerQC.get(inputRefs)
244 idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
245
246 if self.config.useCellCoadds:
247 multiple_cell_coadd = inputs.pop("exposure_cells")
248 exposure = multiple_cell_coadd.stitch().asExposure()
249 else:
250 exposure = inputs.pop("exposure")
251
252 skyMap = inputs.pop("skyMap", None)
253 if skyMap is not None:
254 patchInfo = skyMap[butlerQC.quantum.dataId["tract"]][butlerQC.quantum.dataId["patch"]]
255 else:
256 patchInfo = None
257
258 assert not inputs, "runQuantum got more inputs than expected."
259 try:
260 outputs = self.run(
261 exposure=exposure,
262 idFactory=idGenerator.make_table_id_factory(),
263 expId=idGenerator.catalog_id,
264 patchInfo=patchInfo,
265 )
266 except (
267 TooManyMaskedPixelsError,
268 ExceedsMaxVarianceScaleError,
269 InsufficientSourcesError,
270 PsfGenerationError,
271 ZeroFootprintError,
272 ) as e:
273 if self.config.writeEmptyBackgrounds:
274 butlerQC.put(self._makeEmptyBackground(exposure, patchInfo), outputRefs.outputBackgrounds)
275 # Detection failed, so clear any leftover the detected mask planes.
276 for maskName in ["DETECTED", "DETECTED_NEGATIVE"]:
277 if maskName in exposure.mask.getMaskPlaneDict().keys():
278 detectedMask = exposure.mask.getMaskPlane(maskName)
279 exposure.mask.clearMaskPlane(detectedMask)
280 butlerQC.put(exposure, outputRefs.outputExposure)
281 error = AnnotatedPartialOutputsError.annotate(
282 e,
283 self,
284 exposure,
285 log=self.log,
286 )
287 raise error from e
288
289 butlerQC.put(outputs, outputRefs)
290
291 def run(self, exposure, idFactory, expId, patchInfo=None):
292 """Run detection on an exposure.
293
294 First scale the variance plane to match the observed variance
295 using ``ScaleVarianceTask``. Then invoke the ``SourceDetectionTask_`` "detection" subtask to
296 detect sources.
297
298 Parameters
299 ----------
300 exposure : `lsst.afw.image.Exposure`
301 Exposure on which to detect (may be background-subtracted and scaled,
302 depending on configuration).
303 idFactory : `lsst.afw.table.IdFactory`
304 IdFactory to set source identifiers.
305 expId : `int`
306 Exposure identifier (integer) for RNG seed.
307 patchInfo : `lsst.skymap.PatchInfo`, optional
308 Description of the patch geometry. Only needed if
309 `~DetectCoaddSourceConfig.forceExactBinning` is `True`.
310
311 Returns
312 -------
313 result : `lsst.pipe.base.Struct`
314 Results as a struct with attributes:
315
316 ``sources``
317 Catalog of detections (`lsst.afw.table.SourceCatalog`).
318 ``backgrounds``
319 List of backgrounds (`list`).
320 """
321 if self.config.forceExactBinning:
322 exposure = self._cropToExactBinning(exposure, patchInfo)
323 if self.config.doScaleVariance:
324 varScale = self.scaleVariance.run(exposure.maskedImage)
325 exposure.getMetadata().add("VARIANCE_SCALE", varScale)
326 backgrounds = afwMath.BackgroundList()
327 table = afwTable.SourceTable.make(self.schema, idFactory)
328 detections = self.detection.run(table, exposure, expId=expId)
329 sources = detections.sources
330 if hasattr(detections, "background") and detections.background:
331 for bg in detections.background:
332 backgrounds.append(bg)
333 if len(backgrounds) == 0:
334 # Persist a constant background with value of NaN to get around
335 # inability to persist empty BackgroundList.
336 emptyBg = self._makeEmptyBackground(exposure, patchInfo)
337 backgrounds.append(emptyBg)
338
339 return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)
340
341 def _cropToExactBinning(self, exposure, patchInfo):
342 """Crop a coadd `~lsst.afw.image.Exposure` instance to ensure exact
343 background binning.
344
345 Parameters
346 ----------
347 exposure : `lsst.afw.image.Exposure`
348 Exposure to crop, assumed to cover the patch outer bounding box.
349 patchInfo : `lsst.skymap.PatchInfo`
350 Description of the patch geometry.
351
352 Returns
353 -------
354 cropped : `lsst.afw.image.Exposure`
355 View of ``exposure`` with background bins that evenly divide both
356 the full cropped image and the patch inner region. The bounding
357 box is guaranteed to contain the patch inner bounding box and be
358 contained by the patch outer bounding box.
359
360 Raises
361 ------
362 ValueError
363 Raised if the patch inner region width or height is not a multiple
364 of the background bin size.
365 """
366 bbox = patchInfo.getInnerBBox()
367 if bbox.width % self.detection.background.binSizeX:
368 raise ValueError(
369 f"Patch inner width {bbox.width} does not evenly "
370 f"divide bin width {self.detection.background.binSizeX}."
371 )
372 if bbox.height % self.detection.background.binSizeY:
373 raise ValueError(
374 f"Patch inner height {bbox.height} does not evenly "
375 f"divide bin height {self.detection.background.binSizeY}."
376 )
377 outer_bbox = patchInfo.getOuterBBox()
378 n_bins_grow_x = (bbox.x.begin - outer_bbox.x.begin) // self.detection.background.binSizeX
379 n_bins_grow_y = (bbox.y.begin - outer_bbox.y.begin) // self.detection.background.binSizeY
380 bbox.grow(
381 Extent2I(
382 n_bins_grow_x*self.detection.background.binSizeX,
383 n_bins_grow_y*self.detection.background.binSizeY,
384 )
385 )
386 assert outer_bbox.contains(bbox)
387 assert bbox.contains(patchInfo.getInnerBBox())
388 assert bbox.width % self.detection.background.binSizeX == 0
389 assert bbox.height % self.detection.background.binSizeY == 0
390 return exposure[bbox]
391
392 def _makeEmptyBackground(self, exposure, patchInfo=None):
393 """Construct an empty `lsst.afw.math.BackgroundList` with NaN values.
394
395 Parameters
396 ----------
397 exposure : `lsst.afw.image.Exposure`
398 Exposure that the background should correspond to.
399 patchInfo : `lsst.skymap.PatchInfo`, optional
400 Description of the patch geometry. Only needed if
401 `~DetectCoaddSourceConfig.forceExactBinning` is `True`.
402
403 Returns
404 -------
405 background : `lsst.afw.math.BackgroundList`
406 A background object with a single layer and the same bin geometry
407 that a background for that exposure would have had if it had enough
408 usable pixels. This object cannot actually be used for background
409 subtraction.
410 """
411 # Create a backgroundList with one entry whose "stats image" is NaNs
412 # and has all pixels set as NO_DATA.
413 if self.config.forceExactBinning:
414 exposure = self._cropToExactBinning(exposure, patchInfo).clone()
415
416 bgLevel = np.nan
417 bgStats = afwImage.MaskedImageF(1, 1)
418 bgStats.set(bgLevel, 0, bgLevel)
419 bg = afwMath.BackgroundMI(exposure.getBBox(), bgStats)
420 bgData = (bg, afwMath.Interpolate.LINEAR, afwMath.REDUCE_INTERP_ORDER,
421 afwMath.ApproximateControl.UNKNOWN, 0, 0, False)
422 background = afwMath.BackgroundList()
423 background.append(bgData)
424 for bg, *_ in background:
425 stats = bg.getStatsImage()
426 stats.mask.array[:, :] = stats.mask.getPlaneBitMask("NO_DATA")
427 stats.variance.array[:, :] = 0.0
428 return background
429
430
431class MeasureMergedCoaddSourcesConnections(
432 PipelineTaskConnections,
433 dimensions=("tract", "patch", "band", "skymap"),
434 defaultTemplates={
435 "inputCoaddName": "deep",
436 "outputCoaddName": "deep",
437 "deblendedCatalog": "deblendedFlux",
438 },
439 deprecatedTemplates={
440 # TODO[DM-47797]: remove this deprecated connection template.
441 "deblendedCatalog": "Support for old deblender outputs will be removed after v29."
442 },
443):
444 inputSchema = cT.InitInput(
445 doc="Input schema for measure merged task produced by a deblender or detection task",
446 name="{inputCoaddName}Coadd_deblendedFlux_schema",
447 storageClass="SourceCatalog"
448 )
449 outputSchema = cT.InitOutput(
450 doc="Output schema after all new fields are added by task",
451 name="{inputCoaddName}Coadd_meas_schema",
452 storageClass="SourceCatalog"
453 )
454 # TODO[DM-47797]: remove this deprecated connection.
455 refCat = cT.PrerequisiteInput(
456 doc="Reference catalog used to match measured sources against known sources",
457 name="ref_cat",
458 storageClass="SimpleCatalog",
459 dimensions=("skypix",),
460 deferLoad=True,
461 multiple=True,
462 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
463 )
464 exposure = cT.Input(
465 doc="Input non-cell-based coadd image",
466 name="{inputCoaddName}Coadd_calexp",
467 storageClass="ExposureF",
468 dimensions=("tract", "patch", "band", "skymap")
469 )
470 exposure_cells = cT.Input(
471 doc="Input cell-based coadd image",
472 name="{inputCoaddName}CoaddCell",
473 storageClass="MultipleCellCoadd",
474 dimensions=("tract", "patch", "band", "skymap"),
475 )
476 background = cT.Input(
477 doc="Background to subtract from cell-based coadd image",
478 name="{inputCoaddName}Coadd_calexp_background",
479 storageClass="Background",
480 dimensions=("tract", "patch", "band", "skymap")
481 )
482 skyMap = cT.Input(
483 doc="SkyMap to use in processing",
484 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
485 storageClass="SkyMap",
486 dimensions=("skymap",),
487 )
488 # TODO[DM-47424]: remove this deprecated connection.
489 visitCatalogs = cT.Input(
490 doc="Deprecated and unused.",
491 name="src",
492 dimensions=("instrument", "visit", "detector"),
493 storageClass="SourceCatalog",
494 multiple=True,
495 deprecated="Deprecated and unused. Will be removed after v29.",
496 )
497 sourceTableHandles = cT.Input(
498 doc=("Source tables that are derived from the ``CalibrateTask`` sources. "
499 "These tables contain astrometry and photometry flags, and optionally "
500 "PSF flags."),
501 name="sourceTable_visit",
502 storageClass="ArrowAstropy",
503 dimensions=("instrument", "visit"),
504 multiple=True,
505 deferLoad=True,
506 )
507 finalizedSourceTableHandles = cT.Input(
508 doc=("Finalized source tables from ``FinalizeCalibrationTask``. These "
509 "tables contain PSF flags from the finalized PSF estimation."),
510 name="finalized_src_table",
511 storageClass="ArrowAstropy",
512 dimensions=("instrument", "visit"),
513 multiple=True,
514 deferLoad=True,
515 )
516 finalVisitSummaryHandles = cT.Input(
517 doc="Final visit summary table",
518 name="finalVisitSummary",
519 storageClass="ExposureCatalog",
520 dimensions=("instrument", "visit"),
521 multiple=True,
522 deferLoad=True,
523 )
524 # TODO[DM-47797]: remove this deprecated connection.
525 inputCatalog = cT.Input(
526 doc=("Name of the input catalog to use."
527 "If the single band deblender was used this should be 'deblendedFlux."
528 "If the multi-band deblender was used this should be 'deblendedModel, "
529 "or deblendedFlux if the multiband deblender was configured to output "
530 "deblended flux catalogs. If no deblending was performed this should "
531 "be 'mergeDet'"),
532 name="{inputCoaddName}Coadd_{deblendedCatalog}",
533 storageClass="SourceCatalog",
534 deprecated="Support for old deblender outputs will be removed after v29.",
535 dimensions=("tract", "patch", "band", "skymap"),
536 )
537 scarletCatalog = cT.Input(
538 doc="Catalogs produced by multiband deblending",
539 name="{inputCoaddName}Coadd_deblendedCatalog",
540 storageClass="SourceCatalog",
541 dimensions=("tract", "patch", "skymap"),
542 )
543 scarletModels = cT.Input(
544 doc="Multiband scarlet models produced by the deblender",
545 name="{inputCoaddName}Coadd_scarletModelData",
546 storageClass="LsstScarletModelData",
547 dimensions=("tract", "patch", "skymap"),
548 )
549 outputSources = cT.Output(
550 doc="Source catalog containing all the measurement information generated in this task",
551 name="{outputCoaddName}Coadd_meas",
552 dimensions=("tract", "patch", "band", "skymap"),
553 storageClass="SourceCatalog",
554 )
555 # TODO[DM-47797]: remove this deprecated connection.
556 matchResult = cT.Output(
557 doc="Match catalog produced by configured matcher, optional on doMatchSources",
558 name="{outputCoaddName}Coadd_measMatch",
559 dimensions=("tract", "patch", "band", "skymap"),
560 storageClass="Catalog",
561 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
562 )
563 # TODO[DM-47797]: remove this deprecated connection.
564 denormMatches = cT.Output(
565 doc="Denormalized Match catalog produced by configured matcher, optional on "
566 "doWriteMatchesDenormalized",
567 name="{outputCoaddName}Coadd_measMatchFull",
568 dimensions=("tract", "patch", "band", "skymap"),
569 storageClass="Catalog",
570 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
571 )
572
573 def __init__(self, *, config=None):
574 super().__init__(config=config)
575 del self.visitCatalogs
576 if not config.doPropagateFlags:
577 del self.sourceTableHandles
578 del self.finalizedSourceTableHandles
579 else:
580 # Check for types of flags required.
581 if not config.propagateFlags.source_flags:
582 del self.sourceTableHandles
583 if not config.propagateFlags.finalized_source_flags:
584 del self.finalizedSourceTableHandles
585 # TODO[DM-47797]: only the 'if' block contents here should survive.
586 if config.inputCatalog == "deblendedCatalog":
587 del self.inputCatalog
588 if not config.doAddFootprints:
589 del self.scarletModels
590 else:
591 del self.deblendedCatalog
592 del self.scarletModels
593
594 # TODO[DM-47797]: delete the conditionals below.
595 if not config.doMatchSources:
596 del self.refCat
597 del self.matchResult
598
599 if not config.doWriteMatchesDenormalized:
600 del self.denormMatches
601
602 if config.useCellCoadds:
603 del self.exposure
604 else:
605 del self.exposure_cells
606 del self.background
607
608
609class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig,
610 pipelineConnections=MeasureMergedCoaddSourcesConnections):
611 """Configuration parameters for the MeasureMergedCoaddSourcesTask
612 """
613 inputCatalog = ChoiceField(
614 dtype=str,
615 default="deblendedCatalog",
616 allowed={
617 "deblendedCatalog": "Output catalog from ScarletDeblendTask",
618 "deblendedFlux": "Output catalog from SourceDeblendTask",
619 "mergeDet": "The merged detections before deblending."
620 },
621 doc="The name of the input catalog.",
622 # TODO[DM-47797]: remove this config option and anything using it.
623 deprecated="Support for old deblender outputs will be removed after v29.",
624 )
625 doAddFootprints = Field(dtype=bool,
626 default=True,
627 doc="Whether or not to add footprints to the input catalog from scarlet models. "
628 "This should be true whenever using the multi-band deblender, "
629 "otherwise this should be False.")
630 doConserveFlux = Field(dtype=bool, default=True,
631 doc="Whether to use the deblender models as templates to re-distribute the flux "
632 "from the 'exposure' (True), or to perform measurements on the deblender "
633 "model footprints.")
634 doStripFootprints = Field(dtype=bool, default=True,
635 doc="Whether to strip footprints from the output catalog before "
636 "saving to disk. "
637 "This is usually done when using scarlet models to save disk space.")
638 useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?")
639 measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
640 setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch")
641 doPropagateFlags = Field(
642 dtype=bool, default=True,
643 doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
644 )
645 propagateFlags = ConfigurableField(target=PropagateSourceFlagsTask, doc="Propagate source flags to coadd")
646 doMatchSources = Field(
647 dtype=bool,
648 default=False,
649 doc="Match sources to reference catalog?",
650 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
651 )
652 match = ConfigurableField(
653 target=DirectMatchTask,
654 doc="Matching to reference catalog",
655 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
656 )
657 doWriteMatchesDenormalized = Field(
658 dtype=bool,
659 default=False,
660 doc=("Write reference matches in denormalized format? "
661 "This format uses more disk space, but is more convenient to read."),
662 deprecated="Reference matching in measureCoaddSources will be removed after v29.",
663 )
664 coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
665 psfCache = Field(dtype=int, default=100, doc="Size of psfCache")
666 checkUnitsParseStrict = Field(
667 doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
668 dtype=str,
669 default="raise",
670 )
671 doApCorr = Field(
672 dtype=bool,
673 default=True,
674 doc="Apply aperture corrections"
675 )
676 applyApCorr = ConfigurableField(
677 target=ApplyApCorrTask,
678 doc="Subtask to apply aperture corrections"
679 )
680 doRunCatalogCalculation = Field(
681 dtype=bool,
682 default=True,
683 doc='Run catalogCalculation task'
684 )
685 catalogCalculation = ConfigurableField(
686 target=CatalogCalculationTask,
687 doc="Subtask to run catalogCalculation plugins on catalog"
688 )
689
690 hasFakes = Field(
691 dtype=bool,
692 default=False,
693 doc="Should be set to True if fake sources have been inserted into the input data."
694 )
695 idGenerator = SkyMapIdGeneratorConfig.make_field()
696
697 @property
698 def refObjLoader(self):
699 return self.match.refObjLoader
700
701 def setDefaults(self):
702 super().setDefaults()
703 self.measurement.plugins.names |= ['base_InputCount',
704 'base_Variance',
705 'base_LocalPhotoCalib',
706 'base_LocalWcs']
707
708 # TODO: Remove STREAK in DM-44658, streak masking to happen only in
709 # ip_diffim; if we can propagate the streak mask from diffim, we can
710 # still set flags with it here.
711 self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE',
712 'INEXACT_PSF']
713 self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE',
714 'INEXACT_PSF']
715
716 def validate(self):
717 super().validate()
718
719 if not self.doMatchSources and self.doWriteMatchesDenormalized:
720 raise ValueError("Cannot set doWriteMatchesDenormalized if doMatchSources is False.")
721
722
723class MeasureMergedCoaddSourcesTask(PipelineTask):
724 """Deblend sources from main catalog in each coadd seperately and measure.
725
726 Use peaks and footprints from a master catalog to perform deblending and
727 measurement in each coadd.
728
729 Given a master input catalog of sources (peaks and footprints) or deblender
730 outputs(including a HeavyFootprint in each band), measure each source on
731 the coadd. Repeating this procedure with the same master catalog across
732 multiple coadds will generate a consistent set of child sources.
733
734 The deblender retains all peaks and deblends any missing peaks (dropouts in
735 that band) as PSFs. Source properties are measured and the @c is-primary
736 flag (indicating sources with no children) is set. Visit flags are
737 propagated to the coadd sources.
738
739 Optionally, we can match the coadd sources to an external reference
740 catalog.
741
742 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we
743 have a set of per-band catalogs. The next stage in the multi-band
744 processing procedure will merge these measurements into a suitable catalog
745 for driving forced photometry.
746
747 Parameters
748 ----------
749 schema : ``lsst.afw.table.Schema`, optional
750 The schema of the merged detection catalog used as input to this one.
751 peakSchema : ``lsst.afw.table.Schema`, optional
752 The schema of the PeakRecords in the Footprints in the merged detection catalog.
753 refObjLoader : `lsst.meas.algorithms.ReferenceObjectLoader`, optional
754 An instance of ReferenceObjectLoader that supplies an external reference
755 catalog. May be None if the loader can be constructed from the butler argument or all steps
756 requiring a reference catalog are disabled.
757 initInputs : `dict`, optional
758 Dictionary that can contain a key ``inputSchema`` containing the
759 input schema. If present will override the value of ``schema``.
760 **kwargs
761 Additional keyword arguments.
762 """
763
764 _DefaultName = "measureCoaddSources"
765 ConfigClass = MeasureMergedCoaddSourcesConfig
766
767 def __init__(self, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
768 **kwargs):
769 super().__init__(**kwargs)
770 self.deblended = self.config.inputCatalog.startswith("deblended")
771 self.inputCatalog = "Coadd_" + self.config.inputCatalog
772 if initInputs is not None:
773 schema = initInputs['inputSchema'].schema
774 if schema is None:
775 raise ValueError("Schema must be defined.")
776 self.schemaMapper = afwTable.SchemaMapper(schema)
777 self.schemaMapper.addMinimalSchema(schema)
778 self.schema = self.schemaMapper.getOutputSchema()
779 self.algMetadata = PropertyList()
780 self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
781 self.makeSubtask("setPrimaryFlags", schema=self.schema)
782 # TODO[DM-47797]: remove match subtask
783 if self.config.doMatchSources:
784 self.makeSubtask("match", refObjLoader=refObjLoader)
785 if self.config.doPropagateFlags:
786 self.makeSubtask("propagateFlags", schema=self.schema)
787 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
788 if self.config.doApCorr:
789 self.makeSubtask("applyApCorr", schema=self.schema)
790 if self.config.doRunCatalogCalculation:
791 self.makeSubtask("catalogCalculation", schema=self.schema)
792
793 self.outputSchema = afwTable.SourceCatalog(self.schema)
794
795 def runQuantum(self, butlerQC, inputRefs, outputRefs):
796 inputs = butlerQC.get(inputRefs)
797
798 # TODO[DM-47797]: remove this block
799 if self.config.doMatchSources:
800 refObjLoader = ReferenceObjectLoader([ref.datasetRef.dataId for ref in inputRefs.refCat],
801 inputs.pop('refCat'),
802 name=self.config.connections.refCat,
803 config=self.config.refObjLoader,
804 log=self.log)
805 self.match.setRefObjLoader(refObjLoader)
806
807 if self.config.useCellCoadds:
808 multiple_cell_coadd = inputs.pop("exposure_cells")
809 stitched_coadd = multiple_cell_coadd.stitch()
810 exposure = stitched_coadd.asExposure()
811 background = inputs.pop("background")
812 exposure.image -= background.getImage()
813
814 ccdInputs = stitched_coadd.ccds
815 apCorrMap = stitched_coadd.ap_corr_map
816 band = inputRefs.exposure_cells.dataId["band"]
817 else:
818 exposure = inputs.pop("exposure")
819 # Set psfcache
820 # move this to run after gen2 deprecation
821 exposure.getPsf().setCacheCapacity(self.config.psfCache)
822
823 ccdInputs = exposure.getInfo().getCoaddInputs().ccds
824 apCorrMap = exposure.getInfo().getApCorrMap()
825 band = inputRefs.exposure.dataId["band"]
826
827 # Get unique integer ID for IdFactory and RNG seeds; only the latter
828 # should really be used as the IDs all come from the input catalog.
829 idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
830
831 # Transform inputCatalog
832 table = afwTable.SourceTable.make(self.schema, idGenerator.make_table_id_factory())
833 sources = afwTable.SourceCatalog(table)
834 # Load the correct input catalog
835 if "scarletCatalog" in inputs:
836 inputCatalog = inputs.pop("scarletCatalog")
837 catalogRef = inputRefs.scarletCatalog
838 else:
839 inputCatalog = inputs.pop("inputCatalog")
840 catalogRef = inputRefs.inputCatalog
841 sources.extend(inputCatalog, self.schemaMapper)
842 del inputCatalog
843 # Add the HeavyFootprints to the deblended sources
844 if self.config.doAddFootprints:
845 modelData = inputs.pop('scarletModels')
846 if self.config.doConserveFlux:
847 imageForRedistribution = exposure
848 else:
849 imageForRedistribution = None
850 updateCatalogFootprints(
851 modelData=modelData,
852 catalog=sources,
853 band=band,
854 imageForRedistribution=imageForRedistribution,
855 removeScarletData=True,
856 updateFluxColumns=True,
857 )
858 table = sources.getTable()
859 table.setMetadata(self.algMetadata) # Capture algorithm metadata to write out to the source catalog.
860
861 skyMap = inputs.pop('skyMap')
862 tractNumber = catalogRef.dataId['tract']
863 tractInfo = skyMap[tractNumber]
864 patchInfo = tractInfo.getPatchInfo(catalogRef.dataId['patch'])
865 skyInfo = Struct(
866 skyMap=skyMap,
867 tractInfo=tractInfo,
868 patchInfo=patchInfo,
869 wcs=tractInfo.getWcs(),
870 bbox=patchInfo.getOuterBBox()
871 )
872
873 if self.config.doPropagateFlags:
874 if "sourceTableHandles" in inputs:
875 sourceTableHandles = inputs.pop("sourceTableHandles")
876 sourceTableHandleDict = {handle.dataId["visit"]: handle for handle in sourceTableHandles}
877 else:
878 sourceTableHandleDict = None
879 if "finalizedSourceTableHandles" in inputs:
880 finalizedSourceTableHandles = inputs.pop("finalizedSourceTableHandles")
881 finalizedSourceTableHandleDict = {handle.dataId["visit"]: handle
882 for handle in finalizedSourceTableHandles}
883 else:
884 finalizedSourceTableHandleDict = None
885 if "finalVisitSummaryHandles" in inputs:
886 finalVisitSummaryHandles = inputs.pop("finalVisitSummaryHandles")
887 finalVisitSummaryHandleDict = {handle.dataId["visit"]: handle
888 for handle in finalVisitSummaryHandles}
889 else:
890 finalVisitSummaryHandleDict = None
891
892 assert not inputs, "runQuantum got more inputs than expected."
893 outputs = self.run(
894 exposure=exposure,
895 sources=sources,
896 skyInfo=skyInfo,
897 exposureId=idGenerator.catalog_id,
898 ccdInputs=ccdInputs,
899 sourceTableHandleDict=sourceTableHandleDict,
900 finalizedSourceTableHandleDict=finalizedSourceTableHandleDict,
901 finalVisitSummaryHandleDict=finalVisitSummaryHandleDict,
902 apCorrMap=apCorrMap,
903 )
904 # Strip HeavyFootprints to save space on disk
905 if self.config.doStripFootprints:
906 sources = outputs.outputSources
907 for source in sources[sources["parent"] != 0]:
908 source.setFootprint(None)
909 butlerQC.put(outputs, outputRefs)
910
911 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
912 sourceTableHandleDict=None, finalizedSourceTableHandleDict=None, finalVisitSummaryHandleDict=None,
913 apCorrMap=None):
914 """Run measurement algorithms on the input exposure, and optionally populate the
915 resulting catalog with extra information.
916
917 Parameters
918 ----------
919 exposure : `lsst.afw.exposure.Exposure`
920 The input exposure on which measurements are to be performed.
921 sources : `lsst.afw.table.SourceCatalog`
922 A catalog built from the results of merged detections, or
923 deblender outputs.
924 parentCatalog : `lsst.afw.table.SourceCatalog`
925 Catalog of parent sources corresponding to sources.
926 skyInfo : `lsst.pipe.base.Struct`
927 A struct containing information about the position of the input exposure within
928 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box.
929 exposureId : `int` or `bytes`
930 Packed unique number or bytes unique to the input exposure.
931 ccdInputs : `lsst.afw.table.ExposureCatalog`, optional
932 Catalog containing information on the individual visits which went into making
933 the coadd.
934 sourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
935 Dict for sourceTable_visit handles (key is visit) for propagating flags.
936 These tables contain astrometry and photometry flags, and optionally PSF flags.
937 finalizedSourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
938 Dict for finalized_src_table handles (key is visit) for propagating flags.
939 These tables contain PSF flags from the finalized PSF estimation.
940 finalVisitSummaryHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
941 Dict for visit_summary handles (key is visit) for visit-level information.
942 These tables contain the WCS information of the single-visit input images.
943 apCorrMap : `lsst.afw.image.ApCorrMap`, optional
944 Aperture correction map attached to the ``exposure``. If None, it
945 will be read from the ``exposure``.
946
947 Returns
948 -------
949 results : `lsst.pipe.base.Struct`
950 Results of running measurement task. Will contain the catalog in the
951 sources attribute. Optionally will have results of matching to a
952 reference catalog in the matchResults attribute, and denormalized
953 matches in the denormMatches attribute.
954 """
955 if self.config.doPropagateFlags:
956 # These mask planes may not be defined on the coadds always.
957 # We add the mask planes, which is a no-op if already defined.
958 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpAnywhere:
959 exposure.mask.addMaskPlane(maskPlane)
960 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpCenter:
961 exposure.mask.addMaskPlane(maskPlane)
962
963 self.measurement.run(sources, exposure, exposureId=exposureId)
964
965 if self.config.doApCorr:
966 if apCorrMap is None:
967 apCorrMap = exposure.getInfo().getApCorrMap()
968 self.applyApCorr.run(
969 catalog=sources,
970 apCorrMap=apCorrMap,
971 )
972
973 # TODO DM-11568: this contiguous check-and-copy could go away if we
974 # reserve enough space during SourceDetection and/or SourceDeblend.
975 # NOTE: sourceSelectors require contiguous catalogs, so ensure
976 # contiguity now, so views are preserved from here on.
977 if not sources.isContiguous():
978 sources = sources.copy(deep=True)
979
980 if self.config.doRunCatalogCalculation:
981 self.catalogCalculation.run(sources)
982
983 self.setPrimaryFlags.run(sources, skyMap=skyInfo.skyMap, tractInfo=skyInfo.tractInfo,
984 patchInfo=skyInfo.patchInfo)
985 if self.config.doPropagateFlags:
986 self.propagateFlags.run(
987 sources,
988 ccdInputs,
989 sourceTableHandleDict,
990 finalizedSourceTableHandleDict,
991 finalVisitSummaryHandleDict,
992 )
993
994 results = Struct()
995
996 # TODO[DM-47797]: remove this block
997 if self.config.doMatchSources:
998 matchResult = self.match.run(sources, exposure.getInfo().getFilter().bandLabel)
999 matches = afwTable.packMatches(matchResult.matches)
1000 matches.table.setMetadata(matchResult.matchMeta)
1001 results.matchResult = matches
1002 if self.config.doWriteMatchesDenormalized:
1003 if matchResult.matches:
1004 denormMatches = denormalizeMatches(matchResult.matches, matchResult.matchMeta)
1005 else:
1006 self.log.warning("No matches, so generating dummy denormalized matches file")
1007 denormMatches = afwTable.BaseCatalog(afwTable.Schema())
1008 denormMatches.setMetadata(PropertyList())
1009 denormMatches.getMetadata().add("COMMENT",
1010 "This catalog is empty because no matches were found.")
1011 results.denormMatches = denormMatches
1012 results.denormMatches = denormMatches
1013
1014 results.outputSources = sources
1015 return results