Coverage for sources/librovore/detection.py: 63%

120 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-17 23:43 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Documentation source detection system for plugin architecture. ''' 

22 

23 

import asyncio

from . import __
from . import exceptions as _exceptions
from . import interfaces as _interfaces
from . import processors as _processors
from . import state as _state

29 

30 

# Minimum confidence a detection must meet to be considered usable;
# detections below this are treated as absent by selection logic.
CONFIDENCE_THRESHOLD_MINIMUM = 0.5

32 

33 

class DetectionsCacheEntry( __.immut.DataclassObject ):
    ''' Cache entry for source detection results. '''

    detections: __.cabc.Mapping[ str, _processors.Detection ]
    timestamp: float
    ttl: int

    @property
    def detection_optimal( self ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the detection with highest confidence. '''
        candidates = list( self.detections.values( ) )
        if not candidates: return __.absent
        champion = max( candidates, key = lambda d: d.confidence )
        # Below-threshold winners are treated as no detection at all.
        if champion.confidence < CONFIDENCE_THRESHOLD_MINIMUM:
            return __.absent
        return champion

    def invalid( self, current_time: float ) -> bool:
        ''' Checks if cache entry has expired. '''
        age = current_time - self.timestamp
        return age > self.ttl

56 

57 

class DetectionsCache( __.immut.DataclassObject ):
    ''' Cache for source detection results with TTL support.

        Entries expire ``ttl`` seconds after insertion and are evicted
        lazily upon access rather than by a background task.
    '''

    ttl: int = 3600
    _entries: dict[ str, DetectionsCacheEntry ] = (
        __.dcls.field( default_factory = dict[ str, DetectionsCacheEntry ] ) )

    def access_detections(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Returns all detections for source, if unexpired. '''
        entry = self._access_entry_unexpired( source )
        if __.is_absent( entry ): return __.absent
        return entry.detections

    def access_detection_optimal(
        self, source: str
    ) -> __.Absential[ _processors.Detection ]:
        ''' Returns the best detection for source, if unexpired. '''
        entry = self._access_entry_unexpired( source )
        if __.is_absent( entry ): return __.absent
        return entry.detection_optimal

    def add_entry(
        self, source: str, detections: _processors.DetectionsByProcessor
    ) -> __.typx.Self:
        ''' Adds or updates cache entry with fresh results. '''
        self._entries[ source ] = DetectionsCacheEntry(
            detections = detections,
            timestamp = __.time.time( ),
            ttl = self.ttl,
        )
        return self

    def clear( self ) -> __.typx.Self:
        ''' Clears all cached entries. '''
        self._entries.clear( )
        return self

    def remove_entry(
        self, source: str
    ) -> __.Absential[ _processors.DetectionsByProcessor ]:
        ''' Removes specific source from cache, if present. '''
        entry = self._entries.pop( source, None )
        # Identity test rather than truthiness: a present entry must be
        # reported even if its contents happen to evaluate falsey.
        if entry is not None: return entry.detections
        return __.absent

    def _access_entry_unexpired(
        self, source: str
    ) -> __.Absential[ DetectionsCacheEntry ]:
        ''' Returns live entry for source, evicting it if expired. '''
        if source not in self._entries: return __.absent
        entry = self._entries[ source ]
        if entry.invalid( __.time.time( ) ):
            del self._entries[ source ]
            return __.absent
        return entry

112 

113 

# Module-level caches, one per processor genus, shared by all callers so
# repeated detections against the same source hit the cache.
_inventory_detections_cache = DetectionsCache( )
_structure_detections_cache = DetectionsCache( )

116 

117 

async def access_detections(
    auxdata: _state.Globals,
    source: str, /, *,
    genus: _interfaces.ProcessorGenera
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.
    '''
    # Enum members compare by identity, so dispatch via 'is' is
    # equivalent to value-pattern matching.
    if genus is _interfaces.ProcessorGenera.Inventory:
        cache = _inventory_detections_cache
        processors = _processors.inventory_processors
    elif genus is _interfaces.ProcessorGenera.Structure:
        cache = _structure_detections_cache
        processors = _processors.structure_processors
    return await access_detections_ll(
        auxdata, source, cache = cache, processors = processors )

139 

140 

async def access_detections_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> tuple[
    _processors.DetectionsByProcessor,
    __.Absential[ _processors.Detection ]
]:
    ''' Accesses detections via appropriate cache.

        Detections are performed to fill cache, if necessary.

        Low-level function which accepts arbitrary cache and processors list.
    '''
    cached = cache.access_detections( source )
    if __.is_absent( cached ):
        # Cache miss: run the processors, then read back what was cached.
        await _execute_processors_and_cache(
            auxdata, source, cache, processors )
        cached = cache.access_detections( source )
    if __.is_absent( cached ):
        # Defensive fallback: execution should have populated the cache,
        # so an empty mapping stands in for a still-absent entry.
        cached = __.immut.Dictionary[ str, _processors.Detection ]( )
    optimal = cache.access_detection_optimal( source )
    return cached, optimal

168 

169 

170async def detect( 

171 auxdata: _state.Globals, 

172 source: str, /, 

173 genus: _interfaces.ProcessorGenera, *, 

174 processor_name: __.Absential[ str ] = __.absent, 

175) -> _processors.Detection: 

176 ''' Detects inventory processors for source through cache system. ''' 

177 match genus: 

178 case _interfaces.ProcessorGenera.Inventory: 

179 cache = _inventory_detections_cache 

180 class_name = 'inventory' 

181 processors = _processors.inventory_processors 

182 case _interfaces.ProcessorGenera.Structure: 

183 cache = _structure_detections_cache 

184 class_name = 'structure' 

185 processors = _processors.structure_processors 

186 if not __.is_absent( processor_name ): 

187 if processor_name not in processors: 

188 raise _exceptions.ProcessorInavailability( processor_name ) 

189 processor = processors[ processor_name ] 

190 return await processor.detect( auxdata, source ) 

191 detection = await determine_detection_optimal_ll( 

192 auxdata, source, cache = cache, processors = processors ) 

193 if __.is_absent( detection ): 

194 raise _exceptions.ProcessorInavailability( class_name ) 

195 return detection 

196 

197 

async def detect_inventory(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.InventoryDetection:
    ''' Detects inventory processors for source through cache system. '''
    return __.typx.cast(
        _processors.InventoryDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Inventory,
            processor_name = processor_name ) )

209 

210 

async def detect_structure(
    auxdata: _state.Globals,
    source: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
) -> _processors.StructureDetection:
    ''' Detects structure processors for source through cache system. '''
    return __.typx.cast(
        _processors.StructureDetection,
        await detect(
            auxdata, source,
            genus = _interfaces.ProcessorGenera.Structure,
            processor_name = processor_name ) )

222 

223 

async def determine_detection_optimal_ll(
    auxdata: _state.Globals,
    source: str, /, *,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> __.Absential[ _processors.Detection ]:
    ''' Determines which processor can best handle the source.

        Low-level function which accepts arbitrary cache and processors list.
    '''
    cached = cache.access_detection_optimal( source )
    if not __.is_absent( cached ): return cached
    # Cache miss: run all processors, record results, then choose.
    fresh = await _execute_processors( auxdata, source, processors )
    cache.add_entry( source, fresh )
    return _select_detection_optimal( fresh, processors )

239 

240 

241async def _execute_processors( 

242 auxdata: _state.Globals, 

243 source: str, 

244 processors: __.cabc.Mapping[ str, _processors.Processor ], 

245) -> dict[ str, _processors.Detection ]: 

246 ''' Runs all processors on the source. ''' 

247 results: dict[ str, _processors.Detection ] = { } 

248 # TODO: Parallel async fanout. 

249 for processor in processors.values( ): 

250 try: detection = await processor.detect( auxdata, source ) 

251 except Exception: # noqa: PERF203,S112 

252 # Skip processor on detection failure 

253 continue 

254 else: results[ processor.name ] = detection 

255 return results 

256 

257 

async def _execute_processors_and_cache(
    auxdata: _state.Globals,
    source: str,
    cache: DetectionsCache,
    processors: __.cabc.Mapping[ str, _processors.Processor ],
) -> None:
    ''' Executes all processors and caches results. '''
    cache.add_entry(
        source, await _execute_processors( auxdata, source, processors ) )

267 

268 

def _select_detection_optimal(
    detections: _processors.DetectionsByProcessor,
    processors: __.cabc.Mapping[ str, _processors.Processor ]
) -> __.Absential[ _processors.Detection ]:
    ''' Selects best processor based on confidence and registration order.

        Detections below the confidence threshold are ignored. Ties on
        confidence are broken by processor registration order.
    '''
    if not detections: return __.absent
    qualified = [
        detection for detection in detections.values( )
        if detection.confidence >= CONFIDENCE_THRESHOLD_MINIMUM ]
    if not qualified: return __.absent
    # Precompute registration positions once: avoids an O(n) list.index
    # lookup inside every key evaluation.
    positions = {
        name: position for position, name in enumerate( processors ) }
    # Assumes every detection's processor name is registered in
    # 'processors' (detections originate from those processors).
    return min(
        qualified,
        key = lambda detection: (
            -detection.confidence,
            positions[ detection.processor.name ] ) )
286 return detections_[ 0 ]