Coverage for sources/vibelinter/engine.py: 97%

139 statements

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-07 04:34 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21 

22''' Central linter engine coordinating single-pass CST analysis. ''' 

23 

24 

25from . import __ 

26from . import exceptions as _exceptions 

27from .rules import context as _context 

28from .rules import registry as _registry 

29from .rules import violations as _violations 

30from .rules.base import BaseRule as _BaseRule 

31 

32 

def _create_empty_rule_parameters( ) -> __.immut.Dictionary[
    str, __.immut.Dictionary[ str, __.typx.Any ] ]:
    ''' Produces empty immutable mapping of rule parameters.

        Serves as default factory for dataclass fields, since an
        immutable dictionary cannot be used directly as a default.
    '''
    empty: __.immut.Dictionary[
        str, __.immut.Dictionary[ str, __.typx.Any ] ] = (
            __.immut.Dictionary( ) )
    return empty

37 

38 

class EngineConfiguration( __.immut.DataclassObject ):
    ''' Configuration for linter engine behavior and rule selection. '''

    # Rules are addressed by VBL code throughout the engine; descriptive
    # names are resolved to codes before reaching this set.
    enabled_rules: __.typx.Annotated[
        frozenset[ str ],
        __.ddoc.Doc( 'VBL codes of rules to execute.' ) ]
    # Factory function avoids sharing one mutable-looking default across
    # instances; absent entries mean "no parameters" for that rule.
    rule_parameters: __.typx.Annotated[
        __.immut.Dictionary[
            str, __.immut.Dictionary[ str, __.typx.Any ] ],
        __.ddoc.Doc(
            'Rule-specific configuration parameters indexed by VBL code.'
        ) ] = __.dcls.field( default_factory = _create_empty_rule_parameters )
    context_size: __.typx.Annotated[
        int,
        __.ddoc.Doc(
            'Number of context lines to extract around violations.' ) ] = 2
    include_context: __.typx.Annotated[
        bool,
        __.ddoc.Doc(
            'Whether to extract source context for violations.' ) ] = True
    # Maps glob pattern -> rule identifiers (VBL codes or descriptive
    # names) to skip for files matching the pattern.
    per_file_ignores: __.typx.Annotated[
        __.immut.Dictionary[ str, tuple[ str, ... ] ],
        __.ddoc.Doc(
            'Per-file rule exclusions.'
        ) ] = __.dcls.field( default_factory = lambda: __.immut.Dictionary( ) )

64 

65 

class Report( __.immut.DataclassObject ):
    ''' Results of linting analysis including violations and metadata. '''

    violations: __.typx.Annotated[
        tuple[ _violations.Violation, ... ],
        __.ddoc.Doc( 'All violations detected during analysis.' ) ]
    # Empty tuple when context extraction is disabled in configuration.
    contexts: __.typx.Annotated[
        tuple[ _violations.ViolationContext, ... ],
        __.ddoc.Doc( 'Violation contexts when context extraction enabled.' ) ]
    filename: __.typx.Annotated[
        str, __.ddoc.Doc( 'Path to analyzed source file.' ) ]
    rule_count: __.typx.Annotated[
        int, __.ddoc.Doc( 'Number of rules executed during analysis.' ) ]
    # Measured around the whole lint_source pipeline via perf_counter.
    analysis_duration_ms: __.typx.Annotated[
        float,
        __.ddoc.Doc( 'Time spent in analysis phase excluding parsing.' ) ]

82 

83 

