Coverage for python / lsst / daf / butler / datastores / fileDatastore.py: 9%

1077 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-14 07:38 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Generic file-based datastore code.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("FileDatastore",) 

33 

34import contextlib 

35import hashlib 

36import logging 

37import math 

38from collections import defaultdict 

39from collections.abc import Callable, Collection, Iterable, Iterator, Mapping, Sequence 

40from typing import TYPE_CHECKING, Any, ClassVar, cast 

41 

42from sqlalchemy import BigInteger, String 

43 

44from lsst.daf.butler import ( 

45 Config, 

46 DatasetDatastoreRecords, 

47 DatasetId, 

48 DatasetRef, 

49 DatasetType, 

50 DatasetTypeNotSupportedError, 

51 FileDataset, 

52 FileDescriptor, 

53 Formatter, 

54 FormatterFactory, 

55 FormatterV1inV2, 

56 FormatterV2, 

57 Location, 

58 LocationFactory, 

59 Progress, 

60 StorageClass, 

61 ddl, 

62) 

63from lsst.daf.butler.datastore import ( 

64 DatasetRefURIs, 

65 Datastore, 

66 DatastoreConfig, 

67 DatastoreOpaqueTable, 

68 DatastoreValidationError, 

69) 

70from lsst.daf.butler.datastore.cache_manager import ( 

71 AbstractDatastoreCacheManager, 

72 DatastoreCacheManager, 

73 DatastoreDisabledCacheManager, 

74) 

75from lsst.daf.butler.datastore.composites import CompositesMap 

76from lsst.daf.butler.datastore.file_templates import FileTemplates, FileTemplateValidationError 

77from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore 

78from lsst.daf.butler.datastore.record_data import DatastoreRecordData, DatastoreRecordTable 

79from lsst.daf.butler.datastore.stored_file_info import ( 

80 StoredDatastoreItemInfo, 

81 StoredFileInfo, 

82 StoredFileInfoTable, 

83) 

84from lsst.daf.butler.datastores.file_datastore.get import ( 

85 DatasetLocationInformation, 

86 DatastoreFileGetInformation, 

87 generate_datastore_get_information, 

88 get_dataset_as_python_object_from_get_info, 

89) 

90from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ( 

91 ArtifactIndexInfo, 

92 ZipIndex, 

93 determine_destination_for_retrieved_artifact, 

94 unpack_zips, 

95) 

96from lsst.daf.butler.registry.interfaces import ( 

97 DatabaseInsertMode, 

98 DatastoreRegistryBridge, 

99 FakeDatasetRef, 

100 ReadOnlyDatabaseError, 

101) 

102from lsst.daf.butler.repo_relocation import replaceRoot 

103from lsst.daf.butler.utils import transactional 

104from lsst.resources import ResourcePath, ResourcePathExpression 

105from lsst.utils.introspection import get_class_of, get_full_type_name 

106from lsst.utils.iteration import chunk_iterable 

107 

108# For VERBOSE logging usage. 

109from lsst.utils.logging import VERBOSE, getLogger 

110from lsst.utils.timer import time_this 

111 

112from ..datastore import FileTransferMap, FileTransferRecord 

113 

114if TYPE_CHECKING: 

115 from lsst.daf.butler import DatasetProvenance, LookupKey 

116 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager 

117 

# Module-level logger shared by all code in this file. ``getLogger`` is the
# ``lsst.utils.logging`` variant imported alongside ``VERBOSE``, not the
# standard-library ``logging.getLogger``.
log = getLogger(__name__)

119 

120 

class _IngestPrepData(Datastore.IngestPrepData):
    """Helper class for FileDatastore ingest implementation.

    Parameters
    ----------
    datasets : `~collections.abc.Iterable` of `FileDataset`
        Files to be ingested by this datastore. The iterable is read
        exactly once and kept as a `list`, so single-pass iterators such
        as generators are safe to pass.
    """

    def __init__(self, datasets: Iterable[FileDataset]):
        # Materialize first. The generator handed to the base class iterates
        # over ``datasets``; if the caller passed a single-pass iterator, any
        # consumption of that generator would exhaust it and leave
        # ``self.datasets`` empty for the later ingest stages.
        datasets = list(datasets)
        super().__init__(ref for dataset in datasets for ref in dataset.refs)
        self.datasets: list[FileDataset] = datasets

133 

134 

135class FileDatastore(GenericBaseDatastore[StoredFileInfo]): 

136 """Generic Datastore for file-based implementations. 

137 

138 Should always be sub-classed since key abstract methods are missing. 

139 

140 Parameters 

141 ---------- 

142 config : `DatastoreConfig` or `str` 

143 Configuration as either a `Config` object or URI to file. 

144 bridgeManager : `DatastoreRegistryBridgeManager` 

145 Object that manages the interface between `Registry` and datastores. 

146 root : `lsst.resources.ResourcePath` 

147 Root directory URI of this `Datastore`. 

148 formatterFactory : `FormatterFactory` 

149 Factory for creating instances of formatters. 

150 templates : `FileTemplates` 

151 File templates that can be used by this `Datastore`. 

152 composites : `CompositesMap` 

153 Determines whether a dataset should be disassembled on put. 

154 trustGetRequest : `bool` 

155 Determine whether we can fall back to configuration if a requested 

156 dataset is not known to registry. 

157 

158 Raises 

159 ------ 

160 ValueError 

161 If root location does not exist and ``create`` is `False` in the 

162 configuration. 

163 """ 

164 

# Class-level attribute declarations of FileDatastore. ``defaultConfigFile``
# is declared exactly once with its real default.
defaultConfigFile: ClassVar[str | None] = "datastores/fileDatastore.yaml"
"""Path to configuration defaults. Accessed within the ``config`` resource
or relative to a search path. Can be None if no defaults specified.
"""

root: ResourcePath
"""Root directory URI of this `Datastore`."""

locationFactory: LocationFactory
"""Factory for creating locations relative to the datastore root."""

formatterFactory: FormatterFactory
"""Factory for creating instances of formatters."""

templates: FileTemplates
"""File templates that can be used by this `Datastore`."""

composites: CompositesMap
"""Determines whether a dataset should be disassembled on put."""

_retrieve_dataset_method: Callable[[str], DatasetType | None] | None = None
"""Callable that is used in trusted mode to retrieve registry definition
of a named dataset type.
"""

@classmethod
def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
    """Populate the filesystem-dependent options of ``config`` for a new,
    empty repository located at ``root``.

    Parameters
    ----------
    root : `str`
        URI of the data repository root.
    config : `Config`
        Config updated in place. Only the entries this datastore
        understands are touched, and defaults are not expanded.
    full : `Config`
        Fully-expanded config convertible to a `DatastoreConfig`. It is
        only read here. The repository-specific entries ``cls`` and
        ``records.table`` are copied from it into ``config``.
    overwrite : `bool`, optional
        If `False`, values already present in ``config`` are kept.
        By default they are replaced, so ``root`` always wins.

    Notes
    -----
    With ``overwrite=False``, keys explicitly set in ``config`` are left
    alone, which lets external configs retain their explicit values.
    """
    # Entries whose new values come directly from this call.
    values_to_set = {"root": root}
    # Entries copied verbatim from the fully-expanded config.
    keys_to_copy = ("cls", ("records", "table"))
    Config.updateParameters(
        DatastoreConfig,
        config,
        full,
        toUpdate=values_to_set,
        toCopy=keys_to_copy,
        overwrite=overwrite,
    )

234 

@classmethod
def makeTableSpec(cls) -> ddl.TableSpec:
    """Describe the opaque table that holds this datastore's file records.

    Returns
    -------
    spec : `ddl.TableSpec`
        Table specification. ``dataset_id`` and ``component`` together form
        the primary key, and ``path`` has its own index.
    """
    columns = [
        ddl.FieldSpec(name="dataset_id", dtype=ddl.GUID, primaryKey=True),
        ddl.FieldSpec(name="path", dtype=String, length=256, nullable=False),
        ddl.FieldSpec(name="formatter", dtype=String, length=128, nullable=False),
        ddl.FieldSpec(name="storage_class", dtype=String, length=64, nullable=False),
        # Use empty string to indicate no component
        ddl.FieldSpec(name="component", dtype=String, length=32, primaryKey=True),
        # TODO: should checksum be Base64Bytes instead?
        ddl.FieldSpec(name="checksum", dtype=String, length=128, nullable=True),
        ddl.FieldSpec(name="file_size", dtype=BigInteger, nullable=True),
    ]
    return ddl.TableSpec(fields=columns, unique=frozenset(), indexes=[ddl.IndexSpec("path")])

252 

def __init__(
    self,
    config: DatastoreConfig,
    bridgeManager: DatastoreRegistryBridgeManager,
    root: ResourcePath,
    formatterFactory: FormatterFactory,
    templates: FileTemplates,
    composites: CompositesMap,
    trustGetRequest: bool,
):
    """Initialize the datastore from already-constructed components.

    Parameters
    ----------
    config : `DatastoreConfig`
        Datastore configuration, passed on to the base class.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Used to register the opaque records table and the registry bridge.
        It also supplies the dimension universe.
    root : `lsst.resources.ResourcePath`
        Root directory URI of this datastore.
    formatterFactory : `FormatterFactory`
        Factory for creating instances of formatters.
    templates : `FileTemplates`
        File templates usable by this datastore.
    composites : `CompositesMap`
        Decides whether a dataset should be disassembled on put.
    trustGetRequest : `bool`
        Whether a ``get`` may fall back to configuration when the dataset
        is not known to registry.
    """
    super().__init__(config, bridgeManager)
    self.root = ResourcePath(root)
    self.formatterFactory = formatterFactory
    self.templates = templates
    self.composites = composites
    self.trustGetRequest = trustGetRequest

    # An explicit ``name`` wins. Otherwise the name is derived from the
    # *unexpanded* root, so the datastore can be moved without having to
    # update registry.
    if "name" not in self.config:
        self.name = f"{type(self).__name__}@{self.config['root']}"
    else:
        self.name = self.config["name"]

    self.locationFactory = LocationFactory(self.root)

    self._opaque_table_name = self.config["records", "table"]
    try:
        # Storage of paths and formatters, keyed by dataset_id.
        self._table = bridgeManager.opaque.register(self._opaque_table_name, self.makeTableSpec())
        # Interface to Registry.
        self._bridge = bridgeManager.register(self.name)
    except ReadOnlyDatabaseError:
        # Failing to create the table on a read-only database means someone
        # is building a read-only client for an empty repo. That is
        # tolerated as long as they do not try to get any datasets before
        # another client creates the table; most likely they are only
        # validating configuration. ``_table``/``_bridge`` may remain unset.
        pass

    # Checksums are opt-in (default False).
    self.useChecksum = self.config.get("checksum", False)

    universe = bridgeManager.universe
    self.cacheManager: AbstractDatastoreCacheManager
    if "cached" not in self.config:
        self.cacheManager = DatastoreDisabledCacheManager("", universe=universe)
    else:
        self.cacheManager = DatastoreCacheManager(self.config["cached"], universe=universe)

    self.universe = universe

307 

@classmethod
def _create_from_config(
    cls,
    config: DatastoreConfig,
    bridgeManager: DatastoreRegistryBridgeManager,
    butlerRoot: ResourcePathExpression | None,
) -> FileDatastore:
    """Build a `FileDatastore` from its configuration.

    Parameters
    ----------
    config : `DatastoreConfig`
        Datastore configuration. Must define ``root``; the ``formatters``,
        ``templates`` and ``composites`` sections are also read.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Manager supplying the registry bridge and dimension universe.
    butlerRoot : `lsst.resources.ResourcePathExpression` or `None`
        Repository root substituted into the configured ``root`` via
        `replaceRoot`, to support repository relocation.

    Returns
    -------
    datastore : `FileDatastore`
        The new datastore.

    Raises
    ------
    ValueError
        Raised if no ``root`` is configured, or if a local root does not
        exist and either creation is not allowed or ``mkdir`` fails.
    """
    if "root" not in config:
        raise ValueError("No root directory specified in configuration")

    # Support repository relocation in config.
    resolved_root = ResourcePath(
        replaceRoot(config["root"], butlerRoot), forceDirectory=True, forceAbsolute=True
    )

    # Associate formatters with storage classes.
    formatters = FormatterFactory()
    formatters.registerFormatters(config["formatters"], universe=bridgeManager.universe)

    file_templates = FileTemplates(config["templates"], universe=bridgeManager.universe)
    composites_map = CompositesMap(config["composites"], universe=bridgeManager.universe)
    # Whether we can fall back to configuration for datasets unknown to registry.
    trust = config.get("trust_get_request", False)

    datastore = FileDatastore(
        config, bridgeManager, resolved_root, formatters, file_templates, composites_map, trust
    )

    # Only verify the root for local filesystems. For object stores and
    # similar (http, s3, gs...) a "directory" object may not exist even
    # when files under it do, and a read-only repository could not create
    # it, so an existence check there could give a false negative.
    if not datastore.root.isLocal or datastore.root.exists():
        return datastore

    if "create" not in datastore.config or not datastore.config["create"]:
        raise ValueError(f"No valid root and not allowed to create one at: {datastore.root}")
    try:
        datastore.root.mkdir()
    except Exception as e:
        raise ValueError(
            f"Can not create datastore root '{datastore.root}', check permissions. Got error: {e}"
        ) from e
    return datastore

363 

def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore:
    """Return a new `FileDatastore` that shares this instance's setup but
    talks to registry through a different bridge manager.

    Parameters
    ----------
    bridgeManager : `DatastoreRegistryBridgeManager`
        Bridge manager for the new instance.

    Returns
    -------
    datastore : `Datastore`
        New datastore reusing this one's config, root, formatter factory,
        templates, composites map and trust setting.
    """
    return FileDatastore(
        config=self.config,
        bridgeManager=bridgeManager,
        root=self.root,
        formatterFactory=self.formatterFactory,
        templates=self.templates,
        composites=self.composites,
        trustGetRequest=self.trustGetRequest,
    )

374 

def __str__(self) -> str:
    """Return the datastore root URI as a string."""
    root_uri = self.root
    return str(root_uri)

377 

@property
def bridge(self) -> DatastoreRegistryBridge:
    """Registry bridge registered for this datastore in ``__init__``.

    If that registration failed because the database was read-only,
    ``_bridge`` was never assigned there.
    """
    registry_bridge = self._bridge
    return registry_bridge

381 

