Coverage for sources/librovore/functions.py: 15%
92 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-02 00:02 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
# Minimum acceptable success fraction for processor operations.
# NOTE(review): not referenced anywhere in this module's visible code --
# presumably consumed elsewhere; confirm before removing.
_SUCCESS_RATE_MINIMUM = 0.1


# Documentation-source location (URL or filesystem path) accepted by the
# public query functions below; annotated for CLI/MCP argument rendering.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults so repeated calls reuse one baseline
# SearchBehaviors instance and one empty filters mapping.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises ProcessorInavailability when no processor of the requested
        genus produces an optimal detection for the location.
    '''
    # NOTE(review): processor_name is accepted but never consulted in this
    # body -- presumably reserved for future filtering; confirm with callers.
    location = _normalize_location( location )
    moment_start = __.time.perf_counter( )
    detections, optimal = await _detection.access_detections(
        auxdata, location, genus = genus )
    # Timing is captured before the availability check, matching the
    # measurement window of the detection call itself.
    elapsed_ms = int( ( __.time.perf_counter( ) - moment_start ) * 1000 )
    if __.is_absent( optimal ):
        genus_label = (
            genus.name.lower( ) if hasattr( genus, 'name' )
            else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location, genus = genus_label )
    def render( detection: __.typx.Any ) -> _results.Detection:
        # Flatten an internal detection into the public result shape.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )
    return _results.DetectionsResult(
        source = location,
        detections = tuple( map( render, detections.values( ) ) ),
        detection_optimal = render( optimal ),
        time_detection_ms = elapsed_ms )
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking. '''
    # NOTE(review): lines_max is never used in this body -- confirm whether
    # extraction was meant to truncate documents by line count.
    location = _normalize_location( location )
    moment_start = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may discover a redirect; use the resolved URL thereafter.
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Over-select (3x) so content extraction shortfalls do not starve
    # the final result set.
    candidates = [
        match.inventory_object for match in matches[ : results_max * 3 ] ]
    locations = (
        _results.InventoryLocationInfo(
            inventory_type = idetection.processor.name,
            location_url = url,
            processor_name = idetection.processor.name,
            confidence = idetection.confidence,
            object_count = len( objects ) ), )
    if not candidates:
        elapsed_ms = int(
            ( __.time.perf_counter( ) - moment_start ) * 1000 )
        return _results.ContentQueryResult(
            location = url,
            query = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = elapsed_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, url, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, url, candidates[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - moment_start ) * 1000 )
    return _results.ContentQueryResult(
        location = url,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = elapsed_ms ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Name ),
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    moment_start = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may discover a redirect; use the resolved URL thereafter.
    url = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, url, filters = filters, details = details )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - moment_start ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = url,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = url,
        query = term,
        objects = selections,
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = elapsed_ms ),
        inventory_locations = ( location_info, ) )
203async def survey_processors(
204 auxdata: _state.Globals, /,
205 genus: _interfaces.ProcessorGenera,
206 name: __.typx.Optional[ str ] = None,
207) -> _results.ProcessorsSurveyResult:
208 ''' Lists processor capabilities for specified genus, filtered by name. '''
209 start_time = __.time.perf_counter( )
210 match genus:
211 case _interfaces.ProcessorGenera.Inventory:
212 processors = dict( _processors.inventory_processors )
213 case _interfaces.ProcessorGenera.Structure:
214 processors = dict( _processors.structure_processors )
215 if name is not None and name not in processors:
216 raise _exceptions.ProcessorInavailability(
217 name,
218 genus = genus.value )
219 processor_infos: list[ _results.ProcessorInfo ] = [ ]
220 for name_, processor in processors.items( ):
221 if name is None or name_ == name:
222 processor_info = _results.ProcessorInfo(
223 processor_name = name_,
224 processor_type = genus.value,
225 capabilities = processor.capabilities,
226 )
227 processor_infos.append( processor_info )
228 end_time = __.time.perf_counter( )
229 survey_time_ms = int( ( end_time - start_time ) * 1000 )
230 return _results.ProcessorsSurveyResult(
231 genus = genus,
232 filter_name = name,
233 processors = tuple( processor_infos ),
234 survey_time_ms = survey_time_ms,
235 )
239def _normalize_location( location: str ) -> str:
240 ''' Normalizes location URL by stripping index.html. '''
241 if location.endswith( '/' ): return location[ : -1 ]
242 if location.endswith( '/index.html' ): return location[ : -11 ]
243 return location
def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclass instances become dicts (underscore-prefixed fields
        omitted), sequences and sets become lists, mappings become dicts,
        JSON scalars pass through, anything else falls back to str().
    '''
    # TODO: Remove type suppressions.
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            # Underscore-prefixed fields are internal; omit from output.
            if field.name.startswith( '_' ):
                continue
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_for_json( value )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        # Recurse into members too: previously members passed through
        # unserialized, inconsistent with the list/tuple branch above.
        # NOTE: set iteration order is arbitrary, so output order is too.
        return [ _serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )