Coverage for tests / test_cli.py: 25%

228 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 01:22 -0700

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import logging 

23import os 

24import shutil 

25import tempfile 

26import unittest 

27from typing import Any 

28 

29import yaml 

30from sqlalchemy import create_engine, text 

31 

32import felis.tap_schema as tap_schema 

33from felis.datamodel import Schema 

34from felis.metadata import MetaDataBuilder 

35from felis.tests.run_cli import run_cli 

36 

# Absolute path of the directory that contains this test module.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))

# Schema files, stored under ``data/`` next to this module, that the tests
# pass to the CLI commands.
TEST_YAML = os.path.join(TEST_DIR, "data", "test.yml")
TEST_SALES_YAML = os.path.join(TEST_DIR, "data", "sales.yaml")

40 

41 

class CliTestCase(unittest.TestCase):
    """Tests for CLI commands.

    Every test gets its own scratch directory created under ``TEST_DIR`` and
    a SQLite URL pointing at a database file inside that directory. The
    directory is removed again in `tearDown`.
    """

    def setUp(self) -> None:
        """Set up a temporary directory for tests."""
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)
        self.sqlite_url = f"sqlite:///{self.tmpdir}/db.sqlite3"
        print(f"Using temporary directory: {self.tmpdir}")

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _dump_and_load(self, options: list[str], source: str, suffix: str) -> Any:
        """Run the ``dump`` command into a temporary file and parse the
        result.

        Parameters
        ----------
        options
            Extra command-line flags inserted between ``dump`` and the input
            path.
        source
            Path of the schema file to dump.
        suffix
            Extension of the temporary output file (``.yaml`` or ``.json``).

        Returns
        -------
        Any
            The object produced by ``yaml.safe_load`` on the dumped text.
        """
        with tempfile.NamedTemporaryFile(delete=True, suffix=suffix, dir=self.tmpdir) as temp_file:
            run_cli(["dump", *options, source, temp_file.name], print_output=True)
            dumped_data = temp_file.read().decode("utf-8")
        # The YAML loader is used for the JSON output as well.
        return yaml.safe_load(dumped_data)

    def test_invalid_command(self) -> None:
        """Test for invalid command."""
        run_cli(["invalid"], expect_error=True)

    def test_help(self) -> None:
        """Test for help command."""
        run_cli(["--help"], print_output=True)

    def test_create(self) -> None:
        """Test for create command."""
        run_cli(["create", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_create_with_echo(self) -> None:
        """Test for ``create --echo`` command."""
        run_cli(["create", "--echo", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_create_with_dry_run(self) -> None:
        """Test for ``create --dry-run`` command."""
        run_cli(["create", "--schema-name=main", f"--engine-url={self.sqlite_url}", "--dry-run", TEST_YAML])

    def test_create_with_ignore_constraints(self) -> None:
        """Test ``--ignore-constraints`` flag of ``create`` command."""
        run_cli(
            [
                "create",
                "--schema-name=main",
                "--ignore-constraints",
                f"--engine-url={self.sqlite_url}",
                "--dry-run",
                TEST_YAML,
            ]
        )

    def test_validate(self) -> None:
        """Test validate command."""
        run_cli(["validate", TEST_YAML])

    def test_validate_with_log_file(self) -> None:
        """Test validate command with log file."""
        log_file = os.path.join(self.tmpdir, "validate.log")
        run_cli([f"--log-file={log_file}", "validate", TEST_YAML], log_level=logging.DEBUG, print_cmd=True)
        self.assertTrue(os.path.exists(log_file), "Log file was not created")
        self.assertGreater(os.path.getsize(log_file), 0, "Log file is empty")

    def test_validate_with_id_generation(self) -> None:
        """Test that loading a schema with IDs works if ID generation is
        enabled. This is the default behavior.
        """
        test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml")
        run_cli(["--id-generation", "validate", test_yaml])

    def test_validate_with_id_generation_error(self) -> None:
        """Test that loading a schema without IDs fails if ID generation is not
        enabled.
        """
        test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml")
        run_cli(["--no-id-generation", "validate", test_yaml], expect_error=True)

    def test_validate_with_extra_checks(self) -> None:
        """Test schema validation flags."""
        run_cli(
            [
                "validate",
                "--check-description",
                "--check-tap-principal",
                "--check-tap-table-indexes",
                TEST_YAML,
            ]
        )

    def test_create_with_initialize_and_drop_error(self) -> None:
        """Test that initialize and drop can't be used together."""
        run_cli(["create", "--initialize", "--drop", TEST_YAML], expect_error=True)

    def test_load_tap_schema(self) -> None:
        """Test load-tap-schema command."""
        # Create the TAP_SCHEMA database.
        tap_schema_path = tap_schema.TableManager.get_tap_schema_std_path()
        run_cli(["--id-generation", "create", f"--engine-url={self.sqlite_url}", tap_schema_path])

        # Load the TAP_SCHEMA data.
        run_cli(["load-tap-schema", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_load_tap_schema_with_dry_run_and_output_file(self) -> None:
        """Test load-tap-schema command with dry run and output file."""
        output_sql = os.path.join(self.tmpdir, "tap_schema.sql")
        run_cli(
            [
                "load-tap-schema",
                "--engine-url=mysql://",
                "--dry-run",
                "--tap-schema-index=1",
                "--tap-tables-postfix=11",
                "--force-unbounded-arraysize",
                f"--output-file={output_sql}",
                TEST_YAML,
            ]
        )
        self.assertTrue(os.path.exists(output_sql), "Output SQL file was not created")
        self.assertGreater(os.path.getsize(output_sql), 0, "Output SQL file is empty")

    def test_init_tap_schema(self) -> None:
        """Test init-tap-schema command."""
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

    def test_init_tap_schema_mock(self) -> None:
        """Test init-tap-schema command with a mock URL, which should throw
        an error, as this is not supported.
        """
        run_cli(["init-tap-schema", "sqlite://"], expect_error=True)

    def test_init_tap_schema_with_extensions(self) -> None:
        """Test init-tap-schema command with default extensions."""
        run_cli(
            [
                "init-tap-schema",
                f"--engine-url={self.sqlite_url}",
                "--extensions",
                "resource://felis/config/tap_schema/tap_schema_extensions.yaml",
            ]
        )

    def test_init_tap_schema_with_custom_extensions(self) -> None:
        """Test init-tap-schema command with custom extensions file."""
        extensions_file = os.path.join(self.tmpdir, "custom_extensions.yaml")
        extensions_content = """
        name: TAP_SCHEMA
        tables:
          - name: schemas
            columns:
              - name: field1
                datatype: char
                length: 64
                description: A custom field
        """
        with open(extensions_file, "w", encoding="utf-8") as f:
            f.write(extensions_content)

        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}", "--extensions", extensions_file])

    def test_diff(self) -> None:
        """Test for ``diff`` command."""
        test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")

        run_cli(["diff", test_diff1, test_diff2])

    def test_diff_database(self) -> None:
        """Test for ``diff`` command with database."""
        test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")

        # Materialize the first schema in the database so the second one can
        # be compared against it.
        engine = create_engine(self.sqlite_url)
        try:
            metadata_db = MetaDataBuilder(Schema.from_uri(test_diff1), apply_schema_to_metadata=False).build()
            metadata_db.create_all(engine)
        finally:
            engine.dispose()

        run_cli(["diff", f"--engine-url={self.sqlite_url}", test_diff2])

    def test_diff_alembic(self) -> None:
        """Test for ``diff`` command with ``--alembic`` comparator option."""
        test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")
        run_cli(["diff", "--comparator", "alembic", test_diff1, test_diff2], print_output=True)

    def test_diff_error(self) -> None:
        """Test for ``diff`` command with bad arguments."""
        test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        run_cli(["diff", test_diff1], expect_error=True)

    def test_diff_error_on_change(self) -> None:
        """Test for ``diff`` command with ``--error-on-change`` flag."""
        test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")
        run_cli(["diff", "--error-on-change", test_diff1, test_diff2], expect_error=True, print_output=True)

    def test_dump_yaml(self) -> None:
        """Test for ``dump`` command with YAML output."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file:
            run_cli(["dump", TEST_YAML, temp_file.name], print_output=True)

    @classmethod
    def _check_strip_ids(cls, obj: Any) -> None:
        """Recursively check that a dict/list structure has no '@id' key.

        This is used to check the output of the ``--strip-ids`` option of the
        ``dump`` command.

        Parameters
        ----------
        obj
            Parsed dump output; dicts and lists are descended into, any other
            value is ignored.

        Raises
        ------
        ValueError
            Raised if any dict in the structure has the key '@id'.
        """
        if isinstance(obj, dict):
            for k, v in obj.items():
                if k == "@id":
                    raise ValueError("Found forbidden key '@id'")
                cls._check_strip_ids(v)
        elif isinstance(obj, list):
            for item in obj:
                cls._check_strip_ids(item)

    def test_dump_yaml_with_strip_ids(self) -> None:
        """Test for ``dump`` command with YAML output and stripped IDs."""
        data = self._dump_and_load(["--strip-ids"], TEST_YAML, ".yaml")
        try:
            self._check_strip_ids(data)
        except ValueError:
            self.fail("Dumped YAML contains forbidden key '@id'")

    def test_dump_json(self) -> None:
        """Test for ``dump`` command with JSON output."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file:
            run_cli(["dump", TEST_YAML, temp_file.name], print_output=True)

    def test_dump_with_dereference_resources_and_sort_columns(self) -> None:
        """Test dump with both --dereference-resources and --sort-columns."""
        # Define a source schema with columns in non-alphabetical order
        source_schema_content = """
name: base_schema
tables:
- name: base_table
  columns:
  - name: zebra_col
    datatype: string
    length: 32
  - name: alpha_col
    datatype: int
  - name: middle_col
    datatype: float
"""
        source_path = os.path.join(self.tmpdir, "base_schema.yaml")
        with open(source_path, "w", encoding="utf-8") as f:
            f.write(source_schema_content.strip())

        # Define a referencing schema that pulls columns via columnRefs
        ref_schema_content = f"""
name: derived_schema
resources:
  base_schema:
    uri: {source_path}
tables:
- name: derived_table
  columnRefs:
    base_schema:
      base_table:
        zebra_col:
        alpha_col:
        middle_col:
"""
        ref_path = os.path.join(self.tmpdir, "derived_schema.yaml")
        with open(ref_path, "w", encoding="utf-8") as f:
            f.write(ref_schema_content.strip())

        data = self._dump_and_load(["--dereference-resources", "--sort-columns"], ref_path, ".yaml")

        for table in data.get("tables", []):
            # Verify resources are dereferenced (no columnRefs remain)
            self.assertNotIn("columnRefs", table)
            # Verify columns are present and sorted alphabetically
            columns = table.get("columns", [])
            self.assertGreater(len(columns), 0)
            names = [col["name"] for col in columns]
            self.assertEqual(names, sorted(names))

    def test_dump_json_with_strip_ids(self) -> None:
        """Test for ``dump`` command with JSON output and stripped IDs."""
        data = self._dump_and_load(["--strip-ids"], TEST_YAML, ".json")
        try:
            self._check_strip_ids(data)
        except ValueError:
            self.fail("Dumped JSON contains forbidden key '@id'")

    @classmethod
    def _check_columns_sorted(cls, data: dict[str, Any]) -> None:
        """Check that columns in each table are sorted alphabetically by
        name.

        Parameters
        ----------
        data
            Parsed dump output; its ``tables`` list is inspected.

        Raises
        ------
        AssertionError
            Raised if the column names of any table are not in sorted order.
        """
        for table in data.get("tables", []):
            names = [col["name"] for col in table.get("columns", [])]
            # Raised explicitly rather than via ``assert`` so the check still
            # runs when Python is started with ``-O``.
            if names != sorted(names):
                raise AssertionError(f"Columns not sorted in table {table.get('name')}: {names}")

    def test_dump_yaml_with_sort_columns(self) -> None:
        """Test for ``dump`` command with YAML output and sorted columns."""
        data = self._dump_and_load(["--sort-columns"], TEST_YAML, ".yaml")
        self._check_columns_sorted(data)

    def test_dump_json_with_sort_columns(self) -> None:
        """Test for ``dump`` command with JSON output and sorted columns."""
        data = self._dump_and_load(["--sort-columns"], TEST_YAML, ".json")
        self._check_columns_sorted(data)

    def test_dump_with_invalid_file_extension_error(self) -> None:
        """Test that ``dump`` fails for an unsupported output file extension."""
        # Point at the scratch directory so nothing can be left behind in the
        # current working directory.
        bad_output = os.path.join(self.tmpdir, "out.bad")
        run_cli(["dump", TEST_YAML, bad_output], expect_error=True)

    def test_create_and_drop_indexes(self) -> None:
        """Test creating and dropping indexes using CLI commands with
        SQLite; no checking for the existence of the indexes is done on the
        database because other test cases cover that functionality
        sufficiently.
        """
        # Create database without indexes
        run_cli(["create", "--skip-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML])

        # Create the indexes using CLI
        run_cli(
            ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"]
        )

        # Create the indexes again; should not cause an error
        run_cli(
            ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"]
        )

        # Drop the indexes using CLI
        run_cli(["drop-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"])

    def test_generate_and_load_sql(self) -> None:
        """Test generating SQL and then executing it on a SQLite database."""
        generated_sql = os.path.join(self.tmpdir, "generated.sql")

        # Generate SQL DDL from schema using mock connection
        run_cli(
            [
                "create",
                "--engine-url=sqlite://",
                f"--output-file={generated_sql}",
                f"{TEST_YAML}",
            ]
        )

        # Verify the SQL file was generated and is not empty
        self.assertTrue(os.path.exists(generated_sql), "Generated SQL file should exist")
        with open(generated_sql) as f:
            sql = f.read()
        self.assertGreater(len(sql.strip()), 0, "Generated SQL should not be empty")

        # Split SQL into individual statements for execution since SQLite can
        # only execute one statement at a time; blank fragments are dropped.
        statements = [stmt.strip() for stmt in sql.split(";") if stmt.strip()]

        # Load the schema to get expected table names
        schema = Schema.from_uri(TEST_YAML, context={"id_generation": True})
        expected_table_names = {table.name for table in schema.tables}

        # Execute the SQL against a real database. Errors propagate with their
        # original traceback; the engine is disposed whether or not they occur.
        engine = create_engine(self.sqlite_url)
        try:
            with engine.connect() as connection:
                with connection.begin():
                    for statement in statements:
                        connection.execute(text(statement))

            # Get all tables that were created in the database
            with engine.connect() as connection:
                result = connection.execute(text("SELECT name FROM sqlite_master WHERE type='table'"))
                created_table_names = {row[0] for row in result.fetchall()}
        finally:
            engine.dispose()

        # Verify all expected tables were created
        self.assertTrue(
            expected_table_names.issubset(created_table_names),
            f"Missing tables: {expected_table_names - created_table_names}. "
            f"Expected: {sorted(expected_table_names)}, "
            f"Created: {sorted(created_table_names)}",
        )

457 

458 

class ColumnRefsTestCase(unittest.TestCase):
    """Test handling of column references in CLI.

    `setUp` writes two schema files into a per-test scratch directory: a
    source schema, and a schema whose table pulls its columns from the source
    schema through ``columnRefs``. The path of the latter is stored in
    ``self.ref_schema_path``.
    """

    def setUp(self) -> None:
        """Set up a temporary directory and the schema files for tests."""
        self.temp_dir = tempfile.mkdtemp(dir=TEST_DIR)
        # Registered immediately so the directory is removed even if the rest
        # of setUp raises; tearDown is not run when setUp fails.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.sqlite_url = f"sqlite:///{self.temp_dir}/db.sqlite3"

        # Write out source schema file
        source_schema_content = """
name: source_schema
tables:
- name: source_table
  columns:
  - name: ref_col1
    datatype: int
  - name: ref_col2
    datatype: string
    length: 64
  - name: ref_col3
    datatype: float
"""
        source_schema_path = os.path.join(self.temp_dir, "source_schema.yaml")
        with open(source_schema_path, "w", encoding="utf-8") as f:
            f.write(source_schema_content.strip())

        # Write out referencing schema file; ``{resource_path}`` is filled in
        # with the location of the source schema written above.
        ref_schema_content = """
name: ref_schema
resources:
  source_schema:
    uri: {resource_path}
tables:
- name: ref_table
  columnRefs:
    source_schema:
      source_table:
        ref_col1:
        ref_col2:
          overrides:
            tap:column_index: 15
        col3:
          ref_name: ref_col3
"""
        self.ref_schema_path = os.path.join(self.temp_dir, "ref_schema.yaml")
        ref_content = ref_schema_content.format(resource_path=source_schema_path)
        with open(self.ref_schema_path, "w", encoding="utf-8") as f:
            f.write(ref_content.strip())

    def test_validate_with_column_ref_index_increment(self) -> None:
        """Test that passing a valid value for column reference index increment
        works.
        """
        run_cli(
            [
                "--column-ref-index-increment=1",
                "validate",
                self.ref_schema_path,
            ]
        )

    def test_validate_with_column_ref_index_increment_error(self) -> None:
        """Test that passing an invalid value for column reference index raises
        an error.
        """
        run_cli(
            [
                "--column-ref-index-increment=-1",
                "validate",
                self.ref_schema_path,
            ],
            expect_error=True,
        )

    def test_load_tap_schema_with_column_refs(self) -> None:
        """Test load-tap-schema command with a schema that uses column
        references.
        """
        # Create the TAP_SCHEMA database
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load the TAP_SCHEMA data that includes column references
        run_cli(
            [
                "load-tap-schema",
                f"--engine-url={self.sqlite_url}",
                self.ref_schema_path,
            ]
        )

    def test_load_tap_schema_with_column_ref_index_increment(self) -> None:
        """Test load-tap-schema command with column reference index
        increment.
        """
        # Create the TAP_SCHEMA database
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load the TAP_SCHEMA data that includes column reference index
        # increment
        run_cli(
            [
                "--column-ref-index-increment=1",
                "load-tap-schema",
                f"--engine-url={self.sqlite_url}",
                self.ref_schema_path,
            ]
        )

570 

571 

# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()