Coverage for sources/librovore/cli.py: 26%
269 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-17 23:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-17 23:43 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Command-line interface. '''
24from . import __
25from . import cacheproxy as _cacheproxy
26from . import exceptions as _exceptions
27from . import functions as _functions
28from . import interfaces as _interfaces
29from . import server as _server
30from . import state as _state
33_scribe = __.acquire_scribe( __name__ )
36GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[
37 __.typx.Optional[ str ],
38 __.tyro.conf.arg( help = __.access_doctab( 'group by argument' ) ),
39]
40IncludeSnippets: __.typx.TypeAlias = __.typx.Annotated[
41 bool,
42 __.tyro.conf.arg( help = __.access_doctab( 'include snippets argument' ) ),
43]
44PortArgument: __.typx.TypeAlias = __.typx.Annotated[
45 __.typx.Optional[ int ],
46 __.tyro.conf.arg( help = __.access_doctab( 'server port argument' ) ),
47]
48TermArgument: __.typx.TypeAlias = __.typx.Annotated[
49 __.tyro.conf.Positional[ str ],
50 __.tyro.conf.arg( help = __.access_doctab( 'term argument' ) ),
51]
52ResultsMax: __.typx.TypeAlias = __.typx.Annotated[
53 int,
54 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ),
55]
56LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
57 __.tyro.conf.Positional[ str ],
58 __.tyro.conf.arg( help = __.access_doctab( 'location argument' ) ),
59]
60TransportArgument: __.typx.TypeAlias = __.typx.Annotated[
61 __.typx.Optional[ str ],
62 __.tyro.conf.arg( help = __.access_doctab( 'transport argument' ) ),
63]
66_search_behaviors_default = _interfaces.SearchBehaviors( )
67_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
69_MARKDOWN_OBJECT_LIMIT = 10
70_MARKDOWN_CONTENT_LIMIT = 200
74class _CliCommand(
75 __.immut.DataclassProtocol, __.typx.Protocol,
76 decorators = ( __.typx.runtime_checkable, ),
77):
78 ''' CLI command. '''
80 @__.abc.abstractmethod
81 async def __call__(
82 self,
83 auxdata: _state.Globals,
84 display: __.DisplayTarget,
85 display_format: _interfaces.DisplayFormat,
86 ) -> None:
87 ''' Executes command with global state. '''
88 raise NotImplementedError
91class DetectCommand(
92 _CliCommand, decorators = ( __.standard_tyro_class, ),
93):
94 ''' Detect which processors can handle a documentation source. '''
96 location: LocationArgument
97 genus: __.typx.Annotated[
98 _interfaces.ProcessorGenera,
99 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ),
100 ]
101 processor_name: __.typx.Annotated[
102 __.typx.Optional[ str ],
103 __.tyro.conf.arg( help = "Specific processor to use." ),
104 ] = None
106 async def __call__(
107 self,
108 auxdata: _state.Globals,
109 display: __.DisplayTarget,
110 display_format: _interfaces.DisplayFormat,
111 ) -> None:
112 stream = await display.provide_stream( )
113 processor_name = (
114 self.processor_name if self.processor_name is not None
115 else __.absent )
116 try:
117 result = await _functions.detect(
118 auxdata, self.location, self.genus,
119 processor_name = processor_name )
120 except Exception as exc:
121 _scribe.error( "detect failed: %s", exc )
122 print( _format_cli_exception( exc ), file = stream )
123 raise SystemExit( 1 ) from None
124 output = _format_output( result, display_format )
125 print( output, file = stream )
128class QueryInventoryCommand(
129 _CliCommand, decorators = ( __.standard_tyro_class, ),
130):
131 ''' Searches object inventory by name with fuzzy matching. '''
133 location: LocationArgument
134 term: TermArgument
135 details: __.typx.Annotated[
136 _interfaces.InventoryQueryDetails,
137 __.tyro.conf.arg(
138 help = __.access_doctab( 'query details argument' ) ),
139 ] = _interfaces.InventoryQueryDetails.Documentation
140 filters: __.typx.Annotated[
141 __.cabc.Mapping[ str, __.typx.Any ],
142 __.tyro.conf.arg( prefix_name = False ),
143 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) )
144 search_behaviors: __.typx.Annotated[
145 _interfaces.SearchBehaviors,
146 __.tyro.conf.arg( prefix_name = False ),
147 ] = __.dcls.field(
148 default_factory = lambda: _interfaces.SearchBehaviors( ) )
149 results_max: __.typx.Annotated[
150 int,
151 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ),
152 ] = 5
154 async def __call__(
155 self,
156 auxdata: _state.Globals,
157 display: __.DisplayTarget,
158 display_format: _interfaces.DisplayFormat,
159 ) -> None:
160 stream = await display.provide_stream( )
161 try:
162 result = await _functions.query_inventory(
163 auxdata,
164 self.location,
165 self.term,
166 search_behaviors = self.search_behaviors,
167 filters = self.filters,
168 results_max = self.results_max,
169 details = self.details )
170 except Exception as exc:
171 _scribe.error( "query-inventory failed: %s", exc )
172 print( _format_cli_exception( exc ), file = stream )
173 raise SystemExit( 1 ) from None
174 output = _format_output( result, display_format )
175 print( output, file = stream )
178class QueryContentCommand(
179 _CliCommand, decorators = ( __.standard_tyro_class, ),
180):
181 ''' Searches documentation content with relevance ranking and snippets. '''
183 location: LocationArgument
184 term: TermArgument
185 search_behaviors: __.typx.Annotated[
186 _interfaces.SearchBehaviors,
187 __.tyro.conf.arg( prefix_name = False ),
188 ] = __.dcls.field(
189 default_factory = lambda: _interfaces.SearchBehaviors( ) )
190 filters: __.typx.Annotated[
191 __.cabc.Mapping[ str, __.typx.Any ],
192 __.tyro.conf.arg( prefix_name = False ),
193 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) )
194 include_snippets: IncludeSnippets = True
195 results_max: ResultsMax = 10
196 lines_max: __.typx.Annotated[
197 int,
198 __.tyro.conf.arg(
199 help = "Maximum number of lines to display per result." ),
200 ] = 40
202 async def __call__(
203 self,
204 auxdata: _state.Globals,
205 display: __.DisplayTarget,
206 display_format: _interfaces.DisplayFormat,
207 ) -> None:
208 stream = await display.provide_stream( )
209 try:
210 result = await _functions.query_content(
211 auxdata, self.location, self.term,
212 search_behaviors = self.search_behaviors,
213 filters = self.filters,
214 results_max = self.results_max,
215 include_snippets = self.include_snippets )
216 except Exception as exc:
217 _scribe.error( "query-content failed: %s", exc )
218 print( _format_cli_exception( exc ), file = stream )
219 raise SystemExit( 1 ) from None
220 # Apply lines_max truncation to content
221 if 'documents' in result and self.lines_max > 0:
222 result = _truncate_query_content( result, self.lines_max )
223 output = _format_output( result, display_format )
224 print( output, file = stream )
227class SummarizeInventoryCommand(
228 _CliCommand, decorators = ( __.standard_tyro_class, ),
229):
230 ''' Provides human-readable summary of inventory. '''
232 location: LocationArgument
233 term: TermArgument = ''
234 filters: __.typx.Annotated[
235 __.cabc.Mapping[ str, __.typx.Any ],
236 __.tyro.conf.arg( prefix_name = False ),
237 ] = __.dcls.field( default_factory = lambda: dict( _filters_default ) )
238 group_by: GroupByArgument = None
239 search_behaviors: __.typx.Annotated[
240 _interfaces.SearchBehaviors,
241 __.tyro.conf.arg( prefix_name = False ),
242 ] = __.dcls.field(
243 default_factory = lambda: _interfaces.SearchBehaviors( ) )
245 async def __call__(
246 self,
247 auxdata: _state.Globals,
248 display: __.DisplayTarget,
249 display_format: _interfaces.DisplayFormat,
250 ) -> None:
251 stream = await display.provide_stream( )
252 result = await _functions.summarize_inventory(
253 auxdata, self.location, self.term or '',
254 search_behaviors = self.search_behaviors,
255 filters = self.filters,
256 group_by = self.group_by )
257 output = _format_output( result, display_format )
258 print( output, file = stream )
261class SurveyProcessorsCommand(
262 _CliCommand, decorators = ( __.standard_tyro_class, ),
263):
264 ''' List processors for specified genus and their capabilities. '''
266 genus: __.typx.Annotated[
267 _interfaces.ProcessorGenera,
268 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ),
269 ]
270 name: __.typx.Annotated[
271 __.typx.Optional[ str ],
272 __.tyro.conf.arg( help = "Name of processor to describe" ),
273 ] = None
275 async def __call__(
276 self,
277 auxdata: _state.Globals,
278 display: __.DisplayTarget,
279 display_format: _interfaces.DisplayFormat,
280 ) -> None:
281 stream = await display.provide_stream( )
282 nomargs: __.NominativeArguments = { 'genus': self.genus }
283 if self.name is not None: nomargs[ 'name' ] = self.name
284 try:
285 result = await _functions.survey_processors( auxdata, **nomargs )
286 except Exception as exc:
287 _scribe.error( "survey-processors failed: %s", exc )
288 print( _format_cli_exception( exc ), file = stream )
289 raise SystemExit( 1 ) from None
290 output = _format_output( result, display_format )
291 print( output, file = stream )
295class ServeCommand(
296 _CliCommand, decorators = ( __.standard_tyro_class, ),
297):
298 ''' Starts MCP server. '''
300 port: PortArgument = None
301 transport: TransportArgument = None
302 extra_functions: __.typx.Annotated[
303 bool,
304 __.tyro.conf.arg(
305 help = "Enable extra functions (detect and survey-processors)." ),
306 ] = False
307 serve_function: __.typx.Callable[
308 [ _state.Globals ], __.cabc.Awaitable[ None ]
309 ] = _server.serve
310 async def __call__(
311 self,
312 auxdata: _state.Globals,
313 display: __.DisplayTarget,
314 display_format: _interfaces.DisplayFormat,
315 ) -> None:
316 nomargs: __.NominativeArguments = { }
317 if self.port is not None: nomargs[ 'port' ] = self.port
318 if self.transport is not None: nomargs[ 'transport' ] = self.transport
319 nomargs[ 'extra_functions' ] = self.extra_functions
320 await self.serve_function( auxdata, **nomargs )
323class Cli( __.immut.DataclassObject, decorators = ( __.simple_tyro_class, ) ):
324 ''' MCP server CLI. '''
326 display: __.DisplayTarget
327 display_format: __.typx.Annotated[
328 _interfaces.DisplayFormat,
329 __.tyro.conf.arg( help = "Output format for command results." ),
330 ] = _interfaces.DisplayFormat.Markdown
331 command: __.typx.Union[
332 __.typx.Annotated[
333 DetectCommand,
334 __.tyro.conf.subcommand( 'detect', prefix_name = False ),
335 ],
336 __.typx.Annotated[
337 QueryInventoryCommand,
338 __.tyro.conf.subcommand( 'query-inventory', prefix_name = False ),
339 ],
340 __.typx.Annotated[
341 QueryContentCommand,
342 __.tyro.conf.subcommand( 'query-content', prefix_name = False ),
343 ],
344 __.typx.Annotated[
345 SummarizeInventoryCommand,
346 __.tyro.conf.subcommand(
347 'summarize-inventory', prefix_name = False ),
348 ],
349 __.typx.Annotated[
350 SurveyProcessorsCommand,
351 __.tyro.conf.subcommand(
352 'survey-processors', prefix_name = False ),
353 ],
354 __.typx.Annotated[
355 ServeCommand,
356 __.tyro.conf.subcommand( 'serve', prefix_name = False ),
357 ],
358 ]
359 logfile: __.typx.Annotated[
360 __.typx.Optional[ str ],
361 __.ddoc.Doc( ''' Path to log capture file. ''' ),
362 ] = None
364 async def __call__( self ):
365 ''' Invokes command after library preparation. '''
366 nomargs = self.prepare_invocation_args( )
367 async with __.ctxl.AsyncExitStack( ) as exits:
368 auxdata = await _prepare( exits = exits, **nomargs )
369 from . import xtnsmgr
370 await xtnsmgr.register_processors( auxdata )
371 await self.command(
372 auxdata = auxdata,
373 display = self.display,
374 display_format = self.display_format )
376 def prepare_invocation_args(
377 self,
378 ) -> __.cabc.Mapping[ str, __.typx.Any ]:
379 ''' Prepares arguments for initial configuration. '''
380 args: dict[ str, __.typx.Any ] = dict(
381 environment = True,
382 logfile = self.logfile,
383 )
384 return args
387def execute( ) -> None:
388 ''' Entrypoint for CLI execution. '''
389 config = (
390 __.tyro.conf.HelptextFromCommentsOff,
391 )
392 with __.warnings.catch_warnings( ):
393 __.warnings.filterwarnings(
394 'ignore',
395 message = r'Mutable type .* is used as a default value.*',
396 category = UserWarning,
397 module = 'tyro.constructors._struct_spec_dataclass' )
398 try: __.asyncio.run( __.tyro.cli( Cli, config = config )( ) )
399 except SystemExit: raise
400 except BaseException as exc:
401 __.report_exceptions( exc, _scribe )
402 raise SystemExit( 1 ) from None
405def _extract_object_name_and_role( obj: __.typx.Any ) -> tuple[ str, str ]:
406 ''' Extracts name and role from object, with safe fallbacks. '''
407 if not hasattr( obj, 'get' ):
408 return 'Unknown', 'unknown'
409 try:
410 name = getattr( obj, 'get' )( 'name', 'Unknown' )
411 except ( AttributeError, TypeError ):
412 name = 'Unknown'
413 try:
414 role = getattr( obj, 'get' )( 'role', 'unknown' )
415 except ( AttributeError, TypeError ):
416 role = 'unknown'
417 if not isinstance( name, str ):
418 name = str( name ) if name is not None else 'Unknown'
419 if not isinstance( role, str ):
420 role = str( role ) if role is not None else 'unknown'
421 return name, role
424def _format_as_markdown( result: __.cabc.Mapping[ str, __.typx.Any ] ) -> str:
425 ''' Converts structured data to Markdown format. '''
426 if 'project' in result and 'version' in result and 'objects' in result:
427 return _format_inventory_summary_markdown( result )
428 if 'documents' in result and 'search_metadata' in result:
429 return _format_query_result_markdown( result )
430 if 'source' in result and 'detections' in result:
431 return _format_detect_result_markdown( result )
432 return __.json.dumps( result, indent = 2 )
435def _format_detect_result_markdown(
436 result: __.cabc.Mapping[ str, __.typx.Any ]
437) -> str:
438 ''' Formats detection results as Markdown. '''
439 source = result.get( 'source', 'Unknown' )
440 optimal = result.get( 'detection_optimal' )
441 time_ms = result.get( 'time_detection_ms', 0 )
442 lines = [
443 "# Detection Results",
444 f"**Source:** {source}",
445 f"**Detection Time:** {time_ms}ms",
446 ]
447 if optimal:
448 processor = optimal.get( 'processor', {} )
449 confidence = optimal.get( 'confidence', 0 )
450 lines.extend([
451 "\n## Optimal Processor",
452 f"- **Name:** {processor.get('name', 'Unknown')}",
453 f"- **Confidence:** {confidence:.1%}",
454 ])
455 return '\n'.join( lines )
458def _format_grouped_objects(
459 objects_value: __.cabc.Mapping[ str, __.typx.Any ]
460) -> list[ str ]:
461 ''' Formats objects grouped by categories. '''
462 lines: list[ str ] = [ "\n## Breakdown by Groups" ]
463 for group_name, group_objects in objects_value.items( ):
464 if hasattr( group_objects, '__len__' ):
465 object_count = len( group_objects )
466 lines.append( f"- **{group_name}:** {object_count} objects" )
467 return lines
470def _format_inventory_summary_markdown(
471 result: __.cabc.Mapping[ str, __.typx.Any ]
472) -> str:
473 ''' Formats inventory summary as Markdown. '''
474 lines = [
475 f"# {result[ 'project' ]}",
476 f"**Version:** {result[ 'version' ]}",
477 f"**Objects:** {result[ 'objects_count' ]}",
478 ]
479 objects_value = result.get( 'objects' )
480 if objects_value:
481 if isinstance( objects_value, dict ):
482 grouped_objects = __.typx.cast(
483 __.cabc.Mapping[ str, __.typx.Any ], objects_value )
484 lines.extend( _format_grouped_objects( grouped_objects ) )
485 else:
486 lines.extend( _format_object_list( objects_value ) )
487 return '\n'.join( lines )
490def _format_object_list( objects_value: __.typx.Any ) -> list[ str ]:
491 ''' Formats a flat list of objects. '''
492 lines: list[ str ] = [ ]
493 if not hasattr( objects_value, '__len__' ): return lines
494 objects_count = len( objects_value )
495 lines.append( f"\n## Objects ({objects_count})" )
496 if ( hasattr( objects_value, '__getitem__' )
497 and hasattr( objects_value, '__iter__' ) ):
498 subset_limit = _MARKDOWN_OBJECT_LIMIT
499 objects_subset = (
500 objects_value[ :subset_limit ]
501 if objects_count > subset_limit else objects_value )
502 for obj in objects_subset:
503 name, role = _extract_object_name_and_role( obj )
504 lines.append( f"- `{name}` ({role})" )
505 if objects_count > _MARKDOWN_OBJECT_LIMIT:
506 remaining = objects_count - _MARKDOWN_OBJECT_LIMIT
507 lines.append( f"- ... and {remaining} more" )
508 return lines
511def _truncate_query_content(
512 result: __.cabc.Mapping[ str, __.typx.Any ],
513 lines_max: int,
514) -> __.cabc.Mapping[ str, __.typx.Any ]:
515 ''' Truncates content in query results to specified line limit. '''
516 truncated_docs: list[ __.cabc.Mapping[ str, __.typx.Any ] ] = []
517 for doc in result[ 'documents' ]:
518 truncated_doc = dict( doc )
519 if 'description' in truncated_doc:
520 lines = truncated_doc[ 'description' ].split( '\n' )
521 if len( lines ) > lines_max:
522 truncated_lines = lines[ :lines_max ]
523 truncated_lines.append( '...' )
524 truncated_doc[ 'description' ] = '\n'.join( truncated_lines )
525 truncated_docs.append( truncated_doc )
526 result = dict( result )
527 result[ 'documents' ] = truncated_docs
528 return result
531def _format_output(
532 result: __.cabc.Mapping[ str, __.typx.Any ],
533 display_format: _interfaces.DisplayFormat,
534) -> str:
535 ''' Formats command output according to display format. '''
536 if display_format == _interfaces.DisplayFormat.JSON:
537 return __.json.dumps( result, indent = 2 )
538 if display_format == _interfaces.DisplayFormat.Markdown:
539 return _format_as_markdown( result )
540 raise ValueError
543def _format_query_result_markdown(
544 result: __.cabc.Mapping[ str, __.typx.Any ]
545) -> str:
546 ''' Formats query results as Markdown. '''
547 project = result.get( 'project', 'Unknown' )
548 query = result.get( 'query', 'Unknown' )
549 documents = result.get( 'documents', [] )
550 metadata = result.get( 'search_metadata', {} )
551 lines = [
552 f"# Query Results: {query}",
553 f"**Project:** {project}",
554 f"**Results:** {metadata.get('results_count', 0)}/"
555 f"{metadata.get('matches_total', 0)}",
556 ]
557 if documents:
558 lines.append( "\n## Documents" )
559 for index, doc in enumerate( documents, 1 ):
560 # Add separator before each result
561 separator = "\n\n🔍 ── Result {} ─────────────────────── 🔍\n"
562 lines.append( separator.format( index ) )
563 name = doc.get( 'name', 'Unknown' )
564 role = doc.get( 'role', 'unknown' )
565 lines.append( f"### `{name}`" )
566 lines.append( f"- **Type:** {role}" )
567 if 'domain' in doc:
568 lines.append( f"- **Domain:** {doc['domain']}" )
569 if 'description' in doc:
570 lines.append( f"- **Content:** {doc['description']}" )
571 lines.append( "" )
572 return '\n'.join( lines )
575def _format_cli_exception( exc: Exception ) -> str: # noqa: PLR0911
576 ''' Formats exceptions for user-friendly CLI output. '''
577 match exc:
578 case _exceptions.ProcessorInavailability( ):
579 return (
580 f"❌ No processor found to handle source: {exc.source}\n"
581 f"💡 Verify this is a Sphinx documentation site" )
582 case _exceptions.InventoryInaccessibility( ):
583 return (
584 f"❌ Cannot access documentation inventory: {exc.source}\n"
585 f"💡 Check URL accessibility and network connection" )
586 case _exceptions.DocumentationContentAbsence( ):
587 return (
588 f"❌ Documentation structure not recognized: {exc.url}\n"
589 f"💡 This may be an unsupported Sphinx theme" )
590 case _exceptions.DocumentationObjectAbsence( ):
591 return (
592 f"❌ Object '{exc.object_id}' not found in page: {exc.url}\n"
593 f"💡 Verify the object name and try a broader search" )
594 case _exceptions.InventoryInvalidity( ):
595 return (
596 f"❌ Invalid documentation inventory: {exc.source}\n"
597 f"💡 The documentation site may be corrupted" )
598 case _exceptions.DocumentationInaccessibility( ):
599 return (
600 f"❌ Documentation inaccessible: {exc.url}\n"
601 f"💡 Check URL accessibility and network connection" )
602 case _:
603 return f"❌ Unexpected error: {exc}"
606async def _prepare(
607 environment: __.typx.Annotated[
608 bool,
609 __.ddoc.Doc( ''' Whether to configure environment. ''' )
610 ],
611 exits: __.typx.Annotated[
612 __.ctxl.AsyncExitStack,
613 __.ddoc.Doc( ''' Exit stack for resource management. ''' )
614 ],
615 logfile: __.typx.Annotated[
616 __.typx.Optional[ str ],
617 __.ddoc.Doc( ''' Path to log capture file. ''' )
618 ],
619) -> __.typx.Annotated[
620 _state.Globals,
621 __.ddoc.Doc( ''' Configured global state. ''' )
622]:
623 ''' Configures application based on arguments. '''
624 nomargs: __.NominativeArguments = {
625 'environment': environment,
626 'exits': exits,
627 }
628 if logfile:
629 logfile_p = __.Path( logfile ).resolve( )
630 ( logfile_p.parent ).mkdir( parents = True, exist_ok = True )
631 logstream = exits.enter_context( logfile_p.open( 'w' ) )
632 inscription = __.appcore.inscription.Control(
633 level = 'debug', target = logstream )
634 nomargs[ 'inscription' ] = inscription
635 auxdata = await __.appcore.prepare( **nomargs )
636 content_cache, probe_cache, robots_cache = _cacheproxy.prepare( auxdata )
637 return _state.Globals(
638 application = auxdata.application,
639 configuration = auxdata.configuration,
640 directories = auxdata.directories,
641 distribution = auxdata.distribution,
642 exits = auxdata.exits,
643 content_cache = content_cache,
644 probe_cache = probe_cache,
645 robots_cache = robots_cache )