Coverage for sources/librovore/detection.py: 60%

149 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 22:48 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Documentation source detection system for plugin architecture. ''' 

22 

23 

24from . import __ 

25from . import exceptions as _exceptions 

26from . import interfaces as _interfaces 

27from . import processors as _processors 

28from . import state as _state 

29from . import urlpatterns as _urlpatterns 

30from . import urls as _urls 

31 

32 

# Lowest confidence at which a detection is treated as usable; detections
# are compared against this with ``>=`` throughout this module.
CONFIDENCE_THRESHOLD_MINIMUM = 0.5

34 

35 

class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Detection results for one source, plus data needed for expiry. '''

    # Detections, keyed by string (processor name, per how callers fill it).
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Time at which the entry was recorded.
    timestamp: float
    # Lifetime of the entry, in same units as differences of timestamps.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Most confident detection, provided it meets minimum threshold.

            Absence is produced when there are no detections at all or when
            even the most confident one is below
            ``CONFIDENCE_THRESHOLD_MINIMUM``.
        '''
        if not self.detections: return __.absent
        champion = max(
            self.detections.values( ),
            key = lambda detection: detection.confidence )
        if champion.confidence >= CONFIDENCE_THRESHOLD_MINIMUM:
            return champion
        return __.absent

    def invalid( self, current_time: float ) -> bool:
        ''' Reports whether entry age at given time exceeds its TTL. '''
        age = current_time - self.timestamp
        return age > self.ttl

58 

59 

class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support.

        Entries are keyed by source and receive the cache-level TTL when
        they are added. Expired entries are evicted upon access rather than
        by any background sweep.
    '''

    # TTL applied to every entry created by ``add_entry``.
    ttl: int = 3600
    # Backing store; mutated in place by the methods below.
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired.

            Absence is returned for unknown or expired sources; an expired
            entry is evicted as a side effect.
        '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired.

            Absence is returned for unknown or expired sources and also when
            the entry has no detection meeting the confidence threshold; an
            expired entry is evicted as a side effect.
        '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results.

            The entry is stamped with the current time and the cache TTL.
            Returns the cache itself to permit chaining.
        '''
        self._entries[ source ] = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl,
        )
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. Returns the cache itself. '''
        self._entries.clear( )
        return self

    def remove_entry(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Removes specific source from cache, if present.

            Returns the detections of the removed entry, or absence if there
            was no entry for the source. Expiry is not considered.
        '''
        entry = self._entries.pop( source, None )
        # Compare against the sentinel explicitly: truthiness of an entry
        # object must not decide whether something was removed.
        if entry is None: return __.absent
        return entry.detections

    def _access_entry(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns unexpired entry for source; evicts entry if expired. '''
        entry = self._entries.get( source )
        if entry is None: return __.absent
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return __.absent
        return entry

114 

115 

116 

# Module-level detection caches, one per processor genus. Both are created
# with the default TTL of DetectionsCache.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

# Universal URL redirects cache: original_url → working_url
# Populated when URL pattern probing finds a working variant of a source.
_url_redirects_cache: dict[ str, str ] = { }

122 

123 

def resolve_source_url( url: str ) -> str:
    ''' Maps URL to its recorded working equivalent.

        Consults the URL redirects cache. A URL without a recorded redirect
        is returned unchanged.
    '''
    try: return _url_redirects_cache[ url ]
    except KeyError: return url

127 

128 

129async def access_detections( 

130 auxdata: _state.Globals, 

131 source: str, /, *, 

132 genus: _interfaces.ProcessorGenera 

133) -> tuple[ 

134 _processors.DetectionsByProcessor, 

135 __.Absential[ _processors.Detection ] 

136]: 

137 ''' Accesses detections via appropriate cache. ''' 

138 resolved_source = _url_redirects_cache.get( source, source ) 

139 match genus: 

140 case _interfaces.ProcessorGenera.Inventory: 

141 cache = _inventory_detections_cache 

142 processors = _processors.inventory_processors 

143 case _interfaces.ProcessorGenera.Structure: 

144 cache = _structure_detections_cache 

145 processors = _processors.structure_processors 

146 return await access_detections_ll( 

147 auxdata, resolved_source, cache = cache, processors = processors ) 

148 

149 

async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections for source via supplied cache.

        On cache miss, processors are executed (with URL pattern fallback)
        and the cache is filled before it is consulted again.

        Returns all detections, which may be an empty mapping, together
        with the optimal detection as reported by the cache, if any.

        Low-level function accepting arbitrary cache and processors list.
    '''
    found = cache.access_detections( source )
    if __.is_absent( found ):
        # Miss or expiry: run detection, which stores its results in cache.
        await _execute_processors_with_patterns_and_cache(
            auxdata, source, cache, processors )
        found = cache.access_detections( source )
    if __.is_absent( found ):
        # Cache still reports nothing; substitute an empty mapping so that
        # callers always receive a mapping.
        found = __.immut.Dictionary[ str, _processors.Detection ]( )
    return found, cache.access_detection_optimal( source )

175 

176 

177async def detect( 

178 auxdata: _state.Globals, 

179 source: str, /, 

180 genus: _interfaces.ProcessorGenera, *, 

181 processor_name: __.Absential[ str ] = __.absent, 

182) -> _processors.Detection: 

183 ''' Detects processors for source through cache system. ''' 

184 resolved_source = _url_redirects_cache.get( source, source ) 

185 match genus: 

186 case _interfaces.ProcessorGenera.Inventory: 

187 cache = _inventory_detections_cache 

188 class_name = 'inventory' 

189 processors = _processors.inventory_processors 

190 case _interfaces.ProcessorGenera.Structure: 

191 cache = _structure_detections_cache 

192 class_name = 'structure' 

193 processors = _processors.structure_processors 

194 if not __.is_absent( processor_name ): 

195 if processor_name not in processors: 

196 raise _exceptions.ProcessorInavailability( processor_name ) 

197 processor = processors[ processor_name ] 

198 return await processor.detect( auxdata, resolved_source ) 

199 detection = await determine_detection_optimal_ll( 

200 auxdata, resolved_source, cache = cache, processors = processors ) 

201 if __.is_absent( detection ): 

202 raise _exceptions.ProcessorInavailability( class_name ) 

203 return detection 

204 

205 

async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Produces inventory detection for source.

        Delegates to :func:`detect` with the inventory genus. The result is
        only cast for the benefit of type checkers; no runtime check of its
        type is performed.
    '''
    result = await detect(
        auxdata, source, _interfaces.ProcessorGenera.Inventory,
        processor_name = processor_name )
    return __.typx.cast( _processors.InventoryDetection, result )

217 

218 

async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Produces structure detection for source.

        Delegates to :func:`detect` with the structure genus. The result is
        only cast for the benefit of type checkers; no runtime check of its
        type is performed.
    '''
    result = await detect(
        auxdata, source, _interfaces.ProcessorGenera.Structure,
        processor_name = processor_name )
    return __.typx.cast( _processors.StructureDetection, result )

230 

231 

async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        A cached optimal detection is returned directly. Otherwise, the
        processors are executed (with URL pattern fallback), their results
        are cached, and the optimal detection among them is selected.

        Low-level function accepting arbitrary cache and processors list.
    '''
    # NOTE(review): an unexpired cache entry lacking a sufficiently
    # confident detection also lands in the re-execution branch —
    # confirm that retrying in that case is intended.
    cached = cache.access_detection_optimal( source )
    if __.is_absent( cached ):
        fresh = await _execute_processors_with_patterns(
            auxdata, source, processors )
        cache.add_entry( source, fresh )
        return _select_detection_optimal( fresh, processors )
    return cached

248 

249 

async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs detection of every processor against the source.

        Processors are awaited one after another. A processor whose
        detection raises is left out of the results, which are keyed by the
        ``name`` attribute of each processor.
    '''
    detections: dict[ str, _processors.Detection ] = { }
    # TODO: Parallel async fanout.
    for candidate in processors.values( ):
        try: outcome = await candidate.detect( auxdata, source )
        except Exception: # noqa: PERF203,S112
            # Best effort: a failing processor must not prevent others.
            continue
        detections[ candidate.name ] = outcome
    return detections

265 

266 

async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors on source, with URL pattern probing as fallback.

        If no detection on the source itself meets the confidence
        threshold, then URL patterns are probed from the normalized base
        URL. When a working URL is found and processors produce a
        sufficiently confident detection on it, the redirect from source to
        working URL is recorded and the detections for the working URL are
        returned. In every other case, the detections for the original
        source are returned.
    '''
    def has_confident_detection(
        candidates: __.cabc.Mapping[ str, _processors.Detection ]
    ) -> bool:
        return any(
            candidate.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for candidate in candidates.values( ) )

    results = await _execute_processors( auxdata, source, processors )
    if has_confident_detection( results ): return results
    base_url = _urls.normalize_base_url( source )
    working_url = await _urlpatterns.probe_url_patterns( auxdata, base_url )
    if __.is_absent( working_url ): return results
    working_source = working_url.geturl( )
    pattern_results = await _execute_processors(
        auxdata, working_source, processors )
    if not has_confident_detection( pattern_results ): return results
    # Remember the working URL so that later lookups skip the probing.
    _url_redirects_cache[ source ] = working_source
    return pattern_results

288 

289 

async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Runs every processor on source; stores results in the cache.

        No URL pattern fallback is attempted. Results are stored under the
        source, even if they are empty.
    '''
    cache.add_entry(
        source, await _execute_processors( auxdata, source, processors ) )

299 

300 

async def _execute_processors_with_patterns_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Runs processors with URL pattern fallback; stores results in cache.

        Results are stored under the source as given, not under any working
        URL which the fallback may have found.
    '''
    cache.add_entry(
        source,
        await _execute_processors_with_patterns(
            auxdata, source, processors ) )

311 

312 

async def probe_source_with_patterns(
    auxdata: _state.Globals,
    source: str
) -> __.Absential[ str ]:
    ''' Probes URL patterns derived from source for a working URL.

        The source is normalized to a base URL before probing. Returns the
        working URL as a string, or absence if probing found none. The URL
        redirects cache is neither consulted nor updated.
    '''
    working_url = await _urlpatterns.probe_url_patterns(
        auxdata, _urls.normalize_base_url( source ) )
    return (
        __.absent if __.is_absent( working_url )
        else working_url.geturl( ) )

323 

324 

def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best detection by confidence, then registration order.

        Only detections which meet ``CONFIDENCE_THRESHOLD_MINIMUM`` are
        considered. Among those, the highest confidence wins; ties go to
        the processor which appears earliest in the processors mapping.
        Absence is returned if no detection qualifies.

        Raises ``ValueError`` if a qualifying detection refers to a
        processor whose name is not a key of the processors mapping.
    '''
    if not detections: return __.absent
    qualified = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not qualified: return __.absent
    registration_order = tuple( processors )

    def rank( detection: _processors.Detection ) -> tuple[ float, int ]:
        # Negated confidence: smallest rank is most confident detection.
        return (
            -detection.confidence,
            registration_order.index( detection.processor.name ) )

    # ``min`` yields the first minimal item, as a stable sort would.
    return min( qualified, key = rank )