Coverage for python / lsst / daf / butler / _dataset_ref.py: 29%

325 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:07 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = [ 

30 "AmbiguousDatasetError", 

31 "DatasetDatastoreRecords", 

32 "DatasetId", 

33 "DatasetIdFactory", 

34 "DatasetIdGenEnum", 

35 "DatasetRef", 

36 "SerializedDatasetRef", 

37 "SerializedDatasetRefContainerV1", 

38 "SerializedDatasetRefContainers", 

39] 

40 

41import enum 

42import logging 

43import sys 

44import uuid 

45from collections.abc import Callable, Iterable, Mapping 

46from typing import ( 

47 TYPE_CHECKING, 

48 Annotated, 

49 Any, 

50 ClassVar, 

51 Literal, 

52 Protocol, 

53 Self, 

54 TypeAlias, 

55 cast, 

56 runtime_checkable, 

57) 

58 

59import pydantic 

60from pydantic import StrictStr 

61 

62from lsst.utils.classes import immutable 

63 

64from ._config_support import LookupKey 

65from ._dataset_type import DatasetType, SerializedDatasetType 

66from ._exceptions import InconsistentUniverseError 

67from ._named import NamedKeyDict 

68from ._uuid import generate_uuidv7 

69from .datastore.stored_file_info import StoredDatastoreItemInfo 

70from .dimensions import ( 

71 DataCoordinate, 

72 DimensionDataAttacher, 

73 DimensionDataExtractor, 

74 DimensionGroup, 

75 DimensionUniverse, 

76 SerializableDimensionData, 

77 SerializedDataCoordinate, 

78 SerializedDataId, 

79) 

80from .json import from_json_pydantic, to_json_pydantic 

81from .persistence_context import PersistenceContextVars 

82 

83if TYPE_CHECKING: 

84 from ._storage_class import StorageClass 

85 from .registry import Registry 

86 

# Datastore records belonging to a single dataset, keyed by the name of the
# opaque table they come from. In practice there is usually only one such
# table per dataset.
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

93 

94 

class AmbiguousDatasetError(Exception):
    """Signal that a resolved `DatasetRef` was required but not provided.

    Raised when an operation needs the dataset ID or the run of a
    `DatasetRef` and the reference does not carry one of them.
    """

101 

102 

@runtime_checkable
class _DatasetRefGroupedIterable(Protocol):
    """Package-private protocol for iterables of `DatasetRef` that are able
    to group their own contents by `DatasetType` efficiently.
    """

    def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Iterate over the `DatasetRef` instances, one `DatasetType` at a
        time.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ] ]
            Iterable of two-element tuples. The first element is a dataset
            type; the second is an iterable of the `DatasetRef` objects that
            have exactly that dataset type.
        """
        ...

122 

123 

class DatasetIdGenEnum(enum.Enum):
    """Options controlling how a dataset ID is generated."""

    UNIQUE = 0
    """Every inserted dataset gets its own unique ID, e.g. one auto-generated
    by the database or a random UUID.
    """

    DATAID_TYPE = 1
    """The ID is derived deterministically from the dataset type combined
    with the dataId.
    """

    DATAID_TYPE_RUN = 2
    """The ID is derived deterministically from the dataset type, the dataId,
    and the name of the run collection.
    """

141 

142 

class DatasetIdFactory:
    """Produce dataset IDs (UUIDs).

    The generation strategy is currently hard-coded and selected by the
    caller through a `DatasetIdGenEnum` value. A configurable strategy that
    derives the `DatasetIdGenEnum` value from other parameters may be added
    in the future.
    """

    NS_UUID = uuid.UUID("840b31d9-05cd-5161-b2c8-00d32b280d0f")
    """Namespace UUID for UUID5 generation. Do not change. It is the result
    of ``uuid.uuid5(uuid.NAMESPACE_DNS, "lsst.org")``.
    """

    def makeDatasetId(
        self,
        run: str,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        idGenerationMode: DatasetIdGenEnum,
    ) -> uuid.UUID:
        """Generate the ID for one dataset.

        Parameters
        ----------
        run : `str`
            Name of the RUN collection for the dataset.
        datasetType : `DatasetType`
            Dataset type.
        dataId : `DataCoordinate`
            Expanded data ID for the dataset.
        idGenerationMode : `DatasetIdGenEnum`
            ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a
            UUIDv7-type ID. `~DatasetIdGenEnum.DATAID_TYPE` makes a
            deterministic UUID5-type ID based on a dataset type name and
            ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
            deterministic UUID5-type ID based on a dataset type name, run
            collection name, and ``dataId``.

        Returns
        -------
        datasetId : `uuid.UUID`
            Dataset identifier.

        Raises
        ------
        ValueError
            Raised if ``idGenerationMode`` is not one of the known modes.
        """
        if idGenerationMode is DatasetIdGenEnum.UNIQUE:
            # UUIDv4 was used here in the past, but fully random IDs hurt
            # Postgres insert performance by scattering index updates over
            # the disk. UUIDv7 is comparably unique while keeping IDs that
            # were generated at the same time adjacent in the index.
            return generate_uuidv7()

        # WARNING: the order of the (key, value) pairs fed into the hash must
        # never change, otherwise previously generated IDs are no longer
        # reproducible.
        pairs: list[tuple[str, str]]
        if idGenerationMode is DatasetIdGenEnum.DATAID_TYPE:
            pairs = [("dataset_type", datasetType.name)]
        elif idGenerationMode is DatasetIdGenEnum.DATAID_TYPE_RUN:
            pairs = [("dataset_type", datasetType.name), ("run", run)]
        else:
            raise ValueError(f"Unexpected ID generation mode: {idGenerationMode}")

        # Required data ID values follow the fixed prefix, sorted by key.
        pairs.extend((dimension, str(value)) for dimension, value in sorted(dataId.required.items()))
        payload = ",".join(f"{key}={value}" for key, value in pairs)
        return uuid.uuid5(self.NS_UUID, payload)

213 

214 

# Names of the SerializedDatasetRef fields. The value never changes, so it is
# built once here instead of once per constructed instance.
_serializedDatasetRefFieldsSet = {"component", "dataId", "datasetType", "id", "run"}

217 

218 

class SerializedDatasetRef(pydantic.BaseModel):
    """Simplified model of a `DatasetRef` suitable for serialization."""

    id: uuid.UUID
    datasetType: SerializedDatasetType | None = None
    dataId: SerializedDataCoordinate | None = None
    run: StrictStr | None = None
    component: StrictStr | None = None

    # Can not use "after" validator since in some cases the validator
    # seems to trigger with the datasetType field not yet set.
    @pydantic.model_validator(mode="before")  # type: ignore[attr-defined]
    @classmethod
    def check_consistent_parameters(cls, data: Any) -> Any:
        """Check that the combination of supplied fields is consistent.

        Parameters
        ----------
        data : `typing.Any`
            Raw input handed to the model before field validation. Only
            mapping input is inspected.

        Returns
        -------
        data : `typing.Any`
            The input, unmodified.

        Raises
        ------
        ValueError
            Raised if only one of ``datasetType`` and ``dataId`` is given,
            or if ``component`` is given together with ``datasetType``.
        """
        # A "before" validator can receive any object (for example a model
        # instance or an attribute-bearing object), not only a dict. Leave
        # anything without mapping access to the regular field validation
        # rather than failing with AttributeError on ``.get``.
        if not isinstance(data, Mapping):
            return data

        has_datasetType = data.get("datasetType") is not None
        has_dataId = data.get("dataId") is not None
        if has_datasetType is not has_dataId:
            raise ValueError("If specifying datasetType or dataId, must specify both.")

        if data.get("component") is not None and has_datasetType:
            raise ValueError("datasetType can not be set if component is given.")
        return data

    @classmethod
    def direct(
        cls,
        *,
        id: str,
        run: str,
        datasetType: dict[str, Any] | None = None,
        dataId: dict[str, Any] | None = None,
        component: str | None = None,
    ) -> SerializedDatasetRef:
        """Construct a `SerializedDatasetRef` directly without validators.

        Parameters
        ----------
        id : `str`
            The UUID in string form.
        run : `str`
            The run for this dataset.
        datasetType : `dict` [`str`, `typing.Any`] or `None`
            A representation of the dataset type.
        dataId : `dict` [`str`, `typing.Any`] or `None`
            A representation of the data ID.
        component : `str` or `None`
            Any component associated with this ref.

        Returns
        -------
        serialized : `SerializedDatasetRef`
            A Pydantic model representing the given parameters.

        Notes
        -----
        This differs from the pydantic "construct" method in that the arguments
        are explicitly what the model requires, and it will recurse through
        members, constructing them from their corresponding `direct` methods.

        The ``id`` parameter is a string representation of dataset ID, it is
        converted to UUID by this method.

        This method should only be called when the inputs are trusted.
        """
        serialized_datasetType = (
            SerializedDatasetType.direct(**datasetType) if datasetType is not None else None
        )
        serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None

        return cls.model_construct(
            _fields_set=_serializedDatasetRefFieldsSet,
            id=uuid.UUID(id),
            datasetType=serialized_datasetType,
            dataId=serialized_dataId,
            # The run name is interned before being stored on the model.
            run=sys.intern(run),
            component=component,
        )

298 

299 

300DatasetId: TypeAlias = uuid.UUID 

301"""A type-annotation alias for dataset ID providing typing flexibility. 