@property
def roots(self) -> dict[str, ResourcePath | None]:
    # Docstring inherited.
    # A file datastore has exactly one root, keyed by the datastore name.
    mapping: dict[str, ResourcePath | None] = {}
    mapping[self.name] = self.root
    return mapping

386 

def _set_trust_mode(self, mode: bool) -> None:
    """Switch trusted ``get`` mode on or off.

    Parameters
    ----------
    mode : `bool`
        New value for ``trustGetRequest``.
    """
    new_mode = mode
    self.trustGetRequest = new_mode

389 

def _artifact_exists(self, location: Location) -> bool:
    """Report whether an artifact is present at the given location.

    Parameters
    ----------
    location : `Location`
        Expected location of the artifact associated with this datastore.

    Returns
    -------
    exists : `bool`
        `True` if something exists at ``location.uri``, `False` otherwise.
    """
    target = location.uri
    log.debug("Checking if resource exists: %s", target)
    return target.exists()

406 

def addStoredItemInfo(
    self,
    refs: Iterable[DatasetRef],
    infos: Iterable[StoredFileInfo],
    insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
    """Write storage records for one or more datasets to the opaque table.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` [`DatasetRef`]
        The datasets that have been stored.
    infos : `~collections.abc.Iterable` [`StoredFileInfo`]
        Storage metadata, paired positionally with ``refs``.
    insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
        ``INSERT`` (error if a row already exists), ``ENSURE`` (skip rows
        that already exist) or ``REPLACE`` (overwrite existing rows).

    Raises
    ------
    ValueError
        Raised if ``refs`` and ``infos`` differ in length, or if
        ``insert_mode`` is not one of the three supported modes.
    """
    # Each info is rebased onto its ref before conversion to a table row.
    rows = [
        stored.rebase(dataset_ref).to_record(dataset_id=dataset_ref.id)
        for dataset_ref, stored in zip(refs, infos, strict=True)
    ]

    if insert_mode == DatabaseInsertMode.INSERT:
        write_rows = self._table.insert
    elif insert_mode == DatabaseInsertMode.ENSURE:
        write_rows = self._table.ensure
    elif insert_mode == DatabaseInsertMode.REPLACE:
        write_rows = self._table.replace
    else:
        raise ValueError(f"Unknown insert mode of '{insert_mode}'")
    write_rows(*rows, transaction=self._transaction)

440 

def getStoredItemsInfo(
    self, ref: DatasetIdRef, ignore_datastore_records: bool = False
) -> list[StoredFileInfo]:
    """Look up the stored-file records for one dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset that is to be queried.
    ignore_datastore_records : `bool`
        If `True`, skip any records attached to ``ref`` and always query
        the opaque table.

    Returns
    -------
    items : `list` [`StoredFileInfo`]
        One record per stored file: a single entry unless the dataset was
        disassembled. May be empty. If the ref carries attached records but
        none for this table, an empty list is returned without querying.

    Raises
    ------
    TypeError
        Raised if a record attached to ``ref`` is not a `StoredFileInfo`.
    """
    attached_records = ref._datastore_records
    if attached_records is None or ignore_datastore_records:
        # Query the opaque table. A disassembled dataset has several rows
        # with the same dataset_id, one per component.
        return [StoredFileInfo.from_record(row) for row in self._table.fetch(dataset_id=ref.id)]

    candidates = attached_records.get(self._table.name, [])
    for candidate in candidates:
        if not isinstance(candidate, StoredFileInfo):
            raise TypeError(f"Datastore record has unexpected type {candidate.__class__.__name__}")
    return cast(list[StoredFileInfo], candidates)

475 

def _register_datasets(
    self,
    refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]],
    insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
    """Record in registry and in the opaque table that datasets are stored.

    Parameters
    ----------
    refsAndInfos : `~collections.abc.Iterable` [`tuple` [`DatasetRef`, `StoredFileInfo`]]
        Datasets to register and the internal datastore metadata
        associated with them.
    insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`, optional
        ``INSERT`` (default) requires new rows; any other mode uses
        ``ensure`` on the bridge and is passed through to
        `addStoredItemInfo`.
    """
    pairs = list(refsAndInfos)
    all_refs = [dataset_ref for dataset_ref, _ in pairs]
    all_infos = [stored for _, stored in pairs]

    # The bridge only cares about dataset IDs. Disassembled datasets give
    # several refs sharing one ID (with different dataset types, so a set
    # cannot be used); keep a single ref per ID.
    unique_by_id = {dataset_ref.id: dataset_ref for dataset_ref in all_refs}
    if insert_mode != DatabaseInsertMode.INSERT:
        # There are only two columns and all that matters is the dataset ID.
        self.bridge.ensure(unique_by_id.values())
    else:
        self.bridge.insert(unique_by_id.values())
    self.addStoredItemInfo(all_refs, all_infos, insert_mode=insert_mode)

513 

def _get_stored_records_associated_with_refs(
    self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False
) -> dict[DatasetId, list[StoredFileInfo]]:
    """Retrieve all records associated with the provided refs.

    Records already attached to a ref (``ref._datastore_records``) are used
    unless ``ignore_datastore_records`` is `True`. The opaque table is
    queried only for refs that carry no attached records, and is not
    queried at all when there are no such refs.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` of `DatasetIdRef`
        The refs for which records are to be retrieved.
    ignore_datastore_records : `bool`
        If `True` then do not use datastore records stored in refs.

    Returns
    -------
    records : `dict` [`DatasetId`, `list` [`StoredFileInfo`]]
        The matching records indexed by the ref ID (a
        `collections.defaultdict` of `list`). The number of entries can be
        smaller than the number of requested refs.

    Raises
    ------
    TypeError
        Raised if a record attached to a ref is not a `StoredFileInfo`.
    """
    records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
    refs_with_no_records: list[DatasetIdRef] = []
    for ref in refs:
        if ignore_datastore_records or ref._datastore_records is None:
            refs_with_no_records.append(ref)
            continue
        # A ref with attached records but none for this table contributes
        # nothing and is not looked up in the database either.
        ref_records = ref._datastore_records.get(self._table.name)
        if ref_records is None:
            continue
        for ref_record in ref_records:
            # Need to make sure they have correct type.
            if not isinstance(ref_record, StoredFileInfo):
                raise TypeError(f"Datastore record has unexpected type {ref_record.__class__.__name__}")
            records_by_ref[ref.id].append(ref_record)

    # Only hit the opaque table when some refs had no attached records;
    # avoids a pointless query with an empty ``dataset_id`` list.
    if refs_with_no_records:
        records = self._table.fetch(dataset_id=[ref.id for ref in refs_with_no_records])
        # Uniqueness is dataset_id + component so can have multiple
        # records per ref.
        for record in records:
            records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record))
    return records_by_ref

556 

def _refs_associated_with_artifacts(
    self, paths: Iterable[str | ResourcePath]
) -> dict[str, set[DatasetId]]:
    """Return paths and associated dataset refs.

    Parameters
    ----------
    paths : `list` of `str` or `lsst.resources.ResourcePath`
        All the paths to include in search. These are exact matches
        to the entries in the records table and can include fragments.

    Returns
    -------
    mapping : `dict` of [`str`, `set` [`DatasetId`]]
        Mapping of each path to a set of associated database IDs.
        These are artifacts and so any fragments are stripped from the
        keys.

    Notes
    -----
    Paths given with a fragment are looked up with a ``<artifact>#%``
    pattern. Rows returned for that pattern are re-checked to really start
    with ``<artifact>#``, so pattern-matching quirks (for example ``_``
    acting as a single-character wildcard, or case-insensitive matching, if
    the table evaluates the pattern with SQL ``LIKE``) cannot attach
    unrelated artifacts to the result.
    """
    # Group paths by those that have fragments and those that do not.
    with_fragment: set[str] = set()
    without_fragment: set[str] = set()
    for rpath in paths:
        spath = str(rpath)  # Typing says can be ResourcePath so must force to string.
        artifact, separator, _ = spath.rpartition("#")
        if separator:
            with_fragment.add(artifact)
        else:
            without_fragment.add(spath)

    result: dict[str, set[DatasetId]] = defaultdict(set)
    if without_fragment:
        for row in self._table.fetch(path=without_fragment):
            result[row["path"]].add(row["dataset_id"])

    # Do a query per prefix.
    for path in with_fragment:
        prefix = f"{path}#"
        for row in self._table.fetch(path=f"{prefix}%"):
            row_path = row["path"]
            if not row_path.startswith(prefix):
                # Pattern false positive; not an entry for this artifact.
                continue
            # Need to strip fragments before adding to dict.
            artifact_path = row_path[: row_path.rfind("#")]
            result[artifact_path].add(row["dataset_id"])
    return result

602 

def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[DatasetId]:
    """Collect the IDs of every dataset recorded against an artifact path.

    Parameters
    ----------
    pathInStore : `lsst.resources.ResourcePath`
        Path of interest in the data store, matched exactly against the
        ``path`` column.

    Returns
    -------
    ids : `set` [`DatasetId`]
        All dataset IDs associated with this path.
    """
    matching_rows = self._table.fetch(path=str(pathInStore))
    return {row["dataset_id"] for row in matching_rows}

619 

def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
    """Delete the opaque-table rows recorded for a dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset that has been removed.
    """
    # This implementation does not call this method itself: deleting the
    # opaque records is left to the bridge. It is kept because some tests
    # check that it works.
    match_on = {"dataset_id": ref.id}
    self._table.delete(["dataset_id"], match_on)

632 

def _get_dataset_locations_info(
    self, ref: DatasetIdRef, ignore_datastore_records: bool = False
) -> list[DatasetLocationInformation]:
    r"""Pair each stored file of a dataset with its `Location`.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the required `Dataset`.
    ignore_datastore_records : `bool`
        If `True` then do not use datastore records stored in refs.

    Returns
    -------
    results : `list` [`tuple` [`Location`, `StoredFileInfo` ]]
        One ``(location, info)`` pair per stored file. Empty if no records
        are found.
    """
    stored_infos = self.getStoredItemsInfo(ref, ignore_datastore_records)
    # ``file_location`` resolves the stored path against the datastore's
    # location factory; absolute URIs in the record are handled there.
    return [(info.file_location(self.locationFactory), info) for info in stored_infos]

658 

def _can_remove_dataset_artifact(self, ref: DatasetIdRef, location: Location) -> bool:
    """Decide whether the artifact at ``location`` may be deleted when
    ``ref`` is removed.

    Parameters
    ----------
    ref : `DatasetRef` or `FakeDatasetRef`
        Dataset to be removed.
    location : `Location`
        The location of the artifact to be removed.

    Returns
    -------
    can_remove : `bool`
        `True` only if the path is relative to the datastore and no dataset
        other than ``ref`` is recorded against it.

    Raises
    ------
    RuntimeError
        Raised if no dataset at all is recorded for the artifact path.
    """
    # Can't ever delete absolute URIs.
    if location.pathInStore.isabs():
        return False

    sharing_ids = self._registered_refs_per_artifact(location.pathInStore)
    if not sharing_ids:
        raise RuntimeError(f"Datastore inconsistency error. {location.pathInStore} not in registry")

    # Removable only when nothing but this dataset uses the artifact.
    return not (sharing_ids - {ref.id})

691 

def _get_expected_dataset_locations_info(self, ref: DatasetRef) -> list[tuple[Location, StoredFileInfo]]:
    """Predict where the files of a dataset would be and what records
    describe them, using only the current datastore configuration.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the required `Dataset`.

    Returns
    -------
    results : `list` [`tuple` [`Location`, `StoredFileInfo` ]]
        Expected location of each file plus placeholder file information
        (``checksum=None``, ``file_size=-1``).

    Notes
    -----
    This mirrors how ``put`` would have written the dataset, so it is only
    correct if the datastore configuration has not changed since the write.
    Files ingested with a non-standard template or formatter will not be
    predicted correctly.
    """
    # Questions are always asked of the composite. If it is disassembled,
    # every component is returned; if not, the composite itself is what was
    # stored, whatever component was requested. A caller-side disassembly
    # cannot be detected here without probing both locations.
    if ref.datasetType.isComponent():
        ref = ref.makeCompositeRef()

    # (location, formatter, storage class, component) for each file.
    planned: list[tuple[Location, Formatter | FormatterV2, StorageClass, str | None]]
    if self.composites.shouldBeDisassembled(ref):
        planned = []
        for component_name, component_storage in ref.datasetType.storageClass.components.items():
            component_location, component_formatter = self._determine_put_formatter_location(
                ref.makeComponentRef(component_name)
            )
            planned.append((component_location, component_formatter, component_storage, component_name))
    else:
        # Without disassembly, always use the composite ref.
        whole_location, whole_formatter = self._determine_put_formatter_location(ref)
        planned = [(whole_location, whole_formatter, ref.datasetType.storageClass, None)]

    expected: list[tuple[Location, StoredFileInfo]] = []
    for location, formatter, storageClass, component in planned:
        placeholder = StoredFileInfo(
            formatter=formatter,
            path=location.pathInStore.path,
            storageClass=storageClass,
            component=component,
            checksum=None,
            file_size=-1,
        )
        expected.append((location, placeholder))
    return expected

759 

def _prepare_for_direct_get(
    self, ref: DatasetRef, parameters: Mapping[str, Any] | None = None
) -> list[DatastoreFileGetInformation]:
    """Work out which files to read for ``get`` and how to read them.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the required Dataset.
    parameters : `dict`
        `StorageClass`-specific parameters that specify, for example,
        a slice of the dataset to be loaded.

    Returns
    -------
    getInfo : `list` [`DatastoreFileGetInformation`]
        Parameters needed to retrieve each file.

    Raises
    ------
    FileNotFoundError
        Raised if nothing is recorded for ``ref`` and trust mode is off,
        or if in trust mode none of several expected component files exist.
    """
    log.debug("Retrieve %s from %s with parameters %s", ref, self.name, parameters)

    # Remember the storage class the caller asked for before trusted mode
    # may reset the one on the ref.
    wanted_storage_class = ref.datasetType.storageClass
    ref = self._cast_storage_class(ref)

    candidates = self._get_dataset_locations_info(ref)
    if not candidates:
        if not self.trustGetRequest:
            raise FileNotFoundError(f"Could not retrieve dataset {ref}.")
        # Assume the dataset is where we think it should be.
        candidates = self._get_expected_dataset_locations_info(ref)

    # With trust involved, predicted components may not actually exist, so
    # drop missing files. Only complain when no component is left; a
    # partial set is left for the assembler to deal with.
    if len(candidates) > 1 and self.trustGetRequest:
        candidates = [entry for entry in candidates if entry[0].uri.exists()]
        if not candidates:
            raise FileNotFoundError(f"None of the component files for dataset {ref} exist.")

    return generate_datastore_get_information(
        candidates,
        readStorageClass=wanted_storage_class,
        ref=ref,
        parameters=parameters,
    )

