Coverage for python / lsst / ctrl / bps / panda / utils.py: 5%
505 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:41 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 01:41 -0700
1# This file is part of ctrl_bps_panda.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Utilities for bps PanDA plugin."""
# Names exported by ``from lsst.ctrl.bps.panda.utils import *``; the other
# module-level functions remain importable by name.
__all__ = [
    "add_decoder_prefix",
    "aggregate_by_basename",
    "convert_exec_string_to_hex",
    "copy_files_for_distribution",
    "extract_taskname",
    "get_idds_client",
    "get_idds_result",
    "idds_call_with_check",
]
41import binascii
42import json
43import logging
44import os
45import random
46import re
47import tarfile
48import time
49import uuid
51import idds.common.utils as idds_utils
52import pandaclient.idds_api
53from idds.doma.workflowv2.domapandawork import DomaPanDAWork
54from idds.workflowv2.workflow import AndCondition
55from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
57from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob, WmsStates
58from lsst.ctrl.bps.panda.cmd_line_embedder import CommandLineEmbedder
59from lsst.ctrl.bps.panda.constants import (
60 PANDA_DEFAULT_CLOUD,
61 PANDA_DEFAULT_CORE_COUNT,
62 PANDA_DEFAULT_MAX_ATTEMPTS,
63 PANDA_DEFAULT_MAX_JOBS_PER_TASK,
64 PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
65 PANDA_DEFAULT_MAX_WALLTIME,
66 PANDA_DEFAULT_NAME_LENGTH,
67 PANDA_DEFAULT_ORDER_ID_MAP_FILE,
68 PANDA_DEFAULT_PRIORITY,
69 PANDA_DEFAULT_PROCESSING_TYPE,
70 PANDA_DEFAULT_PROD_SOURCE_LABEL,
71 PANDA_DEFAULT_RSS,
72 PANDA_DEFAULT_RSS_MAX,
73 PANDA_DEFAULT_TASK_TYPE,
74 PANDA_DEFAULT_VO,
75)
76from lsst.resources import ResourcePath
# Module-level logger used by the helpers in this module.
_LOG = logging.getLogger(__name__)
def extract_taskname(s: str) -> str:
    """Extract the task name from a string that follows a pattern
    CampaignName_timestamp_TaskNumber_TaskLabel_ChunkNumber.

    The task name is everything after the last ``_<digits>_`` group in the
    string (for the pattern above, ``TaskLabel_ChunkNumber``). Numeric groups
    that share an underscore with a neighbouring group (for example
    ``..._20240101_03_isr_01``) are all considered, so the last one
    (``_03_``) is the one used.

    Parameters
    ----------
    s : `str`
        The input string from which to extract the task name. Surrounding
        whitespace and single/double quotes are ignored.

    Returns
    -------
    taskname : `str`
        The text following the last ``_<digits>_`` group, or the whole
        stripped string if no such group exists.
    """
    # remove surrounding quotes/spaces if present
    s = s.strip().strip("'\"")

    # Find every "_<digits>" that is followed by an underscore. The trailing
    # underscore is checked with a lookahead so it is not consumed; with a
    # consuming pattern, adjacent groups such as "_2024_03_" share an
    # underscore and the second group would be skipped.
    matches = list(re.finditer(r"_\d+(?=_)", s))
    if not matches:
        # fallback: if no such pattern, return everything
        return s

    # +1 skips the trailing underscore the lookahead did not consume.
    return s[matches[-1].end() + 1 :]
def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
    """Combine per-label job, exit-code and run summaries under their base
    label.

    A label's base label is the label with one trailing ``_<digits>``
    suffix removed (for example ``isr_2`` becomes ``isr``).

    Parameters
    ----------
    job_summary : `dict` [`str`, `dict` [`str`, `int`]]
        Mapping of job label to a mapping of state to job count.
    exit_code_summary : `dict` [`str`, `list` [`int`]]
        Mapping of job label to the exit codes seen for that label.
    run_summary : `str`
        Entries of the form "<label>:<count>" separated by semicolons.
        Empty entries, or entries that cannot be parsed as exactly one label
        and one integer, are ignored.

    Returns
    -------
    aggregated_jobs : `dict` [`str`, `dict` [`str`, `int`]]
        Per base label, the state counts summed over all matching labels.
        Every `WmsStates` member is present, starting from 0.
    aggregated_exits : `dict` [`str`, `list` [`int`]]
        Per base label, the concatenated exit codes of all matching labels.
    aggregated_run : `str`
        Semicolon-separated "<base label>:<summed count>" entries in order of
        first appearance.
    """
    suffix_pattern = re.compile(r"_\d+$")

    def _strip_suffix(name):
        return suffix_pattern.sub("", name)

    aggregated_jobs = {}
    for name, state_counts in job_summary.items():
        key = _strip_suffix(name)
        if key not in aggregated_jobs:
            aggregated_jobs[key] = dict.fromkeys(WmsStates, 0)
        totals = aggregated_jobs[key]
        for state, count in state_counts.items():
            totals[state] += count

    aggregated_exits = {}
    for name, exit_codes in exit_code_summary.items():
        key = _strip_suffix(name)
        if key not in aggregated_exits:
            aggregated_exits[key] = []
        aggregated_exits[key].extend(exit_codes)

    run_totals = {}
    for chunk in run_summary.split(";"):
        chunk = chunk.strip()
        if not chunk:
            continue
        parts = chunk.split(":")
        # Anything other than exactly "<label>:<count>" is skipped.
        if len(parts) != 2:
            continue
        try:
            count = int(parts[1])
        except ValueError:
            continue
        key = _strip_suffix(parts[0])
        run_totals[key] = run_totals.get(key, 0) + count

    aggregated_run = ";".join([f"{key}:{total}" for key, total in run_totals.items()])
    return aggregated_jobs, aggregated_exits, aggregated_run
