# Names exported by `from <module> import *`: the Config/Task pairs defined in
# this module, PostprocessAnalysis, and the column actions.
# NOTE(review): this listing looks like a lossy extraction (line-number
# prefixes such as "22__all__" are fused into the code and the closing "]"
# is not visible); restore from the upstream source before use.
22__all__ = [
"WriteObjectTableConfig",
"WriteObjectTableTask",
23 "WriteSourceTableConfig",
"WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig",
"WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig",
"TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig",
"TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig",
"ConsolidateObjectTableTask",
29 "TransformSourceTableConfig",
"TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig",
"ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig",
"ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig",
"MakeCcdVisitTableTask",
33 "MakeVisitTableConfig",
"MakeVisitTableTask",
34 "WriteForcedSourceTableConfig",
"WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig",
"TransformForcedSourceTableTask",
36 "ConsolidateTractConfig",
"ConsolidateTractTask",
37 "ComputeColumnsAction",
"ModelExtendednessColumnAction",
40from collections
import defaultdict
42from deprecated.sphinx
import deprecated
47from typing
import Iterable
53from numpy.typing
import NDArray
57import lsst.pipe.base
as pipeBase
59from lsst.daf.butler.formatters.parquet
import pandas_to_astropy
61from lsst.pipe.base
import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
64from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
65from lsst.obs.base.utils
import strip_provenance_from_fits_header, TableVStack
68from .coaddBase
import reorderRefs
69from .functors
import CompositeFunctor, Column
70from .schemaUtils
import convertDataFrameToSdmSchema, readSdmSchemaFile
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# flattenFilters: collapse a (band, column) two-level column index into a
# flat one.
# - Per-band columns are renamed "<band><col>" when camelCase is true, or
#   "<band>_<col>" otherwise.
# - The columns listed in noDupCols are taken only once, from
#   df[presentBands[0]], and placed in front of the per-band columns.
# NOTE(review): when inputBands is given, presentBands comes from a set
# intersection, so which band supplies the noDupCols is not order-stable.
# NOTE(review): noDupCols uses a mutable list default. It is only read here,
# but a tuple default would be safer.
# NOTE(review): the loop header that binds `band`/`subdf` and the return
# statement are missing from this extraction; confirm against upstream.
75def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
76 """Flattens a dataframe with multilevel column index.
78 newDf = pd.DataFrame()
80 dfBands = df.columns.unique(level=0).values
83 columnFormat =
"{0}{1}" if camelCase
else "{0}_{1}"
84 newColumns = {c: columnFormat.format(band, c)
85 for c
in subdf.columns
if c
not in noDupCols}
86 cols = list(newColumns.keys())
87 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
90 presentBands = dfBands
if inputBands
is None else list(set(inputBands).intersection(dfBands))
92 noDupDf = df[presentBands[0]][noDupCols]
93 newDf = pd.concat([noDupDf, newDf], axis=1)
# Connections for WriteObjectTableTask. This is presumably
# WriteObjectTableConnections: the class header line is missing from this
# extraction, but WriteObjectTableConfig names that class.
# Per-band meas, forced_src and psfs_multiprofit inputs over
# (tract, patch, band, skymap) are merged into one per-patch
# "{coaddName}Coadd_obj" DataFrame.
# NOTE(review): the outputCatalog doc mentions deepCoadd_{ref|meas|forced_src},
# but the inputs here are meas, forced_src and psfs_multiprofit, so the doc
# text looks stale.
98 defaultTemplates={
"coaddName":
"deep"},
99 dimensions=(
"tract",
"patch",
"skymap")):
100 inputCatalogMeas = connectionTypes.Input(
101 doc=
"Catalog of source measurements on the deepCoadd.",
102 dimensions=(
"tract",
"patch",
"band",
"skymap"),
103 storageClass=
"SourceCatalog",
104 name=
"{coaddName}Coadd_meas",
107 inputCatalogForcedSrc = connectionTypes.Input(
108 doc=
"Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
109 dimensions=(
"tract",
"patch",
"band",
"skymap"),
110 storageClass=
"SourceCatalog",
111 name=
"{coaddName}Coadd_forced_src",
114 inputCatalogPsfsMultiprofit = connectionTypes.Input(
115 doc=
"Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
116 dimensions=(
"tract",
"patch",
"band",
"skymap"),
117 storageClass=
"ArrowAstropy",
118 name=
"{coaddName}Coadd_psfs_multiprofit",
121 outputCatalog = connectionTypes.Output(
122 doc=
"A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
123 "stored as a DataFrame with a multi-level column index per-patch.",
124 dimensions=(
"tract",
"patch",
"skymap"),
125 storageClass=
"DataFrame",
126 name=
"{coaddName}Coadd_obj"
# Config for WriteObjectTableTask, wired to WriteObjectTableConnections.
# It declares a coaddName field; the field's arguments are missing from this
# extraction.
130class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
131 pipelineConnections=WriteObjectTableConnections):
132 coaddName = pexConfig.Field(
# WriteObjectTableTask: for one patch, merge the per-band meas / forced_src /
# psfs_multiprofit catalogs into a single DataFrame.
# - runQuantum groups the inputs as catalogs[band][dataset] using each ref's
#   "band" dataId value.
# - run() converts each catalog (DataFrame, afw SourceCatalog or astropy
#   Table) to pandas and indexes it on "id".
# - Columns are sorted, tractId/patchId columns are added, and the columns
#   become a (dataset, band, column) MultiIndex.
# - The resulting frames are combined with DataFrame.join.
# - Unsupported catalog types raise ValueError.
# NOTE(review): lines are missing from this extraction (e.g. the DataFrame
# branch body, the `else:` before the raise, the accumulation into `dfs`,
# and the return); confirm against upstream.
139class WriteObjectTableTask(pipeBase.PipelineTask):
140 """Write filter-merged object tables as a DataFrame in parquet format.
142 _DefaultName =
"writeObjectTable"
143 ConfigClass = WriteObjectTableConfig
146 outputDataset =
"obj"
148 def runQuantum(self, butlerQC, inputRefs, outputRefs):
149 inputs = butlerQC.get(inputRefs)
151 catalogs = defaultdict(dict)
152 for dataset, connection
in (
153 (
"meas",
"inputCatalogMeas"),
154 (
"forced_src",
"inputCatalogForcedSrc"),
155 (
"psfs_multiprofit",
"inputCatalogPsfsMultiprofit"),
157 for ref, cat
in zip(getattr(inputRefs, connection), inputs[connection]):
158 catalogs[ref.dataId[
"band"]][dataset] = cat
160 dataId = butlerQC.quantum.dataId
161 df = self.run(catalogs=catalogs, tract=dataId[
"tract"], patch=dataId[
"patch"])
162 outputs = pipeBase.Struct(outputCatalog=df)
163 butlerQC.put(outputs, outputRefs)
165 def run(self, catalogs, tract, patch):
166 """Merge multiple catalogs.
171 Mapping from filter names to dict of catalogs.
173 tractId to use for the tractId column.
175 patchId to use for the patchId column.
179 catalog : `pandas.DataFrame`
185 Raised if any of the catalogs is of an unsupported type.
188 for filt, tableDict
in catalogs.items():
189 for dataset, table
in tableDict.items():
191 if isinstance(table, pd.DataFrame):
193 elif isinstance(table, afwTable.SourceCatalog):
194 df = table.asAstropy().to_pandas()
195 elif isinstance(table, astropy.table.Table):
196 df = table.to_pandas()
198 raise ValueError(f
"{dataset=} has unsupported {type(table)=}")
199 df.set_index(
"id", drop=
True, inplace=
True)
202 df = df.reindex(sorted(df.columns), axis=1)
203 df = df.assign(tractId=tract, patchId=patch)
206 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c)
for c
in df.columns],
207 names=(
"dataset",
"band",
"column"))
212 catalog = functools.reduce(
lambda d1, d2: d1.join(d2), dfs)
# Connections for WriteSourceTableTask, per (instrument, visit, detector).
# Input: the "{catalogType}src" SourceCatalog.
# Output: "{catalogType}source", stored as ArrowAstropy.
# NOTE(review): the closing-paren lines of both connections are missing from
# this extraction.
216class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
217 defaultTemplates={
"catalogType":
""},
218 dimensions=(
"instrument",
"visit",
"detector")):
220 catalog = connectionTypes.Input(
221 doc=
"Input full-depth catalog of sources produced by CalibrateTask",
222 name=
"{catalogType}src",
223 storageClass=
"SourceCatalog",
224 dimensions=(
"instrument",
"visit",
"detector")
226 outputCatalog = connectionTypes.Output(
227 doc=
"Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
228 name=
"{catalogType}source",
229 storageClass=
"ArrowAstropy",
230 dimensions=(
"instrument",
"visit",
"detector")
# Config for WriteSourceTableTask, wired to WriteSourceTableConnections.
# No fields are visible in this extraction (the class body is missing).
234class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
235 pipelineConnections=WriteSourceTableConnections):
# WriteSourceTableTask: convert one detector's SourceCatalog to an astropy
# table and write it as outputCatalog.
# - runQuantum injects "visit" and "detector" from the quantum dataId.
# - run() calls catalog.asAstropy(), adds a "detector" column as np.int16,
#   and returns Struct(table=...).
# NOTE(review): the class docstring and the log message say "DataFrame", but
# run() produces an astropy table.
# NOTE(review): the docstring promises both visit and detector columns, but
# only the detector assignment is visible; original line 275 (presumably the
# visit column) is missing from this extraction.
239class WriteSourceTableTask(pipeBase.PipelineTask):
240 """Write source table to DataFrame Parquet format.
242 _DefaultName =
"writeSourceTable"
243 ConfigClass = WriteSourceTableConfig
245 def runQuantum(self, butlerQC, inputRefs, outputRefs):
246 inputs = butlerQC.get(inputRefs)
247 inputs[
"visit"] = butlerQC.quantum.dataId[
"visit"]
248 inputs[
"detector"] = butlerQC.quantum.dataId[
"detector"]
249 result = self.run(**inputs)
250 outputs = pipeBase.Struct(outputCatalog=result.table)
251 butlerQC.put(outputs, outputRefs)
253 def run(self, catalog, visit, detector, **kwargs):
254 """Convert `src` catalog to an Astropy table.
258 catalog: `afwTable.SourceCatalog`
259 catalog to be converted
260 visit, detector: `int`
261 Visit and detector ids to be added as columns.
263 Additional keyword arguments are ignored as a convenience for
264 subclasses that pass the same arguments to several different
269 result : `~lsst.pipe.base.Struct`
271 `astropy.table.Table` version of the input catalog
273 self.log.info(
"Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
274 tbl = catalog.asAstropy()
277 tbl[
"detector"] = np.int16(detector)
279 return pipeBase.Struct(table=tbl)
# Connections for WriteRecalibratedSourceTableTask. They extend
# WriteSourceTableConnections by:
# - adding a per-visit "finalVisitSummary" ExposureCatalog input;
# - adding "skymap" to the dimensions.
# The visible __init__ fragment marks the `catalog` input with
# deferGraphConstraint=True.
# NOTE(review): original lines 290-299 (including the __init__ header) are
# missing from this extraction.
282class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
283 defaultTemplates={
"catalogType":
""},
284 dimensions=(
"instrument",
"visit",
"detector",
"skymap")):
285 visitSummary = connectionTypes.Input(
286 doc=
"Input visit-summary catalog with updated calibration objects.",
287 name=
"finalVisitSummary",
288 storageClass=
"ExposureCatalog",
289 dimensions=(
"instrument",
"visit",),
300 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=
True)
# Config for WriteRecalibratedSourceTableTask. Two switches choose whether
# local photoCalib columns and/or local WCS (plus coord_ra/coord_dec) are
# re-evaluated. The dtype/default lines of both fields are missing from this
# extraction.
303class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
304 pipelineConnections=WriteRecalibratedSourceTableConnections):
306 doReevaluatePhotoCalib = pexConfig.Field(
309 doc=(
"Add or replace local photoCalib columns"),
311 doReevaluateSkyWcs = pexConfig.Field(
314 doc=(
"Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
# WriteRecalibratedSourceTableTask: a WriteSourceTableTask variant. When
# doReevaluatePhotoCalib or doReevaluateSkyWcs is set, it works as follows:
# - prepareCalibratedExposure attaches this detector's visit-summary
#   PhotoCalib/SkyWcs to an empty ExposureF.
# - A missing component is set to None, with a warning.
# - NoWorkFound is raised when the detector's summary is missing; the
#   condition line is not visible here, presumably it fires when
#   visitSummary.find() returns no row.
# - addCalibColumns copies the catalog minus any base_LocalWcs /
#   base_LocalPhotoCalib fields and reruns only those plugins (all slots
#   cleared, no noise replacement).
# - If the WCS is re-evaluated, source coords are updated first.
# - Finally, run() converts the result.
# NOTE(review): the addCalibColumns summary "Add replace columns" looks like
# a typo for "Add or replace columns".
# NOTE(review): lines are missing from this extraction (e.g. the
# `exposure=exposure` argument, the `row is None` check, the `else:`
# branches, the wcsPlugin/pcPlugin None defaults, the per-row loop and the
# returns); confirm against upstream.
318class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
319 """Write source table to DataFrame Parquet format.
321 _DefaultName =
"writeRecalibratedSourceTable"
322 ConfigClass = WriteRecalibratedSourceTableConfig
324 def runQuantum(self, butlerQC, inputRefs, outputRefs):
325 inputs = butlerQC.get(inputRefs)
327 inputs[
"visit"] = butlerQC.quantum.dataId[
"visit"]
328 inputs[
"detector"] = butlerQC.quantum.dataId[
"detector"]
330 if self.config.doReevaluatePhotoCalib
or self.config.doReevaluateSkyWcs:
331 exposure = ExposureF()
332 inputs[
"exposure"] = self.prepareCalibratedExposure(
334 visitSummary=inputs[
"visitSummary"],
335 detectorId=butlerQC.quantum.dataId[
"detector"]
337 inputs[
"catalog"] = self.addCalibColumns(**inputs)
339 result = self.run(**inputs)
340 outputs = pipeBase.Struct(outputCatalog=result.table)
341 butlerQC.put(outputs, outputRefs)
343 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
344 """Prepare a calibrated exposure and apply external calibrations
349 exposure : `lsst.afw.image.exposure.Exposure`
350 Input exposure to adjust calibrations. May be an empty Exposure.
352 Detector ID associated with the exposure.
353 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
354 Exposure catalog with all calibration objects. WCS and PhotoCalib
355 are always applied if ``visitSummary`` is provided and those
356 components are not `None`.
360 exposure : `lsst.afw.image.exposure.Exposure`
361 Exposure with adjusted calibrations.
363 if visitSummary
is not None:
364 row = visitSummary.find(detectorId)
366 raise pipeBase.NoWorkFound(f
"Visit summary for detector {detectorId} is missing.")
367 if (photoCalib := row.getPhotoCalib())
is None:
368 self.log.warning(
"Detector id %s has None for photoCalib in visit summary; "
369 "skipping reevaluation of photoCalib.", detectorId)
370 exposure.setPhotoCalib(
None)
372 exposure.setPhotoCalib(photoCalib)
373 if (skyWcs := row.getWcs())
is None:
374 self.log.warning(
"Detector id %s has None for skyWcs in visit summary; "
375 "skipping reevaluation of skyWcs.", detectorId)
376 exposure.setWcs(
None)
378 exposure.setWcs(skyWcs)
382 def addCalibColumns(self, catalog, exposure, **kwargs):
383 """Add replace columns with calibs evaluated at each centroid
385 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
386 a source catalog, by rerunning the plugins.
390 catalog : `lsst.afw.table.SourceCatalog`
391 catalog to which calib columns will be added
392 exposure : `lsst.afw.image.exposure.Exposure`
393 Exposure with attached PhotoCalibs and SkyWcs attributes to be
394 reevaluated at local centroids. Pixels are not required.
396 Additional keyword arguments are ignored to facilitate passing the
397 same arguments to several methods.
401 newCat: `lsst.afw.table.SourceCatalog`
402 Source Catalog with requested local calib columns
404 measureConfig = SingleFrameMeasurementTask.ConfigClass()
405 measureConfig.doReplaceWithNoise =
False
408 for slot
in measureConfig.slots:
409 setattr(measureConfig.slots, slot,
None)
411 measureConfig.plugins.names = []
412 if self.config.doReevaluateSkyWcs:
413 measureConfig.plugins.names.add(
"base_LocalWcs")
414 self.log.info(
"Re-evaluating base_LocalWcs plugin")
415 if self.config.doReevaluatePhotoCalib:
416 measureConfig.plugins.names.add(
"base_LocalPhotoCalib")
417 self.log.info(
"Re-evaluating base_LocalPhotoCalib plugin")
418 pluginsNotToCopy = tuple(measureConfig.plugins.names)
422 aliasMap = catalog.schema.getAliasMap()
423 mapper = afwTable.SchemaMapper(catalog.schema)
424 for item
in catalog.schema:
425 if not item.field.getName().startswith(pluginsNotToCopy):
426 mapper.addMapping(item.key)
428 schema = mapper.getOutputSchema()
429 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
430 schema.setAliasMap(aliasMap)
431 newCat = afwTable.SourceCatalog(schema)
432 newCat.extend(catalog, mapper=mapper)
438 if self.config.doReevaluateSkyWcs
and exposure.wcs
is not None:
439 afwTable.updateSourceCoords(exposure.wcs, newCat)
440 wcsPlugin = measurement.plugins[
"base_LocalWcs"]
444 if self.config.doReevaluatePhotoCalib
and exposure.getPhotoCalib()
is not None:
445 pcPlugin = measurement.plugins[
"base_LocalPhotoCalib"]
450 if wcsPlugin
is not None:
451 wcsPlugin.measure(row, exposure)
452 if pcPlugin
is not None:
453 pcPlugin.measure(row, exposure)
# PostprocessAnalysis: pair catalog handle(s) with a set of functors and
# evaluate them.
# - The combined functor gets the default funcs plus Column functors for
#   forcedFlags ("forced_src"), refFlags ("ref") and flags ("meas").
# - Every functor's filt is set to self.filt.
# - compute() evaluates that functor on a single handle, or maps it over a
#   list/tuple of handles (via pool.map when a pool is used) and
#   concatenates the results into self._df.
# NOTE(review): compute() tests `type(self.handles) in (list, tuple)`, so a
# list/tuple subclass is treated as a single handle; isinstance() would be
# more general.
# NOTE(review): lines are missing from this extraction (e.g. the
# `self.filt = ...` assignment, the property decorators, the
# _defaultFuncs definition, the non-CompositeFunctor branch, the
# `pool is None` branch and the returns); confirm against upstream.
458class PostprocessAnalysis(object):
459 """Calculate columns from DataFrames or handles storing DataFrames.
461 This object manages and organizes an arbitrary set of computations
462 on a catalog. The catalog is defined by a
463 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
464 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
465 computations are defined by a collection of
466 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
467 ``CompositeFunctor``).
469 After the object is initialized, accessing the ``.df`` attribute (which
470 holds the `pandas.DataFrame` containing the results of the calculations)
471 triggers computation of said dataframe.
473 One of the conveniences of using this object is the ability to define a
474 desired common filter for all functors. This enables the same functor
475 collection to be passed to several different `PostprocessAnalysis` objects
476 without having to change the original functor collection, since the ``filt``
477 keyword argument of this object triggers an overwrite of the ``filt``
478 property for all functors in the collection.
480 This object also allows a list of refFlags to be passed, and defines a set
481 of default refFlags that are always included even if not requested.
483 If a list of DataFrames or Handles is passed, rather than a single one,
484 then the calculations will be mapped over all the input catalogs. In
485 principle, it should be straightforward to parallelize this activity, but
486 initial tests have failed (see TODO in code comments).
490 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
491 `~lsst.pipe.base.InMemoryDatasetHandle` or
493 Source catalog(s) for computation.
494 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
495 Computations to do (functors that act on ``handles``).
496 If a dict, the output
497 DataFrame will have columns keyed accordingly.
498 If a list, the column keys will come from the
499 ``.shortname`` attribute of each functor.
501 filt : `str`, optional
502 Filter in which to calculate. If provided,
503 this will overwrite any existing ``.filt`` attribute
504 of the provided functors.
506 flags : `list`, optional
507 List of flags (per-band) to include in output table.
508 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
510 refFlags : `list`, optional
511 List of refFlags (only reference band) to include in output table.
513 forcedFlags : `list`, optional
514 List of flags (per-band) to include in output table.
515 Taken from the ``forced_src`` dataset if applied to a
516 multilevel Object Table. Intended for flags from measurement plugins
517 only run during multi-band forced-photometry.
519 _defaultRefFlags = []
522 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
523 self.handles = handles
524 self.functors = functors
527 self.flags = list(flags)
if flags
is not None else []
528 self.forcedFlags = list(forcedFlags)
if forcedFlags
is not None else []
529 self.refFlags = list(self._defaultRefFlags)
530 if refFlags
is not None:
531 self.refFlags += list(refFlags)
536 def defaultFuncs(self):
537 funcs = dict(self._defaultFuncs)
542 additionalFuncs = self.defaultFuncs
543 additionalFuncs.update({flag:
Column(flag, dataset=
"forced_src")
for flag
in self.forcedFlags})
544 additionalFuncs.update({flag:
Column(flag, dataset=
"ref")
for flag
in self.refFlags})
545 additionalFuncs.update({flag:
Column(flag, dataset=
"meas")
for flag
in self.flags})
547 if isinstance(self.functors, CompositeFunctor):
552 func.funcDict.update(additionalFuncs)
553 func.filt = self.filt
559 return [name
for name, func
in self.func.funcDict.items()
if func.noDup]
567 def compute(self, dropna=False, pool=None):
569 if type(self.handles)
in (list, tuple):
571 dflist = [self.func(handle, dropna=dropna)
for handle
in self.handles]
575 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
576 self._df = pd.concat(dflist)
578 self._df = self.func(self.handles, dropna=dropna)
# Base connections for TransformCatalogBaseTask subclasses: a DataFrame
# inputCatalog is transformed into an ArrowAstropy outputCatalog. Names,
# dimensions and templates are missing from this extraction.
583class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
585 """Expected Connections for subclasses of TransformCatalogBaseTask.
589 inputCatalog = connectionTypes.Input(
591 storageClass=
"DataFrame",
593 outputCatalog = connectionTypes.Output(
595 storageClass=
"ArrowAstropy",
# Base config for TransformCatalogBaseTask:
# - functorFile: YAML functor definitions;
# - primaryKey: column to use as the DataFrame index;
# - columnsFromDataId: dataId keys copied into output columns.
# NOTE(review): the primaryKey doc string concatenates "...the index" and
# "will be..." without a space, so it renders as "the indexwill be".
599class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
600 pipelineConnections=TransformCatalogBaseConnections):
601 functorFile = pexConfig.Field(
603 doc=
"Path to YAML file specifying Science Data Model functors to use "
604 "when copying columns and computing calibrated values.",
608 primaryKey = pexConfig.Field(
610 doc=
"Name of column to be set as the DataFrame index. If None, the index"
611 "will be named `id`",
615 columnsFromDataId = pexConfig.ListField(
619 doc=
"Columns to extract from the dataId",
# TransformCatalogBaseTask: apply a CompositeFunctor to an input catalog
# through PostprocessAnalysis.
# - The functor is loaded from config.functorFile and updated with
#   PostprocessAnalysis._defaultFuncs.
# - transform() copies the config.columnsFromDataId keys from dataId into
#   columns ("detector" as np.int16) and raises ValueError for a missing key.
# - It then re-indexes on config.primaryKey when that column is present.
# - run() converts the result with pandas_to_astropy and raises
#   UpstreamFailureNoWorkFound for an empty input; the guarding condition
#   line is missing here.
# - runQuantum raises ValueError if no functors were loaded.
# NOTE(review): run()'s docstring documents "handles" but the argument is
# named "handle". It also says dataId adds a `patchId` column, whereas
# transform() adds whatever columnsFromDataId lists.
# NOTE(review): typos in the docstrings ("oject", "taked", "excecute") and in
# the log message ("tranform").
# NOTE(review): lines are missing from this extraction (property decorators,
# the super().__init__ call, the branch leaving funcs as None, and the
# getFunctors/getAnalysis bodies); confirm against upstream.
623class TransformCatalogBaseTask(pipeBase.PipelineTask):
624 """Base class for transforming/standardizing a catalog by applying functors
625 that convert units and apply calibrations.
627 The purpose of this task is to perform a set of computations on an input
628 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
629 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
630 a new dataset (which needs to be declared in an ``outputDataset``
633 The calculations to be performed are defined in a YAML file that specifies
634 a set of functors to be computed, provided as a ``--functorFile`` config
635 parameter. An example of such a YAML file is the following:
642 args: slot_Centroid_x
645 args: slot_Centroid_y
647 functor: LocalNanojansky
649 - slot_PsfFlux_instFlux
650 - slot_PsfFlux_instFluxErr
651 - base_LocalPhotoCalib
652 - base_LocalPhotoCalibErr
654 functor: LocalNanojanskyErr
656 - slot_PsfFlux_instFlux
657 - slot_PsfFlux_instFluxErr
658 - base_LocalPhotoCalib
659 - base_LocalPhotoCalibErr
663 The names for each entry under "func" will become the names of columns in
664 the output dataset. All the functors referenced are defined in
665 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
666 functor are in the `args` list, and any additional entries for each column
667 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
668 treated as keyword arguments to be passed to the functor initialization.
670 The "flags" entry is the default shortcut for `Column` functors.
671 All columns listed under "flags" will be copied to the output table
672 untransformed. They can be of any datatype.
673 In the special case of transforming a multi-level oject table with
674 band and dataset indices (deepCoadd_obj), these will be taked from the
675 ``meas`` dataset and exploded out per band.
677 There are two special shortcuts that only apply when transforming
678 multi-level Object (deepCoadd_obj) tables:
679 - The "refFlags" entry is shortcut for `Column` functor
680 taken from the ``ref`` dataset if transforming an ObjectTable.
681 - The "forcedFlags" entry is shortcut for `Column` functors.
682 taken from the ``forced_src`` dataset if transforming an ObjectTable.
683 These are expanded out per band.
686 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
687 to organize and excecute the calculations.
690 def _DefaultName(self):
691 raise NotImplementedError(
"Subclass must define the \"_DefaultName\" attribute.")
694 def outputDataset(self):
695 raise NotImplementedError(
"Subclass must define the \"outputDataset\" attribute.")
698 def inputDataset(self):
699 raise NotImplementedError(
"Subclass must define \"inputDataset\" attribute.")
702 def ConfigClass(self):
703 raise NotImplementedError(
"Subclass must define \"ConfigClass\" attribute.")
705 def __init__(self, *args, **kwargs):
707 if self.config.functorFile:
708 self.log.info(
"Loading tranform functor definitions from %s",
709 self.config.functorFile)
710 self.
funcs = CompositeFunctor.from_file(self.config.functorFile)
711 self.
funcs.update(dict(PostprocessAnalysis._defaultFuncs))
715 def runQuantum(self, butlerQC, inputRefs, outputRefs):
716 inputs = butlerQC.get(inputRefs)
717 if self.
funcs is None:
718 raise ValueError(
"config.functorFile is None. "
719 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
720 result = self.run(handle=inputs[
"inputCatalog"], funcs=self.
funcs,
721 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
722 butlerQC.put(result, outputRefs)
724 def run(self, handle, funcs=None, dataId=None, band=None):
725 """Do postprocessing calculations
727 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
728 ``DataFrame`` object and dataId,
729 returns a dataframe with results of postprocessing calculations.
733 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
734 `~lsst.pipe.base.InMemoryDatasetHandle` or
735 `~pandas.DataFrame`, or list of these.
736 DataFrames from which calculations are done.
737 funcs : `~lsst.pipe.tasks.functors.Functor`
738 Functors to apply to the table's columns
739 dataId : dict, optional
740 Used to add a `patchId` column to the output dataframe.
741 band : `str`, optional
742 Filter band that is being processed.
746 result : `lsst.pipe.base.Struct`
747 Result struct, with a single ``outputCatalog`` attribute holding
748 the transformed catalog.
750 self.log.info(
"Transforming/standardizing the source table dataId: %s", dataId)
752 df = self.transform(band, handle, funcs, dataId).df
753 self.log.info(
"Made a table of %d columns and %d rows", len(df.columns), len(df))
756 raise UpstreamFailureNoWorkFound(
757 "Input catalog is empty, so there is nothing to transform/standardize",
760 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
763 def getFunctors(self):
766 def getAnalysis(self, handles, funcs=None, band=None):
769 analysis = PostprocessAnalysis(handles, funcs, filt=band)
772 def transform(self, band, handles, funcs, dataId):
773 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
775 if dataId
and self.config.columnsFromDataId:
776 for key
in self.config.columnsFromDataId:
778 if key ==
"detector":
780 df[key] = np.int16(dataId[key])
782 df[key] = dataId[key]
784 raise ValueError(f
"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
786 if self.config.primaryKey:
787 if df.index.name != self.config.primaryKey
and self.config.primaryKey
in df:
788 df.reset_index(inplace=
True, drop=
True)
789 df.set_index(self.config.primaryKey, inplace=
True)
791 return pipeBase.Struct(
# Connections for TransformObjectCatalogTask. This is presumably
# TransformObjectCatalogConnections: the class header is missing from this
# extraction, but TransformObjectCatalogConfig names that class.
# Inputs, all per (tract, patch, skymap):
# - the {coaddName}Coadd_obj DataFrame;
# - the ref SourceCatalog;
# - the Exp/Sersic multiprofit and epoch ArrowAstropy catalogs.
# Output: an ArrowAstropy object table. The output storage class switches to
# DataFrame when config.multilevelOutput is set.
# NOTE(review): the inputCatalogRef doc concatenates "position)" and "for"
# without a space ("position)for").
798 defaultTemplates={
"coaddName":
"deep"},
799 dimensions=(
"tract",
"patch",
"skymap")):
800 inputCatalog = connectionTypes.Input(
801 doc=
"The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
802 "stored as a DataFrame with a multi-level column index per-patch.",
803 dimensions=(
"tract",
"patch",
"skymap"),
804 storageClass=
"DataFrame",
805 name=
"{coaddName}Coadd_obj",
808 inputCatalogRef = connectionTypes.Input(
809 doc=
"Catalog marking the primary detection (which band provides a good shape and position)"
810 "for each detection in deepCoadd_mergeDet.",
811 dimensions=(
"tract",
"patch",
"skymap"),
812 storageClass=
"SourceCatalog",
813 name=
"{coaddName}Coadd_ref",
816 inputCatalogExpMultiprofit = connectionTypes.Input(
817 doc=
"Catalog of multiband Exponential fits.",
818 dimensions=(
"tract",
"patch",
"skymap"),
819 storageClass=
"ArrowAstropy",
820 name=
"{coaddName}Coadd_Exp_multiprofit",
823 inputCatalogSersicMultiprofit = connectionTypes.Input(
824 doc=
"Catalog of multiband Sersic fits.",
825 dimensions=(
"tract",
"patch",
"skymap"),
826 storageClass=
"ArrowAstropy",
827 name=
"{coaddName}Coadd_Sersic_multiprofit",
830 inputCatalogEpoch = connectionTypes.Input(
831 doc=
"Catalog of mean epochs for each object per band.",
832 dimensions=(
"tract",
"patch",
"skymap"),
833 storageClass=
"ArrowAstropy",
837 outputCatalog = connectionTypes.Output(
838 doc=
"Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
840 dimensions=(
"tract",
"patch",
"skymap"),
841 storageClass=
"ArrowAstropy",
847 if config.multilevelOutput:
848 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass=
"DataFrame")
# Config for TransformObjectCatalogTask. It adds:
# - output band selection;
# - per-band column naming (camelCase vs underscore);
# - the deprecated multilevelOutput switch;
# - the good-flag list and the float/integer fill values used when
#   populating columns for bands absent from the input.
# setDefaults sets:
# - functorFile to os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml");
#   the variable is not expanded here, so confirm where it is resolved;
# - primaryKey to "objectId";
# - columnsFromDataId to ["tract", "patch"].
# NOTE(review): the camelCase doc runs "underscore " into "For example"
# without a period.
851class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
852 pipelineConnections=TransformObjectCatalogConnections):
853 coaddName = pexConfig.Field(
858 outputBands = pexConfig.ListField(
862 doc=(
"These bands and only these bands will appear in the output,"
863 " NaN-filled if the input does not include them."
864 " If None, then use all bands found in the input.")
866 camelCase = pexConfig.Field(
869 doc=(
"Write per-band columns names with camelCase, else underscore "
870 "For example: gPsFlux instead of g_PsFlux.")
872 multilevelOutput = pexConfig.Field(
875 doc=(
"Whether results dataframe should have a multilevel column index (True) or be flat "
876 "and name-munged (False). If True, the output storage class will be "
877 "set to DataFrame, since astropy tables do not support multi-level indexing."),
878 deprecated=
"Support for multi-level outputs is deprecated and will be removed after v29.",
880 goodFlags = pexConfig.ListField(
883 doc=(
"List of 'good' flags that should be set False when populating empty tables. "
884 "All other flags are considered to be 'bad' flags and will be set to True.")
886 floatFillValue = pexConfig.Field(
889 doc=
"Fill value for float fields when populating empty tables."
891 integerFillValue = pexConfig.Field(
894 doc=
"Fill value for integer fields when populating empty tables."
897 def setDefaults(self):
898 super().setDefaults()
899 self.
functorFile = os.path.join(
"$PIPE_TASKS_DIR",
"schemas",
"Object.yaml")
900 self.primaryKey =
"objectId"
901 self.columnsFromDataId = [
"tract",
"patch"]
902 self.goodFlags = [
"calib_astrometry_used",
903 "calib_photometry_reserved",
904 "calib_photometry_used",
905 "calib_psf_candidate",
906 "calib_psf_reserved",
# TransformObjectCatalogTask: build the flat Object table.
# - Functors are split into per-band ones and ones bound to a multiband
#   dataset (ref, epoch, Exp_multiprofit, Sersic_multiprofit).
# - The per-band transform runs once for each input band that is in
#   outputBands.
# - Output bands missing from the input get a copy of the first band's frame
#   with fill values (per the goodFlags doc: good flags False, other flags
#   True; integers integerFillValue; otherwise floatFillValue).
# - Bands are concatenated under a ("band", "column") index and flattened
#   with flattenFilters unless multilevelOutput is set.
# - Each multiband dataset is then transformed and appended; "ref" must come
#   first.
# - The result is converted with pandas_to_astropy when not multilevel.
# NOTE(review) - possible defects visible in this text; confirm against
# upstream:
#   * The multilevel branch concatenates over self.config.outputBands rather
#     than the local outputBands. When config.outputBands is empty the
#     input bands are used elsewhere, but the dict here is empty.
#   * isinstance(testValue, (np.bool_, pd.BooleanDtype)): BooleanDtype is a
#     dtype class, not a scalar type, so nullable-boolean values never match.
#   * dfTemp[col].values[0] raises IndexError if the template frame has no
#     rows.
#   * set.union(*[...]) raises TypeError if analysisDict is empty.
#   * Writing through dfTemp[col].values[:] may not take effect under pandas
#     copy-on-write.
#   * The walrus `band := getattr(...)` rebinds run()'s `band` argument.
#   * Docstring typo: "superceded".
# NOTE(review): many lines are missing from this extraction (e.g. the
# dict branch of funcDict_in, the handles_multi/funcDict_band/dfDict/
# analysisDict initialisations, continue/else branches and the multilevel
# return); confirm against upstream.
910class TransformObjectCatalogTask(TransformCatalogBaseTask):
911 """Produce a flattened Object Table to match the format specified in
914 Do the same set of postprocessing calculations on all bands.
916 This is identical to `TransformCatalogBaseTask`, except for that it does
917 the specified functor calculations for all filters present in the
918 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
919 by the YAML file will be superceded.
921 _DefaultName =
"transformObjectCatalog"
922 ConfigClass = TransformObjectCatalogConfig
925 datasets_multiband = (
"ref",
"epoch",
"Exp_multiprofit",
"Sersic_multiprofit")
927 def runQuantum(self, butlerQC, inputRefs, outputRefs):
928 inputs = butlerQC.get(inputRefs)
929 if self.
funcs is None:
930 raise ValueError(
"config.functorFile is None. "
931 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
932 result = self.run(handle=inputs[
"inputCatalog"], funcs=self.
funcs,
933 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
934 handle_ref=inputs[
"inputCatalogRef"],
935 handle_epoch=inputs[
"inputCatalogEpoch"],
936 handle_Exp_multiprofit=inputs[
"inputCatalogExpMultiprofit"],
937 handle_Sersic_multiprofit=inputs[
"inputCatalogSersicMultiprofit"],
939 butlerQC.put(result, outputRefs)
941 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
945 if isinstance(funcs, CompositeFunctor):
946 funcDict_in = funcs.funcDict
947 elif isinstance(funcs, dict):
949 elif isinstance(funcs, list):
950 funcDict_in = {idx: v
for idx, v
in enumerate(funcs)}
952 raise TypeError(f
"Unsupported {type(funcs)=}")
955 funcDicts_multiband = {}
956 for dataset
in self.datasets_multiband:
957 if (handle_multi := kwargs.get(f
"handle_{dataset}"))
is None:
958 raise RuntimeError(f
"Missing required handle_{dataset} kwarg")
959 handles_multi[dataset] = handle_multi
960 funcDicts_multiband[dataset] = {}
964 templateDf = pd.DataFrame()
966 columns = handle.get(component=
"columns")
967 inputBands = columns.unique(level=1).values
969 outputBands = self.config.outputBands
if self.config.outputBands
else inputBands
974 columns_add_from_ref = defaultdict(list)
976 for name, func
in funcDict_in.items():
977 if func.dataset
in funcDicts_multiband:
979 if band := getattr(func,
"band_to_check",
None):
980 if band
not in outputBands:
983 elif hasattr(func,
"bands"):
988 func.bands = tuple(inputBands)
989 if hasattr(func,
"columns_ref"):
990 columns_add_from_ref[func.dataset].extend(func.columns_ref)
992 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
993 funcDict[name] = func
998 for inputBand
in inputBands:
999 if inputBand
not in outputBands:
1000 self.log.info(
"Ignoring %s band data in the input", inputBand)
1002 self.log.info(
"Transforming the catalog of band %s", inputBand)
1003 result = self.transform(inputBand, handle, funcs_band, dataId)
1004 dfDict[inputBand] = result.df
1005 analysisDict[inputBand] = result.analysis
1006 if templateDf.empty:
1007 templateDf = result.df
1010 for filt
in outputBands:
1011 if filt
not in dfDict:
1012 self.log.info(
"Adding empty columns for band %s", filt)
1013 dfTemp = templateDf.copy()
1014 for col
in dfTemp.columns:
1015 testValue = dfTemp[col].values[0]
1016 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1018 if col
in self.config.goodFlags:
1022 elif isinstance(testValue, numbers.Integral):
1026 if isinstance(testValue, np.unsignedinteger):
1027 raise ValueError(
"Parquet tables may not have unsigned integer columns.")
1029 fillValue = self.config.integerFillValue
1031 fillValue = self.config.floatFillValue
1032 dfTemp[col].values[:] = fillValue
1033 dfDict[filt] = dfTemp
1036 df = pd.concat(dfDict, axis=1, names=[
"band",
"column"])
1037 name_index = df.index.name
1040 if not self.config.multilevelOutput:
1041 noDupCols = list(set.union(*[set(v.noDupCols)
for v
in analysisDict.values()]))
1042 if self.config.primaryKey
in noDupCols:
1043 noDupCols.remove(self.config.primaryKey)
1044 if dataId
and self.config.columnsFromDataId:
1045 noDupCols += self.config.columnsFromDataId
1046 df =
flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1047 inputBands=inputBands)
1053 for dataset, funcDict
in funcDicts_multiband.items():
1054 handle_multiband = handles_multi[dataset]
1055 df_dataset = handle_multiband.get()
1056 if isinstance(df_dataset, astropy.table.Table):
1058 if name_index
not in df_dataset.colnames:
1059 if self.config.primaryKey
in df_dataset.colnames:
1060 name_index_ap = self.config.primaryKey
1063 f
"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1064 f
" {df_dataset.colnames=} for {dataset=}"
1067 name_index_ap = name_index
1068 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=
False)
1069 elif isinstance(df_dataset, afwTable.SourceCatalog):
1070 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=
False)
1072 if dataset ==
"ref":
1076 raise RuntimeError(f
"ref must be the first dataset, not {dataset}")
1078 for column
in {key:
None for key
in columns_add_from_ref.get(dataset, [])}:
1079 df_dataset[column] = df_ref[column]
1083 result = self.transform(
1085 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass=
"DataFrame"),
1089 result.df.index.name = name_index
1091 if self.config.columnsFromDataId:
1092 columns_drop = [column
for column
in self.config.columnsFromDataId
if column
in result.df]
1094 result.df.drop(columns_drop, axis=1, inplace=
True)
1098 to_concat = pd.concat(
1099 {band: result.df
for band
in self.config.outputBands}, axis=1, names=[
"band",
"column"]
1100 )
if self.config.multilevelOutput
else result.df
1101 df = pd.concat([df, to_concat], axis=1)
1102 analysisDict[dataset] = result.analysis
1105 df.index.name = self.config.primaryKey
1107 if not self.config.multilevelOutput:
1108 tbl = pandas_to_astropy(df)
1112 self.log.info(
"Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1114 return pipeBase.Struct(outputCatalog=tbl)
class ComputeColumnsAction(ConfigurableAction):
    """An action that computes multiple vectors from an input.

    This class is meant to be compatible with analysis_tools'
    AnalysisAction class, which cannot be a dependency of pipe_tasks.
    Subclasses must override both `getInputSchema` and ``__call__``.
    """

    def getInputSchema(self) -> Iterable[tuple[str, type[NDArray]]]:
        """Return the required inputs for this action.

        This function is meant to be compatible with
        analysis_tools' AnalysisAction.getInputSchema.

        Returns
        -------
        schema : `Iterable` [`tuple` [`str`, `type`]]
            (column name, column type) pairs that ``__call__`` reads from
            its input table.

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        # The return annotation matches the override in
        # ExtendednessColumnActionBase, which produces (name, type) pairs
        # rather than a dict.
        raise NotImplementedError("This method must be overloaded in subclasses")

    def __call__(self, table: astropy.table.Table) -> dict[str, NDArray]:
        """Compute new columns from ``table``.

        Parameters
        ----------
        table : `astropy.table.Table`
            Input table; expected to contain the columns named by
            `getInputSchema`.

        Returns
        -------
        columns : `dict` [`str`, `numpy.ndarray`]
            Mapping of new column name to computed values.

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        raise NotImplementedError("This method must be overloaded in subclasses")
# Shared configuration and helpers for actions that compare PSF and
# extended-model fluxes band by band. The per-row computation is left to
# subclasses' ``__call__`` (the ComputeColumnsAction base raises
# NotImplementedError).
1135class ExtendednessColumnActionBase(ComputeColumnsAction):
# Single bands; one output column is produced for each.
1136 bands = pexConfig.ListField[str](
1137 doc=
"The bands to make single-band outputs for.",
1138 default=[
"u",
"g",
"r",
"i",
"z",
"y"]
# Combined-band outputs: output name -> comma-separated list of bands.
1140 bands_combined = pexConfig.DictField[str, str](
1141 doc=
"Multiband classification column specialization. Keys specify the"
1142 " name of the column and values are a comma-separated list of"
1143 " bands, all of which must be contained in the bands listed.",
1144 default={
"griz":
"g,r,i,z"},
# itemCheck: a value must name more than one band, with no band repeated.
1145 itemCheck=
lambda x: (len(y := x.split(
",")) > 1) & (len(set(y)) == len(y)),
# Template for the model flux column; formatted with band= and model=.
1147 model_column_flux = pexConfig.Field[str](
1148 doc=
"The model flux column to use for computing the difference to"
1149 " to the S/N flux. Must contain the {band} and {model} templates.",
1150 default=
"{band}_{model}Flux",
# NOTE(review): `("{model}" in x,)` below is a one-element tuple, which is
# always truthy, so this check only enforces the {band} template; the
# {model} requirement stated in the doc is not actually validated.
# Dropping the trailing comma would enforce it.
1151 check=
lambda x: (
"{band}" in x)
and (
"{model}" in x,),
# Template for the model flux error column; formatted with band= and model=.
1153 model_column_flux_err = pexConfig.Field[str](
1154 doc=
"The model flux error column to use for computing the difference"
1155 " to the S/N flux. Must contain the {band} and {model} templates.",
1156 default=
"{band}_{model}FluxErr",
# NOTE(review): same always-truthy one-element tuple as above; {model} is
# not actually checked.
1157 check=
lambda x: (
"{band}" in x)
and (
"{model}" in x,),
# Value substituted for {model} when formatting the model flux column names.
1159 model_flux_name = pexConfig.Field[str](
1160 doc=
"The extended object model to use to compared to PSF model fluxes",
# Output column name template; formatted with the (possibly combined) band.
1163 output_column = pexConfig.Field[str](
1164 doc=
"Name of the output column. Must contain the {band} template",
1165 default=
"{band}_model_extendedness",
1166 check=
lambda x:
"{band}" in x,
1168 psf_column_flux = pexConfig.Field[str](
1169 doc=
"The name of the PSF flux column. Must contain the {band} template.",
1170 default=
"{band}_psfFlux",
1171 check=
lambda x:
"{band}" in x,
1173 psf_column_flux_err = pexConfig.Field[str](
1174 doc=
"The name of the PSF flux error column. Must contain the {band} template.",
1175 default=
"{band}_psfFluxErr",
1176 check=
lambda x:
"{band}" in x,
# Formatted with axis="x" and axis="y" to obtain the two size columns.
1178 size_column = pexConfig.Field[str](
1179 doc=
"The column to use for applying size cuts. Must contain the {axis} template.",
1180 default=
"exponential_reff_{axis}",
# Input schema: (column name, NDArray[float]) pairs for the x/y size columns
# and, for each band in ``bands``, the PSF and model flux and flux-error
# columns.
1183 def getInputSchema(self) -> Iterable[tuple[str, type[NDArray]]]:
1184 size_column = self.size_column
1186 (size_column.format(axis=axis), NDArray[float])
for axis
in (
"x",
"y")
1188 model = self.model_flux_name
1190 self.psf_column_flux, self.psf_column_flux_err,
1191 self.model_column_flux, self.model_column_flux_err,
1194 (column.format(band=band, model=model), NDArray[float])
for band
in self.bands
# Validation (presumably inside validate()): every band used by an entry in
# ``bands_combined`` must also be in ``bands``; all problems are collected
# and reported together in a single ValueError.
1202 for name, band_combined
in self.bands_combined.items():
1203 bands = band_combined.split(
",")
1204 bands_missing = [band
for band
in bands
if band
not in self.bands]
1207 f
"self.bands_combined[{name}] contains bands={bands_missing} not in {self.bands=}"
1210 raise ValueError(f
"Validation failed due to errors: {'; '.join(errors)}")
# Return the PSF flux, PSF flux error, model flux and model flux error for
# ``band`` as numpy arrays, using the configured column templates.
1212 def _get_fluxes(self, table, band: str):
1213 model = self.model_flux_name
1214 flux_psf, fluxerr_psf, flux_model, fluxerr_model = (
1215 np.array(table[column.format(band=band, model=model)])
1217 self.psf_column_flux, self.psf_column_flux_err,
1218 self.model_column_flux, self.model_column_flux_err,
1221 return flux_psf, fluxerr_psf, flux_model, fluxerr_model
# Computes an extendedness value in [0, 1] for every single band in
# ``bands`` and every combined entry in ``bands_combined``, from the
# PSF-to-model flux ratio, its uncertainty and the model size.
1224class ModelExtendednessColumnAction(ExtendednessColumnActionBase):
# Weight of the flux-ratio error term added to (1 - flux ratio). The field
# name's spelling ("coefficent") is part of the config API.
1225 fluxerr_coefficent = pexConfig.Field[float](
1226 doc=
"The coefficient to multiply the flux error by when adding to the model flux.",
1228 check=
lambda x: x >= 0,
# Stretch factor applied before the final saturating map in __call__.
1230 fluxerr_stretch = pexConfig.Field[float](
1231 doc=
"The factor to multiply flux error-scaled ratios by to derive extendedness.",
1233 check=
lambda x: x > 0,
1235 good_sn_min = pexConfig.Field[float](
1236 doc=
"Minimum PSF S/N to include objects if"
1237 " min_n_good_to_shift_flux_ratio is > 0; ignored otherwise.",
1240 max_reff_compact = pexConfig.Field[float](
1241 doc=
"The maximum effective radius in pixels below which an object is"
1242 " classified as not extended, regardless of other parameter values.",
# NOTE(review): this doc says objects "larger than max_reff_compact" are
# used, but __call__ selects ``small`` objects (size < max_reff_compact).
# __call__ also requires strictly more than this many objects (">"), not
# "at least". Confirm which behaviour is intended.
1245 min_n_good_to_shift_flux_ratio = pexConfig.Field[int](
1246 doc=
"Minimum number of objects with PSF S/N > good_sn_min and with "
1247 " size larger than max_reff_compact to use to compute the median "
1248 " PSF-to-model flux ratio, which is assumed to be 1 otherwise."
1249 " If this value is not >0, the median flux ratio will be kept 1.",
# Returns a dict mapping each output column name to its extendedness array.
1253 def __call__(self, table: astropy.table.Table) -> dict[str, NDArray]:
1254 size_column = self.size_column
# Model size: root-mean-square of the x and y effective radii.
1255 size_model = np.sqrt(
1256 0.5*(table[size_column.format(axis=
'x')]**2 + table[size_column.format(axis=
'y')]**2)
# Compact objects; used below to select the flux-ratio normalisation sample.
1258 small = size_model < self.max_reff_compact
# Single bands map to themselves; combined entries map to their band lists.
1260 band_mappings_to_process = {band: [band]
for band
in self.bands}
1261 band_mappings_to_process.update({k: v.split(
",")
for k, v
in self.bands_combined.items()})
1263 for output_band, input_bands
in band_mappings_to_process.items():
# Combined bands: sum fluxes and add errors in quadrature over the input
# bands, using only entries with finite, positive fluxes and positive
# errors. NOTE(review): n_obj is assumed to be the row count of ``table``.
1264 if len(input_bands) > 1:
1265 flux_psf, fluxerr_psf_sq, flux_model, fluxerr_model_sq = (
1266 np.zeros(n_obj, dtype=float)
for _
in range(4))
1267 for input_band
in input_bands:
1268 flux_psf_b, fluxerr_psf_b, flux_model_b, fluxerr_model_b = self._get_fluxes(
1269 table, band=input_band)
1271 good = np.isfinite(flux_psf_b) & np.isfinite(flux_model_b) & (
1272 flux_psf_b > 0) & (flux_model_b > 0) & (fluxerr_psf_b > 0) & (fluxerr_model_b > 0)
1273 flux_psf[good] += flux_psf_b[good]
1274 flux_model[good] += flux_model_b[good]
1275 fluxerr_psf_sq[good] += fluxerr_psf_b[good]**2
1276 fluxerr_model_sq[good] += fluxerr_model_b[good]**2
1277 fluxerr_psf = np.sqrt(fluxerr_psf_sq)
1278 fluxerr_model = np.sqrt(fluxerr_model_sq)
# Rows with no valid contribution get infinite errors (hence zero PSF S/N).
1279 fluxerr_psf[fluxerr_psf == 0] = np.inf
1280 fluxerr_model[fluxerr_model == 0] = np.inf
# Single band: use that band's columns directly.
1282 flux_psf, fluxerr_psf, flux_model, fluxerr_model = self._get_fluxes(
1283 table, band=input_bands[0])
1285 psf_sn = flux_psf/fluxerr_psf
1286 flux_ratio = np.array(flux_psf / flux_model)
# Optionally rescale the ratio so its median over compact, high-S/N objects
# is 1, but only when more than min_n_good_to_shift_flux_ratio qualify.
1288 if self.min_n_good_to_shift_flux_ratio > 0:
1289 good = small & (psf_sn > self.good_sn_min)
1292 if np.sum(good ==
True) > self.min_n_good_to_shift_flux_ratio:
1293 flux_ratio *= 1./np.nanmedian(flux_ratio[good])
# NOTE(review): first-order propagation for r = psf/model gives
# sqrt((err_psf/model)**2 + (psf*err_model/model**2)**2); the second term
# below uses fluxerr_psf where flux_psf would be expected. Confirm.
1295 flux_ratio_err = np.sqrt(
1296 (fluxerr_psf/flux_model)**2 + (fluxerr_model*fluxerr_psf/flux_model**2)**2
# Raw extendedness: PSF flux deficit plus fluxerr_coefficent times the ratio
# error, scaled by sqrt(size / max_reff_compact). Finite negative values are
# set to 0; -inf is left untouched.
1298 extendedness = (1 - flux_ratio) + self.fluxerr_coefficent*flux_ratio_err
1299 extendedness *= np.sqrt(size_model/self.max_reff_compact)
1300 extendedness[(extendedness < 0) & (extendedness > -np.inf)] = 0
# Saturating map: with e' = stretch * e, output
# (stretch + 1)/stretch * e' / (1 + e'), clipped to [0, 1].
1302 stretch = self.fluxerr_stretch
1303 extendedness *= stretch
1304 extendedness = np.clip((stretch + 1)/stretch*extendedness/(1 + extendedness), 0, 1)
# Store under the output column name for this (possibly combined) band.
1306 column_out = self.output_column.format(band=output_band)
1307 output[column_out] = extendedness
# Connections for a (tract, skymap) quantum: per-patch object tables in,
# one per-tract ArrowAstropy table out.
1312 dimensions=(
"tract",
"skymap")):
1313 inputCatalogs = connectionTypes.Input(
1314 doc=
"Per-Patch objectTables conforming to the standard data model.",
1316 storageClass=
"ArrowAstropy",
1317 dimensions=(
"tract",
"patch",
"skymap"),
# NOTE(review): the doc string below presumably means "Per-tract"; the task
# stacks rows (TableVStack.vstack_handles) rather than joining horizontally.
1321 outputCatalog = connectionTypes.Output(
1322 doc=
"Pre-tract horizontal concatenation of the input objectTables",
1323 name=
"objectTable_tract",
1324 storageClass=
"ArrowAstropy",
1325 dimensions=(
"tract",
"skymap"),
1330 pipelineConnections=ConsolidateObjectTableConnections):
# Each action is called on the stacked per-tract table, and every column it
# returns is added to the output.
1331 actions = ConfigurableActionStructField[ComputeColumnsAction](
1332 doc=
"Actions to add columns to the final object table",
1334 coaddName = pexConfig.Field[str](
# Registers a ModelExtendednessColumnAction under the name "extendedness"
# (presumably in setDefaults).
1341 self.
actions.extendedness = ModelExtendednessColumnAction()
1345 """Write patch-merged object tables to a single tract-level table.
1347 Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
1349 _DefaultName =
"consolidateObjectTable"
1350 ConfigClass = ConsolidateObjectTableConfig
1352 inputDataset =
"objectTable"
1353 outputDataset =
"objectTable_tract"
# runQuantum: stack the per-patch tables, apply the configured column
# actions, and write the per-tract table.
# NOTE(review): unlike ConsolidateSourceTableTask, the input refs are not
# reordered here, so row order follows the order returned by butlerQC.get.
1356 inputs = butlerQC.get(inputRefs)
1357 self.log.info(
"Concatenating %s per-patch Object Tables",
1358 len(inputs[
"inputCatalogs"]))
1359 table = TableVStack.vstack_handles(inputs[
"inputCatalogs"])
1360 for action
in self.config.actions:
1361 computed = action(table)
1362 for key, values
in computed.items():
# float64 results are downcast to float32; other dtypes are stored as-is.
1363 table[key] = values.astype(np.float32)
if values.dtype == np.float64
else values
1364 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
# Per-detector (instrument, visit, detector) connections: the wide
# "{catalogType}source" DataFrame in, the narrower "{catalogType}sourceTable"
# ArrowAstropy table out.
1368 defaultTemplates={
"catalogType":
""},
1369 dimensions=(
"instrument",
"visit",
"detector")):
1371 inputCatalog = connectionTypes.Input(
1372 doc=
"Wide input catalog of sources produced by WriteSourceTableTask",
1373 name=
"{catalogType}source",
1374 storageClass=
"DataFrame",
1375 dimensions=(
"instrument",
"visit",
"detector"),
1378 outputCatalog = connectionTypes.Output(
1379 doc=
"Narrower, per-detector Source Table transformed and converted per a "
1380 "specified set of functors",
1381 name=
"{catalogType}sourceTable",
1382 storageClass=
"ArrowAstropy",
1383 dimensions=(
"instrument",
"visit",
"detector")
1388 pipelineConnections=TransformSourceTableConnections):
# Default functor specification: schemas/Source.yaml under $PIPE_TASKS_DIR.
# The environment variable is left unexpanded here; presumably it is
# expanded wherever the file is read.
1392 self.
functorFile = os.path.join(
"$PIPE_TASKS_DIR",
"schemas",
"Source.yaml")
1398 """Transform/standardize a per-detector source catalog using the configured functors.
# Task label and config class used by the pipeline framework.
1400 _DefaultName =
"transformSourceTable"
1401 ConfigClass = TransformSourceTableConfig
# Per-visit connections: per-detector calexps in; the visitSummary catalog,
# its schema and (optionally) refit visit geometry out.
1405 dimensions=(
"instrument",
"visit",),
1406 defaultTemplates={
"calexpType":
""}):
# Camera geometry; it is only read when do_refit_pointing is set (see
# runQuantum).
1407 camera = connectionTypes.PrerequisiteInput(
1408 doc=
"Camera geometry.",
1410 dimensions=(
"instrument",),
1411 storageClass=
"Camera",
1414 calexp = connectionTypes.Input(
1415 doc=
"Processed exposures used for metadata",
1417 storageClass=
"ExposureF",
1418 dimensions=(
"instrument",
"visit",
"detector"),
1422 visitSummary = connectionTypes.Output(
1423 doc=(
"Per-visit consolidated exposure metadata. These catalogs use "
1424 "detector id for the id and are sorted for fast lookups of a "
1426 name=
"visitSummary",
1427 storageClass=
"ExposureCatalog",
1428 dimensions=(
"instrument",
"visit"),
1430 visitSummarySchema = connectionTypes.InitOutput(
1431 doc=
"Schema of the visitSummary catalog",
1432 name=
"visitSummary_schema",
1433 storageClass=
"ExposureCatalog",
1438 visit_geometry = connectionTypes.Output(
1439 doc=
"Updated visit[, detector] regions that can be used to update butler dimensions records.",
1440 name=
"visit_geometry",
1441 dimensions=(
"instrument",
"visit"),
1442 storageClass=
"VisitGeometry",
# Connection adjustments (presumably in __init__): when refitting the
# pointing, the camera connection takes its dimensions from
# config.cameraDimensions; visit_geometry is removed unless
# do_write_visit_geometry is set.
1446 if self.config.do_refit_pointing:
1447 self.camera = dataclasses.replace(self.camera, dimensions=config.cameraDimensions)
1450 if not self.config.do_write_visit_geometry:
1451 del self.visit_geometry
1455 pipelineConnections=ConsolidateVisitSummaryConnections):
1456 """Config for ConsolidateVisitSummaryTask."""
# When True, run() also stores the PSF, aperture correction map and
# transmission curve on each record.
# NOTE(review): the doc string below has a typo ("propate") and lacks a
# space between "usable" and "by", so it renders as "usableby".
1458 full = pexConfig.Field(
1459 "Whether to propate all exposure components. "
1460 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1461 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1462 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1466 refitPointing = pexConfig.ConfigurableField(
1467 "A subtask for refitting the boresight pointing and orientation, "
1468 "and using those to produce new regions for butler dimensions.",
1469 target=RefitPointingTask,
1471 cameraDimensions = pexConfig.ListField(
1472 "The dimensions of the 'camera' prerequisite input connection.",
1474 default=[
"instrument"],
1476 do_refit_pointing = pexConfig.Field(
1477 "Whether to re-fit the pointing model.",
1481 do_write_visit_geometry = pexConfig.Field(
1482 "Whether to write refit-pointing regions that can be used to update butler dimension records.",
# Validation: writing visit_geometry requires do_refit_pointing.
1490 raise ValueError(
"Cannot write visit_geometry without refitting the pointing.")
1500 """Task to consolidate per-detector visit metadata.
1502 This task aggregates the following metadata from all the detectors in a
1503 single visit into an exposure catalog:
1507 - The physical_filter and band (if available).
1509 - The aperture correction map.
1510 - The transmission curve.
1511 - The psf size, shape, and effective area at the center of the detector.
1512 - The corners of the bounding box in right ascension/declination.
1514 Tests for this task are performed in ci_hsc_gen3.
1516 _DefaultName =
"consolidateVisitSummary"
1517 ConfigClass = ConsolidateVisitSummaryConfig
# Output schema (presumably built in __init__): the minimal exposure schema
# plus visit, physical_filter, band and the ExposureSummaryStats fields.
1521 self.
schema = afwTable.ExposureTable.makeMinimalSchema()
1522 self.
schema.addField(
"visit", type=
"L", doc=
"Visit number")
1523 self.
schema.addField(
"physical_filter", type=
"String", size=32, doc=
"Physical filter")
1524 self.
schema.addField(
"band", type=
"String", size=32, doc=
"Name of band")
1525 ExposureSummaryStats.update_schema(self.
schema)
1526 if self.config.do_refit_pointing:
1527 self.makeSubtask(
"refitPointing", schema=self.
schema)
# runQuantum: run() fills ``result`` in place, so if it raises an
# AlgorithmError the outputs already in ``result`` are still written before
# the annotated error is handled.
# NOTE(review): result.visitSummary must already be set when an
# AlgorithmError is raised; otherwise the except clause itself fails.
1531 handles = butlerQC.get(inputRefs.calexp)
1532 visit = handles[0].dataId[
"visit"]
1534 self.log.debug(
"Concatenating metadata from %d per-detector calexps (visit %d)",
1535 len(handles), visit)
1537 camera = butlerQC.get(inputRefs.camera)
if self.config.do_refit_pointing
else None
1539 result = pipeBase.Struct()
1541 self.
run(visit=visit, handles=handles, camera=camera, result=result)
1542 except pipeBase.AlgorithmError
as e:
1543 error = pipeBase.AnnotatedPartialOutputsError.annotate(
1544 e, self, result.visitSummary, log=self.log
1546 butlerQC.put(result, outputRefs)
1549 butlerQC.put(result, outputRefs)
1551 def run(self, *, visit, handles, camera=None, result=None):
1552 """Make a combined exposure catalog from a list of handles.
1553 These handles must point to exposures with wcs, summaryStats,
1554 and other visit metadata.
Parameters
----------
visit : `int`
1559 Visit identification number.
1560 handles : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1561 List of handles in visit.
1562 camera : `lsst.afw.cameraGeom.Camera`, optional
1563 Camera geometry. Required if and only if
1564 ``do_refit_pointing=True``.
1565 result : `lsst.pipe.base.Struct`, optional
1566 Output struct to modify in-place.
Returns
-------
1570 result : `lsst.pipe.base.Struct`
1571 Struct with the following attributes:
1573 - ``visitSummary`` (`lsst.afw.table.ExposureCatalog`): an Exposure
1574 catalog with per-detector summary information.
1575 - ``visit_geometry`` (`lsst.obs.base.visit_geometry.VisitGeometry`):
1576 Regions that can be used to update butler dimension regions for
1577 this visit. Only present if ``do_refit_pointing=True``.
# One pre-allocated record per handle; every row gets the visit id.
1579 cat = afwTable.ExposureCatalog(self.
schema)
1580 cat.resize(len(handles))
1582 cat[
"visit"] = visit
# visitInfo and filterLabel are loaded only while they are still None and
# are then reused for every detector; the other components are read per
# detector.
1589 for i, dataRef
in enumerate(handles):
1590 if visitInfo
is None:
1591 visitInfo = dataRef.get(component=
"visitInfo")
1592 if filterLabel
is None:
1593 filterLabel = dataRef.get(component=
"filter")
1594 summaryStats = dataRef.get(component=
"summaryStats")
1595 wcs = dataRef.get(component=
"wcs")
1596 photoCalib = dataRef.get(component=
"photoCalib")
1597 bbox = dataRef.get(component=
"bbox")
1598 validPolygon = dataRef.get(component=
"validPolygon")
1602 rec.setVisitInfo(visitInfo)
1604 rec.setPhotoCalib(photoCalib)
1605 rec.setValidPolygon(validPolygon)
# Heavier components are only stored when config.full is set.
1607 if self.config.full:
1608 rec.setPsf(dataRef.get(component=
"psf"))
1609 rec.setApCorrMap(dataRef.get(component=
"apCorrMap"))
1610 rec.setTransmissionCurve(dataRef.get(component=
"transmissionCurve"))
# Missing filter labels are stored as empty strings.
1612 rec[
"physical_filter"] = filterLabel.physicalLabel
if filterLabel.hasPhysicalLabel()
else ""
1613 rec[
"band"] = filterLabel.bandLabel
if filterLabel.hasBandLabel()
else ""
# The record id is the detector id.
1614 rec.setId(dataRef.dataId[
"detector"])
1615 summaryStats.update_record(rec)
1618 raise pipeBase.NoWorkFound(
1619 "No detectors had sufficient information to make a visit summary row."
1622 metadata = dafBase.PropertyList()
1623 metadata.add(
"COMMENT",
"Catalog id is detector id, sorted.")
1625 metadata.add(
"COMMENT",
"Only detectors with data have entries.")
1626 cat.setMetadata(metadata)
# A new Struct is presumably created here only when the caller did not pass
# ``result``; in either case visitSummary is set before refitting, so it is
# available even if the refit fails.
1631 result = pipeBase.Struct()
1632 result.visitSummary = cat
1634 if self.config.do_refit_pointing:
1635 refitPointingResult = self.refitPointing.run(catalog=cat, camera=camera)
1636 result.visit_geometry = refitPointingResult.regions
# Per-visit connections: per-detector "{catalogType}sourceTable" tables in,
# one "{catalogType}sourceTable_visit" table out.
1642 defaultTemplates={
"catalogType":
""},
1643 dimensions=(
"instrument",
"visit")):
1644 inputCatalogs = connectionTypes.Input(
1645 doc=
"Input per-detector Source Tables",
1646 name=
"{catalogType}sourceTable",
1647 storageClass=
"ArrowAstropy",
1648 dimensions=(
"instrument",
"visit",
"detector"),
1652 outputCatalog = connectionTypes.Output(
1653 doc=
"Per-visit concatenation of Source Table",
1654 name=
"{catalogType}sourceTable_visit",
1655 storageClass=
"ArrowAstropy",
1656 dimensions=(
"instrument",
"visit")
# Config for ConsolidateSourceTableTask; it only binds the connections class.
1661 pipelineConnections=ConsolidateSourceTableConnections):
1666 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1668 _DefaultName =
"consolidateSourceTable"
1669 ConfigClass = ConsolidateSourceTableConfig
1671 inputDataset =
"sourceTable"
1672 outputDataset =
"sourceTable_visit"
# runQuantum: sort the input refs by detector id so that the stacked output
# has a deterministic row order, then stack and write.
1676 detectorOrder = [ref.dataId[
"detector"]
for ref
in inputRefs.inputCatalogs]
1677 detectorOrder.sort()
1678 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey=
"detector")
1679 inputs = butlerQC.get(inputRefs)
1680 self.log.info(
"Concatenating %s per-detector Source Tables",
1681 len(inputs[
"inputCatalogs"]))
1682 table = TableVStack.vstack_handles(inputs[
"inputCatalogs"])
1683 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
# Instrument-level connections: every per-visit finalVisitSummary in, one
# ccdVisitTable out.
1687 dimensions=(
"instrument",),
1688 defaultTemplates={
"calexpType":
""}):
1689 visitSummaryRefs = connectionTypes.Input(
1690 doc=
"Data references for per-visit consolidated exposure metadata",
1691 name=
"finalVisitSummary",
1692 storageClass=
"ExposureCatalog",
1693 dimensions=(
"instrument",
"visit"),
1697 outputCatalog = connectionTypes.Output(
1698 doc=
"CCD and Visit metadata table",
1699 name=
"ccdVisitTable",
1700 storageClass=
"ArrowAstropy",
1701 dimensions=(
"instrument",)
1706 pipelineConnections=MakeCcdVisitTableConnections):
# Used by run() to generate the ccdVisitId for each (visit, detector) row.
1707 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1711 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
# Task label and config class used by the pipeline framework.
1713 _DefaultName =
"makeCcdVisitTable"
1714 ConfigClass = MakeCcdVisitTableConfig
1716 def run(self, visitSummaryRefs):
1717 """Make a table of ccd information from the visit summary catalogs.
Parameters
----------
1721 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1722 List of DeferredDatasetHandles pointing to exposure catalogs with
1723 per-detector summary information.
Returns
-------
1727 result : `~lsst.pipe.base.Struct`
1728 Results struct with attribute:
``outputCatalog``
1731 Catalog of ccd and visit information.
# One block of rows (one per detector) is built for each non-empty visit
# summary; the blocks are stacked at the end.
1734 for visitSummaryRef
in visitSummaryRefs:
1735 visitSummary = visitSummaryRef.get()
1736 if not visitSummary:
# Visit-level metadata is taken from the first detector row.
1738 visitInfo = visitSummary[0].getVisitInfo()
# Strip provenance entries from the catalog metadata (modifies it in place).
1741 strip_provenance_from_fits_header(visitSummary.metadata)
1744 summaryTable = visitSummary.asAstropy()
# Columns copied straight from the visit summary.
1745 selectColumns = [
"id",
"visit",
"physical_filter",
"band",
"ra",
"dec",
1746 "pixelScale",
"zenithDistance",
1747 "expTime",
"zeroPoint",
"psfSigma",
"skyBg",
"skyNoise",
1748 "psfAdaptiveThresholdValue",
"psfAdaptiveIncludeThresholdMultiplier",
1749 "nShapeletsStar",
"shapeletsOnlyIqScore",
"shapeletsIqScore",
1750 "centroidDiffShapeletsVsSlotMedian",
1751 "shapeletsStarEMedian",
"shapeletsStarUnNormalizedEMedian",
1752 "refCatSourceDensity",
"astromOffsetMean",
"astromOffsetStd",
1753 "nPsfStar",
"psfStarDeltaE1Median",
"psfStarDeltaE2Median",
1754 "psfStarDeltaE1Scatter",
"psfStarDeltaE2Scatter",
1755 "psfStarDeltaSizeMedian",
"psfStarDeltaSizeScatter",
1756 "psfStarScaledDeltaSizeScatter",
"psfTraceRadiusDelta",
1757 "psfApFluxDelta",
"psfApCorrSigmaScaledDelta",
1758 "maxDistToNearestPsf",
"starEMedian",
"starUnNormalizedEMedian",
1759 "starComa1Median",
"starComa2Median",
1760 "starTrefoil1Median",
"starTrefoil2Median",
1761 "starKurtosisMedian",
"starE41Median",
"starE42Median",
1762 "effTime",
"effTimePsfSigmaScale",
1763 "effTimeSkyBgScale",
"effTimeZeroPointScale",
1765 ccdEntry = summaryTable[selectColumns]
1770 ccdEntry.rename_column(
"visit",
"visitId")
1771 ccdEntry.rename_column(
"id",
"detectorId")
# "decl" duplicates "dec"; presumably kept for backward compatibility.
1775 ccdEntry[
"decl"] = ccdEntry[
"dec"]
# One ccdVisitId per detector, produced by the configured id generator.
1777 ccdEntry[
"ccdVisitId"] = [
1778 self.config.idGenerator.apply(
1779 visitSummaryRef.dataId,
1780 detector=detector_id,
1787 for detector_id
in summaryTable[
"id"]
1789 ccdEntry[
"detector"] = summaryTable[
"id"]
# Gaussian FWHM: sigma * sqrt(8 ln 2), converted with pixelScale
# (presumably arcsec per pixel).
1790 ccdEntry[
"seeing"] = (
1791 visitSummary[
"psfSigma"] * visitSummary[
"pixelScale"] * np.sqrt(8 * np.log(2))
1793 ccdEntry[
"skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
# Midpoint as TAI nanoseconds. expTime is treated as seconds (hence the 1E9
# and 86400 factors), and the start time is half an exposure earlier.
1794 ccdEntry[
"expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI),
"ns")
1795 ccdEntry[
"expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1796 expTime = visitInfo.getExposureTime()
1797 ccdEntry[
"obsStart"] = (
1798 ccdEntry[
"expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9),
"ns")
1800 expTime_days = expTime / (60*60*24)
1801 ccdEntry[
"obsStartMJD"] = ccdEntry[
"expMidptMJD"] - 0.5 * expTime_days
1802 ccdEntry[
"darkTime"] = visitInfo.getDarkTime()
# NOTE(review): if bbox_max is inclusive (as for lsst.geom.Box2I), the
# detector width is max - min + 1; confirm whether max - min is intended.
1803 ccdEntry[
"xSize"] = summaryTable[
"bbox_max_x"] - summaryTable[
"bbox_min_x"]
1804 ccdEntry[
"ySize"] = summaryTable[
"bbox_max_y"] - summaryTable[
"bbox_min_y"]
# Corner columns 0..3 are labelled lower-left, upper-left, upper-right and
# lower-right respectively.
1805 ccdEntry[
"llcra"] = summaryTable[
"raCorners"][:, 0]
1806 ccdEntry[
"llcdec"] = summaryTable[
"decCorners"][:, 0]
1807 ccdEntry[
"ulcra"] = summaryTable[
"raCorners"][:, 1]
1808 ccdEntry[
"ulcdec"] = summaryTable[
"decCorners"][:, 1]
1809 ccdEntry[
"urcra"] = summaryTable[
"raCorners"][:, 2]
1810 ccdEntry[
"urcdec"] = summaryTable[
"decCorners"][:, 2]
1811 ccdEntry[
"lrcra"] = summaryTable[
"raCorners"][:, 3]
1812 ccdEntry[
"lrcdec"] = summaryTable[
"decCorners"][:, 3]
# Optional WCS quality columns: copied only when present, renamed to
# camelCase and converted to arcseconds from their stored unit.
1816 for inName, outName
in {
1817 "wcs_corner_max_offset":
"wcsCornerMaxOffset",
1818 "wcs_detector_pointing_residual":
"wcsDetectorPointingResidual",
1819 "wcs_visit_pointing_residual":
"wcsVisitPointingResidual",
1820 "preliminary_wcs_detector_pointing_residual":
"wcsPreliminaryDetectorPointingResidual",
1821 "preliminary_wcs_visit_pointing_residual":
"wcsPreliminaryVisitPointingResidual",
1823 if inName
in summaryTable.columns:
1824 inCol = summaryTable[inName]
1825 ccdEntry[outName] = astropy.table.Column(
1826 (np.asarray(inCol) * inCol.unit).to_value(astropy.units.arcsec),
1827 unit=astropy.units.arcsec
1832 ccdEntries.append(ccdEntry)
# join_type="exact" requires every block to have identical columns.
# NOTE(review): if every visit summary is empty, ccdEntries is presumably
# empty and vstack will fail; confirm whether NoWorkFound is preferable.
1834 outputCatalog = astropy.table.vstack(ccdEntries, join_type=
"exact")
1835 return pipeBase.Struct(outputCatalog=outputCatalog)
# Instrument-level connections: every per-visit finalVisitSummary in, one
# visit table out.
1839 dimensions=(
"instrument",),
1840 defaultTemplates={
"calexpType":
""}):
1841 visitSummaries = connectionTypes.Input(
1842 doc=
"Per-visit consolidated exposure metadata",
1843 name=
"finalVisitSummary",
1844 storageClass=
"ExposureCatalog",
1845 dimensions=(
"instrument",
"visit",),
1849 outputCatalog = connectionTypes.Output(
1850 doc=
"Visit metadata table",
1852 storageClass=
"ArrowAstropy",
1853 dimensions=(
"instrument",)
# Config for MakeVisitTableTask; it only binds the connections class.
1858 pipelineConnections=MakeVisitTableConnections):
1863 """Produce a `visitTable` from the visit summary exposure catalogs.
# Task label and config class used by the pipeline framework.
1865 _DefaultName =
"makeVisitTable"
1866 ConfigClass = MakeVisitTableConfig
def run(self, visitSummaries):
    """Make a table of visit information from the visit summary catalogs.

    Parameters
    ----------
    visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
        Handles pointing to per-visit exposure catalogs with per-detector
        summary information. Each handle is loaded with ``get()``.

    Returns
    -------
    result : `~lsst.pipe.base.Struct`
        Results struct with attribute:

        ``outputCatalog``
            Catalog of visit information, one row per non-empty visit
            summary.
    """
    visitEntries = []
    for visitSummaryHandle in visitSummaries:
        visitSummary = visitSummaryHandle.get()
        # An empty (falsy) summary catalog contributes no row.
        if not visitSummary:
            continue
        # Visit-level quantities are read from the first detector row.
        visitRow = visitSummary[0]
        visitInfo = visitRow.getVisitInfo()

        visitEntry = {}
        visitEntry["visitId"] = visitRow["visit"]
        visitEntry["visit"] = visitRow["visit"]
        visitEntry["physical_filter"] = visitRow["physical_filter"]
        visitEntry["band"] = visitRow["band"]
        raDec = visitInfo.getBoresightRaDec()
        visitEntry["ra"] = raDec.getRa().asDegrees()
        visitEntry["dec"] = raDec.getDec().asDegrees()

        # Duplicate of "dec" under the name "decl"; presumably kept for
        # backward compatibility with older schemas. Confirm before removing.
        visitEntry["decl"] = visitEntry["dec"]

        visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
        azAlt = visitInfo.getBoresightAzAlt()
        visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
        visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
        visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
        visitEntry["airmass"] = visitInfo.getBoresightAirmass()
        expTime = visitInfo.getExposureTime()
        visitEntry["expTime"] = expTime
        # Midpoint as TAI nanoseconds. expTime is treated as seconds (hence
        # the 1E9 and 86400 factors), and the start is half an exposure
        # earlier.
        visitEntry["expMidpt"] = np.datetime64(
            visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns"
        )
        visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
        visitEntry["obsStart"] = (
            visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
        )
        expTime_days = expTime / (60*60*24)
        visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
        visitEntries.append(visitEntry)

    outputCatalog = astropy.table.Table(rows=visitEntries)
    return pipeBase.Struct(outputCatalog=outputCatalog)
# Deprecated (v29.0, FutureWarning) per-detector, per-tract connections: the
# direct and difference-image forced-photometry catalogs in, one merged
# DataFrame out.
1927@deprecated(reason=
"This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1928 "This task will be removed after v30.",
1929 version=
"v29.0", category=FutureWarning)
1931 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract")):
1933 inputCatalog = connectionTypes.Input(
1934 doc=
"Primary per-detector, single-epoch forced-photometry catalog. "
1935 "By default, it is the output of ForcedPhotCcdTask on calexps",
1937 storageClass=
"SourceCatalog",
1938 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract")
1940 inputCatalogDiff = connectionTypes.Input(
1941 doc=
"Secondary multi-epoch, per-detector, forced photometry catalog. "
1942 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1944 storageClass=
"SourceCatalog",
1945 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract")
# NOTE(review): the doc string below misspells "horizontally" and names
# `objectId`, but the join key is actually config.key.
1947 outputCatalog = connectionTypes.Output(
1948 doc=
"InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1949 name=
"mergedForcedSource",
1950 storageClass=
"DataFrame",
1951 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract")
1955@deprecated(reason=
"This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1956 "This task will be removed after v30.",
1957 version=
"v29.0", category=FutureWarning)
1959 pipelineConnections=WriteForcedSourceTableConnections):
# Column used as the DataFrame index of each input and as the join key.
1960 key = lsst.pex.config.Field(
1961 doc=
"Column on which to join the two input tables on and make the primary key of the output",
1967@deprecated(reason=
"This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1968 "This task will be removed after v30.",
1969 version=
"v29.0", category=FutureWarning)
1971 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1973 Because the predecessor ForcedPhotCcdTask operates per-detector,
1974 per-tract, (i.e., it has tract in its dimensions), detectors
1975 on the tract boundary may have multiple forced source catalogs.
1977 The successor task TransformForcedSourceTable runs per-patch
1978 and temporally-aggregates overlapping mergedForcedSource catalogs from all
1979 available epochs.
1981 _DefaultName =
"writeForcedSourceTable"
1982 ConfigClass = WriteForcedSourceTableConfig
# runQuantum: add visit, detector and band from the quantum data ID to the
# inputs before calling run().
# NOTE(review): "band" is not one of the connection dimensions; this relies
# on the quantum data ID carrying it (presumably implied by visit).
1985 inputs = butlerQC.get(inputRefs)
1986 inputs[
"visit"] = butlerQC.quantum.dataId[
"visit"]
1987 inputs[
"detector"] = butlerQC.quantum.dataId[
"detector"]
1988 inputs[
"band"] = butlerQC.quantum.dataId[
"band"]
1989 outputs = self.
run(**inputs)
1990 butlerQC.put(outputs, outputRefs)
# Convert both forced-photometry catalogs to DataFrames indexed by
# config.key, with alphabetically sorted columns and added detector/band
# columns. Each frame's columns go under a (dataset, column) MultiIndex
# whose dataset level is "calexp" or "diff"; the frames are then joined on
# their index.
1992 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1994 for table, dataset,
in zip((inputCatalog, inputCatalogDiff), (
"calexp",
"diff")):
1995 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=
False)
1996 df = df.reindex(sorted(df.columns), axis=1)
1999 df[
"detector"] = np.int16(detector)
# A falsy band (e.g. None) is stored as pandas NA.
2000 df[
"band"] = band
if band
else pd.NA
2001 df.columns = pd.MultiIndex.from_tuples([(dataset, c)
for c
in df.columns],
2002 names=(
"dataset",
"column"))
# Left-to-right index join of the per-dataset frames (``dfs`` presumably
# collects the frames built above).
2006 outputCatalog = functools.reduce(
lambda d1, d2: d1.join(d2), dfs)
2007 return pipeBase.Struct(outputCatalog=outputCatalog)
# Per-patch connections: every overlapping per-detector mergedForcedSource
# frame plus the patch reference catalog in, one forcedSourceTable out.
2011 dimensions=(
"instrument",
"skymap",
"patch",
"tract")):
2013 inputCatalogs = connectionTypes.Input(
2014 doc=
"DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
2015 name=
"mergedForcedSource",
2016 storageClass=
"DataFrame",
2017 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
2021 referenceCatalog = connectionTypes.Input(
2022 doc=
"Reference catalog which was used to seed the forcedPhot. Columns "
2023 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
2026 storageClass=
"DataFrame",
2027 dimensions=(
"tract",
"patch",
"skymap"),
2030 outputCatalog = connectionTypes.Output(
2031 doc=
"Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
2032 "specified set of functors",
2033 name=
"forcedSourceTable",
2034 storageClass=
"ArrowAstropy",
2035 dimensions=(
"tract",
"patch",
"skymap")
2040 pipelineConnections=TransformForcedSourceTableConnections):
# Reference-catalog columns joined onto every forced-source row.
2041 referenceColumns = pexConfig.ListField(
2043 default=[
"detect_isPrimary",
"detect_isTractInner",
"detect_isPatchInner"],
2045 doc=
"Columns to pull from reference catalog",
# Reference-catalog key used as its index for the inner join in run().
2047 keyRef = lsst.pex.config.Field(
2048 doc=
"Column on which to join the two input tables on and make the primary key of the output",
# Final name given to the output index.
2052 key = lsst.pex.config.Field(
2053 doc=
"Rename the output DataFrame index to this name",
2055 default=
"forcedSourceId",
# Default functor specification: schemas/ForcedSource.yaml under
# $PIPE_TASKS_DIR.
2060 self.
functorFile = os.path.join(
"$PIPE_TASKS_DIR",
"schemas",
"ForcedSource.yaml")
2065 """Transform/standardize a ForcedSource catalog
2067 Transforms each wide, per-detector forcedSource DataFrame per the
2068 specification file (per-camera defaults found in ForcedSource.yaml).
2069 All epochs that overlap the patch are aggregated into one per-patch
2070 narrow table.
2072 No de-duplication of rows is performed. Duplicate-resolution flags are
2073 pulled in from the referenceCatalog: `detect_isPrimary`,
2074 `detect_isTractInner`,`detect_isPatchInner`, so that users may de-duplicate
2075 for analysis or compare duplicates for QA.
2077 The resulting table includes multiple bands. Epochs (MJDs) and other useful
2078 per-visit quantities can be retrieved by joining with the CcdVisitTable on
2081 _DefaultName =
"transformForcedSourceTable"
2082 ConfigClass = TransformForcedSourceTableConfig
# runQuantum: a functor set is required; the output data ID is passed to
# run() as a plain dict.
2085 inputs = butlerQC.get(inputRefs)
2086 if self.funcs
is None:
2087 raise ValueError(
"config.functorFile is None. "
2088 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
2089 outputs = self.
run(inputs[
"inputCatalogs"], inputs[
"referenceCatalog"], funcs=self.funcs,
2090 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
2092 butlerQC.put(outputs, outputRefs)
2094 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
2096 refColumns = list(self.config.referenceColumns)
2097 refColumns.append(self.config.keyRef)
2099 ref = referenceCatalog.get(parameters={
"columns": refColumns})
2101 raise NoWorkFound(f
"No objects to run forced photometry for {dataId}.")
2102 if ref.index.name != self.config.keyRef:
2108 ref.set_index(self.config.keyRef, inplace=
True)
2109 self.log.info(
"Aggregating %s input catalogs" % (len(inputCatalogs)))
2110 for handle
in inputCatalogs:
2111 result = self.transform(
None, handle, funcs, dataId)
2113 dfs.append(result.df.join(ref, how=
"inner"))
2115 outputCatalog = pd.concat(dfs)
2117 if outputCatalog.empty:
2118 raise NoWorkFound(f
"No forced photometry rows for {dataId}.")
2122 outputCatalog.index.rename(self.config.keyRef, inplace=
True)
2124 outputCatalog.reset_index(inplace=
True)
2126 if "forcedSourceId" in outputCatalog.columns:
2129 outputCatalog.set_index(
"forcedSourceId", inplace=
True, verify_integrity=
True)
2131 outputCatalog.index.rename(self.config.key, inplace=
True)
2133 self.log.info(
"Made a table of %d columns and %d rows",
2134 len(outputCatalog.columns), len(outputCatalog))
2135 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
2139 defaultTemplates={
"catalogType":
""},
2140 dimensions=(
"instrument",
"tract")):
2141 inputCatalogs = connectionTypes.Input(
2142 doc=
"Input per-patch DataFrame Tables to be concatenated",
2143 name=
"{catalogType}ForcedSourceTable",
2144 storageClass=
"DataFrame",
2145 dimensions=(
"tract",
"patch",
"skymap"),
2149 outputCatalog = connectionTypes.Output(
2150 doc=
"Output per-tract concatenation of DataFrame Tables",
2151 name=
"{catalogType}ForcedSourceTable_tract",
2152 storageClass=
"DataFrame",
2153 dimensions=(
"tract",
"skymap"),
2158 pipelineConnections=ConsolidateTractConnections):
2160 doUseSchema = pexConfig.Field(
2163 doc=
"Use an existing schema to coerce the data types of the output columns."
2165 schemaDir = pexConfig.Field(
2167 doc=
"Path to the directory containing schema definitions.",
2168 default=os.path.join(
"${SDM_SCHEMAS_DIR}",
2172 schemaFile = pexConfig.Field(
2174 doc=
"Yaml file specifying the schema of the output catalog.",
2177 tableName = pexConfig.Field(
2179 doc=
"Name of the table in the schema file to read.",
2185 """Concatenate any per-patch, dataframe list into a single
2186 per-tract DataFrame.
2188 _DefaultName =
"ConsolidateTract"
2189 ConfigClass = ConsolidateTractConfig
2192 inputs = butlerQC.get(inputRefs)
2195 self.log.info(
"Concatenating %s per-patch %s Tables",
2196 len(inputs[
"inputCatalogs"]),
2197 inputRefs.inputCatalogs[0].datasetType.name)
2198 df = pd.concat(inputs[
"inputCatalogs"])
2199 if self.config.doUseSchema:
2200 schemaFile = os.path.join(self.config.schemaDir, self.config.schemaFile)
2201 schema = readSdmSchemaFile(schemaFile)
2202 df = convertDataFrameToSdmSchema(schema, df, tableName=self.config.tableName)
2203 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
2207 pipeBase.PipelineTaskConnections,
2208 dimensions=(
"instrument",
"tract")
2210 inputCatalogs = connectionTypes.Input(
2211 doc=
"Parents of the deblended objects",
2212 name=
"object_parent_patch",
2213 storageClass=
"SourceCatalog",
2214 dimensions=(
"tract",
"patch",
"skymap"),
2218 outputCatalog = connectionTypes.Output(
2219 doc=
"Output per-tract concatenation of DataFrame Tables",
2220 name=
"object_parent",
2221 storageClass=
"ArrowAstropy",
2222 dimensions=(
"tract",
"skymap"),
2227 pipeBase.PipelineTaskConfig,
2228 pipelineConnections=ConsolidateParentTractConnections,
2234 """Concatenate any per-patch, dataframe list into a single
2235 per-tract DataFrame.
2237 _DefaultName =
"ConsolidateTract"
2238 ConfigClass = ConsolidateParentTractConfig
2241 self.log.info(
"Concatenating %s per-patch %s Tables",
2242 len(inputRefs.inputCatalogs),
2243 inputRefs.inputCatalogs[0].datasetType.name)
2246 for ref
in inputRefs.inputCatalogs:
2247 catalog = butlerQC.get(ref)
2248 table = catalog.asAstropy()
2251 table.rename_column(
"id",
"objectId")
2252 table.rename_column(
"parent",
"parentObjectId")
2253 table.rename_column(
"merge_peak_sky",
"sky_object")
2256 table[
"tract"] = ref.dataId[
"tract"]
2257 table[
"patch"] = ref.dataId[
"patch"]
2259 tables.append(table)
2260 outputTable = astropy.table.vstack(tables, join_type=
"exact")
2261 butlerQC.put(pipeBase.Struct(outputCatalog=outputTable), outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
run(self, *, visit, handles, camera=None, result=None)
run(self, visitSummaryRefs)
run(self, visitSummaries)
run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None)
runQuantum(self, butlerQC, inputRefs, outputRefs)
runQuantum(self, butlerQC, inputRefs, outputRefs)
run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None)
__init__(self, *, config=None)
flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None)