Coverage for python / lsst / daf / butler / registry / bridge / monolithic.py: 25%

122 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:07 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29from ... import ddl 

30 

31__all__ = ("MonolithicDatastoreRegistryBridge", "MonolithicDatastoreRegistryBridgeManager") 

32 

33from collections import namedtuple 

34from collections.abc import Collection, Iterable, Iterator 

35from contextlib import contextmanager 

36from typing import TYPE_CHECKING, cast 

37 

38import sqlalchemy 

39 

40from lsst.utils.iteration import chunk_iterable 

41 

42from ..._dataset_ref import DatasetId 

43from ...datastore.stored_file_info import StoredDatastoreItemInfo 

44from ..interfaces import ( 

45 DatasetIdRef, 

46 DatastoreRegistryBridge, 

47 DatastoreRegistryBridgeManager, 

48 FakeDatasetRef, 

49 OpaqueTableStorage, 

50 VersionTuple, 

51) 

52from ..opaque import ByNameOpaqueTableStorage 

53from .ephemeral import EphemeralDatastoreRegistryBridge 

54 

55if TYPE_CHECKING: 

56 from ...datastore import DatastoreTransaction 

57 from ...dimensions import DimensionUniverse 

58 from ..interfaces import ( 

59 Database, 

60 DatasetRecordStorageManager, 

61 OpaqueTableStorageManager, 

62 StaticTablesContext, 

63 ) 

64 

65_TablesTuple = namedtuple( 

66 "_TablesTuple", 

67 [ 

68 "dataset_location", 

69 "dataset_location_trash", 

70 ], 

71) 

72 

73# This has to be updated on every schema change 

74_VERSION = VersionTuple(0, 2, 1) 

75 

76 

77def _makeTableSpecs(datasets: type[DatasetRecordStorageManager]) -> _TablesTuple: 

78 """Construct specifications for tables used by the monolithic datastore 

79 bridge classes. 

80 

81 Parameters 

82 ---------- 

83 datasets : subclass of `DatasetRecordStorageManager` 

84 Manager class for datasets; used only to create foreign key fields. 

85 

86 Returns 

87 ------- 

88 specs : `_TablesTuple` 

89 A named tuple containing `ddl.TableSpec` instances. 

90 """ 

91 # We want the dataset_location and dataset_location_trash tables 

92 # to have the same definition, aside from the behavior of their link 

93 # to the dataset table: the trash table has no foreign key constraint. 

94 # The order of columns in dataset_location_trash is reversed, it is more 

95 # optimal for query planner. 

96 

97 datastore_field = ddl.FieldSpec( 

98 name="datastore_name", 

99 dtype=sqlalchemy.String, 

100 length=256, 

101 primaryKey=True, 

102 nullable=False, 

103 doc="Name of the Datastore this entry corresponds to.", 

104 ) 

105 

106 dataset_location = ddl.TableSpec( 

107 doc=( 

108 "A table that provides information on whether a dataset is stored in " 

109 "one or more Datastores. The presence or absence of a record in this " 

110 "table itself indicates whether the dataset is present in that " 

111 "Datastore. " 

112 ), 

113 fields=[datastore_field], 

114 ) 

115 datasets.addDatasetForeignKey(dataset_location, primaryKey=True) 

116 

117 dataset_location_trash = ddl.TableSpec( 

118 doc="A table that keeps iinformation about datasets that are removed from Datastores.", 

119 fields=[], 

120 ) 

121 datasets.addDatasetForeignKey(dataset_location_trash, primaryKey=True, constraint=False) 

122 dataset_location_trash.fields.add(datastore_field) 

123 

124 return _TablesTuple( 

125 dataset_location=dataset_location, 

126 dataset_location_trash=dataset_location_trash, 

127 ) 

128 

129 

130class MonolithicDatastoreRegistryBridge(DatastoreRegistryBridge): 

131 """An implementation of `DatastoreRegistryBridge` that uses the same two 

132 tables for all non-ephemeral datastores. 

133 

134 Parameters 

135 ---------- 

136 datastoreName : `str` 

137 Name of the `Datastore` as it should appear in `Registry` tables 

138 referencing it. 

139 db : `Database` 

140 Object providing a database connection and generic distractions. 

141 tables : `_TablesTuple` 

142 Named tuple containing `sqlalchemy.schema.Table` instances. 

143 """ 

