Coverage for sources/librovore/server.py: 61%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-02 00:02 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' MCP server implementation. ''' 

22 

23from mcp.server.fastmcp import FastMCP as _FastMCP 

24 

25# FastMCP uses Pydantic to generate JSON schemas from function signatures. 

26from pydantic import Field as _Field 

27 

28from . import __ 

29from . import exceptions as _exceptions 

30from . import functions as _functions 

31from . import interfaces as _interfaces 

32from . import state as _state 

33 

34 

def intercept_errors(
    func: __.cabc.Callable[
        ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]
) -> __.cabc.Callable[ ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]:
    ''' Decorator for MCP functions to intercept self-rendering exceptions.

        Wraps an asynchronous function so that any ``Omnierror`` raised
        while awaiting it is converted into a dictionary built from the
        exception's ``render_as_json`` result and returned in place of the
        normal result. Every other exception propagates to the caller
        unchanged.

        The wrapper carries the metadata of the wrapped function via
        ``functools.wraps``.
    '''
    @__.funct.wraps( func )
    async def wrapper(
        *posargs: __.typx.Any, **nomargs: __.typx.Any
    ) -> dict[ str, __.typx.Any ]:
        # Only Omnierror is handled here. No catch-all handler is needed:
        # exceptions which match no handler propagate on their own.
        try: return await func( *posargs, **nomargs )
        except _exceptions.Omnierror as exc:
            return dict( exc.render_as_json( ) )

    return wrapper

56 

57 

@__.dcls.dataclass( kw_only = True, slots = True )
class SearchBehaviorsMutable:
    ''' Mutable counterpart of SearchBehaviors for FastMCP/Pydantic use.

        Note: The fields are duplicated by hand from SearchBehaviors, so
        that internals of the immutable dataclass do not leak into JSON
        schema generation.
    '''

    # Matching strategy; fuzzy matching unless overridden.
    match_mode: _interfaces.MatchMode = _interfaces.MatchMode.Fuzzy
    # Threshold for fuzzy matching.
    # NOTE(review): presumably a 0-100 similarity score; confirm against
    # SearchBehaviors.
    fuzzy_threshold: int = 50

68 

69 

# Plain dictionary form of filters, as accepted by the server tools.
FiltersMutable: __.typx.TypeAlias = dict[ str, __.typx.Any ]

# Annotated argument types which are shared between tool signatures.
# Each description is obtained through '__.access_doctab'.
GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[
    __.typx.Optional[ str ],
    _Field( description = __.access_doctab( 'group by argument' ) ),
]
TermArgument: __.typx.TypeAlias = __.typx.Annotated[
    str,
    _Field( description = __.access_doctab( 'term argument' ) ),
]
ResultsMax: __.typx.TypeAlias = __.typx.Annotated[
    int,
    _Field( description = __.access_doctab( 'results max argument' ) ),
]
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str,
    _Field( description = __.access_doctab( 'location argument' ) ),
]


# Default values for the 'filters' and 'search_behaviors' tool arguments.
# These objects are shared across calls; the tool functions in this module
# only read them and convert them to immutable forms.
_filters_default = FiltersMutable( )
_search_behaviors_default = SearchBehaviorsMutable( )

# Logger for this module.
_scribe = __.acquire_scribe( __name__ )

87 

88 

89 

90async def serve( 

91 auxdata: _state.Globals, /, *, 

92 port: int = 0, 

93 transport: str = 'stdio', 

94 extra_functions: bool = False, 

95) -> None: 

96 ''' Runs MCP server. ''' 

97 _scribe.debug( "Initializing FastMCP server." ) 

98 mcp = _FastMCP( 'Sphinx MCP Server', port = port ) 

99 _register_server_functions( 

100 auxdata, mcp, extra_functions = extra_functions ) 

101 match transport: 

102 case 'sse': await mcp.run_sse_async( mount_path = None ) 

103 case 'stdio': await mcp.run_stdio_async( ) 

104 case _: raise ValueError 

105 

106 

def _produce_detect_function( auxdata: _state.Globals ):
    ''' Produces detection tool function, closed over ``auxdata``.

        The produced coroutine forwards to ``_functions.detect`` and
        returns the rendered JSON of the result as a dictionary. It is
        wrapped with ``intercept_errors``.
    '''
    @intercept_errors
    async def detect(
        location: LocationArgument,
        genus: __.typx.Annotated[
            _interfaces.ProcessorGenera,
            _Field( description = "Processor genus (inventory or structure)" ),
        ],
        processor_name: __.typx.Annotated[
            __.typx.Optional[ str ],
            _Field( description = "Optional processor name." ),
        ] = None,
    ) -> dict[ str, __.typx.Any ]:
        # Forward the processor name only if the caller supplied one.
        nomargs: __.NominativeArguments = (
            { } if processor_name is None
            else { 'processor_name': processor_name } )
        detection = await _functions.detect(
            auxdata, location, genus, **nomargs )
        return dict( detection.render_as_json( ) )

    return detect

127 

128 

