Coverage for sources / vibelinter / cli.py: 28%

290 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-07 04:34 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Command-line interface. ''' 

22 

23# ruff: noqa: F821 

24 

25 

26from appcore import cli as _appcore_cli 

27 

28from . import __ 

29from . import configuration as _configuration 

30from . import engine as _engine 

31from . import rules as _rules 

32# Ensure registry is available for type hints 

33from .rules import registry as _registry 

34 

35 

class DiffFormats( __.enum.Enum ):
    ''' Diff visualization formats. '''

    # FixCommand forwards the member's ``.value`` string (not the member
    # itself) into FixResult.diff_format.
    # NOTE(review): execute( ) enables tyro's EnumChoicesFromValues marker,
    # so these values are presumably what users type on the command line;
    # confirm against the tyro documentation.
    Unified = 'unified'
    Context = 'context'

41 

42 

class DisplayFormats( __.enum.Enum ):
    ''' Output formats for reporting. '''

    # These members are matched by _render_and_print_result and
    # intercept_errors to choose between line-oriented text and JSON.
    # NOTE(review): execute( ) enables tyro's EnumChoicesFromValues marker,
    # so these values are presumably the command-line choices; confirm
    # against the tyro documentation.
    Text = 'text'
    Json = 'json'

48 

49 

class DisplayOptions( _appcore_cli.DisplayOptions ):
    ''' Display options extending appcore.cli with output format selection.

        Adds format-specific output control for linter reporting.
    '''

    format: __.typx.Annotated[
        DisplayFormats,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Output format for reporting. ''' )
    ] = DisplayFormats.Text
    context: __.typx.Annotated[
        int,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Show context lines around violations. ''' )
    ] = 0

    # NOTE: ``format`` is the value matched by _render_and_print_result and
    # intercept_errors when deciding how to write output.
    # NOTE: ``context`` is handed to _merge_context_size by CheckCommand;
    # the default of 0 lets a discovered configuration supply the value.
    # NOTE(review): the stream itself comes from ``provide_stream`` on the
    # appcore base class, which is not visible in this module.

66 

67 

# Rule selector as typed on the command line. The raw string is split on
# commas by _merge_rule_selection and each piece is stripped and resolved to
# a rule code by _resolve_rule_set.
RuleSelectorArgument: __.typx.TypeAlias = __.typx.Annotated[
    str,
    __.tyro.conf.arg( prefix_name = False ),
    __.ddoc.Doc(
        ''' Comma-separated rule identifiers '''
        ''' (e.g. VBL101, function-ordering). '''
    )
]
# Positional, variable-length sequence of path strings. CheckCommand and
# FixCommand both default it to the current directory.
PathsArgument: __.typx.TypeAlias = __.tyro.conf.Positional[
    tuple[ str, ... ]
]

79 

80 

class RenderableResult( __.immut.DataclassProtocol, __.typx.Protocol ):
    ''' Protocol for command results with format-specific rendering.

        Combines DataclassProtocol and Protocol to provide both structural
        typing and dataclass compatibility. Result classes should explicitly
        inherit from this base class.
    '''

    @__.abc.abstractmethod
    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Renders result as JSON-compatible dictionary.

            The returned mapping is passed to ``json.dumps`` by
            _render_and_print_result when the JSON display format is
            selected, so its contents must be serializable by that call.
        '''
        raise NotImplementedError

    @__.abc.abstractmethod
    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Renders result as text lines.

            Each element is written by _render_and_print_result followed
            by a newline, so elements need no trailing line terminator.
        '''
        raise NotImplementedError

98 

99 

class CheckResult( RenderableResult ):
    ''' Aggregated outcome of a check run across all linted files. '''

    paths: tuple[ str, ... ]
    reports: tuple[ __.typx.Any, ... ]  # Engine Report objects
    total_violations: int
    total_files: int
    rule_selection: __.Absential[ str ] = __.absent

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces per-file entries plus overall totals.

            The rule selection is included only when one was supplied.
        '''
        engine_reports = __.typx.cast(
            tuple[ _engine.Report, ... ], self.reports )

        def describe( report: _engine.Report ) -> dict[ str, __.typx.Any ]:
            violations = report.violations
            return {
                'filename': report.filename,
                'violations': [ v.render_as_json( ) for v in violations ],
                'violation_count': len( violations ),
                'rule_count': report.rule_count,
                'analysis_duration_ms': report.analysis_duration_ms,
            }

        payload: dict[ str, __.typx.Any ] = {
            'files': [ describe( report ) for report in engine_reports ],
            'total_violations': self.total_violations,
            'total_files': self.total_files,
        }
        if __.is_absent( self.rule_selection ): return payload
        payload[ 'rule_selection' ] = self.rule_selection
        return payload

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces a section per file with violations, then a summary.

            Files without violations contribute no lines; when no file has
            any, a single 'No violations found.' line is produced instead.
        '''
        engine_reports = __.typx.cast(
            tuple[ _engine.Report, ... ], self.reports )
        sections: list[ str ] = [ ]
        for report in engine_reports:
            if not report.violations: continue
            sections.append( f'\n{report.filename}:' )
            sections += [ v.render_as_text( ) for v in report.violations ]
        if sections:
            summary = (
                f'\nFound {self.total_violations} violations '
                f'in {self.total_files} files.' )
        else:
            summary = 'No violations found.'
        return ( *sections, summary )

150 

151 

class FixResult( RenderableResult ):
    ''' Outcome of the fix command: an echo of the options it received. '''

    paths: tuple[ str, ... ]
    simulate: bool
    diff_format: str
    apply_dangerous: bool
    rule_selection: __.Absential[ str ] = __.absent

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces mapping of options; paths become a list.

            The rule selection is included only when one was supplied.
        '''
        payload: dict[ str, __.typx.Any ] = dict(
            paths = list( self.paths ),
            simulate = self.simulate,
            diff_format = self.diff_format,
            apply_dangerous = self.apply_dangerous,
        )
        if __.is_absent( self.rule_selection ): return payload
        payload[ 'rule_selection' ] = self.rule_selection
        return payload

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces header line followed by one line per option.

            The rule selection line appears only when one was supplied.
        '''
        if __.is_absent( self.rule_selection ): selection_lines = ( )
        else:
            selection_lines = (
                f' Rule selection: {self.rule_selection}', )
        return (
            f'Fixing paths: {self.paths}',
            *selection_lines,
            f' Simulate: {self.simulate}',
            f' Diff format: {self.diff_format}',
            f' Apply dangerous: {self.apply_dangerous}',
        )

182 

183 

class ConfigureResult( RenderableResult ):
    ''' Outcome of the configure command: an echo of its three flags. '''

    validate: bool
    interactive: bool
    display_effective: bool

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces mapping of each flag name to its value. '''
        flag_names = ( 'validate', 'interactive', 'display_effective' )
        return { name: getattr( self, name ) for name in flag_names }

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces header line followed by one line per flag. '''
        lines = [ 'Configure command' ]
        lines.append( f' Validate: {self.validate}' )
        lines.append( f' Interactive: {self.interactive}' )
        lines.append( f' Display effective: {self.display_effective}' )
        return tuple( lines )

207 

208 

class DescribeRulesResult( RenderableResult ):
    ''' Outcome of the describe rules command: an echo of its flag. '''

    details: bool

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces single-entry mapping holding the details flag. '''
        return dict( details = self.details )

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces header line followed by the details flag line. '''
        header = 'Available rules'
        detail_line = f' Details: {self.details}'
        return ( header, detail_line )

224 

225 

class DescribeRuleResult( RenderableResult ):
    ''' Outcome of the describe rule command: an echo of its options. '''

    rule_id: str
    details: bool

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces mapping of rule identifier and details flag. '''
        return dict( rule_id = self.rule_id, details = self.details )

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces rule header line followed by details flag line. '''
        header = f'Rule: {self.rule_id}'
        detail_line = f' Details: {self.details}'
        return ( header, detail_line )

245 

246 

class ServeResult( RenderableResult ):
    ''' Outcome of the serve command, which is not yet implemented. '''

    protocol: str

    def render_as_json( self ) -> dict[ str, __.typx.Any ]:
        ''' Produces mapping of protocol name and fixed status marker. '''
        return dict( protocol = self.protocol, status = 'not_implemented' )

    def render_as_text( self ) -> tuple[ str, ... ]:
        ''' Produces protocol header line and not-implemented notice. '''
        header = f'Protocol server: {self.protocol}'
        notice = ' (Not yet implemented)'
        return ( header, notice )

265 

266 

class CheckCommand( __.immut.DataclassObject ):
    ''' Analyzes code and reports violations. '''

    paths: PathsArgument = ( '.',)
    select: __.Absential[ RuleSelectorArgument ] = __.absent
    jobs: __.typx.Annotated[
        __.typx.Union[ int, __.typx.Literal[ 'auto' ] ],
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Number of parallel processing jobs. ''' )
    ] = 'auto'

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the check command.

            Discovers Python files under the requested paths, applies
            path filters from any discovered configuration, lints the
            remaining files and renders a CheckResult.

            Returns 1 when at least one violation was reported and 0
            otherwise, including when no files remain to be linted.
        '''
        # TODO: Implement parallel processing with jobs parameter
        _ = self.jobs  # Suppress vulture warning
        config = _configuration.discover_configuration( )
        file_paths = _discover_python_files( self.paths )
        if not __.is_absent( config ):
            file_paths = _apply_path_filters( file_paths, config )
        # With nothing to lint, rule resolution and engine construction
        # are skipped entirely and an empty result is reported.
        reports: __.cabc.Sequence[ __.typx.Any ] = ( )
        if file_paths:
            # One registry manager serves both rule resolution and the
            # engine, rather than building a second identical one.
            registry_manager = _rules.create_registry_manager( )
            enabled_rules = _merge_rule_selection(
                self.select, config, registry_manager )
            context_size = _merge_context_size( display.context, config )
            rule_parameters: __.immut.Dictionary[
                str, __.immut.Dictionary[ str, __.typx.Any ] ]
            per_file_ignores: __.immut.Dictionary[ str, tuple[ str, ... ] ]
            if __.is_absent( config ):
                rule_parameters = __.immut.Dictionary( )
                per_file_ignores = __.immut.Dictionary( )
            else:
                rule_parameters = config.rule_parameters
                per_file_ignores = config.per_file_ignores
            configuration = _engine.EngineConfiguration(
                enabled_rules = enabled_rules,
                context_size = context_size,
                include_context = context_size > 0,
                rule_parameters = rule_parameters,
                per_file_ignores = per_file_ignores,
            )
            engine = _engine.Engine( registry_manager, configuration )
            reports = engine.lint_files( file_paths )
        total_violations = sum( len( r.violations ) for r in reports )
        result = CheckResult(
            paths = self.paths,
            reports = reports,
            total_violations = total_violations,
            total_files = len( reports ),
            rule_selection = self.select,
        )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        return 1 if total_violations > 0 else 0

330 

331 

class FixCommand( __.immut.DataclassObject ):
    ''' Applies automated fixes with safety controls. '''

    paths: PathsArgument = ( '.',)
    select: __.Absential[ RuleSelectorArgument ] = __.absent
    simulate: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Preview changes without applying them. ''' )
    ] = False
    diff_format: __.typx.Annotated[
        DiffFormats,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Diff visualization format. ''' )
    ] = DiffFormats.Unified
    apply_dangerous: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Enable potentially unsafe fixes. ''' )
    ] = False

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the fix command. '''
        # As written, no files are read or modified here: the parsed
        # options are only echoed back through FixResult. The diff format
        # is passed as the enum member's string value.
        result = FixResult(
            paths = self.paths,
            simulate = self.simulate,
            diff_format = self.diff_format.value,
            apply_dangerous = self.apply_dangerous,
            rule_selection = self.select,
        )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        # Always reports success.
        return 0

365 

366 

class ConfigureCommand( __.immut.DataclassObject ):
    ''' Manages configuration without destructive file editing. '''

    validate: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc(
            ''' Validate existing configuration without analysis. ''' )
    ] = False
    interactive: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Interactive configuration wizard. ''' )
    ] = False
    display_effective: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Display effective merged configuration. ''' )
    ] = False

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the configure command. '''
        # As written, no configuration is read, validated or written
        # here: the three flags are only echoed through ConfigureResult.
        result = ConfigureResult(
            validate = self.validate,
            interactive = self.interactive,
            display_effective = self.display_effective,
        )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        # Always reports success.
        return 0

