Coverage for python / lsst / daf / butler / datastore / record_data.py: 31%
123 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 07:40 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 07:40 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Support for generic data stores."""
30from __future__ import annotations
32__all__ = ("DatastoreRecordData", "SerializedDatastoreRecordData")
34import dataclasses
35import uuid
36from collections.abc import Iterable, Mapping
37from functools import cache
38from typing import TYPE_CHECKING, TypeAlias
40import pyarrow as pa
41import pyarrow.compute as pc
42import pydantic
44from .._dataset_ref import DatasetId
45from ..dimensions import DimensionUniverse
46from ..persistence_context import PersistenceContextVars
47from .stored_file_info import StoredDatastoreItemInfo, StoredFileInfoTable
49if TYPE_CHECKING:
50 from ..registry import Registry
# Type of a single serialized record, as stored in the innermost lists of
# `SerializedDatastoreRecordData.records`.
#
# Pydantic requires the possible value types to be explicitly enumerated in
# order for `uuid.UUID` in particular to work. `typing.Any` does not work
# here.
_Record: TypeAlias = dict[str, int | str | None]
class SerializedDatastoreRecordData(pydantic.BaseModel):
    """Serializable counterpart of `DatastoreRecordData`."""

    dataset_ids: list[uuid.UUID]
    """IDs of the datasets covered by this object."""

    records: Mapping[str, Mapping[str, Mapping[str, list[_Record]]]]
    """Records keyed first by record class name, then by dataset ID (as a
    `str`, because JSON object keys must be strings), and finally by opaque
    table name.
    """

    @classmethod
    def direct(
        cls,
        *,
        dataset_ids: list[str | uuid.UUID],
        records: dict[str, dict[str, dict[str, list[_Record]]]],
    ) -> SerializedDatastoreRecordData:
        """Build a `SerializedDatastoreRecordData` while bypassing pydantic
        validation.

        Parameters
        ----------
        dataset_ids : `list` [`str` or `uuid.UUID`]
            The dataset UUIDs. String entries are converted to `uuid.UUID`.
        records : `dict`
            The datastore records, stored on the model as given.

        Returns
        -------
        data : `SerializedDatastoreRecordData`
            Model instance created with ``model_construct``.

        Notes
        -----
        Unlike calling the pydantic "construct" method directly, the
        arguments here are spelled out explicitly as exactly what the model
        requires.

        Because no validation is performed, this method should only be
        called when the inputs are trusted.
        """
        # JSON makes strings out of UUIDs, so turn any strings back into
        # UUID objects before handing the list to the model.
        converted_ids = [
            uuid.UUID(item) if isinstance(item, str) else item for item in dataset_ids
        ]
        return cls.model_construct(
            _fields_set={"dataset_ids", "records"},
            dataset_ids=converted_ids,
            records=records,
        )
@dataclasses.dataclass
class DatastoreRecordData:
    """A struct that represents a tabular data export from a single
    datastore.
    """

    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = dataclasses.field(
        default_factory=dict
    )
    """Opaque table data, indexed by dataset ID and grouped by opaque table
    name."""

    @staticmethod
    def merge_mappings(*args: Mapping[str, DatastoreRecordData]) -> dict[str, DatastoreRecordData]:
        """Merge mappings of datastore record data.

        Parameters
        ----------
        *args : `~collections.abc.Mapping` [ `str`, `DatastoreRecordData` ]
            Mappings of record data, keyed by datastore name.

        Returns
        -------
        merged : `dict` [ `str`, `DatastoreRecordData` ]
            Merged mapping of record data, keyed by datastore name. The
            values are always new `DatastoreRecordData` instances; the
            inputs are not modified.

        Notes
        -----
        Entries for the same datastore name are combined with `update`, so
        earlier arguments take precedence for any ``(dataset_id,
        table_name)`` combination that already has records.
        """
        result: dict[str, DatastoreRecordData] = {}
        for arg in args:
            for datastore_name, record_data in arg.items():
                if datastore_name not in result:
                    result[datastore_name] = DatastoreRecordData()
                result[datastore_name].update(record_data)
        return result

    def update(self, other: DatastoreRecordData) -> None:
        """Update contents of this instance with data from another instance.

        Parameters
        ----------
        other : `DatastoreRecordData`
            Records to merge into this instance.

        Notes
        -----
        If a ``(dataset_id, table_name)`` combination has any records in
        ``self``, it is assumed that all records for that combination are
        already present. This allows duplicates of the same dataset to be
        handled gracefully.
        """
        for dataset_id, table_records in other.records.items():
            this_table_records = self.records.setdefault(dataset_id, {})
            for table_name, records in table_records.items():
                # If this (dataset_id, table_name) combination already has
                # records in `self`, we assume that means all of the records
                # for that combination; we require other code to ensure entire
                # (parent) datasets are exported to these data structures
                # (never components).
                this_records = this_table_records.setdefault(table_name, [])
                if not this_records:
                    this_records.extend(records)

    def subset(self, dataset_ids: set[DatasetId]) -> DatastoreRecordData | None:
        """Extract a subset of the records that match given dataset IDs.

        Parameters
        ----------
        dataset_ids : `set` [ `DatasetId` ]
            Dataset IDs to match.

        Returns
        -------
        record_data : `DatastoreRecordData` or `None`
            Records for the requested IDs that are present in this instance,
            or `None` if none of the requested IDs are present.

        Notes
        -----
        Records in the returned instance are shared with this instance,
        clients should not update or extend records in the returned instance.
        """
        matching_records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {
            dataset_id: id_records
            for dataset_id in dataset_ids
            if (id_records := self.records.get(dataset_id)) is not None
        }
        if not matching_records:
            return None
        return DatastoreRecordData(records=matching_records)

    def to_simple(self, minimal: bool = False) -> SerializedDatastoreRecordData:
        """Make representation of the object for serialization.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        minimal : `bool`, optional
            If True produce minimal representation, not used by this method.

        Returns
        -------
        simple : `SerializedDatastoreRecordData`
            Serializable representation of this instance. Every dataset ID
            in ``records`` is listed in ``dataset_ids``, including those
            that have no records.
        """
        records: dict[str, dict[str, dict[str, list[_Record]]]] = {}
        for dataset_id, table_data in self.records.items():
            for table_name, table_records in table_data.items():
                class_name, infos = StoredDatastoreItemInfo.to_records(table_records)
                class_records = records.setdefault(class_name, {})
                # Dataset IDs become mapping keys here, so they are stored
                # in their hex string form.
                dataset_records = class_records.setdefault(dataset_id.hex, {})
                dataset_records.setdefault(table_name, []).extend(dict(info) for info in infos)
        return SerializedDatastoreRecordData(dataset_ids=list(self.records), records=records)

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatastoreRecordData,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
    ) -> DatastoreRecordData:
        """Make an instance of this class from serialized data.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        simple : `SerializedDatastoreRecordData`
            Serialized representation returned from `to_simple` method.
        universe : `DimensionUniverse`, optional
            Dimension universe, not used by this method.
        registry : `Registry`, optional
            Registry instance, not used by this method.

        Returns
        -------
        record_data : `DatastoreRecordData`
            De-serialized instance of `DatastoreRecordData`.

        Raises
        ------
        RuntimeError
            Raised if `StoredDatastoreItemInfo.from_records` raises
            `TypeError` for a record class name found in ``simple``.

        Notes
        -----
        If ``PersistenceContextVars.dataStoreRecords`` currently holds a
        cache, it is consulted first and updated with the new instance. The
        cache key is the frozen set of ``simple.dataset_ids``.
        """
        # Named `record_cache` to avoid shadowing `functools.cache`, which
        # this module imports at the top level.
        record_cache = PersistenceContextVars.dataStoreRecords.get()
        key = frozenset(simple.dataset_ids)
        if record_cache is not None and (cached := record_cache.get(key)) is not None:
            return cached

        # Make sure that all dataset IDs appear in the dict even if they
        # don't have records.
        records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {
            dataset_id: {} for dataset_id in simple.dataset_ids
        }
        for class_name, class_data in simple.records.items():
            for dataset_id_str, dataset_data in class_data.items():
                for table_name, table_records in dataset_data.items():
                    try:
                        infos = StoredDatastoreItemInfo.from_records(class_name, table_records)
                    except TypeError as exc:
                        raise RuntimeError(
                            "The class specified in the SerializedDatastoreRecordData "
                            f"({class_name}) is not a StoredDatastoreItemInfo."
                        ) from exc
                    dataset_records = records.setdefault(uuid.UUID(dataset_id_str), {})
                    dataset_records.setdefault(table_name, []).extend(infos)

        new_record = cls(records=records)
        if record_cache is not None:
            record_cache[key] = new_record
        return new_record
class DatastoreRecordTable:
    """Arrow table representation of datastore records. Contains the same
    information as ``DatastoreRecordData`` in a flat table format.

    Parameters
    ----------
    table : `pyarrow.Table`
        Arrow table containing the file information records.

    Notes
    -----
    Users should not call the constructor directly -- use one of the ``from_*``
    methods.
    """

    def __init__(self, table: pa.Table) -> None:
        self._table = table

    @staticmethod
    def from_arrow(table: pa.Table) -> DatastoreRecordTable:
        """Create an instance from an external `pyarrow.Table` object.

        Parameters
        ----------
        table : `pyarrow.Table`
            `pyarrow.Table` instance with a schema compatible with the one
            returned by ``DatastoreRecordTable.make_arrow_schema()``.

        Returns
        -------
        table : `DatastoreRecordTable`
            New ``DatastoreRecordTable`` instance holding the given table
            cast to the schema from ``make_arrow_schema()``.
        """
        return DatastoreRecordTable(table.cast(DatastoreRecordTable.make_arrow_schema()))

    def to_arrow(self) -> pa.Table:
        """Convert to a raw `pyarrow.Table`."""
        return self._table

    def filter_by_datastore_name(self, datastore_name: str) -> DatastoreRecordTable:
        """Return a table containing only the entries corresponding to the
        given datastore name.

        Parameters
        ----------
        datastore_name : `str`
            Datastore name to filter on.

        Returns
        -------
        table : `DatastoreRecordTable`
            A copy of this table with only the rows that have a
            ``datastore_name`` column value matching the given value.
        """
        return DatastoreRecordTable(self._table.filter(pc.field("datastore_name") == datastore_name))

    def validate_datastore_names(self, names: Iterable[str]) -> None:
        """Check that all entries in the ``datastore_name`` column are in the
        given list of names.

        Parameters
        ----------
        names : `~collections.abc.Iterable` [ `str` ]
            Allowed datastore names.

        Raises
        ------
        ValueError
            If any of the ``datastore_name`` column entries has a value not in
            the given list.
        """
        column = self._table.column("datastore_name")
        if len(column) == 0:
            return

        # Materialize once: ``names`` may be a one-shot iterator, and it is
        # needed both for the membership test and for the error message.
        allowed = list(names)
        # The explicit type keeps an empty list from being inferred as a
        # null-typed array.
        matches = pc.is_in(column, pa.array(allowed, type=pa.string()))
        if not pc.all(matches).as_py():
            mismatches = column.filter(pc.invert(matches)).unique().to_pylist()
            raise ValueError(
                f"Datastore names '{mismatches}' in table do not match known datastores: '{allowed}'"
            )

    @staticmethod
    def from_stored_file_info_table(datastore_name: str, table: StoredFileInfoTable) -> DatastoreRecordTable:
        """Create an instance of ``DatastoreRecordTable`` given datastore
        records as a ``StoredFileInfoTable``.

        Parameters
        ----------
        datastore_name : `str`
            Datastore name to assign to the ``datastore_name`` column in all
            rows of the resulting table.
        table : `StoredFileInfoTable`
            Table of file information records.

        Returns
        -------
        table : `DatastoreRecordTable`
            New ``DatastoreRecordTable`` instance.
        """
        schema = DatastoreRecordTable.make_arrow_schema()
        column_type = schema.field("datastore_name").type
        # One copy of the datastore name per input row, placed ahead of the
        # file-info columns to match the schema's column order.
        datastore_name_column = pa.repeat(pa.scalar(datastore_name, type=column_type), len(table))
        return DatastoreRecordTable(
            pa.Table.from_arrays(
                [datastore_name_column, *table.to_arrow().columns],
                schema=schema,
            )
        )

    def to_stored_file_info_table(self) -> StoredFileInfoTable:
        """Convert this table to a ``StoredFileInfoTable``.

        Returns
        -------
        table : `StoredFileInfoTable`
            ``StoredFileInfoTable`` containing a row corresponding to each row
            of the original table.
        """
        table = self._table.drop_columns("datastore_name")
        return StoredFileInfoTable.from_arrow(table)

    # ``staticmethod`` must be the outermost decorator: the cache wrapper
    # binds like a regular function, so with the opposite order a call made
    # through an instance would pass ``self`` to this zero-argument function.
    @staticmethod
    @cache
    def make_arrow_schema() -> pa.Schema:
        """Return the `pyarrow.Schema` for the arrow table.

        Returns
        -------
        schema : `pyarrow.Schema`
            The ``StoredFileInfoTable`` schema with a dictionary-encoded
            ``datastore_name`` field inserted as the first column. The
            result is cached after the first call.
        """
        return StoredFileInfoTable.make_arrow_schema().insert(
            0, pa.field("datastore_name", pa.dictionary(pa.int8(), pa.string()))
        )

    @staticmethod
    def combine(tables: Iterable[DatastoreRecordTable]) -> DatastoreRecordTable:
        """Concatenate multiple ``DatastoreRecordTable`` instances into a
        single table.

        Parameters
        ----------
        tables : `~collections.abc.Iterable` [ `DatastoreRecordTable` ]
            Tables to combine.

        Returns
        -------
        combined_table : `DatastoreRecordTable`
            ``DatastoreRecordTable`` containing all the rows from all of the
            given tables. An empty table is returned if no tables are given.
        """
        arrow_tables = [t._table for t in tables]
        if not arrow_tables:
            return DatastoreRecordTable.create_empty()

        return DatastoreRecordTable(pa.concat_tables(arrow_tables))

    @staticmethod
    def create_empty() -> DatastoreRecordTable:
        """Create an empty ``DatastoreRecordTable``.

        Returns
        -------
        table : `DatastoreRecordTable`
            New ``DatastoreRecordTable`` instance with no rows.
        """
        return DatastoreRecordTable(pa.Table.from_pylist([], schema=DatastoreRecordTable.make_arrow_schema()))

    def __len__(self) -> int:
        """Return the number of rows in the underlying table."""
        return len(self._table)