Coverage for python/lsst/ap/pipe/prune_orphan_preloads.py: 0%
59 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 09:00 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 09:00 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Prune orphan preload quanta from an Alert Production QuantumGraph.
24The Alert Production pipeline generates a ``loadDiaCatalogs`` quantum
25(keyed by ``group``) for every visit in the data query, regardless of
26whether that visit's image-differencing chain produces an
27``associateApdb`` quantum downstream. In batch contexts where some
28visits lack template coverage, the preload quanta for those visits are
29orphans: their outputs are never consumed by any associate quantum.
31Their presence breaks BPS visit ordering, because the ordering walk
32cannot map their ``group``-keyed data IDs to ``visit`` without a
33reachable ``associateApdb`` quantum. Removing the orphan preload
34quanta (along with the downstream metric quanta they feed) before BPS
35sees the graph sidesteps this entirely.
36"""
38import argparse
39import logging
41import networkx as nx
43from lsst.pipe.base import QuantumGraph
# Public API of this module.
__all__ = ["prune_orphan_preloads", "main"]

# Prefer ``__spec__.name`` so the logger keeps the module's import name even
# when the file is run via ``python -m`` (where ``__name__`` is "__main__").
_LOG = logging.getLogger(__name__ if __spec__ is None else __spec__.name)

# Default task labels: the preload task whose orphan quanta are pruned, and
# the task that must appear downstream for a preload quantum to be kept.
PRELOAD_LABEL = "loadDiaCatalogs"
ANCHOR_LABEL = "associateApdb"
def _find_task_def(qg, label):
    """Look up a task definition in a quantum graph by its label.

    Parameters
    ----------
    qg : `lsst.pipe.base.QuantumGraph`
        Graph whose task definitions, as yielded by ``qg.iterTaskGraph()``,
        are searched.
    label : `str`
        Label to compare against each task definition's ``label`` attribute.

    Returns
    -------
    task_def : `lsst.pipe.base.TaskDef` or `None`
        The first task definition whose label equals ``label``, or `None`
        if no definition matches.
    """
    matching = (candidate for candidate in qg.iterTaskGraph() if candidate.label == label)
    return next(matching, None)
def prune_orphan_preloads(qg, preload_label=PRELOAD_LABEL, anchor_label=ANCHOR_LABEL):
    """Build a graph without orphan preload quanta and everything downstream of them.

    A preload quantum counts as an orphan when it is not an ancestor of any
    ``anchor_label`` quantum, i.e. no anchor quantum can be reached from it
    by following the directed edges of the quantum graph. Each orphan is
    removed together with all of its descendants.

    Parameters
    ----------
    qg : `lsst.pipe.base.QuantumGraph`
        Graph to prune. It is not modified.
    preload_label : `str`, optional
        Label of the preload task whose orphan quanta are removed.
    anchor_label : `str`, optional
        Label of the task that must appear downstream of a preload quantum
        for that quantum to be kept.

    Returns
    -------
    pruned : `lsst.pipe.base.QuantumGraph`
        New graph lacking the orphan preload quanta and their descendants.
        The input ``qg`` itself is returned when it has no ``preload_label``
        task or when no orphan is found.

    Notes
    -----
    If the graph has no ``anchor_label`` task, a warning is logged and every
    preload quantum is treated as an orphan.
    """
    preload_task = _find_task_def(qg, preload_label)
    if preload_task is None:
        _LOG.info("No %r task in QG; nothing to prune.", preload_label)
        return qg

    anchor_task = _find_task_def(qg, anchor_label)
    if anchor_task is not None:
        anchors = qg.getNodesForTask(anchor_task)
    else:
        anchors = frozenset()
        _LOG.warning(
            "No %r quanta in QG; every %r quantum will be treated as an orphan.",
            anchor_label,
            preload_label,
        )

    dag = qg._connectedQuanta

    # A preload quantum can reach an anchor exactly when it is an ancestor of
    # that anchor, so collect everything upstream of (and including) the anchors.
    kept_upstream = {upstream for anchor in anchors for upstream in nx.ancestors(dag, anchor)}
    kept_upstream.update(anchors)

    orphans = qg.getNodesForTask(preload_task) - kept_upstream
    if not orphans:
        _LOG.info("No orphan %r quanta found; QG unchanged.", preload_label)
        return qg

    # The orphans themselves plus every quantum reachable from any of them.
    doomed = {downstream for orphan in orphans for downstream in nx.descendants(dag, orphan)}
    doomed.update(orphans)

    n_orphans = len(orphans)
    n_doomed = len(doomed)
    _LOG.info(
        "Pruning %d orphan %r quanta and %d downstream quanta (%d total).",
        n_orphans,
        preload_label,
        n_doomed - n_orphans,
        n_doomed,
    )

    pruned = qg.subset(set(qg) - doomed)

    # QuantumGraph.subset() leaves the per-task init input/output refs behind
    # (pipe_base's graph.py carries a "TODO: Do we need to copy
    # initInputs/initOutputs?"). Carry them over manually for the tasks that
    # survive, so that tools such as `pipetask update-graph-run` (invoked by
    # BPS whenever the config defines a finalJob) can still locate init
    # outputs like isr_config.
    kept_labels = {task_def.label for task_def in pruned.iterTaskGraph()}

    def _refs_for_kept_tasks(refs_by_label):
        """Copy the entries of ``refs_by_label`` whose task label survived."""
        return {label: list(refs) for label, refs in refs_by_label.items() if label in kept_labels}

    pruned._initInputRefs = _refs_for_kept_tasks(qg._initInputRefs)
    pruned._initOutputRefs = _refs_for_kept_tasks(qg._initOutputRefs)
    return pruned
def main(argv=None):
    """Run the orphan-preload pruner as a command-line tool.

    Loads a QuantumGraph from the ``input`` path, prunes it with
    `prune_orphan_preloads`, and saves the result to the ``output`` path.

    Parameters
    ----------
    argv : `list` [`str`], optional
        Command-line arguments to parse. When `None`, argparse reads
        ``sys.argv[1:]``.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Prune loadDiaCatalogs quanta with no downstream associateApdb "
            "quantum (and their downstream chain) from a QuantumGraph file."
        )
    )
    positionals = (
        ("input", "Path to the input QuantumGraph (.qgraph or .qg)."),
        ("output", "Path to write the pruned QuantumGraph."),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, help=arg_help)
    cli.add_argument(
        "--preload-label",
        default=PRELOAD_LABEL,
        help="Task label of the preload task (default: %(default)s).",
    )
    cli.add_argument(
        "--anchor-label",
        default=ANCHOR_LABEL,
        help="Task label whose presence downstream marks a preload as non-orphan (default: %(default)s).",
    )
    options = cli.parse_args(argv)

    # Logging is configured only after argument parsing has succeeded.
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    # Raise the numexpr logger's threshold so its INFO messages are hidden.
    logging.getLogger("numexpr").setLevel(logging.WARNING)

    source, destination = options.input, options.output

    _LOG.info("Loading QG from %s", source)
    original = QuantumGraph.loadUri(source)
    _LOG.info("Loaded %d quanta.", len(original))

    result = prune_orphan_preloads(
        original,
        preload_label=options.preload_label,
        anchor_label=options.anchor_label,
    )

    _LOG.info("Writing pruned QG (%d quanta) to %s", len(result), destination)
    result.saveUri(destination)
# Script entry point: run the command-line interface when executed directly.
if __name__ == "__main__":
    main()