Coverage for tests/test_datastore.py: 12%

1297 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 01:35 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import contextlib 

31import os 

32import pickle 

33import shutil 

34import tempfile 

35import time 

36import unittest 

37import unittest.mock 

38import uuid 

39from collections.abc import Callable, Iterator 

40from typing import Any, cast 

41 

42import yaml 

43 

44import lsst.utils.tests 

45from lsst.daf.butler import ( 

46 Config, 

47 DataCoordinate, 

48 DatasetIdGenEnum, 

49 DatasetRef, 

50 DatasetType, 

51 DatasetTypeNotSupportedError, 

52 Datastore, 

53 DimensionUniverse, 

54 FileDataset, 

55 StorageClass, 

56 StorageClassFactory, 

57) 

58from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreValidationError, NullDatastore 

59from lsst.daf.butler.datastore.cache_manager import ( 

60 DatastoreCacheManager, 

61 DatastoreCacheManagerConfig, 

62 DatastoreDisabledCacheManager, 

63) 

64from lsst.daf.butler.datastore.record_data import ( 

65 DatastoreRecordData, 

66 DatastoreRecordTable, 

67 SerializedDatastoreRecordData, 

68) 

69from lsst.daf.butler.datastore.stored_file_info import ( 

70 StoredFileInfo, 

71 StoredFileInfoTable, 

72 make_datastore_path_relative, 

73) 

74from lsst.daf.butler.formatters.yaml import YamlFormatter 

75from lsst.daf.butler.tests import ( 

76 BadNoWriteFormatter, 

77 BadWriteFormatter, 

78 DatasetTestHelper, 

79 DatastoreTestHelper, 

80 DummyRegistry, 

81 MetricsExample, 

82 MetricsExampleDataclass, 

83 MetricsExampleModel, 

84) 

85from lsst.daf.butler.tests.dict_convertible_model import DictConvertibleModel 

86from lsst.daf.butler.tests.utils import TestCaseMixin 

87from lsst.resources import ResourcePath 

88from lsst.utils import doImport 

89from lsst.utils.introspection import get_full_type_name 

90 

91TESTDIR = os.path.dirname(__file__) 

92 

93 

def makeExampleMetrics(use_none: bool = False) -> MetricsExample:
    """Build a fixed `MetricsExample` that can be stored in butler.

    Parameters
    ----------
    use_none : `bool`, optional
        If `True`, pass `None` as the third constructor argument instead of
        the list of numbers.

    Returns
    -------
    metrics : `MetricsExample`
        Example object built from constant values.
    """
    scalars = {"AM1": 5.2, "AM2": 30.6}
    nested = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    array = None if use_none else [563, 234, 456.7, 105, 2054, -1045]
    return MetricsExample(scalars, nested, array)

105 

106 

class TransactionTestError(Exception):
    """Exception raised deliberately by the transaction tests.

    A dedicated type is used so that a test cannot pass by accident when
    some unrelated standard exception is raised instead.
    """

113 

114 

class DatastoreTestsBase(DatasetTestHelper, DatastoreTestHelper, TestCaseMixin):
    """Common fixtures shared by the datastore test cases."""

    # Directory removed by tearDown when it is set and exists.
    root: str | None = None
    universe: DimensionUniverse
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        """Load storage classes and resolve the datastore class under test."""
        # Storage Classes are fixed for all datastores in these tests
        storage_class_path = os.path.join(TESTDIR, "config/basic/storageClasses.yaml")
        factory = cls.storageClassFactory = StorageClassFactory()
        factory.addFromConfig(storage_class_path)

        # The datastore class is looked up from the "cls" entry of the
        # configuration file rather than being hard-coded here.
        config = DatastoreConfig(cls.configFile)
        cls.datastoreType = cast(type[Datastore], doImport(config["cls"]))
        cls.universe = DimensionUniverse()

    def setUp(self) -> None:
        """Prepare per-test state via the shared helper."""
        self.setUpDatastoreTests(DummyRegistry, DatastoreConfig)

    def tearDown(self) -> None:
        """Remove the test root directory, if there is one on disk."""
        root = self.root
        if root is None or not os.path.exists(root):
            return
        shutil.rmtree(root, ignore_errors=True)

142 

143 

144class DatastoreTests(DatastoreTestsBase): 

145 """Some basic tests of a simple datastore.""" 

146 

147 hasUnsupportedPut = True 

148 rootKeys: tuple[str, ...] | None = None 

149 isEphemeral: bool = False 

150 validationCanFail: bool = False 

151 

152 def testConfigRoot(self) -> None: 

153 full = DatastoreConfig(self.configFile) 

154 config = DatastoreConfig(self.configFile, mergeDefaults=False) 

155 newroot = "/random/location" 

156 self.datastoreType.setConfigRoot(newroot, config, full) 

157 if self.rootKeys: 

158 for k in self.rootKeys: 

159 self.assertIn(newroot, config[k]) 

160 

161 def testConstructor(self) -> None: 

162 datastore = self.makeDatastore() 

163 self.assertIsNotNone(datastore) 

164 self.assertIs(datastore.isEphemeral, self.isEphemeral) 

165 

166 def testConfigurationValidation(self) -> None: 

167 datastore = self.makeDatastore() 

168 sc = self.storageClassFactory.getStorageClass("ThingOne") 

169 datastore.validateConfiguration([sc]) 

170 

171 sc2 = self.storageClassFactory.getStorageClass("ThingTwo") 

172 if self.validationCanFail: 

173 with self.assertRaises(DatastoreValidationError): 

174 datastore.validateConfiguration([sc2], logFailures=True) 

175 

176 dimensions = self.universe.conform(("visit", "physical_filter")) 

177 dataId = { 

178 "instrument": "dummy", 

179 "visit": 52, 

180 "physical_filter": "V", 

181 "band": "v", 

182 "day_obs": 20250101, 

183 } 

184 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

185 datastore.validateConfiguration([ref]) 

186 

187 def testParameterValidation(self) -> None: 

188 """Check that parameters are validated""" 

189 sc = self.storageClassFactory.getStorageClass("ThingOne") 

190 dimensions = self.universe.conform(("visit", "physical_filter")) 

191 dataId = { 

192 "instrument": "dummy", 

193 "visit": 52, 

194 "physical_filter": "V", 

195 "band": "v", 

196 "day_obs": 20250101, 

197 } 

198 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

199 datastore = self.makeDatastore() 

200 data = {1: 2, 3: 4} 

201 datastore.put(data, ref) 

202 newdata = datastore.get(ref) 

203 self.assertEqual(data, newdata) 

204 with self.assertRaises(KeyError): 

205 newdata = datastore.get(ref, parameters={"missing": 5}) 

206 

207 def testBasicPutGet(self) -> None: 

208 metrics = makeExampleMetrics() 

209 datastore = self.makeDatastore() 

210 

211 # Create multiple storage classes for testing different formulations 

212 storageClasses = [ 

213 self.storageClassFactory.getStorageClass(sc) 

214 for sc in ("StructuredData", "StructuredDataJson", "StructuredDataPickle") 

215 ] 

216 

217 dimensions = self.universe.conform(("visit", "physical_filter")) 

218 dataId = { 

219 "instrument": "dummy", 

220 "visit": 52, 

221 "physical_filter": "V", 

222 "band": "v", 

223 "day_obs": 20250101, 

224 } 

225 dataId2 = { 

226 "instrument": "dummy", 

227 "visit": 53, 

228 "physical_filter": "V", 

229 "band": "v", 

230 "day_obs": 20250101, 

231 } 

232 

233 for sc in storageClasses: 

234 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

235 ref2 = self.makeDatasetRef("metric", dimensions, sc, dataId2) 

236 

237 # Make sure that using getManyURIs without predicting before the 

238 # dataset has been put raises. 

239 with self.assertRaises(FileNotFoundError): 

240 datastore.getManyURIs([ref], predict=False) 

241 

242 # Make sure that using getManyURIs with predicting before the 

243 # dataset has been put predicts the URI. 

244 uris = datastore.getManyURIs([ref, ref2], predict=True) 

245 self.assertIn("52", uris[ref].primaryURI.geturl()) 

246 self.assertIn("#predicted", uris[ref].primaryURI.geturl()) 

247 self.assertIn("53", uris[ref2].primaryURI.geturl()) 

248 self.assertIn("#predicted", uris[ref2].primaryURI.geturl()) 

249 

250 datastore.put(metrics, ref) 

251 

252 # Does it exist? 

253 self.assertTrue(datastore.exists(ref)) 

254 self.assertTrue(datastore.knows(ref)) 

255 multi = datastore.knows_these([ref]) 

256 self.assertTrue(multi[ref]) 

257 multi = datastore.mexists([ref, ref2]) 

258 self.assertTrue(multi[ref]) 

259 self.assertFalse(multi[ref2]) 

260 

261 # Get 

262 metricsOut = datastore.get(ref, parameters=None) 

263 self.assertEqual(metrics, metricsOut) 

264 

265 uri = datastore.getURI(ref) 

266 self.assertEqual(uri.scheme, self.uriScheme) 

267 

268 uris = datastore.getManyURIs([ref]) 

269 self.assertEqual(len(uris), 1) 

270 ref, uri = uris.popitem() 

271 self.assertTrue(uri.primaryURI.exists()) 

272 self.assertFalse(uri.componentURIs) 

273 

274 # Get a component -- we need to construct new refs for them 

275 # with derived storage classes but with parent ID 

276 for comp in ("data", "output"): 

277 compRef = ref.makeComponentRef(comp) 

278 output = datastore.get(compRef) 

279 self.assertEqual(output, getattr(metricsOut, comp)) 

280 

281 uri = datastore.getURI(compRef) 

282 self.assertEqual(uri.scheme, self.uriScheme) 

283 

284 uris = datastore.getManyURIs([compRef]) 

285 self.assertEqual(len(uris), 1) 

286 

287 storageClass = sc 

288 

289 # Check that we can put a metric with None in a component and 

290 # get it back as None 

291 metricsNone = makeExampleMetrics(use_none=True) 

292 dataIdNone = { 

293 "instrument": "dummy", 

294 "visit": 54, 

295 "physical_filter": "V", 

296 "band": "v", 

297 "day_obs": 20250101, 

298 } 

299 refNone = self.makeDatasetRef("metric", dimensions, sc, dataIdNone) 

300 datastore.put(metricsNone, refNone) 

301 

302 comp = "data" 

303 for comp in ("data", "output"): 

304 compRef = refNone.makeComponentRef(comp) 

305 output = datastore.get(compRef) 

306 self.assertEqual(output, getattr(metricsNone, comp)) 

307 

308 # Check that a put fails if the dataset type is not supported 

309 if self.hasUnsupportedPut: 

310 sc = StorageClass("UnsupportedSC", pytype=type(metrics)) 

311 ref = self.makeDatasetRef("unsupportedType", dimensions, sc, dataId) 

312 with self.assertRaises(DatasetTypeNotSupportedError): 

313 datastore.put(metrics, ref) 

314 

315 # These should raise 

316 ref = self.makeDatasetRef("metrics", dimensions, storageClass, dataId) 

317 with self.assertRaises(FileNotFoundError): 

318 # non-existing file 

319 datastore.get(ref) 

320 

321 # Get a URI from it 

322 uri = datastore.getURI(ref, predict=True) 

323 self.assertEqual(uri.scheme, self.uriScheme) 

324 