def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_workers):
    """Copy locally generated files to the distribution area so that they
    can be used on the edge nodes.

    A plain file is copied to ``<file_distribution_uri>/<basename>``. For a
    directory (a trailing ``/`` is allowed), every file resource returned by
    ``ResourcePath.findFileResources`` is copied to
    ``<file_distribution_uri>/<directory name>/<file basename>``.

    Parameters
    ----------
    files_to_stage : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area; only the
        values (local paths) are used.
    file_distribution_uri : `ResourcePath`
        Path on the storage accessed by the edge nodes, including access
        protocol and bucket name, under which files are placed.
    max_copy_workers : `int`
        Maximum number of workers for copying files. Present for API
        compatibility; worker selection is handled internally by
        `ResourcePath.mtransfer`.

    Raises
    ------
    ExceptionGroup
        Raised by `ResourcePath.mtransfer` when one or more transfers fail.
    RuntimeError
        Raised if a copied file is not found at the distribution point after
        transfer completes.
    """
    transfers = {}

    for local_path in files_to_stage.values():
        # normpath drops a trailing "/" so basename yields the directory name.
        entry_name = os.path.basename(os.path.normpath(local_path))
        if not os.path.isdir(local_path):
            transfers[ResourcePath(local_path, forceDirectory=False)] = file_distribution_uri.join(
                entry_name, forceDirectory=False
            )
            continue

        dest_dir = file_distribution_uri.join(entry_name, forceDirectory=True)
        for source in ResourcePath.findFileResources([local_path]):
            transfers[source] = dest_dir.join(source.basename(), forceDirectory=False)

    for source, destination in transfers.items():
        _LOG.info("Staging %s to %s", source, destination)
    transfer_results = ResourcePath.mtransfer("copy", transfers.items())

    # The keys of the mtransfer result are treated as the destination URIs.
    for destination in transfer_results:
        if not destination.exists():
            raise RuntimeError(f"File was not copied to the distribution point: {destination}")
def get_idds_client(config):
    """Create an iDDS client manager through the PanDA relay service.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig` or `dict`
        BPS configuration. The optional ``iddsServer`` value selects the iDDS
        host; it is read via ``BpsConfig.search`` or, for a plain `dict`, by
        key lookup.

    Returns
    -------
    idds_client : `idds.client.clientmanager.ClientManager`
        The iDDS ClientManager object.
    """
    if isinstance(config, BpsConfig):
        _found, idds_server = config.search("iddsServer", opt={"default": None})
    elif isinstance(config, dict) and "iddsServer" in config:
        idds_server = config["iddsServer"]
    else:
        # With no server given (None), the PanDA relay service falls back
        # to its own default iDDS host.
        idds_server = None

    return pandaclient.idds_api.get_api(
        idds_utils.json_dumps,
        idds_host=idds_server,
        compress=True,
        manager=True,
    )
247def get_idds_result(ret):
248 """Parse the results returned from iDDS.
250 Parameters
251 ----------
252 ret : `tuple` [`int`, `tuple` [`bool`, payload ]]
253 The first part ``ret[0]`` is the status of PanDA relay service.
254 The part of ``ret[1][0]`` is the status of iDDS service.
255 The part of ``ret[1][1]`` is the returned payload.
256 If ``ret[1][0]`` is `False`, ``ret[1][1]`` can be error messages.
258 Returns
259 -------
260 status: `bool`
261 The status of iDDS calls.
262 result: `int` or `list` or `dict` or `None`
263 The result returned from iDDS. `None` if error state.
264 error: `str` or `None`
265 Error messages. `None` if no error state.
266 """
267 # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
268 if not isinstance(ret, list | tuple) or ret[0] != 0:
269 # Something wrong with the PanDA relay service.
270 # The call may not be delivered to iDDS.
271 status = False
272 result = None
273 error = f"PanDA relay service returns errors: {ret}"
274 else:
275 if ret[1][0]:
276 status = True
277 result = ret[1][1]
278 error = None
279 if isinstance(result, str) and "Authentication no permission" in result:
280 status = False
281 result = None
282 error = result
283 else:
284 # iDDS returns errors
285 status = False
286 result = None
287 error = f"iDDS returns errors: {ret[1][1]}"
288 return status, result, error
def idds_call_with_check(func, *, func_name: str, request_id: int, **kwargs):
    """Invoke an iDDS client function, log its reply and verify its status.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        The iDDS client function to call; it is called with keyword
        arguments only.
    func_name : `str`
        Name used in log and error messages.
    request_id : `int`
        The request or workflow ID. Forwarded to ``func`` as ``request_id``
        unless it is `None`.
    **kwargs
        Additional keyword arguments passed to the function.

    Returns
    -------
    ret : `~typing.Any`
        The unmodified return value of ``func``.

    Raises
    ------
    RuntimeError
        Raised if the first element of the return value is not 0.
    """
    # request_id is a named parameter, so it can never already be in kwargs.
    id_kwargs = {} if request_id is None else {"request_id": request_id}
    ret = func(**{**kwargs, **id_kwargs})

    _LOG.debug("PanDA %s returned = %s", func_name, str(ret))

    if ret[0] != 0:
        raise RuntimeError(f"Error calling {func_name}: {ret} for id: {request_id}")
    return ret
def _make_pseudo_filename(config, gwjob):
    """Build the pseudo filename that identifies a job.

    The job's executable URI and arguments are passed through
    ``CommandLineEmbedder.substitute_command_line`` with no file list, and
    the second element of its result is returned.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which to create the pseudo filename.

    Returns
    -------
    pseudo_filename : `str`
        The pseudo filename for the given job.
    """
    embedder = CommandLineEmbedder(config)
    raw_command = gwjob.executable.src_uri + " " + gwjob.arguments
    substitution = embedder.substitute_command_line(raw_command, gwjob.cmdvals, gwjob.name, [])
    return substitution[1]
def _make_doma_work(
    config,
    generic_workflow,
    gwjob,
    task_count,
    task_chunk,
    enable_event_service=False,
    enable_job_name_map=False,
    order_id_map_files=None,
    es_label=None,
    max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    max_wms_job_wall_time=None,
    remote_filename=None,
    qnode_map_filename=None,
):
    """Make the DOMA Work object for a PanDA task.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The workflow.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job representing the jobs for the PanDA task.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA tasks used when making unique names.
    enable_event_service : `bool`, optional
        Whether to request the event service (``es``) for this task. It is
        switched off if the effective ``max_payloads_per_panda_job`` is
        smaller than 2.
    enable_job_name_map : `bool`, optional
        Whether the job name map is in use. If this or
        ``enable_event_service`` is true, ``order_id_map_files`` are added to
        the files to stage.
    order_id_map_files : `dict` [`str`, `str`], optional
        Mapping of file key to local path of the order-id map file(s).
    es_label : `str`, optional
        Value passed to ``DomaPanDAWork`` as ``es_label``.
    max_payloads_per_panda_job : `int`, optional
        Maximum payloads per PanDA job under the event service (passed as
        ``max_events_per_job``). Replaced by
        ``int(max_wms_job_wall_time / gwjob.request_walltime)`` when the event
        service is enabled and both wall times are set.
    max_wms_job_wall_time : optional
        Maximum WMS job wall time; used to derive
        ``max_payloads_per_panda_job`` and ``maxwalltime`` under the event
        service.
    remote_filename : `str`, optional
        Name added to the direct-access file list of custom-submit
        (``bps_iscustom``) workflows that have no other direct-access inputs.
    qnode_map_filename : `dict` [`str`, `str`], optional
        Mapping of file key to local path of the quantum-node map file; added
        to the files to stage when given.

    Returns
    -------
    work : `idds.doma.workflowv2.domapandawork.DomaPanDAWork`
        The client representation of a PanDA task.
    local_pfns : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    """
    if order_id_map_files is None:
        order_id_map_files = {}
    _LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)
    cvals = {"curr_cluster": gwjob.label}
    _, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
    cvals["curr_site"] = site
    cvals["curr_pipetask"] = gwjob.label
    _, processing_type = config.search(
        "processingType", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROCESSING_TYPE}
    )
    if gwjob.label in ["finalJob", "customJob"]:
        # Non-pipetask jobs take processingType from their own config section.
        _, nonpipetask = config.search(gwjob.label)
        default_type = "Rubin_Merge"
        if gwjob.label == "customJob":
            default_type = PANDA_DEFAULT_PROCESSING_TYPE
        processing_type = nonpipetask["processingType"] if nonpipetask["processingType"] else default_type
    _, task_type = config.search("taskType", opt={"curvals": cvals, "default": PANDA_DEFAULT_TASK_TYPE})
    _, prod_source_label = config.search(
        "prodSourceLabel", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROD_SOURCE_LABEL}
    )
    _, vo = config.search("vo", opt={"curvals": cvals, "default": PANDA_DEFAULT_VO})

    _, file_distribution_end_point = config.search(
        "fileDistributionEndPoint", opt={"curvals": cvals, "default": None}
    )

    _, file_distribution_end_point_default = config.search(
        "fileDistributionEndPointDefault", opt={"curvals": cvals, "default": None}
    )

    task_rss = gwjob.request_memory if gwjob.request_memory else PANDA_DEFAULT_RSS
    task_rss_retry_step = task_rss * gwjob.memory_multiplier if gwjob.memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss

    # Assume input files are same across task
    local_pfns = {}
    direct_io_files = set()

    if gwjob.executable.transfer_executable:
        local_pfns["job_executable"] = gwjob.executable.src_uri
        job_executable = f"./{os.path.basename(gwjob.executable.src_uri)}"
    else:
        job_executable = gwjob.executable.src_uri
    cmd_line_embedder = CommandLineEmbedder(config)
    _LOG.debug(
        "job %s inputs = %s, outputs = %s",
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name),
        generic_workflow.get_job_outputs(gwjob.name),
    )

    # Prefix the command with "export KEY=VALUE; " for each job environment
    # variable, after substituting command variables into the value.
    job_env = ""
    if gwjob.environment:
        for key, value in gwjob.environment.items():
            try:
                sub_value = value.format_map(gwjob.cmdvals)
            except (KeyError, TypeError) as exc:
                _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc))
                raise
            job_env += f"export {key}={sub_value}; "

    cmd_line, _ = cmd_line_embedder.substitute_command_line(
        job_env + job_executable + " " + gwjob.arguments,
        gwjob.cmdvals,
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name) + generic_workflow.get_job_outputs(gwjob.name),
    )

    my_log = f"enable_event_service {enable_event_service} for {gwjob.label}"
    _LOG.info(my_log)
    if enable_event_service:
        if gwjob.request_walltime and max_wms_job_wall_time:
            my_log = (
                f"requestWalltime({gwjob.request_walltime}) "
                f"and maxWmsJobWalltime({max_wms_job_wall_time}) are set, "
                "max_payloads_per_panda_job is int(max_wms_job_wall_time / gwjob.request_walltime), "
                "ignore maxPayloadsPerPandaJob."
            )
            _LOG.info(my_log)
            max_payloads_per_panda_job = int(max_wms_job_wall_time / gwjob.request_walltime)
        if max_payloads_per_panda_job < 2:
            my_log = (
                f"max_payloads_per_panda_job ({max_payloads_per_panda_job}) is too small, "
                "disable EventService"
            )
            _LOG.info(my_log)
            enable_event_service = False

    maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
    if enable_event_service:
        # One PanDA job runs several payloads, so scale the wall time.
        if gwjob.request_walltime and max_payloads_per_panda_job:
            maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
        elif max_wms_job_wall_time:
            maxwalltime = max_wms_job_wall_time

    if enable_event_service or enable_job_name_map:
        local_pfns.update(order_id_map_files)

    for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
        local_pfns[gwfile.name] = gwfile.src_uri
        if os.path.isdir(gwfile.src_uri):
            # this is needed to make isdir function working
            # properly in ButlerURL instance on the edge node
            local_pfns[gwfile.name] += "/"

        if gwfile.job_access_remote:
            direct_io_files.add(gwfile.name)

    if qnode_map_filename:
        local_pfns.update(qnode_map_filename)

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)

    if not direct_io_files:
        if submit_cmd:
            direct_io_files.add(remote_filename)
        else:
            direct_io_files.add("cmdlineplaceholder")

    lsst_temp = "LSST_RUN_TEMP_SPACE"
    # fileDistributionEndPoint defaults to None; test it before the substring
    # check so an unset endpoint does not raise TypeError and the
    # custom-submit fallback below can apply.
    if (
        file_distribution_end_point
        and lsst_temp in file_distribution_end_point
        and lsst_temp not in os.environ
    ):
        file_distribution_end_point = file_distribution_end_point_default
    if submit_cmd and not file_distribution_end_point:
        file_distribution_end_point = "FileDistribution"

    executable = add_decoder_prefix(
        config, cmd_line, file_distribution_end_point, (local_pfns, direct_io_files)
    )
    work = DomaPanDAWork(
        executable=executable,
        primary_input_collection={
            "scope": "pseudo_dataset",
            "name": f"pseudo_input_collection#{task_count}",
        },
        output_collections=[{"scope": "pseudo_dataset", "name": f"pseudo_output_collection#{task_count}"}],
        log_collections=[],
        dependency_map=[],
        task_name=f"{generic_workflow.name}_{task_count:02d}_{gwjob.label}_{task_chunk:02d}",
        task_queue=gwjob.queue,
        task_log={
            "destination": "local",
            "value": "log.tgz",
            "dataset": "PandaJob_#{pandaid}/",
            "token": "local",
            "param_type": "log",
            "type": "template",
        },
        encode_command_line=True,
        task_rss=task_rss,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_rss_max=gwjob.request_memory_max if gwjob.request_memory_max else PANDA_DEFAULT_RSS_MAX,
        task_cloud=gwjob.compute_cloud if gwjob.compute_cloud else PANDA_DEFAULT_CLOUD,
        task_site=site,
        task_priority=int(gwjob.priority) if gwjob.priority else PANDA_DEFAULT_PRIORITY,
        core_count=gwjob.request_cpus if gwjob.request_cpus else PANDA_DEFAULT_CORE_COUNT,
        working_group=gwjob.accounting_group,
        processing_type=processing_type,
        task_type=task_type,
        prodSourceLabel=prod_source_label,
        vo=vo,
        es=enable_event_service,
        es_label=es_label,
        max_events_per_job=max_payloads_per_panda_job,
        maxattempt=gwjob.number_of_retries if gwjob.number_of_retries else PANDA_DEFAULT_MAX_ATTEMPTS,
        maxwalltime=maxwalltime,
    )
    return work, local_pfns
