Coverage for sources / agentsmgr / operations.py: 8%

168 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:00 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core operations for content generation and directory population. 

22 

23 This module provides functions for orchestrating content generation, 

24 including directory population and file writing operations with 

25 simulation support. 

26''' 

27 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import generator as _generator 

32from . import renderers as _renderers 

33 

34 

# Marker lines delimiting the block of entries which this module maintains
# inside a git exclude file. Lines between the begin and end markers are
# replaced wholesale on each update; lines outside them are preserved.
_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)'
# Written directly after the begin marker as a notice to human editors.
_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.'
_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)'

38 

39 

def populate_directory(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Populates target directory with every configured content item.

        Prepares output directories, then walks the coders named in the
        generator configuration. For each coder with a registered
        renderer, every item type offered by that renderer is generated;
        coders without a registered renderer are skipped.

        Returns tuple of (items_attempted, items_written).
    '''
    _ensure_output_directories( generator, target, simulate )
    outcomes: list[ tuple[ int, int ] ] = [ ]
    for coder_name in generator.configuration[ 'coders' ]:
        try: renderer = _renderers.RENDERERS[ coder_name ]
        except KeyError: continue
        # Extending from a generator expression consumes it immediately,
        # so items are still generated in renderer order.
        outcomes.extend(
            generate_coder_item_type(
                generator, coder_name, item_type, target, simulate )
            for item_type in renderer.item_types_available )
    return (
        sum( attempted for attempted, _ in outcomes ),
        sum( written for _, written in outcomes ) )

63 

def _ensure_output_directories(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool,
) -> None:
    ''' Creates per-item-type output directories ahead of generation.

        Does nothing when simulating or when the generator mode is
        'nowhere'. Otherwise, for each configured coder with a registered
        renderer, resolves the effective mode (the renderer default when
        the generator mode is 'default') and, if that mode is 'per-user'
        or 'per-project' and the renderer supports it, creates one
        directory per item type beneath the renderer base directory.
    '''
    if simulate: return
    if generator.mode == 'nowhere': return
    for coder_name in generator.configuration[ 'coders' ]:
        try: renderer = _renderers.RENDERERS[ coder_name ]
        except KeyError: continue
        effective_mode = (
            renderer.mode_default if generator.mode == 'default'
            else generator.mode )
        directories_wanted = (
            effective_mode in ( 'per-user', 'per-project' )
            and effective_mode in renderer.modes_available )
        if not directories_wanted: continue
        base = renderer.resolve_base_directory(
            mode = effective_mode,
            target = target,
            configuration = generator.application_configuration,
            environment = __.os.environ,
        )
        for item_type in renderer.item_types_available:
            subdirectory = renderer.produce_output_structure( item_type )
            ( base / subdirectory ).mkdir( parents = True, exist_ok = True )

87 

88 

def _content_exists(
    generator: _generator.ContentGenerator,
    item_type: str,
    item_name: str,
    coder: str
) -> bool:
    ''' Reports whether content for an item is present on disk.

        Asks the generator for the primary and fallback content paths
        and probes the filesystem without reading either file. The
        fallback, which may be absent, is consulted only when the
        primary path does not exist.
    '''
    primary, fallback = generator.resolve_content_paths(
        item_type, item_name, coder )
    if primary.exists( ): return True
    if not fallback: return False
    return bool( fallback.exists( ) )

105 

106 

def generate_coder_item_type(
    generator: _generator.ContentGenerator,
    coder: str,
    item_type: str,
    target: __.Path,
    simulate: bool
) -> tuple[ int, int ]:
    ''' Generates every item of one type for one coder.

        Each TOML file under 'configurations/<item_type>' in the
        generator location names one item. Items whose content cannot be
        found are logged with a warning and not counted as attempted.
        Nothing is generated when the generator mode is 'nowhere' or the
        configuration directory is missing.

        Returns tuple of (items_attempted, items_written).
    '''
    if generator.mode == 'nowhere': return ( 0, 0 )
    configurations = generator.location / 'configurations' / item_type
    if not configurations.exists( ): return ( 0, 0 )
    attempted = written = 0
    for configuration_file in configurations.glob( '*.toml' ):
        name = configuration_file.stem
        if _content_exists( generator, item_type, name, coder ):
            attempted += 1
            rendering = generator.render_single_item(
                item_type, name, coder, target )
            if save_content(
                rendering.content, rendering.location, simulate
            ): written += 1
            continue
        __.provide_scribe( __name__ ).warning(
            f"Skipping {item_type}/{name} for {coder}: "
            "content not found" )
    return ( attempted, written )

142 

143 

