Coverage for sources/librovore/functions.py: 13%

153 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 18:40 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core business logic shared between CLI and MCP server. ''' 

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import results as _results 

30from . import search as _search 

31from . import state as _state 

32 

33 

34 

# Threshold constant. NOTE(review): not referenced anywhere in this
# module as shown -- confirm external consumers before relying on it.
_SUCCESS_RATE_MINIMUM = 0.1


# Documentation location accepted by the public query functions;
# normalized via _normalize_location before use.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults for keyword arguments: safe to reuse
# across calls because they cannot be mutated.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )


# ( filters_applied, filters_ignored ) pair returned by validate_filters.
FilterValidationResult: __.typx.TypeAlias = tuple[
    tuple[ str, ... ], tuple[ str, ... ] ]

48 

49 

def validate_filters(
    filters: __.cabc.Mapping[ str, __.typx.Any ],
    processor_capabilities: _interfaces.ProcessorCapabilities,
) -> FilterValidationResult:
    ''' Validates filters against processor capabilities.

        Returns a ``( filters_applied, filters_ignored )`` pair: names
        supported by the processor land in the first tuple, all other
        names in the second.  Relative order of ``filters`` is kept.
    '''
    supported = frozenset(
        capability.name
        for capability in processor_capabilities.supported_filters )
    applied = tuple( name for name in filters if name in supported )
    ignored = tuple( name for name in filters if name not in supported )
    return applied, ignored

70 

71 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Normalizes the location, gathers detections for all processors
        of ``genus``, and packages every detection plus the optimal one
        into a ``DetectionsResult`` with elapsed time in milliseconds.

        Raises ProcessorInavailability when no optimal detection exists
        for the location.
    '''
    # NOTE(review): processor_name is accepted but never forwarded to
    # access_detections below -- confirm whether it should constrain
    # detection or be dropped from the signature.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    if __.is_absent( detection_optimal ):
        # hasattr guard: genus is normally an enum with .name, but
        # tolerate plain values by falling back to str().
        genus_name = (
            genus.name.lower( ) if hasattr( genus, 'name' ) else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    # Convert detections mapping to tuple of results.Detection objects.
    detections_tuple = tuple(
        _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ),
        )
        for detection in detections.values( )
    )
    # Convert detection_optimal to results.Detection.
    detection_optimal_result = _results.Detection(
        processor_name = detection_optimal.processor.name,
        confidence = detection_optimal.confidence,
        processor_type = genus.value,
        detection_metadata = __.immut.Dictionary( ),
    )
    return _results.DetectionsResult(
        source = location,
        detections = detections_tuple,
        detection_optimal = detection_optimal_result,
        time_detection_ms = detection_time_ms )

114 

