Coverage for python / lsst / daf / butler / direct_butler / _direct_butler.py: 10%
975 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:22 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:22 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Butler top level classes."""
30from __future__ import annotations
32__all__ = (
33 "ButlerValidationError",
34 "DirectButler",
35)
37import collections.abc
38import contextlib
39import io
40import itertools
41import math
42import numbers
43import os
44import uuid
45import warnings
46from collections import Counter, defaultdict
47from collections.abc import Collection, Iterable, Iterator, Mapping, MutableMapping, Sequence
48from functools import partial
49from types import EllipsisType
50from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, TextIO, cast
52from deprecated.sphinx import deprecated
53from sqlalchemy.exc import IntegrityError
55from lsst.resources import ResourcePath, ResourcePathExpression
56from lsst.utils.introspection import find_outside_stacklevel, get_class_of
57from lsst.utils.iteration import chunk_iterable
58from lsst.utils.logging import VERBOSE, getLogger
59from lsst.utils.timer import time_this
61from .._butler import Butler, _DeprecatedDefault
62from .._butler_config import ButlerConfig
63from .._butler_instance_options import ButlerInstanceOptions
64from .._butler_metrics import ButlerMetrics
65from .._collection_type import CollectionType
66from .._dataset_existence import DatasetExistence
67from .._dataset_ref import DatasetRef
68from .._dataset_type import DatasetType
69from .._deferredDatasetHandle import DeferredDatasetHandle
70from .._exceptions import (
71 DatasetNotFoundError,
72 DimensionValueError,
73 EmptyQueryResultError,
74 InconsistentUniverseError,
75 ValidationError,
76)
77from .._file_dataset import FileDataset
78from .._limited_butler import LimitedButler
79from .._query_all_datasets import QueryAllDatasetsParameters, query_all_datasets
80from .._registry_shim import RegistryShim
81from .._storage_class import StorageClass, StorageClassFactory
82from .._timespan import Timespan
83from ..datastore import Datastore, NullDatastore
84from ..datastores.file_datastore.retrieve_artifacts import ZipIndex, retrieve_and_zip
85from ..datastores.file_datastore.transfer import retrieve_file_transfer_records
86from ..dimensions import DataCoordinate, Dimension, DimensionGroup
87from ..direct_query_driver import DirectQueryDriver
88from ..progress import Progress
89from ..queries import Query
90from ..registry import (
91 ConflictingDefinitionError,
92 DataIdError,
93 MissingDatasetTypeError,
94 RegistryDefaults,
95 _RegistryFactory,
96)
97from ..registry.sql_registry import SqlRegistry
98from ..transfers import RepoExportContext
99from ..utils import transactional
100from ._direct_butler_collections import DirectButlerCollections
102if TYPE_CHECKING:
103 from lsst.resources import ResourceHandleProtocol
105 from .._dataset_provenance import DatasetProvenance
106 from .._dataset_ref import DatasetId
107 from ..datastore import DatasetRefURIs
108 from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse
109 from ..registry import CollectionArgType, Registry
110 from ..transfers import RepoImportBackend
112_LOG = getLogger(__name__)
class ButlerValidationError(ValidationError):
    """Raised when the Butler configuration fails validation."""
121class DirectButler(Butler): # numpydoc ignore=PR02
122 """Main entry point for the data access system.
124 Parameters
125 ----------
126 config : `ButlerConfig`
127 The configuration for this Butler instance.
128 registry : `SqlRegistry`
129 The object that manages dataset metadata and relationships.
130 datastore : Datastore
131 The object that manages actual dataset storage.
132 storageClasses : StorageClassFactory
133 An object that maps known storage class names to objects that fully
134 describe them.
136 Notes
137 -----
138 Most users should call the top-level `Butler`.``from_config`` instead of
139 using this constructor directly.
140 """
142 # This is __new__ instead of __init__ because we have to support
143 # instantiation via the legacy constructor Butler.__new__(), which
144 # reads the configuration and selects which subclass to instantiate. The
145 # interaction between __new__ and __init__ is kind of wacky in Python. If
146 # we were using __init__ here, __init__ would be called twice (once when
147 # the DirectButler instance is constructed inside Butler.from_config(), and
148 # a second time with the original arguments to Butler() when the instance
149 # is returned from Butler.__new__()).
def __new__(
    cls,
    *,
    config: ButlerConfig,
    registry: SqlRegistry,
    datastore: Datastore,
    storageClasses: StorageClassFactory,
    metrics: ButlerMetrics | None = None,
) -> DirectButler:
    """Allocate a `DirectButler` and attach its collaborating objects.

    Parameters
    ----------
    config : `ButlerConfig`
        The configuration for this Butler instance.
    registry : `SqlRegistry`
        The object that manages dataset metadata and relationships.
    datastore : `Datastore`
        The object that manages actual dataset storage.
    storageClasses : `StorageClassFactory`
        An object that maps known storage class names to objects that
        fully describe them.
    metrics : `ButlerMetrics` or `None`, optional
        Metrics object to attach to the instance. If `None` a new
        `ButlerMetrics` is created.

    Returns
    -------
    butler : `DirectButler`
        The newly created, open instance.
    """
    butler = cast(DirectButler, super().__new__(cls))
    butler._config = config
    butler._registry = registry
    butler._datastore = datastore
    butler.storageClasses = storageClasses
    if metrics is None:
        metrics = ButlerMetrics()
    butler._metrics = metrics

    # For execution butler the datastore needs a special
    # dependency-inversion trick. This is not used by regular butler,
    # but we do not have a way to distinguish regular butler from execution
    # butler.
    dataset_type_lookup = partial(_retrieve_dataset_type, registry)
    butler._datastore.set_retrieve_dataset_type_method(dataset_type_lookup)

    # A freshly constructed instance is open until close() is called.
    butler._closed = False

    return butler
@classmethod
def create_from_config(
    cls,
    config: ButlerConfig,
    *,
    options: ButlerInstanceOptions,
    without_datastore: bool = False,
) -> DirectButler:
    """Build a `DirectButler` from an already-loaded configuration.

    Parameters
    ----------
    config : `ButlerConfig`
        The configuration for this Butler instance.
    options : `ButlerInstanceOptions`
        Default values and other settings for the Butler instance.
    without_datastore : `bool`, optional
        If `True` do not attach a datastore to this butler. Any attempts
        to use a datastore will fail.

    Returns
    -------
    butler : `DirectButler`
        The new instance, wired to a registry, a datastore (or a
        `NullDatastore`) and a storage class factory.

    Raises
    ------
    ValueError
        Raised if ``config`` contains a ``run`` or ``collection`` key.

    Notes
    -----
    Most users should call the top-level `Butler`.``from_config``
    instead of using this function directly.
    """
    if any(key in config for key in ("run", "collection")):
        raise ValueError("Passing a run or collection via configuration is no longer supported.")

    registry_defaults = RegistryDefaults.from_butler_instance_options(options)
    try:
        root = config.get("root", config.configDir)

        # An explicit writeable option wins; when it is unset the butler is
        # writeable exactly when a default run was given.
        can_write = options.writeable
        if can_write is None:
            can_write = options.run is not None

        registry = _RegistryFactory(config).from_config(
            butlerRoot=root, writeable=can_write, defaults=registry_defaults
        )

        datastore: Datastore
        if without_datastore:
            datastore = NullDatastore(None, None)
        else:
            bridge_manager = registry.getDatastoreBridgeManager()
            datastore = Datastore.fromConfig(config, bridge_manager, butlerRoot=root)

        # TODO: Once datastore drops dependency on registry we can
        # construct datastore first and pass opaque tables to registry
        # constructor.
        opaque_tables = datastore.get_opaque_table_definitions()
        registry.make_datastore_tables(opaque_tables)

        storage_classes = StorageClassFactory()
        storage_classes.addFromConfig(config)

        return DirectButler(
            config=config,
            registry=registry,
            datastore=datastore,
            storageClasses=storage_classes,
            metrics=options.metrics,
        )
    except Exception:
        # Failures here usually mean that configuration is incomplete,
        # just issue an error message which includes config file URI.
        _LOG.error(f"Failed to instantiate Butler from config {config.configFile}.")
        raise
def clone(
    self,
    *,
    collections: CollectionArgType | None | EllipsisType = ...,
    run: str | None | EllipsisType = ...,
    inferDefaults: bool | EllipsisType = ...,
    dataId: dict[str, str] | EllipsisType = ...,
    metrics: ButlerMetrics | None = None,
) -> DirectButler:
    # Docstring inherited
    # The clone gets its own registry copy carrying the adjusted defaults...
    new_defaults = self._registry.defaults.clone(collections, run, inferDefaults, dataId)
    new_registry = self._registry.copy(new_defaults)

    # ...and a datastore clone bound to that registry's bridge manager.
    new_datastore = self._datastore.clone(new_registry.getDatastoreBridgeManager())

    # Configuration and the storage class factory are passed through
    # unchanged from this instance.
    return DirectButler(
        config=self._config,
        registry=new_registry,
        datastore=new_datastore,
        storageClasses=self.storageClasses,
        metrics=metrics,
    )
def close(self) -> None:
    # Closing is idempotent: a second call does nothing.
    if self._closed:
        return

    self._closed = True
    self._registry.close()

    # Cause exceptions to be raised if a user attempts to use the
    # instance after closing it. Without this, Butler would still
    # work after being closed because of implementation details
    # of SqlAlchemy, but this may not continue to be the case in the
    # future and we don't want users to get in the habit of doing this.
    self._registry = _BUTLER_CLOSED_INSTANCE
    self._datastore = _BUTLER_CLOSED_INSTANCE
272 GENERATION: ClassVar[int] = 3
273 """This is a Generation 3 Butler.
275 This attribute may be removed in the future, once the Generation 2 Butler
276 interface has been fully retired; it should only be used in transitional
277 code.
278 """
@classmethod
def _unpickle(
    cls,
    config: ButlerConfig,
    collections: tuple[str, ...] | None,
    run: str | None,
    defaultDataId: dict[str, str],
    writeable: bool,
) -> DirectButler:
    """Rebuild a Butler from the positional state saved by ``__reduce__``.

    ``__reduce__`` can only invoke callables with positional arguments,
    so this adapter accepts the pickled state positionally and forwards
    it to `create_from_config` as keyword-only options.

    Parameters
    ----------
    config : `ButlerConfig`
        Butler configuration, already coerced into a true `ButlerConfig`
        instance (and hence after any search paths for overrides have been
        utilized).
    collections : `tuple` [ `str` ] or `None`
        Names of the default collections to read from.
    run : `str`, optional
        Name of the default `~CollectionType.RUN` collection to write to.
    defaultDataId : `dict` [ `str`, `str` ]
        Default data ID values.
    writeable : `bool`
        Whether the Butler should support write operations.

    Returns
    -------
    butler : `Butler`
        A new `Butler` instance.
    """
    options = ButlerInstanceOptions(
        collections=collections,
        run=run,
        writeable=writeable,
        kwargs=defaultDataId,
    )
    return cls.create_from_config(config=config, options=options)
def __reduce__(self) -> tuple:
    """Support pickling.

    Returns
    -------
    reduced : `tuple`
        The reconstruction callable (``DirectButler._unpickle``) and the
        tuple of positional arguments to call it with.
    """
    # Positional order must match the signature of ``_unpickle``.
    unpickle_args = (
        self._config,
        self.collections.defaults,
        self.run,
        dict(self._registry.defaults.dataId.required),
        self._registry.isWriteable(),
    )
    return (DirectButler._unpickle, unpickle_args)
def __str__(self) -> str:
    # Summarize the defaults and the string forms of the two backends.
    described = (
        f"collections={self.collections}",
        f"run={self.run}",
        f"datastore='{self._datastore}'",
        f"registry='{self._registry}'",
    )
    return f"Butler({', '.join(described)})"
def isWriteable(self) -> bool:
    # Docstring inherited.
    # The answer is delegated entirely to the registry.
    registry_writeable = self._registry.isWriteable()
    return registry_writeable
def _caching_context(self) -> contextlib.AbstractContextManager[None]:
    """Return the registry's caching context manager."""
    caching = self._registry.caching_context()
    return caching
@contextlib.contextmanager
def transaction(self) -> Iterator[None]:
    """Context manager supporting `Butler` transactions.

    The registry transaction is entered first and the datastore
    transaction inside it, so the datastore transaction is exited first.

    Transactions can be nested.
    """
    with self._registry.transaction():
        with self._datastore.transaction():
            yield
