Coverage for sources / agentsmgr / userdata.py: 32%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-04 21:55 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Global settings management for coder configurations. 

22 

23 Provides functionality for populating per-user global settings files, 

24 including direct file copying and JSON/TOML settings merging with user 

25 preservation semantics. 

26''' 

27 

28 

29import json as _json 

30 

31import toml as _toml 

32 

33from . import __ 

34from . import exceptions as _exceptions 

35from . import renderers as _renderers 

36 

37 

38_scribe = __.provide_scribe( __name__ ) 

39 

40 

41def _is_json_dict( 

42 value: __.typx.Any 

43) -> __.typx.TypeGuard[ dict[ str, __.typx.Any ] ]: 

44 ''' Type guard for JSON dictionary values. ''' 

45 return isinstance( value, dict ) 

46 

47 

48def populate_globals( 

49 data_location: __.Path, 

50 coders: __.cabc.Sequence[ str ], 

51 application_configuration: __.cabc.Mapping[ str, __.typx.Any ], 

52 simulate: bool = False, 

53) -> tuple[ int, int ]: 

54 ''' Populates per-user global files for configured coders. 

55 

56 Surveys defaults/user/configurations directory for coder-specific 

57 files and populates them to per-user locations. Handles two types 

58 of files: direct copy for non-settings files and merge for settings 

59 files (preserving user values). 

60 

61 Returns tuple of (files_attempted, files_updated) counts. 

62 ''' 

63 globals_directory = data_location / 'user' / 'configurations' 

64 if not globals_directory.exists( ): 

65 return ( 0, 0 ) 

66 files_attempted = 0 

67 files_updated = 0 

68 for coder in coders: 

69 coder_globals = globals_directory / coder 

70 if not coder_globals.exists( ): 

71 continue 

72 try: renderer = _renderers.RENDERERS[ coder ] 

73 except KeyError as exception: 

74 raise _exceptions.CoderAbsence( coder ) from exception 

75 per_user_directory = renderer.resolve_base_directory( 

76 mode = 'per-user', 

77 target = __.Path.cwd( ), 

78 configuration = application_configuration, 

79 environment = __.os.environ, 

80 ) 

81 for global_file in coder_globals.iterdir( ): 

82 if not global_file.is_file( ): 

83 continue 

84 files_attempted += 1 

85 target_file = per_user_directory / global_file.name 

86 if _is_settings_file( global_file, coder ): 

87 updated = _merge_settings_file( 

88 global_file, target_file, simulate ) 

89 else: 

90 updated = _copy_file_directly( 

91 global_file, target_file, simulate ) 

92 if updated: 

93 files_updated += 1 

94 return ( files_attempted, files_updated ) 

95 

96 

97def _is_settings_file( file: __.Path, coder: str ) -> bool: 

98 ''' Determines whether file is a settings file requiring merge logic. 

99 

100 Settings files have coder-specific names and contain JSON or TOML 

101 configuration that should be merged rather than replaced. Non-settings 

102 files are directly copied, replacing any existing version. 

103 ''' 

104 settings_names: dict[ str, tuple[ str, ... ] ] = { 

105 'claude': ( 'settings.json', ), 

106 'opencode': ( 'opencode.json', 'opencode.jsonc' ), 

107 'codex': ( 'config.toml', ), 

108 } 

109 return file.name in settings_names.get( coder, ( ) ) 

110 

111 

112def _copy_file_directly( 

113 source: __.Path, target: __.Path, simulate: bool 

114) -> bool: 

115 ''' Copies file directly from source to target location. 

116 

117 Creates target directory if needed. Returns True if file was 

118 updated (or would be updated in simulation mode). 

119 ''' 

120 if simulate: 

121 return True 

122 target.parent.mkdir( parents = True, exist_ok = True ) 

123 try: __.shutil.copy2( source, target ) 

124 except ( OSError, IOError ) as exception: 

125 raise _exceptions.GlobalsPopulationFailure( 

126 source, target 

127 ) from exception 

128 return True 

129 

130 

131def _merge_settings_file( 

132 source: __.Path, target: __.Path, simulate: bool 

133) -> bool: 

134 ''' Merges JSON or TOML settings file preserving user values. 