def save_content(
    content: str, location: __.Path, simulate: bool = False
) -> bool:
    ''' Writes content to location unless simulating.

        Ensures that the parent directory of location exists and then
        writes content as UTF-8 text. In simulation mode the filesystem
        is left untouched.

        Returns True if the file was written, False if simulated. Raises
        FileOperationFailure if the directory cannot be created or the
        file cannot be written.
    '''
    if not simulate:
        directory = location.parent
        try: directory.mkdir( parents = True, exist_ok = True )
        except ( OSError, IOError ) as exception:
            raise _exceptions.FileOperationFailure(
                directory, "create directory" ) from exception
        try: location.write_text( content, encoding = 'utf-8' )
        except ( OSError, IOError ) as exception:
            raise _exceptions.FileOperationFailure(
                location, "save content" ) from exception
    return not simulate

163 

164 

def update_git_exclude(
    target: __.Path,
    entries: __.cabc.Collection[ str ],
    simulate: bool = False
) -> int:
    ''' Updates .git/info/exclude with managed block of agentsmgr entries.

        Maintains a clearly-marked block of entries managed by agentsmgr,
        with complete replacement on each update. Entries are stripped of
        surrounding whitespace, anchored with a leading '/' if they lack
        one, deduplicated, and sorted lexicographically within the block.
        Blank entries are ignored. User entries outside the managed block
        are preserved. If no entries remain after normalization, the
        managed block is removed instead.

        Handles GIT_DIR environment variable and git worktrees by
        resolving the actual git directory location and using the common
        git directory for shared resources.

        The exclude file is rewritten only if its content would change.

        Returns count of entries in managed block. Returns 0 without
        touching anything when simulating, when no git directory can be
        resolved, or when the exclude file does not exist. Raises
        FileOperationFailure if the exclude file cannot be read or
        written.
    '''
    if simulate: return 0
    git_dir = _resolve_git_directory( target )
    if not git_dir: return 0
    exclude_file = git_dir / 'info' / 'exclude'
    if not exclude_file.exists( ): return 0
    try: content = exclude_file.read_text( encoding = 'utf-8' )
    except ( OSError, IOError ) as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "read git exclude file" ) from exception
    stripped_entries = ( entry.strip( ) for entry in entries )
    normalized_entries = sorted( {
        entry if entry.startswith( '/' ) else f"/{entry}"
        for entry in stripped_entries if entry
    } )
    if normalized_entries:
        new_content = _update_managed_block( content, normalized_entries )
    else: new_content = _remove_managed_block( content )
    # Skip the write when nothing changed, so that repeated runs do not
    # needlessly touch the file.
    if new_content != content:
        try: exclude_file.write_text( new_content, encoding = 'utf-8' )
        except ( OSError, IOError ) as exception:
            raise _exceptions.FileOperationFailure(
                exclude_file, "update git exclude file" ) from exception
    return len( normalized_entries )

210 

211 

def _update_managed_block(
    content: str, entries: __.cabc.Sequence[ str ]
) -> str:
    ''' Produces content with managed block rebuilt from entries.

        Any existing managed block is replaced in place; if there is
        none, the new block goes at the end. Lines outside the block are
        kept, with a single blank line separating them from the block.
        Entries are emitted in the order given. Result always ends with
        a newline.
    '''
    preceding, following = _partition_around_managed_block(
        content.splitlines( ) )
    if preceding and preceding[ -1 ].strip( ): preceding.append( '' )
    rebuilt = [
        *preceding,
        _MANAGED_BLOCK_BEGIN,
        _MANAGED_BLOCK_WARNING,
        *entries,
        _MANAGED_BLOCK_END,
    ]
    if following: rebuilt.extend( ( '', *following ) )
    return '\n'.join( rebuilt ) + '\n'

233 

234 

def _remove_managed_block( content: str ) -> str:
    ''' Removes managed block from content, preserving user entries.

        Locates and removes managed block if present, joining the lines
        before and after it with a single blank line between them.
        Returns content unchanged if partitioning removed nothing.
        Returns an empty string if nothing but the managed block was
        present.
    '''
    lines = content.splitlines( )
    before_block, after_block = _partition_around_managed_block( lines )
    # Partitioning drops at least the marker lines whenever it finds
    # something to remove, so an undiminished line count means that the
    # content must be returned exactly as received. Testing for two
    # empty partitions instead would mistake a file holding only the
    # managed block for one holding no block at all.
    if len( before_block ) + len( after_block ) == len( lines ):
        return content
    result_lines = before_block
    if result_lines and after_block and result_lines[ -1 ].strip( ):
        result_lines.append( '' )
    result_lines.extend( after_block )
    if not result_lines: return ''
    return '\n'.join( result_lines ) + '\n'

254 

255 