def _standardizeArgs(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    dataId: DataId | None = None,
    for_put: bool = True,
    **kwargs: Any,
) -> tuple[DatasetType, DataId | None]:
    """Standardize the arguments passed to several Butler APIs.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        When `DatasetRef` the `dataId` should be `None`.
        Otherwise the `DatasetType` or name thereof.
    dataId : `dict` or `DataCoordinate`
        A `dict` of `Dimension` link name, value pairs that label the
        `DatasetRef` within a Collection. When `None`, a `DatasetRef`
        should be provided as the first argument.
    for_put : `bool`, optional
        If `True` this call is invoked as part of a `Butler.put`.
        Otherwise it is assumed to be part of a `Butler.get()`. This
        parameter is only relevant if there is dataset type
        inconsistency.
    **kwargs
        Additional keyword arguments used to augment or construct a
        `DataCoordinate`. See `DataCoordinate.standardize`
        parameters.

    Returns
    -------
    datasetType : `DatasetType`
        A `DatasetType` instance extracted from ``datasetRefOrType``.
    dataId : `dict` or `DataId`, optional
        Argument that can be used (along with ``kwargs``) to construct a
        `DataId`.

    Raises
    ------
    ValueError
        Raised if a `DatasetRef` is given together with a data ID or
        keyword arguments, or if the supplied dataset type differs from
        the registry definition and is not compatible with it.

    Notes
    -----
    Butler APIs that conceptually need a DatasetRef also allow passing a
    `DatasetType` (or the name of one) and a `DataId` (or a dict and
    keyword arguments that can be used to construct one) separately. This
    method accepts those arguments and always returns a true `DatasetType`
    and a `DataId` or `dict`.

    Standardization of `dict` vs `DataId` is best handled by passing the
    returned ``dataId`` (and ``kwargs``) to `Registry` APIs, which are
    generally similarly flexible.
    """
    supplied: DatasetType | None = None
    resolved: DatasetType | None = None

    if isinstance(datasetRefOrType, DatasetRef):
        if dataId is not None or kwargs:
            raise ValueError("DatasetRef given, cannot use dataId as well")
        supplied = datasetRefOrType.datasetType
        dataId = datasetRefOrType.dataId
    elif isinstance(datasetRefOrType, DatasetType):
        # Don't check whether DataId is provided, because Registry APIs
        # can usually construct a better error message when it wasn't.
        supplied = datasetRefOrType
    else:
        # Only a name was given, so the registry definition is used as-is.
        resolved = self.get_dataset_type(datasetRefOrType)

    # A caller-supplied definition must agree with what the registry holds.
    if supplied is not None:
        registered = self.get_dataset_type(supplied.name)
        resolved = registered
        if supplied != registered:
            # We can allow differences if they are compatible, depending
            # on whether this is a get or a put. A get requires that
            # the python type associated with the datastore can be
            # converted to the user type. A put requires that the user
            # supplied python type can be converted to the internal
            # type expected by registry.
            if for_put:
                compatible = registered.is_compatible_with(supplied)
                chosen = registered
            else:
                compatible = supplied.is_compatible_with(registered)
                chosen = supplied
            if not compatible:
                raise ValueError(
                    f"Supplied dataset type ({supplied}) inconsistent with "
                    f"registry definition ({registered})"
                )
            # Override the internal definition.
            resolved = chosen

    assert resolved is not None
    return resolved, dataId
def _rewrite_data_id(
    self, dataId: DataId | None, datasetType: DatasetType, **kwargs: Any
) -> tuple[DataId | None, dict[str, Any]]:
    """Rewrite a data ID taking into account dimension records.

    Take a Data ID and keyword args and rewrite it if necessary to
    allow the user to specify dimension records rather than dimension
    primary values.

    This allows a user to include a dataId dict with keys of
    ``exposure.day_obs`` and ``exposure.seq_num`` instead of giving
    the integer exposure ID. It also allows a string to be given
    for a dimension value rather than the integer ID if that is more
    convenient. For example, rather than having to specify the
    detector with ``detector.full_name``, a string given for ``detector``
    will be interpreted as the full name and converted to the integer
    value.

    Keyword arguments can also use strings for dimensions like detector
    and exposure but python does not allow them to include ``.`` and
    so the ``exposure.day_obs`` syntax can not be used in a keyword
    argument.

    Parameters
    ----------
    dataId : `dict` or `DataCoordinate`
        A `dict` of `Dimension` link name, value pairs that will label the
        `DatasetRef` within a Collection.
    datasetType : `DatasetType`
        The dataset type associated with this dataId. Required to
        determine the relevant dimensions.
    **kwargs
        Additional keyword arguments used to augment or construct a
        `DataId`. See `DataId` parameters.

    Returns
    -------
    dataId : `dict` or `DataCoordinate`
        The, possibly rewritten, dataId. If given a `DataCoordinate` and
        no keyword arguments, the original dataId will be returned
        unchanged.
    kwargs : `dict`
        Any unused keyword arguments. In the current implementation this
        is always an empty `dict`, because every keyword argument is
        either merged into the returned data ID or rejected.

    Raises
    ------
    DimensionValueError
        Raised if a key matches no dimension or dimension record field, if
        record values are inconsistent with an explicitly given dimension
        value, or if the record constraints for a dimension match no
        record or more than one record.
    RuntimeError
        Raised if record constraints were given for a dimension element
        that is not a true `Dimension`.
    """
    # Process dimension records that are using record information
    # rather than ids
    newDataId: dict[str, DataIdValue] = {}
    byRecord: dict[str, dict[str, Any]] = defaultdict(dict)

    if isinstance(dataId, DataCoordinate):
        # Do nothing if we have a DataCoordinate and no kwargs.
        if not kwargs:
            return dataId, kwargs
        # If we have a DataCoordinate with kwargs, we know the
        # DataCoordinate only has values for real dimensions.
        newDataId.update(dataId.mapping)
    elif dataId:
        # The data ID is a mapping, which means it might have keys like
        # "exposure.obs_id" (unlike kwargs, because a "." is not allowed in
        # a keyword parameter).
        for k, v in dataId.items():
            if isinstance(k, str) and "." in k:
                # Someone is using a more human-readable dataId
                dimensionName, record = k.split(".", 1)
                byRecord[dimensionName][record] = v
            else:
                newDataId[k] = v

    # Go through the updated dataId and check the type in case someone is
    # using an alternate key. We have already filtered out the compound
    # keys in dimension.record format.
    not_dimensions: dict[str, Any] = {}

    # Will need to look in the dataId and the keyword arguments
    # and will remove them if they need to be fixed or are unrecognized.
    for dataIdDict in (newDataId, kwargs):
        # Use a list so we can adjust the dict safely in the loop
        for dimensionName in list(dataIdDict):
            value = dataIdDict[dimensionName]
            try:
                dimension = self.dimensions.dimensions[dimensionName]
            except KeyError:
                # This is not a real dimension
                not_dimensions[dimensionName] = value
                del dataIdDict[dimensionName]
                continue

            # Convert an integral type to an explicit int to simplify
            # comparisons here
            if isinstance(value, numbers.Integral):
                value = int(value)

            if not isinstance(value, dimension.primaryKey.getPythonType()):
                # The value is not of the primary key type; treat it as
                # the first alternate key whose Python type matches.
                for alternate in dimension.alternateKeys:
                    if isinstance(value, alternate.getPythonType()):
                        byRecord[dimensionName][alternate.name] = value
                        del dataIdDict[dimensionName]
                        _LOG.debug(
                            "Converting dimension %s to %s.%s=%s",
                            dimensionName,
                            dimensionName,
                            alternate.name,
                            value,
                        )
                        break
                else:
                    # No alternate key matched: the value stays in the
                    # dict untouched and only a warning is logged.
                    _LOG.warning(
                        "Type mismatch found for value '%r' provided for dimension %s. "
                        "Could not find matching alternative (primary key has type %s) "
                        "so attempting to use as-is.",
                        value,
                        dimensionName,
                        dimension.primaryKey.getPythonType(),
                    )

    # By this point kwargs and newDataId should only include valid
    # dimensions. Merge kwargs in to the new dataId and log if there
    # are dimensions in both (rather than calling update).
    for k, v in kwargs.items():
        if k in newDataId and newDataId[k] != v:
            _LOG.debug(
                "Keyword arg %s overriding explicit value in dataId of %s with %s", k, newDataId[k], v
            )
        newDataId[k] = v
    # No need to retain any values in kwargs now.
    # NOTE: from here on ``kwargs`` is always empty, so the later
    # ``extra_args.update(kwargs)`` and ``**kwargs`` uses add nothing and
    # the second element of the return value is an empty dict.
    kwargs = {}

    # If we have some unrecognized dimensions we have to try to connect
    # them to records in other dimensions. This is made more complicated
    # by some dimensions having records with clashing names. A mitigation
    # is that we can tell by this point which dimensions are missing
    # for the DatasetType but this does not work for calibrations
    # where additional dimensions can be used to constrain the temporal
    # axis.
    if not_dimensions:
        # Search for all dimensions even if we have been given a value
        # explicitly. In some cases records are given as well as the
        # actual dimension and this should not be an error if they
        # match.
        mandatoryDimensions = datasetType.dimensions.names  # - provided

        candidateDimensions: set[str] = set()
        candidateDimensions.update(mandatoryDimensions)

        # For calibrations we may well be needing temporal dimensions
        # so rather than always including all dimensions in the scan
        # restrict things a little. It is still possible for there
        # to be confusion over day_obs in visit vs exposure for example.
        # If we are not searching calibration collections things may
        # fail but they are going to fail anyway because of the
        # ambiguousness of the dataId...
        if datasetType.isCalibration():
            for dim in self.dimensions.dimensions:
                if dim.temporal:
                    candidateDimensions.add(str(dim))

        # Look up table for the first association with a dimension
        guessedAssociation: dict[str, dict[str, Any]] = defaultdict(dict)

        # Keep track of whether an item is associated with multiple
        # dimensions.
        counter: Counter[str] = Counter()
        assigned: dict[str, set[str]] = defaultdict(set)

        # Go through the missing dimensions and associate the
        # given names with records within those dimensions
        matched_dims: set[str] = set()
        for dimensionName in candidateDimensions:
            dimension = self.dimensions.dimensions[dimensionName]
            fields = dimension.metadata.names | dimension.uniqueKeys.names
            for field in not_dimensions:
                if field in fields:
                    guessedAssociation[dimensionName][field] = not_dimensions[field]
                    counter[dimensionName] += 1
                    assigned[field].add(dimensionName)
                    matched_dims.add(field)

        # Calculate the fields that matched nothing.
        never_found = set(not_dimensions) - matched_dims

        if never_found:
            raise DimensionValueError(f"Unrecognized keyword args given: {never_found}")

        # There is a chance we have allocated a single dataId item
        # to multiple dimensions. Need to decide which should be retained.
        # For now assume that the most popular alternative wins.
        # This means that day_obs with seq_num will result in
        # exposure.day_obs and not visit.day_obs
        # Also prefer an explicitly missing dimension over an inferred
        # temporal dimension.
        for fieldName, assignedDimensions in assigned.items():
            if len(assignedDimensions) > 1:
                # Pick the most popular (preferring mandatory dimensions)
                # NOTE: ``candidateDimensions`` is rebound here; the scan
                # over the original candidate set above has already
                # finished.
                requiredButMissing = assignedDimensions.intersection(mandatoryDimensions)
                if requiredButMissing:
                    candidateDimensions = requiredButMissing
                else:
                    candidateDimensions = assignedDimensions

                    # If this is a choice between visit and exposure and
                    # neither was a required part of the dataset type,
                    # (hence in this branch) always prefer exposure over
                    # visit since exposures are always defined and visits
                    # are defined from exposures.
                    if candidateDimensions == {"exposure", "visit"}:
                        candidateDimensions = {"exposure"}

                # Select the relevant items and get a new restricted
                # counter.
                theseCounts = {k: v for k, v in counter.items() if k in candidateDimensions}
                duplicatesCounter: Counter[str] = Counter()
                duplicatesCounter.update(theseCounts)

                # Choose the most common. If they are equally common
                # we will pick the one that was found first.
                # Returns a list of tuples
                selected = duplicatesCounter.most_common(1)[0][0]

                _LOG.debug(
                    "Ambiguous dataId entry '%s' associated with multiple dimensions: %s."
                    " Removed ambiguity by choosing dimension %s.",
                    fieldName,
                    ", ".join(assignedDimensions),
                    selected,
                )

                # Drop this field from every dimension that lost.
                for candidateDimension in assignedDimensions:
                    if candidateDimension != selected:
                        del guessedAssociation[candidateDimension][fieldName]

        # Update the record look up dict with the new associations
        for dimensionName, values in guessedAssociation.items():
            if values:  # A dict might now be empty
                _LOG.debug(
                    "Assigned non-dimension dataId keys to dimension %s: %s", dimensionName, values
                )
                byRecord[dimensionName].update(values)

    if byRecord:
        # Some record specifiers were found so we need to convert
        # them to the Id form
        for dimensionName, values in byRecord.items():
            if dimensionName in newDataId:
                _LOG.debug(
                    "DataId specified explicit %s dimension value of %s in addition to"
                    " general record specifiers for it of %s. Checking for self-consistency.",
                    dimensionName,
                    newDataId[dimensionName],
                    str(values),
                )
                # Get the actual record and compare with these values.
                # Only query with relevant data ID values.
                filtered_data_id = {
                    k: v for k, v in newDataId.items() if k in self.dimensions[dimensionName].required
                }
                try:
                    recs = self.query_dimension_records(
                        dimensionName,
                        data_id=filtered_data_id,
                    )
                except (DataIdError, EmptyQueryResultError):
                    raise DimensionValueError(
                        f"Could not find dimension '{dimensionName}'"
                        f" with dataId {filtered_data_id} as part of comparing with"
                        f" record values {byRecord[dimensionName]}"
                    ) from None
                if len(recs) == 1:
                    # Collect every mismatching field so the error reports
                    # all of them at once.
                    errmsg: list[str] = []
                    for k, v in values.items():
                        if (recval := getattr(recs[0], k)) != v:
                            errmsg.append(f"{k} ({recval} != {v})")
                    if errmsg:
                        raise DimensionValueError(
                            f"Dimension {dimensionName} in dataId has explicit value"
                            f" {newDataId[dimensionName]} inconsistent with"
                            f" {dimensionName} dimension record: " + ", ".join(errmsg)
                        )
                else:
                    # Multiple matches for an explicit dimension
                    # should never happen but let downstream complain.
                    pass
                # The explicit value is kept; nothing to look up.
                continue

            # Do not use data ID keys in query that aren't relevant.
            # Otherwise we can have detector queries being constrained
            # by an exposure ID that doesn't exist and return no matches
            # for a detector even though it's a good detector name.
            filtered_data_id = {
                k: v
                for k, v in newDataId.items()
                if k in self.dimensions[dimensionName].minimal_group.names
            }

            def _get_attr(obj: Any, attr: str) -> Any:
                # Used to implement x.exposure.seq_num when given
                # x and "exposure.seq_num".
                for component in attr.split("."):
                    obj = getattr(obj, component)
                return obj

            with self.query() as q:
                x = q.expression_factory
                # Build up a WHERE expression.
                predicates = tuple(_get_attr(x, f"{dimensionName}.{k}") == v for k, v in values.items())
                extra_args: dict[str, Any] = {}  # For mypy.
                extra_args.update(filtered_data_id)
                extra_args.update(kwargs)
                q = q.where(x.all(*predicates), **extra_args)
                records = set(q.dimension_records(dimensionName))

            if len(records) != 1:
                if len(records) > 1:
                    # visit can have an ambiguous answer without involving
                    # visit_system. The default visit_system is defined
                    # by the instrument.
                    if (
                        dimensionName == "visit"
                        and "visit_system_membership" in self.dimensions
                        and "visit_system" in self.dimensions["instrument"].metadata
                    ):
                        instrument_records = self.query_dimension_records(
                            "instrument",
                            data_id=newDataId,
                            explain=False,
                            **kwargs,
                        )
                        if len(instrument_records) == 1:
                            visit_system = instrument_records[0].visit_system
                            if visit_system is None:
                                # Set to a value that will never match.
                                visit_system = -1

                            # Look up each visit in the
                            # visit_system_membership records.
                            for rec in records:
                                membership = self.query_dimension_records(
                                    # Use bind to allow zero results.
                                    # This is a fully-specified query.
                                    "visit_system_membership",
                                    instrument=instrument_records[0].name,
                                    visit_system=visit_system,
                                    visit=rec.id,
                                    explain=False,
                                )
                                if membership:
                                    # This record is the right answer.
                                    records = {rec}
                                    break

                    # The ambiguity may have been resolved so check again.
                    if len(records) > 1:
                        _LOG.debug(
                            "Received %d records from constraints of %s", len(records), str(values)
                        )
                        for r in records:
                            _LOG.debug("- %s", str(r))
                        raise DimensionValueError(
                            f"DataId specification for dimension {dimensionName} is not"
                            f" uniquely constrained to a single dataset by {values}."
                            f" Got {len(records)} results."
                        )
                else:
                    raise DimensionValueError(
                        f"DataId specification for dimension {dimensionName} matched no"
                        f" records when constrained by {values}"
                    )

            # Get the primary key from the real dimension object
            dimension = self.dimensions.dimensions[dimensionName]
            if not isinstance(dimension, Dimension):
                raise RuntimeError(
                    f"{dimension.name} is not a true dimension, and cannot be used in data IDs."
                )
            # Exactly one record remains; store its primary key value.
            newDataId[dimensionName] = getattr(records.pop(), dimension.primaryKey.name)

    return newDataId, kwargs
