Coverage for sources/librovore/detection.py: 60%
147 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 22:09 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Documentation source detection system for plugin architecture. '''
24from . import __
25from . import exceptions as _exceptions
26from . import interfaces as _interfaces
27from . import processors as _processors
28from . import state as _state
29from . import urlpatterns as _urlpatterns
30from . import urls as _urls
# Detections with confidence below this value are treated as unusable
# throughout the module (optimal selection, pattern-probe fallback).
CONFIDENCE_THRESHOLD_MINIMUM = 0.5
class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Cache entry for source detection results. '''

    detections: __.cabc.Mapping[ str, _processors.Detection ]
    timestamp: float
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the detection with highest confidence. '''
        if not self.detections: return __.absent
        best = max(
            self.detections.values( ),
            key = lambda detection: detection.confidence )
        # Even the best detection is unusable below the module threshold.
        if best.confidence < CONFIDENCE_THRESHOLD_MINIMUM: return __.absent
        return best

    def invalid( self, current_time: float ) -> bool:
        ''' Checks if cache entry has expired. '''
        age = current_time - self.timestamp
        return age > self.ttl
class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support. '''

    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired. '''
        entry = self._access_entry_fresh( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired. '''
        entry = self._access_entry_fresh( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results. '''
        entry = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl )
        self._entries[ source ] = entry
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. '''
        self._entries.clear( )
        return self

    def _access_entry_fresh(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns unexpired entry for source, evicting a stale one. '''
        entry = self._entries.get( source )
        if entry is None: return __.absent
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return __.absent
        return entry
# Module-level singleton caches, one per processor genus.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

# Maps original source URLs to working URLs discovered via pattern probing
# (populated by _execute_processors_with_patterns).
_url_redirects_cache: dict[ str, str ] = { }
def resolve_source_url( url: str ) -> str:
    ''' Resolves source URL through redirect cache, returns working URL. '''
    if url in _url_redirects_cache: return _url_redirects_cache[ url ]
    return url
118async def access_detections(
119 auxdata: _state.Globals,
120 source: str, /, *,
121 genus: _interfaces.ProcessorGenera
122) -> tuple[
123 _processors.DetectionsByProcessor,
124 __.Absential[ _processors.Detection ]
125]:
126 ''' Accesses detections via appropriate cache. '''
127 source_ = _url_redirects_cache.get( source, source )
128 match genus:
129 case _interfaces.ProcessorGenera.Inventory:
130 cache = _inventory_detections_cache
131 processors = _processors.inventory_processors
132 case _interfaces.ProcessorGenera.Structure:
133 cache = _structure_detections_cache
134 processors = _processors.structure_processors
135 return await access_detections_ll(
136 auxdata, source_, cache = cache, processors = processors )
async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.

        Low-level function accepting arbitrary cache and processors list.
    '''
    detections = cache.access_detections( source )
    if __.is_absent( detections ):
        # Cache miss: run processors to populate, then consult again.
        await _execute_processors_and_cache(
            auxdata, source, cache, processors )
        detections = cache.access_detections( source )
    if __.is_absent( detections ):
        # Still nothing cached; present an empty immutable mapping.
        detections = __.immut.Dictionary[ str, _processors.Detection ]( )
    optimum = cache.access_detection_optimal( source )
    return detections, optimum
async def collect_filter_inventories(
    auxdata: _state.Globals,
    location: str, /, *,
    confidence_limit: float = CONFIDENCE_THRESHOLD_MINIMUM,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Collects all inventory sources above confidence threshold.

        Returns dictionary mapping processor names to their detections
        for multi-source inventory coordination. The default limit is the
        module-wide minimum confidence threshold.
    '''
    # Resolve via the dedicated helper; access_detections resolves again,
    # which is harmless since the lookup is idempotent.
    location_ = resolve_source_url( location )
    detections, _ = await access_detections(
        auxdata, location_, genus = _interfaces.ProcessorGenera.Inventory )
    detections_ = {
        processor_name: __.typx.cast(
            _processors.InventoryDetection, detection )
        for processor_name, detection in detections.items( )
        if detection.confidence >= confidence_limit }
    return __.immut.Dictionary( detections_ )
187async def detect(
188 auxdata: _state.Globals,
189 source: str, /,
190 genus: _interfaces.ProcessorGenera, *,
191 processor_name: __.Absential[ str ] = __.absent,
192) -> _processors.Detection:
193 ''' Detects processors for source through cache system. '''
194 source_ = _url_redirects_cache.get( source, source )
195 match genus:
196 case _interfaces.ProcessorGenera.Inventory:
197 cache = _inventory_detections_cache
198 class_name = 'inventory'
199 processors = _processors.inventory_processors
200 case _interfaces.ProcessorGenera.Structure:
201 cache = _structure_detections_cache
202 class_name = 'structure'
203 processors = _processors.structure_processors
204 if not __.is_absent( processor_name ):
205 if processor_name not in processors:
206 raise _exceptions.ProcessorInavailability( processor_name )
207 processor = processors[ processor_name ]
208 return await processor.detect( auxdata, source_ )
209 detection = await determine_detection_optimal_ll(
210 auxdata, source_, cache = cache, processors = processors )
211 if __.is_absent( detection ):
212 raise _exceptions.ProcessorInavailability( class_name )
213 return detection
async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processors for source through cache system. '''
    result = await detect(
        auxdata, source,
        genus = _interfaces.ProcessorGenera.Inventory,
        processor_name = processor_name )
    # Cast narrows static type; inventory genus dispatch supplies
    # inventory processors.
    return __.typx.cast( _processors.InventoryDetection, result )
async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processors for source through cache system. '''
    result = await detect(
        auxdata, source,
        genus = _interfaces.ProcessorGenera.Structure,
        processor_name = processor_name )
    # Cast narrows static type; structure genus dispatch supplies
    # structure processors.
    return __.typx.cast( _processors.StructureDetection, result )
async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Low-level function accepting arbitrary cache and processors list.
    '''
    cached = cache.access_detection_optimal( source )
    if not __.is_absent( cached ): return cached
    # Cache miss: run full detection (with URL pattern fallback) and cache.
    fresh = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, fresh )
    return _select_detection_optimal( fresh, processors )
async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs all processors on the source. '''
    detections: dict[ str, _processors.Detection ] = { }
    robots_failures: list[ _exceptions.RobotsTxtAccessFailure ] = [ ]
    # TODO: Parallel async fanout.
    for processor in processors.values( ):
        try: result = await processor.detect( auxdata, source )
        except _exceptions.RobotsTxtAccessFailure as exc: # noqa: PERF203
            robots_failures.append( exc )
        except Exception: # noqa: S112
            continue
        else: detections[ processor.name ] = result
    # Surface a robots.txt failure only when no processor succeeded at all.
    if not detections and robots_failures:
        raise robots_failures[ 0 ] from None
    return detections
async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors with URL pattern extension fallback. '''
    def confident( candidates: dict[ str, _processors.Detection ] ) -> bool:
        # True when any detection meets the minimum usable confidence.
        return any(
            detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for detection in candidates.values( ) )
    results = await _execute_processors( auxdata, source, processors )
    if confident( results ): return results
    # Low confidence everywhere: probe well-known URL patterns for an
    # alternative location of the objects inventory.
    base_url = _urls.normalize_base_url( source )
    working_url = await _urlpatterns.probe_url_patterns(
        auxdata, base_url, '/objects.inv' )
    if __.is_absent( working_url ): return results
    working_source = working_url.geturl( )
    pattern_results = await _execute_processors(
        auxdata, working_source, processors )
    if confident( pattern_results ):
        # Remember the redirect so future lookups resolve directly.
        _url_redirects_cache[ source ] = working_source
        return pattern_results
    return results
async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes processors with URL pattern extension and caches. '''
    cache.add_entry(
        source,
        await _execute_processors_with_patterns(
            auxdata, source, processors ) )
def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best processor based on confidence and registration order.

        Detections below the minimum confidence threshold are discarded.
        Among the rest, higher confidence wins; ties break toward the
        processor registered earlier. Returns absent when nothing
        qualifies.
    '''
    if not detections: return __.absent
    qualified = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not qualified: return __.absent
    # Precompute O(1) registration-order lookups instead of repeated
    # list.index scans inside the comparison key.
    orders = {
        name: order for order, name in enumerate( processors.keys( ) ) }
    def selection_key(
        detection: _processors.Detection
    ) -> tuple[ float, int ]:
        return (
            -detection.confidence, orders[ detection.processor.name ] )
    # min over the key replaces sorting the whole list to take element 0.
    return min( qualified, key = selection_key )