Coverage for sources/librovore/functions.py: 15%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-29 01:14 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core business logic shared between CLI and MCP server. ''' 

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import results as _results 

30from . import search as _search 

31from . import state as _state 

32 

33 

# NOTE(review): not referenced by any function visible in this module;
#   presumably a minimum acceptable success ratio -- confirm before use.
_SUCCESS_RATE_MINIMUM = 0.1


# String alias, annotated for documentation, for location arguments.
LocationArgument: __.typx.TypeAlias = (
    __.typx.Annotated[ str, __.ddoc.Fname( 'location argument' ) ] )


# Module-level instances which serve as parameter defaults for the
# ``search_behaviors`` and ``filters`` arguments of the query functions.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )

43 

44 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult | _results.ErrorResponse:
    ''' Detects relevant processors of particular genus for location.

        Produces all detections, the optimal detection, and the elapsed
        detection time in milliseconds. If no optimal detection is
        available, then a structured error response is produced instead.
    '''
    # NOTE(review): ``processor_name`` is accepted but not consulted here.
    location = _normalize_location( location )
    time_start = __.time.perf_counter( )
    detections, optimum = await _detection.access_detections(
        auxdata, location, genus = genus )
    elapsed_ms = int( ( __.time.perf_counter( ) - time_start ) * 1000 )
    if __.is_absent( optimum ):
        # Synthesize exception so that standard error formatting applies.
        if hasattr( genus, 'name' ): label = genus.name.lower( )
        else: label = str( genus )
        return _produce_processor_error_response(
            _exceptions.ProcessorInavailability( label ),
            location, 'detection', genus = genus )

    def convert( detection: __.typx.Any ) -> _results.Detection:
        ''' Converts internal detection into result object. '''
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )

    return _results.DetectionsResult(
        source = location,
        detections = tuple( map( convert, detections.values( ) ) ),
        detection_optimal = convert( optimum ),
        time_detection_ms = elapsed_ms )

88 

89 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    include_snippets: bool = True,
    results_max: int = 10,
) -> _results.ContentResult:
    ''' Searches documentation content with relevance ranking.

        Detects an inventory processor for the location, filters its
        inventory by ``filters`` and by name against ``term``, and then
        extracts content for at most ``results_max`` of the matches via a
        detected structure processor.

        Produces a structured error response, rather than raising, when
        either the inventory processor or the structure processor is
        unavailable.
    '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, location, term,
            genus = _interfaces.ProcessorGenera.Inventory )
    # Resolve URL after detection to get working URL if redirect exists
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Candidate pool is capped at three times ``results_max``; only the
    # first ``results_max`` candidates have their contents extracted below.
    candidates = [
        result.inventory_object for result in results[ : results_max * 3 ] ]
    locations = (
        _results.InventoryLocationInfo(
            inventory_type = idetection.processor.name,
            location_url = resolved_location,
            processor_name = idetection.processor.name,
            confidence = idetection.confidence,
            object_count = len( objects ) ), )
    if not candidates:
        # Nothing matched: skip structure detection entirely.
        end_time = __.time.perf_counter( )
        search_time_ms = int( ( end_time - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            query = term,
            documents = tuple( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms ),
            inventory_locations = locations )
    # Mirror the inventory error handling above, so that a missing
    # structure processor also yields a structured error response.
    # NOTE(review): assumes ``detect_structure`` signals unavailability
    #   with ``ProcessorInavailability`` like ``detect_inventory`` -- confirm.
    try:
        sdetection = await _detection.detect_structure(
            auxdata, resolved_location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, resolved_location, term,
            genus = _interfaces.ProcessorGenera.Structure )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, candidates[ : results_max ],
        include_snippets = include_snippets )
    end_time = __.time.perf_counter( )
    search_time_ms = int( ( end_time - start_time ) * 1000 )
    # NOTE(review): ``matches_total`` counts the capped candidate pool,
    #   not every name match -- confirm that this is intended.
    return _results.ContentQueryResult(
        location = resolved_location,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms ),
        inventory_locations = locations )

157 

158 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Documentation ),
    results_max: int = 5,
) -> _results.InventoryResult:
    ''' Searches object inventory by name.

        The inventory is filtered by ``filters`` at the requested level of
        ``details`` and then matched by name against ``term``. At most
        ``results_max`` matching objects are returned. If no inventory
        processor is available, then a structured error response is
        produced.
    '''
    location = _normalize_location( location )
    time_start = __.time.perf_counter( )
    try:
        idetection = await _detection.detect_inventory(
            auxdata, location, processor_name = processor_name )
    except _exceptions.ProcessorInavailability as exc:
        return _produce_processor_error_response(
            exc, location, term,
            genus = _interfaces.ProcessorGenera.Inventory )
    # Resolution happens after detection so that any redirect is honored.
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url, filters = filters, details = details )
    hits = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = tuple( hit.inventory_object for hit in hits[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - time_start ) * 1000 )
    # NOTE(review): ``matches_total`` reports the count of filtered
    #   inventory objects, not the count of name matches -- confirm.
    metadata = _results.SearchMetadata(
        results_count = len( selections ),
        results_max = results_max,
        matches_total = len( objects ),
        search_time_ms = elapsed_ms )
    inventory_location = _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = url,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = url,
        query = term,
        objects = selections,
        search_metadata = metadata,
        inventory_locations = ( inventory_location, ) )

