Coverage for sources/librovore/detection.py: 60%

147 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 18:40 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Documentation source detection system for plugin architecture. ''' 

22 

23 

24from . import __ 

25from . import exceptions as _exceptions 

26from . import interfaces as _interfaces 

27from . import processors as _processors 

28from . import state as _state 

29from . import urlpatterns as _urlpatterns 

30from . import urls as _urls 

31 

32 

# Minimum confidence a detection must reach before it is considered usable
# (used by optimal-detection selection and URL-pattern fallback decisions).
CONFIDENCE_THRESHOLD_MINIMUM = 0.5

34 

35 

class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Cache entry for source detection results. '''

    # Detections keyed by processor name, as produced by a detection run.
    detections: __.cabc.Mapping[ str, _processors.Detection ]
    # Creation time (seconds since epoch) used for expiry checks.
    timestamp: float
    # Lifetime of this entry in seconds.
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the detection with highest confidence. '''
        if not self.detections: return __.absent
        best = max(
            self.detections.values( ),
            key = lambda detection: detection.confidence )
        # Below-threshold winners are treated as no detection at all.
        if best.confidence < CONFIDENCE_THRESHOLD_MINIMUM:
            return __.absent
        return best

    def invalid( self, current_time: float ) -> bool:
        ''' Checks if cache entry has expired. '''
        age = current_time - self.timestamp
        return age > self.ttl

57 

58 

class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support. '''

    # Entry lifetime in seconds (one hour by default).
    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def _access_entry(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns unexpired entry for source, evicting stale entries.

            Centralizes the TTL check previously duplicated across both
            public accessors.
        '''
        if source not in self._entries: return __.absent
        entry = self._entries[ source ]
        if entry.invalid( __.time.time( ) ):
            # Expired: evict so subsequent detection runs refill the cache.
            del self._entries[ source ]
            return __.absent
        return entry

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired. '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired. '''
        entry = self._access_entry( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results. '''
        self._entries[ source ] = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl )
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. '''
        self._entries.clear( )
        return self

104 

105 

106 

# Module-level singleton caches, one per processor genus, so detection
# results are shared by all callers within the process.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

# Maps original source URLs to working URLs discovered via pattern probing
# (see _execute_processors_with_patterns). Consulted before cache lookups.
_url_redirects_cache: dict[ str, str ] = { }

111 

112 

def resolve_source_url( url: str ) -> str:
    ''' Resolves source URL through redirect cache, returns working URL. '''
    # Unknown URLs pass through unchanged.
    if url in _url_redirects_cache:
        return _url_redirects_cache[ url ]
    return url

116 

117 

118async def access_detections( 

119 auxdata: _state.Globals, 

120 source: str, /, *, 

121 genus: _interfaces.ProcessorGenera 

122) -> tuple[ 

123 _processors.DetectionsByProcessor, 

124 __.Absential[ _processors.Detection ] 

125]: 

126 ''' Accesses detections via appropriate cache. ''' 

127 source_ = _url_redirects_cache.get( source, source ) 

128 match genus: 

129 case _interfaces.ProcessorGenera.Inventory: 

130 cache = _inventory_detections_cache 

131 processors = _processors.inventory_processors 

132 case _interfaces.ProcessorGenera.Structure: 

133 cache = _structure_detections_cache 

134 processors = _processors.structure_processors 

135 return await access_detections_ll( 

136 auxdata, source_, cache = cache, processors = processors ) 

137 

138 

async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.

        Low-level function accepting arbitrary cache and processors list.
    '''
    cached = cache.access_detections( source )
    if __.is_absent( cached ):
        # Cache miss or expiry: run detection, then consult cache again.
        await _execute_processors_and_cache(
            auxdata, source, cache, processors )
        cached = cache.access_detections( source )
        if __.is_absent( cached ):
            # Detection produced nothing cacheable; report empty mapping.
            cached = __.immut.Dictionary[ str, _processors.Detection ]( )
    optimum = cache.access_detection_optimal( source )
    return cached, optimum

164 

165 

async def collect_filter_inventories(
    auxdata: _state.Globals,
    location: str, /, *,
    confidence_limit: float = 0.5,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Collects all inventory sources above confidence threshold.

        Returns dictionary mapping processor names to their detections
        for multi-source inventory coordination.
    '''
    # Consistency fix: call the public helper instead of duplicating the
    # redirect-cache lookup inline.
    location_ = resolve_source_url( location )
    detections, _ = await access_detections(
        auxdata, location_, genus = _interfaces.ProcessorGenera.Inventory )
    detections_ = {
        processor_name: __.typx.cast(
            _processors.InventoryDetection, detection )
        for processor_name, detection in detections.items( )
        if detection.confidence >= confidence_limit }
    return __.immut.Dictionary( detections_ )

185 

186 

187async def detect( 

188 auxdata: _state.Globals, 

189 source: str, /, 

190 genus: _interfaces.ProcessorGenera, *, 

191 processor_name: __.Absential[ str ] = __.absent, 

192) -> _processors.Detection: 

193 ''' Detects processors for source through cache system. ''' 

194 source_ = _url_redirects_cache.get( source, source ) 

195 match genus: 

196 case _interfaces.ProcessorGenera.Inventory: 

197 cache = _inventory_detections_cache 

198 class_name = 'inventory' 

199 processors = _processors.inventory_processors 

200 case _interfaces.ProcessorGenera.Structure: 

201 cache = _structure_detections_cache 

202 class_name = 'structure' 

203 processors = _processors.structure_processors 

204 if not __.is_absent( processor_name ): 

205 if processor_name not in processors: 

206 raise _exceptions.ProcessorInavailability( processor_name ) 

207 processor = processors[ processor_name ] 

208 return await processor.detect( auxdata, source_ ) 

209 detection = await determine_detection_optimal_ll( 

210 auxdata, source_, cache = cache, processors = processors ) 

211 if __.is_absent( detection ): 

212 raise _exceptions.ProcessorInavailability( class_name ) 

213 return detection 

214 

215 

async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processors for source through cache system. '''
    # Thin wrapper: narrows the generic detection to the inventory type.
    return __.typx.cast(
        _processors.InventoryDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Inventory,
            processor_name = processor_name ) )

227 

228 

async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processors for source through cache system. '''
    # Thin wrapper: narrows the generic detection to the structure type.
    return __.typx.cast(
        _processors.StructureDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Structure,
            processor_name = processor_name ) )

240 

241 

async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Low-level function accepting arbitrary cache and processors list.
    '''
    cached = cache.access_detection_optimal( source )
    if not __.is_absent( cached ):
        return cached
    # Cache miss: run processors (with pattern fallback), cache everything,
    # then pick the best from the fresh results.
    fresh = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, fresh )
    return _select_detection_optimal( fresh, processors )

258 

259 

async def _execute_processors(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs all processors on the source. '''
    detections: dict[ str, _processors.Detection ] = { }
    robots_failures: list[ _exceptions.RobotsTxtAccessFailure ] = [ ]
    # TODO: Parallel async fanout.
    for processor in processors.values( ):
        try: detection = await processor.detect( auxdata, source )
        except _exceptions.RobotsTxtAccessFailure as exc: # noqa: PERF203
            robots_failures.append( exc )
        except Exception: # noqa: S112
            continue
        else: detections[ processor.name ] = detection
    # Surface a robots.txt denial only when every processor failed and at
    # least one failure was a robots.txt access issue.
    if robots_failures and not detections:
        raise robots_failures[ 0 ] from None
    return detections

281 

282 

async def _execute_processors_with_patterns(
    auxdata: _state.Globals,
    source: str,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> dict[ str, _processors.Detection ]:
    ''' Runs processors with URL pattern extension fallback. '''
    def confident( candidates: dict[ str, _processors.Detection ] ) -> bool:
        # True when any detection clears the minimum confidence bar.
        return any(
            detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM
            for detection in candidates.values( ) )
    results = await _execute_processors( auxdata, source, processors )
    if confident( results ):
        return results
    # Nothing confident at the given source: probe well-known URL patterns
    # for an alternative documentation location.
    base_url = _urls.normalize_base_url( source )
    working_url = await _urlpatterns.probe_url_patterns(
        auxdata, base_url, '/objects.inv' )
    if __.is_absent( working_url ):
        return results
    working_source = working_url.geturl( )
    pattern_results = await _execute_processors(
        auxdata, working_source, processors )
    if confident( pattern_results ):
        # Remember the redirect so future lookups hit the working URL.
        _url_redirects_cache[ source ] = working_source
        return pattern_results
    return results

305 

306 

async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes processors with URL pattern extension and caches. '''
    fresh = await _execute_processors_with_patterns(
        auxdata, source, processors )
    cache.add_entry( source, fresh )

317 

318 

def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best processor based on confidence and registration order.

        Returns absent when no detection clears the confidence threshold.
        Ties on confidence break toward the earlier-registered processor.
    '''
    candidates = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not candidates: return __.absent
    # Performance fix: precompute registration ranks once. The previous
    # implementation called list.index inside the sort key, making every
    # comparison O(n); min over a rank dict is O(n) overall.
    ranks = {
        name: position
        for position, name in enumerate( processors.keys( ) ) }
    return min(
        candidates,
        key = lambda detection: (
            -detection.confidence, ranks[ detection.processor.name ] ) )