Coverage for sources/librovore/detection.py: 61%

136 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-06 02:25 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Documentation source detection system for plugin architecture. ''' 

22 

23 

24from . import __ 

25from . import exceptions as _exceptions 

26from . import interfaces as _interfaces 

27from . import processors as _processors 

28from . import state as _state 

29from . import urlpatterns as _urlpatterns 

30from . import urls as _urls 

31 

32 

# Detections scoring below this confidence are treated as unusable.
CONFIDENCE_THRESHOLD_MINIMUM = 0.5

34 

35 

class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Cache entry for source detection results. '''

    # Detections keyed by processor name, captured at detection time.
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Epoch seconds when entry was recorded.
    timestamp: float
    # Lifetime of entry, in seconds.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the detection with highest confidence. '''
        if not self.detections: return __.absent
        candidate = max(
            self.detections.values( ), key = lambda d: d.confidence )
        if candidate.confidence >= CONFIDENCE_THRESHOLD_MINIMUM:
            return candidate
        return __.absent

    def invalid( self, current_time: float ) -> bool:
        ''' Checks if cache entry has expired. '''
        age = current_time - self.timestamp
        return age > self.ttl

58 

59 

class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support. '''

    # Lifetime applied to new entries, in seconds.
    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def _acquire_live_entry(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns entry for source, evicting and absenting if expired. '''
        entry = self._entries.get( source )
        if entry is None: return __.absent
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return __.absent
        return entry

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired. '''
        entry = self._acquire_live_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired. '''
        entry = self._acquire_live_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results. '''
        entry = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl )
        self._entries[ source ] = entry
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. '''
        self._entries.clear( )
        return self

106 

107 

108 

# Module-level singleton caches, one per processor genus.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

# Maps original source URLs to working URLs discovered via pattern probing.
_url_redirects_cache: dict[ str, str ] = { }

113 

114 

def resolve_source_url( url: str ) -> str:
    ''' Resolves source URL through redirect cache, returns working URL. '''
    redirect = _url_redirects_cache.get( url )
    # Fall back to the original URL when no redirect has been recorded.
    return url if redirect is None else redirect

118 

119 

120async def access_detections( 

121 auxdata: _state.Globals, 

122 source: str, /, *, 

123 genus: _interfaces.ProcessorGenera 

124) -> tuple[ 

125 _processors.DetectionsByProcessor, 

126 __.Absential[ _processors.Detection ] 

127]: 

128 ''' Accesses detections via appropriate cache. ''' 

129 resolved_source = _url_redirects_cache.get( source, source ) 

130 match genus: 

131 case _interfaces.ProcessorGenera.Inventory: 

132 cache = _inventory_detections_cache 

133 processors = _processors.inventory_processors 

134 case _interfaces.ProcessorGenera.Structure: 

135 cache = _structure_detections_cache 

136 processors = _processors.structure_processors 

137 return await access_detections_ll( 

138 auxdata, resolved_source, cache = cache, processors = processors ) 

139 

140 

async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.

        Low-level function accepting arbitrary cache and processors list.
    '''
    detections = cache.access_detections( source )
    if __.is_absent( detections ):
        # Cache miss: run processors, record results, and reread.
        await _execute_processors_and_cache(
            auxdata, source, cache, processors )
        detections = cache.access_detections( source )
        if __.is_absent( detections ):
            # Fallback: present an empty mapping rather than absence.
            detections = __.immut.Dictionary[
                str, _processors.Detection ]( )
    optimal = cache.access_detection_optimal( source )
    return detections, optimal

166 

167 

168async def detect( 

169 auxdata: _state.Globals, 

170 source: str, /, 

171 genus: _interfaces.ProcessorGenera, *, 

172 processor_name: __.Absential[ str ] = __.absent, 

173) -> _processors.Detection: 

174 ''' Detects processors for source through cache system. ''' 

175 resolved_source = _url_redirects_cache.get( source, source ) 

176 match genus: 

177 case _interfaces.ProcessorGenera.Inventory: 

178 cache = _inventory_detections_cache 

179 class_name = 'inventory' 

180 processors = _processors.inventory_processors 

181 case _interfaces.ProcessorGenera.Structure: 

182 cache = _structure_detections_cache 

183 class_name = 'structure' 

184 processors = _processors.structure_processors 

185 if not __.is_absent( processor_name ): 

186 if processor_name not in processors: 

187 raise _exceptions.ProcessorInavailability( processor_name ) 

188 processor = processors[ processor_name ] 

189 return await processor.detect( auxdata, resolved_source ) 

190 detection = await determine_detection_optimal_ll( 

191 auxdata, resolved_source, cache = cache, processors = processors ) 

192 if __.is_absent( detection ): 

193 raise _exceptions.ProcessorInavailability( class_name ) 

194 return detection 

195 

196 

async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processors for source through cache system. '''
    # Cast is safe: inventory genus yields inventory detections.
    return __.typx.cast(
        _processors.InventoryDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Inventory,
            processor_name = processor_name ) )

208 

209 

async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processors for source through cache system. '''
    # Cast is safe: structure genus yields structure detections.
    return __.typx.cast(
        _processors.StructureDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Structure,
            processor_name = processor_name ) )

221 

222 

async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Low-level function accepting arbitrary cache and processors list.
    '''
    cached = cache.access_detection_optimal( source )
    if not __.is_absent( cached ): return cached
    # Cache miss: run processors (with pattern fallback) and record.
    fresh = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, fresh )
    return _select_detection_optimal( fresh, processors )

239 

240 

async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs all processors on the source. '''
    detections: dict[ str, _processors.Detection ] = { }
    # TODO: Parallel async fanout.
    for processor in processors.values( ):
        try: detection = await processor.detect( auxdata, source )
        except Exception: # noqa: PERF203,S112
            # Skip processor on detection failure.
            continue
        detections[ processor.name ] = detection
    return detections

256 

257 

async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors with URL pattern extension fallback. '''
    def confident( detections: dict[ str, _processors.Detection ] ) -> bool:
        # At least one detection meets the minimum confidence.
        return any(
            detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for detection in detections.values( ) )
    results = await _execute_processors( auxdata, source, processors )
    if confident( results ): return results
    # No confident detection: probe well-known URL patterns for a
    # working variant of the source.
    base_url = _urls.normalize_base_url( source )
    working_url = await _urlpatterns.probe_url_patterns(
        auxdata, base_url, '/objects.inv' )
    if __.is_absent( working_url ): return results
    working_source = working_url.geturl( )
    pattern_results = await _execute_processors(
        auxdata, working_source, processors )
    if confident( pattern_results ):
        # Remember redirect so later calls resolve directly.
        _url_redirects_cache[ source ] = working_source
        return pattern_results
    return results

280 

281 

async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes processors with URL pattern extension and caches. '''
    cache.add_entry(
        source,
        await _execute_processors_with_patterns(
            auxdata, source, processors ) )

292 

293 

def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best processor based on confidence and registration order.

        Returns absence when no detection meets the confidence threshold.
    '''
    if not detections: return __.absent
    candidates = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not candidates: return __.absent
    # Performance fix: precompute registration order once (O(1) lookups)
    # instead of list.index inside the comparison key, and take the
    # minimum directly instead of sorting the whole list.
    orders = {
        name: order for order, name in enumerate( processors.keys( ) ) }
    def selection_key(
        detection: _processors.Detection
    ) -> tuple[ float, int ]:
        # Highest confidence first; earlier registration breaks ties.
        return (
            -detection.confidence, orders[ detection.processor.name ] )
    return min( candidates, key = selection_key )