Coverage for sources/librovore/cli.py: 49%

144 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 18:40 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Command-line interface. ''' 

22 

23 

24import appcore.cli as _appcore_cli 

25 

26from . import __ 

27from . import cacheproxy as _cacheproxy 

28from . import exceptions as _exceptions 

29from . import functions as _functions 

30from . import interfaces as _interfaces 

31from . import results as _results 

32from . import server as _server 

33from . import state as _state 

34 

35 

36_scribe = __.acquire_scribe( __name__ ) 

37 

38 

39def intercept_errors( ) -> __.cabc.Callable[ 

40 [ __.cabc.Callable[ 

41 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] ], 

42 __.cabc.Callable[ 

43 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] 

44]: 

45 ''' Decorator for CLI handlers to intercept exceptions. 

46 

47 Catches Omnierror exceptions and renders them appropriately. 

48 Other exceptions are logged and formatted simply. 

49 ''' 

50 def decorator( 

51 function: __.cabc.Callable[ 

52 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] 

53 ) -> __.cabc.Callable[ 

54 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] 

55 ]: 

56 @__.funct.wraps( function ) 

57 async def wrapper( 

58 self: __.typx.Any, 

59 auxdata: _state.Globals, 

60 *posargs: __.typx.Any, 

61 **nomargs: __.typx.Any, 

62 ) -> None: 

63 if not isinstance( # pragma: no cover 

64 auxdata, _state.Globals 

65 ): raise _exceptions.ContextInvalidity 

66 stream = await auxdata.display.provide_stream( auxdata.exits ) 

67 try: return await function( self, auxdata, *posargs, **nomargs ) 

68 except _exceptions.Omnierror as exc: 

69 match auxdata.display.format: 

70 case _interfaces.DisplayFormat.JSON: 

71 serialized = dict( exc.render_as_json( ) ) 

72 error_message = __.json.dumps( serialized, indent = 2 ) 

73 case _interfaces.DisplayFormat.Markdown: 

74 lines = exc.render_as_markdown( ) 

75 error_message = '\n'.join( lines ) 

76 print( error_message, file = stream ) 

77 raise SystemExit( 1 ) from None 

78 except Exception as exc: 

79 _scribe.error( f"{function.__name__} failed: %s", exc ) 

80 match auxdata.display.format: 

81 case _interfaces.DisplayFormat.JSON: 

82 error_data = { 

83 "type": "unexpected_error", 

84 "title": "Unexpected Error", 

85 "message": str( exc ), 

86 "suggestion": ( 

87 "Please report this issue if it persists." ), 

88 } 

89 error_message = __.json.dumps( error_data, indent = 2 ) 

90 case _interfaces.DisplayFormat.Markdown: 

91 error_message = f"❌ Unexpected error: {exc}" 

92 print( error_message, file = stream ) 

93 raise SystemExit( 1 ) from None 

94 

95 return wrapper 

96 return decorator 

97 

98 

99GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[ 

100 __.typx.Optional[ str ], 

101 __.tyro.conf.arg( help = __.access_doctab( 'group by argument' ) ), 

102] 

103PortArgument: __.typx.TypeAlias = __.typx.Annotated[ 

104 __.typx.Optional[ int ], 

105 __.tyro.conf.arg( help = __.access_doctab( 'server port argument' ) ), 

106] 

107TermArgument: __.typx.TypeAlias = __.typx.Annotated[ 

108 __.tyro.conf.Positional[ str ], 

109 __.tyro.conf.arg( help = __.access_doctab( 'term argument' ) ), 

110] 

111ResultsMax: __.typx.TypeAlias = __.typx.Annotated[ 

112 int, 

113 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

114] 

115LocationArgument: __.typx.TypeAlias = __.typx.Annotated[ 

116 __.tyro.conf.Positional[ str ], 

117 __.tyro.conf.arg( help = __.access_doctab( 'location argument' ) ), 

118] 

119TransportArgument: __.typx.TypeAlias = __.typx.Annotated[ 

120 __.typx.Optional[ str ], 

121 __.tyro.conf.arg( help = __.access_doctab( 'transport argument' ) ), 

122] 

123 

124 

125_search_behaviors_default = _interfaces.SearchBehaviors( ) 

126 

127_MARKDOWN_OBJECT_LIMIT = 10 

128_MARKDOWN_CONTENT_LIMIT = 200 

129 

130 

131class DetectCommand( 

132 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

133): 

134 ''' Detect which processors can handle a documentation source. ''' 

135 

136 location: LocationArgument 

137 genus: __.typx.Annotated[ 

138 _interfaces.ProcessorGenera, 

139 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

140 ] 

141 processor_name: __.typx.Annotated[ 

142 __.typx.Optional[ str ], 

143 __.tyro.conf.arg( help = "Specific processor to use." ), 

144 ] = None 

145 

146 @intercept_errors( ) 

147 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

148 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

149 raise _exceptions.ContextInvalidity 

150 processor_name = ( 

151 self.processor_name if self.processor_name is not None 

152 else __.absent ) 

153 result = await _functions.detect( 

154 auxdata, self.location, self.genus, 

155 processor_name = processor_name ) 

156 await _render_and_print_result( 

157 result, auxdata.display, auxdata.exits, reveal_internals = False ) 

158 

159 

160class QueryInventoryCommand( 

161 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

162): 

163 ''' Explores documentation structure and object inventory. 

