Coverage for python / lsst / daf / butler / direct_butler / _direct_butler.py: 10%

975 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 01:21 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Butler top level classes.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ( 

33 "ButlerValidationError", 

34 "DirectButler", 

35) 

36 

37import collections.abc 

38import contextlib 

39import io 

40import itertools 

41import math 

42import numbers 

43import os 

44import uuid 

45import warnings 

46from collections import Counter, defaultdict 

47from collections.abc import Collection, Iterable, Iterator, Mapping, MutableMapping, Sequence 

48from functools import partial 

49from types import EllipsisType 

50from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, TextIO, cast 

51 

52from deprecated.sphinx import deprecated 

53from sqlalchemy.exc import IntegrityError 

54 

55from lsst.resources import ResourcePath, ResourcePathExpression 

56from lsst.utils.introspection import find_outside_stacklevel, get_class_of 

57from lsst.utils.iteration import chunk_iterable 

58from lsst.utils.logging import VERBOSE, getLogger 

59from lsst.utils.timer import time_this 

60 

61from .._butler import Butler, _DeprecatedDefault 

62from .._butler_config import ButlerConfig 

63from .._butler_instance_options import ButlerInstanceOptions 

64from .._butler_metrics import ButlerMetrics 

65from .._collection_type import CollectionType 

66from .._dataset_existence import DatasetExistence 

67from .._dataset_ref import DatasetRef 

68from .._dataset_type import DatasetType 

69from .._deferredDatasetHandle import DeferredDatasetHandle 

70from .._exceptions import ( 

71 DatasetNotFoundError, 

72 DimensionValueError, 

73 EmptyQueryResultError, 

74 InconsistentUniverseError, 

75 ValidationError, 

76) 

77from .._file_dataset import FileDataset 

78from .._limited_butler import LimitedButler 

79from .._query_all_datasets import QueryAllDatasetsParameters, query_all_datasets 

80from .._registry_shim import RegistryShim 

81from .._storage_class import StorageClass, StorageClassFactory 

82from .._timespan import Timespan 

83from ..datastore import Datastore, NullDatastore 

84from ..datastores.file_datastore.retrieve_artifacts import ZipIndex, retrieve_and_zip 

85from ..datastores.file_datastore.transfer import retrieve_file_transfer_records 

86from ..dimensions import DataCoordinate, Dimension, DimensionGroup 

87from ..direct_query_driver import DirectQueryDriver 

88from ..progress import Progress 

89from ..queries import Query 

90from ..registry import ( 

91 ConflictingDefinitionError, 

92 DataIdError, 

93 MissingDatasetTypeError, 

94 RegistryDefaults, 

95 _RegistryFactory, 

96) 

97from ..registry.sql_registry import SqlRegistry 

98from ..transfers import RepoExportContext 

99from ..utils import transactional 

100from ._direct_butler_collections import DirectButlerCollections 

101 

102if TYPE_CHECKING: 

103 from lsst.resources import ResourceHandleProtocol 

104 

105 from .._dataset_provenance import DatasetProvenance 

106 from .._dataset_ref import DatasetId 

107 from ..datastore import DatasetRefURIs 

108 from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse 

109 from ..registry import CollectionArgType, Registry 

110 from ..transfers import RepoImportBackend 

111 

112_LOG = getLogger(__name__) 

113 

114 

class ButlerValidationError(ValidationError):
    """Error indicating a problem with the Butler configuration.

    This is a `ValidationError` subclass that adds no behavior of its own;
    it exists so the condition can be identified by its type.
    """

119 

120 

class DirectButler(Butler):  # numpydoc ignore=PR02
    """Main entry point for the data access system.

    Parameters
    ----------
    config : `ButlerConfig`
        The configuration for this Butler instance.
    registry : `SqlRegistry`
        The object that manages dataset metadata and relationships.
    datastore : Datastore
        The object that manages actual dataset storage.
    storageClasses : StorageClassFactory
        An object that maps known storage class names to objects that fully
        describe them.
    metrics : `ButlerMetrics` or `None`, optional
        Metrics object to attach to this instance. If `None` (the default)
        a new `ButlerMetrics` instance is created.

    Notes
    -----
    Most users should call the top-level `Butler`.``from_config`` instead of
    using this constructor directly.

    All constructor arguments are keyword-only.
    """

141 

# This is __new__ instead of __init__ because we have to support
# instantiation via the legacy constructor Butler.__new__(), which
# reads the configuration and selects which subclass to instantiate. The
# interaction between __new__ and __init__ is kind of wacky in Python. If
# we were using __init__ here, __init__ would be called twice (once when
# the DirectButler instance is constructed inside Butler.from_config(), and
# a second time with the original arguments to Butler() when the instance
# is returned from Butler.__new__()).

def __new__(
    cls,
    *,
    config: ButlerConfig,
    registry: SqlRegistry,
    datastore: Datastore,
    storageClasses: StorageClassFactory,
    metrics: ButlerMetrics | None = None,
) -> DirectButler:
    """Allocate a new instance and attach all of its collaborators.

    All state is assigned here rather than in ``__init__``. When
    ``metrics`` is `None` a fresh `ButlerMetrics` instance is created for
    the new object.
    """
    butler = cast(DirectButler, super().__new__(cls))

    if metrics is None:
        metrics = ButlerMetrics()

    butler._config = config
    butler._registry = registry
    butler._datastore = datastore
    butler.storageClasses = storageClasses
    butler._metrics = metrics

    # For execution butler the datastore needs a special
    # dependency-inversion trick. This is not used by regular butler,
    # but we do not have a way to distinguish regular butler from execution
    # butler, so the lookup callable is always installed.
    dataset_type_lookup = partial(_retrieve_dataset_type, registry)
    butler._datastore.set_retrieve_dataset_type_method(dataset_type_lookup)

    # Flag consulted by ``close`` so that closing happens only once.
    butler._closed = False

    return butler

175 

@classmethod
def create_from_config(
    cls,
    config: ButlerConfig,
    *,
    options: ButlerInstanceOptions,
    without_datastore: bool = False,
) -> DirectButler:
    """Build a `DirectButler` from an already-loaded configuration.

    Parameters
    ----------
    config : `ButlerConfig`
        The configuration for this Butler instance.
    options : `ButlerInstanceOptions`
        Default values and other settings for the Butler instance.
    without_datastore : `bool`, optional
        If `True` do not attach a datastore to this butler. Any attempts
        to use a datastore will fail.

    Returns
    -------
    butler : `DirectButler`
        The newly constructed instance.

    Raises
    ------
    ValueError
        Raised if ``config`` contains a ``run`` or ``collection`` key.

    Notes
    -----
    Most users should call the top-level `Butler`.``from_config``
    instead of using this function directly.

    Any exception raised while assembling the registry, datastore or
    storage classes is logged together with the configuration file URI
    and then re-raised unchanged.
    """
    if "run" in config or "collection" in config:
        raise ValueError("Passing a run or collection via configuration is no longer supported.")

    defaults = RegistryDefaults.from_butler_instance_options(options)
    try:
        root = config.get("root", config.configDir)

        # When the caller did not say whether the butler is writeable,
        # treat it as writeable exactly when a default run was given.
        requested = options.writeable
        writeable = requested if requested is not None else options.run is not None

        registry = _RegistryFactory(config).from_config(
            butlerRoot=root, writeable=writeable, defaults=defaults
        )

        datastore: Datastore = (
            NullDatastore(None, None)
            if without_datastore
            else Datastore.fromConfig(config, registry.getDatastoreBridgeManager(), butlerRoot=root)
        )
        # TODO: Once datastore drops dependency on registry we can
        # construct datastore first and pass opaque tables to registry
        # constructor.
        registry.make_datastore_tables(datastore.get_opaque_table_definitions())

        storage_classes = StorageClassFactory()
        storage_classes.addFromConfig(config)

        return DirectButler(
            config=config,
            registry=registry,
            datastore=datastore,
            storageClasses=storage_classes,
            metrics=options.metrics,
        )
    except Exception:
        # Failures here usually mean that configuration is incomplete,
        # just issue an error message which includes config file URI.
        _LOG.error(f"Failed to instantiate Butler from config {config.configFile}.")
        raise

238 

def clone(
    self,
    *,
    collections: CollectionArgType | None | EllipsisType = ...,
    run: str | None | EllipsisType = ...,
    inferDefaults: bool | EllipsisType = ...,
    dataId: dict[str, str] | EllipsisType = ...,
    metrics: ButlerMetrics | None = None,
) -> DirectButler:
    # Docstring inherited
    # The four defaults-related arguments are forwarded untouched to the
    # registry defaults' own ``clone``; presumably ``...`` means "keep the
    # current value" -- confirm against that method.
    new_defaults = self._registry.defaults.clone(collections, run, inferDefaults, dataId)
    new_registry = self._registry.copy(new_defaults)

    # The cloned datastore is bound to the bridge manager of the copied
    # registry rather than to the one belonging to this instance.
    new_datastore = self._datastore.clone(new_registry.getDatastoreBridgeManager())

    # Configuration and storage class factory are shared with the clone.
    return DirectButler(
        config=self._config,
        registry=new_registry,
        datastore=new_datastore,
        storageClasses=self.storageClasses,
        metrics=metrics,
    )

259 

def close(self) -> None:
    # Closing is a one-shot operation; later calls do nothing.
    if self._closed:
        return

    self._closed = True
    self._registry.close()

    # Cause exceptions to be raised if a user attempts to use the
    # instance after closing it. Without this, Butler would still
    # work after being closed because of implementation details
    # of SqlAlchemy, but this may not continue to be the case in the
    # future and we don't want users to get in the habit of doing this.
    closed_marker = _BUTLER_CLOSED_INSTANCE
    self._registry = closed_marker
    self._datastore = closed_marker

271 

# Class-level constant (annotated ``ClassVar``), so it is shared by the
# class rather than being a per-instance field.
GENERATION: ClassVar[int] = 3
"""This is a Generation 3 Butler.

This attribute may be removed in the future, once the Generation 2 Butler
interface has been fully retired; it should only be used in transitional
code.
"""

279 

@classmethod
def _unpickle(
    cls,
    config: ButlerConfig,
    collections: tuple[str, ...] | None,
    run: str | None,
    defaultDataId: dict[str, str],
    writeable: bool,
) -> DirectButler:
    """Rebuild a Butler from the positional state stored in a pickle.

    ``__reduce__`` can only invoke callables with positional arguments,
    so this helper accepts them positionally and forwards them to
    `create_from_config` as keyword-only options.

    Parameters
    ----------
    config : `ButlerConfig`
        Butler configuration, already coerced into a true `ButlerConfig`
        instance (and hence after any search paths for overrides have been
        utilized).
    collections : `tuple` [ `str` ] or `None`
        Names of the default collections to read from.
    run : `str` or `None`
        Name of the default `~CollectionType.RUN` collection to write to.
    defaultDataId : `dict` [ `str`, `str` ]
        Default data ID values.
    writeable : `bool`
        Whether the Butler should support write operations.

    Returns
    -------
    butler : `Butler`
        A new `Butler` instance.
    """
    # The default data ID travels in the ``kwargs`` field of the options.
    options = ButlerInstanceOptions(
        collections=collections,
        run=run,
        writeable=writeable,
        kwargs=defaultDataId,
    )
    return cls.create_from_config(config=config, options=options)

321 

def __reduce__(self) -> tuple:
    """Support pickling.

    Returns the reconstruction callable together with the positional
    arguments it expects, in the order declared by ``_unpickle``:
    configuration, default collections, default run, default data ID and
    the writeable flag.
    """
    rebuild = DirectButler._unpickle
    state = (
        self._config,
        self.collections.defaults,
        self.run,
        # Copy the required data ID values into a plain ``dict``.
        dict(self._registry.defaults.dataId.required),
        self._registry.isWriteable(),
    )
    return (rebuild, state)

334 

335 def __str__(self) -> str: 

336 return ( 

337 f"Butler(collections={self.collections}, run={self.run}, " 

338 f"datastore='{self._datastore}', registry='{self._registry}')" 

339 ) 

340 

def isWriteable(self) -> bool:
    # Docstring inherited.
    # The answer is delegated entirely to the registry.
    registry = self._registry
    return registry.isWriteable()

344 

345 def _caching_context(self) -> contextlib.AbstractContextManager[None]: 

346 """Context manager that enables caching.""" 

347 return self._registry.caching_context() 

