Coverage for python/lsst/obs/base/ingest.py: 14%
518 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:25 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 08:25 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "RawExposureData",
26 "RawFileData",
27 "RawFileDatasetInfo",
28 "RawIngestConfig",
29 "RawIngestTask",
30 "makeTransferChoiceField",
31)
33import concurrent.futures
34import contextlib
35import json
36import logging
37import re
38import warnings
39import zipfile
40from collections import defaultdict
41from collections.abc import Callable, Iterable, Iterator, MutableMapping, Sequence, Sized
42from contextlib import contextmanager
43from dataclasses import InitVar, dataclass
44from typing import Any, ClassVar, cast
46from astro_metadata_translator import MetadataTranslator, ObservationInfo, merge_headers
47from astro_metadata_translator.indexing import process_index_data, process_sidecar_data
48from pydantic import BaseModel
50from lsst.afw.fits import readMetadata
51from lsst.daf.butler import (
52 Butler,
53 CollectionType,
54 DataCoordinate,
55 DatasetIdGenEnum,
56 DatasetRef,
57 DatasetType,
58 DimensionRecord,
59 DimensionUniverse,
60 FileDataset,
61 Formatter,
62 FormatterV2,
63 Progress,
64 Timespan,
65)
66from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
67from lsst.pex.config import ChoiceField, Config, Field
68from lsst.pipe.base import Instrument, Task
69from lsst.resources import ResourcePath, ResourcePathExpression
70from lsst.utils.logging import LsstLoggers
71from lsst.utils.timer import time_this, timeMethod
73from ._instrument import makeExposureRecordFromObsInfo
76def _do_nothing(*args: Any, **kwargs: Any) -> None:
77 """Do nothing.
79 This is a function that accepts anything and does nothing.
80 For use as a default in callback arguments.
81 """
82 pass
85def _log_msg_counter(noun: int | Sized) -> tuple[int, str]:
86 """Count the iterable and return the count and plural modifier.
88 Parameters
89 ----------
90 noun : `Sized` or `int`
91 Thing to count. If given an integer it is assumed to be the count
92 to use to calculate modifier.
94 Returns
95 -------
96 num : `int`
97 Number of items found in ``noun``.
98 modifier : `str`
99 Character to add to the end of a string referring to these items
100 to indicate whether it was a single item or not. Returns empty
101 string if there is one item or "s" otherwise.
103 Examples
104 --------
105 .. code-block:: python
107 log.warning("Found %d file%s", *_log_msg_counter(nfiles))
108 """
109 if isinstance(noun, int):
110 num = noun
111 else:
112 num = len(noun)
113 return num, "" if num == 1 else "s"
class IngestMetrics(BaseModel):
    """Accumulated wall-clock timings gathered during raw ingest."""

    time_for_metadata: float = 0.0
    """Seconds of wall-clock time spent gathering file metadata."""

    time_for_records: float = 0.0
    """Seconds of wall-clock time spent writing dimension records."""

    time_for_ingest: float = 0.0
    """Seconds of wall-clock time spent calling butler ingest."""

    time_for_callbacks: float = 0.0
    """Seconds of wall-clock time spent in user-supplied callbacks."""

    def reset(self) -> None:
        """Set every timing back to zero."""
        for name in (
            "time_for_ingest",
            "time_for_records",
            "time_for_metadata",
            "time_for_callbacks",
        ):
            setattr(self, name, 0.0)

    @contextmanager
    def collect_metric(
        self,
        property: str,
        log: LsstLoggers | None = None,
        msg: str | None = None,
        args: tuple[Any, ...] = (),
    ) -> Iterator[None]:
        """Time the enclosed block and add the duration to one metric.

        Parameters
        ----------
        property : `str`
            Name of the attribute on this model to increment.
        log : `lsst.utils.logging.LsstLoggers` or `None`, optional
            Logger forwarded to `lsst.utils.timer.time_this`.
        msg : `str` or `None`, optional
            Log message forwarded to `~lsst.utils.timer.time_this`.
        args : `tuple`, optional
            Message arguments forwarded to `~lsst.utils.timer.time_this`.

        Notes
        -----
        The timer is run at ``INFO`` level. The metric is updated after the
        timed block exits, so it is not updated if an exception propagates
        out of the block.
        """
        with time_this(log=log, msg=msg, args=args, level=logging.INFO) as timer:
            yield
        accumulated = getattr(self, property)
        setattr(self, property, accumulated + timer.duration)
@dataclass
class RawFileDatasetInfo:
    """Description of one dataset found inside a raw file."""

    dataId: DataCoordinate
    """Data ID of this dataset (`lsst.daf.butler.DataCoordinate`)."""

    obsInfo: ObservationInfo
    """Standardized observation metadata derived from the file headers
    (`astro_metadata_translator.ObservationInfo`).
    """
@dataclass
class RawFileData:
    """Per-file information gathered while ingesting a raw file."""

    datasets: list[RawFileDatasetInfo]
    """One entry per dataset contained in this raw file
    (`list` of `RawFileDatasetInfo`).
    """

    filename: ResourcePath
    """URI the information was read from (`lsst.resources.ResourcePath`).

    This is the location before ingest, not the location after ingest.
    """

    FormatterClass: type[Formatter]
    """Formatter class to use when ingesting this file (`type`; a subclass
    of `~lsst.daf.butler.Formatter`).
    """

    instrument: Instrument | None
    """The `Instrument` instance associated with this file. Can be `None`
    if ``datasets`` is an empty list."""
@dataclass
class RawExposureData:
    """Everything known about one complete raw exposure during ingest."""

    dataId: DataCoordinate
    """Data ID of the exposure (`lsst.daf.butler.DataCoordinate`)."""

    files: list[RawFileData]
    """File-level information for each file in the exposure."""

    universe: InitVar[DimensionUniverse]
    """All known dimensions. Accepted by the constructor only; it is not
    stored as a field.
    """

    record: DimensionRecord
    """Exposure record that has to be inserted into the
    `~lsst.daf.butler.Registry` before the files are ingested
    (`~lsst.daf.butler.DimensionRecord`).
    """

    dependencyRecords: dict[str, DimensionRecord]
    """Extra records, keyed by dimension name, that have to be inserted into
    the `~lsst.daf.butler.Registry` before the exposure ``record`` (e.g., to
    satisfy foreign key constraints).
    """
def makeTransferChoiceField(
    doc: str = "How to transfer files (None for no transfer).", default: str = "auto"
) -> ChoiceField:
    """Build a config field that selects a file transfer mode.

    The allowed options for the field are exactly those supported by
    `lsst.daf.butler.Datastore.ingest`.

    Parameters
    ----------
    doc : `str`
        Documentation for the configuration field.
    default : `str`, optional
        Default transfer mode for the field.

    Returns
    -------
    field : `lsst.pex.config.ChoiceField`
        Optional string-valued configuration field.
    """
    # Transfer mode name -> human-readable description.
    transfer_modes = {
        "move": "move",
        "copy": "copy",
        "auto": "choice will depend on datastore",
        "direct": "use URI to ingested file directly in datastore",
        "link": "hard link falling back to symbolic link",
        "hardlink": "hard link",
        "symlink": "symbolic (soft) link",
        "relsymlink": "relative symbolic link",
    }
    return ChoiceField(doc=doc, dtype=str, allowed=transfer_modes, optional=True, default=default)
class RawIngestConfig(Config):
    """Configuration options understood by `RawIngestTask`."""

    # How files are transferred into the datastore.
    transfer = makeTransferChoiceField()

    # Whether the first problem aborts the whole ingest.
    failFast: Field[bool] = Field(
        doc="If True, stop ingest as soon as any problem is encountered with any file. "
        "Otherwise problem files will be skipped and logged and a report issued at completion.",
        dtype=bool,
        default=False,
    )
268class RawIngestTask(Task):
269 """Driver Task for ingesting raw data into Gen3 Butler repositories.
271 Parameters
272 ----------
273 config : `RawIngestConfig`
274 Configuration for the task.
275 butler : `~lsst.daf.butler.Butler`
276 Writeable butler instance, with ``butler.run`` set to the appropriate
277 `~lsst.daf.butler.CollectionType.RUN` collection for these raw
278 datasets.
279 on_success : `collections.abc.Callable`, optional
280 A callback invoked when all of the raws associated with an exposure
281 are ingested. Will be passed a list of `~lsst.daf.butler.FileDataset`
282 objects, each containing one or more resolved
283 `~lsst.daf.butler.DatasetRef` objects. If this callback raises it will
284 interrupt the entire ingest process, even if `RawIngestConfig.failFast`
285 is `False`.
286 on_metadata_failure : `collections.abc.Callable`, optional
287 A callback invoked when a failure occurs trying to translate the
288 metadata for a file. Will be passed the URI and the exception, in
289 that order, as positional arguments. Guaranteed to be called in an
290 ``except`` block, allowing the callback to re-raise or replace (with
291 ``raise ... from``) to override the task's usual error handling (before
292 `RawIngestConfig.failFast` logic occurs). This callback can be called
293 from within a worker thread if multiple workers have been requested.
294 Ensure that any code within the call back is thread-safe.
295 on_ingest_failure : `collections.abc.Callable`, optional
296 A callback invoked when dimension record or dataset insertion into the
297 database fails for an exposure. Will be passed a `RawExposureData`
298 instance and the exception, in that order, as positional arguments.
299 Guaranteed to be called in an ``except`` block, allowing the callback
300 to re-raise or replace (with ``raise ... from``) to override the task's
301 usual error handling (before `RawIngestConfig.failFast` logic occurs).
302 on_exposure_record : `collections.abc.Callable`, optional
303 A callback invoked when an exposure dimension record has been created
304 or modified. Will not be called if the record already existed. Will
305 be called with the exposure record.
306 **kwargs
307 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
308 constructor.
310 Notes
311 -----
312 Each instance of `RawIngestTask` writes to the same Butler. Each
313 invocation of `RawIngestTask.run` ingests a list of files.
314 """
316 ConfigClass: ClassVar[type[Config]] = RawIngestConfig
318 _DefaultName: ClassVar[str] = "ingest"
320 def getDatasetType(self) -> DatasetType:
321 """Return the default DatasetType of the datasets ingested by this
322 Task.
324 Returns
325 -------
326 datasetType : `lsst.daf.butler.DatasetType`
327 The default dataset type to use for the data being ingested. This
328 is only used if the relevant `~lsst.pipe.base.Instrument` does not
329 define an override.
330 """
331 return DatasetType(
332 "raw",
333 ("instrument", "detector", "exposure"),
334 "Exposure",
335 universe=self.butler.dimensions,
336 )
338 # Mypy can not determine that the config passed to super() is this type.
339 config: RawIngestConfig
341 def __init__(
342 self,
343 config: RawIngestConfig | None = None,
344 *,
345 butler: Butler,
346 on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
347 on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
348 on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing,
349 on_exposure_record: Callable[[DimensionRecord], Any] = _do_nothing,
350 **kwargs: Any,
351 ):
352 if config is None:
353 config = RawIngestConfig()
354 config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
355 super().__init__(config, **kwargs)
356 self.butler = butler
357 self.universe = self.butler.dimensions
358 self.datasetType = self.getDatasetType()
359 self._on_success = on_success
360 self._on_exposure_record = on_exposure_record
361 self._on_metadata_failure = on_metadata_failure
362 self._on_ingest_failure = on_ingest_failure
363 self.progress = Progress("obs.base.RawIngestTask")
365 # Import all the instrument classes so that we ensure that we
366 # have all the relevant metadata translators loaded.
367 self.instruments = Instrument.importAll(self.butler.registry)
369 # Read all the instrument records into a cache since they will be
370 # needed later to calculate day_obs timespans, if appropriate.
371 self._instrument_records = {
372 rec.name: rec for rec in butler.registry.queryDimensionRecords("instrument")
373 }
374 # Create empty metrics.
375 self.metrics = IngestMetrics()
377 def _reduce_kwargs(self) -> dict[str, Any]:
378 # Add extra parameters to pickle.
379 return dict(
380 **super()._reduce_kwargs(),
381 butler=self.butler,
382 on_success=self._on_success,
383 on_metadata_failure=self._on_metadata_failure,
384 on_ingest_failure=self._on_ingest_failure,
385 on_exposure_record=self._on_exposure_record,
386 )
388 def _determine_instrument_formatter(
389 self, dataId: DataCoordinate, filename: ResourcePath
390 ) -> tuple[Instrument | None, type[Formatter | FormatterV2]]:
391 """Determine the instrument and formatter class.
393 Parameters
394 ----------
395 dataId : `lsst.daf.butler.DataCoordinate`
396 The dataId associated with this dataset.
397 filename : `lsst.resources.ResourcePath`
398 URI of file used for error reporting.
400 Returns
401 -------
402 instrument : `Instrument` or `None`
403 Instance of the `Instrument` associated with this dataset. `None`
404 indicates that the instrument could not be determined.
405 formatterClass : `type`
406 Class to be used as the formatter for this dataset.
408 Notes
409 -----
410 Does not access butler registry since it may be called from threads.
411 """
412 # The data model currently assumes that whilst multiple datasets
413 # can be associated with a single file, they must all share the
414 # same formatter.
415 FormatterClass: type[Formatter | FormatterV2] = Formatter
416 try:
417 instrument_name = cast(str, dataId["instrument"])
418 instrument = self.instruments[instrument_name]()
419 except LookupError as e:
420 self._on_metadata_failure(filename, e)
421 self.log.warning(
422 "Instrument %s for file %s not known to registry", dataId["instrument"], filename
423 )
424 if self.config.failFast:
425 raise RuntimeError(
426 f"Instrument {dataId['instrument']} for file {filename} not known to registry"
427 ) from e
428 # Indicate that we could not work out the instrument.
429 instrument = None
430 else:
431 assert instrument is not None, "Should be guaranted by fromName succeeding."
432 FormatterClass = instrument.getRawFormatter(dataId)
433 return instrument, FormatterClass
435 def get_raw_datasetType(
436 self, instrument: Instrument, cache: dict[str, DatasetType] | None = None
437 ) -> DatasetType:
438 """Get the raw dataset type associated with this ingest.
440 Parameters
441 ----------
442 instrument : `Instrument`
443 Class that might specify an override of the default raw dataset
444 type. If no override is specified the task default will be used.
445 cache : `dict` [`str`, `lsst.daf.butler.DatasetType`] \
446 or `None`, optional
447 An optional cache that can be used to return a pre-existing
448 dataset type. Is not updated.
450 Returns
451 -------
452 lsst.daf.butler.DatasetType
453 The dataset type to use for raw ingest of this instrument.
454 """
455 if cache is None:
456 cache = {}
457 if raw_definition := getattr(instrument, "raw_definition", None):
458 datasetTypeName, dimensions, storageClass = raw_definition
459 if not (datasetType := cache.get(datasetTypeName)):
460 datasetType = DatasetType(
461 datasetTypeName, dimensions, storageClass, universe=self.butler.dimensions
462 )
463 else:
464 datasetType = self.datasetType
465 return datasetType
467 def extractMetadata(self, filename: ResourcePath) -> RawFileData:
468 """Extract and process metadata from a single raw file.
470 Parameters
471 ----------
472 filename : `lsst.resources.ResourcePath`
473 URI to the file.
475 Returns
476 -------
477 data : `RawFileData`
478 A structure containing the metadata extracted from the file,
479 as well as the original filename. All fields will be populated,
480 but the `RawFileDatasetInfo.dataId` attribute will be a minimal
481 (unexpanded) `~lsst.daf.butler.DataCoordinate` instance. The
482 ``instrument`` field will be `None` if there is a problem
483 with metadata extraction.
485 Notes
486 -----
487 Assumes that there is a single dataset associated with the given
488 file. Instruments using a single file to store multiple datasets
489 must implement their own version of this method.
491 By default the method will catch all exceptions unless the
492 `RawIngestConfig.failFast` configuration item is `True`. If an error
493 is encountered the supplied ``on_metadata_failure()``
494 method will be called. If no exceptions result and an error was
495 encountered the returned object will have a null-instrument class and
496 no datasets.
498 This method supports sidecar JSON files which can be used to
499 extract metadata without having to read the data file itself.
500 The sidecar file is always used if found.
501 """
502 formatterClass: type[Formatter | FormatterV2]
503 sidecar_fail_msg = "" # Requires prepended space when set.
504 try:
505 sidecar_file = filename.updatedExtension(".json")
506 headers = []
507 with contextlib.suppress(Exception):
508 # Try to read directly, bypassing existence check.
509 content = json.loads(sidecar_file.read())
510 headers = [process_sidecar_data(content)]
511 sidecar_fail_msg = " (via sidecar)"
512 if not headers:
513 # Read the metadata from the data file itself.
515 # For remote files download the entire file to get the
516 # header. This is very inefficient and it would be better
517 # to have some way of knowing where in the file the headers
518 # are and to only download those parts of the file.
519 with filename.as_local() as local_file:
520 # Read the primary. This might be sufficient.
521 header = readMetadata(local_file.ospath, 0)
522 translator_class = None
524 try:
525 # Try to work out a translator class early.
526 translator_class = MetadataTranslator.determine_translator(
527 header, filename=str(filename)
528 )
529 except ValueError:
530 # Primary header was not sufficient (maybe this file
531 # has been compressed or is a MEF with minimal
532 # primary). Read second header and merge with primary.
533 header = merge_headers([header, readMetadata(local_file.ospath, 1)], mode="overwrite")
535 # Try again to work out a translator class, letting this
536 # fail.
537 if translator_class is None:
538 translator_class = MetadataTranslator.determine_translator(
539 header, filename=str(filename)
540 )
542 # Request the headers to use for ingest
543 headers = list(translator_class.determine_translatable_headers(local_file.ospath, header))
545 # Add each header to the dataset list
546 datasets = [self._calculate_dataset_info(h, filename) for h in headers]
548 except Exception as e:
549 self.log.debug("Problem extracting metadata from %s%s: %s", filename, sidecar_fail_msg, e)
550 # Indicate to the caller that we failed to read.
551 datasets = []
552 formatterClass = Formatter
553 instrument = None
554 self._on_metadata_failure(filename, e)
555 if self.config.failFast:
556 raise RuntimeError(
557 f"Problem extracting metadata for file {filename}{sidecar_fail_msg}"
558 ) from e
559 else:
560 self.log.debug("Extracted metadata for file %s%s", filename, sidecar_fail_msg)
561 # The data model currently assumes that whilst multiple datasets
562 # can be associated with a single file, they must all share the
563 # same formatter.
564 instrument, formatterClass = self._determine_instrument_formatter(datasets[0].dataId, filename)
565 if instrument is None:
566 datasets = []
568 return RawFileData(
569 datasets=datasets,
570 filename=filename,
571 # MyPy wants this to be a non-abstract class, which is not true
572 # for the error case where instrument is None and datasets=[].
573 FormatterClass=formatterClass, # type: ignore
574 instrument=instrument,
575 )
577 @classmethod
578 def getObservationInfoSubsets(cls) -> tuple[set, set]:
579 """Return subsets of fields in the
580 `~astro_metadata_translator.ObservationInfo` that we care about.
582 These fields will be used in constructing an exposure record.
584 Returns
585 -------
586 required : `set`
587 Set of `~astro_metadata_translator.ObservationInfo` field names
588 that are required.
589 optional : `set`
590 Set of `~astro_metadata_translator.ObservationInfo` field names
591 we will use if they are available.
592 """
593 # Marking the new properties "group_counter_*" and
594 # "has_simulated_content" as required, assumes that we either
595 # recreate any existing index/sidecar files that include translated
596 # values, or else allow astro_metadata_translator to fill in
597 # defaults.
598 required = {
599 "datetime_begin",
600 "datetime_end",
601 "detector_num",
602 "exposure_group",
603 "exposure_id",
604 "exposure_time_requested",
605 "group_counter_end",
606 "group_counter_start",
607 "has_simulated_content",
608 "instrument",
609 "observation_id",
610 "observation_type",
611 "observing_day",
612 "physical_filter",
613 }
614 optional = {
615 "altaz_begin",
616 "boresight_rotation_coord",
617 "boresight_rotation_angle",
618 "dark_time",
619 "tracking_radec",
620 "object",
621 "observation_counter",
622 "observation_reason",
623 "observing_day_offset",
624 "science_program",
625 "visit_id",
626 "can_see_sky",
627 }
628 return required, optional
630 def _calculate_dataset_info(
631 self, header: MutableMapping[str, Any] | ObservationInfo, filename: ResourcePath
632 ) -> RawFileDatasetInfo:
633 """Calculate a RawFileDatasetInfo from the supplied information.
635 Parameters
636 ----------
637 header : Mapping or `astro_metadata_translator.ObservationInfo`
638 Header from the dataset or previously-translated content.
639 filename : `lsst.resources.ResourcePath`
640 Filename to use for error messages.
642 Returns
643 -------
644 dataset : `RawFileDatasetInfo`
645 The dataId, and observation information associated with this
646 dataset.
647 """
648 required, optional = self.getObservationInfoSubsets()
649 if isinstance(header, ObservationInfo):
650 obsInfo = header
651 missing = []
652 # Need to check the required properties are present.
653 for property in required:
654 # getattr does not need to be protected because it is using
655 # the defined list above containing properties that must exist.
656 value = getattr(obsInfo, property)
657 if value is None:
658 missing.append(property)
659 if missing:
660 raise ValueError(
661 f"Requested required properties are missing from file {filename}: {missing} (via JSON)"
662 )
664 else:
665 obsInfo = ObservationInfo(
666 header,
667 pedantic=False,
668 filename=str(filename),
669 required=required,
670 subset=required | optional,
671 )
673 dataId = DataCoordinate.standardize(
674 instrument=obsInfo.instrument,
675 exposure=obsInfo.exposure_id,
676 detector=obsInfo.detector_num,
677 universe=self.universe,
678 )
679 return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId)
681 def readZipIndexFiles(
682 self, files: Iterable[ResourcePath]
683 ) -> tuple[list[RawFileData], list[ResourcePath], set[ResourcePath], set[ResourcePath]]:
684 """Given a list of files, filter out zip files and look for index files
685 inside.
687 Parameters
688 ----------
689 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
690 URIs to the files to be ingested.
692 Returns
693 -------
694 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ]
695 Merged contents of all relevant index files found in zip files.
696 The keys include the path to the data file within the zip using
697 the butler fragment convention of ``zip-path=PATH``.
698 updated_files : `list` [ `lsst.resources.ResourcePath` ]
699 Updated list of the input files with zip entries removed.
700 good_index_files: `set` [ `lsst.resources.ResourcePath` ]
701 Zip files that contained index information.
702 bad_zip_files: `set` [ `lsst.resources.ResourcePath` ]
703 Zip files that contained no index information.
704 """
705 zip_metadata_index = "_metadata_index.json"
707 # Files that weren't zip files.
708 updated_files: list[ResourcePath] = []
710 # Index files we failed to read.
711 bad_index_files: set[ResourcePath] = set()
713 # Any good index files that were found and used.
714 good_index_files: set[ResourcePath] = set()
716 # Processed content from any zip files.
717 indexFileData: list[RawFileData] = []
719 for file in files:
720 if file.getExtension() != ".zip":
721 updated_files.append(file)
722 continue
724 zip_info: dict[str, zipfile.ZipInfo] = {}
725 try:
726 with file.open("rb") as fd, zipfile.ZipFile(fd) as zf:
727 zip_info = {info.filename: info for info in zf.infolist()}
728 content = json.loads(zf.read(zip_metadata_index))
729 index = process_index_data(content, force_dict=True)
730 assert isinstance(index, MutableMapping)
732 # Try to read the ZipIndex.
733 zip_index = ZipIndex.from_open_zip(zf)
734 except Exception as e:
735 if self.config.failFast:
736 raise RuntimeError(f"Problem reading index file from zip file at {file}") from e
737 bad_index_files.add(file)
738 continue
739 self.log.debug("Extracted index metadata from zip file %s", str(file))
740 good_index_files.add(file)
742 # All the metadata read from this index file with keys of full
743 # path.
744 index_entries: dict[ResourcePath, Any] = {}
746 # In theory we could scan for JSON sidecar files associated with
747 # any files not found in the metadata index, but that is not meant
748 # to be possible. Guider data is another issue not handled by
749 # this code.
750 for path_in_zip in index:
751 if path_in_zip not in zip_info:
752 # Index entry exists but no file for it.
753 self.log.info(
754 "File %s is in zip index but not in zip file %s. Ignoring.", path_in_zip, file
755 )
756 continue
757 file_to_ingest = file.replace(fragment=f"zip-path={path_in_zip}")
758 index_entries[file_to_ingest] = index[path_in_zip]
760 file_data = self.processIndexEntries(index_entries)
762 # Validate that the index entries we have read match the
763 # values in the butler zip index.
764 data_ids_from_index: dict[str, tuple[DataCoordinate, ...]] = {}
765 for f in file_data:
766 _, path_in_zip = f.filename.fragment.split("=")
767 data_ids_from_index[path_in_zip] = tuple(d.dataId for d in f.datasets)
769 data_ids_from_butler_index: dict[str, tuple[DataCoordinate, ...]] = {}
770 # Refs indexed by UUID.
771 refs = zip_index.refs.to_refs(universe=self.universe)
772 id_to_ref = {ref.id: ref for ref in refs}
773 for path_in_zip, index_info in zip_index.artifact_map.items():
774 data_ids_from_butler_index[path_in_zip] = tuple(
775 id_to_ref[id_].dataId for id_ in index_info.ids
776 )
778 if data_ids_from_butler_index != data_ids_from_index:
779 self.log.warning(
780 "Recalculating the Data IDs for zip file %s (which may include new metadata corrections) "
781 "results in a difference to the Data IDs recorded in the butler index in that zip. "
782 "Consider remaking the zipped raws.",
783 file,
784 )
786 indexFileData.extend(file_data)
788 return indexFileData, updated_files, good_index_files, bad_index_files
790 def locateAndReadIndexFiles(
791 self, files: Iterable[ResourcePath]
792 ) -> tuple[dict[ResourcePath, Any], list[ResourcePath], set[ResourcePath], set[ResourcePath]]:
793 """Given a list of files, look for index files and read them.
795 Index files can either be explicitly in the list of files to
796 ingest, or else located in the same directory as a file to ingest.
797 Index entries are always used if present.
799 Parameters
800 ----------
801 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
802 URIs to the files to be ingested.
804 Returns
805 -------
806 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ]
807 Merged contents of all relevant index files found. These can
808 be explicitly specified index files or ones found in the
809 directory alongside a data file to be ingested.
810 updated_files : `list` [ `lsst.resources.ResourcePath` ]
811 Updated list of the input files with entries removed that were
812 found listed in an index file. Order is not guaranteed to
813 match the order of the files given to this routine.
814 good_index_files: `set` [ `lsst.resources.ResourcePath` ]
815 Index files that were successfully read.
816 bad_index_files: `set` [ `lsst.resources.ResourcePath` ]
817 Files that looked like index files but failed to read properly.
818 """
819 # Convert the paths to absolute for easy comparison with index content.
820 # Do not convert to real paths since we have to assume that index
821 # files are in this location and not the location which it links to.
822 files = tuple(f.abspath() for f in files)
824 # Index files must be named this.
825 index_root_file = "_index.json"
827 # Group the files by directory.
828 files_by_directory: dict[ResourcePath, set[str]] = defaultdict(set)
830 for path in files:
831 directory, file_in_dir = path.split()
832 files_by_directory[directory].add(file_in_dir)
834 # All the metadata read from index files with keys of full path.
835 index_entries: dict[ResourcePath, Any] = {}
837 # Index files we failed to read.
838 bad_index_files: set[ResourcePath] = set()
840 # Any good index files that were found and used.
841 good_index_files: set[ResourcePath] = set()
843 # Look for index files in those directories.
844 for directory, files_in_directory in files_by_directory.items():
845 possible_index_file = directory.join(index_root_file)
846 if possible_index_file.exists():
847 # If we are explicitly requesting an index file the
848 # messages should be different.
849 index_msg = "inferred"
850 is_implied = True
851 if index_root_file in files_in_directory:
852 index_msg = "explicit"
853 is_implied = False
855 # Try to read the index file and catch and report any
856 # problems.
857 try:
858 content = json.loads(possible_index_file.read())
859 index = process_index_data(content, force_dict=True)
860 # mypy should in theory know that this is a mapping
861 # from the overload type annotation of process_index_data.
862 assert isinstance(index, MutableMapping)
863 except Exception as e:
864 # Only trigger the callback if the index file
865 # was asked for explicitly. Triggering on implied file
866 # might be surprising.
867 if not is_implied:
868 self._on_metadata_failure(possible_index_file, e)
869 if self.config.failFast:
870 raise RuntimeError(
871 f"Problem reading index file from {index_msg} location {possible_index_file}"
872 ) from e
873 bad_index_files.add(possible_index_file)
874 continue
876 self.log.debug("Extracted index metadata from %s file %s", index_msg, possible_index_file)
877 good_index_files.add(possible_index_file)
879 # Go through the index adding entries for files.
880 # If we have non-index files in this directory marked for
881 # ingest we should only get index information for those.
882 # If the index file was explicit we use all entries.
883 if is_implied:
884 files_to_ingest = files_in_directory
885 else:
886 files_to_ingest = set(index)
888 # Copy relevant metadata into a single dict for all index
889 # entries.
890 for file_in_dir in files_to_ingest:
891 # Skip an explicitly specified index file.
892 # This should never happen because an explicit index
893 # file will force ingest of all files in the index
894 # and not use the explicit file list. If somehow
895 # this is not true we continue. Raising an exception
896 # seems like the wrong thing to do since this is harmless.
897 if file_in_dir == index_root_file:
898 self.log.info(
899 "Logic error found scanning directory %s. Please file ticket.", directory
900 )
901 continue
902 if file_in_dir in index:
903 file = directory.join(file_in_dir)
904 if file in index_entries:
905 # ObservationInfo overrides raw metadata
906 if isinstance(index[file_in_dir], ObservationInfo) and not isinstance(
907 index_entries[file], ObservationInfo
908 ):
909 self.log.warning(
910 "File %s already specified in an index file but overriding"
911 " with ObservationInfo content from %s",
912 file,
913 possible_index_file,
914 )
915 else:
916 self.log.warning(
917 "File %s already specified in an index file, ignoring content from %s",
918 file,
919 possible_index_file,
920 )
921 # Do nothing in this case
922 continue
924 index_entries[file] = index[file_in_dir]
926 # Remove files from list that have index entries and also
927 # any files that we determined to be explicit index files
928 # or any index files that we failed to read.
929 filtered = set(files) - set(index_entries) - good_index_files - bad_index_files
931 # The filtered list loses the initial order. Retaining the order
932 # is good for testing but does have a cost if there are many
933 # files when copying the good values out. A dict would have faster
934 # lookups (using the files as keys) but use more memory.
935 ordered: list[ResourcePath] = []
936 seen: set[ResourcePath] = set()
937 for f in files:
938 if f in filtered and f not in seen:
939 ordered.append(f)
940 seen.add(f)
942 return index_entries, ordered, good_index_files, bad_index_files
def processIndexEntries(self, index_entries: dict[ResourcePath, Any]) -> list[RawFileData]:
    """Build `RawFileData` structures from metadata read out of index files.

    Parameters
    ----------
    index_entries : `dict` [`lsst.resources.ResourcePath`, `typing.Any`]
        Mapping from the file to ingest to its metadata. Each value is
        either raw metadata or a translated
        `~astro_metadata_translator.ObservationInfo`.

    Returns
    -------
    data : `list` [ `RawFileData` ]
        One structure per index entry, in the iteration order of
        ``index_entries``. Entries whose metadata could not be processed,
        or for which no instrument could be determined, are returned with
        an empty ``datasets`` list. For the others the
        `RawFileDatasetInfo.dataId` attributes will be minimal
        (unexpanded) `~lsst.daf.butler.DataCoordinate` instances.

    Raises
    ------
    RuntimeError
        Raised if metadata processing fails for an entry and
        ``self.config.failFast`` is set.
    """
    formatter_cls: type[Formatter | FormatterV2]
    results: list[RawFileData] = []
    for path, header in index_entries.items():
        # Start from the "failed" state; it is only replaced once both
        # the dataset information and the instrument have been resolved.
        datasets: list[RawFileDatasetInfo] = []
        formatter_cls = Formatter
        instrument = None
        try:
            info = self._calculate_dataset_info(header, path)
        except Exception as exc:
            self.log.debug("Problem extracting metadata for file %s found in index file: %s", path, exc)
            # Always notify the failure handler, even when failing fast.
            self._on_metadata_failure(path, exc)
            if self.config.failFast:
                raise RuntimeError(
                    f"Problem extracting metadata for file {path} found in index file"
                ) from exc
        else:
            instrument, formatter_cls = self._determine_instrument_formatter(info.dataId, path)
            if instrument is not None:
                datasets = [info]

        results.append(
            RawFileData(
                datasets=datasets,
                filename=path,
                # MyPy wants this to be a non-abstract class, which is not
                # true for the error case where instrument is None and
                # datasets=[].
                FormatterClass=formatter_cls,  # type: ignore
                instrument=instrument,
            )
        )
    return results
