Coverage for python/lsst/daf/butler/registry/opaque.py: 28%
83 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:15 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""The default concrete implementations of the classes that manage
29opaque tables for `Registry`.
30"""
32from __future__ import annotations
34__all__ = ["ByNameOpaqueTableStorage", "ByNameOpaqueTableStorageManager"]
36import itertools
37from collections.abc import Iterable, Iterator, Sequence
38from typing import TYPE_CHECKING, Any, ClassVar
40import sqlalchemy
42from .._utilities.thread_safe_cache import ThreadSafeCache
43from ..ddl import FieldSpec, TableSpec
44from .interfaces import (
45 Database,
46 OpaqueTableStorage,
47 OpaqueTableStorageManager,
48 StaticTablesContext,
49 VersionTuple,
50)
52if TYPE_CHECKING:
53 from ..datastore import DatastoreTransaction
# Schema version of this manager implementation; it is the single entry
# returned by `ByNameOpaqueTableStorageManager.currentVersions`.
# This has to be updated on every schema change.
_VERSION = VersionTuple(0, 2, 0)
class ByNameOpaqueTableStorage(OpaqueTableStorage):
    """`OpaqueTableStorage` backed by one real database table per named
    opaque logical table.

    Instances are expected to be created and handed out by a
    `ByNameOpaqueTableStorageManager` rather than constructed directly.

    Parameters
    ----------
    db : `Database`
        Database engine interface for the namespace that holds the table.
    name : `str`
        Name of the logical table; the actual table uses the same name.
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table.  It must already exist in
        the namespace managed by ``db``; creating it is the job of
        `ByNameOpaqueTableStorageManager`.
    """

    def __init__(self, *, db: Database, name: str, table: sqlalchemy.schema.Table):
        super().__init__(name=name)
        self._table = table
        self._db = db

    def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.insert(self._table, *data)

    def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.ensure(self._table, *data)

    def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
        # Docstring inherited from OpaqueTableStorage.
        # ``transaction`` is deliberately unused: rollback is left to the
        # database itself.
        self._db.replace(self._table, *data)

    def fetch(
        self,
        **where: Any,
    ) -> Iterator[sqlalchemy.RowMapping]:
        # Docstring inherited from OpaqueTableStorage.
        # Flatten the per-query batches into a single stream of rows.
        yield from itertools.chain.from_iterable(self.fetch_batches(**where))

    def fetch_batches(
        self,
        **where: Any,
    ) -> Iterator[Sequence[sqlalchemy.RowMapping]]:
        # Yields one sequence of row mappings per executed SELECT.  Each
        # keyword names a column of the table; list/tuple/set values become
        # IN constraints (split into chunks), strings ending in "%" become
        # prefix matches, and anything else is an equality test.

        # Upper bound on the number of items placed in a single IN clause.
        max_in_items = 1000

        def _chunked_in(
            column: sqlalchemy.schema.Column, values: Iterable[Any]
        ) -> Iterator[sqlalchemy.sql.expression.ClauseElement]:
            """Yield a series of short IN clauses covering ``values``."""
            # Duplicates must go; sorting keeps the clause contents ordered,
            # which should be helpful in many cases.
            unique = sorted(set(values))
            for start in range(0, len(unique), max_in_items):
                yield column.in_(unique[start : start + max_in_items])

        def _where_clauses(
            constraints: dict[str, Any],
        ) -> Iterator[sqlalchemy.sql.expression.ColumnElement]:
            """Yield AND-combined WHERE clauses, one per combination of
            IN-clause chunks across all constrained columns.
            """
            alternatives: list[Iterable[Any]] = []
            for key, value in constraints.items():
                column = self._table.columns[key]
                if isinstance(value, list | tuple | set):
                    alternatives.append(_chunked_in(column, value))
                elif isinstance(value, str) and value.endswith("%"):
                    # Trailing "%" requests a prefix query.
                    alternatives.append([column.startswith(value[:-1])])
                else:
                    # Plain equality is a single-element "batch".
                    alternatives.append([column == value])

            for combination in itertools.product(*alternatives):
                yield sqlalchemy.sql.and_(*combination)

        select = self._table.select()
        if not where:
            statements = [select]
        else:
            # Long IN clauses are split, giving one statement per chunk
            # combination.
            statements = [select.where(clause) for clause in _where_clauses(where)]

        for statement in statements:
            # Rows are fully fetched before leaving the query context and
            # only then handed to the caller.
            with self._db.query(statement) as result:
                rows = result.mappings().fetchall()
            yield rows

    def delete(self, columns: Iterable[str], *rows: dict) -> None:
        # Docstring inherited from OpaqueTableStorage.
        self._db.delete(self._table, columns, *rows)
class ByNameOpaqueTableStorageManager(OpaqueTableStorageManager):
    """`OpaqueTableStorageManager` that backs every named opaque logical
    table with its own real database table.

    Prefer the `initialize` class method over calling ``__init__`` directly
    when constructing instances.

    Parameters
    ----------
    db : `Database`
        Database engine interface for the namespace in which the tables live.
    metaTable : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table that records which opaque
        logical tables exist.
    tables : `ThreadSafeCache` [`str`, `~sqlalchemy.schema.Table`]
        Cache mapping table name to table, tracking which tables have already
        been created.  The same cache object is passed on to clones of this
        manager.
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

    # Name and layout of the table that lists the registered opaque tables.
    _META_TABLE_NAME: ClassVar[str] = "opaque_meta"

    _META_TABLE_SPEC: ClassVar[TableSpec] = TableSpec(
        fields=[
            FieldSpec("table_name", dtype=sqlalchemy.String, length=128, primaryKey=True),
        ],
    )

    def __init__(
        self,
        db: Database,
        metaTable: sqlalchemy.schema.Table,
        tables: ThreadSafeCache[str, sqlalchemy.schema.Table],
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(registry_schema_version=registry_schema_version)
        self._tables = tables
        self._metaTable = metaTable
        self._db = db

    def clone(self, db: Database) -> ByNameOpaqueTableStorageManager:
        # The clone gets the new ``db`` but reuses the meta table, the table
        # cache and the schema version of this instance.
        return ByNameOpaqueTableStorageManager(
            db=db,
            metaTable=self._metaTable,
            tables=self._tables,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls, db: Database, context: StaticTablesContext, registry_schema_version: VersionTuple | None = None
    ) -> OpaqueTableStorageManager:
        # Docstring inherited from OpaqueTableStorageManager.
        meta = context.addTable(cls._META_TABLE_NAME, cls._META_TABLE_SPEC)
        # Every freshly initialized manager starts with an empty table cache.
        return cls(
            db=db,
            metaTable=meta,
            tables=ThreadSafeCache(),
            registry_schema_version=registry_schema_version,
        )

    def get(self, name: str) -> OpaqueTableStorage | None:
        # Docstring inherited from OpaqueTableStorageManager.
        # Only the cache is consulted here.
        cached = self._tables.get(name)
        return None if cached is None else ByNameOpaqueTableStorage(name=name, table=cached, db=self._db)

    def register(self, name: str, spec: TableSpec) -> OpaqueTableStorage:
        # Docstring inherited from OpaqueTableStorageManager.
        storage = self.get(name)
        if storage is not None:
            return storage

        # Not cached yet: create the table.  It is fine if it already exists
        # because another client added it after this one was initialized.
        table = self._db.ensureTableExists(name, spec)
        # Record the table in the meta table so it can be found later; the
        # row may already be present, hence sync rather than insert.
        self._db.sync(self._metaTable, keys={"table_name": name})
        self._tables.set_or_get(name, table)

        storage = self.get(name)
        assert storage is not None
        return storage

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return [_VERSION]