def add_final_idds_work(
    config, generic_workflow, idds_client_workflow, dag_sink_work, task_count, task_chunk
):
    """Append the special final PanDA task, if any, to the client workflow.

    The final work is added to the workflow together with an
    ``AndCondition`` whose conditions are the ``is_terminated`` checks of all
    sink works and whose ``true_works`` is the final work.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow in which to find the final job.
    idds_client_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client representation of the workflow to which the final task
        is added.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA tasks used when making unique names.

    Returns
    -------
    files : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area; empty when
        the generic workflow has no final job.

    Raises
    ------
    NotImplementedError
        Raised if final job in GenericWorkflow is itself a workflow.
    TypeError
        Raised if final job in GenericWorkflow is invalid type.
    """
    final = generic_workflow.get_final()
    if not final:
        _LOG.debug("No final job in GenericWorkflow")
        return {}

    if isinstance(final, GenericWorkflow):
        raise NotImplementedError("PanDA plugin does not support a workflow as the final job")
    if not isinstance(final, GenericWorkflowJob):
        raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

    final_work, staged_files = _make_doma_work(
        config,
        generic_workflow,
        final,
        task_count,
        task_chunk,
    )
    # Single pseudo-input entry with no dependencies for the final task.
    final_entry = {
        "name": "pure_pseudoinput+qgraphNodeId:+qgraphId:",
        "submitted": False,
        "dependencies": [],
    }
    final_work.dependency_map.append(final_entry)
    idds_client_workflow.add_work(final_work)

    sink_conditions = [sink.is_terminated for sink in dag_sink_work]
    idds_client_workflow.add_condition(AndCondition(conditions=sink_conditions, true_works=[final_work]))
    return staged_files
def convert_exec_string_to_hex(cmdline):
    """Encode a command line as a lowercase hexadecimal string.

    Large command lines containing special symbols are passed to the
    pilot/container. Hex-encoding them keeps the 1-to-1 match and avoids
    the special-symbol stripping performed by the Pilot.

    Parameters
    ----------
    cmdline : `str`
        Command line; it is encoded as UTF-8 before hexing.

    Returns
    -------
    hex : `str`
        Two lowercase hex digits per UTF-8 byte of ``cmdline``.
    """
    utf8_bytes = cmdline.encode("utf-8")
    return utf8_bytes.hex()
