Coverage for python/lsst/obs/base/ingest.py: 14%

518 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 01:27 -0700

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "RawExposureData", 

26 "RawFileData", 

27 "RawFileDatasetInfo", 

28 "RawIngestConfig", 

29 "RawIngestTask", 

30 "makeTransferChoiceField", 

31) 

32 

33import concurrent.futures 

34import contextlib 

35import json 

36import logging 

37import re 

38import warnings 

39import zipfile 

40from collections import defaultdict 

41from collections.abc import Callable, Iterable, Iterator, MutableMapping, Sequence, Sized 

42from contextlib import contextmanager 

43from dataclasses import InitVar, dataclass 

44from typing import Any, ClassVar, cast 

45 

46from astro_metadata_translator import MetadataTranslator, ObservationInfo, merge_headers 

47from astro_metadata_translator.indexing import process_index_data, process_sidecar_data 

48from pydantic import BaseModel 

49 

50from lsst.afw.fits import readMetadata 

51from lsst.daf.butler import ( 

52 Butler, 

53 CollectionType, 

54 DataCoordinate, 

55 DatasetIdGenEnum, 

56 DatasetRef, 

57 DatasetType, 

58 DimensionRecord, 

59 DimensionUniverse, 

60 FileDataset, 

61 Formatter, 

62 FormatterV2, 

63 Progress, 

64 Timespan, 

65) 

66from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex 

67from lsst.pex.config import ChoiceField, Config, Field 

68from lsst.pipe.base import Instrument, Task 

69from lsst.resources import ResourcePath, ResourcePathExpression 

70from lsst.utils.logging import LsstLoggers 

71from lsst.utils.timer import time_this, timeMethod 

72 

73from ._instrument import makeExposureRecordFromObsInfo 

74 

75 

76def _do_nothing(*args: Any, **kwargs: Any) -> None: 

77 """Do nothing. 

78 

79 This is a function that accepts anything and does nothing. 

80 For use as a default in callback arguments. 

81 """ 

82 pass 

83 

84 

85def _log_msg_counter(noun: int | Sized) -> tuple[int, str]: 

86 """Count the iterable and return the count and plural modifier. 

87 

88 Parameters 

89 ---------- 

90 noun : `Sized` or `int` 

91 Thing to count. If given an integer it is assumed to be the count 

92 to use to calculate modifier. 

93 

94 Returns 

95 ------- 

96 num : `int` 

97 Number of items found in ``noun``. 

98 modifier : `str` 

99 Character to add to the end of a string referring to these items 

100 to indicate whether it was a single item or not. Returns empty 

101 string if there is one item or "s" otherwise. 

102 

103 Examples 

104 -------- 

105 .. code-block:: python 

106 

107 log.warning("Found %d file%s", *_log_msg_counter(nfiles)) 

108 """ 

109 if isinstance(noun, int): 

110 num = noun 

111 else: 

112 num = len(noun) 

113 return num, "" if num == 1 else "s" 

114 

115 

class IngestMetrics(BaseModel):
    """Timing metrics accumulated while running raw ingest.

    Every field is a cumulative wall-clock duration in seconds and starts
    at zero.
    """

    time_for_metadata: float = 0.0
    """Wall-clock time, in seconds, spent gathering file metadata."""

    time_for_records: float = 0.0
    """Wall-clock time, in seconds, spent writing dimension records."""

    time_for_ingest: float = 0.0
    """Wall-clock time, in seconds, spent calling butler ingest."""

    time_for_callbacks: float = 0.0
    """Wall-clock time, in seconds, processing user-supplied callbacks."""

    def reset(self) -> None:
        """Set every metric back to zero."""
        for name in ("time_for_ingest", "time_for_records", "time_for_metadata", "time_for_callbacks"):
            setattr(self, name, 0.0)

    @contextmanager
    def collect_metric(
        self,
        property: str,
        log: LsstLoggers | None = None,
        msg: str | None = None,
        args: tuple[Any, ...] = (),
    ) -> Iterator[None]:
        """Time the enclosed block and add its duration to a metric.

        Parameters
        ----------
        property : `str`
            Name of the metric attribute to increment (for example
            ``"time_for_ingest"``).
        log : `lsst.utils.logging.LsstLoggers`, optional
            Logger forwarded to `lsst.utils.timer.time_this`, which reports
            the timing at INFO level.
        msg : `str`, optional
            Log message forwarded to `lsst.utils.timer.time_this`.
        args : `tuple`, optional
            Positional arguments for ``msg``.

        Notes
        -----
        The metric is only incremented when the enclosed block completes
        without raising; an exception propagates and skips the update.
        """
        with time_this(log=log, msg=msg, args=args, level=logging.INFO) as timer:
            yield
        current = getattr(self, property)
        elapsed = timer.duration
        setattr(self, property, current + elapsed)

149 

150 

@dataclass
class RawFileDatasetInfo:
    """Metadata describing one dataset contained in a raw file.

    A raw file may contain several datasets (see `RawFileData.datasets`);
    each is described by one instance of this class.
    """

    dataId: DataCoordinate
    """Data ID identifying this dataset (`lsst.daf.butler.DataCoordinate`)."""

    obsInfo: ObservationInfo
    """Standardized observation metadata extracted directly from the file
    headers (`astro_metadata_translator.ObservationInfo`).
    """

162 

163 

@dataclass
class RawFileData:
    """Information about a single raw file, used during ingest."""

    datasets: list[RawFileDatasetInfo]
    """The information describing each dataset within this raw file.
    (`list` of `RawFileDatasetInfo`)
    """

    filename: ResourcePath
    """URI of the file this information was extracted from
    (`lsst.resources.ResourcePath`).

    This is the path prior to ingest, not the path after ingest.
    """

    FormatterClass: type[Formatter | FormatterV2]
    """Formatter class that should be used to ingest this file (`type`; a
    subclass of `~lsst.daf.butler.Formatter` or
    `~lsst.daf.butler.FormatterV2`).
    """

    instrument: Instrument | None
    """The `Instrument` instance associated with this file. Can be `None`
    if ``datasets`` is an empty list."""

187 

188 

@dataclass
class RawExposureData:
    """Everything needed to ingest one complete raw exposure."""

    dataId: DataCoordinate
    """Data ID identifying this exposure (`lsst.daf.butler.DataCoordinate`).
    """

    files: list[RawFileData]
    """File-level information for every raw file in this exposure.
    """

    universe: InitVar[DimensionUniverse]
    """Set of all known dimensions (constructor-only argument).
    """

    record: DimensionRecord
    """Exposure `~lsst.daf.butler.DimensionRecord` to insert into the
    `~lsst.daf.butler.Registry` before file-level ingest can happen
    (`~lsst.daf.butler.DimensionRecord`).
    """

    dependencyRecords: dict[str, DimensionRecord]
    """Records, keyed by dimension name, that have to exist in the
    `~lsst.daf.butler.Registry` before the exposure ``record`` itself can be
    inserted (e.g., to satisfy foreign key constraints).
    """

216 

217 

def makeTransferChoiceField(
    doc: str = "How to transfer files (None for no transfer).", default: str = "auto"
) -> ChoiceField:
    """Build a config field selecting how files are transferred between repos.

    The permitted values are exactly the transfer modes accepted by
    `lsst.daf.butler.Datastore.ingest`; the field may also be left as
    `None`.

    Parameters
    ----------
    doc : `str`
        Documentation string attached to the config field.
    default : `str`, optional
        Transfer mode used when the field is not set explicitly.

    Returns
    -------
    field : `lsst.pex.config.ChoiceField`
        The configured choice field.
    """
    # Mode name -> human-readable description; insertion order is kept so
    # the generated documentation lists the modes in this order.
    transfer_modes = {
        "move": "move",
        "copy": "copy",
        "auto": "choice will depend on datastore",
        "direct": "use URI to ingested file directly in datastore",
        "link": "hard link falling back to symbolic link",
        "hardlink": "hard link",
        "symlink": "symbolic (soft) link",
        "relsymlink": "relative symbolic link",
    }
    return ChoiceField(doc=doc, dtype=str, allowed=transfer_modes, optional=True, default=default)