def _findDatasetRef(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    dataId: DataId | None = None,
    *,
    collections: Any = None,
    predict: bool = False,
    run: str | None = None,
    datastore_records: bool = False,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> DatasetRef:
    """Turn dataset-lookup arguments into a single `DatasetRef`.

    This is the common first step of the methods that need to locate a
    dataset before acting on it.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        Either a `DatasetRef` (in which case ``dataId`` should be `None`
        and no registry search is performed) or a `DatasetType` or
        dataset type name.
    dataId : `dict` or `DataCoordinate`, optional
        Dimension name/value pairs identifying the dataset. Should be
        `None` when a `DatasetRef` is given as the first argument.
    collections : Any, optional
        Collections to search instead of the butler defaults. A warning
        is issued if this is given together with a `DatasetRef`.
    predict : `bool`, optional
        If `True` and no dataset is found, return a newly constructed
        `DatasetRef` rather than raising. Defaults to `False`.
    run : `str`, optional
        Run collection used for a predicted `DatasetRef`; falls back to
        ``self.run``. Only consulted when ``predict`` is `True`.
    datastore_records : `bool`, optional
        If `True`, attach datastore records to the returned ref.
    timespan : `Timespan` or `None`, optional
        Timespan that the validity range of the dataset must overlap.
        For calibration dataset types, if this is `None` the timespan is
        taken from the expanded data ID when the data ID has temporal
        dimensions, otherwise an unbounded timespan is used.
    **kwargs
        Additional keyword arguments used to augment or construct the
        data ID.

    Returns
    -------
    ref : `DatasetRef`
        The reference identified by the arguments. If a `DatasetRef` was
        given it is returned directly (possibly expanded or augmented
        with datastore records).

    Raises
    ------
    DatasetNotFoundError
        Raised if the search finds nothing and ``predict`` is `False`.
    TypeError
        Raised if ``predict`` is `True`, nothing was found, and neither
        ``run`` nor ``self.run`` is set.
    """
    datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, for_put=False, **kwargs)

    if isinstance(datasetRefOrType, DatasetRef):
        # A ref was supplied directly: nothing to search for.
        given_ref = datasetRefOrType
        if collections is not None:
            warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3)
        if predict and not given_ref.dataId.hasRecords():
            expanded_id = self.registry.expandDataId(given_ref.dataId)
            return given_ref.expanded(expanded_id)
        # Only query for datastore records when they were asked for and
        # the ref does not already carry them.
        if datastore_records and given_ref._datastore_records is None:
            given_ref = self._registry.get_datastore_records(given_ref)
        return given_ref

    dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

    if not datasetType.isCalibration():
        # Restrict the data ID to the dataset type's own dimensions here
        # (rather than leaving it to the registry search) so that the
        # standardized data ID is available even when nothing is found.
        dataId = DataCoordinate.standardize(
            dataId,
            dimensions=datasetType.dimensions,
            defaults=self._registry.defaults.dataId,
            **kwargs,
        )
    else:
        # For a calibration, standardize without restricting to the
        # dataset type's dimensions: extra dimensions may carry the
        # temporal information needed for a validity-range lookup.
        dataId = DataCoordinate.standardize(
            dataId, universe=self.dimensions, defaults=self._registry.defaults.dataId, **kwargs
        )
        if timespan is None:
            if dataId.dimensions.temporal:
                # No explicit timespan from the caller, so constrain the
                # calibration lookup with the one from the data ID.
                dataId = self._registry.expandDataId(dataId)
                timespan = dataId.timespan
            else:
                # Try an arbitrary timespan. Downstream will fail if this
                # results in more than one matching dataset.
                timespan = Timespan(None, None)

    ref = self.find_dataset(
        datasetType,
        dataId,
        collections=collections,
        timespan=timespan,
        datastore_records=datastore_records,
    )

    if ref is not None:
        if datasetType != ref.datasetType:
            # The caller passed a compatible dataset type that differs
            # from the registry definition; rebuild the ref around the
            # caller's definition so the expected type is returned.
            ref = DatasetRef(
                datasetType, ref.dataId, run=ref.run, id=ref.id, datastore_records=ref._datastore_records
            )
        return ref

    if not predict:
        if collections is None:
            # Report the collections that were actually searched.
            collections = self._registry.defaults.collections
        raise DatasetNotFoundError(
            f"Dataset {datasetType.name} with data ID {dataId} "
            f"could not be found in collections {collections}."
        )

    # Prediction requested: build a new ref in the chosen run.
    if run is None:
        run = self.run
    if run is None:
        raise TypeError("Cannot predict dataset ID/location with run=None.")
    dataId = self.registry.expandDataId(dataId)
    return DatasetRef(datasetType, dataId, run=run)
@transactional
def put(
    self,
    obj: Any,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    run: str | None = None,
    provenance: DatasetProvenance | None = None,
    **kwargs: Any,
) -> DatasetRef:
    """Write a dataset to the datastore and record it in the registry.

    Parameters
    ----------
    obj : `object`
        The dataset to store.
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`; the ref is
        imported into the registry and used as is), or a `DatasetType`
        or dataset type name.
    dataId : `dict` or `DataCoordinate`
        Dimension name/value pairs identifying the dataset. Should be
        `None` when a `DatasetRef` is given.
    run : `str`, optional
        Run to insert the new dataset into. Not used when a `DatasetRef`
        is given; a warning is issued if it is supplied in that case.
    provenance : `DatasetProvenance` or `None`, optional
        Provenance handed to the datastore along with the dataset.
    **kwargs
        Additional keyword arguments used to augment or construct the
        data ID. Not used when a `DatasetRef` is given.

    Returns
    -------
    ref : `DatasetRef`
        The given `DatasetRef`, or the ref created by the registry
        insert.

    Raises
    ------
    TypeError
        Raised if no `DatasetRef` was given and the butler is read-only.
    ConflictingDefinitionError
        Raised if a `DatasetRef` was given and the datastore already
        knows the dataset, or the datastore write hits an integrity
        error.
    """
    if not isinstance(datasetRefOrType, DatasetRef):
        # Dataset type + data ID: registry creates the ref for us.
        _LOG.debug("Butler put: %s, dataId=%s, run=%s", datasetRefOrType, dataId, run)
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")

        with self._metrics.instrument_put(_LOG, msg="Dataset put with dataID"):
            datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwargs)

            # Translate any dimension-record values in the data ID.
            dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

            # Create the registry entry, then write the artifact.
            dataId = self._registry.expandDataId(dataId, dimensions=datasetType.dimensions, **kwargs)
            (new_ref,) = self._registry.insertDatasets(datasetType, run=run, dataIds=[dataId])
            self._datastore.put(obj, new_ref, provenance=provenance)

        return new_ref

    # Direct put of a predefined DatasetRef.
    given_ref = datasetRefOrType
    _LOG.debug("Butler put direct: %s", given_ref)
    if run is not None:
        warnings.warn("Run collection is not used for DatasetRef", stacklevel=3)

    with self._metrics.instrument_put(_LOG, msg="Dataset put direct"):
        # If the registry already holds a dataset with the same ID, type
        # and data ID, the import is a no-op that hands back the original
        # ref. That case must still be an error, which the datastore
        # check below takes care of.
        self._registry._importDatasets([given_ref], expand=True)

        # Refuse to overwrite something the datastore already knows.
        # This check is inherently prone to races.
        if self._datastore.knows(given_ref):
            raise ConflictingDefinitionError(f"Datastore already contains dataset: {given_ref}")

        # If this write fails because it raced with another writer, the
        # stored content may be unpredictable.
        try:
            self._datastore.put(obj, given_ref, provenance=provenance)
        except IntegrityError as err:
            raise ConflictingDefinitionError(f"Datastore already contains dataset: {err}") from err

    return given_ref