212 

213 

214 

215async def survey_processors( 

216 auxdata: _state.Globals, /, 

217 genus: _interfaces.ProcessorGenera, 

218 name: __.typx.Optional[ str ] = None, 

219) -> _results.ProcessorsSurveyResultUnion: 

220 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

221 start_time = __.time.perf_counter( ) 

222 match genus: 

223 case _interfaces.ProcessorGenera.Inventory: 

224 processors = dict( _processors.inventory_processors ) 

225 case _interfaces.ProcessorGenera.Structure: 

226 processors = dict( _processors.structure_processors ) 

227 if name is not None and name not in processors: 

228 exc = _exceptions.ProcessorInavailability( name ) 

229 return _produce_processor_error_response( 

230 exc, '', name or '', genus = genus ) 

231 processor_infos: list[ _results.ProcessorInfo ] = [ ] 

232 for name_, processor in processors.items( ): 

233 if name is None or name_ == name: 

234 processor_info = _results.ProcessorInfo( 

235 processor_name = name_, 

236 processor_type = genus.value, 

237 capabilities = processor.capabilities, 

238 ) 

239 processor_infos.append( processor_info ) 

240 end_time = __.time.perf_counter( ) 

241 survey_time_ms = int( ( end_time - start_time ) * 1000 ) 

242 return _results.ProcessorsSurveyResult( 

243 genus = genus, 

244 filter_name = name, 

245 processors = tuple( processor_infos ), 

246 survey_time_ms = survey_time_ms, 

247 ) 

248 

249 

250 

251def _normalize_location( location: str ) -> str: 

252 ''' Normalizes location URL by stripping index.html. ''' 

253 if location.endswith( '/' ): return location[ : -1 ] 

254 if location.endswith( '/index.html' ): return location[ : -11 ] 

255 return location 

256 

257 

def _produce_generic_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for generic processor failures.

        The exception is not inspected; the response text is fixed.
    '''
    error = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Processor Found',
        message = (
            'No compatible processor found to handle this '
            'documentation source.' ),
        suggestion = (
            'Verify the URL points to a supported documentation format.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = error )

276 

277 

def _produce_inventory_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for inventory failures.

        The exception is not inspected; the response text is fixed.
    '''
    error = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Format Detected',
        message = (
            'No compatible inventory format detected at this '
            'documentation source.' ),
        suggestion = (
            'Verify the URL points to a supported documentation site.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = error )

296 

297 

298def _produce_processor_error_response( 

299 exc: _exceptions.ProcessorInavailability, 

300 location: str, 

301 query: str, 

302 genus: __.Absential[ _interfaces.ProcessorGenera ] = __.absent, 

303) -> _results.ErrorResponse: 

304 ''' Produces appropriate structured error response based on genus. ''' 

305 match genus: 

306 case _interfaces.ProcessorGenera.Inventory: 

307 return _produce_inventory_error_response( exc, location, query ) 

308 case _interfaces.ProcessorGenera.Structure: 

309 return _produce_structure_error_response( exc, location, query ) 

310 case _: 

311 return _produce_generic_error_response( exc, location, query ) 

312 

313 

def _produce_structure_error_response(
    exc: _exceptions.ProcessorInavailability,
    location: str,
    query: str,
) -> _results.ErrorResponse:
    ''' Produces structured error response for structure failures.

        The exception is not inspected; the response text is fixed.
    '''
    error = _results.ErrorInfo(
        type = 'processor_unavailable',
        title = 'No Compatible Structure Processor',
        message = (
            'No compatible structure processor found for this '
            'documentation source.' ),
        suggestion = (
            'Ensure the site uses a supported documentation format '
            'like Sphinx or MkDocs.' ) )
    return _results.ErrorResponse(
        location = location, query = query, error = error )

332 

333 

def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes objects to JSON-compatible format.

        Dataclass instances become dictionaries of their fields, with
        fields whose names start with an underscore omitted. Lists,
        tuples, sets, and frozensets become lists of serialized members.
        Objects with an ``items`` method become dictionaries with
        serialized values. ``None``, strings, integers, floats, and
        booleans pass through unchanged. Anything else is converted with
        ``str``.
    '''
    # TODO: Remove type suppressions.
    # ``is_dataclass`` is also true for dataclass *classes*; reading field
    # values from a class can raise ``AttributeError`` for fields without
    # defaults, so only instances are expanded here.
    if __.dcls.is_dataclass( obj ) and not isinstance( obj, type ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_for_json( value )
        return result # type: ignore[return-value]
    # Set members are serialized recursively, same as sequence members, so
    # that nested dataclasses and other objects are JSON-compatible too.
    if isinstance( obj, ( list, tuple, frozenset, set ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )