Coverage for sources / agentsmgr / operations.py: 8%
168 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 21:55 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 21:55 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Core operations for content generation and directory population.
23 This module provides functions for orchestrating content generation,
24 including directory population and file writing operations with
25 simulation support.
26'''
29from . import __
30from . import exceptions as _exceptions
31from . import generator as _generator
32from . import renderers as _renderers
# Marker lines for the block this module maintains in the git exclude
# file. The begin and end markers are located by exact whole-line match.
_MANAGED_BLOCK_BEGIN = '# BEGIN: Managed by agentsmgr (emcd-agents)'
# Emitted immediately after the begin marker in every generated block.
_MANAGED_BLOCK_WARNING = '# Do not manually edit entries in this block.'
_MANAGED_BLOCK_END = '# END: Managed by agentsmgr (emcd-agents)'
def populate_directory(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Renders every configured content item beneath target.

        Ensures output directories first, then visits each configured
        coder which has a registered renderer and each item type that
        renderer makes available. Coders without a registered renderer
        are skipped. Returns tuple of (items_attempted, items_written).
    '''
    _ensure_output_directories( generator, target, simulate )
    attempted_total = 0
    written_total = 0
    for coder_name in generator.configuration[ 'coders' ]:
        try: renderer = _renderers.RENDERERS[ coder_name ]
        except KeyError: continue
        for item_type in renderer.item_types_available:
            attempted, written = generate_coder_item_type(
                generator, coder_name, item_type, target, simulate )
            attempted_total += attempted
            written_total += written
    return ( attempted_total, written_total )
def _ensure_output_directories(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool,
) -> None:
    ''' Creates per-item-type output directories ahead of generation.

        Does nothing when simulating or when generator mode is
        'nowhere'. For each configured coder with a registered renderer,
        resolves the effective mode (renderer default when generator
        mode is 'default') and, only for 'per-user' or 'per-project'
        modes which the renderer supports, creates one directory per
        available item type under the renderer's base directory.
    '''
    if simulate or generator.mode == 'nowhere': return
    directory_modes = ( 'per-user', 'per-project' )
    for coder_name in generator.configuration[ 'coders' ]:
        try: renderer = _renderers.RENDERERS[ coder_name ]
        except KeyError: continue
        effective_mode = (
            renderer.mode_default if generator.mode == 'default'
            else generator.mode )
        if effective_mode not in directory_modes: continue
        if effective_mode not in renderer.modes_available: continue
        base_directory = renderer.resolve_base_directory(
            mode = effective_mode,
            target = target,
            configuration = generator.application_configuration,
            environment = __.os.environ,
        )
        for item_type in renderer.item_types_available:
            subdirectory = renderer.produce_output_structure( item_type )
            output_directory = base_directory / subdirectory
            output_directory.mkdir( parents = True, exist_ok = True )
def _content_exists(
    generator: _generator.ContentGenerator,
    item_type: str,
    item_name: str,
    coder: str
) -> bool:
    ''' Reports whether content for an item is present on disk.

        Asks the generator for the primary and fallback content paths
        and tests them for existence without reading either file. The
        fallback is consulted only if the primary path is absent.
    '''
    primary, fallback = generator.resolve_content_paths(
        item_type, item_name, coder )
    return primary.exists( ) or bool( fallback and fallback.exists( ) )
def generate_coder_item_type(
    generator: _generator.ContentGenerator,
    coder: str,
    item_type: str,
    target: __.Path,
    simulate: bool
) -> tuple[ int, int ]:
    ''' Generates all items of one type for one coder.

        Walks the '*.toml' files under the generator location's
        'configurations/<item_type>' directory; each file stem is an
        item name. Items whose content cannot be found are logged and
        skipped without counting as attempted. Returns (0, 0) when
        generator mode is 'nowhere' or the configuration directory is
        absent; otherwise returns (items_attempted, items_written).
    '''
    if generator.mode == 'nowhere': return ( 0, 0 )
    configuration_directory = (
        generator.location / 'configurations' / item_type )
    if not configuration_directory.exists( ): return ( 0, 0 )
    attempted = 0
    written = 0
    for configuration_file in configuration_directory.glob( '*.toml' ):
        item_name = configuration_file.stem
        if _content_exists( generator, item_type, item_name, coder ):
            attempted += 1
            rendition = generator.render_single_item(
                item_type, item_name, coder, target )
            if save_content(
                rendition.content, rendition.location, simulate
            ): written += 1
        else:
            __.provide_scribe( __name__ ).warning(
                f"Skipping {item_type}/{item_name} for {coder}: "
                "content not found" )
    return ( attempted, written )
def save_content(
    content: str, location: __.Path, simulate: bool = False
) -> bool:
    ''' Writes content to location as UTF-8 text.

        Creates any missing parent directories before writing. In
        simulation mode nothing is touched on disk. Returns True if the
        file was written and False if the write was only simulated.
        Raises FileOperationFailure if directory creation or the write
        itself fails with an OS-level error.
    '''
    if simulate: return False
    directory = location.parent
    try: directory.mkdir( parents = True, exist_ok = True )
    except ( OSError, IOError ) as exception:
        raise _exceptions.FileOperationFailure(
            directory, "create directory" ) from exception
    try: location.write_text( content, encoding = 'utf-8' )
    except ( OSError, IOError ) as exception:
        raise _exceptions.FileOperationFailure(
            location, "save content" ) from exception
    return True
def update_git_exclude(
    target: __.Path,
    entries: __.cabc.Collection[ str ],
    simulate: bool = False
) -> int:
    ''' Rewrites the agentsmgr-managed block in .git/info/exclude.

        The managed block is replaced wholesale on every update; text
        outside of it is preserved. Entries are stripped, blank ones
        discarded, a leading '/' is added where missing, and the
        results are deduplicated and sorted. With no usable entries,
        the managed block is removed instead.

        The git directory is resolved via the module's resolver, which
        accounts for GIT_DIR and worktrees. Nothing is done, and 0 is
        returned, when simulating, when no git directory is found or
        when the exclude file does not exist.

        Returns count of entries in managed block. Raises
        FileOperationFailure if the exclude file cannot be read or
        written.
    '''
    if simulate: return 0
    git_dir = _resolve_git_directory( target )
    if not git_dir: return 0
    exclude_file = git_dir / 'info' / 'exclude'
    if not exclude_file.exists( ): return 0
    try: content = exclude_file.read_text( encoding = 'utf-8' )
    except ( OSError, IOError ) as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "read git exclude file" ) from exception
    # Strip each entry once; blank results are discarded by the filter.
    stripped_entries = ( entry.strip( ) for entry in entries )
    normalized_entries = sorted( {
        stripped if stripped.startswith( '/' )
        else "/{0}".format( stripped )
        for stripped in stripped_entries if stripped
    } )
    if normalized_entries:
        new_content = _update_managed_block( content, normalized_entries )
    else:
        new_content = _remove_managed_block( content )
        # Nothing to remove: leave the file untouched.
        if new_content == content: return 0
    try: exclude_file.write_text( new_content, encoding = 'utf-8' )
    except ( OSError, IOError ) as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "update git exclude file" ) from exception
    return len( normalized_entries )
def _update_managed_block(
    content: str, entries: __.cabc.Sequence[ str ]
) -> str:
    ''' Produces content with managed block rebuilt from entries.

        Any existing managed block is replaced in place; if there is
        none, the new block is appended after the existing lines. Lines
        outside the block are kept, with a single blank line separating
        them from the block. Result always ends with a newline.
    '''
    before_block, after_block = _partition_around_managed_block(
        content.splitlines( ) )
    result_lines = list( before_block )
    # Keep one blank line between preceding content and the block.
    if result_lines and result_lines[ -1 ].strip( ):
        result_lines.append( '' )
    result_lines += [
        _MANAGED_BLOCK_BEGIN,
        _MANAGED_BLOCK_WARNING,
        *entries,
        _MANAGED_BLOCK_END,
    ]
    if after_block: result_lines += [ '', *after_block ]
    return '\n'.join( result_lines ) + '\n'
def _remove_managed_block( content: str ) -> str:
    ''' Removes managed block from content, preserving user entries.

        Returns content unchanged, byte for byte, if no well-formed
        managed block is found. Otherwise returns the surrounding lines
        joined with newlines, with one blank line between the parts
        which preceded and followed the block. Returns an empty string
        if the block was the only content.
    '''
    lines = content.splitlines( )
    before_block, after_block = _partition_around_managed_block( lines )
    # When no well-formed block exists, partitioning returns every line
    # as 'before' and nothing as 'after'. A located block always
    # consumes at least its two marker lines, so 'before' is strictly
    # shorter than the input in that case. Testing for emptiness
    # instead would mistake a file holding only the block for a file
    # without one.
    block_found = bool( after_block ) or len( before_block ) < len( lines )
    if not block_found: return content
    result_lines = list( before_block )
    if result_lines and after_block and result_lines[ -1 ].strip( ):
        result_lines.append( '' )
    result_lines.extend( after_block )
    if not result_lines: return ''
    return '\n'.join( result_lines ) + '\n'
def _partition_around_managed_block(
    lines: __.cabc.Sequence[ str ]
) -> tuple[ list[ str ], list[ str ] ]:
    ''' Partitions lines into content before and after managed block.

        Locates the begin marker and the first end marker at or after
        it, both by exact whole-line match, and returns (before, after)
        with blank lines adjacent to the block trimmed from each part.
        If there is no begin marker, or a begin marker without a later
        end marker (malformed block, which is logged), returns
        (all_lines, []).
    '''
    try: begin_index = lines.index( _MANAGED_BLOCK_BEGIN )
    except ValueError: return ( list( lines ), [ ] )
    # Search starts at the begin marker, so any match is at or beyond
    # it; no separate ordering check is needed.
    try: end_index = lines.index( _MANAGED_BLOCK_END, begin_index )
    except ValueError:
        __.provide_scribe( __name__ ).warning(
            "Malformed agentsmgr block in .git/info/exclude; rebuilding." )
        return ( list( lines ), [ ] )
    before_block = list( lines[ :begin_index ] )
    while before_block and not before_block[ -1 ].strip( ):
        before_block.pop( )
    after_block = list( lines[ end_index + 1: ] )
    # Skip leading blank lines with one slice instead of repeated
    # front pops.
    first_kept = next(
        (   index for index, line in enumerate( after_block )
            if line.strip( ) ),
        len( after_block ) )
    return ( before_block, after_block[ first_kept: ] )
def _resolve_git_directory(
    start_path: __.Path
) -> __.typx.Optional[ __.Path ]:
    ''' Locates git directory for start path, honoring GIT_DIR.

        A GIT_DIR environment variable which names an existing
        directory takes precedence. Otherwise, Dulwich repository
        discovery is attempted from start path. Either way, the result
        is passed through common-directory discovery so that worktrees
        resolve to the shared directory.

        Returns None if repository discovery fails for any reason.
    '''
    from dulwich.repo import Repo
    override = __.os.environ.get( 'GIT_DIR' )
    if override:
        candidate = __.Path( override )
        if candidate.exists( ) and candidate.is_dir( ):
            return _discover_common_git_directory( candidate )
    # Deliberately broad: any discovery failure means 'no repository'.
    try: repo = Repo.discover( str( start_path ) )
    except Exception: return None
    return _discover_common_git_directory( __.Path( repo.controldir( ) ) )
def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
    ''' Maps git directory to its common directory, if it names one.

        If a 'commondir' file is present in git_dir, its stripped
        contents are joined onto git_dir and the resolved path is
        returned. Without that file, or if it cannot be read, git_dir
        is returned unchanged.
    '''
    commondir_file = git_dir / 'commondir'
    if commondir_file.exists( ):
        try: relative = commondir_file.read_text( encoding = 'utf-8' ).strip( )
        except ( OSError, IOError ): return git_dir
        return ( git_dir / relative ).resolve( )
    return git_dir
def copy_coder_resources(
    source_root: __.Path,
    target_root: __.Path,
    coders: __.cabc.Sequence[ str ],
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Copies static resources for each listed coder.

        For every coder, copies source_root/<coder> into
        target_root/<coder>. Coders with no source directory are noted
        at debug level and do not count as attempted.
        Returns tuple of (coders_attempted, coders_processed).
    '''
    attempted = processed = 0
    for coder in coders:
        origin = source_root / coder
        destination = target_root / coder
        if origin.exists( ):
            attempted += 1
            if copy_resource_content( origin, destination, simulate ):
                processed += 1
        else:
            __.provide_scribe( __name__ ).debug(
                f"No resources found for {coder} at {origin}" )
    return ( attempted, processed )
def copy_resource_content(
    source: __.Path, target: __.Path, simulate: bool
) -> bool:
    ''' Copies source directory tree into target directory.

        Ensures target exists, then copies the whole tree into it,
        merging with any directories already present. In simulation
        mode nothing is copied. Returns True in both cases. Raises
        CoderResourceCopyFailure if directory creation or the copy
        fails with an OS-level error.
    '''
    if simulate: return True
    # Both steps report failure identically, so they share one handler.
    try:
        target.mkdir( parents = True, exist_ok = True )
        __.shutil.copytree( source, target, dirs_exist_ok = True )
    except ( OSError, IOError ) as exception:
        raise _exceptions.CoderResourceCopyFailure(
            source, target
        ) from exception
    return True