Coverage for sources/librovore/functions.py: 13%
153 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 18:40 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
# Minimum acceptable success ratio for an operation.
# NOTE(review): not referenced in the visible portion of this module —
# presumably consumed elsewhere; confirm before removing.
_SUCCESS_RATE_MINIMUM = 0.1


# Documentation location (URL or path) accepted by the public functions.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults, safe to reuse across calls.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )


# Pair of (applied filter names, ignored filter names).
FilterValidationResult: __.typx.TypeAlias = tuple[
    tuple[ str, ... ], tuple[ str, ... ] ]
def validate_filters(
    filters: __.cabc.Mapping[ str, __.typx.Any ],
    processor_capabilities: _interfaces.ProcessorCapabilities,
) -> FilterValidationResult:
    ''' Validates filters against processor capabilities.

        Returns tuple of (filters_applied, filters_ignored) where
        filters_applied contains filter names that are supported by the
        processor and filters_ignored contains filter names that are not
        supported.
    '''
    supported = frozenset(
        capability.name
        for capability in processor_capabilities.supported_filters )
    # Partition filter names, preserving the mapping's iteration order.
    applied = tuple( name for name in filters if name in supported )
    ignored = tuple( name for name in filters if name not in supported )
    return applied, ignored
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location. '''
    # NOTE(review): processor_name is accepted but never forwarded to
    # access_detections — confirm whether narrowing by name is intended here.
    location = _normalize_location( location )
    timer_start = __.time.perf_counter( )
    detections, detection_optimal = await _detection.access_detections(
        auxdata, location, genus = genus )
    detection_time_ms = int(
        ( __.time.perf_counter( ) - timer_start ) * 1000 )
    if __.is_absent( detection_optimal ):
        if hasattr( genus, 'name' ): genus_name = genus.name.lower( )
        else: genus_name = str( genus )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    def convert( detection ) -> _results.Detection:
        # Shared shape for both the per-processor entries and the optimum.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )
    return _results.DetectionsResult(
        source = location,
        detections = tuple(
            convert( detection ) for detection in detections.values( ) ),
        detection_optimal = convert( detection_optimal ),
        time_detection_ms = detection_time_ms )
def _produce_empty_content_result( # noqa: PLR0913
    resolved_location: str,
    term: str,
    results_max: int,
    start_time: float,
    filters_applied: tuple[ str, ... ],
    filters_ignored: tuple[ str, ... ],
    locations: tuple[ _results.InventoryLocationInfo, ... ],
) -> _results.ContentQueryResult:
    ''' Produces empty content query result with elapsed-time metadata. '''
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = ( ),
        search_metadata = _results.SearchMetadata(
            results_count = 0,
            results_max = results_max,
            search_time_ms = search_time_ms,
            filters_applied = filters_applied,
            filters_ignored = filters_ignored ),
        inventory_locations = locations )


async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        Returns empty results (with filter metadata) when any requested
        filter is unsupported by the detected inventory processor, when no
        candidates match, or when no candidate is compatible with the
        detected structure processor.
    '''
    # NOTE(review): lines_max is accepted but unused in this function —
    # presumably applied downstream by callers/formatters; confirm.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    resolved_location = _detection.resolve_source_url( location )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    filters_applied, filters_ignored = validate_filters(
        filters, idetection.processor.capabilities )
    if filters_ignored:
        locations = await _create_inventory_location_info(
            auxdata, location, resolved_location, 0 )
        return _produce_empty_content_result(
            resolved_location, term, results_max, start_time,
            filters_applied, filters_ignored, locations )
    objects = await _collect_inventory_objects_multi_source(
        auxdata, location, resolved_location, processor_name, filters )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        # Oversample: structure-capability filtering below may discard some.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = await _create_inventory_location_info(
        auxdata, location, resolved_location, len( objects ) )
    if not candidates:
        return _produce_empty_content_result(
            resolved_location, term, results_max, start_time,
            filters_applied, filters_ignored, locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    compatible_candidates = _filter_objects_by_structure_capabilities(
        candidates[ : results_max ], structure_capabilities )
    if not compatible_candidates:
        return _produce_empty_content_result(
            resolved_location, term, results_max, start_time,
            filters_applied, filters_ignored, locations )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, compatible_candidates )
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms,
            filters_applied = filters_applied,
            filters_ignored = filters_ignored ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    # NOTE(review): results_max is reported in metadata but never truncates
    # the returned objects — confirm whether truncation belongs to callers.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    resolved_location = _detection.resolve_source_url( location )
    filters_applied, filters_ignored = validate_filters(
        filters, detection.processor.capabilities )
    def produce_locations( count: int ):
        # Single-source attribution for the detected inventory processor.
        return ( _results.InventoryLocationInfo(
            inventory_type = detection.processor.name,
            location_url = resolved_location,
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            object_count = count ), )
    def produce_metadata( count: int, total: int ):
        elapsed_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
        return _results.SearchMetadata(
            results_count = count,
            results_max = results_max,
            matches_total = total,
            search_time_ms = elapsed_ms,
            filters_applied = filters_applied,
            filters_ignored = filters_ignored )
    if filters_ignored:
        return _results.InventoryQueryResult(
            location = resolved_location,
            term = term,
            objects = ( ),
            search_metadata = produce_metadata( 0, 0 ),
            inventory_locations = produce_locations( 0 ) )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters )
    results = _search.filter_by_name(
        objects, term, search_behaviors = search_behaviors )
    selections = tuple( result.inventory_object for result in results )
    return _results.InventoryQueryResult(
        location = resolved_location,
        term = term,
        objects = selections,
        search_metadata = produce_metadata( len( selections ), len( objects ) ),
        inventory_locations = produce_locations( len( objects ) ) )
