Coverage for sources / vibelinter / cli.py: 28%
290 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-07 04:34 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-07 04:34 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Command-line interface. '''
23# ruff: noqa: F821
26from appcore import cli as _appcore_cli
28from . import __
29from . import configuration as _configuration
30from . import engine as _engine
31from . import rules as _rules
32# Ensure registry is available for type hints
33from .rules import registry as _registry
36class DiffFormats( __.enum.Enum ):
37 ''' Diff visualization formats. '''
39 Unified = 'unified'
40 Context = 'context'
43class DisplayFormats( __.enum.Enum ):
44 ''' Output formats for reporting. '''
46 Text = 'text'
47 Json = 'json'
50class DisplayOptions( _appcore_cli.DisplayOptions ):
51 ''' Display options extending appcore.cli with output format selection.
53 Adds format-specific output control for linter reporting.
54 '''
56 format: __.typx.Annotated[
57 DisplayFormats,
58 __.tyro.conf.arg( prefix_name = False ),
59 __.ddoc.Doc( ''' Output format for reporting. ''' )
60 ] = DisplayFormats.Text
61 context: __.typx.Annotated[
62 int,
63 __.tyro.conf.arg( prefix_name = False ),
64 __.ddoc.Doc( ''' Show context lines around violations. ''' )
65 ] = 0
68RuleSelectorArgument: __.typx.TypeAlias = __.typx.Annotated[
69 str,
70 __.tyro.conf.arg( prefix_name = False ),
71 __.ddoc.Doc(
72 ''' Comma-separated rule identifiers '''
73 ''' (e.g. VBL101, function-ordering). '''
74 )
75]
76PathsArgument: __.typx.TypeAlias = __.tyro.conf.Positional[
77 tuple[ str, ... ]
78]
81class RenderableResult( __.immut.DataclassProtocol, __.typx.Protocol ):
82 ''' Protocol for command results with format-specific rendering.
84 Combines DataclassProtocol and Protocol to provide both structural
85 typing and dataclass compatibility. Result classes should explicitly
86 inherit from this base class.
87 '''
89 @__.abc.abstractmethod
90 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
91 ''' Renders result as JSON-compatible dictionary. '''
92 raise NotImplementedError
94 @__.abc.abstractmethod
95 def render_as_text( self ) -> tuple[ str, ... ]:
96 ''' Renders result as text lines. '''
97 raise NotImplementedError
100class CheckResult( RenderableResult ):
101 ''' Result from check command execution. '''
103 paths: tuple[ str, ... ]
104 reports: tuple[ __.typx.Any, ... ] # Engine Report objects
105 total_violations: int
106 total_files: int
107 rule_selection: __.Absential[ str ] = __.absent
109 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
110 ''' Renders result as JSON-compatible dictionary. '''
111 files_data: list[ dict[ str, __.typx.Any ] ] = [ ]
112 for report_obj in self.reports:
113 typed_report = __.typx.cast( _engine.Report, report_obj )
114 violations_data = [
115 v.render_as_json( ) for v in typed_report.violations
116 ]
117 files_data.append( {
118 'filename': typed_report.filename,
119 'violations': violations_data,
120 'violation_count': len( typed_report.violations ),
121 'rule_count': typed_report.rule_count,
122 'analysis_duration_ms': typed_report.analysis_duration_ms,
123 } )
124 result: dict[ str, __.typx.Any ] = {
125 'files': files_data,
126 'total_violations': self.total_violations,
127 'total_files': self.total_files,
128 }
129 if not __.is_absent( self.rule_selection ):
130 result[ 'rule_selection' ] = self.rule_selection
131 return result
133 def render_as_text( self ) -> tuple[ str, ... ]:
134 ''' Renders result as text lines. '''
135 lines: list[ str ] = [ ]
136 for report_obj in self.reports:
137 typed_report = __.typx.cast( _engine.Report, report_obj )
138 if typed_report.violations:
139 lines.append( f'\n{typed_report.filename}:' )
140 lines.extend(
141 v.render_as_text( )
142 for v in typed_report.violations )
143 if not lines:
144 lines.append( 'No violations found.' )
145 else:
146 lines.append(
147 f'\nFound {self.total_violations} violations '
148 f'in {self.total_files} files.' )
149 return tuple( lines )
152class FixResult( RenderableResult ):
153 ''' Result from fix command execution. '''
155 paths: tuple[ str, ... ]
156 simulate: bool
157 diff_format: str
158 apply_dangerous: bool
159 rule_selection: __.Absential[ str ] = __.absent
161 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
162 ''' Renders result as JSON-compatible dictionary. '''
163 result: dict[ str, __.typx.Any ] = {
164 'paths': list( self.paths ),
165 'simulate': self.simulate,
166 'diff_format': self.diff_format,
167 'apply_dangerous': self.apply_dangerous,
168 }
169 if not __.is_absent( self.rule_selection ):
170 result[ 'rule_selection' ] = self.rule_selection
171 return result
173 def render_as_text( self ) -> tuple[ str, ... ]:
174 ''' Renders result as text lines. '''
175 lines = [ f'Fixing paths: {self.paths}' ]
176 if not __.is_absent( self.rule_selection ):
177 lines.append( f' Rule selection: {self.rule_selection}' )
178 lines.append( f' Simulate: {self.simulate}' )
179 lines.append( f' Diff format: {self.diff_format}' )
180 lines.append( f' Apply dangerous: {self.apply_dangerous}' )
181 return tuple( lines )
184class ConfigureResult( RenderableResult ):
185 ''' Result from configure command execution. '''
187 validate: bool
188 interactive: bool
189 display_effective: bool
191 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
192 ''' Renders result as JSON-compatible dictionary. '''
193 return {
194 'validate': self.validate,
195 'interactive': self.interactive,
196 'display_effective': self.display_effective,
197 }
199 def render_as_text( self ) -> tuple[ str, ... ]:
200 ''' Renders result as text lines. '''
201 return (
202 'Configure command',
203 f' Validate: {self.validate}',
204 f' Interactive: {self.interactive}',
205 f' Display effective: {self.display_effective}',
206 )
209class DescribeRulesResult( RenderableResult ):
210 ''' Result from describe rules command execution. '''
212 details: bool
214 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
215 ''' Renders result as JSON-compatible dictionary. '''
216 return { 'details': self.details }
218 def render_as_text( self ) -> tuple[ str, ... ]:
219 ''' Renders result as text lines. '''
220 return (
221 'Available rules',
222 f' Details: {self.details}',
223 )
226class DescribeRuleResult( RenderableResult ):
227 ''' Result from describe rule command execution. '''
229 rule_id: str
230 details: bool
232 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
233 ''' Renders result as JSON-compatible dictionary. '''
234 return {
235 'rule_id': self.rule_id,
236 'details': self.details,
237 }
239 def render_as_text( self ) -> tuple[ str, ... ]:
240 ''' Renders result as text lines. '''
241 return (
242 f'Rule: {self.rule_id}',
243 f' Details: {self.details}',
244 )
247class ServeResult( RenderableResult ):
248 ''' Result from serve command execution. '''
250 protocol: str
252 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
253 ''' Renders result as JSON-compatible dictionary. '''
254 return {
255 'protocol': self.protocol,
256 'status': 'not_implemented',
257 }
259 def render_as_text( self ) -> tuple[ str, ... ]:
260 ''' Renders result as text lines. '''
261 return (
262 f'Protocol server: {self.protocol}',
263 ' (Not yet implemented)',
264 )
267class CheckCommand( __.immut.DataclassObject ):
268 ''' Analyzes code and reports violations. '''
270 paths: PathsArgument = ( '.',)
271 select: __.Absential[ RuleSelectorArgument ] = __.absent
272 jobs: __.typx.Annotated[
273 __.typx.Union[ int, __.typx.Literal[ 'auto' ] ],
274 __.tyro.conf.arg( prefix_name = False ),
275 __.ddoc.Doc( ''' Number of parallel processing jobs. ''' )
276 ] = 'auto'
278 async def __call__( self, display: DisplayOptions ) -> int:
279 ''' Executes the check command. '''
280 # TODO: Implement parallel processing with jobs parameter
281 _ = self.jobs # Suppress vulture warning
282 config = _configuration.discover_configuration( )
283 file_paths = _discover_python_files( self.paths )
284 if not __.is_absent( config ):
285 file_paths = _apply_path_filters( file_paths, config )
286 if not file_paths:
287 result = CheckResult(
288 paths = self.paths,
289 reports = ( ),
290 total_violations = 0,
291 total_files = 0,
292 rule_selection = self.select,
293 )
294 async with __.ctxl.AsyncExitStack( ) as exits:
295 await _render_and_print_result( result, display, exits )
296 return 0
297 enabled_rules = _merge_rule_selection(
298 self.select, config, _rules.create_registry_manager( ) )
299 context_size = _merge_context_size( display.context, config )
300 rule_parameters: __.immut.Dictionary[
301 str, __.immut.Dictionary[ str, __.typx.Any ] ]
302 per_file_ignores: __.immut.Dictionary[ str, tuple[ str, ... ] ]
303 if __.is_absent( config ):
304 rule_parameters = __.immut.Dictionary( )
305 per_file_ignores = __.immut.Dictionary( )
306 else:
307 rule_parameters = config.rule_parameters
308 per_file_ignores = config.per_file_ignores
309 configuration = _engine.EngineConfiguration(
310 enabled_rules = enabled_rules,
311 context_size = context_size,
312 include_context = context_size > 0,
313 rule_parameters = rule_parameters,
314 per_file_ignores = per_file_ignores,
315 )
316 registry_manager = _rules.create_registry_manager( )
317 engine = _engine.Engine( registry_manager, configuration )
318 reports = engine.lint_files( file_paths )
319 total_violations = sum( len( r.violations ) for r in reports )
320 result = CheckResult(
321 paths = self.paths,
322 reports = reports,
323 total_violations = total_violations,
324 total_files = len( reports ),
325 rule_selection = self.select,
326 )
327 async with __.ctxl.AsyncExitStack( ) as exits:
328 await _render_and_print_result( result, display, exits )
329 return 1 if total_violations > 0 else 0
332class FixCommand( __.immut.DataclassObject ):
333 ''' Applies automated fixes with safety controls. '''
335 paths: PathsArgument = ( '.',)
336 select: __.Absential[ RuleSelectorArgument ] = __.absent
337 simulate: __.typx.Annotated[
338 bool,
339 __.tyro.conf.arg( prefix_name = False ),
340 __.ddoc.Doc( ''' Preview changes without applying them. ''' )
341 ] = False
342 diff_format: __.typx.Annotated[
343 DiffFormats,
344 __.tyro.conf.arg( prefix_name = False ),
345 __.ddoc.Doc( ''' Diff visualization format. ''' )
346 ] = DiffFormats.Unified
347 apply_dangerous: __.typx.Annotated[
348 bool,
349 __.tyro.conf.arg( prefix_name = False ),
350 __.ddoc.Doc( ''' Enable potentially unsafe fixes. ''' )
351 ] = False
353 async def __call__( self, display: DisplayOptions ) -> int:
354 ''' Executes the fix command. '''
355 result = FixResult(
356 paths = self.paths,
357 simulate = self.simulate,
358 diff_format = self.diff_format.value,
359 apply_dangerous = self.apply_dangerous,
360 rule_selection = self.select,
361 )
362 async with __.ctxl.AsyncExitStack( ) as exits:
363 await _render_and_print_result( result, display, exits )
364 return 0
367class ConfigureCommand( __.immut.DataclassObject ):
368 ''' Manages configuration without destructive file editing. '''
370 validate: __.typx.Annotated[
371 bool,
372 __.tyro.conf.arg( prefix_name = False ),
373 __.ddoc.Doc(
374 ''' Validate existing configuration without analysis. ''' )
375 ] = False
376 interactive: __.typx.Annotated[
377 bool,
378 __.tyro.conf.arg( prefix_name = False ),
379 __.ddoc.Doc( ''' Interactive configuration wizard. ''' )
380 ] = False
381 display_effective: __.typx.Annotated[
382 bool,
383 __.tyro.conf.arg( prefix_name = False ),
384 __.ddoc.Doc( ''' Display effective merged configuration. ''' )
385 ] = False
387 async def __call__( self, display: DisplayOptions ) -> int:
388 ''' Executes the configure command. '''
389 result = ConfigureResult(
390 validate = self.validate,
391 interactive = self.interactive,
392 display_effective = self.display_effective,
393 )
394 async with __.ctxl.AsyncExitStack( ) as exits:
395 await _render_and_print_result( result, display, exits )
396 return 0
399class DescribeRulesCommand( __.immut.DataclassObject ):
400 ''' Lists all available rules with descriptions. '''
402 details: __.typx.Annotated[
403 bool,
404 __.tyro.conf.arg( prefix_name = False ),
405 __.ddoc.Doc(
406 ''' Display detailed rule information including '''
407 ''' configuration status. ''' )
408 ] = False
410 async def __call__( self, display: DisplayOptions ) -> int:
411 ''' Executes the describe rules command. '''
412 result = DescribeRulesResult( details = self.details )
413 async with __.ctxl.AsyncExitStack( ) as exits:
414 await _render_and_print_result( result, display, exits )
415 return 0
418class DescribeRuleCommand( __.immut.DataclassObject ):
419 ''' Displays detailed information for a specific rule. '''
421 rule_id: __.tyro.conf.Positional[ str ]
422 details: __.typx.Annotated[
423 bool,
424 __.tyro.conf.arg( prefix_name = False ),
425 __.ddoc.Doc(
426 ''' Display detailed rule information including '''
427 ''' configuration status. ''' )
428 ] = False
430 async def __call__( self, display: DisplayOptions ) -> int:
431 ''' Executes the describe rule command. '''
432 result = DescribeRuleResult(
433 rule_id = self.rule_id,
434 details = self.details,
435 )
436 async with __.ctxl.AsyncExitStack( ) as exits:
437 await _render_and_print_result( result, display, exits )
438 return 0
441class DescribeCommand( __.immut.DataclassObject ):
442 ''' Displays rule information and documentation. '''
444 subcommand: __.typx.Union[
445 __.typx.Annotated[
446 DescribeRulesCommand,
447 __.tyro.conf.subcommand( 'rules', prefix_name = False ),
448 ],
449 __.typx.Annotated[
450 DescribeRuleCommand,
451 __.tyro.conf.subcommand( 'rule', prefix_name = False ),
452 ],
453 ]
455 async def __call__( self, display: DisplayOptions ) -> int:
456 ''' Delegates to selected subcommand. '''
457 return await self.subcommand( display )
460class ServeCommand( __.immut.DataclassObject ):
461 ''' Starts a protocol server (future implementation). '''
463 protocol: __.typx.Annotated[
464 __.typx.Literal[ 'lsp', 'mcp' ],
465 __.tyro.conf.arg( prefix_name = False ),
466 __.ddoc.Doc( ''' Protocol server to start. ''' )
467 ] = 'mcp'
469 async def __call__( self, display: DisplayOptions ) -> int:
470 ''' Executes the serve command. '''
471 result = ServeResult( protocol = self.protocol )
472 async with __.ctxl.AsyncExitStack( ) as exits:
473 await _render_and_print_result( result, display, exits )
474 return 0
477class Cli( __.immut.DataclassObject ):
478 ''' Linter command-line interface. '''
480 command: __.typx.Union[
481 __.typx.Annotated[
482 CheckCommand,
483 __.tyro.conf.subcommand(
484 'check', prefix_name = False, default = True ),
485 ],
486 __.typx.Annotated[
487 FixCommand,
488 __.tyro.conf.subcommand( 'fix', prefix_name = False ),
489 ],
490 __.typx.Annotated[
491 ConfigureCommand,
492 __.tyro.conf.subcommand( 'configure', prefix_name = False ),
493 ],
494 __.typx.Annotated[
495 DescribeCommand,
496 __.tyro.conf.subcommand( 'describe', prefix_name = False ),
497 ],
498 __.typx.Annotated[
499 ServeCommand,
500 __.tyro.conf.subcommand( 'serve', prefix_name = False ),
501 ],
502 ]
503 display: __.typx.Annotated[
504 DisplayOptions,
505 __.tyro.conf.arg( prefix_name = False ),
506 ] = __.dcls.field( default_factory = DisplayOptions )
507 verbose: __.typx.Annotated[
508 bool,
509 __.ddoc.Doc( ''' Enable verbose output. ''' )
510 ] = False
512 async def __call__( self ) -> None:
513 ''' Invokes selected subcommand after system preparation. '''
514 # TODO: Implement verbose logging setup
515 _ = self.verbose # Suppress vulture warning
516 async with intercept_errors( self.display ):
517 exit_code = await self.command( self.display )
518 raise SystemExit( exit_code )
521def execute( ) -> None:
522 ''' Entrypoint for CLI execution. '''
523 from asyncio import run
524 config = (
525 __.tyro.conf.EnumChoicesFromValues,
526 __.tyro.conf.HelptextFromCommentsOff,
527 )
528 try: run( __.tyro.cli( Cli, config = config )( ) ) # pyright: ignore
529 except SystemExit: raise
530 except BaseException:
531 # TODO: Log exception with proper error handling
532 raise SystemExit( 1 ) from None
535@__.ctxl.asynccontextmanager
536async def intercept_errors(
537 display: DisplayOptions,
538) -> __.cabc.AsyncIterator[ None ]:
539 ''' Context manager that intercepts and renders exceptions.
541 Catches Omnierror exceptions and renders them according to the
542 display format. Handles unexpected exceptions by logging and
543 formatting as errors.
544 '''
545 from . import exceptions as _exceptions
546 try:
547 yield
548 except _exceptions.Omnierror as exc:
549 async with __.ctxl.AsyncExitStack( ) as exits:
550 stream = await display.provide_stream( exits )
551 match display.format:
552 case DisplayFormats.Json:
553 stream.write(
554 __.json.dumps( exc.render_as_json( ), indent = 2 ) )
555 stream.write( '\n' )
556 case DisplayFormats.Text:
557 for line in exc.render_as_text( ):
558 stream.write( line )
559 stream.write( '\n' )
560 raise SystemExit( 1 ) from exc
561 except ( SystemExit, KeyboardInterrupt ):
562 raise
563 except BaseException as exc:
564 # TODO: Log exception with proper error handling via scribe
565 async with __.ctxl.AsyncExitStack( ) as exits:
566 stream = await display.provide_stream( exits )
567 match display.format:
568 case DisplayFormats.Json:
569 error_data = {
570 'type': 'unexpected_error',
571 'message': str( exc ),
572 }
573 stream.write( __.json.dumps( error_data, indent = 2 ) )
574 stream.write( '\n' )
575 case DisplayFormats.Text:
576 stream.write( '## Unexpected Error\n' )
577 stream.write( f'**Message**: {exc}\n' )
578 raise SystemExit( 1 ) from exc
581def _discover_python_files(
582 paths: __.cabc.Sequence[ str ]
583) -> tuple[ __.pathlib.Path, ... ]:
584 ''' Discovers Python files from file paths or directories. '''
585 python_files: list[ __.pathlib.Path ] = [ ]
586 for path_str in paths:
587 path = __.pathlib.Path( path_str )
588 if not path.exists( ):
589 continue
590 if path.is_file( ) and path.suffix == '.py':
591 python_files.append( path )
592 elif path.is_dir( ):
593 python_files.extend( path.rglob( '*.py' ) )
594 return tuple( sorted( set( python_files ) ) )
597def _apply_path_filters(
598 file_paths: tuple[ __.pathlib.Path, ... ],
599 config: __.typx.Any,
600) -> tuple[ __.pathlib.Path, ... ]:
601 ''' Applies include/exclude path filters from configuration. '''
602 typed_config = __.typx.cast( _configuration.Configuration, config )
603 filtered = list( file_paths )
604 if not __.is_absent( typed_config.include_paths ):
605 filtered = [
606 fp for fp in filtered
607 if _matches_any_pattern( fp, typed_config.include_paths )
608 ]
609 if not __.is_absent( typed_config.exclude_paths ):
610 patterns = typed_config.exclude_paths
611 filtered = [
612 fp for fp in filtered
613 if not _matches_any_pattern( fp, patterns )
614 ]
615 return tuple( filtered )
618def _matches_any_pattern(
619 file_path: __.pathlib.Path,
620 patterns: tuple[ str, ... ],
621) -> bool:
622 ''' Checks if file path matches any glob pattern. '''
623 path_str = str( file_path )
624 for pattern in patterns:
625 if __.wcglob.globmatch(
626 path_str, pattern, flags = __.wcglob.GLOBSTAR ):
627 return True
628 return False
631def _merge_context_size(
632 cli_context: int,
633 config: __.Absential[ __.typx.Any ],
634) -> int:
635 ''' Merges context size from CLI and configuration. '''
636 if cli_context > 0:
637 return cli_context
638 if __.is_absent( config ):
639 return 0
640 typed_config = __.typx.cast( _configuration.Configuration, config )
641 if __.is_absent( typed_config.context ):
642 return 0
643 return typed_config.context
646def _resolve_rule_set(
647 identifiers: __.cabc.Iterable[ str ],
648 registry_manager: _registry.RuleRegistryManager,
649) -> set[ str ]:
650 ''' Resolves a sequence of rule identifiers to codes. '''
651 codes: set[ str ] = set( )
652 for raw_identifier in identifiers:
653 identifier = raw_identifier.strip( )
654 if not identifier:
655 continue
656 code = registry_manager.resolve_rule_identifier( identifier )
657 codes.add( code )
658 return codes
661def _merge_rule_selection(
662 cli_selection: __.Absential[ str ],
663 config: __.Absential[ __.typx.Any ],
664 registry_manager: _registry.RuleRegistryManager,
665) -> frozenset[ str ]:
666 ''' Merges rule selection from CLI and configuration. '''
667 from .rules.implementations.__ import RULE_DESCRIPTORS
668 all_rules = frozenset( RULE_DESCRIPTORS.keys( ) )
669 if not __.is_absent( cli_selection ):
670 return frozenset( _resolve_rule_set(
671 cli_selection.split( ',' ), registry_manager ) )
672 if __.is_absent( config ):
673 return all_rules
674 typed_config = __.typx.cast( _configuration.Configuration, config )
675 if not __.is_absent( typed_config.select ):
676 selected = _resolve_rule_set(
677 typed_config.select, registry_manager )
678 else:
679 selected = set( all_rules )
680 if not __.is_absent( typed_config.exclude_rules ):
681 excluded = _resolve_rule_set(
682 typed_config.exclude_rules, registry_manager )
683 selected -= excluded
684 return frozenset( selected )
687async def _render_and_print_result(
688 result: RenderableResult,
689 display: DisplayOptions,
690 exits: __.ctxl.AsyncExitStack,
691) -> None:
692 ''' Renders and prints a result object based on display options. '''
693 stream = await display.provide_stream( exits )
694 match display.format:
695 case DisplayFormats.Json:
696 stream.write( __.json.dumps( result.render_as_json( ) ) )
697 stream.write( '\n' )
698 case DisplayFormats.Text:
699 for line in result.render_as_text( ):
700 stream.write( line )
701 stream.write( '\n' )