def groupByExposure(self, files: Iterable[RawFileData]) -> list[RawExposureData]:
    """Collect file-level structures into one structure per exposure.

    Parameters
    ----------
    files : iterable of `RawFileData`
        File-level information to group. Every element must have at
        least one entry in its ``datasets`` list.

    Returns
    -------
    exposures : `list` of `RawExposureData`
        A list of structures that group the file-level information by
        exposure, in order of first appearance. The exposure record and
        dependency records are built from the first dataset of the first
        file in each group. The `RawExposureData.dataId` attributes will
        be minimal (unexpanded) `~lsst.daf.butler.DataCoordinate`
        instances.
    """
    exposure_dims = self.universe["exposure"].minimal_group

    # Key each file by the exposure-level subset of its first data ID;
    # the first dataset is assumed to be representative for the file.
    grouped: dict[Any, list[RawFileData]] = {}
    for raw_file in files:
        key = raw_file.datasets[0].dataId.subset(exposure_dims)
        grouped.setdefault(key, []).append(raw_file)

    exposures: list[RawExposureData] = []
    for exposure_id, members in grouped.items():
        obs_info = members[0].datasets[0].obsInfo
        record = self.makeExposureRecord(obs_info, self.universe)
        dependencies = self.makeDependencyRecords(obs_info, self.universe)
        exposures.append(
            RawExposureData(
                dataId=exposure_id,
                files=members,
                universe=self.universe,
                record=record,
                dependencyRecords=dependencies,
            )
        )
    return exposures