def getDeferred(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    parameters: dict | None = None,
    collections: Any = None,
    storageClass: str | StorageClass | None = None,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> DeferredDatasetHandle:
    """Locate a dataset now and return a handle for reading it later.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or dataset type name.
    dataId : `dict` or `DataCoordinate`, optional
        Dimension name/value pairs identifying the dataset. Should be
        `None` when a `DatasetRef` is given.
    parameters : `dict`, optional
        StorageClass-defined read options, stored on the handle;
        typically used to read only a subset of the dataset.
    collections : Any, optional
        Collections to search instead of the butler defaults.
    storageClass : `StorageClass` or `str`, optional
        Storage class override stored on the handle, to request a
        different (compatible) Python type on read.
    timespan : `Timespan` or `None`, optional
        Timespan that the validity range of the dataset must overlap,
        forwarded to the dataset search.
    **kwargs
        Additional keyword arguments used to augment or construct the
        data ID.

    Returns
    -------
    obj : `DeferredDatasetHandle`
        Handle that can retrieve the dataset at a later time.

    Raises
    ------
    LookupError
        Raised if a `DatasetRef` is given that the datastore neither
        knows about nor has an artifact for.

    Notes
    -----
    When a dataset type (not a `DatasetRef`) is given, any exception
    raised by the dataset search propagates to the caller.
    """
    if not isinstance(datasetRefOrType, DatasetRef):
        ref = self._findDatasetRef(
            datasetRefOrType, dataId, collections=collections, timespan=timespan, **kwargs
        )
    elif self._datastore.knows(datasetRefOrType) or self._datastore.exists(datasetRefOrType):
        # The cheap record check runs first; the artifact check is the
        # fallback needed for datastores configured in trust mode, where
        # there is a file but no record.
        ref = datasetRefOrType
    else:
        raise LookupError(f"Dataset reference {datasetRefOrType} does not exist.")

    return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass)
def get(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    parameters: dict[str, Any] | None = None,
    collections: Any = None,
    storageClass: StorageClass | str | None = None,
    timespan: Timespan | None = None,
    **kwargs: Any,
) -> Any:
    """Read a stored dataset and return it as a Python object.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or dataset type name.
    dataId : `dict` or `DataCoordinate`
        Dimension name/value pairs identifying the dataset. Should be
        `None` when a `DatasetRef` is given.
    parameters : `dict`
        StorageClass-defined read options passed to the datastore;
        typically used to read only a subset of the dataset.
    collections : Any, optional
        Collections to search instead of the butler defaults.
    storageClass : `StorageClass` or `str`, optional
        Storage class override passed to the datastore, to request a
        different (compatible) Python type than the dataset type
        definition would give.
    timespan : `Timespan` or `None`, optional
        Timespan that the validity range of the dataset must overlap,
        forwarded to the dataset search.
    **kwargs
        Additional keyword arguments used to augment or construct the
        data ID.

    Returns
    -------
    obj : `object`
        The dataset.

    Notes
    -----
    Any exception raised by the dataset search or by the datastore read
    propagates to the caller.

    For lookups in a `~CollectionType.CALIBRATION` collection the data ID
    needs to include temporal dimensions beyond those of the dataset type
    so that the right validity range can be selected. For example a
    "bias" with dimensions ``{instrument, detector}`` can be fetched with
    an ``{instrument, detector, exposure}`` data ID because ``exposure``
    is temporal.
    """
    _LOG.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters)
    # Both the lookup and the read happen inside the metrics context.
    with self._metrics.instrument_get(_LOG, msg="Retrieved dataset"):
        # Datastore records are always requested along with the ref.
        resolved_ref = self._findDatasetRef(
            datasetRefOrType,
            dataId,
            collections=collections,
            timespan=timespan,
            datastore_records=True,
            **kwargs,
        )
        return self._datastore.get(resolved_ref, parameters=parameters, storageClass=storageClass)
def getURIs(
    self,
    datasetRefOrType: DatasetRef | DatasetType | str,
    /,
    dataId: DataId | None = None,
    *,
    predict: bool = False,
    collections: Any = None,
    run: str | None = None,
    **kwargs: Any,
) -> DatasetRefURIs:
    """Look up a dataset and return the URIs the datastore reports for it.

    Parameters
    ----------
    datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
        A `DatasetRef` (``dataId`` should then be `None`), or a
        `DatasetType` or dataset type name.
    dataId : `dict` or `DataCoordinate`
        Dimension name/value pairs identifying the dataset. Should be
        `None` when a `DatasetRef` is given.
    predict : `bool`
        If `True`, allow URIs to be returned for datasets that have not
        been written yet.
    collections : Any, optional
        Collections to search instead of the butler defaults.
    run : `str`, optional
        Run to use for predictions, overriding ``self.run``.
    **kwargs
        Additional keyword arguments used to augment or construct the
        data ID.

    Returns
    -------
    uris : `DatasetRefURIs`
        The URI of the primary artifact (may be `None` if the dataset
        was disassembled within the datastore) and the URIs of any
        component artifacts (may be empty).
    """
    # The same ``predict`` flag drives both the ref lookup and the
    # datastore query.
    located_ref = self._findDatasetRef(
        datasetRefOrType,
        dataId,
        predict=predict,
        run=run,
        collections=collections,
        **kwargs,
    )
    return self._datastore.getURIs(located_ref, predict)
def get_dataset_type(self, name: str) -> DatasetType:
    # Thin delegation: the registry owns dataset type definitions, so
    # simply hand the name over and return whatever it resolves to.
    dataset_type = self._registry.getDatasetType(name)
    return dataset_type
def get_dataset(
    self,
    id: DatasetId | str,
    *,
    storage_class: str | StorageClass | None = None,
    dimension_records: bool = False,
    datastore_records: bool = False,
) -> DatasetRef | None:
    # Look up a dataset by ID. Returns `None` when the registry has no
    # such dataset; otherwise the ref is optionally expanded with
    # dimension records, given a storage class override, and augmented
    # with datastore records, in that order.
    id = _to_uuid(id)
    ref = self._registry.getDataset(id)
    if ref is None:
        return None

    if dimension_records:
        expanded_id = self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions)
        ref = ref.expanded(expanded_id)
    if storage_class:
        ref = ref.overrideStorageClass(storage_class)
    if datastore_records:
        ref = self._registry.get_datastore_records(ref)
    return ref
def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
    # Normalize every incoming ID to a UUID first, then fetch all refs
    # from the dataset manager in a single call.
    dataset_uuids = [_to_uuid(dataset_id) for dataset_id in ids]
    dataset_manager = self._registry._managers.datasets
    return dataset_manager.get_dataset_refs(dataset_uuids)
def find_dataset(
    self,
    dataset_type: DatasetType | str,
    data_id: DataId | None = None,
    *,
    collections: str | Sequence[str] | None = None,
    timespan: Timespan | None = None,
    storage_class: str | StorageClass | None = None,
    dimension_records: bool = False,
    datastore_records: bool = False,
    **kwargs: Any,
) -> DatasetRef | None:
    # Search the registry for a single dataset. Returns `None` when
    # nothing matches.

    # Resolve a dataset type name into the actual definition.
    actual_type = self.get_dataset_type(dataset_type) if isinstance(dataset_type, str) else dataset_type

    # The registry search is done on the composite (parent) dataset type;
    # remember the component so it can be re-applied to the result.
    component_name = actual_type.component()
    parent_type = actual_type.makeCompositeDatasetType() if actual_type.isComponent() else actual_type

    # Handle any parts of the data ID that are not using primary
    # dimension keys.
    data_id, kwargs = self._rewrite_data_id(data_id, parent_type, **kwargs)

    ref = self.registry.findDataset(
        parent_type,
        data_id,
        collections=collections,
        timespan=timespan,
        datastore_records=datastore_records,
        **kwargs,
    )
    if ref is None:
        return None

    # Post-process the found ref in a fixed order: dimension records,
    # then component, then storage class override.
    if dimension_records:
        ref = ref.expanded(self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions))
    if component_name:
        ref = ref.makeComponentRef(component_name)
    if storage_class is not None:
        ref = ref.overrideStorageClass(storage_class)

    return ref
def retrieve_artifacts_zip(
    self,
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    overwrite: bool = True,
) -> ResourcePath:
    # Delegate to the shared zip helper, handing it this butler's
    # datastore artifact retriever as the callback.
    artifact_retriever = self._datastore.retrieveArtifacts
    return retrieve_and_zip(refs, destination, artifact_retriever, overwrite)
def retrieveArtifacts(
    self,
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    transfer: str = "auto",
    preserve_path: bool = True,
    overwrite: bool = False,
) -> list[ResourcePath]:
    # Docstring inherited.
    # The datastore does the transfer (always writing an index) and
    # returns a mapping; only its keys are returned to the caller.
    artifact_map = self._datastore.retrieveArtifacts(
        refs,
        ResourcePath(destination),
        transfer=transfer,
        preserve_path=preserve_path,
        overwrite=overwrite,
        write_index=True,
    )
    return [*artifact_map]
def exists(
    self,
    dataset_ref_or_type: DatasetRef | DatasetType | str,
    /,
    data_id: DataId | None = None,
    *,
    full_check: bool = True,
    collections: Any = None,
    **kwargs: Any,
) -> DatasetExistence:
    # Docstring inherited.
    # Start from "nothing known" and OR in a flag for each layer that
    # recognizes the dataset.
    existence = DatasetExistence.UNRECOGNIZED

    if not isinstance(dataset_ref_or_type, DatasetRef):
        # Dataset type + data ID: a failed search means there is nothing
        # more to check.
        try:
            ref = self._findDatasetRef(dataset_ref_or_type, data_id, collections=collections, **kwargs)
        except (LookupError, TypeError):
            return existence
        existence |= DatasetExistence.RECORDED
    else:
        if collections is not None:
            warnings.warn("Collections should not be specified with DatasetRef", stacklevel=2)
        if data_id is not None:
            warnings.warn("A DataID should not be specified with DatasetRef", stacklevel=2)
        ref = dataset_ref_or_type
        registry_ref = self._registry.getDataset(dataset_ref_or_type.id)
        if registry_ref is not None:
            existence |= DatasetExistence.RECORDED

            if dataset_ref_or_type != registry_ref:
                # A mismatch may just be a storage class difference; that
                # is acceptable as long as the two refs are compatible.
                if not registry_ref.is_compatible_with(dataset_ref_or_type):
                    raise ValueError(
                        f"The ref given to exists() ({ref}) has the same dataset ID as one "
                        f"in registry but has different incompatible values ({registry_ref})."
                    )
                # Use the registry version for the rest of the method.
                ref = registry_ref

    if self._datastore.knows(ref):
        existence |= DatasetExistence.DATASTORE

    if full_check:
        # Only a full check looks for the artifact itself.
        if self._datastore.exists(ref):
            existence |= DatasetExistence._ARTIFACT
    elif existence.value != DatasetExistence.UNRECOGNIZED.value:
        # Without a full check the artifact is assumed to exist, but only
        # if some other layer already recognized the dataset.
        existence |= DatasetExistence(DatasetExistence._ASSUMED)

    return existence
def _exists_many(
    self,
    refs: Iterable[DatasetRef],
    /,
    *,
    full_check: bool = True,
) -> dict[DatasetRef, DatasetExistence]:
    # Docstring inherited.
    # ``refs`` is traversed several times below (to build the result
    # mapping and again for each datastore query). Materialize it once so
    # that a single-pass iterable such as a generator is not silently
    # exhausted after the first traversal.
    refs = list(refs)

    # Every ref starts out unrecognized; flags are ORed in as each layer
    # reports that it knows the dataset.
    existence = {ref: DatasetExistence.UNRECOGNIZED for ref in refs}

    # Check which refs exist in the registry.
    id_map = {ref.id: ref for ref in existence}
    for registry_ref in self.get_many_datasets(id_map.keys()):
        # Consistency between the given DatasetRef and the information
        # recorded in the registry is not verified.
        existence[id_map[registry_ref.id]] |= DatasetExistence.RECORDED

    # Ask datastore if it knows about these refs.
    for ref, known in self._datastore.knows_these(refs).items():
        if known:
            existence[ref] |= DatasetExistence.DATASTORE

    if full_check:
        # A full check asks the datastore whether the artifacts exist.
        for ref, artifact_exists in self._datastore.mexists(refs).items():
            if artifact_exists:
                existence[ref] |= DatasetExistence._ARTIFACT
    else:
        # Without a full check the artifact is assumed to exist, but do
        # not set this flag if nothing is known about the dataset.
        for ref in existence:
            if existence[ref] != DatasetExistence.UNRECOGNIZED:
                existence[ref] |= DatasetExistence._ASSUMED

    return existence