164 

165 Use before content searches to: 

166 

167 - Discover available topics and object types 

168 - Identify relevant search terms and filters 

169 - Understand documentation scope and organization 

170 ''' 

171 

172 location: LocationArgument 

173 term: TermArgument 

174 filters: __.typx.Annotated[ 

175 __.cabc.Sequence[ str ], 

176 __.tyro.conf.arg( prefix_name = False ), 

177 ] = ( ) 

178 search_behaviors: __.typx.Annotated[ 

179 _interfaces.SearchBehaviors, 

180 __.tyro.conf.arg( prefix_name = False ), 

181 ] = __.dcls.field( 

182 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

183 results_max: __.typx.Annotated[ 

184 int, 

185 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

186 ] = 5 

187 summarize: __.typx.Annotated[ 

188 bool, 

189 __.tyro.conf.arg( 

190 help = ( 

191 "Show distribution summary instead of full object list." ) ), 

192 ] = False 

193 group_by: __.typx.Annotated[ 

194 __.cabc.Sequence[ str ], 

195 __.tyro.conf.arg( 

196 prefix_name = False, 

197 help = ( 

198 "Grouping dimensions for summary. Uses processor's supported " 

199 "filters if not specified." ) ), 

200 ] = ( ) 

201 reveal_internals: __.typx.Annotated[ 

202 bool, 

203 __.tyro.conf.arg( 

204 help = ( 

205 "Show internal implementation details (domain, priority, " 

206 "project, version)." ) ), 

207 ] = False 

208 

209 @intercept_errors( ) 

210 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

211 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

212 raise _exceptions.ContextInvalidity 

213 result = await _functions.query_inventory( 

214 auxdata, 

215 self.location, 

216 self.term, 

217 search_behaviors = self.search_behaviors, 

218 filters = _filters_to_dictionary( self.filters ), 

219 results_max = self.results_max ) 

220 await _render_and_print_result( 

221 result, auxdata.display, auxdata.exits, 

222 reveal_internals = self.reveal_internals, 

223 summarize = self.summarize, 

224 group_by = self.group_by ) 

225 

226 

227class QueryContentCommand( 

228 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

229): 

230 ''' Searches documentation with flexible preview/extraction modes. 

231 

232 Workflows: 

233 

234 - Sample: Use --lines-max 5-10 to preview results and identify relevant 

235 content 

236 - Extract: Use --content-id from sample results to retrieve full 

237 content 

238 - Direct: Search with higher --lines-max for immediate full results 