815 

def _determine_put_formatter_location(
    self, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> tuple[Location, Formatter | FormatterV2]:
    """Choose where ``put`` writes a dataset and which formatter writes it.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the associated Dataset.
    provenance : `DatasetProvenance`
        Any provenance that should be attached to the serialized dataset.

    Returns
    -------
    location : `Location`
        The location to write the dataset, as updated by the formatter.
    formatter : `Formatter`
        The `Formatter` to use to write the dataset.

    Raises
    ------
    DatasetTypeNotSupportedError
        Raised if no file template or no formatter is configured for ``ref``.
    """
    try:
        template = self.templates.getTemplate(ref)
    except KeyError as e:
        raise DatasetTypeNotSupportedError(f"Unable to find template for {ref}") from e

    # Guard against templates under which different dataIds would produce
    # the same file name and overwrite each other.
    template.validateTemplate(ref)
    provisional = self.locationFactory.fromPath(template.format(ref), trusted_path=True)

    dataset_storage_class = ref.datasetType.storageClass
    try:
        descriptor = FileDescriptor(
            provisional, storageClass=dataset_storage_class, component=ref.datasetType.component()
        )
        formatter = self.formatterFactory.getFormatter(
            ref,
            descriptor,
            dataId=ref.dataId,
            ref=ref,
            provenance=provenance,
        )
    except KeyError as e:
        raise DatasetTypeNotSupportedError(
            f"Unable to find formatter for {ref} in datastore {self.name}"
        ) from e

    # Now that the formatter is known, let it update the location.
    return formatter.make_updated_location(provisional), formatter

866 

867 def _overrideTransferMode(self, *datasets: FileDataset, transfer: str | None = None) -> str | None: 

868 # Docstring inherited from base class 

869 if transfer != "auto": 

870 return transfer 

871 

872 # See if the paths are within the datastore or not 

873 inside = [self._pathInStore(d.path) is not None for d in datasets] 

874 

875 if all(inside): 

876 transfer = None 

877 elif not any(inside): 

878 # Allow ResourcePath to use its own knowledge 

879 transfer = "auto" 

880 else: 

881 # This can happen when importing from a datastore that 

882 # has had some datasets ingested using "direct" mode. 

883 # Also allow ResourcePath to sort it out but warn about it. 

884 # This can happen if you are importing from a datastore 

885 # that had some direct transfer datasets. 

886 log.warning( 

887 "Some datasets are inside the datastore and some are outside. Using 'split' " 

888 "transfer mode. This assumes that the files outside the datastore are " 

889 "still accessible to the new butler since they will not be copied into " 

890 "the target datastore." 

891 ) 

892 transfer = "split" 

893 

894 return transfer 

895 

def _pathInStore(self, path: ResourcePathExpression) -> str | None:
    """Express ``path`` relative to the datastore root, if it lies inside.

    Parameters
    ----------
    path : `lsst.resources.ResourcePathExpression`
        Path to a dataset. May be an absolute URI; a relative path is
        interpreted as relative to the datastore root.

    Returns
    -------
    inStore : `str` or `None`
        Path relative to the datastore root, or `None` if ``path`` is
        outside the root.
    """
    # forceAbsolute=False keeps a relative path relative, so that it is
    # treated as already being relative to the datastore.
    return ResourcePath(path, forceAbsolute=False, forceDirectory=False).relative_to(self.root)

915 

916 def _standardizeIngestPath( 

917 self, 

918 path: str | ResourcePath, 

919 *, 

920 transfer: str | None = None, 

921 check_existence: bool = False, 

922 ) -> str | ResourcePath: 

923 """Standardize the path of a to-be-ingested file. 

924 

925 Parameters 

926 ---------- 

927 path : `str` or `lsst.resources.ResourcePath` 

928 Path of a file to be ingested. This parameter is not expected 

929 to be all the types that can be used to construct a 

930 `~lsst.resources.ResourcePath`. 

931 transfer : `str`, optional 

932 How (and whether) the dataset should be added to the datastore. 

933 See `ingest` for details of transfer modes. 

934 This implementation is provided only so 

935 `NotImplementedError` can be raised if the mode is not supported; 

936 actual transfers are deferred to `_extractIngestInfo`. 

937 check_existence : `bool`, optional 

938 If `True` the existence of the file will be checked, otherwise 

939 no check will be made. 

940 

941 Returns 

942 ------- 

943 path : `str` or `lsst.resources.ResourcePath` 

944 New path in what the datastore considers standard form. If an 

945 absolute URI was given that will be returned unchanged. 

946 

947 Notes 

948 ----- 

949 Subclasses of `FileDatastore` can implement this method instead 

950 of `_prepIngest`. It should not modify the data repository or given 

951 file in any way. 

952 

953 Raises 

954 ------ 

955 NotImplementedError 

956 Raised if the datastore does not support the given transfer mode 

957 (including the case where ingest is not supported at all). 

958 """ 

959 if transfer not in (None, "direct", "split") + self.root.transferModes: 

960 raise NotImplementedError(f"Transfer mode {transfer} not supported.") 

961 

962 # A relative URI indicates relative to datastore root 

963 srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False) 

964 if not srcUri.isabs(): 

965 srcUri = self.root.join(path) 

966 

967 if check_existence and not srcUri.exists(): 

968 raise FileNotFoundError( 

969 f"Resource at {srcUri} does not exist; note that paths to ingest " 

970 f"are assumed to be relative to {self.root} unless they are absolute." 

971 ) 

972 

973 if transfer is None: 

974 relpath = srcUri.relative_to(self.root) 

975 if not relpath: 

976 raise RuntimeError( 

977 f"Transfer is none but source file ({srcUri}) is not within datastore ({self.root})" 

978 ) 

979 

980 # Return the relative path within the datastore for internal 

981 # transfer 

982 path = relpath 

983 

984 return path 

985 

def _extractIngestInfo(
    self,
    path: ResourcePathExpression,
    ref: DatasetRef,
    *,
    formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2],
    transfer: str | None = None,
    record_validation_info: bool = True,
) -> StoredFileInfo:
    """Relocate (if necessary) and extract `StoredFileInfo` from a
    to-be-ingested file.

    Parameters
    ----------
    path : `lsst.resources.ResourcePathExpression`
        URI or path of a file to be ingested. For ``transfer`` of `None`
        or ``"split"``, a relative path is taken to be relative to the
        datastore root.
    ref : `DatasetRef`
        Reference for the dataset being ingested. Guaranteed to have
        ``dataset_id not None``.
    formatter : `type` or `Formatter`
        `Formatter` subclass to use for this dataset or an instance.
    transfer : `str`, optional
        How (and whether) the dataset should be added to the datastore.
        See `ingest` for details of transfer modes. In summary:

        - `None`: the file must already be inside the datastore root and
          is recorded in place.
        - ``"split"``: like `None` for files inside the root; files
          outside the root are recorded by absolute URI (as ``"direct"``).
        - ``"direct"``: the absolute URI is recorded as-is and nothing is
          transferred.
        - any other mode: the file is transferred to a template-derived
          location inside the datastore.
    record_validation_info : `bool`, optional
        If `True`, the default, the datastore can record validation
        information associated with the file. If `False` the datastore
        will not attempt to track any information such as checksums
        or file sizes. This can be useful if such information is tracked
        in an external system or if the file is to be compressed in place.
        It is up to the datastore whether this parameter is relevant.

    Returns
    -------
    info : `StoredFileInfo`
        Internal datastore record for this file. This will be inserted by
        the caller; the `_extractIngestInfo` is only responsible for
        creating and populating the struct. When validation information
        is not recorded, ``file_size`` is ``-1`` and ``checksum`` is
        `None`.

    Raises
    ------
    RuntimeError
        Raised if no transaction is active, or if ``transfer`` is `None`
        and the file is not within the datastore root.
    FileNotFoundError
        Raised if one of the given files does not exist.
    FileExistsError
        Raised if transfer is not `None` but the (internal) location the
        file would be moved to is already occupied.
        NOTE(review): ``transfer_from`` is called with ``overwrite=True``
        below, so this may no longer be raised — confirm.
    """
    # The transfer registers its undo actions on this transaction.
    if self._transaction is None:
        raise RuntimeError("Ingest called without transaction enabled")

    # Create URI of the source path, do not need to force a relative
    # path to absolute.
    srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)

    # Track whether we have read the size of the source yet
    have_sized = False

    # tgtLocation stays None when the file is recorded by its own URI
    # ("direct", or "split" for a file outside the root).
    tgtLocation: Location | None
    if transfer is None or transfer == "split":
        # A relative path is assumed to be relative to the datastore
        # in this context
        if not srcUri.isabs():
            tgtLocation = self.locationFactory.fromPath(srcUri.ospath, trusted_path=False)
        else:
            # Work out the path in the datastore from an absolute URI
            # This is required to be within the datastore.
            pathInStore = srcUri.relative_to(self.root)
            if pathInStore is None and transfer is None:
                raise RuntimeError(
                    f"Unexpectedly learned that {srcUri} is not within datastore {self.root}"
                )
            if pathInStore:
                tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True)
            elif transfer == "split":
                # Outside the datastore but treat that as a direct ingest
                # instead.
                tgtLocation = None
            else:
                raise RuntimeError(f"Unexpected transfer mode encountered: {transfer} for URI {srcUri}")
    elif transfer == "direct":
        # Want to store the full URI to the resource directly in
        # datastore. This is useful for referring to permanent archive
        # storage for raw data.
        # Trust that people know what they are doing.
        tgtLocation = None
    else:
        # Work out the name we want this ingested file to have
        # inside the datastore
        tgtLocation = self._calculate_ingested_datastore_name(srcUri, ref, formatter)

        # if we are transferring from a local file to a remote location
        # it may be more efficient to get the size and checksum of the
        # local file rather than the transferred one
        if record_validation_info and srcUri.isLocal:
            size = srcUri.size()
            checksum = self.computeChecksum(srcUri) if self.useChecksum else None
            have_sized = True

        # Transfer the resource to the destination.
        # Allow overwrite of an existing file. This matches the behavior
        # of datastore.put() in that it trusts that registry would not
        # be asking to overwrite unless registry thought that the
        # overwrite was allowed.
        tgtLocation.uri.transfer_from(
            srcUri, transfer=transfer, transaction=self._transaction, overwrite=True
        )

    if tgtLocation is None:
        # This means we are using direct mode
        targetUri = srcUri
        targetPath = str(srcUri)
    else:
        targetUri = tgtLocation.uri
        targetPath = tgtLocation.pathInStore.path

    # the file should exist in the datastore now
    if record_validation_info:
        # Skip this if the size/checksum were already taken from a local
        # source before the transfer.
        if not have_sized:
            size = targetUri.size()
            checksum = self.computeChecksum(targetUri) if self.useChecksum else None
    else:
        # Not recording any file information.
        size = -1
        checksum = None

    return StoredFileInfo(
        formatter=formatter,
        path=targetPath,
        storageClass=ref.datasetType.storageClass,
        component=ref.datasetType.component(),
        file_size=size,
        checksum=checksum,
    )

1119 

def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData:
    # Docstring inherited from Datastore._prepIngest.
    # Note: datasets that are accepted are updated in place (their
    # ``refs``, ``formatter`` and ``path`` attributes are rewritten).
    filtered = []

    # Ingest could be given tens of thousands of files. It is not efficient
    # to check for the existence of every single file (especially if they
    # are remote URIs) but in some transfer modes the files will be checked
    # anyhow when they are relocated. For direct or None transfer modes
    # it is possible to not know if the file is accessible at all.
    # Therefore limit number of files that will be checked (but always
    # include the first one).
    max_checks = 200
    n_datasets = len(datasets)
    if n_datasets <= max_checks:
        check_every_n = 1
    elif transfer in ("direct", None):
        # Integer ceiling division: the smallest stride that keeps the
        # number of sampled files at or below max_checks. (The previous
        # int(n / max_checks + 1) over-rounded, e.g. n=400 gave a stride
        # of 3 and only 134 checks.)
        check_every_n = (n_datasets + max_checks - 1) // max_checks
    else:
        # Other modes verify files when relocating them.
        check_every_n = 0

    for count, dataset in enumerate(datasets):
        acceptable = [ref for ref in dataset.refs if self.constraints.isAcceptable(ref)]
        if not acceptable:
            # Every ref is rejected by the datastore constraints.
            continue
        dataset.refs = acceptable

        if dataset.formatter is None:
            dataset.formatter = self.formatterFactory.getFormatterClass(dataset.refs[0])
        else:
            assert isinstance(dataset.formatter, type | str)
            formatter_class = get_class_of(dataset.formatter)
            if not issubclass(formatter_class, Formatter | FormatterV2):
                raise TypeError(f"Requested formatter {dataset.formatter} is not a Formatter class.")
            dataset.formatter = formatter_class

        # Decide whether the file should be checked. The first file
        # (count 0) is always checked when sampling, the final one not
        # necessarily.
        check_existence = check_every_n != 0 and count % check_every_n == 0

        if check_existence:
            log.debug(
                "Checking file existence: %s (%d/%d) [%s]",
                check_existence,
                count + 1,
                n_datasets,
                transfer,
            )

        dataset.path = self._standardizeIngestPath(
            dataset.path, transfer=transfer, check_existence=check_existence
        )
        filtered.append(dataset)
    return _IngestPrepData(filtered)

1176 

@transactional
def _finishIngest(
    self,
    prepData: Datastore.IngestPrepData,
    *,
    transfer: str | None = None,
    record_validation_info: bool = True,
) -> None:
    # Docstring inherited from Datastore._finishIngest.
    progress = Progress("lsst.daf.butler.datastores.FileDatastore.ingest", level=logging.DEBUG)
    pairs: list[tuple[DatasetRef, StoredFileInfo]] = []
    for dataset in progress.wrap(prepData.datasets, desc="Ingesting dataset files"):
        # The file is ingested on behalf of the first ref; every ref of
        # the dataset then shares the resulting record.
        info = self._extractIngestInfo(
            dataset.path,
            dataset.refs[0],
            formatter=dataset.formatter,
            transfer=transfer,
            record_validation_info=record_validation_info,
        )
        pairs.extend((ref, info) for ref in dataset.refs)

    # A "direct" ingest may be repeated for the same dataset when the
    # external file is known to be immutable, which UUIDv5 IDs signal.
    # Those rows are replaced; all other rows are plain inserts.
    if transfer == "direct":
        to_replace = [pair for pair in pairs if pair[0].id.version == 5]
        to_insert = [pair for pair in pairs if pair[0].id.version != 5]
    else:
        to_replace = []
        to_insert = pairs

    if to_insert:
        self._register_datasets(to_insert, insert_mode=DatabaseInsertMode.INSERT)
    if to_replace:
        self._register_datasets(to_replace, insert_mode=DatabaseInsertMode.REPLACE)

1218 

1219 def _calculate_ingested_datastore_name( 

1220 self, 

1221 srcUri: ResourcePath, 

1222 ref: DatasetRef, 

1223 formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2] | None = None, 

1224 ) -> Location: 

1225 """Given a source URI and a DatasetRef, determine the name the 

1226 dataset will have inside datastore. 

1227 

1228 Parameters 

1229 ---------- 

1230 srcUri : `lsst.resources.ResourcePath` 

1231 URI to the source dataset file. 

1232 ref : `DatasetRef` 

1233 Ref associated with the newly-ingested dataset artifact. This 

1234 is used to determine the name within the datastore. 

1235 formatter : `Formatter` or Formatter class. 

1236 Formatter to use for validation. Can be a class or an instance. 

1237 No validation of the file extension is performed if the 

1238 ``formatter`` is `None`. This can be used if the caller knows 

1239 that the source URI and target URI will use the same formatter. 

1240 

1241 Returns 

1242 ------- 

1243 location : `Location` 

1244 Target location for the newly-ingested dataset. 

1245 """ 

1246 # Ingesting a file from outside the datastore. 

1247 # This involves a new name. 

1248 template = self.templates.getTemplate(ref) 

1249 location = self.locationFactory.fromPath(template.format(ref), trusted_path=True) 

1250 

1251 # Get the extension 

1252 ext = srcUri.getExtension() 

1253 

1254 # Update the destination to include that extension 

1255 location.updateExtension(ext) 

1256 

1257 # Ask the formatter to validate this extension 

1258 if formatter is not None: 

1259 formatter.validate_extension(location) 

1260 

1261 return location 

1262 

def _write_in_memory_to_artifact(
    self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> StoredFileInfo:
    """Write out in memory dataset to datastore.

    Parameters
    ----------
    inMemoryDataset : `object`
        Dataset to write to datastore.
    ref : `DatasetRef`
        Registry information associated with this dataset.
    provenance : `DatasetProvenance` or `None`, optional
        Any provenance that should be attached to the serialized dataset.
        Not supported by all formatters.

    Returns
    -------
    info : `StoredFileInfo`
        Information describing the artifact written to the datastore.

    Raises
    ------
    DatasetTypeNotSupportedError
        Raised if the datastore constraints reject ``ref``, or (via
        ``_determine_put_formatter_location``) if no template or formatter
        is configured for it.
    TypeError
        Raised if the formatter cannot accept the object and coercion to
        the storage class python type does not produce that type.
    RuntimeError
        Raised if no transaction is active or if the formatter fails to
        serialize the dataset.

    Notes
    -----
    NOTE(review): ``provenance`` is passed to the formatter's ``write``
    but not to ``_determine_put_formatter_location``; confirm this is
    intended.
    """
    # May need to coerce the in memory dataset to the correct
    # python type, but first we need to make sure the storage class
    # reflects the one defined in the data repository.
    ref = self._cast_storage_class(ref)

    # Confirm that we can accept this dataset
    if not self.constraints.isAcceptable(ref):
        # Raise rather than use boolean return value.
        raise DatasetTypeNotSupportedError(
            f"Dataset {ref} has been rejected by this datastore via configuration."
        )

    location, formatter = self._determine_put_formatter_location(ref)

    # The external storage class can differ from the registry storage
    # class AND the given in-memory dataset might not match any of the
    # storage class definitions.
    if formatter.can_accept(inMemoryDataset):
        # Do not need to coerce. Must assume that the formatter can handle
        # it without further checking of types.
        pass
    else:
        # Coerce to a type that it can accept.
        inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset)
        required_pytype = ref.datasetType.storageClass.pytype

        if not isinstance(inMemoryDataset, required_pytype):
            raise TypeError(
                f"Inconsistency between supplied object ({type(inMemoryDataset)}) "
                f"and storage class type ({required_pytype})"
            )

    # The undo action below must be registered on an active transaction.
    if self._transaction is None:
        raise RuntimeError("Attempting to write artifact without transaction enabled")

    def _removeFileExists(uri: ResourcePath) -> None:
        """Remove a file and do not complain if it is not there.

        This is important since a formatter might fail before the file
        is written and we should not confuse people by writing spurious
        error messages to the log.
        """
        with contextlib.suppress(FileNotFoundError):
            uri.remove()

    # Register a callback to try to delete the uploaded data if
    # something fails below. This happens before the write so that a
    # partially written file is also cleaned up.
    uri = location.uri
    self._transaction.registerUndo("artifactWrite", _removeFileExists, uri)

    # Need to record the specified formatter but if this is a V1 formatter
    # we need to convert it to a V2 compatible shim to do the write.
    if not isinstance(formatter, Formatter):
        formatter_compat = formatter
    else:
        formatter_compat = FormatterV1inV2(
            formatter.file_descriptor,
            ref=ref,
            formatter=formatter,
            write_parameters=formatter.write_parameters,
            write_recipes=formatter.write_recipes,
        )

    assert isinstance(formatter_compat, FormatterV2)

    with time_this(log, msg="Writing dataset %s with formatter %s", args=(ref, formatter.name())):
        try:
            formatter_compat.write(
                inMemoryDataset, cache_manager=self.cacheManager, provenance=provenance
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to serialize dataset {ref} of type {get_full_type_name(inMemoryDataset)} "
                f"using formatter {formatter.name()}."
            ) from e

    # The artifact is already at its final location, so it is recorded
    # via an in-place ingest (transfer defaults to None). The original
    # formatter, not the V2 shim, is what gets recorded.
    return self._extractIngestInfo(uri, ref, formatter=formatter)

1361 

def knows(self, ref: DatasetRef) -> bool:
    """Report whether the datastore has records for a dataset.

    Only datastore records are consulted; no artifact existence check is
    made.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the required dataset.

    Returns
    -------
    exists : `bool`
        `True` if the datastore has at least one location recorded for
        the dataset.
    """
    return bool(self._get_dataset_locations_info(ref))

1381 

def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
    # Docstring inherited from the base class.
    # Materialize once: the refs are used for the lookup and for the result.
    ref_list = list(refs)

    # Keyed by dataset ID; IDs unknown to the datastore are simply absent.
    known = self._get_stored_records_associated_with_refs(ref_list, ignore_datastore_records=True)

    return dict(zip(ref_list, (ref.id in known for ref in ref_list)))

1390 

def _process_mexists_records(
    self,
    id_to_ref: dict[DatasetId, DatasetRef],
    records: dict[DatasetId, list[StoredFileInfo]],
    all_required: bool,
    artifact_existence: dict[ResourcePath, bool] | None = None,
) -> dict[DatasetRef, bool]:
    """Check given records for existence.

    Helper function for `mexists()`.

    Parameters
    ----------
    id_to_ref : `dict` of [`DatasetId`, `DatasetRef`]
        Mapping of the dataset ID to the dataset ref itself.
    records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`]
        Records as generally returned by
        ``_get_stored_records_associated_with_refs``.
    all_required : `bool`
        Flag to indicate whether existence requires all artifacts
        associated with a dataset ID to exist or not for existence.
    artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
        Optional mapping of datastore artifact to existence. Updated by
        this method with details of all artifacts tested. Can be `None`
        if the caller is not interested.

    Returns
    -------
    existence : `dict` of [`DatasetRef`, `bool`]
        Mapping from dataset to boolean indicating existence. Only
        datasets with at least one record in ``records`` appear.

    Notes
    -----
    A single artifact can be shared by several datasets (for example when
    one file was ingested with multiple refs), so each URI is mapped to
    every dataset ID that references it and all of those datasets get a
    result. Previously only the last such dataset was reported, and the
    others were treated as unknown.
    """
    # URIs that still need a real existence check.
    uris_to_check: list[ResourcePath] = []
    # Every dataset ID that references each URI. A list rather than a
    # single ID because an artifact can be shared between datasets.
    location_map: defaultdict[ResourcePath, list[DatasetId]] = defaultdict(list)

    location_factory = self.locationFactory

    uri_existence: dict[ResourcePath, bool] = {}
    for ref_id, infos in records.items():
        # Key is the dataset Id, value is list of StoredItemInfo
        uris = [info.file_location(location_factory).uri for info in infos]
        for uri in uris:
            location_map[uri].append(ref_id)

        # Check the local cache directly for a dataset corresponding
        # to the remote URI.
        if self.cacheManager.file_count > 0:
            ref = id_to_ref[ref_id]
            for uri, storedFileInfo in zip(uris, infos, strict=True):
                check_ref = ref
                if not ref.datasetType.isComponent() and (component := storedFileInfo.component):
                    check_ref = ref.makeComponentRef(component)
                if self.cacheManager.known_to_cache(check_ref, uri.getExtension()):
                    # Proxy for URI existence.
                    uri_existence[uri] = True
                else:
                    uris_to_check.append(uri)
        else:
            # Check all of them.
            uris_to_check.extend(uris)

    if artifact_existence is not None:
        # If a URI has already been checked remove it from the list
        # and immediately add the status to the output dict.
        filtered_uris_to_check = []
        for uri in uris_to_check:
            if uri in artifact_existence:
                uri_existence[uri] = artifact_existence[uri]
            else:
                filtered_uris_to_check.append(uri)
        uris_to_check = filtered_uris_to_check

    # Results.
    dataset_existence: dict[DatasetRef, bool] = {}

    uri_existence.update(ResourcePath.mexists(uris_to_check))
    for uri, exists in uri_existence.items():
        for dataset_id in location_map[uri]:
            ref = id_to_ref[dataset_id]

            # Disassembled composite needs to check all locations.
            # all_required indicates whether all need to exist or not.
            # Do not rebind ``exists``: it is shared by every dataset
            # referencing this URI.
            if ref in dataset_existence:
                if all_required:
                    dataset_existence[ref] = dataset_existence[ref] and exists
                else:
                    dataset_existence[ref] = dataset_existence[ref] or exists
            else:
                dataset_existence[ref] = exists

    if artifact_existence is not None:
        artifact_existence.update(uri_existence)

    return dataset_existence

1484 

def mexists(
    self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
) -> dict[DatasetRef, bool]:
    """Check whether many datasets exist, processing them in chunks.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` of `DatasetRef`
        The datasets to be checked.
    artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
        Optional mapping of datastore artifact to existence. Updated by
        this method with details of all artifacts tested. Can be `None`
        if the caller is not interested.

    Returns
    -------
    existence : `dict` of [`DatasetRef`, `bool`]
        Mapping from dataset to boolean indicating existence.

    Notes
    -----
    To minimize potentially costly remote existence checks, the local
    cache is checked as a proxy for existence. If a file for this
    `DatasetRef` does exist no check is done for the actual URI. This
    could result in possibly unexpected behavior if the dataset itself
    has been removed from the datastore by another process whilst it is
    still in the cache.
    """
    chunk_size = 50_000
    # VERBOSE so that progress is visible when very many refs are checked.
    log_threshold = VERBOSE
    log.debug("Checking for the existence of multiple artifacts in datastore in chunks of %d", chunk_size)

    combined: dict[DatasetRef, bool] = {}
    total_found = 0
    total_checked = 0
    for chunk_index, chunk in enumerate(chunk_iterable(refs, chunk_size=chunk_size)):
        chunk_result = self._mexists(chunk, artifact_existence)
        n_results = len(chunk_result)
        total_checked += n_results

        # Summing can be slow, so only do it when the count will be logged.
        n_found = sum(chunk_result.values()) if log.isEnabledFor(log_threshold) else 0
        total_found += n_found

        # The refs are never counted up front (there may be millions), so
        # a lone chunk that is exactly full is reported with the
        # multi-chunk message.
        if chunk_index > 0 or n_results >= chunk_size:
            log.log(
                log_threshold,
                "Number of datasets found in datastore for chunk %d: %d out of %d checked "
                "(running total from all chunks so far: %d found out of %d checked)",
                chunk_index,
                n_found,
                n_results,
                total_found,
                total_checked,
            )
        elif n_results == 1:
            (only_ref,) = chunk_result
            # Debug level, to be consistent with `exists()`.
            log.debug(
                "Calling mexists() with single ref that does%s exist (%s).",
                "" if chunk_result[only_ref] else " not",
                only_ref,
            )
        else:
            log.log(
                log_threshold,
                "Number of datasets found in datastore %s: %d out of %d datasets checked.",
                self.name,
                n_found,
                total_checked,
            )

        combined.update(chunk_result)

    return combined

1580 

1581 def _mexists( 

1582 self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None 

1583 ) -> dict[DatasetRef, bool]: 

1584 """Check the existence of multiple datasets at once. 

1585 

1586 Parameters 

1587 ---------- 

1588 refs : `~collections.abc.Iterable` of `DatasetRef` 

1589 The datasets to be checked. 

1590 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

1591 Optional mapping of datastore artifact to existence. Updated by 

1592 this method with details of all artifacts tested. Can be `None` 

1593 if the caller is not interested. 

1594 

1595 Returns 

1596 ------- 

1597 existence : `dict` of [`DatasetRef`, `bool`] 

1598 Mapping from dataset to boolean indicating existence. 

1599 """ 

1600 # Make a mapping from refs with the internal storage class to the given 

1601 # refs that may have a different one. We'll use the internal refs 

1602 # throughout this method and convert back at the very end. 

1603 internal_ref_to_input_ref = {self._cast_storage_class(ref): ref for ref in refs} 

1604 

1605 # Need a mapping of dataset_id to (internal) dataset ref since some 

1606 # internal APIs work with dataset_id. 

1607 id_to_ref = {ref.id: ref for ref in internal_ref_to_input_ref} 

1608 

1609 # Set of all IDs we are checking for. 

1610 requested_ids = set(id_to_ref.keys()) 

1611 

1612 # The records themselves. Could be missing some entries. 

1613 records = self._get_stored_records_associated_with_refs( 

1614 id_to_ref.values(), ignore_datastore_records=True 

1615 ) 

1616 

1617 dataset_existence = self._process_mexists_records( 

1618 id_to_ref, records, True, artifact_existence=artifact_existence 

1619 ) 

1620 

1621 # Set of IDs that have been handled. 

1622 handled_ids = {ref.id for ref in dataset_existence} 

1623 

1624 missing_ids = requested_ids - handled_ids 

1625 if missing_ids: 

1626 dataset_existence.update( 

1627 self._mexists_check_expected( 

1628 [id_to_ref[missing] for missing in missing_ids], artifact_existence 

1629 ) 

1630 ) 

1631 

1632 return { 

1633 internal_ref_to_input_ref[internal_ref]: existence 

1634 for internal_ref, existence in dataset_existence.items() 

1635 } 

1636 

def _mexists_check_expected(
    self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
) -> dict[DatasetRef, bool]:
    """Determine existence of datasets that have no datastore records.

    Parameters
    ----------
    refs : `~collections.abc.Sequence` of `DatasetRef`
        Datasets to examine. None of them are expected to be recorded in
        the datastore's own tables.
    artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`], optional
        Artifact-existence cache handed through to
        ``_process_mexists_records`` so the artifacts it tests can be
        recorded there. Pass `None` if the caller does not need it.

    Returns
    -------
    existence : `dict` [`DatasetRef`, `bool`]
        Existence flag per dataset. Without ``trustGetRequest`` every ref is
        reported as missing; otherwise the expected artifact locations are
        probed.
    """
    if not self.trustGetRequest:
        # No records and no permission to guess: nothing can be assumed
        # to exist.
        return dict.fromkeys(refs, False)

    log.debug(
        "%d datasets were not known to datastore during initial existence check.",
        len(refs),
    )

    # Mimic the structure produced by _get_stored_records_associated_with_refs
    # but populate it from the expected (guessed) artifact locations.
    guessed_records = {
        ref.id: [info for _, info in self._get_expected_dataset_locations_info(ref)] for ref in refs
    }
    refs_by_id = {ref.id: ref for ref in refs}

    existence: dict[DatasetRef, bool] = {}
    existence.update(
        self._process_mexists_records(
            refs_by_id,
            guessed_records,
            False,
            artifact_existence=artifact_existence,
        )
    )
    return existence

1689 

def exists(self, ref: DatasetRef) -> bool:
    """Report whether the dataset is present in this datastore.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset to look for.

    Returns
    -------
    exists : `bool`
        `True` when the dataset's artifacts can be found.

    Notes
    -----
    A hit in the local cache is accepted as evidence of existence, so a
    file removed from the backing object store by another process can
    still be reported as present while a cached copy remains.

    If the datastore has no record of the dataset and ``trustGetRequest``
    is enabled, the expected locations are probed and a single present
    artifact is sufficient, because it cannot be known which components
    were originally written.
    """
    ref = self._cast_storage_class(ref)
    # Records carried on the ref are ignored deliberately; they can be stale
    # (for example, tests delete datasets and then check their existence).
    recorded = self._get_dataset_locations_info(ref, ignore_datastore_records=True)

    if recorded:
        # Every recorded artifact must be available, either from the cache
        # or from the datastore itself.
        for location, info in recorded:
            probe = ref
            is_parent = not ref.datasetType.isComponent()
            if is_parent and (component := info.component):
                # The cache is keyed on the component ref.
                probe = ref.makeComponentRef(component)
            if not (
                self.cacheManager.known_to_cache(probe, location.getExtension())
                or self._artifact_exists(location)
            ):
                return False
        return True

    if not self.trustGetRequest:
        return False

    # Any cached component implies the dataset exists somewhere.
    if self.cacheManager.known_to_cache(ref):
        return True

    return any(
        self._artifact_exists(location) for location, _ in self._get_expected_dataset_locations_info(ref)
    )

1749 

def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs:
    """Return the URIs of a single dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset whose URIs are wanted.
    predict : `bool`, optional
        When the datastore does not know the dataset, whether a predicted
        URI should be returned instead of raising.

    Returns
    -------
    uris : `DatasetRefURIs`
        The primary artifact URI (`None` if the dataset was disassembled
        within the datastore) together with any per-component URIs (empty
        when there are no separate component artifacts).

    Raises
    ------
    FileNotFoundError
        Raised by `getManyURIs` if the dataset is unknown and ``predict``
        is `False`.
    """
    uris_by_ref = self.getManyURIs([ref], predict=predict, allow_missing=False)
    return uris_by_ref[ref]

1771 

def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath:
    """URI to the Dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the required Dataset.
    predict : `bool`
        If `True`, allow URIs to be returned of datasets that have not
        been written.

    Returns
    -------
    uri : `lsst.resources.ResourcePath`
        URI pointing to the dataset within the datastore. If the
        dataset does not exist in the datastore, and if ``predict`` is
        `True`, the URI will be a prediction and will include a URI
        fragment "#predicted".
        If the datastore does not have entities that relate well
        to the concept of a URI the returned URI will be
        descriptive. The returned URI is not guaranteed to be obtainable.

    Raises
    ------
    FileNotFoundError
        Raised if a URI has been requested for a dataset that does not
        exist and ``predict`` is `False`.
    RuntimeError
        Raised if the dataset does not resolve to exactly one primary URI,
        for example because it is stored as separate per-component
        artifacts.

    Notes
    -----
    When a predicted URI is requested an attempt will be made to form
    a reasonable URI based on file templates and the expected formatter.
    """
    primary, components = self.getURIs(ref, predict)
    # A single-URI answer requires a primary URI and no component artifacts.
    if primary is None or components:
        raise RuntimeError(
            f"Dataset ({ref}) includes distinct URIs for components. Use Datastore.getURIs() instead."
        )
    return primary

1814 

def _predict_URIs(
    self,
    ref: DatasetRef,
) -> DatasetRefURIs:
    """Guess where the artifacts of a dataset would be written.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset whose URIs should be predicted.

    Returns
    -------
    uris : `DatasetRefURIs`
        Predicted primary URI, or predicted per-component URIs if the
        dataset would be disassembled. Every URI carries a "#predicted"
        fragment.
    """

    def _as_prediction(location: Location) -> ResourcePath:
        # The "#predicted" fragment flags the URI as a guess.
        target = location.uri
        return ResourcePath(target.geturl() + "#predicted", forceDirectory=target.dirLike)

    predicted = DatasetRefURIs()

    if not self.composites.shouldBeDisassembled(ref):
        whole_location, _ = self._determine_put_formatter_location(ref)
        predicted.primaryURI = _as_prediction(whole_location)
        return predicted

    for component in ref.datasetType.storageClass.components.keys():
        component_location, _ = self._determine_put_formatter_location(ref.makeComponentRef(component))
        predicted.componentURIs[component] = _as_prediction(component_location)
    return predicted

1854 

def getManyURIs(
    self,
    refs: Iterable[DatasetRef],
    predict: bool = False,
    allow_missing: bool = False,
) -> dict[DatasetRef, DatasetRefURIs]:
    # Docstring inherited

    # ``refs`` is walked several times below; materialize it so that a
    # one-shot iterator (e.g. a generator) is not exhausted after the
    # first pass, which would silently produce an empty result.
    refs = list(refs)

    uris: dict[DatasetRef, DatasetRefURIs] = {}

    records = self._get_stored_records_associated_with_refs(refs)

    existing_refs: list[DatasetRef] = [ref for ref in refs if ref.id in records]
    missing_refs: list[DatasetRef] = [ref for ref in refs if ref.id not in records]

    # In trustGetRequest mode a dataset without records may still have
    # artifacts on disk; move any such datasets into the "existing" group.
    if missing_refs and not predict:
        dataset_existence = self._mexists_check_expected(missing_refs, None)
        missing_refs = [ref for ref, exists in dataset_existence.items() if not exists]
        existing_refs.extend(ref for ref, exists in dataset_existence.items() if exists)

    for ref in missing_refs:
        if predict:
            # Never written, so the best we can do is guess.
            uris[ref] = self._predict_URIs(ref)
        elif not allow_missing:
            raise FileNotFoundError(f"Dataset {ref} not in this datastore.")

    location_factory = self.locationFactory
    for ref in existing_refs:
        # Refs found only by the trust-mode check have no records; an empty
        # list lets _locations_to_URI derive the expected locations instead
        # of relying on ``records`` being a defaultdict.
        file_infos = records.get(ref.id, [])
        file_locations = [(info.file_location(location_factory), info) for info in file_infos]
        uris[ref] = self._locations_to_URI(ref, file_locations)

    return uris

1902 

def _locations_to_URI(
    self,
    ref: DatasetRef,
    file_locations: Sequence[tuple[Location, StoredFileInfo]],
) -> DatasetRefURIs:
    """Convert one or more file locations associated with a DatasetRef
    to a DatasetRefURIs.

    Parameters
    ----------
    ref : `DatasetRef`
        Reference to the dataset.
    file_locations : `~collections.abc.Sequence` [`tuple` [`Location`, `StoredFileInfo`]]
        Each item in the sequence is the location of the dataset within the
        datastore and stored information about the file and its formatter.
        If there is only one item in the sequence then it is treated as the
        primary URI. If there is more than one item then they are treated
        as component URIs. If there are no items then an error is raised
        unless ``self.trustGetRequest`` is `True`, in which case the
        expected locations are guessed.

    Returns
    -------
    uris : `DatasetRefURIs`
        Represents the primary URI or component URIs described by the
        inputs. When locations were guessed, components whose artifacts
        do not exist are omitted.

    Raises
    ------
    RuntimeError
        If no file locations are passed in and ``self.trustGetRequest`` is
        `False`.
    FileNotFoundError
        If the locations had to be guessed (``self.trustGetRequest`` is
        `True` and no locations were passed in), only a single location is
        expected, and the artifact at that URI does not exist.
    RuntimeError
        If, with more than one location, a `StoredFileInfo`'s
        ``component`` is `None` (this is unexpected).
    """
    guessing = False
    uris = DatasetRefURIs()

    if not file_locations:
        if not self.trustGetRequest:
            raise RuntimeError(f"Unexpectedly got no artifacts for dataset {ref}")
        file_locations = self._get_expected_dataset_locations_info(ref)
        guessing = True

    if len(file_locations) == 1:
        # No disassembly so this is the primary URI
        uris.primaryURI = file_locations[0][0].uri
        if guessing and not uris.primaryURI.exists():
            raise FileNotFoundError(f"Expected URI ({uris.primaryURI}) does not exist")
    else:
        for location, file_info in file_locations:
            if file_info.component is None:
                raise RuntimeError(f"Unexpectedly got no component name for a component at {location}")
            if guessing and not location.uri.exists():
                # If we are trusting then it is entirely possible for
                # some components to be missing. In that case we skip
                # to the next component. (Guessing only happens in trust
                # mode, so the raise below is purely defensive.)
                if self.trustGetRequest:
                    continue
                raise FileNotFoundError(f"Expected URI ({location.uri}) does not exist")
            uris.componentURIs[file_info.component] = location.uri

    return uris

1969 

def _find_missing_records(
    self,
    refs: Iterable[DatasetRef],
    missing_ids: set[DatasetId],
    artifact_existence: dict[ResourcePath, bool] | None = None,
    warn_for_missing: bool = True,
) -> dict[DatasetId, list[StoredFileInfo]]:
    """Derive datastore records for datasets that have no stored records.

    The expected artifact locations of each missing dataset are probed and
    records are synthesized for the artifacts that are actually present.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` of `DatasetRef`
        Candidate refs. Must contain a ref for every ID in ``missing_ids``
        (a `KeyError` is raised otherwise).
    missing_ids : `set` [`DatasetId`]
        IDs of datasets with no datastore records.
    artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`], optional
        Artifact-existence cache passed to ``_process_mexists_records``.
        For datasets with several artifacts it is indexed directly, so it
        must end up holding an entry for each of their artifacts. A new
        empty cache is used if `None`.
    warn_for_missing : `bool`, optional
        If `True`, log a warning for each dataset with no artifacts.

    Returns
    -------
    records : `dict` [`DatasetId`, `list` [`StoredFileInfo`]]
        Synthesized records for datasets found to exist. For datasets with
        several expected artifacts only the records of artifacts that exist
        are included. Datasets that do not exist are absent.
    """
    if not missing_ids:
        return {}

    if artifact_existence is None:
        artifact_existence = {}

    found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
    id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}
    location_factory = self.locationFactory

    # This should be chunked in case we end up having to check
    # the file store since we need some log output to show
    # progress.
    chunk_size = 50_000
    for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=chunk_size):
        records = {}
        for missing in missing_ids_chunk:
            # Ask the source datastore where the missing artifacts
            # should be. An execution butler might not know about the
            # artifacts even if they are there.
            expected = self._get_expected_dataset_locations_info(id_to_ref[missing])
            records[missing] = [info for _, info in expected]

        # Call the mexist helper method in case we have not already
        # checked these artifacts such that artifact_existence is
        # empty. This allows us to benefit from parallelism.
        # datastore.mexists() itself does not give us access to the
        # derived datastore record.
        log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
        ref_exists = self._process_mexists_records(
            id_to_ref, records, False, artifact_existence=artifact_existence
        )

        # Now go through the records and propagate the ones that exist.
        for missing, record_list in records.items():
            # Skip completely if the ref does not exist.
            ref = id_to_ref[missing]
            if not ref_exists[ref]:
                if warn_for_missing:
                    log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
                continue
            # Check for file artifact to decide which parts of a
            # disassembled composite do exist. If there is only a
            # single record we don't even need to look because it can't
            # be a composite and must exist.
            if len(record_list) == 1:
                dataset_records = record_list
            else:
                dataset_records = [
                    record
                    for record in record_list
                    if artifact_existence[record.file_location(location_factory).uri]
                ]
                assert len(dataset_records) > 0, "Disassembled composite should have had some files."

            # Rely on found_records being a defaultdict.
            found_records[missing].extend(dataset_records)
    log.verbose("Completed scan for missing data files")
    return found_records

2036 

2037 def retrieveArtifacts( 

2038 self, 

2039 refs: Iterable[DatasetRef], 

2040 destination: ResourcePath, 

2041 transfer: str = "auto", 

2042 preserve_path: bool = True, 

2043 overwrite: bool = False, 

2044 write_index: bool = True, 

2045 add_prefix: bool = False, 

2046 ) -> dict[ResourcePath, ArtifactIndexInfo]: 

2047 """Retrieve the file artifacts associated with the supplied refs. 

2048 

2049 Parameters 

2050 ---------- 

2051 refs : `~collections.abc.Iterable` of `DatasetRef` 

2052 The datasets for which file artifacts are to be retrieved. 

2053 A single ref can result in multiple files. The refs must 

2054 be resolved. 

2055 destination : `lsst.resources.ResourcePath` 

2056 Location to write the file artifacts. 

2057 transfer : `str`, optional 

2058 Method to use to transfer the artifacts. Must be one of the options 

2059 supported by `lsst.resources.ResourcePath.transfer_from`. 

2060 "move" is not allowed. 

2061 preserve_path : `bool`, optional 

2062 If `True` the full path of the file artifact within the datastore 

2063 is preserved. If `False` the final file component of the path 

2064 is used. 

2065 overwrite : `bool`, optional 

2066 If `True` allow transfers to overwrite existing files at the 

2067 destination. 

2068 write_index : `bool`, optional 

2069 If `True` write a file at the top level containing a serialization 

2070 of a `ZipIndex` for the downloaded datasets. 

2071 add_prefix : `bool`, optional 

2072 If `True` and if ``preserve_path`` is `False`, apply a prefix to 

2073 the filenames corresponding to some part of the dataset ref ID. 

2074 This can be used to guarantee uniqueness. 

2075 

2076 Returns 

2077 ------- 

2078 artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ 

2079 `ArtifactIndexInfo` ] 

2080 Mapping of retrieved file to associated index information. 

2081 """ 

2082 if not destination.isdir(): 

2083 raise ValueError(f"Destination location must refer to a directory. Given {destination}") 

2084 

2085 if transfer == "move": 

2086 raise ValueError("Can not move artifacts out of datastore. Use copy instead.") 

2087 

2088 # Source -> Destination 

2089 # This also helps filter out duplicate DatasetRef in the request 

2090 # that will map to the same underlying file transfer. 

2091 to_transfer: dict[ResourcePath, ResourcePath] = {} 

2092 zips_to_transfer: set[ResourcePath] = set() 

2093 

2094 # Retrieve all the records in bulk indexed by ref.id. 

2095 records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) 

2096 

2097 # Check for missing records. 

2098 known_ids = set(records) 

2099 log.debug("Number of datastore records found in database: %d", len(known_ids)) 

2100 requested_ids = {ref.id for ref in refs} 

2101 missing_ids = requested_ids - known_ids 

2102 

2103 if missing_ids and not self.trustGetRequest: 

2104 raise ValueError(f"Number of datasets missing from this datastore: {len(missing_ids)}") 

2105 

2106 missing_records = self._find_missing_records(refs, missing_ids) 

2107 records.update(missing_records) 

2108 

2109 # One artifact can be used by multiple DatasetRef. 

2110 # e.g. DECam. 

2111 artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} 

2112 # Sort to ensure that in many refs to one file situation the same 

2113 # ref is used for any prefix that might be added. 

2114 for ref in sorted(refs): 

2115 prefix = str(ref.id)[:8] + "-" if add_prefix else "" 

2116 for info in records[ref.id]: 

2117 location = info.file_location(self.locationFactory) 

2118 source_uri = location.uri 

2119 # For DECam/zip we only want to copy once. 

2120 # For zip files we need to unpack so that they can be 

2121 # zipped up again if needed. 

2122 is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment 

2123 # We need to remove fragments for consistency. 

2124 cleaned_source_uri = source_uri.replace(fragment="", query="", params="") 

2125 if is_zip: 

2126 # Assume the DatasetRef definitions are within the Zip 

2127 # file itself and so can be dropped from loop. 

2128 zips_to_transfer.add(cleaned_source_uri) 

2129 elif cleaned_source_uri not in to_transfer: 

2130 target_uri = determine_destination_for_retrieved_artifact( 

2131 destination, location.pathInStore, preserve_path, prefix 

2132 ) 

2133 to_transfer[cleaned_source_uri] = target_uri 

2134 artifact_map[target_uri] = ArtifactIndexInfo.from_single(info.to_simple(), ref.id) 

2135 else: 

2136 target_uri = to_transfer[cleaned_source_uri] 

2137 artifact_map[target_uri].append(ref.id) 

2138 

2139 # Parallelize the transfer. Re-raise as a single exception if 

2140 # a FileExistsError is encountered anywhere. 

2141 log.debug("Number of artifacts to transfer to %s: %d", str(destination), len(to_transfer)) 

2142 try: 

2143 ResourcePath.mtransfer(transfer, tuple(to_transfer.items()), overwrite=overwrite) 

2144 except* FileExistsError as egroup: 

2145 raise FileExistsError( 

2146 "Some files already exist in destination directory and overwrite is False" 

2147 ) from egroup 

2148 

2149 # Transfer the Zip files and unpack them. 

2150 zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path, overwrite) 

2151 artifact_map.update(zipped_artifacts) 

2152 

2153 if write_index: 

2154 index = ZipIndex.from_artifact_map(refs, artifact_map, destination) 

2155 index.write_index(destination) 

2156 

2157 return artifact_map 

2158 

def ingest_zip(
    self,
    zip_path: ResourcePath,
    transfer: str | None,
    *,
    dry_run: bool = False,
) -> None:
    """Ingest an indexed Zip file and contents.

    The Zip file must have an index file as created by `retrieveArtifacts`.

    Parameters
    ----------
    zip_path : `lsst.resources.ResourcePath`
        Path to the Zip file.
    transfer : `str` or `None`
        How the Zip file is brought into the datastore. `None` means the
        file is already within the datastore root and is used in place.
        ``"direct"`` means the file is referenced at its original location
        without being copied. Any other value is used as the transfer mode
        of `lsst.resources.ResourcePath.transfer_from` when writing the
        file to a location derived from the index contents.
    dry_run : `bool`, optional
        If `True` the ingest will be processed without any modifications
        made to the target datastore and as if the target datastore did not
        have any of the datasets.

    Raises
    ------
    DatasetTypeNotSupportedError
        Raised if any dataset in the Zip file is rejected by the datastore
        constraints; nothing is ingested in that case.
    RuntimeError
        Raised if ``transfer`` is `None` and an absolute ``zip_path`` is not
        within the datastore root.

    Notes
    -----
    A Zip file can contain multiple dataset types. If any one of them is
    rejected by the datastore constraints, the entire Zip is rejected.

    If any dataset is already present in the datastore the entire ingest
    will fail.
    """
    index = ZipIndex.from_zip_file(zip_path)

    # Refs indexed by UUID.
    refs = index.refs.to_refs(universe=self.universe)
    id_to_ref = {ref.id: ref for ref in refs}

    # Any failing constraints trigger entire failure.
    if any(not self.constraints.isAcceptable(ref) for ref in refs):
        raise DatasetTypeNotSupportedError(
            "Some refs in the Zip file are not supported by this datastore"
        )

    # Decide where the Zip file lives. Only the final branch actually moves
    # the file; the other two reference it where it already is, so
    # transferring there would fail (``tgtLocation`` is `None` for
    # "direct", and `None` is not a transfer mode).
    tgtLocation: Location | None
    if transfer is None:
        # Indicated that the zip file is already in the right place.
        if not zip_path.isabs():
            tgtLocation = self.locationFactory.fromPath(zip_path.ospath, trusted_path=False)
        else:
            pathInStore = zip_path.relative_to(self.root)
            if pathInStore is None:
                raise RuntimeError(
                    f"Unexpectedly learned that {zip_path} is not within datastore {self.root}"
                )
            tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True)
    elif transfer == "direct":
        # Reference in original location.
        tgtLocation = None
    else:
        # Name the zip file based on index contents. There is no single RUN
        # to use for naming since the Zip contents need not share one.
        tgtLocation = self.locationFactory.fromPath(index.calculate_zip_file_path_in_store())

        # Transfer the Zip file into the datastore.
        if not dry_run:
            tgtLocation.uri.transfer_from(
                zip_path, transfer=transfer, transaction=self._transaction, overwrite=True
            )
        else:
            log.info("Would be copying Zip from %s to %s", zip_path, tgtLocation)

    if tgtLocation is None:
        path_in_store = str(zip_path)
    else:
        path_in_store = tgtLocation.pathInStore.path

    # Associate each file with a (DatasetRef, StoredFileInfo) tuple.
    artifacts: list[tuple[DatasetRef, StoredFileInfo]] = []
    for path_in_zip, index_info in index.artifact_map.items():
        # Need to modify the info to include the path to the Zip file
        # that was previously written to the datastore.
        index_info.info.path = f"{path_in_store}#zip-path={path_in_zip}"

        info = StoredFileInfo.from_simple(index_info.info)
        for id_ in index_info.ids:
            artifacts.append((id_to_ref[id_], info))

    if not dry_run:
        self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)
    else:
        log.info("Would be registering %d artifacts from Zip into datastore", len(artifacts))

2254 

def get(
    self,
    ref: DatasetRef,
    parameters: Mapping[str, Any] | None = None,
    storageClass: StorageClass | str | None = None,
) -> Any:
    """Read a dataset from the store and return it as a Python object.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset to read.
    parameters : `dict`
        `StorageClass`-specific parameters, for example selecting a
        subset of the dataset to load.
    storageClass : `StorageClass` or `str`, optional
        Storage class overriding the Python type that is returned. By
        default the type follows the dataset type definition; a different,
        compatible read storage class can be requested to force conversion.

    Returns
    -------
    inMemoryDataset : `object`
        The requested dataset, or the requested subset of it.

    Raises
    ------
    FileNotFoundError
        Requested dataset can not be retrieved.
    TypeError
        Return value from formatter has unexpected type.
    ValueError
        Formatter failed to process the dataset.
    """
    # Overriding the storage class on the ref makes the read path convert
    # to the requested type; otherwise the ref's own storage class is used.
    read_ref = ref if storageClass is None else ref.overrideStorageClass(storageClass)

    get_info = self._prepare_for_direct_get(read_ref, parameters)
    return get_dataset_as_python_object_from_get_info(
        get_info, ref=read_ref, parameters=parameters, cache_manager=self.cacheManager
    )

2301 

def prepare_get_for_external_client(self, ref: DatasetRef) -> list[DatasetLocationInformation] | None:
    # Docstring inherited
    # An empty location list means the datastore does not know the dataset;
    # report that as None rather than an empty list.
    locations = self._get_dataset_locations_info(ref)
    return locations if len(locations) > 0 else None

2310 

@transactional
def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
    """Store an in-memory dataset under the given `DatasetRef`.

    Parameters
    ----------
    inMemoryDataset : `object`
        The Python object to write.
    ref : `DatasetRef`
        Reference identifying the dataset being written.
    provenance : `DatasetProvenance` or `None`, optional
        Provenance to attach to the serialized dataset. Formatters and
        storage class delegates may ignore it.

    Raises
    ------
    TypeError
        Supplied object and storage class are inconsistent.
    DatasetTypeNotSupportedError
        The associated `DatasetType` is not handled by this datastore.
    RuntimeError
        The dataset is configured for disassembly but its storage class
        delegate cannot disassemble it.

    Notes
    -----
    A datastore configured to reject certain dataset types may raise
    `DatasetTypeNotSupportedError` here. This lets a `ChainedDatastore`
    offer a dataset to several datastores without every one of them
    having to accept it.
    """
    artifacts = []
    if self.composites.shouldBeDisassembled(ref):
        storage_class = ref.datasetType.storageClass
        # Provenance is attached to the composite once, before splitting,
        # so the per-component writes below do not receive it again.
        inMemoryDataset = storage_class.delegate().add_provenance(
            inMemoryDataset, ref, provenance=provenance
        )
        parts = storage_class.delegate().disassemble(inMemoryDataset)
        if parts is None:
            raise RuntimeError(
                f"Inconsistent configuration: dataset type {ref.datasetType.name} "
                f"with storage class {storage_class.name} "
                "is configured to be disassembled, but cannot be."
            )
        for component_name, part in parts.items():
            # Component refs keep the parent's dataset ID but carry the
            # component dataset type. Writing them here rather than
            # recursing into put() keeps registration to one bulk insert.
            component_ref = ref.makeComponentRef(component_name)
            artifacts.append((component_ref, self._write_in_memory_to_artifact(part.component, component_ref)))
    else:
        # Single artifact holding the whole dataset.
        artifacts.append((ref, self._write_in_memory_to_artifact(inMemoryDataset, ref, provenance=provenance)))

    self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)

2371 

@transactional
def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]:
    # Docstring inherited.
    # Unlike put(), nothing is registered here: the stored-file records are
    # attached to the returned ref instead, keyed by this datastore's name.
    artifacts = []
    if self.composites.shouldBeDisassembled(ref):
        storage_class = ref.datasetType.storageClass
        parts = storage_class.delegate().disassemble(in_memory_dataset)
        if parts is None:
            raise RuntimeError(
                f"Inconsistent configuration: dataset type {ref.datasetType.name} "
                f"with storage class {storage_class.name} "
                "is configured to be disassembled, but cannot be."
            )
        for component_name, part in parts.items():
            # Component refs keep the parent's dataset ID but carry the
            # component dataset type, so each part is written separately.
            component_ref = ref.makeComponentRef(component_name)
            artifacts.append((component_ref, self._write_in_memory_to_artifact(part.component, component_ref)))
    else:
        # Single artifact holding the whole dataset.
        artifacts.append((ref, self._write_in_memory_to_artifact(in_memory_dataset, ref)))

    stored_infos = [stored for _, stored in artifacts]
    ref_records: DatasetDatastoreRecords = {self._opaque_table_name: stored_infos}
    return {self.name: ref.replace(datastore_records=ref_records)}

2403 

@transactional
def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
    # Drop these datasets from the local cache straight away so later reads
    # are not confused; if they end up not being trashed the cache simply
    # gets repopulated.
    self.cacheManager.remove_from_cache(ref)

    # In trust mode a dataset may have an artifact but no datastore record.
    # Such datasets have nothing to move into the trash table, so their
    # files must be deleted immediately.
    if self.trustGetRequest:
        # Normalize to a set; ``ref`` is rebuilt at the end of this branch
        # so the single-ref vs. multi-ref logic below still applies.
        candidates = {ref} if isinstance(ref, DatasetRef) else set(ref)

        # Which candidates have records in this datastore?
        by_id = {candidate.id: candidate for candidate in candidates}
        recorded_ids = self._get_stored_records_associated_with_refs(
            candidates, ignore_datastore_records=True
        )
        recorded = {by_id[dataset_id] for dataset_id in recorded_ids}

        unrecorded = candidates - recorded
        if unrecorded:
            # Explicit existence check: only artifact existence matters
            # here, not dataset existence.
            artifact_existence: dict[ResourcePath, bool] = {}
            _ = self.mexists(unrecorded, artifact_existence)
            present_uris = [uri for uri, exists in artifact_existence.items() if exists]

            # FUTURE UPGRADE: Implement a parallelized bulk remove.
            log.debug(
                "Removing %d artifacts from datastore that are unknown to datastore", len(present_uris)
            )
            for uri in present_uris:
                try:
                    uri.remove()
                except Exception as e:
                    if not ignore_errors:
                        raise
                    log.debug("Artifact %s could not be removed: %s", uri, e)

        # Only recorded datasets are worth passing on to the code below.
        if not recorded:
            # None of the datasets were in the datastore record table.
            return
        ref = next(iter(recorded)) if len(recorded) == 1 else list(recorded)

    if not isinstance(ref, DatasetRef):
        # Bulk mode: ``ref`` is an iterable of refs.
        log.debug("Doing multi-dataset trash in datastore %s", self.name)
        try:
            self.bridge.moveToTrash(ref, transaction=self._transaction)
        except Exception as e:
            if not ignore_errors:
                raise
            log.warning("Unexpected issue moving multiple datasets to trash: %s", e)
        return

    log.debug("Trashing dataset %s in datastore %s", ref, self.name)

    # Location and stored-file info for every artifact of this dataset.
    file_locations = self._get_dataset_locations_info(ref)
    if not file_locations:
        err_msg = f"Requested dataset to trash ({ref}) is not known to datastore {self.name}"
        if not ignore_errors:
            raise FileNotFoundError(err_msg)
        log.warning(err_msg)
        return

    for location, _ in file_locations:
        if self._artifact_exists(location):
            continue
        err_msg = (
            f"Dataset is known to datastore {self.name} but "
            f"associated artifact ({location.uri}) is missing"
        )
        if not ignore_errors:
            raise FileNotFoundError(err_msg)
        log.warning(err_msg)
        return

    # Every artifact is present, so mark the dataset as trashed.
    try:
        self.bridge.moveToTrash([ref], transaction=self._transaction)
    except Exception as e:
        if not ignore_errors:
            raise
        log.warning(
            "Attempted to mark dataset (%s) to be trashed in datastore %s "
            "but encountered an error: %s",
            ref,
            self.name,
            e,
        )

2510 

def emptyTrash(
    self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
) -> set[ResourcePath]:
    """Remove all datasets from the trash.

    Parameters
    ----------
    ignore_errors : `bool`
        If `True` return without error even if something went wrong.
        Problems could occur if another process is simultaneously trying
        to delete.
    refs : `collections.abc.Collection` [ `DatasetRef` ] or `None`
        Explicit list of datasets that can be removed from trash. If listed
        datasets are not already stored in the trash table they will be
        ignored. If `None` every entry in the trash table will be
        processed. An empty collection processes nothing.
    dry_run : `bool`, optional
        If `True`, the trash table will be queried and results reported
        but no artifacts will be removed.

    Returns
    -------
    removed : `set` [ `lsst.resources.ResourcePath` ]
        Artifacts that were removed (in ``dry_run`` mode, the artifacts
        that would have been removed).

    Notes
    -----
    Will empty the records from the trash tables only if this call finishes
    without raising.
    """
    removed: set[ResourcePath] = set()
    # Test against None explicitly. An empty collection is a request to
    # process zero datasets and must not fall through to the "empty all
    # trash" branch below.
    if refs is not None:
        selected_ids = {ref.id for ref in refs}
        # Each chunk is handled by its own _empty_trash_subset call.
        chunk_size = 50_000
        n_chunks = math.ceil(len(selected_ids) / chunk_size)
        for chunk_num, chunk in enumerate(chunk_iterable(selected_ids, chunk_size=chunk_size), start=1):
            if n_chunks == 1:
                log.verbose(
                    "Emptying datastore trash for %d dataset%s",
                    len(chunk),
                    "s" if len(chunk) != 1 else "",
                )
            else:
                log.verbose(
                    "Emptying datastore trash for chunk %d out of %d of size %d",
                    chunk_num,
                    n_chunks,
                    len(chunk),
                )
            removed.update(
                self._empty_trash_subset(ignore_errors=ignore_errors, selected_ids=chunk, dry_run=dry_run)
            )
    else:
        log.verbose("Emptying all trash in datastore %s", self.name)
        removed = self._empty_trash_subset(ignore_errors=ignore_errors, dry_run=dry_run)
    log.info(
        "%sRemoved %d file artifact%s from datastore %s",
        "Would have " if dry_run else "",
        len(removed),
        "s" if len(removed) != 1 else "",
        self.name,
    )
    return removed

2576 

@transactional
def _empty_trash_subset(
    self,
    *,
    ignore_errors: bool = True,
    selected_ids: Collection[DatasetId] | None = None,
    dry_run: bool = False,
) -> set[ResourcePath]:
    """Empty the trash table, or a subset of it, in a transaction.

    Parameters
    ----------
    ignore_errors : `bool`
        If `True` return without error even if something went wrong.
        Problems could occur if another process is simultaneously trying
        to delete.
    selected_ids : `collections.abc.Collection` [`DatasetId`] or `None`
        Explicit list of dataset IDs that can be removed from the trash.
        If listed datasets are not already included in the trash table
        they will be ignored. If `None` every entry in the trash table
        will be processed.
    dry_run : `bool`, optional
        If `True`, the trash table will be queried and results reported
        but no artifacts will be removed.

    Returns
    -------
    removed : `set` [ `lsst.resources.ResourcePath` ]
        Artifacts successfully removed (in ``dry_run`` mode, the artifacts
        that would be removed).

    Notes
    -----
    Will empty the records from the trash tables only if this call finishes
    without raising.
    """
    # The bridge context manager clears the relevant rows from the trash
    # table and the records table only if the block exits without raising.
    with self.bridge.emptyTrash(
        self._table,
        record_class=StoredFileInfo,
        record_column="path",
        selected_ids=selected_ids,
        dry_run=dry_run,
    ) as trash_data:
        # An artifact may only be deleted when every ref associated with
        # it is being trashed. The bridge may report which artifacts must
        # survive; when it cannot, that is worked out here.
        trashed, artifacts_to_keep = trash_data

        # Materialize the generator (it is iterated several times) and
        # narrow the record type so later code needs no isinstance tests.
        trashed_list = [(ref, ninfo) for ref, ninfo in trashed if isinstance(ninfo, StoredFileInfo)]

        # A "#" in a path is taken to mean a fragment (e.g. a Zip member),
        # which the bridge cannot handle, so such paths are checked here.
        if artifacts_to_keep is None or any("#" in item[1].path for item in trashed_list):
            if artifacts_to_keep is None:
                paths_to_check = {info.path for _, info in trashed_list}
            else:
                # The bridge already covered fragment-free paths.
                paths_to_check = {info.path for _, info in trashed_list if "#" in info.path}

            path_map = self._refs_associated_with_artifacts(paths_to_check)

            for ref, info in trashed_list:
                artifact = info.artifact_path
                # A disassembled composite inside a Zip can map the same ref
                # to the same path more than once, hence the membership test.
                # NOTE(review): if the helper returns a defaultdict, indexing
                # an absent key (a path deleted below by an earlier repeat of
                # the same ref, or a fragment-free path excluded from
                # ``paths_to_check``) re-inserts an empty set that then lands
                # in ``artifacts_to_keep``. With a plain dict this raises
                # KeyError. Confirm the helper's return type and whether its
                # keys include fragments.
                if ref.id in path_map[artifact]:
                    path_map[artifact].remove(ref.id)
                    if not path_map[artifact]:
                        del path_map[artifact]

            # Any path still mapped has refs that are not being trashed.
            remaining_paths = set(path_map)
            if artifacts_to_keep is None:
                artifacts_to_keep = remaining_paths
            else:
                artifacts_to_keep.update(remaining_paths)

        n_direct = 0
        artifacts_to_delete: set[ResourcePath] = set()
        for ref, info in trashed_list:
            # Should not happen for this implementation but need
            # to keep mypy happy.
            assert info is not None, f"Internal logic error in emptyTrash with ref {ref}."

            if info.artifact_path in artifacts_to_keep:
                # Shared with datasets that are not being removed.
                continue

            # Only trashed refs still known to datastore are returned.
            location = info.file_location(self.locationFactory)
            if location.pathInStore.isabs():
                # Absolute URIs point outside the datastore; never delete.
                n_direct += 1
                continue

            # Strip the fragment so a Zip is listed once rather than once
            # per member.
            artifacts_to_delete.add(location.uri.replace(fragment=""))

        if n_direct > 0:
            s = "s" if n_direct != 1 else ""
            log.verbose("Not deleting %d artifact%s using absolute URI%s", n_direct, s, s)

        if artifacts_to_keep:
            n_keep = len(artifacts_to_keep)
            log.verbose(
                "%d artifact%s %s not deleted because of association with other datasets",
                n_keep,
                "s" if n_keep != 1 else "",
                "were" if n_keep != 1 else "was",
            )

        if not artifacts_to_delete:
            return set()

        action = "Would be" if dry_run else "Now"
        if len(artifacts_to_delete) == 1:
            log.verbose(
                "%s removing file artifact %s from datastore %s",
                action,
                next(iter(artifacts_to_delete)),
                self.name,
            )
        else:
            log.verbose(
                "%s removing %d file artifacts from datastore %s",
                action,
                len(artifacts_to_delete),
                self.name,
            )

        # In dry-run mode, report what would be attempted without checking
        # whether the artifacts actually exist. Direct imports were already
        # excluded above.
        if dry_run:
            return artifacts_to_delete

        remove_result = ResourcePath.mremove(artifacts_to_delete, do_raise=False)

        removed: set[ResourcePath] = set()
        exceptions: list[Exception] = []
        for uri, result in remove_result.items():
            failure = result.exception
            # A missing file is not an error: another process may already
            # have cleaned it up, and it is not wanted anyway.
            if failure is None or isinstance(failure, FileNotFoundError):
                removed.add(uri)
            else:
                exceptions.append(failure)

        if exceptions:
            s_err = "s" if len(exceptions) != 1 else ""
            e = ExceptionGroup(f"Error{s_err} removing {len(exceptions)} artifact{s_err}", exceptions)
            if not ignore_errors:
                raise e
            # Log at debug level even though this is not a good situation.
            # It can come from a race between two users, neither of whom
            # may delete the other's files. Butler does not know about users,
            # and trash does not know which collections the files were in.
            log.debug(
                "Encountered %d error%s removing %d artifact%s from datastore %s: %s",
                len(exceptions),
                s_err,
                len(artifacts_to_delete),
                "s" if len(artifacts_to_delete) != 1 else "",
                self.name,
                e,
            )
        return removed

2769 

@transactional
def transfer_from(
    self,
    source_records: FileTransferMap,
    refs: Collection[DatasetRef],
    transfer: str = "auto",
    artifact_existence: dict[ResourcePath, bool] | None = None,
    dry_run: bool = False,
) -> tuple[set[DatasetRef], set[DatasetRef]]:
    # Copy artifacts described by ``source_records`` into this datastore and
    # register their records. Returns the (accepted, rejected) ref sets.
    log.verbose("Transferring %d datasets to %s", len(refs), self.name)

    # "direct"/"split" would store URIs that the source datastore controls,
    # and the source could delete those files at any moment.
    if transfer in ("direct", "split"):
        raise ValueError(
            f"Can not transfer from a source datastore using {transfer} mode since"
            " those files are controlled by the other datastore."
        )

    if not refs:
        return set(), set()

    # Empty existence lookup if none given (not consulted further below).
    if artifact_existence is None:
        artifact_existence = {}

    # The work is done at the record level so that disassembled composites
    # are handled, since internal APIs can be used here:
    # - An existing destination record is assumed to be fine.
    # - If there is no record but source and destination URIs match, nothing
    #   is copied but the record is still added.
    # - An absolute source URI is assumed to stay absolute and be visible to
    #   the destination butler (only happens after a local ingest).
    log.verbose("Looking up existing datastore records in target %s for %d refs", self.name, len(refs))
    target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)

    artifacts = []  # (ref, file info) pairs to register.
    n_skipped = 0  # Accepted refs that already have records here.
    rejected = set()
    accepted = set()
    direct_transfers = []  # Each time a "direct" transfer was recorded.
    from_to: list[tuple[ResourcePath, ResourcePath]] = []  # Copies to perform.

    log.verbose("Transferring artifacts")
    for ref in refs:
        if not self.constraints.isAcceptable(ref):
            # This datastore should not be accepting this dataset.
            rejected.add(ref)
            continue
        accepted.add(ref)

        if ref.id in target_records:
            # Already have an artifact for this.
            n_skipped += 1
            continue

        for transfer_info in source_records.get(ref.id, []):
            info = transfer_info.file_info
            source_location = transfer_info.location
            target_location = info.file_location(self.locationFactory)

            if transfer == "unsafe_direct":
                # Record the source file's absolute URI in place. "Unsafe"
                # because the source butler may delete it at any time.
                source_location = source_location.toAbsolute()
                direct_transfers.append(source_location)
                info = info.update(path=str(source_location.uri))
            elif source_location == target_location and not source_location.pathInStore.isabs():
                # Artifact already sits at the target location (how the
                # execution butler currently runs).
                pass
            else:
                if target_location.pathInStore.isabs():
                    # Being visible during the transfer does not make an
                    # absolute path accessible to every user of this butler.
                    if transfer == "auto":
                        # "auto" allows recording the absolute URI.
                        direct_transfers.append(source_location)
                    else:
                        # An explicit transfer was requested, so compute a
                        # proper in-datastore target path.
                        template_ref = ref.makeComponentRef(info.component) if info.component else ref
                        target_location = self._calculate_ingested_datastore_name(
                            source_location.uri,
                            template_ref,
                        )
                        info = info.update(path=target_location.pathInStore.path)

                # Need to transfer it to the new location.
                from_to.append((source_location.uri, target_location.uri))

            artifacts.append((ref, info))

    # Bulk copy, always overwriting. An existing artifact may be left over
    # from an interrupted transfer that could not be fully rolled back
    # (e.g. pre-emption), so follow the Datastore default of overwriting.
    # Nothing is copied in dry-run mode.
    if dry_run:
        log.info("Would be copying %d file artifacts", len(from_to))
    else:
        log.verbose("Copying %d file artifacts", len(from_to))
        with time_this(log, msg="Transferring datasets into datastore", level=VERBOSE):
            ResourcePath.mtransfer(
                transfer,
                from_to,
                overwrite=True,
                transaction=self._transaction,
            )

    if direct_transfers:
        n_direct = len(direct_transfers)
        log.info(
            "Transfer request for an outside-datastore artifact with absolute URI done %d time%s",
            n_direct,
            "" if n_direct == 1 else "s",
        )

    # Files may have been overwritten, so force the records to agree. This
    # can be awkward if a dataset was previously ingested disassembled and
    # is now assembled, or vice versa.
    if not dry_run:
        log.verbose("Registering datastore records in database")
        self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.REPLACE)

    if n_skipped:
        log.info(
            "Skipped transfer of %d dataset%s already present in datastore",
            n_skipped,
            "" if n_skipped == 1 else "s",
        )

    log.verbose(
        "Finished transfer_from to %s with %d accepted, %d rejected",
        self.name,
        len(accepted),
        len(rejected),
    )
    return accepted, rejected