def makeExposureRecord(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
) -> DimensionRecord:
    """Create the registry record describing an exposure.

    Subclasses will often want to customize this method. A common way of
    doing so is to call this base class implementation and pass the extra
    field values as ``kwargs``.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.
    **kwargs
        Additional field values for this record; forwarded unchanged.

    Returns
    -------
    record : `lsst.daf.butler.DimensionRecord`
        The exposure record that must be inserted into the
        `~lsst.daf.butler.Registry` prior to file-level ingest.
    """
    # All of the work is delegated to the shared module-level helper.
    record = makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs)
    return record
def makeDependencyRecords(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse
) -> dict[str, DimensionRecord]:
    """Create the records that an exposure record depends on.

    The returned records are meant to be inserted into the
    `~lsst.daf.butler.Registry` before the exposure record itself, so
    that foreign key constraints arising from dimensions related to the
    exposure can be satisfied.

    Subclasses may want to customize this method if they have added
    dimensions that relate to an exposure.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.

    Returns
    -------
    records : `dict` [`str`, `lsst.daf.butler.DimensionRecord`]
        The records to insert, indexed by dimension name. Empty if the
        universe has no ``exposure`` dimension or if the exposure implies
        neither ``group`` nor ``day_obs``.
    """
    if "exposure" not in universe:
        return {}

    implied = universe["exposure"].implied
    dependencies: dict[str, DimensionRecord] = {}

    if "group" in implied:
        group_record_cls = universe["group"].RecordClass
        dependencies["group"] = group_record_cls(
            name=obsInfo.exposure_group,
            instrument=obsInfo.instrument,
        )

    if "day_obs" in implied:
        # A timespan can only be computed when the observing day offset
        # is known; otherwise the record is created without one.
        day_offset = obsInfo.observing_day_offset
        span = None
        if day_offset is not None:
            offset_seconds = round(day_offset.to_value("s"))
            assert obsInfo.observing_day is not None
            span = Timespan.from_day_obs(obsInfo.observing_day, offset_seconds)
        day_obs_record_cls = universe["day_obs"].RecordClass
        dependencies["day_obs"] = day_obs_record_cls(
            instrument=obsInfo.instrument,
            id=obsInfo.observing_day,
            timespan=span,
        )

    return dependencies
