Coverage for tests/test_apdbCassandra.py: 37%
138 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 08:42 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-30 08:42 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Unit test for `ApdbCassandra` class.
24Notes
25-----
26For now this test can only run against actual Cassandra cluster, to specify
27cluster location use ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment variable,
28e.g.:
30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com
31 pytest tests/test_apdbCassandra.py
33Individual tests create and destroy unique keyspaces in the cluster, there is
34no need to pre-create a keyspace with predefined name.
35"""
37import os
38import unittest
39from typing import Any
41import astropy.time
43import lsst.utils.tests
44from lsst.dax.apdb import (
45 Apdb,
46 ApdbConfig,
47 ApdbTables,
48 ApdbUpdateRecord,
49 IncompatibleVersionError,
50 ReplicaChunk,
51)
52from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig
53from lsst.dax.apdb.cassandra.connectionContext import ConnectionContext
54from lsst.dax.apdb.cassandra.sessionFactory import SessionFactory
55from lsst.dax.apdb.pixelization import Pixelization
56from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin
57from lsst.dax.apdb.tests.data_factory import makeObjectCatalog
58from lsst.dax.apdb.tests.utils import modified_environment
60TEST_SCHEMA = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-apdb.yaml")
61TEST_SCHEMA_SSO = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-sso.yaml")
62# Schema that uses `datetime` for timestamps and combines APDB and SSP.
63TEST_SCHEMA_DT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-datetime.yaml")
66class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin):
67 """Mixin class which defines common methods for unit tests."""
69 def pixelization(self, config: ApdbConfig) -> Pixelization:
70 """Return pixelization used by implementation."""
71 assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here"
72 return Pixelization(
73 config.partitioning.part_pixelization,
74 config.partitioning.part_pix_level,
75 config.partitioning.part_pix_max_ranges,
76 )
79class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase):
80 """A test case for ApdbCassandra class"""
82 time_partition_tables = False
83 time_partition_start: str | None = None
84 time_partition_end: str | None = None
85 extra_chunk_columns = 2
87 def make_instance(self, **kwargs: Any) -> ApdbConfig:
88 """Make config class instance used in all tests."""
89 kw: dict[str, Any] = {
90 "hosts": (self.cluster_host,),
91 "keyspace": self.keyspace,
92 "schema_file": TEST_SCHEMA,
93 "ss_schema_file": TEST_SCHEMA_SSO,
94 "time_partition_tables": self.time_partition_tables,
95 "enable_replica": self.enable_replica,
96 }
97 if self.time_partition_start:
98 kw["time_partition_start"] = self.time_partition_start
99 if self.time_partition_end:
100 kw["time_partition_end"] = self.time_partition_end
101 kw.update(kwargs)
102 return ApdbCassandra.init_database(**kw)
104 def getDiaObjects_table(self) -> ApdbTables:
105 """Return type of table returned from getDiaObjects method."""
106 return ApdbTables.DiaObjectLast
108 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
109 # Docstring inherited.
110 assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance"
111 apdb._storeUpdateRecords(records, chunk, store_chunk=True)
113 def _count_after_reset_dedup(self, count_before: int) -> int:
114 return 0
117class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase):
118 """A test case for ApdbCassandra class with per-month tables."""
120 time_partition_tables = True
121 time_partition_start = "2020-06-01T00:00:00"
122 time_partition_end = "2021-06-01T00:00:00"
123 meta_row_count = 4
125 def test_store_partition_range(self) -> None:
126 """Test that writing to non-existing partition raises an error."""
127 config = self.make_instance()
128 apdb = Apdb.from_config(config)
130 region = self.make_region()
132 # Visit time is beyond time_partition_end.
133 visit_time = astropy.time.Time("2022-01-01", format="isot", scale="tai")
134 catalog = makeObjectCatalog(region, 100)
135 with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"):
136 apdb.store(visit_time, catalog)
138 # Writing to last partition makes a warning.
139 visit_time = astropy.time.Time("2021-06-01", format="isot", scale="tai")
140 catalog = makeObjectCatalog(region, 100)
141 with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"):
142 apdb.store(visit_time, catalog)
145class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase):
146 """A test case with enabled replica tables."""
148 enable_replica = True
149 meta_row_count = 4
152class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica):
153 """A test case with datetime timestamps."""
155 use_mjd = False
157 def setUp(self) -> None:
158 super().setUp()
159 # Schema for datetime case is also missing a validityTime column in
160 # DiaObjectLast table.
161 self.table_column_count = dict(self.table_column_count)
162 self.table_column_count[ApdbTables.DiaObjectLast] = 5
164 def make_instance(self, **kwargs: Any) -> ApdbConfig:
165 if "schema_file" in kwargs:
166 return super().make_instance(**kwargs)
167 else:
168 return super().make_instance(schema_file=TEST_SCHEMA_DT, **kwargs)
171class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase):
172 """A test case for schema updates using Cassandra backend."""
174 time_partition_tables = False
176 def make_instance(self, **kwargs: Any) -> ApdbConfig:
177 """Make config class instance used in all tests."""
178 kw = {
179 "hosts": (self.cluster_host,),
180 "keyspace": self.keyspace,
181 "schema_file": TEST_SCHEMA,
182 "ss_schema_file": TEST_SCHEMA_SSO,
183 "time_partition_tables": self.time_partition_tables,
184 }
185 kw.update(kwargs)
186 return ApdbCassandra.init_database(**kw) # type: ignore[arg-type]
189class ApdbCassandraVersionCheck(cassandra_mixin.ApdbCassandraMixin, unittest.TestCase):
190 """A test case to verify that version check happens before reading
191 frozen configuration.
192 """
194 def setUp(self) -> None:
195 super().setUp()
197 self.config = ApdbCassandra.init_database(
198 hosts=(self.cluster_host,),
199 keyspace=self.keyspace,
200 schema_file=TEST_SCHEMA,
201 ss_schema_file=TEST_SCHEMA_SSO,
202 time_partition_tables=False,
203 )
205 def test_version_check(self) -> None:
206 """Test that version check happens before reading config."""
207 apdb = Apdb.from_config(self.config)
208 assert isinstance(apdb, ApdbCassandra)
210 # Store incompatible version.
211 apdb.metadata.set(ConnectionContext.metadataSchemaVersionKey, "99.0.0", force=True)
213 # Overwrite frozen config with something that will break.
214 apdb.metadata.set(ConnectionContext.metadataConfigKey, '{"not_a_config_key": 0}', force=True)
216 # Try again.
217 with self.assertRaises(IncompatibleVersionError):
218 # Need to call some actual method to initiate connection.
219 Apdb.from_config(self.config).metadata.items()
222_DB_AUTH_JSON = """\
223[{
224 "url": "cassandra://user1000@node1.slac.stanford.edu:9042/",
225 "username": "user01",
226 "password": "pass01"
227}, {
228 "url": "cassandra://node2.slac.stanford.edu:9042/",
229 "username": "user02",
230 "password": "pass02"
231}, {
232 "url": "cassandra://node1.slac.stanford.edu:9042/apdb_dev",
233 "username": "user03",
234 "password": "pass03"
235}, {
236 "url": "cassandra://user2000@test_cluster/",
237 "username": "user04",
238 "password": "pass04"
239}, {
240 "url": "cassandra://test_cluster/",
241 "username": "user05",
242 "password": "pass05"
243}, {
244 "url": "cassandra://pp_apdb_prod_cluster/",
245 "username": "user06",
246 "password": "pass06"
247}, {
248 "url": "cassandra://pp_apdb_dev_cluster/",
249 "username": "user07",
250 "password": "pass07"
251}]
252"""
255class ApdbCassandraDbAuthTest(unittest.TestCase):
256 """A test case for extracting credentials from db-auth.yaml."""
258 def _make_config(self) -> ApdbCassandraConfig:
259 config = ApdbCassandraConfig(
260 contact_points=("node1.slac.stanford.edu", "node2.slac.stanford.edu"),
261 keyspace="apdb",
262 )
263 return config
265 @unittest.skipIf(not cassandra_mixin.CASSANDRA_IMPORTED, "cassandra_driver cannot be imported")
266 def test_dbauth(self) -> None:
267 """Check credentials access."""
268 with modified_environment(LSST_DB_AUTH_CREDENTIALS=_DB_AUTH_JSON):
269 config = self._make_config()
271 factory = SessionFactory(config)
273 # Should match second entry.
274 auth = factory._make_auth_provider()
275 assert auth is not None
276 self.assertEqual(auth.username, "user02")
278 config.keyspace = "apdb_dev"
279 # Should match third entry.
280 auth = factory._make_auth_provider()
281 assert auth is not None
282 self.assertEqual(auth.username, "user03")
284 config.connection_config.username = "user1000"
285 # Should match first entry, returns original user name.
286 auth = factory._make_auth_provider()
287 assert auth is not None
288 self.assertEqual(auth.username, "user1000")
289 self.assertEqual(auth.password, "pass01")
291 config.connection_config.username = ""
292 config.connection_config.dbauth_alias = "test_cluster"
293 # Should match fifth entry.
294 auth = factory._make_auth_provider()
295 assert auth is not None
296 self.assertEqual(auth.username, "user05")
298 config.connection_config.username = "user2000"
299 config.connection_config.dbauth_alias = "test_cluster"
300 # Should match fourth entry.
301 auth = factory._make_auth_provider()
302 assert auth is not None
303 self.assertEqual(auth.username, "user2000")
304 self.assertEqual(auth.password, "pass04")
306 # Default value of alias is based on username and keyspace.
307 config.connection_config.dbauth_alias = ""
308 config.contact_points = ("127.0.0.1",)
309 config.keyspace = "apdb"
310 config.connection_config.username = "apdb-prod"
311 auth = factory._make_auth_provider()
312 self.assertIsNone(auth)
314 config.keyspace = "pp_apdb_cam"
315 auth = factory._make_auth_provider()
316 assert auth is not None
317 self.assertEqual(auth.password, "pass06")
319 config.connection_config.username = "apdb"
320 config.keyspace = "pp_apdb_cam_dev"
321 auth = factory._make_auth_provider()
322 assert auth is not None
323 self.assertEqual(auth.password, "pass07")
326class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
327 """Run file leak tests."""
330def setup_module(module: Any) -> None:
331 """Configure pytest."""
332 lsst.utils.tests.init()
335if __name__ == "__main__":
336 lsst.utils.tests.init()
337 unittest.main()