Coverage for python/lsst/daf/butler/_dataset_ref.py: 29%
325 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:15 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = [
30 "AmbiguousDatasetError",
31 "DatasetDatastoreRecords",
32 "DatasetId",
33 "DatasetIdFactory",
34 "DatasetIdGenEnum",
35 "DatasetRef",
36 "SerializedDatasetRef",
37 "SerializedDatasetRefContainerV1",
38 "SerializedDatasetRefContainers",
39]
41import enum
42import logging
43import sys
44import uuid
45from collections.abc import Callable, Iterable, Mapping
46from typing import (
47 TYPE_CHECKING,
48 Annotated,
49 Any,
50 ClassVar,
51 Literal,
52 Protocol,
53 Self,
54 TypeAlias,
55 cast,
56 runtime_checkable,
57)
59import pydantic
60from pydantic import StrictStr
62from lsst.utils.classes import immutable
64from ._config_support import LookupKey
65from ._dataset_type import DatasetType, SerializedDatasetType
66from ._exceptions import InconsistentUniverseError
67from ._named import NamedKeyDict
68from ._uuid import generate_uuidv7
69from .datastore.stored_file_info import StoredDatastoreItemInfo
70from .dimensions import (
71 DataCoordinate,
72 DimensionDataAttacher,
73 DimensionDataExtractor,
74 DimensionGroup,
75 DimensionUniverse,
76 SerializableDimensionData,
77 SerializedDataCoordinate,
78 SerializedDataId,
79)
80from .json import from_json_pydantic, to_json_pydantic
81from .persistence_context import PersistenceContextVars
83if TYPE_CHECKING:
84 from ._storage_class import StorageClass
85 from .registry import Registry
# Per-dataset datastore records grouped by opaque table name. Each key is the
# name of an opaque table and each value is the list of stored-item records
# held in that table for the dataset; usually there is just one opaque table.
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
class AmbiguousDatasetError(Exception):
    """Signal that an unresolved `DatasetRef` was used where a resolved one
    is required.

    The requested operation needs a dataset ID or a run, and the
    `DatasetRef` it was handed carries neither.
    """
103@runtime_checkable
104class _DatasetRefGroupedIterable(Protocol):
105 """A package-private interface for iterables of `DatasetRef` that know how
106 to efficiently group their contents by `DatasetType`.
108 """
110 def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
111 """Iterate over `DatasetRef` instances, one `DatasetType` at a time.
113 Returns
114 -------
115 grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
116 `~collections.abc.Iterable` [ `DatasetRef` ]
117 An iterable of tuples, in which the first element is a dataset type
118 and the second is an iterable of `DatasetRef` objects with exactly
119 that dataset type.
120 """
121 ...
class DatasetIdGenEnum(enum.Enum):
    """Enumeration of the available strategies for generating dataset IDs."""

    UNIQUE = 0
    """Every inserted dataset receives its own unique ID, e.g. one
    auto-generated by the database or a freshly generated UUID.
    """

    DATAID_TYPE = 1
    """The ID is a deterministic function of the dataset type and the
    dataId.
    """

    DATAID_TYPE_RUN = 2
    """The ID is a deterministic function of the dataset type, the dataId,
    and the run collection name.
    """
