Coverage for sources / agentsmgr / operations.py: 8%

151 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-01 15:37 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core operations for content generation and directory population. 

22 

23 This module provides functions for orchestrating content generation, 

24 including directory population and file writing operations with 

25 simulation support. 

26''' 

27 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import generator as _generator 

32 

33 

34_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)' 

35_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.' 

36_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)' 

37 

38 

39def populate_directory( 

40 generator: _generator.ContentGenerator, 

41 target: __.Path, 

42 simulate: bool = False 

43) -> tuple[ int, int ]: 

44 ''' Generates all content items to target directory. 

45 

46 Orchestrates content generation for all coders and item types 

47 configured in generator. Returns tuple of (items_attempted, 

48 items_written). 

49 ''' 

50 items_attempted = 0 

51 items_written = 0 

52 for coder_name in generator.configuration[ 'coders' ]: 

53 for item_type in ( 'commands', 'agents' ): 

54 attempted, written = generate_coder_item_type( 

55 generator, coder_name, item_type, target, simulate ) 

56 items_attempted += attempted 

57 items_written += written 

58 return ( items_attempted, items_written ) 

59 

60 

61def _content_exists( 

62 generator: _generator.ContentGenerator, 

63 item_type: str, 

64 item_name: str, 

65 coder: str 

66) -> bool: 

67 ''' Checks if content file exists without loading it. 

68 

69 Uses path resolution from ContentGenerator to check both primary 

70 and fallback locations. Returns True if content is available. 

71 ''' 

72 primary_path, fallback_path = generator.resolve_content_paths( 

73 item_type, item_name, coder ) 

74 if primary_path.exists( ): 

75 return True 

76 return bool( fallback_path and fallback_path.exists( ) ) 

77 

78 

79def generate_coder_item_type( 

80 generator: _generator.ContentGenerator, 

81 coder: str, 

82 item_type: str, 

83 target: __.Path, 

84 simulate: bool 

85) -> tuple[ int, int ]: 

86 ''' Generates items of specific type for a coder. 

87 

88 Generates all items (commands or agents) for specified coder by 

89 iterating through configuration files. Pre-checks content 

90 availability and skips items with missing content. Returns tuple 

91 of (items_attempted, items_written). 

92 ''' 

93 items_attempted = 0 

94 items_written = 0 

95 if generator.mode == 'nowhere': 

96 return ( items_attempted, items_written ) 

97 configuration_directory = ( 

98 generator.location / 'configurations' / item_type ) 

99 if not configuration_directory.exists( ): 

100 return ( items_attempted, items_written ) 

101 for configuration_file in configuration_directory.glob( '*.toml' ): 

102 item_name = configuration_file.stem 

103 if not _content_exists( generator, item_type, item_name, coder ): 

104 __.provide_scribe( __name__ ).warning( 

105 f"Skipping {item_type}/{item_name} for {coder}: " 

106 "content not found" ) 

107 continue 

108 items_attempted += 1 

109 result = generator.render_single_item( 

110 item_type, item_name, coder, target ) 

111 if save_content( result.content, result.location, simulate ): 

112 items_written += 1 

113 return ( items_attempted, items_written ) 

114 

115 

116def save_content( 

117 content: str, location: __.Path, simulate: bool = False 

118) -> bool: 

119 ''' Saves content to location, creating parent directories as needed. 

120 

121 Writes content to specified location, creating parent directories 

122 if necessary. In simulation mode, no actual writing occurs. 

123 Returns True if file was written, False if simulated. 

124 ''' 

125 if simulate: return False 

126 try: location.parent.mkdir( parents = True, exist_ok = True ) 

127 except ( OSError, IOError ) as exception: 

128 raise _exceptions.FileOperationFailure( 

129 location.parent, "create directory" ) from exception 

130 try: location.write_text( content, encoding = 'utf-8' ) 

131 except ( OSError, IOError ) as exception: 

132 raise _exceptions.FileOperationFailure( 

133 location, "save content" ) from exception 

134 return True 

135 

136 

137def update_git_exclude( 

138 target: __.Path, 

139 entries: __.cabc.Collection[ str ], 

140 simulate: bool = False 

141) -> int: 

142 ''' Updates .git/info/exclude with managed block of agentsmgr entries. 

