Coverage for sources/librovore/functions.py: 20%

195 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 22:48 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core business logic shared between CLI and MCP server. ''' 

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import search as _search 

30from . import state as _state 

31 

32 

# JSON-compatible result payloads shared by CLI and MCP server surfaces.
DocumentationResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
SearchResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
# Documentation-site location (URL or path); Fname attaches docstring metadata.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults so every query function presents the same
# baseline search behavior and an empty filter set.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )

def normalize_location( location: str ) -> str:
    ''' Normalizes location URL by stripping trailing ``/index.html``.

        Uses ``str.removesuffix`` rather than a hard-coded ``[ : -11 ]``
        slice, so the stripped length always matches the suffix. A bare
        ``index.html`` without a leading slash is left untouched, matching
        the original ``endswith( '/index.html' )`` guard.
    '''
    return location.removesuffix( '/index.html' )


# Minimum fraction of requested objects that must yield a signature or
# description before extraction is considered successful.
_SUCCESS_RATE_MINIMUM = 0.1

51 

52 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Detects relevant processors of particular genus for location.

        Returns a JSON-compatible mapping: on success, ``success`` plus
        serialized detection data (including detection wall time in ms);
        when no optimal detection exists, a structured error payload.
    '''
    # NOTE(review): processor_name is accepted but never passed to
    # access_detections — confirm whether filtering by name was intended.
    location = normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    if __.is_absent( detection_optimal ):
        # Create a synthetic exception to get proper error formatting
        genus_name = (
            genus.name.lower( ) if hasattr( genus, 'name' ) else str( genus ) )
        exc = _exceptions.ProcessorInavailability( genus_name )
        return _produce_processor_error_response( exc, genus = genus )
    response = _processors.DetectionsForLocation(
        source = location,
        detections = detections,
        detection_optimal = detection_optimal,
        time_detection_ms = detection_time_ms )
    return {
        'success': True,
        'data': serialize_for_json( response ),
    }

82 

83 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ],
    __.ddoc.Fname( 'content query return' ) ]:
    ''' Searches documentation content with relevance ranking.

        Pipeline: detect an inventory processor for the location, filter
        inventory objects by name match against ``term``, extract content
        for the top candidates via a structure processor, then sort by
        relevance score and truncate to ``results_max``.

        Returns a JSON-compatible mapping with ``success`` plus either
        ``data`` (source, query, search_metadata, documents) or a
        structured ``error`` payload when processor detection fails.
    '''
    location = normalize_location( location )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        # Surface detection failures as structured error responses
        # rather than letting the exception escape to CLI/MCP callers.
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Inventory )
    # Resolve URL after detection to get working URL if redirect exists
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Over-select (3x results_max) so weak extractions can be dropped
    # after relevance ranking while still filling results_max.
    candidates = [ result.object for result in results[ : results_max * 3 ] ]
    if not candidates:
        # NOTE(review): this early return reports 'source' as the
        # normalized location, while the success path below reports
        # resolved_location — confirm the asymmetry is intentional.
        return {
            'success': True,
            'data': {
                'source': location,
                'query': term,
                'search_metadata': {
                    'results_count': 0,
                    'results_max': results_max,
                },
                'documents': [ ],
            },
        }
    try:
        sdetection = await _detection.detect_structure(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Structure )
    contents = await sdetection.extract_contents(
        auxdata, location, candidates, include_snippets = include_snippets )
    # Raises if extraction produced too few meaningful results.
    _validate_extraction_results(
        contents, candidates, sdetection.processor.name, location )
    contents_by_relevance = sorted(
        contents,
        key = lambda x: x.get( 'relevance_score', 0.0 ),
        reverse = True )
    contents_ = list( contents_by_relevance[ : results_max ] )
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( contents_ ),
        'results_max': results_max,
    }
    # Project extraction records into the stable document schema shared
    # with _construct_query_result_structure.
    documents = [
        {
            'name': result[ 'object_name' ],
            'type': result[ 'object_type' ],
            'domain': result[ 'domain' ],
            'priority': result[ 'priority' ],
            'url': result[ 'url' ],
            'signature': result[ 'signature' ],
            'description': result[ 'description' ],
            'content_snippet': result[ 'content_snippet' ],
            'relevance_score': result[ 'relevance_score' ],
            'match_reasons': result[ 'match_reasons' ]
        }
        for result in contents_ ]
    return {
        'success': True,
        'data': {
            'source': resolved_location,
            'query': term,
            'search_metadata': search_metadata,
            'documents': documents,
        },
    }

170 