325 with self.assertRaises(FileNotFoundError): 

326 datastore.getURI(ref) 

327 

328 def testTrustGetRequest(self) -> None: 

329 """Check that we can get datasets that registry knows nothing about.""" 

330 datastore = self.makeDatastore() 

331 

332 # Skip test if the attribute is not defined 

333 if not hasattr(datastore, "trustGetRequest"): 

334 return 

335 

336 metrics = makeExampleMetrics() 

337 

338 i = 0 

339 for sc_name in ("StructuredDataNoComponents", "StructuredData", "StructuredComposite"): 

340 i += 1 

341 datasetTypeName = f"test_metric{i}" # Different dataset type name each time. 

342 

343 if sc_name == "StructuredComposite": 

344 disassembled = True 

345 else: 

346 disassembled = False 

347 

348 # Start datastore in default configuration of using registry 

349 datastore.trustGetRequest = False 

350 

351 # Create multiple storage classes for testing with or without 

352 # disassembly 

353 sc = self.storageClassFactory.getStorageClass(sc_name) 

354 dimensions = self.universe.conform(("visit", "physical_filter")) 

355 

356 dataId = { 

357 "instrument": "dummy", 

358 "visit": 52 + i, 

359 "physical_filter": "V", 

360 "band": "v", 

361 "day_obs": 20250101, 

362 } 

363 

364 ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId) 

365 datastore.put(metrics, ref) 

366 

367 # Does it exist? 

368 self.assertTrue(datastore.exists(ref)) 

369 self.assertTrue(datastore.knows(ref)) 

370 multi = datastore.knows_these([ref]) 

371 self.assertTrue(multi[ref]) 

372 multi = datastore.mexists([ref]) 

373 self.assertTrue(multi[ref]) 

374 

375 # Get 

376 metricsOut = datastore.get(ref) 

377 self.assertEqual(metrics, metricsOut) 

378 

379 # Get the URI(s) 

380 allURIs = datastore.getURIs(ref) 

381 primaryURI, componentURIs = allURIs 

382 if disassembled: 

383 self.assertIsNone(primaryURI) 

384 self.assertEqual(len(componentURIs), 3) 

385 self.assertEqual(list(allURIs.iter_all()), list(componentURIs.values())) 

386 else: 

387 self.assertIn(datasetTypeName, primaryURI.path) 

388 self.assertFalse(componentURIs) 

389 self.assertEqual(list(allURIs.iter_all()), [primaryURI]) 

390 

391 # Delete registry entry so now we are trusting 

392 datastore.removeStoredItemInfo(ref) 

393 

394 # Now stop trusting and check that things break 

395 datastore.trustGetRequest = False 

396 

397 # Does it exist? 

398 self.assertFalse(datastore.exists(ref)) 

399 self.assertFalse(datastore.knows(ref)) 

400 multi = datastore.knows_these([ref]) 

401 self.assertFalse(multi[ref]) 

402 multi = datastore.mexists([ref]) 

403 self.assertFalse(multi[ref]) 

404 

405 with self.assertRaises(FileNotFoundError): 

406 datastore.get(ref) 

407 

408 if sc_name != "StructuredDataNoComponents": 

409 with self.assertRaises(FileNotFoundError): 

410 datastore.get(ref.makeComponentRef("data")) 

411 

412 # URI should fail unless we ask for prediction 

413 with self.assertRaises(FileNotFoundError): 

414 datastore.getURIs(ref) 

415 

416 predicted_primary, predicted_disassembled = datastore.getURIs(ref, predict=True) 

417 if disassembled: 

418 self.assertIsNone(predicted_primary) 

419 self.assertEqual(len(predicted_disassembled), 3) 

420 for uri in predicted_disassembled.values(): 

421 self.assertEqual(uri.fragment, "predicted") 

422 self.assertIn(datasetTypeName, uri.path) 

423 else: 

424 self.assertIn(datasetTypeName, predicted_primary.path) 

425 self.assertFalse(predicted_disassembled) 

426 self.assertEqual(predicted_primary.fragment, "predicted") 

427 

428 # Now enable registry-free trusting mode 

429 datastore.trustGetRequest = True 

430 

431 # Try again to get it 

432 metricsOut = datastore.get(ref) 

433 self.assertEqual(metricsOut, metrics) 

434 

435 # Does it exist? 

436 self.assertTrue(datastore.exists(ref)) 

437 

438 # Get a component 

439 if sc_name != "StructuredDataNoComponents": 

440 comp = "data" 

441 compRef = ref.makeComponentRef(comp) 

442 output = datastore.get(compRef) 

443 self.assertEqual(output, getattr(metrics, comp)) 

444 

445 # Get the URI -- if we trust this should work even without 

446 # enabling prediction. 

447 primaryURI2, componentURIs2 = datastore.getURIs(ref) 

448 self.assertEqual(primaryURI2, primaryURI) 

449 self.assertEqual(componentURIs2, componentURIs) 

450 

451 # Check for compatible storage class. 

452 if sc_name in ("StructuredDataNoComponents", "StructuredData"): 

453 # Make new dataset ref with compatible storage class. 

454 ref_comp = ref.overrideStorageClass("StructuredDataDictJson") 

455 

456 # Without `set_retrieve_dataset_type_method` it will fail to 

457 # find correct file. 

458 self.assertFalse(datastore.exists(ref_comp)) 

459 with self.assertRaises(FileNotFoundError): 

460 datastore.get(ref_comp) 

461 with self.assertRaises(FileNotFoundError): 

462 datastore.get(ref, storageClass="StructuredDataDictJson") 

463 

464 # Need a special method to generate stored dataset type. 

465 def _stored_dataset_type(name: str, ref: DatasetRef = ref) -> DatasetType: 

466 if name == ref.datasetType.name: 

467 return ref.datasetType 

468 raise ValueError(f"Unexpected dataset type name {ref.datasetType.name}") 

469 

470 datastore.set_retrieve_dataset_type_method(_stored_dataset_type) 

471 

472 # Storage class override with original dataset ref. 

473 metrics_as_dict = datastore.get(ref, storageClass="StructuredDataDictJson") 

474 self.assertIsInstance(metrics_as_dict, dict) 

475 

476 # get() should return a dict now. 

477 metrics_as_dict = datastore.get(ref_comp) 

478 self.assertIsInstance(metrics_as_dict, dict) 

479 

480 # exists() should work as well. 

481 self.assertTrue(datastore.exists(ref_comp)) 

482 

483 datastore.set_retrieve_dataset_type_method(None) 

484 

485 def testDisassembly(self) -> None: 

486 """Test disassembly within datastore.""" 

487 metrics = makeExampleMetrics() 

488 if self.isEphemeral: 

489 # in-memory datastore does not disassemble 

490 return 

491 

492 # Create multiple storage classes for testing different formulations 

493 # of composites. One of these will not disassemble to provide 

494 # a reference. 

495 storageClasses = [ 

496 self.storageClassFactory.getStorageClass(sc) 

497 for sc in ( 

498 "StructuredComposite", 

499 "StructuredCompositeTestA", 

500 "StructuredCompositeTestB", 

501 "StructuredCompositeReadComp", 

502 "StructuredData", # No disassembly 

503 "StructuredCompositeReadCompNoDisassembly", 

504 ) 

505 ] 

506 

507 # Create the test datastore 

508 datastore = self.makeDatastore() 

509 

510 # Dummy dataId 

511 dimensions = self.universe.conform(("visit", "physical_filter")) 

512 dataId = {"instrument": "dummy", "visit": 428, "physical_filter": "R"} 

513 

514 for i, sc in enumerate(storageClasses): 

515 with self.subTest(storageClass=sc.name): 

516 # Create a different dataset type each time round 

517 # so that a test failure in this subtest does not trigger 

518 # a cascade of tests because of file clashes 

519 ref = self.makeDatasetRef(f"metric_comp_{i}", dimensions, sc, dataId) 

520 

521 disassembled = sc.name not in {"StructuredData", "StructuredCompositeReadCompNoDisassembly"} 

522 

523 datastore.put(metrics, ref) 

524 

525 baseURI, compURIs = datastore.getURIs(ref) 

526 if disassembled: 

527 self.assertIsNone(baseURI) 

528 self.assertEqual(set(compURIs), {"data", "output", "summary"}) 

529 else: 

530 self.assertIsNotNone(baseURI) 

531 self.assertEqual(compURIs, {}) 

532 

533 metrics_get = datastore.get(ref) 

534 self.assertEqual(metrics_get, metrics) 

535 

536 # Retrieve the composite with read parameter 

537 stop = 4 

538 metrics_get = datastore.get(ref, parameters={"slice": slice(stop)}) 

539 self.assertEqual(metrics_get.summary, metrics.summary) 

540 self.assertEqual(metrics_get.output, metrics.output) 

541 self.assertEqual(metrics_get.data, metrics.data[:stop]) 

542 

543 # Retrieve a component 

544 data = datastore.get(ref.makeComponentRef("data")) 

545 self.assertEqual(data, metrics.data) 

546 

547 # On supported storage classes attempt to access a read 

548 # only component 

549 if "ReadComp" in sc.name: 

550 cRef = ref.makeComponentRef("counter") 

551 counter = datastore.get(cRef) 

552 self.assertEqual(counter, len(metrics.data)) 

553 

554 counter = datastore.get(cRef, parameters={"slice": slice(stop)}) 

555 self.assertEqual(counter, stop) 

556 

557 datastore.remove(ref) 

558 

559 def prepDeleteTest(self, n_refs: int = 1) -> tuple[Datastore, tuple[DatasetRef, ...]]: 

560 metrics = makeExampleMetrics() 

561 datastore = self.makeDatastore() 

562 # Put 

563 dimensions = self.universe.conform(("visit", "physical_filter")) 

564 sc = self.storageClassFactory.getStorageClass("StructuredData") 

565 refs = [] 

566 for i in range(n_refs): 

567 dataId = { 

568 "instrument": "dummy", 

569 "visit": 638 + i, 

570 "physical_filter": "U", 

571 "band": "u", 

572 "day_obs": 20250101, 

573 } 

574 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

575 datastore.put(metrics, ref) 

576 

577 # Does it exist? 

578 self.assertTrue(datastore.exists(ref)) 

579 

580 # Get 

581 metricsOut = datastore.get(ref) 

582 self.assertEqual(metrics, metricsOut) 

583 refs.append(ref) 

584 

585 return datastore, *refs 

586 

587 def testRemove(self) -> None: 

588 datastore, ref = self.prepDeleteTest() 

589 

590 # Remove 

591 datastore.remove(ref) 

592 

593 # Does it exist? 

594 self.assertFalse(datastore.exists(ref)) 

595 

596 # Do we now get a predicted URI? 

597 uri = datastore.getURI(ref, predict=True) 

598 self.assertEqual(uri.fragment, "predicted") 

599 

600 # Get should now fail 

601 with self.assertRaises(FileNotFoundError): 

602 datastore.get(ref) 

603 # Can only delete once 

604 with self.assertRaises(FileNotFoundError): 

605 datastore.remove(ref) 

606 

607 def testForget(self) -> None: 

608 datastore, ref = self.prepDeleteTest() 

609 

610 # Remove 

611 datastore.forget([ref]) 

612 

613 # Does it exist (as far as we know)? 

614 self.assertFalse(datastore.exists(ref)) 

615 

616 # Do we now get a predicted URI? 

