Coverage for python / lsst / daf / butler / datastore / record_data.py: 31%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-23 01:06 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

"""Datastore record data structures.

This module defines `DatastoreRecordData` (opaque-table records indexed by
dataset ID and grouped by opaque table name), `SerializedDatastoreRecordData`
(its pydantic serialization model), and `DatastoreRecordTable` (a flat
`pyarrow.Table` form of the same information with an extra
``datastore_name`` column).
"""

from __future__ import annotations

# NOTE(review): ``DatastoreRecordTable`` is defined in this module but is not
# listed here, so ``from ... import *`` will not export it -- confirm whether
# that omission is intentional.
__all__ = ("DatastoreRecordData", "SerializedDatastoreRecordData")

import dataclasses
import uuid
from collections.abc import Iterable, Mapping
from functools import cache
from typing import TYPE_CHECKING, TypeAlias

import pyarrow as pa
import pyarrow.compute as pc
import pydantic

from .._dataset_ref import DatasetId
from ..dimensions import DimensionUniverse
from ..persistence_context import PersistenceContextVars
from .stored_file_info import StoredDatastoreItemInfo, StoredFileInfoTable

if TYPE_CHECKING:
    # Only used in annotations (``DatastoreRecordData.from_simple``);
    # presumably guarded to avoid an import cycle with the registry package.
    from ..registry import Registry

# Pydantic requires the possible value types to be explicitly enumerated in
# order for `uuid.UUID` in particular to work. `typing.Any` does not work
# here.
# NOTE(review): the union below does not actually list `uuid.UUID`, so either
# the comment above is stale or the alias is missing a member -- confirm
# against what `StoredDatastoreItemInfo.to_records` emits.
_Record: TypeAlias = dict[str, int | str | None]

56 

57 

class SerializedDatastoreRecordData(pydantic.BaseModel):
    """Representation of a `DatastoreRecordData` suitable for serialization."""

    dataset_ids: list[uuid.UUID]
    """List of dataset IDs"""

    records: Mapping[str, Mapping[str, Mapping[str, list[_Record]]]]
    """Records indexed by record class name, dataset ID (encoded as
    str, because JSON), and opaque table name.
    """

    @classmethod
    def direct(
        cls,
        *,
        dataset_ids: list[str | uuid.UUID],
        records: dict[str, dict[str, dict[str, list[_Record]]]],
    ) -> SerializedDatastoreRecordData:
        """Construct a `SerializedDatastoreRecordData` directly without
        validators.

        Parameters
        ----------
        dataset_ids : `list` [`str` or `uuid.UUID`]
            The dataset UUIDs. `str` entries (as produced by JSON decoding)
            are converted to `uuid.UUID`; all other entries are used
            unchanged.
        records : `dict`
            The datastore records, indexed by record class name, dataset ID
            string, and opaque table name. Stored as given, without any
            conversion or validation.

        Returns
        -------
        data : `SerializedDatastoreRecordData`
            New instance built with ``model_construct``, with both fields
            marked as set.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires. There are no nested
        models to build here, so ``records`` is stored exactly as passed in.

        This method should only be called when the inputs are trusted.
        """
        # JSON makes strings out of UUIDs, need to convert them back. The loop
        # variable is deliberately not called ``id`` to avoid shadowing the
        # builtin.
        uuids = [
            uuid.UUID(dataset_id) if isinstance(dataset_id, str) else dataset_id
            for dataset_id in dataset_ids
        ]
        return cls.model_construct(
            _fields_set={"dataset_ids", "records"},
            dataset_ids=uuids,
            records=records,
        )

103 

104 