171 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory query return' ) ]:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = normalize_location( location )
    try:
        detection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        # Convert detection failure into a structured error payload.
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Inventory )
    # Resolve URL after detection to get working URL if redirect exists
    resolved_location = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters, details = details )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Truncate the ranked name matches to the requested page size.
    selections = [ result.object for result in results[ : results_max ] ]
    documents = [
        {
            'name': obj[ 'name' ],
            'role': obj[ 'role' ],
            'domain': obj.get( 'domain', '' ),
            'uri': obj[ 'uri' ],
            'dispname': obj[ 'dispname' ],
        }
        for obj in selections ]
    # NOTE(review): matches_total counts all objects from filter_inventory
    # (pre name-match), not the filter_by_name result count — confirm
    # which total downstream consumers expect.
    search_metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( selections ),
        'results_max': results_max,
        'matches_total': len( objects ),
    }
    return {
        'success': True,
        'data': {
            # Project/version ride along on inventory objects via the
            # '_inventory_project' / '_inventory_version' sentinel keys.
            'project': (
                objects[ 0 ].get( '_inventory_project', 'Unknown' )
                if objects else 'Unknown' ),
            'version': (
                objects[ 0 ].get( '_inventory_version', 'Unknown' )
                if objects else 'Unknown' ),
            'query': term,
            'documents': documents,
            'search_metadata': search_metadata,
            'objects_count': len( selections ),
            'source': resolved_location,
        },
    }

235 

236 

async def summarize_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument, /,
    term: str = '', *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    group_by: __.typx.Optional[ str ] = None,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory summary return' ) ]:
    ''' Provides structured summary of inventory data.

        Delegates to query_inventory with name-only details and a large
        results cap, then optionally regroups the returned documents by
        ``group_by`` before serializing the summary payload.
    '''
    response = await query_inventory(
        auxdata, location, term, processor_name = processor_name,
        search_behaviors = search_behaviors, filters = filters,
        results_max = 1000, # Large number to get all matches
        details = _interfaces.InventoryQueryDetails.Name )
    if not response[ 'success' ]:
        return response # Forward error response
    payload = response[ 'data' ]
    documents = payload[ 'documents' ]
    grouped = (
        documents if group_by is None
        else _group_documents_by_field( documents, group_by ) )
    summary: dict[ str, __.typx.Any ] = {
        'project': payload[ 'project' ],
        'version': payload[ 'version' ],
        'objects_count':
            payload[ 'search_metadata' ][ 'matches_total' ],
        'objects': grouped,
    }
    return {
        'success': True,
        'data': serialize_for_json( summary ),
    }

272 

273 

274async def survey_processors( 

275 auxdata: _state.Globals, /, 

276 genus: _interfaces.ProcessorGenera, 

277 name: __.typx.Optional[ str ] = None, 

278) -> dict[ str, __.typx.Any ]: 

279 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

280 match genus: 

281 case _interfaces.ProcessorGenera.Inventory: 

282 processors = dict( _processors.inventory_processors ) 

283 case _interfaces.ProcessorGenera.Structure: 

284 processors = dict( _processors.structure_processors ) 

285 if name is not None and name not in processors: 

286 raise _exceptions.ProcessorInavailability( name ) 

287 processors_capabilities = { 

288 name_: serialize_for_json( processor.capabilities ) 

289 for name_, processor in processors.items( ) 

290 if name is None or name_ == name } 

291 return { 'processors': processors_capabilities } 

292 

293 

def _add_object_metadata_to_results(
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    result: dict[ str, __.typx.Any ],
) -> None:
    ''' Adds object metadata without documentation to results.

        Mutates ``result[ 'documents' ]`` in place; returns nothing.
    '''
    documents = result[ 'documents' ]
    for entry in selected_objects:
        documents.append( _create_document_metadata( entry ) )

302 

303 

def _produce_generic_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for generic processor failures.

        The exception argument is accepted for signature parity with the
        other producers; its details are not included in the payload.
    '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Processor Found',
        'message': (
            'No compatible processor found to handle this '
            'documentation source.' ),
        'suggestion': (
            'Verify the URL points to a supported documentation '
            'format.' ),
    }
    return { 'success': False, 'error': error }

321 

322 

def _produce_inventory_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for inventory failures.

        The exception argument is accepted for signature parity with the
        other producers; its details are not included in the payload.
    '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Format Detected',
        'message': (
            'No compatible inventory format detected at this '
            'documentation source.' ),
        'suggestion': (
            'Verify the URL points to a Sphinx documentation site '
            'with objects.inv file.' ),
    }
    return { 'success': False, 'error': error }

340 

341 