617 uri = datastore.getURI(ref, predict=True) 

618 self.assertEqual(uri.fragment, "predicted") 

619 

620 # Get should now fail 

621 with self.assertRaises(FileNotFoundError): 

622 datastore.get(ref) 

623 

624 # Forgetting again is a silent no-op 

625 datastore.forget([ref]) 

626 

627 # Predicted URI should still point to the file. 

628 self.assertTrue(uri.exists()) 

629 

630 def testTransfer(self) -> None: 

631 metrics = makeExampleMetrics() 

632 

633 dimensions = self.universe.conform(("visit", "physical_filter")) 

634 dataId = { 

635 "instrument": "dummy", 

636 "visit": 2048, 

637 "physical_filter": "Uprime", 

638 "band": "u", 

639 "day_obs": 20250101, 

640 } 

641 

642 sc = self.storageClassFactory.getStorageClass("StructuredData") 

643 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

644 

645 inputDatastore = self.makeDatastore("test_input_datastore") 

646 outputDatastore = self.makeDatastore("test_output_datastore") 

647 

648 inputDatastore.put(metrics, ref) 

649 outputDatastore.transfer(inputDatastore, ref) 

650 

651 metricsOut = outputDatastore.get(ref) 

652 self.assertEqual(metrics, metricsOut) 

653 

654 def testBasicTransaction(self) -> None: 

655 datastore = self.makeDatastore() 

656 storageClass = self.storageClassFactory.getStorageClass("StructuredData") 

657 dimensions = self.universe.conform(("visit", "physical_filter")) 

658 nDatasets = 6 

659 dataIds = [ 

660 {"instrument": "dummy", "visit": i, "physical_filter": "V", "band": "v", "day_obs": 20250101} 

661 for i in range(nDatasets) 

662 ] 

663 data = [ 

664 ( 

665 self.makeDatasetRef("metric", dimensions, storageClass, dataId), 

666 makeExampleMetrics(), 

667 ) 

668 for dataId in dataIds 

669 ] 

670 succeed = data[: nDatasets // 2] 

671 fail = data[nDatasets // 2 :] 

672 # All datasets added in this transaction should continue to exist 

673 with datastore.transaction(): 

674 for ref, metrics in succeed: 

675 datastore.put(metrics, ref) 

676 # Whereas datasets added in this transaction should not 

677 with self.assertRaises(TransactionTestError): 

678 with datastore.transaction(): 

679 for ref, metrics in fail: 

680 datastore.put(metrics, ref) 

681 raise TransactionTestError("This should propagate out of the context manager") 

682 # Check for datasets that should exist 

683 for ref, metrics in succeed: 

684 # Does it exist? 

685 self.assertTrue(datastore.exists(ref)) 

686 # Get 

687 metricsOut = datastore.get(ref, parameters=None) 

688 self.assertEqual(metrics, metricsOut) 

689 # URI 

690 uri = datastore.getURI(ref) 

691 self.assertEqual(uri.scheme, self.uriScheme) 

692 # Check for datasets that should not exist 

693 for ref, _ in fail: 

694 # These should raise 

695 with self.assertRaises(FileNotFoundError): 

696 # non-existing file 

697 datastore.get(ref) 

698 with self.assertRaises(FileNotFoundError): 

699 datastore.getURI(ref) 

700 

701 def testNestedTransaction(self) -> None: 

702 datastore = self.makeDatastore() 

703 storageClass = self.storageClassFactory.getStorageClass("StructuredData") 

704 dimensions = self.universe.conform(("visit", "physical_filter")) 

705 metrics = makeExampleMetrics() 

706 

707 dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101} 

708 refBefore = self.makeDatasetRef("metric", dimensions, storageClass, dataId) 

709 datastore.put(metrics, refBefore) 

710 with self.assertRaises(TransactionTestError): 

711 with datastore.transaction(): 

712 dataId = { 

713 "instrument": "dummy", 

714 "visit": 1, 

715 "physical_filter": "V", 

716 "band": "v", 

717 "day_obs": 20250101, 

718 } 

719 refOuter = self.makeDatasetRef("metric", dimensions, storageClass, dataId) 

720 datastore.put(metrics, refOuter) 

721 with datastore.transaction(): 

722 dataId = { 

723 "instrument": "dummy", 

724 "visit": 2, 

725 "physical_filter": "V", 

726 "band": "v", 

727 "day_obs": 20250101, 

728 } 

729 refInner = self.makeDatasetRef("metric", dimensions, storageClass, dataId) 

730 datastore.put(metrics, refInner) 

731 # All datasets should exist 

732 for ref in (refBefore, refOuter, refInner): 

733 metricsOut = datastore.get(ref, parameters=None) 

734 self.assertEqual(metrics, metricsOut) 

735 raise TransactionTestError("This should roll back the transaction") 

736 # Dataset(s) inserted before the transaction should still exist 

737 metricsOut = datastore.get(refBefore, parameters=None) 

738 self.assertEqual(metrics, metricsOut) 

739 # But all datasets inserted during the (rolled back) transaction 

740 # should be gone 

741 with self.assertRaises(FileNotFoundError): 

742 datastore.get(refOuter) 

743 with self.assertRaises(FileNotFoundError): 

744 datastore.get(refInner) 

745 

746 def _prepareIngestTest(self) -> tuple[MetricsExample, DatasetRef]: 

747 storageClass = self.storageClassFactory.getStorageClass("StructuredData") 

748 dimensions = self.universe.conform(("visit", "physical_filter")) 

749 metrics = makeExampleMetrics() 

750 dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101} 

751 ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId) 

752 return metrics, ref 

753 

754 def runIngestTest(self, func: Callable[[MetricsExample, str, DatasetRef], None]) -> None: 

755 metrics, ref = self._prepareIngestTest() 

756 # The file will be deleted after the test. 

757 # For symlink tests this leads to a situation where the datastore 

758 # points to a file that does not exist. This will make os.path.exist 

759 # return False but then the new symlink will fail with 

760 # FileExistsError later in the code so the test still passes. 

761 with _temp_yaml_file(metrics._asdict()) as path: 

762 func(metrics, path, ref) 

763 

    def testIngestNoTransfer(self) -> None:
        """Test ingesting existing files with no transfer.

        Runs once with ``transfer=None`` and once with ``transfer="auto"``.
        For each mode a fresh datastore is created and a set of local
        scenario functions is passed to ``runIngestTest``; which scenarios
        run depends on whether the mode is listed in
        ``self.ingestTransferModes``.
        """
        for mode in (None, "auto"):
            # Some datastores have auto but can't do in place transfer
            if mode == "auto" and "auto" in self.ingestTransferModes and not self.canIngestNoTransferAuto:
                continue

            with self.subTest(mode=mode):
                datastore = self.makeDatastore()

                # NOTE: each scenario binds ``mode`` and ``datastore`` as
                # default arguments so it uses the values of this loop
                # iteration rather than looking them up when called.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file already in the datastore root."""
                    # first move it into the root, and adjust the path
                    # accordingly.
                    # In the case of a ChainedDatastore, we have multiple
                    # roots, all of which will accept the file, so we
                    # have to copy it into all the roots.
                    relative_path = None
                    for root in datastore.roots.values():
                        if root is not None:
                            copied_path = shutil.copy(path, root.ospath)
                            relative_path = os.path.relpath(copied_path, start=root.ospath)
                    # relative_path stays None only if no root was usable.
                    assert relative_path is not None, (
                        "Running a FileDatastore test on a Datastore instance without any roots"
                    )
                    datastore.ingest(FileDataset(path=relative_path, refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        datastore.ingest(
                            FileDataset(path="this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref))

                def failOutsideRoot(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files outside of datastore root unless
                    auto.
                    """
                    if mode == "auto":
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertTrue(datastore.exists(ref))
                    else:
                        with self.assertRaises(RuntimeError):
                            datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertFalse(datastore.exists(ref))

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest must raise `NotImplementedError` for this mode."""
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=path, refs=ref), transfer=mode)

                # Supported modes run the failure scenarios first and the
                # successful ingest last; unsupported modes only check that
                # ingest is rejected.
                if mode in self.ingestTransferModes:
                    self.runIngestTest(failOutsideRoot)
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)

846 

    def testIngestTransfer(self) -> None:
        """Test ingesting existing files after transferring them.

        Each transfer mode gets its own subtest and its own datastore,
        created with the mode name as the argument to ``makeDatastore``.
        Modes listed in ``self.ingestTransferModes`` must ingest
        successfully; all other modes must raise `NotImplementedError`.
        """
        for mode in ("copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto"):
            with self.subTest(mode=mode):
                datastore = self.makeDatastore(mode)

                # NOTE: each scenario binds ``mode`` and ``datastore`` as
                # default arguments so it uses the values of this loop
                # iteration rather than looking them up when called.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file by transferring it to the template
                    location.
                    """
                    datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))
                    # Only "move" is expected to leave no file at the
                    # original path.
                    file_exists = os.path.exists(path)
                    if mode == "move":
                        self.assertFalse(file_exists)
                    else:
                        self.assertTrue(file_exists)

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        # Ensure the file does not look like it is in
                        # datastore for auto mode
                        datastore.ingest(
                            FileDataset(path="../this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref), f"Checking not in datastore using mode {mode}")

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest must raise `NotImplementedError` for this mode."""
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)

                # The failing-path scenario runs before the successful
                # ingest for supported modes.
                if mode in self.ingestTransferModes:
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)

902 

def testIngestSymlinkOfSymlink(self) -> None:
    """Special test for symlink to a symlink ingest.

    The aim of this test is to create a dataset on disk, then create a
    symlink to it and finally ingest the symlink such that the symlink in
    the datastore points to the original dataset.  Only the symlink modes
    listed in ``self.ingestTransferModes`` are exercised.
    """
    metrics, ref = self._prepareIngestTest()
    for mode in ("symlink", "relsymlink"):
        if mode not in self.ingestTransferModes:
            continue

        # Report the mode through subTest (as testIngestTransfer does)
        # instead of printing to stdout.
        with self.subTest(mode=mode):
            with _temp_yaml_file(metrics._asdict()) as realpath, tempfile.TemporaryDirectory() as tmpdir:
                sympath = os.path.join(tmpdir, "symlink.yaml")
                os.symlink(os.path.realpath(realpath), sympath)

                datastore = self.makeDatastore()
                datastore.ingest(FileDataset(path=os.path.abspath(sympath), refs=ref), transfer=mode)

                uri = datastore.getURI(ref)
                self.assertTrue(uri.isLocal, f"Check {uri.scheme}")
                self.assertTrue(os.path.islink(uri.ospath), f"Check {uri} is a symlink")

                linkTarget = os.readlink(uri.ospath)
                if mode == "relsymlink":
                    # A relative link can only be checked for being relative.
                    self.assertFalse(os.path.isabs(linkTarget))
                else:
                    self.assertTrue(os.path.samefile(linkTarget, realpath))

                # Check that we can get the dataset back regardless of mode
                metric2 = datastore.get(ref)
                self.assertEqual(metric2, metrics)

                # Cleanup the file for next time round loop
                # since it will get the same file name in store
                datastore.remove(ref)

939 

