Coverage for tests/test_apdbCassandra.py: 37%

138 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-28 08:37 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Unit test for `ApdbCassandra` class. 

23 

24Notes 

25----- 

26For now this test can only run against an actual Cassandra cluster. To specify

27the cluster location, set the ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment

28variable, e.g.:

29 

30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com 

31 pytest tests/test_apdbCassandra.py 

32 

33Individual tests create and destroy unique keyspaces in the cluster, so there

34is no need to pre-create a keyspace with a predefined name.

35""" 

36 

37import os 

38import unittest 

39from typing import Any 

40 

41import astropy.time 

42 

43import lsst.utils.tests 

44from lsst.dax.apdb import ( 

45 Apdb, 

46 ApdbConfig, 

47 ApdbTables, 

48 ApdbUpdateRecord, 

49 IncompatibleVersionError, 

50 ReplicaChunk, 

51) 

52from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig 

53from lsst.dax.apdb.cassandra.connectionContext import ConnectionContext 

54from lsst.dax.apdb.cassandra.sessionFactory import SessionFactory 

55from lsst.dax.apdb.pixelization import Pixelization 

56from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin 

57from lsst.dax.apdb.tests.data_factory import makeObjectCatalog 

58from lsst.dax.apdb.tests.utils import modified_environment 

59 

# Absolute path of the directory containing this test module. It is computed
# once and shared by all schema paths below instead of being repeated for
# each of them.
_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))

# Schema file passed as ``schema_file`` by default.
TEST_SCHEMA = os.path.join(_TESTS_DIR, "config/schema-apdb.yaml")
# Schema file passed as ``ss_schema_file``.
TEST_SCHEMA_SSO = os.path.join(_TESTS_DIR, "config/schema-sso.yaml")
# Schema that uses `datetime` for timestamps and combines APDB and SSP.
TEST_SCHEMA_DT = os.path.join(_TESTS_DIR, "config/schema-datetime.yaml")

64 

65 

class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin):
    """Mixin with methods shared by the Cassandra unit tests in this module."""

    def pixelization(self, config: ApdbConfig) -> Pixelization:
        """Return a `Pixelization` built from the partitioning options of
        ``config``, which has to be an `ApdbCassandraConfig`.
        """
        assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here"
        partitioning = config.partitioning
        return Pixelization(
            partitioning.part_pixelization,
            partitioning.part_pix_level,
            partitioning.part_pix_max_ranges,
        )

77 

78 

class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase):
    """Test case for the `ApdbCassandra` class."""

    # Partitioning options forwarded to ``ApdbCassandra.init_database``; the
    # start/end values are only forwarded when they are set.
    time_partition_tables = False
    time_partition_start: str | None = None
    time_partition_end: str | None = None
    extra_chunk_columns = 2

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Call ``ApdbCassandra.init_database`` and return the resulting
        configuration; ``kwargs`` override the defaults assembled here.
        """
        params: dict[str, Any] = dict(
            hosts=(self.cluster_host,),
            keyspace=self.keyspace,
            schema_file=TEST_SCHEMA,
            ss_schema_file=TEST_SCHEMA_SSO,
            time_partition_tables=self.time_partition_tables,
            enable_replica=self.enable_replica,
        )
        # Partition limits are optional, skip the ones that are not set.
        for option in ("time_partition_start", "time_partition_end"):
            if value := getattr(self, option):
                params[option] = value
        return ApdbCassandra.init_database(**(params | kwargs))

    def getDiaObjects_table(self) -> ApdbTables:
        """Return the table type that getDiaObjects method returns."""
        return ApdbTables.DiaObjectLast

    def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
        # Docstring inherited.
        assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance"
        apdb._storeUpdateRecords(records, chunk, store_chunk=True)

    def _count_after_reset_dedup(self, count_before: int) -> int:
        # The result is zero irrespective of ``count_before``.
        return 0

115 

116 