def expandDataIds(self, data: RawExposureData) -> RawExposureData:
    """Attach dimension records to the data IDs of a raw exposure.

    Parameters
    ----------
    data : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated. It is
        modified in place and should be considered consumed upon return.

    Returns
    -------
    exposure : `RawExposureData`
        The same structure that was passed in, with
        `RawExposureData.dataId` and nested `RawFileDatasetInfo.dataId`
        attributes replaced by data IDs for which
        `~lsst.daf.butler.DataCoordinate.hasRecords` returns `True`.
    """
    # Expand the exposure-level data ID first. It is not used directly in
    # file ingest, but doing it once per exposure avoids repeating the
    # same database lookups for every file. The records that are about to
    # be inserted are supplied up front so they are not looked up; the
    # instrument and filter records are still expected to come from the
    # database (possibly via a Registry cache).
    seed_records = {"exposure": data.record, **data.dependencyRecords}
    data.dataId = self.butler.registry.expandDataId(data.dataId, records=seed_records)

    # Expand the per-file (exposure+detector) data IDs, reusing the
    # records obtained by the exposure-level expansion above.
    exposure_id = data.dataId
    for raw_file in data.files:
        for info in raw_file.datasets:
            known_records = {
                element: exposure_id.records[element] for element in exposure_id.dimensions.elements
            }
            info.dataId = self.butler.registry.expandDataId(info.dataId, records=known_records)
    return data