def _populate_export_datastore(self, name: str) -> tuple[Datastore, list[DatasetRef]]:
    """Create a datastore and put three example datasets into it.

    Parameters
    ----------
    name : `str`
        Value passed to ``self.makeDatastore``.

    Returns
    -------
    datastore : `Datastore`
        The newly created datastore.
    refs : `list` [`DatasetRef`]
        Refs of the stored datasets, one per visit.

    Raises
    ------
    unittest.SkipTest
        Raised if every name reported by the datastore starts with
        ``InMemoryDatastore``.
    """
    datastore = self.makeDatastore(name)

    # For now only the FileDatastore can be used for this test, so a
    # datastore made up solely of InMemoryDatastores is skipped.
    if all(child.startswith("InMemoryDatastore") for child in datastore.names):
        raise unittest.SkipTest("in-memory datastore does not support record export/import")

    metrics = makeExampleMetrics()
    dimensions = self.universe.conform(("visit", "physical_filter"))
    sc = self.storageClassFactory.getStorageClass("StructuredData")

    stored: list[DatasetRef] = []
    for visit in range(2048, 2051):
        ref = self.makeDatasetRef(
            "metric",
            dimensions,
            sc,
            {
                "instrument": "dummy",
                "visit": visit,
                "physical_filter": "Uprime",
                "band": "u",
                "day_obs": 20250101,
            },
        )
        datastore.put(metrics, ref)
        stored.append(ref)
    return datastore, stored

969 

def testExportImportRecords(self) -> None:
    """Test for export_records and import_records methods."""
    datastore, refs = self._populate_export_datastore("test_datastore")

    # Export the full set of refs first and then all but the first one.
    for exported_refs in (refs, refs[1:]):
        expected_count = len(exported_refs)
        records = datastore.export_records(exported_refs)
        self.assertGreater(len(records), 0)
        self.assertTrue(set(records).issubset(set(datastore.names)))

        # In a ChainedDatastore each FileDatastore will have a complete set
        for record_data in records.values():
            self.assertEqual(len(record_data.records), expected_count)

            # Subsetting with one known and one random ID keeps only the
            # known dataset.
            mixed_ids = {exported_refs[0].id, uuid.uuid4()}
            subset = record_data.subset(mixed_ids)
            assert subset is not None
            self.assertEqual(len(subset.records), 1)

            # Subsetting with only an unknown ID yields nothing at all.
            subset = record_data.subset({uuid.uuid4()})
            self.assertIsNone(subset)

    # Use the same datastore name to import relative path.
    datastore2 = self.makeDatastore("test_datastore")
    datastore2.import_records(datastore.export_records(refs[1:]))

    # The first ref was left out of the export so it cannot be read...
    with self.assertRaises(FileNotFoundError):
        datastore2.get(refs[0])
    # ...whereas the two exported ones can.
    for imported_ref in (refs[1], refs[2]):
        self.assertIsNotNone(datastore2.get(imported_ref))

1003 

def testExportImportTable(self) -> None:
    """Test that a table exported from one datastore can be imported into
    a second datastore and describes the same files there.
    """
    datastore, refs = self._populate_export_datastore("test_datastore")
    table = datastore.export_table([ref.id for ref in refs])
    datastore2 = self.makeDatastore("test_datastore")
    datastore2.import_table(table)

    for ref in refs:
        self.assertIsNotNone(datastore2.get(ref))
        self.assertEqual(datastore.getURI(ref), datastore2.getURI(ref))
        original_info = datastore.get_file_info_for_transfer([ref.id])[ref.id][0]
        # The imported side must be read from datastore2; querying the
        # exporting datastore again would only compare it with itself.
        imported_info_list = datastore2.get_file_info_for_transfer([ref.id]).get(ref.id)
        self.assertIsNotNone(imported_info_list)
        self.assertEqual(len(imported_info_list), 1)
        imported_info = imported_info_list[0]
        for field in ("formatter", "storage_class_name", "file_size", "checksum", "component"):
            self.assertEqual(
                getattr(imported_info.file_info, field),
                getattr(original_info.file_info, field),
                f"Mismatch in file_info.{field} for {ref}",
            )

1025 

def testExportPredictedRecords(self):
    """Test that predicted records are exported for a ref that has not
    been stored.
    """
    if self.isEphemeral:
        raise unittest.SkipTest("in-memory datastore does not support record export/import")

    sc = self.storageClassFactory.getStorageClass("ThingOne")
    dimensions = self.universe.conform(("visit", "physical_filter"))
    ref = self.makeDatasetRef(
        "metric",
        dimensions,
        sc,
        {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        },
    )

    datastore = self.makeDatastore("test_datastore")
    # Names starting with "InMemory" are not expected to report records.
    expected_names = {n for n in datastore.names if not n.startswith("InMemory")}
    records = datastore.export_predicted_records([ref])

    # Expect predicted records from all datastores.
    self.assertEqual(set(records), expected_names)

    # Exactly one predicted record per datastore for the single ref.
    for per_store in records.values():
        self.assertEqual(len(per_store.records), 1)

1049 

def testExport(self) -> None:
    """Test ``Datastore.export`` with supported and unsupported arguments."""
    datastore, refs = self._populate_export_datastore("test_datastore")

    # The default export returns one entry per stored ref.
    self.assertEqual(len(list(datastore.export(refs))), 3)

    # Both of these will default to None.
    for transfer in (None, "auto"):
        exported = list(datastore.export(refs, transfer=transfer))
        self.assertEqual(len(exported), 3)

    # Any real transfer mode is rejected, with or without a directory.
    for rejected in ({"transfer": "copy"}, {"directory": "exportDir", "transfer": "move"}):
        with self.assertRaises(TypeError):
            list(datastore.export(refs, **rejected))

    # Create a new ref that is not known to the datastore and try to
    # export it.
    sc = self.storageClassFactory.getStorageClass("ThingOne")
    dimensions = self.universe.conform(("visit", "physical_filter"))
    unknown_ref = self.makeDatasetRef(
        "metric",
        dimensions,
        sc,
        {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        },
    )
    with self.assertRaises(FileNotFoundError):
        list(datastore.export([*refs, unknown_ref], transfer=None))

1081 

def test_pydantic_dict_storage_class_conversions(self) -> None:
    """Test that a dataset stored as a pydantic model can be read back
    both as the model and, via a storage class override, as a dict.
    """
    datastore = self.makeDatastore()
    model_ref = self.makeDatasetRef(
        "store_as_model",
        dimensions=self.universe.empty,
        storageClass="DictConvertibleModel",
        dataId=DataCoordinate.make_empty(self.universe),
    )
    content = {"a": "one", "b": "two"}
    model = DictConvertibleModel.from_dict(content, extra="original content")
    datastore.put(model, model_ref)

    # Reading with the storage class used for the put returns the model.
    self.assertEqual(datastore.get(model_ref), model)

    # Reading with the overridden storage class returns a plain dict that
    # holds only the original content.
    dict_ref = model_ref.overrideStorageClass("NativeDictForConvertibleModel")
    loaded = datastore.get(dict_ref)
    self.assertEqual(type(loaded), dict)
    self.assertEqual(loaded, content)

1101 

def test_simple_class_put_get(self) -> None:
    """Test put and get of a simple class that has a dict()
    constructor.
    """
    store = self.makeDatastore()
    fields = {"summary": {"a": 1}, "data": [1, 2, 3], "output": {"b": 2}}
    self._assert_different_puts(store, "MetricsExample", MetricsExample(**fields))

1109 

def test_dataclass_put_get(self) -> None:
    """Test put and get of a simple dataclass."""
    store = self.makeDatastore()
    fields = {"summary": {"a": 1}, "data": [1, 2, 3], "output": {"b": 2}}
    self._assert_different_puts(store, "MetricsExampleDataclass", MetricsExampleDataclass(**fields))

1115 

def test_pydantic_put_get(self) -> None:
    """Test put and get of a simple Pydantic model."""
    store = self.makeDatastore()
    fields = {"summary": {"a": 1}, "data": [1, 2, 3], "output": {"b": 2}}
    self._assert_different_puts(store, "MetricsExampleModel", MetricsExampleModel(**fields))

1121 

def test_tuple_put_get(self) -> None:
    """Test put and get of a plain tuple."""
    store = self.makeDatastore()
    self._assert_different_puts(store, "TupleExample", ("a", "b", 1))

1127 

def _assert_different_puts(self, datastore: Datastore, storageClass_root: str, data: Any) -> None:
    """Put the same object under two storage classes and compare reads.

    Parameters
    ----------
    datastore : `Datastore`
        Datastore to write to and read from.
    storageClass_root : `str`
        Prefix of the storage class names; the suffixes ``A`` and ``B``
        are appended to form the two names used.
    data : `~typing.Any`
        Object to store under both refs.
    """
    refs = {}
    for suffix in ("A", "B"):
        refs[suffix] = self.makeDatasetRef(
            f"stora_as_{suffix}",
            dimensions=self.universe.empty,
            storageClass=f"{storageClass_root}{suffix}",
            dataId=DataCoordinate.make_empty(self.universe),
        )

    # Both refs are created before anything is written.
    for ref in refs.values():
        datastore.put(data, ref)

    first = datastore.get(refs["A"])
    second = datastore.get(refs["B"])
    self.assertEqual(first, second)

1143 

1144 

class PosixDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    uriScheme = "file"
    canIngestNoTransferAuto = True
    ingestTransferModes = (None, "copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto")
    isEphemeral = False
    rootKeys = ("root",)
    validationCanFail = True

    def setUp(self) -> None:
        """Create a temporary root directory and run the base-class setUp."""
        # The call to os.path.realpath is necessary because Mac temporary files
        # can end up in either /private/var/folders or /var/folders, which
        # refer to the same location but don't appear to.
        # This matters for "relsymlink" transfer mode, because it needs to be
        # able to read the file through a relative symlink, but some of the
        # intermediate directories are not traversable if you try to get from a
        # tempfile in /var/folders to one in /private/var/folders via a
        # relative path.
        self.root = os.path.realpath(self.enterContext(tempfile.TemporaryDirectory()))
        super().setUp()

    def testAtomicWrite(self) -> None:
        """Test that we write to a temporary and then rename"""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        with self.assertLogs("lsst.resources", "DEBUG") as cm:
            datastore.put(metrics, ref)
        move_logs = [ll for ll in cm.output if "transfer=" in ll]
        # Fail with a clear message rather than an IndexError if no
        # transfer was logged at all.
        self.assertTrue(move_logs, "No 'transfer=' log messages were emitted by put")
        self.assertIn("transfer=move", move_logs[0])

        # And the transfer should be file to file.
        self.assertEqual(move_logs[0].count("file://"), 2)

    def testCanNotDeterminePutFormatterLocation(self) -> None:
        """Verify that the expected exception is raised if the FileDatastore
        can not determine the put formatter location.
        """
        # NOTE(review): the result is unused; kept in case the call matters.
        _ = makeExampleMetrics()
        datastore = self.makeDatastore()

        # Create multiple storage classes for testing different formulations
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")

        sccomp = StorageClass("Dummy")
        compositeStorageClass = StorageClass(
            "StructuredComposite", components={"dummy": sccomp, "dummy2": sccomp}
        )

        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        compRef = self.makeDatasetRef("metric", dimensions, compositeStorageClass, dataId)

        def raiser(ref: DatasetRef) -> None:
            raise DatasetTypeNotSupportedError()

        # Force the location lookup to fail for the duration of the checks.
        with unittest.mock.patch.object(
            lsst.daf.butler.datastores.fileDatastore.FileDatastore,
            "_determine_put_formatter_location",
            side_effect=raiser,
        ):
            # verify the non-composite ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(ref, predict=True)

            # verify the composite-ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(compRef, predict=True)

    def test_roots(self) -> None:
        """Test that every datastore name has a root entry and that each
        non-None root exists.
        """
        datastore = self.makeDatastore()

        self.assertEqual(set(datastore.names), set(datastore.roots.keys()))
        for root in datastore.roots.values():
            if root is not None:
                self.assertTrue(root.exists())

    def test_prepare_get_for_external_client(self) -> None:
        """Test that ``prepare_get_for_external_client`` returns `None` for
        a ref that was never stored.
        """
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {"instrument": "dummy", "visit": 52, "physical_filter": "V", "band": "v"}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        # Most of the coverage for this function is in test_server.py,
        # because it requires a file backend that supports URL signing.
        self.assertIsNone(datastore.prepare_get_for_external_client(ref))