143 

144 Maintains a clearly-marked block of entries managed by agentsmgr, 

145 with complete replacement on each update. Entries are sorted 

146 lexicographically within the block. User entries outside the 

147 managed block are preserved. 

148 

149 Handles GIT_DIR environment variable and git worktrees by 

150 resolving the actual git directory location and using the common 

151 git directory for shared resources. 

152 

153 Returns count of entries in managed block. 

154 ''' 

155 if simulate: return 0 

156 git_dir = _resolve_git_directory( target ) 

157 if not git_dir: return 0 

158 exclude_file = git_dir / 'info' / 'exclude' 

159 if not exclude_file.exists( ): return 0 

160 try: content = exclude_file.read_text( encoding = 'utf-8' ) 

161 except ( OSError, IOError ) as exception: 

162 raise _exceptions.FileOperationFailure( 

163 exclude_file, "read git exclude file" ) from exception 

164 normalized_entries = sorted( { 

165 ( 

166 "/{0}".format( entry.strip( ) ) 

167 if entry.strip( ) and not entry.strip( ).startswith( '/' ) 

168 else entry.strip( ) 

169 ) 

170 for entry in entries if entry.strip( ) 

171 } ) 

172 if not normalized_entries: 

173 new_content = _remove_managed_block( content ) 

174 if new_content == content: return 0 

175 else: 

176 new_content = _update_managed_block( content, normalized_entries ) 

177 try: exclude_file.write_text( new_content, encoding = 'utf-8' ) 

178 except ( OSError, IOError ) as exception: 

179 raise _exceptions.FileOperationFailure( 

180 exclude_file, "update git exclude file" ) from exception 

181 return len( normalized_entries ) 

182 

183 

184def _update_managed_block( 

185 content: str, entries: __.cabc.Sequence[ str ] 

186) -> str: 

187 ''' Updates content with new managed block containing sorted entries. 

188 

189 Locates existing managed block (if present) and replaces it with 

190 new block. If no block exists, appends to end of file. Preserves 

191 user content outside the managed block. 

192 ''' 

193 lines = content.splitlines( ) 

194 before_block, after_block = _partition_around_managed_block( lines ) 

195 block_lines = [ _MANAGED_BLOCK_BEGIN, _MANAGED_BLOCK_WARNING ] 

196 block_lines.extend( entries ) 

197 block_lines.append( _MANAGED_BLOCK_END ) 

198 if before_block and before_block[ -1 ].strip( ): 

199 before_block.append( '' ) 

200 result_lines = before_block + block_lines 

201 if after_block: 

202 result_lines.append( '' ) 

203 result_lines.extend( after_block ) 

204 return '\n'.join( result_lines ) + '\n' 

205 

206 

207def _remove_managed_block( content: str ) -> str: 

208 ''' Removes managed block from content, preserving user entries. 

209 

210 Locates and removes managed block if present. Returns content 

211 unchanged if no block found. 

212 ''' 

213 lines = content.splitlines( ) 

214 before_block, after_block = _partition_around_managed_block( lines ) 

215 if not before_block and not after_block: 

216 return content 

217 result_lines = before_block 

218 if result_lines and after_block: 

219 if result_lines[ -1 ].strip( ): 

220 result_lines.append( '' ) 

221 result_lines.extend( after_block ) 

222 elif after_block: 

223 result_lines = after_block 

224 if not result_lines: return '' 

225 return '\n'.join( result_lines ) + '\n' 

226 

227 

228def _partition_around_managed_block( 

229 lines: __.cabc.Sequence[ str ] 

230) -> tuple[ list[ str ], list[ str ] ]: 

231 ''' Partitions lines into content before and after managed block. 

232 

233 Locates managed block markers and returns (before, after) tuple. 

234 If block is malformed or not found, returns (all_lines, []). 

235 Malformed blocks are treated as non-existent. 