@dataclasses.dataclass
class DatastoreRecordData:
    """A struct that represents a tabular data export from a single
    datastore.
    """

    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = dataclasses.field(
        default_factory=dict
    )
    """Opaque table data, indexed by dataset ID and grouped by opaque table
    name."""

    @staticmethod
    def merge_mappings(*args: Mapping[str, DatastoreRecordData]) -> dict[str, DatastoreRecordData]:
        """Merge mappings of datastore record data.

        Parameters
        ----------
        *args : `~collections.abc.Mapping` [ `str`, `DatastoreRecordData` ]
            Mappings of record data, keyed by datastore name.

        Returns
        -------
        merged : `dict` [ `str`, `DatastoreRecordData` ]
            Merged mapping of record data, keyed by datastore name. Each value
            is a new `DatastoreRecordData` filled via `update`, so the input
            instances themselves are not modified.

        Notes
        -----
        Arguments are merged in order. For a given datastore name, the first
        argument that has records for a ``(dataset_id, table_name)``
        combination wins, following the semantics of `update`.
        """
        result: dict[str, DatastoreRecordData] = {}
        for arg in args:
            for datastore_name, record_data in arg.items():
                if datastore_name not in result:
                    result[datastore_name] = DatastoreRecordData()
                result[datastore_name].update(record_data)
        return result

    def update(self, other: DatastoreRecordData) -> None:
        """Update contents of this instance with data from another instance.

        Parameters
        ----------
        other : `DatastoreRecordData`
            Records to merge into this instance.

        Notes
        -----
        If a ``(dataset_id, table_name)`` combination has any records in
        ``self``, it is assumed that all records for that combination are
        already present. This allows duplicates of the same dataset to be
        handled gracefully.
        """
        for dataset_id, table_records in other.records.items():
            this_table_records = self.records.setdefault(dataset_id, {})
            for table_name, records in table_records.items():
                # If this (dataset_id, table_name) combination already has
                # records in `self`, we assume that means all of the records
                # for that combination; we require other code to ensure entire
                # (parent) datasets are exported to these data structures
                # (never components).
                if not (this_records := this_table_records.setdefault(table_name, [])):
                    this_records.extend(records)

    def subset(self, dataset_ids: set[DatasetId]) -> DatastoreRecordData | None:
        """Extract a subset of the records that match given dataset IDs.

        Parameters
        ----------
        dataset_ids : `set` [ `DatasetId` ]
            Dataset IDs to match.

        Returns
        -------
        record_data : `DatastoreRecordData` or `None`
            `None` is returned if there are no matching refs.

        Notes
        -----
        Records in the returned instance are shared with this instance, clients
        should not update or extend records in the returned instance.
        """
        matching_records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        for dataset_id in dataset_ids:
            if (id_records := self.records.get(dataset_id)) is not None:
                # The per-dataset dict is shared, not copied (see Notes).
                matching_records[dataset_id] = id_records
        if matching_records:
            return DatastoreRecordData(records=matching_records)
        else:
            return None

    def to_simple(self, minimal: bool = False) -> SerializedDatastoreRecordData:
        """Make representation of the object for serialization.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        minimal : `bool`, optional
            If True produce minimal representation, not used by this method.

        Returns
        -------
        simple : `SerializedDatastoreRecordData`
            Representation of this instance suitable for serialization. Records
            are regrouped by record class name, then by dataset ID (as a hex
            string), then by opaque table name.
        """
        records: dict[str, dict[str, dict[str, list[_Record]]]] = {}
        for dataset_id, table_data in self.records.items():
            for table_name, table_records in table_data.items():
                class_name, infos = StoredDatastoreItemInfo.to_records(table_records)
                class_records = records.setdefault(class_name, {})
                dataset_records = class_records.setdefault(dataset_id.hex, {})
                dataset_records.setdefault(table_name, []).extend(dict(info) for info in infos)
        # All dataset IDs are listed, including those without any records, so
        # that ``from_simple`` can restore them as empty entries.
        return SerializedDatastoreRecordData(dataset_ids=list(self.records.keys()), records=records)

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatastoreRecordData,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
    ) -> DatastoreRecordData:
        """Make an instance of this class from serialized data.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        simple : `SerializedDatastoreRecordData`
            Serialized representation returned from `to_simple` method.
        universe : `DimensionUniverse`, optional
            Dimension universe, not used by this method.
        registry : `Registry`, optional
            Registry instance, not used by this method.

        Returns
        -------
        record_data : `DatastoreRecordData`
            De-serialized instance of `DatastoreRecordData`.

        Raises
        ------
        RuntimeError
            Raised if `StoredDatastoreItemInfo.from_records` rejects a record
            class name in ``simple`` with `TypeError`, i.e. the named class is
            not a `StoredDatastoreItemInfo`.

        Notes
        -----
        When a persistence-context cache is active
        (``PersistenceContextVars.dataStoreRecords``), results are memoized
        keyed on the set of dataset IDs, and a cached instance is returned
        as-is (shared, not copied).
        """
        # Named ``records_cache`` rather than ``cache`` so the module-level
        # ``functools.cache`` import is not shadowed.
        records_cache = PersistenceContextVars.dataStoreRecords.get()
        # NOTE(review): the key covers only the dataset IDs, so two payloads
        # with identical IDs but different records (e.g. from different
        # datastores) would share one cache entry -- confirm that cannot happen
        # for the callers that enable this cache.
        key = frozenset(simple.dataset_ids)
        if records_cache is not None and (cached_record := records_cache.get(key)) is not None:
            return cached_record
        records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        # Make sure that all dataset IDs appear in the dict even if they don't
        # have records.
        for dataset_id in simple.dataset_ids:
            records[dataset_id] = {}
        for class_name, class_data in simple.records.items():
            for dataset_id_str, dataset_data in class_data.items():
                for table_name, table_records in dataset_data.items():
                    try:
                        infos = StoredDatastoreItemInfo.from_records(class_name, table_records)
                    except TypeError as exc:
                        raise RuntimeError(
                            "The class specified in the SerializedDatastoreRecordData "
                            f"({class_name}) is not a StoredDatastoreItemInfo."
                        ) from exc
                    # Dataset IDs were hex-encoded strings in the serialized
                    # form (see ``to_simple``).
                    dataset_records = records.setdefault(uuid.UUID(dataset_id_str), {})
                    dataset_records.setdefault(table_name, []).extend(infos)
        new_record = cls(records=records)
        if records_cache is not None:
            records_cache[key] = new_record
        return new_record

