Coverage for tests/test_dimensions_versions.py: 16%
140 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:15 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import os
29import unittest
30from contextlib import closing
32from pydantic import ValidationError
34from lsst.daf.butler import Butler, Config, InconsistentUniverseError
35from lsst.daf.butler.tests import addDataIdValue, addDatasetType, makeTestRepo
36from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
# Absolute path of the directory holding this test module; handed to
# makeTestTempDir in setUp.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

# Instrument name (a non-ASCII string) shared by every data ID in this module.
_INSTRUMENT = "🔭"
def _make_butler(root: str, dimensions_version: int) -> Butler:
    """Create a test repository using one of the versioned dimension configs.

    Parameters
    ----------
    root : `str`
        Location handed to `makeTestRepo` as the repository root.
    dimensions_version : `int`
        Selects ``config/dimensions/dimensions<N>.yaml`` relative to the
        directory of this file.

    Returns
    -------
    butler : `Butler`
        Whatever `makeTestRepo` returns for the new repository.
    """
    config_dir = os.path.join(os.path.dirname(__file__), "config", "dimensions")
    dimension_config = os.path.join(config_dir, f"dimensions{dimensions_version}.yaml")

    repo_config = Config()
    repo_config["datastore", "cls"] = "lsst.daf.butler.datastores.fileDatastore.FileDatastore"
    return makeTestRepo(root, config=repo_config, dimensionConfig=dimension_config)
def _populate_data_ids(butler: Butler, dimension_version: int) -> None:
    """Insert the dimension values used by the tests into a repository.

    Parameters
    ----------
    butler : `Butler`
        Repository to receive the data ID values.
    dimension_version : `int`
        Version of the dimension configuration in use. It controls which
        dimensions are populated: versions 1 and 2 get visits without a
        ``day_obs``, versions 2 and 3 get groups, and versions 3 and 4 get
        ``day_obs`` values plus visits that reference them.
    """
    # The instrument and its ten detectors are added for every version.
    addDataIdValue(butler, "instrument", _INSTRUMENT)
    for detector_id in range(10):
        addDataIdValue(butler, "detector", detector_id, instrument=_INSTRUMENT)

    if dimension_version in (1, 2):
        for visit_id in range(10):
            addDataIdValue(butler, "visit", visit_id, instrument=_INSTRUMENT)

    if dimension_version in (2, 3):
        for group_name in ("A", "B", "C", "D"):
            addDataIdValue(butler, "group", group_name, instrument=_INSTRUMENT)

    if dimension_version in (3, 4):
        # day_obs values go in first; each visit below names one of them.
        for day in range(2):
            addDataIdValue(butler, "day_obs", day, instrument=_INSTRUMENT)
        for visit_id in range(10):
            addDataIdValue(butler, "visit", visit_id, day_obs=visit_id % 2, instrument=_INSTRUMENT)
def _populate_repo(butler: Butler, dimension_version: int) -> None:
    """Register collections and store ten small datasets per dataset type.

    Parameters
    ----------
    butler : `Butler`
        Repository to fill. The data ID values used here are expected to have
        been inserted already (see `_populate_data_ids`).
    dimension_version : `int`
        Version of the dimension configuration in use. Versions 1 and 2 get
        dataset type ``test1``, version 2 additionally gets ``test2`` (which
        includes ``group``), and versions 3 and 4 get ``test3`` (stored with
        a ``day_obs`` value).
    """
    # All three collections are registered regardless of version.
    for collection in ("test1", "test2", "test3"):
        butler.collections.register(collection)

    if dimension_version in (1, 2):
        test1_type = addDatasetType(butler, "test1", {"instrument", "visit", "detector"}, "int")
        for value in range(10):
            data_id = {"instrument": _INSTRUMENT, "visit": value, "detector": value}
            butler.put(value, test1_type, run="test1", **data_id)

    if dimension_version == 2:
        test2_type = addDatasetType(butler, "test2", {"instrument", "visit", "detector", "group"}, "int")
        for value in range(10):
            data_id = {
                "instrument": _INSTRUMENT,
                "visit": value,
                "detector": value,
                "group": "ABCD"[value % 4],
            }
            butler.put(value, test2_type, run="test2", **data_id)

    if dimension_version in (3, 4):
        test3_type = addDatasetType(butler, "test3", {"instrument", "visit", "detector"}, "int")
        for value in range(10):
            data_id = {
                "instrument": _INSTRUMENT,
                "visit": value,
                "day_obs": value % 2,
                "detector": value,
            }
            butler.put(value, test3_type, run="test3", **data_id)
