Coverage for python / lsst / daf / butler / datastore / file_templates.py: 11%

319 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:25 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Support for file template string expansion.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("FileTemplate", "FileTemplateValidationError", "FileTemplates", "FileTemplatesConfig") 

33 

34import logging 

35import os.path 

36import string 

37from collections.abc import Iterable, Mapping 

38from types import MappingProxyType 

39from typing import TYPE_CHECKING, Any, TypedDict, cast 

40 

41from .._config import Config 

42from .._config_support import LookupKey, processLookupConfigs 

43from .._dataset_ref import DatasetId, DatasetRef 

44from .._exceptions import ValidationError 

45from .._storage_class import StorageClass 

46from ..dimensions import DataCoordinate, DimensionGroup 

47 

48if TYPE_CHECKING: 

49 from .._dataset_type import DatasetType 

50 from ..dimensions import DimensionRecord, DimensionUniverse 

51 

52log = logging.getLogger(__name__) 

53 

54 

class FileTemplateValidationError(ValidationError):
    """Exception for file template inconsistent with associated DatasetType."""

59 

60 

class FileTemplatesConfig(Config):
    """Configuration information for `FileTemplates`."""

65 

66 

class FieldDict(TypedDict):
    """Dictionary containing the grouped fields from a template."""

    # Plain field names that are not special fields and have no ``.``.
    standard: set[str]
    # Names listed in `FileTemplate.specialFields`.
    special: set[str]
    # Full dotted names such as ``a.b``.
    subfield: set[str]
    # The ``a`` part of every dotted ``a.b`` field.
    parent: set[str]

74 

75 

class FileTemplates:
    """Collection of `FileTemplate` templates.

    Parameters
    ----------
    config : `FileTemplatesConfig` or `str`
        Load configuration.
    default : `str`, optional
        If not `None`, a default template to use if no template has
        been specified explicitly in the configuration.
    universe : `DimensionUniverse`
        The set of all known dimensions, used to normalize any lookup keys
        involving dimensions.

    Notes
    -----
    The configuration can include one level of hierarchy where an
    instrument-specific section can be defined to override more general
    template specifications. This is represented in YAML using a
    key of form ``instrument<name>`` which can then define templates
    that will be returned if a `DatasetRef` contains a matching instrument
    name in the data ID.

    A default fallback template can be specified using the key ``default``.
    Defaulting can be disabled in a child configuration by defining the
    value to be an empty string or a boolean `False`.

    The config is parsed using the function
    `~lsst.daf.butler._config_support.processLookupConfigs`.
    """

    defaultKey = LookupKey("default")
    """Configuration key associated with the default template."""

    def __init__(
        self,
        config: FileTemplatesConfig | str,
        default: str | None = None,
        *,
        universe: DimensionUniverse,
    ):
        self.config = FileTemplatesConfig(config)
        self._templates: dict[LookupKey, FileTemplate] = {}

        contents = processLookupConfigs(self.config, universe=universe)

        # Determine default to use -- defaults can be disabled if
        # we get a False or None (or an empty string).
        defaultValue = contents.get(self.defaultKey, default)
        if defaultValue and not isinstance(defaultValue, str):
            raise RuntimeError(
                f"Default template value should be str or False, or None. Got '{defaultValue}'"
            )
        self.default: FileTemplate | None = (
            FileTemplate(defaultValue) if isinstance(defaultValue, str) and defaultValue else None
        )

        # Convert all the values to FileTemplate, handling defaults
        for key, templateStr in contents.items():
            if key == self.defaultKey:
                # Already handled above and stored in self.default.
                continue
            if not isinstance(templateStr, str):
                raise RuntimeError(f"Unexpected value in file template key {key}: {templateStr}")
            self._templates[key] = FileTemplate(templateStr)

    @property
    def templates(self) -> Mapping[LookupKey, FileTemplate]:
        """Return collection of templates indexed by lookup key (`dict`)."""
        # Hand out a read-only view so callers cannot modify the internal
        # dict.
        return MappingProxyType(self._templates)

    def __contains__(self, key: LookupKey) -> bool:
        """Indicate whether the supplied key is present in the templates.

        Parameters
        ----------
        key : `LookupKey`
            Key to use to determine if a corresponding value is present
            in the templates.

        Returns
        -------
        in : `bool`
            `True` if the supplied key is present in the templates.
        """
        return key in self.templates

    def __getitem__(self, key: LookupKey) -> FileTemplate:
        """Return the template stored under the given key.

        Parameters
        ----------
        key : `LookupKey`
            Key of the template to return.

        Returns
        -------
        template : `FileTemplate`
            The template registered for ``key``. The default template is
            not consulted.

        Raises
        ------
        KeyError
            Raised if there is no template for this key.
        """
        return self.templates[key]

    def validateTemplates(
        self, entities: Iterable[DatasetType | DatasetRef | StorageClass], logFailures: bool = False
    ) -> None:
        """Validate the templates.

        Retrieves the template associated with each dataset type and
        validates the dimensions against the template.

        Parameters
        ----------
        entities : `DatasetType`, `DatasetRef`, or `StorageClass`
            Entities to validate against the matching templates. Can be
            differing types.
        logFailures : `bool`, optional
            If `True`, output a log message for every validation error
            detected.

        Raises
        ------
        FileTemplateValidationError
            Raised if an entity failed validation.

        Notes
        -----
        See `FileTemplate.validateTemplate()` for details on the validation.
        """
        unmatchedKeys = set(self.templates)
        failed: list[str] = []
        for entity in entities:
            try:
                matchKey, template = self.getTemplateWithMatch(entity)
            except KeyError as e:
                # KeyError always quotes on stringification so strip here
                errMsg = str(e).strip("\"'")
                failed.append(errMsg)
                if logFailures:
                    log.critical("%s", errMsg)
                continue

            # The key has been exercised by at least one entity. The default
            # key is never in this set, hence discard rather than remove.
            unmatchedKeys.discard(matchKey)

            try:
                template.validateTemplate(entity)
            except FileTemplateValidationError as e:
                failed.append(f"{e} (via key '{matchKey}')")
                if logFailures:
                    log.critical("Template failure with key '%s': %s", matchKey, e)

        if logFailures and unmatchedKeys:
            log.warning("Unchecked keys: '%s'", ", ".join(str(k) for k in unmatchedKeys))

        if failed:
            if len(failed) == 1:
                msg = failed[0]
            else:
                failMsg = ";\n".join(failed)
                msg = f"{len(failed)} template validation failures: {failMsg}"
            raise FileTemplateValidationError(msg)

    def getLookupKeys(self) -> set[LookupKey]:
        """Retrieve the look up keys for all the template entries.

        Returns
        -------
        keys : `set` of `LookupKey`
            The keys available for matching a template.
        """
        return set(self.templates)

    def getTemplateWithMatch(
        self, entity: DatasetRef | DatasetType | StorageClass
    ) -> tuple[LookupKey, FileTemplate]:
        """Retrieve the `FileTemplate` associated with the dataset type.

        Also retrieves the lookup key that was a match for this template.

        If the lookup name corresponds to a component the base name for
        the component will be examined if the full component name does
        not match.

        Parameters
        ----------
        entity : `DatasetType`, `DatasetRef`, or `StorageClass`
            Instance to use to look for a corresponding template.
            A `DatasetType` name or a `StorageClass` name will be used
            depending on the supplied entity. Priority is given to a
            `DatasetType` name. Supports instrument override if a
            `DatasetRef` is provided configured with an ``instrument``
            value for the data ID.

        Returns
        -------
        matchKey : `LookupKey`
            The key that resulted in the successful match.
        template : `FileTemplate`
            Template instance to use with that dataset type.

        Raises
        ------
        KeyError
            Raised if no template could be located for this Dataset type.
        """
        # Get the names to use for lookup
        names = entity._lookupNames()

        # Start from the default (which may be None) and take the first
        # lookup name that has an explicit template.
        template = self.default
        source = self.defaultKey
        for name in names:
            if name in self.templates:
                template = self.templates[name]
                source = name
                break

        if template is None:
            raise KeyError(f"Unable to determine file template from supplied argument [{entity}]")

        log.debug("Got file %s from %s via %s", template, entity, source)

        return source, template

    def getTemplate(self, entity: DatasetType | DatasetRef | StorageClass) -> FileTemplate:
        """Retrieve the `FileTemplate` associated with the dataset type.

        If the lookup name corresponds to a component the base name for
        the component will be examined if the full component name does
        not match.

        Parameters
        ----------
        entity : `DatasetType`, `DatasetRef`, or `StorageClass`
            Instance to use to look for a corresponding template.
            A `DatasetType` name or a `StorageClass` name will be used
            depending on the supplied entity. Priority is given to a
            `DatasetType` name. Supports instrument override if a
            `DatasetRef` is provided configured with an ``instrument``
            value for the data ID.

        Returns
        -------
        template : `FileTemplate`
            Template instance to use with that dataset type.

        Raises
        ------
        KeyError
            Raised if no template could be located for this Dataset type.
        """
        _, template = self.getTemplateWithMatch(entity)
        return template

