Coverage for python / lsst / dax / apdb / cassandra / config.py: 74%

79 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-20 01:08 -0700

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCassandraConfig", 

26 "ApdbCassandraConnectionConfig", 

27 "ApdbCassandraPartitioningConfig", 

28 "ApdbCassandraTimePartitionRange", 

29] 

30 

31import json 

32from collections.abc import Iterable, Iterator 

33from typing import TYPE_CHECKING, Any, ClassVar, Self 

34 

35from pydantic import BaseModel, Field, field_validator 

36 

37# If cassandra-driver is not there the module can still be imported. 

38try: 

39 import cassandra 

40 

41 CASSANDRA_IMPORTED = True 

42except ImportError: 

43 CASSANDRA_IMPORTED = False 

44 

45 

46from ..config import ApdbConfig 

47 

48if TYPE_CHECKING: 

49 from .apdbMetadataCassandra import ApdbMetadataCassandra 

50 

51 

class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB.

    Every field has a default, so an instance can be created without
    arguments. Consistency levels are kept as plain name strings; this model
    does not validate them.
    """

    port: int = Field(
        default=9042,
        description="Port number to connect to.",
    )

    private_ips: tuple[str, ...] = Field(
        default=(),
        description="List of internal IP addresses for contact_points.",
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    dbauth_alias: str = Field(
        default="",
        description=(
            "If specified then this string will be used as a host name when checking credentials in "
            "db-auth.yaml in addition to regular host names in contact_points. For example if "
            "dbauth_alias='pp_apdb_prod_cluster' then the entry 'cassandra://pp_apdb_prod_cluster/' will "
            "match. Port number should not be used in that entry. Alias has higher priority than host names."
        ),
    )

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    read_timeout: float = Field(
        default=120.0,
        description="Timeout in seconds for read operations.",
    )

    write_timeout: float = Field(
        default=60.0,
        description="Timeout in seconds for write operations.",
    )

    remove_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for remove operations.",
    )

    read_concurrency: int = Field(
        default=500,
        description="Concurrency level for read operations.",
    )

    protocol_version: int = Field(
        # The ``cassandra`` module is only touched when the optional driver
        # import succeeded; without the driver the plain integer 4 is used so
        # this module stays importable.
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    extra_parameters: dict[str, Any] = Field(
        default={}, description="Additional keyword parameters passed to connect() method verbatim."
    )

120 

121 

class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Groups the spatial partitioning settings (pixelization scheme, level and
    envelope size), the time partitioning settings (granularity, optional
    per-partition tables and their time span), and switches that control
    whether queries are split per time or spatial partition.
    """

    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            # ``time_partition_tables`` is a field of this same model, not of a
            # separate schema config.
            "This is only used when time_partition_tables is False."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    num_part_dedup: int = Field(
        default=64,
        description="Number of partitions in DiaObjectDedup table.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Accept only the supported pixelization scheme names.

        Parameters
        ----------
        v : `str`
            Candidate value of ``part_pixelization``.

        Returns
        -------
        v : `str`
            The same value, unchanged, when it is one of the allowed names.

        Raises
        ------
        ValueError
            If ``v`` is not one of ``htm``, ``q3c``, ``mq3c`` or ``healpix``.
        """
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v not in allowed:
            # Sort the names so the message is reproducible: iteration order of
            # a set of strings depends on hash randomization and can change
            # from one interpreter run to the next.
            raise ValueError(
                f"Unexpected value for part_pixelization: {v}, allowed values: {sorted(allowed)}"
            )
        return v

194 

195 

class ApdbCassandraConfig(ApdbConfig):
    """Top-level configuration for the Cassandra-backed APDB.

    Extends the generic `ApdbConfig` with the cluster contact points and
    keyspace, nested connection and partitioning settings, table naming, and
    limits applied when batching writes.
    """

    # Identifier for this implementation; declared as a ClassVar so it is not
    # a model field.
    _implementation_type: ClassVar[str] = "cassandra"

    contact_points: tuple[str, ...] = Field(
        description="The list of contact points to try connecting for cluster discovery.",
        default=("127.0.0.1",),
    )

    keyspace: str = Field(
        description="Keyspace name for APDB tables.",
        default="apdb",
    )

    # Nested models are built through their classes so each instance gets its
    # own default sub-configuration object.
    connection_config: ApdbCassandraConnectionConfig = Field(
        description="Database connection configuration",
        default_factory=ApdbCassandraConnectionConfig,
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        description="Configuration for partitioning.",
        default_factory=ApdbCassandraPartitioningConfig,
    )

    dia_object_columns: list[str] = Field(
        description="List of columns to read from DiaObject[Last], by default read all columns.",
        default=[],
    )

    prefix: str = Field(
        description="Prefix to add to table names.",
        default="",
    )

    ra_dec_columns: tuple[str, str] = Field(
        description="Names of ra/dec columns in DiaObject table",
        default=("ra", "dec"),
    )

    replica_skips_diaobjects: bool = Field(
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
        default=False,
    )

    replica_sub_chunk_count: int = Field(
        description="Number of sub-partitions in replica chunk tables.",
        default=64,
    )

    batch_statement_limit: int = Field(
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
        default=65535,
    )

    batch_size_limit: int = Field(
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
        default=1000000,
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Normalize ``ra_dec_columns`` to a tuple of exactly two names.

        The intent (per the original author) is to cope with initializing the
        model from JSON in strict mode, where a JSON list would otherwise be
        rejected for a tuple field.

        NOTE(review): ``field_validator`` runs in "after" mode by default, i.e.
        after pydantic has already validated the value against
        ``tuple[str, str]``; confirm whether ``mode="before"`` was intended.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Column names for right ascension and declination.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            The names as a tuple.

        Raises
        ------
        ValueError
            If the number of names is not exactly two.
        """
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")

274 

275 

class ApdbCassandraTimePartitionRange(BaseModel):
    """Range of per-time-partition tables present in the schema.

    Users do not configure this directly; it is persisted as JSON in the
    metadata table under ``metadataTimePartitionKey`` and read back from it.
    """

    # Metadata key under which the JSON form of this range is stored.
    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Yield each partition number from ``start`` through ``end``.

        Both bounds are included; nothing is yielded when ``end`` is less
        than ``start``.
        """
        for partition in range(self.start, self.end + 1):
            yield partition

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Build an instance from the JSON stored in the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table to read from.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Partition range decoded and validated from the stored value.

        Raises
        ------
        LookupError
            If the metadata table has no entry for the partition range key.
        """
        stored = metadata.get(cls.metadataTimePartitionKey)
        if stored is None:
            raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")
        decoded = json.loads(stored)
        return cls.model_validate(decoded)

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Write this range to the metadata table as JSON.

        Any existing value under the same key is replaced (``force=True``).

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table to write to.
        """
        key = self.metadataTimePartitionKey
        payload = json.dumps(self.model_dump())
        metadata.set(key, payload, force=True)