Coverage for sources / agentsmgr / operations.py: 8%
151 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-30 00:03 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-30 00:03 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core operations for content generation and directory population.
23 This module provides functions for orchestrating content generation,
24 including directory population and file writing operations with
25 simulation support.
26'''
29from . import __
30from . import exceptions as _exceptions
31from . import generator as _generator
34_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)'
35_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.'
36_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)'
def populate_directory(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Renders every configured item beneath the target directory.

        Visits each coder listed under ``generator.configuration[ 'coders' ]``
        and, for each one, the ``commands`` and ``agents`` item types, in
        that order, delegating the work to ``generate_coder_item_type``.
        The ``simulate`` flag is passed through untouched.

        Returns ``(items_attempted, items_written)`` summed over all calls.
    '''
    outcomes = [
        generate_coder_item_type(
            generator, coder_name, item_type, target, simulate )
        for coder_name in generator.configuration[ 'coders' ]
        for item_type in ( 'commands', 'agents' ) ]
    items_attempted = sum( attempted for attempted, _ in outcomes )
    items_written = sum( written for _, written in outcomes )
    return ( items_attempted, items_written )
61def _content_exists(
62 generator: _generator.ContentGenerator,
63 item_type: str,
64 item_name: str,
65 coder: str
66) -> bool:
67 ''' Checks if content file exists without loading it.
69 Uses path resolution from ContentGenerator to check both primary
70 and fallback locations. Returns True if content is available.
71 '''
72 primary_path, fallback_path = generator.resolve_content_paths(
73 item_type, item_name, coder )
74 if primary_path.exists( ):
75 return True
76 return bool( fallback_path and fallback_path.exists( ) )
def generate_coder_item_type(
    generator: _generator.ContentGenerator,
    coder: str,
    item_type: str,
    target: __.Path,
    simulate: bool
) -> tuple[ int, int ]:
    ''' Renders all items of one type for one coder.

        Item names are the stems of the ``*.toml`` files found under
        ``generator.location / 'configurations' / item_type``. An item whose
        content cannot be found is skipped with a warning and is not counted
        as attempted. Every other item is rendered and handed to
        ``save_content``.

        Returns ``(items_attempted, items_written)``. Both are zero when
        the generator mode is ``'nowhere'`` or the configuration directory
        does not exist.
    '''
    nothing_done = ( 0, 0 )
    if generator.mode == 'nowhere': return nothing_done
    definitions = generator.location / 'configurations' / item_type
    if not definitions.exists( ): return nothing_done
    attempted = written = 0
    for definition in definitions.glob( '*.toml' ):
        name = definition.stem
        if _content_exists( generator, item_type, name, coder ):
            attempted += 1
            rendition = generator.render_single_item(
                item_type, name, coder, target )
            if save_content(
                rendition.content, rendition.location, simulate
            ): written += 1
        else:
            __.provide_scribe( __name__ ).warning(
                f"Skipping {item_type}/{name} for {coder}: "
                "content not found" )
    return ( attempted, written )
def save_content(
    content: str, location: __.Path, simulate: bool = False
) -> bool:
    ''' Writes content to a file, creating missing parent directories.

        In simulation mode nothing is touched on disk and ``False`` is
        returned. Otherwise the parent directory is created as needed, the
        content is written as UTF-8, and ``True`` is returned.

        Raises ``FileOperationFailure`` if either the directory creation or
        the write fails with an ``OSError``.
    '''
    if simulate:
        return False
    directory = location.parent
    try:
        directory.mkdir( parents = True, exist_ok = True )
    except OSError as exception:  # IOError is an alias of OSError
        raise _exceptions.FileOperationFailure(
            directory, "create directory" ) from exception
    try:
        location.write_text( content, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            location, "save content" ) from exception
    return True
def update_git_exclude(
    target: __.Path,
    entries: __.cabc.Collection[ str ],
    simulate: bool = False
) -> int:
    ''' Rewrites the agentsmgr-managed block of ``info/exclude``.

        Entries are stripped, de-duplicated, and sorted before being
        written between the managed block markers; the previous block, if
        any, is replaced wholesale. Lines outside the block are kept. When
        no non-blank entries remain, the block is removed instead.

        The git directory is located via ``_resolve_git_directory``.

        Returns the number of entries written to the managed block. Returns
        zero without touching anything when simulating, when no git
        directory is found, when ``info/exclude`` does not exist, or when
        there is nothing to write and nothing to remove.

        Raises ``FileOperationFailure`` if reading or writing the exclude
        file fails with an ``OSError``.
    '''
    if simulate: return 0
    git_dir = _resolve_git_directory( target )
    if not git_dir: return 0
    exclude_file = git_dir / 'info' / 'exclude'
    if not exclude_file.exists( ): return 0
    try:
        existing = exclude_file.read_text( encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "read git exclude file" ) from exception
    distinct = { entry.strip( ) for entry in entries }
    distinct.discard( '' )  # blank and whitespace-only entries are ignored
    ordered = sorted( distinct )
    if ordered:
        revised = _update_managed_block( existing, ordered )
    else:
        revised = _remove_managed_block( existing )
        if revised == existing: return 0
    try:
        exclude_file.write_text( revised, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "update git exclude file" ) from exception
    return len( ordered )
def _update_managed_block(
    content: str, entries: __.cabc.Sequence[ str ]
) -> str:
    ''' Returns content with the managed block replaced by fresh entries.

        The new block consists of the BEGIN marker, the warning line, the
        entries in the order given, and the END marker. It takes the place
        of an existing block, or is appended when none is found. Lines
        outside the block are kept, and a blank line separates the block
        from any non-blank neighbouring line. The result always ends with
        a newline.
    '''
    leading, trailing = _partition_around_managed_block(
        content.splitlines( ) )
    assembled = list( leading )
    if assembled and assembled[ -1 ].strip( ):
        assembled.append( '' )
    assembled += [
        _MANAGED_BLOCK_BEGIN,
        _MANAGED_BLOCK_WARNING,
        *entries,
        _MANAGED_BLOCK_END,
    ]
    if trailing:
        assembled += [ '', *trailing ]
    return '\n'.join( assembled ) + '\n'
def _remove_managed_block( content: str ) -> str:
    ''' Removes managed block from content, preserving user entries.

        Returns the content exactly as given when no well-formed managed
        block is present. Otherwise returns the lines before and after the
        block, separated by one blank line when both sides are non-empty,
        and terminated by a newline. Returns an empty string when the
        block was the only non-blank content.
    '''
    lines = content.splitlines( )
    before_block, after_block = _partition_around_managed_block( lines )
    # A located block always drops at least its two marker lines, so an
    # unshortened "before" partition means that nothing was found. Testing
    # for empty partitions instead would mistake a file that holds only the
    # managed block for a file without one and leave the block in place.
    if len( before_block ) == len( lines ):
        return content
    remaining = list( before_block )
    if remaining and after_block and remaining[ -1 ].strip( ):
        remaining.append( '' )
    remaining.extend( after_block )
    if not remaining: return ''
    return '\n'.join( remaining ) + '\n'
def _partition_around_managed_block(
    lines: __.cabc.Sequence[ str ]
) -> tuple[ list[ str ], list[ str ] ]:
    ''' Partitions lines into content before and after managed block.

        Locates the BEGIN marker and the first END marker at or after it,
        matching whole lines exactly. Returns a ``(before, after)`` tuple
        of new lists with the block itself removed, blank lines trimmed
        from the end of ``before`` and from the start of ``after``.

        If no BEGIN marker is found, or a BEGIN marker has no END marker
        after it (malformed; a warning is logged), returns
        ``(all_lines, [])``.
    '''
    try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN )
    except ValueError: return ( list( lines ), [ ] )
    # The search starts at begin_index, so a successful match can never
    # precede the BEGIN marker; no separate ordering check is needed.
    try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index )
    except ValueError:
        __.provide_scribe( __name__ ).warning(
            "Malformed agentsmgr block in .git/info/exclude; rebuilding." )
        return ( list( lines ), [ ] )
    before_block = list( lines[ :begin_index ] )
    while before_block and not before_block[ -1 ].strip( ):
        before_block.pop( )
    after_block = list( lines[ end_index + 1: ] )
    # Count the leading blank lines and drop them with a single deletion
    # rather than shifting the list once per blank line.
    blanks = 0
    while blanks < len( after_block ) and not after_block[ blanks ].strip( ):
        blanks += 1
    del after_block[ :blanks ]
    return ( before_block, after_block )
def _resolve_git_directory(
    start_path: __.Path
) -> __.typx.Optional[ __.Path ]:
    ''' Locates the git directory that applies to a starting path.

        A non-empty ``GIT_DIR`` environment variable naming an existing
        directory takes precedence. Otherwise Dulwich repository discovery
        is run from ``start_path`` and the repository's control directory
        is used. Either result is passed through
        ``_discover_common_git_directory`` before being returned.

        Returns ``None`` if repository discovery raises.
    '''
    from dulwich.repo import Repo
    override = __.os.environ.get( 'GIT_DIR' )
    if override:
        candidate = __.Path( override )
        if candidate.exists( ) and candidate.is_dir( ):
            return _discover_common_git_directory( candidate )
    try:
        repository = Repo.discover( str( start_path ) )
    except Exception:
        # Deliberately broad: any discovery failure means "no repository".
        return None
    return _discover_common_git_directory(
        __.Path( repository.controldir( ) ) )
273def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
274 ''' Discovers common git directory, handling worktree commondir.
276 For worktrees, reads commondir file to find shared resources.
277 For standard repos, returns git_dir unchanged.
278 '''
279 commondir_file = git_dir / 'commondir'
280 if not commondir_file.exists( ):
281 return git_dir
282 try: common_path = commondir_file.read_text( encoding = 'utf-8' ).strip( )
283 except ( OSError, IOError ): return git_dir
284 return ( git_dir / common_path ).resolve( )
def copy_coder_resources(
    source_root: __.Path,
    target_root: __.Path,
    coders: __.cabc.Sequence[ str ],
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Copies per-coder static resources from one root to another.

        For each coder, ``source_root/<coder>`` is copied to
        ``target_root/<coder>`` via ``copy_resource_content``. Coders with
        no source path are logged at debug level and not counted.

        Returns ``(coders_attempted, coders_processed)``.
    '''
    attempted = processed = 0
    for coder in coders:
        source = source_root / coder
        if source.exists( ):
            attempted += 1
            if copy_resource_content(
                source, target_root / coder, simulate
            ): processed += 1
        else:
            __.provide_scribe( __name__ ).debug(
                f"No resources found for {coder} at {source}" )
    return ( attempted, processed )
def copy_resource_content(
    source: __.Path, target: __.Path, simulate: bool
) -> bool:
    ''' Copies a directory tree into the target directory.

        Creates the target, including missing parents, and then copies the
        source tree into it, allowing directories that already exist. In
        simulation mode nothing is touched on disk.

        Returns ``True`` in both modes. Raises ``CoderResourceCopyFailure``
        if creating the target or copying fails with an ``OSError``.
    '''
    if simulate:
        return True
    try:
        target.mkdir( parents = True, exist_ok = True )
        __.shutil.copytree( source, target, dirs_exist_ok = True )
    except OSError as exception:
        raise _exceptions.CoderResourceCopyFailure(
            source, target ) from exception
    return True