Coverage for sources/librovore/detection.py: 61%
136 statements
Generated by coverage.py v7.10.5, created at 2025-08-29 01:14 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Documentation source detection system for plugin architecture. '''
24from . import __
25from . import exceptions as _exceptions
26from . import interfaces as _interfaces
27from . import processors as _processors
28from . import state as _state
29from . import urlpatterns as _urlpatterns
30from . import urls as _urls
# Minimum confidence a detection needs to be considered optimal; also the
# bar below which URL pattern probing is attempted as a fallback.
CONFIDENCE_THRESHOLD_MINIMUM: float = 0.5
class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Timestamped snapshot of detections for a single source. '''

    # Detections keyed by string (processor name, per the cache callers).
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Time at which the entry was recorded.
    timestamp: float
    # Maximum age, in the same units as ``timestamp``, before expiry.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns highest-confidence detection, if it is good enough.

            Absent when there are no detections or when even the best one
            falls below ``CONFIDENCE_THRESHOLD_MINIMUM``.
        '''
        if not self.detections: return __.absent
        champion = max(
            self.detections.values( ),
            key = lambda detection: detection.confidence )
        if champion.confidence >= CONFIDENCE_THRESHOLD_MINIMUM:
            return champion
        return __.absent

    def invalid( self, current_time: float ) -> bool:
        ''' Reports whether entry age at given time exceeds its TTL. '''
        age = current_time - self.timestamp
        return age > self.ttl
class DetectionsCache( __.immut.DataclassObject ):
    ''' Per-source store of detection results which expire after a TTL. '''

    # Lifetime stamped onto new entries; compared against differences of
    # ``__.time.time( )`` readings (presumably seconds).
    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns every cached detection for source, unless expired.

            An expired entry is evicted and reported as absent.
        '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns optimal cached detection for source, unless expired.

            An expired entry is evicted and reported as absent.
        '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Stores detections for source, stamped with the current time.

            Replaces any existing entry for the same source. Returns the
            cache itself to permit chaining.
        '''
        entry = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl )
        self._entries[ source ] = entry
        return self

    def clear( self ) -> __.typx.Self:
        ''' Discards every entry and returns the cache itself. '''
        self._entries.clear( )
        return self

    def _access_entry(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns live entry for source, evicting it if expired. '''
        entry = self._entries.get( source )
        if entry is None: return __.absent
        # Clock is only consulted once an entry is known to exist.
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return __.absent
        return entry
# One detections cache per processor genus, each with the default TTL.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )
# Maps a requested source to the URL at which detection actually succeeded.
_url_redirects_cache: dict[ str, str ] = { }
def resolve_source_url( url: str ) -> str:
    ''' Returns recorded redirect target for URL, else the URL itself. '''
    try: return _url_redirects_cache[ url ]
    except KeyError: return url
120async def access_detections(
121 auxdata: _state.Globals,
122 source: str, /, *,
123 genus: _interfaces.ProcessorGenera
124) -> tuple[
125 _processors.DetectionsByProcessor,
126 __.Absential[ _processors.Detection ]
127]:
128 ''' Accesses detections via appropriate cache. '''
129 resolved_source = _url_redirects_cache.get( source, source )
130 match genus:
131 case _interfaces.ProcessorGenera.Inventory:
132 cache = _inventory_detections_cache
133 processors = _processors.inventory_processors
134 case _interfaces.ProcessorGenera.Structure:
135 cache = _structure_detections_cache
136 processors = _processors.structure_processors
137 return await access_detections_ll(
138 auxdata, resolved_source, cache = cache, processors = processors )
async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Returns all detections and the optimal one, filling cache on miss.

        Low-level variant which works against whichever cache and
        processors mapping the caller supplies.

        If the cache still has nothing for the source after processors
        have run, an empty immutable dictionary is returned as the
        detections.
    '''
    cached = cache.access_detections( source )
    if __.is_absent( cached ):
        await _execute_processors_and_cache(
            auxdata, source, cache, processors )
        cached = cache.access_detections( source )
    detections = (
        __.immut.Dictionary[ str, _processors.Detection ]( )
        if __.is_absent( cached ) else cached )
    return detections, cache.access_detection_optimal( source )
168async def detect(
169 auxdata: _state.Globals,
170 source: str, /,
171 genus: _interfaces.ProcessorGenera, *,
172 processor_name: __.Absential[ str ] = __.absent,
173) -> _processors.Detection:
174 ''' Detects processors for source through cache system. '''
175 resolved_source = _url_redirects_cache.get( source, source )
176 match genus:
177 case _interfaces.ProcessorGenera.Inventory:
178 cache = _inventory_detections_cache
179 class_name = 'inventory'
180 processors = _processors.inventory_processors
181 case _interfaces.ProcessorGenera.Structure:
182 cache = _structure_detections_cache
183 class_name = 'structure'
184 processors = _processors.structure_processors
185 if not __.is_absent( processor_name ):
186 if processor_name not in processors:
187 raise _exceptions.ProcessorInavailability( processor_name )
188 processor = processors[ processor_name ]
189 return await processor.detect( auxdata, resolved_source )
190 detection = await determine_detection_optimal_ll(
191 auxdata, resolved_source, cache = cache, processors = processors )
192 if __.is_absent( detection ):
193 raise _exceptions.ProcessorInavailability( class_name )
194 return detection
async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Produces an inventory detection for source.

        Delegates to :func:`detect` with the inventory genus. The result
        is only cast for type checkers, not verified at runtime.
    '''
    genus = _interfaces.ProcessorGenera.Inventory
    result = await detect(
        auxdata, source, genus = genus, processor_name = processor_name )
    return __.typx.cast( _processors.InventoryDetection, result )
async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Produces a structure detection for source.

        Delegates to :func:`detect` with the structure genus. The result
        is only cast for type checkers, not verified at runtime.
    '''
    genus = _interfaces.ProcessorGenera.Structure
    result = await detect(
        auxdata, source, genus = genus, processor_name = processor_name )
    return __.typx.cast( _processors.StructureDetection, result )
async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Returns optimal detection for source, running processors on miss.

        Low-level variant which works against whichever cache and
        processors mapping the caller supplies.

        A cached optimal detection is returned as-is. Otherwise, the
        processors are run, their detections are cached, and the best
        of them (if any qualifies) is returned.
    '''
    cached = cache.access_detection_optimal( source )
    if __.is_absent( cached ):
        fresh = await _execute_processors_with_patterns(
            auxdata, source, processors )
        cache.add_entry( source, fresh )
        return _select_detection_optimal( fresh, processors )
    return cached
async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Collects detections from each processor, one after another.

        Processors whose detection raises are left out of the result,
        which is keyed by the ``name`` attribute of each processor.
    '''
    detections: dict[ str, _processors.Detection ] = { }
    # TODO: Parallel async fanout.
    for processor in processors.values( ):
        # Only the detection call is guarded; failures there are
        # deliberately treated as "processor does not apply".
        try: outcome = await processor.detect( auxdata, source )
        except Exception: # noqa: PERF203,S112
            continue
        detections[ processor.name ] = outcome
    return detections
async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors on source, retrying at a probed URL if needed.

        If no detection on the source itself meets the confidence
        threshold, ``_urlpatterns.probe_url_patterns`` is asked for a
        working URL (using ``'/objects.inv'``) from the normalized base
        URL. Should processors succeed there, the redirect from source to
        that URL is recorded and those results are returned. In every
        other case, the results for the original source are returned.
    '''

    def _has_confident(
        candidates: dict[ str, _processors.Detection ]
    ) -> bool:
        return any(
            candidate.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for candidate in candidates.values( ) )

    direct_results = await _execute_processors( auxdata, source, processors )
    if _has_confident( direct_results ): return direct_results
    base_url = _urls.normalize_base_url( source )
    probed_url = await _urlpatterns.probe_url_patterns(
        auxdata, base_url, '/objects.inv' )
    if __.is_absent( probed_url ): return direct_results
    working_source = probed_url.geturl( )
    probed_results = await _execute_processors(
        auxdata, working_source, processors )
    if not _has_confident( probed_results ): return direct_results
    # Remember where detection worked so later lookups go there directly.
    _url_redirects_cache[ source ] = working_source
    return probed_results
async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Runs processors (with URL pattern fallback) and caches results.

        Results are stored in the given cache under the source as passed.
    '''
    fresh = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, fresh )
def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best detection by confidence, then registration order.

        Only detections with confidence of at least
        ``CONFIDENCE_THRESHOLD_MINIMUM`` are considered. Among those, the
        highest confidence wins; ties go to the processor which appears
        earliest in the processors mapping. A detection whose processor
        name is not in the mapping ranks after all registered ones rather
        than causing a failure.

        Returns absent if there are no detections or none qualifies.
    '''
    if not detections: return __.absent
    candidates = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not candidates: return __.absent
    # Precompute ranks once instead of a linear search per candidate.
    ranks = { name: rank for rank, name in enumerate( processors ) }
    rank_unregistered = len( ranks )

    def sort_key( detection: _processors.Detection ) -> tuple[ float, int ]:
        rank = ranks.get( detection.processor.name, rank_unregistered )
        return ( -detection.confidence, rank )

    # ``min`` yields the first minimal item, matching a stable sort's head.
    return min( candidates, key = sort_key )