144 

145 def __init__(self, datastoreName: str, *, db: Database, tables: _TablesTuple): 

146 super().__init__(datastoreName) 

147 self._db = db 

148 self._tables = tables 

149 

150 def _refsToRows(self, refs: Iterable[DatasetIdRef]) -> list[dict]: 

151 """Transform an iterable of `DatasetRef` or `FakeDatasetRef` objects to 

152 a list of dictionaries that match the schema of the tables used by this 

153 class. 

154 

155 Parameters 

156 ---------- 

157 refs : `~collections.abc.Iterable` [ `DatasetRef` or `FakeDatasetRef` ] 

158 Datasets to transform. 

159 

160 Returns 

161 ------- 

162 rows : `list` [ `dict` ] 

163 List of dictionaries, with "datastoreName" and "dataset_id" keys. 

164 """ 

165 return [{"datastore_name": self.datastoreName, "dataset_id": ref.id} for ref in refs] 

166 

167 def ensure(self, refs: Iterable[DatasetIdRef]) -> None: 

168 # Docstring inherited from DatastoreRegistryBridge 

169 self._db.ensure(self._tables.dataset_location, *self._refsToRows(refs)) 

170 

171 def insert(self, refs: Iterable[DatasetIdRef]) -> None: 

172 # Docstring inherited from DatastoreRegistryBridge 

173 self._db.insert(self._tables.dataset_location, *self._refsToRows(refs)) 

174 

175 def forget(self, refs: Iterable[DatasetIdRef]) -> None: 

176 # Docstring inherited from DatastoreRegistryBridge 

177 with self._db.transaction(): 

178 # The list of IDs can be very large, split it into reasonable size 

179 # chunks to avoid hitting limits. 

180 for refs_chunk in chunk_iterable(refs, 50_000): 

181 dataset_ids = [ref.id for ref in refs_chunk] 

182 where = sqlalchemy.sql.and_( 

183 self._tables.dataset_location.columns.datastore_name == self.datastoreName, 

184 self._tables.dataset_location.columns.dataset_id.in_(dataset_ids), 

185 ) 

186 self._db.deleteWhere(self._tables.dataset_location, where) 

187 

188 def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: DatastoreTransaction | None) -> None: 

189 # Docstring inherited from DatastoreRegistryBridge 

190 location = self._tables.dataset_location 

191 location_trash = self._tables.dataset_location_trash 

192 with self._db.transaction(): 

193 for refs_chunk in chunk_iterable(refs, 50_000): 

194 # We only want to move IDs that actually exist in the 

195 # dataset_location table. Instead of querying for existing IDs, 

196 # which would need an extra query, we use INSERT ... SELECT 

197 # and DELETE using WHERE clause that limits operations to 

198 # existing IDs. 

199 dataset_ids = [ref.id for ref in refs_chunk] 

200 

201 where = sqlalchemy.sql.and_( 

202 location.columns.datastore_name == self.datastoreName, 

203 location.columns.dataset_id.in_(dataset_ids), 

204 ) 

205 

206 select = ( 

207 sqlalchemy.sql.select(location.columns.datastore_name, location.columns.dataset_id) 

208 .where(where) 

209 .with_for_update() 

210 ) 

211 self._db.insert(location_trash, select=select) 

212 

213 self._db.deleteWhere(location, where) 

214 

215 def check(self, datasets: Iterable[DatasetId]) -> set[DatasetId]: 

216 # Docstring inherited from DatastoreRegistryBridge 

217 found: set[DatasetId] = set() 

218 with self._db.session(): 

219 for batch in chunk_iterable(datasets, 50000): 

220 sql = ( 

221 sqlalchemy.sql.select(self._tables.dataset_location.columns.dataset_id) 

222 .select_from(self._tables.dataset_location) 

223 .where( 

224 sqlalchemy.sql.and_( 

225 self._tables.dataset_location.columns.datastore_name == self.datastoreName, 

226 self._tables.dataset_location.columns.dataset_id.in_(batch), 

227 ) 

228 ) 

229 ) 

230 with self._db.query(sql) as sql_result: 

231 sql_ids = sql_result.scalars().all() 

232 found.update(sql_ids) 

233 

234 return found 

235 

236 @contextmanager 

