Coverage for sources/librovore/cli.py: 26%

269 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-17 23:43 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Command-line interface. ''' 

22 

23 

24from . import __ 

25from . import cacheproxy as _cacheproxy 

26from . import exceptions as _exceptions 

27from . import functions as _functions 

28from . import interfaces as _interfaces 

29from . import server as _server 

30from . import state as _state 

31 

32 

33_scribe = __.acquire_scribe( __name__ ) 

34 

35 

36GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[ 

37 __.typx.Optional[ str ], 

38 __.tyro.conf.arg( help = __.access_doctab( 'group by argument' ) ), 

39] 

40IncludeSnippets: __.typx.TypeAlias = __.typx.Annotated[ 

41 bool, 

42 __.tyro.conf.arg( help = __.access_doctab( 'include snippets argument' ) ), 

43] 

44PortArgument: __.typx.TypeAlias = __.typx.Annotated[ 

45 __.typx.Optional[ int ], 

46 __.tyro.conf.arg( help = __.access_doctab( 'server port argument' ) ), 

47] 

48TermArgument: __.typx.TypeAlias = __.typx.Annotated[ 

49 __.tyro.conf.Positional[ str ], 

50 __.tyro.conf.arg( help = __.access_doctab( 'term argument' ) ), 

51] 

52ResultsMax: __.typx.TypeAlias = __.typx.Annotated[ 

53 int, 

54 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

55] 

56LocationArgument: __.typx.TypeAlias = __.typx.Annotated[ 

57 __.tyro.conf.Positional[ str ], 

58 __.tyro.conf.arg( help = __.access_doctab( 'location argument' ) ), 

59] 

60TransportArgument: __.typx.TypeAlias = __.typx.Annotated[ 

61 __.typx.Optional[ str ], 

62 __.tyro.conf.arg( help = __.access_doctab( 'transport argument' ) ), 

63] 

64 

65 

66_search_behaviors_default = _interfaces.SearchBehaviors( ) 

67_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( ) 

68 

69_MARKDOWN_OBJECT_LIMIT = 10 

70_MARKDOWN_CONTENT_LIMIT = 200 

71 

72 

73 

74class _CliCommand( 

75 __.immut.DataclassProtocol, __.typx.Protocol, 

76 decorators = ( __.typx.runtime_checkable, ), 

77): 

78 ''' CLI command. ''' 

79 

80 @__.abc.abstractmethod 

81 async def __call__( 

82 self, 

83 auxdata: _state.Globals, 

84 display: __.DisplayTarget, 

85 display_format: _interfaces.DisplayFormat, 

86 ) -> None: 

87 ''' Executes command with global state. ''' 

88 raise NotImplementedError 

89 

90 

91class DetectCommand( 

92 _CliCommand, decorators = ( __.standard_tyro_class, ), 

93): 

94 ''' Detect which processors can handle a documentation source. ''' 

95 

96 location: LocationArgument 

97 genus: __.typx.Annotated[ 

98 _interfaces.ProcessorGenera, 

99 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

100 ] 

101 processor_name: __.typx.Annotated[ 

102 __.typx.Optional[ str ], 

103 __.tyro.conf.arg( help = "Specific processor to use." ), 

104 ] = None 

105 

106 async def __call__( 

107 self, 

108 auxdata: _state.Globals, 

109 display: __.DisplayTarget, 

110 display_format: _interfaces.DisplayFormat, 

111 ) -> None: 

112 stream = await display.provide_stream( ) 

113 processor_name = ( 

114 self.processor_name if self.processor_name is not None 

115 else __.absent ) 

116 try: 

117 result = await _functions.detect( 

118 auxdata, self.location, self.genus, 

119 processor_name = processor_name ) 

120 except Exception as exc: 

121 _scribe.error( "detect failed: %s", exc ) 

122 print( _format_cli_exception( exc ), file = stream ) 

123 raise SystemExit( 1 ) from None 

124 output = _format_output( result, display_format ) 

125 print( output, file = stream ) 

126 

127 

128class QueryInventoryCommand( 

129 _CliCommand, decorators = ( __.standard_tyro_class, ), 

130): 

131 ''' Searches object inventory by name with fuzzy matching. ''' 

132 

133 location: LocationArgument 

134 term: TermArgument 

135 details: __.typx.Annotated[ 

136 _interfaces.InventoryQueryDetails, 

137 __.tyro.conf.arg( 

138 help = __.access_doctab( 'query details argument' ) ), 

139 ] = _interfaces.InventoryQueryDetails.Documentation 

140 filters: __.typx.Annotated[ 

141 __.cabc.Mapping[ str, __.typx.Any ], 

142 __.tyro.conf.arg( prefix_name = False ), 

143 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) ) 

144 search_behaviors: __.typx.Annotated[ 

145 _interfaces.SearchBehaviors, 

146 __.tyro.conf.arg( prefix_name = False ), 

147 ] = __.dcls.field( 

148 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

149 results_max: __.typx.Annotated[ 

150 int, 

151 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

152 ] = 5 

153 

154 async def __call__( 

155 self, 

156 auxdata: _state.Globals, 

157 display: __.DisplayTarget, 

158 display_format: _interfaces.DisplayFormat, 

159 ) -> None: 

160 stream = await display.provide_stream( ) 

161 try: 

162 result = await _functions.query_inventory( 

163 auxdata, 

164 self.location, 

165 self.term, 

166 search_behaviors = self.search_behaviors, 

167 filters = self.filters, 

168 results_max = self.results_max, 

169 details = self.details ) 

170 except Exception as exc: 

171 _scribe.error( "query-inventory failed: %s", exc ) 

172 print( _format_cli_exception( exc ), file = stream ) 

173 raise SystemExit( 1 ) from None 

174 output = _format_output( result, display_format ) 

175 print( output, file = stream ) 

176 

177 

178class QueryContentCommand( 

179 _CliCommand, decorators = ( __.standard_tyro_class, ), 

180): 

181 ''' Searches documentation content with relevance ranking and snippets. ''' 

182 

183 location: LocationArgument 

184 term: TermArgument 

185 search_behaviors: __.typx.Annotated[ 

186 _interfaces.SearchBehaviors, 

187 __.tyro.conf.arg( prefix_name = False ), 

188 ] = __.dcls.field( 

189 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

190 filters: __.typx.Annotated[ 

191 __.cabc.Mapping[ str, __.typx.Any ], 

192 __.tyro.conf.arg( prefix_name = False ), 

193 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) ) 

194 include_snippets: IncludeSnippets = True 

195 results_max: ResultsMax = 10 

196 lines_max: __.typx.Annotated[ 

197 int, 

198 __.tyro.conf.arg( 

199 help = "Maximum number of lines to display per result." ), 

200 ] = 40 

201 

202 async def __call__( 

203 self, 

204 auxdata: _state.Globals, 

205 display: __.DisplayTarget, 

206 display_format: _interfaces.DisplayFormat, 

207 ) -> None: 

208 stream = await display.provide_stream( ) 

209 try: 

210 result = await _functions.query_content( 

211 auxdata, self.location, self.term, 

212 search_behaviors = self.search_behaviors, 

213 filters = self.filters, 

214 results_max = self.results_max, 

215 include_snippets = self.include_snippets ) 

216 except Exception as exc: 

217 _scribe.error( "query-content failed: %s", exc ) 

218 print( _format_cli_exception( exc ), file = stream ) 

219 raise SystemExit( 1 ) from None 

220 # Apply lines_max truncation to content 

221 if 'documents' in result and self.lines_max > 0: 

222 result = _truncate_query_content( result, self.lines_max ) 

223 output = _format_output( result, display_format ) 

224 print( output, file = stream ) 

225 

226 

227class SummarizeInventoryCommand( 

228 _CliCommand, decorators = ( __.standard_tyro_class, ), 

229): 

230 ''' Provides human-readable summary of inventory. ''' 

231 

232 location: LocationArgument 

233 term: TermArgument = '' 

234 filters: __.typx.Annotated[ 

235 __.cabc.Mapping[ str, __.typx.Any ], 

236 __.tyro.conf.arg( prefix_name = False ), 

237 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) ) 

238 group_by: GroupByArgument = None 

239 search_behaviors: __.typx.Annotated[ 

240 _interfaces.SearchBehaviors, 

241 __.tyro.conf.arg( prefix_name = False ), 

242 ] = __.dcls.field( 

243 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

244 

245 async def __call__( 

246 self, 

247 auxdata: _state.Globals, 

248 display: __.DisplayTarget, 

249 display_format: _interfaces.DisplayFormat, 

250 ) -> None: 

251 stream = await display.provide_stream( ) 

252 result = await _functions.summarize_inventory( 

253 auxdata, self.location, self.term or '', 

254 search_behaviors = self.search_behaviors, 

255 filters = self.filters, 

256 group_by = self.group_by ) 

257 output = _format_output( result, display_format ) 

258 print( output, file = stream ) 

259 

260 

261class SurveyProcessorsCommand( 

262 _CliCommand, decorators = ( __.standard_tyro_class, ), 

263): 

264 ''' List processors for specified genus and their capabilities. ''' 

265 

266 genus: __.typx.Annotated[ 

267 _interfaces.ProcessorGenera, 

268 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

269 ] 

270 name: __.typx.Annotated[ 

271 __.typx.Optional[ str ], 

272 __.tyro.conf.arg( help = "Name of processor to describe" ), 

273 ] = None 

274 

275 async def __call__( 

276 self, 

277 auxdata: _state.Globals, 

278 display: __.DisplayTarget, 

279 display_format: _interfaces.DisplayFormat, 

280 ) -> None: 

281 stream = await display.provide_stream( ) 

282 nomargs: __.NominativeArguments = { 'genus': self.genus } 

283 if self.name is not None: nomargs[ 'name' ] = self.name 

284 try: 

285 result = await _functions.survey_processors( auxdata, **nomargs ) 

286 except Exception as exc: 

287 _scribe.error( "survey-processors failed: %s", exc ) 

288 print( _format_cli_exception( exc ), file = stream ) 

289 raise SystemExit( 1 ) from None 

290 output = _format_output( result, display_format ) 

291 print( output, file = stream ) 

292 

293 

294 

295class ServeCommand( 

296 _CliCommand, decorators = ( __.standard_tyro_class, ), 

297): 

298 ''' Starts MCP server. ''' 

299 

300 port: PortArgument = None 

301 transport: TransportArgument = None 

302 extra_functions: __.typx.Annotated[ 

303 bool, 

304 __.tyro.conf.arg( 

305 help = "Enable extra functions (detect and survey-processors)." ), 

306 ] = False 

307 serve_function: __.typx.Callable[ 

308 [ _state.Globals ], __.cabc.Awaitable[ None ] 

309 ] = _server.serve 

310 async def __call__( 

311 self, 

312 auxdata: _state.Globals, 

313 display: __.DisplayTarget, 

314 display_format: _interfaces.DisplayFormat, 

315 ) -> None: 

316 nomargs: __.NominativeArguments = { } 

317 if self.port is not None: nomargs[ 'port' ] = self.port 

318 if self.transport is not None: nomargs[ 'transport' ] = self.transport 

319 nomargs[ 'extra_functions' ] = self.extra_functions 

320 await self.serve_function( auxdata, **nomargs ) 

321 

322 

323class Cli( __.immut.DataclassObject, decorators = ( __.simple_tyro_class, ) ): 

324 ''' MCP server CLI. ''' 

325 

326 display: __.DisplayTarget 

327 display_format: __.typx.Annotated[ 

328 _interfaces.DisplayFormat, 

329 __.tyro.conf.arg( help = "Output format for command results." ), 

330 ] = _interfaces.DisplayFormat.Markdown 

331 command: __.typx.Union[ 

332 __.typx.Annotated[ 

333 DetectCommand, 

334 __.tyro.conf.subcommand( 'detect', prefix_name = False ), 

335 ], 

336 __.typx.Annotated[ 

337 QueryInventoryCommand, 

338 __.tyro.conf.subcommand( 'query-inventory', prefix_name = False ), 

339 ], 

340 __.typx.Annotated[ 

341 QueryContentCommand, 

342 __.tyro.conf.subcommand( 'query-content', prefix_name = False ), 

343 ], 

344 __.typx.Annotated[ 

345 SummarizeInventoryCommand, 

346 __.tyro.conf.subcommand( 

347 'summarize-inventory', prefix_name = False ), 

348 ], 

349 __.typx.Annotated[ 

350 SurveyProcessorsCommand, 

351 __.tyro.conf.subcommand( 

352 'survey-processors', prefix_name = False ), 

353 ], 

354 __.typx.Annotated[ 

355 ServeCommand, 

356 __.tyro.conf.subcommand( 'serve', prefix_name = False ), 

357 ], 

358 ] 

359 logfile: __.typx.Annotated[ 

360 __.typx.Optional[ str ], 

361 __.ddoc.Doc( ''' Path to log capture file. ''' ), 

362 ] = None 

363 

364 async def __call__( self ): 

365 ''' Invokes command after library preparation. ''' 

366 nomargs = self.prepare_invocation_args( ) 

367 async with __.ctxl.AsyncExitStack( ) as exits: 

368 auxdata = await _prepare( exits = exits, **nomargs ) 

369 from . import xtnsmgr 

370 await xtnsmgr.register_processors( auxdata ) 

371 await self.command( 

372 auxdata = auxdata, 

373 display = self.display, 

374 display_format = self.display_format ) 

375 

376 def prepare_invocation_args( 

377 self, 

378 ) -> __.cabc.Mapping[ str, __.typx.Any ]: 

379 ''' Prepares arguments for initial configuration. ''' 

380 args: dict[ str, __.typx.Any ] = dict( 

381 environment = True, 

382 logfile = self.logfile, 

383 ) 

384 return args 

385 

386 

387def execute( ) -> None: 

388 ''' Entrypoint for CLI execution. ''' 

389 config = ( 

390 __.tyro.conf.HelptextFromCommentsOff, 

391 ) 

392 with __.warnings.catch_warnings( ): 

393 __.warnings.filterwarnings( 

394 'ignore', 

395 message = r'Mutable type .* is used as a default value.*', 

396 category = UserWarning, 

397 module = 'tyro.constructors._struct_spec_dataclass' ) 

398 try: __.asyncio.run( __.tyro.cli( Cli, config = config )( ) ) 

399 except SystemExit: raise 

400 except BaseException as exc: 

401 __.report_exceptions( exc, _scribe ) 

402 raise SystemExit( 1 ) from None 

403 

404 

405def _extract_object_name_and_role( obj: __.typx.Any ) -> tuple[ str, str ]: 

406 ''' Extracts name and role from object, with safe fallbacks. ''' 

407 if not hasattr( obj, 'get' ): 

408 return 'Unknown', 'unknown' 

409 try: 

410 name = getattr( obj, 'get' )( 'name', 'Unknown' ) 

411 except ( AttributeError, TypeError ): 

412 name = 'Unknown' 

413 try: 

414 role = getattr( obj, 'get' )( 'role', 'unknown' ) 

415 except ( AttributeError, TypeError ): 

416 role = 'unknown' 

417 if not isinstance( name, str ): 

418 name = str( name ) if name is not None else 'Unknown' 

419 if not isinstance( role, str ): 

420 role = str( role ) if role is not None else 'unknown' 

421 return name, role 

422 

423 

424def _format_as_markdown( result: __.cabc.Mapping[ str, __.typx.Any ] ) -> str: 

425 ''' Converts structured data to Markdown format. ''' 

426 if 'project' in result and 'version' in result and 'objects' in result: 

427 return _format_inventory_summary_markdown( result ) 

428 if 'documents' in result and 'search_metadata' in result: 

429 return _format_query_result_markdown( result ) 

430 if 'source' in result and 'detections' in result: 

431 return _format_detect_result_markdown( result ) 

432 return __.json.dumps( result, indent = 2 ) 

433 

434 

435def _format_detect_result_markdown( 

436 result: __.cabc.Mapping[ str, __.typx.Any ] 

437) -> str: 

438 ''' Formats detection results as Markdown. ''' 

439 source = result.get( 'source', 'Unknown' ) 

440 optimal = result.get( 'detection_optimal' ) 

441 time_ms = result.get( 'time_detection_ms', 0 ) 

442 lines = [ 

443 "# Detection Results", 

444 f"**Source:** {source}", 

445 f"**Detection Time:** {time_ms}ms", 

446 ] 

447 if optimal: 

448 processor = optimal.get( 'processor', {} ) 

449 confidence = optimal.get( 'confidence', 0 ) 

450 lines.extend([ 

451 "\n## Optimal Processor", 

452 f"- **Name:** {processor.get('name', 'Unknown')}", 

453 f"- **Confidence:** {confidence:.1%}", 

454 ]) 

455 return '\n'.join( lines ) 

456 

457 

458def _format_grouped_objects( 

459 objects_value: __.cabc.Mapping[ str, __.typx.Any ] 

460) -> list[ str ]: 

461 ''' Formats objects grouped by categories. ''' 

462 lines: list[ str ] = [ "\n## Breakdown by Groups" ] 

463 for group_name, group_objects in objects_value.items( ): 

464 if hasattr( group_objects, '__len__' ): 

465 object_count = len( group_objects ) 

466 lines.append( f"- **{group_name}:** {object_count} objects" ) 

467 return lines 

468 

469 

470def _format_inventory_summary_markdown( 

471 result: __.cabc.Mapping[ str, __.typx.Any ] 

472) -> str: 

473 ''' Formats inventory summary as Markdown. ''' 

474 lines = [ 

475 f"# {result[ 'project' ]}", 

476 f"**Version:** {result[ 'version' ]}", 

477 f"**Objects:** {result[ 'objects_count' ]}", 

478 ] 

479 objects_value = result.get( 'objects' ) 

480 if objects_value: 

481 if isinstance( objects_value, dict ): 

482 grouped_objects = __.typx.cast( 

483 __.cabc.Mapping[ str, __.typx.Any ], objects_value ) 

484 lines.extend( _format_grouped_objects( grouped_objects ) ) 

485 else: 

486 lines.extend( _format_object_list( objects_value ) ) 

487 return '\n'.join( lines ) 

488 

489 

490def _format_object_list( objects_value: __.typx.Any ) -> list[ str ]: 

491 ''' Formats a flat list of objects. ''' 

492 lines: list[ str ] = [ ] 

493 if not hasattr( objects_value, '__len__' ): return lines 

494 objects_count = len( objects_value ) 

495 lines.append( f"\n## Objects ({objects_count})" ) 

496 if ( hasattr( objects_value, '__getitem__' ) 

497 and hasattr( objects_value, '__iter__' ) ): 

498 subset_limit = _MARKDOWN_OBJECT_LIMIT 

499 objects_subset = ( 

500 objects_value[ :subset_limit ] 

501 if objects_count > subset_limit else objects_value ) 

502 for obj in objects_subset: 

503 name, role = _extract_object_name_and_role( obj ) 

504 lines.append( f"- `{name}` ({role})" ) 

505 if objects_count > _MARKDOWN_OBJECT_LIMIT: 

506 remaining = objects_count - _MARKDOWN_OBJECT_LIMIT 

507 lines.append( f"- ... and {remaining} more" ) 

508 return lines 

509 

510 

511def _truncate_query_content( 

512 result: __.cabc.Mapping[ str, __.typx.Any ], 

513 lines_max: int, 

514) -> __.cabc.Mapping[ str, __.typx.Any ]: 

515 ''' Truncates content in query results to specified line limit. ''' 

516 truncated_docs: list[ __.cabc.Mapping[ str, __.typx.Any ] ] = [] 

517 for doc in result[ 'documents' ]: 

518 truncated_doc = dict( doc ) 

519 if 'description' in truncated_doc: 

520 lines = truncated_doc[ 'description' ].split( '\n' ) 

521 if len( lines ) > lines_max: 

522 truncated_lines = lines[ :lines_max ] 

523 truncated_lines.append( '...' ) 

524 truncated_doc[ 'description' ] = '\n'.join( truncated_lines ) 

525 truncated_docs.append( truncated_doc ) 

526 result = dict( result ) 

527 result[ 'documents' ] = truncated_docs 

528 return result 

529 

530 

531def _format_output( 

532 result: __.cabc.Mapping[ str, __.typx.Any ], 

533 display_format: _interfaces.DisplayFormat, 

534) -> str: 

535 ''' Formats command output according to display format. ''' 

536 if display_format == _interfaces.DisplayFormat.JSON: 

537 return __.json.dumps( result, indent = 2 ) 

538 if display_format == _interfaces.DisplayFormat.Markdown: 

539 return _format_as_markdown( result ) 

540 raise ValueError 

541 

542 

543def _format_query_result_markdown( 

544 result: __.cabc.Mapping[ str, __.typx.Any ] 

545) -> str: 

546 ''' Formats query results as Markdown. ''' 

547 project = result.get( 'project', 'Unknown' ) 

548 query = result.get( 'query', 'Unknown' ) 

549 documents = result.get( 'documents', [] ) 

550 metadata = result.get( 'search_metadata', {} ) 

551 lines = [ 

552 f"# Query Results: {query}", 

553 f"**Project:** {project}", 

554 f"**Results:** {metadata.get('results_count', 0)}/" 

555 f"{metadata.get('matches_total', 0)}", 

556 ] 

557 if documents: 

558 lines.append( "\n## Documents" ) 

559 for index, doc in enumerate( documents, 1 ): 

560 # Add separator before each result 

561 separator = "\n\n🔍 ── Result {} ─────────────────────── 🔍\n" 

562 lines.append( separator.format( index ) ) 

563 name = doc.get( 'name', 'Unknown' ) 

564 role = doc.get( 'role', 'unknown' ) 

565 lines.append( f"### `{name}`" ) 

566 lines.append( f"- **Type:** {role}" ) 

567 if 'domain' in doc: 

568 lines.append( f"- **Domain:** {doc['domain']}" ) 

569 if 'description' in doc: 

570 lines.append( f"- **Content:** {doc['description']}" ) 

571 lines.append( "" ) 

572 return '\n'.join( lines ) 

573 

574 

575def _format_cli_exception( exc: Exception ) -> str: # noqa: PLR0911 

576 ''' Formats exceptions for user-friendly CLI output. ''' 

577 match exc: 

578 case _exceptions.ProcessorInavailability( ): 

579 return ( 

580 f"❌ No processor found to handle source: {exc.source}\n" 

581 f"💡 Verify this is a Sphinx documentation site" ) 

582 case _exceptions.InventoryInaccessibility( ): 

583 return ( 

584 f"❌ Cannot access documentation inventory: {exc.source}\n" 

585 f"💡 Check URL accessibility and network connection" ) 

586 case _exceptions.DocumentationContentAbsence( ): 

587 return ( 

588 f"❌ Documentation structure not recognized: {exc.url}\n" 

589 f"💡 This may be an unsupported Sphinx theme" ) 

590 case _exceptions.DocumentationObjectAbsence( ): 

591 return ( 

592 f"❌ Object '{exc.object_id}' not found in page: {exc.url}\n" 

593 f"💡 Verify the object name and try a broader search" ) 

594 case _exceptions.InventoryInvalidity( ): 

595 return ( 

596 f"❌ Invalid documentation inventory: {exc.source}\n" 

597 f"💡 The documentation site may be corrupted" ) 

598 case _exceptions.DocumentationInaccessibility( ): 

599 return ( 

600 f"❌ Documentation inaccessible: {exc.url}\n" 

601 f"💡 Check URL accessibility and network connection" ) 

602 case _: 

603 return f"❌ Unexpected error: {exc}" 

604 

605 

606async def _prepare( 

607 environment: __.typx.Annotated[ 

608 bool, 

609 __.ddoc.Doc( ''' Whether to configure environment. ''' ) 

610 ], 

611 exits: __.typx.Annotated[ 

612 __.ctxl.AsyncExitStack, 

613 __.ddoc.Doc( ''' Exit stack for resource management. ''' ) 

614 ], 

615 logfile: __.typx.Annotated[ 

616 __.typx.Optional[ str ], 

617 __.ddoc.Doc( ''' Path to log capture file. ''' ) 

618 ], 

619) -> __.typx.Annotated[ 

620 _state.Globals, 

621 __.ddoc.Doc( ''' Configured global state. ''' ) 

622]: 

623 ''' Configures application based on arguments. ''' 

624 nomargs: __.NominativeArguments = { 

625 'environment': environment, 

626 'exits': exits, 

627 } 

628 if logfile: 

629 logfile_p = __.Path( logfile ).resolve( ) 

630 ( logfile_p.parent ).mkdir( parents = True, exist_ok = True ) 

631 logstream = exits.enter_context( logfile_p.open( 'w' ) ) 

632 inscription = __.appcore.inscription.Control( 

633 level = 'debug', target = logstream ) 

634 nomargs[ 'inscription' ] = inscription 

635 auxdata = await __.appcore.prepare( **nomargs ) 

636 content_cache, probe_cache, robots_cache = _cacheproxy.prepare( auxdata ) 

637 return _state.Globals( 

638 application = auxdata.application, 

639 configuration = auxdata.configuration, 

640 directories = auxdata.directories, 

641 distribution = auxdata.distribution, 

642 exits = auxdata.exits, 

643 content_cache = content_cache, 

644 probe_cache = probe_cache, 

645 robots_cache = robots_cache )