Coverage for python/lsst/ctrl/bps/panda/utils.py: 5%

505 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 08:49 +0000

1# This file is part of ctrl_bps_panda. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Utilities for bps PanDA plugin.""" 

29 

30__all__ = [ 

31 "add_decoder_prefix", 

32 "aggregate_by_basename", 

33 "convert_exec_string_to_hex", 

34 "copy_files_for_distribution", 

35 "extract_taskname", 

36 "get_idds_client", 

37 "get_idds_result", 

38 "idds_call_with_check", 

39] 

40 

41import binascii 

42import json 

43import logging 

44import os 

45import random 

46import re 

47import tarfile 

48import time 

49import uuid 

50 

51import idds.common.utils as idds_utils 

52import pandaclient.idds_api 

53from idds.doma.workflowv2.domapandawork import DomaPanDAWork 

54from idds.workflowv2.workflow import AndCondition 

55from idds.workflowv2.workflow import Workflow as IDDS_client_workflow 

56 

57from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob, WmsStates 

58from lsst.ctrl.bps.panda.cmd_line_embedder import CommandLineEmbedder 

59from lsst.ctrl.bps.panda.constants import ( 

60 PANDA_DEFAULT_CLOUD, 

61 PANDA_DEFAULT_CORE_COUNT, 

62 PANDA_DEFAULT_MAX_ATTEMPTS, 

63 PANDA_DEFAULT_MAX_JOBS_PER_TASK, 

64 PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB, 

65 PANDA_DEFAULT_MAX_WALLTIME, 

66 PANDA_DEFAULT_NAME_LENGTH, 

67 PANDA_DEFAULT_ORDER_ID_MAP_FILE, 

68 PANDA_DEFAULT_PRIORITY, 

69 PANDA_DEFAULT_PROCESSING_TYPE, 

70 PANDA_DEFAULT_PROD_SOURCE_LABEL, 

71 PANDA_DEFAULT_RSS, 

72 PANDA_DEFAULT_RSS_MAX, 

73 PANDA_DEFAULT_TASK_TYPE, 

74 PANDA_DEFAULT_VO, 

75) 

76from lsst.resources import ResourcePath 

77 

78_LOG = logging.getLogger(__name__) 

79 

80 

def extract_taskname(s: str) -> str:
    """Extract the task name from a string that follows the pattern
    CampaignName_timestamp_TaskNumber_TaskLabel_ChunkNumber.

    Parameters
    ----------
    s : `str`
        The input string from which to extract the task name. Surrounding
        whitespace and quote characters are ignored.

    Returns
    -------
    taskname : `str`
        Everything that follows the last ``_<digits>_`` separator, or the
        whole (stripped) string if no such separator is present.
    """
    # remove surrounding quotes/spaces if present
    s = s.strip().strip("'\"")

    # Locate the last "_<digits>_" separator. The trailing underscore is
    # checked with a lookahead instead of being consumed, so that adjacent
    # numeric fields sharing an underscore (e.g. "_123_01_") are all seen.
    last_end = None
    for match in re.finditer(r"_\d+(?=_)", s):
        # +1 skips the underscore that the lookahead only peeked at.
        last_end = match.end() + 1

    # fallback: if no such pattern, return everything
    if last_end is None:
        return s
    return s[last_end:]

110 

111 

def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
    """Merge per-label job, exit code and run summaries under each label's
    basename (the label with any trailing ``_<digits>`` removed).

    Parameters
    ----------
    job_summary : `dict` [`str`, `dict` [`str`, `int`]]
        A mapping of job labels to state-count mappings.
    exit_code_summary : `dict` [`str`, `list` [`int`]]
        A mapping of job labels to lists of exit codes.
    run_summary : `str`
        A semicolon-separated string of job summaries
        where each entry has the format "<label>:<count>".

    Returns
    -------
    aggregated_jobs : `dict` [`str`, `dict` [`str`, `int`]]
        For each basename, the job state counts summed over all labels
        sharing that basename. Every state in `WmsStates` is present.
    aggregated_exits : `dict` [`str`, `list` [`int`]]
        For each basename, the concatenated exit codes of all labels sharing
        that basename.
    aggregated_run : `str`
        A semicolon-separated string with the job counts summed by basename.
    """

    def strip_chunk(name):
        # Drop a trailing "_<digits>" suffix, if there is one.
        return re.sub(r"_\d+$", "", name)

    # Sum the state counts, starting every basename from all-zero counts.
    aggregated_jobs = {}
    for name, state_counts in job_summary.items():
        key = strip_chunk(name)
        totals = aggregated_jobs.get(key)
        if totals is None:
            totals = aggregated_jobs[key] = dict.fromkeys(WmsStates, 0)
        for state, count in state_counts.items():
            totals[state] += count

    # Concatenate the exit codes in the order the labels are encountered.
    aggregated_exits = {}
    for name, codes in exit_code_summary.items():
        key = strip_chunk(name)
        if key not in aggregated_exits:
            aggregated_exits[key] = []
        aggregated_exits[key].extend(codes)

    # Sum the run counts; entries that are not exactly "<label>:<int>" are
    # ignored.
    run_totals = {}
    for raw in run_summary.split(";"):
        item = raw.strip()
        if not item:
            continue
        pieces = item.split(":")
        if len(pieces) != 2:
            continue
        try:
            count = int(pieces[1])
        except ValueError:
            continue
        key = strip_chunk(pieces[0])
        run_totals[key] = run_totals.get(key, 0) + count

    aggregated_run = ";".join(f"{key}:{total}" for key, total in run_totals.items())
    return aggregated_jobs, aggregated_exits, aggregated_run

171 

172 

def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_workers):
    """Copy locally generated files to the distribution area so that they
    can be used on the edge nodes.

    Parameters
    ----------
    files_to_stage : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    file_distribution_uri : `ResourcePath`
        Path on the edge node accessed storage,
        including access protocol, bucket name to place files.
    max_copy_workers : `int`
        Maximum number of workers for copying files. Present for API
        compatibility; worker selection is handled internally by
        `ResourcePath.mtransfer`.

    Raises
    ------
    ExceptionGroup
        Raised by `ResourcePath.mtransfer` when one or more transfers fail.
    RuntimeError
        Raised if a copied file is not found at the distribution point after
        transfer completes.
    """
    transfers = {}

    for local_path in files_to_stage.values():
        leaf = os.path.basename(os.path.normpath(local_path))

        if not os.path.isdir(local_path):
            # A plain file is copied directly under the distribution URI.
            destination = file_distribution_uri.join(leaf, forceDirectory=False)
            transfers[ResourcePath(local_path, forceDirectory=False)] = destination
            continue

        # A directory: every file found under it is placed directly inside
        # the matching remote directory.
        remote_dir = file_distribution_uri.join(leaf, forceDirectory=True)
        for resource in ResourcePath.findFileResources([local_path]):
            transfers[resource] = remote_dir.join(resource.basename(), forceDirectory=False)

    for source, destination in transfers.items():
        _LOG.info("Staging %s to %s", source, destination)
    outcome = ResourcePath.mtransfer("copy", transfers.items())

    # Verify that every reported target is really there.
    for destination in outcome:
        if not destination.exists():
            raise RuntimeError(f"File was not copied to the distribution point: {destination}")

219 

220 

def get_idds_client(config):
    """Create the iDDS client.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. A plain `dict` with an optional ``iddsServer``
        key is also accepted.

    Returns
    -------
    idds_client: `idds.client.clientmanager.ClientManager`
        The iDDS ClientManager object.
    """
    if isinstance(config, BpsConfig):
        _, server = config.search("iddsServer", opt={"default": None})
    elif isinstance(config, dict) and "iddsServer" in config:
        server = config["iddsServer"]
    else:
        # No server given: a default value on the panda relay service
        # will be used.
        server = None

    return pandaclient.idds_api.get_api(
        idds_utils.json_dumps,
        idds_host=server,
        compress=True,
        manager=True,
    )

245 

246 

247def get_idds_result(ret): 

248 """Parse the results returned from iDDS. 

249 

250 Parameters 

251 ---------- 

252 ret : `tuple` [`int`, `tuple` [`bool`, payload ]] 

253 The first part ``ret[0]`` is the status of PanDA relay service. 

254 The part of ``ret[1][0]`` is the status of iDDS service. 

255 The part of ``ret[1][1]`` is the returned payload. 

256 If ``ret[1][0]`` is `False`, ``ret[1][1]`` can be error messages. 

257 

258 Returns 

259 ------- 

260 status: `bool` 

261 The status of iDDS calls. 

262 result: `int` or `list` or `dict` or `None` 

263 The result returned from iDDS. `None` if error state. 

264 error: `str` or `None` 

265 Error messages. `None` if no error state. 

266 """ 

267 # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html 

268 if not isinstance(ret, list | tuple) or ret[0] != 0: 

269 # Something wrong with the PanDA relay service. 

270 # The call may not be delivered to iDDS. 

271 status = False 

272 result = None 

273 error = f"PanDA relay service returns errors: {ret}" 

274 else: 

275 if ret[1][0]: 

276 status = True 

277 result = ret[1][1] 

278 error = None 

279 if isinstance(result, str) and "Authentication no permission" in result: 

280 status = False 

281 result = None 

282 error = result 

283 else: 

284 # iDDS returns errors 

285 status = False 

286 result = None 

287 error = f"iDDS returns errors: {ret[1][1]}" 

288 return status, result, error 

289 

290 

def idds_call_with_check(func, *, func_name: str, request_id: int, **kwargs):
    """Invoke an iDDS client function, log what it returned and verify the
    return code.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        The iDDS client function to call.
    func_name : `str`
        Name used for logging.
    request_id : `int`
        The request or workflow ID. It is forwarded to ``func`` as the
        ``request_id`` keyword unless it is `None`.
    **kwargs
        Additional keyword arguments passed to the function.

    Returns
    -------
    ret : `~typing.Any`
        The return value from the iDDS client function.

    Raises
    ------
    RuntimeError
        Raised if the first element of the returned value is not 0.
    """
    params = {**kwargs}
    if request_id is not None:
        params["request_id"] = request_id

    ret = func(**params)
    _LOG.debug("PanDA %s returned = %s", func_name, str(ret))

    # The first element carries the status code of the request; 0 is success.
    if ret[0] != 0:
        raise RuntimeError(f"Error calling {func_name}: {ret} for id: {request_id}")

    return ret

323 

324 

def _make_pseudo_filename(config, gwjob):
    """Build the pseudo filename of a job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which to create the pseudo filename.

    Returns
    -------
    pseudo_filename : `str`
        The pseudo filename for the given job.
    """
    embedder = CommandLineEmbedder(config)
    command = " ".join([gwjob.executable.src_uri, gwjob.arguments])
    # Only the second element (the pseudo filename) of the result is needed;
    # no files are passed for substitution.
    _, pseudo_filename = embedder.substitute_command_line(command, gwjob.cmdvals, gwjob.name, [])
    return pseudo_filename

345 

346 

def _make_doma_work(
    config,
    generic_workflow,
    gwjob,
    task_count,
    task_chunk,
    enable_event_service=False,
    enable_job_name_map=False,
    order_id_map_files=None,
    es_label=None,
    max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    max_wms_job_wall_time=None,
    remote_filename=None,
    qnode_map_filename=None,
):
    """Make the DOMA Work object for a PanDA task.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The workflow.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job representing the jobs for the PanDA task.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA tasks used when making unique names.
    enable_event_service : `bool`, optional
        Whether the work is created with the event service enabled. It is
        switched off again if the number of payloads per PanDA job computed
        from the wall times is smaller than 2.
    enable_job_name_map : `bool`, optional
        If true (or if the event service is enabled), the files in
        ``order_id_map_files`` are added to the files to be staged.
    order_id_map_files : `dict` [`str`, `str`], optional
        Mapping of names to local paths of the order ID map files.
    es_label : `str`, optional
        Label handed to the work object as ``es_label``.
    max_payloads_per_panda_job : `int`, optional
        Handed to the work object as ``max_events_per_job``. Replaced by
        ``int(max_wms_job_wall_time / gwjob.request_walltime)`` when the
        event service is enabled and both of those values are set.
    max_wms_job_wall_time : `int`, optional
        Maximum wall time of a WMS job; only used when the event service is
        enabled.
    remote_filename : `str`, optional
        For custom submissions without any direct access input file, the
        name used as the only direct access file.
    qnode_map_filename : `dict` [`str`, `str`], optional
        Mapping of names to local paths merged into the files to be staged.

    Returns
    -------
    work : `idds.doma.workflowv2.domapandawork.DomaPanDAWork`
        The client representation of a PanDA task.
    local_pfns : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    """
    if order_id_map_files is None:
        order_id_map_files = {}
    _LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)

    # Look up the per-label configuration values.
    cvals = {"curr_cluster": gwjob.label}
    _, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
    cvals["curr_site"] = site
    cvals["curr_pipetask"] = gwjob.label
    _, processing_type = config.search(
        "processingType", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROCESSING_TYPE}
    )
    if gwjob.label in ["finalJob", "customJob"]:
        # These jobs carry their processing type in their own config section.
        _, nonpipetask = config.search(gwjob.label)
        default_type = "Rubin_Merge"
        if gwjob.label == "customJob":
            default_type = PANDA_DEFAULT_PROCESSING_TYPE
        processing_type = nonpipetask["processingType"] if nonpipetask["processingType"] else default_type
    _, task_type = config.search("taskType", opt={"curvals": cvals, "default": PANDA_DEFAULT_TASK_TYPE})
    _, prod_source_label = config.search(
        "prodSourceLabel", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROD_SOURCE_LABEL}
    )
    _, vo = config.search("vo", opt={"curvals": cvals, "default": PANDA_DEFAULT_VO})

    _, file_distribution_end_point = config.search(
        "fileDistributionEndPoint", opt={"curvals": cvals, "default": None}
    )

    _, file_distribution_end_point_default = config.search(
        "fileDistributionEndPointDefault", opt={"curvals": cvals, "default": None}
    )

    task_rss = gwjob.request_memory if gwjob.request_memory else PANDA_DEFAULT_RSS
    task_rss_retry_step = task_rss * gwjob.memory_multiplier if gwjob.memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss

    # Assume input files are same across task
    local_pfns = {}
    direct_io_files = set()

    if gwjob.executable.transfer_executable:
        local_pfns["job_executable"] = gwjob.executable.src_uri
        job_executable = f"./{os.path.basename(gwjob.executable.src_uri)}"
    else:
        job_executable = gwjob.executable.src_uri
    cmd_line_embedder = CommandLineEmbedder(config)
    _LOG.debug(
        "job %s inputs = %s, outputs = %s",
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name),
        generic_workflow.get_job_outputs(gwjob.name),
    )

    # Prefix the command line with the exports of the job environment.
    job_env = ""
    if gwjob.environment:
        for key, value in gwjob.environment.items():
            try:
                sub_value = value.format_map(gwjob.cmdvals)
            except (KeyError, TypeError) as exc:
                _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc))
                raise
            job_env += f"export {key}={sub_value}; "

    cmd_line, _ = cmd_line_embedder.substitute_command_line(
        job_env + job_executable + " " + gwjob.arguments,
        gwjob.cmdvals,
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name) + generic_workflow.get_job_outputs(gwjob.name),
    )

    _LOG.info("enable_event_service %s for %s", enable_event_service, gwjob.label)
    if enable_event_service:
        if gwjob.request_walltime and max_wms_job_wall_time:
            my_log = (
                f"requestWalltime({gwjob.request_walltime}) "
                f"and maxWmsJobWalltime({max_wms_job_wall_time}) are set, "
                "max_payloads_per_panda_job is int(max_wms_job_wall_time / gwjob.request_walltime), "
                "ignore maxPayloadsPerPandaJob."
            )
            _LOG.info(my_log)
            max_payloads_per_panda_job = int(max_wms_job_wall_time / gwjob.request_walltime)
            if max_payloads_per_panda_job < 2:
                my_log = (
                    f"max_payloads_per_panda_job ({max_payloads_per_panda_job}) is too small, "
                    "disable EventService"
                )
                _LOG.info(my_log)
                enable_event_service = False

    maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
    if enable_event_service:
        if gwjob.request_walltime and max_payloads_per_panda_job:
            maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
        elif max_wms_job_wall_time:
            maxwalltime = max_wms_job_wall_time

    if enable_event_service or enable_job_name_map:
        for es_name in order_id_map_files:
            local_pfns[es_name] = order_id_map_files[es_name]

    for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
        local_pfns[gwfile.name] = gwfile.src_uri
        if os.path.isdir(gwfile.src_uri):
            # this is needed to make isdir function working
            # properly in ButlerURL instance on the edge node
            local_pfns[gwfile.name] += "/"

        if gwfile.job_access_remote:
            direct_io_files.add(gwfile.name)

    if qnode_map_filename:
        local_pfns.update(qnode_map_filename)

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)

    # The list of direct access files must not be empty.
    if not direct_io_files:
        if submit_cmd:
            direct_io_files.add(remote_filename)
        else:
            direct_io_files.add("cmdlineplaceholder")

    lsst_temp = "LSST_RUN_TEMP_SPACE"
    # The end point may be unset (None); only test for the variable name
    # when there is a string to look into, so that the fallbacks below can
    # still be applied.
    if (
        file_distribution_end_point
        and lsst_temp in file_distribution_end_point
        and lsst_temp not in os.environ
    ):
        file_distribution_end_point = file_distribution_end_point_default
    if submit_cmd and not file_distribution_end_point:
        file_distribution_end_point = "FileDistribution"

    executable = add_decoder_prefix(
        config, cmd_line, file_distribution_end_point, (local_pfns, direct_io_files)
    )
    work = DomaPanDAWork(
        executable=executable,
        primary_input_collection={
            "scope": "pseudo_dataset",
            "name": f"pseudo_input_collection#{task_count}",
        },
        output_collections=[{"scope": "pseudo_dataset", "name": f"pseudo_output_collection#{task_count}"}],
        log_collections=[],
        dependency_map=[],
        task_name=f"{generic_workflow.name}_{task_count:02d}_{gwjob.label}_{task_chunk:02d}",
        task_queue=gwjob.queue,
        task_log={
            "destination": "local",
            "value": "log.tgz",
            "dataset": "PandaJob_#{pandaid}/",
            "token": "local",
            "param_type": "log",
            "type": "template",
        },
        encode_command_line=True,
        task_rss=task_rss,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_rss_max=gwjob.request_memory_max if gwjob.request_memory_max else PANDA_DEFAULT_RSS_MAX,
        task_cloud=gwjob.compute_cloud if gwjob.compute_cloud else PANDA_DEFAULT_CLOUD,
        task_site=site,
        task_priority=int(gwjob.priority) if gwjob.priority else PANDA_DEFAULT_PRIORITY,
        core_count=gwjob.request_cpus if gwjob.request_cpus else PANDA_DEFAULT_CORE_COUNT,
        working_group=gwjob.accounting_group,
        processing_type=processing_type,
        task_type=task_type,
        prodSourceLabel=prod_source_label,
        vo=vo,
        es=enable_event_service,
        es_label=es_label,
        max_events_per_job=max_payloads_per_panda_job,
        maxattempt=gwjob.number_of_retries if gwjob.number_of_retries else PANDA_DEFAULT_MAX_ATTEMPTS,
        maxwalltime=maxwalltime,
    )
    return work, local_pfns

569 

570 

def add_final_idds_work(
    config, generic_workflow, idds_client_workflow, dag_sink_work, task_count, task_chunk
):
    """Append the special final PanDA task, if any, to the client workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow in which to find the final job.
    idds_client_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client representation of the workflow to which the final task
        is added.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA tasks used when making unique names.

    Returns
    -------
    files : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area. Empty if
        the generic workflow has no final job.

    Raises
    ------
    NotImplementedError
        Raised if final job in GenericWorkflow is itself a workflow.
    TypeError
        Raised if final job in GenericWorkflow is invalid type.
    """
    final = generic_workflow.get_final()
    if not final:
        _LOG.debug("No final job in GenericWorkflow")
        return {}

    if isinstance(final, GenericWorkflow):
        raise NotImplementedError("PanDA plugin does not support a workflow as the final job")

    if not isinstance(final, GenericWorkflowJob):
        raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

    final_work, staged_files = _make_doma_work(config, generic_workflow, final, task_count, task_chunk)

    # The final task gets a single pseudo input without dependencies.
    final_work.dependency_map.append(
        {
            "name": "pure_pseudoinput+qgraphNodeId:+qgraphId:",
            "submitted": False,
            "dependencies": [],
        }
    )
    idds_client_workflow.add_work(final_work)

    # Trigger the final task once every sink work has terminated.
    terminated_checks = [sink.is_terminated for sink in dag_sink_work]
    idds_client_workflow.add_condition(
        AndCondition(conditions=terminated_checks, true_works=[final_work])
    )
    return staged_files

635 

636 

def convert_exec_string_to_hex(cmdline):
    """Encode the command line as a string of hexadecimal digits.

    Large command lines containing special symbols are passed to the
    pilot/container, and the Pilot strips special symbols. Hex-encoding the
    command line keeps a 1 to 1 matching through that stripping.

    Parameters
    ----------
    cmdline : `str`
        UTF-8 command line string.

    Returns
    -------
    hex : `str`
        Hex representation of string.
    """
    raw_bytes = cmdline.encode()
    hex_bytes = binascii.hexlify(raw_bytes)
    return hex_bytes.decode("utf-8")

656 

657 

def add_decoder_prefix(config, cmd_line, distribution_path, files):
    """Compose the command line sent to the pilot from the functional part
    (the actual SW running) and the middleware part (containers invocation).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration information.
    cmd_line : `str`
        UTF-8 based functional part of the command line.
    distribution_path : `str`
        URI of path where all files are located for distribution.
    files : `tuple` [`dict` [`str`, `str`], `list` [`str`]]
        File names needed for a task (copied local, direct access).

    Returns
    -------
    decoder_prefix : `str`
        Full command line to be executed on the edge node.
    """
    local_files, direct_files = files[0], files[1]

    # Manipulate file paths for placement on cmdline
    files_plc_hldr = {}
    for key, pfn in local_files.items():
        if pfn.endswith("/"):
            placeholder = os.path.basename(pfn[:-1])
            isdir = True
        else:
            placeholder = os.path.basename(pfn)
            # os.path.splitext keeps the leading dot of the extension, so
            # the comparison has to be made against ".yaml". A butlerConfig
            # that is not a YAML file is taken to be a directory.
            _, extension = os.path.splitext(pfn)
            isdir = os.path.isdir(pfn) or (key == "butlerConfig" and extension != ".yaml")
        if isdir:
            # this is needed to make isdir function working
            # properly in ButlerURL instance on the edge node
            placeholder += "/"
        files_plc_hldr[key] = placeholder
        _LOG.debug("files_plc_hldr[%s] = %s", key, files_plc_hldr[key])

    cmdline_hex = convert_exec_string_to_hex(cmd_line)
    _, runner_command = config.search("runnerCommand", opt={"replaceEnvVars": False, "expandEnvVars": False})

    # Point the runner at the order ID map file in the distribution area.
    order_id_map_filename = local_files.get("orderIdMapFilename", None)
    if order_id_map_filename:
        order_id_map_filename = os.path.basename(order_id_map_filename)
        order_id_map_filename = os.path.join(distribution_path, order_id_map_filename)
        runner_command = runner_command.replace("orderIdMapFilename", order_id_map_filename)
    runner_command = runner_command.replace("\n", " ")

    # Arguments substituted for the "_cmd_line_" marker: hex encoded command
    # line, the literal "${IN/L}", the distribution path, the copied files as
    # "key:name" joined by "+", and the direct access files joined by "+".
    decoder_args = " ".join(
        [
            str(cmdline_hex),
            "${IN/L}",
            distribution_path,
            "+".join(f"{k}:{v}" for k, v in files_plc_hldr.items()),
            "+".join(direct_files),
        ]
    )
    decoder_prefix = runner_command.replace("_cmd_line_", decoder_args)
    return decoder_prefix

713 

714 

715def add_idds_work(config, generic_workflow, idds_workflow): 

716 """Convert GenericWorkflowJobs to iDDS work and add them to the iDDS 

717 workflow. 

718 

719 Parameters 

720 ---------- 

721 config : `lsst.ctrl.bps.BpsConfig` 

722 BPS configuration. 

723 generic_workflow : `lsst.ctrl.bps.GenericWorkflow` 

724 Generic workflow containing jobs to convert. 

725 idds_workflow : `idds.workflowv2.workflow.Workflow` 

726 The iDDS workflow to which the converted jobs should be added. 

727 

728 Returns 

729 ------- 

730 files_to_pre_stage : `dict` [`str`, `str`] 

731 Files that need to be copied to the staging area before submission. 

732 dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`] 

733 The work nodes in the client workflow which have no successors. 

734 task_count : `int` 

735 Number of tasks in iDDS workflow used for unique task names. 

736 

737 Raises 

738 ------ 

739 RuntimeError 

740 If cannot recover from dependency issues after pass through workflow. 

741 """ 

742 # event service 

743 _, enable_event_service = config.search("enableEventService", opt={"default": None}) 

744 _, enable_qnode_map = config.search("enableQnodeMap", opt={"default": None}) 

745 _, max_payloads_per_panda_job = config.search( 

746 "maxPayloadsPerPandaJob", opt={"default": PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB} 

747 ) 

748 _, max_wms_job_wall_time = config.search("maxWmsJobWalltime", opt={"default": None}) 

749 my_log = ( 

750 f"enableEventService: {enable_event_service}, maxPayloadsPerPandaJob: {max_payloads_per_panda_job}" 

751 ) 

752 _LOG.info(my_log) 

753 

754 # job name map: Use a short job name to map the long job name 

755 _, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None}) 

756 _LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}") 

757 if enable_event_service and not enable_job_name_map: 

758 enable_job_name_map = True 

759 my_log = "enable_event_service is set, set enable_job_name_map True." 

760 _LOG.info(my_log) 

761 

762 # Limit number of jobs in single PanDA task 

763 _, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK}) 

764 

765 files_to_pre_stage = {} 

766 dag_sink_work = [] # Workflow sink nodes that need to be connected to final task 

767 job_to_task = {} 

768 job_to_pseudo_filename = {} 

769 task_count = 0 # Task number/ID in idds workflow used for unique name 

770 remote_archive_filename = None 

771 

772 submit_path = config["submitPath"] 

773 

774 submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False) 

775 if submit_cmd: 

776 files = generic_workflow.get_executables(data=False, transfer_only=True) 

777 archive_filename = f"jobO.{uuid.uuid4()}.tar.gz" 

778 archive_filename = create_archive_file(submit_path, archive_filename, files) 

779 remote_archive_filename = copy_files_to_pandacache(archive_filename) 

780 

781 order_id_map_files = {} 

782 name_works = {} 

783 order_id_map = {} 

784 job_name_to_order_id_map = {} 

785 order_id_map_file = None 

786 max_payloads_per_panda_job_by_label = {} 

787 if enable_event_service: 

788 enable_event_service = enable_event_service.split(",") 

789 enable_event_service_tmp = [] 

790 for es_def in enable_event_service: 

791 if ":" in es_def: 

792 es_label, m_payloads = es_def.split(":") 

793 else: 

794 es_label, m_payloads = es_def, max_payloads_per_panda_job 

795 es_label = es_label.strip() 

796 enable_event_service_tmp.append(es_label) 

797 max_payloads_per_panda_job_by_label[es_label] = int(m_payloads) 

798 enable_event_service = enable_event_service_tmp 

799 if enable_job_name_map: 

800 _, order_id_map_filename = config.search( 

801 "orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE} 

802 ) 

803 order_id_map_file = os.path.join(submit_path, order_id_map_filename) 

804 order_id_map_files = {"orderIdMapFilename": order_id_map_file} 

805 files_to_pre_stage.update(order_id_map_files) 

806 

807 # To avoid dying due to optimizing number of times through workflow, 

808 # catch dependency issues to loop through again later. 

809 jobs_with_dependency_issues = {} 

810 

811 # Initialize quantum node map 

812 qnode_map = {} 

813 qnode_map_filename = None 

814 if enable_qnode_map: 

815 qnode_map_file = os.path.join(submit_path, "qnode_map.json") 

816 qnode_map_filename = {"qnodemap": qnode_map_file} 

817 files_to_pre_stage.update(qnode_map_filename) 

818 

819 # Assume jobs with same label share config values 

820 for job_label in generic_workflow.labels: 

821 _LOG.debug("job_label = %s", job_label) 

822 

823 if enable_job_name_map: 

824 order_id_map[job_label] = {} 

825 job_name_to_order_id_map[job_label] = {} 

826 

827 # Add each job with a particular label to a corresponding PanDA task 

828 # A PanDA task has a limit on number of jobs, so break into multiple 

829 # PanDA tasks if needed. 

830 job_count = 0 # Number of jobs in idds task used for task chunking 

831 task_chunk = 1 # Task chunk number within job label used for unique name 

832 work = None 

833 order_id = -1 

834 

835 # Instead of changing code to make chunks up front and round-robin 

836 # assign jobs to chunks, for now keeping chunk creation in loop 

837 # but using knowledge of how many chunks there will be to set better 

838 # maximum number of jobs in a chunk for more even distribution. 

839 jobs_by_label = generic_workflow.get_jobs_by_label(job_label) 

840 num_chunks = -(-len(jobs_by_label) // max_jobs_per_task) # ceil 

841 max_jobs_per_task_this_label = -(-len(jobs_by_label) // num_chunks) 

842 _LOG.debug( 

843 "For job_label = %s, num jobs = %s, num_chunks = %s, max_jobs = %s", 

844 job_label, 

845 len(jobs_by_label), 

846 num_chunks, 

847 max_jobs_per_task_this_label, 

848 ) 

849 for gwjob in jobs_by_label: 

850 order_id += 1 

851 pseudo_filename = _make_pseudo_filename(config, gwjob) 

852 job_to_pseudo_filename[gwjob.name] = pseudo_filename 

853 if enable_job_name_map: 

854 order_id_map[job_label][str(order_id)] = pseudo_filename 

855 job_name_to_order_id_map[job_label][gwjob.name] = str(order_id) 

856 

857 job_count += 1 

858 if job_count > max_jobs_per_task_this_label: 

859 job_count = 1 

860 task_chunk += 1 

861 

862 if job_count == 1: 

863 # Create new PanDA task object 

864 task_count += 1 

865 work_enable_event_service = False 

866 if enable_event_service and job_label in enable_event_service: 

867 work_enable_event_service = True 

868 max_payloads_per_panda_job_current = max_payloads_per_panda_job_by_label.get( 

869 job_label, max_payloads_per_panda_job 

870 ) 

871 work, files = _make_doma_work( 

872 config, 

873 generic_workflow, 

874 gwjob, 

875 task_count, 

876 task_chunk, 

877 enable_event_service=work_enable_event_service, 

878 enable_job_name_map=enable_job_name_map, 

879 order_id_map_files=order_id_map_files, 

880 es_label=job_label, 

881 max_payloads_per_panda_job=max_payloads_per_panda_job_current, 

882 max_wms_job_wall_time=max_wms_job_wall_time, 

883 remote_filename=remote_archive_filename, 

884 qnode_map_filename=qnode_map_filename, 

885 ) 

886 work.dependency_tasks = [] 

887 name_works[work.task_name] = work 

888 files_to_pre_stage.update(files) 

889 idds_workflow.add_work(work) 

890 if generic_workflow.out_degree(gwjob.name) == 0: 

891 dag_sink_work.append(work) 

892 

893 if enable_qnode_map: 

894 job_name_PH = "PH:" + gwjob.name 

895 job_to_pseudo_filename[gwjob.name] = job_name_PH 

896 qnode_map[job_name_PH] = pseudo_filename 

897 

898 job_to_task[gwjob.name] = work.get_work_name() 

899 deps = [] 

900 missing_deps = False 

901 for parent_job_name in generic_workflow.predecessors(gwjob.name): 

902 if parent_job_name not in job_to_task: 

903 _LOG.debug("job_to_task.keys() = %s", job_to_task.keys()) 

904 missing_deps = True 

905 break 

906 else: 

907 if enable_job_name_map: 

908 parent_job = generic_workflow.get_job(parent_job_name) 

909 parent_job_label = parent_job.label 

910 parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name] 

911 inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}" 

912 else: 

913 inputname = job_to_pseudo_filename[parent_job_name] 

914 

915 parent_task_name = job_to_task[parent_job_name] 

916 deps.append( 

917 { 

918 "task": parent_task_name, 

919 "inputname": inputname, 

920 } 

921 ) 

922 if parent_task_name not in work.dependency_tasks: 

923 work.dependency_tasks.append(parent_task_name) 

924 if not missing_deps: 

925 j_name = job_to_pseudo_filename[gwjob.name] 

926 f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name 

927 work.dependency_map.append( 

928 { 

929 "name": f_name, 

930 "order_id": order_id, 

931 "dependencies": deps, 

932 } 

933 ) 

934 else: 

935 jobs_with_dependency_issues[gwjob.name] = { 

936 "work": work, 

937 "order_id": order_id, 

938 "label": job_label, 

939 } 

940 

941 if enable_qnode_map: 

942 with open(qnode_map_file, "w", encoding="utf-8") as f: 

943 json.dump(qnode_map, f, indent=2) 

944 

945 # If there were any issues figuring out dependencies through earlier loop 

946 if jobs_with_dependency_issues: 

947 _LOG.warning("Could not prepare workflow in single pass. Please notify developers.") 

948 _LOG.info("Trying to recover...") 

949 for job_name, work_item in jobs_with_dependency_issues.items(): 

950 deps = [] 

951 work = work_item["work"] 

952 order_id = work_item["order_id"] 

953 job_label = work_item["label"] 

954 

955 for parent_job_name in generic_workflow.predecessors(job_name): 

956 if parent_job_name not in job_to_task: 

957 _LOG.debug("job_to_task.keys() = %s", job_to_task.keys()) 

