Coverage for python/lsst/dax/apdb/cassandra/config.py: 64%

87 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 08:21 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCassandraConfig", 

26 "ApdbCassandraConnectionConfig", 

27 "ApdbCassandraPartitioningConfig", 

28 "ApdbCassandraTimePartitionRange", 

29] 

30 

31import json 

32from collections.abc import Iterable, Iterator 

33from typing import TYPE_CHECKING, Any, ClassVar, Self 

34 

35from pydantic import BaseModel, Field, field_validator 

36 

37# If cassandra-driver is not there the module can still be imported. 

38try: 

39 import cassandra 

40 

41 CASSANDRA_IMPORTED = True 

42except ImportError: 

43 CASSANDRA_IMPORTED = False 

44 

45 

46from ..config import ApdbConfig 

47 

48if TYPE_CHECKING: 

49 from .apdbMetadataCassandra import ApdbMetadataCassandra 

50 

51 

class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB."""

    # --- Network endpoint -------------------------------------------------

    port: int = Field(default=9042, description="Port number to connect to.")

    private_ips: tuple[str, ...] = Field(
        default=(), description="List of internal IP addresses for contact_points."
    )

    # --- Credentials lookup -----------------------------------------------

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    dbauth_alias: str = Field(
        default="",
        description=(
            "If specified then this string will be used as a host name when checking credentials in "
            "db-auth.yaml in addition to regular host names in contact_points. For example if "
            "dbauth_alias='pp_apdb_prod_cluster' then the entry 'cassandra://pp_apdb_prod_cluster/' will "
            "match. Port number should not be used in that entry. Alias has higher priority than host names. "
            "If not specified, the default value depends on `username` and `keyspace` values."
        ),
    )

    # --- Consistency levels -----------------------------------------------

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    # --- Timeouts (seconds) and concurrency -------------------------------

    read_timeout: float = Field(default=120.0, description="Timeout in seconds for read operations.")

    write_timeout: float = Field(default=60.0, description="Timeout in seconds for write operations.")

    remove_timeout: float = Field(default=600.0, description="Timeout in seconds for remove operations.")

    read_concurrency: int = Field(default=500, description="Concurrency level for read operations.")

    # --- Driver options ---------------------------------------------------

    # The default comes from the driver when it was imported; otherwise the
    # plain integer 4 is used so that this module stays importable without
    # cassandra-driver.
    protocol_version: int = Field(
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    extra_parameters: dict[str, Any] = Field(
        default={},
        description="Additional keyword parameters passed to connect() method verbatim.",
    )

121 

122 

class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB."""

    # --- Spatial partitioning ---------------------------------------------

    # Value is restricted by ``check_pixelization`` below.
    part_pixelization: str = Field(default="mq3c", description="Pixelization used for partitioning index.")

    part_pix_level: int = Field(default=11, description="Pixelization level used for partitioning index.")

    part_pix_max_ranges: int = Field(
        default=128, description="Max number of ranges in pixelization envelope"
    )

    # --- Temporal partitioning --------------------------------------------

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    # --- Query strategy ---------------------------------------------------

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    # --- Deduplication ----------------------------------------------------

    num_part_dedup: int = Field(default=64, description="Number of partitions in DiaObjectDedup table.")

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Validate the name of the pixelization scheme.

        Parameters
        ----------
        v : `str`
            Value supplied for ``part_pixelization``.

        Returns
        -------
        v : `str`
            The same value, when it is one of the supported names.

        Raises
        ------
        ValueError
            Raised if the value is not one of the supported names.
        """
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v in allowed:
            return v
        raise ValueError(f"Unexpected value for part_pixelization: {v}, allowed values: {allowed}")

195 

196 

class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation."""

    # NOTE(review): presumably consumed by the ``ApdbConfig`` base class to
    # identify this implementation -- confirm against the base class.
    _implementation_type: ClassVar[str] = "cassandra"

    # --- Cluster location -------------------------------------------------

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(default="apdb", description="Keyspace name for APDB tables.")

    # --- Nested configuration sections ------------------------------------

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig, description="Database connection configuration"
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig, description="Configuration for partitioning."
    )

    # --- Table layout -----------------------------------------------------

    dia_object_columns: list[str] = Field(
        default=[],
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(default="", description="Prefix to add to table names.")

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"), description="Names of ra/dec columns in DiaObject table"
    )

    # --- Replication ------------------------------------------------------

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64, description="Number of sub-partitions in replica chunk tables."
    )

    # --- Batch limits -----------------------------------------------------

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Convert ``ra_dec_columns`` to a tuple and check that it has two
        items.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Value supplied for ``ra_dec_columns``.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            Names of the two columns.

        Raises
        ------
        ValueError
            Raised if the number of names is not exactly two.
        """
        # This validation method is needed in case we initialize model from
        # JSON in strict mode, in that mode JSON list is rejected by default.
        # NOTE(review): the decorator does not specify a validator mode, so
        # this runs in pydantic's default mode -- confirm that it executes
        # early enough to see a list in strict mode.
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")

    def get_dbauth_alias(self) -> str:
        """Return alias name for dbauth lookup.

        Returns
        -------
        alias : `str`
            Possibly empty alias name.

        Notes
        -----
        An explicitly configured ``connection_config.dbauth_alias`` always
        wins. Without it the alias is guessed from user name and keyspace,
        and only for keyspaces whose name starts with "pp_apdb_":

        - user "apdb-prod" maps to "pp_apdb_prod_cluster";
        - user "apdb" with a keyspace ending in "_dev" maps to
          "pp_apdb_dev_cluster".

        In all other cases an empty string is returned.
        """
        connection = self.connection_config
        if connection.dbauth_alias:
            return connection.dbauth_alias

        # Both guesses below require the same keyspace prefix.
        if not self.keyspace.startswith("pp_apdb_"):
            return ""

        if connection.username == "apdb-prod":
            return "pp_apdb_prod_cluster"

        if connection.username == "apdb" and self.keyspace.endswith("_dev"):
            return "pp_apdb_dev_cluster"

        return ""

306 

307 

class ApdbCassandraTimePartitionRange(BaseModel):
    """Configuration of the time partitions, this is not user-configurable,
    but it is reflected in metadata.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Generate a sequence of partition numbers.

        Yields
        ------
        partition : `int`
            Partition numbers from ``start`` to ``end``, both inclusive.
        """
        # ``end`` is inclusive, hence the +1 for the half-open built-in range.
        for partition in range(self.start, self.end + 1):
            yield partition

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Read this configuration object from metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the metadata table has no value stored under
            ``metadataTimePartitionKey``.
        """
        serialized = metadata.get(cls.metadataTimePartitionKey)
        if serialized is not None:
            # Stored value is a JSON document, decode it before validation.
            return cls.model_validate(json.loads(serialized))
        raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Save this configuration to metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Notes
        -----
        The configuration is stored as a JSON string under
        ``metadataTimePartitionKey``. The value is written with
        ``force=True``; presumably this overwrites an existing entry, to be
        confirmed against `ApdbMetadataCassandra`.
        """
        payload = json.dumps(self.model_dump())
        metadata.set(self.metadataTimePartitionKey, payload, force=True)