def add_decoder_prefix(config, cmd_line, distribution_path, files):
    """Compose the command line sent to the pilot from the functional part
    (the actual SW running) and the middleware part (containers invocation).

    The ``_cmd_line_`` placeholder in the configured ``runnerCommand`` is
    replaced by the following items, separated by spaces:

    - the hex-encoded ``cmd_line``;
    - the literal ``${IN/L}``;
    - ``distribution_path``;
    - a ``+``-joined list of ``key:basename`` entries for the staged files,
      where directories get a trailing ``/``;
    - a ``+``-joined list of the direct-access file names.

    Newlines in ``runnerCommand`` become spaces. If an
    ``orderIdMapFilename`` file is staged, occurrences of
    ``orderIdMapFilename`` in the command are replaced by its path under
    ``distribution_path``.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration information.
    cmd_line : `str`
        UTF-8 based functional part of the command line.
    distribution_path : `str`
        URI of path where all files are located for distribution.
    files : `tuple` [`dict` [`str`, `str`], `~collections.abc.Iterable` [`str`]]
        File names needed for a task: a mapping of key to local path for
        files copied to the distribution area, and the names of files
        accessed directly.

    Returns
    -------
    decoder_prefix : `str`
        Full command line to be executed on the edge node.
    """
    # Manipulate file paths for placement on cmdline
    files_plc_hldr = {}
    for key, pfn in files[0].items():
        if pfn.endswith("/"):
            files_plc_hldr[key] = os.path.basename(pfn[:-1])
            isdir = True
        else:
            files_plc_hldr[key] = os.path.basename(pfn)
            # os.path.splitext keeps the leading dot, so the extension of a
            # yaml file is ".yaml". A butlerConfig that is not a yaml file is
            # treated as a repository directory.
            _, extension = os.path.splitext(pfn)
            isdir = os.path.isdir(pfn) or (key == "butlerConfig" and extension != ".yaml")
        if isdir:
            # this is needed to make isdir function working
            # properly in ButlerURL instance on the edge node
            files_plc_hldr[key] += "/"
        _LOG.debug("files_plc_hldr[%s] = %s", key, files_plc_hldr[key])

    cmdline_hex = convert_exec_string_to_hex(cmd_line)
    _, runner_command = config.search("runnerCommand", opt={"replaceEnvVars": False, "expandEnvVars": False})
    order_id_map_filename = files[0].get("orderIdMapFilename", None)
    if order_id_map_filename:
        # Point the runner at the copy of the order-id map in the
        # distribution area rather than the local file.
        order_id_map_filename = os.path.basename(order_id_map_filename)
        order_id_map_filename = os.path.join(distribution_path, order_id_map_filename)
        runner_command = runner_command.replace("orderIdMapFilename", order_id_map_filename)
    runner_command = runner_command.replace("\n", " ")
    decoder_prefix = runner_command.replace(
        "_cmd_line_",
        str(cmdline_hex)
        + " ${IN/L} "
        + distribution_path
        + " "
        + "+".join(f"{k}:{v}" for k, v in files_plc_hldr.items())
        + " "
        + "+".join(files[1]),
    )
    return decoder_prefix
715def add_idds_work(config, generic_workflow, idds_workflow):
716 """Convert GenericWorkflowJobs to iDDS work and add them to the iDDS
717 workflow.
719 Parameters
720 ----------
721 config : `lsst.ctrl.bps.BpsConfig`
722 BPS configuration.
723 generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
724 Generic workflow containing jobs to convert.
725 idds_workflow : `idds.workflowv2.workflow.Workflow`
726 The iDDS workflow to which the converted jobs should be added.
728 Returns
729 -------
730 files_to_pre_stage : `dict` [`str`, `str`]
731 Files that need to be copied to the staging area before submission.
732 dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
733 The work nodes in the client workflow which have no successors.
734 task_count : `int`
735 Number of tasks in iDDS workflow used for unique task names.
737 Raises
738 ------
739 RuntimeError
740 If cannot recover from dependency issues after pass through workflow.
741 """
742 # event service
743 _, enable_event_service = config.search("enableEventService", opt={"default": None})
744 _, enable_qnode_map = config.search("enableQnodeMap", opt={"default": None})
745 _, max_payloads_per_panda_job = config.search(
746 "maxPayloadsPerPandaJob", opt={"default": PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB}
747 )
748 _, max_wms_job_wall_time = config.search("maxWmsJobWalltime", opt={"default": None})
749 my_log = (
750 f"enableEventService: {enable_event_service}, maxPayloadsPerPandaJob: {max_payloads_per_panda_job}"
751 )
752 _LOG.info(my_log)
754 # job name map: Use a short job name to map the long job name
755 _, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None})
756 _LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}")
757 if enable_event_service and not enable_job_name_map:
758 enable_job_name_map = True
759 my_log = "enable_event_service is set, set enable_job_name_map True."
760 _LOG.info(my_log)
762 # Limit number of jobs in single PanDA task
763 _, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK})
765 files_to_pre_stage = {}
766 dag_sink_work = [] # Workflow sink nodes that need to be connected to final task
767 job_to_task = {}
768 job_to_pseudo_filename = {}
769 task_count = 0 # Task number/ID in idds workflow used for unique name
770 remote_archive_filename = None
772 submit_path = config["submitPath"]
774 submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)
775 if submit_cmd:
776 files = generic_workflow.get_executables(data=False, transfer_only=True)
777 archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
778 archive_filename = create_archive_file(submit_path, archive_filename, files)
779 remote_archive_filename = copy_files_to_pandacache(archive_filename)
781 order_id_map_files = {}
782 name_works = {}
783 order_id_map = {}
784 job_name_to_order_id_map = {}
785 order_id_map_file = None
786 max_payloads_per_panda_job_by_label = {}
787 if enable_event_service:
788 enable_event_service = enable_event_service.split(",")
789 enable_event_service_tmp = []
790 for es_def in enable_event_service:
791 if ":" in es_def:
792 es_label, m_payloads = es_def.split(":")
793 else:
794 es_label, m_payloads = es_def, max_payloads_per_panda_job
795 es_label = es_label.strip()
796 enable_event_service_tmp.append(es_label)
797 max_payloads_per_panda_job_by_label[es_label] = int(m_payloads)
798 enable_event_service = enable_event_service_tmp
799 if enable_job_name_map:
800 _, order_id_map_filename = config.search(
801 "orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
802 )
803 order_id_map_file = os.path.join(submit_path, order_id_map_filename)
804 order_id_map_files = {"orderIdMapFilename": order_id_map_file}
805 files_to_pre_stage.update(order_id_map_files)
807 # To avoid dying due to optimizing number of times through workflow,
808 # catch dependency issues to loop through again later.
809 jobs_with_dependency_issues = {}
811 # Initialize quantum node map
812 qnode_map = {}
813 qnode_map_filename = None
814 if enable_qnode_map:
815 qnode_map_file = os.path.join(submit_path, "qnode_map.json")
816 qnode_map_filename = {"qnodemap": qnode_map_file}
817 files_to_pre_stage.update(qnode_map_filename)
819 # Assume jobs with same label share config values
820 for job_label in generic_workflow.labels:
821 _LOG.debug("job_label = %s", job_label)
823 if enable_job_name_map:
824 order_id_map[job_label] = {}
825 job_name_to_order_id_map[job_label] = {}
827 # Add each job with a particular label to a corresponding PanDA task
828 # A PanDA task has a limit on number of jobs, so break into multiple
829 # PanDA tasks if needed.
830 job_count = 0 # Number of jobs in idds task used for task chunking
831 task_chunk = 1 # Task chunk number within job label used for unique name
832 work = None
833 order_id = -1
835 # Instead of changing code to make chunks up front and round-robin
836 # assign jobs to chunks, for now keeping chunk creation in loop
837 # but using knowledge of how many chunks there will be to set better
838 # maximum number of jobs in a chunk for more even distribution.
839 jobs_by_label = generic_workflow.get_jobs_by_label(job_label)
840 num_chunks = -(-len(jobs_by_label) // max_jobs_per_task) # ceil
841 max_jobs_per_task_this_label = -(-len(jobs_by_label) // num_chunks)
842 _LOG.debug(
843 "For job_label = %s, num jobs = %s, num_chunks = %s, max_jobs = %s",
844 job_label,
845 len(jobs_by_label),
846 num_chunks,
847 max_jobs_per_task_this_label,
848 )
849 for gwjob in jobs_by_label:
850 order_id += 1
851 pseudo_filename = _make_pseudo_filename(config, gwjob)
852 job_to_pseudo_filename[gwjob.name] = pseudo_filename
853 if enable_job_name_map:
854 order_id_map[job_label][str(order_id)] = pseudo_filename
855 job_name_to_order_id_map[job_label][gwjob.name] = str(order_id)
857 job_count += 1
858 if job_count > max_jobs_per_task_this_label:
859 job_count = 1
860 task_chunk += 1
862 if job_count == 1:
863 # Create new PanDA task object
864 task_count += 1
865 work_enable_event_service = False
866 if enable_event_service and job_label in enable_event_service:
867 work_enable_event_service = True
868 max_payloads_per_panda_job_current = max_payloads_per_panda_job_by_label.get(
869 job_label, max_payloads_per_panda_job
870 )
871 work, files = _make_doma_work(
872 config,
873 generic_workflow,
874 gwjob,
875 task_count,
876 task_chunk,
877 enable_event_service=work_enable_event_service,
878 enable_job_name_map=enable_job_name_map,
879 order_id_map_files=order_id_map_files,
880 es_label=job_label,
881 max_payloads_per_panda_job=max_payloads_per_panda_job_current,
882 max_wms_job_wall_time=max_wms_job_wall_time,
883 remote_filename=remote_archive_filename,
884 qnode_map_filename=qnode_map_filename,
885 )
886 work.dependency_tasks = []
887 name_works[work.task_name] = work
888 files_to_pre_stage.update(files)
889 idds_workflow.add_work(work)
890 if generic_workflow.out_degree(gwjob.name) == 0:
891 dag_sink_work.append(work)
893 if enable_qnode_map:
894 job_name_PH = "PH:" + gwjob.name
895 job_to_pseudo_filename[gwjob.name] = job_name_PH
896 qnode_map[job_name_PH] = pseudo_filename
898 job_to_task[gwjob.name] = work.get_work_name()
899 deps = []
900 missing_deps = False
901 for parent_job_name in generic_workflow.predecessors(gwjob.name):
902 if parent_job_name not in job_to_task:
903 _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
904 missing_deps = True
905 break
906 else:
907 if enable_job_name_map:
908 parent_job = generic_workflow.get_job(parent_job_name)
909 parent_job_label = parent_job.label
910 parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
911 inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
912 else:
913 inputname = job_to_pseudo_filename[parent_job_name]
915 parent_task_name = job_to_task[parent_job_name]
916 deps.append(
917 {
918 "task": parent_task_name,
919 "inputname": inputname,
920 }
921 )
922 if parent_task_name not in work.dependency_tasks:
923 work.dependency_tasks.append(parent_task_name)
924 if not missing_deps:
925 j_name = job_to_pseudo_filename[gwjob.name]
926 f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name
927 work.dependency_map.append(
928 {
929 "name": f_name,
930 "order_id": order_id,
931 "dependencies": deps,
932 }
933 )
934 else:
935 jobs_with_dependency_issues[gwjob.name] = {
936 "work": work,
937 "order_id": order_id,
938 "label": job_label,
939 }
941 if enable_qnode_map:
942 with open(qnode_map_file, "w", encoding="utf-8") as f:
943 json.dump(qnode_map, f, indent=2)
945 # If there were any issues figuring out dependencies through earlier loop
946 if jobs_with_dependency_issues:
947 _LOG.warning("Could not prepare workflow in single pass. Please notify developers.")
948 _LOG.info("Trying to recover...")
949 for job_name, work_item in jobs_with_dependency_issues.items():
950 deps = []
951 work = work_item["work"]
952 order_id = work_item["order_id"]
953 job_label = work_item["label"]
955 for parent_job_name in generic_workflow.predecessors(job_name):
956 if parent_job_name not in job_to_task:
957 _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
958 raise RuntimeError(
959 "Could not recover from dependency issues ({job_name} missing {parent_job_name})."
960 )
961 if enable_job_name_map:
962 parent_job = generic_workflow.get_job(parent_job_name)
963 parent_job_label = parent_job.label
964 parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
965 inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
966 else:
967 inputname = job_to_pseudo_filename[parent_job_name]
969 parent_task_name = job_to_task[parent_job_name]
970 deps.append(
971 {
972 "task": parent_task_name,
973 "inputname": inputname,
974 }
975 )
976 if parent_task_name not in work.dependency_tasks:
977 work.dependency_tasks.append(parent_task_name)
979 work.dependency_map.append(
980 {
981 "name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else job_name,
982 "order_id": order_id,
983 "dependencies": deps,
984 }
985 )
987 _LOG.info("Successfully recovered.")
989 for task_name in name_works:
990 work = name_works[task_name]
991 # trigger the setter function which will validate the dependency_map:
992 # 1) check the name length to avoid the the name too long,
993 # 2) check to avoid duplicated items.
994 sorted_dep_map = sorted(work.dependency_map, key=lambda x: x["order_id"])
995 work.dependency_map = sorted_dep_map
997 if enable_job_name_map:
998 with open(order_id_map_file, "w") as f:
999 json.dump(order_id_map, f)
1001 return files_to_pre_stage, dag_sink_work, task_count
def create_archive_file(submit_path, archive_filename, files):
    """Create a gzipped tarball containing the given files.

    Parameters
    ----------
    submit_path : `str`
        Directory in which the archive is created when ``archive_filename``
        is not an absolute path.
    archive_filename : `str`
        Name (or absolute path) of the archive to create.
    files : `~collections.abc.Iterable` [`str`]
        Paths of the files to add. Each file is stored in the archive under
        its base name only (directory components are dropped). Symbolic
        links are dereferenced, so the archive holds the targets' contents.

    Returns
    -------
    archive_filename : `str`
        Path of the created archive.
    """
    if not os.path.isabs(archive_filename):
        archive_filename = os.path.join(submit_path, archive_filename)

    with tarfile.open(archive_filename, "w:gz", dereference=True) as tar:
        for local_file in files:
            # Flatten the layout: only the base name is kept inside the archive.
            tar.add(local_file, arcname=os.path.basename(local_file))
    return archive_filename
def copy_files_to_pandacache(filename):
    """Upload a file to the PanDA cache.

    The upload is attempted up to three times, with a short random pause
    between attempts.

    Parameters
    ----------
    filename : `str`
        Path of the local file to upload.

    Returns
    -------
    cache_filename : `str` or `None`
        ``$PANDACACHE_URL/cache/<basename>`` for the uploaded file (or for
        the already-existing identical file the server chose to reuse), or
        `None` if the upload did not succeed.
    """
    from pandaclient import Client

    max_attempts = 3
    status, out = None, ""
    for attempt in range(max_attempts):
        status, out = Client.putFile(filename, True)
        if status == 0:
            break
        if attempt + 1 < max_attempts:
            # Back off before retrying so transient server problems can clear.
            time.sleep(random.randint(1, 10))
    print(f"copy_files_to_pandacache: status: {status}, out: {out}")
    if status != 0:
        # All attempts failed; the status/output were printed above.
        return None
    if out.startswith("NewFileName:"):
        # found the same input sandbox to reuse
        filename = out.split(":")[-1]
    elif out != "True":
        print(out)
        return None

    filename = os.path.basename(filename)
    cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
    return os.path.join(cache_path, filename)
def download_extract_archive(filename, prefix=None):
    """Download and extract the tarball from pandacache.

    Parameters
    ----------
    filename : `str`
        The filename to download. If it is an ``https:`` URL, the PanDA
        cache URL is derived from it (its grand-parent "directory") and
        exported as ``PANDACACHE_URL``. Otherwise ``PANDACACHE_URL`` falls
        back to ``PANDA_URL_SSL`` when only the latter is set.
    prefix : `str`, optional
        The target directory the tarball will be downloaded and extracted to.
        If None (default), the current directory will be used.

    Raises
    ------
    RuntimeError
        Raised if the archive could not be downloaded after all attempts.
    """
    archive_basename = os.path.basename(filename)
    target_dir = prefix if prefix is not None else os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    panda_cache_url = os.environ.get("PANDACACHE_URL", None)
    print(f"PANDACACHE_URL: {panda_cache_url}")

    # The import of PanDA client must happen *after* the PANDACACHE_URL is set.
    # Otherwise, the PanDA client will not pick up the environment setting.
    from pandaclient import Client

    max_attempts = 3
    status, output = None, None
    for attempt in range(max_attempts):
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        if status == 0:
            break
        if attempt + 1 >= max_attempts:
            # No point in sleeping after the final failed attempt.
            break
        # Escalating random backoff between retries.
        if attempt <= 1:
            secs = random.randint(1, 10)
        elif attempt <= 2:
            secs = random.randint(1, 60)
        else:
            secs = random.randint(1, 120)
        time.sleep(secs)
    print(f"Download archive file from pandacache status: {status}, output: {output}")
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")
    with tarfile.open(full_output_filename, "r:gz") as f:
        if hasattr(tarfile, "data_filter"):
            # The archive comes from a remote cache: reject absolute paths,
            # ".." components, device files, etc. when the Python version
            # supports extraction filters.
            f.extractall(target_dir, filter="data")
        else:
            f.extractall(target_dir)
    print(f"Extracted {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print(f"Removed {full_output_filename}")
def get_task_parameter(config, remote_build, key):
    """Return the value of ``key``, preferring ``remote_build`` over ``config``.

    Both objects are searched with variable replacement enabled, without
    environment-variable expansion/replacement, and with the key marked as
    not required. If the value found in ``remote_build`` is falsy (e.g.
    `None`, empty, or 0), the value from ``config`` is returned instead.

    Parameters
    ----------
    config : `object`
        Full configuration; must provide ``search(key, opt)`` returning a
        2-tuple whose second element is the value.
    remote_build : `object`
        Remote-build section of the configuration, with the same interface.
    key : `str`
        Name of the parameter to look up.

    Returns
    -------
    value
        The value found (may itself be falsy if neither source provides one).
    """
    lookup_opt = {
        "replaceVars": True,
        "expandEnvVars": False,
        "replaceEnvVars": False,
        "required": False,
    }
    _, remote_value = remote_build.search(key, lookup_opt)
    if remote_value:
        return remote_value
    _, config_value = config.search(key, lookup_opt)
    return config_value
def create_idds_build_workflow(**kwargs):
    """Create an iDDS workflow containing a single remote-build task.

    The files listed under ``files`` in the remote-build section, plus the
    submission config file, are packed into a tarball in the submit
    directory, uploaded to the PanDA cache, and referenced from the build
    task's command line (``runnerCommand``).

    Parameters
    ----------
    **kwargs
        Recognized keywords:

        ``config``
            Full BPS configuration. Must contain ``submitPath`` and
            ``bps_defined.uniqProcName``.
        ``remote_build``
            Remote-build section of the configuration.
        ``config_file``
            Path of the submission config file to ship with the build.
        ``compute_site``
            Substituted for ``_compute_site_`` in the runner command
            (empty string if not given).

    Returns
    -------
    workflow : `IDDS_client_workflow`
        Workflow holding the single ``build_task`` work.

    Raises
    ------
    RuntimeError
        Raised if the archive could not be uploaded to the PanDA cache.
    """
    config = kwargs.get("config")
    remote_build = kwargs.get("remote_build")
    config_file = kwargs.get("config_file")
    config_file_base = os.path.basename(config_file) if config_file else None
    compute_site = kwargs.get("compute_site")

    _, files = remote_build.search("files", opt={"default": []})
    # Copy so that the list held by the configuration is not modified in place.
    files = list(files)
    files.append(config_file)
    submit_path = config["submitPath"]
    archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
    archive_filename = create_archive_file(submit_path, archive_filename, files)
    _LOG.info("archive file name: %s", archive_filename)
    remote_filename = copy_files_to_pandacache(archive_filename)
    _LOG.info("pandacache file: %s", remote_filename)
    if remote_filename is None:
        raise RuntimeError(f"Failed to upload {archive_filename} to pandacache")

    _LOG.debug("remote_build type: %s", type(remote_build))
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    cvals = {
        "LSST_VERSION": get_task_parameter(config, remote_build, "LSST_VERSION"),
        "custom_lsst_setup": get_task_parameter(config, remote_build, "custom_lsst_setup"),
    }
    max_name_length = PANDA_DEFAULT_NAME_LENGTH
    if "IDDS_MAX_NAME_LENGTH" in os.environ:
        max_name_length = int(os.environ["IDDS_MAX_NAME_LENGTH"])
    cvals["IDDS_MAX_NAME_LENGTH"] = max_name_length
    search_opt["curvals"] = cvals
    _, executable = remote_build.search("runnerCommand", opt=search_opt)
    # Fill in the placeholders of the runner command.
    executable = executable.replace("_download_cmd_line_", remote_filename)
    executable = executable.replace("_build_cmd_line_", config_file_base)
    executable = executable.replace("_compute_site_", compute_site or "")

    task_cloud = get_task_parameter(config, remote_build, "computeCloud")
    task_site = get_task_parameter(config, remote_build, "computeSite")
    task_queue = get_task_parameter(config, remote_build, "queue")
    # Apply the defaults before deriving the retry step/offset so that an
    # unset requestMemory does not break the multiplication below and the
    # retry values agree with the task_rss actually sent to PanDA.
    task_rss = get_task_parameter(config, remote_build, "requestMemory") or PANDA_DEFAULT_RSS
    task_rss_max = get_task_parameter(config, remote_build, "requestMemoryMax") or PANDA_DEFAULT_RSS_MAX
    memory_multiplier = get_task_parameter(config, remote_build, "memoryMultiplier")
    task_rss_retry_step = task_rss * memory_multiplier if memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss
    nretries = get_task_parameter(config, remote_build, "numberOfRetries")
    processing_type = get_task_parameter(config, remote_build, "processingType")
    priority = get_task_parameter(config, remote_build, "priority")
    _LOG.info("requestMemory: %s", task_rss)
    _LOG.info("Site: %s", task_site)
    # TODO: fill other parameters based on config
    build_work = DomaPanDAWork(
        executable=executable,
        task_type="lsst_build",
        primary_input_collection={"scope": "pseudo_dataset", "name": "pseudo_input_collection#1"},
        output_collections=[{"scope": "pseudo_dataset", "name": "pseudo_output_collection#1"}],
        log_collections=[],
        dependency_map=None,
        task_name="build_task",
        task_queue=task_queue,
        encode_command_line=True,
        prodSourceLabel="managed",
        processing_type=processing_type,
        task_log={
            "dataset": "PandaJob_#{pandaid}/",
            "destination": "local",
            "param_type": "log",
            "token": "local",
            "type": "template",
            "value": "log.tgz",
        },
        task_rss=task_rss,
        task_rss_max=task_rss_max,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_cloud=task_cloud,
        task_site=task_site,
        task_priority=int(priority) if priority else PANDA_DEFAULT_PRIORITY,
        # numberOfRetries is optional; an unset (None) value must not be
        # compared with 0.
        maxattempt=nretries if nretries and nretries > 0 else PANDA_DEFAULT_MAX_ATTEMPTS,
    )

    workflow = IDDS_client_workflow()

    workflow.add_work(build_work)
    workflow.name = config["bps_defined"]["uniqProcName"]
    return workflow