266 

267 

class DatastoreRecordTable:
    """Arrow table representation of datastore records. Contains the same
    information as ``DatastoreRecordData`` in a flat table format.

    Parameters
    ----------
    table
        Arrow table containing the file information records.

    Notes
    -----
    Users should not call the constructor directly -- use one of the ``from_*``
    methods.
    """

    def __init__(self, table: pa.Table) -> None:
        self._table = table

    @staticmethod
    def from_arrow(table: pa.Table) -> DatastoreRecordTable:
        """Create an instance from an external `pyarrow.Table` object.

        Parameters
        ----------
        table
            `pyarrow.Table` instance with a schema compatible with the one
            returned by ``DatastoreRecordTable.make_arrow_schema()``.

        Returns
        -------
        table
            New ``DatastoreRecordTable`` instance backed by the given table,
            cast to the canonical schema.
        """
        return DatastoreRecordTable(table.cast(DatastoreRecordTable.make_arrow_schema()))

    def to_arrow(self) -> pa.Table:
        """Return the underlying `pyarrow.Table` (the stored object itself,
        not a copy).
        """
        return self._table

    def filter_by_datastore_name(self, datastore_name: str) -> DatastoreRecordTable:
        """Return a table containing only the entries corresponding to the
        given datastore name.

        Parameters
        ----------
        datastore_name
            Datastore name to filter on.

        Returns
        -------
        table
            A copy of this table with only the rows that have a
            ``datastore_name`` column value matching the given value.
        """
        return DatastoreRecordTable(self._table.filter(pc.field("datastore_name") == datastore_name))

    def validate_datastore_names(self, names: Iterable[str]) -> None:
        """Check that all entries in the ``datastore_name`` column are in the
        given list of names.

        Parameters
        ----------
        names
            Allowed datastore names. Any iterable is accepted, including
            single-use iterables such as generators.

        Raises
        ------
        ValueError
            If any of the ``datastore_name`` column entries has a value not in
            the given list.
        """
        column = self._table.column("datastore_name")
        if len(column) == 0:
            return

        # Materialize once: ``names`` is used both to build the value set and
        # in the error message, which would otherwise show an exhausted
        # generator object. The explicit string type keeps an empty ``names``
        # from producing a null-typed value set.
        allowed = list(names)
        matches = pc.is_in(column, value_set=pa.array(allowed, type=pa.string()))
        if not pc.all(matches).as_py():
            mismatches = column.filter(pc.invert(matches)).unique().to_pylist()
            raise ValueError(
                f"Datastore names '{mismatches}' in table do not match known datastores: '{allowed}'"
            )

    @staticmethod
    def from_stored_file_info_table(datastore_name: str, table: StoredFileInfoTable) -> DatastoreRecordTable:
        """Create an instance of ``DatastoreRecordTable`` given datastore
        records as a ``StoredFileInfoTable``.

        Parameters
        ----------
        datastore_name
            Datastore name to assign to the ``datastore_name`` column in all
            rows of the resulting table.
        table
            Table of file information records.

        Returns
        -------
        table
            New ``DatastoreRecordTable`` instance.
        """
        schema = DatastoreRecordTable.make_arrow_schema()
        column_type = schema.field("datastore_name").type
        # Same datastore name on every row, prepended as the first column to
        # match the schema layout from ``make_arrow_schema``.
        datastore_name_column = pa.repeat(pa.scalar(datastore_name, type=column_type), len(table))
        return DatastoreRecordTable(
            pa.Table.from_arrays(
                [datastore_name_column, *table.to_arrow().columns],
                schema=schema,
            )
        )

    def to_stored_file_info_table(self) -> StoredFileInfoTable:
        """Convert this table to a ``StoredFileInfoTable``.

        Returns
        -------
        table
            ``StoredFileInfoTable`` containing a row corresponding to each row
            of the original table.
        """
        table = self._table.drop_columns("datastore_name")
        return StoredFileInfoTable.from_arrow(table)

    # ``staticmethod`` must be the outermost decorator. Wrapping a
    # ``staticmethod`` object in ``cache`` only works via class access on
    # Python >= 3.10, and instance access (``self.make_arrow_schema()``) would
    # bind ``self`` and pass it to this zero-argument function.
    @staticmethod
    @cache
    def make_arrow_schema() -> pa.Schema:
        """Return the `pyarrow.Schema` for the arrow table.

        The schema is that of ``StoredFileInfoTable`` with a dictionary-encoded
        ``datastore_name`` string column inserted first. It is computed once
        and cached.
        """
        return StoredFileInfoTable.make_arrow_schema().insert(
            0, pa.field("datastore_name", pa.dictionary(pa.int8(), pa.string()))
        )

    @staticmethod
    def combine(tables: Iterable[DatastoreRecordTable]) -> DatastoreRecordTable:
        """Concatenate multiple ``DatastoreRecordTable`` instances into a
        single table.

        Parameters
        ----------
        tables
            Tables to combine.

        Returns
        -------
        combined_table
            ``DatastoreRecordTable`` containing all the rows from all of the
            given tables. An empty input yields an empty table.
        """
        arrow_tables = [t._table for t in tables]
        if not arrow_tables:
            return DatastoreRecordTable.create_empty()

        return DatastoreRecordTable(pa.concat_tables(arrow_tables))

    @staticmethod
    def create_empty() -> DatastoreRecordTable:
        """Create an empty ``DatastoreRecordTable``.

        Returns
        -------
        table
            New ``DatastoreRecordTable`` instance with no rows.
        """
        return DatastoreRecordTable(pa.Table.from_pylist([], schema=DatastoreRecordTable.make_arrow_schema()))

    def __len__(self) -> int:
        """Return the number of rows in the underlying table."""
        return len(self._table)