348 

@contextlib.contextmanager
def transaction(self) -> Iterator[None]:
    """Context manager supporting `Butler` transactions.

    The registry transaction is entered first and the datastore
    transaction second; they are exited in the reverse order.

    Transactions can be nested.
    """
    with self._registry.transaction():
        with self._datastore.transaction():
            yield

357 

def _standardizeArgs(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    dataId: DataId | None = None,
    for_put: bool = True,
    **kwargs: Any,
) -> tuple[DatasetType, DataId | None]:
    """Normalize the dataset/data ID arguments shared by several Butler
    APIs.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        When `DatasetRef` the `dataId` should be `None`.
        Otherwise the `DatasetType` or name thereof.
    dataId : `dict` or `DataCoordinate`
        A `dict` of `Dimension` link name, value pairs that label the
        `DatasetRef` within a Collection. When `None`, a `DatasetRef`
        should be provided as the first argument.
    for_put : `bool`, optional
        If `True` this call is invoked as part of a `Butler.put`.
        Otherwise it is assumed to be part of a `Butler.get()`. This
        parameter is only relevant if there is dataset type
        inconsistency.
    **kwargs
        Additional keyword arguments used to augment or construct a
        `DataCoordinate`. See `DataCoordinate.standardize`
        parameters.

    Returns
    -------
    datasetType : `DatasetType`
        A `DatasetType` instance extracted from ``datasetRefOrType``.
    dataId : `dict` or `DataId`, optional
        Argument that can be used (along with ``kwargs``) to construct a
        `DataId`.

    Raises
    ------
    ValueError
        Raised if a `DatasetRef` is given together with a data ID or
        keyword arguments, or if a supplied dataset type is incompatible
        with the definition known to the registry.

    Notes
    -----
    Butler APIs that conceptually need a DatasetRef also allow passing a
    `DatasetType` (or the name of one) and a `DataId` (or a dict and
    keyword arguments that can be used to construct one) separately. This
    method accepts those arguments and always returns a true `DatasetType`
    and a `DataId` or `dict`.

    Standardization of `dict` vs `DataId` is best handled by passing the
    returned ``dataId`` (and ``kwargs``) to `Registry` APIs, which are
    generally similarly flexible.
    """
    supplied: DatasetType | None = None
    registered: DatasetType | None = None

    if isinstance(datasetRefOrType, DatasetRef):
        if dataId is not None or kwargs:
            raise ValueError("DatasetRef given, cannot use dataId as well")
        supplied, dataId = datasetRefOrType.datasetType, datasetRefOrType.dataId
    elif isinstance(datasetRefOrType, DatasetType):
        # Don't check whether DataId is provided, because Registry APIs
        # can usually construct a better error message when it wasn't.
        supplied = datasetRefOrType
    else:
        # Only a name was given, so the registry definition is the answer.
        registered = self.get_dataset_type(datasetRefOrType)

    # A caller-supplied definition must agree with the registry's.
    if supplied is not None:
        registered = self.get_dataset_type(supplied.name)
        if supplied != registered:
            # We can allow differences if they are compatible, depending
            # on whether this is a get or a put. A get requires that
            # the python type associated with the datastore can be
            # converted to the user type. A put requires that the user
            # supplied python type can be converted to the internal
            # type expected by registry.
            if for_put:
                preferred = registered
                compatible = registered.is_compatible_with(supplied)
            else:
                preferred = supplied
                compatible = supplied.is_compatible_with(registered)
            if not compatible:
                raise ValueError(
                    f"Supplied dataset type ({supplied}) inconsistent with "
                    f"registry definition ({registered})"
                )
            # Override the internal definition.
            registered = preferred

    assert registered is not None
    return registered, dataId

447 

def _rewrite_data_id(
    self, dataId: DataId | None, datasetType: DatasetType, **kwargs: Any
) -> tuple[DataId | None, dict[str, Any]]:
    """Rewrite a data ID taking into account dimension records.

    Take a Data ID and keyword args and rewrite it if necessary to
    allow the user to specify dimension records rather than dimension
    primary values.

    This allows a user to include a dataId dict with keys of
    ``exposure.day_obs`` and ``exposure.seq_num`` instead of giving
    the integer exposure ID. It also allows a string to be given
    for a dimension value rather than the integer ID if that is more
    convenient. For example, rather than having to specify the
    detector with ``detector.full_name``, a string given for ``detector``
    will be interpreted as the full name and converted to the integer
    value.

    Keyword arguments can also use strings for dimensions like detector
    and exposure but python does not allow them to include ``.`` and
    so the ``exposure.day_obs`` syntax can not be used in a keyword
    argument.

    Parameters
    ----------
    dataId : `dict` or `DataCoordinate`
        A `dict` of `Dimension` link name, value pairs that will label the
        `DatasetRef` within a Collection.
    datasetType : `DatasetType`
        The dataset type associated with this dataId. Required to
        determine the relevant dimensions.
    **kwargs
        Additional keyword arguments used to augment or construct a
        `DataId`. See `DataId` parameters.

    Returns
    -------
    dataId : `dict` or `DataCoordinate`
        The, possibly rewritten, dataId. If given a `DataCoordinate` and
        no keyword arguments, the original dataId will be returned
        unchanged.
    **kwargs : `dict`
        Any unused keyword arguments. As the method is written this is
        always an empty `dict`: the early return only happens when no
        keyword arguments were given, and otherwise every keyword
        argument is either merged into the data ID or converted to a
        record constraint before ``kwargs`` is reset.

    Raises
    ------
    DimensionValueError
        Raised if a key is neither a dimension nor a record field of any
        candidate dimension, if an explicit dimension value cannot be
        found or disagrees with the record values given for the same
        dimension, or if record constraints match zero records or more
        than one record.
    RuntimeError
        Raised if a name used for a record constraint does not refer to
        a true `Dimension`.
    """
    # Process dimension records that are using record information
    # rather than ids
    newDataId: dict[str, DataIdValue] = {}
    # Maps dimension name -> {record field name: value} constraints.
    byRecord: dict[str, dict[str, Any]] = defaultdict(dict)

    if isinstance(dataId, DataCoordinate):
        # Do nothing if we have a DataCoordinate and no kwargs.
        if not kwargs:
            return dataId, kwargs
        # If we have a DataCoordinate with kwargs, we know the
        # DataCoordinate only has values for real dimensions.
        newDataId.update(dataId.mapping)
    elif dataId:
        # The data is mapping, which means it might have keys like
        # "exposure.obs_id" (unlike kwargs, because a "." is not allowed in
        # a keyword parameter).
        for k, v in dataId.items():
            if isinstance(k, str) and "." in k:
                # Someone is using a more human-readable dataId
                dimensionName, record = k.split(".", 1)
                byRecord[dimensionName][record] = v
            else:
                newDataId[k] = v

    # Go through the updated dataId and check the type in case someone is
    # using an alternate key. We have already filtered out the compound
    # keys dimensions.record format.
    not_dimensions = {}

    # Will need to look in the dataId and the keyword arguments
    # and will remove them if they need to be fixed or are unrecognized.
    for dataIdDict in (newDataId, kwargs):
        # Use a list so we can adjust the dict safely in the loop
        for dimensionName in list(dataIdDict):
            value = dataIdDict[dimensionName]
            try:
                dimension = self.dimensions.dimensions[dimensionName]
            except KeyError:
                # This is not a real dimension
                not_dimensions[dimensionName] = value
                del dataIdDict[dimensionName]
                continue

            # Convert an integral type to an explicit int to simplify
            # comparisons here. Note that the converted value is only
            # used locally; the entry in ``dataIdDict`` is not rewritten.
            if isinstance(value, numbers.Integral):
                value = int(value)

            if not isinstance(value, dimension.primaryKey.getPythonType()):
                # The value is not of the primary key type; treat it as
                # the first alternate key whose type it matches.
                for alternate in dimension.alternateKeys:
                    if isinstance(value, alternate.getPythonType()):
                        byRecord[dimensionName][alternate.name] = value
                        del dataIdDict[dimensionName]
                        _LOG.debug(
                            "Converting dimension %s to %s.%s=%s",
                            dimensionName,
                            dimensionName,
                            alternate.name,
                            value,
                        )
                        break
                else:
                    # No alternate key matched (loop finished without
                    # ``break``); the value stays in place unchanged.
                    _LOG.warning(
                        "Type mismatch found for value '%r' provided for dimension %s. "
                        "Could not find matching alternative (primary key has type %s) "
                        "so attempting to use as-is.",
                        value,
                        dimensionName,
                        dimension.primaryKey.getPythonType(),
                    )

    # By this point kwargs and newDataId should only include valid
    # dimensions. Merge kwargs in to the new dataId and log if there
    # are dimensions in both (rather than calling update).
    for k, v in kwargs.items():
        if k in newDataId and newDataId[k] != v:
            _LOG.debug(
                "Keyword arg %s overriding explicit value in dataId of %s with %s", k, newDataId[k], v
            )
        newDataId[k] = v
    # No need to retain any values in kwargs now. From here on ``kwargs``
    # is an empty dict, so the later uses of it below add nothing.
    kwargs = {}

    # If we have some unrecognized dimensions we have to try to connect
    # them to records in other dimensions. This is made more complicated
    # by some dimensions having records with clashing names. A mitigation
    # is that we can tell by this point which dimensions are missing
    # for the DatasetType but this does not work for calibrations
    # where additional dimensions can be used to constrain the temporal
    # axis.
    if not_dimensions:
        # Search for all dimensions even if we have been given a value
        # explicitly. In some cases records are given as well as the
        # actual dimension and this should not be an error if they
        # match.
        mandatoryDimensions = datasetType.dimensions.names  # - provided

        candidateDimensions: set[str] = set()
        candidateDimensions.update(mandatoryDimensions)

        # For calibrations we may well be needing temporal dimensions
        # so rather than always including all dimensions in the scan
        # restrict things a little. It is still possible for there
        # to be confusion over day_obs in visit vs exposure for example.
        # If we are not searching calibration collections things may
        # fail but they are going to fail anyway because of the
        # ambiguousness of the dataId...
        if datasetType.isCalibration():
            for dim in self.dimensions.dimensions:
                if dim.temporal:
                    candidateDimensions.add(str(dim))

        # Look up table for the first association with a dimension
        guessedAssociation: dict[str, dict[str, Any]] = defaultdict(dict)

        # Keep track of whether an item is associated with multiple
        # dimensions. ``counter`` counts matched fields per dimension;
        # ``assigned`` maps each field to the dimensions it matched.
        counter: Counter[str] = Counter()
        assigned: dict[str, set[str]] = defaultdict(set)

        # Go through the missing dimensions and associate the
        # given names with records within those dimensions.
        # Despite its name, ``matched_dims`` collects the *field names*
        # that matched at least one dimension.
        matched_dims = set()
        for dimensionName in candidateDimensions:
            dimension = self.dimensions.dimensions[dimensionName]
            fields = dimension.metadata.names | dimension.uniqueKeys.names
            for field in not_dimensions:
                if field in fields:
                    guessedAssociation[dimensionName][field] = not_dimensions[field]
                    counter[dimensionName] += 1
                    assigned[field].add(dimensionName)
                    matched_dims.add(field)

        # Calculate the fields that matched nothing.
        never_found = set(not_dimensions) - matched_dims

        if never_found:
            raise DimensionValueError(f"Unrecognized keyword args given: {never_found}")

        # There is a chance we have allocated a single dataId item
        # to multiple dimensions. Need to decide which should be retained.
        # For now assume that the most popular alternative wins.
        # This means that day_obs with seq_num will result in
        # exposure.day_obs and not visit.day_obs
        # Also prefer an explicitly missing dimension over an inferred
        # temporal dimension.
        for fieldName, assignedDimensions in assigned.items():
            if len(assignedDimensions) > 1:
                # Pick the most popular (preferring mandatory dimensions)
                # Note: ``candidateDimensions`` is reused here as a
                # per-field shortlist; the scan above has already
                # finished with its original contents.
                requiredButMissing = assignedDimensions.intersection(mandatoryDimensions)
                if requiredButMissing:
                    candidateDimensions = requiredButMissing
                else:
                    candidateDimensions = assignedDimensions

                    # If this is a choice between visit and exposure and
                    # neither was a required part of the dataset type,
                    # (hence in this branch) always prefer exposure over
                    # visit since exposures are always defined and visits
                    # are defined from exposures.
                    if candidateDimensions == {"exposure", "visit"}:
                        candidateDimensions = {"exposure"}

                # Select the relevant items and get a new restricted
                # counter.
                theseCounts = {k: v for k, v in counter.items() if k in candidateDimensions}
                duplicatesCounter: Counter[str] = Counter()
                duplicatesCounter.update(theseCounts)

                # Choose the most common. If they are equally common
                # we will pick the one that was found first.
                # Returns a list of tuples
                selected = duplicatesCounter.most_common(1)[0][0]

                _LOG.debug(
                    "Ambiguous dataId entry '%s' associated with multiple dimensions: %s."
                    " Removed ambiguity by choosing dimension %s.",
                    fieldName,
                    ", ".join(assignedDimensions),
                    selected,
                )

                # Drop this field from every dimension that lost out.
                for candidateDimension in assignedDimensions:
                    if candidateDimension != selected:
                        del guessedAssociation[candidateDimension][fieldName]

        # Update the record look up dict with the new associations
        for dimensionName, values in guessedAssociation.items():
            if values:  # A dict might now be empty
                _LOG.debug(
                    "Assigned non-dimension dataId keys to dimension %s: %s", dimensionName, values
                )
                byRecord[dimensionName].update(values)

    if byRecord:
        # Some record specifiers were found so we need to convert
        # them to the Id form
        for dimensionName, values in byRecord.items():
            if dimensionName in newDataId:
                _LOG.debug(
                    "DataId specified explicit %s dimension value of %s in addition to"
                    " general record specifiers for it of %s. Checking for self-consistency.",
                    dimensionName,
                    newDataId[dimensionName],
                    str(values),
                )
                # Get the actual record and compare with these values.
                # Only query with relevant data ID values.
                filtered_data_id = {
                    k: v for k, v in newDataId.items() if k in self.dimensions[dimensionName].required
                }
                try:
                    recs = self.query_dimension_records(
                        dimensionName,
                        data_id=filtered_data_id,
                    )
                except (DataIdError, EmptyQueryResultError):
                    # ``from None`` hides the underlying query error so
                    # only the data ID oriented message is reported.
                    raise DimensionValueError(
                        f"Could not find dimension '{dimensionName}'"
                        f" with dataId {filtered_data_id} as part of comparing with"
                        f" record values {byRecord[dimensionName]}"
                    ) from None
                if len(recs) == 1:
                    # Collect every mismatching field so that a single
                    # error can report all of them.
                    errmsg: list[str] = []
                    for k, v in values.items():
                        if (recval := getattr(recs[0], k)) != v:
                            errmsg.append(f"{k} ({recval} != {v})")
                    if errmsg:
                        raise DimensionValueError(
                            f"Dimension {dimensionName} in dataId has explicit value"
                            f" {newDataId[dimensionName]} inconsistent with"
                            f" {dimensionName} dimension record: " + ", ".join(errmsg)
                        )
                else:
                    # Multiple matches for an explicit dimension
                    # should never happen but let downstream complain.
                    pass
                # The explicit value is kept; no lookup by record needed.
                continue

            # Do not use data ID keys in query that aren't relevant.
            # Otherwise we can have detector queries being constrained
            # by an exposure ID that doesn't exist and return no matches
            # for a detector even though it's a good detector name.
            filtered_data_id = {
                k: v
                for k, v in newDataId.items()
                if k in self.dimensions[dimensionName].minimal_group.names
            }

            def _get_attr(obj: Any, attr: str) -> Any:
                # Used to implement x.exposure.seq_num when given
                # x and "exposure.seq_num".
                for component in attr.split("."):
                    obj = getattr(obj, component)
                return obj

            with self.query() as q:
                x = q.expression_factory
                # Build up a WHERE expression.
                predicates = tuple(_get_attr(x, f"{dimensionName}.{k}") == v for k, v in values.items())
                extra_args: dict[str, Any] = {}  # For mypy.
                extra_args.update(filtered_data_id)
                # ``kwargs`` was reset to an empty dict above, so this
                # update contributes nothing.
                extra_args.update(kwargs)
                q = q.where(x.all(*predicates), **extra_args)
                # A set is used so that records can be replaced wholesale
                # and popped below.
                records = set(q.dimension_records(dimensionName))

            if len(records) != 1:
                if len(records) > 1:
                    # visit can have an ambiguous answer without involving
                    # visit_system. The default visit_system is defined
                    # by the instrument.
                    if (
                        dimensionName == "visit"
                        and "visit_system_membership" in self.dimensions
                        and "visit_system" in self.dimensions["instrument"].metadata
                    ):
                        instrument_records = self.query_dimension_records(
                            "instrument",
                            data_id=newDataId,
                            explain=False,
                            **kwargs,
                        )
                        if len(instrument_records) == 1:
                            visit_system = instrument_records[0].visit_system
                            if visit_system is None:
                                # Set to a value that will never match.
                                visit_system = -1

                            # Look up each visit in the
                            # visit_system_membership records.
                            for rec in records:
                                membership = self.query_dimension_records(
                                    # Use bind to allow zero results.
                                    # This is a fully-specified query.
                                    # NOTE(review): no bind argument is
                                    # passed here; ``explain=False`` is
                                    # what is passed -- confirm whether
                                    # the comment above is stale.
                                    "visit_system_membership",
                                    instrument=instrument_records[0].name,
                                    visit_system=visit_system,
                                    visit=rec.id,
                                    explain=False,
                                )
                                if membership:
                                    # This record is the right answer.
                                    records = {rec}
                                    break

                    # The ambiguity may have been resolved so check again.
                    if len(records) > 1:
                        _LOG.debug(
                            "Received %d records from constraints of %s", len(records), str(values)
                        )
                        for r in records:
                            _LOG.debug("- %s", str(r))
                        raise DimensionValueError(
                            f"DataId specification for dimension {dimensionName} is not"
                            f" uniquely constrained to a single dataset by {values}."
                            f" Got {len(records)} results."
                        )
                else:
                    # Zero records matched the constraints.
                    raise DimensionValueError(
                        f"DataId specification for dimension {dimensionName} matched no"
                        f" records when constrained by {values}"
                    )

            # Get the primary key from the real dimension object
            dimension = self.dimensions.dimensions[dimensionName]
            if not isinstance(dimension, Dimension):
                raise RuntimeError(
                    f"{dimension.name} is not a true dimension, and cannot be used in data IDs."
                )
            # Exactly one record remains at this point; store its primary
            # key value as the dimension's value in the data ID.
            newDataId[dimensionName] = getattr(records.pop(), dimension.primaryKey.name)

    return newDataId, kwargs

