Coverage for python/lsst/pipe/base/gc_metrics.py: 53%

53 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-30 08:43 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["GcMetrics"] 

31 

32import gc 

33from collections import defaultdict 

34from types import TracebackType 

35from typing import Self 

36 

37import pydantic 

38 

39from ._task_metadata import TaskMetadata 

40 

41 

42def _gc_stats() -> dict[str, list[int]]: 

43 """Convert result of `gc.get_stats` to a dictionary of lists.""" 

44 result: dict[str, list[int]] = defaultdict(list) 

45 for gen_stats in gc.get_stats(): 

46 for key, stat in gen_stats.items(): 

47 result[key].append(stat) 

48 return result 

49 

50 

51class GcMetrics(pydantic.BaseModel): 

52 """Context manager which collects GC metrics and converts them into 

53 a dictionary suitable for TaskMetadata. 

54 """ 

55 

56 start_isenabled: bool | None = None 

57 """Whether GC is enabled on entering context (`bool` or `None`).""" 

58 

59 end_isenabled: bool | None = None 

60 """Whether GC is enabled on exiting context (`bool` or `None`).""" 

61 

62 start_threshold: list[int] | None = None 

63 """GC thresholds on entering context (`list`[`int`] or `None`).""" 

64 

65 end_threshold: list[int] | None = None 

66 """GC thresholds on exiting context (`list`[`int`] or `None`).""" 

67 

68 start_count: list[int] | None = None 

69 """GC collection counts on entering context (`list`[`int`] or `None`).""" 

70 

71 end_count: list[int] | None = None 

72 """GC collection counts on exiting context (`list`[`int`] or `None`).""" 

73 

74 start_stats: dict[str, list[int]] | None = None 

75 """GC stats on entering context (`dict`[`str`, `list`[`int`]] or `None`). 

76 

77 These are the same values as returned from `gc.get_stats` but rearranged 

78 to be indexed by string key first and generation second. 

79 """ 

80 

81 end_stats: dict[str, list[int]] | None = None 

82 """GC stats on exiting context, same format as `start_stats` 

83 (`dict`[`str`, `list`[`int`]] or `None`). 

84 """ 

85 

86 def __enter__(self) -> Self: 

87 self.start_isenabled = gc.isenabled() 

88 self.start_threshold = list(gc.get_threshold()) 

89 self.start_count = list(gc.get_count()) 

90 self.start_stats = _gc_stats() 

91 return self 

92 

93 def __exit__( 

94 self, 

95 exc_type: type[BaseException] | None, 

96 exc_val: BaseException | None, 

97 exc_tb: TracebackType | None, 

98 ) -> None: 

99 self.end_isenabled = gc.isenabled() 

100 self.end_threshold = list(gc.get_threshold()) 

101 self.end_count = list(gc.get_count()) 

102 self.end_stats = _gc_stats() 

103 

104 @classmethod 

105 def from_task_metadata(cls, metadata: TaskMetadata) -> GcMetrics | None: 

106 """Extract GC metrics from task metadata. 

107 

108 Parameters 

109 ---------- 

110 metadata : `TaskMetadata` 

111 Metadata written by 

112 `.single_quantum_executor.SingleQuantumExecutor`. 

113 

114 Returns 

115 ------- 

116 gc_metrics : `GcMetrics` or `None` 

117 GC metrics for this quantum, or `None` if the expected fields were 

118 not found. 

119 """ 

120 try: 

121 quantum_metadata = metadata["quantum"] 

122 except KeyError: 

123 return None 

124 try: 

125 gc_metadata = quantum_metadata["gc_metrics"] 

126 except KeyError: 

127 return None 

128 

129 return GcMetrics(**gc_metadata.to_dict())