Coverage for sources/librovore/cli.py: 48%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 22:09 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Command-line interface. ''' 

22 

23 

24import appcore.cli as _appcore_cli 

25 

26from . import __ 

27from . import cacheproxy as _cacheproxy 

28from . import exceptions as _exceptions 

29from . import functions as _functions 

30from . import interfaces as _interfaces 

31from . import results as _results 

32from . import server as _server 

33from . import state as _state 

34 

35 

36_scribe = __.acquire_scribe( __name__ ) 

37 

38 

39def intercept_errors( ) -> __.cabc.Callable[ 

40 [ __.cabc.Callable[ 

41 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] ], 

42 __.cabc.Callable[ 

43 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] 

44]: 

45 ''' Decorator for CLI handlers to intercept exceptions. 

46 

47 Catches Omnierror exceptions and renders them appropriately. 

48 Other exceptions are logged and formatted simply. 

49 ''' 

50 def decorator( 

51 function: __.cabc.Callable[ 

52 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] ] 

53 ) -> __.cabc.Callable[ 

54 ..., __.typx.Coroutine[ __.typx.Any, __.typx.Any, None ] 

55 ]: 

56 @__.funct.wraps( function ) 

57 async def wrapper( 

58 self: __.typx.Any, 

59 auxdata: _state.Globals, 

60 *posargs: __.typx.Any, 

61 **nomargs: __.typx.Any, 

62 ) -> None: 

63 if not isinstance( # pragma: no cover 

64 auxdata, _state.Globals 

65 ): raise _exceptions.ContextInvalidity 

66 stream = await auxdata.display.provide_stream( auxdata.exits ) 

67 try: return await function( self, auxdata, *posargs, **nomargs ) 

68 except _exceptions.Omnierror as exc: 

69 match auxdata.display.format: 

70 case _interfaces.DisplayFormat.JSON: 

71 serialized = dict( exc.render_as_json( ) ) 

72 error_message = __.json.dumps( serialized, indent = 2 ) 

73 case _interfaces.DisplayFormat.Markdown: 

74 lines = exc.render_as_markdown( ) 

75 error_message = '\n'.join( lines ) 

76 print( error_message, file = stream ) 

77 raise SystemExit( 1 ) from None 

78 except Exception as exc: 

79 _scribe.error( f"{function.__name__} failed: %s", exc ) 

80 match auxdata.display.format: 

81 case _interfaces.DisplayFormat.JSON: 

82 error_data = { 

83 "type": "unexpected_error", 

84 "title": "Unexpected Error", 

85 "message": str( exc ), 

86 "suggestion": ( 

87 "Please report this issue if it persists." ), 

88 } 

89 error_message = __.json.dumps( error_data, indent = 2 ) 

90 case _interfaces.DisplayFormat.Markdown: 

91 error_message = f"❌ Unexpected error: {exc}" 

92 print( error_message, file = stream ) 

93 raise SystemExit( 1 ) from None 

94 

95 return wrapper 

96 return decorator 

97 

98 

99GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[ 

100 __.typx.Optional[ str ], 

101 __.tyro.conf.arg( help = __.access_doctab( 'group by argument' ) ), 

102] 

103PortArgument: __.typx.TypeAlias = __.typx.Annotated[ 

104 __.typx.Optional[ int ], 

105 __.tyro.conf.arg( help = __.access_doctab( 'server port argument' ) ), 

106] 

107TermArgument: __.typx.TypeAlias = __.typx.Annotated[ 

108 __.tyro.conf.Positional[ str ], 

109 __.tyro.conf.arg( help = __.access_doctab( 'term argument' ) ), 

110] 

111ResultsMax: __.typx.TypeAlias = __.typx.Annotated[ 

112 int, 

113 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

114] 

115LocationArgument: __.typx.TypeAlias = __.typx.Annotated[ 

116 __.tyro.conf.Positional[ str ], 

117 __.tyro.conf.arg( help = __.access_doctab( 'location argument' ) ), 

118] 

119TransportArgument: __.typx.TypeAlias = __.typx.Annotated[ 

120 __.typx.Optional[ str ], 

121 __.tyro.conf.arg( help = __.access_doctab( 'transport argument' ) ), 

122] 

123 

124 

125_search_behaviors_default = _interfaces.SearchBehaviors( ) 

126 

127_MARKDOWN_OBJECT_LIMIT = 10 

128_MARKDOWN_CONTENT_LIMIT = 200 

129 

130 

131class DetectCommand( 

132 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

133): 

134 ''' Detect which processors can handle a documentation source. ''' 

135 

136 location: LocationArgument 

137 genus: __.typx.Annotated[ 

138 _interfaces.ProcessorGenera, 

139 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

140 ] 

141 processor_name: __.typx.Annotated[ 

142 __.typx.Optional[ str ], 

143 __.tyro.conf.arg( help = "Specific processor to use." ), 

144 ] = None 

145 

146 @intercept_errors( ) 

147 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

148 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

149 raise _exceptions.ContextInvalidity 

150 processor_name = ( 

151 self.processor_name if self.processor_name is not None 

152 else __.absent ) 

153 result = await _functions.detect( 

154 auxdata, self.location, self.genus, 

155 processor_name = processor_name ) 

156 await _render_and_print_result( 

157 result, auxdata.display, auxdata.exits, reveal_internals = False ) 

158 

159 

160class QueryInventoryCommand( 

161 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

162): 

163 ''' Explores documentation structure and object inventory. 