824 

def _findDatasetRef(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    dataId: DataId | None = None,
    *,
    collections: Any = None,
    predict: bool = False,
    run: str | None = None,
    datastore_records: bool = False,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> DatasetRef:
    """Turn the arguments of a lookup-style method into a `DatasetRef`.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (in which case ``dataId`` should be `None`), or
        a `DatasetType` or the name of one.
    dataId : `dict` or `DataCoordinate`, optional
        Mapping of `Dimension` link names to values identifying the
        dataset within a collection. Should be `None` when a
        `DatasetRef` is given as the first argument.
    collections : Any, optional
        Collections to search instead of ``self.collections``. Any form
        accepted by the ``collections`` argument of butler construction.
    predict : `bool`, optional
        If `True` and the registry search finds nothing, return a newly
        constructed `DatasetRef` rather than raising. Defaults to
        `False`.
    run : `str`, optional
        Run collection name for a predicted `DatasetRef`. Only consulted
        when ``predict`` is `True`; ``self.run`` is the fallback.
    datastore_records : `bool`, optional
        If `True`, request datastore records on the returned ref.
    timespan : `Timespan` or `None`, optional
        Timespan that the dataset validity range must overlap. When
        omitted for a calibration dataset type, the timespan of the
        expanded data ID is used if the data ID has temporal dimensions,
        otherwise an unbounded timespan is tried.
    **kwargs
        Extra keyword arguments used to augment or construct the data
        ID.

    Returns
    -------
    ref : `DatasetRef`
        Reference to the dataset identified by the arguments. When a
        `DatasetRef` was given this is that ref, possibly expanded or
        with datastore records attached.

    Raises
    ------
    DatasetNotFoundError
        Raised if nothing matched and ``predict`` is `False`.
    TypeError
        Raised if a prediction was requested but neither ``run`` nor
        ``self.run`` provides a run name.
    """
    datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, for_put=False, **kwargs)

    if isinstance(datasetRefOrType, DatasetRef):
        given_ref = datasetRefOrType
        if collections is not None:
            warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3)
        if predict and not given_ref.dataId.hasRecords():
            expanded_id = self.registry.expandDataId(given_ref.dataId)
            return given_ref.expanded(expanded_id)
        # Only fetch datastore records when asked for and not yet attached.
        if datastore_records and given_ref._datastore_records is None:
            return self._registry.get_datastore_records(given_ref)
        return given_ref

    dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

    registry_defaults = self._registry.defaults
    if not datasetType.isCalibration():
        # Restrict the data ID to the dimensions of the dataset type here,
        # rather than leaving it to the registry search, so that the
        # standardized form is available even when nothing is found.
        dataId = DataCoordinate.standardize(
            dataId,
            dimensions=datasetType.dimensions,
            defaults=registry_defaults.dataId,
            **kwargs,
        )
    else:
        # Calibration lookups standardize against the full universe
        # because additional dimensions may carry the temporal information
        # required for a validity-range lookup.
        dataId = DataCoordinate.standardize(
            dataId, universe=self.dimensions, defaults=registry_defaults.dataId, **kwargs
        )
        # An explicit timespan from the caller always takes precedence.
        if timespan is None and dataId.dimensions.temporal:
            dataId = self._registry.expandDataId(dataId)
            timespan = dataId.timespan
        elif timespan is None:
            # Unbounded timespan; the search is expected to fail
            # downstream if it matches more than one dataset.
            timespan = Timespan(None, None)

    # The registry is always searched so that the result is known to be
    # present in the collections in use.
    found = self.find_dataset(
        datasetType,
        dataId,
        collections=collections,
        timespan=timespan,
        datastore_records=datastore_records,
    )

    if found is None:
        if not predict:
            if collections is None:
                collections = self._registry.defaults.collections
            raise DatasetNotFoundError(
                f"Dataset {datasetType.name} with data ID {dataId} "
                f"could not be found in collections {collections}."
            )
        predicted_run = self.run if run is None else run
        if predicted_run is None:
            raise TypeError("Cannot predict dataset ID/location with run=None.")
        dataId = self.registry.expandDataId(dataId)
        return DatasetRef(datasetType, dataId, run=predicted_run)

    if datasetType == found.datasetType:
        return found

    # A differing dataset type means the caller supplied their own
    # compatible definition; rebuild the ref around it so the caller gets
    # the type they asked for.
    return DatasetRef(
        datasetType,
        found.dataId,
        run=found.run,
        id=found.id,
        datastore_records=found._datastore_records,
    )

967 

@transactional
def put(
    self,
    obj: Any,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    run: str | None = None,
    provenance: DatasetProvenance | None = None,
    **kwargs: Any,
) -> DatasetRef:
    """Store a dataset and record it in the registry.

    Parameters
    ----------
    obj : `object`
        The dataset to store.
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or the name of one. A fully resolved `DatasetRef`
        has its run and ID used directly.
    dataId : `dict` or `DataCoordinate`
        Mapping of `Dimension` link names to values identifying the
        dataset within a collection. Should be `None` when a
        `DatasetRef` is given as the second argument.
    run : `str`, optional
        Run to add the dataset to, overriding ``self.run``. Ignored,
        with a warning, when a `DatasetRef` is given.
    provenance : `DatasetProvenance` or `None`, optional
        Provenance forwarded to the datastore for attachment to the
        serialized dataset.
    **kwargs
        Extra keyword arguments used to augment or construct the data
        ID. Not used if a resolved `DatasetRef` is provided.

    Returns
    -------
    ref : `DatasetRef`
        Reference to the stored dataset: the given `DatasetRef`, or the
        one created by the registry insert.

    Raises
    ------
    TypeError
        Raised on the data ID path if the butler is read-only.
    ConflictingDefinitionError
        Raised on the `DatasetRef` path if the datastore already knows
        the dataset, or if the datastore write hits an `IntegrityError`.
    """
    if isinstance(datasetRefOrType, DatasetRef):
        # Direct put of a predefined DatasetRef.
        given_ref = datasetRefOrType
        _LOG.debug("Butler put direct: %s", given_ref)
        if run is not None:
            warnings.warn("Run collection is not used for DatasetRef", stacklevel=3)

        with self._metrics.instrument_put(_LOG, msg="Dataset put direct"):
            # If the registry already holds a dataset with the same ID,
            # dataset type and data ID, the import does nothing and just
            # returns the original ref, so the datastore check below is
            # what detects that the dataset already exists.
            self._registry._importDatasets([given_ref], expand=True)
            # Checking before writing is inherently prone to races.
            already_known = self._datastore.knows(given_ref)
            if already_known:
                raise ConflictingDefinitionError(f"Datastore already contains dataset: {given_ref}")
            # If this write loses a race with another writer the stored
            # content may be unpredictable.
            try:
                self._datastore.put(obj, given_ref, provenance=provenance)
            except IntegrityError as exc:
                raise ConflictingDefinitionError(f"Datastore already contains dataset: {exc}") from exc

        return given_ref

    _LOG.debug("Butler put: %s, dataId=%s, run=%s", datasetRefOrType, dataId, run)
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    with self._metrics.instrument_put(_LOG, msg="Dataset put with dataID"):
        datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwargs)

        # Translate any dimension-record values in the data ID.
        dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

        # Create the registry entry first, then write the artifact.
        expanded_id = self._registry.expandDataId(dataId, dimensions=datasetType.dimensions, **kwargs)
        inserted = self._registry.insertDatasets(datasetType, run=run, dataIds=[expanded_id])
        (new_ref,) = inserted
        self._datastore.put(obj, new_ref, provenance=provenance)

    return new_ref

1060 

def getDeferred(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    parameters: dict | None = None,
    collections: Any = None,
    storageClass: str | StorageClass | None = None,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> DeferredDatasetHandle:
    """Look up a dataset now and return a handle for reading it later.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or the name of one.
    dataId : `dict` or `DataCoordinate`, optional
        Mapping of `Dimension` link names to values identifying the
        dataset within a collection. Should be `None` when a
        `DatasetRef` is given as the first argument.
    parameters : `dict`
        StorageClass-defined read options stored on the handle,
        typically used to read only a subset of the dataset.
    collections : Any, optional
        Collections to search instead of ``self.collections``. Any form
        accepted by the ``collections`` argument of butler construction.
    storageClass : `StorageClass` or `str`, optional
        Storage class overriding the Python type returned on read. It
        must be compatible with the original type.
    timespan : `Timespan` or `None`, optional
        Timespan that the dataset validity range must overlap; see
        `_findDatasetRef` for how a missing value is handled for
        calibration dataset types.
    **kwargs
        Extra keyword arguments used to augment or construct the data
        ID.

    Returns
    -------
    obj : `DeferredDatasetHandle`
        Handle that can retrieve the dataset at a later time.

    Raises
    ------
    LookupError
        Raised if a `DatasetRef` was given but the datastore neither
        knows it nor finds its artifact. The registry search performed
        for the other argument forms may raise as well.
    """
    if not isinstance(datasetRefOrType, DatasetRef):
        ref = self._findDatasetRef(
            datasetRefOrType, dataId, collections=collections, timespan=timespan, **kwargs
        )
    else:
        # Cheap record check first, artifact check second. The second is
        # needed for datastores in trust mode, where a file can exist
        # without a record.
        datastore = self._datastore
        if not datastore.knows(datasetRefOrType) and not datastore.exists(datasetRefOrType):
            raise LookupError(f"Dataset reference {datasetRefOrType} does not exist.")
        ref = datasetRefOrType

    return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass)

1137 

def get(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    parameters: dict[str, Any] | None = None,
    collections: Any = None,
    storageClass: StorageClass | str | None = None,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> Any:
    """Read a stored dataset.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or the name of one. A resolved `DatasetRef` is
        used directly without a registry search.
    dataId : `dict` or `DataCoordinate`
        Mapping of `Dimension` link names to values identifying the
        dataset within a collection. Should be `None` when a
        `DatasetRef` is given as the first argument.
    parameters : `dict`
        StorageClass-defined read options, typically used to read only a
        subset of the dataset.
    collections : Any, optional
        Collections to search instead of ``self.collections``. Any form
        accepted by the ``collections`` argument of butler construction.
    storageClass : `StorageClass` or `str`, optional
        Storage class overriding the Python type returned. It must be
        compatible with the original type.
    timespan : `Timespan` or `None`, optional
        Timespan that the dataset validity range must overlap. When
        omitted for a calibration dataset type an attempt is made to
        derive it from a temporal coordinate in the data ID.
    **kwargs
        Extra keyword arguments used to augment or construct the data
        ID.

    Returns
    -------
    obj : `object`
        The dataset.

    Raises
    ------
    LookupError
        Raised if no matching dataset exists in the `Registry`.
    TypeError
        Raised if no collections were provided.

    Notes
    -----
    For a `~CollectionType.CALIBRATION` collection the data ID has to
    include temporal dimensions beyond those of the dataset type, so
    that the dataset with the right validity range can be selected. A
    "bias" dataset with dimensions ``{instrument, detector}`` can for
    example be fetched with an ``{instrument, detector, exposure}`` data
    ID, since ``exposure`` is temporal.
    """
    _LOG.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters)
    # Both the lookup and the read happen inside the metrics context.
    with self._metrics.instrument_get(_LOG, msg="Retrieved dataset"):
        resolved = self._findDatasetRef(
            datasetRefOrType,
            dataId,
            collections=collections,
            datastore_records=True,
            timespan=timespan,
            **kwargs,
        )
        dataset = self._datastore.get(resolved, parameters=parameters, storageClass=storageClass)
        return dataset

1219 

def getURIs(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    predict: bool = False,
    collections: Any = None,
    run: str | None = None,
    **kwargs: Any,
) -> DatasetRefURIs:
    """Return the URIs of the artifacts that make up a dataset.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or the name of one.
    dataId : `dict` or `DataCoordinate`
        Mapping of `Dimension` link names to values identifying the
        dataset within a collection. Should be `None` when a
        `DatasetRef` is given as the first argument.
    predict : `bool`
        If `True`, URIs may be returned for datasets that have not been
        written yet.
    collections : Any, optional
        Collections to search instead of ``self.collections``. Any form
        accepted by the ``collections`` argument of butler construction.
    run : `str`, optional
        Run used for predictions, overriding ``self.run``.
    **kwargs
        Extra keyword arguments used to augment or construct the data
        ID.

    Returns
    -------
    uris : `DatasetRefURIs`
        URI of the primary artifact (may be `None` if the datastore
        disassembled the dataset) together with the URIs of any
        component artifacts (may be empty).
    """
    lookup_options = {"predict": predict, "run": run, "collections": collections}
    resolved = self._findDatasetRef(datasetRefOrType, dataId, **lookup_options, **kwargs)
    return self._datastore.getURIs(resolved, predict)

1268 

def get_dataset_type(self, name: str) -> DatasetType:
    # Thin delegation: the registry owns the dataset type definitions.
    dataset_type = self._registry.getDatasetType(name)
    return dataset_type

1271 

def get_dataset(
    self,
    id: DatasetId | str,
    *,
    storage_class: str | StorageClass | None = None,
    dimension_records: bool = False,
    datastore_records: bool = False,
) -> DatasetRef | None:
    # Look the dataset up by ID; `None` is returned untouched when the
    # registry has no such dataset.
    ref = self._registry.getDataset(_to_uuid(id))
    if ref is None:
        return None

    # Optional decorations, applied in a fixed order: dimension records,
    # storage class override, datastore records.
    if dimension_records:
        expanded_id = self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions)
        ref = ref.expanded(expanded_id)
    if storage_class:
        ref = ref.overrideStorageClass(storage_class)
    if datastore_records:
        ref = self._registry.get_datastore_records(ref)
    return ref

1292 

def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
    # Normalize every ID through `_to_uuid` before handing the whole batch
    # to the datasets manager in a single call.
    normalized_ids = list(map(_to_uuid, ids))
    datasets_manager = self._registry._managers.datasets
    return datasets_manager.get_dataset_refs(normalized_ids)

1296 

def find_dataset(
    self,
    dataset_type: DatasetType | str,
    data_id: DataId | None = None,
    *,
    collections: str | Sequence[str] | None = None,
    timespan: Timespan | None = None,
    storage_class: str | StorageClass | None = None,
    dimension_records: bool = False,
    datastore_records: bool = False,
    **kwargs: Any,
) -> DatasetRef | None:
    # Resolve a dataset type name to its definition.
    actual_type = self.get_dataset_type(dataset_type) if isinstance(dataset_type, str) else dataset_type

    # The registry is searched with the parent (composite) type; the
    # component name is remembered and re-applied to the result.
    component_name = actual_type.component()
    parent_type = actual_type.makeCompositeDatasetType() if actual_type.isComponent() else actual_type

    # Translate any parts of the data ID that are not expressed with
    # primary dimension keys.
    data_id, kwargs = self._rewrite_data_id(data_id, parent_type, **kwargs)

    found = self.registry.findDataset(
        parent_type,
        data_id,
        collections=collections,
        timespan=timespan,
        datastore_records=datastore_records,
        **kwargs,
    )
    if found is None:
        return None

    # Post-process in a fixed order: expand, take the component, override
    # the storage class.
    if dimension_records:
        expanded_id = self._registry.expandDataId(found.dataId, dimensions=found.datasetType.dimensions)
        found = found.expanded(expanded_id)
    if component_name:
        found = found.makeComponentRef(component_name)
    if storage_class is not None:
        found = found.overrideStorageClass(storage_class)

    return found

1341 

def retrieve_artifacts_zip(
    self,
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    overwrite: bool = True,
) -> ResourcePath:
    # Delegate to the shared helper, handing it this butler's datastore
    # artifact-retrieval method as the callable to use.
    artifact_retriever = self._datastore.retrieveArtifacts
    return retrieve_and_zip(refs, destination, artifact_retriever, overwrite)

1349 

def retrieveArtifacts(
    self,
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    transfer: str = "auto",
    preserve_path: bool = True,
    overwrite: bool = False,
) -> list[ResourcePath]:
    # Docstring inherited.
    target_dir = ResourcePath(destination)
    # The datastore performs the transfer; an index is always requested.
    transferred = self._datastore.retrieveArtifacts(
        refs,
        target_dir,
        transfer=transfer,
        preserve_path=preserve_path,
        overwrite=overwrite,
        write_index=True,
    )
    # Iterating the datastore result yields the list entries.
    return [*transferred]

1369 

def exists(
    self,
    dataset_ref_or_type: DatasetRef | DatasetType | str,
    /,
    data_id: DataId | None = None,
    *,
    full_check: bool = True,
    collections: Any = None,
    **kwargs: Any,
) -> DatasetExistence:
    # Docstring inherited.
    existence = DatasetExistence.UNRECOGNIZED

    if not isinstance(dataset_ref_or_type, DatasetRef):
        # Dataset type plus data ID: a failed search means nothing is
        # known about the dataset, so report that immediately.
        try:
            ref = self._findDatasetRef(dataset_ref_or_type, data_id, collections=collections, **kwargs)
        except (LookupError, TypeError):
            return existence
        existence |= DatasetExistence.RECORDED
    else:
        if collections is not None:
            warnings.warn("Collections should not be specified with DatasetRef", stacklevel=2)
        if data_id is not None:
            warnings.warn("A DataID should not be specified with DatasetRef", stacklevel=2)
        ref = dataset_ref_or_type
        registry_ref = self._registry.getDataset(dataset_ref_or_type.id)
        if registry_ref is not None:
            existence |= DatasetExistence.RECORDED
            # A mismatch may only be a storage class difference. If the
            # refs are compatible, continue with the registry version.
            if dataset_ref_or_type != registry_ref:
                if not registry_ref.is_compatible_with(dataset_ref_or_type):
                    raise ValueError(
                        f"The ref given to exists() ({ref}) has the same dataset ID as one "
                        f"in registry but has different incompatible values ({registry_ref})."
                    )
                ref = registry_ref

    if self._datastore.knows(ref):
        existence |= DatasetExistence.DATASTORE

    if not full_check:
        # Without an artifact check the artifact is only assumed to exist,
        # and only when something else is already known about the dataset.
        if existence.value != DatasetExistence.UNRECOGNIZED.value:
            existence |= DatasetExistence(DatasetExistence._ASSUMED)
    elif self._datastore.exists(ref):
        existence |= DatasetExistence._ARTIFACT

    return existence

1423 

def _exists_many(
    self,
    refs: Iterable[DatasetRef],
    /,
    *,
    full_check: bool = True,
) -> dict[DatasetRef, DatasetExistence]:
    # Docstring inherited.

    # ``refs`` is iterated several times below (once to seed the result
    # and again by each datastore query), so a possibly single-pass
    # iterable such as a generator must be materialized first. Otherwise
    # the datastore calls would receive an already exhausted iterable.
    refs = list(refs)

    # Every ref starts out as unrecognized; flags are OR-ed in below.
    existence = {ref: DatasetExistence.UNRECOGNIZED for ref in refs}

    # Check which refs exist in the registry.
    id_map = {ref.id: ref for ref in existence.keys()}
    for registry_ref in self.get_many_datasets(id_map.keys()):
        # Consistency between the given DatasetRef and the information
        # recorded in the registry is not verified.
        existence[id_map[registry_ref.id]] |= DatasetExistence.RECORDED

    # Ask datastore if it knows about these refs.
    knows = self._datastore.knows_these(refs)
    for ref, known in knows.items():
        if known:
            existence[ref] |= DatasetExistence.DATASTORE

    if full_check:
        # Confirm artifact existence with the datastore.
        mexists = self._datastore.mexists(refs)
        for ref, exists in mexists.items():
            if exists:
                existence[ref] |= DatasetExistence._ARTIFACT
    else:
        # Do not set this flag if nothing is known about the dataset.
        for ref in existence:
            if existence[ref] != DatasetExistence.UNRECOGNIZED:
                existence[ref] |= DatasetExistence._ASSUMED

    return existence

1459 

def removeRuns(
    self,
    names: Iterable[str],
    unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
    *,
    unlink_from_chains: bool = False,
) -> None:
    # Docstring inherited.
    #
    # Outline: check the butler is writeable, warn about the deprecated
    # ``unstore`` argument, resolve the named collections and verify that
    # each is a RUN, delete all datasets in them, then remove the
    # collections themselves inside one registry transaction.
    #
    # NOTE(review): the nesting of the ``with`` blocks below was rebuilt
    # from a listing that had lost its indentation. In particular, whether
    # the ``pruneDatasets`` call belongs inside the caching context should
    # be confirmed against the canonical file.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    if unstore is not _DeprecatedDefault:
        # The value was passed in by a user. Must report it is now
        # ignored.
        if unstore is True:
            msg = "The unstore parameter is deprecated and is now always treated as True. "
        else:
            msg = "The unstore parameter for removeRuns can no longer be False and is now ignored. "
        warnings.warn(
            msg + " The parameter will be removed after v30.",
            category=FutureWarning,
            stacklevel=find_outside_stacklevel("lsst.daf.butler"),
        )

    # Materialize the names so they can be used more than once.
    names = list(names)
    refs: list[DatasetRef] = []
    # Map of the chained collections to the RUN children.
    parents_to_children: dict[str, set[str]] = defaultdict(set)

    with self._caching_context():
        # Get information about these RUNs.
        collections_info = self.collections.query_info(names, include_parents=unlink_from_chains)
        for info in collections_info:
            # Refuse to continue as soon as one collection is not a RUN.
            if info.type is not CollectionType.RUN:
                raise TypeError(f"The collection type of '{info.name}' is {info.type.name}, not RUN.")
            if unlink_from_chains:
                if info.parents is None:  # For mypy.
                    raise AssertionError("Internal error: Collection parents required but not received")
                for parent in info.parents:
                    parents_to_children[parent].add(info.name)

        # Update the names in case the query unexpectedly had a wildcard.
        names = [info.name for info in collections_info]

        # Get all the datasets from these runs.
        refs = self.query_all_datasets(names, find_first=False, limit=None)

    # Call pruneDatasets since we are deliberately removing
    # datasets in chunks from the RUN collections rather than
    # attempting to remove everything at once.
    with time_this(
        _LOG,
        msg="Removing %d dataset%s from %s",
        args=(len(refs), "s" if len(refs) != 1 else "", ", ".join(names)),
    ):
        self.pruneDatasets(refs, unstore=True, purge=True, disassociate=True)

    # Now can remove the actual RUN collection and unlink from chains.
    with self._registry.transaction():
        # This will fail if caller is not unlinking from chains but the
        # RUN is in a chain -- but we have already deleted all the datasets
        # by this point.
        if unlink_from_chains:
            # Use deterministic order for deletions to attempt to minimize
            # risk of deadlocks for parallel deletes.
            for parent in sorted(parents_to_children):
                self.collections.remove_from_chain(parent, sorted(parents_to_children[parent]))
        # Sort to avoid potential deadlocks.
        for name in sorted(names):
            # This should be fast since the collection should be empty.
            with time_this(_LOG, msg="Removing RUN collection %s", args=(name,)):
                self._registry.removeCollection(name)
    _LOG.info("Completely removed the following RUN collections: %s", ", ".join(names))

1533 

def pruneDatasets(
    self,
    refs: Iterable[DatasetRef],
    *,
    disassociate: bool = True,
    unstore: bool = False,
    tags: Iterable[str] = (),
    purge: bool = False,
) -> None:
    # docstring inherited from LimitedButler

    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    # Validate the flag combination before doing any work. A purge needs
    # both disassociate and unstore; a plain disassociate needs TAGGED
    # collections to remove the datasets from.
    if purge and not disassociate:
        raise TypeError("Cannot pass purge=True without disassociate=True.")
    if purge and not unstore:
        raise TypeError("Cannot pass purge=True without unstore=True.")
    if disassociate and not purge:
        tags = tuple(tags)
        if not tags:
            raise TypeError("No tags provided but disassociate=True.")
        for tag in tags:
            tag_type = self._registry.getCollectionType(tag)
            if tag_type is not CollectionType.TAGGED:
                raise TypeError(
                    f"Cannot disassociate from collection '{tag}' "
                    f"of non-TAGGED type {tag_type.name}."
                )

    # The refs are walked more than once, so a possibly single-pass
    # iterable is turned into a list.
    refs = list(refs)

    # Components cannot be pruned on their own: the registry does not
    # track them and the datastore may not keep them in separate files.
    for candidate in refs:
        if candidate.datasetType.component():
            raise ValueError(f"Can not prune a component of a dataset (ref={candidate})")

    # Delete in chunks. The size balances efficiency against user
    # feedback, limits how much has to roll back on failure, and lets an
    # interrupted prune be re-run incrementally (if the query is
    # repeated). The remaining weak spot is an interrupt during
    # emptyTrash, which would need an admin command to finish that chunk.
    progress = Progress("lsst.daf.butler.Butler.pruneDatasets", level=_LOG.INFO)
    chunk_size = 50_000
    n_chunks = math.ceil(len(refs) / chunk_size)
    if n_chunks > 1:
        _LOG.verbose("Pruning a total of %d datasets", len(refs))

    chunks = progress.wrap(
        chunk_iterable(refs, chunk_size=chunk_size), desc="Deleting datasets", total=n_chunks
    )
    for chunk_num, chunked_refs in enumerate(chunks, start=1):
        n_in_chunk = len(chunked_refs)
        _LOG.verbose(
            "Pruning %d dataset%s in chunk %d/%d",
            n_in_chunk,
            "s" if n_in_chunk != 1 else "",
            chunk_num,
            n_chunks,
        )
        with time_this(
            _LOG,
            msg="Removing %d datasets for chunk %d/%d",
            args=(n_in_chunk, chunk_num, n_chunks),
        ):
            self._prune_datasets(
                chunked_refs, tags=tags, unstore=unstore, purge=purge, disassociate=disassociate
            )

1605 

def _prune_datasets(
    self,
    refs: Collection[DatasetRef],
    *,
    disassociate: bool = True,
    unstore: bool = False,
    tags: Iterable[str] = (),
    purge: bool = False,
) -> None:
    """Prune one chunk of datasets.

    Parameters
    ----------
    refs
        The datasets to prune.
    disassociate
        If `True` (and ``purge`` is `False`) remove ``refs`` from every
        collection named in ``tags``.
    unstore
        If `True` mark the datasets as trash in the datastore inside the
        transaction and empty that trash once the transaction has exited.
    tags
        Collections to disassociate the datasets from. Must be non-empty
        when the disassociation branch is taken.
    purge
        If `True` remove the datasets from the registry; this takes
        precedence over ``disassociate``.
    """
    plural = "s" if len(refs) != 1 else ""

    # We don't need an unreliable Datastore transaction for this, because
    # we've been extra careful to ensure that Datastore.trash only involves
    # mutating the Registry (it can _look_ at Datastore-specific things,
    # but shouldn't change them), and hence all operations here are
    # Registry operations.
    with self.transaction():
        if unstore:
            with time_this(
                _LOG,
                msg="Marking %d dataset%s for removal during pruneDatasets",
                args=(len(refs), plural),
            ):
                self._datastore.trash(refs)
        if purge:
            with time_this(
                _LOG, msg="Removing %d pruned dataset%s from registry", args=(len(refs), plural)
            ):
                self._registry.removeDatasets(refs)
        elif disassociate:
            assert tags, "Guaranteed by earlier logic in this function."
            # The plural suffix is supplied by ``plural``; the format must
            # not carry its own trailing "s" as well.
            with time_this(
                _LOG, msg="Disassociating %d dataset%s from tagged collections", args=(len(refs), plural)
            ):
                for tag in tags:
                    self._registry.disassociate(tag, refs)
    # We've exited the Registry transaction, and apparently committed.
    # (if there was an exception, everything rolled back, and it's as if
    # nothing happened - and we never get here).
    # Datastore artifacts are not yet gone, but they're clearly marked
    # as trash, so if we fail to delete now because of (e.g.) filesystem
    # problems we can try again later, and if manual administrative
    # intervention is required, it's pretty clear what that should entail:
    # deleting everything on disk and in private Datastore tables that is
    # in the dataset_location_trash table.
    if unstore:
        # Point of no return for removing artifacts. Restrict the trash
        # emptying to the refs that this call trashed.
        with time_this(
            _LOG,
            msg="Attempting to remove artifacts for %d dataset%s associated with pruning",
            args=(len(refs), plural),
        ):
            self._datastore.emptyTrash(refs=refs)

1659 

def ingest_zip(
    self,
    zip_file: ResourcePathExpression,
    transfer: str = "auto",
    *,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
    skip_existing: bool = False,
) -> None:
    # Docstring inherited.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    zip_path = ResourcePath(zip_file)
    index = ZipIndex.from_zip_file(zip_path)
    _LOG.verbose(
        "Ingesting %s containing %d datasets and %d files.", zip_path, len(index.refs), len(index)
    )

    # The refs have to go into registry as well. Rebuild FileDataset
    # objects from the index so the standard ingest code can be re-used.
    refs = index.refs.to_refs(universe=self.dimensions)
    ref_by_id = {ref.id: ref for ref in refs}
    datasets: list[FileDataset] = []
    seen_ids: set[uuid.UUID] = set()
    for path_in_zip, index_info in index.artifact_map.items():
        # A disassembled composite lists the same dataset ID against
        # several artifacts; only attach each ID to the first one.
        pending = {id_ for id_ in index_info.ids if id_ not in seen_ids}
        if pending:
            datasets.append(FileDataset(refs=[ref_by_id[id_] for id_ in pending], path=path_in_zip))
            seen_ids.update(pending)

    new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
    if existing_datasets:
        if not skip_existing:
            raise ConflictingDefinitionError(
                f"Datastore already contains {len(existing_datasets)} of the given datasets."
                f" Example: {existing_datasets[0]}"
            )
        _LOG.info(
            "Skipping %d datasets from zip file %s which already exist in the repository.",
            len(existing_datasets),
            zip_file,
        )
        if new_datasets:
            # Can not yet support partial zip ingests where a zip contains
            # some datasets that are already in another zip.
            raise ValueError(
                f"The given zip file from {zip_file} contains {len(new_datasets)} datasets not known "
                f"to this butler but also contains {len(existing_datasets)} datasets already known to "
                "this butler. Currently butler can not ingest zip files with overlapping content."
            )
        return

    # Ingest doesn't create the RUN collections so we have to do that
    # here.
    #
    # Sort by run collection name to ensure Postgres takes locks in the
    # same order between different processes, to mitigate an issue
    # where Postgres can deadlock due to the unique index on collection
    # name. (See DM-47543).
    for run in sorted({ref.run for ref in refs}):
        if self.collections.register(run):
            _LOG.verbose("Created RUN collection %s as part of zip ingest", run)

    progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)
    import_info = self._prepare_ingest_file_datasets(
        datasets, progress, dry_run=dry_run, transfer_dimensions=transfer_dimensions
    )

    # Total number of dataset refs, used for metrics and log messages.
    n_datasets = sum(len(dataset.refs) for dataset in datasets)
    srefs = "" if n_datasets == 1 else "s"

    with (
        self._metrics.instrument_ingest(
            n_datasets,
            _LOG,
            msg="Ingesting zip file %s with %s dataset%s",
            args=(zip_file, n_datasets, srefs),
        ),
        self.transaction(),
    ):
        # The expanded dataset refs are not needed here, so the return
        # value is ignored.
        self._ingest_file_datasets(datasets, import_info, progress, dry_run=dry_run)

        try:
            self._datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run)
        except IntegrityError as e:
            raise ConflictingDefinitionError(
                f"Datastore already contains one or more datasets: {e}"
            ) from e

