Coverage for python/lsst/source/injection/bin/make_injection_pipeline.py: 20%

40 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 02:15 -0700

1# This file is part of source_injection. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import logging 

25import os 

26import time 

27from argparse import SUPPRESS, ArgumentParser 

28 

29from ..utils import make_injection_pipeline 

30from .source_injection_help_formatter import SourceInjectionHelpFormatter 

31 

32 

33def build_argparser(): 

34 """Build an argument parser for this script.""" 

35 parser = ArgumentParser( 

36 description="""Make an expanded source injection pipeline. 

37 

38This command takes a reference pipeline definition file in YAML format and 

39prefixes the input connections for all immediate consuming tasks of an injected 

40dataset with the injected prefix. If an optional injection pipeline definition 

41YAML file is also provided, the injection task will be merged into the pipeline. 

42 

43By default, all subsets from the reference pipeline containing the task which 

44generates the injection dataset type will also be updated to include the 

45injection task. 

46 

47When the injection pipeline is constructed, a check on all existing pipeline 

48contracts is performed. If any contracts are violated, they are removed from 

49the pipeline. A warning is logged for each contract that is removed. 

50""", 

51 formatter_class=SourceInjectionHelpFormatter, 

52 epilog="More information is available at https://pipelines.lsst.io.", 

53 add_help=False, 

54 argument_default=SUPPRESS, 

55 ) 

56 parser.add_argument( 

57 "-t", 

58 "--dataset-type-name", 

59 type=str, 

60 help="Name of the dataset type being injected into.", 

61 required=True, 

62 metavar="TEXT", 

63 ) 

64 parser.add_argument( 

65 "-r", 

66 "--reference-pipeline", 

67 type=str, 

68 help="Location of a reference pipeline definition YAML file.", 

69 required=True, 

70 metavar="FILE", 

71 ) 

72 parser.add_argument( 

73 "-i", 

74 "--injection-pipeline", 

75 type=str, 

76 help="Location of an injection pipeline definition YAML file stub. If " 

77 "this is not explicitly provided, an attempt to infer the injection " 

78 "pipeline stub will be made using the injected dataset type name.", 

79 metavar="FILE", 

80 ) 

81 parser.add_argument( 

82 "--no-update-subsets", 

83 help="Do not update pipeline subsets to include the injection task.", 

84 dest="update_subsets", 

85 action="store_false", 

86 ) 

87 parser.add_argument( 

88 "-x", 

89 "--excluded-tasks", 

90 type=str, 

91 help="Comma-separated set of task labels to exclude from the pipeline.", 

92 metavar="task", 

93 default="jointcal,gbdesAstrometricFit,fgcmBuildFromIsolatedStars,fgcmFitCycle,fgcmOutputProducts", 

94 ) 

95 parser.add_argument( 

96 "-f", 

97 "--filename", 

98 help="Path to save a modified pipeline definition YAML file.", 

99 metavar="FILE", 

100 ) 

101 parser.add_argument( 

102 "--overwrite", 

103 help="Overwrite the output saved pipeline definition file if it already exists.", 

104 action="store_true", 

105 ) 

106 parser.add_argument( 

107 "--prefix", 

108 type=str, 

109 help="Prefix to prepend to each affected post-injection dataset type name.", 

110 default="injected_", 

111 ) 

112 parser.add_argument( 

113 "--instrument", 

114 type=str, 

115 help="Add instrument overrides. Must be a fully qualified class name.", 

116 metavar="instrument", 

117 ) 

118 parser.add_argument( 

119 "-c", 

120 "--config", 

121 type=str, 

122 help="Config override for a task, in the format 'label:key=value'.", 

123 action="append", 

124 ) 

125 parser.add_argument( 

126 "-a", 

127 "--additional-pipelines", 

128 type=str, 

129 help="Location(s) of additional input pipeline definition YAML file(s)." 

130 "Tasks from these additional pipelines will be added to the output injection pipeline.", 

131 metavar="FILE", 

132 nargs="+", 

133 ) 

134 parser.add_argument( 

135 "-s", 

136 "--additional-subset", 

137 type=str, 

138 help="Subset for additional tasks, in the form 'name[:description]'." 

139 "All tasks from any additional pipelines will be added into this subset." 

140 "The subset will be created if it does not already exist." 

141 "Description text is optional, and ignored if the subset already exists." 

142 "This argument can be specified multiple times to add multiple subsets.", 

143 metavar="TEXT", 

144 action="append", 

145 ) 

146 parser.add_argument( 

147 "-h", 

148 "--help", 

149 action="help", 

150 help="Show this help message and exit.", 

151 ) 

152 return parser 

153 

154 

155def main(): 

156 """Use this as the main entry point when calling from the command line.""" 

157 # Set up logging. 

158 tz = time.strftime("%z") 

159 logging.basicConfig( 

160 format="%(levelname)s %(asctime)s.%(msecs)03d" + tz + " - %(message)s", 

161 datefmt="%Y-%m-%dT%H:%M:%S", 

162 ) 

163 logger = logging.getLogger(__name__) 

164 logger.setLevel(logging.DEBUG) 

165 

166 args = build_argparser().parse_args() 

167 kwargs = {k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]} 

168 

169 if hasattr(args, "filename"): 

170 if os.path.exists(args.filename): 

171 if not hasattr(args, "overwrite"): 

172 raise RuntimeError(f"File {args.filename} already exists; use --overwrite to write anyway.") 

173 else: 

174 logger.warning("File %s already exists; overwriting.", args.filename) 

175 pipeline = make_injection_pipeline(**kwargs) 

176 pipeline.write_to_uri(args.filename) 

177 logger.info( 

178 "Modified pipeline definition YAML file saved at %s.", 

179 os.path.realpath(args.filename), 

180 ) 

181 else: 

182 pipeline = make_injection_pipeline(**kwargs) 

183 print("\n", pipeline, sep="")