class DatasetIdFactory:
    """Producer of dataset IDs (UUIDs).

    The generation logic is currently hard-coded and selected by the
    caller-supplied `DatasetIdGenEnum` value. A configurable scheme that
    infers the `DatasetIdGenEnum` value from other parameters may be added
    in the future.
    """

    NS_UUID = uuid.UUID("840b31d9-05cd-5161-b2c8-00d32b280d0f")
    """Namespace UUID used for UUID5 generation. Do not change. This was
    produced by ``uuid.uuid5(uuid.NAMESPACE_DNS, "lsst.org")``.
    """

    def makeDatasetId(
        self,
        run: str,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        idGenerationMode: DatasetIdGenEnum,
    ) -> uuid.UUID:
        """Produce the ID for one dataset.

        Parameters
        ----------
        run : `str`
            Name of the RUN collection for the dataset.
        datasetType : `DatasetType`
            Dataset type.
        dataId : `DataCoordinate`
            Expanded data ID for the dataset.
        idGenerationMode : `DatasetIdGenEnum`
            ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a unique
            UUIDv7-type ID. `~DatasetIdGenEnum.DATAID_TYPE` makes a
            deterministic UUID5-type ID based on a dataset type name and
            ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
            deterministic UUID5-type ID based on a dataset type name, run
            collection name, and ``dataId``.

        Returns
        -------
        datasetId : `uuid.UUID`
            Dataset identifier.

        Raises
        ------
        ValueError
            Raised if ``idGenerationMode`` is not a recognized mode.
        """
        if idGenerationMode is DatasetIdGenEnum.UNIQUE:
            # UUIDv4 was used here in the past, but fully random IDs hurt
            # Postgres insert performance by scattering index updates all
            # over the disk. UUIDv7 offers similar uniqueness while keeping
            # IDs generated at the same time close together in the index.
            return generate_uuidv7()

        # WARNING: the generated ID depends on the exact order of the
        # key/value pairs assembled below. That order must never change.
        if idGenerationMode is DatasetIdGenEnum.DATAID_TYPE:
            pairs: list[tuple[str, str]] = [("dataset_type", datasetType.name)]
        elif idGenerationMode is DatasetIdGenEnum.DATAID_TYPE_RUN:
            pairs = [("dataset_type", datasetType.name), ("run", run)]
        else:
            raise ValueError(f"Unexpected ID generation mode: {idGenerationMode}")

        # Required data ID values follow, sorted so the result does not
        # depend on the iteration order of the data ID.
        pairs.extend((name, str(value)) for name, value in sorted(dataId.required.items()))
        payload = ",".join(f"{key}={value}" for key, value in pairs)
        return uuid.uuid5(self.NS_UUID, payload)
# Names of all fields declared on `SerializedDatasetRef`. This is constant, so
# it is built once at import time rather than recreating a set for each
# instance; `SerializedDatasetRef.direct` passes it as ``_fields_set``.
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}
class SerializedDatasetRef(pydantic.BaseModel):
    """Simplified model of a `DatasetRef` suitable for serialization."""

    id: uuid.UUID
    datasetType: SerializedDatasetType | None = None
    dataId: SerializedDataCoordinate | None = None
    run: StrictStr | None = None
    component: StrictStr | None = None

    # Can not use "after" validator since in some cases the validator
    # seems to trigger with the datasetType field not yet set.
    @pydantic.model_validator(mode="before")  # type: ignore[attr-defined]
    @classmethod
    def check_consistent_parameters(cls, data: Any) -> Any:
        """Check that the combination of supplied fields is consistent.

        Parameters
        ----------
        data : `typing.Any`
            Raw input handed to the model before field validation. Only
            mapping input is inspected here.

        Returns
        -------
        data : `typing.Any`
            The input, unchanged.

        Raises
        ------
        ValueError
            Raised if only one of ``datasetType`` and ``dataId`` is given,
            or if ``component`` is given together with ``datasetType``.
        """
        # A "before" validator receives the raw input, which is not
        # guaranteed to be a mapping (for example it can be an already
        # constructed model instance). Leave anything else for pydantic's
        # own validation instead of failing on a missing ``get`` method.
        if not isinstance(data, Mapping):
            return data

        has_datasetType = data.get("datasetType") is not None
        has_dataId = data.get("dataId") is not None
        if has_datasetType is not has_dataId:
            raise ValueError("If specifying datasetType or dataId, must specify both.")

        if data.get("component") is not None and has_datasetType:
            raise ValueError("datasetType can not be set if component is given.")
        return data

    @classmethod
    def direct(
        cls,
        *,
        id: str,
        run: str,
        datasetType: dict[str, Any] | None = None,
        dataId: dict[str, Any] | None = None,
        component: str | None = None,
    ) -> SerializedDatasetRef:
        """Construct a `SerializedDatasetRef` directly without validators.

        Parameters
        ----------
        id : `str`
            The UUID in string form.
        run : `str`
            The run for this dataset.
        datasetType : `dict` [`str`, `typing.Any`] or `None`, optional
            A representation of the dataset type.
        dataId : `dict` [`str`, `typing.Any`] or `None`, optional
            A representation of the data ID.
        component : `str` or `None`, optional
            Any component associated with this ref.

        Returns
        -------
        serialized : `SerializedDatasetRef`
            A Pydantic model representing the given parameters.

        Notes
        -----
        This differs from the pydantic "construct" method in that the arguments
        are explicitly what the model requires, and it will recurse through
        members, constructing them from their corresponding `direct` methods.

        The ``id`` parameter is a string representation of dataset ID, it is
        converted to UUID by this method.

        This method should only be called when the inputs are trusted.
        """
        serialized_datasetType = (
            SerializedDatasetType.direct(**datasetType) if datasetType is not None else None
        )
        serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None

        node = cls.model_construct(
            # Shared module-level constant; every field is passed explicitly
            # below so the set of "fields set" is always the same.
            _fields_set=_serializedDatasetRefFieldsSet,
            id=uuid.UUID(id),
            datasetType=serialized_datasetType,
            dataId=serialized_dataId,
            # Interned so equal run names share a single string object
            # (presumably because the same run recurs across many refs).
            run=sys.intern(run),
            component=component,
        )

        return node