302""" 

303 

304 

@immutable
class DatasetRef:
    """Reference to a Dataset in a `Registry`.

    A `DatasetRef` may point to a Dataset that currently does not yet exist
    (e.g., because it is a predicted input for provenance).

    Parameters
    ----------
    datasetType : `DatasetType`
        The `DatasetType` for this Dataset.
    dataId : `DataCoordinate`
        A mapping of dimensions that labels the Dataset within a Collection.
    run : `str`
        The name of the run this dataset was associated with when it was
        created.
    id : `DatasetId`, optional
        The unique identifier assigned when the dataset is created. If ``id``
        is not specified, a new ID is created according to
        ``id_generation_mode``.
    conform : `bool`, optional
        If `True` (default), call `DataCoordinate.standardize` to ensure that
        the data ID's dimensions are consistent with the dataset type's.
        `DatasetRef` instances for which those dimensions are not equal should
        not be created in new code, but are still supported for backwards
        compatibility. New code should only pass `False` if it can guarantee
        that the dimensions are already consistent.
    id_generation_mode : `DatasetIdGenEnum`
        ID generation option, only used when ``id`` is `None`.
        `~DatasetIdGenEnum.UNIQUE` makes a UUIDv7-type ID.
        `~DatasetIdGenEnum.DATAID_TYPE` makes a deterministic UUID5-type ID
        based on a dataset type name and ``dataId``.
        `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a deterministic UUID5-type
        ID based on a dataset type name, run collection name, and ``dataId``.
    datastore_records : `DatasetDatastoreRecords` or `None`
        Datastore records to attach.

    Notes
    -----
    See also :ref:`daf_butler_organizing_datasets`
    """

    _serializedType: ClassVar[type[pydantic.BaseModel]] = SerializedDatasetRef
    __slots__ = (
        "_id",
        "datasetType",
        "dataId",
        "run",
        "_datastore_records",
    )

    def __init__(
        self,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        run: str,
        *,
        id: DatasetId | None = None,
        conform: bool = True,
        id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
        datastore_records: DatasetDatastoreRecords | None = None,
    ):
        self.datasetType = datasetType
        if conform:
            self.dataId = DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions)
        else:
            self.dataId = dataId
        self.run = run
        # The ID is stored as a plain integer; the ``id`` property rebuilds
        # the UUID on access.
        if id is not None:
            self._id = id.int
        else:
            self._id = (
                DatasetIdFactory()
                .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
                .int
            )
        self._datastore_records = datastore_records

    @property
    def id(self) -> DatasetId:
        """Primary key of the dataset (`DatasetId`).

        Cannot be changed after a `DatasetRef` is constructed.
        """
        return uuid.UUID(int=self._id)

    def __eq__(self, other: Any) -> bool:
        # Equality is defined by dataset type, data ID and dataset ID; the
        # run and the datastore records do not take part.
        try:
            return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        # Uses the same fields as __eq__.
        return hash((self.datasetType, self.dataId, self.id))

    @property
    def dimensions(self) -> DimensionGroup:
        """Dimensions associated with the underlying `DatasetType`."""
        return self.datasetType.dimensions

    def __repr__(self) -> str:
        # We delegate to __str__ (i.e use "!s") for the data ID below because
        # DataCoordinate's __repr__ - while adhering to the guidelines for
        # __repr__ - is much harder for users to read, while its __str__ just
        # produces a dict that can also be passed to DatasetRef's constructor.
        return f"DatasetRef({self.datasetType!r}, {self.dataId!s}, run={self.run!r}, id={self.id})"

    def __str__(self) -> str:
        return (
            f"{self.datasetType.name}@{self.dataId!s} [sc={self.datasetType.storageClass_name}]"
            f" (run={self.run} id={self.id})"
        )

    def __lt__(self, other: Any) -> bool:
        # Sort by run, DatasetType name and then by DataCoordinate
        # The __str__ representation is probably close enough but we
        # need to ensure that sorting a DatasetRef matches what you would
        # get if you sorted DatasetType+DataCoordinate
        if not isinstance(other, type(self)):
            return NotImplemented

        # Group by run if defined, takes precedence over DatasetType
        self_run = "" if self.run is None else self.run
        other_run = "" if other.run is None else other.run

        # Compare tuples in the priority order
        return (self_run, self.datasetType, self.dataId) < (other_run, other.datasetType, other.dataId)

    def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
        """Convert this class to a simple python type.

        This makes it suitable for serialization.

        Parameters
        ----------
        minimal : `bool`, optional
            Use minimal serialization. Requires Registry to convert
            back to a full type.

        Returns
        -------
        simple : `SerializedDatasetRef`
            The serializable model. In minimal form it holds only the ID
            and, for a component ref, the component name.
        """
        if minimal:
            # The only thing needed to uniquely define a DatasetRef is its id
            # so that can be used directly if it is not a component DatasetRef.
            # Store it in a dict to allow us to easily add the planned origin
            # information later without having to support an int and dict in
            # simple form.
            simple: dict[str, Any] = {"id": self.id}
            if self.isComponent():
                # We can still be a little minimalist with a component
                # but we will also need to record the datasetType component
                simple["component"] = self.datasetType.component()
            return SerializedDatasetRef(**simple)

        return SerializedDatasetRef(
            datasetType=self.datasetType.to_simple(minimal=minimal),
            dataId=self.dataId.to_simple(),
            run=self.run,
            id=self.id,
        )

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatasetRef,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
        datasetType: DatasetType | None = None,
    ) -> DatasetRef:
        """Construct a new object from simplified form.

        Generally this is data returned from the `to_simple` method.

        Parameters
        ----------
        simple : `SerializedDatasetRef`
            The value returned by `to_simple()`.
        universe : `DimensionUniverse`
            The special graph of all known dimensions.
            Can be `None` if a registry is provided.
        registry : `lsst.daf.butler.Registry`, optional
            Registry to use to convert simple form of a DatasetRef to
            a full `DatasetRef`. Can be `None` if a full description of
            the type is provided along with a universe.
        datasetType : `DatasetType`, optional
            If datasetType is supplied, this will be used as the datasetType
            object in the resulting DatasetRef instead of being read from
            the `SerializedDatasetRef`. This is useful when many refs share
            the same type as memory can be saved. Defaults to None.

        Returns
        -------
        ref : `DatasetRef`
            Newly-constructed object.

        Raises
        ------
        ValueError
            Raised if the serialized form, together with the other
            arguments, does not carry enough information to build the ref
            (no registry for a minimal ref, neither universe nor registry,
            no dataset type, no data ID, or no run).
        RuntimeError
            Raised if a minimal ref is not found in the registry.
        """
        cache = PersistenceContextVars.datasetRefs.get()
        key = simple.id.int
        if cache is not None and (ref := cache.get(key, None)) is not None:
            # The cache holds composite refs; re-derive the component and the
            # storage class requested by the caller or by the serialized form.
            if datasetType is not None:
                if (component := datasetType.component()) is not None:
                    ref = ref.makeComponentRef(component)
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                return ref
            if simple.datasetType is not None:
                _, component = DatasetType.splitDatasetTypeName(simple.datasetType.name)
                if component is not None:
                    ref = ref.makeComponentRef(component)
                if simple.datasetType.storageClass is not None:
                    ref = ref.overrideStorageClass(simple.datasetType.storageClass)
                return ref
            # If dataset type is not given ignore the cache, because we can't
            # reliably return the right storage class.
        # Minimalist component will just specify component and id and
        # require registry to reconstruct
        if simple.datasetType is None and simple.dataId is None and simple.run is None:
            if registry is None:
                raise ValueError("Registry is required to construct component DatasetRef from integer id")
            if simple.id is None:
                raise ValueError("For minimal DatasetRef the ID must be defined.")
            ref = registry.getDataset(simple.id)
            if ref is None:
                raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
            if simple.component:
                ref = ref.makeComponentRef(simple.component)
        else:
            if universe is None:
                if registry is None:
                    raise ValueError("One of universe or registry must be provided.")
                universe = registry.dimensions
            if datasetType is None:
                if simple.datasetType is None:
                    raise ValueError("Cannot determine Dataset type of this serialized class")
                datasetType = DatasetType.from_simple(
                    simple.datasetType, universe=universe, registry=registry
                )
            if simple.dataId is None:
                # mypy
                raise ValueError("The DataId must be specified to construct a DatasetRef")
            dataId = DataCoordinate.from_simple(simple.dataId, universe=universe)
            # Check that simple ref is resolved.
            if simple.run is None:
                dstr = ""
                if simple.datasetType is None:
                    dstr = f" (datasetType={datasetType.name!r})"
                raise ValueError(
                    "Run collection name is missing from serialized representation. "
                    f"Encountered with {simple!r}{dstr}."
                )
            ref = cls(
                datasetType,
                dataId,
                id=simple.id,
                run=simple.run,
            )
        if cache is not None:
            # Always cache the composite form of the ref.
            if ref.datasetType.component() is not None:
                cache[key] = ref.makeCompositeRef()
            else:
                cache[key] = ref
        return ref

    to_json = to_json_pydantic
    from_json: ClassVar[Callable[..., Self]] = cast(Callable[..., Self], classmethod(from_json_pydantic))

    @classmethod
    def _unpickle(
        cls,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        id: DatasetId,
        run: str,
        datastore_records: DatasetDatastoreRecords | None,
    ) -> DatasetRef:
        """Create new `DatasetRef`.

        A custom factory method for use by `__reduce__` as a workaround for
        its lack of support for keyword arguments.
        """
        return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records)

    def __reduce__(self) -> tuple:
        return (
            self._unpickle,
            (self.datasetType, self.dataId, self.id, self.run, self._datastore_records),
        )

    def __deepcopy__(self, memo: dict) -> DatasetRef:
        # DatasetRef is recursively immutable; see note in @immutable
        # decorator.
        return self

    def expanded(self, dataId: DataCoordinate) -> DatasetRef:
        """Return a new `DatasetRef` with the given expanded data ID.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the new `DatasetRef`. Must compare equal to the
            original data ID.

        Returns
        -------
        ref : `DatasetRef`
            A new `DatasetRef` with the given data ID.
        """
        assert dataId == self.dataId
        return DatasetRef(
            datasetType=self.datasetType,
            dataId=dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def isComponent(self) -> bool:
        """Indicate whether this `DatasetRef` refers to a component.

        Returns
        -------
        isComponent : `bool`
            `True` if this `DatasetRef` is a component, `False` otherwise.
        """
        return self.datasetType.isComponent()

    def isComposite(self) -> bool:
        """Boolean indicating whether this `DatasetRef` is a composite type.

        Returns
        -------
        isComposite : `bool`
            `True` if this `DatasetRef` is a composite type, `False`
            otherwise.
        """
        return self.datasetType.isComposite()

    def _lookupNames(self) -> tuple[LookupKey, ...]:
        """Name keys to use when looking up this DatasetRef in a configuration.

        The names are returned in order of priority.

        Returns
        -------
        names : `tuple` of `LookupKey`
            Tuple of the `DatasetType` name and the `StorageClass` name.
            If ``instrument`` is defined in the dataId, each of those names
            is added to the start of the tuple with a key derived from the
            value of ``instrument``.
        """
        # Special case the instrument Dimension since we allow configs
        # to include the instrument name in the hierarchy.
        names: tuple[LookupKey, ...] = self.datasetType._lookupNames()

        if "instrument" in self.dataId:
            names = tuple(n.clone(dataId={"instrument": self.dataId["instrument"]}) for n in names) + names

        return names

    @staticmethod
    def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
        """Group an iterable of `DatasetRef` by `DatasetType`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group.

        Returns
        -------
        grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
            Grouped `DatasetRef` instances.

        Notes
        -----
        When lazy item-iterables are acceptable instead of a full mapping,
        `iter_by_type` can in some cases be far more efficient.
        """
        result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
        for ref in refs:
            result.setdefault(ref.datasetType, []).append(ref)
        return result

    @staticmethod
    def iter_by_type(
        refs: Iterable[DatasetRef],
    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group an iterable of `DatasetRef` by `DatasetType` with special
        hooks for custom iterables that can do this efficiently.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group. If this satisfies the
            `_DatasetRefGroupedIterable` protocol, its
            `~_DatasetRefGroupedIterable._iter_by_dataset_type` method will
            be called.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ]]
            Grouped `DatasetRef` instances.
        """
        if isinstance(refs, _DatasetRefGroupedIterable):
            return refs._iter_by_dataset_type()
        return DatasetRef.groupByType(refs).items()

    def makeCompositeRef(self) -> DatasetRef:
        """Create a `DatasetRef` of the composite from a component ref.

        Requires that this `DatasetRef` is a component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the
            composite parent of this component, and the same ID, run and
            datastore records.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeCompositeDatasetType(),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def makeComponentRef(self, name: str) -> DatasetRef:
        """Create a `DatasetRef` that corresponds to a component.

        Parameters
        ----------
        name : `str`
            Name of the component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the given
            component, and the same ID, run and datastore records.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeComponentDatasetType(name),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with a modified
        `DatasetType` that has a different `StorageClass`.

        Parameters
        ----------
        storageClass : `str` or `StorageClass`
            The new storage class.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference that is the same as the current one but
            with a different storage class in the `DatasetType`.
        """
        return self.replace(storage_class=storageClass)

    def replace(
        self,
        *,
        id: DatasetId | None = None,
        run: str | None = None,
        storage_class: str | StorageClass | None = None,
        datastore_records: DatasetDatastoreRecords | None | Literal[False] = False,
    ) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with some modified
        attributes.

        Parameters
        ----------
        id : `DatasetId` or `None`
            If not `None` then update dataset ID.
        run : `str` or `None`
            If not `None` then update run collection name. If ``id`` is
            `None` then this will also cause new dataset ID to be generated.
        storage_class : `str` or `StorageClass` or `None`
            The new storage class. If not `None`, replaces existing storage
            class.
        datastore_records : `DatasetDatastoreRecords` or `None`
            New datastore records. If `None` remove all records. By default
            datastore records are preserved.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference with updated attributes.
        """
        # `False` is the "not specified" sentinel because `None` has the
        # distinct meaning of dropping the records.
        if datastore_records is False:
            datastore_records = self._datastore_records
        if storage_class is None:
            datasetType = self.datasetType
        else:
            datasetType = self.datasetType.overrideStorageClass(storage_class)
        if run is None:
            run = self.run
            # Do not regenerate dataset ID if run is the same.
            if id is None:
                id = self.id
        return DatasetRef(
            datasetType=datasetType,
            dataId=self.dataId,
            run=run,
            id=id,
            conform=False,
            datastore_records=datastore_records,
        )

    def is_compatible_with(self, other: DatasetRef) -> bool:
        """Determine if the given `DatasetRef` is compatible with this one.

        Parameters
        ----------
        other : `DatasetRef`
            Dataset ref to check.

        Returns
        -------
        is_compatible : `bool`
            Returns `True` if the other dataset ref is either the same as this
            or the dataset type associated with the other is compatible with
            this one and the dataId, run, and dataset ID match.

        Notes
        -----
        Compatibility requires that the dataId, run, and dataset ID match and
        the `DatasetType` is compatible. Compatibility is defined as the
        storage class associated with the dataset type of the other ref can
        be converted to this storage class.

        Specifically this means that if you have done:

        .. code-block:: py

            new_ref = ref.overrideStorageClass(sc)

        and this is successful, then the guarantee is that:

        .. code-block:: py

            assert ref.is_compatible_with(new_ref) is True

        since we know that the python type associated with the new ref can
        be converted to the original python type. The reverse is not guaranteed
        and depends on whether bidirectional converters have been registered.
        """
        if self.id != other.id:
            return False
        if self.dataId != other.dataId:
            return False
        if self.run != other.run:
            return False
        return self.datasetType.is_compatible_with(other.datasetType)

    datasetType: DatasetType
    """The definition of this dataset (`DatasetType`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    dataId: DataCoordinate
    """A mapping of `Dimension` primary key values that labels the dataset
    within a Collection (`DataCoordinate`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    run: str
    """The name of the run that produced the dataset.

    Cannot be changed after a `DatasetRef` is constructed.
    """

    @property
    def datastore_records(self) -> DatasetDatastoreRecords | None:
        """Optional datastore records (`DatasetDatastoreRecords`).

        Cannot be changed after a `DatasetRef` is constructed.
        """
        # Only the ``_datastore_records`` slot is ever assigned; this
        # read-only property makes the documented public name resolve
        # instead of raising AttributeError.
        return self._datastore_records

901 

902 

class MinimalistSerializableDatasetRef(pydantic.BaseModel):
    """Compact serializable description of a single `DatasetRef`.

    The dataset ID is deliberately left out: instances are expected to be
    stored as the values of a mapping whose keys are those IDs.
    """

    model_config = pydantic.ConfigDict(frozen=True)

    dataset_type_name: str
    """Name of the dataset type."""

    run: str
    """Name of the RUN collection."""

    data_id: SerializedDataId
    """Data coordinate of this dataset."""

    def to_dataset_ref(
        self,
        id: DatasetId,
        *,
        dataset_type: DatasetType,
        universe: DimensionUniverse,
        attacher: DimensionDataAttacher | None = None,
    ) -> DatasetRef:
        """Rebuild a `DatasetRef` from this compact form.

        Parameters
        ----------
        id : `DatasetId`
            UUID identifying the dataset.
        dataset_type : `DatasetType`
            Dataset type definition whose name must equal the
            ``dataset_type_name`` stored in this object.
        universe : `DimensionUniverse`
            Dimension universe used to reconstruct the data coordinate.
        attacher : `DimensionDataAttacher`, optional
            If provided, it is used to attach dimension records to the
            reconstructed data coordinate.

        Returns
        -------
        ref : `DatasetRef`
            The deserialized object.
        """
        assert dataset_type.name == self.dataset_type_name, (
            "Given DatasetType does not match the serialized dataset type name"
        )
        coordinate = DataCoordinate.from_simple(
            simple=SerializedDataCoordinate(dataId=self.data_id),
            universe=universe,
        )
        if attacher:
            # The attacher works on a sequence of data IDs, so wrap the single
            # coordinate in a list and take the first element of the result.
            coordinate = attacher.attach(dataset_type.dimensions, [coordinate])[0]
        return DatasetRef(
            datasetType=dataset_type,
            dataId=coordinate,
            run=self.run,
            id=id,
        )

    @staticmethod
    def from_dataset_ref(ref: DatasetRef) -> MinimalistSerializableDatasetRef:
        """Serialize a `DatasetRef` to a simplified format.

        Parameters
        ----------
        ref : `DatasetRef`
            `DatasetRef` object to serialize.

        Returns
        -------
        minimal : `MinimalistSerializableDatasetRef`
            Compact form holding the dataset type name, run and data ID
            values of ``ref``; the dataset ID is not stored.
        """
        plain_data_id = dict(ref.dataId.mapping)
        return MinimalistSerializableDatasetRef(
            dataset_type_name=ref.datasetType.name,
            run=ref.run,
            data_id=plain_data_id,
        )

976 

977 

class SerializedDatasetRefContainer(pydantic.BaseModel):
    """Serializable model for a collection of DatasetRef.

    This model only declares the ``container_version`` tag; the payload
    fields are declared by the versioned subclass.
    """

    # Instances are immutable (frozen=True) and fields that are not declared
    # on the model are accepted rather than rejected (extra="allow").
    model_config = pydantic.ConfigDict(extra="allow", frozen=True)
    # Version tag of the serialized container format.
    container_version: str

986 

987 

class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
    """Serializable model for a collection of DatasetRef.

    Dataset type definitions are stored once per dataset type name and each
    dataset is reduced to a `MinimalistSerializableDatasetRef` keyed by its
    UUID. Dimension records are included only if every dataset given to
    `from_refs` had records attached to its data ID.
    """

    container_version: Literal["V1"] = "V1"

    universe_version: int
    """Dimension universe version."""

    universe_namespace: str
    """Dimension universe namespace."""

    dataset_types: dict[str, SerializedDatasetType]
    """Dataset types indexed by their name."""

    compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
    """Minimal dataset ref information indexed by UUID."""

    dimension_records: SerializableDimensionData | None = None
    """Dimension record information, or `None` if it was not stored."""

    def __len__(self) -> int:
        """Return the number of datasets in the container."""
        return len(self.compact_refs)

    @classmethod
    def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
        """Construct a serializable form from a list of `DatasetRef`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to include in the container. A dataset that appears
            more than once is stored once.

        Returns
        -------
        container : `SerializedDatasetRefContainerV1`
            The serializable container. If ``refs`` is empty the universe
            version is recorded as ``0`` and the namespace as ``"unknown"``.
        """
        # The serialized DatasetRef contains a lot of duplicated information,
        # so dataset types are stored once per name and each ref is reduced
        # to its minimal form. Dimension records are only kept if every
        # dataset has them attached.
        universe: DimensionUniverse | None = None
        dataset_types: dict[str, SerializedDatasetType] = {}
        compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
        # Keyed by dataset ID, exactly like compact_refs, so that a ref given
        # more than once is counted once in both mappings. Counting with a
        # plain list would make the "all refs have records" comparison below
        # fail for duplicated input and silently drop the records.
        expanded_data_ids: dict[uuid.UUID, DataCoordinate] = {}
        dimensions: list[DimensionGroup] = []
        for ref in refs:
            if universe is None:
                universe = ref.datasetType.dimensions.universe
            if (name := ref.datasetType.name) not in dataset_types:
                dataset_types[name] = ref.datasetType.to_simple()
            compact_refs[ref.id] = MinimalistSerializableDatasetRef.from_dataset_ref(ref)
            if ref.dataId.hasRecords():
                dimensions.append(ref.datasetType.dimensions)
                expanded_data_ids[ref.id] = ref.dataId

        # Extract dimension record metadata if present.
        dimension_records = None
        if expanded_data_ids and len(compact_refs) == len(expanded_data_ids):
            dimension_group = DimensionGroup.union(*dimensions, universe=universe)

            # Records were attached to all refs. Store them.
            extractor = DimensionDataExtractor.from_dimension_group(
                dimension_group,
                ignore_cached=False,
                include_skypix=False,
            )
            extractor.update(list(expanded_data_ids.values()))
            dimension_records = SerializableDimensionData.from_record_sets(extractor.records.values())

        if universe:
            universe_version = universe.version
            universe_namespace = universe.namespace
        else:
            # No refs so no universe.
            universe_version = 0
            universe_namespace = "unknown"
        return cls(
            universe_version=universe_version,
            universe_namespace=universe_namespace,
            dataset_types=dataset_types,
            compact_refs=compact_refs,
            dimension_records=dimension_records,
        )

    def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
        """Construct the original `DatasetRef`.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use when constructing the `DatasetRef`.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            The `DatasetRef` that were serialized. An empty container
            returns an empty list without checking the universe.

        Raises
        ------
        InconsistentUniverseError
            Raised if the namespace of ``universe`` differs from the one the
            container was created with, or if the universe versions differ
            and a dataset type's dimensions cannot be reproduced exactly in
            ``universe``.
        """
        if not self.compact_refs:
            return []

        if universe.namespace != self.universe_namespace:
            raise InconsistentUniverseError(
                f"Can not convert to refs in universe {universe.namespace} that were created from "
                f"universe {self.universe_namespace}"
            )

        # A version mismatch is not fatal by itself; warn and validate each
        # dataset type individually below.
        if universe.version != self.universe_version:
            _LOG.warning(
                "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
                "Serialized with version %d but asked to use version %d. "
                "There could be failures due to different universe versions.",
                self.universe_version,
                universe.version,
            )

        # Reconstruct the DatasetType objects.
        dataset_types: dict[str, DatasetType] = {}
        if universe.version == self.universe_version:
            dataset_types = {
                name: DatasetType.from_simple(dtype, universe=universe)
                for name, dtype in self.dataset_types.items()
            }
        else:
            # When versions are different the dimensions may either disappear
            # or new dimensions can be added to the conforming set.
            for name, dtype in self.dataset_types.items():
                try:
                    dataset_type = DatasetType.from_simple(dtype, universe=universe)
                except KeyError as exc:
                    raise InconsistentUniverseError(
                        f"Source dimensions {dtype.dimensions} are not compatible with "
                        f"target universe dimensions {universe}."
                    ) from exc
                # The required dimensions in the target universe must be
                # exactly the dimensions that were serialized.
                if set(dataset_type.dimensions.required) != set(dtype.dimensions or []):
                    raise InconsistentUniverseError(
                        f"Source dimensions {dtype.dimensions} are different from a conforming "
                        f"set of target universe dimensions {dataset_type.dimensions}."
                    )
                dataset_types[name] = dataset_type

        # Dimension records can be attached if available.
        # We assume that all dimension information was stored.
        attacher = None
        if self.dimension_records:
            attacher = DimensionDataAttacher(
                deserializers=self.dimension_records.make_deserializers(universe)
            )

        refs: list[DatasetRef] = []
        for id_, minimal in self.compact_refs.items():
            ref = minimal.to_dataset_ref(
                id_,
                dataset_type=dataset_types[minimal.dataset_type_name],
                universe=universe,
                attacher=attacher,
            )
            refs.append(ref)
        return refs

1145 

1146 

# Annotated alias for the serialized container models, discriminated on the
# ``container_version`` field. Only the V1 container is listed here.
SerializedDatasetRefContainers: TypeAlias = Annotated[
    SerializedDatasetRefContainerV1,
    pydantic.Field(discriminator="container_version"),
]