Coverage for sources/librovore/functions.py: 15%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-02 00:02 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core business logic shared between CLI and MCP server. ''' 

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import results as _results 

30from . import search as _search 

31from . import state as _state 

32 

33 

# Minimum fraction of successful operations considered acceptable.
# NOTE(review): not referenced anywhere in this module as visible here —
# TODO confirm it is consumed elsewhere or remove.
_SUCCESS_RATE_MINIMUM = 0.1


# Annotated alias for documentation-source location arguments (URLs or
# paths) accepted by the public functions in this module.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable default instances so public signatures avoid the
# mutable-default-argument pitfall.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )

43 

44 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location. '''
    # NOTE(review): processor_name is accepted but not forwarded to
    # access_detections here — confirm whether that is intentional.
    location = _normalize_location( location )
    clock_begin = __.time.perf_counter( )
    detections, detection_optimal = (
        await _detection.access_detections(
            auxdata, location, genus = genus ) )
    detection_time_ms = int(
        ( __.time.perf_counter( ) - clock_begin ) * 1000 )
    if __.is_absent( detection_optimal ):
        if hasattr( genus, 'name' ): genus_name = genus.name.lower( )
        else: genus_name = str( genus )
        raise _exceptions.ProcessorInavailability(
            location,
            genus = genus_name )

    def convert( detection ):
        # Adapt an internal detection record into the public results type.
        return _results.Detection(
            processor_name = detection.processor.name,
            confidence = detection.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ),
        )

    return _results.DetectionsResult(
        source = location,
        detections = tuple(
            convert( detection )
            for detection in detections.values( ) ),
        detection_optimal = convert( detection_optimal ),
        time_detection_ms = detection_time_ms )

87 

88 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking. '''
    # NOTE(review): lines_max is accepted but never used in this function —
    # TODO confirm whether truncation was intended here.
    location = _normalize_location( location )
    clock_begin = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may surface a redirect; prefer the resolved URL afterwards.
    resolved_location = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, resolved_location,
        filters = filters,
        details = _interfaces.InventoryQueryDetails.Name )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    # Overselect threefold so content extraction can trim to results_max.
    candidates = [
        match.inventory_object for match in matches[ : results_max * 3 ] ]
    locations = ( _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = resolved_location,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) ), )
    if not candidates:
        elapsed_ms = int(
            ( __.time.perf_counter( ) - clock_begin ) * 1000 )
        return _results.ContentQueryResult(
            location = resolved_location,
            query = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = elapsed_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, resolved_location, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, resolved_location, candidates[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - clock_begin ) * 1000 )
    return _results.ContentQueryResult(
        location = resolved_location,
        query = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = elapsed_ms ),
        inventory_locations = locations )

150 

151 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    details: _interfaces.InventoryQueryDetails = (
        _interfaces.InventoryQueryDetails.Name ),
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    clock_begin = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may surface a redirect; prefer the resolved URL afterwards.
    resolved_location = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, resolved_location, filters = filters, details = details )
    matches = _search.filter_by_name(
        objects, term,
        match_mode = search_behaviors.match_mode,
        fuzzy_threshold = search_behaviors.fuzzy_threshold )
    selections = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - clock_begin ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = resolved_location,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = resolved_location,
        query = term,
        objects = selections,
        search_metadata = _results.SearchMetadata(
            results_count = len( selections ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = elapsed_ms ),
        inventory_locations = ( location_info, ) )

200 

201 

202 

203async def survey_processors( 

204 auxdata: _state.Globals, /, 

205 genus: _interfaces.ProcessorGenera, 

206 name: __.typx.Optional[ str ] = None, 

207) -> _results.ProcessorsSurveyResult: 

208 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

209 start_time = __.time.perf_counter( ) 

210 match genus: 

211 case _interfaces.ProcessorGenera.Inventory: 

212 processors = dict( _processors.inventory_processors ) 

213 case _interfaces.ProcessorGenera.Structure: 

214 processors = dict( _processors.structure_processors ) 

215 if name is not None and name not in processors: 

216 raise _exceptions.ProcessorInavailability( 

217 name, 

218 genus = genus.value ) 

219 processor_infos: list[ _results.ProcessorInfo ] = [ ] 

220 for name_, processor in processors.items( ): 

221 if name is None or name_ == name: 

222 processor_info = _results.ProcessorInfo( 

223 processor_name = name_, 

224 processor_type = genus.value, 

225 capabilities = processor.capabilities, 

226 ) 

227 processor_infos.append( processor_info ) 

228 end_time = __.time.perf_counter( ) 

229 survey_time_ms = int( ( end_time - start_time ) * 1000 ) 

230 return _results.ProcessorsSurveyResult( 

231 genus = genus, 

232 filter_name = name, 

233 processors = tuple( processor_infos ), 

234 survey_time_ms = survey_time_ms, 

235 ) 

236 

237 

238 

239def _normalize_location( location: str ) -> str: 

240 ''' Normalizes location URL by stripping index.html. ''' 

241 if location.endswith( '/' ): return location[ : -1 ] 

242 if location.endswith( '/index.html' ): return location[ : -11 ] 

243 return location 

244 

245 

246 

def _serialize_for_json( obj: __.typx.Any ) -> __.typx.Any:
    ''' Recursively serializes dataclass objects to JSON-compatible format.

        Dataclasses become dicts (private fields omitted), sequences and
        sets become lists, mappings become dicts; scalars pass through and
        anything else falls back to str().
    '''
    # TODO: Remove type suppressions.
    if __.dcls.is_dataclass( obj ):
        result = { } # type: ignore[var-annotated]
        for field in __.dcls.fields( obj ):
            # Fields with a leading underscore are treated as private
            # and omitted from the serialized form.
            if field.name.startswith( '_' ):
                continue
            result[ field.name ] = _serialize_for_json(
                getattr( obj, field.name ) )
        return result # type: ignore[return-value]
    if isinstance( obj, ( list, tuple ) ):
        return [ _serialize_for_json( item ) for item in obj ] # type: ignore[misc]
    if isinstance( obj, ( frozenset, set ) ):
        # Fix: members must be recursively serialized, same as for lists;
        # previously 'list( obj )' leaked non-JSON-compatible members
        # (e.g. dataclasses) into the output.
        # NOTE: set iteration order is arbitrary.
        return [ _serialize_for_json( item ) for item in obj ]
    if hasattr( obj, 'items' ): # Handle mappings (dict, frigid.Dictionary)
        # NOTE(review): keys are passed through unserialized — presumably
        # always strings; confirm against callers.
        return { k: _serialize_for_json( v ) for k, v in obj.items( ) }
    if obj is None or isinstance( obj, ( str, int, float, bool ) ):
        return obj
    return str( obj )