class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase):
    """Variant of the `ApdbCassandra` test case that uses per-month tables."""

    time_partition_tables = True
    time_partition_start = "2020-06-01T00:00:00"
    time_partition_end = "2021-06-01T00:00:00"
    meta_row_count = 4

    def test_store_partition_range(self) -> None:
        """Check that storing into a partition that does not exist fails."""
        apdb = Apdb.from_config(self.make_instance())
        region = self.make_region()

        # This visit is later than ``time_partition_end``, expect an error.
        late_visit = astropy.time.Time("2022-01-01", format="isot", scale="tai")
        objects = makeObjectCatalog(region, 100)
        with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"):
            apdb.store(late_visit, objects)

        # This visit lands in the last partition, expect only a warning.
        last_visit = astropy.time.Time("2021-06-01", format="isot", scale="tai")
        objects = makeObjectCatalog(region, 100)
        with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"):
            apdb.store(last_visit, objects)

143 

144 

class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase):
    """Repeat the `ApdbCassandra` test case with replica tables enabled."""

    meta_row_count = 4
    enable_replica = True

150 

151 

class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica):
    """Replica-enabled test case that uses datetime timestamps."""

    use_mjd = False

    def setUp(self) -> None:
        super().setUp()
        # The datetime schema has no validityTime column in the
        # DiaObjectLast table, so adjust the expected count on a private
        # copy of the mapping.
        self.table_column_count = dict(self.table_column_count) | {ApdbTables.DiaObjectLast: 5}

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        # Use the datetime schema unless the caller picked a schema itself.
        kwargs.setdefault("schema_file", TEST_SCHEMA_DT)
        return super().make_instance(**kwargs)

169 

170 