286async def survey_processors(
287 auxdata: _state.Globals, /,
288 genus: _interfaces.ProcessorGenera,
289 name: __.typx.Optional[ str ] = None,
290) -> _results.ProcessorsSurveyResult:
291 ''' Lists processor capabilities for specified genus, filtered by name. '''
292 start_time = __.time.perf_counter( )
293 match genus:
294 case _interfaces.ProcessorGenera.Inventory:
295 processors = dict( _processors.inventory_processors )
296 case _interfaces.ProcessorGenera.Structure:
297 processors = dict( _processors.structure_processors )
298 if name is not None and name not in processors:
299 raise _exceptions.ProcessorInavailability(
300 name,
301 genus = genus.value )
302 processor_infos: list[ _results.ProcessorInfo ] = [ ]
303 for name_, processor in processors.items( ):
304 if name is None or name_ == name:
305 processor_info = _results.ProcessorInfo(
306 processor_name = name_,
307 processor_type = genus.value,
308 capabilities = processor.capabilities,
309 )
310 processor_infos.append( processor_info )
311 end_time = __.time.perf_counter( )
312 survey_time_ms = int( ( end_time - start_time ) * 1000 )
313 return _results.ProcessorsSurveyResult(
314 genus = genus,
315 filter_name = name,
316 processors = tuple( processor_infos ),
317 survey_time_ms = survey_time_ms,
318 )
async def _collect_inventory_objects_multi_source(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    processor_name: __.Absential[ str ],
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Collects inventory objects using multi-source coordination.

        Optimized to pre-filter inventory sources by structure processor
        compatibility before making network requests.
    '''
    try:
        detections = await _detection.collect_filter_inventories(
            auxdata, location )
    except Exception:
        # Best-effort fallback: revert to the single-source path.
        fallback = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
        return await fallback.filter_inventory(
            auxdata, resolved_location, filters = filters )
    if not detections: return ( )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    compatible = _filter_detections_by_structure_capabilities(
        detections, sdetection.get_capabilities( ) )
    if not compatible: return ( )
    return await _merge_primary_supplementary(
        auxdata, compatible, location, filters = filters )
async def _create_inventory_location_info(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    object_count: int,
) -> tuple[ _results.InventoryLocationInfo, ... ]:
    ''' Creates inventory location info for multi-source results. '''
    def produce( detection ) -> tuple[ _results.InventoryLocationInfo, ... ]:
        # Both paths attribute results to exactly one detection.
        return ( _results.InventoryLocationInfo(
            inventory_type = detection.processor.name,
            location_url = resolved_location,
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            object_count = object_count ), )
    try:
        detections = await _detection.collect_filter_inventories(
            auxdata, location )
    except Exception:
        # Best-effort fallback to single-source detection.
        return produce(
            await _detection.detect_inventory( auxdata, location ) )
    if not detections: return ( )
    return produce( _select_primary_detection( detections ) )
def _filter_detections_by_structure_capabilities(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Filters inventory detections by structure processor capabilities.

        Pre-filters inventory sources by compatibility before object
        collection to avoid unnecessary network requests and processing
        overhead.
    '''
    return __.immut.Dictionary( {
        name: detection
        for name, detection in inventory_detections.items( )
        if structure_capabilities.supports_inventory_type(
            detection.processor.name ) } )
def _filter_objects_by_structure_capabilities(
    candidates: __.cabc.Sequence[ _results.InventoryObject ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Filters inventory objects by structure processor capabilities. '''
    return tuple(
        candidate for candidate in candidates
        if structure_capabilities.supports_inventory_type(
            candidate.inventory_type ) )
async def _merge_primary_supplementary(
    auxdata: _state.Globals,
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    location: str,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Merges inventory objects using PRIMARY_SUPPLEMENTARY strategy.

        Uses highest-confidence detection as primary source, adds
        supplementary objects from other qualified sources with preserved
        source attribution. No deduplication - complementary metadata is
        valuable.

        Note: inventory_detections should already be pre-filtered for
        compatibility with the structure processor to avoid unnecessary
        network requests.
    '''
    if not inventory_detections: return ( )
    resolved = _detection.resolve_source_url( location )
    aggregate: list[ _results.InventoryObject ] = [ ]
    for detection in inventory_detections.values( ):
        aggregate.extend( await detection.filter_inventory(
            auxdata, resolved, filters = filters ) )
    return tuple( aggregate )
439def _normalize_location( location: str ) -> str:
440 ''' Normalizes location URL by stripping index.html. '''
441 if location.endswith( '/' ): return location[ : -1 ]
442 if location.endswith( '/index.html' ): return location[ : -11 ]
443 return location
def _process_content_id_filter(
    content_id: str,
    location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering. '''
    try: location_, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if location_ != location:
        raise _exceptions.ContentIdLocationMismatch( location_, location )
    # Only the first matching object is extracted.
    found = next(
        ( obj for obj in objects if obj.name == name ), None )
    if found is None:
        raise _exceptions.ContentIdObjectAbsence( name, location )
    return ( found, )
def _select_primary_detection(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
) -> _processors.InventoryDetection:
    ''' Selects primary detection with highest confidence.

        Callers guard against empty mappings before invoking.
    '''
    # max() is O(n) versus the former full sort and keeps the same
    # tie-breaking: first-encountered among equal confidences wins, just as
    # the stable descending sort did.
    return max(
        inventory_detections.values( ),
        key = lambda detection: detection.confidence )