Coverage for sources/librovore/functions.py: 16% (87 statements) — coverage.py v7.10.6, created at 2025-09-06 02:25 +0000

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and       #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Core business logic shared between CLI and MCP server. '''

22 

23 

24from . import __ 

25from . import detection as _detection 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import processors as _processors 

29from . import results as _results 

30from . import search as _search 

31from . import state as _state 

32 

33 

34 

# Minimum acceptable success rate for detections. Unused within this
# module's visible code — presumably consumed elsewhere; TODO confirm.
_SUCCESS_RATE_MINIMUM = 0.1


# Annotated string alias marking parameters that carry a documentation
# location (URL or path); the annotation feeds generated CLI/API docs.
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str, __.ddoc.Fname( 'location argument' ) ]


# Shared immutable defaults for keyword arguments below, avoiding the
# mutable-default-argument pitfall while keeping signatures terse.
_search_behaviors_default = _interfaces.SearchBehaviors( )
_filters_default = __.immut.Dictionary[ str, __.typx.Any ]( )

44 

45 

async def detect(
    auxdata: _state.Globals,
    location: LocationArgument, /,
    genus: _interfaces.ProcessorGenera,
    processor_name: __.Absential[ str ] = __.absent,
) -> _results.DetectionsResult:
    ''' Detects relevant processors of particular genus for location.

        Raises ProcessorInavailability when no optimal processor exists.
    '''
    # NOTE(review): processor_name is accepted but never consulted below —
    # confirm whether it should be forwarded to access_detections.
    location = _normalize_location( location )
    timer_start = __.time.perf_counter( )
    detections, optimal = await _detection.access_detections(
        auxdata, location, genus = genus )
    elapsed_ms = int( ( __.time.perf_counter( ) - timer_start ) * 1000 )
    if __.is_absent( optimal ):
        genus_label = (
            genus.name.lower( ) if hasattr( genus, 'name' )
            else str( genus ) )
        raise _exceptions.ProcessorInavailability(
            location, genus = genus_label )
    def _render( entry ):
        # Translate an internal detection entry into a results.Detection.
        return _results.Detection(
            processor_name = entry.processor.name,
            confidence = entry.confidence,
            processor_type = genus.value,
            detection_metadata = __.immut.Dictionary( ) )
    return _results.DetectionsResult(
        source = location,
        detections = tuple(
            _render( entry ) for entry in detections.values( ) ),
        detection_optimal = _render( optimal ),
        time_detection_ms = elapsed_ms )

88 

89 

async def query_content( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    content_id: __.Absential[ str ] = __.absent,
    results_max: int = 10,
    lines_max: __.typx.Optional[ int ] = None,
) -> _results.ContentQueryResult:
    ''' Searches documentation content with relevance ranking. '''
    # NOTE(review): lines_max is accepted but never consulted below —
    # confirm whether content truncation was intended here.
    location = _normalize_location( location )
    started = __.time.perf_counter( )
    idetection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may have followed a redirect; use the working URL from
    # here on for all inventory and extraction calls.
    url = _detection.resolve_source_url( location )
    objects = await idetection.filter_inventory(
        auxdata, url, filters = filters )
    if __.is_absent( content_id ):
        matches = _search.filter_by_name(
            objects, term, search_behaviors = search_behaviors )
        # Over-select (3x) so downstream extraction still has enough
        # candidates after the final results_max cut.
        candidates = [
            match.inventory_object
            for match in matches[ : results_max * 3 ] ]
    else:
        candidates = _process_content_id_filter( content_id, url, objects )
    locations = ( _results.InventoryLocationInfo(
        inventory_type = idetection.processor.name,
        location_url = url,
        processor_name = idetection.processor.name,
        confidence = idetection.confidence,
        object_count = len( objects ) ), )
    if not candidates:
        elapsed_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
        return _results.ContentQueryResult(
            location = url,
            term = term,
            documents = ( ),
            search_metadata = _results.SearchMetadata(
                results_count = 0,
                results_max = results_max,
                search_time_ms = elapsed_ms ),
            inventory_locations = locations )
    sdetection = await _detection.detect_structure(
        auxdata, url, processor_name = processor_name )
    documents = await sdetection.extract_contents(
        auxdata, url, candidates[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    return _results.ContentQueryResult(
        location = url,
        term = term,
        documents = tuple( documents ),
        search_metadata = _results.SearchMetadata(
            results_count = len( documents ),
            results_max = results_max,
            matches_total = len( candidates ),
            search_time_ms = elapsed_ms ),
        inventory_locations = locations )

154 

155 

async def query_inventory( # noqa: PLR0913
    auxdata: _state.Globals,
    location: LocationArgument,
    term: str, /, *,
    processor_name: __.Absential[ str ] = __.absent,
    search_behaviors: _interfaces.SearchBehaviors = _search_behaviors_default,
    filters: __.cabc.Mapping[ str, __.typx.Any ] = _filters_default,
    results_max: int = 5,
) -> _results.InventoryQueryResult:
    ''' Searches object inventory by name.

        Returns configurable detail levels. Always includes object names
        plus requested detail flags (signatures, summaries, documentation).
    '''
    location = _normalize_location( location )
    started = __.time.perf_counter( )
    detection = await _detection.detect_inventory(
        auxdata, location, processor_name = processor_name )
    # Detection may have followed a redirect; use the working URL from
    # here on for all inventory calls.
    url = _detection.resolve_source_url( location )
    objects = await detection.filter_inventory(
        auxdata, url, filters = filters )
    matches = _search.filter_by_name(
        objects, term, search_behaviors = search_behaviors )
    chosen = tuple(
        match.inventory_object for match in matches[ : results_max ] )
    elapsed_ms = int( ( __.time.perf_counter( ) - started ) * 1000 )
    location_info = _results.InventoryLocationInfo(
        inventory_type = detection.processor.name,
        location_url = url,
        processor_name = detection.processor.name,
        confidence = detection.confidence,
        object_count = len( objects ) )
    return _results.InventoryQueryResult(
        location = url,
        term = term,
        objects = chosen,
        search_metadata = _results.SearchMetadata(
            results_count = len( chosen ),
            results_max = results_max,
            matches_total = len( objects ),
            search_time_ms = elapsed_ms ),
        inventory_locations = ( location_info, ) )

200 

201 

202 

203async def survey_processors( 

204 auxdata: _state.Globals, /, 

205 genus: _interfaces.ProcessorGenera, 

206 name: __.typx.Optional[ str ] = None, 

207) -> _results.ProcessorsSurveyResult: 

208 ''' Lists processor capabilities for specified genus, filtered by name. ''' 

209 start_time = __.time.perf_counter( ) 

210 match genus: 

211 case _interfaces.ProcessorGenera.Inventory: 

212 processors = dict( _processors.inventory_processors ) 

213 case _interfaces.ProcessorGenera.Structure: 

214 processors = dict( _processors.structure_processors ) 

215 if name is not None and name not in processors: 

216 raise _exceptions.ProcessorInavailability( 

217 name, 

218 genus = genus.value ) 

219 processor_infos: list[ _results.ProcessorInfo ] = [ ] 

220 for name_, processor in processors.items( ): 

221 if name is None or name_ == name: 

222 processor_info = _results.ProcessorInfo( 

223 processor_name = name_, 

224 processor_type = genus.value, 

225 capabilities = processor.capabilities, 

226 ) 

227 processor_infos.append( processor_info ) 

228 end_time = __.time.perf_counter( ) 

229 survey_time_ms = int( ( end_time - start_time ) * 1000 ) 

230 return _results.ProcessorsSurveyResult( 

231 genus = genus, 

232 filter_name = name, 

233 processors = tuple( processor_infos ), 

234 survey_time_ms = survey_time_ms, 

235 ) 

236 

237 

238 

239def _normalize_location( location: str ) -> str: 

240 ''' Normalizes location URL by stripping index.html. ''' 

241 if location.endswith( '/' ): return location[ : -1 ] 

242 if location.endswith( '/index.html' ): return location[ : -11 ] 

243 return location 

244 

245 

def _process_content_id_filter(
    content_id: str,
    resolved_location: str,
    objects: __.cabc.Sequence[ _results.InventoryObject ],
) -> tuple[ _results.InventoryObject, ... ]:
    ''' Processes content ID for browse-then-extract workflow filtering.

        Raises ContentIdInvalidity on unparsable IDs,
        ContentIdLocationMismatch when the ID targets another location, and
        ContentIdObjectAbsence when no inventory object carries the name.
    '''
    try:
        parsed_location, name = _results.parse_content_id( content_id )
    except ValueError as exc:
        raise _exceptions.ContentIdInvalidity(
            content_id, f"Parsing failed: {exc}" ) from exc
    if parsed_location != resolved_location:
        raise _exceptions.ContentIdLocationMismatch(
            parsed_location, resolved_location )
    matches = tuple( obj for obj in objects if obj.name == name )
    if not matches:
        raise _exceptions.ContentIdObjectAbsence( name, resolved_location )
    # Only the first matching object participates in extraction.
    return matches[ : 1 ]

266 

267