2943 

def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap:
    # Wrap the bare IDs in placeholder refs so the record lookup can be used,
    # then attach a resolved location to each stored file record.
    placeholder_refs = [FakeDatasetRef(dataset_id) for dataset_id in dataset_ids]
    stored = self._get_stored_records_associated_with_refs(placeholder_refs, ignore_datastore_records=True)
    return self._convert_stored_file_info_to_file_transfer_record(stored)

2949 

def locate_missing_files_for_transfer(
    self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool]
) -> FileTransferMap:
    # Find artifacts for refs that have no datastore records. This is only
    # possible when the datastore trusts file existence for gets; otherwise
    # there is nothing to find. (Open question: should missing IDs be
    # reported, or should whatever is available be transferred?)
    if not self.trustGetRequest:
        return {}

    # ``refs`` is needed twice: once for the ID set and once by the lookup
    # below. A one-shot iterator would reach the lookup already exhausted,
    # so materialize anything that is not already a re-iterable collection.
    if not isinstance(refs, Collection):
        refs = list(refs)
    missing_ids = {ref.id for ref in refs}

    found_records = self._find_missing_records(refs, missing_ids, artifact_existence, warn_for_missing=False)
    return self._convert_stored_file_info_to_file_transfer_record(found_records)

2964 

2965 def _convert_stored_file_info_to_file_transfer_record( 

2966 self, info_map: dict[DatasetId, list[StoredFileInfo]] 

2967 ) -> FileTransferMap: 

2968 output: dict[DatasetId, list[FileTransferRecord]] = {} 

2969 for k, file_info_list in info_map.items(): 

2970 output[k] = [ 

2971 FileTransferRecord(file_info=info, location=info.file_location(self.locationFactory)) 

2972 for info in file_info_list 

2973 ] 

2974 return output 

2975 

@transactional
def forget(self, refs: Iterable[DatasetRef]) -> None:
    # Docstring inherited.
    # Materialize first: the refs are used twice, once for the bridge and
    # once for the record-table delete.
    ref_list = list(refs)
    self.bridge.forget(ref_list)
    rows = [{"dataset_id": ref.id} for ref in ref_list]
    self._table.delete(["dataset_id"], *rows)

2982 

def validateConfiguration(
    self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
) -> None:
    """Validate some of the configuration for this datastore.

    Parameters
    ----------
    entities : `~collections.abc.Iterable` [`DatasetRef` | `DatasetType` \
            | `StorageClass`]
        Entities to test against this configuration. Can be differing
        types. Any iterable is accepted, including one-shot iterators.
    logFailures : `bool`, optional
        If `True`, output a log message for every validation error
        detected.

    Returns
    -------
    None

    Raises
    ------
    DatastoreValidationError
        Raised if there is a validation problem with a configuration.
        All the problems are reported in a single exception.

    Notes
    -----
    This method checks that all the supplied entities have valid file
    templates and also have formatters defined.
    """
    # The entities are traversed twice (templates, then formatters). A
    # generator would be exhausted by the first pass and silently skip the
    # formatter checks, so materialize it once.
    entities = list(entities)

    templateFailed = None
    try:
        self.templates.validateTemplates(entities, logFailures=logFailures)
    except FileTemplateValidationError as e:
        templateFailed = str(e)

    formatterFailed = []
    for entity in entities:
        try:
            self.formatterFactory.getFormatterClass(entity)
        except KeyError as e:
            formatterFailed.append(str(e))
            if logFailures:
                log.critical("Formatter failure: %s", e)

    messages = []
    if templateFailed:
        messages.append(templateFailed)
    if formatterFailed:
        messages.append(",".join(formatterFailed))
    if messages:
        raise DatastoreValidationError(";\n".join(messages))

3036 

def getLookupKeys(self) -> set[LookupKey]:
    # Docstring is inherited from base class
    # Union of keys known to templates, formatters and constraints.
    template_keys = self.templates.getLookupKeys()
    formatter_keys = self.formatterFactory.getLookupKeys()
    constraint_keys = self.constraints.getLookupKeys()
    return template_keys | formatter_keys | constraint_keys

3044 

def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
    # Docstring is inherited from base class
    # A key may be valid for formatters or for templates, so only keys that
    # actually have a template are checked here.
    if lookupKey not in self.templates:
        return
    try:
        self.templates[lookupKey].validateTemplate(entity)
    except FileTemplateValidationError as e:
        raise DatastoreValidationError(e) from e

3054 

def export(
    self,
    refs: Iterable[DatasetRef],
    *,
    directory: ResourcePathExpression | None = None,
    transfer: str | None = "auto",
) -> Iterable[FileDataset]:
    # Docstring inherited from Datastore.export.
    # With no destination directory, "auto" means "use paths in place".
    if transfer == "auto" and directory is None:
        transfer = None

    needs_directory = transfer is not None and transfer != "direct"
    if needs_directory and directory is None:
        raise TypeError(f"Cannot export using transfer mode {transfer} with no export directory given")

    if transfer == "move":
        raise TypeError("Can not export by moving files out of datastore.")

    # Force the directory to be a URI object.
    directoryUri: ResourcePath | None = (
        ResourcePath(directory, forceDirectory=True) if directory is not None else None
    )

    # mypy needs the directoryUri test.
    if transfer is not None and directoryUri is not None and not directoryUri.exists():
        raise FileNotFoundError(f"Export location {directory} does not exist")

    progress = Progress("lsst.daf.butler.datastores.FileDatastore.export", level=logging.DEBUG)
    for ref in progress.wrap(refs, "Exporting dataset files"):
        file_locations = self._get_dataset_locations_info(ref)
        if not file_locations:
            raise FileNotFoundError(f"Could not retrieve dataset {ref}.")
        # Disassembled datasets can not be exported yet.
        if len(file_locations) > 1:
            raise NotImplementedError(f"Can not export disassembled datasets such as {ref}")
        location, stored_info = file_locations[0]

        path_in_store = location.pathInStore.path
        if transfer is None:
            # TODO: do we also need to return the readStorageClass somehow?
            # Use the in-store path directly, but keep absolute URIs intact.
            if location.pathInStore.isabs():
                path_in_store = str(location.uri)
        elif transfer == "direct":
            # Export full URIs that point at the remote store.
            path_in_store = str(location.uri)
        else:
            # mypy needs help
            assert directoryUri is not None, "directoryUri must be defined to get here"
            store_uri = ResourcePath(location.uri, forceDirectory=False)

            # An absolute URI could either be kept as-is in the export or be
            # given a fresh template-derived name and copied. The second
            # option is used here.
            if location.pathInStore.isabs():
                template = self.templates.getTemplate(ref)
                new_uri = ResourcePath(template.format(ref), forceAbsolute=False, forceDirectory=False)
                path_in_store = str(new_uri.updatedExtension(location.pathInStore.getExtension()))

            export_uri = directoryUri.join(path_in_store)
            export_uri.transfer_from(store_uri, transfer=transfer)

        yield FileDataset(refs=[ref], path=path_in_store, formatter=stored_info.formatter)

3121 

3122 @staticmethod 

3123 def computeChecksum(uri: ResourcePath, algorithm: str = "blake2b", block_size: int = 8192) -> str | None: 

3124 """Compute the checksum of the supplied file. 

3125 

3126 Parameters 

3127 ---------- 

3128 uri : `lsst.resources.ResourcePath` 

3129 Name of resource to calculate checksum from. 

3130 algorithm : `str`, optional 

3131 Name of algorithm to use. Must be one of the algorithms supported 

3132 by :py:class`hashlib`. 

3133 block_size : `int` 

3134 Number of bytes to read from file at one time. 

3135 

3136 Returns 

3137 ------- 

3138 hexdigest : `str` 

3139 Hex digest of the file. 

3140 

3141 Notes 

3142 ----- 

3143 Currently returns None if the URI is for a remote resource. 

3144 """ 

3145 if algorithm not in hashlib.algorithms_guaranteed: 

3146 raise NameError(f"The specified algorithm '{algorithm}' is not supported by hashlib") 

3147 

3148 if not uri.isLocal: 

3149 return None 

3150 

3151 hasher = hashlib.new(algorithm) 

3152 

3153 with uri.as_local() as local_uri, open(local_uri.ospath, "rb") as f: 

3154 for chunk in iter(lambda: f.read(block_size), b""): 

3155 hasher.update(chunk) 

3156 

3157 return hasher.hexdigest() 

3158 

def needs_expanded_data_ids(
    self,
    transfer: str | None,
    entity: DatasetRef | DatasetType | StorageClass | None = None,
) -> bool:
    # Docstring inherited.
    # Only modes that compute new in-datastore paths from file templates
    # need expanded data IDs. ``entity`` could be used to check whether the
    # template uses placeholders beyond the required dimensions, but that
    # would only enable extra (possibly theoretical) optimizations.
    return transfer is not None and transfer != "direct"

3170 

def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
    # Docstring inherited from the base class.
    record_data = data.get(self.name)
    if not record_data:
        # Nothing addressed to this datastore.
        return

    # Register every dataset ID with the bridge before inserting records.
    self._bridge.insert(FakeDatasetRef(dataset_id) for dataset_id in record_data.records)

    # TODO: Verify that there are no unexpected table names in the dict?
    unpacked_records = []
    for dataset_id, dataset_data in record_data.records.items():
        for info in dataset_data.get(self._table.name) or ():
            assert isinstance(info, StoredFileInfo), "Expecting StoredFileInfo records"
            unpacked_records.append(info.to_record(dataset_id=dataset_id))

    if unpacked_records:
        self._table.insert(*unpacked_records, transaction=self._transaction)

3189 

def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
    # Docstring inherited from the base class.
    dataset_ids = [ref.id for ref in refs]
    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
    for batch in self._export_rows(dataset_ids):
        for row in batch:
            stored: StoredDatastoreItemInfo = StoredFileInfo.from_record(row)
            # Group the records by dataset ID, then by opaque table name.
            per_dataset = records.setdefault(row["dataset_id"], {})
            per_dataset.setdefault(self._table.name, []).append(stored)

    return {self.name: DatastoreRecordData(records=records)}

3202 

3203 def _export_rows(self, datasets: Collection[DatasetId]) -> Iterator[Sequence[Mapping[str, Any]]]: 

3204 # This call to 'bridge.check' filters out "partially deleted" datasets. 

3205 # Specifically, ones in the unusual edge state that: 

3206 # 1. They have an entry in the registry dataset tables 

3207 # 2. They were "trashed" from the datastore, so they are not 

3208 # present in the "dataset_location" table.) 

3209 # 3. But the trash has not been "emptied", so there are still entries 

3210 # in the "opaque" datastore records table. 

3211 # 

3212 # As far as I can tell, this can only occur in the case of a concurrent 

3213 # or aborted call to `Butler.pruneDatasets(unstore=True, purge=False)`. 

3214 # Datasets (with or without files existing on disk) can persist in 

3215 # this zombie state indefinitely, until someone manually empties 

3216 # the trash. 

3217 found_ids = self._bridge.check(datasets) 

3218 return self._table.fetch_batches(dataset_id=found_ids) 

3219 

def export_table(self, datasets: Collection[DatasetId]) -> DatastoreRecordTable:
    # Docstring inherited from the base class.
    # One table per fetched batch, combined into a single table at the end.
    tables: list[DatastoreRecordTable] = [
        DatastoreRecordTable.from_stored_file_info_table(self.name, StoredFileInfoTable.from_records(batch))
        for batch in self._export_rows(datasets)
    ]
    return DatastoreRecordTable.combine(tables)

3228 

def import_table(self, table: DatastoreRecordTable) -> None:
    # Docstring inherited from the base class.
    records = table.to_stored_file_info_table().to_records()
    placeholder_refs = [FakeDatasetRef(record["dataset_id"]) for record in records]
    if len(records) == 0:
        # Nothing to register or insert.
        return
    self._bridge.insert(placeholder_refs)
    self._table.insert(*records, transaction=self._transaction)

3237 

def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
    # Docstring inherited from the base class.
    # Cast every ref to the registry storage class before any lookups.
    cast_refs = [self._cast_storage_class(ref) for ref in refs]
    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
    for ref in cast_refs:
        if not self.constraints.isAcceptable(ref):
            continue
        expected_locations = self._get_expected_dataset_locations_info(ref)
        if not expected_locations:
            continue
        per_table = records.setdefault(ref.id, {}).setdefault(self._table.name, [])
        per_table.extend(stored_info for _, stored_info in expected_locations)

    return {self.name: DatastoreRecordData(records=records)}

3255 

def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None:
    # Docstring inherited from the base class.
    # Stored callable is consulted by _cast_storage_class; None disables it.
    setattr(self, "_retrieve_dataset_method", method)

3259 

3260 def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef: 

3261 """Update dataset reference to use the storage class from registry.""" 

3262 if self._retrieve_dataset_method is None: 

3263 # We could raise an exception here but unit tests do not define 

3264 # this method. 

3265 return ref 

3266 dataset_type = self._retrieve_dataset_method(ref.datasetType.name) 

3267 if dataset_type is not None: 

3268 ref = ref.overrideStorageClass(dataset_type.storageClass_name) 

3269 return ref 

3270 

def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
    # Docstring inherited from the base class.
    # A single opaque table, holding StoredFileInfo rows.
    table_spec = self.makeTableSpec()
    return {self._opaque_table_name: DatastoreOpaqueTable(table_spec, StoredFileInfo)}