Coverage for python / lsst / daf / butler / tests / _dummyRegistry.py: 25%

112 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-23 01:06 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("DummyRegistry",) 

30 

31from collections.abc import Iterable, Iterator 

32from typing import Any 

33 

34from lsst.daf.butler import DimensionUniverse, ddl 

35from lsst.daf.butler.registry.bridge.ephemeral import EphemeralDatastoreRegistryBridge 

36from lsst.daf.butler.registry.interfaces import ( 

37 Database, 

38 DatabaseInsertMode, 

39 DatasetIdRef, 

40 DatasetRecordStorageManager, 

41 DatastoreRegistryBridge, 

42 DatastoreRegistryBridgeManager, 

43 OpaqueTableStorage, 

44 OpaqueTableStorageManager, 

45 StaticTablesContext, 

46 VersionTuple, 

47) 

48 

49from ..datastore import DatastoreTransaction 

50 

51 

52class DummyOpaqueTableStorage(OpaqueTableStorage): 

53 def __init__(self, name: str, spec: ddl.TableSpec) -> None: 

54 super().__init__(name=name) 

55 self._rows: list[dict] = [] 

56 self._spec = spec 

57 

58 def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

59 # Docstring inherited from OpaqueTableStorage. 

60 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.INSERT) 

61 

62 def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

63 # Docstring inherited from OpaqueTableStorage. 

64 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.REPLACE) 

65 

66 def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

67 # Docstring inherited from OpaqueTableStorage. 

68 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.ENSURE) 

69 

70 def _insert( 

71 self, 

72 *data: dict, 

73 transaction: DatastoreTransaction | None = None, 

74 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, 

75 ) -> None: 

76 uniqueConstraints = list(self._spec.unique) 

77 uniqueConstraints.append(tuple(field.name for field in self._spec.fields if field.primaryKey)) 

78 for d in data: 

79 skipping = False 

80 for constraint in uniqueConstraints: 

81 matching = list(self.fetch(**{k: d[k] for k in constraint})) 

82 if len(matching) != 0: 

83 match insert_mode: 

84 case DatabaseInsertMode.INSERT: 

85 raise RuntimeError( 

86 f"Unique constraint {constraint} violation in external table {self.name}." 

87 ) 

88 case DatabaseInsertMode.ENSURE: 

89 # Row already exists. Skip. 

90 skipping = True 

91 case DatabaseInsertMode.REPLACE: 

92 # Should try to put these rows back on transaction 

93 # rollback... 

94 self.delete([], *matching) 

95 case _: 

96 raise ValueError(f"Unrecognized insert mode: {insert_mode}.") 

97 

98 if skipping: 

99 continue 

100 self._rows.append(d) 

101 if transaction is not None: 

102 transaction.registerUndo("insert", self.delete, [], d) 

103 

104 def fetch(self, **where: Any) -> Iterator[dict]: 

105 # Docstring inherited from OpaqueTableStorage. 

106 where = where.copy() # May need to modify it. 

107 

108 # Can support an IN operator if given list. 

109 wherein = {} 

110 for k in list(where): 

111 if isinstance(where[k], tuple | list | set): 

112 wherein[k] = set(where[k]) 

113 del where[k] 

114 

115 for d in self._rows: 

116 if all(d[k] == v for k, v in where.items()): 

117 if wherein: 

118 match = True 

119 for k, v in wherein.items(): 

120 if d[k] not in v: 

121 match = False 

122 break 

123 if match: 

124 yield d 

125 else: 

126 yield d 

127 

128 def fetch_batches(self, **where: Any) -> Iterator[list[dict]]: 

129 # Docstring inherited from OpaqueTableStorage. 

130 yield list(self.fetch(**where)) 

131 

132 def delete(self, columns: Iterable[str], *rows: dict) -> None: 

133 # Docstring inherited from OpaqueTableStorage. 

134 kept_rows = [] 

135 for table_row in self._rows: 

136 for where_row in rows: 

137 if all(table_row[k] == v for k, v in where_row.items()): 

138 break 

139 else: 

140 kept_rows.append(table_row) 

141 self._rows = kept_rows 

142 

143 

class DummyOpaqueTableStorageManager(OpaqueTableStorageManager):
    """Opaque table manager that tracks `DummyOpaqueTableStorage` objects
    in a dictionary keyed by table name.

    Parameters
    ----------
    registry_schema_version : `VersionTuple` or `None`, optional
        Forwarded unchanged to the base class constructor.
    """

    def __init__(self, registry_schema_version: VersionTuple | None = None) -> None:
        super().__init__(registry_schema_version=registry_schema_version)
        self._storages: dict[str, DummyOpaqueTableStorage] = {}

    def clone(self, db: Database) -> OpaqueTableStorageManager:
        # ``db`` is not used; the same manager (and therefore the same
        # registered tables) is handed back.
        return self

    @classmethod
    def initialize(
        cls, db: Database, context: StaticTablesContext, registry_schema_version: VersionTuple | None = None
    ) -> OpaqueTableStorageManager:
        # Docstring inherited from OpaqueTableStorageManager.
        # Not used, but needed to satisfy ABC requirement.
        return cls(registry_schema_version=registry_schema_version)

    def get(self, name: str) -> OpaqueTableStorage | None:
        # Docstring inherited from OpaqueTableStorageManager.
        return self._storages.get(name)

    def register(self, name: str, spec: ddl.TableSpec) -> OpaqueTableStorage:
        # Docstring inherited from OpaqueTableStorageManager.
        # Only build a storage object the first time a name is seen; on
        # later calls the existing table is returned and ``spec`` is ignored.
        storage = self._storages.get(name)
        if storage is None:
            storage = DummyOpaqueTableStorage(name, spec)
            self._storages[name] = storage
        return storage

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return []

172 

173 

class DummyDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager):
    """Bridge manager that hands out `EphemeralDatastoreRegistryBridge`
    objects kept in a dictionary keyed by datastore name.

    Parameters
    ----------
    opaque : `OpaqueTableStorageManager`
        Forwarded unchanged to the base class constructor.
    universe : `DimensionUniverse`
        Forwarded unchanged to the base class constructor.
    registry_schema_version : `VersionTuple` or `None`, optional
        Forwarded unchanged to the base class constructor.
    """

    def __init__(
        self,
        opaque: OpaqueTableStorageManager,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )
        self._bridges: dict[str, EphemeralDatastoreRegistryBridge] = {}

    def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager:
        # ``db`` is not used.  The new manager starts with no registered
        # bridges; those held by this instance are not carried over.
        return DummyDatastoreRegistryBridgeManager(
            opaque=opaque,
            universe=self.universe,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        opaque: OpaqueTableStorageManager,
        datasets: type[DatasetRecordStorageManager],
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatastoreRegistryBridgeManager:
        # Docstring inherited from DatastoreRegistryBridgeManager
        # Not used, but needed to satisfy ABC requirement.
        return cls(
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )

    def refresh(self) -> None:
        # Docstring inherited from DatastoreRegistryBridgeManager
        pass

    def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge:
        # Docstring inherited from DatastoreRegistryBridgeManager
        # ``ephemeral`` is not consulted: every bridge is an
        # EphemeralDatastoreRegistryBridge.  A bridge is only constructed
        # the first time a name is registered.
        bridge = self._bridges.get(name)
        if bridge is None:
            bridge = EphemeralDatastoreRegistryBridge(name)
            self._bridges[name] = bridge
        return bridge

    def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]:
        # Docstring inherited from DatastoreRegistryBridgeManager
        # Lazily yields the name of every registered bridge containing ref.
        for name, bridge in self._bridges.items():
            if ref in bridge:
                yield name

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return []

232 

233 

class DummyRegistry:
    """Minimal Registry stand-in for use in Datastore tests.

    On construction it creates a `DummyOpaqueTableStorageManager`, a
    default-constructed `DimensionUniverse` (exposed as ``dimensions``) and
    a `DummyDatastoreRegistryBridgeManager` built from those two objects.
    """

    def __init__(self) -> None:
        self._opaque = DummyOpaqueTableStorageManager()
        self.dimensions = DimensionUniverse()
        # No registry schema version is supplied to the bridge manager.
        self._datastoreBridges = DummyDatastoreRegistryBridgeManager(
            opaque=self._opaque,
            universe=self.dimensions,
            registry_schema_version=None,
        )

    def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager:
        """Return the bridge manager created by the constructor.

        Returns
        -------
        manager : `DatastoreRegistryBridgeManager`
            The same object on every call.
        """
        bridge_manager = self._datastoreBridges
        return bridge_manager