class Engine:
    ''' Central orchestrator for linting analysis.

        Implements single-pass CST traversal with multiple rule execution.
    '''

    def __init__(
        self,
        registry_manager: __.typx.Annotated[
            _registry.RuleRegistryManager,
            __.ddoc.Doc( 'Rule registry for instantiating rules.' ) ],
        configuration: __.typx.Annotated[
            EngineConfiguration,
            __.ddoc.Doc( 'Engine configuration and rule selection.' ) ],
    ) -> None:
        self.registry_manager = registry_manager
        self.configuration = configuration

    def lint_file(
        self,
        file_path: __.typx.Annotated[
            __.pathlib.Path,
            __.ddoc.Doc( 'Path to Python source file to analyze.' ) ]
    ) -> __.typx.Annotated[
        Report,
        __.ddoc.Doc(
            'Analysis results including violations and metadata.' ) ]:
        ''' Analyzes a Python source file and returns violations. '''
        source_code = file_path.read_text( encoding = 'utf-8' )
        return self.lint_source( source_code, str( file_path ) )

    def _create_metadata_wrapper(
        self, source_code: str, filename: str
    ) -> tuple[ __.libcst.metadata.MetadataWrapper, tuple[ str, ... ] ]:
        ''' Parses source and creates metadata wrapper.

            Raises :class:`_exceptions.MetadataProvideFailure` when wrapper
            construction fails; parse errors from libcst propagate as-is.
        '''
        module = __.libcst.parse_module( source_code )
        source_lines = tuple( source_code.splitlines( ) )
        try: wrapper = __.libcst.metadata.MetadataWrapper( module )
        except Exception as exc:
            raise _exceptions.MetadataProvideFailure( filename ) from exc
        return wrapper, source_lines

    def _instantiate_rules(
        self,
        wrapper: __.libcst.metadata.MetadataWrapper,
        source_lines: tuple[ str, ... ],
        filename: str
    ) -> list[ _BaseRule ]:
        ''' Instantiates all enabled rules with configuration.

            Any instantiation error is wrapped in
            :class:`_exceptions.RuleExecuteFailure` tagged with the VBL code.
        '''
        rules: list[ _BaseRule ] = [ ]
        for vbl_code in self.configuration.enabled_rules:
            params = self.configuration.rule_parameters.get(
                vbl_code, __.immut.Dictionary( ) )
            try:
                rule = self.registry_manager.produce_rule_instance(
                    vbl_code = vbl_code,
                    filename = filename,
                    wrapper = wrapper,
                    source_lines = source_lines,
                    **params )
                rules.append( rule )
            except Exception as exc:
                raise _exceptions.RuleExecuteFailure( vbl_code ) from exc
        return rules

    def _execute_rules(
        self,
        rules: list[ _BaseRule ],
        wrapper: __.libcst.metadata.MetadataWrapper
    ) -> None:
        ''' Executes rules via single-pass CST traversal. '''
        for rule in rules:
            try: wrapper.visit( rule )
            except Exception as exc: # noqa: PERF203
                raise _exceptions.RuleExecuteFailure( rule.rule_id ) from exc

    def _collect_violations(
        self, rules: list[ _BaseRule ]
    ) -> list[ _violations.Violation ]:
        ''' Collects and sorts violations from all rules.

            Sort order is (line, column) so reports read top-to-bottom.
        '''
        all_violations: list[ _violations.Violation ] = [ ]
        for rule in rules:
            all_violations.extend( rule.violations )
        all_violations.sort( key = lambda v: ( v.line, v.column ) )
        return all_violations

    def _extract_suppressions(
        self, source_lines: tuple[ str, ... ]
    ) -> dict[ int, bool | set[ str ] ]:
        ''' Extracts suppression comments from source lines.

            Returns map of line_number -> (True for all rules, or code set).
        '''
        suppressions: dict[ int, bool | set[ str ] ] = { }
        for index, line in enumerate( source_lines ):
            # First '#' marks the start of the comment; note this is an
            # approximation which can match '#' inside string literals.
            comment_start = line.find( '#' )
            if comment_start < 0: continue
            comment_text = line[ comment_start + 1: ].strip( )
            if 'noqa' not in comment_text: continue
            parts = comment_text.split( )
            # Bare 'noqa' (no codes) suppresses every rule on the line.
            # Membership test against whitespace-split parts avoids
            # matching 'noqa' embedded inside other words.
            if 'noqa' in parts and not any(
                p.startswith( 'noqa:' ) for p in parts
            ):
                suppressions[ index + 1 ] = True
                continue
            # Locate 'noqa:' by position rather than token, so both
            # 'noqa: X' and compound comments like 'nosec # noqa: X'
            # are handled by a single pass. (Previously two overlapping
            # passes ran here, and the second treated trailing prose
            # after the codes as part of a code.)
            noqa_index = comment_text.find( 'noqa:' )
            if noqa_index < 0: continue
            code_text = comment_text[ noqa_index + 5: ]
            # Take the first whitespace-delimited token of each
            # comma-separated chunk so trailing prose
            # ('# noqa: VBL101 because reasons') is not misread as a code.
            codes = {
                chunk.split( )[ 0 ] for chunk in code_text.split( ',' )
                if chunk.strip( ) }
            valid_codes = {
                code for code in codes if not code.startswith( '#' ) }
            self._add_suppression( suppressions, index + 1, valid_codes )
        return suppressions

    def _add_suppression(
        self,
        suppressions: dict[ int, bool | set[ str ] ],
        line_number: int,
        codes: set[ str ]
    ) -> None:
        ''' Helper to add codes to suppression map.

            A blanket suppression (True) already covers every code and is
            never narrowed by a subsequent code set.
        '''
        extant = suppressions.get( line_number )
        if extant is True: return
        if isinstance( extant, set ): extant.update( codes )
        # Copy defensively so later mutation of the caller's set cannot
        # alter the suppression map.
        else: suppressions[ line_number ] = set( codes )

    def _resolve_rule_identifiers(
        self,
        identifiers: tuple[ str, ... ]
    ) -> set[ str ]:
        ''' Resolves rule identifiers to VBL codes. '''
        resolved: set[ str ] = set( )
        for identifier in identifiers:
            # Identifier may be a VBL code already or a descriptive name.
            vbl_code = self._try_resolve_identifier( identifier )
            resolved.add( vbl_code )
        return resolved

    def _try_resolve_identifier( self, identifier: str ) -> str:
        ''' Attempts to resolve identifier, returns original on failure. '''
        try:
            return self.registry_manager.resolve_rule_identifier(
                identifier )
        except Exception:
            return identifier

    def _filter_violations(
        self,
        violations: list[ _violations.Violation ],
        suppressions: dict[ int, bool | set[ str ] ],
        filename: str,
    ) -> list[ _violations.Violation ]:
        ''' Filters violations based on suppressions and per-file ignores. '''
        if not violations:
            return violations
        filtered: list[ _violations.Violation ] = [ ]
        # 1. Per-file ignores from configuration.
        ignored_rules: set[ str ] = set( )
        # Convert filename to Path form for glob matching.
        file_path = __.pathlib.Path( filename )
        for pattern, rules in self.configuration.per_file_ignores.items( ):
            # GLOBSTAR enables '**' recursive-directory patterns.
            if __.wcglob.globmatch(
                str( file_path ), pattern, flags = __.wcglob.GLOBSTAR
            ):
                # Resolve descriptive names to VBL codes.
                resolved_rules = self._resolve_rule_identifiers( rules )
                ignored_rules.update( resolved_rules )
        # 2. Inline 'noqa' suppressions, then per-file ignores.
        for violation in violations:
            if violation.rule_id in ignored_rules:
                continue
            if violation.line in suppressions:
                suppression = suppressions[ violation.line ]
                # True means bare 'noqa': every rule suppressed.
                if suppression is True:
                    continue
                if isinstance( suppression, set ):
                    # Resolve descriptive names in suppression set.
                    resolved_suppression = self._resolve_rule_identifiers(
                        tuple( suppression ) )
                    if violation.rule_id in resolved_suppression:
                        continue
            filtered.append( violation )
        return filtered

    def lint_source(
        self,
        source_code: __.typx.Annotated[
            str,
            __.ddoc.Doc( 'Python source code to analyze.' ) ],
        filename: __.typx.Annotated[
            str,
            __.ddoc.Doc( 'Logical filename for source code.' ) ] = '<string>',
    ) -> __.typx.Annotated[
        Report,
        __.ddoc.Doc(
            'Analysis results including violations and metadata.' ) ]:
        ''' Analyzes Python source code and returns violations.

            Pipeline: parse -> instantiate rules -> single-pass traversal
            -> collect -> filter (noqa + per-file ignores) -> contexts.
        '''
        analysis_start_time = __.time.perf_counter( )
        wrapper, source_lines = self._create_metadata_wrapper(
            source_code, filename )
        rules = self._instantiate_rules( wrapper, source_lines, filename )
        self._execute_rules( rules, wrapper )
        all_violations = self._collect_violations( rules )
        # Filter violations against suppressions and configured ignores.
        suppressions = self._extract_suppressions( source_lines )
        filtered_violations = self._filter_violations(
            all_violations, suppressions, filename )
        violation_contexts: tuple[
            _violations.ViolationContext, ... ] = ( )
        if self.configuration.include_context and filtered_violations:
            violation_contexts = _context.extract_contexts_for_violations(
                filtered_violations,
                source_lines,
                self.configuration.context_size )
        analysis_duration_ms = (
            ( __.time.perf_counter( ) - analysis_start_time ) * 1000 )
        return Report(
            violations = tuple( filtered_violations ),
            contexts = violation_contexts,
            filename = filename,
            rule_count = len( rules ),
            analysis_duration_ms = analysis_duration_ms )

    def lint_files(
        self,
        file_paths: __.typx.Annotated[
            __.cabc.Sequence[ __.pathlib.Path ],
            __.ddoc.Doc( 'Paths to Python source files to analyze.' ) ]
    ) -> __.typx.Annotated[
        tuple[ Report, ... ],
        __.ddoc.Doc( 'Analysis results for all files.' ) ]:
        ''' Analyzes multiple Python source files.

            Best-effort: files which fail to read, parse, or analyze are
            skipped silently; only successful reports are returned.
        '''
        reports: list[ Report ] = [ ]
        for file_path in file_paths:
            try: report = self.lint_file( file_path )
            except Exception: continue # noqa: S112
            reports.append( report )
        return tuple( reports )