Coverage for sources/librovore/server.py: 60%

82 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 21:59 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' MCP server implementation. ''' 

22 

23from mcp.server.fastmcp import FastMCP as _FastMCP 

24 

25# FastMCP uses Pydantic to generate JSON schemas from function signatures. 

26from pydantic import Field as _Field 

27 

28from . import __ 

29from . import exceptions as _exceptions 

30from . import functions as _functions 

31from . import interfaces as _interfaces 

32from . import state as _state 

33 

34 

35def intercept_errors( 

36 func: __.cabc.Callable[ 

37 ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ] 

38) -> __.cabc.Callable[ ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]: 

39 ''' Decorator for MCP functions to intercept self-rendering exceptions. 

40  

41 Catches Omnierror exceptions and returns their JSON representation 

42 instead of raising them. Other exceptions are re-raised unchanged. 

43 ''' 

44 @__.funct.wraps( func ) 

45 async def wrapper( 

46 *posargs: __.typx.Any, **nomargs: __.typx.Any 

47 ) -> dict[ str, __.typx.Any ]: 

48 try: 

49 return await func( *posargs, **nomargs ) 

50 except _exceptions.Omnierror as exc: 

51 return dict( exc.render_as_json( ) ) 

52 except Exception: 

53 raise 

54 

55 return wrapper 

56 

57 

58@__.dcls.dataclass( kw_only = True, slots = True ) 

59class SearchBehaviorsMutable: 

60 ''' Mutable version of SearchBehaviors for FastMCP/Pydantic compatibility. 

61 

62 Note: Fields are manually duplicated from SearchBehaviors to avoid 

63 immutable dataclass internals leaking into JSON schema generation. 

64 ''' 

65 

66 match_mode: _interfaces.MatchMode = _interfaces.MatchMode.Fuzzy 

67 fuzzy_threshold: int = 50 

68 

69 

70FiltersMutable: __.typx.TypeAlias = dict[ str, __.typx.Any ] 

71GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[ 

72 __.typx.Optional[ str ], 

73 _Field( description = __.access_doctab( 'group by argument' ) ), 

74] 

75TermArgument: __.typx.TypeAlias = __.typx.Annotated[ 

76 str, _Field( description = __.access_doctab( 'term argument' ) ) ] 

77ResultsMax: __.typx.TypeAlias = __.typx.Annotated[ 

78 int, _Field( description = __.access_doctab( 'results max argument' ) ) ] 

79LocationArgument: __.typx.TypeAlias = __.typx.Annotated[ 

80 str, _Field( description = __.access_doctab( 'location argument' ) ) ] 

81 

82 

83_filters_default = FiltersMutable( ) 

84_search_behaviors_default = SearchBehaviorsMutable( ) 

85 

86_scribe = __.acquire_scribe( __name__ ) 

87 

88 

89 

90async def serve( 

91 auxdata: _state.Globals, /, *, 

92 port: int = 0, 

93 transport: str = 'stdio', 

94 extra_functions: bool = False, 

95) -> None: 

96 ''' Runs MCP server. ''' 

97 _scribe.debug( "Initializing FastMCP server." ) 

98 mcp = _FastMCP( 'Librovore Documentation Server', port = port ) 

99 _register_server_functions( 

100 auxdata, mcp, extra_functions = extra_functions ) 

101 match transport: 

102 case 'sse': await mcp.run_sse_async( mount_path = None ) 

103 case 'stdio': await mcp.run_stdio_async( ) 

104 case _: raise ValueError 

105 

106 

107def _produce_detect_function( auxdata: _state.Globals ): 

108 @intercept_errors 

109 async def detect( 

110 location: LocationArgument, 

111 genus: __.typx.Annotated[ 

112 _interfaces.ProcessorGenera, 

113 _Field( description = "Processor genus (inventory or structure)" ), 

114 ], 

115 processor_name: __.typx.Annotated[ 

116 __.typx.Optional[ str ], 

117 _Field( description = "Optional processor name." ), 

118 ] = None, 

119 ) -> dict[ str, __.typx.Any ]: 

120 nomargs: __.NominativeArguments = { } 

121 if processor_name is not None: 

122 nomargs[ 'processor_name' ] = processor_name 

123 result = await _functions.detect( auxdata, location, genus, **nomargs ) 

124 return dict( result.render_as_json( ) ) 

125 

126 return detect 

127 

128 

129def _produce_query_content_function( auxdata: _state.Globals ): 

130 @intercept_errors 

