Coverage for python / lsst / daf / butler / datastore / stored_file_info.py: 42%

174 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-27 01:07 -0700

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("SerializedStoredFileInfo", "StoredDatastoreItemInfo", "StoredFileInfo") 

31 

32import inspect 

33from collections.abc import Iterable, Mapping 

34from dataclasses import dataclass 

35from functools import cache 

36from typing import TYPE_CHECKING, Any 

37 

38import pyarrow as pa 

39import pyarrow.compute as pc 

40import pydantic 

41 

42from lsst.resources import ResourcePath 

43from lsst.utils import doImportType 

44from lsst.utils.introspection import get_full_type_name 

45 

46from .._formatter import Formatter, FormatterParameter, FormatterV2 

47from .._location import Location, LocationFactory 

48from .._storage_class import StorageClass, StorageClassFactory 

49from ..arrow_utils import ArrowTableUtils 

50 

51if TYPE_CHECKING: 

52 from .._dataset_ref import DatasetRef 

53 

# Placeholder string written in place of a Python None value.
NULLSTR = "__NULL_STRING__"

56 

57 

class StoredDatastoreItemInfo:
    """Internal information associated with a stored dataset in a `Datastore`.

    This is an empty base class. Datastore implementations are expected to
    write their own subclasses.
    """

    __slots__ = ()

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError("The base class does not know how to locate an item in a datastore.")

    @classmethod
    def from_record(cls: type[StoredDatastoreItemInfo], record: Mapping[str, Any]) -> StoredDatastoreItemInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredDatastoreItemInfo`
            The newly-constructed item corresponding to the record.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert record contents to a dictionary.

        Parameters
        ----------
        **kwargs
            Additional items to add to returned record.

        Returns
        -------
        record : `dict`
            Dictionary representation of this item.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    def update(self, **kwargs: Any) -> StoredDatastoreItemInfo:
        """Create a new class with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override.

        Returns
        -------
        updated : `StoredDatastoreItemInfo`
            A new instance of the object with updated values.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    @classmethod
    def to_records(
        cls, records: Iterable[StoredDatastoreItemInfo], **kwargs: Any
    ) -> tuple[str, Iterable[Mapping[str, Any]]]:
        """Convert a collection of records to dictionaries.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `StoredDatastoreItemInfo` ]
            A collection of records, all records must be of the same type.
            Any iterable is accepted, including one-shot iterators and
            generators.
        **kwargs
            Additional items to add to each returned record.

        Returns
        -------
        class_name : `str`
            Name of the record class. Empty string if there are no records.
        records : `list` [ `dict` ]
            Records in their dictionary representation.
        """
        # The input is traversed more than once (emptiness check, class
        # collection, conversion), so materialize it first. Without this a
        # generator would always look non-empty and would already be
        # exhausted by the time the records are converted.
        items = list(records)
        if not items:
            return "", []
        classes = {item.__class__ for item in items}
        assert len(classes) == 1, f"Records have to be of the same class: {classes}"
        return get_full_type_name(classes.pop()), [item.to_record(**kwargs) for item in items]

    @classmethod
    def from_records(
        cls, class_name: str, records: Iterable[Mapping[str, Any]]
    ) -> list[StoredDatastoreItemInfo]:
        """Convert collection of dictionaries to records.

        Parameters
        ----------
        class_name : `str`
            Name of the record class.
        records : `~collections.abc.Iterable` [ `dict` ]
            Records in their dictionary representation.

        Returns
        -------
        infos : `list` [`StoredDatastoreItemInfo`]
            Sequence of records converted to typed representation.

        Raises
        ------
        TypeError
            Raised if ``class_name`` is not a sub-class of
            `StoredDatastoreItemInfo`.
        ImportError
            Raised if ``class_name`` can not be imported and is not the one
            legacy name that is handled explicitly.
        """
        try:
            klass = doImportType(class_name)
        except ImportError:
            # Prior to DM-41043 we were embedding a lsst.daf.butler.core
            # path in the serialized form, which we never wanted; fix this
            # one case.
            if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo":
                klass = StoredFileInfo
            else:
                raise
        if not issubclass(klass, StoredDatastoreItemInfo):
            raise TypeError(f"Class {class_name} is not a subclass of StoredDatastoreItemInfo")
        return [klass.from_record(record) for record in records]

187 

188 

@dataclass(frozen=True, slots=True)
class StoredFileInfo(StoredDatastoreItemInfo):
    """Datastore-private metadata associated with a Datastore file.

    Parameters
    ----------
    formatter : `Formatter` or `FormatterV2` or `str`
        The formatter to use for this dataset.
    path : `str`
        Path to the artifact associated with this dataset.
    storageClass : `StorageClass` or `None`
        The storage class associated with this dataset. If `None`,
        ``storage_class_name`` must be provided as a keyword argument.
    component : `str` or `None`, optional
        The component if disassembled.
    checksum : `str` or `None`, optional
        The checksum of the artifact.
    file_size : `int`
        The size of the file in bytes. -1 indicates the size is not known.
    storage_class_name : `str`, optional
        Name of the storage class. This may be passed instead of
        ``storageClass`` to defer loading storage class definitions (e.g. if a
        butler configuration may not have been loaded yet). Note that
        ``storageClass=None`` must be passed explicitly (for backward
        compatibility, it remains a positional argument with no default).
    """

    formatter: str
    """Fully-qualified name of Formatter. If a Formatter class or instance
    is given the name will be extracted."""

    path: str
    """Path to dataset within Datastore."""

    storage_class_name: str
    """Name of the storage class associated with this dataset."""

    component: str | None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""

    def __init__(
        self,
        formatter: FormatterParameter,
        path: str,
        storageClass: StorageClass | None,
        component: str | None,
        checksum: str | None,
        file_size: int,
        *,
        storage_class_name: str | None = None,
    ):
        # An explicit storage class object takes precedence over the name.
        if storageClass is None:
            if storage_class_name is None:
                raise TypeError("At least one of 'storageClass' and 'storage_class_name' must be provided.")
            resolved_storage_class_name = storage_class_name
        else:
            resolved_storage_class_name = storageClass.name

        # Only the name of the formatter is stored.
        if isinstance(formatter, str):
            # The string is trusted to refer to a Formatter.
            formatter_name = formatter
        elif isinstance(formatter, Formatter | FormatterV2) or (
            inspect.isclass(formatter) and issubclass(formatter, Formatter | FormatterV2)
        ):
            formatter_name = formatter.name()
        else:
            raise TypeError(f"Supplied formatter '{formatter}' is not a Formatter")

        # The dataclass is frozen, so ordinary attribute assignment is not
        # available; go through object.__setattr__ instead.
        assign = object.__setattr__
        assign(self, "path", path)
        assign(self, "storage_class_name", resolved_storage_class_name)
        assign(self, "component", component)
        assign(self, "checksum", checksum)
        assign(self, "file_size", file_size)
        assign(self, "formatter", formatter_name)

    @property
    def storageClass(self) -> StorageClass:
        """Storage class associated with this dataset."""
        factory = StorageClassFactory()
        return factory.getStorageClass(self.storage_class_name)

    def rebase(self, ref: DatasetRef) -> StoredFileInfo:
        """Return a copy of the record suitable for a specified reference.

        Parameters
        ----------
        ref : `DatasetRef`
            DatasetRef which provides the component name for the new
            returned record.

        Returns
        -------
        record : `StoredFileInfo`
            New record instance.
        """
        # The component of the ref wins; fall back to our own component
        # when the ref does not have one.
        ref_component = ref.datasetType.component()
        return self.update(component=self.component if ref_component is None else ref_component)

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert this item to a database record.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Additional information to be added to the record.

        Returns
        -------
        record : `dict`
            Dictionary holding the fields of this item plus ``kwargs``.
        """
        return dict(
            formatter=self.formatter,
            path=self.path,
            storage_class=self.storage_class_name,
            # A missing component is written as the NULLSTR placeholder
            # since we want this to be part of the primary key.
            component=NULLSTR if self.component is None else self.component,
            checksum=self.checksum,
            file_size=self.file_size,
            **kwargs,
        )

    def to_simple(self) -> SerializedStoredFileInfo:
        """Convert this item to its serializable model.

        Returns
        -------
        model : `SerializedStoredFileInfo`
            Model validated from the record form of this item.
        """
        # The model allows None for the component, whereas the record
        # carries the placeholder string, so put the real value back.
        return SerializedStoredFileInfo.model_validate({**self.to_record(), "component": self.component})

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        uri = ResourcePath(self.path, forceAbsolute=False, forceDirectory=False)
        if uri.isabs():
            # Absolute paths are used as-is, without the factory.
            return Location(None, uri)
        return factory.from_uri(uri, trusted_path=True)

    @classmethod
    def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredFileInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item corresponding to the record.
        """
        # Falsy values and the placeholder string both mean "no component".
        raw_component = record["component"]
        component = raw_component if (raw_component and raw_component != NULLSTR) else None
        # Only the storage class name is passed on; no StorageClass
        # instance is constructed here.
        return cls(
            formatter=record["formatter"],
            path=record["path"],
            storageClass=None,
            storage_class_name=record["storage_class"],
            component=component,
            checksum=record["checksum"],
            file_size=record["file_size"],
        )

    @classmethod
    def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
        """Create instance from its serializable model.

        Parameters
        ----------
        model : `SerializedStoredFileInfo`
            The model to convert.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item.
        """
        return cls.from_record(dict(model))

    def update(self, **kwargs: Any) -> StoredFileInfo:
        """Create a new instance with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override, keyed by attribute name.

        Returns
        -------
        updated : `StoredFileInfo`
            A new instance of the object with updated values.

        Raises
        ------
        ValueError
            Raised if a keyword does not match any attribute.
        """
        # ``storageClass`` has no default, so pass None explicitly and let
        # ``storage_class_name`` carry the value.
        new_args: dict[str, Any] = {"storageClass": None}
        for name in self.__slots__:
            new_args[name] = kwargs.pop(name) if name in kwargs else getattr(self, name)
        if kwargs:
            raise ValueError(f"Unexpected keyword arguments for update: {', '.join(kwargs)}")
        return type(self)(**new_args)

    def __reduce__(self) -> str | tuple[Any, ...]:
        # Rebuild through the record form of this item.
        return self.from_record, (self.to_record(),)

    @property
    def artifact_path(self) -> str:
        """Path to dataset as stored in Datastore with fragments removed."""
        fragment_start = self.path.rfind("#")
        return self.path if fragment_start < 0 else self.path[:fragment_start]

396 

397 

class SerializedStoredFileInfo(pydantic.BaseModel):
    """Serialized form of the properties of a `StoredFileInfo`."""

    formatter: str
    """Fully-qualified Formatter name."""

    path: str
    """Location of the dataset within the Datastore."""

    storage_class: str
    """Name of the StorageClass for the dataset."""

    component: str | None = None
    """Component this file corresponds to, or `None` when the file does not
    refer to a component of a composite."""

    checksum: str | None = None
    """Checksum of the serialized dataset, if any."""

    file_size: int
    """Size in bytes of the serialized dataset."""

419 

420 

class StoredFileInfoTable:
    """Arrow representation of the database rows from ``StoredFileInfo``.

    Parameters
    ----------
    table
        Arrow table containing the file information records.

    Notes
    -----
    Users should not call the constructor directly -- use one of the ``from_*``
    methods.
    """

    def __init__(self, table: pa.Table) -> None:
        self._table = table

    @staticmethod
    def from_arrow(table: pa.Table) -> StoredFileInfoTable:
        """Create an instance from an external `pyarrow.Table` object.

        Parameters
        ----------
        table
            `pyarrow.Table` instance with a schema compatible with the one
            returned by ``StoredFileInfoTable.make_arrow_schema()``.

        Returns
        -------
        table
            New ``StoredFileInfoTable`` instance backed by the given table,
            cast to the schema from ``make_arrow_schema()``.
        """
        return StoredFileInfoTable(table.cast(StoredFileInfoTable.make_arrow_schema()))

    def to_arrow(self) -> pa.Table:
        """Convert to a raw `pyarrow.Table`."""
        return self._table

    @staticmethod
    def from_records(records: Iterable[Mapping[str, Any]]) -> StoredFileInfoTable:
        """Convert datastore records from raw database row mappings to an arrow
        table representation.

        Parameters
        ----------
        records
            List of database records as mappings from database column name to
            value.

        Returns
        -------
        table
            New ``StoredFileInfoTable`` instance containing the given records.

        Notes
        -----
        This is a tabular version of ``StoredFileInfo.from_record()``.
        """
        # The dataset ID is handed to arrow as its raw bytes.
        rows = [{**row, "dataset_id": row["dataset_id"].bytes} for row in records]
        table = pa.Table.from_pylist(rows, schema=StoredFileInfoTable.make_arrow_schema())

        # The underlying database tables use a magic value "__NULL_STRING__"
        # instead of SQL NULL for missing component values -- replace
        # with actual nulls to avoid leaking this implementation detail
        # to downstream consumers.
        table = ArrowTableUtils.modify_column(table, "component", _replace_null_placeholder_with_null)

        # Similarly, unknown file size is represented in the DB by a negative
        # number -- replace with NULL to prevent accidental usage of negative
        # file sizes downstream.
        table = ArrowTableUtils.modify_column(table, "file_size", _replace_negative_with_null)

        return StoredFileInfoTable(table)

    def to_records(self) -> list[dict[str, Any]]:
        """Convert datastore records in this table to dictionary instances in a
        format suitable for insertion to the database.

        Returns
        -------
        records
            List of dictionaries suitable for insertion into datastore records
            table.
        """
        # Undo the transformations done in ``from_records`` to get the database
        # representation for component and file_size.
        table = self._table
        table = ArrowTableUtils.modify_column(table, "component", lambda c: c.fill_null(NULLSTR))
        table = ArrowTableUtils.modify_column(table, "file_size", lambda c: c.fill_null(-1))
        return table.to_pylist()

    # ``staticmethod`` has to be the outermost decorator. With ``cache`` on
    # the outside the class attribute is the cache wrapper itself, which binds
    # like a method: a call through an instance then passes that instance to
    # this zero-argument function and fails with TypeError.
    @staticmethod
    @cache
    def make_arrow_schema() -> pa.Schema:
        """Return the `pyarrow.Schema` for the arrow table.

        Returns
        -------
        schema
            The schema. The result is cached, so repeated calls return the
            same object.
        """
        string_dict = pa.dictionary(pa.int32(), pa.string())
        return pa.schema(
            [
                pa.field("dataset_id", pa.uuid(), nullable=False),
                pa.field("path", pa.string(), nullable=False),
                pa.field("formatter", string_dict, nullable=False),
                pa.field("storage_class", string_dict, nullable=False),
                pa.field("component", string_dict, nullable=True),
                pa.field("checksum", pa.string(), nullable=True),
                pa.field("file_size", pa.int64(), nullable=True),
            ]
        )

    def __len__(self) -> int:
        return len(self._table)

531 

532 

def _replace_null_placeholder_with_null(array: pa.Array) -> pa.Array:
    """Return ``array`` with entries equal to ``NULLSTR`` replaced by null."""
    return pc.if_else(pc.equal(array, NULLSTR), pa.scalar(None), array)

536 

537 

def _replace_negative_with_null(array: pa.Array) -> pa.Array:
    """Return ``array`` with entries less than zero replaced by null."""
    return pc.if_else(pc.less(array, 0), pa.scalar(None), array)

541 

542 

def make_datastore_path_relative(path: str) -> str:
    """Normalize a path from a `StoredFileInfo` object so
    that it is always relative.

    Parameters
    ----------
    path : `str`
        The file path from a `StoredFileInfo`.

    Returns
    -------
    normalized_path : `str`
        The original path, if it was relative. Otherwise, a version of it that
        was converted to a relative path, stripping URI scheme and netloc from
        it.
    """
    # Force the datastore file path sent to the client to be relative, since
    # absolute URLs in the server will generally not be reachable by the
    # client. If an absolute URL is sent, it (or a portion of it) can end up
    # baked into the FileDatastore that is the target of the transfer in some
    # cases.
    uri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
    if not uri.isabs():
        # Already relative: hand back the input untouched.
        return path

    relative = uri.relativeToPathRoot
    if not uri.fragment:
        return relative
    # Preserve the fragment, since this used to indicate special
    # processing like zip extraction.
    return f"{relative}#{uri.fragment}"