1246 

1247 

class PosixDatastoreNoChecksumsTestCase(PosixDatastoreTestCase):
    """Posix datastore tests but with checksums disabled."""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreNoChecksums.yaml")

    def testChecksum(self) -> None:
        """Ensure that checksums have not been calculated."""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        ref = self.makeDatasetRef(
            "metric",
            dimensions,
            storageClass,
            {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101},
        )

        # With the configuration as loaded no checksum is recorded.
        datastore.put(metrics, ref)
        stored = datastore.getStoredItemsInfo(ref)
        self.assertIsNone(stored[0].checksum)

        # Remove the dataset and store it again with checksums switched on
        # explicitly; this time a checksum must be present.
        datastore.remove(ref)
        datastore.useChecksum = True
        datastore.put(metrics, ref)

        stored = datastore.getStoredItemsInfo(ref)
        self.assertIsNotNone(stored[0].checksum)

    def test_repeat_ingest(self):
        """Test that repeatedly ingesting the same file in direct mode
        is allowed.

        Test can only run with FileDatastore since that is the only one
        supporting "direct" ingest.
        """
        metrics, v4ref = self._prepareIngestTest()
        datastore = self.makeDatastore()
        v5ref = DatasetRef(
            v4ref.datasetType, v4ref.dataId, v4ref.run, id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN
        )

        with _temp_yaml_file(metrics._asdict()) as path:

            def ingest(ref: DatasetRef, transfer: str) -> None:
                """Ingest the temporary file for ``ref``."""
                datastore.ingest(FileDataset(path=path, refs=ref), transfer=transfer)

            ingest(v4ref, "direct")

            # This will fail because the ref is using UUIDv4.
            with self.assertRaises(RuntimeError):
                ingest(v4ref, "direct")

            # UUIDv5 can be repeatedly ingested in direct mode.
            for _ in range(2):
                ingest(v5ref, "direct")

            # Switching to a different transfer mode is rejected.
            with self.assertRaises(RuntimeError):
                ingest(v5ref, "copy")

1302 

1303 

class TrashDatastoreTestCase(PosixDatastoreTestCase):
    """Restrict trash test to FileDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def testTrash(self) -> None:
        """Test trashing datasets and emptying the trash, with and without
        ``trustGetRequest`` enabled.
        """
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Trash a single dataset; the file stays until the trash is emptied.
        trashed = refs.pop()
        trashed_uri = datastore.getURI(trashed)
        datastore.trash(trashed)
        self.assertTrue(trashed_uri.exists(), trashed_uri)
        datastore.emptyTrash()
        self.assertFalse(trashed_uri.exists(), trashed_uri)

        # Trashing the same ref a second time should be fine.
        datastore.trash(trashed)

        # Trash multiple items at once.
        pair = [refs.pop() for _ in range(2)]
        datastore.trash(pair)
        datastore.emptyTrash()

        # Remove a record and trash should do nothing.
        # This is execution butler scenario.
        orphan = refs.pop()
        orphan_uri = datastore.getURI(orphan)
        datastore._table.delete(["dataset_id"], {"dataset_id": orphan.id})
        self.assertTrue(orphan_uri.exists())
        datastore.trash(orphan)
        datastore.emptyTrash()
        self.assertTrue(orphan_uri.exists())

        # Switch on trust and it should delete the file.
        datastore.trustGetRequest = True
        datastore.trash([orphan])
        self.assertFalse(orphan_uri.exists())

        # Remove multiples at once in trust mode.
        batch = [refs.pop() for _ in range(3)]
        datastore.trash(batch)
        # Check that a single ref can be trashed as well.
        datastore.trash(refs.pop())

    def test_empty_trash(self) -> None:
        """Test parameters and return value for empty trash."""
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Put one dataset in the trash; its file is not deleted yet.
        trashed = refs.pop()
        trashed_uri = datastore.getURI(trashed)
        datastore.trash(trashed)
        self.assertTrue(trashed_uri.exists(), trashed_uri)

        # Empty trash but with a list of refs that does not include the
        # one in the trash table.
        removed = datastore.emptyTrash(refs=refs)
        self.assertEqual(len(removed), 0)
        self.assertTrue(trashed_uri.exists(), trashed_uri)

        # Empty the entire trash but in dry_run mode: the URI is reported
        # but the file is left in place.
        removed = datastore.emptyTrash(dry_run=True)
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), trashed_uri)
        self.assertTrue(trashed_uri.exists(), trashed_uri)

        # Empty the trash specifying the actual ref.
        removed = datastore.emptyTrash(refs=[trashed])
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), trashed_uri)
        self.assertFalse(trashed_uri.exists(), trashed_uri)

        # Trash everything and empty, first as a dry run and then for real.
        datastore.trash(refs)
        for pending in datastore.emptyTrash(dry_run=True):
            self.assertTrue(pending.exists())
        for deleted in datastore.emptyTrash():
            self.assertFalse(deleted.exists())

1384 

1385 

class CleanupPosixDatastoreTestCase(DatastoreTestsBase, unittest.TestCase):
    """Test datastore cleans up on failure."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        """Create a temporary root directory, then run the base setUp."""
        # The working directory has to be in place before the base class
        # setUp runs.
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testCleanup(self) -> None:
        """Test that a failed formatter write does cleanup a partial file."""
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        ref = self.makeDatasetRef(
            "metric",
            dimensions,
            storageClass,
            {
                "instrument": "dummy",
                "visit": 52,
                "physical_filter": "V",
                "band": "v",
                "day_obs": 20250101,
            },
        )

        # Determine where the file will end up (we assume Formatters use
        # the same file extension)
        expectedUri = datastore.getURI(ref, predict=True)
        self.assertEqual(expectedUri.fragment, "predicted")
        self.assertEqual(expectedUri.getExtension(), ".yaml", f"Is there a file extension in {expectedUri}")

        # Try formatter that fails and formatter that fails and leaves
        # a file behind
        for formatter in (BadWriteFormatter, BadNoWriteFormatter):
            with self.subTest(formatter=str(formatter)):
                # Swap in the failing formatter for this dataset type.
                datastore.formatterFactory.registerFormatter(ref.datasetType, formatter, overwrite=True)

                # The put is expected to fail...
                with self.assertRaises(RuntimeError):
                    datastore.put(metrics, ref)

                # ...and must not leave a file on disk.
                self.assertFalse(expectedUri.exists(), f"Check for existence of {expectedUri}")

                # The parent directory is still expected to exist.
                parent = expectedUri.dirname()
                self.assertTrue(parent.exists(), f"Check for existence of directory {parent}")

        # Force YamlFormatter and check that this time a file is written
        datastore.formatterFactory.registerFormatter(ref.datasetType, YamlFormatter, overwrite=True)
        datastore.put(metrics, ref)
        self.assertTrue(expectedUri.exists(), f"Check for existence of {expectedUri}")
        datastore.remove(ref)
        self.assertFalse(expectedUri.exists(), f"Check for existence of now removed {expectedUri}")

1445 

1446 

class InMemoryDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """InMemoryDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastore.yaml")
    uriScheme = "mem"
    hasUnsupportedPut = False
    # No transfer mode is listed, so testIngestTransfer expects every mode
    # to raise NotImplementedError.
    ingestTransferModes = ()
    # testExportPredictedRecords is skipped when this is True.
    isEphemeral = True
    rootKeys = None
    validationCanFail = False

1457 

1458 

class ChainedDatastoreTestCase(PosixDatastoreTestCase):
    """ChainedDatastore specialization using a POSIXDatastore"""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore.yaml")
    hasUnsupportedPut = False
    canIngestNoTransferAuto = False
    # Modes for which testIngestTransfer expects ingest to succeed.
    ingestTransferModes = (None, "copy", "move", "hardlink", "symlink", "relsymlink", "link", "auto")
    isEphemeral = False
    # NOTE(review): presumably config keys of the child datastore roots --
    # confirm against the base class.
    rootKeys = (".datastores.1.root", ".datastores.2.root")
    validationCanFail = True

1469 

1470 

