Coverage for python/lsst/dax/apdb/cassandra/config.py: 64%
87 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:21 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:21 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCassandraConfig",
26 "ApdbCassandraConnectionConfig",
27 "ApdbCassandraPartitioningConfig",
28 "ApdbCassandraTimePartitionRange",
29]
31import json
32from collections.abc import Iterable, Iterator
33from typing import TYPE_CHECKING, Any, ClassVar, Self
35from pydantic import BaseModel, Field, field_validator
37# If cassandra-driver is not there the module can still be imported.
38try:
39 import cassandra
41 CASSANDRA_IMPORTED = True
42except ImportError:
43 CASSANDRA_IMPORTED = False
46from ..config import ApdbConfig
48if TYPE_CHECKING:
49 from .apdbMetadataCassandra import ApdbMetadataCassandra
class ApdbCassandraConnectionConfig(BaseModel):
    """Settings describing how a client connects to Cassandra for APDB.

    Every field declares a default value, so the model can be instantiated
    without arguments.
    """

    # --- Endpoint and credentials ------------------------------------------

    port: int = Field(default=9042, description="Port number to connect to.")

    private_ips: tuple[str, ...] = Field(
        default=(), description="List of internal IP addresses for contact_points."
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    dbauth_alias: str = Field(
        default="",
        description=(
            "If specified then this string will be used as a host name when checking credentials in "
            "db-auth.yaml in addition to regular host names in contact_points. For example if "
            "dbauth_alias='pp_apdb_prod_cluster' then the entry 'cassandra://pp_apdb_prod_cluster/' will "
            "match. Port number should not be used in that entry. Alias has higher priority than host names. "
            "If not specified, the default value depends on `username` and `keyspace` values."
        ),
    )

    # --- Consistency levels ------------------------------------------------

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    # --- Timeouts (seconds) and concurrency --------------------------------

    read_timeout: float = Field(default=120.0, description="Timeout in seconds for read operations.")

    write_timeout: float = Field(default=60.0, description="Timeout in seconds for write operations.")

    remove_timeout: float = Field(default=600.0, description="Timeout in seconds for remove operations.")

    read_concurrency: int = Field(default=500, description="Concurrency level for read operations.")

    # --- Driver-level options ----------------------------------------------

    # The driver's own constant is used when cassandra-driver was imported;
    # otherwise the literal 4 is used so that the class can still be defined.
    protocol_version: int = Field(
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    extra_parameters: dict[str, Any] = Field(
        default={},
        description="Additional keyword parameters passed to connect() method verbatim.",
    )
class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Groups the options for spatial partitioning (pixelization scheme, level
    and envelope size), time partitioning, and the way queries are split
    across partitions. ``part_pixelization`` is checked against a fixed set
    of allowed names by `check_pixelization`.
    """

    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    num_part_dedup: int = Field(
        default=64,
        description="Number of partitions in DiaObjectDedup table.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Validate the name of the pixelization scheme.

        Parameters
        ----------
        v : `str`
            Value supplied for ``part_pixelization``.

        Returns
        -------
        v : `str`
            The same value, unchanged, when it is one of the allowed names.

        Raises
        ------
        ValueError
            Raised if ``v`` is not one of the allowed pixelization names.
        """
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v not in allowed:
            # Sort the names before formatting: set iteration order for
            # strings varies with hash randomization, which would make the
            # message differ from one run to the next.
            raise ValueError(
                f"Unexpected value for part_pixelization: {v}, allowed values: {sorted(allowed)}"
            )
        return v
class ApdbCassandraConfig(ApdbConfig):
    """Configuration of the Cassandra-backed APDB implementation.

    Extends `ApdbConfig` with Cassandra-specific options: cluster contact
    points and keyspace, nested connection and partitioning models, and
    limits applied to batch statements.
    """

    _implementation_type: ClassVar[str] = "cassandra"

    # --- Cluster location --------------------------------------------------

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(default="apdb", description="Keyspace name for APDB tables.")

    # --- Nested configuration models ---------------------------------------

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig,
        description="Database connection configuration",
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig,
        description="Configuration for partitioning.",
    )

    # --- Table and column options ------------------------------------------

    dia_object_columns: list[str] = Field(
        default=[],
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(default="", description="Prefix to add to table names.")

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"), description="Names of ra/dec columns in DiaObject table"
    )

    # --- Replication -------------------------------------------------------

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64, description="Number of sub-partitions in replica chunk tables."
    )

    # --- Batch limits ------------------------------------------------------

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Convert ``ra_dec_columns`` to a tuple and check that it has two
        items.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Value supplied for ``ra_dec_columns``.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            The two column names.

        Raises
        ------
        ValueError
            Raised if the number of names is not exactly two.
        """
        # This validator is needed for the case when the model is initialized
        # from JSON in strict mode; in that mode a JSON list is rejected by
        # default.
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")

    def get_dbauth_alias(self) -> str:
        """Return alias name for dbauth lookup.

        Returns
        -------
        alias : `str`
            Possibly empty alias name.

        Notes
        -----
        An explicitly configured ``connection_config.dbauth_alias`` always
        wins. Without it the alias is guessed from the user name and the
        keyspace, which must start with "pp_apdb_" for any guess to be made:

        - user "apdb-prod" gives "pp_apdb_prod_cluster";
        - user "apdb" with a keyspace ending in "_dev" gives
          "pp_apdb_dev_cluster".

        In all other cases an empty string is returned.
        """
        connection = self.connection_config
        if connection.dbauth_alias:
            return connection.dbauth_alias

        user = connection.username
        keyspace = self.keyspace
        if keyspace.startswith("pp_apdb_"):
            if user == "apdb-prod":
                return "pp_apdb_prod_cluster"
            if user == "apdb" and keyspace.endswith("_dev"):
                return "pp_apdb_dev_cluster"

        return ""
class ApdbCassandraTimePartitionRange(BaseModel):
    """Range of time partitions that exist in the schema.

    This is not user-configurable, but it is reflected in metadata: the
    model is stored as JSON in the metadata table under the
    `metadataTimePartitionKey` key.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Generate partition numbers from ``start`` to ``end``, both
        inclusive.

        Yields
        ------
        partition : `int`
            Next partition number, in increasing order.
        """
        # ``end`` is inclusive, hence the +1 on the upper bound.
        for partition in range(self.start, self.end + 1):
            yield partition

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Build an instance from the JSON stored in the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the metadata table has no value for
            `metadataTimePartitionKey`.
        """
        key = cls.metadataTimePartitionKey
        serialized = metadata.get(key)
        if serialized is None:
            raise LookupError(f"Key '{key}' is missing from metadata table.")
        return cls.model_validate(json.loads(serialized))

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Store this object as JSON in the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Notes
        -----
        The value is written with ``force=True``.
        """
        serialized = json.dumps(self.model_dump())
        metadata.set(self.metadataTimePartitionKey, serialized, force=True)