958 raise RuntimeError( 

959 "Could not recover from dependency issues ({job_name} missing {parent_job_name})." 

960 ) 

961 if enable_job_name_map: 

962 parent_job = generic_workflow.get_job(parent_job_name) 

963 parent_job_label = parent_job.label 

964 parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name] 

965 inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}" 

966 else: 

967 inputname = job_to_pseudo_filename[parent_job_name] 

968 

969 parent_task_name = job_to_task[parent_job_name] 

970 deps.append( 

971 { 

972 "task": parent_task_name, 

973 "inputname": inputname, 

974 } 

975 ) 

976 if parent_task_name not in work.dependency_tasks: 

977 work.dependency_tasks.append(parent_task_name) 

978 

979 work.dependency_map.append( 

980 { 

981 "name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else job_name, 

982 "order_id": order_id, 

983 "dependencies": deps, 

984 } 

985 ) 

986 

987 _LOG.info("Successfully recovered.") 

988 

989 for task_name in name_works: 

990 work = name_works[task_name] 

991 # trigger the setter function which will validate the dependency_map: 

992 # 1) check the name length to avoid the the name too long, 

993 # 2) check to avoid duplicated items. 

994 sorted_dep_map = sorted(work.dependency_map, key=lambda x: x["order_id"]) 

995 work.dependency_map = sorted_dep_map 

996 

997 if enable_job_name_map: 

998 with open(order_id_map_file, "w") as f: 

999 json.dump(order_id_map, f) 

1000 

1001 return files_to_pre_stage, dag_sink_work, task_count 

1002 

1003 

def create_archive_file(submit_path, archive_filename, files):
    """Bundle files into a gzip-compressed tar archive.

    Parameters
    ----------
    submit_path : `str`
        Directory in which the archive is created when ``archive_filename``
        is not an absolute path.
    archive_filename : `str`
        Name of the archive to write. An absolute path is used as is;
        anything else is placed under ``submit_path``.
    files : `~collections.abc.Iterable` [`str`]
        Paths of the files to add. Each one is stored under its base name
        only, so the archive is flat and two inputs sharing a base name
        end up as two members with the same name.

    Returns
    -------
    archive_filename : `str`
        Path of the archive that was written.
    """
    if not os.path.isabs(archive_filename):
        archive_filename = os.path.join(submit_path, archive_filename)

    # dereference=True archives the file a symlink points to rather than
    # the link itself.
    with tarfile.open(archive_filename, "w:gz", dereference=True) as tar:
        for local_file in files:
            tar.add(local_file, arcname=os.path.basename(local_file))
    return archive_filename

1013 

1014 

def copy_files_to_pandacache(filename):
    """Upload a file to the PanDA cache.

    Parameters
    ----------
    filename : `str`
        Path of the local file to upload.

    Returns
    -------
    remote_filename : `str` or `None`
        Location of the file under ``$PANDACACHE_URL/cache``, or `None` if
        the upload did not succeed.

    Raises
    ------
    KeyError
        Raised if the upload succeeded but ``PANDACACHE_URL`` is not set in
        the environment.
    """
    from pandaclient import Client

    # Bounded retry: stop at the first zero status or after max_attempts
    # tries, whichever comes first.
    max_attempts = 3
    for _ in range(max_attempts):
        status, out = Client.putFile(filename, True)
        if status == 0:
            break
    print(f"copy_files_to_pandacache: status: {status}, out: {out}")
    if status != 0:
        # Every attempt failed; report it the same way as a rejected upload.
        return None

    if out.startswith("NewFileName:"):
        # found the same input sandbox to reuse
        filename = out.split(":")[-1]
    elif out != "True":
        print(out)
        return None

    filename = os.path.basename(filename)
    cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
    filename = os.path.join(cache_path, filename)
    return filename

1037 

1038 

