Coverage for sources/librovore/server.py: 60%
82 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 21:59 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 21:59 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' MCP server implementation. '''
23from mcp.server.fastmcp import FastMCP as _FastMCP
25# FastMCP uses Pydantic to generate JSON schemas from function signatures.
26from pydantic import Field as _Field
28from . import __
29from . import exceptions as _exceptions
30from . import functions as _functions
31from . import interfaces as _interfaces
32from . import state as _state
def intercept_errors(
    func: __.cabc.Callable[
        ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]
) -> __.cabc.Callable[ ..., __.cabc.Awaitable[ dict[ str, __.typx.Any ] ] ]:
    ''' Wraps MCP tool coroutine so self-rendering errors become results.

        An ``Omnierror`` raised by the wrapped coroutine is converted into
        a plain dictionary built from its ``render_as_json`` output and
        returned to the caller. Every other exception propagates untouched.
    '''
    @__.funct.wraps( func )
    async def interceptor(
        *posargs: __.typx.Any, **nomargs: __.typx.Any
    ) -> dict[ str, __.typx.Any ]:
        try: response = await func( *posargs, **nomargs )
        except _exceptions.Omnierror as error:
            # Self-rendering error: hand back its JSON form as the result.
            return dict( error.render_as_json( ) )
        return response

    return interceptor
@__.dcls.dataclass( kw_only = True, slots = True )
class SearchBehaviorsMutable:
    ''' Mutable stand-in for SearchBehaviors, for FastMCP/Pydantic schemas.

        Fields are duplicated by hand from SearchBehaviors so that the
        internals of the immutable dataclass do not leak into JSON schema
        generation. Keep the two definitions in step when either changes.
    '''

    # How search terms are matched; fuzzy matching by default.
    match_mode: _interfaces.MatchMode = _interfaces.MatchMode.Fuzzy
    # Threshold applied to fuzzy matches.
    fuzzy_threshold: int = 50
# Plain dictionary form of processor-specific filters, as received from
# tool callers.
FiltersMutable: __.typx.TypeAlias = dict[ str, __.typx.Any ]

# Annotated argument types shared across tool signatures. Descriptions
# are pulled from the documentation table via ``__.access_doctab``.
GroupByArgument: __.typx.TypeAlias = __.typx.Annotated[
    __.typx.Optional[ str ],
    _Field( description = __.access_doctab( 'group by argument' ) ),
]
TermArgument: __.typx.TypeAlias = __.typx.Annotated[
    str,
    _Field( description = __.access_doctab( 'term argument' ) ),
]
ResultsMax: __.typx.TypeAlias = __.typx.Annotated[
    int,
    _Field( description = __.access_doctab( 'results max argument' ) ),
]
LocationArgument: __.typx.TypeAlias = __.typx.Annotated[
    str,
    _Field( description = __.access_doctab( 'location argument' ) ),
]


# Shared instances intended as argument defaults for tool functions.
_filters_default = FiltersMutable( )
_search_behaviors_default = SearchBehaviorsMutable( )

_scribe = __.acquire_scribe( __name__ )
90async def serve(
91 auxdata: _state.Globals, /, *,
92 port: int = 0,
93 transport: str = 'stdio',
94 extra_functions: bool = False,
95) -> None:
96 ''' Runs MCP server. '''
97 _scribe.debug( "Initializing FastMCP server." )
98 mcp = _FastMCP( 'Librovore Documentation Server', port = port )
99 _register_server_functions(
100 auxdata, mcp, extra_functions = extra_functions )
101 match transport:
102 case 'sse': await mcp.run_sse_async( mount_path = None )
103 case 'stdio': await mcp.run_stdio_async( )
104 case _: raise ValueError
def _produce_detect_function( auxdata: _state.Globals ):
    ''' Produces ``detect`` tool function as closure over auxdata. '''

    @intercept_errors
    async def detect(
        location: LocationArgument,
        genus: __.typx.Annotated[
            _interfaces.ProcessorGenera,
            _Field( description = "Processor genus (inventory or structure)" ),
        ],
        processor_name: __.typx.Annotated[
            __.typx.Optional[ str ],
            _Field( description = "Optional processor name." ),
        ] = None,
    ) -> dict[ str, __.typx.Any ]:
        # Forward the processor name only when the caller supplied one.
        extras: __.NominativeArguments = (
            { } if processor_name is None
            else { 'processor_name': processor_name } )
        detection = await _functions.detect(
            auxdata, location, genus, **extras )
        return dict( detection.render_as_json( ) )

    return detect