164 

165 Use before content searches to: 

166 

167 - Discover available topics and object types 

168 - Identify relevant search terms and filters 

169 - Understand documentation scope and organization 

170 ''' 

171 

172 location: LocationArgument 

173 term: TermArgument 

174 filters: __.typx.Annotated[ 

175 __.cabc.Sequence[ str ], 

176 __.tyro.conf.arg( prefix_name = False ), 

177 ] = ( ) 

178 search_behaviors: __.typx.Annotated[ 

179 _interfaces.SearchBehaviors, 

180 __.tyro.conf.arg( prefix_name = False ), 

181 ] = __.dcls.field( 

182 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

183 results_max: __.typx.Annotated[ 

184 int, 

185 __.tyro.conf.arg( help = __.access_doctab( 'results max argument' ) ), 

186 ] = 5 

187 reveal_internals: __.typx.Annotated[ 

188 bool, 

189 __.tyro.conf.arg( 

190 help = ( 

191 "Show internal implementation details (domain, priority, " 

192 "project, version)." ) ), 

193 ] = False 

194 

195 @intercept_errors( ) 

196 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

197 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

198 raise _exceptions.ContextInvalidity 

199 result = await _functions.query_inventory( 

200 auxdata, 

201 self.location, 

202 self.term, 

203 search_behaviors = self.search_behaviors, 

204 filters = _filters_to_dictionary( self.filters ), 

205 results_max = self.results_max ) 

206 await _render_and_print_result( 

207 result, auxdata.display, auxdata.exits, 

208 reveal_internals = self.reveal_internals ) 

209 

210 

211class QueryContentCommand( 

212 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

213): 

214 ''' Searches documentation with flexible preview/extraction modes. 

215 

216 Workflows: 

217 

218 - Sample: Use --lines-max 5-10 to preview results and identify relevant 

219 content 

220 - Extract: Use --content-id from sample results to retrieve full 

221 content 

222 - Direct: Search with higher --lines-max for immediate full results 

223 ''' 

224 

225 location: LocationArgument 

226 term: TermArgument 

227 search_behaviors: __.typx.Annotated[ 

228 _interfaces.SearchBehaviors, 

229 __.tyro.conf.arg( prefix_name = False ), 

230 ] = __.dcls.field( 

231 default_factory = lambda: _interfaces.SearchBehaviors( ) ) 

232 filters: __.typx.Annotated[ 

233 __.cabc.Sequence[ str ], 

234 __.tyro.conf.arg( prefix_name = False ), 

235 ] = ( ) 

236 results_max: ResultsMax = 10 

237 lines_max: __.typx.Annotated[ 

238 int, 

239 __.tyro.conf.arg( 

240 help = ( 

241 "Lines per result for preview/sampling. Use 5-10 for " 

242 "discovery, omit for full content extraction via " 

243 "content-id." ) ), 

244 ] = 40 

245 content_id: __.typx.Annotated[ 

246 __.typx.Optional[ str ], 

247 __.tyro.conf.arg( 

248 help = ( 

249 "Extract full content for specific result. Obtain IDs from " 

250 "previous query-content calls with limited lines-max." ) ), 

251 ] = None 

252 reveal_internals: __.typx.Annotated[ 

253 bool, 

254 __.tyro.conf.arg( 

255 help = ( 

256 "Show internal implementation details (domain, priority, " 

257 "project, version)." ) ), 

258 ] = False 

259 @intercept_errors( ) 

260 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

261 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

262 raise _exceptions.ContextInvalidity 

263 content_id_ = ( 

264 __.absent if self.content_id is None else self.content_id ) 

265 result = await _functions.query_content( 

266 auxdata, self.location, self.term, 

267 search_behaviors = self.search_behaviors, 

268 filters = _filters_to_dictionary( self.filters ), 

269 content_id = content_id_, 

270 results_max = self.results_max, 

271 lines_max = self.lines_max ) 

272 await _render_and_print_result( 

273 result, auxdata.display, auxdata.exits, 

274 reveal_internals = self.reveal_internals, 

275 lines_max = self.lines_max ) 

276 

277 

278class SurveyProcessorsCommand( 

279 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

280): 

281 ''' List processors for specified genus and their capabilities. ''' 

282 

283 genus: __.typx.Annotated[ 

284 _interfaces.ProcessorGenera, 

285 __.tyro.conf.arg( help = "Processor genus (inventory or structure)." ), 

286 ] 

287 name: __.typx.Annotated[ 

288 __.typx.Optional[ str ], 

289 __.tyro.conf.arg( help = "Name of processor to describe" ), 

290 ] = None 

291 

292 @intercept_errors( ) 

293 async def execute( self, auxdata: __.Globals ) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 

294 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

295 raise _exceptions.ContextInvalidity 

296 nomargs: __.NominativeArguments = { 'genus': self.genus } 

297 if self.name is not None: nomargs[ 'name' ] = self.name 

298 result = await _functions.survey_processors( auxdata, **nomargs ) 

299 await _render_and_print_result( 

300 result, auxdata.display, auxdata.exits, reveal_internals = False ) 

301 

302 

303class ServeCommand( 

304 _appcore_cli.Command, decorators = ( __.standard_tyro_class, ) 

305): 

306 ''' Starts MCP server. ''' 

307 

308 port: PortArgument = None 

309 transport: TransportArgument = None 

310 extra_functions: __.typx.Annotated[ 

311 bool, 

312 __.tyro.conf.arg( 

313 help = "Enable extra functions (detect and survey-processors)." ), 

314 ] = False 

315 serve_function: __.typx.Callable[ 

316 [ _state.Globals ], __.cabc.Awaitable[ None ] 

317 ] = _server.serve 

318 async def execute( self, auxdata: __.Globals ) -> None: 

319 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

320 raise _exceptions.ContextInvalidity 

321 nomargs: __.NominativeArguments = { } 

322 if self.port is not None: nomargs[ 'port' ] = self.port 

323 if self.transport is not None: nomargs[ 'transport' ] = self.transport 

324 nomargs[ 'extra_functions' ] = self.extra_functions 

325 await self.serve_function( auxdata, **nomargs ) 

326 

327 

328class Cli( _appcore_cli.Application ): 

329 ''' MCP server CLI. ''' 

330 

331 display: _state.DisplayOptions = __.dcls.field( 

332 default_factory = _state.DisplayOptions ) 

333 command: __.typx.Union[ 

334 __.typx.Annotated[ 

335 DetectCommand, 

336 __.tyro.conf.subcommand( 'detect', prefix_name = False ), 

337 ], 

338 __.typx.Annotated[ 

339 QueryInventoryCommand, 

340 __.tyro.conf.subcommand( 'query-inventory', prefix_name = False ), 

341 ], 

342 __.typx.Annotated[ 

343 QueryContentCommand, 

344 __.tyro.conf.subcommand( 'query-content', prefix_name = False ), 

345 ], 

346 __.typx.Annotated[ 

347 SurveyProcessorsCommand, 

348 __.tyro.conf.subcommand( 

349 'survey-processors', prefix_name = False ), 

350 ], 

351 __.typx.Annotated[ 

352 ServeCommand, 

353 __.tyro.conf.subcommand( 'serve', prefix_name = False ), 

354 ], 

355 ] 

356 

357 async def execute( self, auxdata: __.Globals ) -> None: 

358 ''' Executes command with extension registration. ''' 

359 if not isinstance( auxdata, _state.Globals ): # pragma: no cover 

360 raise _exceptions.ContextInvalidity 

361 from . import xtnsmgr 

362 await xtnsmgr.register_processors( auxdata ) 

363 await self.command( auxdata ) 

364 

365 async def prepare( 

366 self, exits: __.ctxl.AsyncExitStack 

367 ) -> _state.Globals: 

368 ''' Prepares librovore-specific global state with cache proxies. ''' 

369 auxdata_base = await super( ).prepare( exits ) 

370 content_cache, probe_cache, robots_cache = _cacheproxy.prepare( 

371 auxdata_base ) 

372 nomargs = { 

373 field.name: getattr( auxdata_base, field.name ) 

374 for field in __.dcls.fields( auxdata_base ) 

375 if not field.name.startswith( '_' ) } 

376 return _state.Globals( 

377 display = self.display, 

378 content_cache = content_cache, 

379 probe_cache = probe_cache, 

380 robots_cache = robots_cache, 

381 **nomargs ) 

382 

383 

384def execute( ) -> None: 

385 ''' Entrypoint for CLI execution. ''' 

386 config = ( 

387 __.tyro.conf.HelptextFromCommentsOff, 

388 ) 

389 with __.warnings.catch_warnings( ): 

390 __.warnings.filterwarnings( 

391 'ignore', 

392 message = r'Mutable type .* is used as a default value.*', 

393 category = UserWarning, 

394 module = 'tyro.constructors._struct_spec_dataclass' ) 

395 try: __.asyncio.run( __.tyro.cli( Cli, config = config )( ) ) 

396 except SystemExit: raise 

397 except BaseException as exc: 

398 __.report_exceptions( exc, _scribe ) 

399 raise SystemExit( 1 ) from None 

400 

401 

402def _filters_to_dictionary( 

403 filters: __.cabc.Sequence[ str ] 

404) -> dict[ str, str ]: 

405 return dict( map( lambda s: s.split( '=' ), filters ) ) 

406 

407 

408async def _render_and_print_result( 

409 result: _results.ResultBase, 

410 display: _state.DisplayOptions, 

411 exits: __.ctxl.AsyncExitStack, 

412 **nomargs: __.typx.Any 

413) -> None: 

414 ''' Centralizes result rendering logic with Rich formatting support. ''' 

415 stream = await display.provide_stream( exits ) 

416 match display.format: 

417 case _interfaces.DisplayFormat.JSON: 

418 nomargs_filtered = { 

419 key: value for key, value in nomargs.items() 

420 if key in [ 'lines_max', 'reveal_internals' ] 

421 } 

422 serialized = dict( result.render_as_json( **nomargs_filtered ) ) 

423 output = __.json.dumps( serialized, indent = 2 ) 

424 print( output, file = stream ) 

425 case _interfaces.DisplayFormat.Markdown: 

426 lines = result.render_as_markdown( **nomargs ) 

427 if display.determine_colorization( stream ): 

428 from rich.console import Console 

429 from rich.markdown import Markdown 

430 console = Console( file = stream, force_terminal = True ) 

431 markdown_obj = Markdown( '\n'.join( lines ) ) 

432 console.print( markdown_obj ) 

433 else: 

434 output = '\n'.join( lines ) 

435 print( output, file = stream )