135 

136 Loads both source template and target user settings, performs deep 

137 merge adding missing keys from template while preserving all user 

138 values. Creates backup before writing merged result. Returns True 

139 if file was updated (or would be updated in simulation mode). 

140 ''' 

141 if source.suffix == '.toml': 141 ↛ 163line 141 didn't jump to line 163 because the condition on line 141 was always true

142 template = _load_toml_file( source, target ) 

143 user_settings: dict[ str, __.typx.Any ] = ( 

144 _load_toml_file( target, target ) if target.exists( ) 

145 else { } ) 

146 merged = _deep_merge_settings( user_settings, template ) 

147 if simulate: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 return True 

149 if merged == user_settings: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 return False 

151 if target.exists( ) and _toml_content_contains_comments( 151 ↛ 154line 151 didn't jump to line 154 because the condition on line 151 was never true

152 target.read_text( encoding = 'utf-8' ) 

153 ): 

154 backup_path = target.with_suffix( '.toml.backup' ) 

155 _scribe.warning( 

156 "TOML settings merge rewrites '%s' and may drop comments. " 

157 "A backup will be written to '%s'.", 

158 target, 

159 backup_path, 

160 ) 

161 _write_merged_toml_settings( target, merged ) 

162 else: 

163 template = _load_json_file( source, target ) 

164 user_settings = ( 

165 _load_json_file( target, target ) if target.exists( ) 

166 else { } ) 

167 merged = _deep_merge_settings( user_settings, template ) 

168 if simulate: 

169 return True 

170 if merged == user_settings: 

171 return False 

172 _write_merged_settings( target, merged ) 

173 return True 

174 

175 

176def _load_json_file( 

177 filepath: __.Path, target_context: __.Path 

178) -> dict[ str, __.typx.Any ]: 

179 ''' Loads JSON file with error handling. 

180 

181 Raises GlobalsPopulationFailure with source context on any error. 

182 ''' 

183 try: content = filepath.read_text( encoding = 'utf-8' ) 

184 except ( OSError, IOError ) as exception: 

185 raise _exceptions.GlobalsPopulationFailure( 

186 filepath, target_context ) from exception 

187 try: 

188 loaded: __.typx.Any = _json.loads( content ) 

189 except ValueError as exception: 

190 raise _exceptions.GlobalsPopulationFailure( 

191 filepath, target_context ) from exception 

192 if not _is_json_dict( loaded ): 

193 raise _exceptions.GlobalsPopulationFailure( filepath, target_context ) 

194 return loaded 

195 

196 

197def _write_merged_settings( 

198 target: __.Path, merged: dict[ str, __.typx.Any ] 

199) -> None: 

200 ''' Writes merged settings with backup of existing file. 

201 

202 Creates target directory if needed. Backs up existing file before 

203 writing merged result. 

204 ''' 

205 target.parent.mkdir( parents = True, exist_ok = True ) 

206 if target.exists( ): 

207 backup_path = target.with_suffix( '.json.backup' ) 

208 try: __.shutil.copy2( target, backup_path ) 

209 except ( OSError, IOError ) as exception: 

210 raise _exceptions.GlobalsPopulationFailure( 

211 target, target ) from exception 

212 try: 

213 target.write_text( 

214 _json.dumps( merged, indent = 2 ), encoding = 'utf-8' ) 

215 except ( OSError, IOError ) as exception: 

216 raise _exceptions.GlobalsPopulationFailure( 

217 target, target 

218 ) from exception 

219 

220 

221def _load_toml_file( 

222 filepath: __.Path, target_context: __.Path 

223) -> dict[ str, __.typx.Any ]: 

224 ''' Loads TOML file with error handling. 

225 

226 Raises GlobalsPopulationFailure with source context on any error. 

