Coverage for sources / vibelinter / engine.py: 98%

72 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-01 02:35 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21 

22''' Central linter engine coordinating single-pass CST analysis. ''' 

23 

24 

25from . import __ 

26from . import exceptions as _exceptions 

27from .rules import context as _context 

28from .rules import registry as _registry 

29from .rules import violations as _violations 

30from .rules.base import BaseRule as _BaseRule 

31 

32 

33def _create_empty_rule_parameters( ) -> __.immut.Dictionary[ 

34 str, __.immut.Dictionary[ str, __.typx.Any ] ]: 

35 ''' Creates empty rule parameters dictionary. ''' 

36 return __.immut.Dictionary( ) 

37 

38 

39class EngineConfiguration( __.immut.DataclassObject ): 

40 ''' Configuration for linter engine behavior and rule selection. ''' 

41 

42 enabled_rules: __.typx.Annotated[ 

43 frozenset[ str ], 

44 __.ddoc.Doc( 'VBL codes of rules to execute.' ) ] 

45 rule_parameters: __.typx.Annotated[ 

46 __.immut.Dictionary[ 

47 str, __.immut.Dictionary[ str, __.typx.Any ] ], 

48 __.ddoc.Doc( 

49 'Rule-specific configuration parameters indexed by VBL code.' 

50 ) ] = __.dcls.field( default_factory = _create_empty_rule_parameters ) 

51 context_size: __.typx.Annotated[ 

52 int, 

53 __.ddoc.Doc( 

54 'Number of context lines to extract around violations.' ) ] = 2 

55 include_context: __.typx.Annotated[ 

56 bool, 

57 __.ddoc.Doc( 

58 'Whether to extract source context for violations.' ) ] = True 

59 

60 

61class Report( __.immut.DataclassObject ): 

62 ''' Results of linting analysis including violations and metadata. ''' 

63 

64 violations: __.typx.Annotated[ 

65 tuple[ _violations.Violation, ... ], 

66 __.ddoc.Doc( 'All violations detected during analysis.' ) ] 

67 contexts: __.typx.Annotated[ 

68 tuple[ _violations.ViolationContext, ... ], 

69 __.ddoc.Doc( 'Violation contexts when context extraction enabled.' ) ] 

70 filename: __.typx.Annotated[ 

71 str, __.ddoc.Doc( 'Path to analyzed source file.' ) ] 

72 rule_count: __.typx.Annotated[ 

73 int, __.ddoc.Doc( 'Number of rules executed during analysis.' ) ] 

74 analysis_duration_ms: __.typx.Annotated[ 

75 float, 

76 __.ddoc.Doc( 'Time spent in analysis phase excluding parsing.' ) ] 

77 

78 

79class Engine: 

80 ''' Central orchestrator for linting analysis. 

81 

82 Implements single-pass CST traversal with multiple rule execution. 

83 ''' 

84 

85 def __init__( 

86 self, 

87 registry_manager: __.typx.Annotated[ 

88 _registry.RuleRegistryManager, 

89 __.ddoc.Doc( 'Rule registry for instantiating rules.' ) ], 

90 configuration: __.typx.Annotated[ 

91 EngineConfiguration, 

92 __.ddoc.Doc( 'Engine configuration and rule selection.' ) ], 

93 ) -> None: 

94 self.registry_manager = registry_manager 

95 self.configuration = configuration 

96 

97 def lint_file( 

98 self, 

99 file_path: __.typx.Annotated[ 

100 __.pathlib.Path, 

101 __.ddoc.Doc( 'Path to Python source file to analyze.' ) ] 

102 ) -> __.typx.Annotated[ 

103 Report, 

104 __.ddoc.Doc( 

105 'Analysis results including violations and metadata.' ) ]: 

106 ''' Analyzes a Python source file and returns violations. ''' 

107 source_code = file_path.read_text( encoding = 'utf-8' ) 

108 return self.lint_source( source_code, str( file_path ) ) 

109 

110 def _create_metadata_wrapper( 

111 self, source_code: str, filename: str 

112 ) -> tuple[ __.libcst.metadata.MetadataWrapper, tuple[ str, ... ] ]: 

113 ''' Parses source and creates metadata wrapper. ''' 

114 module = __.libcst.parse_module( source_code ) 

115 source_lines = tuple( source_code.splitlines( ) ) 

116 try: wrapper = __.libcst.metadata.MetadataWrapper( module ) 

117 except Exception as exc: 

118 raise _exceptions.MetadataProvideFailure( filename ) from exc 

119 return wrapper, source_lines 

120 

121 def _instantiate_rules( 

122 self, 

123 wrapper: __.libcst.metadata.MetadataWrapper, 

124 source_lines: tuple[ str, ... ], 

125 filename: str 

126 ) -> list[ _BaseRule ]: 

127 ''' Instantiates all enabled rules with configuration. ''' 

128 rules: list[ _BaseRule ] = [ ] 

129 for vbl_code in self.configuration.enabled_rules: 

130 params = self.configuration.rule_parameters.get( 

131 vbl_code, __.immut.Dictionary( ) ) 

132 try: 

133 rule = self.registry_manager.produce_rule_instance( 

134 vbl_code = vbl_code, 

135 filename = filename, 

136 wrapper = wrapper, 

137 source_lines = source_lines, 

138 **params ) 

139 rules.append( rule ) 

140 except Exception as exc: 

141 raise _exceptions.RuleExecuteFailure( vbl_code ) from exc 

142 return rules 

143 

144 def _execute_rules( 

145 self, 

146 rules: list[ _BaseRule ], 

147 wrapper: __.libcst.metadata.MetadataWrapper 

148 ) -> None: 

149 ''' Executes rules via single-pass CST traversal. ''' 

150 for rule in rules: 

151 try: wrapper.visit( rule ) 

152 except Exception as exc: # noqa: PERF203 

153 raise _exceptions.RuleExecuteFailure( rule.rule_id ) from exc 

154 

155 def _collect_violations( 

156 self, rules: list[ _BaseRule ] 

157 ) -> list[ _violations.Violation ]: 

158 ''' Collects and sorts violations from all rules. ''' 

159 all_violations: list[ _violations.Violation ] = [ ] 

160 for rule in rules: 

161 all_violations.extend( rule.violations ) 

162 all_violations.sort( key = lambda v: ( v.line, v.column ) ) 

163 return all_violations 

164 

165 def lint_source( 

166 self, 

167 source_code: __.typx.Annotated[ 

168 str, 

169 __.ddoc.Doc( 'Python source code to analyze.' ) ], 

170 filename: __.typx.Annotated[ 

171 str, 

172 __.ddoc.Doc( 'Logical filename for source code.' ) ] = '<string>', 

173 ) -> __.typx.Annotated[ 

174 Report, 

175 __.ddoc.Doc( 

176 'Analysis results including violations and metadata.' ) ]: 

177 ''' Analyzes Python source code and returns violations. ''' 

178 analysis_start_time = __.time.perf_counter( ) 

179 wrapper, source_lines = self._create_metadata_wrapper( 

180 source_code, filename ) 

181 rules = self._instantiate_rules( wrapper, source_lines, filename ) 

182 self._execute_rules( rules, wrapper ) 

183 all_violations = self._collect_violations( rules ) 

184 violation_contexts: tuple[ 

185 _violations.ViolationContext, ... ] = ( ) 

186 if self.configuration.include_context and all_violations: 

187 violation_contexts = _context.extract_contexts_for_violations( 

188 all_violations, 

189 source_lines, 

190 self.configuration.context_size ) 

191 analysis_duration_ms = ( 

192 ( __.time.perf_counter( ) - analysis_start_time ) * 1000 ) 

193 return Report( 

194 violations = tuple( all_violations ), 

195 contexts = violation_contexts, 

196 filename = filename, 

197 rule_count = len( rules ), 

198 analysis_duration_ms = analysis_duration_ms ) 

199 

200 def lint_files( 

201 self, 

202 file_paths: __.typx.Annotated[ 

203 __.cabc.Sequence[ __.pathlib.Path ], 

204 __.ddoc.Doc( 'Paths to Python source files to analyze.' ) ] 

205 ) -> __.typx.Annotated[ 

206 tuple[ Report, ... ], 

207 __.ddoc.Doc( 'Analysis results for all files.' ) ]: 

208 ''' Analyzes multiple Python source files. ''' 

209 reports: list[ Report ] = [ ] 

210 for file_path in file_paths: 

211 try: report = self.lint_file( file_path ) 

212 except Exception: continue # noqa: S112 

213 reports.append( report ) 

214 return tuple( reports )