Coverage for sources/librovore/functions.py: 16%
87 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-06 02:25 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
# Minimum acceptable success rate; not referenced in this module chunk —
# presumably consumed elsewhere in the package. TODO confirm usage.
_SUCCESS_RATE_MINIMUM = 0.1

# Annotated alias used to document the positional location argument
# accepted by the public query functions below.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]

# Shared immutable defaults so every call site reuses one frozen instance
# instead of constructing fresh (mutable-default-style) objects.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises ProcessorInavailability when no processor of the genus
        claims the location.
    '''
    # NOTE(review): processor_name is accepted but never consulted below —
    # confirm whether it should be forwarded to access_detections.
    location = _normalize_location( location )
    timer_start = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    # Timing covers only the detection probe, not result conversion.
    elapsed_ms = int( ( __.time.perf_counter( ) - timer_start ) * 1000 )
    if __.is_absent( detection_optimal ):
        if hasattr( genus, 'name' ): genus_name = genus.name.lower( )
        else: genus_name = str( genus )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    def convert( detection: __.typx.Any ) -> _results.Detection:
        # Adapt an internal detection record to the public result type.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ),
        )
    return _results.DetectionsResult(
        source = location,
        detections = tuple( map( convert, detections.values( ) ) ),
        detection_optimal = convert( detection_optimal ),
        time_detection_ms = elapsed_ms )
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        When a content ID is supplied, it selects the single matching
        inventory object; otherwise objects are matched against the term.
    '''
    # NOTE(review): lines_max is accepted but unused here — confirm intent.
    location = _normalize_location( location )
    timer_start = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may discover a redirect; use the resolved URL from here on.
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters )
    if __.is_absent( content_id ):
        # Overselect threefold so truncation to results_max still has slack.
        matches = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        candidates = [
            match.inventory_object
            for match in matches[ : results_max * 3 ] ]
    else:
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    locations = ( _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = resolved_location,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) ), )
    if not candidates:
        # Short-circuit: no structure detection needed for an empty match set.
        elapsed_ms = int(
            ( __.time.perf_counter( ) - timer_start ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            term = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = elapsed_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, candidates[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - timer_start ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = elapsed_ms ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    timer_start = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may discover a redirect; use the resolved URL from here on.
    resolved_location = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters )
    matches = _search.filter_by_name(
        objects, term, search_behaviors = search_behaviors )
    selections = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - timer_start ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = resolved_location,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = resolved_location,
        term = term,
        objects = selections,
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = elapsed_ms ),
        inventory_locations = ( location_info, ) )
203async def survey_processors(
204 auxdata: _state.Globals, /,
205 genus: _interfaces.ProcessorGenera,
206 name: __.typx.Optional[ str ] = None,
207) -> _results.ProcessorsSurveyResult:
208 ''' Lists processor capabilities for specified genus, filtered by name. '''
209 start_time = __.time.perf_counter( )
210 match genus:
211 case _interfaces.ProcessorGenera.Inventory:
212 processors = dict( _processors.inventory_processors )
213 case _interfaces.ProcessorGenera.Structure:
214 processors = dict( _processors.structure_processors )
215 if name is not None and name not in processors:
216 raise _exceptions.ProcessorInavailability(
217 name,
218 genus = genus.value )
219 processor_infos: list[ _results.ProcessorInfo ] = [ ]
220 for name_, processor in processors.items( ):
221 if name is None or name_ == name:
222 processor_info = _results.ProcessorInfo(
223 processor_name = name_,
224 processor_type = genus.value,
225 capabilities = processor.capabilities,
226 )
227 processor_infos.append( processor_info )
228 end_time = __.time.perf_counter( )
229 survey_time_ms = int( ( end_time - start_time ) * 1000 )
230 return _results.ProcessorsSurveyResult(
231 genus = genus,
232 filter_name = name,
233 processors = tuple( processor_infos ),
234 survey_time_ms = survey_time_ms,
235 )
239def _normalize_location( location: str ) -> str:
240 ''' Normalizes location URL by stripping index.html. '''
241 if location.endswith( '/' ): return location[ : -1 ]
242 if location.endswith( '/index.html' ): return location[ : -11 ]
243 return location
def _process_content_id_filter(
    content_id: str,
    resolved_location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering.

        Raises ContentIdInvalidity on malformed IDs,
        ContentIdLocationMismatch when the ID targets another location,
        and ContentIdObjectAbsence when no object carries the named ID.
    '''
    try: parsed_location, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != resolved_location:
        raise _exceptions.ContentIdLocationMismatch(
            parsed_location, resolved_location )
    for candidate in objects:
        # First object with the requested name wins; duplicates ignored.
        if candidate.name == name: return ( candidate, )
    raise _exceptions.ContentIdObjectAbsence(
        name, resolved_location )