Coverage for sources/librovore/functions.py: 14%
131 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 22:09 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
# Minimum acceptable success rate for operations; not referenced within
# this module — presumably consumed by a sibling module. TODO(review): confirm.
_SUCCESS_RATE_MINIMUM = 0.1

# Documentation source location (URL or filesystem path), annotated for
# docstring/CLI metadata tooling.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]

# Shared immutable defaults so function signatures avoid fresh (or mutable)
# default-argument construction.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises ProcessorInavailability when no optimal processor exists.
    '''
    # NOTE(review): processor_name is accepted but never forwarded to
    # access_detections — confirm whether name filtering is intended here.
    location = _normalize_location( location )
    started = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    detection_time_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    if __.is_absent( detection_optimal ):
        if hasattr( genus, 'name' ): genus_name = genus.name.lower( )
        else: genus_name = str( genus )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    def convert( detection ) -> _results.Detection:
        # Every detection shares the genus-derived processor type and
        # carries empty metadata, per the result schema.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )
    return _results.DetectionsResult(
        source = location,
        detections = tuple( map( convert, detections.values( ) ) ),
        detection_optimal = convert( detection_optimal ),
        time_detection_ms = detection_time_ms )
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        When content_id is supplied, bypasses name search and extracts the
        single object it identifies. Candidates are filtered by structure
        processor capability before extraction.
    '''
    # NOTE(review): lines_max is accepted but unused in this function —
    # confirm whether truncation was intended here or happens downstream.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    resolved_location = _detection.resolve_source_url( location )
    objects = await _collect_inventory_objects_multi_source(
        auxdata, location, resolved_location, processor_name, filters )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        # Over-select (3x) so capability filtering below can still yield
        # up to results_max compatible candidates.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = await _create_inventory_location_info(
        auxdata, location, resolved_location, len( objects ) )
    if not candidates:
        return _produce_empty_content_result(
            resolved_location, term, results_max, start_time, locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    compatible_candidates = _filter_objects_by_structure_capabilities(
        candidates[ : results_max ], structure_capabilities )
    if not compatible_candidates:
        return _produce_empty_content_result(
            resolved_location, term, results_max, start_time, locations )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, compatible_candidates )
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms ),
        inventory_locations = locations )


def _produce_empty_content_result(
    resolved_location: str,
    term: str,
    results_max: int,
    start_time: float,
    locations: tuple[ _results.InventoryLocationInfo, ... ],
) -> _results.ContentQueryResult:
    ''' Produces empty content query result with elapsed search time. '''
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = ( ),
        search_metadata = _results.SearchMetadata(
            results_count = 0,
            results_max = results_max,
            search_time_ms = search_time_ms ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    started = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Resolution happens after detection so any redirect discovered
    # during detection yields the working URL.
    resolved_location = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters )
    matches = _search.filter_by_name(
        objects, term, search_behaviors = search_behaviors )
    selections = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    search_time_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = resolved_location,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = resolved_location,
        term = term,
        objects = selections,
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = search_time_ms ),
        inventory_locations = ( location_info, ) )
async def survey_processors(
    auxdata: _state.Globals, /,
    genus: _interfaces.ProcessorGenera,
    name: __.typx.Optional[ str ] = None,
) -> _results.ProcessorsSurveyResult:
    ''' Lists processor capabilities for specified genus, filtered by name. '''
    started = __.time.perf_counter( )
    # Only the two known genera are handled; any other member would leave
    # the registry unbound, as in the original match statement.
    if genus is _interfaces.ProcessorGenera.Inventory:
        processors = dict( _processors.inventory_processors )
    elif genus is _interfaces.ProcessorGenera.Structure:
        processors = dict( _processors.structure_processors )
    if name is not None and name not in processors:
        raise _exceptions.ProcessorInavailability(
            name,
            genus = genus.value )
    processor_infos = tuple(
        _results.ProcessorInfo(
            processor_name = name_,
            processor_type = genus.value,
            capabilities = processor.capabilities )
        for name_, processor in processors.items( )
        if name is None or name_ == name )
    survey_time_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    return _results.ProcessorsSurveyResult(
        genus = genus,
        filter_name = name,
        processors = processor_infos,
        survey_time_ms = survey_time_ms )
async def _collect_inventory_objects_multi_source(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    processor_name: __.Absential[ str ],
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Collects inventory objects using multi-source coordination.

        Pre-filters inventory sources by structure processor compatibility
        before issuing network requests; falls back to single-source
        detection when multi-source collection fails.
    '''
    try:
        detections = await _detection.collect_filter_inventories(
            auxdata, location )
    except Exception:
        # Deliberate best-effort: any multi-source failure degrades to
        # the single-source detection path.
        fallback = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
        return await fallback.filter_inventory(
            auxdata, resolved_location, filters = filters )
    if not detections: return ( )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    compatible = _filter_detections_by_structure_capabilities(
        detections, sdetection.get_capabilities( ) )
    if not compatible: return ( )
    return await _merge_primary_supplementary(
        auxdata, compatible, location, filters = filters )
async def _create_inventory_location_info(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    object_count: int,
) -> tuple[ _results.InventoryLocationInfo, ... ]:
    ''' Creates inventory location info for multi-source results.

        Attempts multi-source inventory collection; on any failure, falls
        back to single-source detection for attribution. Returns an empty
        tuple when no inventory sources are detected.
    '''
    try:
        inventory_detections = (
            await _detection.collect_filter_inventories(
                auxdata, location ) )
    # Broad catch is deliberate best-effort: any multi-source failure
    # degrades to the single-source detection path below.
    except Exception:
        idetection = await _detection.detect_inventory( auxdata, location )
        return tuple( [ _results.InventoryLocationInfo(
            inventory_type = idetection.processor.name,
            location_url = resolved_location,
            processor_name = idetection.processor.name,
            confidence = idetection.confidence,
            object_count = object_count ) ] )
    if not inventory_detections:
        return ( )
    # Attribute the caller-supplied aggregate object count to the
    # highest-confidence source; per-source counts are not computed here.
    primary_detection = _select_primary_detection( inventory_detections )
    return tuple( [ _results.InventoryLocationInfo(
        inventory_type = primary_detection.processor.name,
        location_url = resolved_location,
        processor_name = primary_detection.processor.name,
        confidence = primary_detection.confidence,
        object_count = object_count ) ] )
def _filter_detections_by_structure_capabilities(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Filters inventory detections by structure processor capabilities.

        Dropping incompatible sources up front avoids unnecessary network
        requests and processing during object collection.
    '''
    retained: dict[ str, _processors.InventoryDetection ] = { }
    for pname, detection in inventory_detections.items( ):
        if structure_capabilities.supports_inventory_type(
            detection.processor.name
        ): retained[ pname ] = detection
    return __.immut.Dictionary( retained )
def _filter_objects_by_structure_capabilities(
    candidates: __.cabc.Sequence[ _results.InventoryObject ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Retains candidates whose inventory type the structure
        processor supports. '''
    return tuple(
        candidate for candidate in candidates
        if structure_capabilities.supports_inventory_type(
            candidate.inventory_type ) )
async def _merge_primary_supplementary(
    auxdata: _state.Globals,
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    location: str,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Merges inventory objects using PRIMARY_SUPPLEMENTARY strategy.

        Aggregates objects from every qualified detection, preserving
        source attribution; duplicates are kept intentionally since
        complementary metadata is valuable. Assumes the detections were
        already pre-filtered for structure processor compatibility.
    '''
    if not inventory_detections: return ( )
    resolved = _detection.resolve_source_url( location )
    collected: list[ _results.InventoryObject ] = [ ]
    for detection in inventory_detections.values( ):
        collected.extend( await detection.filter_inventory(
            auxdata, resolved, filters = filters ) )
    return tuple( collected )
363def _normalize_location( location: str ) -> str:
364 ''' Normalizes location URL by stripping index.html. '''
365 if location.endswith( '/' ): return location[ : -1 ]
366 if location.endswith( '/index.html' ): return location[ : -11 ]
367 return location
def _process_content_id_filter(
    content_id: str,
    location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering.

        Parses the content ID, verifies it refers to this location, and
        returns a single-element tuple holding the first matching object.
    '''
    try: parsed_location, object_name = (
        _results.parse_content_id( content_id ) )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != location:
        raise _exceptions.ContentIdLocationMismatch(
            parsed_location, location )
    selection = next(
        ( obj for obj in objects if obj.name == object_name ), None )
    if selection is None:
        raise _exceptions.ContentIdObjectAbsence( object_name, location )
    return ( selection, )
def _select_primary_detection(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
) -> _processors.InventoryDetection:
    ''' Selects primary detection with highest confidence.

        Callers invoke this only with a non-empty mapping. Like the
        previous stable sort on negated confidence, returns the first
        detection among ties in mapping iteration order.
    '''
    # Single O(n) scan instead of copying and sorting (O(n log n)).
    return max(
        inventory_detections.values( ),
        key = lambda detection: detection.confidence )