1760 

def _prepare_ingest_file_datasets(
    self,
    datasets: Sequence[FileDataset],
    progress: Progress,
    *,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> _ImportDatasetsInfo:
    """Validate the data IDs of the given datasets and prepare their refs
    for import.

    Raises `ConflictingDefinitionError` if two refs with the same dataset
    type and run share a data ID.
    """
    # For every (dataset type, run) remember which FileDataset supplied each
    # data ID, so a duplicate can be reported alongside the earlier file.
    seen: MutableMapping[tuple[DatasetType, str], dict[DataCoordinate, FileDataset]] = (
        defaultdict(dict)
    )

    # Every ref that will be imported, in input order.
    refs: list[DatasetRef] = []

    for dataset in progress.wrap(datasets, desc="Validating dataIDs"):
        for ref in dataset.refs:
            by_data_id = seen[(ref.datasetType, ref.run)]
            if ref.dataId in by_data_id:
                raise ConflictingDefinitionError(
                    f"Ingest conflict. Dataset {dataset.path} has same"
                    " DataId as other ingest dataset"
                    f" {by_data_id[ref.dataId].path} "
                    f" ({ref.dataId})"
                )
            by_data_id[ref.dataId] = dataset
        refs.extend(dataset.refs)

    # Dataset types get registered and the ref information gathered by the
    # shared import preparation; this butler is passed as the source.
    return self._prepare_for_import_refs(
        self,
        refs,
        register_dataset_types=True,
        dry_run=dry_run,
        transfer_dimensions=transfer_dimensions,
    )

1803 

def _ingest_file_datasets(
    self,
    datasets: Sequence[FileDataset],
    import_info: _ImportDatasetsInfo,
    progress: Progress,
    *,
    dry_run: bool = False,
) -> None:
    """Import the prepared dimension records and refs, then replace the
    refs held by each `FileDataset` with the imported (expanded) ones.
    """
    self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
    expanded = self._import_grouped_refs(
        import_info.grouped_refs, None, progress, dry_run=dry_run, expand_refs=True
    )

    # Datastore works from the FileDataset objects, so swap each original
    # ref for the imported ref with the same ID.
    expanded_by_id = {ref.id: ref for ref in expanded}
    for dataset in progress.wrap(datasets, desc="Re-attaching expanded refs"):
        dataset.refs = [expanded_by_id[original.id] for original in dataset.refs]

1823 

def ingest(
    self,
    *datasets: FileDataset,
    transfer: str | None = "auto",
    record_validation_info: bool = True,
    skip_existing: bool = False,
) -> None:
    # Docstring inherited.
    if not datasets:
        return
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    n_files = len(datasets)
    sfiles = "" if n_files == 1 else "s"
    _LOG.verbose("Ingesting %d file dataset%s.", n_files, sfiles)
    progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)

    new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
    if existing_datasets:
        if not skip_existing:
            raise ConflictingDefinitionError(
                f"Datastore already contains {len(existing_datasets)} of the given datasets."
                f" Example: {existing_datasets[0]}"
            )
        _LOG.info(
            "Skipping %d datasets which already exist in the repository.", len(existing_datasets)
        )

    # Total number of dataset refs across all files, for metrics and logs.
    n_datasets = sum(len(dataset.refs) for dataset in datasets)
    srefs = "" if n_datasets == 1 else "s"

    # The Registry side is given `datasets` rather than `new_datasets` so
    # that it can confirm everything matches the existing datasets.
    import_info = self._prepare_ingest_file_datasets(datasets, progress)

    with (
        self._metrics.instrument_ingest(
            n_datasets,
            _LOG,
            msg="Ingesting %s file%s with %s dataset%s",
            args=(n_files, sfiles, n_datasets, srefs),
        ),
        self.transaction(),
    ):
        self._ingest_file_datasets(datasets, import_info, progress)

        # Bulk-insert everything into Datastore.
        # We do not know if any of the registry entries already existed
        # (_importDatasets only complains if they exist but differ).
        # The _partition_datasets_by_known logic above should catch most
        # instances where we attempt to re-ingest files that were already
        # ingested, but a concurrent writer could cause a unique constraint
        # violation here.
        try:
            self._datastore.ingest(
                *new_datasets, transfer=transfer, record_validation_info=record_validation_info
            )
        except IntegrityError as e:
            raise ConflictingDefinitionError(
                f"Datastore already contains one or more datasets: {e}"
            ) from e

1890 

def _partition_datasets_by_known(
    self, datasets: Iterable[FileDataset]
) -> tuple[list[FileDataset], list[FileDataset]]:
    """Divide the given `FileDataset` objects into two groups: those for
    which the Datastore already has an entry, and those for which it does
    not.

    Parameters
    ----------
    datasets
        The datasets to classify. May be a single-pass iterable; it is
        consumed exactly once.

    Returns
    -------
    new_datasets
        Datasets for which the datastore knows none of the refs.
    existing_datasets
        Datasets for which the datastore knows at least one ref.
    """
    # The input is walked twice (once to collect refs, once to classify),
    # so materialise it first; otherwise an iterator would be exhausted by
    # the first pass and both result lists would silently come back empty.
    file_datasets = list(datasets)

    refs = itertools.chain.from_iterable(dataset.refs for dataset in file_datasets)
    known_refs = self._datastore.knows_these(refs)

    new_datasets: list[FileDataset] = []
    existing_datasets: list[FileDataset] = []
    for dataset in file_datasets:
        if any(known_refs[ref] for ref in dataset.refs):
            existing_datasets.append(dataset)
        else:
            new_datasets.append(dataset)

    return new_datasets, existing_datasets

1911 

@contextlib.contextmanager
def export(
    self,
    *,
    directory: str | None = None,
    filename: str | None = None,
    format: str | None = None,
    transfer: str | None = None,
) -> Iterator[RepoExportContext]:
    # Docstring inherited.
    if transfer is not None and directory is None:
        raise TypeError("Cannot transfer without providing a directory.")
    if transfer == "move":
        raise TypeError("Transfer may not be 'move': export is read-only")

    if format is not None:
        # Format known: fall back to a default file name if none was given.
        if filename is None:
            filename = f"export.{format}"
    elif filename is None:
        raise TypeError("At least one of 'filename' or 'format' must be provided.")
    else:
        # Format not given: derive it from the file extension.
        extension = os.path.splitext(filename)[1]
        if not extension:
            raise ValueError("Please specify a file extension to determine export format.")
        format = extension[1:]  # Strip the leading ".".

    if directory is not None:
        filename = os.path.join(directory, filename)

    formats = self._config["repo_transfer_formats"]
    if format not in formats:
        raise ValueError(f"Unknown export format {format!r}, allowed: {','.join(formats.keys())}")
    BackendClass = get_class_of(formats[format, "export"])

    with open(filename, "w") as stream:
        backend = BackendClass(stream, universe=self.dimensions)
        helper = RepoExportContext(self, backend=backend, directory=directory, transfer=transfer)
        with self._caching_context():
            yield helper
        # Reached only when the caller's block completed without raising;
        # any exception propagates out of the ``yield`` above instead.
        helper._finish()

1952 

def import_(
    self,
    *,
    directory: ResourcePathExpression | None = None,
    filename: ResourcePathExpression | TextIO | None = None,
    format: str | None = None,
    transfer: str | None = None,
    skip_dimensions: set | None = None,
    record_validation_info: bool = True,
    without_datastore: bool = False,
) -> None:
    # Docstring inherited.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    if filename is None and format is not None:
        filename = ResourcePath(f"export.{format}", forceAbsolute=False)
    if directory is not None:
        directory = ResourcePath(directory, forceDirectory=True)

    # mypy doesn't think this will work but it does in python >= 3.10.
    if isinstance(filename, ResourcePathExpression):  # type: ignore
        filename = ResourcePath(filename, forceAbsolute=False)  # type: ignore
        if format is None:
            format = filename.getExtension()
        if not filename.isabs() and directory is not None:
            potential = directory.join(filename)
            exists_in_cwd = filename.exists()
            exists_in_dir = potential.exists()
            if exists_in_dir:
                if exists_in_cwd:
                    _LOG.warning(
                        "A relative path for filename was specified (%s) which exists relative to cwd. "
                        "Additionally, the file exists relative to the given search directory (%s). "
                        "Using the export file in the given directory.",
                        filename,
                        potential,
                    )
                # An explicit directory was given and it contains the
                # export file, so prefer it even if a file of the same
                # name also exists in cwd.
                filename = potential
            elif not exists_in_cwd:
                # Found in neither location: raise early.
                raise FileNotFoundError(
                    f"Export file could not be found in {filename.abspath()} or {potential.abspath()}."
                )
    elif format is None:
        format = ".yaml"

    BackendClass: type[RepoImportBackend] = get_class_of(
        self._config["repo_transfer_formats"][format]["import"]
    )

    def load_from(importStream: TextIO | ResourceHandleProtocol) -> None:
        # Register first, then load everything inside one transaction.
        with self._caching_context():
            backend = BackendClass(importStream, self)  # type: ignore[call-arg]
            backend.register()
            with self.transaction():
                backend.load(
                    datastore=None if without_datastore else self._datastore,
                    directory=directory,
                    transfer=transfer,
                    skip_dimensions=skip_dimensions,
                    record_validation_info=record_validation_info,
                )

    if isinstance(filename, ResourcePath):
        # We can not use open() here at the moment because of
        # DM-38589 since yaml does stream.read(8192) in a loop.
        load_from(io.StringIO(filename.read().decode()))
    else:
        load_from(filename)  # type: ignore

2025 

def transfer_dimension_records_from(
    self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
) -> None:
    # Only elements that have their own table in this butler's universe
    # are candidates for transfer.
    elements = frozenset(el for el in self.dimensions.elements if el.has_own_table)

    data_ids = {source.dataId for source in source_refs}

    records_by_element = self._extract_all_dimension_records_from_data_ids(
        source_butler, data_ids, elements
    )

    # Insert order is important, hence the universe-defined sort.
    for element in self.dimensions.sorted(records_by_element.keys()):
        records = list(records_by_element[element].values())
        # A record that is already present is assumed to be usable as-is;
        # its metadata is not checked for consistency.
        self._registry.insertDimensionData(element, *records, skip_existing=True)
        _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))

2046 

def _extract_all_dimension_records_from_data_ids(
    self,
    source_butler: LimitedButler | Butler,
    data_ids: set[DataCoordinate],
    allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
    """Collect the dimension records for the given data IDs together with
    records of elements populated by them and any records those in turn
    require.
    """
    primary_records = self._extract_dimension_records_from_data_ids(
        source_butler, data_ids, allowed_elements
    )

    additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
    for original_element, record_mapping in primary_records.items():
        # Elements that depend on (are populated by) this dimension.
        populated_by = self.dimensions.get_elements_populated_by(
            self.dimensions[original_element.name]  # type: ignore
        )
        for element in populated_by or ():
            # Skip elements that are not allowed, the element itself, and
            # elements already stored. Re-finding records for a stored
            # element may lead to additional spurious records. e.g. visit
            # is populated_by visit_detector_region but querying
            # visit_detector_region by visit will return all the detectors
            # for this visit -- the visit dataId does not constrain this.
            # To constrain the query the original dataIds would have to
            # be scanned.
            if (
                element not in allowed_elements
                or element.name == original_element.name
                or element.name in primary_records
            ):
                continue
            if not record_mapping:
                continue

            if not isinstance(source_butler, Butler):
                raise RuntimeError(
                    f"Transferring populated_by records like {element.name}"
                    " requires a full Butler."
                )

            with source_butler.query() as query:
                records = query.join_data_coordinates(record_mapping.keys()).dimension_records(
                    element.name
                )
                for record in records:
                    additional_records[record.definition].setdefault(record.dataId, record)

    # Walk back through the additional records to pick up any missing
    # content (such as visit_definition needing to know the exposure),
    # without requesting records that are already held.
    missing_data_ids = set()
    for extra_mapping in additional_records.values():
        for data_id in extra_mapping.keys():
            for dimension in data_id.dimensions.required:
                element = source_butler.dimensions[dimension]
                dimension_key = data_id.subset(dimension)
                if dimension_key not in primary_records[element]:
                    missing_data_ids.add(dimension_key)

    # Fill out the new records. Assume that these new records do not
    # also need to carry over additional populated_by records.
    secondary_records = self._extract_dimension_records_from_data_ids(
        source_butler, missing_data_ids, allowed_elements
    )

    # Fold both extra sets of records into the primary mapping.
    for extra in (additional_records, secondary_records):
        for element, mapping in extra.items():
            primary_records[element].update(mapping)

    return primary_records

2120 

def _extract_dimension_records_from_data_ids(
    self,
    source_butler: LimitedButler | Butler,
    data_ids: Iterable[DataCoordinate],
    allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
    """Gather the dimension records attached to the given data IDs, keeping
    only records whose definition is in ``allowed_elements``.

    Data IDs lacking records are expanded via ``source_butler``; a
    `TypeError` is raised if that butler is not a full `Butler`.
    """
    collected: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)

    data_ids = set(data_ids)
    if any(not data_id.hasRecords() for data_id in data_ids):
        if not isinstance(source_butler, Butler):
            raise TypeError("Input butler needs to be a full butler to expand DataId.")
        data_ids = source_butler._expand_data_ids(data_ids)

    for data_id in data_ids:
        # If this butler doesn't know about a dimension in the source
        # butler things will break later.
        for element_name in data_id.dimensions.elements:
            record = data_id.records[element_name]
            if record is None or record.definition not in allowed_elements:
                continue
            collected[record.definition].setdefault(record.dataId, record)

    return collected

2145 

def _cast_universe_for_import_refs(
    self, source_refs: Iterable[DatasetRef]
) -> Mapping[DatasetType, list[DatasetRef]]:
    """Group refs by dataset type, expressing each type in this butler's
    universe where possible.

    Parameters
    ----------
    source_refs
        The refs to be imported.

    Returns
    -------
    refs
        The refs to be imported, keyed by dataset type, where each key is
        a dataset type defined in the target universe.

    Raises
    ------
    InconsistentUniverseError
        Raised if any reference cannot be converted to target universe.

    Notes
    -----
    In principle this could perform non-trivial migrations by modifying
    dimensions and dataIds, but presently it only validates that the
    dataset types are trivially compatible. The keys of the returned
    mapping are in the new universe, whereas the refs themselves still
    carry their original dataset types as there is presently no easy way
    to replace the dataset type in a reference.
    """
    # In theory input refs could come from multiple universes, but in
    # practice this will not happen, so just group everything by dataset
    # type.
    by_source_type: defaultdict[DatasetType, list[DatasetRef]] = defaultdict(list)
    for ref in source_refs:
        by_source_type[ref.datasetType].append(ref)

    refs_by_type: defaultdict[DatasetType, list[DatasetRef]] = defaultdict(list)
    for source_type, refs in by_source_type.items():
        source_universe = source_type.dimensions.universe
        if source_universe is self.dimensions:
            # Already defined in the target universe; nothing to rebuild.
            refs_by_type[source_type] = refs
            continue

        if source_universe.namespace != self.dimensions.namespace:
            raise InconsistentUniverseError(
                f"Source refs have universe {source_universe} with different namespace "
                f"than target universe {self.dimensions}."
            )

        # Different universe versions: for now only the trivial case of
        # identical dimension groups is supported.
        try:
            target_dimensions = self.dimensions.conform(source_type.dimensions.names)
        except Exception as exc:
            raise InconsistentUniverseError(
                f"Source dimensions {source_type.dimensions} are not compatible with "
                f"target universe dimensions {self.dimensions}."
            ) from exc
        if target_dimensions != source_type.dimensions:
            raise InconsistentUniverseError(
                f"Source dimensions {source_type.dimensions} are different from a conforming "
                f"set of target universe dimensions {target_dimensions}."
            )

        # Rebuild the dataset type in the new universe and file the refs
        # under it.
        rebuilt_type = DatasetType(
            name=source_type.name,
            dimensions=target_dimensions,
            storageClass=source_type.storageClass,
            parentStorageClass=source_type.parentStorageClass,
            universe=self.dimensions,
            isCalibration=source_type.isCalibration(),
        )
        refs_by_type[rebuilt_type] = refs

    return refs_by_type

2223 

def _prepare_for_import_refs(
    self,
    source_butler: LimitedButler,
    source_refs: Iterable[DatasetRef],
    *,
    register_dataset_types: bool = False,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> _ImportDatasetsInfo:
    # Docstring inherited.
    #
    # Summary of what this method does, in order:
    #   1. refuse to run on a read-only butler unless ``dry_run`` is set;
    #   2. group the refs by (dimension group, run);
    #   3. for each dataset type, either register it (when
    #      ``register_dataset_types`` is set) or check it against the
    #      definition already known to this butler;
    #   4. when ``transfer_dimensions`` is set, collect the dimension
    #      records for the refs' data IDs from ``source_butler``.
    # The result bundles the grouped refs and the dimension records.
    if not self.isWriteable() and not dry_run:
        raise TypeError("Butler is read-only.")

    # Will iterate through the refs multiple times so need to convert
    # to a list if this isn't a collection.
    if not isinstance(source_refs, collections.abc.Collection):
        source_refs = list(source_refs)

    original_count = len(source_refs)
    # A single-dataset import is logged at the quieter VERBOSE level.
    log_level = _LOG.INFO if original_count > 1 else _LOG.VERBOSE
    _LOG.log(
        log_level,
        "Importing %d dataset%s into %s",
        original_count,
        "s" if original_count != 1 else "",
        str(self),
    )

    # Keys are dataset types expressed in this butler's universe; this
    # raises if a source dataset type cannot be converted.
    refs_by_type = self._cast_universe_for_import_refs(source_refs)

    # Importing requires that we group the refs by dimension group and run
    # before doing the import.
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]] = defaultdict(list)
    for ref in source_refs:
        grouped_refs[_RefGroup(ref.datasetType.dimensions, ref.run)].append(ref)

    # Check to see if the dataset type in the source butler has
    # the same definition in the target butler and register missing
    # ones if requested. Registration must happen outside a transaction.
    newly_registered_dataset_types = set()
    for datasetType in refs_by_type:
        if register_dataset_types:
            # Let this raise immediately if inconsistent. Continuing
            # on to find additional inconsistent dataset types
            # might result in additional unwanted dataset types being
            # registered.
            try:
                # In a dry run nothing is registered, so no conflict can
                # be detected by this branch.
                if not dry_run and self._registry.registerDatasetType(datasetType):
                    newly_registered_dataset_types.add(datasetType)
            except ConflictingDefinitionError as e:
                # Be safe and require that conversions be bidirectional
                # when there are storage class mismatches. This is because
                # get() will have to support conversion from source to
                # target python type (the source formatter will be
                # returning source python type) but there also is an
                # expectation that people will want to be able to get() in
                # the target using the source python type, which will not
                # require conversion for transferred datasets but might
                # for target-native types. Additionally, butler.get does
                # not know that the formatter will return the wrong
                # python type and so will always check that the conversion
                # works even though it won't need it.
                target_dataset_type = self.get_dataset_type(datasetType.name)
                target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                if not (target_compatible_with_source and source_compatible_with_target):
                    # Annotate the original exception with which direction
                    # (if any) is convertible, then re-raise it.
                    if target_compatible_with_source:
                        e.add_note(
                            "Target dataset type storage class is compatible with source "
                            "but the reverse is not true."
                        )
                    elif source_compatible_with_target:
                        e.add_note(
                            "Source dataset type storage class is compatible with target "
                            "but the reverse is not true."
                        )
                    else:
                        e.add_note("If storage classes differ, please register converters.")
                    raise
        else:
            # If the dataset type is missing, let it fail immediately.
            target_dataset_type = self.get_dataset_type(datasetType.name)
            if target_dataset_type != datasetType:
                target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                # Both conversion directions are currently required.
                if not (target_compatible_with_source and source_compatible_with_target):
                    # Every branch below assigns a non-empty message, so
                    # the parenthesised suffix is always appended.
                    msg = ""
                    if target_compatible_with_source:
                        msg = (
                            "Target storage class is compatible with the source storage class "
                            "but the reverse is not true."
                        )
                    elif source_compatible_with_target:
                        msg = (
                            "Source storage class is compatible with the target storage class"
                            " but the reverse is not true."
                        )
                    else:
                        msg = "If storage classes differ register converters."
                    if msg:
                        msg = f"({msg})"
                    raise ConflictingDefinitionError(
                        "Source butler dataset type differs from definition"
                        f" in target butler: {datasetType} !="
                        f" {target_dataset_type} {msg}"
                    )
    if newly_registered_dataset_types:
        # We may have registered some even if there were inconsistencies
        # but should let people know (or else remove them again).
        _LOG.verbose(
            "Registered the following dataset types in the target Butler: %s",
            ", ".join(d.name for d in newly_registered_dataset_types),
        )
    else:
        # NOTE(review): this message is also emitted in a dry run with
        # registration requested, where nothing was actually checked.
        _LOG.verbose("All required dataset types are known to the target Butler")

    # Stays empty unless dimension transfer was requested.
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
    if transfer_dimensions:
        # Collect all the dimension records for these refs.
        # All dimensions are to be copied but the list of valid dimensions
        # come from this butler's universe.
        elements = frozenset(element for element in self.dimensions.elements if element.has_own_table)
        dataIds = {ref.dataId for ref in source_refs}
        dimension_records = self._extract_all_dimension_records_from_data_ids(
            source_butler, dataIds, elements
        )
    return _ImportDatasetsInfo(grouped_refs, dimension_records)

2352 

def _import_dimension_records(
    self,
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]],
    *,
    dry_run: bool,
) -> None:
    """Insert the dimension records gathered while preparing an import.

    Nothing is done when there are no records or when ``dry_run`` is set.
    """
    if not dimension_records or dry_run:
        return

    _LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
    # Insertion order matters, so follow the universe-defined ordering.
    for element in self.dimensions.sorted(dimension_records.keys()):
        element_records = list(dimension_records[element].values())
        # A record that is already present is assumed to be usable as-is;
        # its metadata is not checked for consistency.
        self._registry.insertDimensionData(element, *element_records, skip_existing=True)

2369 

def _import_grouped_refs(
    self,
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]],
    source_butler: LimitedButler | None,
    progress: Progress,
    *,
    dry_run: bool = False,
    expand_refs: bool = False,
) -> list[DatasetRef]:
    """Import refs into the registry one (dimension group, run) group at a
    time, registering each run collection the first time it is seen.

    Returns all imported refs; in a dry run these are the input refs.
    """
    handled_collections: set[str] = set()
    all_imported_refs: list[DatasetRef] = []
    n_to_import = 0

    # Sort by run collection name to ensure Postgres takes locks in the
    # same order between different processes, to mitigate an issue
    # where Postgres can deadlock due to the unique index on collection
    # name. (See DM-47543).
    groups = sorted(grouped_refs.items(), key=lambda group_item: group_item[0].run)
    for (dimension_group, run), refs_to_import in progress.iter_item_chunks(
        groups, desc="Importing to registry by run and dataset type"
    ):
        if run not in handled_collections:
            # May need to create output collection. If source butler
            # has a registry, ask for documentation string.
            registry = getattr(source_butler, "registry", None) if source_butler is not None else None
            run_doc = registry.getCollectionDocumentation(run) if registry else None
            # A dry run registers nothing but reports as if it had.
            registered = True if dry_run else self.collections.register(run, doc=run_doc)
            handled_collections.add(run)
            if registered:
                _LOG.verbose("Creating output run %s", run)

        n_refs = len(refs_to_import)
        n_to_import += n_refs
        _LOG.verbose(
            "Importing %d ref%s with dimensions %s into run %s",
            n_refs,
            "" if n_refs == 1 else "s",
            dimension_group.names,
            run,
        )

        # Assume we are using UUIDs and the source refs will match
        # those imported.
        if dry_run:
            imported_refs = refs_to_import
        else:
            imported_refs = self._registry._importDatasets(refs_to_import, expand=expand_refs)

        all_imported_refs.extend(imported_refs)

    assert n_to_import == len(all_imported_refs)
    _LOG.verbose("Imported %d datasets into destination butler", n_to_import)
    return all_imported_refs