227 ''' 

228 try: content = filepath.read_text( encoding = 'utf-8' ) 

229 except ( OSError, IOError ) as exception: 

230 raise _exceptions.GlobalsPopulationFailure( 

231 filepath, target_context ) from exception 

232 try: 

233 loaded: dict[ str, __.typx.Any ] = __.tomli.loads( 

234 content ) 

235 except __.tomli.TOMLDecodeError as exception: 

236 raise _exceptions.GlobalsPopulationFailure( 

237 filepath, target_context ) from exception 

238 return loaded 

239 

240 

241def _write_merged_toml_settings( 

242 target: __.Path, merged: dict[ str, __.typx.Any ] 

243) -> None: 

244 ''' Writes merged TOML settings with backup of existing file. 

245 

246 Creates target directory if needed. Backs up existing file before 

247 writing merged result. 

248 ''' 

249 target.parent.mkdir( parents = True, exist_ok = True ) 

250 if target.exists( ): 250 ↛ 256line 250 didn't jump to line 256 because the condition on line 250 was always true

251 backup_path = target.with_suffix( '.toml.backup' ) 

252 try: __.shutil.copy2( target, backup_path ) 

253 except ( OSError, IOError ) as exception: 

254 raise _exceptions.GlobalsPopulationFailure( 

255 target, target ) from exception 

256 try: 

257 target.write_text( _toml.dumps( merged ), encoding = 'utf-8' ) 

258 except ( OSError, IOError ) as exception: 

259 raise _exceptions.GlobalsPopulationFailure( 

260 target, target 

261 ) from exception 

262 

263 

264def _toml_content_contains_comments( content: str ) -> bool: 

265 ''' Heuristic detection for comments in a TOML file. 

266 

267 TOML comments begin with '#'. We warn when overwriting TOML settings 

268 because our merge process rewrites the file and does not preserve 

269 comments or formatting. 

270 ''' 

271 for line in content.splitlines( ): 

272 stripped = line.lstrip( ) 

273 if stripped.startswith( '#' ): 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 return True 

275 if ' #' in line: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true

276 return True 

277 return False 

278 

279 

280def populate_user_wrappers( 

281 data_location: __.Path, 

282 simulate: bool = False, 

283) -> tuple[ int, int ]: 

284 ''' Installs wrapper scripts to user bin directory. 

285 

286 Copies wrapper scripts from data source to ~/.local/bin, 

287 making them executable. Returns tuple of (files_attempted, 

288 files_installed) counts. 

289 ''' 

290 wrappers_dir = data_location / 'user' / 'executables' 

291 user_bin = __.Path.home( ) / '.local' / 'bin' 

292 if not wrappers_dir.exists( ): 

293 return ( 0, 0 ) 

294 files_attempted = 0 

295 files_installed = 0 

296 for script in wrappers_dir.iterdir( ): 

297 if not script.is_file( ): 

298 continue 

299 files_attempted += 1 

300 target = user_bin / script.name 

301 if not simulate: 

302 user_bin.mkdir( parents = True, exist_ok = True ) 

303 try: __.shutil.copy2( script, target ) 

304 except ( OSError, IOError ) as exception: 

305 raise _exceptions.GlobalsPopulationFailure( 

306 script, target 

307 ) from exception 

308 try: target.chmod( target.stat( ).st_mode | 0o111 ) 

309 except ( OSError, IOError ) as exception: 

310 raise _exceptions.GlobalsPopulationFailure( 

311 script, target 

312 ) from exception 

313 files_installed += 1 

314 return ( files_attempted, files_installed ) 

315 

316 

317def _deep_merge_settings( 

318 target: dict[ str, __.typx.Any ], source: dict[ str, __.typx.Any ] 

319) -> dict[ str, __.typx.Any ]: 

320 ''' Recursively merges source into target preserving target values. 

321 

322 Implements additive merge: adds keys from source that are missing 

323 in target. When both contain same key with dict values, recursively 

324 merges nested dicts. For conflicting scalar values, target value 

325 wins (user preferences preserved). 

326 ''' 

327 result = target.copy( ) 

328 for key, source_value in source.items( ): 

329 if key not in result: 

330 result[ key ] = source_value 

331 elif ( 

332 _is_json_dict( result[ key ] ) 

333 and _is_json_dict( source_value ) 

334 ): 

335 result[ key ] = _deep_merge_settings( 

336 result[ key ], source_value ) 

337 return result