115 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking.

        Matches inventory objects against ``term`` (or resolves an
        explicit ``content_id``), filters candidates by structure
        processor capabilities, and extracts documents for survivors.
        Short-circuits with an empty result when filters are
        unsupported, nothing matches, or no candidate is compatible.
    '''
    # NOTE(review): lines_max is accepted but unused here -- confirm
    # whether truncation is expected downstream.
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    resolved_location = _detection.resolve_source_url( location )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    filters_applied, filters_ignored = validate_filters(
        filters, idetection.processor.capabilities )

    def produce_empty_result(
        locations: tuple[ _results.InventoryLocationInfo, ... ]
    ) -> _results.ContentQueryResult:
        # Shared constructor for the three short-circuit paths,
        # which previously duplicated this block verbatim.
        search_time_ms = int(
            ( __.time.perf_counter( ) - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            term = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms,
                filters_applied = filters_applied,
                filters_ignored = filters_ignored ),
            inventory_locations = locations )

    if filters_ignored:
        # Unsupported filters: return empty rather than silently
        # dropping the filters and over-matching.
        locations = await _create_inventory_location_info(
            auxdata, location, resolved_location, 0 )
        return produce_empty_result( locations )
    objects = await _collect_inventory_objects_multi_source(
        auxdata, location, resolved_location, processor_name, filters )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        # Overselect threefold so capability filtering below still has
        # enough candidates to fill results_max.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = await _create_inventory_location_info(
        auxdata, location, resolved_location, len( objects ) )
    if not candidates: return produce_empty_result( locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    compatible_candidates = _filter_objects_by_structure_capabilities(
        candidates[ : results_max ], structure_capabilities )
    if not compatible_candidates: return produce_empty_result( locations )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, compatible_candidates )
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms,
            filters_applied = filters_applied,
            filters_ignored = filters_ignored ),
        inventory_locations = locations )

213 

214 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns at most ``results_max`` matching objects, with
        ``matches_total`` reporting the pre-truncation match count.
        Returns an empty result when any filter is unsupported by the
        detected processor.
    '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    resolved_location = _detection.resolve_source_url( location )
    filters_applied, filters_ignored = validate_filters(
        filters, detection.processor.capabilities )

    def produce_locations(
        object_count: int
    ) -> tuple[ _results.InventoryLocationInfo, ... ]:
        # Single-source attribution; previously duplicated in both
        # the short-circuit and success paths.
        return ( _results.InventoryLocationInfo(
            inventory_type = detection.processor.name,
            location_url = resolved_location,
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            object_count = object_count ), )

    if filters_ignored:
        search_time_ms = int(
            ( __.time.perf_counter( ) - start_time ) * 1000 )
        return _results.InventoryQueryResult(
            location = resolved_location,
            term = term,
            objects = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                matches_total = 0,
                search_time_ms = search_time_ms,
                filters_applied = filters_applied,
                filters_ignored = filters_ignored ),
            inventory_locations = produce_locations( 0 ) )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters )
    results = _search.filter_by_name(
        objects, term, search_behaviors = search_behaviors )
    # Enforce results_max (previously every match was returned,
    # ignoring the limit) and report total matches -- not inventory
    # size -- as matches_total, consistent with query_content.
    selections = tuple(
        result.inventory_object for result in results[ : results_max ] )
    search_time_ms = int( ( __.time.perf_counter( ) - start_time ) * 1000 )
    return _results.InventoryQueryResult(
        location = resolved_location,
        term = term,
        objects = selections,
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( results ),
            search_time_ms = search_time_ms,
            filters_applied = filters_applied,
            filters_ignored = filters_ignored ),
        inventory_locations = produce_locations( len( objects ) ) )

283 

284 

285 

286async def survey_processors( 

287 auxdata: _state.Globals, /, 

288 genus: _interfaces.ProcessorGenera, 

289 name: __.typx.Optional[ str ] = None, 

290) -> _results.ProcessorsSurveyResult: 

291 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

292 start_time = __.time.perf_counter( ) 

293 match genus: 

294 case _interfaces.ProcessorGenera.Inventory: 

295 processors = dict( _processors.inventory_processors ) 

296 case _interfaces.ProcessorGenera.Structure: 

297 processors = dict( _processors.structure_processors ) 

298 if name is not None and name not in processors: 

299 raise _exceptions.ProcessorInavailability( 

300 name, 

301 genus = genus.value ) 

302 processor_infos: list[ _results.ProcessorInfo ] = [ ] 

303 for name_, processor in processors.items( ): 

304 if name is None or name_ == name: 

305 processor_info = _results.ProcessorInfo( 

306 processor_name = name_, 

307 processor_type = genus.value, 

308 capabilities = processor.capabilities, 

309 ) 

310 processor_infos.append( processor_info ) 

311 end_time = __.time.perf_counter( ) 

312 survey_time_ms = int( ( end_time - start_time ) * 1000 ) 

313 return _results.ProcessorsSurveyResult( 

314 genus = genus, 

315 filter_name = name, 

316 processors = tuple( processor_infos ), 

317 survey_time_ms = survey_time_ms, 

318 ) 

319 

320 

async def _collect_inventory_objects_multi_source(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    processor_name: __.Absential[ str ],
    filters: __.cabc.Mapping[ str, __.typx.Any ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Collects inventory objects using multi-source coordination.

        Optimized to pre-filter inventory sources by structure processor
        compatibility before making network requests.  Falls back to
        single-source detection when multi-source coordination fails.
    '''
    try:
        inventory_detections = (
            await _detection.collect_filter_inventories( auxdata, location ) )
    except Exception:
        # Broad catch is deliberate best-effort: any coordination
        # failure degrades to the single-source path below.
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
        return await idetection.filter_inventory(
            auxdata, resolved_location, filters = filters )
    if not inventory_detections: return ( )
    # Drop structure-incompatible sources before collecting objects,
    # so incompatible inventories never cost a network round trip.
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    structure_capabilities = sdetection.get_capabilities( )
    compatible_detections = _filter_detections_by_structure_capabilities(
        inventory_detections, structure_capabilities )
    if not compatible_detections: return ( )
    return await _merge_primary_supplementary(
        auxdata, compatible_detections, location, filters = filters )

