Coverage for sources/librovore/detection.py: 60% (149 statements)
coverage.py v7.10.4, created at 2025-08-20 22:48 +0000
# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and       #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#
''' Documentation source detection system for plugin architecture. '''


from . import __
from . import exceptions as _exceptions
from . import interfaces as _interfaces
from . import processors as _processors
from . import state as _state
from . import urlpatterns as _urlpatterns
from . import urls as _urls


# Detections below this confidence are never considered usable.
CONFIDENCE_THRESHOLD_MINIMUM = 0.5
class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Cache entry for source detection results. '''

    # Detection results keyed by processor name.
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Creation time (seconds since epoch) used for expiry checks.
    timestamp: float
    # Lifetime of this entry in seconds.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the detection with highest confidence. '''
        if not self.detections: return __.absent
        champion = max(
            self.detections.values( ), key = lambda d: d.confidence )
        if champion.confidence >= CONFIDENCE_THRESHOLD_MINIMUM:
            return champion
        return __.absent

    def invalid( self, current_time: float ) -> bool:
        ''' Checks if cache entry has expired. '''
        return current_time - self.timestamp > self.ttl
class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support. '''

    # Time-to-live, in seconds, applied to every new entry.
    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired. '''
        entry = self._access_entry_fresh( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired. '''
        entry = self._access_entry_fresh( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results. '''
        self._entries[ source ] = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl )
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. '''
        self._entries.clear( )
        return self

    def remove_entry(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Removes specific source from cache, if present. '''
        entry = self._entries.pop( source, None )
        if entry is None: return __.absent
        return entry.detections

    def _access_entry_fresh(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns unexpired entry for source, evicting it when stale. '''
        entry = self._entries.get( source )
        if entry is None: return __.absent
        if entry.invalid( __.time.time( ) ):
            # Lazy eviction: expired entries are removed on access.
            del self._entries[ source ]
            return __.absent
        return entry
# Module-level caches, one per processor genus.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

# Universal URL redirects cache: original_url → working_url
_url_redirects_cache: dict[ str, str ] = { }
def resolve_source_url( url: str ) -> str:
    ''' Resolves source URL through redirect cache, returns working URL. '''
    redirect = _url_redirects_cache.get( url )
    if redirect is not None: return redirect
    return url
async def access_detections(
    auxdata: _state.Globals,
    source: str, /, *,
    genus: _interfaces.ProcessorGenera
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache. '''
    # Follow any previously-discovered redirect for this source.
    resolved_source = _url_redirects_cache.get( source, source )
    if genus is _interfaces.ProcessorGenera.Inventory:
        cache = _inventory_detections_cache
        processors = _processors.inventory_processors
    elif genus is _interfaces.ProcessorGenera.Structure:
        cache = _structure_detections_cache
        processors = _processors.structure_processors
    return await access_detections_ll(
        auxdata, resolved_source, cache = cache, processors = processors )
async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.

        Low-level function accepting arbitrary cache and processors list.
    '''
    detections = cache.access_detections( source )
    if __.is_absent( detections ):
        # Cache miss: run processors to populate, then re-read.
        await _execute_processors_with_patterns_and_cache(
            auxdata, source, cache, processors )
        detections = cache.access_detections( source )
        if __.is_absent( detections ):
            # Still absent: surface an empty mapping rather than absence.
            detections = __.immut.Dictionary[ str, _processors.Detection ]( )
    optimal = cache.access_detection_optimal( source )
    return detections, optimal
async def detect(
    auxdata: _state.Globals,
    source: str, /,
    genus: _interfaces.ProcessorGenera, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.Detection:
    ''' Detects processors for source through cache system. '''
    resolved_source = _url_redirects_cache.get( source, source )
    if genus is _interfaces.ProcessorGenera.Inventory:
        cache = _inventory_detections_cache
        class_name = 'inventory'
        processors = _processors.inventory_processors
    elif genus is _interfaces.ProcessorGenera.Structure:
        cache = _structure_detections_cache
        class_name = 'structure'
        processors = _processors.structure_processors
    if not __.is_absent( processor_name ):
        # Explicit processor request bypasses optimal selection and cache.
        if processor_name not in processors:
            raise _exceptions.ProcessorInavailability( processor_name )
        return await processors[ processor_name ].detect(
            auxdata, resolved_source )
    detection = await determine_detection_optimal_ll(
        auxdata, resolved_source, cache = cache, processors = processors )
    if __.is_absent( detection ):
        raise _exceptions.ProcessorInavailability( class_name )
    return detection
async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processors for source through cache system. '''
    result = await detect(
        auxdata, source,
        genus = _interfaces.ProcessorGenera.Inventory,
        processor_name = processor_name )
    # Narrowing cast: inventory processors produce InventoryDetection.
    return __.typx.cast( _processors.InventoryDetection, result )
async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processors for source through cache system. '''
    result = await detect(
        auxdata, source,
        genus = _interfaces.ProcessorGenera.Structure,
        processor_name = processor_name )
    # Narrowing cast: structure processors produce StructureDetection.
    return __.typx.cast( _processors.StructureDetection, result )
async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Low-level function accepting arbitrary cache and processors list.
    '''
    cached = cache.access_detection_optimal( source )
    if not __.is_absent( cached ): return cached
    # Miss: run detection (with URL pattern fallback) and cache results.
    detections = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, detections )
    return _select_detection_optimal( detections, processors )