def removeRuns(
    self,
    names: Iterable[str],
    unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
    *,
    unlink_from_chains: bool = False,
) -> None:
    # Docstring inherited.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    if unstore is not _DeprecatedDefault:
        # The value was passed in by a user. Must report it is now
        # ignored. The two halves of the message are joined with exactly
        # one space.
        if unstore is True:
            msg = "The unstore parameter is deprecated and is now always treated as True."
        else:
            msg = "The unstore parameter for removeRuns can no longer be False and is now ignored."
        warnings.warn(
            msg + " The parameter will be removed after v30.",
            category=FutureWarning,
            stacklevel=find_outside_stacklevel("lsst.daf.butler"),
        )

    names = list(names)
    # Map of the chained collections to the RUN children.
    parents_to_children: dict[str, set[str]] = defaultdict(set)

    with self._caching_context():
        # Get information about these RUNs, refusing anything that is
        # not a RUN collection.
        collections_info = self.collections.query_info(names, include_parents=unlink_from_chains)
        for info in collections_info:
            if info.type is not CollectionType.RUN:
                raise TypeError(f"The collection type of '{info.name}' is {info.type.name}, not RUN.")
            if unlink_from_chains:
                if info.parents is None:  # For mypy.
                    raise AssertionError("Internal error: Collection parents required but not received")
                for parent in info.parents:
                    parents_to_children[parent].add(info.name)

        # Update the names in case the query unexpectedly had a wildcard.
        names = [info.name for info in collections_info]

        # Get all the datasets from these runs.
        refs = self.query_all_datasets(names, find_first=False, limit=None)

        # Call pruneDatasets since we are deliberately removing
        # datasets in chunks from the RUN collections rather than
        # attempting to remove everything at once.
        with time_this(
            _LOG,
            msg="Removing %d dataset%s from %s",
            args=(len(refs), "s" if len(refs) != 1 else "", ", ".join(names)),
        ):
            self.pruneDatasets(refs, unstore=True, purge=True, disassociate=True)

    # Now can remove the actual RUN collection and unlink from chains.
    with self._registry.transaction():
        # This will fail if caller is not unlinking from chains but the
        # RUN is in a chain -- but we have already deleted all the
        # datasets by this point.
        if unlink_from_chains:
            # Use deterministic order for deletions to attempt to
            # minimize risk of deadlocks for parallel deletes.
            for parent in sorted(parents_to_children):
                self.collections.remove_from_chain(parent, sorted(parents_to_children[parent]))
        # Sort to avoid potential deadlocks.
        for name in sorted(names):
            # This should be fast since the collection should be empty.
            with time_this(_LOG, msg="Removing RUN collection %s", args=(name,)):
                self._registry.removeCollection(name)
    _LOG.info("Completely removed the following RUN collections: %s", ", ".join(names))
def pruneDatasets(
    self,
    refs: Iterable[DatasetRef],
    *,
    disassociate: bool = True,
    unstore: bool = False,
    tags: Iterable[str] = (),
    purge: bool = False,
) -> None:
    # docstring inherited from LimitedButler

    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    # Validate the combination of options before touching anything.
    if purge:
        if not disassociate:
            raise TypeError("Cannot pass purge=True without disassociate=True.")
        if not unstore:
            raise TypeError("Cannot pass purge=True without unstore=True.")
    elif disassociate:
        tags = tuple(tags)
        if not tags:
            raise TypeError("No tags provided but disassociate=True.")
        for tag in tags:
            tag_type = self._registry.getCollectionType(tag)
            if tag_type is not CollectionType.TAGGED:
                raise TypeError(
                    f"Cannot disassociate from collection '{tag}' "
                    f"of non-TAGGED type {tag_type.name}."
                )

    # The refs are iterated more than once, so turn a possibly
    # single-pass iterable into a list.
    refs = list(refs)

    # Pruning a component makes no sense: registry doesn't know about
    # components and datastore might not store them in a separate file.
    for candidate in refs:
        if candidate.datasetType.component():
            raise ValueError(f"Can not prune a component of a dataset (ref={candidate})")

    # Delete in chunks. The chunk size is a trade-off between efficiency
    # and giving the user reasonable feedback. Chunking also limits what
    # has to roll back on failure and should allow the pruning to be
    # re-run incrementally (assuming the query is repeated). The only
    # issue is a Ctrl-C arriving during emptyTrash, since an admin
    # command would then be needed to finish emptying that chunk.
    progress = Progress("lsst.daf.butler.Butler.pruneDatasets", level=_LOG.INFO)
    chunk_size = 50_000
    n_chunks = math.ceil(len(refs) / chunk_size)
    if n_chunks > 1:
        _LOG.verbose("Pruning a total of %d datasets", len(refs))

    chunks = progress.wrap(
        chunk_iterable(refs, chunk_size=chunk_size), desc="Deleting datasets", total=n_chunks
    )
    # Chunk numbers are 1-based in the log output.
    for chunk_num, chunked_refs in enumerate(chunks, start=1):
        n_in_chunk = len(chunked_refs)
        _LOG.verbose(
            "Pruning %d dataset%s in chunk %d/%d",
            n_in_chunk,
            "s" if n_in_chunk != 1 else "",
            chunk_num,
            n_chunks,
        )
        with time_this(
            _LOG,
            msg="Removing %d datasets for chunk %d/%d",
            args=(n_in_chunk, chunk_num, n_chunks),
        ):
            self._prune_datasets(
                chunked_refs, tags=tags, unstore=unstore, purge=purge, disassociate=disassociate
            )
def _prune_datasets(
    self,
    refs: Collection[DatasetRef],
    *,
    disassociate: bool = True,
    unstore: bool = False,
    tags: Iterable[str] = (),
    purge: bool = False,
) -> None:
    """Prune one chunk of datasets.

    Parameters
    ----------
    refs : `~collections.abc.Collection` [ `DatasetRef` ]
        The datasets to prune.
    disassociate : `bool`, optional
        If `True` (and ``purge`` is `False`), remove the datasets from
        each collection in ``tags``.
    unstore : `bool`, optional
        If `True`, trash the datasets in the datastore and then empty the
        trash for them once the transaction has completed.
    tags : `~collections.abc.Iterable` [ `str` ], optional
        Collections to disassociate from; must be non-empty when the
        disassociate branch is taken.
    purge : `bool`, optional
        If `True`, remove the datasets from the registry entirely.
    """
    n_refs = len(refs)
    plural = "s" if n_refs != 1 else ""

    # We don't need an unreliable Datastore transaction for this, because
    # we've been extra careful to ensure that Datastore.trash only
    # involves mutating the Registry (it can _look_ at Datastore-specific
    # things, but shouldn't change them), and hence all operations here
    # are Registry operations.
    with self.transaction():
        if unstore:
            with time_this(
                _LOG,
                msg="Marking %d dataset%s for removal during pruneDatasets",
                args=(n_refs, plural),
            ):
                self._datastore.trash(refs)
        if purge:
            with time_this(_LOG, msg="Removing %d pruned dataset%s from registry", args=(n_refs, plural)):
                self._registry.removeDatasets(refs)
        elif disassociate:
            assert tags, "Guaranteed by earlier logic in this function."
            # The plural suffix comes from ``plural``; the format string
            # must not add a second "s" of its own.
            with time_this(
                _LOG, msg="Disassociating %d dataset%s from tagged collections", args=(n_refs, plural)
            ):
                for tag in tags:
                    self._registry.disassociate(tag, refs)

    # We've exited the Registry transaction, and apparently committed.
    # (if there was an exception, everything rolled back, and it's as if
    # nothing happened - and we never get here).
    # Datastore artifacts are not yet gone, but they're clearly marked
    # as trash, so if we fail to delete now because of (e.g.) filesystem
    # problems we can try again later, and if manual administrative
    # intervention is required, it's pretty clear what that should entail:
    # deleting everything on disk and in private Datastore tables that is
    # in the dataset_location_trash table.
    if unstore:
        # Point of no return for removing artifacts. Restrict the trash
        # emptying to the refs that this call trashed.
        with time_this(
            _LOG,
            msg="Attempting to remove artifacts for %d dataset%s associated with pruning",
            args=(n_refs, plural),
        ):
            self._datastore.emptyTrash(refs=refs)
def ingest_zip(
    self,
    zip_file: ResourcePathExpression,
    transfer: str = "auto",
    *,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
    skip_existing: bool = False,
) -> None:
    # Docstring inherited.
    #
    # Implementation outline:
    # 1. Read the zip index and rebuild one FileDataset per artifact.
    # 2. Stop early (return or raise) if the datastore already knows any
    #    of those datasets.
    # 3. Register the RUN collections (skipped for a dry run).
    # 4. Import the refs into registry and hand the zip to the datastore
    #    inside a single transaction.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    zip_path = ResourcePath(zip_file)
    index = ZipIndex.from_zip_file(zip_path)
    _LOG.verbose(
        "Ingesting %s containing %d datasets and %d files.", zip_path, len(index.refs), len(index)
    )

    # Need to ingest the refs into registry. Re-use the standard ingest
    # code by reconstructing FileDataset from the index.
    refs = index.refs.to_refs(universe=self.dimensions)
    id_to_ref = {ref.id: ref for ref in refs}
    datasets: list[FileDataset] = []
    processed_ids: set[uuid.UUID] = set()
    for path_in_zip, index_info in index.artifact_map.items():
        # Disassembled composites need to check this ref isn't already
        # included.
        unprocessed = {id_ for id_ in index_info.ids if id_ not in processed_ids}
        if not unprocessed:
            continue
        dataset = FileDataset(refs=[id_to_ref[id_] for id_ in unprocessed], path=path_in_zip)
        datasets.append(dataset)
        processed_ids.update(unprocessed)

    new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
    if existing_datasets:
        if skip_existing:
            _LOG.info(
                "Skipping %d datasets from zip file %s which already exist in the repository.",
                len(existing_datasets),
                zip_file,
            )
        else:
            raise ConflictingDefinitionError(
                f"Datastore already contains {len(existing_datasets)} of the given datasets."
                f" Example: {existing_datasets[0]}"
            )
        if new_datasets:
            # Can not yet support partial zip ingests where a zip contains
            # some datasets that are already in another zip.
            raise ValueError(
                f"The given zip file from {zip_file} contains {len(new_datasets)} datasets not known "
                f"to this butler but also contains {len(existing_datasets)} datasets already known to "
                "this butler. Currently butler can not ingest zip files with overlapping content."
            )
        # Everything in the zip is already known and the caller asked to
        # skip existing datasets, so there is nothing left to do.
        return

    # Ingest doesn't create the RUN collections so we have to do that
    # here. A dry run must not modify the repository, so registration is
    # skipped in that case; the registry import below also does not touch
    # the collections when dry_run is set.
    #
    # Sort by run collection name to ensure Postgres takes locks in the
    # same order between different processes, to mitigate an issue
    # where Postgres can deadlock due to the unique index on collection
    # name. (See DM-47543).
    if not dry_run:
        for run in sorted({ref.run for ref in refs}):
            if self.collections.register(run):
                _LOG.verbose("Created RUN collection %s as part of zip ingest", run)

    progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)
    import_info = self._prepare_ingest_file_datasets(
        datasets, progress, dry_run=dry_run, transfer_dimensions=transfer_dimensions
    )

    # Calculate some statistics based on the given list of datasets.
    n_datasets = sum(len(d.refs) for d in datasets)
    srefs = "s" if n_datasets != 1 else ""

    with (
        self._metrics.instrument_ingest(
            n_datasets,
            _LOG,
            msg="Ingesting zip file %s with %s dataset%s",
            args=(zip_file, n_datasets, srefs),
        ),
        self.transaction(),
    ):
        # Do not need expanded dataset refs so can ignore the return value.
        self._ingest_file_datasets(datasets, import_info, progress, dry_run=dry_run)

        try:
            self._datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run)
        except IntegrityError as e:
            # Report a datastore integrity failure with the same exception
            # type used for the up-front "already exists" check.
            raise ConflictingDefinitionError(
                f"Datastore already contains one or more datasets: {e}"
            ) from e
