Coverage for python / lsst / daf / butler / registry / opaque.py: 28%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:22 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""The default concrete implementations of the classes that manage 

29opaque tables for `Registry`. 

30""" 

31 

32from __future__ import annotations 

33 

34__all__ = ["ByNameOpaqueTableStorage", "ByNameOpaqueTableStorageManager"] 

35 

36import itertools 

37from collections.abc import Iterable, Iterator, Sequence 

38from typing import TYPE_CHECKING, Any, ClassVar 

39 

40import sqlalchemy 

41 

42from .._utilities.thread_safe_cache import ThreadSafeCache 

43from ..ddl import FieldSpec, TableSpec 

44from .interfaces import ( 

45 Database, 

46 OpaqueTableStorage, 

47 OpaqueTableStorageManager, 

48 StaticTablesContext, 

49 VersionTuple, 

50) 

51 

52if TYPE_CHECKING: 

53 from ..datastore import DatastoreTransaction 

54 

# Schema version of the opaque-table manager defined in this module; it is
# the single entry returned by
# `ByNameOpaqueTableStorageManager.currentVersions`.
# This has to be updated on every schema change.
_VERSION = VersionTuple(0, 2, 0)

57 

58 

class ByNameOpaqueTableStorage(OpaqueTableStorage):
    """Opaque-table storage backed by one real database table per logical
    table name.

    Instances are meant to be created and handed out by a
    `ByNameOpaqueTableStorageManager`, not constructed directly.

    Parameters
    ----------
    db : `Database`
        Database engine interface for the namespace in which this table lives.
    name : `str`
        Name of the logical table (also used as the name of the actual table).
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table, which must have already been
        created in the namespace managed by ``db`` (this is the responsibility
        of `ByNameOpaqueTableStorageManager`).
    """

    def __init__(self, *, db: Database, name: str, table: sqlalchemy.schema.Table):
        super().__init__(name=name)
        self._table = table
        self._db = db

    def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.insert(self._table, *data)

    def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.ensure(self._table, *data)

    def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.replace(self._table, *data)

    def fetch(
        self,
        **where: Any,
    ) -> Iterator[sqlalchemy.RowMapping]:
        # Docstring inherited from OpaqueTableStorage.
        # Flatten the per-query batches into a single stream of rows.
        yield from itertools.chain.from_iterable(self.fetch_batches(**where))

    def fetch_batches(
        self,
        **where: Any,
    ) -> Iterator[Sequence[sqlalchemy.RowMapping]]:
        """Yield matching rows grouped by the query that produced them.

        Parameters
        ----------
        **where
            Column constraints keyed by column name.  A `list`, `tuple` or
            `set` value becomes an ``IN`` test, split into chunks of at most
            1000 de-duplicated, sorted values.  A `str` value ending in
            ``"%"`` becomes a prefix match on the text before that final
            character.  Any other value becomes an equality test.

        Yields
        ------
        rows : `~collections.abc.Sequence` [`sqlalchemy.RowMapping`]
            All rows returned by one executed query.  With no constraints a
            single unfiltered query is run; otherwise one query is run for
            every combination of per-column alternatives, with the
            constraints of each combination ANDed together.
        """
        max_in_items = 1000

        def _chunked_in(
            column: sqlalchemy.schema.Column, values: Iterable[Any]
        ) -> Iterator[sqlalchemy.sql.expression.ClauseElement]:
            """Produce IN clauses covering ``values`` in bounded chunks."""
            # De-duplicate first, then order the items within the clauses.
            unique_sorted = list(set(values))
            unique_sorted.sort()
            for start in range(0, len(unique_sorted), max_in_items):
                yield column.in_(unique_sorted[start : start + max_in_items])

        def _alternatives(key: str, value: Any) -> Iterable[Any]:
            """Return the alternative clauses for one column constraint."""
            column = self._table.columns[key]
            if isinstance(value, (list, tuple, set)):
                return _chunked_in(column, value)
            if isinstance(value, str) and value.endswith("%"):
                # Trailing "%" requests a prefix query.
                return [column.startswith(value[:-1])]
            # Plain equality: a single alternative.
            return [column == value]

        def _where_clauses() -> Iterator[sqlalchemy.sql.expression.ColumnElement]:
            """Generate one combined WHERE clause per combination of
            per-column alternatives.
            """
            per_column = [_alternatives(key, value) for key, value in where.items()]
            for combination in itertools.product(*per_column):
                yield sqlalchemy.sql.and_(*combination)

        base_query = self._table.select()
        if not where:
            queries = [base_query]
        else:
            # All queries are built up front, before any of them is executed.
            queries = [base_query.where(clause) for clause in _where_clauses()]

        for query in queries:
            with self._db.query(query) as result:
                rows = result.mappings().fetchall()
            # Rows are fully fetched and the query context has been exited
            # before control is handed back to the caller.
            yield rows

    def delete(self, columns: Iterable[str], *rows: dict) -> None:
        # Docstring inherited from OpaqueTableStorage.
        self._db.delete(self._table, columns, *rows)

159 

160 

class ByNameOpaqueTableStorageManager(OpaqueTableStorageManager):
    """Manager of opaque tables that backs every named logical table with
    its own real database table.

    Prefer the `initialize` class method over calling ``__init__`` directly
    when constructing instances.

    Parameters
    ----------
    db : `Database`
        Database engine interface for the namespace in which this table lives.
    metaTable : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table that records which opaque
        logical tables exist.
    tables : `ThreadSafeCache` [`str`, `~sqlalchemy.schema.Table`]
        Mapping from string to table, to track which tables have already been
        created.  This mapping is shared between cloned instances of this
        manager.
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

    # Name of the static table listing the opaque tables that exist.
    _META_TABLE_NAME: ClassVar[str] = "opaque_meta"

    # Layout of that table: a single primary-key column holding table names.
    _META_TABLE_SPEC: ClassVar[TableSpec] = TableSpec(
        fields=[
            FieldSpec("table_name", dtype=sqlalchemy.String, length=128, primaryKey=True),
        ],
    )

    def __init__(
        self,
        db: Database,
        metaTable: sqlalchemy.schema.Table,
        tables: ThreadSafeCache[str, sqlalchemy.schema.Table],
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(registry_schema_version=registry_schema_version)
        self._tables = tables
        self._metaTable = metaTable
        self._db = db

    def clone(self, db: Database) -> ByNameOpaqueTableStorageManager:
        """Return a manager bound to ``db`` that reuses this manager's meta
        table, table cache and registry schema version.
        """
        return ByNameOpaqueTableStorageManager(
            db=db,
            metaTable=self._metaTable,
            tables=self._tables,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls, db: Database, context: StaticTablesContext, registry_schema_version: VersionTuple | None = None
    ) -> OpaqueTableStorageManager:
        # Docstring inherited from OpaqueTableStorageManager.
        meta = context.addTable(cls._META_TABLE_NAME, cls._META_TABLE_SPEC)
        # Every freshly initialized manager starts with an empty table cache.
        cache: ThreadSafeCache[str, sqlalchemy.schema.Table] = ThreadSafeCache()
        return cls(
            db=db,
            metaTable=meta,
            tables=cache,
            registry_schema_version=registry_schema_version,
        )

    def get(self, name: str) -> OpaqueTableStorage | None:
        # Docstring inherited from OpaqueTableStorageManager.
        cached = self._tables.get(name)
        return None if cached is None else ByNameOpaqueTableStorage(name=name, table=cached, db=self._db)

    def register(self, name: str, spec: TableSpec) -> OpaqueTableStorage:
        # Docstring inherited from OpaqueTableStorageManager.
        existing = self.get(name)
        if existing is not None:
            return existing

        # Not in the cache: make sure the real table exists.  It is fine if
        # another client created it after this manager was initialized.
        table = self._db.ensureTableExists(name, spec)
        # Record the table in the meta table so it can be found later; sync
        # is used because the row may already be present.
        self._db.sync(self._metaTable, keys={"table_name": name})
        self._tables.set_or_get(name, table)

        created = self.get(name)
        assert created is not None
        return created

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return [_VERSION]