class DimensionsVersionsTestCase(unittest.TestCase):
    """Check transfers and zip ingests between repositories whose dimension
    configurations have different versions.
    """

    def _prep_repo(self, root: str, subdir: str, dimension_version: int, fill: bool = False) -> Butler:
        """Create a repository and optionally fill it with test data.

        Parameters
        ----------
        root : `str`
            Parent directory for the repository.
        subdir : `str`
            Name of the repository directory inside ``root``.
        dimension_version : `int`
            Version of the dimension configuration to create the repo with.
        fill : `bool`, optional
            If `True`, insert data IDs and datasets after creation.

        Returns
        -------
        butler : `Butler`
            Butler for the new repository.
        """
        butler = _make_butler(os.path.join(root, subdir), dimension_version)

        # Confirm the repository really uses the requested universe.
        self.assertEqual(butler.dimensions.namespace, "test")
        self.assertEqual(butler.dimensions.version, dimension_version)

        if not fill:
            return butler

        _populate_data_ids(butler, dimension_version)
        _populate_repo(butler, dimension_version)
        return butler

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def test_v1_v2_transfer(self):
        # v2 is a superset of v1, so a v1 -> v2 transfer is expected to work.
        with (
            closing(self._prep_repo(self.root, "butler1", 1, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler2", 2)) as target,
        ):
            datasets = source.query_datasets("test1", collections="test1")
            self.assertEqual(len(datasets), 10)
            target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v1_v2_ingest_zip(self):
        # v2 is a superset of v1, so a zip exported from v1 ingests into v2.
        with (
            closing(self._prep_repo(self.root, "butler1", 1, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler2", 2)) as target,
        ):
            datasets = source.query_datasets("test1", collections="test1", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            target.ingest_zip(zip_path, transfer_dimensions=True)

    def test_v2_v1_transfer(self):
        # v2 -> v1 works as long as only dimensions known to v1 are involved.
        with (
            closing(self._prep_repo(self.root, "butler1", 1)) as target,
            closing(self._prep_repo(self.root, "butler2", 2, fill=True)) as source,
        ):
            datasets = source.query_datasets("test1", collections="test1")
            self.assertEqual(len(datasets), 10)
            target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

            # "test2" uses the group dimension, so this transfer must fail.
            datasets = source.query_datasets("test2", collections="test2")
            self.assertEqual(len(datasets), 10)
            with self.assertRaises(InconsistentUniverseError):
                target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v2_v1_ingest_zip(self):
        # Zip ingest v2 -> v1 works when only v1-compatible dimensions are
        # involved.
        with (
            closing(self._prep_repo(self.root, "butler1", 1)) as target,
            closing(self._prep_repo(self.root, "butler2", 2, fill=True)) as source,
        ):
            datasets = source.query_datasets("test1", collections="test1", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            target.ingest_zip(zip_path, transfer_dimensions=True)

            # "test2" uses the group dimension, so this ingest must fail.
            datasets = source.query_datasets("test2", collections="test2", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export2"))
            with self.assertRaises(InconsistentUniverseError):
                target.ingest_zip(zip_path, transfer_dimensions=True)

    def test_v2_v3_transfer(self):
        # v2 -> v3 transfer is rejected: v3 requires day_obs for visit.
        with (
            closing(self._prep_repo(self.root, "butler2", 2, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler3", 3)) as target,
        ):
            datasets = source.query_datasets("test1", collections="test1")
            self.assertEqual(len(datasets), 10)
            with self.assertRaises(InconsistentUniverseError):
                target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v2_v3_ingest_zip(self):
        # v2 -> v3 zip ingest is rejected: v3 requires day_obs for visit.
        with (
            closing(self._prep_repo(self.root, "butler2", 2, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler3", 3)) as target,
        ):
            datasets = source.query_datasets("test1", collections="test1", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            with self.assertRaises(InconsistentUniverseError):
                target.ingest_zip(zip_path, transfer_dimensions=True)

    def test_v3_v2_transfer(self):
        # v3 -> v2 transfer is rejected: the universes disagree on whether
        # day_obs is required for visit.
        with (
            closing(self._prep_repo(self.root, "butler2", 2)) as target,
            closing(self._prep_repo(self.root, "butler3", 3, fill=True)) as source,
        ):
            datasets = source.query_datasets("test3", collections="test3")
            self.assertEqual(len(datasets), 10)
            with self.assertRaises(InconsistentUniverseError):
                target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v3_v2_ingest_zip(self):
        # v3 -> v2 zip ingest is rejected: the universes disagree on whether
        # day_obs is required for visit.
        with (
            closing(self._prep_repo(self.root, "butler2", 2)) as target,
            closing(self._prep_repo(self.root, "butler3", 3, fill=True)) as source,
        ):
            datasets = source.query_datasets("test3", collections="test3", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            with self.assertRaises(InconsistentUniverseError):
                target.ingest_zip(zip_path, transfer_dimensions=True)

    def test_v3_v4_transfer(self):
        # v3 -> v4 transfer is expected to succeed.
        with (
            closing(self._prep_repo(self.root, "butler3", 3, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler4", 4)) as target,
        ):
            datasets = source.query_datasets("test3", collections="test3")
            self.assertEqual(len(datasets), 10)
            target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v3_v4_ingest_zip(self):
        # v3 -> v4 zip ingest currently fails with a pydantic validation
        # error.
        with (
            closing(self._prep_repo(self.root, "butler3", 3, fill=True)) as source,
            closing(self._prep_repo(self.root, "butler4", 4)) as target,
        ):
            datasets = source.query_datasets("test3", collections="test3", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            with self.assertRaises(ValidationError):
                target.ingest_zip(zip_path, transfer_dimensions=True)

    def test_v4_v3_transfer(self):
        # v4 -> v3 transfer is expected to succeed.
        with (
            closing(self._prep_repo(self.root, "butler3", 3)) as target,
            closing(self._prep_repo(self.root, "butler4", 4, fill=True)) as source,
        ):
            datasets = source.query_datasets("test3", collections="test3")
            self.assertEqual(len(datasets), 10)
            target.transfer_from(source, datasets, register_dataset_types=True, transfer_dimensions=True)

    def test_v4_v3_ingest_zip(self):
        # v4 -> v3 zip ingest currently fails with a pydantic validation
        # error.
        with (
            closing(self._prep_repo(self.root, "butler3", 3)) as target,
            closing(self._prep_repo(self.root, "butler4", 4, fill=True)) as source,
        ):
            datasets = source.query_datasets("test3", collections="test3", with_dimension_records=True)
            self.assertEqual(len(datasets), 10)
            zip_path = source.retrieve_artifacts_zip(datasets, os.path.join(self.root, "export"))
            with self.assertRaises(ValidationError):
                target.ingest_zip(zip_path, transfer_dimensions=True)
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()