Coverage for sources/librovore/functions.py: 14%

131 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 22:09 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core business logic shared between CLI and MCP server. ''' 

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import results as _results 

30from . import search as _search 

31from . import state as _state 

32 

33 

34 

# Minimum acceptable success-rate threshold.
# NOTE(review): not referenced anywhere in this module — presumably consumed
# by a sibling module; confirm before removing.
_SUCCESS_RATE_MINIMUM = 0.1

36 

37 

# Annotated string alias used for the positional documentation-location
# arguments of the public functions in this module.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]

40 

41 

# Shared immutable default values: keeps the public signatures free of
# mutable default arguments while providing one canonical "no special
# behaviors / no filters" instance for all callers.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )

44 

45 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises ``ProcessorInavailability`` when no optimal detection is
        available for the location and genus.
    '''
    # NOTE(review): ``processor_name`` is accepted but never forwarded to
    # ``_detection.access_detections`` — confirm whether filtering by a
    # specific processor was intended here.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    # Wall-clock detection time, reported in whole milliseconds.
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    if __.is_absent( detection_optimal ):
        # Enum members expose ``name``; fall back to ``str`` for anything
        # else so the error message stays readable.
        genus_name = (
            genus.name.lower( ) if hasattr( genus, 'name' ) else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    # Convert detections mapping to tuple of results.Detection objects.
    detections_tuple = tuple(
        _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ),
        )
        for detection in detections.values( )
    )
    # Convert detection_optimal to results.Detection.
    detection_optimal_result = _results.Detection(
        processor_name = detection_optimal.processor.name,
        confidence = detection_optimal.confidence,
        processor_type = genus.value,
        detection_metadata = __.immut.Dictionary( ),
    )
    return _results.DetectionsResult(
        source = location,
        detections = detections_tuple,
        detection_optimal = detection_optimal_result,
        time_detection_ms = detection_time_ms )

88 

89 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        When ``content_id`` is provided, it pins the query to one exact
        object (browse-then-extract workflow); otherwise candidates come
        from name-based inventory search on ``term``.
    '''
    # NOTE(review): ``lines_max`` is accepted but never used in this body —
    # confirm whether content truncation was intended here.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    resolved_location = _detection.resolve_source_url( location )
    objects = await _collect_inventory_objects_multi_source(
        auxdata, location, resolved_location, processor_name, filters )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        # Over-select (3x) so the capability filtering below still leaves
        # enough candidates to fill ``results_max``.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = await _create_inventory_location_info(
        auxdata, location, resolved_location, len( objects ) )
    if not candidates:
        # Empty result: no inventory objects matched the term.
        end_time = __.time.perf_counter( )
        search_time_ms = int( ( end_time - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            term = term,
            documents = tuple( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    compatible_candidates = _filter_objects_by_structure_capabilities(
        candidates[ : results_max ], structure_capabilities )
    if not compatible_candidates:
        # Empty result: matches existed, but none are extractable by the
        # detected structure processor.
        end_time = __.time.perf_counter( )
        search_time_ms = int( ( end_time - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            term = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms ),
            inventory_locations = locations )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, compatible_candidates )
    end_time = __.time.perf_counter( )
    search_time_ms = int( ( end_time - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms ),
        inventory_locations = locations )

161 

162 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    started = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Resolve the URL only after detection so that any redirect discovered
    # during detection yields the working URL.
    url = _detection.resolve_source_url( location )
    inventory = await idetection.filter_inventory(
        auxdata, url, filters = filters )
    matches = _search.filter_by_name(
        inventory, term, search_behaviors = search_behaviors )
    chosen = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = url,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( inventory ) )
    return _results.InventoryQueryResult(
        location = url,
        term = term,
        objects = chosen,
        search_metadata = _results.SearchMetadata(
            results_count = len( chosen ),
            results_max = results_max,
            matches_total = len( inventory ),
            search_time_ms = elapsed_ms ),
        inventory_locations = ( location_info, ) )

207 

208 

209 

210async def survey_processors( 

211 auxdata: _state.Globals, /, 

212 genus: _interfaces.ProcessorGenera, 

213 name: __.typx.Optional[ str ] = None, 

214) -> _results.ProcessorsSurveyResult: 

215 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

216 start_time = __.time.perf_counter( ) 

217 match genus: 

218 case _interfaces.ProcessorGenera.Inventory: 

219 processors = dict( _processors.inventory_processors ) 

220 case _interfaces.ProcessorGenera.Structure: 

221 processors = dict( _processors.structure_processors ) 

222 if name is not None and name not in processors: 

223 raise _exceptions.ProcessorInavailability( 

224 name, 

225 genus = genus.value ) 

226 processor_infos: list[ _results.ProcessorInfo ] = [ ] 

227 for name_, processor in processors.items( ): 

228 if name is None or name_ == name: 

229 processor_info = _results.ProcessorInfo( 

230 processor_name = name_, 

231 processor_type = genus.value, 

232 capabilities = processor.capabilities, 

233 ) 

234 processor_infos.append( processor_info ) 

235 end_time = __.time.perf_counter( ) 

236 survey_time_ms = int( ( end_time - start_time ) * 1000 ) 

237 return _results.ProcessorsSurveyResult( 

238 genus = genus, 

239 filter_name = name, 

240 processors = tuple( processor_infos ), 

241 survey_time_ms = survey_time_ms, 

242 ) 

243 

244 

async def _collect_inventory_objects_multi_source(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    processor_name: __.Absential[ str ],
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Collects inventory objects using multi-source coordination.

        Optimized to pre-filter inventory sources by structure processor
        compatibility before making network requests.
    '''
    try:
        inventory_detections = (
            await _detection.collect_filter_inventories( auxdata, location ) )
    # Broad catch is deliberate: any multi-source failure degrades to
    # single-source detection instead of failing the whole query.
    except Exception:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
        return await idetection.filter_inventory(
            auxdata, resolved_location, filters = filters )
    if not inventory_detections: return ( )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    # Drop inventory sources the structure processor cannot extract from,
    # so merging below performs no pointless network requests.
    compatible_detections = _filter_detections_by_structure_capabilities(
        inventory_detections, structure_capabilities )
    if not compatible_detections: return ( )
    return await _merge_primary_supplementary(
        auxdata, compatible_detections, location, filters = filters )

274 

275 

async def _create_inventory_location_info(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    object_count: int,
) -> tuple[ _results.InventoryLocationInfo, ... ]:
    ''' Creates inventory location info for multi-source results. '''
    # Both the fallback and the primary paths produce one info record of
    # the same shape; centralize that construction.
    def produce_info(
        detection: _processors.InventoryDetection
    ) -> tuple[ _results.InventoryLocationInfo, ... ]:
        return (
            _results.InventoryLocationInfo(
                inventory_type = detection.processor.name,
                location_url = resolved_location,
                processor_name = detection.processor.name,
                confidence = detection.confidence,
                object_count = object_count ), )
    try:
        detections = (
            await _detection.collect_filter_inventories(
                auxdata, location ) )
    # Broad catch is deliberate: fall back to single-source detection.
    except Exception:
        fallback = await _detection.detect_inventory( auxdata, location )
        return produce_info( fallback )
    if not detections: return ( )
    return produce_info( _select_primary_detection( detections ) )

304 

305 

def _filter_detections_by_structure_capabilities(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Filters inventory detections by structure processor capabilities.

        Pre-filters inventory sources by compatibility before object
        collection to avoid unnecessary network requests and processing
        overhead.
    '''
    supported: dict[ str, _processors.InventoryDetection ] = { }
    for pname, detection in inventory_detections.items( ):
        if structure_capabilities.supports_inventory_type(
            detection.processor.name
        ): supported[ pname ] = detection
    return __.immut.Dictionary( supported )

322 

323 

def _filter_objects_by_structure_capabilities(
    candidates: __.cabc.Sequence[ _results.InventoryObject ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Filters inventory objects by structure processor capabilities. '''
    return tuple(
        candidate for candidate in candidates
        if structure_capabilities.supports_inventory_type(
            candidate.inventory_type ) )

334 

335 

async def _merge_primary_supplementary(
    auxdata: _state.Globals,
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    location: str,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Merges inventory objects using PRIMARY_SUPPLEMENTARY strategy.

        Uses highest-confidence detection as primary source, adds
        supplementary objects from other qualified sources with preserved
        source attribution. No deduplication - complementary metadata is
        valuable.

        Note: inventory_detections should already be pre-filtered for
        compatibility with the structure processor to avoid unnecessary
        network requests.
    '''
    if not inventory_detections: return ( )
    resolved = _detection.resolve_source_url( location )
    collected: list[ _results.InventoryObject ] = [ ]
    for detection in inventory_detections.values( ):
        collected.extend( await detection.filter_inventory(
            auxdata, resolved, filters = filters ) )
    return tuple( collected )

361 

362 

363def _normalize_location( location: str ) -> str: 

364 ''' Normalizes location URL by stripping index.html. ''' 

365 if location.endswith( '/' ): return location[ : -1 ] 

366 if location.endswith( '/index.html' ): return location[ : -11 ] 

367 return location 

368 

369 

def _process_content_id_filter(
    content_id: str,
    location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering. '''
    try: expected_location, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if expected_location != location:
        raise _exceptions.ContentIdLocationMismatch(
            expected_location, location )
    matches = tuple(
        candidate for candidate in objects if candidate.name == name )
    if not matches:
        raise _exceptions.ContentIdObjectAbsence( name, location )
    # Only the first matching object participates in extraction.
    return matches[ : 1 ]

386 

387 

def _select_primary_detection(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
) -> _processors.InventoryDetection:
    ''' Selects primary detection with highest confidence.

        Ties resolve to the earliest entry in mapping iteration order.
        Assumes a non-empty mapping; callers guard emptiness beforehand.
    '''
    # max() is O(n) versus the previous O(n log n) full sort, and keeps
    # the same first-wins tie-breaking (stable descending sort also
    # surfaced the earliest maximal entry).
    return max(
        inventory_detections.values( ), key = lambda d: d.confidence )