237 def emptyTrash( 

238 self, 

239 records_table: OpaqueTableStorage | None = None, 

240 record_class: type[StoredDatastoreItemInfo] | None = None, 

241 record_column: str | None = None, 

242 selected_ids: Collection[DatasetId] | None = None, 

243 dry_run: bool = False, 

244 ) -> Iterator[tuple[Iterable[tuple[DatasetIdRef, StoredDatastoreItemInfo | None]], set[str] | None]]: 

245 # Docstring inherited from DatastoreRegistryBridge 

246 

247 if records_table is None: 

248 raise ValueError("This implementation requires a records table.") 

249 

250 assert isinstance(records_table, ByNameOpaqueTableStorage), ( 

251 f"Records table must support hidden attributes. Got {type(records_table)}." 

252 ) 

253 

254 if record_class is None: 

255 raise ValueError("Record class must be provided if records table is given.") 

256 

257 # Helper closure to generate the common join+where clause. 

258 def join_records( 

259 select: sqlalchemy.sql.Select, location_table: sqlalchemy.schema.Table 

260 ) -> sqlalchemy.sql.Select: 

261 # mypy needs to be sure 

262 assert isinstance(records_table, ByNameOpaqueTableStorage) 

263 return select.select_from( 

264 records_table._table.join( 

265 location_table, 

266 onclause=records_table._table.columns.dataset_id == location_table.columns.dataset_id, 

267 ) 

268 ).where(location_table.columns.datastore_name == self.datastoreName) 

269 

270 # SELECT records.dataset_id, records.path FROM records 

271 # JOIN records on dataset_location.dataset_id == records.dataset_id 

272 # WHERE dataset_location.datastore_name = datastoreName 

273 

274 # It's possible that we may end up with a ref listed in the trash 

275 # table that is not listed in the records table. Such an 

276 # inconsistency would be missed by this query. 

277 info_in_trash = join_records(records_table._table.select(), self._tables.dataset_location_trash) 

278 if selected_ids: 

279 info_in_trash = info_in_trash.where( 

280 self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids) 

281 ) 

282 info_in_trash = info_in_trash.with_for_update(skip_locked=True) 

283 

284 # Run query, transform results into a list of dicts that we can later 

285 # use to delete. 

286 with self._db.query(info_in_trash) as sql_result: 

287 rows = [dict(row, datastore_name=self.datastoreName) for row in sql_result.mappings()] 

288 

289 # It is possible for trashed refs to be linked to artifacts that 

290 # are still associated with refs that are not to be trashed. We 

291 # need to be careful to consider those and indicate to the caller 

292 # that those artifacts should be retained. Can only do this check 

293 # if the caller provides a column name that can map to multiple 

294 # refs. 

295 preserved: set[str] | None = None 

296 if record_column is not None: 

297 # Some helper subqueries 

298 items_not_in_trash = join_records( 

299 sqlalchemy.sql.select(records_table._table.columns[record_column]), 

300 self._tables.dataset_location, 

301 ).alias("items_not_in_trash") 

302 items_in_trash = join_records( 

303 sqlalchemy.sql.select(records_table._table.columns[record_column]), 

304 self._tables.dataset_location_trash, 

305 ) 

306 if selected_ids: 

307 items_in_trash = items_in_trash.where( 

308 self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids) 

309 ) 

310 items_in_trash_alias = items_in_trash.alias("items_in_trash") 

311 

312 # A query for paths that are referenced by datasets in the trash 

313 # and datasets not in the trash. 

314 items_to_preserve = sqlalchemy.sql.select( 

315 items_in_trash_alias.columns[record_column] 

316 ).select_from( 

317 items_not_in_trash.join( 

318 items_in_trash_alias, 

319 onclause=items_in_trash_alias.columns[record_column] 

320 == items_not_in_trash.columns[record_column], 

321 ) 

322 ) 

323 with self._db.query(items_to_preserve) as sql_result: 

324 preserved = {row[record_column] for row in sql_result.mappings()} 

325 

326 # Convert results to a tuple of id+info and a record of the artifacts 

327 # that should not be deleted from datastore. The id+info tuple is 

328 # solely to allow logging to report the relevant ID. 

329 id_info = ((FakeDatasetRef(row["dataset_id"]), record_class.from_record(row)) for row in rows) 

330 

331 # Start contextmanager, return results 

332 yield ((id_info, preserved)) 

333 

334 # No exception raised in context manager block. 

335 if not rows or dry_run: 

336 return 

337 

338 # Delete the rows from the records table 

339 records_table.delete(["dataset_id"], *[{"dataset_id": row["dataset_id"]} for row in rows]) 

340 

341 # Delete those rows from the trash table. 

342 self._db.delete( 

343 self._tables.dataset_location_trash, 

344 ["dataset_id", "datastore_name"], 

345 *[{"dataset_id": row["dataset_id"], "datastore_name": row["datastore_name"]} for row in rows], 

346 ) 

347 

348 

349class MonolithicDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager): 

350 """An implementation of `DatastoreRegistryBridgeManager` that uses the same 

351 two tables for all non-ephemeral datastores. 

352 

353 Parameters 

354 ---------- 

355 db : `Database` 

356 Object providing a database connection and generic distractions. 

357 tables : `_TablesTuple` 

358 Named tuple containing `sqlalchemy.schema.Table` instances. 

359 opaque : `OpaqueTableStorageManager` 

360 Manager object for opaque table storage in the `Registry`. 

361 universe : `DimensionUniverse` 

362 All dimensions know to the `Registry`. 

363 registry_schema_version : `VersionTuple` or `None`, optional 

364 The version of the registry schema. 

365 """ 

366 

367 def __init__( 

368 self, 

369 *, 

370 db: Database, 

371 tables: _TablesTuple, 

372 opaque: OpaqueTableStorageManager, 

373 universe: DimensionUniverse, 

374 registry_schema_version: VersionTuple | None = None, 

375 ): 

376 super().__init__( 

377 opaque=opaque, 

378 universe=universe, 

379 registry_schema_version=registry_schema_version, 

380 ) 

381 self._db = db 

382 self._tables = tables 

383 self._ephemeral: dict[str, EphemeralDatastoreRegistryBridge] = {} 

384 

385 def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager: 

386 return MonolithicDatastoreRegistryBridgeManager( 

387 db=db, 

388 tables=self._tables, 

389 opaque=opaque, 

390 universe=self.universe, 

391 registry_schema_version=self._registry_schema_version, 

392 ) 

393 

394 @classmethod 

395 def initialize( 

396 cls, 

397 db: Database, 

398 context: StaticTablesContext, 

399 *, 

400 opaque: OpaqueTableStorageManager, 

401 datasets: type[DatasetRecordStorageManager], 

402 universe: DimensionUniverse, 

403 registry_schema_version: VersionTuple | None = None, 

404 ) -> DatastoreRegistryBridgeManager: 

405 # Docstring inherited from DatastoreRegistryBridge 

406 tables = context.addTableTuple(_makeTableSpecs(datasets)) 

407 return cls( 

408 db=db, 

409 tables=cast(_TablesTuple, tables), 

410 opaque=opaque, 

411 universe=universe, 

412 registry_schema_version=registry_schema_version, 

413 ) 

414 

415 def refresh(self) -> None: 

416 # Docstring inherited from DatastoreRegistryBridge 

417 # This implementation has no in-Python state that depends on which 

418 # datastores exist, so there's nothing to do. 

419 pass 

420 

421 def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge: 

422 # Docstring inherited from DatastoreRegistryBridge 

423 if ephemeral: 

424 return self._ephemeral.setdefault(name, EphemeralDatastoreRegistryBridge(name)) 

425 return MonolithicDatastoreRegistryBridge(name, db=self._db, tables=self._tables) 

426 

427 def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]: 

428 # Docstring inherited from DatastoreRegistryBridge 

429 sql = ( 

430 sqlalchemy.sql.select(self._tables.dataset_location.columns.datastore_name) 

431 .select_from(self._tables.dataset_location) 

432 .where(self._tables.dataset_location.columns.dataset_id == ref.id) 

433 ) 

434 with self._db.query(sql) as sql_result: 

435 sql_rows = sql_result.mappings().fetchall() 

436 for row in sql_rows: 

437 yield row[self._tables.dataset_location.columns.datastore_name] 

438 for name, bridge in self._ephemeral.items(): 

439 if ref in bridge: 

440 yield name 

441 

442 @classmethod 

443 def currentVersions(cls) -> list[VersionTuple]: 

444 # Docstring inherited from VersionedExtension. 

445 return [_VERSION]