Coverage for python / lsst / daf / butler / datastore / stored_file_info.py: 42%
174 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:06 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:06 -0700
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("SerializedStoredFileInfo", "StoredDatastoreItemInfo", "StoredFileInfo")
32import inspect
33from collections.abc import Iterable, Mapping
34from dataclasses import dataclass
35from functools import cache
36from typing import TYPE_CHECKING, Any
38import pyarrow as pa
39import pyarrow.compute as pc
40import pydantic
42from lsst.resources import ResourcePath
43from lsst.utils import doImportType
44from lsst.utils.introspection import get_full_type_name
46from .._formatter import Formatter, FormatterParameter, FormatterV2
47from .._location import Location, LocationFactory
48from .._storage_class import StorageClass, StorageClassFactory
49from ..arrow_utils import ArrowTableUtils
51if TYPE_CHECKING:
52 from .._dataset_ref import DatasetRef
54# String to use when a Python None is encountered
55NULLSTR = "__NULL_STRING__"
class StoredDatastoreItemInfo:
    """Internal information associated with a stored dataset in a `Datastore`.

    This is an empty base class. Datastore implementations are expected to
    write their own subclasses.
    """

    __slots__ = ()

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError("The base class does not know how to locate an item in a datastore.")

    @classmethod
    def from_record(cls: type[StoredDatastoreItemInfo], record: Mapping[str, Any]) -> StoredDatastoreItemInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredDatastoreItemInfo`
            The newly-constructed item corresponding to the record.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert record contents to a dictionary.

        Parameters
        ----------
        **kwargs
            Additional items to add to returned record.

        Returns
        -------
        record : `dict`
            Dictionary representation of this item.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    def update(self, **kwargs: Any) -> StoredDatastoreItemInfo:
        """Create a new class with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override.

        Returns
        -------
        updated : `StoredDatastoreItemInfo`
            A new instance of the object with updated values.

        Raises
        ------
        NotImplementedError
            Always raised by the base class; subclasses must override.
        """
        raise NotImplementedError()

    @classmethod
    def to_records(
        cls, records: Iterable[StoredDatastoreItemInfo], **kwargs: Any
    ) -> tuple[str, Iterable[Mapping[str, Any]]]:
        """Convert a collection of records to dictionaries.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `StoredDatastoreItemInfo` ]
            A collection of records, all records must be of the same type.
            Any iterable is accepted, including one-shot iterators and
            generators.
        **kwargs
            Additional items to add to each returned record.

        Returns
        -------
        class_name : `str`
            Name of the record class. Empty string if there are no records.
        records : `list` [ `dict` ]
            Records in their dictionary representation.
        """
        # The input is iterated more than once below (emptiness check, class
        # collection, conversion), so materialize it first. Without this a
        # generator would always look non-empty and would already be
        # exhausted by the time the records are converted.
        items = list(records)
        if not items:
            return "", []
        classes = {item.__class__ for item in items}
        assert len(classes) == 1, f"Records have to be of the same class: {classes}"
        return get_full_type_name(classes.pop()), [item.to_record(**kwargs) for item in items]

    @classmethod
    def from_records(
        cls, class_name: str, records: Iterable[Mapping[str, Any]]
    ) -> list[StoredDatastoreItemInfo]:
        """Convert collection of dictionaries to records.

        Parameters
        ----------
        class_name : `str`
            Name of the record class.
        records : `~collections.abc.Iterable` [ `dict` ]
            Records in their dictionary representation.

        Returns
        -------
        infos : `list` [`StoredDatastoreItemInfo`]
            Sequence of records converted to typed representation.

        Raises
        ------
        TypeError
            Raised if ``class_name`` is not a sub-class of
            `StoredDatastoreItemInfo`.
        ImportError
            Raised if ``class_name`` can not be imported and is not the one
            legacy name that is handled explicitly.
        """
        try:
            klass = doImportType(class_name)
        except ImportError:
            # Prior to DM-41043 we were embedding a lsst.daf.butler.core
            # path in the serialized form, which we never wanted; fix this
            # one case.
            if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo":
                klass = StoredFileInfo
            else:
                raise
        if not issubclass(klass, StoredDatastoreItemInfo):
            raise TypeError(f"Class {class_name} is not a subclass of StoredDatastoreItemInfo")
        return [klass.from_record(record) for record in records]
@dataclass(frozen=True, slots=True)
class StoredFileInfo(StoredDatastoreItemInfo):
    """Datastore-private metadata associated with a Datastore file.

    Parameters
    ----------
    formatter : `Formatter` or `FormatterV2` or `str`
        The formatter to use for this dataset.
    path : `str`
        Path to the artifact associated with this dataset.
    storageClass : `StorageClass` or `None`
        The storage class associated with this dataset. If `None`,
        ``storage_class_name`` must be provided as a keyword argument.
    component : `str` or `None`, optional
        The component if disassembled.
    checksum : `str` or `None`, optional
        The checksum of the artifact.
    file_size : `int`
        The size of the file in bytes. -1 indicates the size is not known.
    storage_class_name : `str`, optional
        Name of the storage class. This may be passed instead of
        ``storageClass`` to defer loading storage class definitions (e.g. if a
        butler configuration may not have been loaded yet). Note that
        ``storageClass=None`` must be passed explicitly (for backward
        compatibility, it remains a positional argument with no default).

    Raises
    ------
    TypeError
        Raised if neither ``storageClass`` nor ``storage_class_name`` is
        given, or if ``formatter`` is not a string, a formatter instance, or
        a formatter class.
    """

    def __init__(
        self,
        formatter: FormatterParameter,
        path: str,
        storageClass: StorageClass | None,
        component: str | None,
        checksum: str | None,
        file_size: int,
        *,
        storage_class_name: str | None = None,
    ):
        # Only the name of the storage class is stored; an explicit
        # StorageClass instance takes precedence over the keyword name.
        if storageClass is not None:
            resolved_storage_class_name = storageClass.name
        elif storage_class_name is not None:
            resolved_storage_class_name = storage_class_name
        else:
            raise TypeError("At least one of 'storageClass' and 'storage_class_name' must be provided.")

        # Only the formatter name is stored as well.
        if isinstance(formatter, str):
            # We trust that this string refers to a Formatter
            formatter_name = formatter
        elif isinstance(formatter, (Formatter, FormatterV2)) or (
            inspect.isclass(formatter) and issubclass(formatter, (Formatter, FormatterV2))
        ):
            formatter_name = formatter.name()
        else:
            raise TypeError(f"Supplied formatter '{formatter}' is not a Formatter")

        # The dataclass is frozen so ordinary attribute assignment is
        # blocked; go through object.__setattr__ to populate the fields.
        assign = object.__setattr__
        for attribute, value in (
            ("path", path),
            ("storage_class_name", resolved_storage_class_name),
            ("component", component),
            ("checksum", checksum),
            ("file_size", file_size),
            ("formatter", formatter_name),
        ):
            assign(self, attribute, value)

    formatter: str
    """Fully-qualified name of Formatter. If a Formatter class or instance
    is given the name will be extracted."""

    path: str
    """Path to dataset within Datastore."""

    storage_class_name: str
    """Name of the storage class associated with this dataset."""

    component: str | None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""

    @property
    def storageClass(self) -> StorageClass:
        """Storage class associated with this dataset, looked up by name
        from `StorageClassFactory` on each access.
        """
        factory = StorageClassFactory()
        return factory.getStorageClass(self.storage_class_name)

    def rebase(self, ref: DatasetRef) -> StoredFileInfo:
        """Return a copy of the record suitable for a specified reference.

        Parameters
        ----------
        ref : `DatasetRef`
            DatasetRef which provides the component name for the new
            returned record.

        Returns
        -------
        record : `StoredFileInfo`
            New record instance.
        """
        # The component of the ref wins; if the ref has no component the
        # one already recorded here is kept. Everything else is copied.
        ref_component = ref.datasetType.component()
        new_component = self.component if ref_component is None else ref_component
        return self.update(component=new_component)

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert this item to a database record.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Additional information to be added to the record.

        Returns
        -------
        record : `dict`
            Dictionary holding the fields of this item plus ``kwargs``.
        """
        # The component is part of the primary key, so a missing component
        # is stored as a placeholder string rather than as None.
        stored_component = NULLSTR if self.component is None else self.component
        return dict(
            formatter=self.formatter,
            path=self.path,
            storage_class=self.storage_class_name,
            component=stored_component,
            checksum=self.checksum,
            file_size=self.file_size,
            **kwargs,
        )

    def to_simple(self) -> SerializedStoredFileInfo:
        """Convert this item to its serializable pydantic model.

        Returns
        -------
        model : `SerializedStoredFileInfo`
            Model holding the same values as this item.
        """
        fields = self.to_record()
        # The record carries the placeholder string for a missing component
        # but the model accepts None, so restore the real value.
        fields["component"] = self.component
        return SerializedStoredFileInfo.model_validate(fields)

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        uri_in_store = ResourcePath(self.path, forceAbsolute=False, forceDirectory=False)
        if uri_in_store.isabs():
            # Absolute URIs are used as-is; the factory is not consulted.
            return Location(None, uri_in_store)
        return factory.from_uri(uri_in_store, trusted_path=True)

    @classmethod
    def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredFileInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item corresponding to the record.
        """
        # Empty values and the placeholder string both mean "no component".
        raw_component = record["component"]
        component = raw_component if (raw_component and raw_component != NULLSTR) else None
        # The storage class is passed by name only.
        return cls(
            formatter=record["formatter"],
            path=record["path"],
            storageClass=None,
            storage_class_name=record["storage_class"],
            component=component,
            checksum=record["checksum"],
            file_size=record["file_size"],
        )

    @classmethod
    def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
        """Create instance from its serializable pydantic model.

        Parameters
        ----------
        model : `SerializedStoredFileInfo`
            Model to convert.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item corresponding to the model.
        """
        return cls.from_record(dict(model))

    def update(self, **kwargs: Any) -> StoredFileInfo:
        """Create a new instance with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Values to override, keyed by field name.

        Returns
        -------
        updated : `StoredFileInfo`
            A new instance of the object with updated values.

        Raises
        ------
        ValueError
            Raised if a keyword does not match one of the fields.
        """
        # so `storage_class_name` can be passed.
        new_args: dict[str, Any] = {"storageClass": None}
        new_args.update(
            (name, kwargs.pop(name) if name in kwargs else getattr(self, name)) for name in self.__slots__
        )
        # Anything still left over did not match a field.
        if kwargs:
            raise ValueError(f"Unexpected keyword arguments for update: {', '.join(kwargs)}")
        return type(self)(**new_args)

    def __reduce__(self) -> str | tuple[Any, ...]:
        # Pickle via the record form and rebuild with from_record.
        return (self.from_record, (self.to_record(),))

    @property
    def artifact_path(self) -> str:
        """Path to dataset as stored in Datastore with fragments removed."""
        head, separator, _ = self.path.rpartition("#")
        # An empty separator means there was no "#" in the path at all.
        return head if separator else self.path
class SerializedStoredFileInfo(pydantic.BaseModel):
    """Serialized representation of `StoredFileInfo` properties.

    The fields use the same names as the keys of the record dictionary
    produced by ``StoredFileInfo.to_record()``, except that a missing
    component is `None` here.
    """

    formatter: str
    """Fully-qualified name of Formatter."""

    path: str
    """Path to dataset within Datastore."""

    storage_class: str
    """Name of the StorageClass associated with Dataset."""

    component: str | None = None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None = None
    """Checksum of the serialized dataset. Can be `None`."""

    file_size: int
    """Size of the serialized dataset in bytes."""
class StoredFileInfoTable:
    """Arrow representation of the database rows from ``StoredFileInfo``.

    Parameters
    ----------
    table
        Arrow table containing the file information records.

    Notes
    -----
    Users should not call the constructor directly -- use one of the ``from_*``
    methods.
    """

    def __init__(self, table: pa.Table) -> None:
        self._table = table

    @staticmethod
    def from_arrow(table: pa.Table) -> StoredFileInfoTable:
        """Create an instance from an external `pyarrow.Table` object.

        Parameters
        ----------
        table
            `pyarrow.Table` instance with a schema compatible with the one
            returned by ``StoredFileInfoTable.make_arrow_schema()``.

        Returns
        -------
        table
            New ``StoredFileInfoTable`` instance backed by the given table.
        """
        # Cast so that the wrapped table always has exactly our schema.
        return StoredFileInfoTable(table.cast(StoredFileInfoTable.make_arrow_schema()))

    def to_arrow(self) -> pa.Table:
        """Convert to a raw `pyarrow.Table`."""
        return self._table

    @staticmethod
    def from_records(records: Iterable[Mapping[str, Any]]) -> StoredFileInfoTable:
        """Convert datastore records from raw database row mappings to an arrow
        table representation.

        Parameters
        ----------
        records
            List of database records as mappings from database column name to
            value.

        Returns
        -------
        table
            New ``StoredFileInfoTable`` instance containing the given records.

        Notes
        -----
        This is a tabular version of ``StoredFileInfo.from_record()``.
        """
        # The dataset ID is handed to arrow as raw bytes, taken from the
        # ``bytes`` attribute of the value in each row.
        rows = [{**row, "dataset_id": row["dataset_id"].bytes} for row in records]
        table = pa.Table.from_pylist(rows, schema=StoredFileInfoTable.make_arrow_schema())

        # The underlying database tables use a magic value "__NULL_STRING__"
        # instead of SQL NULL for missing component values -- replace
        # with actual nulls to avoid leaking this implementation detail
        # to downstream consumers.
        table = ArrowTableUtils.modify_column(table, "component", _replace_null_placeholder_with_null)

        # Similarly, unknown file size is represented in the DB by a negative
        # number -- replace with NULL to prevent accidental usage of negative
        # file sizes downstream.
        table = ArrowTableUtils.modify_column(table, "file_size", _replace_negative_with_null)

        return StoredFileInfoTable(table)

    def to_records(self) -> list[dict[str, Any]]:
        """Convert datastore records in this table to dictionary instances in a
        format suitable for insertion to the database.

        Returns
        -------
        records
            List of dictionaries suitable for insertion into datastore records
            table.
        """
        # Undo the transformations done in ``from_records`` to get the database
        # representation for component and file_size.
        table = self._table
        table = ArrowTableUtils.modify_column(table, "component", lambda c: c.fill_null(NULLSTR))
        table = ArrowTableUtils.modify_column(table, "file_size", lambda c: c.fill_null(-1))
        return table.to_pylist()

    # ``staticmethod`` has to be the outermost decorator. With ``cache`` on
    # the outside the class attribute is the cache wrapper itself, which
    # binds ``self`` when looked up on an instance and then fails because the
    # function takes no arguments.
    @staticmethod
    @cache
    def make_arrow_schema() -> pa.Schema:
        """Return the `pyarrow.Schema` for the arrow table.

        Returns
        -------
        schema
            Schema with one field per column of the datastore records table.
            The result is cached, so every call returns the same object.
        """
        string_dict = pa.dictionary(pa.int32(), pa.string())
        return pa.schema(
            [
                pa.field("dataset_id", pa.uuid(), nullable=False),
                pa.field("path", pa.string(), nullable=False),
                pa.field("formatter", string_dict, nullable=False),
                pa.field("storage_class", string_dict, nullable=False),
                pa.field("component", string_dict, nullable=True),
                pa.field("checksum", pa.string(), nullable=True),
                pa.field("file_size", pa.int64(), nullable=True),
            ]
        )

    def __len__(self) -> int:
        return len(self._table)
def _replace_null_placeholder_with_null(array: pa.Array) -> pa.Array:
    """Return ``array`` with every element equal to ``NULLSTR`` replaced by
    null.
    """
    is_placeholder = pc.equal(array, NULLSTR)
    null_value = pa.scalar(None)
    return pc.if_else(is_placeholder, null_value, array)
def _replace_negative_with_null(array: pa.Array) -> pa.Array:
    """Return ``array`` with every element less than zero replaced by null."""
    is_negative = pc.less(array, 0)
    null_value = pa.scalar(None)
    return pc.if_else(is_negative, null_value, array)
def make_datastore_path_relative(path: str) -> str:
    """Normalize a path from a `StoredFileInfo` object so
    that it is always relative.

    Parameters
    ----------
    path : `str`
        The file path from a `StoredFileInfo`.

    Returns
    -------
    normalized_path : `str`
        The original path, if it was relative. Otherwise, a version of it that
        was converted to a relative path, stripping URI scheme and netloc from
        it.
    """
    # Force the datastore file path sent to the client to be relative, since
    # absolute URLs in the server will generally not be reachable by the
    # client. If an absolute URL is sent, it (or a portion of it) can end up
    # baked into the FileDatastore that is the target of the transfer in some
    # cases.
    uri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
    if not uri.isabs():
        # Already relative: hand back the input untouched.
        return path

    path_from_root = uri.relativeToPathRoot
    if not uri.fragment:
        return path_from_root
    # Preserve the fragment, since this used to indicate special
    # processing like zip extraction.
    return f"{path_from_root}#{uri.fragment}"