def download_extract_archive(filename, prefix=None):
    """Download and extract the tarball from pandacache.

    Parameters
    ----------
    filename : `str`
        The filename to download. When it starts with ``https:`` the
        PanDA cache URL is taken from it (two path levels above the file)
        and exported as ``PANDACACHE_URL``.
    prefix : `str`, optional
        The target directory the tarball will be downloaded and extracted to.
        If None (default), the current directory will be used.

    Raises
    ------
    RuntimeError
        Raised if the archive could not be downloaded after all attempts.
    """
    archive_basename = os.path.basename(filename)
    target_dir = prefix if prefix is not None else os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    panda_cache_url = os.environ.get("PANDACACHE_URL", None)
    print(f"PANDACACHE_URL: {panda_cache_url}")

    # The import of PanDA client must happen *after* the PANDACACHE_URL is set.
    # Otherwise the PanDA client will not pick up the environment setting.
    from pandaclient import Client

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        if status == 0 or attempt == max_attempts:
            # Either done, or out of attempts: no point sleeping again.
            break
        # Randomized back-off whose upper bound grows with the number of
        # failed attempts.
        if attempt <= 1:
            max_wait = 10
        elif attempt <= 2:
            max_wait = 60
        else:
            max_wait = 120
        time.sleep(random.randint(1, max_wait))
    print(f"Download archive file from pandacache status: {status}, output: {output}")
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")

    # NOTE(review): extractall trusts the member paths stored in the archive;
    # consider passing an extraction filter once the minimum supported Python
    # version provides one.
    with tarfile.open(full_output_filename, "r:gz") as f:
        f.extractall(target_dir)
    print(f"Extracted {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print(f"Removed {full_output_filename}")

1087 

1088 

def get_task_parameter(config, remote_build, key):
    """Look up a task parameter, preferring the remote build settings.

    Parameters
    ----------
    config : `object`
        Fallback configuration. Must provide ``search(key, opt)`` returning
        a two-item sequence whose second item is the value.
    remote_build : `object`
        Configuration consulted first; same ``search`` interface as
        ``config``.
    key : `str`
        Name of the parameter to look up.

    Returns
    -------
    value : `object`
        The value from ``remote_build`` when it is truthy, otherwise
        whatever ``config`` returns for the same key (which may itself be
        falsy).
    """
    lookup_options = {
        "replaceVars": True,
        "expandEnvVars": False,
        "replaceEnvVars": False,
        "required": False,
    }
    # Sources in priority order; the first truthy value wins, and the last
    # source's answer is returned when nothing truthy is found.
    for source in (remote_build, config):
        _, candidate = source.search(key, lookup_options)
        if candidate:
            break
    return candidate

1095 

1096 

def create_idds_build_workflow(**kwargs):
    """Create an iDDS workflow containing a single remote build task.

    The files listed for the remote build, plus the configuration file, are
    packed into an archive, uploaded to the PanDA cache and referenced from
    the build task's command line.

    Parameters
    ----------
    **kwargs
        Recognized keys:

        ``config``
            Configuration providing ``submitPath`` and
            ``bps_defined.uniqProcName``; also the fallback for task
            parameters missing from ``remote_build``.
        ``remote_build``
            Remote build configuration; searched for ``files``,
            ``runnerCommand`` and the task parameters.
        ``config_file``
            Path of the configuration file to ship with the build task.
        ``compute_site``
            Optional site name substituted into the runner command.

    Returns
    -------
    workflow : `object`
        The object created by ``IDDS_client_workflow()`` with the build task
        added and its name set.

    Raises
    ------
    RuntimeError
        Raised if the archive could not be uploaded to the PanDA cache.
    """
    config = kwargs.get("config")
    remote_build = kwargs.get("remote_build")
    config_file = kwargs.get("config_file")
    config_file_base = os.path.basename(config_file) if config_file else None
    compute_site = kwargs.get("compute_site")

    _, files = remote_build.search("files", opt={"default": []})
    submit_path = config["submitPath"]
    # Build a new list so the list handed back by the configuration search
    # is not modified as a side effect.
    files = [*files, config_file]
    archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
    archive_filename = create_archive_file(submit_path, archive_filename, files)
    _LOG.info("archive file name: %s", archive_filename)
    remote_filename = copy_files_to_pandacache(archive_filename)
    _LOG.info("pandacache file: %s", remote_filename)
    if remote_filename is None:
        # Without this check the failure surfaces later as a TypeError from
        # str.replace, which hides the real cause.
        raise RuntimeError(f"Failed to upload {archive_filename} to pandacache")

    _LOG.info(type(remote_build))
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    cvals = {"LSST_VERSION": get_task_parameter(config, remote_build, "LSST_VERSION")}
    cvals["custom_lsst_setup"] = get_task_parameter(config, remote_build, "custom_lsst_setup")
    max_name_length = PANDA_DEFAULT_NAME_LENGTH
    if "IDDS_MAX_NAME_LENGTH" in os.environ:
        max_name_length = int(os.environ["IDDS_MAX_NAME_LENGTH"])
    cvals["IDDS_MAX_NAME_LENGTH"] = max_name_length
    search_opt["curvals"] = cvals
    _, executable = remote_build.search("runnerCommand", opt=search_opt)
    # Fill in the placeholders of the runner command template.
    executable = executable.replace("_download_cmd_line_", remote_filename)
    executable = executable.replace("_build_cmd_line_", config_file_base)
    executable = executable.replace("_compute_site_", compute_site or "")

    task_cloud = get_task_parameter(config, remote_build, "computeCloud")
    task_site = get_task_parameter(config, remote_build, "computeSite")
    task_queue = get_task_parameter(config, remote_build, "queue")
    task_rss = get_task_parameter(config, remote_build, "requestMemory")
    task_rss_max = get_task_parameter(config, remote_build, "requestMemoryMax")
    memory_multiplier = get_task_parameter(config, remote_build, "memoryMultiplier")
    # Only scale when both values are set; an unset memory request would
    # otherwise be multiplied as an empty/None value.
    task_rss_retry_step = task_rss * memory_multiplier if task_rss and memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss
    nretries = get_task_parameter(config, remote_build, "numberOfRetries")
    processing_type = get_task_parameter(config, remote_build, "processingType")
    priority = get_task_parameter(config, remote_build, "priority")
    _LOG.info("requestMemory: %s", task_rss)
    _LOG.info("Site: %s", task_site)
    # TODO: fill other parameters based on config
    build_work = DomaPanDAWork(
        executable=executable,
        task_type="lsst_build",
        primary_input_collection={"scope": "pseudo_dataset", "name": "pseudo_input_collection#1"},
        output_collections=[{"scope": "pseudo_dataset", "name": "pseudo_output_collection#1"}],
        log_collections=[],
        dependency_map=None,
        task_name="build_task",
        task_queue=task_queue,
        encode_command_line=True,
        prodSourceLabel="managed",
        processing_type=processing_type,
        task_log={
            "dataset": "PandaJob_#{pandaid}/",
            "destination": "local",
            "param_type": "log",
            "token": "local",
            "type": "template",
            "value": "log.tgz",
        },
        task_rss=task_rss if task_rss else PANDA_DEFAULT_RSS,
        task_rss_max=task_rss_max if task_rss_max else PANDA_DEFAULT_RSS_MAX,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_cloud=task_cloud,
        task_site=task_site,
        task_priority=int(priority) if priority else PANDA_DEFAULT_PRIORITY,
        # An unset retry count comes back falsy; check that first so the
        # comparison with 0 cannot fail on a non-numeric value.
        maxattempt=nretries if nretries and nretries > 0 else PANDA_DEFAULT_MAX_ATTEMPTS,
    )

    workflow = IDDS_client_workflow()

    workflow.add_work(build_work)
    workflow.name = config["bps_defined"]["uniqProcName"]
    return workflow