Coverage for sources / agentsmgr / userdata.py: 32%
146 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:00 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:00 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Global settings management for coder configurations.
23 Provides functionality for populating per-user global settings files,
24 including direct file copying and JSON/TOML settings merging with user
25 preservation semantics.
26'''
29import json as _json
31import toml as _toml
33from . import __
34from . import exceptions as _exceptions
35from . import renderers as _renderers
38_scribe = __.provide_scribe( __name__ )
41def _is_json_dict(
42 value: __.typx.Any
43) -> __.typx.TypeGuard[ dict[ str, __.typx.Any ] ]:
44 ''' Type guard for JSON dictionary values. '''
45 return isinstance( value, dict )
48def populate_globals(
49 data_location: __.Path,
50 coders: __.cabc.Sequence[ str ],
51 application_configuration: __.cabc.Mapping[ str, __.typx.Any ],
52 simulate: bool = False,
53) -> tuple[ int, int ]:
54 ''' Populates per-user global files for configured coders.
56 Surveys defaults/user/configurations directory for coder-specific
57 files and populates them to per-user locations. Handles two types
58 of files: direct copy for non-settings files and merge for settings
59 files (preserving user values).
61 Returns tuple of (files_attempted, files_updated) counts.
62 '''
63 globals_directory = data_location / 'user' / 'configurations'
64 if not globals_directory.exists( ):
65 return ( 0, 0 )
66 files_attempted = 0
67 files_updated = 0
68 for coder in coders:
69 coder_globals = globals_directory / coder
70 if not coder_globals.exists( ):
71 continue
72 try: renderer = _renderers.RENDERERS[ coder ]
73 except KeyError as exception:
74 raise _exceptions.CoderAbsence( coder ) from exception
75 per_user_directory = renderer.resolve_base_directory(
76 mode = 'per-user',
77 target = __.Path.cwd( ),
78 configuration = application_configuration,
79 environment = __.os.environ,
80 )
81 for global_file in coder_globals.iterdir( ):
82 if not global_file.is_file( ):
83 continue
84 files_attempted += 1
85 target_file = per_user_directory / global_file.name
86 if _is_settings_file( global_file, coder ):
87 updated = _merge_settings_file(
88 global_file, target_file, simulate )
89 else:
90 updated = _copy_file_directly(
91 global_file, target_file, simulate )
92 if updated:
93 files_updated += 1
94 return ( files_attempted, files_updated )
97def _is_settings_file( file: __.Path, coder: str ) -> bool:
98 ''' Determines whether file is a settings file requiring merge logic.
100 Settings files have coder-specific names and contain JSON or TOML
101 configuration that should be merged rather than replaced. Non-settings
102 files are directly copied, replacing any existing version.
103 '''
104 settings_names: dict[ str, tuple[ str, ... ] ] = {
105 'claude': ( 'settings.json', ),
106 'opencode': ( 'opencode.json', 'opencode.jsonc' ),
107 'codex': ( 'config.toml', ),
108 }
109 return file.name in settings_names.get( coder, ( ) )
112def _copy_file_directly(
113 source: __.Path, target: __.Path, simulate: bool
114) -> bool:
115 ''' Copies file directly from source to target location.
117 Creates target directory if needed. Returns True if file was
118 updated (or would be updated in simulation mode).
119 '''
120 if simulate:
121 return True
122 target.parent.mkdir( parents = True, exist_ok = True )
123 try: __.shutil.copy2( source, target )
124 except ( OSError, IOError ) as exception:
125 raise _exceptions.GlobalsPopulationFailure(
126 source, target
127 ) from exception
128 return True
131def _merge_settings_file(
132 source: __.Path, target: __.Path, simulate: bool
133) -> bool:
134 ''' Merges JSON or TOML settings file preserving user values.
136 Loads both source template and target user settings, performs deep
137 merge adding missing keys from template while preserving all user
138 values. Creates backup before writing merged result. Returns True
139 if file was updated (or would be updated in simulation mode).
140 '''
141 if source.suffix == '.toml': 141 ↛ 163line 141 didn't jump to line 163 because the condition on line 141 was always true
142 template = _load_toml_file( source, target )
143 user_settings: dict[ str, __.typx.Any ] = (
144 _load_toml_file( target, target ) if target.exists( )
145 else { } )
146 merged = _deep_merge_settings( user_settings, template )
147 if simulate: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true
148 return True
149 if merged == user_settings: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 return False
151 if target.exists( ) and _toml_content_contains_comments( 151 ↛ 154line 151 didn't jump to line 154 because the condition on line 151 was never true
152 target.read_text( encoding = 'utf-8' )
153 ):
154 backup_path = target.with_suffix( '.toml.backup' )
155 _scribe.warning(
156 "TOML settings merge rewrites '%s' and may drop comments. "
157 "A backup will be written to '%s'.",
158 target,
159 backup_path,
160 )
161 _write_merged_toml_settings( target, merged )
162 else:
163 template = _load_json_file( source, target )
164 user_settings = (
165 _load_json_file( target, target ) if target.exists( )
166 else { } )
167 merged = _deep_merge_settings( user_settings, template )
168 if simulate:
169 return True
170 if merged == user_settings:
171 return False
172 _write_merged_settings( target, merged )
173 return True
176def _load_json_file(
177 filepath: __.Path, target_context: __.Path
178) -> dict[ str, __.typx.Any ]:
179 ''' Loads JSON file with error handling.
181 Raises GlobalsPopulationFailure with source context on any error.
182 '''
183 try: content = filepath.read_text( encoding = 'utf-8' )
184 except ( OSError, IOError ) as exception:
185 raise _exceptions.GlobalsPopulationFailure(
186 filepath, target_context ) from exception
187 try:
188 loaded: __.typx.Any = _json.loads( content )
189 except ValueError as exception:
190 raise _exceptions.GlobalsPopulationFailure(
191 filepath, target_context ) from exception
192 if not _is_json_dict( loaded ):
193 raise _exceptions.GlobalsPopulationFailure( filepath, target_context )
194 return loaded
197def _write_merged_settings(
198 target: __.Path, merged: dict[ str, __.typx.Any ]
199) -> None:
200 ''' Writes merged settings with backup of existing file.
202 Creates target directory if needed. Backs up existing file before
203 writing merged result.
204 '''
205 target.parent.mkdir( parents = True, exist_ok = True )
206 if target.exists( ):
207 backup_path = target.with_suffix( '.json.backup' )
208 try: __.shutil.copy2( target, backup_path )
209 except ( OSError, IOError ) as exception:
210 raise _exceptions.GlobalsPopulationFailure(
211 target, target ) from exception
212 try:
213 target.write_text(
214 _json.dumps( merged, indent = 2 ), encoding = 'utf-8' )
215 except ( OSError, IOError ) as exception:
216 raise _exceptions.GlobalsPopulationFailure(
217 target, target
218 ) from exception
221def _load_toml_file(
222 filepath: __.Path, target_context: __.Path
223) -> dict[ str, __.typx.Any ]:
224 ''' Loads TOML file with error handling.
226 Raises GlobalsPopulationFailure with source context on any error.
227 '''
228 try: content = filepath.read_text( encoding = 'utf-8' )
229 except ( OSError, IOError ) as exception:
230 raise _exceptions.GlobalsPopulationFailure(
231 filepath, target_context ) from exception
232 try:
233 loaded: dict[ str, __.typx.Any ] = __.tomli.loads(
234 content )
235 except __.tomli.TOMLDecodeError as exception:
236 raise _exceptions.GlobalsPopulationFailure(
237 filepath, target_context ) from exception
238 return loaded
241def _write_merged_toml_settings(
242 target: __.Path, merged: dict[ str, __.typx.Any ]
243) -> None:
244 ''' Writes merged TOML settings with backup of existing file.
246 Creates target directory if needed. Backs up existing file before
247 writing merged result.
248 '''
249 target.parent.mkdir( parents = True, exist_ok = True )
250 if target.exists( ): 250 ↛ 256line 250 didn't jump to line 256 because the condition on line 250 was always true
251 backup_path = target.with_suffix( '.toml.backup' )
252 try: __.shutil.copy2( target, backup_path )
253 except ( OSError, IOError ) as exception:
254 raise _exceptions.GlobalsPopulationFailure(
255 target, target ) from exception
256 try:
257 target.write_text( _toml.dumps( merged ), encoding = 'utf-8' )
258 except ( OSError, IOError ) as exception:
259 raise _exceptions.GlobalsPopulationFailure(
260 target, target
261 ) from exception
264def _toml_content_contains_comments( content: str ) -> bool:
265 ''' Heuristic detection for comments in a TOML file.
267 TOML comments begin with '#'. We warn when overwriting TOML settings
268 because our merge process rewrites the file and does not preserve
269 comments or formatting.
270 '''
271 for line in content.splitlines( ):
272 stripped = line.lstrip( )
273 if stripped.startswith( '#' ): 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 return True
275 if ' #' in line: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 return True
277 return False
280def populate_user_wrappers(
281 data_location: __.Path,
282 simulate: bool = False,
283) -> tuple[ int, int ]:
284 ''' Installs wrapper scripts to user bin directory.
286 Copies wrapper scripts from data source to ~/.local/bin,
287 making them executable. Returns tuple of (files_attempted,
288 files_installed) counts.
289 '''
290 wrappers_dir = data_location / 'user' / 'executables'
291 user_bin = __.Path.home( ) / '.local' / 'bin'
292 if not wrappers_dir.exists( ):
293 return ( 0, 0 )
294 files_attempted = 0
295 files_installed = 0
296 for script in wrappers_dir.iterdir( ):
297 if not script.is_file( ):
298 continue
299 files_attempted += 1
300 target = user_bin / script.name
301 if not simulate:
302 user_bin.mkdir( parents = True, exist_ok = True )
303 try: __.shutil.copy2( script, target )
304 except ( OSError, IOError ) as exception:
305 raise _exceptions.GlobalsPopulationFailure(
306 script, target
307 ) from exception
308 try: target.chmod( target.stat( ).st_mode | 0o111 )
309 except ( OSError, IOError ) as exception:
310 raise _exceptions.GlobalsPopulationFailure(
311 script, target
312 ) from exception
313 files_installed += 1
314 return ( files_attempted, files_installed )
317def _deep_merge_settings(
318 target: dict[ str, __.typx.Any ], source: dict[ str, __.typx.Any ]
319) -> dict[ str, __.typx.Any ]:
320 ''' Recursively merges source into target preserving target values.
322 Implements additive merge: adds keys from source that are missing
323 in target. When both contain same key with dict values, recursively
324 merges nested dicts. For conflicting scalar values, target value
325 wins (user preferences preserved).
326 '''
327 result = target.copy( )
328 for key, source_value in source.items( ):
329 if key not in result:
330 result[ key ] = source_value
331 elif (
332 _is_json_dict( result[ key ] )
333 and _is_json_dict( source_value )
334 ):
335 result[ key ] = _deep_merge_settings(
336 result[ key ], source_value )
337 return result