314 

315 

class FileTemplate:
    """Format a path template into a fully expanded path.

    Parameters
    ----------
    template : `str`
        Template string.

    Raises
    ------
    FileTemplateValidationError
        Raised if the template fails basic validation.

    Notes
    -----
    The templates use the standard Format Specification Mini-Language
    with the caveat that only named fields can be used. The field names
    are taken from the Dimensions along with several additional fields:

     - datasetType: `str`, `DatasetType.name`
     - component: `str`, name of the StorageClass component
     - run: `str`, name of the run this dataset was added with
     - id: the ID of the dataset

    At least one of ``run`` or ``id`` must be present in the template to
    ensure unique paths. If ``id`` is used it must appear in the file name
    part of the template.

    More detailed information can be requested from dimensions by using a dot
    notation, so ``visit.name`` would use the name of the visit and
    ``detector.name_in_raft`` would use the name of the detector within the
    raft. Only a single level of dot notation is supported.

    In some cases the template may want to support multiple options for a
    single part of the template. For example, you may not want to include
    ``group`` if ``exposure`` is in the data ID. To handle this situation a
    ``|`` character can be used to specify multiple data Id keys in the
    same format specifier. For example ``{exposure.obs_id|group}`` would
    choose ``exposure.obs_id`` if ``exposure`` is in the data ID but otherwise
    would use ``group``.

    The mini-language is extended to understand a "?" in the format
    specification. This indicates that a field is optional. If that
    Dimension is missing the field, along with the text before the field,
    unless it is a path separator, will be removed from the output path.

    By default any "/" in a dataId value will be replaced by "_" to prevent
    unexpected directories being created in the path. If the "/" should be
    retained then a special "/" format specifier can be included in the
    template.
    """

    mandatoryFields = {"run", "id"}
    """A set of fields, one of which must be present in a template."""

    datasetFields = {"datasetType", "component"}
    """Fields related to the supplied dataset, not a dimension."""

    specialFields = mandatoryFields | datasetFields
    """Set of special fields that are available independently of the defined
    Dimensions."""

    _special_fs_chars = str.maketrans({c: "_" for c in ' <>:"\\|?*'})
    """Characters that can cause trouble if they leak into file names are
    replaced by '_'.
    """

    def __init__(self, template: str):
        if not isinstance(template, str):
            raise FileTemplateValidationError(
                f"Template must be a string but got {type(template).__name__} ({template!r})"
            )
        self.template = template

        # Do basic validation without access to dimensions
        self.validateTemplate(None)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, FileTemplate):
            return False

        return self.template == other.template

    def __str__(self) -> str:
        return self.template

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}("{self.template}")'

    def grouped_fields(self, dimensions: DimensionGroup | None = None) -> tuple[FieldDict, FieldDict]:
        """Return all the fields, grouped by their type.

        Parameters
        ----------
        dimensions : `lsst.daf.butler.DimensionGroup` or `None`
            If present, can be used to filter unknown or unused dimensions out
            of the template when alternates are used. This allows a template to
            have newer dimensions within it that are not known to an older
            universe so long as an alternative is given that works with an
            older universe. If none of the alternates are present in the
            dimensions all of them will be returned. The caller can determine
            how to handle the situation.

        Returns
        -------
        grouped : `FieldDict`
            The fields grouped by their type. The keys for this dict are
            ``standard``, ``special``, ``subfield``, and
            ``parent``. If field ``a.b`` is present, ``a`` will not be
            included in ``standard`` but will be included in ``parent``.
        grouped_optional : `FieldDict`
            As for ``grouped`` but the optional fields.
        """
        fmt = string.Formatter()
        parts = fmt.parse(self.template)

        grouped: FieldDict = {
            "standard": set(),
            "special": set(),
            "subfield": set(),
            "parent": set(),
        }
        grouped_optional: FieldDict = {
            "standard": set(),
            "special": set(),
            "subfield": set(),
            "parent": set(),
        }

        for _, field_names, format_spec, _ in parts:
            if field_names is None or format_spec is None:
                # Trailing literal text with no replacement field.
                continue

            # Determine which fields are in the dimension universe.
            given_fields = field_names.split("|")
            validated_fields: list[str] = []
            if dimensions is not None:
                for field in given_fields:
                    # Only the part before the first "." names a dimension.
                    # partition() copes with any number of dots.
                    field_name = field.partition(".")[0]
                    if field_name in dimensions or field_name in self.specialFields:
                        # Found one that is in the relevant dimensions
                        # so stop searching.
                        validated_fields.append(field)
                        break
            if not validated_fields:
                # None of them were in the dimensions or we had no
                # dimensions. Use all of them below and let the caller work
                # it (some of these may be skypix).
                validated_fields = given_fields

            target = grouped_optional if "?" in format_spec else grouped

            for field_name in validated_fields:  # Treat alternates as equals.
                if field_name in self.specialFields:
                    target["special"].add(field_name)
                elif "." in field_name:
                    # This needs to be added twice: the full dotted name
                    # as a subfield and the leading part as its parent.
                    target["subfield"].add(field_name)
                    target["parent"].add(field_name.partition(".")[0])
                else:
                    target["standard"].add(field_name)

        return grouped, grouped_optional

    def fields(self, optionals: bool = False, specials: bool = False, subfields: bool = False) -> set[str]:
        """Return the field names used in this template.

        Parameters
        ----------
        optionals : `bool`
            If `True`, optional fields are included in the returned set.
        specials : `bool`
            If `True`, non-dimension fields are included.
        subfields : `bool`, optional
            If `True`, fields with syntax ``a.b`` are included. If `False`,
            the default, only ``a`` would be returned.

        Returns
        -------
        names : `set`
            Names of fields used in this template.

        Notes
        -----
        The returned set will include the special values such as
        ``datasetType`` and ``component`` only if ``specials`` is `True`.
        All alternates of a ``a|b`` field are reported.
        """
        fmt = string.Formatter()
        parts = fmt.parse(self.template)

        names = set()
        for _, field_names, format_spec, _ in parts:
            if field_names is None or format_spec is None:
                continue
            if not optionals and "?" in format_spec:
                continue
            for field_name in field_names.split("|"):
                if not specials and field_name in self.specialFields:
                    continue

                if not subfields:
                    # Reduce ``a.b`` to ``a``; a no-op without a dot.
                    field_name = field_name.partition(".")[0]

                names.add(field_name)

        return names

    def format(self, ref: DatasetRef) -> str:
        """Format a template string into a full path.

        Parameters
        ----------
        ref : `DatasetRef`
            The dataset to be formatted.

        Returns
        -------
        path : `str`
            Expanded path.

        Raises
        ------
        KeyError
            Raised if the requested field is not defined and the field is
            not optional. Or, `component` is specified but "component" was
            not part of the template.
        RuntimeError
            Raised if a template uses dimension record metadata but no
            records are attached to the `DatasetRef`.
        """
        # Get the dimension values. Should all be non None.
        # Will want to store a DatasetId in it later.
        fields = cast(dict[str, int | str | DatasetId], dict(ref.dataId.mapping))
        # Extra information that can be included using . syntax
        extras: dict[str, DimensionRecord | None] = {}
        skypix_alias: str | None = None
        can_use_extra_records = False
        if isinstance(ref.dataId, DataCoordinate):
            if ref.dataId.hasRecords():
                can_use_extra_records = True
            skypix_alias = self._determine_skypix_alias(ref)
            if skypix_alias is not None:
                fields["skypix"] = fields[skypix_alias]

        datasetType = ref.datasetType
        fields["datasetType"], component = datasetType.nameAndComponent()

        usedComponent = False
        if component is not None:
            fields["component"] = component

        fields["run"] = ref.run
        fields["id"] = ref.id

        fmt = string.Formatter()
        parts = fmt.parse(self.template)
        # Collect the expanded pieces and join once at the end.
        pieces: list[str] = []

        for literal, field_name, format_spec, conversion in parts:
            if field_name and "|" in field_name:
                alternates = field_name.split("|")
                for alt in alternates:
                    primary = alt.partition(".")[0]
                    # If the alternate is known to this data ID then we use
                    # it and drop the lower priority fields.
                    if primary in fields:
                        field_name = alt
                        break
                else:
                    # None of these were found in the field list. Select the
                    # first and let downstream code handle whether this
                    # is optional or not.
                    field_name = alternates[0]

            if field_name == "component":
                usedComponent = True

            if format_spec is None:
                # Literal text with no replacement field following it.
                pieces.append(literal)
                continue

            # Should only happen if format_spec is None
            if field_name is None:
                raise RuntimeError(f"Unexpected blank field_name encountered in {self.template} [{literal}]")

            if "?" in format_spec:
                optional = True
                # Remove the non-standard character from the spec
                format_spec = format_spec.replace("?", "")
            else:
                optional = False

            # Check for request for additional information from the dataId
            if "." in field_name:
                # Split on the first dot only so that an unsupported
                # multi-dot name is reported as an unknown metadata field
                # rather than failing to unpack.
                primary, _, secondary = field_name.partition(".")
                if can_use_extra_records and primary not in extras and primary in fields:
                    record_key = primary
                    if primary == "skypix" and skypix_alias is not None:
                        record_key = skypix_alias
                    extras[record_key] = ref.dataId.records[record_key]
                    if record_key != primary:
                        # Make sure that htm7 and skypix both work.
                        extras[primary] = extras[record_key]

                if primary in extras:
                    record = extras[primary]
                    # Only fill in the fields if we have a value, the
                    # KeyError will trigger below if the attribute is missing,
                    # but only if it is not optional. This is most likely
                    # a typo in the metadata field and so should be reported
                    # even if optional.
                    if hasattr(record, secondary):
                        fields[field_name] = getattr(record, secondary)
                    else:
                        # Is a log message sufficient?
                        log.info(
                            "Template field %s could not be resolved because metadata field %s"
                            " is not understood for dimension %s. Template entry will be ignored",
                            field_name,
                            secondary,
                            primary,
                        )
                elif primary in fields:
                    # We do have an entry for the primary but do not have any
                    # secondary entries. This is likely a problem with the
                    # code failing to attach a record to the DatasetRef.
                    raise RuntimeError(
                        f"No metadata records attached to dataset {ref}"
                        f" when attempting to expand field {field_name}."
                        " Either expand the DatasetRef or change the template."
                    )

            if field_name in fields:
                value = fields[field_name]
            elif optional:
                # If this is optional ignore the format spec
                # and do not include the literal text prior to the optional
                # field unless it contains a "/" path separator
                format_spec = ""
                value = ""
                if "/" not in literal:
                    literal = ""
            else:
                raise KeyError(
                    f"'{field_name}' requested in template via '{self.template}' "
                    "but not defined and not optional"
                )

            # Handle "/" in values since we do not want to be surprised by
            # unexpected directories turning up
            replace_slash = True
            if "/" in format_spec:
                # Remove the non-standard character from the spec
                format_spec = format_spec.replace("/", "")
                replace_slash = False

            if isinstance(value, str):
                # Replace any special characters that can cause difficulties
                # if they appear in filenames.
                value = value.translate(self._special_fs_chars)
                if replace_slash:
                    value = value.replace("/", "_")

            # Apply conversion (e.g., integer to string)
            if conversion:
                value = fmt.convert_field(value, conversion)

            # Now use standard formatting
            pieces.append(literal)
            pieces.append(format(value, format_spec))

        output = "".join(pieces)

        # Replace periods with underscores in the non-directory part to
        # prevent file extension confusion. Also replace # in the non-dir
        # part to avoid confusion with URI fragments
        head, tail = os.path.split(output)
        tail = tail.replace(".", "_")
        tail = tail.replace("#", "HASH")
        output = os.path.join(head, tail)

        # Complain if we were meant to use a component
        if component is not None and not usedComponent:
            raise KeyError(f"Component '{component}' specified but template {self.template} did not use it")

        # Since this is known to be a path, normalize it in case some double
        # slashes have crept in
        path = os.path.normpath(output)

        # It should not be an absolute path (may happen with optionals)
        if os.path.isabs(path):
            path = os.path.relpath(path, start="/")

        return path

    def validateTemplate(self, entity: DatasetRef | DatasetType | StorageClass | None) -> None:
        """Compare the template against supplied entity that wants to use it.

        Parameters
        ----------
        entity : `DatasetType`, `DatasetRef`, or `StorageClass`
            Entity to compare against template. If `None` is given only
            very basic validation of templates will be performed.

        Raises
        ------
        FileTemplateValidationError
            Raised if the template is inconsistent with the supplied entity.

        Notes
        -----
        Validation will always include a check that mandatory fields
        are present and that at least one field refers to a dimension.
        If the supplied entity includes a `DimensionGroup` then it will be
        used to compare the available dimensions with those specified in the
        template.
        """
        # A universe can be used to filter out alternates that are
        # not known.
        dimensions = getattr(entity, "dimensions", None)
        grouped_fields, grouped_optionals = self.grouped_fields(dimensions)

        # Check that the template has run
        withSpecials = (
            grouped_fields["standard"]
            | grouped_fields["parent"]
            | grouped_fields["special"]
            | grouped_optionals["standard"]
            | grouped_optionals["parent"]
            | grouped_optionals["special"]
        )

        if "collection" in withSpecials:
            raise FileTemplateValidationError(
                "'collection' is no longer supported as a file template placeholder; use 'run' instead."
            )

        if not withSpecials & self.mandatoryFields:
            raise FileTemplateValidationError(
                f"Template '{self}' is missing a mandatory field from {self.mandatoryFields}"
            )

        # Check that there are some dimension fields in the template
        # The id is allowed instead if present since that also uniquely
        # identifies the file in the datastore.
        allfields = (
            grouped_fields["standard"]
            | grouped_fields["parent"]
            | grouped_optionals["standard"]
            | grouped_optionals["parent"]
        )
        if not allfields and "id" not in withSpecials:
            raise FileTemplateValidationError(
                f"Template '{self}' does not seem to have any fields corresponding to dimensions."
            )

        # Do not allow ../ in the template to confuse where the file might
        # end up.
        if "../" in self.template:
            raise FileTemplateValidationError("A file template should not include jump to parent directory.")

        # Only one level of dot notation (``dimension.metadata``) can be
        # expanded by format(), so reject anything deeper here with a
        # validation error rather than failing obscurely later.
        nested = sorted(
            name
            for name in grouped_fields["subfield"] | grouped_optionals["subfield"]
            if name.count(".") > 1
        )
        if nested:
            raise FileTemplateValidationError(
                f"Template '{self}' uses fields with more than one '.': {nested}."
                " Only fields of the form 'dimension.metadata' are supported."
            )

        # Require that if "id" is in the template then it must exist in the
        # file part -- this avoids templates like "{id}/fixed" where the file
        # name is fixed but the directory has the ID.
        if "id" in withSpecials:
            file_part = os.path.split(self.template)[-1]
            if "{id}" not in file_part:
                raise FileTemplateValidationError(
                    f"Template '{self}' includes the 'id' but that ID is not part of the file name."
                )

        # If we do not have dimensions available then all we can do is shrug
        if not hasattr(entity, "dimensions"):
            return

        # Mypy does not know about hasattr so help it out
        if entity is None:
            return

        # if this entity represents a component then insist that component
        # is present in the template. If the entity is not a component
        # make sure that component is not mandatory.
        try:
            # mypy complains about StorageClass not supporting isComponent;
            # entities without the method skip the component checks.
            is_component = entity.isComponent()  # type: ignore
        except AttributeError:
            is_component = None

        if is_component is not None:
            if is_component:
                if "component" not in withSpecials:
                    raise FileTemplateValidationError(
                        f"Template '{self}' has no component but {entity} refers to a component."
                    )
            else:
                mandatorySpecials = (
                    grouped_fields["standard"] | grouped_fields["parent"] | grouped_fields["special"]
                )
                if "component" in mandatorySpecials:
                    raise FileTemplateValidationError(
                        f"Template '{self}' has mandatory component but "
                        f"{entity} does not refer to a component."
                    )

        # From here on we need at least a DatasetType
        # Mypy doesn't understand the AttributeError clause below
        if isinstance(entity, StorageClass):
            return

        # Get the dimension links to get the full set of available field names
        # Fall back to dataId keys if we have them but no links.
        # dataId keys must still be present in the template
        try:
            minimal = set(entity.dimensions.required)
            maximal = set(entity.dimensions.names)
        except AttributeError:
            try:
                minimal = set(entity.dataId.keys().names)  # type: ignore
                maximal = minimal
            except AttributeError:
                return

        required = grouped_fields["standard"] | grouped_fields["parent"]

        # Replace specific skypix dimensions with generic one
        skypix_alias = self._determine_skypix_alias(entity)
        if skypix_alias is not None:
            minimal.add("skypix")
            maximal.add("skypix")
            minimal.discard(skypix_alias)
            maximal.discard(skypix_alias)
            if skypix_alias in required:
                required.remove(skypix_alias)
                required.add("skypix")
            if skypix_alias in allfields:
                allfields.remove(skypix_alias)
                allfields.add("skypix")

        # Calculate any field usage that does not match a dimension
        if not required.issubset(maximal):
            raise FileTemplateValidationError(
                f"Template '{self}' is inconsistent with {entity}: {required} is not a subset of {maximal}."
            )

        if not allfields.issuperset(minimal):
            raise FileTemplateValidationError(
                f"Template '{self}' is inconsistent with {entity}:"
                f" {allfields} is not a superset of {minimal}."
            )

        return

    def _determine_skypix_alias(self, entity: DatasetRef | DatasetType) -> str | None:
        """Return the dimension name that refers to a sky pixel.

        Parameters
        ----------
        entity : `DatasetRef` or `DatasetType`
            The entity to examine. For a `DatasetRef` the dimensions of its
            dataset type are used.

        Returns
        -------
        alias : `str` or `None`
            If there is exactly one sky pixelization in the dimensions of
            the entity, return its name, else returns `None`. This method
            does not look at the data ID itself; callers that only want
            the alias for `DataCoordinate` data IDs must check that first.
        """
        alias = None

        if isinstance(entity, DatasetRef):
            entity = entity.datasetType

        # If there is exactly one SkyPixDimension in the data ID, alias its
        # value with the key "skypix", so we can use that to match any
        # skypix dimension.
        # We restrict this behavior to the (real-world) case where the
        # data ID is a DataCoordinate, not just a dict. That should only
        # not be true in some test code, but that test code is a pain to
        # update to be more like the real world while still providing our
        # only tests of important behavior.
        if len(entity.dimensions.skypix) == 1:
            (alias,) = entity.dimensions.skypix
        return alias