254 

255 

class RawIngestConfig(Config):
    """Configuration for `RawIngestTask`.

    Controls the file transfer mode and whether ingest aborts on the first
    problem file.
    """

    transfer = makeTransferChoiceField()
    failFast: Field[bool] = Field(
        dtype=bool,
        default=False,
        doc=(
            "If True, stop ingest as soon as any problem is encountered with any file. "
            "Otherwise problem files will be skipped and logged and a report issued at completion."
        ),
    )

266 

267 

268class RawIngestTask(Task): 

269 """Driver Task for ingesting raw data into Gen3 Butler repositories. 

270 

271 Parameters 

272 ---------- 

273 config : `RawIngestConfig` 

274 Configuration for the task. 

275 butler : `~lsst.daf.butler.Butler` 

276 Writeable butler instance, with ``butler.run`` set to the appropriate 

277 `~lsst.daf.butler.CollectionType.RUN` collection for these raw 

278 datasets. 

279 on_success : `collections.abc.Callable`, optional 

280 A callback invoked when all of the raws associated with an exposure 

281 are ingested. Will be passed a list of `~lsst.daf.butler.FileDataset` 

282 objects, each containing one or more resolved 

283 `~lsst.daf.butler.DatasetRef` objects. If this callback raises it will 

284 interrupt the entire ingest process, even if `RawIngestConfig.failFast` 

285 is `False`. 

286 on_metadata_failure : `collections.abc.Callable`, optional 

287 A callback invoked when a failure occurs trying to translate the 

288 metadata for a file. Will be passed the URI and the exception, in 

289 that order, as positional arguments. Guaranteed to be called in an 

290 ``except`` block, allowing the callback to re-raise or replace (with 

291 ``raise ... from``) to override the task's usual error handling (before 

292 `RawIngestConfig.failFast` logic occurs). This callback can be called 

293 from within a worker thread if multiple workers have been requested. 

294 Ensure that any code within the call back is thread-safe. 

295 on_ingest_failure : `collections.abc.Callable`, optional 

296 A callback invoked when dimension record or dataset insertion into the 

297 database fails for an exposure. Will be passed a `RawExposureData` 

298 instance and the exception, in that order, as positional arguments. 

299 Guaranteed to be called in an ``except`` block, allowing the callback 

300 to re-raise or replace (with ``raise ... from``) to override the task's 

301 usual error handling (before `RawIngestConfig.failFast` logic occurs). 

302 on_exposure_record : `collections.abc.Callable`, optional 

303 A callback invoked when an exposure dimension record has been created 

304 or modified. Will not be called if the record already existed. Will 

305 be called with the exposure record. 

306 **kwargs 

307 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task` 

308 constructor. 

309 

310 Notes 

311 ----- 

312 Each instance of `RawIngestTask` writes to the same Butler. Each 

313 invocation of `RawIngestTask.run` ingests a list of files. 

314 """ 

315 

316 ConfigClass: ClassVar[type[Config]] = RawIngestConfig 

317 

318 _DefaultName: ClassVar[str] = "ingest" 

319 

320 def getDatasetType(self) -> DatasetType: 

321 """Return the default DatasetType of the datasets ingested by this 

322 Task. 

323 

324 Returns 

325 ------- 

326 datasetType : `lsst.daf.butler.DatasetType` 

327 The default dataset type to use for the data being ingested. This 

328 is only used if the relevant `~lsst.pipe.base.Instrument` does not 

329 define an override. 

330 """ 

331 return DatasetType( 

332 "raw", 

333 ("instrument", "detector", "exposure"), 

334 "Exposure", 

335 universe=self.butler.dimensions, 

336 ) 

337 

    # Mypy can not determine that the config passed to super() is this type,
    # so re-declare the attribute with the task-specific config class.
    config: RawIngestConfig

340 

341 def __init__( 

342 self, 

343 config: RawIngestConfig | None = None, 

344 *, 

345 butler: Butler, 

346 on_success: Callable[[list[FileDataset]], Any] = _do_nothing, 

347 on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing, 

348 on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing, 

349 on_exposure_record: Callable[[DimensionRecord], Any] = _do_nothing, 

350 **kwargs: Any, 

351 ): 

352 if config is None: 

353 config = RawIngestConfig() 

354 config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here. 

355 super().__init__(config, **kwargs) 

356 self.butler = butler 

357 self.universe = self.butler.dimensions 

358 self.datasetType = self.getDatasetType() 

359 self._on_success = on_success 

360 self._on_exposure_record = on_exposure_record 

361 self._on_metadata_failure = on_metadata_failure 

362 self._on_ingest_failure = on_ingest_failure 

363 self.progress = Progress("obs.base.RawIngestTask") 

364 

365 # Import all the instrument classes so that we ensure that we 

366 # have all the relevant metadata translators loaded. 

367 self.instruments = Instrument.importAll(self.butler.registry) 

368 

369 # Read all the instrument records into a cache since they will be 

370 # needed later to calculate day_obs timespans, if appropriate. 

371 self._instrument_records = { 

372 rec.name: rec for rec in butler.registry.queryDimensionRecords("instrument") 

373 } 

374 # Create empty metrics. 

375 self.metrics = IngestMetrics() 

376 

377 def _reduce_kwargs(self) -> dict[str, Any]: 

378 # Add extra parameters to pickle. 

379 return dict( 

380 **super()._reduce_kwargs(), 

381 butler=self.butler, 

382 on_success=self._on_success, 

383 on_metadata_failure=self._on_metadata_failure, 

384 on_ingest_failure=self._on_ingest_failure, 

385 on_exposure_record=self._on_exposure_record, 

386 ) 

387 

388 def _determine_instrument_formatter( 

389 self, dataId: DataCoordinate, filename: ResourcePath 

390 ) -> tuple[Instrument | None, type[Formatter | FormatterV2]]: 

391 """Determine the instrument and formatter class. 

392 

393 Parameters 

394 ---------- 

395 dataId : `lsst.daf.butler.DataCoordinate` 

396 The dataId associated with this dataset. 

397 filename : `lsst.resources.ResourcePath` 

398 URI of file used for error reporting. 

399 

400 Returns 

401 ------- 

402 instrument : `Instrument` or `None` 

403 Instance of the `Instrument` associated with this dataset. `None` 

404 indicates that the instrument could not be determined. 

405 formatterClass : `type` 

406 Class to be used as the formatter for this dataset. 

407 

408 Notes 

409 ----- 

410 Does not access butler registry since it may be called from threads. 

411 """ 

412 # The data model currently assumes that whilst multiple datasets 

413 # can be associated with a single file, they must all share the 

414 # same formatter. 

415 FormatterClass: type[Formatter | FormatterV2] = Formatter 

416 try: 

417 instrument_name = cast(str, dataId["instrument"]) 

418 instrument = self.instruments[instrument_name]() 

419 except LookupError as e: 

420 self._on_metadata_failure(filename, e) 

421 self.log.warning( 

422 "Instrument %s for file %s not known to registry", dataId["instrument"], filename 

423 ) 

424 if self.config.failFast: 

425 raise RuntimeError( 

426 f"Instrument {dataId['instrument']} for file {filename} not known to registry" 

427 ) from e 

428 # Indicate that we could not work out the instrument. 

429 instrument = None 

430 else: 

431 assert instrument is not None, "Should be guaranted by fromName succeeding." 

432 FormatterClass = instrument.getRawFormatter(dataId) 

433 return instrument, FormatterClass 

434 