def _prepare_ingest_file_datasets(
    self,
    datasets: Sequence[FileDataset],
    progress: Progress,
    *,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> _ImportDatasetsInfo:
    """Check file datasets for duplicated data IDs and prepare the import.

    Parameters
    ----------
    datasets
        The file datasets that are about to be ingested.
    progress
        Progress reporter used to wrap the validation loop.
    transfer_dimensions
        Forwarded to ``_prepare_for_import_refs``.
    dry_run
        Forwarded to ``_prepare_for_import_refs``.

    Returns
    -------
    import_info
        Whatever ``_prepare_for_import_refs`` returns for the combined
        refs of all the given datasets.

    Raises
    ------
    ConflictingDefinitionError
        Raised if two refs share dataset type, run and data ID.
    """
    # For every (dataset type, run) pair remember which FileDataset first
    # claimed each data ID, so a clash can name both files involved.
    seen_by_group: MutableMapping[tuple[DatasetType, str], dict[DataCoordinate, FileDataset]] = (
        defaultdict(dict)
    )

    # Flat list of every ref that has to be imported.
    all_refs: list[DatasetRef] = []

    for file_dataset in progress.wrap(datasets, desc="Validating dataIDs"):
        for ref in file_dataset.refs:
            seen = seen_by_group[(ref.datasetType, ref.run)]
            if ref.dataId in seen:
                earlier = seen[ref.dataId]
                raise ConflictingDefinitionError(
                    f"Ingest conflict. Dataset {file_dataset.path} has same"
                    " DataId as other ingest dataset"
                    f" {earlier.path} "
                    f" ({ref.dataId})"
                )
            seen[ref.dataId] = file_dataset
        all_refs += file_dataset.refs

    # This butler acts as its own "source" butler here. Dataset types are
    # registered as part of the preparation.
    return self._prepare_for_import_refs(
        self,
        all_refs,
        register_dataset_types=True,
        dry_run=dry_run,
        transfer_dimensions=transfer_dimensions,
    )
def _ingest_file_datasets(
    self,
    datasets: Sequence[FileDataset],
    import_info: _ImportDatasetsInfo,
    progress: Progress,
    *,
    dry_run: bool = False,
) -> None:
    """Import prepared records and refs, then update the file datasets.

    Parameters
    ----------
    datasets
        File datasets whose ``refs`` are replaced in place by the refs
        returned from the registry import.
    import_info
        Prepared import information; its ``dimension_records`` and
        ``grouped_refs`` attributes are used.
    progress
        Progress reporter.
    dry_run
        Forwarded to the two import helpers.
    """
    self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)

    # Index the imported (expanded) refs by dataset ID so that they can be
    # matched back to the refs originally attached to each FileDataset.
    expanded_by_id = {
        imported.id: imported
        for imported in self._import_grouped_refs(
            import_info.grouped_refs, None, progress, dry_run=dry_run, expand_refs=True
        )
    }

    # Datastore needs the expanded refs, so swap them in.
    for file_dataset in progress.wrap(datasets, desc="Re-attaching expanded refs"):
        file_dataset.refs = [expanded_by_id[original.id] for original in file_dataset.refs]
def ingest(
    self,
    *datasets: FileDataset,
    transfer: str | None = "auto",
    record_validation_info: bool = True,
    skip_existing: bool = False,
) -> None:
    # Docstring inherited.
    if not datasets:
        # Nothing to ingest.
        return
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")

    n_files = len(datasets)
    _LOG.verbose("Ingesting %d file dataset%s.", n_files, "" if n_files == 1 else "s")
    progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)

    # Datasets the datastore already knows are either skipped (with a log
    # message) or treated as an error, depending on skip_existing.
    new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
    if existing_datasets and not skip_existing:
        raise ConflictingDefinitionError(
            f"Datastore already contains {len(existing_datasets)} of the given datasets."
            f" Example: {existing_datasets[0]}"
        )
    if existing_datasets:
        _LOG.info("Skipping %d datasets which already exist in the repository.", len(existing_datasets))

    # Statistics for the metrics/log message below.
    n_datasets = sum(len(file_dataset.refs) for file_dataset in datasets)
    sfiles = "" if n_files == 1 else "s"
    srefs = "" if n_datasets == 1 else "s"

    # The registry side deliberately works on `datasets` rather than
    # `new_datasets`, so that it can confirm that everything matches the
    # existing datasets.
    import_info = self._prepare_ingest_file_datasets(datasets, progress)

    with (
        self._metrics.instrument_ingest(
            n_datasets,
            _LOG,
            msg="Ingesting %s file%s with %s dataset%s",
            args=(n_files, sfiles, n_datasets, srefs),
        ),
        self.transaction(),
    ):
        self._ingest_file_datasets(datasets, import_info, progress)

        # Bulk-insert everything into Datastore.
        # We do not know if any of the registry entries already existed
        # (_importDatasets only complains if they exist but differ).
        # The _partition_datasets_by_known logic above should catch most
        # attempts to re-ingest files that were already ingested, but a
        # concurrent writer could still cause a unique constraint
        # violation here.
        try:
            self._datastore.ingest(
                *new_datasets, transfer=transfer, record_validation_info=record_validation_info
            )
        except IntegrityError as e:
            raise ConflictingDefinitionError(
                f"Datastore already contains one or more datasets: {e}"
            ) from e
def _partition_datasets_by_known(
    self, datasets: Iterable[FileDataset]
) -> tuple[list[FileDataset], list[FileDataset]]:
    """Divide the given `FileDataset` objects into two groups: those for
    which the Datastore already has an entry, and those for which it does
    not.

    Parameters
    ----------
    datasets
        The datasets to classify. Any iterable is accepted, including
        single-pass iterators.

    Returns
    -------
    new_datasets
        Datasets for which the datastore reports none of the refs as
        known.
    existing_datasets
        Datasets for which the datastore reports at least one ref as
        known.
    """
    # The datasets are traversed twice (once to collect the refs for the
    # datastore query and once to classify), so a one-shot iterator must
    # be materialized first or the second pass would see nothing.
    datasets = list(datasets)

    known_refs = self._datastore.knows_these(
        itertools.chain.from_iterable(dataset.refs for dataset in datasets)
    )

    new_datasets: list[FileDataset] = []
    existing_datasets: list[FileDataset] = []
    for dataset in datasets:
        if any(known_refs[ref] for ref in dataset.refs):
            existing_datasets.append(dataset)
        else:
            new_datasets.append(dataset)

    return new_datasets, existing_datasets
@contextlib.contextmanager
def export(
    self,
    *,
    directory: str | None = None,
    filename: str | None = None,
    format: str | None = None,
    transfer: str | None = None,
) -> Iterator[RepoExportContext]:
    # Docstring inherited.
    if directory is None and transfer is not None:
        raise TypeError("Cannot transfer without providing a directory.")
    if transfer == "move":
        raise TypeError("Transfer may not be 'move': export is read-only")

    # Work out the export format and file name; each can be derived from
    # the other but at least one has to be given.
    if format is not None:
        if filename is None:
            filename = f"export.{format}"
    elif filename is None:
        raise TypeError("At least one of 'filename' or 'format' must be provided.")
    else:
        extension = os.path.splitext(filename)[1]
        if not extension:
            raise ValueError("Please specify a file extension to determine export format.")
        # Drop the leading "." of the extension.
        format = extension[1:]

    if directory is not None:
        filename = os.path.join(directory, filename)

    formats = self._config["repo_transfer_formats"]
    if format not in formats:
        raise ValueError(f"Unknown export format {format!r}, allowed: {','.join(formats.keys())}")
    BackendClass = get_class_of(formats[format, "export"])

    with open(filename, "w") as stream:
        backend = BackendClass(stream, universe=self.dimensions)
        helper = RepoExportContext(self, backend=backend, directory=directory, transfer=transfer)
        with self._caching_context():
            yield helper
        # Only reached when the caller's block (and everything above)
        # completed without raising; any exception propagates unchanged
        # and the export is not finished.
        helper._finish()
def import_(
    self,
    *,
    directory: ResourcePathExpression | None = None,
    filename: ResourcePathExpression | TextIO | None = None,
    format: str | None = None,
    transfer: str | None = None,
    skip_dimensions: set | None = None,
    record_validation_info: bool = True,
    without_datastore: bool = False,
) -> None:
    # Docstring inherited.
    #
    # Implementation outline:
    # 1. Normalize ``filename``/``directory`` and work out the format.
    # 2. Look up the import backend class for that format in the butler
    #    configuration.
    # 3. Register and load the export through that backend.
    if not self.isWriteable():
        raise TypeError("Butler is read-only.")
    # With only a format given, fall back to the default export file name
    # for that format.
    if filename is None and format is not None:
        filename = ResourcePath(f"export.{format}", forceAbsolute=False)
    if directory is not None:
        directory = ResourcePath(directory, forceDirectory=True)
    # mypy doesn't think this will work but it does in python >= 3.10.
    if isinstance(filename, ResourcePathExpression):  # type: ignore
        filename = ResourcePath(filename, forceAbsolute=False)  # type: ignore
        if format is None:
            # Infer the format from the file extension.
            format = filename.getExtension()
        if not filename.isabs() and directory is not None:
            # A relative file name may refer to a file relative to the
            # current working directory or to one inside ``directory``.
            potential = directory.join(filename)
            exists_in_cwd = filename.exists()
            exists_in_dir = potential.exists()
            if exists_in_cwd and exists_in_dir:
                _LOG.warning(
                    "A relative path for filename was specified (%s) which exists relative to cwd. "
                    "Additionally, the file exists relative to the given search directory (%s). "
                    "Using the export file in the given directory.",
                    filename,
                    potential,
                )
                # Given they specified an explicit directory and that
                # directory has the export file in it, assume that that
                # is what was meant despite the file in cwd.
                filename = potential
            elif exists_in_dir:
                filename = potential
            elif not exists_in_cwd and not exists_in_dir:
                # Raise early.
                raise FileNotFoundError(
                    f"Export file could not be found in {filename.abspath()} or {potential.abspath()}."
                )
            # Otherwise the file exists only relative to cwd and
            # ``filename`` is used unchanged.
    elif format is None:
        # ``filename`` is not a path expression here (e.g. an open stream),
        # so there is no extension to inspect; default to YAML.
        format = ".yaml"
    BackendClass: type[RepoImportBackend] = get_class_of(
        self._config["repo_transfer_formats"][format]["import"]
    )

    def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
        # Registration happens before the transaction is opened; the load
        # itself runs inside a single transaction.
        with self._caching_context():
            backend = BackendClass(importStream, self)  # type: ignore[call-arg]
            backend.register()
            with self.transaction():
                backend.load(
                    # Passing None as the datastore is how
                    # ``without_datastore`` is communicated to the backend.
                    datastore=self._datastore if not without_datastore else None,
                    directory=directory,
                    transfer=transfer,
                    skip_dimensions=skip_dimensions,
                    record_validation_info=record_validation_info,
                )

    if isinstance(filename, ResourcePath):
        # We can not use open() here at the moment because of
        # DM-38589 since yaml does stream.read(8192) in a loop.
        # Instead the whole file is read and wrapped in an in-memory stream.
        stream = io.StringIO(filename.read().decode())
        doImport(stream)
    else:
        # Anything else is handed to the backend as the stream directly.
        doImport(filename)  # type: ignore
def transfer_dimension_records_from(
    self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
) -> None:
    # Copy the dimension records associated with the given refs / data IDs
    # from the source butler into this butler's registry.

    # Only elements of this butler's universe that have their own table
    # can receive records.
    allowed = frozenset(candidate for candidate in self.dimensions.elements if candidate.has_own_table)

    records_by_element = self._extract_all_dimension_records_from_data_ids(
        source_butler, {ref.dataId for ref in source_refs}, allowed
    )

    # Insert order is important.
    for element in self.dimensions.sorted(records_by_element.keys()):
        to_insert = list(records_by_element[element].values())
        # Assume that if the record is already present that we can
        # use it without having to check that the record metadata
        # is consistent.
        self._registry.insertDimensionData(element, *to_insert, skip_existing=True)
        _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(to_insert))
