Coverage for sources/librovore/detection.py: 63%
120 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-17 23:43 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-17 23:43 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Documentation source detection system for plugin architecture. '''
24from . import __
25from . import exceptions as _exceptions
26from . import interfaces as _interfaces
27from . import processors as _processors
28from . import state as _state
# Minimum confidence which a detection must reach to be selected as optimal.
# Detections below this value are still cached but are never chosen.
CONFIDENCE_THRESHOLD_MINIMUM = 0.5
class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Snapshot of detection results for one source, with expiry data. '''

    # Detections keyed by string; the caching code in this module keys them
    # by processor name.
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Time at which the entry was created.
    timestamp: float
    # Permitted age of the entry, in the same units as ``timestamp``.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Highest-confidence detection, if it meets the minimum threshold.

            Absent when there are no detections or when even the best one
            does not reach ``CONFIDENCE_THRESHOLD_MINIMUM``. On equal
            confidence, the first detection in mapping order wins.
        '''
        if not self.detections: return __.absent
        candidate = max(
            self.detections.values( ),
            key = lambda detection: detection.confidence )
        if candidate.confidence >= CONFIDENCE_THRESHOLD_MINIMUM:
            return candidate
        return __.absent

    def invalid( self, current_time: float ) -> bool:
        ''' Reports whether age of entry exceeds its time-to-live. '''
        age = current_time - self.timestamp
        return age > self.ttl
class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache of detection results per source, with TTL-based expiry.

        Expired entries are evicted lazily, when they are next accessed.
    '''

    # Time-to-live stamped onto each entry at insertion. Compared against
    # differences of ``__.time.time( )`` readings.
    ttl: int = 3600
    # Entries keyed by source string.
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if cached and unexpired.

            Returns absence sentinel otherwise. An expired entry is evicted.
        '''
        entry = self._access_entry( source )
        if entry is None: return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns best detection for source, if cached and unexpired.

            Returns absence sentinel if there is no unexpired entry or if
            the entry has no detection which meets the confidence threshold.
            An expired entry is evicted.
        '''
        entry = self._access_entry( source )
        if entry is None: return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or replaces entry for source, stamped with current time.

            Returns the cache itself.
        '''
        self._entries[ source ] = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl,
        )
        return self

    def clear( self ) -> __.typx.Self:
        ''' Removes all entries. Returns the cache itself. '''
        self._entries.clear( )
        return self

    def remove_entry(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Removes entry for source and returns its detections, if present.

            Returns absence sentinel if source has no entry. Expiry is not
            considered; an expired entry is still returned.
        '''
        entry = self._entries.pop( source, None )
        # Explicit comparison: presence must not hinge on entry truthiness.
        if entry is None: return __.absent
        return entry.detections

    def _access_entry(
        self, source: str
    ) -> __.typx.Optional[ DetectionsCacheEntry ]:
        ''' Returns unexpired entry for source or ``None``.

            Single place for the lookup-and-evict logic shared by the
            accessors. The clock is read only when an entry exists.
        '''
        entry = self._entries.get( source )
        if entry is None: return None
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return None
        return entry