435 def get_raw_datasetType( 

436 self, instrument: Instrument, cache: dict[str, DatasetType] | None = None 

437 ) -> DatasetType: 

438 """Get the raw dataset type associated with this ingest. 

439 

440 Parameters 

441 ---------- 

442 instrument : `Instrument` 

443 Class that might specify an override of the default raw dataset 

444 type. If no override is specified the task default will be used. 

445 cache : `dict` [`str`, `lsst.daf.butler.DatasetType`] \ 

446 or `None`, optional 

447 An optional cache that can be used to return a pre-existing 

448 dataset type. Is not updated. 

449 

450 Returns 

451 ------- 

452 lsst.daf.butler.DatasetType 

453 The dataset type to use for raw ingest of this instrument. 

454 """ 

455 if cache is None: 

456 cache = {} 

457 if raw_definition := getattr(instrument, "raw_definition", None): 

458 datasetTypeName, dimensions, storageClass = raw_definition 

459 if not (datasetType := cache.get(datasetTypeName)): 

460 datasetType = DatasetType( 

461 datasetTypeName, dimensions, storageClass, universe=self.butler.dimensions 

462 ) 

463 else: 

464 datasetType = self.datasetType 

465 return datasetType 

466 

467 def extractMetadata(self, filename: ResourcePath) -> RawFileData: 

468 """Extract and process metadata from a single raw file. 

469 

470 Parameters 

471 ---------- 

472 filename : `lsst.resources.ResourcePath` 

473 URI to the file. 

474 

475 Returns 

476 ------- 

477 data : `RawFileData` 

478 A structure containing the metadata extracted from the file, 

479 as well as the original filename. All fields will be populated, 

480 but the `RawFileDatasetInfo.dataId` attribute will be a minimal 

481 (unexpanded) `~lsst.daf.butler.DataCoordinate` instance. The 

482 ``instrument`` field will be `None` if there is a problem 

483 with metadata extraction. 

484 

485 Notes 

486 ----- 

487 Assumes that there is a single dataset associated with the given 

488 file. Instruments using a single file to store multiple datasets 

489 must implement their own version of this method. 

490 

491 By default the method will catch all exceptions unless the 

492 `RawIngestConfig.failFast` configuration item is `True`. If an error 

493 is encountered the supplied ``on_metadata_failure()`` 

494 method will be called. If no exceptions result and an error was 

495 encountered the returned object will have a null-instrument class and 

496 no datasets. 

497 

498 This method supports sidecar JSON files which can be used to 

499 extract metadata without having to read the data file itself. 

500 The sidecar file is always used if found. 

501 """ 

502 formatterClass: type[Formatter | FormatterV2] 

503 sidecar_fail_msg = "" # Requires prepended space when set. 

504 try: 

505 sidecar_file = filename.updatedExtension(".json") 

506 headers = [] 

507 with contextlib.suppress(Exception): 

508 # Try to read directly, bypassing existence check. 

509 content = json.loads(sidecar_file.read()) 

510 headers = [process_sidecar_data(content)] 

511 sidecar_fail_msg = " (via sidecar)" 

512 if not headers: 

513 # Read the metadata from the data file itself. 

514 

515 # For remote files download the entire file to get the 

516 # header. This is very inefficient and it would be better 

517 # to have some way of knowing where in the file the headers 

518 # are and to only download those parts of the file. 

519 with filename.as_local() as local_file: 

520 # Read the primary. This might be sufficient. 

521 header = readMetadata(local_file.ospath, 0) 

522 translator_class = None 

523 

524 try: 

525 # Try to work out a translator class early. 

526 translator_class = MetadataTranslator.determine_translator( 

527 header, filename=str(filename) 

528 ) 

529 except ValueError: 

530 # Primary header was not sufficient (maybe this file 

531 # has been compressed or is a MEF with minimal 

532 # primary). Read second header and merge with primary. 

533 header = merge_headers([header, readMetadata(local_file.ospath, 1)], mode="overwrite") 

534 

535 # Try again to work out a translator class, letting this 

536 # fail. 

537 if translator_class is None: 

538 translator_class = MetadataTranslator.determine_translator( 

539 header, filename=str(filename) 

540 ) 

541 

542 # Request the headers to use for ingest 

543 headers = list(translator_class.determine_translatable_headers(local_file.ospath, header)) 

544 

545 # Add each header to the dataset list 

546 datasets = [self._calculate_dataset_info(h, filename) for h in headers] 

547 

548 except Exception as e: 

549 self.log.debug("Problem extracting metadata from %s%s: %s", filename, sidecar_fail_msg, e) 

550 # Indicate to the caller that we failed to read. 

551 datasets = [] 

552 formatterClass = Formatter 

553 instrument = None 

554 self._on_metadata_failure(filename, e) 

555 if self.config.failFast: 

556 raise RuntimeError( 

557 f"Problem extracting metadata for file {filename}{sidecar_fail_msg}" 

558 ) from e 

559 else: 

560 self.log.debug("Extracted metadata for file %s%s", filename, sidecar_fail_msg) 

561 # The data model currently assumes that whilst multiple datasets 

562 # can be associated with a single file, they must all share the 

563 # same formatter. 

564 instrument, formatterClass = self._determine_instrument_formatter(datasets[0].dataId, filename) 

565 if instrument is None: 

566 datasets = [] 

567 

568 return RawFileData( 

569 datasets=datasets, 

570 filename=filename, 

571 # MyPy wants this to be a non-abstract class, which is not true 

572 # for the error case where instrument is None and datasets=[]. 

573 FormatterClass=formatterClass, # type: ignore 

574 instrument=instrument, 

575 ) 

576 

577 @classmethod 

578 def getObservationInfoSubsets(cls) -> tuple[set, set]: 

579 """Return subsets of fields in the 

580 `~astro_metadata_translator.ObservationInfo` that we care about. 

581 

582 These fields will be used in constructing an exposure record. 

583 

584 Returns 

585 ------- 

586 required : `set` 

587 Set of `~astro_metadata_translator.ObservationInfo` field names 

588 that are required. 

589 optional : `set` 

590 Set of `~astro_metadata_translator.ObservationInfo` field names 

591 we will use if they are available. 

592 """ 

593 # Marking the new properties "group_counter_*" and 

594 # "has_simulated_content" as required, assumes that we either 

595 # recreate any existing index/sidecar files that include translated 

596 # values, or else allow astro_metadata_translator to fill in 

597 # defaults. 

598 required = { 

599 "datetime_begin", 

600 "datetime_end", 

601 "detector_num", 

602 "exposure_group", 

603 "exposure_id", 

604 "exposure_time_requested", 

605 "group_counter_end", 

606 "group_counter_start", 

607 "has_simulated_content", 

608 "instrument", 

609 "observation_id", 

610 "observation_type", 

611 "observing_day", 

612 "physical_filter", 

613 } 

614 optional = { 

615 "altaz_begin", 

616 "boresight_rotation_coord", 

617 "boresight_rotation_angle", 

618 "dark_time", 

619 "tracking_radec", 

620 "object", 

621 "observation_counter", 

622 "observation_reason", 

623 "observing_day_offset", 

624 "science_program", 

625 "visit_id", 

626 "can_see_sky", 

627 } 

628 return required, optional 

629 

630 def _calculate_dataset_info( 

631 self, header: MutableMapping[str, Any] | ObservationInfo, filename: ResourcePath 

632 ) -> RawFileDatasetInfo: 

633 """Calculate a RawFileDatasetInfo from the supplied information. 

634 

635 Parameters 

636 ---------- 

637 header : Mapping or `astro_metadata_translator.ObservationInfo` 

638 Header from the dataset or previously-translated content. 

639 filename : `lsst.resources.ResourcePath` 

640 Filename to use for error messages. 

641 

642 Returns 

643 ------- 

644 dataset : `RawFileDatasetInfo` 

645 The dataId, and observation information associated with this 

646 dataset. 

647 """ 

648 required, optional = self.getObservationInfoSubsets() 

649 if isinstance(header, ObservationInfo): 

650 obsInfo = header 

651 missing = [] 

652 # Need to check the required properties are present. 

653 for property in required: 

654 # getattr does not need to be protected because it is using 

655 # the defined list above containing properties that must exist. 

656 value = getattr(obsInfo, property) 

657 if value is None: 

658 missing.append(property) 

659 if missing: 

660 raise ValueError( 

661 f"Requested required properties are missing from file {filename}: {missing} (via JSON)" 

662 ) 

663 

664 else: 

665 obsInfo = ObservationInfo( 

666 header, 

667 pedantic=False, 

668 filename=str(filename), 

669 required=required, 

670 subset=required | optional, 

671 ) 

672 

673 dataId = DataCoordinate.standardize( 

674 instrument=obsInfo.instrument, 

675 exposure=obsInfo.exposure_id, 

676 detector=obsInfo.detector_num, 

677 universe=self.universe, 

678 ) 

679 return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId) 

680 

681 def readZipIndexFiles( 

682 self, files: Iterable[ResourcePath] 

683 ) -> tuple[list[RawFileData], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

684 """Given a list of files, filter out zip files and look for index files 

685 inside. 

686 

687 Parameters 

688 ---------- 

689 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

690 URIs to the files to be ingested. 

691 

692 Returns 

693 ------- 

694 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ] 

695 Merged contents of all relevant index files found in zip files. 

696 The keys include the path to the data file within the zip using 

697 the butler fragment convention of ``zip-path=PATH``. 

698 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

699 Updated list of the input files with zip entries removed. 

700 good_index_files: `set` [ `lsst.resources.ResourcePath` ] 

701 Zip files that contained index information. 

702 bad_zip_files: `set` [ `lsst.resources.ResourcePath` ] 

703 Zip files that contained no index information. 

704 """ 

705 zip_metadata_index = "_metadata_index.json" 

706 

707 # Files that weren't zip files. 

708 updated_files: list[ResourcePath] = [] 

709 

710 # Index files we failed to read. 

711 bad_index_files: set[ResourcePath] = set() 

712 

713 # Any good index files that were found and used. 

714 good_index_files: set[ResourcePath] = set() 

715 

716 # Processed content from any zip files. 

717 indexFileData: list[RawFileData] = [] 

718 

719 for file in files: 

720 if file.getExtension() != ".zip": 

721 updated_files.append(file) 

722 continue 

723 

724 zip_info: dict[str, zipfile.ZipInfo] = {} 

725 try: 

726 with file.open("rb") as fd, zipfile.ZipFile(fd) as zf: 

727 zip_info = {info.filename: info for info in zf.infolist()} 

728 content = json.loads(zf.read(zip_metadata_index)) 

729 index = process_index_data(content, force_dict=True) 

730 assert isinstance(index, MutableMapping) 

731 

732 # Try to read the ZipIndex. 

733 zip_index = ZipIndex.from_open_zip(zf) 

734 except Exception as e: 

735 if self.config.failFast: 

736 raise RuntimeError(f"Problem reading index file from zip file at {file}") from e 

737 bad_index_files.add(file) 

738 continue 

739 self.log.debug("Extracted index metadata from zip file %s", str(file)) 

740 good_index_files.add(file) 

741 

742 # All the metadata read from this index file with keys of full 

743 # path. 

744 index_entries: dict[ResourcePath, Any] = {} 

745 

746 # In theory we could scan for JSON sidecar files associated with 

747 # any files not found in the metadata index, but that is not meant 

748 # to be possible. Guider data is another issue not handled by 

749 # this code. 

750 for path_in_zip in index: 

751 if path_in_zip not in zip_info: 

752 # Index entry exists but no file for it. 

753 self.log.info( 

754 "File %s is in zip index but not in zip file %s. Ignoring.", path_in_zip, file 

755 ) 

756 continue 

757 file_to_ingest = file.replace(fragment=f"zip-path={path_in_zip}") 

758 index_entries[file_to_ingest] = index[path_in_zip] 

759 

760 file_data = self.processIndexEntries(index_entries) 

761 

762 # Validate that the index entries we have read match the 

763 # values in the butler zip index. 

764 data_ids_from_index: dict[str, tuple[DataCoordinate, ...]] = {} 

765 for f in file_data: 

766 _, path_in_zip = f.filename.fragment.split("=") 

767 data_ids_from_index[path_in_zip] = tuple(d.dataId for d in f.datasets) 

768 

769 data_ids_from_butler_index: dict[str, tuple[DataCoordinate, ...]] = {} 

770 # Refs indexed by UUID. 

771 refs = zip_index.refs.to_refs(universe=self.universe) 

772 id_to_ref = {ref.id: ref for ref in refs} 

773 for path_in_zip, index_info in zip_index.artifact_map.items(): 

774 data_ids_from_butler_index[path_in_zip] = tuple( 

775 id_to_ref[id_].dataId for id_ in index_info.ids 

776 ) 

777 

778 if data_ids_from_butler_index != data_ids_from_index: 

779 self.log.warning( 

780 "Recalculating the Data IDs for zip file %s (which may include new metadata corrections) " 

781 "results in a difference to the Data IDs recorded in the butler index in that zip. " 

782 "Consider remaking the zipped raws.", 

783 file, 

784 ) 

785 

786 indexFileData.extend(file_data) 

787 

788 return indexFileData, updated_files, good_index_files, bad_index_files 

789 

790 def locateAndReadIndexFiles( 

791 self, files: Iterable[ResourcePath] 

792 ) -> tuple[dict[ResourcePath, Any], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

793 """Given a list of files, look for index files and read them. 

794 

795 Index files can either be explicitly in the list of files to 

796 ingest, or else located in the same directory as a file to ingest. 

797 Index entries are always used if present. 

798 

799 Parameters 

800 ---------- 

801 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

802 URIs to the files to be ingested. 

803 

804 Returns 

805 ------- 

806 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ] 

807 Merged contents of all relevant index files found. These can 

808 be explicitly specified index files or ones found in the 

809 directory alongside a data file to be ingested. 

810 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

811 Updated list of the input files with entries removed that were 

812 found listed in an index file. Order is not guaranteed to 

813 match the order of the files given to this routine. 

814 good_index_files: `set` [ `lsst.resources.ResourcePath` ] 

815 Index files that were successfully read. 

816 bad_index_files: `set` [ `lsst.resources.ResourcePath` ] 

817 Files that looked like index files but failed to read properly. 

818 """ 

819 # Convert the paths to absolute for easy comparison with index content. 

820 # Do not convert to real paths since we have to assume that index 

821 # files are in this location and not the location which it links to. 

822 files = tuple(f.abspath() for f in files) 

823 

824 # Index files must be named this. 

825 index_root_file = "_index.json" 

826 

827 # Group the files by directory. 

828 files_by_directory: dict[ResourcePath, set[str]] = defaultdict(set) 

829 

830 for path in files: 

831 directory, file_in_dir = path.split() 

832 files_by_directory[directory].add(file_in_dir) 

833 

834 # All the metadata read from index files with keys of full path. 

835 index_entries: dict[ResourcePath, Any] = {} 

836 

837 # Index files we failed to read. 

838 bad_index_files: set[ResourcePath] = set() 

839 

840 # Any good index files that were found and used. 

841 good_index_files: set[ResourcePath] = set() 

842 

843 # Look for index files in those directories. 

844 for directory, files_in_directory in files_by_directory.items(): 

845 possible_index_file = directory.join(index_root_file) 

846 if possible_index_file.exists(): 

847 # If we are explicitly requesting an index file the 

848 # messages should be different. 

849 index_msg = "inferred" 

850 is_implied = True 

851 if index_root_file in files_in_directory: 

852 index_msg = "explicit" 

853 is_implied = False 

854 

855 # Try to read the index file and catch and report any 

856 # problems. 

857 try: 

858 content = json.loads(possible_index_file.read()) 

859 index = process_index_data(content, force_dict=True) 

860 # mypy should in theory know that this is a mapping 

861 # from the overload type annotation of process_index_data. 

862 assert isinstance(index, MutableMapping) 

863 except Exception as e: 

864 # Only trigger the callback if the index file 

865 # was asked for explicitly. Triggering on implied file 

866 # might be surprising. 

867 if not is_implied: 

868 self._on_metadata_failure(possible_index_file, e) 

869 if self.config.failFast: 

870 raise RuntimeError( 

871 f"Problem reading index file from {index_msg} location {possible_index_file}" 

872 ) from e 

873 bad_index_files.add(possible_index_file) 

874 continue 

875 

876 self.log.debug("Extracted index metadata from %s file %s", index_msg, possible_index_file) 

877 good_index_files.add(possible_index_file) 

878 

879 # Go through the index adding entries for files. 

880 # If we have non-index files in this directory marked for 

881 # ingest we should only get index information for those. 

882 # If the index file was explicit we use all entries. 

883 if is_implied: 

884 files_to_ingest = files_in_directory 

885 else: 

886 files_to_ingest = set(index) 

887 

888 # Copy relevant metadata into a single dict for all index 

889 # entries. 

890 for file_in_dir in files_to_ingest: 

891 # Skip an explicitly specified index file. 

892 # This should never happen because an explicit index 

893 # file will force ingest of all files in the index 

894 # and not use the explicit file list. If somehow 

895 # this is not true we continue. Raising an exception 

896 # seems like the wrong thing to do since this is harmless. 

897 if file_in_dir == index_root_file: 

898 self.log.info( 

899 "Logic error found scanning directory %s. Please file ticket.", directory 

900 ) 

901 continue 

902 if file_in_dir in index: 

903 file = directory.join(file_in_dir) 

904 if file in index_entries: 

905 # ObservationInfo overrides raw metadata 

906 if isinstance(index[file_in_dir], ObservationInfo) and not isinstance( 

907 index_entries[file], ObservationInfo 

908 ): 

909 self.log.warning( 

910 "File %s already specified in an index file but overriding" 

911 " with ObservationInfo content from %s", 

912 file, 

913 possible_index_file, 

914 ) 

915 else: 

916 self.log.warning( 

917 "File %s already specified in an index file, ignoring content from %s", 

918 file, 

919 possible_index_file, 

920 ) 

921 # Do nothing in this case 

922 continue 

923 

924 index_entries[file] = index[file_in_dir] 

925 

926 # Remove files from list that have index entries and also 

927 # any files that we determined to be explicit index files 

928 # or any index files that we failed to read. 

929 filtered = set(files) - set(index_entries) - good_index_files - bad_index_files 

930 

931 # The filtered list loses the initial order. Retaining the order 

932 # is good for testing but does have a cost if there are many 

933 # files when copying the good values out. A dict would have faster 

934 # lookups (using the files as keys) but use more memory. 

935 ordered: list[ResourcePath] = [] 

936 seen: set[ResourcePath] = set() 

937 for f in files: 

938 if f in filtered and f not in seen: 

939 ordered.append(f) 

940 seen.add(f) 

941 

942 return index_entries, ordered, good_index_files, bad_index_files 

943 

def processIndexEntries(self, index_entries: dict[ResourcePath, Any]) -> list[RawFileData]:
    """Turn metadata gathered from index files into `RawFileData`.

    Parameters
    ----------
    index_entries : `dict` [`lsst.resources.ResourcePath`, `typing.Any`]
        Mapping from the URI of each file to ingest to the metadata found
        for it in an index file. Each value is either raw header metadata
        or an already-translated
        `~astro_metadata_translator.ObservationInfo`.

    Returns
    -------
    data : `list` [ `RawFileData` ]
        One entry per input file, in the iteration order of
        ``index_entries``. Files whose metadata could not be converted, or
        whose instrument could not be determined, are still returned but
        with an empty ``datasets`` list so callers can report them as
        failures. For successful entries the `RawFileDatasetInfo.dataId`
        attributes are minimal (unexpanded)
        `~lsst.daf.butler.DataCoordinate` instances.

    Raises
    ------
    RuntimeError
        Raised if metadata conversion fails for an entry and
        ``self.config.failFast`` is `True`.
    """
    formatter_cls: type[Formatter | FormatterV2]
    results = []
    for uri, entry_metadata in index_entries.items():
        try:
            first_info = self._calculate_dataset_info(entry_metadata, uri)
        except Exception as exc:
            self.log.debug("Problem extracting metadata for file %s found in index file: %s", uri, exc)
            dataset_infos = []
            formatter_cls = Formatter
            instrument = None
            # Let the failure callback see the problem before we possibly abort.
            self._on_metadata_failure(uri, exc)
            if self.config.failFast:
                raise RuntimeError(f"Problem extracting metadata for file {uri} found in index file") from exc
        else:
            dataset_infos = [first_info]
            instrument, formatter_cls = self._determine_instrument_formatter(first_info.dataId, uri)
            # Without an instrument the file cannot be ingested; flag it as bad
            # by leaving it with no datasets.
            if instrument is None:
                dataset_infos = []
        results.append(
            RawFileData(
                datasets=dataset_infos,
                filename=uri,
                # MyPy wants this to be a non-abstract class, which is not
                # true for the error case where instrument is None and
                # datasets=[].
                FormatterClass=formatter_cls,  # type: ignore
                instrument=instrument,
            )
        )
    return results

995 

def groupByExposure(self, files: Iterable[RawFileData]) -> list[RawExposureData]:
    """Collect file-level records into one structure per exposure.

    Parameters
    ----------
    files : iterable of `RawFileData`
        File-level information to group. Every entry must have at least
        one dataset; the first dataset of each file determines the
        exposure it belongs to.

    Returns
    -------
    exposures : `list` of `RawExposureData`
        One structure per distinct exposure, in order of first appearance.
        All fields are populated. The exposure and dependency records are
        built from the first dataset of the first file in each group. The
        `RawExposureData.dataId` attributes are minimal (unexpanded)
        `~lsst.daf.butler.DataCoordinate` instances.
    """
    exposure_group = self.universe["exposure"].minimal_group
    grouped = {}
    for file_datum in files:
        # The first dataset stands in for the whole file.
        exposure_id = file_datum.datasets[0].dataId.subset(exposure_group)
        grouped.setdefault(exposure_id, []).append(file_datum)

    exposures = []
    for exposure_id, members in grouped.items():
        representative = members[0].datasets[0].obsInfo
        exposures.append(
            RawExposureData(
                dataId=exposure_id,
                files=members,
                universe=self.universe,
                record=self.makeExposureRecord(representative, self.universe),
                dependencyRecords=self.makeDependencyRecords(representative, self.universe),
            )
        )
    return exposures

1030 

def makeExposureRecord(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
) -> DimensionRecord:
    """Build the ``exposure`` dimension record for one exposure.

    Subclasses frequently customize this. An override can often simply call
    this base implementation, passing the extra fields it needs as
    ``kwargs``.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.
    **kwargs
        Additional field values for this record, forwarded unchanged.

    Returns
    -------
    record : `lsst.daf.butler.DimensionRecord`
        The exposure record that must be inserted into the
        `~lsst.daf.butler.Registry` prior to file-level ingest.
    """
    # All the translation work lives in the shared helper.
    record = makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs)
    return record

1056 

def makeDependencyRecords(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse
) -> dict[str, DimensionRecord]:
    """Construct dependency records.

    These dependency records will be inserted into the
    `~lsst.daf.butler.Registry` before the exposure records, because they
    are dependencies of the exposure. This allows an opportunity to satisfy
    foreign key constraints that exist because of dimensions related to the
    exposure.

    This is a method that subclasses may want to customize, if they've
    added dimensions that relate to an exposure.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.

    Returns
    -------
    records : `dict` [`str`, `lsst.daf.butler.DimensionRecord`]
        The records to insert, indexed by dimension name. A ``group``
        record is included if ``group`` is an implied dimension of
        ``exposure``, and a ``day_obs`` record if ``day_obs`` is. The dict
        is empty if the universe has no ``exposure`` dimension.
    """
    records: dict[str, DimensionRecord] = {}
    if "exposure" not in universe:
        return records
    exposure = universe["exposure"]
    if "group" in exposure.implied:
        records["group"] = universe["group"].RecordClass(
            name=obsInfo.exposure_group,
            instrument=obsInfo.instrument,
        )
    if "day_obs" in exposure.implied:
        # Use a default so that ObservationInfo objects that do not provide
        # this property fall back to a day_obs record without a timespan
        # instead of raising AttributeError.
        if (offset := getattr(obsInfo, "observing_day_offset", None)) is not None:
            offset_int = round(offset.to_value("s"))
            assert obsInfo.observing_day is not None
            timespan = Timespan.from_day_obs(obsInfo.observing_day, offset_int)
        else:
            timespan = None
        records["day_obs"] = universe["day_obs"].RecordClass(
            instrument=obsInfo.instrument,
            id=obsInfo.observing_day,
            timespan=timespan,
        )
    return records

1105 

def expandDataIds(self, data: RawExposureData) -> RawExposureData:
    """Attach dimension records to every data ID of a raw exposure.

    Parameters
    ----------
    data : `RawExposureData`
        A structure describing the exposure to be ingested. It must have
        `RawExposureData.record` populated and should be considered
        consumed upon return, since it is modified in place.

    Returns
    -------
    exposure : `RawExposureData`
        The same structure, with `RawExposureData.dataId` and every nested
        `RawFileDatasetInfo.dataId` replaced by data IDs for which
        `~lsst.daf.butler.DataCoordinate.hasRecords` returns `True`.
    """
    # Expand the exposure-level data ID first. It is not used directly for
    # file ingest, but it lets database lookups happen once per exposure
    # rather than once per file. The records about to be inserted are
    # supplied up front so they are not looked up; instrument and filter
    # records are still expected to come from the database (possibly via
    # the registry's cache).
    seed_records = {"exposure": data.record, **data.dependencyRecords}
    data.dataId = self.butler.registry.expandDataId(data.dataId, records=seed_records)

    # Per-file (exposure+detector) data IDs reuse the records that the
    # exposure-level expansion just retrieved.
    for raw_file in data.files:
        for info in raw_file.datasets:
            known = {name: data.dataId.records[name] for name in data.dataId.dimensions.elements}
            info.dataId = self.butler.registry.expandDataId(info.dataId, records=known)
    return data

1147 

def prep(
    self,
    files: Iterable[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    search_indexes: bool = True,
) -> tuple[Iterator[RawExposureData], list[ResourcePath]]:
    """Perform all non-database-updating ingest preprocessing steps.

    Zip files and (optionally) index files among ``files`` are read first;
    each of those steps returns the list of files still needing metadata,
    and metadata is then extracted individually only from the remaining
    files. The results are grouped by exposure, and an iterator is returned
    that expands each exposure's data IDs as it is consumed.

    Parameters
    ----------
    files : iterable over `lsst.resources.ResourcePath`
        URIs of the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize metadata
        extraction from the files not covered by a zip or index file.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.

    Returns
    -------
    exposures : `~collections.abc.Iterator` [ `RawExposureData` ]
        Data structures containing dimension records, filenames, and data
        IDs to be ingested (one structure for each exposure). Data ID
        expansion, which queries the registry, happens lazily as this
        iterator is consumed.
    bad_files : `list` [ `lsst.resources.ResourcePath` ]
        Files whose metadata could not be extracted, followed by files from
        index/zip entries that could not be converted, followed by the
        index and zip files that could not be read. The last group is in
        sorted order so the result is deterministic.
    """

    def _partition_good_bad(
        file_data: Iterable[RawFileData],
    ) -> tuple[list[RawFileData], list[ResourcePath]]:
        """Filter out bad files and return good with list of bad."""
        good_files = []
        bad_files = []
        for fileDatum in self.progress.wrap(file_data, desc="Reading image metadata"):
            if not fileDatum.datasets:
                bad_files.append(fileDatum.filename)
            else:
                good_files.append(fileDatum)
        return good_files, bad_files

    # Look for zip files.
    zip_file_data, files, good_zip_files, bad_zip_files = self.readZipIndexFiles(files)
    if bad_zip_files:
        self.log.info("Failed to extract index metadata from the following zip files:")
        for bad in sorted(bad_zip_files):
            self.log.info("- %s", bad)

    # Look for index files and read them.
    # There should be far fewer index files than data files.
    index_entries: dict[ResourcePath, Any]
    good_index_files: set[ResourcePath]
    bad_index_files: set[ResourcePath]
    if search_indexes:
        index_entries, files, good_index_files, bad_index_files = self.locateAndReadIndexFiles(files)
        if bad_index_files:
            self.log.info("Failed to read the following explicitly requested index files:")
            for bad in sorted(bad_index_files):
                self.log.info("- %s", bad)
    else:
        # We have been told explicitly there are no indexes.
        index_entries = {}
        good_index_files = set()
        bad_index_files = set()

    # Merge information from zips and standalone index files.
    good_index_files.update(good_zip_files)
    bad_index_files.update(bad_zip_files)

    # Now convert all the index file entries to standard form for ingest.
    processed_bad_index_files: list[ResourcePath] = []
    indexFileData = self.processIndexEntries(index_entries)
    indexFileData.extend(zip_file_data)
    if indexFileData:
        indexFileData, processed_bad_index_files = _partition_good_bad(indexFileData)
        self.log.info(
            "Successfully extracted metadata for %d file%s found in %d index file%s with %d failure%s",
            *_log_msg_counter(indexFileData),
            *_log_msg_counter(good_index_files),
            *_log_msg_counter(processed_bad_index_files),
        )

    # Extract metadata and build per-detector regions.
    # This could run in threads so collect all output before looking at
    # failures.
    fileData: Iterator[RawFileData]
    if pool is None:
        fileData = map(self.extractMetadata, files)
    else:
        fileData = pool.map(self.extractMetadata, files)

    # Filter out all the failed reads and store them for later
    # reporting.
    good_file_data, bad_files = _partition_good_bad(fileData)
    # Only report if we looked at any standalone files at all.
    if files:
        self.log.info(
            "Successfully extracted metadata from %d file%s with %d failure%s",
            *_log_msg_counter(good_file_data),
            *_log_msg_counter(bad_files),
        )

    # Combine with data from index files.
    good_file_data.extend(indexFileData)
    bad_files.extend(processed_bad_index_files)
    # A set has no stable iteration order; sort so the reported bad files
    # are deterministic (matching the sorted log output above).
    bad_files.extend(sorted(bad_index_files))

    # Use that metadata to group files (and extracted metadata) by
    # exposure. Never parallelized because it's intrinsically a gather
    # step.
    exposureData: list[RawExposureData] = self.groupByExposure(good_file_data)

    # Expand the data IDs to include all dimension metadata; we need this
    # because we may need to generate path templates that rely on that
    # metadata. expandDataIds modifies each RawExposureData in place and
    # returns it; the returned value is used rather than relying on the
    # in-place modification.
    # This is the first step that involves actual database calls (but just
    # SELECTs), so if there's going to be a problem with connections or
    # lock contention (in SQLite) slowing things down, it'll happen here.
    # The map is lazy, so the calls happen as the caller iterates.
    return map(self.expandDataIds, exposureData), bad_files

1277 

def ingestExposureDatasets(
    self,
    exposure: RawExposureData,
    datasetType: DatasetType,
    *,
    run: str,
    skip_existing_exposures: bool = False,
    track_file_attrs: bool = True,
) -> list[FileDataset]:
    """Ingest all raw files in one exposure.

    Standalone files are ingested together with a single
    `~lsst.daf.butler.Butler.ingest` call. Files with a ``.zip`` extension
    are grouped by archive (the URI with its fragment removed), and each
    archive is then ingested with `~lsst.daf.butler.Butler.ingest_zip`.
    Time spent in those calls is added to ``self.metrics.time_for_ingest``.

    Parameters
    ----------
    exposure : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated and all
        data ID attributes expanded.
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type associated with this exposure.
    run : `str`
        Name of a RUN-type collection to write to.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation. Only passed to
        the ingest of standalone files, not to zip ingest.

    Returns
    -------
    datasets : `list` of `lsst.daf.butler.FileDataset`
        Per-file structures identifying the files ingested and their
        dataset representation in the data repository. Files without any
        datasets are omitted.
    """
    # Raw files are preferentially ingested using a UUID derived from
    # the collection name and dataId.
    if self.butler.registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN):
        mode = DatasetIdGenEnum.DATAID_TYPE_RUN
    else:
        mode = DatasetIdGenEnum.UNIQUE

    def _make_file_dataset(raw_file: RawFileData) -> FileDataset | None:
        """Build the FileDataset for one file, or None if it has no datasets."""
        refs = [
            DatasetRef(datasetType, d.dataId, run=run, id_generation_mode=mode) for d in raw_file.datasets
        ]
        if not refs:
            return None
        return FileDataset(path=raw_file.filename.abspath(), refs=refs, formatter=raw_file.FormatterClass)

    # The datasets for this exposure could all be from a single zip
    # or be distinct files. Need to pull out the zip files.
    zips: dict[ResourcePath, list[RawFileData]] = defaultdict(list)
    datasets: list[FileDataset] = []
    for raw_file in exposure.files:
        if raw_file.filename.getExtension() == ".zip":
            # Strip the fragment so every member of one archive shares a key.
            zips[raw_file.filename.replace(fragment="")].append(raw_file)
            continue
        file_dataset = _make_file_dataset(raw_file)
        if file_dataset is not None:
            datasets.append(file_dataset)

    if datasets:
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest(
                *datasets,
                transfer=self.config.transfer,
                record_validation_info=track_file_attrs,
                skip_existing=skip_existing_exposures,
            )
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    # In theory it is possible for the new Data IDs to differ from the Data
    # IDs stored in the Zip index. That could happen if there is a metadata
    # correction that changes the exposure or detector numbers. We have to
    # assume that by the time the zip has been made that this correction
    # has been applied. If we don't assume that then we have to
    # regenerate the index but we cannot change the contents of the zip.
    # We would also need the ability for butler.ingest_zip to take an
    # override ZipIndex object.
    # The Dataset ref IDs will only change if the data IDs change.
    for zip_uri, zip_members in zips.items():
        zip_datasets: list[FileDataset] = []  # Needed for return value.
        for raw_file in zip_members:
            # Assumes the guiders are not included in the metadata
            # index.
            file_dataset = _make_file_dataset(raw_file)
            if file_dataset is not None:
                zip_datasets.append(file_dataset)
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest_zip(zip_uri, transfer=self.config.transfer, skip_existing=skip_existing_exposures)
        datasets.extend(zip_datasets)
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    return datasets

1380 

def ingestFiles(
    self,
    files: Sequence[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    num_workers: int = 1,
    run: str | None = None,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    skip_ingest: bool = False,
) -> tuple[list[DatasetRef], list[ResourcePath], int, int, int]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Metadata is read for all files first (see `prep`) without modifying
    the repository. Dimension records and datasets are then written one
    exposure at a time. A failure for one exposure is reported through the
    failure callback, logged and counted, and processing continues with
    the next exposure unless ``config.failFast`` is set.

    Parameters
    ----------
    files : iterable over `lsst.resources.ResourcePath`
        URIs to the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` is not `None`.
        If greater than one, a temporary thread pool is created for
        metadata reading and shut down once that step completes.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``. The same flag is also applied
        when syncing the exposure's dependency records.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.
    bad_files : `list` of `lsst.resources.ResourcePath`
        Given paths that could not be ingested.
    n_exposures : `int`
        Number of exposures successfully ingested. Exposures processed
        with ``skip_ingest=True`` are not counted.
    n_exposures_failed : `int`
        Number of exposures that failed when inserting dimension data.
    n_ingests_failed : `int`
        Number of exposures that failed when ingesting raw datasets.

    Raises
    ------
    Exception
        Any exception raised while registering dimension records or
        ingesting datasets for an exposure is re-raised if
        ``config.failFast`` is `True`.
    """
    # Only create (and therefore own) a pool when the caller did not supply
    # one and asked for more than one worker.
    created_pool = False
    if pool is None and num_workers > 1:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        created_pool = True

    try:
        with self.metrics.collect_metric(
            "time_for_metadata",
            self.log,
            msg="Reading metadata from %d file%s",
            args=(*_log_msg_counter(files),),
        ):
            exposureData, bad_files = self.prep(files, pool=pool, search_indexes=search_indexes)
    finally:
        if created_pool and pool:
            # The pool is not needed any more so close it if we created
            # it to ensure we clean up resources. The pool is only passed
            # to prep() for metadata extraction; the iterator it returns
            # does not take the pool.
            pool.shutdown(wait=True)

    # Up to this point, we haven't modified the data repository at all.
    # Now we finally do that, with one transaction per exposure. This is
    # not parallelized at present because the performance of this step is
    # limited by the database server. That may or may not change in the
    # future once we increase our usage of bulk inserts and reduce our
    # usage of savepoints; we've tried to get everything but the database
    # operations done in advance to reduce the time spent inside
    # transactions.
    refs = []
    # RUN collections already registered during this call.
    runs = set()
    # Dataset types already registered during this call, keyed by name.
    datasetTypes: dict[str, DatasetType] = {}
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    for exposure in self.progress.wrap(exposureData, desc="Ingesting raw exposures"):
        assert exposure.record is not None, "Should be guaranteed by prep()"
        self.log.debug(
            "Attempting to ingest %d file%s from exposure %s:%s",
            *_log_msg_counter(exposure.files),
            exposure.record.instrument,
            exposure.record.obs_id,
        )

        # Step 1: sync dependency records and then the exposure record.
        try:
            with self.metrics.collect_metric(
                "time_for_records",
                self.log,
                msg="Creating dimension records for instrument %s, exposure %s",
                args=(
                    str(exposure.record.instrument),
                    str(exposure.record.id),
                ),
            ):
                for name, record in exposure.dependencyRecords.items():
                    self.butler.registry.syncDimensionData(name, record, update=update_exposure_records)
                inserted_or_updated = self.butler.registry.syncDimensionData(
                    "exposure",
                    exposure.record,
                    update=update_exposure_records,
                )
            # Anything other than False means the exposure record changed
            # (a dict lists the updated columns; presumably True means a
            # new insert -- see Registry.syncDimensionData).
            if inserted_or_updated is not False:
                with self.metrics.collect_metric(
                    "time_for_callbacks", log=self.log, msg="Exposure record updated. Calling handler"
                ):
                    self._on_exposure_record(exposure.record)
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_exposures_failed += 1
            self.log.warning(
                "Exposure %s:%s could not be registered: %s",
                exposure.record.instrument,
                exposure.record.obs_id,
                e,
            )
            if self.config.failFast:
                raise e
            continue

        if isinstance(inserted_or_updated, dict):
            # Exposure is in the registry and we updated it, so
            # syncDimensionData returned a dict.
            columns_updated = list(inserted_or_updated.keys())
            s_col = "s" if len(columns_updated) != 1 else ""
            w_col = "were" if len(columns_updated) != 1 else "was"
            self.log.info(
                "Exposure %s:%s was already present, but column%s %s %s updated.",
                exposure.record.instrument,
                exposure.record.obs_id,
                s_col,
                ", ".join(repr(c) for c in columns_updated),
                w_col,
            )

        # Records only: do not ingest datasets and do not count this
        # exposure in n_exposures.
        if skip_ingest:
            continue

        # Determine the instrument so we can work out the dataset type.
        instrument = exposure.files[0].instrument
        assert instrument is not None, (
            "file should have been removed from this list by prep if instrument could not be found"
        )

        # Register each dataset type only the first time it is seen.
        datasetType = self.get_raw_datasetType(instrument, datasetTypes)
        if datasetType.name not in datasetTypes:
            self.butler.registry.registerDatasetType(datasetType)
            datasetTypes[datasetType.name] = datasetType

        # Override default run if nothing specified explicitly.
        if run is None:
            this_run = instrument.makeDefaultRawIngestRunName()
        else:
            this_run = run
        if this_run not in runs:
            self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
            runs.add(this_run)

        # Step 2: ingest the datasets for this exposure.
        try:
            datasets_for_exposure = self.ingestExposureDatasets(
                exposure,
                datasetType=datasetType,
                run=this_run,
                skip_existing_exposures=skip_existing_exposures,
                track_file_attrs=track_file_attrs,
            )
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_ingests_failed += 1
            self.log.warning("Failed to ingest the following for reason: %s", e)
            for f in exposure.files:
                self.log.warning("- %s", f.filename)
            if self.config.failFast:
                raise e
            continue
        else:
            with self.metrics.collect_metric("time_for_callbacks", self.log, msg="Calling on_success"):
                self._on_success(datasets_for_exposure)
            for dataset in datasets_for_exposure:
                refs.extend(dataset.refs)

        # Success for this exposure.
        n_exposures += 1
        self.log.info(
            "Exposure %s:%s ingested successfully", exposure.record.instrument, exposure.record.obs_id
        )

    return refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed

1602 

@timeMethod
def run(
    self,
    files: Iterable[ResourcePathExpression],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    processes: int | None = None,  # Deprecated. Use num_workers.
    run: str | None = None,
    file_filter: str | re.Pattern = r"\.fit[s]?\b",
    group_files: bool = True,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    num_workers: int = 1,
    skip_ingest: bool = False,
) -> list[DatasetRef]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Parameters
    ----------
    files : iterable `lsst.resources.ResourcePath`, `str` or path-like
        Paths to the files to be ingested. Can refer to directories.
        Will be made absolute if they are not already.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations. This parameter was previously a `multiprocessing.Pool`
        but that option is no longer supported since it is slow compared
        to futures.
    processes : `int`, optional
        Deprecated alias for ``num_workers``; if given, its value replaces
        ``num_workers`` and a `FutureWarning` is emitted. Ignored if
        ``pool`` is not `None`. Please use ``num_workers`` instead.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    file_filter : `str` or `re.Pattern`, optional
        Pattern to use to discover files to ingest within directories.
        The default is to search for FITS files. The regex applies to
        files within the directory.
    group_files : `bool`, optional
        Group files by directory if they have been discovered in
        directories. Will not affect files explicitly provided.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` parameter is
        given.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.

    Raises
    ------
    ValueError
        Raised if ``pool`` is given but is not a
        `concurrent.futures.ThreadPoolExecutor`.
    RuntimeError
        Raised after all files have been processed if metadata could not
        be extracted from some files, or if any exposure registration or
        file ingest failed. This method does not remove datasets that were
        ingested successfully before the error, but their references are
        not returned in this case.

    Notes
    -----
    This method inserts all datasets for an exposure within a transaction,
    guaranteeing that partial exposures are never ingested. The exposure
    dimension record is inserted with
    `lsst.daf.butler.Registry.syncDimensionData` first (in its own
    transaction), which inserts only if a record with the same
    primary key does not already exist. This allows different files within
    the same exposure to be ingested in different runs.
    """
    # Reject any non-None pool of the wrong type (e.g. a legacy
    # multiprocessing.Pool), not just truthy ones.
    if pool is not None and not isinstance(pool, concurrent.futures.ThreadPoolExecutor):
        raise ValueError(f"This parameter must now be a ThreadPoolExecutor but was given {pool}.")

    if processes is not None:
        warnings.warn(
            "Processes parameter is deprecated. Please use num_workers parameter.",
            FutureWarning,
            stacklevel=3,  # Jump above the timeMethod wrapper.
        )
        num_workers = processes

    # Options forwarded unchanged to every ingestFiles call, whichever
    # grouping mode is used below; kept in one place so both branches agree.
    ingest_kwargs: dict[str, Any] = {
        "pool": pool,
        "num_workers": num_workers,
        "run": run,
        "skip_existing_exposures": skip_existing_exposures,
        "update_exposure_records": update_exposure_records,
        "track_file_attrs": track_file_attrs,
        "search_indexes": search_indexes,
        "skip_ingest": skip_ingest,
    }

    refs: list[DatasetRef] = []
    bad_files = []
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    self.metrics.reset()  # Clear previous metrics.

    if group_files:
        with time_this(log=self.log, msg="Processing ingest groups") as timer:
            # Each group (e.g. the files of one directory) is ingested
            # separately and the per-group results are accumulated.
            for group in ResourcePath.findFileResources(files, file_filter, group_files):
                new_refs, bad, n_exp, n_exp_fail, n_ingest_fail = self.ingestFiles(
                    tuple(group), **ingest_kwargs
                )
                refs.extend(new_refs)
                bad_files.extend(bad)
                n_exposures += n_exp
                n_exposures_failed += n_exp_fail
                n_ingests_failed += n_ingest_fail
    else:
        with time_this(log=self.log, msg="Ingesting all files in one batch") as timer:
            refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed = self.ingestFiles(
                tuple(ResourcePath.findFileResources(files, file_filter, group_files)),
                **ingest_kwargs,
            )
    # Read the duration once the timer context has exited so that it covers
    # the whole timed region in either branch.
    ingest_duration = timer.duration

    if bad_files:
        self.log.warning("Could not extract observation metadata from the following:")
        for f in bad_files:
            self.log.warning("- %s", f)

    if skip_ingest:
        ingest_text = ""
    else:
        # Same fixed-point formatting as the other timing lines below.
        ingest_text = f" - time in butler ingest: {self.metrics.time_for_ingest:f} s\n"

    self.log.info(
        "Successfully processed data from %d exposure%s with %d failure%s from exposure"
        " registration and %d failure%s from file ingest.\n"
        "Timing breakdown:\n"
        " - time in metadata gathering: %f s\n"
        " - time in dimension record writing: %f s\n"
        "%s"
        " - time in user-supplied callbacks: %f s\n",
        *_log_msg_counter(n_exposures),
        *_log_msg_counter(n_exposures_failed),
        *_log_msg_counter(n_ingests_failed),
        self.metrics.time_for_metadata,
        self.metrics.time_for_records,
        ingest_text,
        self.metrics.time_for_callbacks,
    )
    if not skip_ingest:
        self.log.info(
            "Ingested %d distinct Butler dataset%s in %f sec", *_log_msg_counter(refs), ingest_duration
        )

    # Any unreadable file or failed exposure/ingest makes the whole call fail,
    # but only after everything has been attempted and reported above.
    had_failure = bool(bad_files) or n_exposures_failed > 0 or n_ingests_failed > 0
    if had_failure:
        raise RuntimeError("Some failures encountered during ingestion")

    return refs