Coverage for sources / agentsmgr / operations.py: 8%
151 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 15:37 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 15:37 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core operations for content generation and directory population.
23 This module provides functions for orchestrating content generation,
24 including directory population and file writing operations with
25 simulation support.
26'''
29from . import __
30from . import exceptions as _exceptions
31from . import generator as _generator
34_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)'
35_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.'
36_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)'
39def populate_directory(
40 generator: _generator.ContentGenerator,
41 target: __.Path,
42 simulate: bool = False
43) -> tuple[ int, int ]:
44 ''' Generates all content items to target directory.
46 Orchestrates content generation for all coders and item types
47 configured in generator. Returns tuple of (items_attempted,
48 items_written).
49 '''
50 items_attempted = 0
51 items_written = 0
52 for coder_name in generator.configuration[ 'coders' ]:
53 for item_type in ( 'commands', 'agents' ):
54 attempted, written = generate_coder_item_type(
55 generator, coder_name, item_type, target, simulate )
56 items_attempted += attempted
57 items_written += written
58 return ( items_attempted, items_written )
61def _content_exists(
62 generator: _generator.ContentGenerator,
63 item_type: str,
64 item_name: str,
65 coder: str
66) -> bool:
67 ''' Checks if content file exists without loading it.
69 Uses path resolution from ContentGenerator to check both primary
70 and fallback locations. Returns True if content is available.
71 '''
72 primary_path, fallback_path = generator.resolve_content_paths(
73 item_type, item_name, coder )
74 if primary_path.exists( ):
75 return True
76 return bool( fallback_path and fallback_path.exists( ) )
79def generate_coder_item_type(
80 generator: _generator.ContentGenerator,
81 coder: str,
82 item_type: str,
83 target: __.Path,
84 simulate: bool
85) -> tuple[ int, int ]:
86 ''' Generates items of specific type for a coder.
88 Generates all items (commands or agents) for specified coder by
89 iterating through configuration files. Pre-checks content
90 availability and skips items with missing content. Returns tuple
91 of (items_attempted, items_written).
92 '''
93 items_attempted = 0
94 items_written = 0
95 if generator.mode == 'nowhere':
96 return ( items_attempted, items_written )
97 configuration_directory = (
98 generator.location / 'configurations' / item_type )
99 if not configuration_directory.exists( ):
100 return ( items_attempted, items_written )
101 for configuration_file in configuration_directory.glob( '*.toml' ):
102 item_name = configuration_file.stem
103 if not _content_exists( generator, item_type, item_name, coder ):
104 __.provide_scribe( __name__ ).warning(
105 f"Skipping {item_type}/{item_name} for {coder}: "
106 "content not found" )
107 continue
108 items_attempted += 1
109 result = generator.render_single_item(
110 item_type, item_name, coder, target )
111 if save_content( result.content, result.location, simulate ):
112 items_written += 1
113 return ( items_attempted, items_written )
116def save_content(
117 content: str, location: __.Path, simulate: bool = False
118) -> bool:
119 ''' Saves content to location, creating parent directories as needed.
121 Writes content to specified location, creating parent directories
122 if necessary. In simulation mode, no actual writing occurs.
123 Returns True if file was written, False if simulated.
124 '''
125 if simulate: return False
126 try: location.parent.mkdir( parents = True, exist_ok = True )
127 except ( OSError, IOError ) as exception:
128 raise _exceptions.FileOperationFailure(
129 location.parent, "create directory" ) from exception
130 try: location.write_text( content, encoding = 'utf-8' )
131 except ( OSError, IOError ) as exception:
132 raise _exceptions.FileOperationFailure(
133 location, "save content" ) from exception
134 return True
137def update_git_exclude(
138 target: __.Path,
139 entries: __.cabc.Collection[ str ],
140 simulate: bool = False
141) -> int:
142 ''' Updates .git/info/exclude with managed block of agentsmgr entries.
144 Maintains a clearly-marked block of entries managed by agentsmgr,
145 with complete replacement on each update. Entries are sorted
146 lexicographically within the block. User entries outside the
147 managed block are preserved.
149 Handles GIT_DIR environment variable and git worktrees by
150 resolving the actual git directory location and using the common
151 git directory for shared resources.
153 Returns count of entries in managed block.
154 '''
155 if simulate: return 0
156 git_dir = _resolve_git_directory( target )
157 if not git_dir: return 0
158 exclude_file = git_dir / 'info' / 'exclude'
159 if not exclude_file.exists( ): return 0
160 try: content = exclude_file.read_text( encoding = 'utf-8' )
161 except ( OSError, IOError ) as exception:
162 raise _exceptions.FileOperationFailure(
163 exclude_file, "read git exclude file" ) from exception
164 normalized_entries = sorted( {
165 (
166 "/{0}".format( entry.strip( ) )
167 if entry.strip( ) and not entry.strip( ).startswith( '/' )
168 else entry.strip( )
169 )
170 for entry in entries if entry.strip( )
171 } )
172 if not normalized_entries:
173 new_content = _remove_managed_block( content )
174 if new_content == content: return 0
175 else:
176 new_content = _update_managed_block( content, normalized_entries )
177 try: exclude_file.write_text( new_content, encoding = 'utf-8' )
178 except ( OSError, IOError ) as exception:
179 raise _exceptions.FileOperationFailure(
180 exclude_file, "update git exclude file" ) from exception
181 return len( normalized_entries )
184def _update_managed_block(
185 content: str, entries: __.cabc.Sequence[ str ]
186) -> str:
187 ''' Updates content with new managed block containing sorted entries.
189 Locates existing managed block (if present) and replaces it with
190 new block. If no block exists, appends to end of file. Preserves
191 user content outside the managed block.
192 '''
193 lines = content.splitlines( )
194 before_block, after_block = _partition_around_managed_block( lines )
195 block_lines = [ _MANAGED_BLOCK_BEGIN, _MANAGED_BLOCK_WARNING ]
196 block_lines.extend( entries )
197 block_lines.append( _MANAGED_BLOCK_END )
198 if before_block and before_block[ -1 ].strip( ):
199 before_block.append( '' )
200 result_lines = before_block + block_lines
201 if after_block:
202 result_lines.append( '' )
203 result_lines.extend( after_block )
204 return '\n'.join( result_lines ) + '\n'
207def _remove_managed_block( content: str ) -> str:
208 ''' Removes managed block from content, preserving user entries.
210 Locates and removes managed block if present. Returns content
211 unchanged if no block found.
212 '''
213 lines = content.splitlines( )
214 before_block, after_block = _partition_around_managed_block( lines )
215 if not before_block and not after_block:
216 return content
217 result_lines = before_block
218 if result_lines and after_block:
219 if result_lines[ -1 ].strip( ):
220 result_lines.append( '' )
221 result_lines.extend( after_block )
222 elif after_block:
223 result_lines = after_block
224 if not result_lines: return ''
225 return '\n'.join( result_lines ) + '\n'
228def _partition_around_managed_block(
229 lines: __.cabc.Sequence[ str ]
230) -> tuple[ list[ str ], list[ str ] ]:
231 ''' Partitions lines into content before and after managed block.
233 Locates managed block markers and returns (before, after) tuple.
234 If block is malformed or not found, returns (all_lines, []).
235 Malformed blocks are treated as non-existent.
236 '''
237 try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN )
238 except ValueError: return ( list( lines ), [ ] )
239 try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index )
240 except ValueError:
241 __.provide_scribe( __name__ ).warning(
242 "Malformed agentsmgr block in .git/info/exclude; rebuilding." )
243 return ( list( lines ), [ ] )
244 if end_index < begin_index:
245 __.provide_scribe( __name__ ).warning(
246 "Malformed agentsmgr block in .git/info/exclude; rebuilding." )
247 return ( list( lines ), [ ] )
248 before_block = list( lines[ :begin_index ] )
249 while before_block and not before_block[ -1 ].strip( ):
250 before_block.pop( )
251 after_block = list( lines[ end_index + 1: ] )
252 while after_block and not after_block[ 0 ].strip( ):
253 after_block.pop( 0 )
254 return ( before_block, after_block )
256def _resolve_git_directory(
257 start_path: __.Path
258) -> __.typx.Optional[ __.Path ]:
259 ''' Resolves git directory location, handling GIT_DIR and worktrees.
261 Checks GIT_DIR environment variable first, then uses Dulwich to
262 discover repository. Returns common git directory (shared across
263 worktrees) for access to shared resources like info/exclude.
265 Returns None if not in a git repository or on error.
266 '''
267 from dulwich.repo import Repo
268 git_dir_env = __.os.environ.get( 'GIT_DIR' )
269 if git_dir_env:
270 git_dir_path = __.Path( git_dir_env )
271 if git_dir_path.exists( ) and git_dir_path.is_dir( ):
272 return _discover_common_git_directory( git_dir_path )
273 try: repo = Repo.discover( str( start_path ) )
274 except Exception: return None
275 git_dir_path = __.Path( repo.controldir( ) )
276 return _discover_common_git_directory( git_dir_path )
278def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
279 ''' Discovers common git directory, handling worktree commondir.
281 For worktrees, reads commondir file to find shared resources.
282 For standard repos, returns git_dir unchanged.
283 '''
284 commondir_file = git_dir / 'commondir'
285 if not commondir_file.exists( ):
286 return git_dir
287 try: common_path = commondir_file.read_text( encoding = 'utf-8' ).strip( )
288 except ( OSError, IOError ): return git_dir
289 return ( git_dir / common_path ).resolve( )
291def copy_coder_resources(
292 source_root: __.Path,
293 target_root: __.Path,
294 coders: __.cabc.Sequence[ str ],
295 simulate: bool = False
296) -> tuple[ int, int ]:
297 ''' Copies static resources for specified coders.
299 Iterates through coders and copies resources from
300 source_root/<coder> to target_root/<coder>.
301 Returns tuple of (coders_attempted, coders_processed).
302 '''
303 attempted = 0
304 processed = 0
305 for coder in coders:
306 source = source_root / coder
307 target = target_root / coder
308 if not source.exists( ):
309 __.provide_scribe( __name__ ).debug(
310 f"No resources found for {coder} at {source}" )
311 continue
312 attempted += 1
313 if copy_resource_content( source, target, simulate ):
314 processed += 1
315 return ( attempted, processed )
317def copy_resource_content(
318 source: __.Path, target: __.Path, simulate: bool
319) -> bool:
320 ''' Recursively copies directory contents.
322 Copies all files and subdirectories from source to target,
323 overwriting existing files. Handles directory creation.
324 '''
325 if simulate: return True
326 try:
327 target.mkdir( parents = True, exist_ok = True )
328 except ( OSError, IOError ) as exception:
329 raise _exceptions.CoderResourceCopyFailure(
330 source, target
331 ) from exception
332 try:
333 __.shutil.copytree( source, target, dirs_exist_ok = True )
334 except ( OSError, IOError ) as exception:
335 raise _exceptions.CoderResourceCopyFailure(
336 source, target
337 ) from exception
338 return True