def _produce_processor_error_response(
    exc: _exceptions.ProcessorInavailability,
    genus: __.Absential[ _interfaces.ProcessorGenera ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Produces appropriate structured error response based on genus.

        Dispatches to the genus-specific producer; falls back to the
        generic response when genus is absent or unrecognized.
    '''
    if __.is_absent( genus ):
        return _produce_generic_error_response( exc )
    if genus == _interfaces.ProcessorGenera.Inventory:
        return _produce_inventory_error_response( exc )
    if genus == _interfaces.ProcessorGenera.Structure:
        return _produce_structure_error_response( exc )
    return _produce_generic_error_response( exc )

356 

357 

def _produce_structure_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for structure failures.

        The exception argument is accepted for signature parity with the
        other producers; its details are not included in the payload.
    '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Structure Processor',
        'message': (
            'No compatible structure processor found for this '
            'documentation source.' ),
        'suggestion': (
            'Ensure the site uses a supported documentation format '
            'like Sphinx or MkDocs.' ),
    }
    return { 'success': False, 'error': error }

375 

376 

def _construct_explore_result_structure( # noqa: PLR0913
    inventory_data: dict[ str, __.typx.Any ],
    query: str,
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds the base result structure with metadata.

        Documents list starts empty; callers append entries afterwards.
    '''
    # NOTE(review): search_behaviors and filters are accepted but unused
    # in this builder — confirm whether they should appear in metadata.
    return {
        'project': inventory_data[ 'project' ],
        'version': inventory_data[ 'version' ],
        'query': query,
        'search_metadata': {
            'objects_count': len( selected_objects ),
            'results_max': results_max,
            'matches_total': inventory_data[ 'objects_count' ],
        },
        'documents': [ ],
    }

399 

400 

def _construct_query_result_structure( # noqa: PLR0913
    source: str,
    query: str,
    raw_results: list[ __.cabc.Mapping[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds query result structure in explore format.

        Projects each raw extraction record into the stable document
        schema; metadata records actual and maximum result counts.
    '''
    # NOTE(review): search_behaviors and filters are accepted but unused
    # in this builder — confirm whether they should appear in metadata.
    documents: list[ dict[ str, __.typx.Any ] ] = [
        {
            'name': record[ 'object_name' ],
            'type': record[ 'object_type' ],
            'domain': record[ 'domain' ],
            'priority': record[ 'priority' ],
            'url': record[ 'url' ],
            'signature': record[ 'signature' ],
            'description': record[ 'description' ],
            'content_snippet': record[ 'content_snippet' ],
            'relevance_score': record[ 'relevance_score' ],
            'match_reasons': record[ 'match_reasons' ]
        }
        for record in raw_results ]
    return {
        'source': source,
        'query': query,
        'search_metadata': {
            'results_count': len( raw_results ),
            'results_max': results_max,
        },
        'documents': documents,
    }

437 

438 

def _create_document_with_docs(
    obj: dict[ str, __.typx.Any ],
    doc_result: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Creates document structure with documentation content.

        Extends the base metadata document with a 'documentation' entry.
    '''
    enriched = _create_document_metadata( obj )
    enriched[ 'documentation' ] = doc_result
    return enriched

447 

448 

def _create_document_metadata(
    obj: dict[ str, __.typx.Any ]
) -> dict[ str, __.typx.Any ]:
    ''' Creates base document structure from object metadata.

        Copies the core identity fields; 'domain' defaults to an empty
        string and 'fuzzy_score' is carried over only when present.
    '''
    metadata: dict[ str, __.typx.Any ] = {
        'name': obj[ 'name' ],
        'role': obj[ 'role' ],
        'domain': obj.get( 'domain', '' ),
        'uri': obj[ 'uri' ],
        'dispname': obj[ 'dispname' ],
    }
    # Membership test (not .get) so an explicit None score is preserved.
    if 'fuzzy_score' in obj:
        metadata[ 'fuzzy_score' ] = obj[ 'fuzzy_score' ]
    return metadata

463 

464 

def _format_inventory_summary(
    inventory_data: dict[ str, __.typx.Any ]
) -> str:
    ''' Formats inventory data into human-readable summary.

        Emits project/version/object-count header lines; when objects are
        grouped (dict), appends a per-group breakdown, otherwise notes
        that objects are ungrouped.
    '''
    summary_lines: list[ str ] = [
        f"Project: {inventory_data[ 'project' ]}",
        f"Version: {inventory_data[ 'version' ]}",
        f"Objects: {inventory_data[ 'objects_count' ]}",
    ]
    if inventory_data[ 'objects' ]:
        if isinstance( inventory_data[ 'objects' ], dict ):
            summary_lines.append( "\nBreakdown by groups:" )
            grouped_objects = __.typx.cast(
                dict[ str, __.typx.Any ], inventory_data[ 'objects' ] )
            for group_name, objects in grouped_objects.items( ):
                object_count = len( objects )
                summary_lines.append(
                    f"  {group_name}: {object_count} objects" )
        else:
            # (Removed a dead local that re-read inventory_data['objects']
            # without using it.)
            summary_lines.append( "\nObjects listed without grouping." )
    return '\n'.join( summary_lines )

487 

488 

def _group_documents_by_field(
    documents: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    field: __.typx.Optional[ str ]
) -> __.immut.Dictionary[
    str, tuple[ __.cabc.Mapping[ str, __.typx.Any ], ... ]
]:
    ''' Groups documents by specified field for inventory format.

        Field values are coerced to string keys: list/dict values collapse
        to '[list]'/'[dict]' placeholders, missing/None/empty values to a
        '(missing <field>)' marker. Returns immutable groups of immutable
        document records.
    '''
    if field is None: return __.immut.Dictionary( )
    buckets: dict[ str, list[ __.cabc.Mapping[ str, __.typx.Any ] ] ] = { }
    for document in documents:
        value = document.get( field, f"(missing {field})" )
        if isinstance( value, list ): key = "[list]"
        elif isinstance( value, dict ): key = "[dict]"
        elif value is None or value == '': key = f"(missing {field})"
        else: key = str( value )
        entry: dict[ str, __.typx.Any ] = {
            'name': document[ 'name' ],
            'role': document[ 'role' ],
            'domain': document.get( 'domain', '' ),
            'uri': document[ 'uri' ],
            'dispname': document[ 'dispname' ],
        }
        if 'fuzzy_score' in document:
            entry[ 'fuzzy_score' ] = document[ 'fuzzy_score' ]
        buckets.setdefault( key, [ ] ).append( __.immut.Dictionary( entry ) )
    return __.immut.Dictionary(
        ( key, tuple( members ) ) for key, members in buckets.items( ) )

522 

523 

def serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclass instances become dicts of their public fields; lists,
        tuples, sets, frozensets become lists with members serialized;
        mappings are serialized value-wise; scalars pass through; anything
        else falls back to ``str``.
    '''
    # Guard with isinstance(type) so dataclass *classes* (for which
    # is_dataclass is also true) fall through to the str() fallback
    # instead of serializing as an empty field dict.
    if __.dcls.is_dataclass( obj ) and not isinstance( obj, type ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue # Skip private/internal fields
            value = getattr( obj, field.name )
            result[ field.name ] = serialize_for_json( value )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        # Recurse into members: the original returned list( obj ), which
        # left non-JSON-safe members (e.g. enums, dataclasses) unconverted.
        return [ serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    # For other objects, try to convert to string
    return str( obj )

544 

545 

def _select_top_objects(
    inventory_data: dict[ str, __.typx.Any ],
    results_max: int
) -> list[ dict[ str, __.typx.Any ] ]:
    ''' Selects top objects from inventory, sorted by fuzzy score.

        Flattens the per-domain object lists, ranks descending by
        'fuzzy_score' (missing scores count as 0), and keeps the first
        ``results_max`` entries.
    '''
    pooled: list[ dict[ str, __.typx.Any ] ] = [ ]
    for members in inventory_data[ 'objects' ].values( ):
        pooled.extend( members )
    ranked = sorted(
        pooled,
        key = lambda entry: entry.get( 'fuzzy_score', 0 ),
        reverse = True )
    return ranked[ : results_max ]

558 

559 

def _validate_extraction_results(
    results: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    requested_objects: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    processor_name: str,
    source: str
) -> None:
    ''' Validates that extraction results contain meaningful content.

        No-op when nothing was requested. Raises StructureIncompatibility
        when extraction returned nothing at all, and ContentExtractFailure
        when fewer than _SUCCESS_RATE_MINIMUM of the requested objects
        yielded a non-blank signature or description.
    '''
    if not requested_objects: return
    if not results:
        raise _exceptions.StructureIncompatibility( processor_name, source )
    meaningful_results = sum(
        1 for record in results
        if record.get( 'signature', '' ).strip( )
            or record.get( 'description', '' ).strip( ) )
    success_rate = meaningful_results / len( requested_objects )
    if success_rate < _SUCCESS_RATE_MINIMUM:
        raise _exceptions.ContentExtractFailure(
            processor_name, source, meaningful_results,
            len( requested_objects ) )