Coverage for sources/librovore/functions.py: 22%
159 statements
coverage.py v7.10.4, created at 2025-08-17 23:43 +0000
# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");          #
#  you may not use this file except in compliance with the License.         #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software      #
#  distributed under the License is distributed on an "AS IS" BASIS,        #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
#  See the License for the specific language governing permissions and      #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Core business logic shared between CLI and MCP server. '''


from . import __
from . import detection as _detection
from . import exceptions as _exceptions
from . import interfaces as _interfaces
from . import processors as _processors
from . import search as _search
from . import state as _state


DocumentationResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
SearchResult: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )


def normalize_location( location: str ) -> str:
    ''' Normalizes location URL by stripping index.html. '''
    if location.endswith( '/index.html' ):
        location = location[ : -11 ]
    return location
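
# Illustrative sketch (not part of the original module): the slice above
# removes the 11-character '/index.html' suffix, so only that page name is
# trimmed and the rest of the URL is preserved. The URLs below are
# hypothetical.
#
#     normalize_location( 'https://example.org/docs/index.html' )
#     # -> 'https://example.org/docs'
#     normalize_location( 'https://example.org/docs/api.html' )
#     # -> 'https://example.org/docs/api.html'  (unchanged)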

_SUCCESS_RATE_MINIMUM = 0.1


async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> dict[ str, __.typx.Any ]:
    ''' Detects relevant processors of particular genus for location. '''
    location = normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    response = _processors.DetectionsForLocation(
        source = location,
        detections = detections,
        detection_optimal = (
            None if __.is_absent( detection_optimal ) else detection_optimal ),
        time_detection_ms = detection_time_ms )
    return _serialize_dataclass( response )
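
# Usage sketch (assumption, not from this module): calling ``detect`` from an
# async context. How ``auxdata`` (a ``_state.Globals`` instance) is obtained
# is application-specific; the URL is hypothetical.
#
#     report = await detect(
#         auxdata, 'https://example.readthedocs.io/en/latest/',
#         genus = _interfaces.ProcessorGenera.Inventory )
#     # ``report`` is a plain dict with 'source', 'detections',
#     # 'detection_optimal', and 'time_detection_ms' keys, produced by
#     # ``_serialize_dataclass`` from a ``DetectionsForLocation`` instance.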


async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ],
    __.ddoc.Fname( 'content query return' ) ]:
    ''' Searches documentation content with relevance ranking. '''
    location = normalize_location( location )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    objects = await idetection.filter_inventory(
        auxdata, location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Oversample candidates (3x results_max) so that relevance ranking after
    # content extraction still has enough material to fill the final page.
    candidates = [ result.object for result in results[ : results_max * 3 ] ]
    if not candidates:
        return {
            'source': location,
            'query': term,
            'search_metadata': {
                'results_count': 0,
                'results_max': results_max,
            },
            'documents': [ ],
        }
    sdetection = await _detection.detect_structure(
        auxdata, location, processor_name = processor_name )
    contents = await sdetection.extract_contents(
        auxdata, location, candidates, include_snippets = include_snippets )
    _validate_extraction_results(
        contents, candidates, sdetection.processor.name, location )
    contents_by_relevance = sorted(
        contents,
        key = lambda x: x.get( 'relevance_score', 0.0 ),
        reverse = True )
    contents_ = list( contents_by_relevance[ : results_max ] )
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( contents_ ),
        'results_max': results_max,
    }
    documents = [
        {
            'name': result[ 'object_name' ],
            'type': result[ 'object_type' ],
            'domain': result[ 'domain' ],
            'priority': result[ 'priority' ],
            'url': result[ 'url' ],
            'signature': result[ 'signature' ],
            'description': result[ 'description' ],
            'content_snippet': result[ 'content_snippet' ],
            'relevance_score': result[ 'relevance_score' ],
            'match_reasons': result[ 'match_reasons' ]
        }
        for result in contents_ ]
    return {
        'source': location,
        'query': term,
        'search_metadata': search_metadata,
        'documents': documents,
    }
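
# Usage sketch (assumption, not from this module): a content query from an
# async context. ``auxdata`` setup is application-specific; the URL and term
# are hypothetical, and ``SearchBehaviors`` defaults apply unless overridden.
#
#     page = await query_content(
#         auxdata, 'https://example.readthedocs.io/en/latest/', 'configure',
#         results_max = 5, include_snippets = True )
#     for doc in page[ 'documents' ]:
#         print( doc[ 'name' ], doc[ 'relevance_score' ] )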


async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory query return' ) ]:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = normalize_location( location )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    objects = await detection.filter_inventory(
        auxdata, location, filters = filters, details = details )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = [ result.object for result in results[ : results_max ] ]
    documents = [
        {
            'name': obj[ 'name' ],
            'role': obj[ 'role' ],
            'domain': obj.get( 'domain', '' ),
            'uri': obj[ 'uri' ],
            'dispname': obj[ 'dispname' ],
        }
        for obj in selections ]
    search_metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( selections ),
        'results_max': results_max,
        'matches_total': len( objects ),
    }
    return {
        'project': (
            objects[ 0 ].get( '_inventory_project', 'Unknown' )
            if objects else 'Unknown' ),
        'version': (
            objects[ 0 ].get( '_inventory_version', 'Unknown' )
            if objects else 'Unknown' ),
        'query': term,
        'documents': documents,
        'search_metadata': search_metadata,
        'objects_count': len( selections ),
        'source': location,
    }
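
# Usage sketch (assumption, not from this module): an inventory lookup that
# only needs names and URIs. Filter keys depend on the inventory processor in
# use; the 'domain' filter shown here is hypothetical.
#
#     listing = await query_inventory(
#         auxdata, 'https://example.readthedocs.io/en/latest/', 'Client',
#         filters = { 'domain': 'py' },
#         details = _interfaces.InventoryQueryDetails.Name,
#         results_max = 10 )
#     names = [ doc[ 'name' ] for doc in listing[ 'documents' ] ]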


async def summarize_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument, /,
    term: str = '', *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    group_by: __.typx.Optional[ str ] = None,
) -> __.typx.Annotated[
    dict[ str, __.typx.Any ], __.ddoc.Fname( 'inventory summary return' ) ]:
    ''' Provides structured summary of inventory data. '''
    details = _interfaces.InventoryQueryDetails.Name
    inventory_result = await query_inventory(
        auxdata, location, term, processor_name = processor_name,
        search_behaviors = search_behaviors, filters = filters,
        results_max = 1000, # Large number to get all matches
        details = details )
    if group_by is not None:
        objects_data = _group_documents_by_field(
            inventory_result[ 'documents' ], group_by )
    else: objects_data = inventory_result[ 'documents' ]
    inventory_data: dict[ str, __.typx.Any ] = {
        'project': inventory_result[ 'project' ],
        'version': inventory_result[ 'version' ],
        'objects_count':
            inventory_result[ 'search_metadata' ][ 'matches_total' ],
        'objects': objects_data,
    }
    return inventory_data
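
# Usage sketch (assumption, not from this module): grouping the summary by a
# document field. 'domain' is one of the fields emitted by ``query_inventory``
# and so is a safe grouping key; other keys depend on the processor.
#
#     summary = await summarize_inventory(
#         auxdata, 'https://example.readthedocs.io/en/latest/',
#         group_by = 'domain' )
#     # summary[ 'objects' ] maps each domain value to a tuple of object
#     # records; summary[ 'objects_count' ] comes from 'matches_total', so it
#     # counts every filtered match rather than only the returned page.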


async def survey_processors(
    auxdata: _state.Globals, /,
    genus: _interfaces.ProcessorGenera,
    name: __.typx.Optional[ str ] = None,
) -> dict[ str, __.typx.Any ]:
    ''' Lists processor capabilities for specified genus, filtered by name. '''
    match genus:
        case _interfaces.ProcessorGenera.Inventory:
            processors = dict( _processors.inventory_processors )
        case _interfaces.ProcessorGenera.Structure:
            processors = dict( _processors.structure_processors )
    if name is not None and name not in processors:
        raise _exceptions.ProcessorInavailability( name )
    processors_capabilities = {
        name_: _serialize_dataclass( processor.capabilities )
        for name_, processor in processors.items( )
        if name is None or name_ == name }
    return { 'processors': processors_capabilities }
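
# Usage sketch (assumption, not from this module): listing capabilities of all
# registered inventory processors, or of a single one by name. Passing an
# unregistered name raises ``ProcessorInavailability``.
#
#     capabilities = await survey_processors(
#         auxdata, genus = _interfaces.ProcessorGenera.Inventory )
#     # capabilities[ 'processors' ] maps processor names to serialized
#     # capability records.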


def _add_object_metadata_to_results(
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    result: dict[ str, __.typx.Any ],
) -> None:
    ''' Adds object metadata without documentation to results. '''
    for obj in selected_objects:
        document = _create_document_metadata( obj )
        result[ 'documents' ].append( document )


def _construct_explore_result_structure( # noqa: PLR0913
    inventory_data: dict[ str, __.typx.Any ],
    query: str,
    selected_objects: list[ dict[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds the base result structure with metadata. '''
    search_metadata: dict[ str, __.typx.Any ] = {
        'objects_count': len( selected_objects ),
        'results_max': results_max,
        'matches_total': inventory_data[ 'objects_count' ],
    }
    result: dict[ str, __.typx.Any ] = {
        'project': inventory_data[ 'project' ],
        'version': inventory_data[ 'version' ],
        'query': query,
        'search_metadata': search_metadata,
        'documents': [ ],
    }
    return result


def _construct_query_result_structure( # noqa: PLR0913
    source: str,
    query: str,
    raw_results: list[ __.cabc.Mapping[ str, __.typx.Any ] ],
    results_max: int,
    search_behaviors: _interfaces.SearchBehaviors,
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Builds query result structure in explore format. '''
    search_metadata: dict[ str, __.typx.Any ] = {
        'results_count': len( raw_results ),
        'results_max': results_max,
    }
    documents: list[ dict[ str, __.typx.Any ] ] = [ ]
    for raw_result in raw_results:
        result_dict = dict( raw_result )
        document: dict[ str, __.typx.Any ] = {
            'name': result_dict[ 'object_name' ],
            'type': result_dict[ 'object_type' ],
            'domain': result_dict[ 'domain' ],
            'priority': result_dict[ 'priority' ],
            'url': result_dict[ 'url' ],
            'signature': result_dict[ 'signature' ],
            'description': result_dict[ 'description' ],
            'content_snippet': result_dict[ 'content_snippet' ],
            'relevance_score': result_dict[ 'relevance_score' ],
            'match_reasons': result_dict[ 'match_reasons' ]
        }
        documents.append( document )
    result: dict[ str, __.typx.Any ] = {
        'source': source,
        'query': query,
        'search_metadata': search_metadata,
        'documents': documents,
    }
    return result


def _create_document_with_docs(
    obj: dict[ str, __.typx.Any ],
    doc_result: __.cabc.Mapping[ str, __.typx.Any ],
) -> dict[ str, __.typx.Any ]:
    ''' Creates document structure with documentation content. '''
    document = _create_document_metadata( obj )
    document[ 'documentation' ] = doc_result
    return document


def _create_document_metadata(
    obj: dict[ str, __.typx.Any ]
) -> dict[ str, __.typx.Any ]:
    ''' Creates base document structure from object metadata. '''
    document = {
        'name': obj[ 'name' ],
        'role': obj[ 'role' ],
        'domain': obj.get( 'domain', '' ),
        'uri': obj[ 'uri' ],
        'dispname': obj[ 'dispname' ],
    }
    if 'fuzzy_score' in obj:
        document[ 'fuzzy_score' ] = obj[ 'fuzzy_score' ]
    return document


def _format_inventory_summary(
    inventory_data: dict[ str, __.typx.Any ]
) -> str:
    ''' Formats inventory data into human-readable summary. '''
    summary_lines: list[ str ] = [
        f"Project: {inventory_data[ 'project' ]}",
        f"Version: {inventory_data[ 'version' ]}",
        f"Objects: {inventory_data[ 'objects_count' ]}",
    ]
    if inventory_data[ 'objects' ]:
        if isinstance( inventory_data[ 'objects' ], dict ):
            summary_lines.append( "\nBreakdown by groups:" )
            grouped_objects = __.typx.cast(
                dict[ str, __.typx.Any ], inventory_data[ 'objects' ] )
            for group_name, objects in grouped_objects.items( ):
                object_count = len( objects )
                summary_lines.append(
                    f" {group_name}: {object_count} objects" )
        else:
            summary_lines.append( "\nObjects listed without grouping." )
    return '\n'.join( summary_lines )


def _group_documents_by_field(
    documents: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    field: __.typx.Optional[ str ]
) -> __.immut.Dictionary[
    str, tuple[ __.cabc.Mapping[ str, __.typx.Any ], ... ]
]:
    ''' Groups documents by specified field for inventory format. '''
    if field is None: return __.immut.Dictionary( )
    groups: dict[ str, list[ __.cabc.Mapping[ str, __.typx.Any ] ] ] = { }
    for doc in documents:
        raw_value = doc.get( field, f"(missing {field})" )
        if isinstance( raw_value, list ):
            str_value = "[list]"
        elif isinstance( raw_value, dict ):
            str_value = "[dict]"
        elif raw_value is None or raw_value == '':
            str_value = f"(missing {field})"
        else:
            str_value = str( raw_value )
        if str_value not in groups: groups[ str_value ] = [ ]
        obj_data = {
            'name': doc[ 'name' ],
            'role': doc[ 'role' ],
            'domain': doc.get( 'domain', '' ),
            'uri': doc[ 'uri' ],
            'dispname': doc[ 'dispname' ],
        }
        if 'fuzzy_score' in doc:
            obj_data[ 'fuzzy_score' ] = doc[ 'fuzzy_score' ]
        obj = __.immut.Dictionary( obj_data )
        groups[ str_value ].append( obj )
    return __.immut.Dictionary(
        ( key, tuple( items ) ) for key, items in groups.items( ) )
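
# Illustrative sketch (not part of the original module) of the grouped shape
# this helper returns; the field values below are hypothetical.
#
#     docs = [
#         { 'name': 'pkg.Client', 'role': 'class', 'domain': 'py',
#           'uri': 'api.html#pkg.Client', 'dispname': 'pkg.Client' },
#         { 'name': 'pkg.connect', 'role': 'function', 'domain': 'py',
#           'uri': 'api.html#pkg.connect', 'dispname': 'pkg.connect' },
#     ]
#     _group_documents_by_field( docs, 'role' )
#     # -> { 'class': ( {...}, ), 'function': ( {...}, ) } as an immutable
#     #    dictionary keyed by the stringified field value.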


def _serialize_dataclass( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format. '''
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue # Skip private/internal fields
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_dataclass( value )
        return result # type: ignore[return-value]
    if isinstance( obj, list ):
        return [ _serialize_dataclass( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        return list( obj ) # type: ignore[arg-type]
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    # For other objects, try to convert to string
    return str( obj )
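
# Illustrative sketch (not part of the original module): how the serializer
# flattens a small dataclass; ``Example`` is hypothetical.
#
#     @__.dcls.dataclass
#     class Example:
#         name: str
#         tags: frozenset[ str ]
#         _cache: int = 0
#
#     _serialize_dataclass( Example( name = 'x', tags = frozenset( { 'a' } ) ) )
#     # -> { 'name': 'x', 'tags': [ 'a' ] }  (the underscore-prefixed field is
#     #    skipped; frozensets become lists)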


def _select_top_objects(
    inventory_data: dict[ str, __.typx.Any ],
    results_max: int
) -> list[ dict[ str, __.typx.Any ] ]:
    ''' Selects top objects from inventory, sorted by fuzzy score. '''
    all_objects: list[ dict[ str, __.typx.Any ] ] = [ ]
    for domain_objects in inventory_data[ 'objects' ].values( ):
        all_objects.extend( domain_objects )
    all_objects.sort(
        key = lambda obj: obj.get( 'fuzzy_score', 0 ),
        reverse = True )
    return all_objects[ : results_max ]


def _validate_extraction_results(
    results: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    requested_objects: __.cabc.Sequence[ __.cabc.Mapping[ str, __.typx.Any ] ],
    processor_name: str,
    source: str
) -> None:
    ''' Validates that extraction results contain meaningful content. '''
    if not requested_objects: return
    if not results:
        raise _exceptions.StructureIncompatibility( processor_name, source )
    meaningful_results = 0
    for result in results:
        signature = result.get( 'signature', '' ).strip( )
        description = result.get( 'description', '' ).strip( )
        if signature or description: meaningful_results += 1
    success_rate = meaningful_results / len( requested_objects )
    if success_rate < _SUCCESS_RATE_MINIMUM:
        raise _exceptions.ContentExtractFailure(
            processor_name, source, meaningful_results,
            len( requested_objects ) )
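
# Illustrative note (not part of the original module) on the 10% threshold
# arithmetic: with 20 requested objects, at least 2 extracted results must
# carry a non-empty signature or description, since 2 / 20 = 0.1 is not below
# _SUCCESS_RATE_MINIMUM; with only 1 meaningful result, 1 / 20 = 0.05 < 0.1
# and ``ContentExtractFailure`` is raised.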