def _partition_around_managed_block(
    lines: __.cabc.Sequence[ str ]
) -> tuple[ list[ str ], list[ str ] ]:
    ''' Partitions lines into content before and after managed block.

        Locates managed block markers and returns (before, after) tuple,
        with blank lines adjoining the block trimmed from both parts.
        If no begin marker is present, returns (all_lines, []).

        A begin marker with no end marker after it is malformed. In that
        case, a warning is logged and (remaining_lines, []) is returned,
        where remaining_lines omits every managed-block marker line.
        Dropping the stray markers prevents them from later pairing with
        the markers of a rebuilt block, which would discard the user
        lines in between. Non-marker lines are all retained.
    '''
    try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN )
    except ValueError: return ( list( lines ), [ ] )
    # Searching from the begin marker onward guarantees that any end
    # marker found lies after it; no separate ordering check is needed.
    try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index )
    except ValueError:
        __.provide_scribe( __name__ ).warning(
            "Malformed agentsmgr block in .git/info/exclude; rebuilding." )
        markers = (
            _MANAGED_BLOCK_BEGIN,
            _MANAGED_BLOCK_WARNING,
            _MANAGED_BLOCK_END,
        )
        return ( [ line for line in lines if line not in markers ], [ ] )
    before_block = list( lines[ : begin_index ] )
    while before_block and not before_block[ -1 ].strip( ):
        before_block.pop( )
    after_block = list( lines[ end_index + 1 : ] )
    blanks = 0
    while blanks < len( after_block ) and not after_block[ blanks ].strip( ):
        blanks += 1
    return ( before_block, after_block[ blanks : ] )

283 

def _resolve_git_directory(
    start_path: __.Path
) -> __.typx.Optional[ __.Path ]:
    ''' Resolves git directory location, handling GIT_DIR and worktrees.

        Checks GIT_DIR environment variable first; if it names an
        existing directory, that directory is used. Otherwise, uses
        Dulwich to discover repository from start_path. In both cases,
        returns common git directory (shared across worktrees) for
        access to shared resources like info/exclude.

        Returns None if repository discovery fails for any reason.
    '''
    from dulwich.repo import Repo
    git_dir_env = __.os.environ.get( 'GIT_DIR' )
    if git_dir_env:
        git_dir_path = __.Path( git_dir_env )
        # is_dir is already false for nonexistent paths.
        if git_dir_path.is_dir( ):
            return _discover_common_git_directory( git_dir_path )
    # Deliberately broad: any discovery failure means "no repository".
    try: repo = Repo.discover( str( start_path ) )
    except Exception: return None
    # Only the control directory path is needed; release the repository
    # promptly rather than leaving its resources to garbage collection.
    try: git_dir_path = __.Path( repo.controldir( ) )
    finally: repo.close( )
    return _discover_common_git_directory( git_dir_path )

305 

def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
    ''' Finds directory holding resources shared across worktrees.

        If git_dir contains a readable 'commondir' file, its stripped
        content is joined onto git_dir and the resolved result is
        returned. If the file is missing or cannot be read, git_dir is
        returned unchanged.
    '''
    pointer = git_dir / 'commondir'
    if pointer.exists( ):
        try: recorded = pointer.read_text( encoding = 'utf-8' )
        except ( OSError, IOError ): pass
        else: return ( git_dir / recorded.strip( ) ).resolve( )
    return git_dir

318 

def copy_coder_resources(
    source_root: __.Path,
    target_root: __.Path,
    coders: __.cabc.Sequence[ str ],
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Copies static resources for each of the given coders.

        For every coder, resources under source_root/<coder> are copied
        to target_root/<coder>. Coders with no source directory are
        noted at debug level and not counted as attempted.

        Returns tuple of (coders_attempted, coders_processed).
    '''
    attempted = processed = 0
    for coder in coders:
        origin = source_root / coder
        if origin.exists( ):
            attempted += 1
            destination = target_root / coder
            if copy_resource_content( origin, destination, simulate ):
                processed += 1
            continue
        __.provide_scribe( __name__ ).debug(
            f"No resources found for {coder} at {origin}" )
    return ( attempted, processed )

344 

def copy_resource_content(
    source: __.Path, target: __.Path, simulate: bool
) -> bool:
    ''' Recursively copies contents of source directory into target.

        Creates target (and any missing parents) and then copies the
        source tree into it, merging with directories which already
        exist. In simulation mode nothing is copied.

        Returns True in all non-failing cases, simulated or not. Raises
        CoderResourceCopyFailure if directory creation or copying fails.
    '''
    if simulate: return True
    # Both steps report failure identically, so they share one handler.
    try:
        target.mkdir( parents = True, exist_ok = True )
        __.shutil.copytree( source, target, dirs_exist_ok = True )
    except ( OSError, IOError ) as exception:
        raise _exceptions.CoderResourceCopyFailure(
            source, target
        ) from exception
    return True