def prep(
    self,
    files: Iterable[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    search_indexes: bool = True,
) -> tuple[Iterator[RawExposureData], list[ResourcePath]]:
    """Run every ingest preprocessing step that does not modify the
    database.

    Parameters
    ----------
    files : iterable over `lsst.resources.ResourcePath`
        Paths to the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool used to read the metadata of
        standalone files in parallel.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.

    Returns
    -------
    exposures : `~collections.abc.Iterator` [ `RawExposureData` ]
        Data structures containing dimension records, filenames, and data
        IDs to be ingested (one structure for each exposure). Data ID
        expansion is performed lazily as the iterator is consumed.
    bad_files : `list` of `lsst.resources.ResourcePath`
        List of all the files that could not have metadata extracted,
        including index and zip files that could not be read.
    """

    def _split_by_outcome(
        candidates: Iterable[RawFileData],
    ) -> tuple[list[RawFileData], list[ResourcePath]]:
        """Separate usable file data from the names of failed files."""
        usable: list[RawFileData] = []
        failed: list[ResourcePath] = []
        for datum in self.progress.wrap(candidates, desc="Reading image metadata"):
            if datum.datasets:
                usable.append(datum)
            else:
                failed.append(datum.filename)
        return usable, failed

    # Zip files carry their own index; handle them first.
    zip_file_data, files, good_zip_files, bad_zip_files = self.readZipIndexFiles(files)
    if bad_zip_files:
        self.log.info("Failed to extract index metadata from the following zip files:")
        for unreadable in sorted(bad_zip_files):
            self.log.info("- %s", unreadable)

    # Standalone index files. There should be far fewer of these than
    # data files.
    index_entries: dict[ResourcePath, Any]
    if not search_indexes:
        # We have been told explicitly there are no indexes.
        index_entries = {}
        good_index_files = set()
        bad_index_files = set()
    else:
        index_entries, files, good_index_files, bad_index_files = self.locateAndReadIndexFiles(files)
        if bad_index_files:
            self.log.info("Failed to read the following explicitly requested index files:")
            for unreadable in sorted(bad_index_files):
                self.log.info("- %s", unreadable)

    # From here on zips and standalone index files are reported together.
    good_index_files.update(good_zip_files)
    bad_index_files.update(bad_zip_files)

    # Convert all the index entries to the standard form used for ingest.
    index_failures: list[ResourcePath] = []
    index_file_data = self.processIndexEntries(index_entries)
    index_file_data.extend(zip_file_data)
    if index_file_data:
        index_file_data, index_failures = _split_by_outcome(index_file_data)
        self.log.info(
            "Successfully extracted metadata for %d file%s found in %d index file%s with %d failure%s",
            *_log_msg_counter(index_file_data),
            *_log_msg_counter(good_index_files),
            *_log_msg_counter(index_failures),
        )

    # Read metadata from the remaining standalone files, optionally in the
    # thread pool. All output is collected before failures are examined.
    mapper = map if pool is None else pool.map
    standalone: Iterator[RawFileData] = mapper(self.extractMetadata, files)

    # Keep the failed reads aside for later reporting.
    good_file_data, bad_files = _split_by_outcome(standalone)
    # Only report if we looked at any standalone files at all.
    if files:
        self.log.info(
            "Successfully extracted metadata from %d file%s with %d failure%s",
            *_log_msg_counter(good_file_data),
            *_log_msg_counter(bad_files),
        )

    # Fold in everything that came from index files.
    good_file_data.extend(index_file_data)
    bad_files.extend(index_failures)
    bad_files.extend(bad_index_files)

    # Grouping by exposure is intrinsically a gather step, so it is never
    # parallelized.
    exposure_data: list[RawExposureData] = self.groupByExposure(good_file_data)

    # Expand the data IDs to include all dimension metadata; this is
    # needed because path templates may rely on that metadata. It is the
    # first step that involves database calls (SELECTs only). The
    # expansion modifies each instance in place and returns it, and the
    # returned instance is what gets passed along.
    return map(self.expandDataIds, exposure_data), bad_files
def ingestExposureDatasets(
    self,
    exposure: RawExposureData,
    datasetType: DatasetType,
    *,
    run: str,
    skip_existing_exposures: bool = False,
    track_file_attrs: bool = True,
) -> list[FileDataset]:
    """Ingest every raw file belonging to a single exposure.

    Parameters
    ----------
    exposure : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated and all
        data ID attributes expanded.
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type associated with this exposure.
    run : `str`
        Name of a RUN-type collection to write to.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation. It is only
        forwarded for standalone files, not for zip ingest.

    Returns
    -------
    datasets : `list` of `lsst.daf.butler.FileDataset`
        Per-file structures identifying the files ingested and their
        dataset representation in the data repository. Standalone files
        come first, followed by the members of each zip file.
    """
    # Prefer dataset IDs derived from the collection name and data ID
    # whenever the registry supports that mode.
    deterministic_ids = self.butler.registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN)
    id_mode = DatasetIdGenEnum.DATAID_TYPE_RUN if deterministic_ids else DatasetIdGenEnum.UNIQUE

    def _to_file_dataset(raw_file: RawFileData) -> FileDataset | None:
        """Describe one raw file for ingest, or `None` if it has no
        datasets.
        """
        refs = [
            DatasetRef(datasetType, info.dataId, run=run, id_generation_mode=id_mode)
            for info in raw_file.datasets
        ]
        if not refs:
            return None
        return FileDataset(path=raw_file.filename.abspath(), refs=refs, formatter=raw_file.FormatterClass)

    # The files of an exposure may be standalone or members of a zip.
    # Zip members are set aside, keyed by the zip path without fragment.
    members_by_zip: dict[ResourcePath, list[RawFileData]] = {}
    datasets: list[FileDataset] = []
    for raw_file in exposure.files:
        if raw_file.filename.getExtension() == ".zip":
            members_by_zip.setdefault(raw_file.filename.replace(fragment=""), []).append(raw_file)
        elif (file_dataset := _to_file_dataset(raw_file)) is not None:
            datasets.append(file_dataset)

    if datasets:
        with self.butler.record_metrics() as file_metrics:
            self.butler.ingest(
                *datasets,
                transfer=self.config.transfer,
                record_validation_info=track_file_attrs,
                skip_existing=skip_existing_exposures,
            )
        self.metrics.time_for_ingest += file_metrics.time_in_ingest

    # In theory the new data IDs could differ from the data IDs stored in
    # the zip index, for example after a metadata correction that changes
    # the exposure or detector numbers. It has to be assumed that any such
    # correction was applied before the zip was made: the zip contents
    # cannot be changed, and butler.ingest_zip has no way to accept an
    # overriding index. The dataset ref IDs only change if the data IDs
    # change.
    for zip_path, members in members_by_zip.items():
        # These are only needed for the return value; the guiders are
        # assumed not to be included in the metadata index.
        member_datasets = [
            file_dataset for file_dataset in map(_to_file_dataset, members) if file_dataset is not None
        ]
        with self.butler.record_metrics() as zip_metrics:
            self.butler.ingest_zip(
                zip_path, transfer=self.config.transfer, skip_existing=skip_existing_exposures
            )
        datasets.extend(member_datasets)
        self.metrics.time_for_ingest += zip_metrics.time_in_ingest

    return datasets