# Dataset IDs are UUIDs. Annotations in this module use this alias rather than
# `uuid.UUID` directly.
DatasetId: TypeAlias = uuid.UUID
"""A type-annotation alias for dataset ID providing typing flexibility.
"""
@immutable
class DatasetRef:
    """Reference to a Dataset in a `Registry`.

    A `DatasetRef` may point to a Dataset that currently does not yet exist
    (e.g., because it is a predicted input for provenance).

    Parameters
    ----------
    datasetType : `DatasetType`
        The `DatasetType` for this Dataset.
    dataId : `DataCoordinate`
        A mapping of dimensions that labels the Dataset within a Collection.
    run : `str`
        The name of the run this dataset was associated with when it was
        created.
    id : `DatasetId`, optional
        The unique identifier assigned when the dataset is created. If ``id``
        is not specified, a new ID will be created according to
        ``id_generation_mode``.
    conform : `bool`, optional
        If `True` (default), call `DataCoordinate.standardize` to ensure that
        the data ID's dimensions are consistent with the dataset type's.
        `DatasetRef` instances for which those dimensions are not equal should
        not be created in new code, but are still supported for backwards
        compatibility. New code should only pass `False` if it can guarantee
        that the dimensions are already consistent.
    id_generation_mode : `DatasetIdGenEnum`, optional
        ID generation option, only consulted when ``id`` is not given.
        `~DatasetIdGenEnum.UNIQUE` (default) makes a unique UUIDv7-type ID.
        `~DatasetIdGenEnum.DATAID_TYPE` makes a deterministic UUID5-type ID
        based on a dataset type name and ``dataId``.
        `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a deterministic UUID5-type
        ID based on a dataset type name, run collection name, and ``dataId``.
    datastore_records : `DatasetDatastoreRecords` or `None`, optional
        Datastore records to attach.

    Notes
    -----
    See also :ref:`daf_butler_organizing_datasets`
    """

    _serializedType: ClassVar[type[pydantic.BaseModel]] = SerializedDatasetRef
    __slots__ = (
        "_id",
        "datasetType",
        "dataId",
        "run",
        "_datastore_records",
    )

    def __init__(
        self,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        run: str,
        *,
        id: DatasetId | None = None,
        conform: bool = True,
        id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
        datastore_records: DatasetDatastoreRecords | None = None,
    ):
        self.datasetType = datasetType
        if conform:
            self.dataId = DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions)
        else:
            self.dataId = dataId
        self.run = run
        # The ID is held as a plain integer; the `id` property rebuilds the
        # `uuid.UUID` on access.
        if id is not None:
            self._id = id.int
        else:
            self._id = (
                DatasetIdFactory()
                .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
                .int
            )
        self._datastore_records = datastore_records

    @property
    def id(self) -> DatasetId:
        """Primary key of the dataset (`DatasetId`).

        Cannot be changed after a `DatasetRef` is constructed.
        """
        return uuid.UUID(int=self._id)

    def __eq__(self, other: Any) -> bool:
        # Equality is defined by dataset type, data ID and dataset ID; run
        # and datastore records do not take part.
        try:
            return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        # Must use the same fields as __eq__.
        return hash((self.datasetType, self.dataId, self.id))

    @property
    def dimensions(self) -> DimensionGroup:
        """Dimensions associated with the underlying `DatasetType`."""
        return self.datasetType.dimensions

    def __repr__(self) -> str:
        # We delegate to __str__ (i.e. use "!s") for the data ID below because
        # DataCoordinate's __repr__ - while adhering to the guidelines for
        # __repr__ - is much harder for users to read, while its __str__ just
        # produces a dict that can also be passed to DatasetRef's constructor.
        return f"DatasetRef({self.datasetType!r}, {self.dataId!s}, run={self.run!r}, id={self.id})"

    def __str__(self) -> str:
        s = (
            f"{self.datasetType.name}@{self.dataId!s} [sc={self.datasetType.storageClass_name}]"
            f" (run={self.run} id={self.id})"
        )
        return s

    def __lt__(self, other: Any) -> bool:
        # Sort by run, DatasetType name and then by DataCoordinate
        # The __str__ representation is probably close enough but we
        # need to ensure that sorting a DatasetRef matches what you would
        # get if you sorted DatasetType+DataCoordinate
        if not isinstance(other, type(self)):
            return NotImplemented

        # Group by run if defined, takes precedence over DatasetType
        self_run = "" if self.run is None else self.run
        other_run = "" if other.run is None else other.run

        # Compare tuples in the priority order
        return (self_run, self.datasetType, self.dataId) < (other_run, other.datasetType, other.dataId)

    def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
        """Convert this class to a simple python type.

        This makes it suitable for serialization.

        Parameters
        ----------
        minimal : `bool`, optional
            Use minimal serialization. Requires Registry to convert
            back to a full type.

        Returns
        -------
        simple : `SerializedDatasetRef`
            The serializable model for this ref. With ``minimal`` it holds
            only the ID (and the component name for a component ref);
            otherwise it holds the dataset type, data ID, run and ID.
        """
        if minimal:
            # The only thing needed to uniquely define a DatasetRef is its id
            # so that can be used directly if it is not a component DatasetRef.
            # Stored in a dict to allow us to easily add the planned origin
            # information later without having to support an int and dict in
            # simple form.
            simple: dict[str, Any] = {"id": self.id}
            if self.isComponent():
                # We can still be a little minimalist with a component
                # but we will also need to record the datasetType component
                simple["component"] = self.datasetType.component()
            return SerializedDatasetRef(**simple)

        return SerializedDatasetRef(
            datasetType=self.datasetType.to_simple(minimal=minimal),
            dataId=self.dataId.to_simple(),
            run=self.run,
            id=self.id,
        )

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatasetRef,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
        datasetType: DatasetType | None = None,
    ) -> DatasetRef:
        """Construct a new object from simplified form.

        Generally this is data returned from the `to_simple` method.

        Parameters
        ----------
        simple : `SerializedDatasetRef`
            The value returned by `to_simple()`.
        universe : `DimensionUniverse`, optional
            The special graph of all known dimensions.
            Can be `None` if a registry is provided.
        registry : `lsst.daf.butler.Registry`, optional
            Registry to use to convert simple form of a DatasetRef to
            a full `DatasetRef`. Can be `None` if a full description of
            the type is provided along with a universe.
        datasetType : `DatasetType`, optional
            If datasetType is supplied, this will be used as the datasetType
            object in the resulting DatasetRef instead of being read from
            the `SerializedDatasetRef`. This is useful when many refs share
            the same type as memory can be saved. Defaults to None.

        Returns
        -------
        ref : `DatasetRef`
            Newly-constructed object.

        Raises
        ------
        ValueError
            Raised if the serialized form lacks information needed to build
            the ref and no ``registry``/``universe``/``datasetType`` was
            given to supply it, or if the run is missing from a non-minimal
            serialization.
        RuntimeError
            Raised if a minimal serialization refers to an ID that the
            registry does not know.
        """
        cache = PersistenceContextVars.datasetRefs.get()
        key = simple.id.int
        if cache is not None and (ref := cache.get(key, None)) is not None:
            # The cache holds composite refs (see the end of this method), so
            # the component and storage class have to be re-applied here.
            if datasetType is not None:
                if (component := datasetType.component()) is not None:
                    ref = ref.makeComponentRef(component)
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                return ref
            if simple.datasetType is not None:
                _, component = DatasetType.splitDatasetTypeName(simple.datasetType.name)
                if component is not None:
                    ref = ref.makeComponentRef(component)
                if simple.datasetType.storageClass is not None:
                    ref = ref.overrideStorageClass(simple.datasetType.storageClass)
                return ref
            # If dataset type is not given ignore the cache, because we can't
            # reliably return the right storage class.
        # Minimalist component will just specify component and id and
        # require registry to reconstruct
        if simple.datasetType is None and simple.dataId is None and simple.run is None:
            if registry is None:
                raise ValueError("Registry is required to construct component DatasetRef from integer id")
            # No separate check for a missing ID is needed: ``simple.id`` was
            # already dereferenced above to compute the cache key.
            ref = registry.getDataset(simple.id)
            if ref is None:
                raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
            if simple.component:
                ref = ref.makeComponentRef(simple.component)
        else:
            if universe is None:
                if registry is None:
                    raise ValueError("One of universe or registry must be provided.")
                universe = registry.dimensions
            if datasetType is None:
                if simple.datasetType is None:
                    raise ValueError("Cannot determine Dataset type of this serialized class")
                datasetType = DatasetType.from_simple(
                    simple.datasetType, universe=universe, registry=registry
                )
            if simple.dataId is None:
                # mypy
                raise ValueError("The DataId must be specified to construct a DatasetRef")
            dataId = DataCoordinate.from_simple(simple.dataId, universe=universe)
            # Check that simple ref is resolved.
            if simple.run is None:
                dstr = ""
                if simple.datasetType is None:
                    dstr = f" (datasetType={datasetType.name!r})"
                raise ValueError(
                    "Run collection name is missing from serialized representation. "
                    f"Encountered with {simple!r}{dstr}."
                )
            ref = cls(
                datasetType,
                dataId,
                id=simple.id,
                run=simple.run,
            )
        if cache is not None:
            # Always cache the composite so that a later lookup can derive
            # whichever component it needs.
            if ref.datasetType.component() is not None:
                cache[key] = ref.makeCompositeRef()
            else:
                cache[key] = ref
        return ref

    to_json = to_json_pydantic
    from_json: ClassVar[Callable[..., Self]] = cast(Callable[..., Self], classmethod(from_json_pydantic))

    @classmethod
    def _unpickle(
        cls,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        id: DatasetId,
        run: str,
        datastore_records: DatasetDatastoreRecords | None,
    ) -> DatasetRef:
        """Create new `DatasetRef`.

        A custom factory method for use by `__reduce__` as a workaround for
        its lack of support for keyword arguments.

        Parameters
        ----------
        datasetType : `DatasetType`
            Dataset type of the pickled ref.
        dataId : `DataCoordinate`
            Data ID of the pickled ref.
        id : `DatasetId`
            Dataset ID of the pickled ref.
        run : `str`
            Run of the pickled ref.
        datastore_records : `DatasetDatastoreRecords` or `None`
            Datastore records of the pickled ref.

        Returns
        -------
        ref : `DatasetRef`
            The reconstructed ref.
        """
        return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records)

    def __reduce__(self) -> tuple:
        # Positional arguments here must match the order of the parameters
        # of `_unpickle`.
        return (
            self._unpickle,
            (self.datasetType, self.dataId, self.id, self.run, self._datastore_records),
        )

    def __deepcopy__(self, memo: dict) -> DatasetRef:
        # DatasetRef is recursively immutable; see note in @immutable
        # decorator.
        return self

    def expanded(self, dataId: DataCoordinate) -> DatasetRef:
        """Return a new `DatasetRef` with the given expanded data ID.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the new `DatasetRef`. Must compare equal to the
            original data ID.

        Returns
        -------
        ref : `DatasetRef`
            A new `DatasetRef` with the given data ID.
        """
        assert dataId == self.dataId
        return DatasetRef(
            datasetType=self.datasetType,
            dataId=dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def isComponent(self) -> bool:
        """Indicate whether this `DatasetRef` refers to a component.

        Returns
        -------
        isComponent : `bool`
            `True` if this `DatasetRef` is a component, `False` otherwise.
        """
        return self.datasetType.isComponent()

    def isComposite(self) -> bool:
        """Boolean indicating whether this `DatasetRef` is a composite type.

        Returns
        -------
        isComposite : `bool`
            `True` if this `DatasetRef` is a composite type, `False`
            otherwise.
        """
        return self.datasetType.isComposite()

    def _lookupNames(self) -> tuple[LookupKey, ...]:
        """Name keys to use when looking up this DatasetRef in a configuration.

        The names are returned in order of priority.

        Returns
        -------
        names : `tuple` of `LookupKey`
            Tuple of the `DatasetType` name and the `StorageClass` name.
            If ``instrument`` is defined in the dataId, each of those names
            is added to the start of the tuple with a key derived from the
            value of ``instrument``.
        """
        # Special case the instrument Dimension since we allow configs
        # to include the instrument name in the hierarchy.
        names: tuple[LookupKey, ...] = self.datasetType._lookupNames()

        if "instrument" in self.dataId:
            # Instrument-specific keys come first so they take priority.
            names = tuple(n.clone(dataId={"instrument": self.dataId["instrument"]}) for n in names) + names

        return names

    @staticmethod
    def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
        """Group an iterable of `DatasetRef` by `DatasetType`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group.

        Returns
        -------
        grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
            Grouped `DatasetRef` instances.

        Notes
        -----
        When lazy item-iterables are acceptable instead of a full mapping,
        `iter_by_type` can in some cases be far more efficient.
        """
        result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
        for ref in refs:
            result.setdefault(ref.datasetType, []).append(ref)
        return result

    @staticmethod
    def iter_by_type(
        refs: Iterable[DatasetRef],
    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group an iterable of `DatasetRef` by `DatasetType` with special
        hooks for custom iterables that can do this efficiently.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group. If this satisfies the
            `_DatasetRefGroupedIterable` protocol, its
            `~_DatasetRefGroupedIterable._iter_by_dataset_type` method will
            be called.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ]]
            Grouped `DatasetRef` instances.
        """
        if isinstance(refs, _DatasetRefGroupedIterable):
            return refs._iter_by_dataset_type()
        # Generic fallback: build the full mapping and hand back its items.
        return DatasetRef.groupByType(refs).items()

    def makeCompositeRef(self) -> DatasetRef:
        """Create a `DatasetRef` of the composite from a component ref.

        Requires that this `DatasetRef` is a component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the
            composite parent of this component, and the same ID, run and
            datastore records as ``self``.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeCompositeDatasetType(),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def makeComponentRef(self, name: str) -> DatasetRef:
        """Create a `DatasetRef` that corresponds to a component.

        Parameters
        ----------
        name : `str`
            Name of the component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the given
            component, and the same ID, run and datastore records as
            ``self``.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeComponentDatasetType(name),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with a modified
        `DatasetType` that has a different `StorageClass`.

        Parameters
        ----------
        storageClass : `str` or `StorageClass`
            The new storage class.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference that is the same as the current one but
            with a different storage class in the `DatasetType`.
        """
        return self.replace(storage_class=storageClass)

    def replace(
        self,
        *,
        id: DatasetId | None = None,
        run: str | None = None,
        storage_class: str | StorageClass | None = None,
        datastore_records: DatasetDatastoreRecords | None | Literal[False] = False,
    ) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with some modified
        attributes.

        Parameters
        ----------
        id : `DatasetId` or `None`, optional
            If not `None` then update dataset ID.
        run : `str` or `None`, optional
            If not `None` then update run collection name. If ``id`` is
            `None` then this will also cause new dataset ID to be generated.
        storage_class : `str` or `StorageClass` or `None`, optional
            The new storage class. If not `None`, replaces existing storage
            class.
        datastore_records : `DatasetDatastoreRecords` or `None`, optional
            New datastore records. If `None` remove all records. By default
            (`False`) datastore records are preserved.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference with updated attributes.
        """
        # `False` is the "not given" sentinel because `None` is a meaningful
        # value here (it removes the records).
        if datastore_records is False:
            datastore_records = self._datastore_records
        if storage_class is None:
            datasetType = self.datasetType
        else:
            datasetType = self.datasetType.overrideStorageClass(storage_class)
        if run is None:
            run = self.run
            # Do not regenerate dataset ID if run is the same.
            if id is None:
                id = self.id
        return DatasetRef(
            datasetType=datasetType,
            dataId=self.dataId,
            run=run,
            id=id,
            conform=False,
            datastore_records=datastore_records,
        )

    def is_compatible_with(self, other: DatasetRef) -> bool:
        """Determine if the given `DatasetRef` is compatible with this one.

        Parameters
        ----------
        other : `DatasetRef`
            Dataset ref to check.

        Returns
        -------
        is_compatible : `bool`
            Returns `True` if the other dataset ref is either the same as this
            or the dataset type associated with the other is compatible with
            this one and the dataId, run and dataset ID match.

        Notes
        -----
        Compatibility requires that the dataId, run and dataset ID match and
        the `DatasetType` is compatible. Compatibility is defined as the
        storage class associated with the dataset type of the other ref can be
        converted to this storage class.

        Specifically this means that if you have done:

        .. code-block:: py

            new_ref = ref.overrideStorageClass(sc)

        and this is successful, then the guarantee is that:

        .. code-block:: py

            assert ref.is_compatible_with(new_ref) is True

        since we know that the python type associated with the new ref can
        be converted to the original python type. The reverse is not guaranteed
        and depends on whether bidirectional converters have been registered.
        """
        if self.id != other.id:
            return False
        if self.dataId != other.dataId:
            return False
        if self.run != other.run:
            return False
        return self.datasetType.is_compatible_with(other.datasetType)

    datasetType: DatasetType
    """The definition of this dataset (`DatasetType`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    dataId: DataCoordinate
    """A mapping of `Dimension` primary key values that labels the dataset
    within a Collection (`DataCoordinate`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    run: str
    """The name of the run that produced the dataset.

    Cannot be changed after a `DatasetRef` is constructed.
    """

    datastore_records: DatasetDatastoreRecords | None
    """Optional datastore records (`DatasetDatastoreRecords`).

    Cannot be changed after a `DatasetRef` is constructed.
    """
class MinimalistSerializableDatasetRef(pydantic.BaseModel):
    """The smallest set of fields that pins down a `DatasetRef`.

    The dataset ID is deliberately left out: it is presumed to be the key of
    a mapping whose values are instances of this model.
    """

    model_config = pydantic.ConfigDict(frozen=True)

    dataset_type_name: str
    """Name of the dataset type."""

    run: str
    """Name of the RUN collection."""

    data_id: SerializedDataId
    """Data coordinate of this dataset."""

    def to_dataset_ref(
        self,
        id: DatasetId,
        *,
        dataset_type: DatasetType,
        universe: DimensionUniverse,
        attacher: DimensionDataAttacher | None = None,
    ) -> DatasetRef:
        """Rebuild a `DatasetRef` from this serialized object.

        Parameters
        ----------
        id : `DatasetId`
            UUID identifying the dataset.
        dataset_type : `DatasetType`
            `DatasetType` record corresponding to the dataset type name in the
            serialized object.
        universe : `DimensionUniverse`
            Dimension universe for the dataset.
        attacher : `DimensionDataAttacher`, optional
            If provided, will be used to add dimension records to the
            deserialized `DatasetRef` instance.

        Returns
        -------
        ref : `DatasetRef`
            The deserialized object.
        """
        assert dataset_type.name == self.dataset_type_name, (
            "Given DatasetType does not match the serialized dataset type name"
        )
        # Wrap the bare data ID in the serialized-coordinate model that
        # `DataCoordinate.from_simple` takes.
        wrapped = SerializedDataCoordinate(dataId=self.data_id)
        coordinate = DataCoordinate.from_simple(simple=wrapped, universe=universe)
        if attacher:
            # The attacher works on a batch; send a batch of one and take
            # the single result back out.
            coordinate = attacher.attach(dataset_type.dimensions, [coordinate])[0]
        return DatasetRef(
            datasetType=dataset_type,
            dataId=coordinate,
            run=self.run,
            id=id,
        )

    @staticmethod
    def from_dataset_ref(ref: DatasetRef) -> MinimalistSerializableDatasetRef:
        """Serialize a `DatasetRef` to a simplified format.

        Parameters
        ----------
        ref : `DatasetRef`
            `DatasetRef` object to serialize.

        Returns
        -------
        serialized : `MinimalistSerializableDatasetRef`
            Model holding the dataset type name, run and data ID of ``ref``.
        """
        type_name = ref.datasetType.name
        plain_data_id = dict(ref.dataId.mapping)
        return MinimalistSerializableDatasetRef(
            dataset_type_name=type_name,
            run=ref.run,
            data_id=plain_data_id,
        )
class SerializedDatasetRefContainer(pydantic.BaseModel):
    """Serializable model holding a collection of DatasetRef.

    Dimension records are not included. The model is frozen and accepts
    extra fields; ``container_version`` names the container layout.
    """

    model_config = pydantic.ConfigDict(frozen=True, extra="allow")
    container_version: str
class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
    """Serializable model for a collection of `DatasetRef` (version "V1").

    Information shared between refs (dataset type definitions and the
    identity of the dimension universe) is stored once, and each ref is
    reduced to a `MinimalistSerializableDatasetRef` keyed by its UUID.
    Dimension records are included only when every ref given to
    `from_refs` had records attached; otherwise ``dimension_records`` is
    `None`.
    """

    container_version: Literal["V1"] = "V1"

    universe_version: int
    """Dimension universe version."""

    universe_namespace: str
    """Dimension universe namespace."""

    dataset_types: dict[str, SerializedDatasetType]
    """Dataset types indexed by their name."""

    compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
    """Minimal dataset ref information indexed by UUID."""

    dimension_records: SerializableDimensionData | None = None
    """Dimension record information."""

    def __len__(self) -> int:
        """Return the number of datasets in the container."""
        return len(self.compact_refs)

    @classmethod
    def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
        """Construct a serializable form from a list of `DatasetRef`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to include in the container. Refs sharing a
            dataset ID collapse into a single entry.

        Returns
        -------
        container : `SerializedDatasetRefContainerV1`
            The serializable container. If ``refs`` is empty the universe
            version is ``0`` and the namespace is ``"unknown"``.
        """
        # The serialized DatasetRef contains a lot of duplicated information,
        # so dataset types are stored once and each ref is reduced to its
        # minimal form.
        universe: DimensionUniverse | None = None
        dataset_types: dict[str, SerializedDatasetType] = {}
        compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
        data_ids: list[DataCoordinate] = []
        dimensions: list[DimensionGroup] = []
        # Tracked explicitly rather than by comparing len(compact_refs) with
        # len(data_ids): compact_refs is keyed by dataset ID, so duplicate
        # refs in the input would make that comparison unreliable.
        all_have_records = True
        for ref in refs:
            if universe is None:
                # The universe of the first ref is the one that is recorded.
                universe = ref.datasetType.dimensions.universe
            if (name := ref.datasetType.name) not in dataset_types:
                dataset_types[name] = ref.datasetType.to_simple()
            compact_refs[ref.id] = MinimalistSerializableDatasetRef.from_dataset_ref(ref)
            if ref.dataId.hasRecords():
                dimensions.append(ref.datasetType.dimensions)
                data_ids.append(ref.dataId)
            else:
                all_have_records = False

        # Extract dimension record metadata if present.
        dimension_records = None
        if data_ids and all_have_records:
            dimension_group = DimensionGroup.union(*dimensions, universe=universe)

            # Records were attached to all refs. Store them.
            extractor = DimensionDataExtractor.from_dimension_group(
                dimension_group,
                ignore_cached=False,
                include_skypix=False,
            )
            extractor.update(data_ids)
            dimension_records = SerializableDimensionData.from_record_sets(extractor.records.values())

        if universe is not None:
            universe_version = universe.version
            universe_namespace = universe.namespace
        else:
            # No refs so no universe.
            universe_version = 0
            universe_namespace = "unknown"
        return cls(
            universe_version=universe_version,
            universe_namespace=universe_namespace,
            dataset_types=dataset_types,
            compact_refs=compact_refs,
            dimension_records=dimension_records,
        )

    def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
        """Construct the original `DatasetRef`.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use when constructing the `DatasetRef`.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            The `DatasetRef` that were serialized. Empty if the container
            holds no refs, in which case ``universe`` is not checked.

        Raises
        ------
        InconsistentUniverseError
            Raised if the namespace of ``universe`` differs from the one
            recorded at serialization time, or if the universe versions
            differ and a dataset type either cannot be constructed in
            ``universe`` or ends up with different required dimensions.
        """
        if not self.compact_refs:
            return []

        if universe.namespace != self.universe_namespace:
            raise InconsistentUniverseError(
                f"Can not convert to refs in universe {universe.namespace} that were created from "
                f"universe {self.universe_namespace}"
            )

        # Reconstruct the DatasetType objects.
        dataset_types: dict[str, DatasetType] = {}
        if universe.version == self.universe_version:
            dataset_types = {
                name: DatasetType.from_simple(dtype, universe=universe)
                for name, dtype in self.dataset_types.items()
            }
        else:
            _LOG.warning(
                "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
                "Serialized with version %d but asked to use version %d. "
                "There could be failures due to different universe versions.",
                self.universe_version,
                universe.version,
            )
            # When versions are different the dimensions may either disappear
            # or new dimensions can be added to the conforming set, so each
            # dataset type is checked individually.
            for name, dtype in self.dataset_types.items():
                try:
                    dataset_type = DatasetType.from_simple(dtype, universe=universe)
                except KeyError as exc:
                    raise InconsistentUniverseError(
                        f"Source dimensions {dtype.dimensions} are not compatible with "
                        f"target universe dimensions {universe}."
                    ) from exc
                if set(dataset_type.dimensions.required) != set(dtype.dimensions or []):
                    raise InconsistentUniverseError(
                        f"Source dimensions {dtype.dimensions} are different from a conforming "
                        f"set of target universe dimensions {dataset_type.dimensions}."
                    )
                dataset_types[name] = dataset_type

        # Dimension records can be attached if available.
        # We assume that all dimension information was stored.
        attacher = None
        if self.dimension_records:
            attacher = DimensionDataAttacher(
                deserializers=self.dimension_records.make_deserializers(universe)
            )

        return [
            minimal.to_dataset_ref(
                id_,
                dataset_type=dataset_types[minimal.dataset_type_name],
                universe=universe,
                attacher=attacher,
            )
            for id_, minimal in self.compact_refs.items()
        ]
# Annotated with a pydantic discriminator on ``container_version``.
# "V1" is the only container model listed here.
SerializedDatasetRefContainers: TypeAlias = Annotated[
    SerializedDatasetRefContainerV1,
    pydantic.Field(discriminator="container_version"),
]