131 async def query_content( # noqa: PLR0913 

132 location: LocationArgument, 

133 term: TermArgument, 

134 search_behaviors: __.typx.Annotated[ 

135 SearchBehaviorsMutable, 

136 _Field( description = "Search behavior configuration" ), 

137 ] = _search_behaviors_default, 

138 filters: __.typx.Annotated[ 

139 FiltersMutable, 

140 _Field( description = "Processor-specific filters" ), 

141 ] = _filters_default, 

142 results_max: ResultsMax = 10, 

143 lines_max: __.typx.Annotated[ 

144 int, 

145 _Field( 

146 description = ( 

147 "Lines per result. Use 5-10 for sampling/preview, " 

148 "larger values or omit for full content. Results " 

149 "include content_id for extraction." ) ), 

150 ] = 40, 

151 content_id: __.typx.Annotated[ 

152 __.typx.Optional[ str ], 

153 _Field( 

154 description = ( 

155 "Retrieve complete content for specific result from " 

156 "previous query. Use content_id values returned in " 

157 "sample searches." ) ), 

158 ] = None, 

159 ) -> dict[ str, __.typx.Any ]: 

160 immutable_search_behaviors = ( 

161 _to_immutable_search_behaviors( search_behaviors ) ) 

162 immutable_filters = _to_immutable_filters( filters ) 

163 content_id_ = __.absent if content_id is None else content_id 

164 result = await _functions.query_content( 

165 auxdata, location, term, 

166 search_behaviors = immutable_search_behaviors, 

167 filters = immutable_filters, 

168 content_id = content_id_, 

169 results_max = results_max, 

170 lines_max = lines_max ) 

171 return dict( result.render_as_json( lines_max = lines_max ) ) 

172 

173 return query_content 

174 

175 

176def _produce_query_inventory_function( auxdata: _state.Globals ): 

177 @intercept_errors 

178 async def query_inventory( # noqa: PLR0913 

179 location: LocationArgument, 

180 term: TermArgument, 

181 search_behaviors: __.typx.Annotated[ 

182 SearchBehaviorsMutable, 

183 _Field( description = "Search behavior configuration" ), 

184 ] = _search_behaviors_default, 

185 filters: __.typx.Annotated[ 

186 FiltersMutable, 

187 _Field( description = "Processor-specific filters" ), 

188 ] = _filters_default, 

189 details: __.typx.Annotated[ 

190 _interfaces.InventoryQueryDetails, 

191 _Field( description = "Detail level for inventory results" ), 

192 ] = _interfaces.InventoryQueryDetails.Name, 

193 results_max: ResultsMax = 5, 

194 ) -> dict[ str, __.typx.Any ]: 

195 immutable_search_behaviors = ( 

196 _to_immutable_search_behaviors( search_behaviors ) ) 

197 immutable_filters = _to_immutable_filters( filters ) 

198 result = await _functions.query_inventory( 

199 auxdata, location, term, 

200 search_behaviors = immutable_search_behaviors, 

201 filters = immutable_filters, 

202 details = details, 

203 results_max = results_max ) 

204 return dict( result.render_as_json( ) ) 

205 

206 return query_inventory 

207 

208 

209 

210 

211def _produce_survey_processors_function( auxdata: _state.Globals ): 

212 @intercept_errors 

213 async def survey_processors( 

214 genus: __.typx.Annotated[ 

215 _interfaces.ProcessorGenera, 

216 _Field( description = "Processor genus (inventory or structure)" ), 

217 ], 

218 name: __.typx.Annotated[ 

219 __.typx.Optional[ str ], 

220 _Field( description = "Optional processor name to filter." ) 

221 ] = None, 

222 ) -> dict[ str, __.typx.Any ]: 

223 result = await _functions.survey_processors( auxdata, genus, name ) 

224 return dict( result.render_as_json( ) ) 

225 

226 return survey_processors 

227 

228 

229def _register_server_functions( 

230 auxdata: _state.Globals, mcp: _FastMCP, /, *, extra_functions: bool = False 

231) -> None: 

232 ''' Registers MCP server tools with closures for auxdata access. ''' 

233 _scribe.debug( "Registering tools." ) 

234 mcp.tool( )( _produce_query_inventory_function( auxdata ) ) 

235 mcp.tool( )( _produce_query_content_function( auxdata ) ) 

236 if extra_functions: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true

237 mcp.tool( )( _produce_detect_function( auxdata ) ) 

238 mcp.tool( )( _produce_survey_processors_function( auxdata ) ) 

239 _scribe.debug( "All tools registered successfully." ) 

240 

241 

242def _to_immutable_filters( 

243 mutable_filters: FiltersMutable 

244) -> __.immut.Dictionary[ str, __.typx.Any ]: 

245 ''' Converts mutable filters dict to immutable dictionary. ''' 

246 return __.immut.Dictionary[ str, __.typx.Any ]( mutable_filters ) 

247 

248 

249def _to_immutable_search_behaviors( 

250 mutable_behaviors: SearchBehaviorsMutable 

251) -> _interfaces.SearchBehaviors: 

252 ''' Converts mutable search behaviors to immutable. ''' 

253 field_values = { 

254 field.name: getattr( mutable_behaviors, field.name ) 

255 for field in __.dcls.fields( mutable_behaviors ) 

256 if not field.name.startswith( '_' ) } 

257 return _interfaces.SearchBehaviors( **field_values )