def _produce_query_content_function( auxdata: _state.Globals ):
    ''' Produces ``query_content`` tool function as closure over auxdata. '''

    @intercept_errors
    async def query_content( # noqa: PLR0913
        location: LocationArgument,
        term: TermArgument,
        search_behaviors: __.typx.Annotated[
            SearchBehaviorsMutable,
            _Field( description = "Search behavior configuration" ),
        ] = _search_behaviors_default,
        filters: __.typx.Annotated[
            FiltersMutable,
            _Field( description = "Processor-specific filters" ),
        ] = _filters_default,
        results_max: ResultsMax = 10,
        lines_max: __.typx.Annotated[
            int,
            _Field(
                description = (
                    "Lines per result. Use 5-10 for sampling/preview, "
                    "larger values or omit for full content. Results "
                    "include content_id for extraction." ) ),
        ] = 40,
        content_id: __.typx.Annotated[
            __.typx.Optional[ str ],
            _Field(
                description = (
                    "Retrieve complete content for specific result from "
                    "previous query. Use content_id values returned in "
                    "sample searches." ) ),
        ] = None,
    ) -> dict[ str, __.typx.Any ]:
        # Mutable, schema-friendly arguments are converted to immutable
        # forms, and a missing content_id becomes the absence sentinel.
        outcome = await _functions.query_content(
            auxdata, location, term,
            search_behaviors = (
                _to_immutable_search_behaviors( search_behaviors ) ),
            filters = _to_immutable_filters( filters ),
            content_id = __.absent if content_id is None else content_id,
            results_max = results_max,
            lines_max = lines_max )
        rendition = outcome.render_as_json( lines_max = lines_max )
        return dict( rendition )

    return query_content
def _produce_query_inventory_function( auxdata: _state.Globals ):
    ''' Produces ``query_inventory`` tool function as closure over auxdata. '''

    @intercept_errors
    async def query_inventory( # noqa: PLR0913
        location: LocationArgument,
        term: TermArgument,
        search_behaviors: __.typx.Annotated[
            SearchBehaviorsMutable,
            _Field( description = "Search behavior configuration" ),
        ] = _search_behaviors_default,
        filters: __.typx.Annotated[
            FiltersMutable,
            _Field( description = "Processor-specific filters" ),
        ] = _filters_default,
        details: __.typx.Annotated[
            _interfaces.InventoryQueryDetails,
            _Field( description = "Detail level for inventory results" ),
        ] = _interfaces.InventoryQueryDetails.Name,
        results_max: ResultsMax = 5,
    ) -> dict[ str, __.typx.Any ]:
        # Mutable, schema-friendly arguments are converted to immutable
        # forms before being handed to the query function.
        outcome = await _functions.query_inventory(
            auxdata, location, term,
            search_behaviors = (
                _to_immutable_search_behaviors( search_behaviors ) ),
            filters = _to_immutable_filters( filters ),
            details = details,
            results_max = results_max )
        return dict( outcome.render_as_json( ) )

    return query_inventory
def _produce_survey_processors_function( auxdata: _state.Globals ):
    ''' Produces ``survey_processors`` tool function as auxdata closure. '''

    @intercept_errors
    async def survey_processors(
        genus: __.typx.Annotated[
            _interfaces.ProcessorGenera,
            _Field( description = "Processor genus (inventory or structure)" ),
        ],
        name: __.typx.Annotated[
            __.typx.Optional[ str ],
            _Field( description = "Optional processor name to filter." )
        ] = None,
    ) -> dict[ str, __.typx.Any ]:
        # The name is passed through as given, including None.
        survey = await _functions.survey_processors( auxdata, genus, name )
        rendition = survey.render_as_json( )
        return dict( rendition )

    return survey_processors
def _register_server_functions(
    auxdata: _state.Globals, mcp: _FastMCP, /, *, extra_functions: bool = False
) -> None:
    ''' Registers MCP server tools with closures for auxdata access.

        The inventory and content query tools are always registered. The
        detect and survey-processors tools are registered only when
        ``extra_functions`` is true.
    '''
    _scribe.debug( "Registering tools." )
    producers = [
        _produce_query_inventory_function,
        _produce_query_content_function,
    ]
    if extra_functions:
        producers.append( _produce_detect_function )
        producers.append( _produce_survey_processors_function )
    # Each tool is produced and registered in turn, in listed order.
    for producer in producers:
        mcp.tool( )( producer( auxdata ) )
    _scribe.debug( "All tools registered successfully." )
def _to_immutable_filters(
    mutable_filters: FiltersMutable
) -> __.immut.Dictionary[ str, __.typx.Any ]:
    ''' Produces immutable dictionary from mutable filters dictionary. '''
    dictionary_type = __.immut.Dictionary[ str, __.typx.Any ]
    return dictionary_type( mutable_filters )
def _to_immutable_search_behaviors(
    mutable_behaviors: SearchBehaviorsMutable
) -> _interfaces.SearchBehaviors:
    ''' Produces immutable search behaviors from mutable counterpart.

        Every dataclass field whose name does not begin with an underscore
        is copied by name into the ``SearchBehaviors`` constructor.
    '''
    nomargs: dict[ str, __.typx.Any ] = { }
    for field in __.dcls.fields( mutable_behaviors ):
        # Underscore-prefixed fields are skipped.
        if field.name.startswith( '_' ): continue
        nomargs[ field.name ] = getattr( mutable_behaviors, field.name )
    return _interfaces.SearchBehaviors( **nomargs )