Coverage for sources / vibelinter / cli.py: 29%
278 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 00:00 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 00:00 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Command-line interface. '''
23# ruff: noqa: F821
26from appcore import cli as _appcore_cli
28from . import __
29from . import configuration as _configuration
30from . import engine as _engine
31from . import rules as _rules
34class DiffFormats( __.enum.Enum ):
35 ''' Diff visualization formats. '''
37 Unified = 'unified'
38 Context = 'context'
41class DisplayFormats( __.enum.Enum ):
42 ''' Output formats for reporting. '''
44 Text = 'text'
45 Json = 'json'
48class DisplayOptions( _appcore_cli.DisplayOptions ):
49 ''' Display options extending appcore.cli with output format selection.
51 Adds format-specific output control for linter reporting.
52 '''
54 format: __.typx.Annotated[
55 DisplayFormats,
56 __.tyro.conf.arg( prefix_name = False ),
57 __.ddoc.Doc( ''' Output format for reporting. ''' )
58 ] = DisplayFormats.Text
59 context: __.typx.Annotated[
60 int,
61 __.tyro.conf.arg( prefix_name = False ),
62 __.ddoc.Doc( ''' Show context lines around violations. ''' )
63 ] = 0
66RuleSelectorArgument: __.typx.TypeAlias = __.typx.Annotated[
67 str,
68 __.tyro.conf.arg( prefix_name = False ),
69 __.ddoc.Doc( ''' Comma-separated VBL rule codes (e.g. VBL101,VBL201). ''' )
70]
71PathsArgument: __.typx.TypeAlias = __.tyro.conf.Positional[
72 tuple[ str, ... ]
73]
76class RenderableResult( __.immut.DataclassProtocol, __.typx.Protocol ):
77 ''' Protocol for command results with format-specific rendering.
79 Combines DataclassProtocol and Protocol to provide both structural
80 typing and dataclass compatibility. Result classes should explicitly
81 inherit from this base class.
82 '''
84 @__.abc.abstractmethod
85 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
86 ''' Renders result as JSON-compatible dictionary. '''
87 raise NotImplementedError
89 @__.abc.abstractmethod
90 def render_as_text( self ) -> tuple[ str, ... ]:
91 ''' Renders result as text lines. '''
92 raise NotImplementedError
95class CheckResult( RenderableResult ):
96 ''' Result from check command execution. '''
98 paths: tuple[ str, ... ]
99 reports: tuple[ __.typx.Any, ... ] # Engine Report objects
100 total_violations: int
101 total_files: int
102 rule_selection: __.Absential[ str ] = __.absent
104 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
105 ''' Renders result as JSON-compatible dictionary. '''
106 files_data: list[ dict[ str, __.typx.Any ] ] = [ ]
107 for report_obj in self.reports:
108 typed_report = __.typx.cast( _engine.Report, report_obj )
109 violations_data = [
110 v.render_as_json( ) for v in typed_report.violations
111 ]
112 files_data.append( {
113 'filename': typed_report.filename,
114 'violations': violations_data,
115 'violation_count': len( typed_report.violations ),
116 'rule_count': typed_report.rule_count,
117 'analysis_duration_ms': typed_report.analysis_duration_ms,
118 } )
119 result: dict[ str, __.typx.Any ] = {
120 'files': files_data,
121 'total_violations': self.total_violations,
122 'total_files': self.total_files,
123 }
124 if not __.is_absent( self.rule_selection ):
125 result[ 'rule_selection' ] = self.rule_selection
126 return result
128 def render_as_text( self ) -> tuple[ str, ... ]:
129 ''' Renders result as text lines. '''
130 lines: list[ str ] = [ ]
131 for report_obj in self.reports:
132 typed_report = __.typx.cast( _engine.Report, report_obj )
133 if typed_report.violations:
134 lines.append( f'\n{typed_report.filename}:' )
135 lines.extend(
136 v.render_as_text( )
137 for v in typed_report.violations )
138 if not lines:
139 lines.append( 'No violations found.' )
140 else:
141 lines.append(
142 f'\nFound {self.total_violations} violations '
143 f'in {self.total_files} files.' )
144 return tuple( lines )
147class FixResult( RenderableResult ):
148 ''' Result from fix command execution. '''
150 paths: tuple[ str, ... ]
151 simulate: bool
152 diff_format: str
153 apply_dangerous: bool
154 rule_selection: __.Absential[ str ] = __.absent
156 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
157 ''' Renders result as JSON-compatible dictionary. '''
158 result: dict[ str, __.typx.Any ] = {
159 'paths': list( self.paths ),
160 'simulate': self.simulate,
161 'diff_format': self.diff_format,
162 'apply_dangerous': self.apply_dangerous,
163 }
164 if not __.is_absent( self.rule_selection ):
165 result[ 'rule_selection' ] = self.rule_selection
166 return result
168 def render_as_text( self ) -> tuple[ str, ... ]:
169 ''' Renders result as text lines. '''
170 lines = [ f'Fixing paths: {self.paths}' ]
171 if not __.is_absent( self.rule_selection ):
172 lines.append( f' Rule selection: {self.rule_selection}' )
173 lines.append( f' Simulate: {self.simulate}' )
174 lines.append( f' Diff format: {self.diff_format}' )
175 lines.append( f' Apply dangerous: {self.apply_dangerous}' )
176 return tuple( lines )
179class ConfigureResult( RenderableResult ):
180 ''' Result from configure command execution. '''
182 validate: bool
183 interactive: bool
184 display_effective: bool
186 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
187 ''' Renders result as JSON-compatible dictionary. '''
188 return {
189 'validate': self.validate,
190 'interactive': self.interactive,
191 'display_effective': self.display_effective,
192 }
194 def render_as_text( self ) -> tuple[ str, ... ]:
195 ''' Renders result as text lines. '''
196 return (
197 'Configure command',
198 f' Validate: {self.validate}',
199 f' Interactive: {self.interactive}',
200 f' Display effective: {self.display_effective}',
201 )
204class DescribeRulesResult( RenderableResult ):
205 ''' Result from describe rules command execution. '''
207 details: bool
209 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
210 ''' Renders result as JSON-compatible dictionary. '''
211 return { 'details': self.details }
213 def render_as_text( self ) -> tuple[ str, ... ]:
214 ''' Renders result as text lines. '''
215 return (
216 'Available rules',
217 f' Details: {self.details}',
218 )
221class DescribeRuleResult( RenderableResult ):
222 ''' Result from describe rule command execution. '''
224 rule_id: str
225 details: bool
227 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
228 ''' Renders result as JSON-compatible dictionary. '''
229 return {
230 'rule_id': self.rule_id,
231 'details': self.details,
232 }
234 def render_as_text( self ) -> tuple[ str, ... ]:
235 ''' Renders result as text lines. '''
236 return (
237 f'Rule: {self.rule_id}',
238 f' Details: {self.details}',
239 )
242class ServeResult( RenderableResult ):
243 ''' Result from serve command execution. '''
245 protocol: str
247 def render_as_json( self ) -> dict[ str, __.typx.Any ]:
248 ''' Renders result as JSON-compatible dictionary. '''
249 return {
250 'protocol': self.protocol,
251 'status': 'not_implemented',
252 }
254 def render_as_text( self ) -> tuple[ str, ... ]:
255 ''' Renders result as text lines. '''
256 return (
257 f'Protocol server: {self.protocol}',
258 ' (Not yet implemented)',
259 )
262class CheckCommand( __.immut.DataclassObject ):
263 ''' Analyzes code and reports violations. '''
265 paths: PathsArgument = ( '.',)
266 select: __.Absential[ RuleSelectorArgument ] = __.absent
267 jobs: __.typx.Annotated[
268 __.typx.Union[ int, __.typx.Literal[ 'auto' ] ],
269 __.tyro.conf.arg( prefix_name = False ),
270 __.ddoc.Doc( ''' Number of parallel processing jobs. ''' )
271 ] = 'auto'
273 async def __call__( self, display: DisplayOptions ) -> int:
274 ''' Executes the check command. '''
275 # TODO: Implement parallel processing with jobs parameter
276 _ = self.jobs # Suppress vulture warning
277 config = _configuration.discover_configuration( )
278 file_paths = _discover_python_files( self.paths )
279 if not __.is_absent( config ):
280 file_paths = _apply_path_filters( file_paths, config )
281 if not file_paths:
282 result = CheckResult(
283 paths = self.paths,
284 reports = ( ),
285 total_violations = 0,
286 total_files = 0,
287 rule_selection = self.select,
288 )
289 async with __.ctxl.AsyncExitStack( ) as exits:
290 await _render_and_print_result( result, display, exits )
291 return 0
292 enabled_rules = _merge_rule_selection( self.select, config )
293 context_size = _merge_context_size( display.context, config )
294 rule_parameters: __.immut.Dictionary[
295 str, __.immut.Dictionary[ str, __.typx.Any ] ]
296 if __.is_absent( config ):
297 rule_parameters = __.immut.Dictionary( )
298 else:
299 rule_parameters = config.rule_parameters
300 configuration = _engine.EngineConfiguration(
301 enabled_rules = enabled_rules,
302 context_size = context_size,
303 include_context = context_size > 0,
304 rule_parameters = rule_parameters,
305 )
306 registry_manager = _rules.create_registry_manager( )
307 engine = _engine.Engine( registry_manager, configuration )
308 reports = engine.lint_files( file_paths )
309 total_violations = sum( len( r.violations ) for r in reports )
310 result = CheckResult(
311 paths = self.paths,
312 reports = reports,
313 total_violations = total_violations,
314 total_files = len( reports ),
315 rule_selection = self.select,
316 )
317 async with __.ctxl.AsyncExitStack( ) as exits:
318 await _render_and_print_result( result, display, exits )
319 return 1 if total_violations > 0 else 0
322class FixCommand( __.immut.DataclassObject ):
323 ''' Applies automated fixes with safety controls. '''
325 paths: PathsArgument = ( '.',)
326 select: __.Absential[ RuleSelectorArgument ] = __.absent
327 simulate: __.typx.Annotated[
328 bool,
329 __.tyro.conf.arg( prefix_name = False ),
330 __.ddoc.Doc( ''' Preview changes without applying them. ''' )
331 ] = False
332 diff_format: __.typx.Annotated[
333 DiffFormats,
334 __.tyro.conf.arg( prefix_name = False ),
335 __.ddoc.Doc( ''' Diff visualization format. ''' )
336 ] = DiffFormats.Unified
337 apply_dangerous: __.typx.Annotated[
338 bool,
339 __.tyro.conf.arg( prefix_name = False ),
340 __.ddoc.Doc( ''' Enable potentially unsafe fixes. ''' )
341 ] = False
343 async def __call__( self, display: DisplayOptions ) -> int:
344 ''' Executes the fix command. '''
345 result = FixResult(
346 paths = self.paths,
347 simulate = self.simulate,
348 diff_format = self.diff_format.value,
349 apply_dangerous = self.apply_dangerous,
350 rule_selection = self.select,
351 )
352 async with __.ctxl.AsyncExitStack( ) as exits:
353 await _render_and_print_result( result, display, exits )
354 return 0
357class ConfigureCommand( __.immut.DataclassObject ):
358 ''' Manages configuration without destructive file editing. '''
360 validate: __.typx.Annotated[
361 bool,
362 __.tyro.conf.arg( prefix_name = False ),
363 __.ddoc.Doc(
364 ''' Validate existing configuration without analysis. ''' )
365 ] = False
366 interactive: __.typx.Annotated[
367 bool,
368 __.tyro.conf.arg( prefix_name = False ),
369 __.ddoc.Doc( ''' Interactive configuration wizard. ''' )
370 ] = False
371 display_effective: __.typx.Annotated[
372 bool,
373 __.tyro.conf.arg( prefix_name = False ),
374 __.ddoc.Doc( ''' Display effective merged configuration. ''' )
375 ] = False
377 async def __call__( self, display: DisplayOptions ) -> int:
378 ''' Executes the configure command. '''
379 result = ConfigureResult(
380 validate = self.validate,
381 interactive = self.interactive,
382 display_effective = self.display_effective,
383 )
384 async with __.ctxl.AsyncExitStack( ) as exits:
385 await _render_and_print_result( result, display, exits )
386 return 0
389class DescribeRulesCommand( __.immut.DataclassObject ):
390 ''' Lists all available rules with descriptions. '''
392 details: __.typx.Annotated[
393 bool,
394 __.tyro.conf.arg( prefix_name = False ),
395 __.ddoc.Doc(
396 ''' Display detailed rule information including '''
397 ''' configuration status. ''' )
398 ] = False
400 async def __call__( self, display: DisplayOptions ) -> int:
401 ''' Executes the describe rules command. '''
402 result = DescribeRulesResult( details = self.details )
403 async with __.ctxl.AsyncExitStack( ) as exits:
404 await _render_and_print_result( result, display, exits )
405 return 0
408class DescribeRuleCommand( __.immut.DataclassObject ):
409 ''' Displays detailed information for a specific rule. '''
411 rule_id: __.tyro.conf.Positional[ str ]
412 details: __.typx.Annotated[
413 bool,
414 __.tyro.conf.arg( prefix_name = False ),
415 __.ddoc.Doc(
416 ''' Display detailed rule information including '''
417 ''' configuration status. ''' )
418 ] = False
420 async def __call__( self, display: DisplayOptions ) -> int:
421 ''' Executes the describe rule command. '''
422 result = DescribeRuleResult(
423 rule_id = self.rule_id,
424 details = self.details,
425 )
426 async with __.ctxl.AsyncExitStack( ) as exits:
427 await _render_and_print_result( result, display, exits )
428 return 0
431class DescribeCommand( __.immut.DataclassObject ):
432 ''' Displays rule information and documentation. '''
434 subcommand: __.typx.Union[
435 __.typx.Annotated[
436 DescribeRulesCommand,
437 __.tyro.conf.subcommand( 'rules', prefix_name = False ),
438 ],
439 __.typx.Annotated[
440 DescribeRuleCommand,
441 __.tyro.conf.subcommand( 'rule', prefix_name = False ),
442 ],
443 ]
445 async def __call__( self, display: DisplayOptions ) -> int:
446 ''' Delegates to selected subcommand. '''
447 return await self.subcommand( display )
450class ServeCommand( __.immut.DataclassObject ):
451 ''' Starts a protocol server (future implementation). '''
453 protocol: __.typx.Annotated[
454 __.typx.Literal[ 'lsp', 'mcp' ],
455 __.tyro.conf.arg( prefix_name = False ),
456 __.ddoc.Doc( ''' Protocol server to start. ''' )
457 ] = 'mcp'
459 async def __call__( self, display: DisplayOptions ) -> int:
460 ''' Executes the serve command. '''
461 result = ServeResult( protocol = self.protocol )
462 async with __.ctxl.AsyncExitStack( ) as exits:
463 await _render_and_print_result( result, display, exits )
464 return 0
467class Cli( __.immut.DataclassObject ):
468 ''' Linter command-line interface. '''
470 command: __.typx.Union[
471 __.typx.Annotated[
472 CheckCommand,
473 __.tyro.conf.subcommand(
474 'check', prefix_name = False, default = True ),
475 ],
476 __.typx.Annotated[
477 FixCommand,
478 __.tyro.conf.subcommand( 'fix', prefix_name = False ),
479 ],
480 __.typx.Annotated[
481 ConfigureCommand,
482 __.tyro.conf.subcommand( 'configure', prefix_name = False ),
483 ],
484 __.typx.Annotated[
485 DescribeCommand,
486 __.tyro.conf.subcommand( 'describe', prefix_name = False ),
487 ],
488 __.typx.Annotated[
489 ServeCommand,
490 __.tyro.conf.subcommand( 'serve', prefix_name = False ),
491 ],
492 ]
493 display: __.typx.Annotated[
494 DisplayOptions,
495 __.tyro.conf.arg( prefix_name = False ),
496 ] = __.dcls.field( default_factory = DisplayOptions )
497 verbose: __.typx.Annotated[
498 bool,
499 __.ddoc.Doc( ''' Enable verbose output. ''' )
500 ] = False
502 async def __call__( self ) -> None:
503 ''' Invokes selected subcommand after system preparation. '''
504 # TODO: Implement verbose logging setup
505 _ = self.verbose # Suppress vulture warning
506 async with intercept_errors( self.display ):
507 exit_code = await self.command( self.display )
508 raise SystemExit( exit_code )
511def execute( ) -> None:
512 ''' Entrypoint for CLI execution. '''
513 from asyncio import run
514 config = (
515 __.tyro.conf.EnumChoicesFromValues,
516 __.tyro.conf.HelptextFromCommentsOff,
517 )
518 try: run( __.tyro.cli( Cli, config = config )( ) ) # pyright: ignore
519 except SystemExit: raise
520 except BaseException:
521 # TODO: Log exception with proper error handling
522 raise SystemExit( 1 ) from None
525@__.ctxl.asynccontextmanager
526async def intercept_errors(
527 display: DisplayOptions,
528) -> __.cabc.AsyncIterator[ None ]:
529 ''' Context manager that intercepts and renders exceptions.
531 Catches Omnierror exceptions and renders them according to the
532 display format. Handles unexpected exceptions by logging and
533 formatting as errors.
534 '''
535 from . import exceptions as _exceptions
536 try:
537 yield
538 except _exceptions.Omnierror as exc:
539 async with __.ctxl.AsyncExitStack( ) as exits:
540 stream = await display.provide_stream( exits )
541 match display.format:
542 case DisplayFormats.Json:
543 stream.write(
544 __.json.dumps( exc.render_as_json( ), indent = 2 ) )
545 stream.write( '\n' )
546 case DisplayFormats.Text:
547 for line in exc.render_as_text( ):
548 stream.write( line )
549 stream.write( '\n' )
550 raise SystemExit( 1 ) from exc
551 except ( SystemExit, KeyboardInterrupt ):
552 raise
553 except BaseException as exc:
554 # TODO: Log exception with proper error handling via scribe
555 async with __.ctxl.AsyncExitStack( ) as exits:
556 stream = await display.provide_stream( exits )
557 match display.format:
558 case DisplayFormats.Json:
559 error_data = {
560 'type': 'unexpected_error',
561 'message': str( exc ),
562 }
563 stream.write( __.json.dumps( error_data, indent = 2 ) )
564 stream.write( '\n' )
565 case DisplayFormats.Text:
566 stream.write( '## Unexpected Error\n' )
567 stream.write( f'**Message**: {exc}\n' )
568 raise SystemExit( 1 ) from exc
571def _discover_python_files(
572 paths: __.cabc.Sequence[ str ]
573) -> tuple[ __.pathlib.Path, ... ]:
574 ''' Discovers Python files from file paths or directories. '''
575 python_files: list[ __.pathlib.Path ] = [ ]
576 for path_str in paths:
577 path = __.pathlib.Path( path_str )
578 if not path.exists( ):
579 continue
580 if path.is_file( ) and path.suffix == '.py':
581 python_files.append( path )
582 elif path.is_dir( ):
583 python_files.extend( path.rglob( '*.py' ) )
584 return tuple( sorted( set( python_files ) ) )
587def _apply_path_filters(
588 file_paths: tuple[ __.pathlib.Path, ... ],
589 config: __.typx.Any,
590) -> tuple[ __.pathlib.Path, ... ]:
591 ''' Applies include/exclude path filters from configuration. '''
592 typed_config = __.typx.cast( _configuration.Configuration, config )
593 filtered = list( file_paths )
594 if not __.is_absent( typed_config.include_paths ):
595 filtered = [
596 fp for fp in filtered
597 if _matches_any_pattern( fp, typed_config.include_paths )
598 ]
599 if not __.is_absent( typed_config.exclude_paths ):
600 patterns = typed_config.exclude_paths
601 filtered = [
602 fp for fp in filtered
603 if not _matches_any_pattern( fp, patterns )
604 ]
605 return tuple( filtered )
608def _matches_any_pattern(
609 file_path: __.pathlib.Path,
610 patterns: tuple[ str, ... ],
611) -> bool:
612 ''' Checks if file path matches any glob pattern. '''
613 path_str = str( file_path )
614 for pattern in patterns:
615 if __.wcglob.globmatch(
616 path_str, pattern, flags = __.wcglob.GLOBSTAR ):
617 return True
618 return False
621def _merge_context_size(
622 cli_context: int,
623 config: __.Absential[ __.typx.Any ],
624) -> int:
625 ''' Merges context size from CLI and configuration. '''
626 if cli_context > 0:
627 return cli_context
628 if __.is_absent( config ):
629 return 0
630 typed_config = __.typx.cast( _configuration.Configuration, config )
631 if __.is_absent( typed_config.context ):
632 return 0
633 return typed_config.context
636def _merge_rule_selection(
637 cli_selection: __.Absential[ str ],
638 config: __.Absential[ __.typx.Any ],
639) -> frozenset[ str ]:
640 ''' Merges rule selection from CLI and configuration. '''
641 from .rules.implementations.__ import RULE_DESCRIPTORS
642 all_rules = frozenset( RULE_DESCRIPTORS.keys( ) )
643 if not __.is_absent( cli_selection ):
644 codes = cli_selection.split( ',' )
645 return frozenset( code.strip( ) for code in codes )
646 if __.is_absent( config ):
647 return all_rules
648 typed_config = __.typx.cast( _configuration.Configuration, config )
649 if not __.is_absent( typed_config.select ):
650 selected = set( typed_config.select )
651 else:
652 selected = set( all_rules )
653 if not __.is_absent( typed_config.exclude_rules ):
654 selected -= set( typed_config.exclude_rules )
655 return frozenset( selected )
658async def _render_and_print_result(
659 result: RenderableResult,
660 display: DisplayOptions,
661 exits: __.ctxl.AsyncExitStack,
662) -> None:
663 ''' Renders and prints a result object based on display options. '''
664 stream = await display.provide_stream( exits )
665 match display.format:
666 case DisplayFormats.Json:
667 stream.write( __.json.dumps( result.render_as_json( ) ) )
668 stream.write( '\n' )
669 case DisplayFormats.Text:
670 for line in result.render_as_text( ):
671 stream.write( line )
672 stream.write( '\n' )