Coverage for sources/agentsmgr/operations.py: 9%

127 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-28 17:44 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core operations for content generation and directory population. 

22 

23 This module provides functions for orchestrating content generation, 

24 including directory population and file writing operations with 

25 simulation support. 

26''' 

27 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import generator as _generator 

32 

33 

34_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)' 

35_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.' 

36_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)' 

37 

38 

39def populate_directory( 

40 generator: _generator.ContentGenerator, 

41 target: __.Path, 

42 simulate: bool = False 

43) -> tuple[ int, int ]: 

44 ''' Generates all content items to target directory. 

45 

46 Orchestrates content generation for all coders and item types 

47 configured in generator. Returns tuple of (items_attempted, 

48 items_written). 

49 ''' 

50 items_attempted = 0 

51 items_written = 0 

52 for coder_name in generator.configuration[ 'coders' ]: 

53 for item_type in ( 'commands', 'agents' ): 

54 attempted, written = generate_coder_item_type( 

55 generator, coder_name, item_type, target, simulate ) 

56 items_attempted += attempted 

57 items_written += written 

58 return ( items_attempted, items_written ) 

59 

60 

61def _content_exists( 

62 generator: _generator.ContentGenerator, 

63 item_type: str, 

64 item_name: str, 

65 coder: str 

66) -> bool: 

67 ''' Checks if content file exists without loading it. 

68 

69 Uses path resolution from ContentGenerator to check both primary 

70 and fallback locations. Returns True if content is available. 

71 ''' 

72 primary_path, fallback_path = generator.resolve_content_paths( 

73 item_type, item_name, coder ) 

74 if primary_path.exists( ): 

75 return True 

76 return bool( fallback_path and fallback_path.exists( ) ) 

77 

78 

79def generate_coder_item_type( 

80 generator: _generator.ContentGenerator, 

81 coder: str, 

82 item_type: str, 

83 target: __.Path, 

84 simulate: bool 

85) -> tuple[ int, int ]: 

86 ''' Generates items of specific type for a coder. 

87 

88 Generates all items (commands or agents) for specified coder by 

89 iterating through configuration files. Pre-checks content 

90 availability and skips items with missing content. Returns tuple 

91 of (items_attempted, items_written). 

92 ''' 

93 items_attempted = 0 

94 items_written = 0 

95 if generator.mode == 'nowhere': 

96 return ( items_attempted, items_written ) 

97 configuration_directory = ( 

98 generator.location / 'configurations' / item_type ) 

99 if not configuration_directory.exists( ): 

100 return ( items_attempted, items_written ) 

101 for configuration_file in configuration_directory.glob( '*.toml' ): 

102 item_name = configuration_file.stem 

103 if not _content_exists( generator, item_type, item_name, coder ): 

104 __.provide_scribe( __name__ ).warning( 

105 f"Skipping {item_type}/{item_name} for {coder}: " 

106 "content not found" ) 

107 continue 

108 items_attempted += 1 

109 result = generator.render_single_item( 

110 item_type, item_name, coder, target ) 

111 if save_content( result.content, result.location, simulate ): 

112 items_written += 1 

113 return ( items_attempted, items_written ) 

114 

115 

116def save_content( 

117 content: str, location: __.Path, simulate: bool = False 

118) -> bool: 

119 ''' Saves content to location, creating parent directories as needed. 

120 

121 Writes content to specified location, creating parent directories 

122 if necessary. In simulation mode, no actual writing occurs. 

123 Returns True if file was written, False if simulated. 

124 ''' 

125 if simulate: return False 

126 try: location.parent.mkdir( parents = True, exist_ok = True ) 

127 except ( OSError, IOError ) as exception: 

128 raise _exceptions.FileOperationFailure( 

129 location.parent, "create directory" ) from exception 

130 try: location.write_text( content, encoding = 'utf-8' ) 

131 except ( OSError, IOError ) as exception: 

132 raise _exceptions.FileOperationFailure( 

133 location, "save content" ) from exception 

134 return True 

135 

136 

137def update_git_exclude( 

138 target: __.Path, 

139 entries: __.cabc.Collection[ str ], 

140 simulate: bool = False 

141) -> int: 

142 ''' Updates .git/info/exclude with managed block of agentsmgr entries. 

143 

144 Maintains a clearly-marked block of entries managed by agentsmgr, 

145 with complete replacement on each update. Entries are sorted 

146 lexicographically within the block. User entries outside the 

147 managed block are preserved. 

148 

149 Handles GIT_DIR environment variable and git worktrees by 

150 resolving the actual git directory location and using the common 

151 git directory for shared resources. 

152 

153 Returns count of entries in managed block. 

154 ''' 

155 if simulate: return 0 

156 git_dir = _resolve_git_directory( target ) 

157 if not git_dir: return 0 

158 exclude_file = git_dir / 'info' / 'exclude' 

159 if not exclude_file.exists( ): return 0 

160 try: content = exclude_file.read_text( encoding = 'utf-8' ) 

161 except ( OSError, IOError ) as exception: 

162 raise _exceptions.FileOperationFailure( 

163 exclude_file, "read git exclude file" ) from exception 

164 normalized_entries = sorted( { 

165 entry.strip( ) for entry in entries if entry.strip( ) 

166 } ) 

167 if not normalized_entries: 

168 new_content = _remove_managed_block( content ) 

169 if new_content == content: return 0 

170 else: 

171 new_content = _update_managed_block( content, normalized_entries ) 

172 try: exclude_file.write_text( new_content, encoding = 'utf-8' ) 

173 except ( OSError, IOError ) as exception: 

174 raise _exceptions.FileOperationFailure( 

175 exclude_file, "update git exclude file" ) from exception 

176 return len( normalized_entries ) 

177 

178 

179def _update_managed_block( 

180 content: str, entries: __.cabc.Sequence[ str ] 

181) -> str: 

182 ''' Updates content with new managed block containing sorted entries. 

183 

184 Locates existing managed block (if present) and replaces it with 

185 new block. If no block exists, appends to end of file. Preserves 

186 user content outside the managed block. 

187 ''' 

188 lines = content.splitlines( ) 

189 before_block, after_block = _partition_around_managed_block( lines ) 

190 block_lines = [ _MANAGED_BLOCK_BEGIN, _MANAGED_BLOCK_WARNING ] 

191 block_lines.extend( entries ) 

192 block_lines.append( _MANAGED_BLOCK_END ) 

193 if before_block and before_block[ -1 ].strip( ): 

194 before_block.append( '' ) 

195 result_lines = before_block + block_lines 

196 if after_block: 

197 result_lines.append( '' ) 

198 result_lines.extend( after_block ) 

199 return '\n'.join( result_lines ) + '\n' 

200 

201 

202def _remove_managed_block( content: str ) -> str: 

203 ''' Removes managed block from content, preserving user entries. 

204 

205 Locates and removes managed block if present. Returns content 

206 unchanged if no block found. 

207 ''' 

208 lines = content.splitlines( ) 

209 before_block, after_block = _partition_around_managed_block( lines ) 

210 if not before_block and not after_block: 

211 return content 

212 result_lines = before_block 

213 if result_lines and after_block: 

214 if result_lines[ -1 ].strip( ): 

215 result_lines.append( '' ) 

216 result_lines.extend( after_block ) 

217 elif after_block: 

218 result_lines = after_block 

219 if not result_lines: return '' 

220 return '\n'.join( result_lines ) + '\n' 

221 

222 

223def _partition_around_managed_block( 

224 lines: __.cabc.Sequence[ str ] 

225) -> tuple[ list[ str ], list[ str ] ]: 

226 ''' Partitions lines into content before and after managed block. 

227 

228 Locates managed block markers and returns (before, after) tuple. 

229 If block is malformed or not found, returns (all_lines, []). 

230 Malformed blocks are treated as non-existent. 

231 ''' 

232 try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN ) 

233 except ValueError: return ( list( lines ), [ ] ) 

234 try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index ) 

235 except ValueError: 

236 __.provide_scribe( __name__ ).warning( 

237 "Malformed agentsmgr block in .git/info/exclude; rebuilding." ) 

238 return ( list( lines ), [ ] ) 

239 if end_index < begin_index: 

240 __.provide_scribe( __name__ ).warning( 

241 "Malformed agentsmgr block in .git/info/exclude; rebuilding." ) 

242 return ( list( lines ), [ ] ) 

243 before_block = list( lines[ :begin_index ] ) 

244 while before_block and not before_block[ -1 ].strip( ): 

245 before_block.pop( ) 

246 after_block = list( lines[ end_index + 1: ] ) 

247 while after_block and not after_block[ 0 ].strip( ): 

248 after_block.pop( 0 ) 

249 return ( before_block, after_block ) 

250 

251 

252def _resolve_git_directory( 

253 start_path: __.Path 

254) -> __.typx.Optional[ __.Path ]: 

255 ''' Resolves git directory location, handling GIT_DIR and worktrees. 

256 

257 Checks GIT_DIR environment variable first, then uses Dulwich to 

258 discover repository. Returns common git directory (shared across 

259 worktrees) for access to shared resources like info/exclude. 

260 

261 Returns None if not in a git repository or on error. 

262 ''' 

263 from dulwich.repo import Repo 

264 git_dir_env = __.os.environ.get( 'GIT_DIR' ) 

265 if git_dir_env: 

266 git_dir_path = __.Path( git_dir_env ) 

267 if git_dir_path.exists( ) and git_dir_path.is_dir( ): 

268 return _discover_common_git_directory( git_dir_path ) 

269 try: repo = Repo.discover( str( start_path ) ) 

270 except Exception: return None 

271 git_dir_path = __.Path( repo.controldir( ) ) 

272 return _discover_common_git_directory( git_dir_path ) 

273 

274 

275def _discover_common_git_directory( git_dir: __.Path ) -> __.Path: 

276 ''' Discovers common git directory, handling worktree commondir. 

277 

278 For worktrees, reads commondir file to find shared resources. 

279 For standard repos, returns git_dir unchanged. 

280 ''' 

281 commondir_file = git_dir / 'commondir' 

282 if not commondir_file.exists( ): 

283 return git_dir 

284 try: common_path = commondir_file.read_text( encoding = 'utf-8' ).strip( ) 

285 except ( OSError, IOError ): return git_dir 

286 return ( git_dir / common_path ).resolve( )