Coverage for python/lsst/ap/pipe/prune_orphan_preloads.py: 0%

59 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 01:38 -0700

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Prune orphan preload quanta from an Alert Production QuantumGraph. 

23 

24The Alert Production pipeline generates a ``loadDiaCatalogs`` quantum 

25(keyed by ``group``) for every visit in the data query, regardless of 

26whether that visit's image-differencing chain produces an 

27``associateApdb`` quantum downstream. In batch contexts where some 

28visits lack template coverage, the preload quanta for those visits are 

29orphans: their outputs are never consumed by any associate quantum. 

30 

31Their presence breaks BPS visit ordering, because the ordering walk 

32cannot map their ``group``-keyed data IDs to ``visit`` without a 

33reachable ``associateApdb`` quantum. Removing the orphan preload 

34quanta (along with the downstream metric quanta they feed) before BPS 

35sees the graph sidesteps this entirely. 

36""" 

37 

38import argparse 

39import logging 

40 

41import networkx as nx 

42 

43from lsst.pipe.base import QuantumGraph 

44 

45__all__ = ["prune_orphan_preloads", "main"] 

46 

# Name the logger after the importable module.  When this file is run with
# ``python -m``, ``__name__`` is ``"__main__"`` but ``__spec__.name`` still
# holds the dotted module path; when run as a plain script ``__spec__`` is
# None and ``__name__`` is used instead.
_LOG = logging.getLogger(__name__ if __spec__ is None else __spec__.name)

# Task label of the preload quanta that may turn out to be orphans.
PRELOAD_LABEL = "loadDiaCatalogs"
# Task label that must be reachable downstream of a preload quantum for that
# preload to be kept.
ANCHOR_LABEL = "associateApdb"

51 

52 

def _find_task_def(qg, label):
    """Look up a task definition in a quantum graph by its label.

    Parameters
    ----------
    qg : `lsst.pipe.base.QuantumGraph`
        Graph whose task definitions are searched in ``iterTaskGraph()``
        order.
    label : `str`
        Task label to look for.

    Returns
    -------
    task_def : `lsst.pipe.base.TaskDef` or `None`
        The first task definition whose ``label`` equals ``label``, or
        `None` when no task in the graph carries that label.
    """
    matching = (candidate for candidate in qg.iterTaskGraph() if candidate.label == label)
    # ``next`` stops at the first hit, so the search short-circuits.
    return next(matching, None)

59 

60 

def _closure(graph: nx.DiGraph, sources, *, upstream):
    """Return ``sources`` together with every node reachable from them.

    Parameters
    ----------
    graph : `networkx.DiGraph`
        Directed graph to walk.
    sources : iterable of graph nodes
        Starting nodes; each must be present in ``graph``.
    upstream : `bool`
        If `True`, follow edges backwards (collect ancestors); otherwise
        follow them forwards (collect descendants).

    Returns
    -------
    closure : `set`
        ``sources`` plus all of their ancestors (``upstream=True``) or
        descendants (``upstream=False``).

    Notes
    -----
    One traversal seeded with all sources visits each node at most once.
    Calling `networkx.ancestors` / `networkx.descendants` once per source
    would instead re-walk any subgraph shared by several sources.
    """
    step = graph.predecessors if upstream else graph.successors
    closure = set(sources)
    pending = list(closure)
    while pending:
        for neighbor in step(pending.pop()):
            if neighbor not in closure:
                closure.add(neighbor)
                pending.append(neighbor)
    return closure


def _copy_init_refs(source, target):
    """Copy per-task init input/output refs from ``source`` into ``target``.

    Only entries for task labels still present in ``target`` are copied.

    QuantumGraph.subset() does not propagate per-task init input/output refs
    (see the "TODO: Do we need to copy initInputs/initOutputs?" in pipe_base
    graph.py). Downstream tools such as `pipetask update-graph-run` (which
    BPS invokes whenever the config defines a finalJob) must still be able
    to find init outputs such as isr_config.

    Parameters
    ----------
    source : `lsst.pipe.base.QuantumGraph`
        Graph the refs are read from. Not modified.
    target : `lsst.pipe.base.QuantumGraph`
        Graph whose ``_initInputRefs`` / ``_initOutputRefs`` are replaced.
    """
    surviving_labels = {task_def.label for task_def in target.iterTaskGraph()}
    # NOTE(review): assumes both private mappings are keyed by task label, as
    # the filter below requires; if they were keyed otherwise, nothing would
    # be copied.
    target._initInputRefs = {
        label: list(refs) for label, refs in source._initInputRefs.items() if label in surviving_labels
    }
    target._initOutputRefs = {
        label: list(refs) for label, refs in source._initOutputRefs.items() if label in surviving_labels
    }


def prune_orphan_preloads(qg, preload_label=PRELOAD_LABEL, anchor_label=ANCHOR_LABEL):
    """Return a new QG with orphan preload quanta and their downstream chain removed.

    A preload quantum is considered an orphan if no quantum with
    ``anchor_label`` is reachable along the directed edges of the
    quantum graph from it. Orphan preloads and every descendant quantum
    reachable from them are dropped.

    Parameters
    ----------
    qg : `lsst.pipe.base.QuantumGraph`
        The graph to prune. Not modified.
    preload_label : `str`, optional
        Task label of the preload task whose orphan quanta to remove.
    anchor_label : `str`, optional
        Task label that must appear downstream of a preload quantum
        for that preload to be considered non-orphan.

    Returns
    -------
    pruned : `lsst.pipe.base.QuantumGraph`
        A new graph with the orphan preload quanta and their downstream
        descendants removed. The original ``qg`` is returned unchanged
        if there is nothing to prune (no preload task, or no orphans).

    Notes
    -----
    If the graph contains no ``anchor_label`` task at all, a warning is
    logged and every preload quantum is treated as an orphan.
    """
    preload_task = _find_task_def(qg, preload_label)
    if preload_task is None:
        _LOG.info("No %r task in QG; nothing to prune.", preload_label)
        return qg

    anchor_task = _find_task_def(qg, anchor_label)
    if anchor_task is None:
        anchor_nodes = frozenset()
        _LOG.warning(
            "No %r quanta in QG; every %r quantum will be treated as an orphan.",
            anchor_label,
            preload_label,
        )
    else:
        anchor_nodes = qg.getNodesForTask(anchor_task)

    graph = qg._connectedQuanta

    # Every quantum with a path to some anchor (anchors included). A preload
    # outside this set has no anchor among its descendants, i.e. is an orphan.
    feeds_anchor = _closure(graph, anchor_nodes, upstream=True)
    orphan_preloads = qg.getNodesForTask(preload_task) - feeds_anchor

    if not orphan_preloads:
        _LOG.info("No orphan %r quanta found; QG unchanged.", preload_label)
        return qg

    # Orphans plus everything downstream of them. None of these can be an
    # anchor or feed one: that would give their orphan ancestor a path to an
    # anchor, contradicting its orphan status.
    to_remove = _closure(graph, orphan_preloads, upstream=False)

    descendants_count = len(to_remove) - len(orphan_preloads)
    _LOG.info(
        "Pruning %d orphan %r quanta and %d downstream quanta (%d total).",
        len(orphan_preloads),
        preload_label,
        descendants_count,
        len(to_remove),
    )

    pruned = qg.subset(set(qg) - to_remove)
    _copy_init_refs(qg, pruned)
    return pruned

143 

144 

def main(argv=None):
    """Command-line entry point: load a QG, prune orphan preloads, write it out.

    The output file is always written, even when nothing was pruned (in that
    case `prune_orphan_preloads` hands back the input graph).

    Parameters
    ----------
    argv : `list` [`str`], optional
        Arguments to parse; `None` makes `argparse` read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Prune loadDiaCatalogs quanta with no downstream associateApdb "
            "quantum (and their downstream chain) from a QuantumGraph file."
        )
    )
    parser.add_argument("input", help="Path to the input QuantumGraph (.qgraph or .qg).")
    parser.add_argument("output", help="Path to write the pruned QuantumGraph.")

    # (flag, default, help) for the two task-label overrides.
    label_options = (
        (
            "--preload-label",
            PRELOAD_LABEL,
            "Task label of the preload task (default: %(default)s).",
        ),
        (
            "--anchor-label",
            ANCHOR_LABEL,
            "Task label whose presence downstream marks a preload as "
            "non-orphan (default: %(default)s).",
        ),
    )
    for flag, default_label, help_text in label_options:
        parser.add_argument(flag, default=default_label, help=help_text)

    options = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    # Suppress INFO-level records from the numexpr logger.
    logging.getLogger("numexpr").setLevel(logging.WARNING)

    _LOG.info("Loading QG from %s", options.input)
    source_graph = QuantumGraph.loadUri(options.input)
    _LOG.info("Loaded %d quanta.", len(source_graph))

    result = prune_orphan_preloads(
        source_graph,
        preload_label=options.preload_label,
        anchor_label=options.anchor_label,
    )

    _LOG.info("Writing pruned QG (%d quanta) to %s", len(result), options.output)
    result.saveUri(options.output)


if __name__ == "__main__":
    main()