def _produce_query_content_function( auxdata: _state.Globals ):
    ''' Produces content query tool function, closed over ``auxdata``.

        The produced coroutine converts its mutable arguments to immutable
        forms, forwards to ``_functions.query_content``, and returns the
        rendered JSON of the result as a dictionary. It is wrapped with
        ``intercept_errors``.
    '''
    @intercept_errors
    async def query_content( # noqa: PLR0913
        location: LocationArgument,
        term: TermArgument,
        search_behaviors: __.typx.Annotated[
            SearchBehaviorsMutable,
            _Field( description = "Search behavior configuration" ),
        ] = _search_behaviors_default,
        filters: __.typx.Annotated[
            FiltersMutable,
            _Field( description = "Processor-specific filters" ),
        ] = _filters_default,
        results_max: ResultsMax = 10,
        lines_max: __.typx.Annotated[
            int,
            _Field( description = "Maximum lines to display per result." ),
        ] = 40,
    ) -> dict[ str, __.typx.Any ]:
        # Search behaviors are converted before filters.
        nomargs: dict[ str, __.typx.Any ] = {
            'search_behaviors':
                _to_immutable_search_behaviors( search_behaviors ),
            'filters': _to_immutable_filters( filters ),
            'results_max': results_max,
            'lines_max': lines_max,
        }
        outcome = await _functions.query_content(
            auxdata, location, term, **nomargs )
        # The line limit applies to rendering as well as to the query.
        return dict( outcome.render_as_json( lines_max = lines_max ) )

    return query_content

160 

161 

def _produce_query_inventory_function( auxdata: _state.Globals ):
    ''' Produces inventory query tool function, closed over ``auxdata``.

        The produced coroutine converts its mutable arguments to immutable
        forms, forwards to ``_functions.query_inventory``, and returns the
        rendered JSON of the result as a dictionary. It is wrapped with
        ``intercept_errors``.
    '''
    @intercept_errors
    async def query_inventory( # noqa: PLR0913
        location: LocationArgument,
        term: TermArgument,
        search_behaviors: __.typx.Annotated[
            SearchBehaviorsMutable,
            _Field( description = "Search behavior configuration" ),
        ] = _search_behaviors_default,
        filters: __.typx.Annotated[
            FiltersMutable,
            _Field( description = "Processor-specific filters" ),
        ] = _filters_default,
        details: __.typx.Annotated[
            _interfaces.InventoryQueryDetails,
            _Field( description = "Detail level for inventory results" ),
        ] = _interfaces.InventoryQueryDetails.Name,
        results_max: ResultsMax = 5,
    ) -> dict[ str, __.typx.Any ]:
        # Search behaviors are converted before filters.
        nomargs: dict[ str, __.typx.Any ] = {
            'search_behaviors':
                _to_immutable_search_behaviors( search_behaviors ),
            'filters': _to_immutable_filters( filters ),
            'details': details,
            'results_max': results_max,
        }
        outcome = await _functions.query_inventory(
            auxdata, location, term, **nomargs )
        return dict( outcome.render_as_json( ) )

    return query_inventory

193 

194 

195 

196 

def _produce_survey_processors_function( auxdata: _state.Globals ):
    ''' Produces processors survey tool function, closed over ``auxdata``.

        The produced coroutine forwards to ``_functions.survey_processors``
        and returns the rendered JSON of the result as a dictionary. It is
        wrapped with ``intercept_errors``.
    '''
    @intercept_errors
    async def survey_processors(
        genus: __.typx.Annotated[
            _interfaces.ProcessorGenera,
            _Field( description = "Processor genus (inventory or structure)" ),
        ],
        name: __.typx.Annotated[
            __.typx.Optional[ str ],
            _Field( description = "Optional processor name to filter." )
        ] = None,
    ) -> dict[ str, __.typx.Any ]:
        # The name is always forwarded, even if it is absent (None).
        return dict( (
            await _functions.survey_processors( auxdata, genus, name )
        ).render_as_json( ) )

    return survey_processors

213 

214 

def _register_server_functions(
    auxdata: _state.Globals, mcp: _FastMCP, /, *, extra_functions: bool = False
) -> None:
    ''' Registers MCP server tools with closures for auxdata access.

        The inventory and content query tools are always registered. The
        detection and processors survey tools are registered only if
        ``extra_functions`` is true.
    '''
    _scribe.debug( "Registering tools." )
    # Registration order: inventory query, content query, then extras.
    producers = [
        _produce_query_inventory_function,
        _produce_query_content_function,
    ]
    if extra_functions:
        producers.append( _produce_detect_function )
        producers.append( _produce_survey_processors_function )
    for producer in producers:
        mcp.tool( )( producer( auxdata ) )
    _scribe.debug( "All tools registered successfully." )

226 

227 

def _to_immutable_filters(
    mutable_filters: FiltersMutable
) -> __.immut.Dictionary[ str, __.typx.Any ]:
    ''' Produces immutable dictionary from mutable filters dictionary. '''
    immutable_type = __.immut.Dictionary[ str, __.typx.Any ]
    return immutable_type( mutable_filters )

233 

234 

def _to_immutable_search_behaviors(
    mutable_behaviors: SearchBehaviorsMutable
) -> _interfaces.SearchBehaviors:
    ''' Produces immutable search behaviors from mutable counterpart.

        Each dataclass field of the mutable object, except for fields with
        names which begin with an underscore, is passed by name to the
        ``SearchBehaviors`` constructor.
    '''
    nomargs: dict[ str, __.typx.Any ] = { }
    for field in __.dcls.fields( mutable_behaviors ):
        name = field.name
        # Skip private fields.
        if name.startswith( '_' ): continue
        nomargs[ name ] = getattr( mutable_behaviors, name )
    return _interfaces.SearchBehaviors( **nomargs )