class ChainedDatastoreMemoryTestCase(InMemoryDatastoreTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore"""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2.yaml")
    # Both values repeat those of InMemoryDatastoreTestCase; they are
    # restated here explicitly.
    validationCanFail = False
    isEphemeral = True

1477 

1478 

class DatastoreConstraintsTests(DatastoreTestsBase):
    """Basic tests of constraints model of Datastores."""

    def testConstraints(self) -> None:
        """Test constraints model.

        Assumes that each test class has the same constraints.  For every
        dataset type / storage class combination the datastore must either
        accept or reject a ``put``; when ``self.canIngest`` is true the same
        expectation is checked for ``ingest``.
        """
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Registering the files with enterContext closes them when the test
        # finishes rather than leaving that to garbage collection.
        testfile_y = self.enterContext(tempfile.NamedTemporaryFile(suffix=".yaml"))
        testfile_j = self.enterContext(tempfile.NamedTemporaryFile(suffix=".json"))
        for datasetTypeName, sc, accepted in (
            ("metric", sc1, True),
            ("metric5", sc1, False),
            ("metric33", sc1, True),
            ("metric5", sc2, True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=datasetTypeName, storageClass=sc.name, file=testfile.name):
                ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId)
                if accepted:
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))
                    datastore.remove(ref)

                    # Try ingest
                    if self.canIngest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))
                        datastore.remove(ref)
                else:
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    if self.canIngest:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertFalse(datastore.exists(ref))

1534 

1535 

class PosixDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreP.yaml")
    # Enables the ingest branches of the inherited testConstraints.
    canIngest = True

    def setUp(self) -> None:
        """Create a temporary root directory, then run the base setUp."""
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

1546 

1547 

class InMemoryDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """InMemoryDatastore specialization."""

    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastoreP.yaml")
    # The inherited testConstraints skips its ingest checks when this is
    # False.
    canIngest = False

1553 

1554 

class ChainedDatastoreConstraintsNativeTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore and constraints
    at the ChainedDatastore.
    """

    # Only the configuration differs from the parent test case.
    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePa.yaml")

1561 

1562 

class ChainedDatastoreConstraintsTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore."""

    # Only the configuration differs from the parent test case.
    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastoreP.yaml")

1567 

1568 

class ChainedDatastoreMemoryConstraintsTestCase(InMemoryDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2P.yaml")
    # Repeats the parent's value; the inherited testConstraints skips its
    # ingest checks when this is False.
    canIngest = False

1574 

1575 

class ChainedDatastorePerStoreConstraintsTests(DatastoreTestsBase, unittest.TestCase):
    """Test that a chained datastore can control constraints per-datastore
    even if child datastore would accept.
    """

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePb.yaml")

    def setUp(self) -> None:
        """Create a temporary root directory, then run the base setUp."""
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testConstraints(self) -> None:
        """Test chained datastore constraints model.

        Each case lists, per child datastore, whether a ``put`` is expected
        to store the dataset there, plus whether ``ingest`` is expected to be
        accepted by the chain.
        """
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId1 = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }
        dataId2 = {"visit": 52, "physical_filter": "V", "band": "v", "instrument": "HSC", "day_obs": 20250101}

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Registering the files with enterContext closes them when the test
        # finishes rather than leaving that to garbage collection.
        testfile_y = self.enterContext(tempfile.NamedTemporaryFile(suffix=".yaml"))
        testfile_j = self.enterContext(tempfile.NamedTemporaryFile(suffix=".json"))

        for typeName, dataId, sc, accept, ingest in (
            ("metric", dataId1, sc1, (False, True, False), True),
            ("metric5", dataId1, sc1, (False, False, False), False),
            ("metric5", dataId2, sc1, (True, False, False), False),
            ("metric33", dataId2, sc2, (True, True, False), True),
            ("metric5", dataId1, sc2, (False, True, False), True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=typeName, dataId=dataId, sc=sc.name):
                ref = self.makeDatasetRef(typeName, dimensions, sc, dataId)
                if any(accept):
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))

                    # Check each datastore inside the chained datastore
                    for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                        self.assertEqual(
                            childDatastore.exists(ref),
                            expected,
                            f"Testing presence of {ref} in datastore {childDatastore.name}",
                        )

                    datastore.remove(ref)

                    # Check that ingest works
                    if ingest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))

                        # Check each datastore inside the chained datastore
                        for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                            # Ephemeral datastores means InMemory at the moment
                            # and that does not accept ingest of files.
                            if childDatastore.isEphemeral:
                                expected = False
                            self.assertEqual(
                                childDatastore.exists(ref),
                                expected,
                                f"Testing presence of ingested {ref} in datastore {childDatastore.name}",
                            )

                        datastore.remove(ref)
                    else:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")

                else:
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                    self.assertFalse(datastore.exists(ref))

1666 

1667 

@unittest.mock.patch.dict(os.environ, {}, clear=True)
class DatastoreCacheTestCase(DatasetTestHelper, unittest.TestCase):
    """Tests for datastore caching infrastructure.

    The class decorator replaces ``os.environ`` with an empty mapping while
    each test method runs, so cache-related environment variables are only
    present where a test patches them in explicitly.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Create the storage class factory and dimension universe shared by
        all tests in this class.
        """
        cls.storageClassFactory = StorageClassFactory()
        cls.universe = DimensionUniverse()

        # Ensure that we load the test storage class definitions.
        scConfigFile = os.path.join(TESTDIR, "config/basic/storageClasses.yaml")
        cls.storageClassFactory.addFromConfig(scConfigFile)

    def setUp(self) -> None:
        """Create a temporary root directory populated with test files.

        Ten simple refs (``metric0`` .. ``metric9``) are paired with ten
        10-byte files, and three composite refs get one 10-byte file per
        component of the ``StructuredData`` storage class.
        """
        self.id = 0

        # Create a root that we can use for caching tests.
        self.root = tempfile.mkdtemp()

        # Create some test dataset refs and associated test files
        sc = self.storageClassFactory.getStorageClass("StructuredDataDict")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        # Create list of refs and list of temporary files
        n_datasets = 10
        self.refs = [self.makeDatasetRef(f"metric{n}", dimensions, sc, dataId) for n in range(n_datasets)]

        root_uri = ResourcePath(self.root, forceDirectory=True)
        self.files = [root_uri.join(f"file{n}.txt") for n in range(n_datasets)]

        # Create test files.
        for uri in self.files:
            uri.write(b"0123456789")

        # Create some composite refs with component files.
        sc = self.storageClassFactory.getStorageClass("StructuredData")
        self.composite_refs = [self.makeDatasetRef(f"composite{n}", dimensions, sc, dataId) for n in range(3)]
        # comp_files[i] and comp_refs[i] are parallel lists holding the
        # component files and component refs of composite_refs[i].
        self.comp_files = []
        self.comp_refs = []
        for n, ref in enumerate(self.composite_refs):
            component_refs = []
            component_files = []
            for component in sc.components:
                component_ref = ref.makeComponentRef(component)
                file = root_uri.join(f"composite_file-{n}-{component}.txt")
                component_refs.append(component_ref)
                component_files.append(file)
                file.write(b"9876543210")

            self.comp_files.append(component_files)
            self.comp_refs.append(component_refs)

    def tearDown(self) -> None:
        """Remove the temporary root directory created by `setUp`."""
        if self.root is not None and os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

    def _make_cache_manager(self, config_str: str) -> DatastoreCacheManager:
        """Build a cache manager from a YAML configuration string.

        Parameters
        ----------
        config_str : `str`
            YAML text describing the cache configuration.

        Returns
        -------
        cache_manager : `DatastoreCacheManager`
            Manager constructed with the class-level dimension universe.
        """
        config = Config.fromYaml(config_str)
        return DatastoreCacheManager(DatastoreCacheManagerConfig(config), universe=self.universe)

    def testNoCacheDir(self) -> None:
        """Test caching when the configured cache root is null."""
        config_str = """
cached:
  root: null
  cacheable:
    metric0: true
        """
        cache_manager = self._make_cache_manager(config_str)

        # Look inside to check we don't have a cache directory
        self.assertIsNone(cache_manager._cache_directory)

        self.assertCache(cache_manager)

        # Test that the cache directory is marked temporary
        self.assertTrue(cache_manager.cache_directory.isTemporary)

    def testNoCacheDirReversed(self) -> None:
        """Use default caching status and set metric1 to false."""
        config_str = """
cached:
  root: null
  default: true
  cacheable:
    metric1: false
        """
        cache_manager = self._make_cache_manager(config_str)

        self.assertCache(cache_manager)

    def testEnvvarCacheDir(self) -> None:
        """Test how the cache directory environment variables interact with
        the configured cache root.
        """
        config_str = f"""
cached:
  root: '{self.root}'
  cacheable:
    metric0: true
        """

        root = ResourcePath(self.root, forceDirectory=True)
        env_dir = root.join("somewhere", forceDirectory=True)
        elsewhere = root.join("elsewhere", forceDirectory=True)

        # Environment variable should override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # This environment variable should not override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, root)

        # No default setting.
        config_str = """
cached:
  root: null
  default: true
  cacheable:
    metric1: false
        """
        cache_manager = self._make_cache_manager(config_str)

        # This environment variable should override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # If both environment variables are set the main (not IF_UNSET)
        # variable should win.
        with unittest.mock.patch.dict(
            os.environ,
            {
                "DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath,
                "DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": elsewhere.ospath,
            },
        ):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # Use the API to set the environment variable, making sure that the
        # variable is reset on exit.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
        ):
            defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
            self.assertTrue(defined)
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

        # Now create the cache manager ahead of time and set the fallback
        # later.
        cache_manager = self._make_cache_manager(config_str)
        self.assertIsNone(cache_manager._cache_directory)
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
        ):
            defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
            self.assertTrue(defined)
            self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

    def testExplicitCacheDir(self) -> None:
        """Test caching when the cache root is given in the configuration."""
        config_str = f"""
cached:
  root: '{self.root}'
  cacheable:
    metric0: true
        """
        cache_manager = self._make_cache_manager(config_str)

        # Look inside to check we do have a cache directory.
        self.assertEqual(cache_manager.cache_directory, ResourcePath(self.root, forceDirectory=True))

        self.assertCache(cache_manager)

        # Test that the cache directory is not marked temporary
        self.assertFalse(cache_manager.cache_directory.isTemporary)

    def testUnexpectedFilesInCacheDir(self) -> None:
        """Test for regression of a bug where extraneous files in a cache
        directory would cause all cache lookups to raise an exception.
        """
        config_str = f"""
cached:
  root: '{self.root}'
  cacheable:
    metric0: true
        """

        # Names cover the combinations of with/without an underscore and
        # with/without a file extension.
        for filename in ["unexpected.txt", "unexpected", "un_expected", "un_expected.txt"]:
            unexpected_file = os.path.join(self.root, filename)
            with open(unexpected_file, "w") as fh:
                fh.write("test")

        cache_manager = self._make_cache_manager(config_str)
        cache_manager.scan_cache()
        self.assertCache(cache_manager)

    def assertCache(self, cache_manager: DatastoreCacheManager) -> None:
        """Check basic caching behavior for a manager that caches
        ``refs[0]`` but not ``refs[1]``.

        Parameters
        ----------
        cache_manager : `DatastoreCacheManager`
            Manager to exercise. ``files[0]`` is moved into its cache as a
            side effect.
        """
        self.assertTrue(cache_manager.should_be_cached(self.refs[0]))
        self.assertFalse(cache_manager.should_be_cached(self.refs[1]))

        uri = cache_manager.move_to_cache(self.files[0], self.refs[0])
        self.assertIsInstance(uri, ResourcePath)
        self.assertIsNone(cache_manager.move_to_cache(self.files[1], self.refs[1]))

        # Check presence in cache using ref and then using file extension.
        self.assertFalse(cache_manager.known_to_cache(self.refs[1]))
        self.assertTrue(cache_manager.known_to_cache(self.refs[0]))
        self.assertFalse(cache_manager.known_to_cache(self.refs[1], self.files[1].getExtension()))
        self.assertTrue(cache_manager.known_to_cache(self.refs[0], self.files[0].getExtension()))

        # Cached file should no longer exist but uncached file should be
        # unaffected.
        self.assertFalse(self.files[0].exists())
        self.assertTrue(self.files[1].exists())

        # Should find this file and it should be within the cache directory.
        with cache_manager.find_in_cache(self.refs[0], ".txt") as found:
            self.assertTrue(found.exists())
            self.assertIsNotNone(found.relative_to(cache_manager.cache_directory))

        # Should not be able to find these in cache
        with cache_manager.find_in_cache(self.refs[0], ".fits") as found:
            self.assertIsNone(found)
        with cache_manager.find_in_cache(self.refs[1], ".fits") as found:
            self.assertIsNone(found)

    def testNoCache(self) -> None:
        """Test that the disabled cache manager class never caches."""
        cache_manager = DatastoreDisabledCacheManager("", universe=self.universe)
        for uri, ref in zip(self.files, self.refs, strict=True):
            self.assertFalse(cache_manager.should_be_cached(ref))
            self.assertIsNone(cache_manager.move_to_cache(uri, ref))
            self.assertFalse(cache_manager.known_to_cache(ref))
            with cache_manager.find_in_cache(ref, ".txt") as found:
                self.assertIsNone(found, msg=f"{cache_manager}")

    def _expiration_config(self, mode: str, threshold: int | str) -> str:
        """Return YAML cache configuration text for the given expiry settings.

        Parameters
        ----------
        mode : `str`
            Value substituted for the expiry ``mode`` key.
        threshold : `int` or `str`
            Value substituted for the expiry ``threshold`` key. One test
            passes an empty string to leave the threshold unset.

        Returns
        -------
        config_str : `str`
            YAML text with caching enabled by default.
        """
        return f"""
cached:
  default: true
  expiry:
    mode: {mode}
    threshold: {threshold}
  cacheable:
    unused: true
        """

    def testCacheExpiryFiles(self) -> None:
        """Test "files" expiry mode, including expiry that happens while a
        cached file is held open through ``find_in_cache``.
        """
        threshold = 2  # Keep at least 2 files.
        mode = "files"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)

        # Check that an empty cache returns unknown for arbitrary ref
        self.assertFalse(cache_manager.known_to_cache(self.refs[0]))

        # Should end with datasets: 2, 3, 4
        self.assertExpiration(cache_manager, 5, threshold + 1)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Check that we will not expire a file that is actively in use.
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNotNone(found)

            # Trigger cache expiration that should remove the file
            # we just retrieved. Should now have: 3, 4, 5
            cached = cache_manager.move_to_cache(self.files[5], self.refs[5])
            self.assertIsNotNone(cached)

            # Cache should still report the standard file count.
            self.assertEqual(cache_manager.file_count, threshold + 1)

            # Add additional entry to cache.
            # Should now have 4, 5, 6
            cached = cache_manager.move_to_cache(self.files[6], self.refs[6])
            self.assertIsNotNone(cached)

            # Is the file still there?
            self.assertTrue(found.exists())

            # Can we read it?
            data = found.read()
            self.assertGreater(len(data), 0)

        # Outside context the file should no longer exist.
        self.assertFalse(found.exists())

        # File count should not have changed.
        self.assertEqual(cache_manager.file_count, threshold + 1)

        # Dataset 2 was in the exempt directory but because hardlinks
        # are used it was deleted from the main cache during cache expiry
        # above and so should no longer be found.
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNone(found)

        # And the one stored after it is also gone.
        with cache_manager.find_in_cache(self.refs[3], ".txt") as found:
            self.assertIsNone(found)

        # But dataset 4 is present.
        with cache_manager.find_in_cache(self.refs[4], ".txt") as found:
            self.assertIsNotNone(found)

        # Adding a new dataset to the cache should now delete it.
        cache_manager.move_to_cache(self.files[7], self.refs[7])

        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNone(found)

    def testCacheExpiryDatasets(self) -> None:
        """Test "datasets" expiry mode with single-file datasets."""
        threshold = 2  # Keep 2 datasets.
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)
        self.assertExpiration(cache_manager, 5, threshold + 1)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

    def testCacheExpiryDatasetsFromDisabled(self) -> None:
        """Test that a manager made with ``create_disabled`` still applies
        the expiry mode given in the environment variable.
        """
        threshold = 2
        mode = "datasets"
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"},
        ):
            cache_manager = DatastoreCacheManager.create_disabled(universe=DimensionUniverse())
            self.assertExpiration(cache_manager, 5, threshold + 1)
            self.assertIn(f"{mode}={threshold}", str(cache_manager))

    def testExpirationModeOverride(self) -> None:
        """Test that the expiry mode environment variable overrides the
        configured mode, and how malformed values are reported.
        """
        threshold = 2  # Keep 2 datasets.
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        # The config above asks for datasets=2; the environment asks for
        # size=55 and the assertions below expect the latter to be in effect.
        mode = "size"
        threshold = 55
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"},
        ):
            cache_manager = self._make_cache_manager(config_str)
            self.assertExpiration(cache_manager, 10, 6)
            self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Check we get a warning with unrecognized form.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something"},
        ):
            with self.assertLogs(level="WARNING") as cm:
                self._make_cache_manager(config_str)
            self.assertIn("Unrecognized form (something)", cm.output[0])

        # A well-formed value with an unknown mode name is an error.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something=5"},
        ):
            with self.assertRaises(ValueError) as cm:
                self._make_cache_manager(config_str)
            self.assertIn("Unrecognized value", str(cm.exception))

    def testMissingThreshold(self) -> None:
        """Test that an expiry mode without a threshold is rejected."""
        threshold = ""
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        with self.assertRaises(ValueError) as cm:
            self._make_cache_manager(config_str)
        self.assertIn("Cache expiration threshold", str(cm.exception))

    def testCacheExpiryDatasetsComposite(self) -> None:
        """Test "datasets" expiry mode with multi-file composite datasets."""
        threshold = 2  # Keep 2 datasets.
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)

        n_datasets = 3
        for i in range(n_datasets):
            for component_file, component_ref in zip(self.comp_files[i], self.comp_refs[i], strict=True):
                cached = cache_manager.move_to_cache(component_file, component_ref)
                self.assertIsNotNone(cached)
                self.assertTrue(cache_manager.known_to_cache(component_ref))
                self.assertTrue(cache_manager.known_to_cache(component_ref.makeCompositeRef()))
                self.assertTrue(cache_manager.known_to_cache(component_ref, component_file.getExtension()))

        self.assertEqual(cache_manager.file_count, 6)  # 2 datasets each of 3 files

        # Write two new non-composite and the number of files should drop.
        self.assertExpiration(cache_manager, 2, 5)

    def testCacheExpirySize(self) -> None:
        """Test "size" expiry mode."""
        threshold = 55  # Each file is 10 bytes
        mode = "size"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)
        self.assertExpiration(cache_manager, 10, 6)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

    def testDisabledCache(self) -> None:
        """Test the three ways of obtaining a disabled cache manager: via the
        environment, via configuration, and via ``create_disabled``.
        """
        # Configure an active cache but disable via environment.
        threshold = 2
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "disabled"},
        ):
            env_cache_manager = self._make_cache_manager(config_str)

        # Configure to be disabled
        threshold = 0
        mode = "disabled"
        config_str = self._expiration_config(mode, threshold)
        cfg_cache_manager = self._make_cache_manager(config_str)

        for cache_manager in (
            cfg_cache_manager,
            env_cache_manager,
            DatastoreCacheManager.create_disabled(universe=DimensionUniverse()),
        ):
            for uri, ref in zip(self.files, self.refs, strict=True):
                self.assertFalse(cache_manager.should_be_cached(ref))
                self.assertIsNone(cache_manager.move_to_cache(uri, ref))
                self.assertFalse(cache_manager.known_to_cache(ref))
                with cache_manager.find_in_cache(ref, ".txt") as found:
                    self.assertIsNone(found, msg=f"{cache_manager}")
            self.assertIn("disabled", str(cache_manager))

    def assertExpiration(
        self, cache_manager: DatastoreCacheManager, n_datasets: int, n_retained: int
    ) -> None:
        """Insert the datasets and then check the number retained.

        Parameters
        ----------
        cache_manager : `DatastoreCacheManager`
            Manager to insert into.
        n_datasets : `int`
            Number of entries from ``self.files`` / ``self.refs`` to move
            into the cache, starting at index 0.
        n_retained : `int`
            Expected ``file_count`` afterwards; only the last ``n_retained``
            of the inserted refs are expected to be found.
        """
        for i in range(n_datasets):
            cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
            self.assertIsNotNone(cached)

        self.assertEqual(cache_manager.file_count, n_retained)

        # The oldest file should not be in the cache any more.
        for i in range(n_datasets):
            with cache_manager.find_in_cache(self.refs[i], ".txt") as found:
                if i >= n_datasets - n_retained:
                    self.assertIsInstance(found, ResourcePath)
                else:
                    self.assertIsNone(found)

    def testCacheExpiryAge(self) -> None:
        """Test "age" expiry mode using a real sleep between insertions."""
        threshold = 1  # Expire files older than 1 second; the sleep below is 2 seconds.
        mode = "age"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Insert 2 files, then sleep, then insert 4 more.
        for i in range(2):
            cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
            self.assertIsNotNone(cached)
        time.sleep(2.0)
        for j in range(4):
            i = 2 + j  # Continue the counting
            cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
            self.assertIsNotNone(cached)

        # Only the files written after the sleep should exist.
        self.assertEqual(cache_manager.file_count, 4)
        with cache_manager.find_in_cache(self.refs[1], ".txt") as found:
            self.assertIsNone(found)
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsInstance(found, ResourcePath)

2152 

2153 

class NullDatastoreTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the null datastore."""

    storageClassFactory = StorageClassFactory()

    def test_basics(self) -> None:
        """Check that a null datastore knows nothing and rejects every
        operation that would read or modify stored data.
        """
        dict_storage_class = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, dict_storage_class, {})

        null = NullDatastore(None, None)

        # Queries about dataset presence all answer in the negative.
        self.assertFalse(null.exists(ref))
        self.assertFalse(null.knows(ref))
        known_map = null.knows_these([ref])
        self.assertFalse(known_map[ref])

        # Validation is expected to succeed without raising.
        null.validateConfiguration(ref)

        # Each remaining call must fail with the paired exception type. The
        # order matches the order in which the calls are made.
        failing_calls: list[tuple[type[Exception], Callable[[], Any]]] = [
            (FileNotFoundError, lambda: null.get(ref)),
            (NotImplementedError, lambda: null.put("", ref)),
            (FileNotFoundError, lambda: null.getURI(ref)),
            (FileNotFoundError, lambda: null.getURIs(ref)),
            (FileNotFoundError, lambda: null.getManyURIs([ref])),
            (NotImplementedError, lambda: null.getLookupKeys()),
            (NotImplementedError, lambda: null.import_records({})),
            (NotImplementedError, lambda: null.export_records([])),
            (NotImplementedError, lambda: null.export_predicted_records([])),
            (NotImplementedError, lambda: null.export([ref])),
            (NotImplementedError, lambda: null.transfer(null, ref)),
            (NotImplementedError, lambda: null.emptyTrash()),
            (NotImplementedError, lambda: null.trash(ref)),
            (NotImplementedError, lambda: null.forget([ref])),
            (NotImplementedError, lambda: null.remove(ref)),
            (NotImplementedError, lambda: null.retrieveArtifacts([ref], ResourcePath("."))),
            (NotImplementedError, lambda: null.transfer_from(null, [ref])),
            (NotImplementedError, lambda: null.ingest()),
        ]
        for expected_error, invoke in failing_calls:
            with self.assertRaises(expected_error):
                invoke()

2207 

2208 

class DatasetRefURIsTestCase(unittest.TestCase):
    """Tests for DatasetRefURIs."""

    def testSequenceAccess(self) -> None:
        """Check that DatasetRefURIs reads like a two-item tuple but only
        accepts updates through its named attributes.
        """
        pair = DatasetRefURIs()

        # A default instance has two slots: no primary URI, no components.
        self.assertEqual(len(pair), 2)
        self.assertEqual(pair[0], None)
        self.assertEqual(pair[1], {})

        primary_uri = ResourcePath("1/2/3")
        component_uri = ResourcePath("a/b/c")

        # Item assignment must be rejected for both positions.
        for position, replacement in ((0, primary_uri), (1, {"foo": component_uri})):
            with self.assertRaises(TypeError):
                pair[position] = replacement

        # Assignment by attribute name is accepted and is visible by index.
        pair.primaryURI = primary_uri
        pair.componentURIs = {"foo": component_uri}
        self.assertEqual(pair.primaryURI, primary_uri)
        self.assertEqual(pair[0], primary_uri)

        # Unpacking yields the primary URI followed by the component mapping.
        unpacked_primary, unpacked_components = pair
        self.assertEqual(unpacked_primary, primary_uri)
        self.assertEqual(unpacked_components, {"foo": component_uri})

    def testRepr(self) -> None:
        """Check the text produced by ``repr`` for a populated instance."""
        populated = DatasetRefURIs(ResourcePath("/1/2/3"), {"comp": ResourcePath("/a/b/c")})
        rendered = repr(populated)
        self.assertEqual(
            rendered,
            'DatasetRefURIs(ResourcePath("file:///1/2/3"), {\'comp\': ResourcePath("file:///a/b/c")})',
        )

2246 

2247 

class StoredFileInfoTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the StoredFileInfo class."""

    storageClassFactory = StorageClassFactory()

    def test_StoredFileInfo(self) -> None:
        """Check record round-tripping, rebasing, update validation and
        pickling of `StoredFileInfo`.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, storageClass, {})

        record = dict(
            storage_class="StructuredDataDict",
            formatter="lsst.daf.butler.Formatter",
            path="a/b/c.txt",
            component="component",
            checksum=None,
            file_size=5,
        )
        info = StoredFileInfo.from_record(record)

        # Converting back to a record must reproduce the input exactly.
        self.assertEqual(info.to_record(), record)

        # Rebasing onto a second ref and back again must give an equal object.
        ref2 = self.makeDatasetRef("metric", DimensionUniverse().empty, storageClass, {})
        rebased = info.rebase(ref2)
        self.assertEqual(rebased.rebase(ref), info)

        # A value of the wrong type for a known field is a TypeError.
        with self.assertRaises(TypeError):
            rebased.update(formatter=42)

        # Unknown field names are a ValueError.
        with self.assertRaises(ValueError):
            rebased.update(something=42, new="42")

        # Check that pickle works on StoredFileInfo.
        pickled_info = pickle.dumps(info)
        unpickled_info = pickle.loads(pickled_info)
        self.assertEqual(unpickled_info, info)

    def test_make_datastore_path_relative(self) -> None:
        """Check `make_datastore_path_relative` on relative paths and URLs,
        with and without a fragment.
        """
        self.assertEqual(make_datastore_path_relative("a/relative/path"), "a/relative/path")
        self.assertEqual(make_datastore_path_relative("path/with#fragment"), "path/with#fragment")
        self.assertEqual(make_datastore_path_relative("http://server.com/some/path"), "some/path")
        self.assertEqual(make_datastore_path_relative("http://server.com/some/path#frag"), "some/path#frag")

    def test_datastore_record_data_json_types(self) -> None:
        """Test that we don't round-trip checksums to UUIDs when deserializing
        datastore record data.
        """
        # The checksum below is deliberately formatted like a UUID string.
        test_json = """
        {
            "dataset_ids": [
                "74478304-abf1-4a9c-9eb2-926090a84446"
            ],
            "records": {
                "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo": {
                    "74478304abf14a9c9eb2926090a84446": {
                        "file_datastore_records": [
                            {
                                "formatter": "lsst.daf.butler.formatters.yaml.YamlFormatter",
                                "path": "gain_factors/base-2025-158/gain_factors_spx_base-2025-158.yaml",
                                "storage_class": "GainFactors",
                                "component": "__NULL_STRING__",
                                "checksum": "cab515f6-ab67-0484-393f-aaa525dd526f",
                                "file_size": 5412
                            }
                        ]
                    }
                }
            }
        }
        """
        id_str = "74478304abf14a9c9eb2926090a84446"

        # The checksum must stay a string in the serialized (pydantic) model.
        s = SerializedDatastoreRecordData.model_validate_json(test_json)
        self.assertIsInstance(
            s.records[get_full_type_name(StoredFileInfo)][id_str]["file_datastore_records"][0]["checksum"],
            str,
        )

        # It must also stay a string after conversion to DatastoreRecordData.
        # The local is not called ``id`` so that the builtin is not shadowed.
        dataset_id = uuid.UUID(id_str)
        d = DatastoreRecordData.from_simple(s)
        stored_info = d.records[dataset_id]["file_datastore_records"][0]
        self.assertIsInstance(stored_info, StoredFileInfo)
        self.assertIsInstance(stored_info.checksum, str)

2327 

2328 

class TestDatastoreRecordTable(unittest.TestCase):
    """Test DatastoreRecordTable and StoredFileInfoTable."""

    def test_empty_datastore_records_table(self) -> None:
        """Check that every way of building an empty table gives length 0."""
        file_info_table = StoredFileInfoTable.from_records([])
        self.assertEqual(0, len(file_info_table))

        self.assertEqual(
            0, len(DatastoreRecordTable.from_stored_file_info_table("datastore_name", file_info_table))
        )

        self.assertEqual(0, len(DatastoreRecordTable.create_empty()))
        self.assertEqual(0, len(DatastoreRecordTable.combine([])))
        # Doesn't throw because there is no mismatch in datastore names.
        # The name is passed in a list, matching the other calls in this
        # class; a bare string would be iterated character by character by
        # any code that treats the argument as a collection of names.
        DatastoreRecordTable.create_empty().validate_datastore_names(["arbitrary_name"])

    def test_stored_file_info_table_records(self) -> None:
        """Check conversion of file records to and from the table classes,
        plus datastore-name validation, combining and filtering.
        """
        uuid1 = uuid.UUID("019e1892-7b9b-736d-8248-0e031723646c")
        uuid2 = uuid.UUID("019e1895-9ec3-7431-bed9-8ae60096103f")
        uuid3 = uuid.UUID("13d13272-454c-4bc4-94d5-3e322982eee8")
        checksum = (
            "021ced8799518305c451cde3e921515ef315ee7ba8937"
            "a92697a20c571c776afa4102744bc28d2d99d35f44e073cde80cf96e387f65f3967cca45b0d015f5a6b"
        )
        input_records = [
            {
                "dataset_id": uuid1,
                "path": "a/relative/path.fits",
                "formatter": "lsst.obs.base.formatters.fitsExposure.FitsExposureFormatter",
                "storage_class": "ExposureF",
                "component": "__NULL_STRING__",
                "checksum": None,
                "file_size": 123,
            },
            {
                "dataset_id": uuid2,
                "path": "file:///an/absolute/path.fits",
                "formatter": "lsst.obs.base.formatters.fitsExposure.FitsExposureFormatter",
                "storage_class": "ExposureF",
                "component": "comp",
                "checksum": checksum,
                "file_size": -1,
            },
        ]
        table = StoredFileInfoTable.from_records(input_records)
        self.assertEqual(len(table), 2)

        def _check_records(records: list[dict]) -> None:
            """Check the first two Arrow-derived records against the inputs.

            The Arrow form is expected to hold `None` where the input used
            the ``"__NULL_STRING__"`` component or a ``file_size`` of -1.
            """
            rec0 = records[0]
            self.assertEqual(rec0["dataset_id"], uuid1)
            self.assertEqual(rec0["path"], "a/relative/path.fits")
            self.assertEqual(rec0["formatter"], "lsst.obs.base.formatters.fitsExposure.FitsExposureFormatter")
            self.assertEqual(rec0["storage_class"], "ExposureF")
            self.assertIsNone(rec0["component"])
            self.assertIsNone(rec0["checksum"])
            self.assertEqual(rec0["file_size"], 123)
            rec1 = records[1]
            self.assertEqual(rec1["dataset_id"], uuid2)
            self.assertEqual(rec1["path"], "file:///an/absolute/path.fits")
            self.assertEqual(rec1["formatter"], "lsst.obs.base.formatters.fitsExposure.FitsExposureFormatter")
            self.assertEqual(rec1["storage_class"], "ExposureF")
            self.assertEqual(rec1["component"], "comp")
            self.assertEqual(rec1["checksum"], checksum)
            self.assertIsNone(rec1["file_size"])

        # The Arrow view uses None for missing values, while to_records()
        # must give back the original sentinel values unchanged.
        arrow_records = table.to_arrow().to_pylist()
        self.assertEqual(len(arrow_records), 2)
        _check_records(arrow_records)
        self.assertEqual(table.to_records(), input_records)

        datastore_table = DatastoreRecordTable.from_stored_file_info_table("name_of_datastore", table)
        datastore_arrow_records = datastore_table.to_arrow().to_pylist()
        self.assertEqual(len(datastore_arrow_records), 2)
        _check_records(datastore_arrow_records)
        self.assertEqual(datastore_arrow_records[0]["datastore_name"], "name_of_datastore")
        self.assertEqual(datastore_arrow_records[1]["datastore_name"], "name_of_datastore")
        _check_records(datastore_table.to_stored_file_info_table().to_arrow().to_pylist())
        # Check round-tripping to_arrow() through from_arrow()
        _check_records(
            datastore_table.from_arrow(datastore_table.to_arrow())
            .to_stored_file_info_table()
            .to_arrow()
            .to_pylist()
        )

        # Validation fails when the table's datastore name is absent from the
        # given names and passes once it is included.
        with self.assertRaisesRegex(ValueError, "do not match known datastores"):
            datastore_table.validate_datastore_names(["not_the_same_datastore"])
        datastore_table.validate_datastore_names(["not_the_same_datastore", "name_of_datastore"])

        second_table = DatastoreRecordTable.from_stored_file_info_table(
            "other_datastore_name",
            StoredFileInfoTable.from_records(
                [
                    {
                        "dataset_id": uuid3,
                        "path": "a/relative/path2.fits",
                        "formatter": "lsst.obs.lsst.rawFormatter.LsstCamRawFormatter",
                        "storage_class": "Exposure",
                        "component": "__NULL_STRING__",
                        "checksum": None,
                        "file_size": 1000,
                    },
                ]
            ),
        )

        # Combining keeps the first table's rows first, followed by the
        # single row of the second table.
        combined_table = DatastoreRecordTable.combine([datastore_table, second_table])
        combined_records = combined_table.to_arrow().to_pylist()
        _check_records(combined_records)
        rec2 = combined_records[2]
        self.assertEqual(rec2["dataset_id"], uuid3)
        self.assertEqual(rec2["path"], "a/relative/path2.fits")
        self.assertEqual(rec2["formatter"], "lsst.obs.lsst.rawFormatter.LsstCamRawFormatter")
        self.assertEqual(rec2["storage_class"], "Exposure")
        self.assertIsNone(rec2["component"])
        self.assertIsNone(rec2["checksum"])
        self.assertEqual(rec2["file_size"], 1000)

        # Filtering by an unknown name gives an empty table; filtering by the
        # first datastore's name recovers exactly the original records.
        self.assertEqual(0, len(datastore_table.filter_by_datastore_name("unknown_datastore")))
        filtered_table = combined_table.filter_by_datastore_name("name_of_datastore")
        self.assertEqual(len(filtered_table), 2)
        _check_records(filtered_table.to_arrow().to_pylist())
        self.assertEqual(filtered_table.to_stored_file_info_table().to_records(), input_records)

2451 

2452 

@contextlib.contextmanager
def _temp_yaml_file(data: Any) -> Iterator[str]:
    """Dump ``data`` as YAML into a named temporary file and yield its path.

    Parameters
    ----------
    data : `typing.Any`
        Object handed to ``yaml.dump``.

    Yields
    ------
    path : `str`
        Name of the temporary file, which has a ``.yaml`` suffix. The file
        handle is closed when the context exits.
    """
    handle = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml")
    try:
        yaml.dump(data, stream=handle)
        # Flush so the content is on disk before the caller opens the path.
        handle.flush()
        yield handle.name
    finally:
        # Some tests delete the file, so closing may find nothing to remove.
        try:
            handle.close()
        except FileNotFoundError:
            pass

2464 

2465 

# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()