Coverage for sources/librovore/functions.py: 22% (159 statements)
coverage.py v7.10.4, created at 2025-08-17 23:43 +0000

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");          #
#  you may not use this file except in compliance with the License.         #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                           #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software      #
#  distributed under the License is distributed on an "AS IS" BASIS,        #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
#  See the License for the specific language governing permissions and      #
#  limitations under the License.                                           #
#                                                                            #
#============================================================================#


''' Core business logic shared between CLI and MCP server. '''


from . import __
from . import detection as _detection
from . import exceptions as _exceptions
from . import interfaces as _interfaces
from . import processors as _processors
from . import search as _search
from . import state as _state


DocumentationResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
SearchResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )


def normalize_location( location: str ) -> str:
    ''' Normalizes location URL by stripping index.html. '''
    if location.endswith( '/index.html' ):
        location = location[ : -11 ]
    return location
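
# Illustrative behavior (hypothetical URL, shown as a sketch only):
#     normalize_location( 'https://example.org/docs/index.html' )
#     # -> 'https://example.org/docs'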

_SUCCESS_RATE_MINIMUM = 0.1


async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Detects relevant processors of particular genus for location. '''
    location = normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    response = _processors.DetectionsForLocation(
        source = location,
        detections = detections,
        detection_optimal = (
            None if __.is_absent( detection_optimal ) else detection_optimal ),
        time_detection_ms = detection_time_ms )
    return _serialize_dataclass( response )
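
# Illustrative call (sketch; assumes an initialized `_state.Globals` in
# `auxdata` and a reachable documentation site at the hypothetical URL):
#     report = await detect(
#         auxdata, 'https://example.readthedocs.io/en/latest',
#         genus = _interfaces.ProcessorGenera.Inventory )
#     report[ 'time_detection_ms' ]  # elapsed detection time in milliseconds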



async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ],
    __.ddoc.Fname( 'content query return' ) ]:
    ''' Searches documentation content with relevance ranking. '''
    location = normalize_location( location )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    objects = await idetection.filter_inventory(
        auxdata, location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    candidates = [ result.object for result in results[ : results_max * 3 ] ]
    if not candidates:
        return {
            'source': location,
            'query': term,
            'search_metadata': {
                'results_count': 0,
                'results_max': results_max,
            },
            'documents': [ ],
        }
    sdetection = await _detection.detect_structure(
        auxdata, location, processor_name = processor_name )
    contents = await sdetection.extract_contents(
        auxdata, location, candidates, include_snippets = include_snippets )
    _validate_extraction_results(
        contents, candidates, sdetection.processor.name, location )
    contents_by_relevance = sorted(
        contents,
        key = lambda x: x.get( 'relevance_score', 0.0 ),
        reverse = True )
    contents_ = list( contents_by_relevance[ : results_max ] )
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( contents_ ),
        'results_max': results_max,
    }
    documents = [
        {
            'name': result[ 'object_name' ],
            'type': result[ 'object_type' ],
            'domain': result[ 'domain' ],
            'priority': result[ 'priority' ],
            'url': result[ 'url' ],
            'signature': result[ 'signature' ],
            'description': result[ 'description' ],
            'content_snippet': result[ 'content_snippet' ],
            'relevance_score': result[ 'relevance_score' ],
            'match_reasons': result[ 'match_reasons' ]
        }
        for result in contents_ ]
    return {
        'source': location,
        'query': term,
        'search_metadata': search_metadata,
        'documents': documents,
    }
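
# Result shape sketch (illustrative values only; an actual call needs an
# initialized `_state.Globals` and a live documentation site):
#     {
#         'source': 'https://example.readthedocs.io/en/latest',
#         'query': 'Widget',
#         'search_metadata': { 'results_count': 2, 'results_max': 10 },
#         'documents': [ { 'name': ..., 'url': ..., 'relevance_score': ... } ],
#     }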



async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory query return' ) ]:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = normalize_location( location )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    objects = await detection.filter_inventory(
        auxdata, location, filters = filters, details = details )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = [ result.object for result in results[ : results_max ] ]
    documents = [
        {
            'name': obj[ 'name' ],
            'role': obj[ 'role' ],
            'domain': obj.get( 'domain', '' ),
            'uri': obj[ 'uri' ],
            'dispname': obj[ 'dispname' ],
        }
        for obj in selections ]
    search_metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( selections ),
        'results_max': results_max,
        'matches_total': len( objects ),
    }
    return {
        'project': (
            objects[ 0 ].get( '_inventory_project', 'Unknown' )
            if objects else 'Unknown' ),
        'version': (
            objects[ 0 ].get( '_inventory_version', 'Unknown' )
            if objects else 'Unknown' ),
        'query': term,
        'documents': documents,
        'search_metadata': search_metadata,
        'objects_count': len( selections ),
        'source': location,
    }
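
# Result shape sketch (illustrative values only):
#     {
#         'project': 'Example Project', 'version': '1.2', 'query': 'Widget',
#         'documents': [ { 'name': ..., 'role': ..., 'uri': ..., ... } ],
#         'search_metadata': {
#             'objects_count': 3, 'results_max': 5, 'matches_total': 42 },
#         'objects_count': 3,
#         'source': 'https://example.readthedocs.io/en/latest',
#     }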



async def summarize_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument, /,
    term: str = '', *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    group_by: __.typx.Optional[ str ] = None,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory summary return' ) ]:
    ''' Provides structured summary of inventory data. '''
    details = _interfaces.InventoryQueryDetails.Name
    inventory_result = await query_inventory(
        auxdata, location, term, processor_name = processor_name,
        search_behaviors = search_behaviors, filters = filters,
        results_max = 1000, # Large number to get all matches
        details = details )
    if group_by is not None:
        objects_data = _group_documents_by_field(
            inventory_result[ 'documents' ], group_by )
    else: objects_data = inventory_result[ 'documents' ]
    inventory_data: dict[ str, __.typx.Any ] = {
        'project': inventory_result[ 'project' ],
        'version': inventory_result[ 'version' ],
        'objects_count':
            inventory_result[ 'search_metadata' ][ 'matches_total' ],
        'objects': objects_data,
    }
    return inventory_data
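
# Usage sketch (hypothetical location; `auxdata` assumed initialized):
#     summary = await summarize_inventory(
#         auxdata, 'https://example.readthedocs.io/en/latest',
#         group_by = 'domain' )
#     summary[ 'objects' ]  # mapping of domain name to grouped object tuples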



async def survey_processors(
    auxdata: _state.Globals, /,
    genus: _interfaces.ProcessorGenera,
    name: __.typx.Optional[ str ] = None,
) -> dict[ str, __.typx.Any ]:
    ''' Lists processor capabilities for specified genus, filtered by name. '''
    match genus:
        case _interfaces.ProcessorGenera.Inventory:
            processors = dict( _processors.inventory_processors )
        case _interfaces.ProcessorGenera.Structure:
            processors = dict( _processors.structure_processors )
    if name is not None and name not in processors:
        raise _exceptions.ProcessorInavailability( name )
    processors_capabilities = {
        name_: _serialize_dataclass( processor.capabilities )
        for name_, processor in processors.items( )
        if name is None or name_ == name }
    return { 'processors': processors_capabilities }
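
# Usage sketch (assumes processors registered in `_processors`):
#     capabilities = await survey_processors(
#         auxdata, genus = _interfaces.ProcessorGenera.Structure )
#     capabilities[ 'processors' ]  # mapping of processor name to capabilities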



def _add_object_metadata_to_results(
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    result: dict[ str, __.typx.Any ],
) -> None:
    ''' Adds object metadata without documentation to results. '''
    for obj in selected_objects:
        document = _create_document_metadata( obj )
        result[ 'documents' ].append( document )


def _construct_explore_result_structure( # noqa: PLR0913
    inventory_data: dict[ str, __.typx.Any ],
    query: str,
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds the base result structure with metadata. '''
    search_metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( selected_objects ),
        'results_max': results_max,
        'matches_total': inventory_data[ 'objects_count' ],
    }
    result: dict[ str, __.typx.Any ] = {
        'project': inventory_data[ 'project' ],
        'version': inventory_data[ 'version' ],
        'query': query,
        'search_metadata': search_metadata,
        'documents': [ ],
    }
    return result


def _construct_query_result_structure( # noqa: PLR0913
    source: str,
    query: str,
    raw_results: list[ __.cabc.Mapping[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds query result structure in explore format. '''
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( raw_results ),
        'results_max': results_max,
    }
    documents: list[ dict[ str, __.typx.Any ] ] = [ ]
    for raw_result in raw_results:
        result_dict = dict( raw_result )
        document: dict[ str, __.typx.Any ] = {
            'name': result_dict[ 'object_name' ],
            'type': result_dict[ 'object_type' ],
            'domain': result_dict[ 'domain' ],
            'priority': result_dict[ 'priority' ],
            'url': result_dict[ 'url' ],
            'signature': result_dict[ 'signature' ],
            'description': result_dict[ 'description' ],
            'content_snippet': result_dict[ 'content_snippet' ],
            'relevance_score': result_dict[ 'relevance_score' ],
            'match_reasons': result_dict[ 'match_reasons' ]
        }
        documents.append( document )
    result: dict[ str, __.typx.Any ] = {
        'source': source,
        'query': query,
        'search_metadata': search_metadata,
        'documents': documents,
    }
    return result



def _create_document_with_docs(
    obj: dict[ str, __.typx.Any ],
    doc_result: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Creates document structure with documentation content. '''
    document = _create_document_metadata( obj )
    document[ 'documentation' ] = doc_result
    return document


def _create_document_metadata(
    obj: dict[ str, __.typx.Any ]
) -> dict[ str, __.typx.Any ]:
    ''' Creates base document structure from object metadata. '''
    document = {
        'name': obj[ 'name' ],
        'role': obj[ 'role' ],
        'domain': obj.get( 'domain', '' ),
        'uri': obj[ 'uri' ],
        'dispname': obj[ 'dispname' ],
    }
    if 'fuzzy_score' in obj:
        document[ 'fuzzy_score' ] = obj[ 'fuzzy_score' ]
    return document


def _format_inventory_summary(
    inventory_data: dict[ str, __.typx.Any ]
) -> str:
    ''' Formats inventory data into human-readable summary. '''
    summary_lines: list[ str ] = [
        f"Project: {inventory_data[ 'project' ]}",
        f"Version: {inventory_data[ 'version' ]}",
        f"Objects: {inventory_data[ 'objects_count' ]}",
    ]
    if inventory_data[ 'objects' ]:
        if isinstance( inventory_data[ 'objects' ], dict ):
            summary_lines.append( "\nBreakdown by groups:" )
            grouped_objects = __.typx.cast(
                dict[ str, __.typx.Any ], inventory_data[ 'objects' ] )
            for group_name, objects in grouped_objects.items( ):
                object_count = len( objects )
                summary_lines.append(
                    f" {group_name}: {object_count} objects" )
        else:
            objects = inventory_data[ 'objects' ]
            summary_lines.append( "\nObjects listed without grouping." )
    return '\n'.join( summary_lines )
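
# Example output (illustrative data, grouped case; actual group names depend
# on the field used for grouping):
#     Project: Example Project
#     Version: 1.2
#     Objects: 42
#
#     Breakdown by groups:
#      function: 30 objects
#      class: 12 objects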



def _group_documents_by_field(
    documents: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    field: __.typx.Optional[ str ]
) -> __.immut.Dictionary[
    str, tuple[ __.cabc.Mapping[ str, __.typx.Any ], ... ]
]:
    ''' Groups documents by specified field for inventory format. '''
    if field is None: return __.immut.Dictionary( )
    groups: dict[ str, list[ __.cabc.Mapping[ str, __.typx.Any ] ] ] = { }
    for doc in documents:
        raw_value = doc.get( field, f"(missing {field})" )
        if isinstance( raw_value, list ):
            str_value = "[list]"
        elif isinstance( raw_value, dict ):
            str_value = "[dict]"
        elif raw_value is None or raw_value == '':
            str_value = f"(missing {field})"
        else:
            str_value = str( raw_value )
        if str_value not in groups: groups[ str_value ] = [ ]
        obj_data = {
            'name': doc[ 'name' ],
            'role': doc[ 'role' ],
            'domain': doc.get( 'domain', '' ),
            'uri': doc[ 'uri' ],
            'dispname': doc[ 'dispname' ],
        }
        if 'fuzzy_score' in doc:
            obj_data[ 'fuzzy_score' ] = doc[ 'fuzzy_score' ]
        obj = __.immut.Dictionary( obj_data )
        groups[ str_value ].append( obj )
    return __.immut.Dictionary(
        ( key, tuple( items ) ) for key, items in groups.items( ) )
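
# Grouping sketch (hypothetical documents, grouped by 'domain'; an empty or
# missing field value lands in the "(missing domain)" group):
#     docs = [
#         { 'name': 'A', 'role': 'class', 'domain': 'py',
#           'uri': 'a.html', 'dispname': 'A' },
#         { 'name': 'b', 'role': 'function', 'domain': '',
#           'uri': 'b.html', 'dispname': 'b' },
#     ]
#     _group_documents_by_field( docs, 'domain' )
#     # -> { 'py': ( { 'name': 'A', ... }, ),
#     #      '(missing domain)': ( { 'name': 'b', ... }, ) }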



def _serialize_dataclass( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format. '''
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue # Skip private/internal fields
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_dataclass( value )
        return result # type: ignore[return-value]
    if isinstance( obj, list ):
        return [ _serialize_dataclass( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        return list( obj ) # type: ignore[arg-type]
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    # For other objects, try to convert to string
    return str( obj )
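
# Serialization sketch (hypothetical dataclass, assuming `__.dcls` aliases the
# standard `dataclasses` module; non-primitive leftovers fall through to str):
#     @__.dcls.dataclass
#     class _Example:
#         name: str = 'sphinx'
#         versions: frozenset[ str ] = frozenset( { '7.0' } )
#     _serialize_dataclass( _Example( ) )
#     # -> { 'name': 'sphinx', 'versions': [ '7.0' ] }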



def _select_top_objects(
    inventory_data: dict[ str, __.typx.Any ],
    results_max: int
) -> list[ dict[ str, __.typx.Any ] ]:
    ''' Selects top objects from inventory, sorted by fuzzy score. '''
    all_objects: list[ dict[ str, __.typx.Any ] ] = [ ]
    for domain_objects in inventory_data[ 'objects' ].values( ):
        all_objects.extend( domain_objects )
    all_objects.sort(
        key = lambda obj: obj.get( 'fuzzy_score', 0 ),
        reverse = True )
    return all_objects[ : results_max ]


def _validate_extraction_results(
    results: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    requested_objects: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    processor_name: str,
    source: str
) -> None:
    ''' Validates that extraction results contain meaningful content. '''
    if not requested_objects: return
    if not results:
        raise _exceptions.StructureIncompatibility( processor_name, source )
    meaningful_results = 0
    for result in results:
        signature = result.get( 'signature', '' ).strip( )
        description = result.get( 'description', '' ).strip( )
        if signature or description: meaningful_results += 1
    success_rate = meaningful_results / len( requested_objects )
    if success_rate < _SUCCESS_RATE_MINIMUM:
        raise _exceptions.ContentExtractFailure(
            processor_name, source, meaningful_results,
            len( requested_objects ) )
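
# Threshold sketch: with 20 requested objects, at least 2 extraction results
# must carry a non-empty signature or description (success rate of 2/20 = 0.1
# meets _SUCCESS_RATE_MINIMUM); with only 1 such result, the rate 0.05 falls
# below the threshold and ContentExtractFailure is raised.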