397 

398 

class DescribeRulesCommand( __.immut.DataclassObject ):
    ''' Lists all available rules with descriptions. '''

    details: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc(
            ''' Display detailed rule information including '''
            ''' configuration status. ''' )
    ] = False

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the describe rules command. '''
        # As written, no rule registry is consulted here: only the
        # ``details`` flag is echoed through DescribeRulesResult.
        result = DescribeRulesResult( details = self.details )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        # Always reports success.
        return 0

416 

417 

class DescribeRuleCommand( __.immut.DataclassObject ):
    ''' Displays detailed information for a specific rule. '''

    rule_id: __.tyro.conf.Positional[ str ]
    details: __.typx.Annotated[
        bool,
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc(
            ''' Display detailed rule information including '''
            ''' configuration status. ''' )
    ] = False

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the describe rule command. '''
        # As written, the rule identifier is not resolved or validated
        # here: it is echoed, with the ``details`` flag, through
        # DescribeRuleResult.
        result = DescribeRuleResult(
            rule_id = self.rule_id,
            details = self.details,
        )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        # Always reports success.
        return 0

439 

440 

class DescribeCommand( __.immut.DataclassObject ):
    ''' Displays rule information and documentation. '''

    subcommand: __.typx.Union[
        __.typx.Annotated[
            DescribeRulesCommand,
            __.tyro.conf.subcommand( 'rules', prefix_name = False ),
        ],
        __.typx.Annotated[
            DescribeRuleCommand,
            __.tyro.conf.subcommand( 'rule', prefix_name = False ),
        ],
    ]

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Delegates to selected subcommand. '''
        # Both subcommand classes are awaitable callables which accept
        # the display options and return an exit code; that code is
        # passed through unchanged.
        return await self.subcommand( display )

458 

459 

class ServeCommand( __.immut.DataclassObject ):
    ''' Starts a protocol server (future implementation). '''

    protocol: __.typx.Annotated[
        __.typx.Literal[ 'lsp', 'mcp' ],
        __.tyro.conf.arg( prefix_name = False ),
        __.ddoc.Doc( ''' Protocol server to start. ''' )
    ] = 'mcp'

    async def __call__( self, display: DisplayOptions ) -> int:
        ''' Executes the serve command. '''
        # No server is started here: ServeResult renders the chosen
        # protocol together with a not-implemented notice.
        result = ServeResult( protocol = self.protocol )
        async with __.ctxl.AsyncExitStack( ) as exits:
            await _render_and_print_result( result, display, exits )
        # Always reports success.
        return 0

475 

476 

class Cli( __.immut.DataclassObject ):
    ''' Linter command-line interface. '''

    command: __.typx.Union[
        __.typx.Annotated[
            CheckCommand,
            __.tyro.conf.subcommand(
                'check', prefix_name = False, default = True ),
        ],
        __.typx.Annotated[
            FixCommand,
            __.tyro.conf.subcommand( 'fix', prefix_name = False ),
        ],
        __.typx.Annotated[
            ConfigureCommand,
            __.tyro.conf.subcommand( 'configure', prefix_name = False ),
        ],
        __.typx.Annotated[
            DescribeCommand,
            __.tyro.conf.subcommand( 'describe', prefix_name = False ),
        ],
        __.typx.Annotated[
            ServeCommand,
            __.tyro.conf.subcommand( 'serve', prefix_name = False ),
        ],
    ]
    display: __.typx.Annotated[
        DisplayOptions,
        __.tyro.conf.arg( prefix_name = False ),
    ] = __.dcls.field( default_factory = DisplayOptions )
    verbose: __.typx.Annotated[
        bool,
        __.ddoc.Doc( ''' Enable verbose output. ''' )
    ] = False

    async def __call__( self ) -> None:
        ''' Invokes selected subcommand after system preparation. '''
        # TODO: Implement verbose logging setup
        _ = self.verbose  # Suppress vulture warning
        # Errors raised by the subcommand are rendered by intercept_errors
        # according to the display format. This method never returns
        # normally: the subcommand's integer result always leaves as the
        # SystemExit code.
        async with intercept_errors( self.display ):
            exit_code = await self.command( self.display )
            raise SystemExit( exit_code )

519 

520 

def execute( ) -> None:
    ''' Parses command-line arguments and runs the CLI to completion.

        A SystemExit raised during parsing or execution propagates
        unchanged; any other exception becomes exit status 1 with the
        original exception context suppressed.
    '''
    import asyncio
    markers = (
        __.tyro.conf.EnumChoicesFromValues,
        __.tyro.conf.HelptextFromCommentsOff,
    )
    try:
        cli = __.tyro.cli( Cli, config = markers )
        asyncio.run( cli( ) )  # pyright: ignore
    except SystemExit: raise
    except BaseException:
        # TODO: Log exception with proper error handling
        raise SystemExit( 1 ) from None

533 

534 

535@__.ctxl.asynccontextmanager 

536async def intercept_errors( 

537 display: DisplayOptions, 

538) -> __.cabc.AsyncIterator[ None ]: 

539 ''' Context manager that intercepts and renders exceptions. 

