Coverage for sources/librovore/functions.py: 15%
114 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-29 01:14 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-29 01:14 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core business logic shared between CLI and MCP server. '''
24from . import __
25from . import detection as _detection
26from . import exceptions as _exceptions
27from . import interfaces as _interfaces
28from . import processors as _processors
29from . import results as _results
30from . import search as _search
31from . import state as _state
34_SUCCESS_RATE_MINIMUM = 0.1
37LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
38 str, __.ddoc.Fname( 'location argument' ) ]
41_search_behaviors_default = _interfaces.SearchBehaviors( )
42_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )
async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult | _results.ErrorResponse:
    ''' Detects relevant processors of particular genus for location. '''
    location = _normalize_location( location )
    time_start = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    duration_ms = int(
        ( __.time.perf_counter( ) - time_start ) * 1000 )
    if __.is_absent( detection_optimal ):
        # Synthesize an exception so the shared error formatter applies.
        label = (
            genus.name.lower( ) if hasattr( genus, 'name' )
            else str( genus ) )
        return _produce_processor_error_response(
            _exceptions.ProcessorInavailability( label ),
            location, 'detection', genus = genus )

    def render( detection: __.typx.Any ) -> _results.Detection:
        # Converts an internal detection into a results.Detection.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )

    return _results.DetectionsResult(
        source = location,
        detections = tuple(
            render( detection ) for detection in detections.values( ) ),
        detection_optimal = render( detection_optimal ),
        time_detection_ms = duration_ms )
async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> _results.ContentResult:
    ''' Searches documentation content with relevance ranking. '''
    location = _normalize_location( location )
    time_start = __.time.perf_counter( )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, location, term,
            genus = _interfaces.ProcessorGenera.Inventory )
    # Detection may have followed a redirect; use the working URL.
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Overselect threefold so content extraction has spare candidates.
    candidates = [
        match.inventory_object for match in matches[ : results_max * 3 ] ]
    locations = (
        _results.InventoryLocationInfo(
            inventory_type = idetection.processor.name,
            location_url = url,
            processor_name = idetection.processor.name,
            confidence = idetection.confidence,
            object_count = len( objects ) ), )
    if not candidates:
        duration_ms = int(
            ( __.time.perf_counter( ) - time_start ) * 1000 )
        return _results.ContentQueryResult(
            location = url,
            query = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = duration_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, url, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, url, candidates[ : results_max ],
        include_snippets = include_snippets )
    duration_ms = int(
        ( __.time.perf_counter( ) - time_start ) * 1000 )
    return _results.ContentQueryResult(
        location = url,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = duration_ms ),
        inventory_locations = locations )
async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> _results.InventoryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    time_start = __.time.perf_counter( )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, location, term,
            genus = _interfaces.ProcessorGenera.Inventory )
    # Detection may have followed a redirect; use the working URL.
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url, filters = filters, details = details )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = [
        match.inventory_object for match in matches[ : results_max ] ]
    duration_ms = int(
        ( __.time.perf_counter( ) - time_start ) * 1000 )
    locations = (
        _results.InventoryLocationInfo(
            inventory_type = idetection.processor.name,
            location_url = url,
            processor_name = idetection.processor.name,
            confidence = idetection.confidence,
            object_count = len( objects ) ), )
    return _results.InventoryQueryResult(
        location = url,
        query = term,
        objects = tuple( selections ),
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = duration_ms ),
        inventory_locations = locations )
215async def survey_processors(
216 auxdata: _state.Globals, /,
217 genus: _interfaces.ProcessorGenera,
218 name: __.typx.Optional[ str ] = None,
219) -> _results.ProcessorsSurveyResultUnion:
220 ''' Lists processor capabilities for specified genus, filtered by name. '''
221 start_time = __.time.perf_counter( )
222 match genus:
223 case _interfaces.ProcessorGenera.Inventory:
224 processors = dict( _processors.inventory_processors )
225 case _interfaces.ProcessorGenera.Structure:
226 processors = dict( _processors.structure_processors )
227 if name is not None and name not in processors:
228 exc = _exceptions.ProcessorInavailability( name )
229 return _produce_processor_error_response(
230 exc, '', name or '', genus = genus )
231 processor_infos: list[ _results.ProcessorInfo ] = [ ]
232 for name_, processor in processors.items( ):
233 if name is None or name_ == name:
234 processor_info = _results.ProcessorInfo(
235 processor_name = name_,
236 processor_type = genus.value,
237 capabilities = processor.capabilities,
238 )
239 processor_infos.append( processor_info )
240 end_time = __.time.perf_counter( )
241 survey_time_ms = int( ( end_time - start_time ) * 1000 )
242 return _results.ProcessorsSurveyResult(
243 genus = genus,
244 filter_name = name,
245 processors = tuple( processor_infos ),
246 survey_time_ms = survey_time_ms,
247 )
251def _normalize_location( location: str ) -> str:
252 ''' Normalizes location URL by stripping index.html. '''
253 if location.endswith( '/' ): return location[ : -1 ]
254 if location.endswith( '/index.html' ): return location[ : -11 ]
255 return location
def _produce_generic_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for generic processor failures. '''
    info = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Processor Found',
        message = (
            'No compatible processor found to handle this '
            'documentation source.' ),
        suggestion = (
            'Verify the URL points to a supported documentation format.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = info )
def _produce_inventory_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for inventory failures. '''
    info = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Format Detected',
        message = (
            'No compatible inventory format detected at this '
            'documentation source.' ),
        suggestion = (
            'Verify the URL points to a supported documentation site.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = info )
def _produce_processor_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
    genus: __.Absential[ _interfaces.ProcessorGenera ] = __.absent,
) -> _results.ErrorResponse:
    ''' Produces appropriate structured error response based on genus. '''
    if genus is _interfaces.ProcessorGenera.Inventory:
        producer = _produce_inventory_error_response
    elif genus is _interfaces.ProcessorGenera.Structure:
        producer = _produce_structure_error_response
    # Absent or unrecognized genus falls back to the generic response.
    else: producer = _produce_generic_error_response
    return producer( exc, location, query )
def _produce_structure_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for structure failures. '''
    info = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Structure Processor',
        message = (
            'No compatible structure processor found for this '
            'documentation source.' ),
        suggestion = (
            'Ensure the site uses a supported documentation format '
            'like Sphinx or MkDocs.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = info )
def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclasses become dicts (private fields omitted); sequences and
        sets become lists; mappings become dicts; scalars pass through;
        anything else is stringified.
    '''
    # TODO: Remove type suppressions.
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue  # private fields are implementation detail
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_for_json( value )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        # Recurse per member: previously returned raw members, leaving
        # nested dataclasses or tuples unserialized inside sets.
        return [ _serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )