Coverage for sources/librovore/server.py: 61%

84 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 22:09 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' MCP server implementation. ''' 

22 

23from mcp.server.fastmcp import FastMCP as _FastMCP 

24 

25# FastMCP uses Pydantic to generate JSON schemas from function signatures. 

26from pydantic import Field as _Field 

27 

28from . import __ 

29from . import exceptions as _exceptions 

30from . import functions as _functions 

31from . import interfaces as _interfaces 

32from . import state as _state 

33 

34 

35def intercept_errors( 

36 func: __.cabc.Callable[ 

37 ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ] 

38) -> __.cabc.Callable[ ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]: 

39 ''' Decorator for MCP functions to intercept self-rendering exceptions. 

40  

41 Catches Omnierror exceptions and returns their JSON representation 

42 instead of raising them. Other exceptions are re-raised unchanged. 

43 ''' 

44 @__.funct.wraps( func ) 

45 async def wrapper( 

46 *posargs: __.typx.Any, **nomargs: __.typx.Any 

47 ) -> dict[ str, __.typx.Any ]: 

48 try: 

49 return await func( *posargs, **nomargs ) 

50 except _exceptions.Omnierror as exc: 

51 return dict( exc.render_as_json( ) ) 

52 except Exception: 

53 raise 

54 

55 return wrapper 

56 

57 

58@__.dcls.dataclass( kw_only = True, slots = True ) 

59class SearchBehaviorsMutable: 

60 ''' Mutable version of SearchBehaviors for FastMCP/Pydantic compatibility. 

61 

62 Note: Fields are manually duplicated from SearchBehaviors to avoid 

63 immutable dataclass internals leaking into JSON schema generation. 

64 ''' 

65 

66 match_mode: _interfaces.MatchMode = _interfaces.MatchMode.Similar 

67 similarity_score_min: int = 50 

68 

69 

70FiltersMutable: __.typx.TypeAlias = dict[ str, __.typx.Any ] 

71GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[ 

72 __.typx.Optional[ str ], 

73 _Field( description = __.access_doctab( 'group by argument' ) ), 

74] 

75TermArgument: __.typx.TypeAlias = __.typx.Annotated[ 

76 str, _Field( description = __.access_doctab( 'term argument' ) ) ] 

77ResultsMax: __.typx.TypeAlias = __.typx.Annotated[ 

78 int, _Field( description = __.access_doctab( 'results max argument' ) ) ] 

79LocationArgument: __.typx.TypeAlias = __.typx.Annotated[ 

80 str, _Field( description = __.access_doctab( 'location argument' ) ) ] 

81ContainsTerm: __.typx.TypeAlias = __.typx.Annotated[ 

82 bool, 

83 _Field( 

84 description = ( 

85 "Enable substring matching in Exact and Similar modes. " 

86 "When enabled, allows terms to match as substrings." ) ), 

87] 

88CaseSensitive: __.typx.TypeAlias = __.typx.Annotated[ 

89 bool, 

90 _Field( 

91 description = ( 

92 "Enable case-sensitive matching. When False, " 

93 "performs case-insensitive matching (default)." ) ), 

94] 

95 

96 

97_filters_default = FiltersMutable( ) 

98_search_behaviors_default = SearchBehaviorsMutable( ) 

99 

100_scribe = __.acquire_scribe( __name__ ) 

101 

102 

103 

104async def serve( 

105 auxdata: _state.Globals, /, *, 

106 port: int = 0, 

107 transport: str = 'stdio', 

108 extra_functions: bool = False, 

109) -> None: 

110 ''' Runs MCP server. ''' 

111 _scribe.debug( "Initializing FastMCP server." ) 

112 mcp = _FastMCP( 'Librovore Documentation Server', port = port ) 

113 _register_server_functions( 

114 auxdata, mcp, extra_functions = extra_functions ) 

115 match transport: 

116 case 'sse': await mcp.run_sse_async( mount_path = None ) 

117 case 'stdio': await mcp.run_stdio_async( ) 

118 case _: raise ValueError 

119 

120 

121def _produce_detect_function( auxdata: _state.Globals ): 

122 @intercept_errors 

123 async def detect( 

124 location: LocationArgument, 

125 genus: __.typx.Annotated[ 

126 _interfaces.ProcessorGenera, 

127 _Field( description = "Processor genus (inventory or structure)" ), 

128 ], 

129 processor_name: __.typx.Annotated[ 

130 __.typx.Optional[ str ], 

131 _Field( description = "Optional processor name." ), 

132 ] = None, 

133 ) -> dict[ str, __.typx.Any ]: 

134 nomargs: __.NominativeArguments = { } 

135 if processor_name is not None: 

136 nomargs[ 'processor_name' ] = processor_name 

137 result = await _functions.detect( auxdata, location, genus, **nomargs ) 

138 return dict( result.render_as_json( ) ) 

139 

140 return detect 

141 

142 

143def _produce_query_content_function( auxdata: _state.Globals ): 

144 @intercept_errors 

145 async def query_content( # noqa: PLR0913 

146 location: LocationArgument, 

147 term: TermArgument, 

148 search_behaviors: __.typx.Annotated[ 

149 SearchBehaviorsMutable, 

150 _Field( description = "Search behavior configuration" ), 

151 ] = _search_behaviors_default, 

152 filters: __.typx.Annotated[ 

153 FiltersMutable, 

154 _Field( description = "Processor-specific filters" ), 

155 ] = _filters_default, 

156 results_max: ResultsMax = 10, 

157 lines_max: __.typx.Annotated[ 

158 int, 

159 _Field( 

160 description = ( 

161 "Lines per result. Use 5-10 for sampling/preview, " 

162 "larger values or omit for full content. Results " 

163 "include content_id for extraction." ) ), 

164 ] = 40, 

165 contains_term: ContainsTerm = True, 

166 case_sensitive: CaseSensitive = False, 

167 content_id: __.typx.Annotated[ 

168 __.typx.Optional[ str ], 

169 _Field( 

170 description = ( 

171 "Retrieve complete content for specific result from " 

172 "previous query. Use content_id values returned in " 

173 "sample searches." ) ), 

174 ] = None, 

175 reveal_internals: __.typx.Annotated[ 

176 bool, 

177 _Field( 

178 description = ( 

179 "Show internal implementation details (domain, priority, " 

180 "project, version)." ) ), 

181 ] = False, 

182 ) -> dict[ str, __.typx.Any ]: 

183 immutable_search_behaviors = ( 

184 _to_immutable_search_behaviors( search_behaviors ) ) 

185 immutable_filters = _to_immutable_filters( filters ) 

186 content_id_ = __.absent if content_id is None else content_id 

187 result = await _functions.query_content( 

188 auxdata, location, term, 

189 search_behaviors = _interfaces.SearchBehaviors( 

190 match_mode = immutable_search_behaviors.match_mode, 

191 similarity_score_min = ( 

192 immutable_search_behaviors.similarity_score_min ), 

193 contains_term = contains_term, 

194 case_sensitive = case_sensitive ), 

195 filters = immutable_filters, 

196 content_id = content_id_, 

197 results_max = results_max, 

198 lines_max = lines_max ) 

199 return dict( result.render_as_json( lines_max = lines_max ) ) 

200 

201 return query_content 

202 

203 

204def _produce_query_inventory_function( auxdata: _state.Globals ): 

205 @intercept_errors 

206 async def query_inventory( # noqa: PLR0913 

207 location: LocationArgument, 

208 term: TermArgument, 

209 search_behaviors: __.typx.Annotated[ 

210 SearchBehaviorsMutable, 

211 _Field( description = "Search behavior configuration" ), 

212 ] = _search_behaviors_default, 

213 filters: __.typx.Annotated[ 

214 FiltersMutable, 

215 _Field( description = "Processor-specific filters" ), 

216 ] = _filters_default, 

217 results_max: ResultsMax = 5, 

218 contains_term: ContainsTerm = True, 

219 case_sensitive: CaseSensitive = False, 

220 reveal_internals: __.typx.Annotated[ 

221 bool, 

222 _Field( 

223 description = ( 

224 "Show internal implementation details (domain, priority, " 

225 "project, version)." ) ), 

226 ] = False, 

227 ) -> dict[ str, __.typx.Any ]: 

228 immutable_search_behaviors = ( 

229 _to_immutable_search_behaviors( search_behaviors ) ) 

230 immutable_filters = _to_immutable_filters( filters ) 

231 result = await _functions.query_inventory( 

232 auxdata, location, term, 

233 search_behaviors = _interfaces.SearchBehaviors( 

234 match_mode = immutable_search_behaviors.match_mode, 

235 similarity_score_min = ( 

236 immutable_search_behaviors.similarity_score_min ), 

237 contains_term = contains_term, 

238 case_sensitive = case_sensitive ), 

239 filters = immutable_filters, 

240 results_max = results_max ) 

241 return dict( result.render_as_json( 

242 reveal_internals = reveal_internals ) ) 

243 

244 return query_inventory 

245 

246 

247 

248 

249def _produce_survey_processors_function( auxdata: _state.Globals ): 

250 @intercept_errors 

251 async def survey_processors( 

252 genus: __.typx.Annotated[ 

253 _interfaces.ProcessorGenera, 

254 _Field( description = "Processor genus (inventory or structure)" ), 

255 ], 

256 name: __.typx.Annotated[ 

257 __.typx.Optional[ str ], 

258 _Field( description = "Optional processor name to filter." ) 

259 ] = None, 

260 ) -> dict[ str, __.typx.Any ]: 

261 result = await _functions.survey_processors( auxdata, genus, name ) 

262 return dict( result.render_as_json( ) ) 

263 

264 return survey_processors 

265 

266 

267def _register_server_functions( 

268 auxdata: _state.Globals, mcp: _FastMCP, /, *, extra_functions: bool = False 

269) -> None: 

270 ''' Registers MCP server tools with closures for auxdata access. ''' 

271 _scribe.debug( "Registering tools." ) 

272 mcp.tool( )( _produce_query_inventory_function( auxdata ) ) 

273 mcp.tool( )( _produce_query_content_function( auxdata ) ) 

274 if extra_functions: 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true

275 mcp.tool( )( _produce_detect_function( auxdata ) ) 

276 mcp.tool( )( _produce_survey_processors_function( auxdata ) ) 

277 _scribe.debug( "All tools registered successfully." ) 

278 

279 

280def _to_immutable_filters( 

281 mutable_filters: FiltersMutable 

282) -> __.immut.Dictionary[ str, __.typx.Any ]: 

283 ''' Converts mutable filters dict to immutable dictionary. ''' 

284 return __.immut.Dictionary[ str, __.typx.Any ]( mutable_filters ) 

285 

286 

287def _to_immutable_search_behaviors( 

288 mutable_behaviors: SearchBehaviorsMutable 

289) -> _interfaces.SearchBehaviors: 

290 ''' Converts mutable search behaviors to immutable. ''' 

291 field_values = { 

292 field.name: getattr( mutable_behaviors, field.name ) 

293 for field in __.dcls.fields( mutable_behaviors ) 

294 if not field.name.startswith( '_' ) } 

295 return _interfaces.SearchBehaviors( **field_values )