Coverage for sources/vibelinter/engine.py: 97%
139 statements
coverage.py v7.12.0, created at 2025-12-07 04:34 +0000
# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License");           #
# you may not use this file except in compliance with the License.          #
# You may obtain a copy of the License at                                    #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software       #
# distributed under the License is distributed on an "AS IS" BASIS,         #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
# See the License for the specific language governing permissions and       #
# limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Central linter engine coordinating single-pass CST analysis. '''


from . import __
from . import exceptions as _exceptions
from .rules import context as _context
from .rules import registry as _registry
from .rules import violations as _violations
from .rules.base import BaseRule as _BaseRule


def _create_empty_rule_parameters( ) -> __.immut.Dictionary[
    str, __.immut.Dictionary[ str, __.typx.Any ] ]:
    ''' Creates empty rule parameters dictionary. '''
    return __.immut.Dictionary( )


class EngineConfiguration( __.immut.DataclassObject ):
    ''' Configuration for linter engine behavior and rule selection. '''

    enabled_rules: __.typx.Annotated[
        frozenset[ str ],
        __.ddoc.Doc( 'VBL codes of rules to execute.' ) ]
    rule_parameters: __.typx.Annotated[
        __.immut.Dictionary[
            str, __.immut.Dictionary[ str, __.typx.Any ] ],
        __.ddoc.Doc(
            'Rule-specific configuration parameters indexed by VBL code.'
        ) ] = __.dcls.field( default_factory = _create_empty_rule_parameters )
    context_size: __.typx.Annotated[
        int,
        __.ddoc.Doc(
            'Number of context lines to extract around violations.' ) ] = 2
    include_context: __.typx.Annotated[
        bool,
        __.ddoc.Doc(
            'Whether to extract source context for violations.' ) ] = True
    per_file_ignores: __.typx.Annotated[
        __.immut.Dictionary[ str, tuple[ str, ... ] ],
        __.ddoc.Doc(
            'Per-file rule exclusions.'
        ) ] = __.dcls.field( default_factory = lambda: __.immut.Dictionary( ) )
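
# Illustrative sketch (editorial addition, not part of the covered module):
# constructing a configuration that enables two hypothetical VBL codes, with
# one parameter override and one per-file exclusion. The codes, the
# 'max_line_length' parameter, and the mapping-style __.immut.Dictionary
# construction are assumptions for illustration only.
#
#     configuration = EngineConfiguration(
#         enabled_rules = frozenset( { 'VBL101', 'VBL102' } ),
#         rule_parameters = __.immut.Dictionary(
#             { 'VBL101': __.immut.Dictionary( { 'max_line_length': 88 } ) } ),
#         per_file_ignores = __.immut.Dictionary(
#             { 'tests/**/*.py': ( 'VBL102', ) } ) )
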
class Report( __.immut.DataclassObject ):
    ''' Results of linting analysis including violations and metadata. '''

    violations: __.typx.Annotated[
        tuple[ _violations.Violation, ... ],
        __.ddoc.Doc( 'All violations detected during analysis.' ) ]
    contexts: __.typx.Annotated[
        tuple[ _violations.ViolationContext, ... ],
        __.ddoc.Doc( 'Violation contexts when context extraction enabled.' ) ]
    filename: __.typx.Annotated[
        str, __.ddoc.Doc( 'Path to analyzed source file.' ) ]
    rule_count: __.typx.Annotated[
        int, __.ddoc.Doc( 'Number of rules executed during analysis.' ) ]
    analysis_duration_ms: __.typx.Annotated[
        float,
        __.ddoc.Doc( 'Time spent in analysis phase excluding parsing.' ) ]


class Engine:
    ''' Central orchestrator for linting analysis.

    Implements single-pass CST traversal with multiple rule execution.
    '''

    def __init__(
        self,
        registry_manager: __.typx.Annotated[
            _registry.RuleRegistryManager,
            __.ddoc.Doc( 'Rule registry for instantiating rules.' ) ],
        configuration: __.typx.Annotated[
            EngineConfiguration,
            __.ddoc.Doc( 'Engine configuration and rule selection.' ) ],
    ) -> None:
        self.registry_manager = registry_manager
        self.configuration = configuration

    def lint_file(
        self,
        file_path: __.typx.Annotated[
            __.pathlib.Path,
            __.ddoc.Doc( 'Path to Python source file to analyze.' ) ]
    ) -> __.typx.Annotated[
        Report,
        __.ddoc.Doc(
            'Analysis results including violations and metadata.' ) ]:
        ''' Analyzes a Python source file and returns violations. '''
        source_code = file_path.read_text( encoding = 'utf-8' )
        return self.lint_source( source_code, str( file_path ) )

    def _create_metadata_wrapper(
        self, source_code: str, filename: str
    ) -> tuple[ __.libcst.metadata.MetadataWrapper, tuple[ str, ... ] ]:
        ''' Parses source and creates metadata wrapper. '''
        module = __.libcst.parse_module( source_code )
        source_lines = tuple( source_code.splitlines( ) )
        try: wrapper = __.libcst.metadata.MetadataWrapper( module )
        except Exception as exc:
            raise _exceptions.MetadataProvideFailure( filename ) from exc
        return wrapper, source_lines
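    # Note (editorial addition): libcst's MetadataWrapper resolves the
    # metadata providers which a visitor declares (position information, for
    # example) when wrapper.visit( ) runs, so the rules instantiated below
    # can depend on metadata without an extra resolution pass here.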
    def _instantiate_rules(
        self,
        wrapper: __.libcst.metadata.MetadataWrapper,
        source_lines: tuple[ str, ... ],
        filename: str
    ) -> list[ _BaseRule ]:
        ''' Instantiates all enabled rules with configuration. '''
        rules: list[ _BaseRule ] = [ ]
        for vbl_code in self.configuration.enabled_rules:
            params = self.configuration.rule_parameters.get(
                vbl_code, __.immut.Dictionary( ) )
            try:
                rule = self.registry_manager.produce_rule_instance(
                    vbl_code = vbl_code,
                    filename = filename,
                    wrapper = wrapper,
                    source_lines = source_lines,
                    **params )
                rules.append( rule )
            except Exception as exc:
                raise _exceptions.RuleExecuteFailure( vbl_code ) from exc
        return rules

    def _execute_rules(
        self,
        rules: list[ _BaseRule ],
        wrapper: __.libcst.metadata.MetadataWrapper
    ) -> None:
        ''' Executes rules via single-pass CST traversal. '''
        for rule in rules:
            try: wrapper.visit( rule )
            except Exception as exc: # noqa: PERF203
                raise _exceptions.RuleExecuteFailure( rule.rule_id ) from exc

    def _collect_violations(
        self, rules: list[ _BaseRule ]
    ) -> list[ _violations.Violation ]:
        ''' Collects and sorts violations from all rules. '''
        all_violations: list[ _violations.Violation ] = [ ]
        for rule in rules:
            all_violations.extend( rule.violations )
        all_violations.sort( key = lambda v: ( v.line, v.column ) )
        return all_violations

    def _extract_suppressions(
        self, source_lines: tuple[ str, ... ]
    ) -> dict[ int, bool | set[ str ] ]:
        ''' Extracts suppression comments from source lines.

        Returns map of line_number -> (True for all rules, or code set).
        '''
        suppressions: dict[ int, bool | set[ str ] ] = { }
        for i, line in enumerate( source_lines ):
            if '#' not in line:
                continue
            # Find the first '#' to locate the start of the comment.
            comment_start = line.find( '#' )
            comment_text = line[ comment_start + 1: ].strip( )
            if 'noqa' not in comment_text:
                continue
            # Split into parts to handle "nosec # noqa: ..."
            parts = comment_text.split( )
            # Check for bare noqa
            # Avoid matching "noqa" inside other words
            if 'noqa' in parts and not any(
                p.startswith( 'noqa:' ) for p in parts
            ):
                suppressions[ i + 1 ] = True
                continue
            # Check for specific codes
            for part in parts:
                if part.startswith( 'noqa:' ):
                    codes_str = part[ 5: ]
                    codes = {
                        c.strip( ) for c in codes_str.split( ',' )
                        if c.strip( ) }
                    self._add_suppression( suppressions, i + 1, codes )
            # Robust parsing for noqa: ...
            if 'noqa:' in comment_text:  # coverage: partial branch (condition always true in tests)
                noqa_idx = comment_text.find( 'noqa:' )
                code_text = comment_text[ noqa_idx + 5: ]
                codes = {
                    c.strip( ) for c in code_text.split( ',' )
                    if c.strip( ) }
                valid_codes = {
                    c for c in codes if c and not c.startswith( '#' ) }
                self._add_suppression( suppressions, i + 1, valid_codes )
        return suppressions
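    # Illustrative sketch (editorial addition): for the two source lines
    #
    #     x = 1  # noqa
    #     y = 2  # noqa: VBL101, VBL102
    #
    # the method above returns { 1: True, 2: { 'VBL101', 'VBL102' } }: a bare
    # noqa suppresses every rule on its line, while a code list suppresses
    # only the named rules. The VBL codes are hypothetical.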
    def _add_suppression(
        self,
        suppressions: dict[ int, bool | set[ str ] ],
        line_number: int,
        codes: set[ str ]
    ) -> None:
        ''' Helper to add codes to suppression map. '''
        suppression = suppressions.get( line_number )
        if isinstance( suppression, set ):
            suppression.update( codes )
        else:
            suppressions[ line_number ] = codes

    def _resolve_rule_identifiers(
        self,
        identifiers: tuple[ str, ... ]
    ) -> set[ str ]:
        ''' Resolves rule identifiers to VBL codes. '''
        resolved: set[ str ] = set( )
        for identifier in identifiers:
            # Try to resolve as VBL code or descriptive name
            vbl_code = self._try_resolve_identifier( identifier )
            resolved.add( vbl_code )
        return resolved

    def _try_resolve_identifier( self, identifier: str ) -> str:
        ''' Attempts to resolve identifier, returns original on failure. '''
        try:
            return self.registry_manager.resolve_rule_identifier(
                identifier )
        except Exception:
            return identifier

    def _filter_violations(
        self,
        violations: list[ _violations.Violation ],
        suppressions: dict[ int, bool | set[ str ] ],
        filename: str,
    ) -> list[ _violations.Violation ]:
        ''' Filters violations based on suppressions and per-file ignores. '''
        if not violations:
            return violations
        filtered: list[ _violations.Violation ] = [ ]
        # Per-file ignores from configuration
        ignored_rules: set[ str ] = set( )
        # Convert filename to Path for glob matching
        file_path = __.pathlib.Path( filename )
        for pattern, rules in self.configuration.per_file_ignores.items( ):
            # Use wcmatch via __ import
            if __.wcglob.globmatch(  # coverage: partial branch (condition always true in tests)
                str( file_path ), pattern, flags = __.wcglob.GLOBSTAR
            ):
                # Resolve descriptive names to VBL codes
                resolved_rules = self._resolve_rule_identifiers( rules )
                ignored_rules.update( resolved_rules )
        for violation in violations:
            # Check per-file ignores
            if violation.rule_id in ignored_rules:
                continue
            # Check inline suppressions
            if violation.line in suppressions:
                suppression = suppressions[ violation.line ]
                if suppression is True:
                    continue
                if isinstance( suppression, set ):  # coverage: partial branch (condition always true in tests)
                    # Resolve descriptive names in suppression set
                    resolved_suppression = self._resolve_rule_identifiers(
                        tuple( suppression ) )
                    if violation.rule_id in resolved_suppression:
                        continue
            filtered.append( violation )
        return filtered
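    # Illustrative sketch (editorial addition): given a hypothetical
    # per_file_ignores entry { 'tests/**/*.py': ( 'VBL102', ) }, a VBL102
    # violation reported for 'tests/unit/test_engine.py' is dropped above,
    # because GLOBSTAR lets '**' span directory separators, while the same
    # violation in 'sources/vibelinter/engine.py' is kept.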
    def lint_source(
        self,
        source_code: __.typx.Annotated[
            str,
            __.ddoc.Doc( 'Python source code to analyze.' ) ],
        filename: __.typx.Annotated[
            str,
            __.ddoc.Doc( 'Logical filename for source code.' ) ] = '<string>',
    ) -> __.typx.Annotated[
        Report,
        __.ddoc.Doc(
            'Analysis results including violations and metadata.' ) ]:
        ''' Analyzes Python source code and returns violations. '''
        analysis_start_time = __.time.perf_counter( )
        wrapper, source_lines = self._create_metadata_wrapper(
            source_code, filename )
        rules = self._instantiate_rules( wrapper, source_lines, filename )
        self._execute_rules( rules, wrapper )
        all_violations = self._collect_violations( rules )
        # Filter violations
        suppressions = self._extract_suppressions( source_lines )
        filtered_violations = self._filter_violations(
            all_violations, suppressions, filename )
        violation_contexts: tuple[
            _violations.ViolationContext, ... ] = ( )
        if self.configuration.include_context and filtered_violations:
            violation_contexts = _context.extract_contexts_for_violations(
                filtered_violations,
                source_lines,
                self.configuration.context_size )
        analysis_duration_ms = (
            ( __.time.perf_counter( ) - analysis_start_time ) * 1000 )
        return Report(
            violations = tuple( filtered_violations ),
            contexts = violation_contexts,
            filename = filename,
            rule_count = len( rules ),
            analysis_duration_ms = analysis_duration_ms )

    def lint_files(
        self,
        file_paths: __.typx.Annotated[
            __.cabc.Sequence[ __.pathlib.Path ],
            __.ddoc.Doc( 'Paths to Python source files to analyze.' ) ]
    ) -> __.typx.Annotated[
        tuple[ Report, ... ],
        __.ddoc.Doc( 'Analysis results for all files.' ) ]:
        ''' Analyzes multiple Python source files. '''
        reports: list[ Report ] = [ ]
        for file_path in file_paths:
            try: report = self.lint_file( file_path )
            except Exception: continue # noqa: S112
            reports.append( report )
        return tuple( reports )
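
# Example usage (editorial sketch, not part of the covered module): wiring
# the engine together. The zero-argument RuleRegistryManager( ) construction
# and the VBL code are assumptions for illustration; Engine,
# EngineConfiguration, and Report are the names defined above.
#
#     registry_manager = _registry.RuleRegistryManager( )
#     configuration = EngineConfiguration(
#         enabled_rules = frozenset( { 'VBL101' } ) )
#     engine = Engine( registry_manager, configuration )
#     report = engine.lint_source( 'import os\n', filename = 'example.py' )
#     for violation in report.violations:
#         print( violation.rule_id, violation.line, violation.column )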