Coverage for sources/librovore/functions.py: 13%


# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License");           #
# you may not use this file except in compliance with the License.          #
# You may obtain a copy of the License at                                   #
#                                                                            #
#     http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software       #
# distributed under the License is distributed on an "AS IS" BASIS,         #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
# See the License for the specific language governing permissions and       #
# limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Core business logic shared between CLI and MCP server. '''


from . import __
from . import detection as _detection
from . import exceptions as _exceptions
from . import interfaces as _interfaces
from . import processors as _processors
from . import results as _results
from . import search as _search
from . import state as _state


_SUCCESS_RATE_MINIMUM = 0.1


LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )


async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location. '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    end_time = __.time.perf_counter( )
    detection_time_ms = int( ( end_time - start_time ) * 1000 )
    if __.is_absent( detection_optimal ):
        genus_name = (
            genus.name.lower( ) if hasattr( genus, 'name' ) else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )
    # Convert detections mapping to tuple of results.Detection objects.
    detections_tuple = tuple(
        _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ),
        )
        for detection in detections.values( )
    )
    # Convert detection_optimal to results.Detection.
    detection_optimal_result = _results.Detection(
        processor_name = detection_optimal.processor.name,
        confidence = detection_optimal.confidence,
        processor_type = genus.value,
        detection_metadata = __.immut.Dictionary( ),
    )
    return _results.DetectionsResult(
        source = location,
        detections = detections_tuple,
        detection_optimal = detection_optimal_result,
        time_detection_ms = detection_time_ms )
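
# Illustrative usage sketch (comment only, not executed at import): calling
# ``detect`` from an async context. Here ``auxdata`` is assumed to be an
# already-initialized ``_state.Globals`` instance and the URL is a
# hypothetical placeholder:
#
#     result = await detect(
#         auxdata, 'https://docs.example.org',
#         genus = _interfaces.ProcessorGenera.Inventory )
#     best = result.detection_optimal.processor_name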


async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking. '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Resolve URL after detection to get working URL if redirect exists.
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    if not __.is_absent( content_id ):
        candidates = _process_content_id_filter(
            content_id, resolved_location, objects )
    else:
        results = _search.filter_by_name(
            objects, term,
            match_mode = search_behaviors.match_mode,
            fuzzy_threshold = search_behaviors.fuzzy_threshold )
        # Oversample name matches; extraction below trims to results_max.
        candidates = [
            result.inventory_object
            for result in results[ : results_max * 3 ] ]
    locations = tuple( [ _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = resolved_location,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) ) ] )
    if not candidates:
        end_time = __.time.perf_counter( )
        search_time_ms = int( ( end_time - start_time ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            query = term,
            documents = tuple( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = search_time_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, candidates[ : results_max ] )
    end_time = __.time.perf_counter( )
    search_time_ms = int( ( end_time - start_time ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = search_time_ms ),
        inventory_locations = locations )
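
# Illustrative usage sketch (comment only): a content query with a small
# result budget. ``auxdata``, the URL, and the search term are assumed
# placeholders:
#
#     result = await query_content(
#         auxdata, 'https://docs.example.org', 'serialize',
#         results_max = 3 )
#     for document in result.documents: ...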


async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Name ),
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    start_time = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Resolve URL after detection to get working URL if redirect exists.
    resolved_location = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters, details = details )
    results = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = [
        result.inventory_object for result in results[ : results_max ] ]
    end_time = __.time.perf_counter( )
    search_time_ms = int( ( end_time - start_time ) * 1000 )
    return _results.InventoryQueryResult(
        location = resolved_location,
        query = term,
        objects = tuple( selections ),
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = search_time_ms ),
        inventory_locations = tuple( [
            _results.InventoryLocationInfo(
                inventory_type = detection.processor.name,
                location_url = resolved_location,
                processor_name = detection.processor.name,
                confidence = detection.confidence,
                object_count = len( objects ) ) ] ) )
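
# Illustrative usage sketch (comment only): an inventory query requesting
# name-level detail. ``auxdata``, the URL, and the term are assumed
# placeholders:
#
#     result = await query_inventory(
#         auxdata, 'https://docs.example.org', 'Dictionary',
#         details = _interfaces.InventoryQueryDetails.Name,
#         results_max = 5 )
#     names = [ obj.name for obj in result.objects ]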


async def survey_processors(
    auxdata: _state.Globals, /,
    genus: _interfaces.ProcessorGenera,
    name: __.typx.Optional[ str ] = None,
) -> _results.ProcessorsSurveyResult:
    ''' Lists processor capabilities for specified genus, filtered by name. '''
    start_time = __.time.perf_counter( )
    match genus:
        case _interfaces.ProcessorGenera.Inventory:
            processors = dict( _processors.inventory_processors )
        case _interfaces.ProcessorGenera.Structure:
            processors = dict( _processors.structure_processors )
    if name is not None and name not in processors:
        raise _exceptions.ProcessorInavailability(
            name,
            genus = genus.value )
    processor_infos: list[ _results.ProcessorInfo ] = [ ]
    for name_, processor in processors.items( ):
        if name is None or name_ == name:
            processor_info = _results.ProcessorInfo(
                processor_name = name_,
                processor_type = genus.value,
                capabilities = processor.capabilities,
            )
            processor_infos.append( processor_info )
    end_time = __.time.perf_counter( )
    survey_time_ms = int( ( end_time - start_time ) * 1000 )
    return _results.ProcessorsSurveyResult(
        genus = genus,
        filter_name = name,
        processors = tuple( processor_infos ),
        survey_time_ms = survey_time_ms,
    )
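
# Illustrative usage sketch (comment only), assuming ``auxdata`` is an
# initialized ``_state.Globals`` instance:
#
#     survey = await survey_processors(
#         auxdata, genus = _interfaces.ProcessorGenera.Structure )
#     available = [ info.processor_name for info in survey.processors ]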


def _normalize_location( location: str ) -> str:
    ''' Normalizes location URL by stripping trailing slash or index.html. '''
    if location.endswith( '/' ): return location[ : -1 ]
    if location.endswith( '/index.html' ): return location[ : -11 ]
    return location
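
# Behavior sketch (comment only); each branch returns independently, so a
# trailing slash and a trailing '/index.html' are not both stripped in one
# call:
#
#     _normalize_location( 'https://example.org/docs/' )
#         -> 'https://example.org/docs'
#     _normalize_location( 'https://example.org/docs/index.html' )
#         -> 'https://example.org/docs'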


def _process_content_id_filter(
    content_id: str,
    resolved_location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering. '''
    try:
        parsed_location, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != resolved_location:
        raise _exceptions.ContentIdLocationMismatch(
            parsed_location, resolved_location )
    matching_objects = [
        obj for obj in objects if obj.name == name ]
    if not matching_objects:
        raise _exceptions.ContentIdObjectAbsence(
            name, resolved_location )
    return tuple( matching_objects[ : 1 ] )


def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format. '''
    # TODO: Remove type suppressions.
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            if field.name.startswith( '_' ):
                continue
            value = getattr( obj, field.name )
            result[ field.name ] = _serialize_for_json( value )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        return list( obj ) # type: ignore[arg-type]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )
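
# Illustrative usage sketch (comment only): dumping a result object via the
# standard-library ``json`` module. ``result`` stands in for any dataclass
# result from the functions above (import and variable assumed for the
# sketch):
#
#     import json
#     payload = _serialize_for_json( result )
#     text = json.dumps( payload, indent = 2 )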