Coverage for sources/librovore/functions.py: 20%
195 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 22:48 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import search as _search
30from . import state as _state
# Loose mapping aliases for JSON-compatible result payloads.
DocumentationResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
SearchResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
# Annotated string alias for documentation-source location parameters.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]
# Shared immutable defaults so function signatures avoid mutable defaults.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
def normalize_location( location: str ) -> str:
    ''' Normalizes location URL by stripping trailing ``/index.html``.

        Locations without that suffix are returned unchanged.
    '''
    # str.removesuffix replaces the brittle hard-coded slice ``[ : -11 ]``.
    return location.removesuffix( '/index.html' )
50_SUCCESS_RATE_MINIMUM = 0.1
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Detects relevant processors of particular genus for location. '''
    # NOTE(review): processor_name is accepted but unused here — confirm.
    location = normalize_location( location )
    moment_start = __.time.perf_counter( )
    detections, detection_optimal = await _detection.access_detections(
        auxdata, location, genus = genus )
    duration_ms = int( ( __.time.perf_counter( ) - moment_start ) * 1000 )
    if __.is_absent( detection_optimal ):
        # Synthesize an exception so error formatting stays uniform.
        label = (
            genus.name.lower( ) if hasattr( genus, 'name' )
            else str( genus ) )
        return _produce_processor_error_response(
            _exceptions.ProcessorInavailability( label ), genus = genus )
    summary = _processors.DetectionsForLocation(
        source = location,
        detections = detections,
        detection_optimal = detection_optimal,
        time_detection_ms = duration_ms )
    return {
        'success': True,
        'data': serialize_for_json( summary ),
    }
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ],
    __.ddoc.Fname( 'content query return' ) ]:
    ''' Searches documentation content with relevance ranking.

        Detects inventory and structure processors for the location,
        filters inventory objects by name, extracts content for the best
        candidates, and returns documents sorted by relevance score.
        Processor-detection failures yield a structured error response
        rather than raising.
    '''
    location = normalize_location( location )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Inventory )
    # Resolve URL after detection to get working URL if redirect exists
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Overselect (3x results_max) so post-extraction relevance ranking
    # still has enough material to fill results_max slots.
    candidates = [ result.object for result in results[ : results_max * 3 ] ]
    if not candidates:
        # No name matches: empty, but successful, result set.
        return {
            'success': True,
            'data': {
                'source': location,
                'query': term,
                'search_metadata': {
                    'results_count': 0,
                    'results_max': results_max,
                },
                'documents': [ ],
            },
        }
    try:
        sdetection = await _detection.detect_structure(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Structure )
    contents = await sdetection.extract_contents(
        auxdata, location, candidates, include_snippets = include_snippets )
    # Raises if extraction produced too little meaningful content.
    _validate_extraction_results(
        contents, candidates, sdetection.processor.name, location )
    contents_by_relevance = sorted(
        contents,
        key = lambda x: x.get( 'relevance_score', 0.0 ),
        reverse = True )
    contents_ = list( contents_by_relevance[ : results_max ] )
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( contents_ ),
        'results_max': results_max,
    }
    documents = [
        {
            'name': result[ 'object_name' ],
            'type': result[ 'object_type' ],
            'domain': result[ 'domain' ],
            'priority': result[ 'priority' ],
            'url': result[ 'url' ],
            'signature': result[ 'signature' ],
            'description': result[ 'description' ],
            'content_snippet': result[ 'content_snippet' ],
            'relevance_score': result[ 'relevance_score' ],
            'match_reasons': result[ 'match_reasons' ]
        }
        for result in contents_ ]
    return {
        'success': True,
        'data': {
            'source': resolved_location,
            'query': term,
            'search_metadata': search_metadata,
            'documents': documents,
        },
    }
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory query return' ) ]:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = normalize_location( location )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, genus = _interfaces.ProcessorGenera.Inventory )
    # Resolve URL after detection to get working URL if redirect exists
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url, filters = filters, details = details )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    chosen = [ match.object for match in matches[ : results_max ] ]
    documents: list[ dict[ str, __.typx.Any ] ] = [ ]
    for obj in chosen:
        documents.append( {
            'name': obj[ 'name' ],
            'role': obj[ 'role' ],
            'domain': obj.get( 'domain', '' ),
            'uri': obj[ 'uri' ],
            'dispname': obj[ 'dispname' ],
        } )
    metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( chosen ),
        'results_max': results_max,
        'matches_total': len( objects ),
    }
    # Project metadata rides along on inventory objects when present.
    if objects:
        project = objects[ 0 ].get( '_inventory_project', 'Unknown' )
        version = objects[ 0 ].get( '_inventory_version', 'Unknown' )
    else: project = version = 'Unknown'
    return {
        'success': True,
        'data': {
            'project': project,
            'version': version,
            'query': term,
            'documents': documents,
            'search_metadata': metadata,
            'objects_count': len( chosen ),
            'source': url,
        },
    }
async def summarize_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument, /,
    term: str = '', *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    group_by: __.typx.Optional[ str ] = None,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory summary return' ) ]:
    ''' Provides structured summary of inventory data. '''
    response = await query_inventory(
        auxdata, location, term, processor_name = processor_name,
        search_behaviors = search_behaviors, filters = filters,
        results_max = 1000, # Large number to get all matches
        details = _interfaces.InventoryQueryDetails.Name )
    if not response[ 'success' ]: return response # Forward error response
    data = response[ 'data' ]
    objects_data = (
        data[ 'documents' ] if group_by is None
        else _group_documents_by_field( data[ 'documents' ], group_by ) )
    summary: dict[ str, __.typx.Any ] = {
        'project': data[ 'project' ],
        'version': data[ 'version' ],
        'objects_count': data[ 'search_metadata' ][ 'matches_total' ],
        'objects': objects_data,
    }
    return {
        'success': True,
        'data': serialize_for_json( summary ),
    }
async def survey_processors(
    auxdata: _state.Globals, /,
    genus: _interfaces.ProcessorGenera,
    name: __.typx.Optional[ str ] = None,
) -> dict[ str, __.typx.Any ]:
    ''' Lists processor capabilities for specified genus, filtered by name. '''
    if genus is _interfaces.ProcessorGenera.Inventory:
        registry = dict( _processors.inventory_processors )
    elif genus is _interfaces.ProcessorGenera.Structure:
        registry = dict( _processors.structure_processors )
    if name is not None and name not in registry:
        raise _exceptions.ProcessorInavailability( name )
    capabilities = {
        title: serialize_for_json( processor.capabilities )
        for title, processor in registry.items( )
        if name is None or title == name }
    return { 'processors': capabilities }
def _add_object_metadata_to_results(
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    result: dict[ str, __.typx.Any ],
) -> None:
    ''' Adds object metadata without documentation to results. '''
    documents = result[ 'documents' ]
    for obj in selected_objects:
        documents.append( _create_document_metadata( obj ) )
def _produce_generic_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for generic processor failures. '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Processor Found',
        'message': (
            'No compatible processor found to handle this '
            'documentation source.' ),
        'suggestion': (
            'Verify the URL points to a supported documentation '
            'format.' ),
    }
    return { 'success': False, 'error': error }
def _produce_inventory_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for inventory failures. '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Format Detected',
        'message': (
            'No compatible inventory format detected at this '
            'documentation source.' ),
        'suggestion': (
            'Verify the URL points to a Sphinx documentation site '
            'with objects.inv file.' ),
    }
    return { 'success': False, 'error': error }
def _produce_processor_error_response(
    exc: _exceptions.ProcessorInavailability,
    genus: __.Absential[ _interfaces.ProcessorGenera ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Produces appropriate structured error response based on genus. '''
    if not __.is_absent( genus ):
        if genus is _interfaces.ProcessorGenera.Inventory:
            return _produce_inventory_error_response( exc )
        if genus is _interfaces.ProcessorGenera.Structure:
            return _produce_structure_error_response( exc )
    # Absent or unrecognized genus falls back to the generic response.
    return _produce_generic_error_response( exc )
def _produce_structure_error_response(
    exc: _exceptions.ProcessorInavailability
) -> dict[ str, __.typx.Any ]:
    ''' Produces structured error response for structure failures. '''
    error: dict[ str, __.typx.Any ] = {
        'type': 'processor_unavailable',
        'title': 'No Compatible Structure Processor',
        'message': (
            'No compatible structure processor found for this '
            'documentation source.' ),
        'suggestion': (
            'Ensure the site uses a supported documentation format '
            'like Sphinx or MkDocs.' ),
    }
    return { 'success': False, 'error': error }
def _construct_explore_result_structure( # noqa: PLR0913
    inventory_data: dict[ str, __.typx.Any ],
    query: str,
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds the base result structure with metadata. '''
    # NOTE(review): search_behaviors and filters are accepted but unused.
    return {
        'project': inventory_data[ 'project' ],
        'version': inventory_data[ 'version' ],
        'query': query,
        'search_metadata': {
            'objects_count': len( selected_objects ),
            'results_max': results_max,
            'matches_total': inventory_data[ 'objects_count' ],
        },
        'documents': [ ],
    }
def _construct_query_result_structure( # noqa: PLR0913
    source: str,
    query: str,
    raw_results: list[ __.cabc.Mapping[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds query result structure in explore format. '''
    # Output key -> extraction-result key, in fixed document order.
    fields = (
        ( 'name', 'object_name' ),
        ( 'type', 'object_type' ),
        ( 'domain', 'domain' ),
        ( 'priority', 'priority' ),
        ( 'url', 'url' ),
        ( 'signature', 'signature' ),
        ( 'description', 'description' ),
        ( 'content_snippet', 'content_snippet' ),
        ( 'relevance_score', 'relevance_score' ),
        ( 'match_reasons', 'match_reasons' ) )
    documents: list[ dict[ str, __.typx.Any ] ] = [
        { dest: raw[ origin ] for dest, origin in fields }
        for raw in raw_results ]
    return {
        'source': source,
        'query': query,
        'search_metadata': {
            'results_count': len( raw_results ),
            'results_max': results_max,
        },
        'documents': documents,
    }
def _create_document_with_docs(
    obj: dict[ str, __.typx.Any ],
    doc_result: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Creates document structure with documentation content. '''
    return {
        **_create_document_metadata( obj ),
        'documentation': doc_result }
def _create_document_metadata(
    obj: dict[ str, __.typx.Any ]
) -> dict[ str, __.typx.Any ]:
    ''' Creates base document structure from object metadata. '''
    metadata: dict[ str, __.typx.Any ] = dict(
        name = obj[ 'name' ],
        role = obj[ 'role' ],
        domain = obj.get( 'domain', '' ),
        uri = obj[ 'uri' ],
        dispname = obj[ 'dispname' ] )
    # Preserve fuzzy-match score when the search pipeline attached one.
    if 'fuzzy_score' in obj: metadata[ 'fuzzy_score' ] = obj[ 'fuzzy_score' ]
    return metadata
def _format_inventory_summary(
    inventory_data: dict[ str, __.typx.Any ]
) -> str:
    ''' Formats inventory data into human-readable summary.

        When objects are grouped (dict), appends a per-group breakdown;
        otherwise notes that objects are listed without grouping.
    '''
    summary_lines: list[ str ] = [
        f"Project: {inventory_data[ 'project' ]}",
        f"Version: {inventory_data[ 'version' ]}",
        f"Objects: {inventory_data[ 'objects_count' ]}",
    ]
    objects = inventory_data[ 'objects' ]
    if objects:
        if isinstance( objects, dict ):
            summary_lines.append( "\nBreakdown by groups:" )
            grouped_objects = __.typx.cast(
                dict[ str, __.typx.Any ], objects )
            for group_name, members in grouped_objects.items( ):
                object_count = len( members )
                summary_lines.append(
                    f" {group_name}: {object_count} objects" )
        # Removed dead local assignment in the ungrouped branch.
        else: summary_lines.append( "\nObjects listed without grouping." )
    return '\n'.join( summary_lines )
def _group_documents_by_field(
    documents: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    field: __.typx.Optional[ str ]
) -> __.immut.Dictionary[
    str, tuple[ __.cabc.Mapping[ str, __.typx.Any ], ... ]
]:
    ''' Groups documents by specified field for inventory format. '''
    if field is None: return __.immut.Dictionary( )
    groups: dict[ str, list[ __.cabc.Mapping[ str, __.typx.Any ] ] ] = { }
    for document in documents:
        value = document.get( field, f"(missing {field})" )
        # Collapse unhashable or empty grouping values to stable labels.
        if isinstance( value, list ): key = "[list]"
        elif isinstance( value, dict ): key = "[dict]"
        elif value is None or value == '': key = f"(missing {field})"
        else: key = str( value )
        entry: dict[ str, __.typx.Any ] = {
            'name': document[ 'name' ],
            'role': document[ 'role' ],
            'domain': document.get( 'domain', '' ),
            'uri': document[ 'uri' ],
            'dispname': document[ 'dispname' ],
        }
        if 'fuzzy_score' in document:
            entry[ 'fuzzy_score' ] = document[ 'fuzzy_score' ]
        groups.setdefault( key, [ ] ).append( __.immut.Dictionary( entry ) )
    return __.immut.Dictionary(
        ( key, tuple( members ) ) for key, members in groups.items( ) )
def serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclasses become dicts (private fields skipped); sequences and
        sets become lists; mappings become dicts; primitives pass through;
        anything else is stringified.
    '''
    if __.dcls.is_dataclass( obj ):
        return {
            field.name: serialize_for_json( getattr( obj, field.name ) )
            for field in __.dcls.fields( obj )
            if not field.name.startswith( '_' ) } # skip private fields
    if isinstance( obj, ( list, tuple ) ):
        return [ serialize_for_json( item ) for item in obj ]
    if isinstance( obj, ( frozenset, set ) ):
        # Recurse into members for consistency with other containers.
        # (Previously returned members unserialized. Set order is arbitrary.)
        return [ serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    # For other objects, try to convert to string
    return str( obj )
def _select_top_objects(
    inventory_data: dict[ str, __.typx.Any ],
    results_max: int
) -> list[ dict[ str, __.typx.Any ] ]:
    ''' Selects top objects from inventory, sorted by fuzzy score. '''
    pooled: list[ dict[ str, __.typx.Any ] ] = [
        obj
        for domain_objects in inventory_data[ 'objects' ].values( )
        for obj in domain_objects ]
    # Stable descending sort; objects without a score rank last.
    ranked = sorted(
        pooled,
        key = lambda obj: obj.get( 'fuzzy_score', 0 ),
        reverse = True )
    return ranked[ : results_max ]
def _validate_extraction_results(
    results: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    requested_objects: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    processor_name: str,
    source: str
) -> None:
    ''' Validates that extraction results contain meaningful content.

        Raises StructureIncompatibility when nothing was extracted and
        ContentExtractFailure when too few results carry a signature or
        description.
    '''
    if not requested_objects: return
    if not results:
        raise _exceptions.StructureIncompatibility( processor_name, source )
    meaningful = sum(
        1 for result in results
        if result.get( 'signature', '' ).strip( )
            or result.get( 'description', '' ).strip( ) )
    if meaningful / len( requested_objects ) < _SUCCESS_RATE_MINIMUM:
        raise _exceptions.ContentExtractFailure(
            processor_name, source, meaningful, len( requested_objects ) )