Coverage for python / lsst / dax / apdb / cassandra / config.py: 74%
79 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 01:08 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 01:08 -0700
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCassandraConfig",
26 "ApdbCassandraConnectionConfig",
27 "ApdbCassandraPartitioningConfig",
28 "ApdbCassandraTimePartitionRange",
29]
31import json
32from collections.abc import Iterable, Iterator
33from typing import TYPE_CHECKING, Any, ClassVar, Self
35from pydantic import BaseModel, Field, field_validator
# cassandra-driver is an optional dependency: record whether it could be
# imported so that this module stays importable without it.
try:
    import cassandra
except ImportError:
    CASSANDRA_IMPORTED = False
else:
    CASSANDRA_IMPORTED = True
46from ..config import ApdbConfig
48if TYPE_CHECKING:
49 from .apdbMetadataCassandra import ApdbMetadataCassandra
class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB."""

    # Every field declared here has a default value, so the model can be
    # instantiated without arguments.

    # --- Endpoint and credentials ---

    port: int = Field(default=9042, description="Port number to connect to.")

    private_ips: tuple[str, ...] = Field(
        default=(), description="List of internal IP addresses for contact_points."
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    dbauth_alias: str = Field(
        default="",
        description=(
            "If specified then this string will be used to as a host name when checking credentials in "
            "db-auth.yaml in addition to regular host names in contact_points. For example if "
            "dbauth_alias='pp_apdb_prod_cluster' then the entry 'cassandra://pp_apdb_prod_cluster/' will "
            "match. Port number should not be used in that entry. Alias has higher priority than host names."
        ),
    )

    # --- Consistency levels (stored as names, not driver enum values) ---

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    # --- Timeouts, in seconds ---

    read_timeout: float = Field(default=120.0, description="Timeout in seconds for read operations.")

    write_timeout: float = Field(default=60.0, description="Timeout in seconds for write operations.")

    remove_timeout: float = Field(default=600.0, description="Timeout in seconds for remove operations.")

    # --- Driver tuning ---

    read_concurrency: int = Field(default=500, description="Concurrency level for read operations.")

    # Falls back to the plain integer 4 when cassandra-driver could not be
    # imported, so the class definition does not require the driver.
    protocol_version: int = Field(
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    # NOTE(review): a literal ``{}`` default relies on pydantic copying
    # mutable defaults per instance -- confirm for the pydantic version in use.
    extra_parameters: dict[str, Any] = Field(
        default={},
        description="Additional keyword parameters passed to connect() method verbatim.",
    )
class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB."""

    # --- Spatial partitioning ---

    # Restricted to a fixed set of names by ``check_pixelization`` below.
    part_pixelization: str = Field(default="mq3c", description="Pixelization used for partitioning index.")

    part_pix_level: int = Field(default=11, description="Pixelization level used for partitioning index.")

    part_pix_max_ranges: int = Field(
        default=128, description="Max number of ranges in pixelization envelope"
    )

    # --- Temporal partitioning ---

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    # --- Query construction ---

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    num_part_dedup: int = Field(default=64, description="Number of partitions in DiaObjectDedup table.")

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Reject pixelization names outside the supported set.

        Parameters
        ----------
        v : `str`
            Candidate value for ``part_pixelization``.

        Returns
        -------
        v : `str`
            The same value, unchanged, when it is one of the known names.

        Raises
        ------
        ValueError
            Raised if ``v`` is not a recognized pixelization name.
        """
        known_names = {"htm", "q3c", "mq3c", "healpix"}
        if v in known_names:
            return v
        raise ValueError(f"Unexpected value for part_pixelization: {v}, allowed values: {known_names}")
class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation."""

    # Tag identifying this implementation; a class-level constant rather
    # than a model field.
    _implementation_type: ClassVar[str] = "cassandra"

    # --- Cluster location ---

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(default="apdb", description="Keyspace name for APDB tables.")

    # --- Nested configuration sections (built from their own defaults) ---

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig, description="Database connection configuration"
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig, description="Configuration for partitioning."
    )

    # --- Schema and table naming ---

    dia_object_columns: list[str] = Field(
        default=[],
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(default="", description="Prefix to add to table names.")

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"), description="Names of ra/dec columns in DiaObject table"
    )

    # --- Replication ---

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64, description="Number of sub-partitions in replica chunk tables."
    )

    # --- Batch limits ---

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Convert ``ra_dec_columns`` to a tuple and require two entries.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Candidate value for ``ra_dec_columns``.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            The two column names as a tuple.

        Raises
        ------
        ValueError
            Raised if ``v`` does not contain exactly two items.
        """
        # This validator exists for models initialized from JSON in strict
        # mode, where a JSON list is rejected by default for a tuple field.
        # NOTE(review): no ``mode`` is passed to ``field_validator``; confirm
        # that this validator actually receives list input in strict mode
        # rather than running only after the tuple type check.
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")
class ApdbCassandraTimePartitionRange(BaseModel):
    """Configuration of the time partitions, this is not user-configurable,
    but it is reflected in metadata.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    # Both bounds are required (no defaults) and the range is inclusive on
    # both ends, see `range`.
    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Iterate over partition numbers from ``start`` to ``end``,
        inclusive.

        Yields
        ------
        partition : `int`
            Partition number.
        """
        # ``range`` here is the builtin; ``end`` is inclusive, hence the +1.
        first, last = self.start, self.end
        for partition in range(first, last + 1):
            yield partition

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Build an instance from the JSON stored in the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the metadata table has no value for
            ``metadataTimePartitionKey``.
        """
        key = cls.metadataTimePartitionKey
        serialized = metadata.get(key)
        if serialized is None:
            raise LookupError(f"Key '{key}' is missing from metadata table.")
        return cls.model_validate(json.loads(serialized))

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Store this configuration as JSON in the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.
        """
        payload = json.dumps(self.model_dump())
        # ``force=True`` is passed through to ``metadata.set`` unchanged.
        metadata.set(self.metadataTimePartitionKey, payload, force=True)