Coverage for sources/librovore/functions.py: 13%
105 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 21:59 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 21:59 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
# Minimum acceptable success rate threshold.
# NOTE(review): not referenced anywhere in this module's visible code —
# presumably consumed elsewhere; confirm before removing.
_SUCCESS_RATE_MINIMUM = 0.1


# Documented alias for location URL arguments accepted by the public
# query/detect functions below.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults so function signatures avoid mutable defaults.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises :exc:`_exceptions.ProcessorInavailability` when no optimal
        processor is found for the location.
    '''
    # NOTE(review): processor_name is accepted but not consulted here —
    # confirm whether access_detections should receive it.
    location = _normalize_location( location )
    moment_begin = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    duration_ms = int(
        ( __.time.perf_counter( ) - moment_begin ) * 1000 )
    if __.is_absent( detection_optimal ):
        genus_label = (
            genus.name.lower( ) if hasattr( genus, 'name' )
            else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_label )
    def convert( detection ) -> _results.Detection:
        # Maps an internal detection onto the public result type.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )
    return _results.DetectionsResult(
        source = location,
        detections = tuple(
            convert( detection ) for detection in detections.values( ) ),
        detection_optimal = convert( detection_optimal ),
        time_detection_ms = duration_ms )
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        Detects an inventory processor for the location, filters its
        inventory, selects candidate objects either by ``content_id``
        (exact browse-then-extract lookup) or by name search on ``term``,
        then extracts document content for the top candidates.

        :param processor_name: Restricts detection to a named processor.
        :param content_id: When present, bypasses name search entirely.
        :param results_max: Maximum number of documents to extract.
        :param lines_max: NOTE(review): unused in this function body —
            presumably intended to cap extracted content length; confirm
            and wire through or remove.
    '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Resolve URL after detection to get working URL if redirect exists
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term,
            match_mode = search_behaviors.match_mode,
            fuzzy_threshold = search_behaviors.fuzzy_threshold )
        # Oversample (3x) so extraction can still fill results_max even if
        # some candidates yield no content.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = tuple( [ _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = resolved_location,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) ) ] )
    if not candidates:
        # Short-circuit: no matches — return an empty result with timing,
        # skipping structure detection and extraction entirely.
        end_time = __.time.perf_counter( )
        search_time_ms = int( ( end_time - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            query = term,
            documents = tuple( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, candidates[ : results_max ] )
    end_time = __.time.perf_counter( )
    search_time_ms = int( ( end_time - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Name ),
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    moment_begin = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may have followed a redirect; use the working URL thereafter.
    url = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, url, filters = filters, details = details )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    search_time_ms = int(
        ( __.time.perf_counter( ) - moment_begin ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = url,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    metadata = _results.SearchMetadata(
        results_count = len( selections ),
        results_max = results_max,
        matches_total = len( objects ),
        search_time_ms = search_time_ms )
    return _results.InventoryQueryResult(
        location = url,
        query = term,
        objects = selections,
        search_metadata = metadata,
        inventory_locations = ( location_info, ) )
210async def survey_processors(
211 auxdata: _state.Globals, /,
212 genus: _interfaces.ProcessorGenera,
213 name: __.typx.Optional[ str ] = None,
214) -> _results.ProcessorsSurveyResult:
215 ''' Lists processor capabilities for specified genus, filtered by name. '''
216 start_time = __.time.perf_counter( )
217 match genus:
218 case _interfaces.ProcessorGenera.Inventory:
219 processors = dict( _processors.inventory_processors )
220 case _interfaces.ProcessorGenera.Structure:
221 processors = dict( _processors.structure_processors )
222 if name is not None and name not in processors:
223 raise _exceptions.ProcessorInavailability(
224 name,
225 genus = genus.value )
226 processor_infos: list[ _results.ProcessorInfo ] = [ ]
227 for name_, processor in processors.items( ):
228 if name is None or name_ == name:
229 processor_info = _results.ProcessorInfo(
230 processor_name = name_,
231 processor_type = genus.value,
232 capabilities = processor.capabilities,
233 )
234 processor_infos.append( processor_info )
235 end_time = __.time.perf_counter( )
236 survey_time_ms = int( ( end_time - start_time ) * 1000 )
237 return _results.ProcessorsSurveyResult(
238 genus = genus,
239 filter_name = name,
240 processors = tuple( processor_infos ),
241 survey_time_ms = survey_time_ms,
242 )
246def _normalize_location( location: str ) -> str:
247 ''' Normalizes location URL by stripping index.html. '''
248 if location.endswith( '/' ): return location[ : -1 ]
249 if location.endswith( '/index.html' ): return location[ : -11 ]
250 return location
def _process_content_id_filter(
    content_id: str,
    resolved_location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering.

        Parses the content ID, verifies it targets the resolved location,
        and returns a single-element tuple containing the first inventory
        object whose name matches.
    '''
    try: parsed_location, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != resolved_location:
        raise _exceptions.ContentIdLocationMismatch(
            parsed_location, resolved_location )
    selection = next(
        ( obj for obj in objects if obj.name == name ), None )
    if selection is None:
        raise _exceptions.ContentIdObjectAbsence(
            name, resolved_location )
    return ( selection, )
def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclasses become dicts (private fields skipped), sequences and
        sets become lists with members serialized recursively, mappings
        become dicts with serialized values, JSON primitives pass through,
        and anything else falls back to ``str``.
    '''
    # TODO: Remove type suppressions.
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            # Leading-underscore fields are internal state, not payload.
            if field.name.startswith( '_' ):
                continue
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_for_json( value )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        # Fix: recurse into members. Previously returned raw members via
        # list( obj ), which could leak non-JSON-serializable objects
        # (e.g. dataclasses inside a set). Iteration order of sets is
        # not guaranteed.
        return [ _serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )