Coverage for python / lsst / rucio / register / rucio_interface.py: 14%

189 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:24 +0000

1# This file is part of rucio_register 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import logging 

23import random 

24import time 

25import zlib 

26 

27import rucio.common.exception 

28from rucio.client.didclient import DIDClient 

29from rucio.client.replicaclient import ReplicaClient 

30 

31import lsst.daf.butler 

32from lsst.daf.butler import DatasetRef 

33from lsst.resources import ResourcePath 

34from lsst.rucio.register.resource_bundle import ResourceBundle 

35from lsst.rucio.register.rubin_meta import RubinMeta 

36from lsst.rucio.register.rucio_did import RucioDID 

37 

38__all__ = ["RucioInterface"] 

39 

40logger = logging.getLogger(__name__) 

41 

42 

class RucioInterface:
    """Add files as replicas in Rucio, along with metadata,
    and attach them to datasets.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler we're operating upon.
    rucio_rse : `str`
        Name of the RSE that the files live in.
    scope : `str`
        Rucio scope to register the files in.
    rse_root : `str`
        Full path to root directory of RSE directory structure.
    dtn_url : `str`
        Base URL of the data transfer node for the Rucio physical filename.
    rubin_butler_type : `str`
        The type registered in "rubin_butler" metadata for Rucio.
    """

    def __init__(
        self,
        butler: lsst.daf.butler.Butler,
        rucio_rse: str,
        scope: str,
        rse_root: str,
        dtn_url: str,
        rubin_butler_type: str,
    ):
        self.butler = butler
        self.rse = rucio_rse
        self.scope = scope
        self.rse_root = rse_root
        self.dtn_url = dtn_url
        # Physical file names are built as pfn_base + path-below-rse_root.
        self.pfn_base = f"{dtn_url}"
        self.replica_client = ReplicaClient()
        self.did_client = DIDClient()
        self.rubin_butler_type = rubin_butler_type

    def _make_dataset_ref_bundle(self, dataset_id: str, dataset_ref: DatasetRef) -> ResourceBundle:
        """Make a ResourceBundle for a Butler DatasetRef.

        The JSON form of the DatasetRef is stored as the sidecar metadata
        of the resulting DID.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        dataset_ref : `lsst.daf.butler.DatasetRef`
            Butler DatasetRef.

        Returns
        -------
        rb : `lsst.rucio.register.resource_bundle.ResourceBundle`
            ResourceBundle consolidating dataset id and DatasetRef.
        """
        sidecar = dataset_ref.to_json()
        # Use the module logger (not the root logger) like the rest of
        # this class, so the message honors this module's log level.
        logger.debug("%s", sidecar)
        did = self._make_did(self.butler.getURI(dataset_ref), sidecar)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_zip_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle:
        """Make a ResourceBundle for a zip file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath to a file.

        Returns
        -------
        rb : `lsst.rucio.register.resource_bundle.ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath.
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_dim_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle:
        """Make a ResourceBundle for a dimension file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name.
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath to a file.

        Returns
        -------
        rb : `lsst.rucio.register.resource_bundle.ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath.
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def compute_hashes(self, resource_path: ResourcePath) -> tuple[int, str]:
        """Return the length and adler32 hash for a file.

        If the resource's info already carries an ``adler32`` checksum it
        is returned as-is; otherwise the file is read and the checksum is
        computed locally.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            Path to the file.

        Returns
        -------
        hashes : `tuple` [ `int`, `str` ]
            Size in bytes and Adler32 hex hash.
        """
        info = resource_path.get_info()
        size = info.size
        checksums = info.checksums
        if "adler32" in checksums:
            logger.debug("found adler32 for %s", resource_path)
            return size, checksums["adler32"]
        return size, self._compute_adler32(resource_path)

    def _compute_adler32(self, resource_path: ResourcePath) -> str:
        """Compute the adler32 checksum of a file by reading it in chunks.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            Path to the file.

        Returns
        -------
        adler32 : `str`
            Adler32 checksum as an 8-character, zero-padded, lowercase
            hex string.
        """
        logger.debug("computing adler32 for %s", resource_path)
        adler32 = zlib.adler32(b"")
        # Read in 10 MiB chunks so large files are never held in memory whole.
        buffer_size = 10 * 1024 * 1024
        with resource_path.open("rb") as f:
            while buffer := f.read(buffer_size):
                adler32 = zlib.adler32(buffer, adler32)
        return f"{adler32:08x}"

    def _make_did(self, resource_path: ResourcePath, metadata: "str | None" = None) -> RucioDID:
        """Make a Rucio data identifier from a resource.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath object.
        metadata : `str`, optional
            String containing Rubin dataset specific metadata. When it is
            `None` or empty, an empty sidecar string is recorded.

        Returns
        -------
        did : `lsst.rucio.register.rucio_did.RucioDID`
            Rucio data identifier including physical and logical names,
            byte length, adler32 checksum, meta, and scope.
        """
        size, adler32 = self.compute_hashes(resource_path)

        # The physical name is the DTN base plus the path below the RSE
        # root; the logical name additionally drops the "/<scope>/" prefix.
        path = resource_path.unquoted_path.removeprefix(self.rse_root)
        pfn = self.pfn_base + path
        name = path.removeprefix("/" + self.scope + "/")
        logger.debug("pfn=%s", pfn)
        logger.debug("name=%s", name)
        logger.debug("path=%s", path)

        meta = RubinMeta(rubin_butler=self.rubin_butler_type, rubin_sidecar=metadata or "")
        return RucioDID(
            pfn=pfn,
            bytes=size,
            adler32=adler32,
            name=name,
            scope=self.scope,
            meta=meta,
        )

    def _add_replicas(self, bundles: list[ResourceBundle]) -> None:
        """Call the Rucio method add_replicas for a list of DIDs.

        The call is attempted up to 5 times, sleeping a random 10 to 20
        seconds and recreating the replica client between attempts.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            A list of ResourceBundles.

        Raises
        ------
        Exception
            Raised if every attempt fails with a ``RucioException``; the
            last Rucio error is chained as the cause.
        """
        dids = [bundle.get_did() for bundle in bundles]
        retries = 0
        max_retries = 5
        while True:
            try:
                self.replica_client.add_replicas(rse=self.rse, files=dids)
                break
            except rucio.common.exception.RucioException as err:
                retries += 1
                if retries < max_retries:
                    seconds = random.randint(10, 20)
                    logger.debug("failed to add_replicas; sleeping %d seconds", seconds)
                    time.sleep(seconds)
                    self.replica_client = ReplicaClient()  # XXX not sure we need to do this.
                else:
                    raise Exception(f"Tried {max_retries} times and couldn't add_replicas") from err

    def _add_file_to_dataset_with_retries(self, dataset_id, did):
        """Attach a single DID to a Rucio dataset, retrying on failure.

        A DID that is already attached is treated as success. Other Rucio
        errors are retried up to 5 attempts in total, sleeping a random
        10 to 20 seconds and recreating the DID client between attempts.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        did : `dict` [`str`, `str`|`int`]
            Rucio data identifier.

        Raises
        ------
        Exception
            Raised if every attempt fails with a ``RucioException``; the
            last Rucio error is chained as the cause.
        """
        retries = 0
        max_retries = 5
        while True:
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope, name=dataset_id, files=[did], rse=self.rse
                )
                break
            except rucio.common.exception.FileAlreadyExists:
                if "pfn" in did:
                    logger.debug("file %s already registered in dataset %s", did["pfn"], dataset_id)
                return  # we can return, because it's already in the dataset
            except rucio.common.exception.RucioException as err:
                retries += 1
                if retries < max_retries:
                    seconds = random.randint(10, 20)
                    logger.debug("failed to register one did to %s; sleeping %d seconds", dataset_id, seconds)
                    time.sleep(seconds)
                    self.did_client = DIDClient()  # XXX not sure we need to do this.
                else:
                    # we tried max_retries times, and failed, so we'll bail out.
                    # Fall back to the whole DID when it carries no "pfn" so
                    # that building the message cannot itself raise KeyError.
                    raise Exception(f"Couldn't add {did.get('pfn', did)} to dataset {dataset_id}") from err

    def _add_files_to_dataset(self, dataset_id: str, dids: list[dict]) -> None:
        """Attach a list of files specified by Rucio DIDs to a Rucio dataset.

        Ignores already-attached files for idempotency.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        dids : `list` [`dict` [`str`, `str`|`int`] ]
            List of Rucio data identifiers.

        Raises
        ------
        rucio.common.exception.DataIdentifierNotFound
            Propagated unchanged so the caller can create the dataset.
        Exception
            Raised if every attempt fails with another ``RucioException``;
            the last Rucio error is chained as the cause.
        """
        retries = 0
        max_retries = 5
        while True:
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    files=dids,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.FileAlreadyExists:
                # At least one already is in the dataset.
                # This shouldn't happen, but if it does,
                # we have to retry each individually.
                for did in dids:
                    self._add_file_to_dataset_with_retries(
                        dataset_id=dataset_id,
                        did=did,
                    )
                return
            except rucio.common.exception.DataIdentifierNotFound:
                # Listed before the generic handler so a missing dataset is
                # reported to the caller immediately instead of being retried.
                raise
            except rucio.common.exception.RucioException as err:
                retries += 1
                if retries < max_retries:
                    seconds = random.randint(10, 20)
                    logger.debug("failed to register dids to %s; sleeping %d", dataset_id, seconds)
                    time.sleep(seconds)
                    continue
                else:
                    raise Exception(f"Couldn't add files to dataset {dataset_id}") from err

    def _add_dataset_with_retries(self, dataset_id: str, statuses: dict) -> None:
        """Create a Rucio dataset, retrying on failure.

        The call is attempted up to 5 times, sleeping a random 10 to 20
        seconds between attempts.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset to create.
        statuses : `dict`
            Status attributes passed through to ``DIDClient.add_dataset``.

        Raises
        ------
        rucio.common.exception.DataIdentifierAlreadyExists
            Propagated unchanged if the dataset already exists.
        Exception
            Raised if every attempt fails with another ``RucioException``;
            the last Rucio error is chained as the cause.
        """
        retries = 0
        max_retries = 5
        while True:
            try:
                self.did_client.add_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    statuses=statuses,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.DataIdentifierAlreadyExists:
                # If someone else created it in the meantime
                raise
            except rucio.common.exception.RucioException as err:
                retries += 1
                if retries < max_retries:
                    seconds = random.randint(10, 20)
                    logger.debug("couldn't register dids to %s; waiting %d", dataset_id, seconds)
                    time.sleep(seconds)
                    continue
                else:
                    raise Exception(
                        f"Tried {max_retries} times and couldn't add dataset {dataset_id}"
                    ) from err

    def register_to_dataset(self, bundles) -> None:
        """Register a list of files in Rucio.

        Bundles are grouped by their Rucio dataset id and each group is
        attached to its dataset, creating the dataset first if it does
        not exist yet.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            List of resource bundles.
        """
        logger.debug("register to dataset")

        datasets: dict = {}
        for bundle in bundles:
            datasets.setdefault(bundle.dataset_id, []).append(bundle)

        # A distinct loop variable keeps ``bundles`` intact, so the final
        # debug message reports everything that was registered rather than
        # only the last dataset's group.
        for dataset_id, dataset_bundles in datasets.items():
            dids = [rb.get_did() for rb in dataset_bundles]
            names = [did["pfn"] for did in dids]
            logger.info("Registering %s in dataset %s, RSE %s", names, dataset_id, self.rse)
            try:
                self._add_files_to_dataset(dataset_id, dids)
            except rucio.common.exception.DataIdentifierNotFound:
                # No such dataset, so create it
                try:
                    logger.info("Creating Rucio dataset %s", dataset_id)
                    self._add_dataset_with_retries(
                        dataset_id=dataset_id,
                        statuses={"monotonic": True},
                    )
                except rucio.common.exception.DataIdentifierAlreadyExists:
                    # If someone else created it in the meantime
                    pass
                # And then retry adding DIDs
                self._add_files_to_dataset(dataset_id, dids)

        logger.debug("Done with Rucio for %s", bundles)

    def register_as_replicas(self, dataset_id, dataset_refs) -> int:
        """Register a list of DatasetRefs to a Rucio dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        dataset_refs : `list` [`DatasetRef`]
            List of Butler DatasetRefs. An element may itself be a list of
            DatasetRefs, in which case it is flattened one level.

        Returns
        -------
        num : `int`
            Number of DatasetRefs registered.
        """
        bundles = []
        for dataset_ref in dataset_refs:
            if isinstance(dataset_ref, list):
                bundles.extend(self._make_dataset_ref_bundle(dataset_id, dsr) for dsr in dataset_ref)
            else:
                bundles.append(self._make_dataset_ref_bundle(dataset_id, dataset_ref))
        if not bundles:
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)

    def register_zips(self, dataset_id: str, zip_files: list) -> int:
        """Register a list of zips to a Rucio Dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        zip_files : `list` [`lsst.resources.ResourcePath`]
            List of ResourcePath.

        Returns
        -------
        num : `int`
            Number of zip files ingested.
        """
        bundles = [self._make_zip_bundle(dataset_id, zip_file) for zip_file in zip_files]
        if not bundles:
            # Nothing to do; skip the Rucio round trip, matching
            # register_as_replicas.
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)

    def register_dims(self, dataset_id: str, dim_files: list) -> int:
        """Register a list of dimension files to a Rucio Dataset.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset id.
        dim_files : `list` [`lsst.resources.ResourcePath`]
            List of ResourcePath.

        Returns
        -------
        num : `int`
            Number of dimension files ingested.
        """
        bundles = [self._make_dim_bundle(dataset_id, dim_file) for dim_file in dim_files]
        if not bundles:
            # Nothing to do; skip the Rucio round trip, matching
            # register_as_replicas.
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)