350 

351 

async def _create_inventory_location_info(
    auxdata: _state.Globals,
    location: str,
    resolved_location: str,
    object_count: int,
) -> tuple[ _results.InventoryLocationInfo, ... ]:
    ''' Creates inventory location info for multi-source results.

        Attributes the result to the highest-confidence detection, or
        to single-source detection when multi-source coordination
        fails.  Returns an empty tuple when no source is detected.
    '''
    try:
        inventory_detections = (
            await _detection.collect_filter_inventories(
                auxdata, location ) )
    except Exception:
        # Deliberate best-effort fallback, mirroring
        # _collect_inventory_objects_multi_source.
        detection = await _detection.detect_inventory( auxdata, location )
    else:
        if not inventory_detections: return ( )
        detection = _select_primary_detection( inventory_detections )
    # Single construction site; both branches previously duplicated it.
    return ( _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = resolved_location,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = object_count ), )

380 

381 

def _filter_detections_by_structure_capabilities(
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> __.immut.Dictionary[ str, _processors.InventoryDetection ]:
    ''' Filters inventory detections by structure processor capabilities.

        Dropping incompatible sources before object collection avoids
        unnecessary network requests and processing overhead.
    '''
    retained: dict[ str, _processors.InventoryDetection ] = { }
    for pname, detection in inventory_detections.items( ):
        if structure_capabilities.supports_inventory_type(
            detection.processor.name ):
            retained[ pname ] = detection
    return __.immut.Dictionary( retained )

398 

399 

def _filter_objects_by_structure_capabilities(
    candidates: __.cabc.Sequence[ _results.InventoryObject ],
    structure_capabilities: _interfaces.StructureProcessorCapabilities,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Filters inventory objects by structure processor capabilities. '''
    return tuple(
        candidate for candidate in candidates
        if structure_capabilities.supports_inventory_type(
            candidate.inventory_type ) )

410 

411 

async def _merge_primary_supplementary(
    auxdata: _state.Globals,
    inventory_detections: __.cabc.Mapping[
        str, _processors.InventoryDetection ],
    location: str,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Merges inventory objects from all supplied detections.

        Aggregates objects from every source in detection order with
        source attribution preserved.  No deduplication is performed:
        complementary metadata from different sources is valuable.
        Callers should pre-filter detections for structure processor
        compatibility to avoid unnecessary network requests.
    '''
    if not inventory_detections: return ( )
    merged: list[ _results.InventoryObject ] = [ ]
    resolved = _detection.resolve_source_url( location )
    # Sequential awaits: merged ordering follows detection ordering.
    for detection in inventory_detections.values( ):
        merged.extend( await detection.filter_inventory(
            auxdata, resolved, filters = filters ) )
    return tuple( merged )

437 

438 

439def _normalize_location( location: str ) -> str: 

440 ''' Normalizes location URL by stripping index.html. ''' 

441 if location.endswith( '/' ): return location[ : -1 ] 

442 if location.endswith( '/index.html' ): return location[ : -11 ] 

443 return location 

444 

445 

def _process_content_id_filter(
    content_id: str,
    location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering.

        Parses the content ID, verifies it targets this location, and
        returns the first inventory object matching the parsed name.
    '''
    try: parsed_location, object_name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != location:
        raise _exceptions.ContentIdLocationMismatch( parsed_location, location )
    matches = tuple(
        candidate for candidate in objects
        if candidate.name == object_name )
    if not matches:
        raise _exceptions.ContentIdObjectAbsence( object_name, location )
    return matches[ : 1 ]

462 

463 

464def _select_primary_detection( 

465 inventory_detections: __.cabc.Mapping[ 

466 str, _processors.InventoryDetection ], 

467) -> _processors.InventoryDetection: 

468 ''' Selects primary detection with highest confidence. ''' 

469 detections_list = list( inventory_detections.values( ) ) 

470 detections_list.sort( key = lambda d: -d.confidence ) 

471 return detections_list[ 0 ]