def _extract_all_dimension_records_from_data_ids(
    self,
    source_butler: LimitedButler | Butler,
    data_ids: set[DataCoordinate],
    allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
    """Collect dimension records for data IDs plus related records.

    Three sets of records are merged into the returned mapping:

    1. records extracted directly from the given data IDs;
    2. records of elements "populated by" those of step 1, queried from
       the source butler;
    3. records for data IDs referenced by step 2 but absent from step 1.

    Parameters
    ----------
    source_butler
        Butler to read records from. It has to be a full `Butler` if any
        populated-by records need to be queried.
    data_ids
        Data IDs to extract records for.
    allowed_elements
        Elements for which records may be returned.

    Returns
    -------
    records
        Records keyed by element and then by data ID.

    Raises
    ------
    RuntimeError
        Raised if populated-by records are needed but ``source_butler`` is
        not a full `Butler`.
    """
    primary_records = self._extract_dimension_records_from_data_ids(
        source_butler, data_ids, allowed_elements
    )

    additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
    for original_element, record_mapping in primary_records.items():
        # Get dimensions that depend on this dimension.
        populated_by = self.dimensions.get_elements_populated_by(
            self.dimensions[original_element.name]  # type: ignore
        )
        if not populated_by:
            continue

        for element in populated_by:
            if element not in allowed_elements or element.name == original_element.name:
                continue

            if element.name in primary_records:
                # If this element has already been stored avoid
                # re-finding records since that may lead to additional
                # spurious records. e.g. visit is populated_by
                # visit_detector_region but querying
                # visit_detector_region by visit will return all the
                # detectors for this visit -- the visit dataId does not
                # constrain this.
                # To constrain the query the original dataIds would
                # have to be scanned.
                continue

            if not record_mapping:
                # No data IDs to constrain a query with.
                continue

            if not isinstance(source_butler, Butler):
                raise RuntimeError(
                    f"Transferring populated_by records like {element.name}"
                    " requires a full Butler."
                )

            with source_butler.query() as query:
                found = query.join_data_coordinates(record_mapping.keys()).dimension_records(
                    element.name
                )
                for record in found:
                    additional_records[record.definition].setdefault(record.dataId, record)

    # The next step is to walk back through the additional records to
    # pick up any missing content (such as visit_definition needing to
    # know the exposure). Want to ensure we do not request records we
    # already have.
    missing_data_ids = set()
    for extra_mapping in additional_records.values():
        for data_id in extra_mapping:
            for dimension in data_id.dimensions.required:
                element = source_butler.dimensions[dimension]
                dimension_key = data_id.subset(dimension)
                if dimension_key not in primary_records[element]:
                    missing_data_ids.add(dimension_key)

    # Fill out the new records. Assume that these new records do not
    # also need to carry over additional populated_by records.
    secondary_records = self._extract_dimension_records_from_data_ids(
        source_butler, missing_data_ids, allowed_elements
    )

    # Merge the extra sets of records in with the original.
    for extra_records in (additional_records, secondary_records):
        for element, extra_mapping in extra_records.items():
            primary_records[element].update(extra_mapping)

    return primary_records
def _extract_dimension_records_from_data_ids(
    self,
    source_butler: LimitedButler | Butler,
    data_ids: Iterable[DataCoordinate],
    allowed_elements: frozenset[DimensionElement],
) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
    """Pull the dimension records out of a collection of data IDs.

    Parameters
    ----------
    source_butler
        Butler used to expand the data IDs if any of them do not already
        carry their records.
    data_ids
        Data IDs to examine; duplicates are dropped.
    allowed_elements
        Only records whose definition is in this set are returned.

    Returns
    -------
    records
        Records keyed by their definition and then by their data ID.

    Raises
    ------
    TypeError
        Raised if expansion is needed but ``source_butler`` is not a full
        `Butler`.
    """
    unique_data_ids: Iterable[DataCoordinate] = set(data_ids)

    # Expansion is only needed if at least one data ID lacks its records.
    if any(not data_id.hasRecords() for data_id in unique_data_ids):
        if not isinstance(source_butler, Butler):
            raise TypeError("Input butler needs to be a full butler to expand DataId.")
        unique_data_ids = source_butler._expand_data_ids(unique_data_ids)

    found: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
    for data_id in unique_data_ids:
        # If this butler doesn't know about a dimension in the source
        # butler things will break later.
        for element_name in data_id.dimensions.elements:
            record = data_id.records[element_name]
            if record is None or record.definition not in allowed_elements:
                continue
            # Keep the first record seen for any given data ID.
            found[record.definition].setdefault(record.dataId, record)

    return found
def _cast_universe_for_import_refs(
    self, source_refs: Iterable[DatasetRef]
) -> Mapping[DatasetType, list[DatasetRef]]:
    """Group imported refs by dataset type cast to the target universe.

    Parameters
    ----------
    source_refs
        The refs to be imported.

    Returns
    -------
    refs
        The refs to be imported, grouped by dataset type, with the dataset
        types cast to the target universe.

    Raises
    ------
    InconsistentUniverseError
        Raised if any reference cannot be converted to target universe.

    Notes
    -----
    In principle this method could perform non-trivial migrations of the
    datasets by modifying dimensions and data IDs. At present it only
    performs a trivial validation of dataset type compatibility. The
    returned mapping is keyed by dataset types in the new universe, but
    the refs themselves still carry the original dataset types because
    there is currently no easy way to replace the dataset type in a
    reference.
    """
    # In theory input refs could come from multiple universes, but in
    # practice this will not happen, so just group everything by dataset
    # type.
    by_source_type: defaultdict[DatasetType, list[DatasetRef]] = defaultdict(list)
    for ref in source_refs:
        by_source_type[ref.datasetType].append(ref)

    by_target_type: defaultdict[DatasetType, list[DatasetRef]] = defaultdict(list)
    for source_type, type_refs in by_source_type.items():
        source_universe = source_type.dimensions.universe

        # Same universe object: nothing to convert.
        if source_universe is self.dimensions:
            by_target_type[source_type] = type_refs
            continue

        if source_universe.namespace != self.dimensions.namespace:
            raise InconsistentUniverseError(
                f"Source refs have universe {source_universe} with different namespace "
                f"than target universe {self.dimensions}."
            )

        # Try to handle case of different universe versions. For now
        # we can only do trivial check that dimension groups are
        # identical.
        try:
            target_dimensions = self.dimensions.conform(source_type.dimensions.names)
        except Exception as exc:
            raise InconsistentUniverseError(
                f"Source dimensions {source_type.dimensions} are not compatible with "
                f"target universe dimensions {self.dimensions}."
            ) from exc
        if target_dimensions != source_type.dimensions:
            raise InconsistentUniverseError(
                f"Source dimensions {source_type.dimensions} are different from a conforming "
                f"set of target universe dimensions {target_dimensions}."
            )

        # Rebuild dataset type in new universe.
        converted_type = DatasetType(
            name=source_type.name,
            dimensions=target_dimensions,
            storageClass=source_type.storageClass,
            parentStorageClass=source_type.parentStorageClass,
            universe=self.dimensions,
            isCalibration=source_type.isCalibration(),
        )
        by_target_type[converted_type] = type_refs

    return by_target_type
def _prepare_for_import_refs(
    self,
    source_butler: LimitedButler,
    source_refs: Iterable[DatasetRef],
    *,
    register_dataset_types: bool = False,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> _ImportDatasetsInfo:
    # Docstring inherited.
    #
    # Implementation outline:
    # 1. Group the refs by (dimensions, run) ready for the registry import.
    # 2. Check every dataset type against this butler, registering missing
    #    ones when ``register_dataset_types`` is set (never in a dry run).
    # 3. Optionally gather the dimension records for all the data IDs.
    # Nothing is imported here; the gathered information is returned as an
    # `_ImportDatasetsInfo`.
    if not self.isWriteable() and not dry_run:
        raise TypeError("Butler is read-only.")

    # Will iterate through the refs multiple times so need to convert
    # to a list if this isn't a collection.
    if not isinstance(source_refs, collections.abc.Collection):
        source_refs = list(source_refs)

    original_count = len(source_refs)
    # A single-dataset import is only reported at VERBOSE level.
    log_level = _LOG.INFO if original_count > 1 else _LOG.VERBOSE
    _LOG.log(
        log_level,
        "Importing %d dataset%s into %s",
        original_count,
        "s" if original_count != 1 else "",
        str(self),
    )

    # Keys are dataset types in this butler's universe; raises if a source
    # dataset type cannot be represented here.
    refs_by_type = self._cast_universe_for_import_refs(source_refs)

    # Importing requires that we group the refs by dimension group and run
    # before doing the import.
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]] = defaultdict(list)
    for ref in source_refs:
        grouped_refs[_RefGroup(ref.datasetType.dimensions, ref.run)].append(ref)

    # Check to see if the dataset type in the source butler has
    # the same definition in the target butler and register missing
    # ones if requested. Registration must happen outside a transaction.
    newly_registered_dataset_types = set()
    for datasetType in refs_by_type:
        if register_dataset_types:
            # Let this raise immediately if inconsistent. Continuing
            # on to find additional inconsistent dataset types
            # might result in additional unwanted dataset types being
            # registered.
            try:
                # In a dry run the registration call is skipped entirely.
                if not dry_run and self._registry.registerDatasetType(datasetType):
                    newly_registered_dataset_types.add(datasetType)
            except ConflictingDefinitionError as e:
                # Be safe and require that conversions be bidirectional
                # when there are storage class mismatches. This is because
                # get() will have to support conversion from source to
                # target python type (the source formatter will be
                # returning source python type) but there also is an
                # expectation that people will want to be able to get() in
                # the target using the source python type, which will not
                # require conversion for transferred datasets but might
                # for target-native types. Additionally, butler.get does
                # not know that the formatter will return the wrong
                # python type and so will always check that the conversion
                # works even though it won't need it.
                target_dataset_type = self.get_dataset_type(datasetType.name)
                target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                # If both directions are compatible the conflict is
                # tolerated and the loop moves on; otherwise the original
                # exception is re-raised with an explanatory note.
                if not (target_compatible_with_source and source_compatible_with_target):
                    if target_compatible_with_source:
                        e.add_note(
                            "Target dataset type storage class is compatible with source "
                            "but the reverse is not true."
                        )
                    elif source_compatible_with_target:
                        e.add_note(
                            "Source dataset type storage class is compatible with target "
                            "but the reverse is not true."
                        )
                    else:
                        e.add_note("If storage classes differ, please register converters.")
                    raise
        else:
            # If the dataset type is missing, let it fail immediately.
            target_dataset_type = self.get_dataset_type(datasetType.name)
            if target_dataset_type != datasetType:
                target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                # Both conversion directions are currently required.
                if not (target_compatible_with_source and source_compatible_with_target):
                    # Explain which direction (if any) is compatible.
                    msg = ""
                    if target_compatible_with_source:
                        msg = (
                            "Target storage class is compatible with the source storage class "
                            "but the reverse is not true."
                        )
                    elif source_compatible_with_target:
                        msg = (
                            "Source storage class is compatible with the target storage class"
                            " but the reverse is not true."
                        )
                    else:
                        msg = "If storage classes differ register converters."
                    if msg:
                        msg = f"({msg})"
                    raise ConflictingDefinitionError(
                        "Source butler dataset type differs from definition"
                        f" in target butler: {datasetType} !="
                        f" {target_dataset_type} {msg}"
                    )
    if newly_registered_dataset_types:
        # We may have registered some even if there were inconsistencies
        # but should let people know (or else remove them again).
        _LOG.verbose(
            "Registered the following dataset types in the target Butler: %s",
            ", ".join(d.name for d in newly_registered_dataset_types),
        )
    else:
        _LOG.verbose("All required dataset types are known to the target Butler")

    # Stays empty unless dimension records were requested.
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
    if transfer_dimensions:
        # Collect all the dimension records for these refs.
        # All dimensions are to be copied but the list of valid dimensions
        # come from this butler's universe.
        elements = frozenset(element for element in self.dimensions.elements if element.has_own_table)
        dataIds = {ref.dataId for ref in source_refs}
        dimension_records = self._extract_all_dimension_records_from_data_ids(
            source_butler, dataIds, elements
        )
    return _ImportDatasetsInfo(grouped_refs, dimension_records)
def _import_dimension_records(
    self,
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]],
    *,
    dry_run: bool,
) -> None:
    """Import dimension records collected during import pre-process.

    Parameters
    ----------
    dimension_records
        Records to insert, keyed by element and then by data ID.
    dry_run
        If `True` nothing is inserted.
    """
    if dry_run or not dimension_records:
        return

    _LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
    # Order matters.
    for element in self.dimensions.sorted(dimension_records.keys()):
        # Assume that if the record is already present that we can
        # use it without having to check that the record metadata
        # is consistent.
        self._registry.insertDimensionData(
            element, *dimension_records[element].values(), skip_existing=True
        )
def _import_grouped_refs(
    self,
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]],
    source_butler: LimitedButler | None,
    progress: Progress,
    *,
    dry_run: bool = False,
    expand_refs: bool = False,
) -> list[DatasetRef]:
    """Import grouped refs into the registry, creating runs as needed.

    Parameters
    ----------
    grouped_refs
        Refs to import, grouped by dimension group and run.
    source_butler
        Butler the refs came from, or `None`. If it has a ``registry``
        attribute, that is asked for the documentation string of each run.
    progress
        Progress reporter.
    dry_run
        If `True` no run is registered and no dataset is imported; the
        given refs are returned as they are.
    expand_refs
        Passed as ``expand`` to the registry import.

    Returns
    -------
    refs
        All the imported refs.
    """
    seen_runs: set[str] = set()
    imported: list[DatasetRef] = []
    expected_total = 0

    # Sort by run collection name to ensure Postgres takes locks in the
    # same order between different processes, to mitigate an issue
    # where Postgres can deadlock due to the unique index on collection
    # name. (See DM-47543).
    ordered_groups = sorted(grouped_refs.items(), key=lambda group_and_refs: group_and_refs[0].run)

    for (dimension_group, run), batch in progress.iter_item_chunks(
        ordered_groups, desc="Importing to registry by run and dataset type"
    ):
        if run not in seen_runs:
            # May need to create output collection. If source butler
            # has a registry, ask for documentation string.
            run_doc = None
            if source_butler is not None and (registry := getattr(source_butler, "registry", None)):
                run_doc = registry.getCollectionDocumentation(run)
            # A dry run behaves as if the run had just been created.
            registered = True if dry_run else self.collections.register(run, doc=run_doc)
            seen_runs.add(run)
            if registered:
                _LOG.verbose("Creating output run %s", run)

        n_refs = len(batch)
        expected_total += n_refs
        _LOG.verbose(
            "Importing %d ref%s with dimensions %s into run %s",
            n_refs,
            "" if n_refs == 1 else "s",
            dimension_group.names,
            run,
        )

        # Assume we are using UUIDs and the source refs will match
        # those imported.
        if dry_run:
            imported.extend(batch)
        else:
            imported.extend(self._registry._importDatasets(batch, expand=expand_refs))

    assert expected_total == len(imported)
    _LOG.verbose("Imported %d datasets into destination butler", expected_total)
    return imported