def ingestFiles(
    self,
    files: Sequence[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    num_workers: int = 1,
    run: str | None = None,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    skip_ingest: bool = False,
) -> tuple[list[DatasetRef], list[ResourcePath], int, int, int]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Metadata for all files is read first (via `prep`); the repository is
    then modified one exposure at a time.

    Parameters
    ----------
    files : sequence of `lsst.resources.ResourcePath`
        URIs to the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations. A pool supplied by the caller is not shut down here.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` is not `None`.
        A temporary pool is only created when this is greater than 1.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.
    bad_files : `list` of `lsst.resources.ResourcePath`
        Given paths that could not be ingested, as reported by `prep`.
    n_exposures : `int`
        Number of exposures successfully ingested. Exposures processed
        with ``skip_ingest=True`` are not counted.
    n_exposures_failed : `int`
        Number of exposures that failed when inserting dimension data.
    n_ingests_failed : `int`
        Number of exposures that failed when ingesting raw datasets.

    Raises
    ------
    Exception
        If ``self.config.failFast`` is set, the first exception raised
        while registering an exposure or ingesting its datasets is
        re-raised after the failure handler has been called.
    """
    # Only create (and later shut down) a pool when the caller did not
    # supply one and parallelism was requested.
    created_pool = False
    if pool is None and num_workers > 1:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        created_pool = True

    try:
        with self.metrics.collect_metric(
            "time_for_metadata",
            self.log,
            msg="Reading metadata from %d file%s",
            args=(*_log_msg_counter(files),),
        ):
            exposureData, bad_files = self.prep(files, pool=pool, search_indexes=search_indexes)
    finally:
        if created_pool and pool:
            # The pool is not needed any more so close it if we created
            # it to ensure we clean up resources.
            pool.shutdown(wait=True)

    # Up to this point, we haven't modified the data repository at all.
    # Now we finally do that, with one transaction per exposure. This is
    # not parallelized at present because the performance of this step is
    # limited by the database server. That may or may not change in the
    # future once we increase our usage of bulk inserts and reduce our
    # usage of savepoints; we've tried to get everything but the database
    # operations done in advance to reduce the time spent inside
    # transactions.
    refs = []
    # Run collections and dataset types already registered during this
    # call, so each is only registered once.
    runs = set()
    datasetTypes: dict[str, DatasetType] = {}
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    for exposure in self.progress.wrap(exposureData, desc="Ingesting raw exposures"):
        assert exposure.record is not None, "Should be guaranteed by prep()"
        self.log.debug(
            "Attempting to ingest %d file%s from exposure %s:%s",
            *_log_msg_counter(exposure.files),
            exposure.record.instrument,
            exposure.record.obs_id,
        )

        try:
            with self.metrics.collect_metric(
                "time_for_records",
                self.log,
                msg="Creating dimension records for instrument %s, exposure %s",
                args=(
                    str(exposure.record.instrument),
                    str(exposure.record.id),
                ),
            ):
                # Dependency records go in first so that the exposure
                # record can refer to them.
                for name, record in exposure.dependencyRecords.items():
                    self.butler.registry.syncDimensionData(name, record, update=update_exposure_records)
                inserted_or_updated = self.butler.registry.syncDimensionData(
                    "exposure",
                    exposure.record,
                    update=update_exposure_records,
                )
            # Anything other than `False` means the exposure record was
            # inserted or updated, so notify the handler.
            if inserted_or_updated is not False:
                with self.metrics.collect_metric(
                    "time_for_callbacks", log=self.log, msg="Exposure record updated. Calling handler"
                ):
                    self._on_exposure_record(exposure.record)
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_exposures_failed += 1
            self.log.warning(
                "Exposure %s:%s could not be registered: %s",
                exposure.record.instrument,
                exposure.record.obs_id,
                e,
            )
            if self.config.failFast:
                raise e
            # Without a registered exposure there is nothing to ingest.
            continue

        if isinstance(inserted_or_updated, dict):
            # Exposure is in the registry and we updated it, so
            # syncDimensionData returned a dict.
            columns_updated = list(inserted_or_updated.keys())
            s_col = "s" if len(columns_updated) != 1 else ""
            w_col = "were" if len(columns_updated) != 1 else "was"
            self.log.info(
                "Exposure %s:%s was already present, but column%s %s %s updated.",
                exposure.record.instrument,
                exposure.record.obs_id,
                s_col,
                ", ".join(repr(c) for c in columns_updated),
                w_col,
            )

        # Dimension records are up to date; stop here if the caller only
        # asked for record updates.
        if skip_ingest:
            continue

        # Determine the instrument so we can work out the dataset type.
        instrument = exposure.files[0].instrument
        assert instrument is not None, (
            "file should have been removed from this list by prep if instrument could not be found"
        )

        datasetType = self.get_raw_datasetType(instrument, datasetTypes)
        if datasetType.name not in datasetTypes:
            self.butler.registry.registerDatasetType(datasetType)
            datasetTypes[datasetType.name] = datasetType

        # Override default run if nothing specified explicitly.
        if run is None:
            this_run = instrument.makeDefaultRawIngestRunName()
        else:
            this_run = run
        if this_run not in runs:
            self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
            runs.add(this_run)
        try:
            datasets_for_exposure = self.ingestExposureDatasets(
                exposure,
                datasetType=datasetType,
                run=this_run,
                skip_existing_exposures=skip_existing_exposures,
                track_file_attrs=track_file_attrs,
            )
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_ingests_failed += 1
            self.log.warning("Failed to ingest the following for reason: %s", e)
            for f in exposure.files:
                self.log.warning("- %s", f.filename)
            if self.config.failFast:
                raise e
            continue
        else:
            with self.metrics.collect_metric("time_for_callbacks", self.log, msg="Calling on_success"):
                self._on_success(datasets_for_exposure)
            for dataset in datasets_for_exposure:
                refs.extend(dataset.refs)

        # Success for this exposure.
        n_exposures += 1
        self.log.info(
            "Exposure %s:%s ingested successfully", exposure.record.instrument, exposure.record.obs_id
        )

    return refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed
1603 @timeMethod
1604 def run(
1605 self,
1606 files: Iterable[ResourcePathExpression],
1607 *,
1608 pool: concurrent.futures.ThreadPoolExecutor | None = None,
1609 processes: int | None = None, # Deprecated. Use num_workers.
1610 run: str | None = None,
1611 file_filter: str | re.Pattern = r"\.fit[s]?\b",
1612 group_files: bool = True,
1613 skip_existing_exposures: bool = False,
1614 update_exposure_records: bool = False,
1615 track_file_attrs: bool = True,
1616 search_indexes: bool = True,
1617 num_workers: int = 1,
1618 skip_ingest: bool = False,
1619 ) -> list[DatasetRef]:
1620 """Ingest files into a Butler data repository.
1622 This creates any new exposure or visit Dimension entries needed to
1623 identify the ingested files, creates new Dataset entries in the
1624 Registry and finally ingests the files themselves into the Datastore.
1625 Any needed instrument, detector, and physical_filter Dimension entries
1626 must exist in the Registry before `run` is called.
1628 Parameters
1629 ----------
1630 files : iterable `lsst.resources.ResourcePath`, `str` or path-like
1631 Paths to the files to be ingested. Can refer to directories.
1632 Will be made absolute if they are not already.
1633 pool : `concurrent.futures.ThreadPoolExecutor`, optional
1634 If not `None`, a process pool with which to parallelize some
1635 operations. This parameter was previously a `multiprocessing.Pool`
1636 but that option is no longer supported since it is slow compared
1637 to futures.
1638 processes : `int`, optional
1639 The number of processes to use. Ignored if ``pool`` is not `None`.
1640 Deprecated. Please use ``num_workers`` parameter instead.
1641 run : `str`, optional
1642 Name of a RUN-type collection to write to, overriding
1643 the default derived from the instrument name.
1644 file_filter : `str` or `re.Pattern`, optional
1645 Pattern to use to discover files to ingest within directories.
1646 The default is to search for FITS files. The regex applies to
1647 files within the directory.
1648 group_files : `bool`, optional
1649 Group files by directory if they have been discovered in
1650 directories. Will not affect files explicitly provided.
1651 skip_existing_exposures : `bool`, optional
1652 If `True` (`False` is default), skip raws that have already been
1653 ingested (i.e. raws for which we already have a dataset with the
1654 same data ID in the target collection, even if from another file).
1655 Note that this is much slower than just not passing
1656 already-ingested files as inputs, because we still need to read and
1657 process metadata to identify which exposures to search for. It
1658 also will not work reliably if multiple processes are attempting to
1659 ingest raws from the same exposure concurrently, in that different
1660 processes may still attempt to ingest the same raw and conflict,
1661 causing a failure that prevents other raws from the same exposure
1662 from being ingested.
1663 update_exposure_records : `bool`, optional
1664 If `True` (`False` is default), update existing exposure records
1665 that conflict with the new ones instead of rejecting them. THIS IS
1666 AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
1667 KNOWN TO BE BAD. This should usually be combined with
1668 ``skip_existing_exposures=True``.
1669 track_file_attrs : `bool`, optional
1670 Control whether file attributes such as the size or checksum should
1671 be tracked by the datastore. Whether this parameter is honored
1672 depends on the specific datastore implementation.
1673 search_indexes : `bool`, optional
1674 If `True` the code will search for index JSON files in given
1675 directories. If you know for a fact that index files do not exist
1676 set this to `False` for a slight speed up in metadata gathering.
1677 num_workers : `int`, optional
1678 The number of workers to use. Ignored if ``pool`` parameter is
1679 given.
1680 skip_ingest : `bool`, optional
1681 Set this to `True` to do metadata extraction and dimension record
1682 updates without attempting to re-ingest. This can be useful if
1683 there has been a metadata correction associated with an exposure.
1685 Returns
1686 -------
1687 refs : `list` of `lsst.daf.butler.DatasetRef`
1688 Dataset references for ingested raws.
1690 Notes
1691 -----
1692 This method inserts all datasets for an exposure within a transaction,
1693 guaranteeing that partial exposures are never ingested. The exposure
1694 dimension record is inserted with
1695 `lsst.daf.butler.Registry.syncDimensionData` first (in its own
1696 transaction), which inserts only if a record with the same
1697 primary key does not already exist. This allows different files within
1698 the same exposure to be ingested in different runs.
1699 """
1700 if pool and not isinstance(pool, concurrent.futures.ThreadPoolExecutor):
1701 raise ValueError(f"This parameter must now be a ThreadPoolExecutor but was given {pool}.")
1703 if processes is not None:
1704 warnings.warn(
1705 "Processes parameter is deprecated. Please use num_workers parameter.",
1706 FutureWarning,
1707 stacklevel=3, # Jump above the timeMethod wrapper.
1708 )
1709 num_workers = processes
1711 refs = []
1712 bad_files = []
1713 n_exposures = 0
1714 n_exposures_failed = 0
1715 n_ingests_failed = 0
1716 self.metrics.reset() # Clear previous metrics.
1717 ingest_duration = 0.0
1718 if group_files:
1719 with time_this(log=self.log, msg="Processing ingest groups") as timer:
1720 for group in ResourcePath.findFileResources(files, file_filter, group_files):
1721 new_refs, bad, n_exp, n_exp_fail, n_ingest_fail = self.ingestFiles(
1722 tuple(group),
1723 pool=pool,
1724 num_workers=num_workers,
1725 run=run,
1726 skip_existing_exposures=skip_existing_exposures,
1727 update_exposure_records=update_exposure_records,
1728 track_file_attrs=track_file_attrs,
1729 search_indexes=search_indexes,
1730 skip_ingest=skip_ingest,
1731 )
1732 refs.extend(new_refs)
1733 bad_files.extend(bad)
1734 n_exposures += n_exp
1735 n_exposures_failed += n_exp_fail
1736 n_ingests_failed += n_ingest_fail
1737 ingest_duration = timer.duration
1738 else:
1739 with time_this(log=self.log, msg="Ingesting all files in one batch") as timer:
1740 refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed = self.ingestFiles(
1741 tuple(ResourcePath.findFileResources(files, file_filter, group_files)),
1742 pool=pool,
1743 num_workers=num_workers,
1744 run=run,
1745 skip_existing_exposures=skip_existing_exposures,
1746 update_exposure_records=update_exposure_records,
1747 track_file_attrs=track_file_attrs,
1748 search_indexes=search_indexes,
1749 skip_ingest=skip_ingest,
1750 )
1751 ingest_duration = timer.duration
1753 had_failure = False
1755 if bad_files:
1756 had_failure = True
1757 self.log.warning("Could not extract observation metadata from the following:")
1758 for f in bad_files:
1759 self.log.warning("- %s", f)
1761 if skip_ingest:
1762 ingest_text = ""
1763 else:
1764 ingest_text = f" - time in butler ingest: {self.metrics.time_for_ingest} s\n"
1766 self.log.info(
1767 "Successfully processed data from %d exposure%s with %d failure%s from exposure"
1768 " registration and %d failure%s from file ingest.\n"
1769 "Timing breakdown:\n"
1770 " - time in metadata gathering: %f s\n"
1771 " - time in dimension record writing: %f s\n"
1772 "%s"
1773 " - time in user-supplied callbacks: %f s\n",
1774 *_log_msg_counter(n_exposures),
1775 *_log_msg_counter(n_exposures_failed),
1776 *_log_msg_counter(n_ingests_failed),
1777 self.metrics.time_for_metadata,
1778 self.metrics.time_for_records,
1779 ingest_text,
1780 self.metrics.time_for_callbacks,
1781 )
1782 if n_exposures_failed > 0 or n_ingests_failed > 0:
1783 had_failure = True
1784 if not skip_ingest:
1785 self.log.info(
1786 "Ingested %d distinct Butler dataset%s in %f sec", *_log_msg_counter(refs), ingest_duration
1787 )
1789 if had_failure:
1790 raise RuntimeError("Some failures encountered during ingestion")
1792 return refs