236 ''' 

237 try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN ) 

238 except ValueError: return ( list( lines ), [ ] ) 

239 try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index ) 

240 except ValueError: 

241 __.provide_scribe( __name__ ).warning( 

242 "Malformed agentsmgr block in .git/info/exclude; rebuilding." ) 

243 return ( list( lines ), [ ] ) 

244 if end_index < begin_index: 

245 __.provide_scribe( __name__ ).warning( 

246 "Malformed agentsmgr block in .git/info/exclude; rebuilding." ) 

247 return ( list( lines ), [ ] ) 

248 before_block = list( lines[ :begin_index ] ) 

249 while before_block and not before_block[ -1 ].strip( ): 

250 before_block.pop( ) 

251 after_block = list( lines[ end_index + 1: ] ) 

252 while after_block and not after_block[ 0 ].strip( ): 

253 after_block.pop( 0 ) 

254 return ( before_block, after_block ) 

255 

256def _resolve_git_directory( 

257 start_path: __.Path 

258) -> __.typx.Optional[ __.Path ]: 

259 ''' Resolves git directory location, handling GIT_DIR and worktrees. 

260 

261 Checks GIT_DIR environment variable first, then uses Dulwich to 

262 discover repository. Returns common git directory (shared across 

263 worktrees) for access to shared resources like info/exclude. 

264 

265 Returns None if not in a git repository or on error. 

266 ''' 

267 from dulwich.repo import Repo 

268 git_dir_env = __.os.environ.get( 'GIT_DIR' ) 

269 if git_dir_env: 

270 git_dir_path = __.Path( git_dir_env ) 

271 if git_dir_path.exists( ) and git_dir_path.is_dir( ): 

272 return _discover_common_git_directory( git_dir_path ) 

273 try: repo = Repo.discover( str( start_path ) ) 

274 except Exception: return None 

275 git_dir_path = __.Path( repo.controldir( ) ) 

276 return _discover_common_git_directory( git_dir_path ) 

277 

278def _discover_common_git_directory( git_dir: __.Path ) -> __.Path: 

279 ''' Discovers common git directory, handling worktree commondir. 

280 

281 For worktrees, reads commondir file to find shared resources. 

282 For standard repos, returns git_dir unchanged. 

283 ''' 

284 commondir_file = git_dir / 'commondir' 

285 if not commondir_file.exists( ): 

286 return git_dir 

287 try: common_path = commondir_file.read_text( encoding = 'utf-8' ).strip( ) 

288 except ( OSError, IOError ): return git_dir 

289 return ( git_dir / common_path ).resolve( ) 

290 

291def copy_coder_resources( 

292 source_root: __.Path, 

293 target_root: __.Path, 

294 coders: __.cabc.Sequence[ str ], 

295 simulate: bool = False 

296) -> tuple[ int, int ]: 

297 ''' Copies static resources for specified coders. 

298 

299 Iterates through coders and copies resources from 

300 source_root/<coder> to target_root/<coder>. 

301 Returns tuple of (coders_attempted, coders_processed). 

302 ''' 

303 attempted = 0 

304 processed = 0 

305 for coder in coders: 

306 source = source_root / coder 

307 target = target_root / coder 

308 if not source.exists( ): 

309 __.provide_scribe( __name__ ).debug( 

310 f"No resources found for {coder} at {source}" ) 

311 continue 

312 attempted += 1 

313 if copy_resource_content( source, target, simulate ): 

314 processed += 1 

315 return ( attempted, processed ) 

316 

317def copy_resource_content( 

318 source: __.Path, target: __.Path, simulate: bool 

319) -> bool: 

320 ''' Recursively copies directory contents. 

321 

322 Copies all files and subdirectories from source to target, 

323 overwriting existing files. Handles directory creation. 

324 ''' 

325 if simulate: return True 

326 try: 

327 target.mkdir( parents = True, exist_ok = True ) 

328 except ( OSError, IOError ) as exception: 

329 raise _exceptions.CoderResourceCopyFailure( 

330 source, target 

331 ) from exception 

332 try: 

333 __.shutil.copytree( source, target, dirs_exist_ok = True ) 

334 except ( OSError, IOError ) as exception: 

335 raise _exceptions.CoderResourceCopyFailure( 

336 source, target 

337 ) from exception 

338 return True