# Module-level caches, one per processor genus. The genus-dispatching
# functions in this module select between them.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )
118async def access_detections(
119 auxdata: _state.Globals,
120 source: str, /, *,
121 genus: _interfaces.ProcessorGenera
122) -> tuple[
123 _processors.DetectionsByProcessor,
124 __.Absential[ _processors.Detection ]
125]:
126 ''' Accesses detections via appropriate cache.
128 Detections are performed to fill cache, if necessary.
129 '''
130 match genus:
131 case _interfaces.ProcessorGenera.Inventory:
132 cache = _inventory_detections_cache
133 processors = _processors.inventory_processors
134 case _interfaces.ProcessorGenera.Structure:
135 cache = _structure_detections_cache
136 processors = _processors.structure_processors
137 return await access_detections_ll(
138 auxdata, source, cache = cache, processors = processors )
async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Returns all detections and the optimal detection for source.

        Serves from the supplied cache when it holds an unexpired entry.
        Otherwise, runs the supplied processors, caches their results, and
        serves from the refreshed cache.

        Low-level function which accepts arbitrary cache and processors.
    '''
    cached = cache.access_detections( source )
    if not __.is_absent( cached ):
        return cached, cache.access_detection_optimal( source )
    await _execute_processors_and_cache(
        auxdata, source, cache, processors )
    refreshed = cache.access_detections( source )
    if __.is_absent( refreshed ):
        # Not expected right after a refresh; substitute an empty mapping
        # so that callers always receive a mapping.
        refreshed = __.immut.Dictionary[
            str, _processors.Detection ]( )
    return refreshed, cache.access_detection_optimal( source )
170async def detect(
171 auxdata: _state.Globals,
172 source: str, /,
173 genus: _interfaces.ProcessorGenera, *,
174 processor_name: __.Absential[ str ] = __.absent,
175) -> _processors.Detection:
176 ''' Detects inventory processors for source through cache system. '''
177 match genus:
178 case _interfaces.ProcessorGenera.Inventory:
179 cache = _inventory_detections_cache
180 class_name = 'inventory'
181 processors = _processors.inventory_processors
182 case _interfaces.ProcessorGenera.Structure:
183 cache = _structure_detections_cache
184 class_name = 'structure'
185 processors = _processors.structure_processors
186 if not __.is_absent( processor_name ):
187 if processor_name not in processors:
188 raise _exceptions.ProcessorInavailability( processor_name )
189 processor = processors[ processor_name ]
190 return await processor.detect( auxdata, source )
191 detection = await determine_detection_optimal_ll(
192 auxdata, source, cache = cache, processors = processors )
193 if __.is_absent( detection ):
194 raise _exceptions.ProcessorInavailability( class_name )
195 return detection
async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processor for source.

        Thin wrapper around ``detect`` with the inventory genus; the result
        is cast, without runtime check, to the inventory detection type.
    '''
    result = await detect(
        auxdata, source, _interfaces.ProcessorGenera.Inventory,
        processor_name = processor_name )
    return __.typx.cast( _processors.InventoryDetection, result )
async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processor for source.

        Thin wrapper around ``detect`` with the structure genus; the result
        is cast, without runtime check, to the structure detection type.
    '''
    result = await detect(
        auxdata, source, _interfaces.ProcessorGenera.Structure,
        processor_name = processor_name )
    return __.typx.cast( _processors.StructureDetection, result )
async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Returns the cached optimal detection when the cache can supply one.
        Otherwise, runs the supplied processors, caches their results, and
        selects the optimal detection from the fresh results. The result is
        absent if no detection qualifies.

        Low-level function which accepts arbitrary cache and processors.
    '''
    cached = cache.access_detection_optimal( source )
    if __.is_absent( cached ):
        # NOTE(review): An unexpired entry without a qualifying detection
        # also lands here, so such sources are re-detected on every call.
        # Confirm whether that retry is intended.
        fresh = await _execute_processors( auxdata, source, processors )
        cache.add_entry( source, fresh )
        return _select_detection_optimal( fresh, processors )
    return cached
async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs every processor on the source and collects the detections.

        Processors are awaited one after another, in mapping order. A
        processor whose detection raises is skipped. Results are keyed by
        the ``name`` attribute of each processor.
    '''
    detections: dict[ str, _processors.Detection ] = { }
    # TODO: Parallel async fanout.
    for candidate in processors.values( ):
        try: detection = await candidate.detect( auxdata, source )
        except Exception: # noqa: PERF203,S112
            # Best effort: a failing processor must not block the others.
            continue
        detections[ candidate.name ] = detection
    return detections
async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Runs all processors on the source and stores results in the cache.

        Any existing cache entry for the source is replaced.
    '''
    fresh = await _execute_processors( auxdata, source, processors )
    cache.add_entry( source, fresh )
def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best detection by confidence, then by registration order.

        Only detections whose confidence reaches
        ``CONFIDENCE_THRESHOLD_MINIMUM`` are considered. Among those, the
        highest confidence wins; ties go to the processor which appears
        earliest among the keys of ``processors``. A detection whose
        processor name is not a key of ``processors`` ranks after all
        registered processors rather than causing an error.

        Returns absence sentinel if no detection qualifies.
    '''
    if not detections: return __.absent
    candidates = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not candidates: return __.absent
    # Registration rank per processor name, computed once rather than via
    # a linear search for every candidate.
    ranks = { name: index for index, name in enumerate( processors ) }
    rank_fallback = len( ranks )
    def rank( detection: _processors.Detection ) -> tuple[ float, int ]:
        # Negated confidence so that the minimum is the most confident.
        return (
            -detection.confidence,
            ranks.get( detection.processor.name, rank_fallback ) )
    # ``min`` yields the first minimal item, matching a stable sort.
    return min( candidates, key = rank )