async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs all processors on the source. '''
    results: dict[ str, _processors.Detection ] = { }
    # TODO: Parallel async fanout.
    for processor in processors.values( ):
        try: detection = await processor.detect( auxdata, source )
        except Exception: # noqa: PERF203,S112
            # Skip processor on detection failure; best-effort survey.
            continue
        results[ processor.name ] = detection
    return results
async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors with URL pattern extension fallback. '''
    results = await _execute_processors( auxdata, source, processors )
    confident = any(
        detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
        for detection in results.values( ) )
    if confident: return results
    # No confident detection: probe common URL patterns for a working URL.
    base_url = _urls.normalize_base_url( source )
    working_url = await _urlpatterns.probe_url_patterns( auxdata, base_url )
    if __.is_absent( working_url ): return results
    working_source = working_url.geturl( )
    pattern_results = await _execute_processors(
        auxdata, working_source, processors )
    if any( detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for detection in pattern_results.values( ) ):
        # Remember the redirect so future lookups resolve directly.
        _url_redirects_cache[ source ] = working_source
        return pattern_results
    return results
async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes all processors and caches results. '''
    cache.add_entry(
        source, await _execute_processors( auxdata, source, processors ) )
async def _execute_processors_with_patterns_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes processors with URL pattern extension and caches. '''
    cache.add_entry(
        source,
        await _execute_processors_with_patterns(
            auxdata, source, processors ) )
async def probe_source_with_patterns(
    auxdata: _state.Globals,
    source: str
) -> __.Absential[ str ]:
    ''' Probes source with URL pattern extension. '''
    normalized = _urls.normalize_base_url( source )
    probed = await _urlpatterns.probe_url_patterns( auxdata, normalized )
    if __.is_absent( probed ): return __.absent
    return probed.geturl( )
def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best processor based on confidence and registration order.

        Detections below the minimum confidence threshold are discarded.
        Ties on confidence are broken by processor registration order.
        Returns absence when no detection qualifies.
    '''
    if not detections: return __.absent
    candidates = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not candidates: return __.absent
    # Precompute registration order once; the previous list.index lookup
    # was O(n) per candidate (quadratic overall).
    orders = {
        name: order for order, name in enumerate( processors.keys( ) ) }
    def sort_key( detection: _processors.Detection ) -> tuple[ float, int ]:
        return (
            -detection.confidence, orders[ detection.processor.name ] )
    # min with key is equivalent to sorting and taking the first element.
    return min( candidates, key = sort_key )