Coverage for python / lsst / daf / butler / datastore / file_templates.py: 11%
319 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:07 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Support for file template string expansion."""
30from __future__ import annotations
32__all__ = ("FileTemplate", "FileTemplateValidationError", "FileTemplates", "FileTemplatesConfig")
34import logging
35import os.path
36import string
37from collections.abc import Iterable, Mapping
38from types import MappingProxyType
39from typing import TYPE_CHECKING, Any, TypedDict, cast
41from .._config import Config
42from .._config_support import LookupKey, processLookupConfigs
43from .._dataset_ref import DatasetId, DatasetRef
44from .._exceptions import ValidationError
45from .._storage_class import StorageClass
46from ..dimensions import DataCoordinate, DimensionGroup
48if TYPE_CHECKING:
49 from .._dataset_type import DatasetType
50 from ..dimensions import DimensionRecord, DimensionUniverse
52log = logging.getLogger(__name__)
55class FileTemplateValidationError(ValidationError):
56 """Exception for file template inconsistent with associated DatasetType."""
58 pass
61class FileTemplatesConfig(Config):
62 """Configuration information for `FileTemplates`."""
64 pass
67class FieldDict(TypedDict):
68 """Dictionary containing the grouped fields from a template."""
70 standard: set[str]
71 special: set[str]
72 subfield: set[str]
73 parent: set[str]
76class FileTemplates:
77 """Collection of `FileTemplate` templates.
79 Parameters
80 ----------
81 config : `FileTemplatesConfig` or `str`
82 Load configuration.
83 default : `str`, optional
84 If not `None`, a default template to use if no template has
85 been specified explicitly in the configuration.
86 universe : `DimensionUniverse`
87 The set of all known dimensions, used to normalize any lookup keys
88 involving dimensions.
90 Notes
91 -----
92 The configuration can include one level of hierarchy where an
93 instrument-specific section can be defined to override more general
94 template specifications. This is represented in YAML using a
95 key of form ``instrument<name>`` which can then define templates
96 that will be returned if a `DatasetRef` contains a matching instrument
97 name in the data ID.
99 A default fallback template can be specified using the key ``default``.
100 Defaulting can be disabled in a child configuration by defining the
101 value to be an empty string or a boolean `False`.
103 The config is parsed using the function
104 `~lsst.daf.butler.configSubset.processLookupConfigs`.
105 """
107 defaultKey = LookupKey("default")
108 """Configuration key associated with the default template."""
110 def __init__(
111 self,
112 config: FileTemplatesConfig | str,
113 default: str | None = None,
114 *,
115 universe: DimensionUniverse,
116 ):
117 self.config = FileTemplatesConfig(config)
118 self._templates = {}
120 contents = processLookupConfigs(self.config, universe=universe)
122 # Determine default to use -- defaults can be disabled if
123 # we get a False or None
124 defaultValue = contents.get(self.defaultKey, default)
125 if defaultValue and not isinstance(defaultValue, str):
126 raise RuntimeError(
127 f"Default template value should be str or False, or None. Got '{defaultValue}'"
128 )
129 self.default = FileTemplate(defaultValue) if isinstance(defaultValue, str) and defaultValue else None
131 # Convert all the values to FileTemplate, handling defaults
132 for key, templateStr in contents.items():
133 if key == self.defaultKey:
134 continue
135 if not isinstance(templateStr, str):
136 raise RuntimeError(f"Unexpected value in file template key {key}: {templateStr}")
137 self._templates[key] = FileTemplate(templateStr)
139 @property
140 def templates(self) -> Mapping[LookupKey, FileTemplate]:
141 """Return collection of templates indexed by lookup key (`dict`)."""
142 return MappingProxyType(self._templates)
144 def __contains__(self, key: LookupKey) -> bool:
145 """Indicate whether the supplied key is present in the templates.
147 Parameters
148 ----------
149 key : `LookupKey`
150 Key to use to determine if a corresponding value is present
151 in the templates.
153 Returns
154 -------
155 in : `bool`
156 `True` if the supplied key is present in the templates.
157 """
158 return key in self.templates
160 def __getitem__(self, key: LookupKey) -> FileTemplate:
161 return self.templates[key]
163 def validateTemplates(
164 self, entities: Iterable[DatasetType | DatasetRef | StorageClass], logFailures: bool = False
165 ) -> None:
166 """Validate the templates.
168 Retrieves the template associated with each dataset type and
169 validates the dimensions against the template.
171 Parameters
172 ----------
173 entities : `DatasetType`, `DatasetRef`, or `StorageClass`
174 Entities to validate against the matching templates. Can be
175 differing types.
176 logFailures : `bool`, optional
177 If `True`, output a log message for every validation error
178 detected.
180 Raises
181 ------
182 FileTemplateValidationError
183 Raised if an entity failed validation.
185 Notes
186 -----
187 See `FileTemplate.validateTemplate()` for details on the validation.
188 """
189 unmatchedKeys = set(self.templates)
190 failed = []
191 for entity in entities:
192 try:
193 matchKey, template = self.getTemplateWithMatch(entity)
194 except KeyError as e:
195 # KeyError always quotes on stringification so strip here
196 errMsg = str(e).strip("\"'")
197 failed.append(errMsg)
198 if logFailures:
199 log.critical("%s", errMsg)
200 continue
202 if matchKey in unmatchedKeys:
203 unmatchedKeys.remove(matchKey)
205 try:
206 template.validateTemplate(entity)
207 except FileTemplateValidationError as e:
208 failed.append(f"{e} (via key '{matchKey}')")
209 if logFailures:
210 log.critical("Template failure with key '%s': %s", matchKey, e)
212 if logFailures and unmatchedKeys:
213 log.warning("Unchecked keys: '%s'", ", ".join([str(k) for k in unmatchedKeys]))
215 if failed:
216 if len(failed) == 1:
217 msg = str(failed[0])
218 else:
219 failMsg = ";\n".join(failed)
220 msg = f"{len(failed)} template validation failures: {failMsg}"
221 raise FileTemplateValidationError(msg)
223 def getLookupKeys(self) -> set[LookupKey]:
224 """Retrieve the look up keys for all the template entries.
226 Returns
227 -------
228 keys : `set` of `LookupKey`
229 The keys available for matching a template.
230 """
231 return set(self.templates)
233 def getTemplateWithMatch(
234 self, entity: DatasetRef | DatasetType | StorageClass
235 ) -> tuple[LookupKey, FileTemplate]:
236 """Retrieve the `FileTemplate` associated with the dataset type.
238 Also retrieves the lookup key that was a match for this template.
240 If the lookup name corresponds to a component the base name for
241 the component will be examined if the full component name does
242 not match.
244 Parameters
245 ----------
246 entity : `DatasetType`, `DatasetRef`, or `StorageClass`
247 Instance to use to look for a corresponding template.
248 A `DatasetType` name or a `StorageClass` name will be used
249 depending on the supplied entity. Priority is given to a
250 `DatasetType` name. Supports instrument override if a
251 `DatasetRef` is provided configured with an ``instrument``
252 value for the data ID.
254 Returns
255 -------
256 matchKey : `LookupKey`
257 The key that resulted in the successful match.
258 template : `FileTemplate`
259 Template instance to use with that dataset type.
261 Raises
262 ------
263 KeyError
264 Raised if no template could be located for this Dataset type.
265 """
266 # Get the names to use for lookup
267 names = entity._lookupNames()
269 # Get a location from the templates
270 template = self.default
271 source = self.defaultKey
272 for name in names:
273 if name in self.templates:
274 template = self.templates[name]
275 source = name
276 break
278 if template is None:
279 raise KeyError(f"Unable to determine file template from supplied argument [{entity}]")
281 log.debug("Got file %s from %s via %s", template, entity, source)
283 return source, template
285 def getTemplate(self, entity: DatasetType | DatasetRef | StorageClass) -> FileTemplate:
286 """Retrieve the `FileTemplate` associated with the dataset type.
288 If the lookup name corresponds to a component the base name for
289 the component will be examined if the full component name does
290 not match.
292 Parameters
293 ----------
294 entity : `DatasetType`, `DatasetRef`, or `StorageClass`
295 Instance to use to look for a corresponding template.
296 A `DatasetType` name or a `StorageClass` name will be used
297 depending on the supplied entity. Priority is given to a
298 `DatasetType` name. Supports instrument override if a
299 `DatasetRef` is provided configured with an ``instrument``
300 value for the data ID.
302 Returns
303 -------
304 template : `FileTemplate`
305 Template instance to use with that dataset type.
307 Raises
308 ------
309 KeyError
310 Raised if no template could be located for this Dataset type.
311 """
312 _, template = self.getTemplateWithMatch(entity)
313 return template
316class FileTemplate:
317 """Format a path template into a fully expanded path.
319 Parameters
320 ----------
321 template : `str`
322 Template string.
324 Raises
325 ------
326 FileTemplateValidationError
327 Raised if the template fails basic validation.
329 Notes
330 -----
331 The templates use the standard Format Specification Mini-Language
332 with the caveat that only named fields can be used. The field names
333 are taken from the Dimensions along with several additional fields:
335 - datasetType: `str`, `DatasetType.name`
336 - component: `str`, name of the StorageClass component
337 - run: `str`, name of the run this dataset was added with
339 `run` must always be provided to ensure unique paths.
341 More detailed information can be requested from dimensions by using a dot
342 notation, so ``visit.name`` would use the name of the visit and
343 ``detector.name_in_raft`` would use the name of the detector within the
344 raft.
346 In some cases the template may want to support multiple options for a
347 single part of the template. For example, you may not want to include
348 ``group`` if ``exposure`` is in the data ID. To handle this situation a
349 ``|`` character can be used to specify multiple data Id keys in the
350 same format specifier. For example ``{exposure.obs_id|group}`` would
351 choose ``exposure.obs_id`` if ``exposure`` is in the data ID but otherwise
352 would use ``group``.
354 The mini-language is extended to understand a "?" in the format
355 specification. This indicates that a field is optional. If that
356 Dimension is missing the field, along with the text before the field,
357 unless it is a path separator, will be removed from the output path.
359 By default any "/" in a dataId value will be replaced by "_" to prevent
360 unexpected directories being created in the path. If the "/" should be
361 retained then a special "/" format specifier can be included in the
362 template.
363 """
365 mandatoryFields = {"run", "id"}
366 """A set of fields, one of which must be present in a template."""
368 datasetFields = {"datasetType", "component"}
369 """Fields related to the supplied dataset, not a dimension."""
371 specialFields = mandatoryFields | datasetFields
372 """Set of special fields that are available independently of the defined
373 Dimensions."""
375 _special_fs_chars = str.maketrans({c: "_" for c in ' <>:"\\|?*'})
376 """Characters that can cause trouble if they leak into file names are
377 replaced by '_'.
378 """
380 def __init__(self, template: str):
381 if not isinstance(template, str):
382 raise FileTemplateValidationError(
383 f"Template ('{template}') does not contain any format specifiers"
384 )
385 self.template = template
387 # Do basic validation without access to dimensions
388 self.validateTemplate(None)
390 def __eq__(self, other: Any) -> bool:
391 if not isinstance(other, FileTemplate):
392 return False
394 return self.template == other.template
396 def __str__(self) -> str:
397 return self.template
399 def __repr__(self) -> str:
400 return f'{self.__class__.__name__}("{self.template}")'
402 def grouped_fields(self, dimensions: DimensionGroup | None = None) -> tuple[FieldDict, FieldDict]:
403 """Return all the fields, grouped by their type.
405 Parameters
406 ----------
407 dimensions : `lsst.daf.butler.DimensionGroup` or `None`
408 If present, can be used to filter unknown or unused dimensions out
409 of the template when alternates are used. This allows a template to
410 have newer dimensions within it that are not known to an older
411 universe so long as an alternative is given that works with an
412 older universe. If none of the alternates are present in the
413 dimensions the first will be returned. The caller can determine how
414 to handle the situation.
416 Returns
417 -------
418 grouped : `FieldDict`
419 The fields grouped by their type. The keys for this dict are
420 ``standard``, ``special``, ``subfield``, and
421 ``parent``. If field ``a.b`` is present, ``a`` will not be
422 included in ``standard`` but will be included in ``parent``.
423 grouped_optional : `FieldDict`
424 As for ``grouped`` but the optional fields.
425 """
426 fmt = string.Formatter()
427 parts = fmt.parse(self.template)
429 grouped: FieldDict = {
430 "standard": set(),
431 "special": set(),
432 "subfield": set(),
433 "parent": set(),
434 }
435 grouped_optional: FieldDict = {
436 "standard": set(),
437 "special": set(),
438 "subfield": set(),
439 "parent": set(),
440 }
442 for _, field_names, format_spec, _ in parts:
443 if field_names is not None and format_spec is not None:
444 # Determine which fields are in the dimension universe.
445 given_fields = field_names.split("|")
446 validated_fields: list[str] = []
447 if dimensions is not None:
448 for field in given_fields:
449 if "." in field:
450 field_name, _ = field.split(".")
451 else:
452 field_name = field
453 if field_name in dimensions or field_name in self.specialFields:
454 # Found one that is in the relevant dimensions
455 # so stop searching.
456 validated_fields.append(field)
457 break
458 if not validated_fields:
459 # None of them were in the dimensions or we had no
460 # dimensions. Use all of them below and let the caller work
461 # it (some of these may be skypix).
462 validated_fields = given_fields
464 if "?" in format_spec:
465 target = grouped_optional
466 else:
467 target = grouped
469 for field_name in validated_fields: # Treat alternates as equals.
470 subfield = None
471 if field_name in self.specialFields:
472 field_set = target["special"]
473 elif "." in field_name:
474 # This needs to be added twice.
475 subfield = field_name
476 field_set = target["parent"]
477 field_name, _ = field_name.split(".")
478 target["subfield"].add(subfield)
479 else:
480 field_set = target["standard"]
482 field_set.add(field_name)
484 return grouped, grouped_optional
486 def fields(self, optionals: bool = False, specials: bool = False, subfields: bool = False) -> set[str]:
487 """Return the field names used in this template.
489 Parameters
490 ----------
491 optionals : `bool`
492 If `True`, optional fields are included in the returned set.
493 specials : `bool`
494 If `True`, non-dimension fields are included.
495 subfields : `bool`, optional
496 If `True`, fields with syntax ``a.b`` are included. If `False`,
497 the default, only ``a`` would be returned.
499 Returns
500 -------
501 names : `set`
502 Names of fields used in this template.
504 Notes
505 -----
506 The returned set will include the special values such as
507 ``datasetType`` and ``component``.
508 """
509 fmt = string.Formatter()
510 parts = fmt.parse(self.template)
512 names = set()
513 for _, field_names, format_spec, _ in parts:
514 if field_names is not None and format_spec is not None:
515 if not optionals and "?" in format_spec:
516 continue
517 for field_name in field_names.split("|"):
518 if not specials and field_name in self.specialFields:
519 continue
521 if not subfields and "." in field_name:
522 field_name, _ = field_name.split(".")
524 names.add(field_name)
526 return names
528 def format(self, ref: DatasetRef) -> str:
529 """Format a template string into a full path.
531 Parameters
532 ----------
533 ref : `DatasetRef`
534 The dataset to be formatted.
536 Returns
537 -------
538 path : `str`
539 Expanded path.
541 Raises
542 ------
543 KeyError
544 Raised if the requested field is not defined and the field is
545 not optional. Or, `component` is specified but "component" was
546 not part of the template.
547 RuntimeError
548 Raised if a template uses dimension record metadata but no
549 records are attached to the `DatasetRef`.
550 """
551 # Get the dimension values. Should all be non None.
552 # Will want to store a DatasetId in it later.
553 fields = cast(dict[str, int | str | DatasetId], dict(ref.dataId.mapping))
554 # Extra information that can be included using . syntax
555 extras: dict[str, DimensionRecord | None] = {}
556 skypix_alias: str | None = None
557 can_use_extra_records = False
558 if isinstance(ref.dataId, DataCoordinate):
559 if ref.dataId.hasRecords():
560 can_use_extra_records = True
561 skypix_alias = self._determine_skypix_alias(ref)
562 if skypix_alias is not None:
563 fields["skypix"] = fields[skypix_alias]
565 datasetType = ref.datasetType
566 fields["datasetType"], component = datasetType.nameAndComponent()
568 usedComponent = False
569 if component is not None:
570 fields["component"] = component
572 fields["run"] = ref.run
573 fields["id"] = ref.id
575 fmt = string.Formatter()
576 parts = fmt.parse(self.template)
577 output = ""
579 for literal, field_name, format_spec, conversion in parts:
580 if field_name and "|" in field_name:
581 alternates = field_name.split("|")
582 for alt in alternates:
583 if "." in alt:
584 primary, _ = alt.split(".")
585 else:
586 primary = alt
587 # If the alternate is known to this data ID then we use
588 # it and drop the lower priority fields.
589 if primary in fields:
590 field_name = alt
591 break
592 else:
593 # None of these were found in the field list. Select the
594 # first and let downstream code handle whether this
595 # is optional or not.
596 field_name = alternates[0]
598 if field_name == "component":
599 usedComponent = True
601 if format_spec is None:
602 output = output + literal
603 continue
605 # Should only happen if format_spec is None
606 if field_name is None:
607 raise RuntimeError(f"Unexpected blank field_name encountered in {self.template} [{literal}]")
609 if "?" in format_spec:
610 optional = True
611 # Remove the non-standard character from the spec
612 format_spec = format_spec.replace("?", "")
613 else:
614 optional = False
616 # Check for request for additional information from the dataId
617 if "." in field_name:
618 primary, secondary = field_name.split(".")
619 if can_use_extra_records and primary not in extras and primary in fields:
620 record_key = primary
621 if primary == "skypix" and skypix_alias is not None:
622 record_key = skypix_alias
623 extras[record_key] = ref.dataId.records[record_key]
624 if record_key != primary:
625 # Make sure that htm7 and skypix both work.
626 extras[primary] = extras[record_key]
628 if primary in extras:
629 record = extras[primary]
630 # Only fill in the fields if we have a value, the
631 # KeyError will trigger below if the attribute is missing,
632 # but only if it is not optional. This is most likely
633 # a typo in the metadata field and so should be reported
634 # even if optional.
635 if hasattr(record, secondary):
636 fields[field_name] = getattr(record, secondary)
637 else:
638 # Is a log message sufficient?
639 log.info(
640 "Template field %s could not be resolved because metadata field %s"
641 " is not understood for dimension %s. Template entry will be ignored",
642 field_name,
643 secondary,
644 primary,
645 )
646 elif primary in fields:
647 # We do have an entry for the primary but do not have any
648 # secondary entries. This is likely a problem with the
649 # code failing to attach a record to the DatasetRef.
650 raise RuntimeError(
651 f"No metadata records attached to dataset {ref}"
652 f" when attempting to expand field {field_name}."
653 " Either expand the DatasetRef or change the template."
654 )
656 if field_name in fields:
657 value = fields[field_name]
658 elif optional:
659 # If this is optional ignore the format spec
660 # and do not include the literal text prior to the optional
661 # field unless it contains a "/" path separator
662 format_spec = ""
663 value = ""
664 if "/" not in literal:
665 literal = ""
666 else:
667 raise KeyError(
668 f"'{field_name}' requested in template via '{self.template}' "
669 "but not defined and not optional"
670 )
672 # Handle "/" in values since we do not want to be surprised by
673 # unexpected directories turning up
674 replace_slash = True
675 if "/" in format_spec:
676 # Remove the non-standard character from the spec
677 format_spec = format_spec.replace("/", "")
678 replace_slash = False
680 if isinstance(value, str):
681 # Replace any special characters that can cause difficulties
682 # if they appear in filenames.
683 value = value.translate(self._special_fs_chars)
684 if replace_slash:
685 value = value.replace("/", "_")
687 # Apply conversion (e.g., integer to string)
688 if conversion:
689 value = fmt.convert_field(value, conversion)
691 # Now use standard formatting
692 output = output + literal + format(value, format_spec)
694 # Replace periods with underscores in the non-directory part to
695 # prevent file extension confusion. Also replace # in the non-dir
696 # part to avoid confusion with URI fragments
697 head, tail = os.path.split(output)
698 tail = tail.replace(".", "_")
699 tail = tail.replace("#", "HASH")
700 output = os.path.join(head, tail)
702 # Complain if we were meant to use a component
703 if component is not None and not usedComponent:
704 raise KeyError(f"Component '{component}' specified but template {self.template} did not use it")
706 # Since this is known to be a path, normalize it in case some double
707 # slashes have crept in
708 path = os.path.normpath(output)
710 # It should not be an absolute path (may happen with optionals)
711 if os.path.isabs(path):
712 path = os.path.relpath(path, start="/")
714 return path
716 def validateTemplate(self, entity: DatasetRef | DatasetType | StorageClass | None) -> None:
717 """Compare the template against supplied entity that wants to use it.
719 Parameters
720 ----------
721 entity : `DatasetType`, `DatasetRef`, or `StorageClass`
722 Entity to compare against template. If `None` is given only
723 very basic validation of templates will be performed.
725 Raises
726 ------
727 FileTemplateValidationError
728 Raised if the template is inconsistent with the supplied entity.
730 Notes
731 -----
732 Validation will always include a check that mandatory fields
733 are present and that at least one field refers to a dimension.
734 If the supplied entity includes a `DimensionGroup` then it will be
735 used to compare the available dimensions with those specified in the
736 template.
737 """
738 # A universe can be used to filter out alternates that are
739 # not known.
740 dimensions = getattr(entity, "dimensions", None)
741 grouped_fields, grouped_optionals = self.grouped_fields(dimensions)
743 # Check that the template has run
744 withSpecials = (
745 grouped_fields["standard"]
746 | grouped_fields["parent"]
747 | grouped_fields["special"]
748 | grouped_optionals["standard"]
749 | grouped_optionals["parent"]
750 | grouped_optionals["special"]
751 )
753 if "collection" in withSpecials:
754 raise FileTemplateValidationError(
755 "'collection' is no longer supported as a file template placeholder; use 'run' instead."
756 )
758 if not withSpecials & self.mandatoryFields:
759 raise FileTemplateValidationError(
760 f"Template '{self}' is missing a mandatory field from {self.mandatoryFields}"
761 )
763 # Check that there are some dimension fields in the template
764 # The id is allowed instead if present since that also uniquely
765 # identifies the file in the datastore.
766 allfields = (
767 grouped_fields["standard"]
768 | grouped_fields["parent"]
769 | grouped_optionals["standard"]
770 | grouped_optionals["parent"]
771 )
772 if not allfields and "id" not in withSpecials:
773 raise FileTemplateValidationError(
774 f"Template '{self}' does not seem to have any fields corresponding to dimensions."
775 )
777 # Do not allow ../ in the template to confuse where the file might
778 # end up.
779 if "../" in self.template:
780 raise FileTemplateValidationError("A file template should not include jump to parent directory.")
782 # Require that if "id" is in the template then it must exist in the
783 # file part -- this avoids templates like "{id}/fixed" where the file
784 # name is fixed but the directory has the ID.
785 if "id" in withSpecials:
786 file_part = os.path.split(self.template)[-1]
787 if "{id}" not in file_part:
788 raise FileTemplateValidationError(
789 f"Template '{self}' includes the 'id' but that ID is not part of the file name."
790 )
792 # If we do not have dimensions available then all we can do is shrug
793 if not hasattr(entity, "dimensions"):
794 return
796 # Mypy does not know about hasattr so help it out
797 if entity is None:
798 return
800 # if this entity represents a component then insist that component
801 # is present in the template. If the entity is not a component
802 # make sure that component is not mandatory.
803 try:
804 # mypy does not see the except block so complains about
805 # StorageClass not supporting isComponent
806 if entity.isComponent(): # type: ignore
807 if "component" not in withSpecials:
808 raise FileTemplateValidationError(
809 f"Template '{self}' has no component but {entity} refers to a component."
810 )
811 else:
812 mandatorySpecials = (
813 grouped_fields["standard"] | grouped_fields["parent"] | grouped_fields["special"]
814 )
815 if "component" in mandatorySpecials:
816 raise FileTemplateValidationError(
817 f"Template '{self}' has mandatory component but "
818 f"{entity} does not refer to a component."
819 )
820 except AttributeError:
821 pass
823 # From here on we need at least a DatasetType
824 # Mypy doesn't understand the AttributeError clause below
825 if isinstance(entity, StorageClass):
826 return
828 # Get the dimension links to get the full set of available field names
829 # Fall back to dataId keys if we have them but no links.
830 # dataId keys must still be present in the template
831 try:
832 minimal = set(entity.dimensions.required)
833 maximal = set(entity.dimensions.names)
834 except AttributeError:
835 try:
836 minimal = set(entity.dataId.keys().names) # type: ignore
837 maximal = minimal
838 except AttributeError:
839 return
841 required = grouped_fields["standard"] | grouped_fields["parent"]
843 # Replace specific skypix dimensions with generic one
844 skypix_alias = self._determine_skypix_alias(entity)
845 if skypix_alias is not None:
846 minimal.add("skypix")
847 maximal.add("skypix")
848 minimal.remove(skypix_alias)
849 maximal.remove(skypix_alias)
850 if skypix_alias in required:
851 required.remove(skypix_alias)
852 required.add("skypix")
853 if skypix_alias in allfields:
854 allfields.remove(skypix_alias)
855 allfields.add("skypix")
857 # Calculate any field usage that does not match a dimension
858 if not required.issubset(maximal):
859 raise FileTemplateValidationError(
860 f"Template '{self}' is inconsistent with {entity}: {required} is not a subset of {maximal}."
861 )
863 if not allfields.issuperset(minimal):
864 raise FileTemplateValidationError(
865 f"Template '{self}' is inconsistent with {entity}:"
866 f" {allfields} is not a superset of {minimal}."
867 )
869 return
871 def _determine_skypix_alias(self, entity: DatasetRef | DatasetType) -> str | None:
872 """Return the dimension name that refers to a sky pixel.
874 Parameters
875 ----------
876 entity : `DatasetRef` or `DatasetType`
877 The entity to examine.
879 Returns
880 -------
881 alias : `str`
882 If there is a sky pixelization in the supplied dataId, return
883 its name, else returns `None`. Will return `None` also if there
884 is more than one sky pix dimension in the data ID or if the
885 dataID is not a `DataCoordinate`
886 """
887 alias = None
889 if isinstance(entity, DatasetRef):
890 entity = entity.datasetType
892 # If there is exactly one SkyPixDimension in the data ID, alias its
893 # value with the key "skypix", so we can use that to match any
894 # skypix dimension.
895 # We restrict this behavior to the (real-world) case where the
896 # data ID is a DataCoordinate, not just a dict. That should only
897 # not be true in some test code, but that test code is a pain to
898 # update to be more like the real world while still providing our
899 # only tests of important behavior.
900 if len(entity.dimensions.skypix) == 1:
901 (alias,) = entity.dimensions.skypix
902 return alias