Coverage for python / lsst / rucio / register / rucio_interface.py: 14%
189 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 01:08 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 01:08 -0700
1# This file is part of rucio_register
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import logging
23import random
24import time
25import zlib
27import rucio.common.exception
28from rucio.client.didclient import DIDClient
29from rucio.client.replicaclient import ReplicaClient
31import lsst.daf.butler
32from lsst.daf.butler import DatasetRef
33from lsst.resources import ResourcePath
34from lsst.rucio.register.resource_bundle import ResourceBundle
35from lsst.rucio.register.rubin_meta import RubinMeta
36from lsst.rucio.register.rucio_did import RucioDID
38__all__ = ["RucioInterface"]
40logger = logging.getLogger(__name__)
class RucioInterface:
    """Add files as replicas in Rucio, along with metadata,
    and attach them to datasets.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler we're operating upon.
    rucio_rse : `str`
        Name of the RSE that the files live in.
    scope : `str`
        Rucio scope to register the files in.
    rse_root : `str`
        Full path to root directory of RSE directory structure.
    dtn_url : `str`
        Base URL of the data transfer node for the Rucio physical filename.
    rubin_butler_type : `str`
        The type registered in "rubin_butler" metadata for Rucio.
    """

    # Every Rucio call is attempted at most this many times before giving up.
    _MAX_ATTEMPTS = 5
    # Inclusive bounds, in seconds, of the random pause between two attempts.
    _RETRY_PAUSE_RANGE = (10, 20)

    def __init__(
        self,
        butler: "lsst.daf.butler.Butler",
        rucio_rse: str,
        scope: str,
        rse_root: str,
        dtn_url: str,
        rubin_butler_type: str,
    ):
        self.butler = butler
        self.rse = rucio_rse
        self.scope = scope
        self.rse_root = rse_root
        self.dtn_url = dtn_url
        # Prefix prepended to the RSE-relative path to build each PFN.
        self.pfn_base = f"{dtn_url}"
        self.replica_client = ReplicaClient()
        self.did_client = DIDClient()
        self.rubin_butler_type = rubin_butler_type

    def _make_dataset_ref_bundle(self, dataset_id: str, dataset_ref: "DatasetRef") -> "ResourceBundle":
        """Make a ResourceBundle for a Butler DatasetRef.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        dataset_ref : `DatasetRef`
            Butler DatasetRef.

        Returns
        -------
        rb : `ResourceBundle`
            ResourceBundle consolidating dataset id and DatasetRef.
        """
        # Serialize once; the same JSON is logged and stored as sidecar metadata.
        sidecar = dataset_ref.to_json()
        logger.debug("%s", sidecar)
        did = self._make_did(self.butler.getURI(dataset_ref), sidecar)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_zip_bundle(self, dataset_id: str, resource_path: "ResourcePath") -> "ResourceBundle":
        """Make a ResourceBundle for a zip file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath to a file.

        Returns
        -------
        rb : `ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath.
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_dim_bundle(self, dataset_id: str, resource_path: "ResourcePath") -> "ResourceBundle":
        """Make a ResourceBundle for a dimension file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath to a file.

        Returns
        -------
        rb : `lsst.rucio.register.resource_bundle.ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath.
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def compute_hashes(self, resource_path: "ResourcePath") -> tuple[int, str]:
        """Return the length and adler32 hash for a file.

        The checksum reported by ``resource_path.get_info()`` is used when
        it is present; otherwise the file is read and the checksum computed.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            Path to the file.

        Returns
        -------
        hashes : `tuple` [ `int`, `str` ]
            Size in bytes and Adler32 hex hash.
        """
        info = resource_path.get_info()
        checksums = info.checksums
        if "adler32" in checksums:
            logger.debug("found adler32 for %s", resource_path)
            return info.size, checksums["adler32"]
        return info.size, self._compute_adler32(resource_path)

    def _compute_adler32(self, resource_path: "ResourcePath") -> str:
        """Compute the adler32 checksum of a file by reading it in chunks.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            Path to the file.

        Returns
        -------
        adler32 : `str`
            Checksum as an eight digit, zero padded, lowercase hex string.
        """
        logger.debug("computing adler32 for %s", resource_path)
        checksum = zlib.adler32(b"")
        buffer_size = 10 * 1024 * 1024
        with resource_path.open("rb") as f:
            while buffer := f.read(buffer_size):
                checksum = zlib.adler32(buffer, checksum)
        return f"{checksum:08x}"

    def _make_did(self, resource_path: "ResourcePath", metadata: "str | None" = None) -> "RucioDID":
        """Make a Rucio data identifier from a resource.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath object.
        metadata : `str`, optional
            String containing Rubin dataset specific metadata. An empty
            sidecar is recorded when this is `None` or empty.

        Returns
        -------
        did : `RucioDID`
            Rucio data identifier including physical and logical names,
            byte length, adler32 checksum, meta, and scope.
        """
        size, adler32 = self.compute_hashes(resource_path)
        # Path relative to the RSE root; the PFN is the DTN URL plus this path.
        path = resource_path.unquoted_path.removeprefix(self.rse_root)
        pfn = self.pfn_base + path
        # The logical name is the path without the leading "/<scope>/".
        name = path.removeprefix("/" + self.scope + "/")
        logger.debug("pfn=%s", pfn)
        logger.debug("name=%s", name)
        logger.debug("path=%s", path)

        meta = RubinMeta(rubin_butler=self.rubin_butler_type, rubin_sidecar=metadata or "")
        return RucioDID(
            pfn=pfn,
            bytes=size,
            adler32=adler32,
            name=name,
            scope=self.scope,
            meta=meta,
        )

    def _pause_before_retry(self, message: str, *args: object) -> None:
        """Log a retry message and sleep for a random interval.

        Parameters
        ----------
        message : `str`
            %-style log format; its last placeholder receives the number
            of seconds slept.
        *args
            Values for the placeholders preceding the seconds placeholder.
        """
        seconds = random.randint(*self._RETRY_PAUSE_RANGE)
        logger.debug(message, *args, seconds)
        time.sleep(seconds)

    def _add_replicas(self, bundles: "list[ResourceBundle]") -> None:
        """Call the Rucio method add_replicas for a list of DIDs.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            A list of ResourceBundles.

        Raises
        ------
        Exception
            Raised if the call still fails after the maximum number of
            attempts.
        """
        dids = [bundle.get_did() for bundle in bundles]
        attempts = 0
        while True:
            try:
                self.replica_client.add_replicas(rse=self.rse, files=dids)
                return
            except rucio.common.exception.RucioException as exc:
                attempts += 1
                if attempts >= self._MAX_ATTEMPTS:
                    raise Exception(f"Tried {self._MAX_ATTEMPTS} times and couldn't add_replicas") from exc
                self._pause_before_retry("failed to add_replicas; sleeping %d seconds")
                # NOTE: it is unclear whether a fresh client is required here.
                self.replica_client = ReplicaClient()

    def _add_file_to_dataset_with_retries(self, dataset_id: str, did: dict) -> None:
        """Attach a single file to a Rucio dataset, retrying on failure.

        A file that is already attached is treated as success.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        did : `dict` [`str`, `str`|`int`]
            Rucio data identifier of the file.

        Raises
        ------
        Exception
            Raised if the call still fails after the maximum number of
            attempts.
        """
        attempts = 0
        while True:
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope, name=dataset_id, files=[did], rse=self.rse
                )
                return
            except rucio.common.exception.FileAlreadyExists:
                if "pfn" in did:
                    logger.debug("file %s already registered in dataset %s", did["pfn"], dataset_id)
                return  # we can return, because it's already in the dataset
            except rucio.common.exception.RucioException as exc:
                attempts += 1
                if attempts >= self._MAX_ATTEMPTS:
                    # Guard the lookup so a DID without a pfn cannot mask
                    # the real failure with a KeyError.
                    pfn = did["pfn"] if "pfn" in did else "<unknown pfn>"
                    raise Exception(f"Couldn't add {pfn} to dataset {dataset_id}") from exc
                self._pause_before_retry("failed to register one did to %s; sleeping %d seconds", dataset_id)
                # NOTE: it is unclear whether a fresh client is required here.
                self.did_client = DIDClient()

    def _add_files_to_dataset(self, dataset_id: str, dids: list[dict]) -> None:
        """Attach a list of files specified by Rucio DIDs to a Rucio dataset.

        Ignores already-attached files for idempotency.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        dids : `list` [`dict` [`str`, `str`|`int`] ]
            List of Rucio data identifiers.

        Raises
        ------
        rucio.common.exception.DataIdentifierNotFound
            Propagated unchanged so the caller can create the dataset.
        Exception
            Raised if the call still fails after the maximum number of
            attempts.
        """
        attempts = 0
        while True:
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    files=dids,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.FileAlreadyExists:
                # At least one already is in the dataset.
                # This shouldn't happen, but if it does,
                # we have to retry each individually.
                for did in dids:
                    self._add_file_to_dataset_with_retries(dataset_id=dataset_id, did=did)
                return
            except rucio.common.exception.DataIdentifierNotFound:
                raise
            except rucio.common.exception.RucioException as exc:
                attempts += 1
                if attempts >= self._MAX_ATTEMPTS:
                    raise Exception(f"Couldn't add files to dataset {dataset_id}") from exc
                self._pause_before_retry("failed to register dids to %s; sleeping %d", dataset_id)

    def _add_dataset_with_retries(self, dataset_id: str, statuses: dict) -> None:
        """Create a Rucio dataset, retrying on failure.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        statuses : `dict`
            Statuses passed through to the Rucio ``add_dataset`` call.

        Raises
        ------
        rucio.common.exception.DataIdentifierAlreadyExists
            Propagated unchanged when the dataset already exists.
        Exception
            Raised if the call still fails after the maximum number of
            attempts.
        """
        attempts = 0
        while True:
            try:
                self.did_client.add_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    statuses=statuses,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.DataIdentifierAlreadyExists:
                # If someone else created it in the meantime
                raise
            except rucio.common.exception.RucioException as exc:
                attempts += 1
                if attempts >= self._MAX_ATTEMPTS:
                    raise Exception(
                        f"Tried {self._MAX_ATTEMPTS} times and couldn't add dataset {dataset_id}"
                    ) from exc
                self._pause_before_retry("couldn't register dids to %s; waiting %d", dataset_id)

    def register_to_dataset(self, bundles: "list[ResourceBundle]") -> None:
        """Register a list of files in Rucio.

        Bundles are grouped by their dataset id and each group is attached
        to its dataset, which is created first if it does not exist.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            List of resource bundles.
        """
        logger.debug("register to dataset")

        datasets = dict()
        for bundle in bundles:
            datasets.setdefault(bundle.dataset_id, []).append(bundle)

        # A distinct loop name keeps the ``bundles`` argument intact.
        for dataset_id, dataset_bundles in datasets.items():
            dids = [rb.get_did() for rb in dataset_bundles]
            names = [did["pfn"] for did in dids]
            logger.info("Registering %s in dataset %s, RSE %s", names, dataset_id, self.rse)
            try:
                self._add_files_to_dataset(dataset_id, dids)
            except rucio.common.exception.DataIdentifierNotFound:
                # No such dataset, so create it
                try:
                    logger.info("Creating Rucio dataset %s", dataset_id)
                    self._add_dataset_with_retries(
                        dataset_id=dataset_id,
                        statuses={"monotonic": True},
                    )
                except rucio.common.exception.DataIdentifierAlreadyExists:
                    # If someone else created it in the meantime
                    pass
                # And then retry adding DIDs
                self._add_files_to_dataset(dataset_id, dids)

            logger.debug("Done with Rucio for %s", dataset_bundles)

    def _register_bundles(self, bundles: "list[ResourceBundle]") -> int:
        """Add bundles as replicas and attach them to their datasets.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            List of resource bundles; may be empty.

        Returns
        -------
        num : `int`
            Number of bundles registered. Rucio is not contacted when the
            list is empty.
        """
        if not bundles:
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)

    def register_as_replicas(self, dataset_id: str, dataset_refs: list) -> int:
        """Register a list of DatasetRefs to a Rucio dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        dataset_refs : `list` [`DatasetRef`]
            List of Butler DatasetRefs. An element may itself be a list of
            DatasetRefs, in which case each of its members is registered.

        Returns
        -------
        num : `int`
            Number of DatasetRefs registered.
        """
        bundles = []
        for entry in dataset_refs:
            refs = entry if isinstance(entry, list) else [entry]
            bundles.extend(self._make_dataset_ref_bundle(dataset_id, ref) for ref in refs)
        return self._register_bundles(bundles)

    def register_zips(self, dataset_id: str, zip_files: list) -> int:
        """Register a list of zips to a Rucio Dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        zip_files : `list` [`lsst.resources.ResourcePath`]
            List of ResourcePath.

        Returns
        -------
        num : `int`
            Number of zip files ingested.
        """
        bundles = [self._make_zip_bundle(dataset_id, zip_file) for zip_file in zip_files]
        return self._register_bundles(bundles)

    def register_dims(self, dataset_id: str, dim_files: list) -> int:
        """Register a list of dimension files to a Rucio Dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        dim_files : `list` [`lsst.resources.ResourcePath`]
            List of ResourcePath.

        Returns
        -------
        num : `int`
            Number of dimension files ingested.
        """
        bundles = [self._make_dim_bundle(dataset_id, dim_file) for dim_file in dim_files]
        return self._register_bundles(bundles)