239 ''' 

240 

241 location: LocationArgument 

242 term: TermArgument 

243 search_behaviors: __.typx.Annotated[ 

244 _interfaces.SearchBehaviors, 

245 __.tyro.conf.arg( prefix_name = False ), 

246 ] = __.dcls.field( 

247 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

248 filters: __.typx.Annotated[ 

249 __.cabc.Sequence[ str ], 

250 __.tyro.conf.arg( prefix_name = False ), 

251 ] = ( ) 

252 results_max: ResultsMax = 10 

253 lines_max: __.typx.Annotated[ 

254 int, 

255 __.tyro.conf.arg( 

256 help = ( 

257 "Lines per result for preview/sampling. Use 5-10 for " 

258 "discovery, omit for full content extraction via " 

259 "content-id." ) ), 

260 ] = 40 

261 content_id: __.typx.Annotated[ 

262 __.typx.Optional[ str ], 

263 __.tyro.conf.arg( 

264 help = ( 

265 "Extract full content for specific result. Obtain IDs from " 

266 "previous query-content calls with limited lines-max." ) ), 

267 ] = None 

268 reveal_internals: __.typx.Annotated[ 

269 bool, 

270 __.tyro.conf.arg( 

271 help = ( 

272 "Show internal implementation details (domain, priority, " 

273 "project, version)." ) ), 

274 ] = False 

275 @intercept_errors( ) 

276 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

277 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

278 raise _exceptions.ContextInvalidity 

279 content_id_ = ( 

280 __.absent if self.content_id is None else self.content_id ) 

281 result = await _functions.query_content( 

282 auxdata, self.location, self.term, 

283 search_behaviors = self.search_behaviors, 

284 filters = _filters_to_dictionary( self.filters ), 

285 content_id = content_id_, 

286 results_max = self.results_max, 

287 lines_max = self.lines_max ) 

288 await _render_and_print_result( 

289 result, auxdata.display, auxdata.exits, 

290 reveal_internals = self.reveal_internals, 

291 lines_max = self.lines_max ) 

292 

293 

294class SurveyProcessorsCommand( 

295 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

296): 

297 ''' List processors for specified genus and their capabilities. ''' 

298 

299 genus: __.typx.Annotated[ 

300 _interfaces.ProcessorGenera, 

301 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

302 ] 

303 name: __.typx.Annotated[ 

304 __.typx.Optional[ str ], 

305 __.tyro.conf.arg( help = "Name of processor to describe" ), 

306 ] = None 

307 

308 @intercept_errors( ) 

309 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

310 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

311 raise _exceptions.ContextInvalidity 

312 nomargs: __.NominativeArguments = { 'genus': self.genus } 

313 if self.name is not None: nomargs[ 'name' ] = self.name 

314 result = await _functions.survey_processors( auxdata, **nomargs ) 

315 await _render_and_print_result( 

316 result, auxdata.display, auxdata.exits, reveal_internals = False ) 

317 

318 

319class ServeCommand( 

320 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

321): 

322 ''' Starts MCP server. ''' 

323 

324 port: PortArgument = None 

325 transport: TransportArgument = None 

326 extra_functions: __.typx.Annotated[ 

327 bool, 

328 __.tyro.conf.arg( 

329 help = "Enable extra functions (detect and survey-processors)." ), 

330 ] = False 

331 serve_function: __.typx.Callable[ 

332 [ _state.Globals ], __.cabc.Awaitable[ None ] 

333 ] = _server.serve 

334 async def execute( self, auxdata: __.Globals ) -> None: 

335 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

336 raise _exceptions.ContextInvalidity 

337 nomargs: __.NominativeArguments = { } 

338 if self.port is not None: nomargs[ 'port' ] = self.port 

339 if self.transport is not None: nomargs[ 'transport' ] = self.transport 

340 nomargs[ 'extra_functions' ] = self.extra_functions 

341 await self.serve_function( auxdata, **nomargs ) 

342 

343 

344class Cli( _appcore_cli.Application ): 

345 ''' MCP server CLI. ''' 

346 

347 display: _state.DisplayOptions = __.dcls.field( 

348 default_factory = _state.DisplayOptions ) 

349 command: __.typx.Union[ 

350 __.typx.Annotated[ 

351 DetectCommand, 

352 __.tyro.conf.subcommand( 'detect', prefix_name = False ), 

353 ], 

354 __.typx.Annotated[ 

355 QueryInventoryCommand, 

356 __.tyro.conf.subcommand( 'query-inventory', prefix_name = False ), 

357 ], 

358 __.typx.Annotated[ 

359 QueryContentCommand, 

360 __.tyro.conf.subcommand( 'query-content', prefix_name = False ), 

361 ], 

362 __.typx.Annotated[ 

363 SurveyProcessorsCommand, 

364 __.tyro.conf.subcommand( 

365 'survey-processors', prefix_name = False ), 

366 ], 

367 __.typx.Annotated[ 

368 ServeCommand, 

369 __.tyro.conf.subcommand( 'serve', prefix_name = False ), 

370 ], 

371 ] 

372 

373 async def execute( self, auxdata: __.Globals ) -> None: 

374 ''' Executes command with extension registration. ''' 

375 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

376 raise _exceptions.ContextInvalidity 

377 from . import xtnsmgr 

378 await xtnsmgr.register_processors( auxdata ) 

379 await self.command( auxdata ) 

380 

381 async def prepare( 

382 self, exits: __.ctxl.AsyncExitStack 

383 ) -> _state.Globals: 

384 ''' Prepares librovore-specific global state with cache proxies. ''' 

385 auxdata_base = await super( ).prepare( exits ) 

386 content_cache, probe_cache, robots_cache = _cacheproxy.prepare( 

387 auxdata_base ) 

388 nomargs = { 

389 field.name: getattr( auxdata_base, field.name ) 

390 for field in __.dcls.fields( auxdata_base ) 

391 if not field.name.startswith( '_' ) } 

392 return _state.Globals( 

393 display = self.display, 

394 content_cache = content_cache, 

395 probe_cache = probe_cache, 

396 robots_cache = robots_cache, 

397 **nomargs ) 

398 

399 

400def execute( ) -> None: 

401 ''' Entrypoint for CLI execution. ''' 

402 config = ( 

403 __.tyro.conf.HelptextFromCommentsOff, 

404 ) 

405 with __.warnings.catch_warnings( ): 

406 __.warnings.filterwarnings( 

407 'ignore', 

408 message = r'Mutable type .* is used as a default value.*', 

409 category = UserWarning, 

410 module = 'tyro.constructors._struct_spec_dataclass' ) 

411 try: __.asyncio.run( __.tyro.cli( Cli, config = config )( ) ) 

412 except SystemExit: raise 

413 except BaseException as exc: 

414 __.report_exceptions( exc, _scribe ) 

415 raise SystemExit( 1 ) from None 

416 

417 

418def _filters_to_dictionary( 

419 filters: __.cabc.Sequence[ str ] 

420) -> dict[ str, str ]: 

421 return dict( map( lambda s: s.split( '=' ), filters ) ) 

422 

423 

424async def _render_and_print_result( 

425 result: _results.ResultBase, 

426 display: _state.DisplayOptions, 

427 exits: __.ctxl.AsyncExitStack, 

428 **nomargs: __.typx.Any 

429) -> None: 

430 ''' Centralizes result rendering logic with Rich formatting support. ''' 

431 stream = await display.provide_stream( exits ) 

432 match display.format: 

433 case _interfaces.DisplayFormat.JSON: 

434 nomargs_filtered = { 

435 key: value for key, value in nomargs.items() 

436 if key in [ 

437 'lines_max', 'reveal_internals', 'summarize', 'group_by' ] 

438 } 

439 serialized = dict( result.render_as_json( **nomargs_filtered ) ) 

440 output = __.json.dumps( serialized, indent = 2 ) 

441 print( output, file = stream ) 

442 case _interfaces.DisplayFormat.Markdown: 

443 lines = result.render_as_markdown( **nomargs ) 

444 if display.determine_colorization( stream ): 

445 from rich.console import Console 

446 from rich.markdown import Markdown 

447 console = Console( file = stream, force_terminal = True ) 

448 markdown_obj = Markdown( '\n'.join( lines ) ) 

449 console.print( markdown_obj ) 

450 else: 

451 output = '\n'.join( lines ) 

452 print( output, file = stream )