def transfer_from(
    self,
    source_butler: LimitedButler,
    source_refs: Iterable[DatasetRef],
    transfer: str = "auto",
    skip_missing: bool = True,
    register_dataset_types: bool = False,
    transfer_dimensions: bool = False,
    dry_run: bool = False,
) -> collections.abc.Collection[DatasetRef]:
    # Docstring inherited.
    source_refs = list(source_refs)
    if not (self.isWriteable() or dry_run):
        raise TypeError("Butler is read-only.")

    progress = Progress("lsst.daf.butler.Butler.transfer_from", level=VERBOSE)

    artifact_existence: dict[ResourcePath, bool] = {}
    file_transfer_source = source_butler._file_transfer_source
    transfer_records = retrieve_file_transfer_records(
        file_transfer_source, source_refs, artifact_existence
    )

    # In some situations the datastore artifact may be missing and we do
    # not want that registry entry to be imported. For example, this can
    # happen if a file was removed but the dataset was left in the registry
    # for provenance, or if a pipeline task didn't create all of the
    # possible files in a QuantumBackedButler.
    if skip_missing:
        missing_ids = {ref.id for ref in source_refs} - transfer_records.keys()
        if missing_ids:
            kept_refs = [ref for ref in source_refs if ref.id not in missing_ids]
            n_missing = len(source_refs) - len(kept_refs)
            source_refs = kept_refs
            _LOG.verbose(
                "%d dataset%s removed because the artifact does not exist. Now have %d.",
                n_missing,
                "" if n_missing == 1 else "s",
                len(kept_refs),
            )

    import_info = self._prepare_for_import_refs(
        source_butler,
        source_refs,
        register_dataset_types=register_dataset_types,
        dry_run=dry_run,
        transfer_dimensions=transfer_dimensions,
    )

    # Do all the importing in a single transaction.
    with self.transaction():
        self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
        imported_refs = self._import_grouped_refs(
            import_info.grouped_refs, source_butler, progress, dry_run=dry_run
        )

        # Ask the datastore to transfer. The datastore has to check that
        # the source datastore is compatible with the target datastore.
        _LOG.verbose("Transferring %d datasets from %s", len(transfer_records), file_transfer_source.name)
        accepted, rejected = self._datastore.transfer_from(
            transfer_records,
            imported_refs,
            transfer=transfer,
            artifact_existence=artifact_existence,
            dry_run=dry_run,
        )
        if rejected:
            # For now, accept the registry entries but not the files.
            _LOG.warning(
                "%d datasets were rejected and %d accepted for transfer.",
                len(rejected),
                len(accepted),
            )

    return imported_refs
def validateConfiguration(
    self,
    logFailures: bool = False,
    datasetTypeNames: Iterable[str] | None = None,
    ignore: Iterable[str] | None = None,
) -> None:
    # Docstring inherited.
    #
    # Implementation outline:
    #   1. Work out which dataset types to validate (the requested names, or
    #      everything the registry knows about), minus the ignore list.
    #   2. Build a fake DatasetRef per (dataset type, instrument) pair so the
    #      datastore can validate instrument-specific configuration.
    #   3. Ask the datastore to validate those entities.
    #   4. Check that every datastore lookup key refers to a known
    #      DatasetType or StorageClass and only overrides "instrument".
    #   5. Raise a single ValidationError combining all problems found.

    # Materialize the caller-supplied iterables exactly once.  Both are
    # consulted more than once below, so a one-shot iterator (e.g. a
    # generator) would otherwise be exhausted after the first pass and the
    # lookup-key filtering would silently skip every key.
    requestedNames = list(datasetTypeNames) if datasetTypeNames is not None else []
    requestedNameSet = set(requestedNames)
    ignored = set(ignore) if ignore is not None else set()

    if requestedNames:
        datasetTypes = [self.get_dataset_type(name) for name in requestedNames]
    else:
        datasetTypes = list(self._registry.queryDatasetTypes())

    # Filter out anything from the ignore list, matching either the full
    # dataset type name or the parent name of a component.
    if ignored:
        datasetTypes = [
            e for e in datasetTypes if e.name not in ignored and e.nameAndComponent()[0] not in ignored
        ]

    # Find all the registered instruments (if "instrument" is in the
    # universe).
    instruments: set[str] = set()
    if "instrument" in self.dimensions:
        instruments = {rec.name for rec in self.query_dimension_records("instrument", explain=False)}

    # For each datasetType that has an instrument dimension, create
    # a DatasetRef for each defined instrument.
    datasetRefs = []
    for datasetType in datasetTypes:
        if "instrument" in datasetType.dimensions:
            # In order to create a conforming dataset ref, create
            # fake DataCoordinate values for the non-instrument
            # dimensions. The type of the value does not matter here.
            dataId = {dim: 1 for dim in datasetType.dimensions.names if dim != "instrument"}

            for instrument in instruments:
                datasetRef = DatasetRef(
                    datasetType,
                    DataCoordinate.standardize(
                        dataId, instrument=instrument, dimensions=datasetType.dimensions
                    ),
                    run="validate",
                )
                datasetRefs.append(datasetRef)

    entities: list[DatasetType | DatasetRef] = [*datasetTypes, *datasetRefs]

    # Remember the datastore failure rather than raising immediately so
    # that lookup-key problems can be reported in the same exception.
    datastoreErrorStr = None
    try:
        self._datastore.validateConfiguration(entities, logFailures=logFailures)
    except ValidationError as e:
        datastoreErrorStr = str(e)

    # Also check that the LookupKeys used by the datastores match
    # registry and storage class definitions.
    failedNames = set()
    failedDataId = set()
    for key in self._datastore.getLookupKeys():
        if key.name is not None:
            if key.name in ignored:
                continue

            # Skip if specific datasetType names were requested and this
            # name does not match.
            if requestedNameSet and key.name not in requestedNameSet:
                continue

            # The name must be either a StorageClass or a DatasetType.
            if key.name not in self.storageClasses:
                try:
                    self.get_dataset_type(key.name)
                except KeyError:
                    if logFailures:
                        _LOG.critical(
                            "Key '%s' does not correspond to a DatasetType or StorageClass", key
                        )
                    failedNames.add(key)
        else:
            # Dimensions are checked for consistency when the Butler
            # is created and rendezvoused with a universe.
            pass

        # Check that the instrument is a valid instrument.
        # Currently only support instrument so check for that.
        if key.dataId:
            dataIdKeys = set(key.dataId)
            if {"instrument"} != dataIdKeys:
                if logFailures:
                    _LOG.critical("Key '%s' has unsupported DataId override", key)
                failedDataId.add(key)
            elif key.dataId["instrument"] not in instruments:
                if logFailures:
                    _LOG.critical("Key '%s' has unknown instrument", key)
                failedDataId.add(key)

    messages = []

    if datastoreErrorStr:
        messages.append(datastoreErrorStr)

    for failed, msg in (
        (failedNames, "Keys without corresponding DatasetType or StorageClass entry: "),
        (failedDataId, "Keys with bad DataId entries: "),
    ):
        if failed:
            # Sort so that the message does not depend on set ordering.
            msg += ", ".join(sorted(str(k) for k in failed))
            messages.append(msg)

    if messages:
        raise ValidationError(";\n".join(messages))
@property
@deprecated(
    "Please use 'collections' instead. collection_chains will be removed after v28.",
    version="v28",
    category=FutureWarning,
)
def collection_chains(self) -> DirectButlerCollections:
    """Deprecated accessor for the collection-manipulation helper.

    Builds a new `DirectButlerCollections` around this butler's registry
    on every access; use ``collections`` instead.
    """
    chains = DirectButlerCollections(self._registry)
    return chains
@property
def collections(self) -> DirectButlerCollections:
    """Helper object for inspecting and modifying collections.

    A new `DirectButlerCollections` wrapping this butler's registry is
    constructed on every access.
    """
    helper = DirectButlerCollections(self._registry)
    return helper
@property
def run(self) -> str | None:
    """Default run that this butler writes outputs to (`str` or `None`).

    Read-only alias for ``self.registry.defaults.run``. It cannot be set
    on its own; to change it, assign a complete new `RegistryDefaults`
    instance to ``self.registry.defaults``, which replaces all defaults
    together.
    """
    defaults = self._registry.defaults
    return defaults.run
@property
def registry(self) -> Registry:
    """Manager of dataset metadata and relationships (`Registry`).

    Many operations that do not read or write butler datasets are only
    reachable through `Registry` methods. Eventually those methods will
    be replaced by equivalent `Butler` methods.
    """
    # A fresh shim wrapping this butler is created on every access.
    shim = RegistryShim(self)
    return shim
@property
def dimensions(self) -> DimensionUniverse:
    # Docstring inherited.
    # The universe is whatever the underlying registry reports.
    universe = self._registry.dimensions
    return universe
def query(self) -> contextlib.AbstractContextManager[Query]:
    # Docstring inherited.
    # Delegates straight to the registry's private query factory.
    query_context = self._registry._query()
    return query_context
2671 def _query_driver(
2672 self,
2673 default_collections: Iterable[str],
2674 default_data_id: DataCoordinate,
2675 ) -> contextlib.AbstractContextManager[DirectQueryDriver]:
2676 """Set up a QueryDriver instance for use with this Butler. Although
2677 this is marked as a private method, it is also used by Butler server.
2678 """
2679 return self._registry._query_driver(default_collections, default_data_id)
@contextlib.contextmanager
def _query_all_datasets_by_page(
    self, args: QueryAllDatasetsParameters
) -> Iterator[Iterator[list[DatasetRef]]]:
    """Yield a lazy iterator over pages of dataset refs matching ``args``.

    The query context stays open for as long as the caller remains inside
    the ``with`` block, so the pages must be consumed within it.
    """
    with self.query() as open_query:
        result_pages = query_all_datasets(self, open_query, args)
        # A generator expression is already an iterator; each element is
        # the ``data`` payload of one result page.
        page_contents = (page.data for page in result_pages)
        yield page_contents
2689 def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None:
2690 """Immediately load caches that are used for common operations."""
2691 self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache)
2693 def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
2694 return self._registry.expand_data_ids(data_ids)
# Annotation-only attribute declarations: no values are assigned here, each
# name is only given a type and a descriptive string.
_config: ButlerConfig
"""Configuration for this Butler instance."""

_registry: SqlRegistry
"""The object that manages dataset metadata and relationships
(`SqlRegistry`).

Most operations that don't involve reading or writing butler datasets are
accessible only via `SqlRegistry` methods.
"""

storageClasses: StorageClassFactory
"""An object that maps known storage class names to objects that fully
describe them (`StorageClassFactory`).
"""

_closed: bool
"""`True` if close() has already been called on this instance; `False`
otherwise.
"""
class _RefGroup(NamedTuple):
    """Key identifying a batch of DatasetRefs to be inserted in
    `Butler.transfer_from`.

    Being a `NamedTuple`, instances are hashable and can be used as
    dictionary keys (see ``_ImportDatasetsInfo.grouped_refs``).
    """

    # Dimensions part of the key; presumably the dimensions shared by every
    # ref in the batch -- confirm against the grouping code.
    dimensions: DimensionGroup
    # Run part of the key; presumably the run shared by every ref in the
    # batch -- confirm against the grouping code.
    run: str
class _ImportDatasetsInfo(NamedTuple):
    """Information extracted from datasets to be imported."""

    # Dataset refs to import, bucketed by their `_RefGroup` key.
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]]
    # Dimension records keyed first by dimension element and then by data
    # coordinate.
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]
2734def _to_uuid(id: DatasetId | str) -> uuid.UUID:
2735 if isinstance(id, uuid.UUID):
2736 return id
2737 else:
2738 return uuid.UUID(id)
2741class _ButlerClosed:
2742 def __getattr__(self, name: str) -> Any:
2743 raise RuntimeError("Attempted to use a Butler instance which has been closed.")
# Shared `_ButlerClosed` sentinel, typed as `Any` so it can be assigned to
# attributes of any declared type.  Presumably swapped in for a butler's
# internals when it is closed -- confirm against ``close()``.
_BUTLER_CLOSED_INSTANCE: Any = _ButlerClosed()
2749def _retrieve_dataset_type(registry: SqlRegistry, name: str) -> DatasetType | None:
2750 """Return DatasetType defined in registry given dataset type name."""
2751 try:
2752 return registry.getDatasetType(name)
2753 except MissingDatasetTypeError:
2754 return None