2426 

def transfer_from(
    self,
    source_butler: LimitedButler,
    source_refs: Iterable[DatasetRef],
    transfer: str = "auto",
    skip_missing: bool = True,
    register_dataset_types: bool = False,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> collections.abc.Collection[DatasetRef]:
    # Docstring inherited.
    source_refs = list(source_refs)
    if not (self.isWriteable() or dry_run):
        raise TypeError("Butler is read-only.")

    progress = Progress("lsst.daf.butler.Butler.transfer_from", level=VERBOSE)

    artifact_existence: dict[ResourcePath, bool] = {}
    file_transfer_source = source_butler._file_transfer_source
    transfer_records = retrieve_file_transfer_records(
        file_transfer_source, source_refs, artifact_existence
    )

    # A registry entry whose datastore artifact is missing should not be
    # imported. This can happen when a file was deleted but the dataset was
    # kept in the registry for provenance, or when a pipeline task did not
    # write every possible output in a QuantumBackedButler.
    if skip_missing:
        absent_ids = {ref.id for ref in source_refs}.difference(transfer_records.keys())
        if absent_ids:
            count_before = len(source_refs)
            source_refs = [ref for ref in source_refs if ref.id not in absent_ids]
            count_after = len(source_refs)
            n_dropped = count_before - count_after
            _LOG.verbose(
                "%d dataset%s removed because the artifact does not exist. Now have %d.",
                n_dropped,
                "s" if n_dropped != 1 else "",
                count_after,
            )

    import_info = self._prepare_for_import_refs(
        source_butler,
        source_refs,
        register_dataset_types=register_dataset_types,
        dry_run=dry_run,
        transfer_dimensions=transfer_dimensions,
    )

    # Registry imports and the datastore transfer share one transaction.
    with self.transaction():
        self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
        imported_refs = self._import_grouped_refs(
            import_info.grouped_refs, source_butler, progress, dry_run=dry_run
        )

        # Hand over to the datastore, which is responsible for checking
        # that the source datastore is compatible with the target one.
        _LOG.verbose("Transferring %d datasets from %s", len(transfer_records), file_transfer_source.name)
        accepted, rejected = self._datastore.transfer_from(
            transfer_records,
            imported_refs,
            transfer=transfer,
            artifact_existence=artifact_existence,
            dry_run=dry_run,
        )
        if rejected:
            # For now the registry entries are kept even though the files
            # were not accepted.
            _LOG.warning(
                "%d datasets were rejected and %d accepted for transfer.",
                len(rejected),
                len(accepted),
            )

    return imported_refs

2503 

def validateConfiguration(
    self,
    logFailures: bool = False,
    datasetTypeNames: Iterable[str] | None = None,
    ignore: Iterable[str] | None = None,
) -> None:
    # Docstring inherited.

    # ``datasetTypeNames`` may be any iterable, including a one-shot
    # iterator. Such an object is always truthy and would already be
    # exhausted when it is consulted again for the lookup-key filtering
    # below, so materialize it exactly once here.
    requestedNames = list(datasetTypeNames) if datasetTypeNames else []
    requestedNameSet = set(requestedNames)

    if requestedNames:
        datasetTypes = [self.get_dataset_type(name) for name in requestedNames]
    else:
        datasetTypes = list(self._registry.queryDatasetTypes())

    # filter out anything from the ignore list
    ignore = set(ignore) if ignore else set()
    if ignore:
        datasetTypes = [
            e for e in datasetTypes if e.name not in ignore and e.nameAndComponent()[0] not in ignore
        ]

    # Find all the registered instruments (if "instrument" is in the
    # universe).
    instruments: set[str] = set()
    if "instrument" in self.dimensions:
        instruments = {rec.name for rec in self.query_dimension_records("instrument", explain=False)}

    # For each datasetType that has an instrument dimension, create
    # a DatasetRef for each defined instrument
    datasetRefs = []
    for datasetType in datasetTypes:
        if "instrument" in datasetType.dimensions:
            # In order to create a conforming dataset ref, create
            # fake DataCoordinate values for the non-instrument
            # dimensions. The type of the value does not matter here.
            dataId = {dim: 1 for dim in datasetType.dimensions.names if dim != "instrument"}

            for instrument in instruments:
                datasetRef = DatasetRef(
                    datasetType,
                    DataCoordinate.standardize(
                        dataId, instrument=instrument, dimensions=datasetType.dimensions
                    ),
                    run="validate",
                )
                datasetRefs.append(datasetRef)

    entities: list[DatasetType | DatasetRef] = []
    entities.extend(datasetTypes)
    entities.extend(datasetRefs)

    # Remember a datastore failure instead of raising immediately so that
    # the lookup-key problems found below are reported in the same error.
    datastoreErrorStr = None
    try:
        self._datastore.validateConfiguration(entities, logFailures=logFailures)
    except ValidationError as e:
        datastoreErrorStr = str(e)

    # Also check that the LookupKeys used by the datastores match
    # registry and storage class definitions
    keys = self._datastore.getLookupKeys()

    failedNames = set()
    failedDataId = set()
    for key in keys:
        # Keys without a name are dimension-based; dimensions are checked
        # for consistency when the Butler is created and rendezvoused with
        # a universe, so only named keys need checking here.
        if key.name is not None:
            if key.name in ignore:
                continue

            # skip if specific datasetType names were requested and this
            # name does not match
            if requestedNameSet and key.name not in requestedNameSet:
                continue

            # See if it is a StorageClass or a DatasetType
            if key.name not in self.storageClasses:
                try:
                    self.get_dataset_type(key.name)
                except KeyError:
                    if logFailures:
                        _LOG.critical(
                            "Key '%s' does not correspond to a DatasetType or StorageClass", key
                        )
                    failedNames.add(key)

        # Check that the instrument is a valid instrument
        # Currently only support instrument so check for that
        if key.dataId:
            dataIdKeys = set(key.dataId)
            if {"instrument"} != dataIdKeys:
                if logFailures:
                    _LOG.critical("Key '%s' has unsupported DataId override", key)
                failedDataId.add(key)
            elif key.dataId["instrument"] not in instruments:
                if logFailures:
                    _LOG.critical("Key '%s' has unknown instrument", key)
                failedDataId.add(key)

    messages = []

    if datastoreErrorStr:
        messages.append(datastoreErrorStr)

    for failed, msg in (
        (failedNames, "Keys without corresponding DatasetType or StorageClass entry: "),
        (failedDataId, "Keys with bad DataId entries: "),
    ):
        if failed:
            # The failures are collected in sets; sort their string forms
            # so the error message is reproducible between runs.
            msg += ", ".join(sorted(str(k) for k in failed))
            messages.append(msg)

    if messages:
        raise ValidationError(";\n".join(messages))

2623 

@property
@deprecated(
    "Please use 'collections' instead. collection_chains will be removed after v28.",
    version="v28",
    category=FutureWarning,
)
def collection_chains(self) -> DirectButlerCollections:
    """Deprecated accessor for the object used to modify collection
    chains; use ``collections`` instead.
    """
    registry = self._registry
    return DirectButlerCollections(registry)

2633 

@property
def collections(self) -> DirectButlerCollections:
    """Helper object for inspecting and modifying collections, built on
    this butler's registry.
    """
    registry = self._registry
    return DirectButlerCollections(registry)

2638 

@property
def run(self) -> str | None:
    """Default output run of this butler (`str` or `None`).

    Equivalent to ``self.registry.defaults.run``. The value cannot be set
    on its own; to change it, assign a new `RegistryDefaults` instance to
    ``self.registry.defaults``, which replaces all defaults together.
    """
    defaults = self._registry.defaults
    return defaults.run

2650 

@property
def registry(self) -> Registry:
    """Manager of dataset metadata and relationships (`Registry`).

    Many operations that do not read or write butler datasets are only
    reachable through `Registry` methods. Those methods will eventually
    be superseded by equivalent `Butler` methods.
    """
    shim = RegistryShim(self)
    return shim

2661 

@property
def dimensions(self) -> DimensionUniverse:
    # Docstring inherited.
    registry = self._registry
    return registry.dimensions

2666 

def query(self) -> contextlib.AbstractContextManager[Query]:
    # Docstring inherited.
    registry = self._registry
    return registry._query()

2670 

2671 def _query_driver( 

2672 self, 

2673 default_collections: Iterable[str], 

2674 default_data_id: DataCoordinate, 

2675 ) -> contextlib.AbstractContextManager[DirectQueryDriver]: 

2676 """Set up a QueryDriver instance for use with this Butler. Although 

2677 this is marked as a private method, it is also used by Butler server. 

2678 """ 

2679 return self._registry._query_driver(default_collections, default_data_id) 

2680 

@contextlib.contextmanager
def _query_all_datasets_by_page(
    self, args: QueryAllDatasetsParameters
) -> Iterator[Iterator[list[DatasetRef]]]:
    """Yield an iterator over the ``data`` of each result page while a
    query context is open.
    """
    with self.query() as active_query:
        result_pages = query_all_datasets(self, active_query, args)
        yield (page.data for page in result_pages)

2688 

2689 def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None: 

2690 """Immediately load caches that are used for common operations.""" 

2691 self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache) 

2692 

2693 def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]: 

2694 return self._registry.expand_data_ids(data_ids) 

2695 

# Annotation-only declarations of attributes used by the methods above; no
# values are assigned here. Each declaration is followed by its description.

_config: ButlerConfig
"""Configuration for this Butler instance."""

# The methods above delegate registry operations to this object.
_registry: SqlRegistry
"""The object that manages dataset metadata and relationships
(`SqlRegistry`).

Most operations that don't involve reading or writing butler datasets are
accessible only via `SqlRegistry` methods.
"""

# ``validateConfiguration`` tests lookup-key names for membership in this.
storageClasses: StorageClassFactory
"""An object that maps known storage class names to objects that fully
describe them (`StorageClassFactory`).
"""

_closed: bool
"""`True` if close() has already been called on this instance; `False`
otherwise.
"""

2716 

2717 

class _RefGroup(NamedTuple):
    """Key identifying a batch of DatasetRefs to be inserted in
    `Butler.transfer_from`.

    Instances are used as the keys of the grouped-refs mapping; the
    registry import unpacks each key into its dimensions and run, and
    orders the batches by ``run``.
    """

    # Dimension group logged as the dimensions of the refs in the batch.
    dimensions: DimensionGroup
    # Name of the run collection the batch is imported into.
    run: str

2725 

2726 

class _ImportDatasetsInfo(NamedTuple):
    """Information extracted from datasets to be imported.

    `transfer_from` passes ``dimension_records`` to the dimension-record
    import and ``grouped_refs`` to the grouped-ref import.
    """

    # Refs to import, batched by `_RefGroup` key.
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]]
    # Dimension records to insert, keyed by element and then data coordinate.
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]

2732 

2733 

2734def _to_uuid(id: DatasetId | str) -> uuid.UUID: 

2735 if isinstance(id, uuid.UUID): 

2736 return id 

2737 else: 

2738 return uuid.UUID(id) 

2739 

2740 

2741class _ButlerClosed: 

2742 def __getattr__(self, name: str) -> Any: 

2743 raise RuntimeError("Attempted to use a Butler instance which has been closed.") 

2744 

2745 

2746_BUTLER_CLOSED_INSTANCE: Any = _ButlerClosed() 

2747 

2748 

2749def _retrieve_dataset_type(registry: SqlRegistry, name: str) -> DatasetType | None: 

2750 """Return DatasetType defined in registry given dataset type name.""" 

2751 try: 

2752 return registry.getDatasetType(name) 

2753 except MissingDatasetTypeError: 

2754 return None