540 

541 Catches Omnierror exceptions and renders them according to the 

542 display format. Handles unexpected exceptions by logging and 

543 formatting as errors. 

544 ''' 

545 from . import exceptions as _exceptions 

546 try: 

547 yield 

548 except _exceptions.Omnierror as exc: 

549 async with __.ctxl.AsyncExitStack( ) as exits: 

550 stream = await display.provide_stream( exits ) 

551 match display.format: 

552 case DisplayFormats.Json: 

553 stream.write( 

554 __.json.dumps( exc.render_as_json( ), indent = 2 ) ) 

555 stream.write( '\n' ) 

556 case DisplayFormats.Text: 

557 for line in exc.render_as_text( ): 

558 stream.write( line ) 

559 stream.write( '\n' ) 

560 raise SystemExit( 1 ) from exc 

561 except ( SystemExit, KeyboardInterrupt ): 

562 raise 

563 except BaseException as exc: 

564 # TODO: Log exception with proper error handling via scribe 

565 async with __.ctxl.AsyncExitStack( ) as exits: 

566 stream = await display.provide_stream( exits ) 

567 match display.format: 

568 case DisplayFormats.Json: 

569 error_data = { 

570 'type': 'unexpected_error', 

571 'message': str( exc ), 

572 } 

573 stream.write( __.json.dumps( error_data, indent = 2 ) ) 

574 stream.write( '\n' ) 

575 case DisplayFormats.Text: 

576 stream.write( '## Unexpected Error\n' ) 

577 stream.write( f'**Message**: {exc}\n' ) 

578 raise SystemExit( 1 ) from exc 

579 

580 

def _discover_python_files(
    paths: __.cabc.Sequence[ str ]
) -> tuple[ __.pathlib.Path, ... ]:
    ''' Collects ``.py`` files named by, or nested beneath, given paths.

        Directories are searched recursively. Paths which do not exist,
        and files without a ``.py`` suffix, contribute nothing. The result
        is de-duplicated and sorted.
    '''
    discovered: set[ __.pathlib.Path ] = set( )
    for location in map( __.pathlib.Path, paths ):
        # A nonexistent location is neither a directory nor a file, so it
        # falls through both branches.
        if location.is_dir( ):
            discovered.update( location.rglob( '*.py' ) )
        elif location.is_file( ) and location.suffix == '.py':
            discovered.add( location )
    return tuple( sorted( discovered ) )

595 

596 

def _apply_path_filters(
    file_paths: tuple[ __.pathlib.Path, ... ],
    config: __.typx.Any,
) -> tuple[ __.pathlib.Path, ... ]:
    ''' Narrows file paths by configured include and exclude patterns.

        When include patterns are present, a path must match one of them.
        When exclude patterns are present, a path must match none of them.
        Input order is preserved.
    '''
    settings = __.typx.cast( _configuration.Configuration, config )
    includes = settings.include_paths
    excludes = settings.exclude_paths
    filter_includes = not __.is_absent( includes )
    filter_excludes = not __.is_absent( excludes )

    def is_retained( candidate: __.pathlib.Path ) -> bool:
        if filter_includes and not _matches_any_pattern(
            candidate, includes
        ): return False
        return not (
            filter_excludes
            and _matches_any_pattern( candidate, excludes ) )

    return tuple( filter( is_retained, file_paths ) )

616 

617 

def _matches_any_pattern(
    file_path: __.pathlib.Path,
    patterns: tuple[ str, ... ],
) -> bool:
    ''' Reports whether path matches at least one glob pattern.

        Matching is performed on the string form of the path with the
        GLOBSTAR flag enabled. An empty pattern collection never matches.
    '''
    candidate = str( file_path )
    return any(
        __.wcglob.globmatch( candidate, pattern, flags = __.wcglob.GLOBSTAR )
        for pattern in patterns )

629 

630 

def _merge_context_size(
    cli_context: int,
    config: __.Absential[ __.typx.Any ],
) -> int:
    ''' Chooses context size, preferring a positive CLI value.

        Falls back to the configured context when the CLI value is not
        positive, and to zero when neither source provides one.
    '''
    if cli_context > 0: return cli_context
    if not __.is_absent( config ):
        settings = __.typx.cast( _configuration.Configuration, config )
        if not __.is_absent( settings.context ):
            return settings.context
    return 0

644 

645 

def _resolve_rule_set(
    identifiers: __.cabc.Iterable[ str ],
    registry_manager: _registry.RuleRegistryManager,
) -> set[ str ]:
    ''' Maps rule identifiers onto the set of codes they resolve to.

        Surrounding whitespace is stripped from each identifier and
        identifiers which are empty after stripping are skipped.
    '''
    stripped = ( candidate.strip( ) for candidate in identifiers )
    return {
        registry_manager.resolve_rule_identifier( name )
        for name in stripped if name }

659 

660 

def _merge_rule_selection(
    cli_selection: __.Absential[ str ],
    config: __.Absential[ __.typx.Any ],
    registry_manager: _registry.RuleRegistryManager,
) -> frozenset[ str ]:
    ''' Determines enabled rule codes from CLI and configuration.

        A CLI selection, split on commas, takes precedence and is used
        as-is. Otherwise, without configuration, every known rule is
        enabled. With configuration, its selection (or every known rule
        when it has none) is taken, less its excluded rules.
    '''
    from .rules.implementations.__ import RULE_DESCRIPTORS
    every_rule = frozenset( RULE_DESCRIPTORS.keys( ) )
    if not __.is_absent( cli_selection ):
        requested = cli_selection.split( ',' )
        return frozenset( _resolve_rule_set( requested, registry_manager ) )
    if __.is_absent( config ): return every_rule
    settings = __.typx.cast( _configuration.Configuration, config )
    if __.is_absent( settings.select ): chosen = set( every_rule )
    else: chosen = _resolve_rule_set( settings.select, registry_manager )
    if not __.is_absent( settings.exclude_rules ):
        chosen.difference_update(
            _resolve_rule_set( settings.exclude_rules, registry_manager ) )
    return frozenset( chosen )

685 

686 

687async def _render_and_print_result( 

688 result: RenderableResult, 

689 display: DisplayOptions, 

690 exits: __.ctxl.AsyncExitStack, 

691) -> None: 

692 ''' Renders and prints a result object based on display options. ''' 

693 stream = await display.provide_stream( exits ) 

694 match display.format: 

695 case DisplayFormats.Json: 

696 stream.write( __.json.dumps( result.render_as_json( ) ) ) 

697 stream.write( '\n' ) 

698 case DisplayFormats.Text: 

699 for line in result.render_as_text( ): 

700 stream.write( line ) 

701 stream.write( '\n' )