class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase):
    """A test case for schema updates using Cassandra backend."""

    time_partition_tables = False

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make config class instance used in all tests.

        Parameters
        ----------
        **kwargs : `Any`
            Overrides for the default arguments passed to
            ``ApdbCassandra.init_database``.

        Returns
        -------
        config : `ApdbConfig`
            Value returned by ``ApdbCassandra.init_database``.
        """
        # The explicit annotation keeps the values typed as `Any`, so the
        # ``**kw`` expansion below type-checks without an ignore comment.
        kw: dict[str, Any] = {
            "hosts": (self.cluster_host,),
            "keyspace": self.keyspace,
            "schema_file": TEST_SCHEMA,
            "ss_schema_file": TEST_SCHEMA_SSO,
            "time_partition_tables": self.time_partition_tables,
        }
        kw.update(kwargs)
        return ApdbCassandra.init_database(**kw)

187 

188 

class ApdbCassandraVersionCheck(cassandra_mixin.ApdbCassandraMixin, unittest.TestCase):
    """Verify that the version check is done before the frozen
    configuration is read.
    """

    def setUp(self) -> None:
        super().setUp()

        init_args: dict[str, Any] = dict(
            hosts=(self.cluster_host,),
            keyspace=self.keyspace,
            schema_file=TEST_SCHEMA,
            ss_schema_file=TEST_SCHEMA_SSO,
            time_partition_tables=False,
        )
        self.config = ApdbCassandra.init_database(**init_args)

    def test_version_check(self) -> None:
        """Check that version is verified before config is read."""
        apdb = Apdb.from_config(self.config)
        assert isinstance(apdb, ApdbCassandra)

        overrides = (
            # Incompatible schema version.
            (ConnectionContext.metadataSchemaVersionKey, "99.0.0"),
            # Frozen config replaced with something that will break.
            (ConnectionContext.metadataConfigKey, '{"not_a_config_key": 0}'),
        )
        for key, value in overrides:
            apdb.metadata.set(key, value, force=True)

        # A fresh instance has to fail on the version.
        with self.assertRaises(IncompatibleVersionError):
            reopened = Apdb.from_config(self.config)
            # Some actual method has to be called to initiate connection.
            reopened.metadata.items()

220 

221 

# Credentials in JSON format which ``ApdbCassandraDbAuthTest.test_dbauth``
# passes via the ``LSST_DB_AUTH_CREDENTIALS`` environment variable. The
# entries differ in the user, host, port and path parts of their URLs; the
# test refers to them by position (first, second, ...).
_DB_AUTH_JSON = """\
[{
    "url": "cassandra://user1000@node1.slac.stanford.edu:9042/",
    "username": "user01",
    "password": "pass01"
}, {
    "url": "cassandra://node2.slac.stanford.edu:9042/",
    "username": "user02",
    "password": "pass02"
}, {
    "url": "cassandra://node1.slac.stanford.edu:9042/apdb_dev",
    "username": "user03",
    "password": "pass03"
}, {
    "url": "cassandra://user2000@test_cluster/",
    "username": "user04",
    "password": "pass04"
}, {
    "url": "cassandra://test_cluster/",
    "username": "user05",
    "password": "pass05"
}, {
    "url": "cassandra://pp_apdb_prod_cluster/",
    "username": "user06",
    "password": "pass06"
}, {
    "url": "cassandra://pp_apdb_dev_cluster/",
    "username": "user07",
    "password": "pass07"
}]
"""

253 

254 

class ApdbCassandraDbAuthTest(unittest.TestCase):
    """Test case for looking up credentials in db-auth.yaml."""

    def _make_config(self) -> ApdbCassandraConfig:
        """Return a configuration with two contact points and "apdb"
        keyspace.
        """
        return ApdbCassandraConfig(
            contact_points=("node1.slac.stanford.edu", "node2.slac.stanford.edu"),
            keyspace="apdb",
        )

    @unittest.skipUnless(cassandra_mixin.CASSANDRA_IMPORTED, "cassandra_driver cannot be imported")
    def test_dbauth(self) -> None:
        """Check which credentials are selected as configuration changes."""
        with modified_environment(LSST_DB_AUTH_CREDENTIALS=_DB_AUTH_JSON):
            config = self._make_config()
            connection = config.connection_config

            factory = SessionFactory(config)

            def existing_auth() -> Any:
                # Make auth provider and require that there is one.
                provider = factory._make_auth_provider()
                assert provider is not None
                return provider

            # Second entry is expected to match.
            self.assertEqual(existing_auth().username, "user02")

            # Third entry is expected to match.
            config.keyspace = "apdb_dev"
            self.assertEqual(existing_auth().username, "user03")

            # First entry is expected to match, original user name is
            # returned.
            connection.username = "user1000"
            auth = existing_auth()
            self.assertEqual(auth.username, "user1000")
            self.assertEqual(auth.password, "pass01")

            # Fifth entry is expected to match.
            connection.username = ""
            connection.dbauth_alias = "test_cluster"
            self.assertEqual(existing_auth().username, "user05")

            # Fourth entry is expected to match.
            connection.username = "user2000"
            connection.dbauth_alias = "test_cluster"
            auth = existing_auth()
            self.assertEqual(auth.username, "user2000")
            self.assertEqual(auth.password, "pass04")

            # Without explicit alias its default value is derived from
            # username and keyspace.
            connection.dbauth_alias = ""
            config.contact_points = ("127.0.0.1",)
            config.keyspace = "apdb"
            connection.username = "apdb-prod"
            self.assertIsNone(factory._make_auth_provider())

            config.keyspace = "pp_apdb_cam"
            self.assertEqual(existing_auth().password, "pass06")

            connection.username = "apdb"
            config.keyspace = "pp_apdb_cam_dev"
            self.assertEqual(existing_auth().password, "pass07")

324 

325 

class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Run file leak tests.

    The class adds nothing of its own, all tests come from
    `lsst.utils.tests.MemoryTestCase`.
    """

328 

329 

def setup_module(module: Any) -> None:
    """Configure pytest.

    Calls `lsst.utils.tests.init`; the ``module`` argument is not used.
    """
    lsst.utils.tests.init()

333 

334 

# When executed as a script do the same initialization as `setup_module`
# and then run the tests with unittest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()