Coverage for sources/agentsmgr/operations.py: 8%

77 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-23 02:37 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core operations for content generation and directory population. 

22 

23 This module provides functions for orchestrating content generation, 

24 including directory population and file writing operations with 

25 simulation support. 

26''' 

27 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import generator as _generator 

32 

33 

def populate_directory(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Populates target directory with all configured content items.

        Visits every coder named under the ``coders`` key of the
        generator configuration and, for each one, both item types
        (``commands`` then ``agents``), delegating the actual work to
        :func:`generate_coder_item_type`.

        Returns tuple of (items_attempted, items_written), summed across
        all coder and item type combinations.
    '''
    # Evaluated eagerly, in coder-major order, so generation happens in
    # the same sequence as a pair of nested loops would produce.
    outcomes = [
        generate_coder_item_type(
            generator, coder_name, item_type, target, simulate )
        for coder_name in generator.configuration[ 'coders' ]
        for item_type in ( 'commands', 'agents' )
    ]
    attempted_total = sum( attempted for attempted, _ in outcomes )
    written_total = sum( written for _, written in outcomes )
    return ( attempted_total, written_total )

54 

55 

def generate_coder_item_type(
    generator: _generator.ContentGenerator,
    coder: str,
    item_type: str,
    target: __.Path,
    simulate: bool
) -> tuple[ int, int ]:
    ''' Generates every item of one type for one coder.

        Looks for ``*.toml`` files under
        ``<generator.location>/configurations/<item_type>``; each file
        stem is rendered via ``generator.render_single_item`` and the
        result is handed to :func:`save_content`.

        Nothing is attempted when the generator mode is ``'nowhere'`` or
        when the configuration directory does not exist.

        Returns tuple of (items_attempted, items_written). An item counts
        as written only when :func:`save_content` reports a real write,
        so simulated runs attempt items without writing any.
    '''
    if generator.mode == 'nowhere': return ( 0, 0 )
    source_directory = generator.location / 'configurations' / item_type
    if not source_directory.exists( ): return ( 0, 0 )
    attempted = 0
    written = 0
    for source_file in source_directory.glob( '*.toml' ):
        attempted += 1
        rendering = generator.render_single_item(
            item_type, source_file.stem, coder, target )
        saved = save_content(
            rendering.content, rendering.location, simulate )
        if saved: written += 1
    return ( attempted, written )

84 

85 

def save_content(
    content: str, location: __.Path, simulate: bool = False
) -> bool:
    ''' Writes content to location, creating missing parent directories.

        In simulation mode, the filesystem is left untouched and False is
        returned. Otherwise, the parent directory chain is created if
        needed and the content is written as UTF-8 text.

        Returns True if file was written, False if simulated.

        Raises ``FileOperationFailure`` if directory creation or the
        write itself fails with an operating system error.
    '''
    if simulate: return False
    directory = location.parent
    # IOError is an alias of OSError, so catching OSError covers both.
    try: directory.mkdir( parents = True, exist_ok = True )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            directory, "create directory" ) from exception
    try: location.write_text( content, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            location, "save content" ) from exception
    return True

105 

106 

def update_git_exclude(
    target: __.Path,
    symlinks: __.cabc.Sequence[ str ],
    simulate: bool = False
) -> int:
    ''' Updates .git/info/exclude with symlink names if not already present.

        Adds symlink names to git exclude file to prevent accidental
        commits of generated symlinks. Existing lines are preserved and a
        name is only appended if no existing line matches it exactly.
        Names repeated within ``symlinks`` are appended once.

        The git directory is located via :func:`_resolve_git_directory`.
        Nothing is done (and 0 is returned) in simulation mode, when no
        symlinks are given, when no git directory is found, or when the
        exclude file does not exist.

        Returns count of distinct symlink names added to exclude file.

        Raises ``FileOperationFailure`` if the exclude file cannot be
        read or written.
    '''
    if simulate or not symlinks: return 0
    git_dir = _resolve_git_directory( target )
    if git_dir is None: return 0
    exclude_file = git_dir / 'info' / 'exclude'
    if not exclude_file.exists( ): return 0
    try: content = exclude_file.read_text( encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "read git exclude file" ) from exception
    existing_lines = content.splitlines( )
    existing_patterns = frozenset( existing_lines )
    # dict.fromkeys drops names repeated within the request while keeping
    # first-seen order, so the same entry is never appended twice.
    additions = [
        symlink for symlink in dict.fromkeys( symlinks )
        if symlink not in existing_patterns
    ]
    if not additions: return 0
    new_content_lines = list( existing_lines )
    # Separate additions from prior content with one blank line, unless
    # the file is empty or already ends with a blank line.
    if new_content_lines and new_content_lines[ -1 ].strip( ):
        new_content_lines.append( '' )
    new_content_lines.extend( additions )
    new_content = '\n'.join( new_content_lines )
    if not new_content.endswith( '\n' ): new_content += '\n'
    try: exclude_file.write_text( new_content, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "update git exclude file" ) from exception
    return len( additions )

153 

154 

def _resolve_git_directory(
    start_path: __.Path
) -> __.typx.Optional[ __.Path ]:
    ''' Resolves git directory location, handling GIT_DIR and worktrees.

        Checks GIT_DIR environment variable first; if it names an
        existing directory, that directory is used. Otherwise, uses
        Dulwich to discover the repository from ``start_path``. In both
        cases, the result is passed through
        :func:`_discover_common_git_directory` so that callers receive
        the common git directory (shared across worktrees) for access to
        shared resources like info/exclude.

        Returns None if repository discovery fails for any reason.
    '''
    from dulwich.repo import Repo
    git_dir_env = __.os.environ.get( 'GIT_DIR' )
    if git_dir_env:
        git_dir_path = __.Path( git_dir_env )
        # is_dir is already False for nonexistent paths.
        if git_dir_path.is_dir( ):
            return _discover_common_git_directory( git_dir_path )
    # Deliberately broad: any discovery failure means "no usable
    # repository" to callers, which treat None as a no-op.
    try: repo = Repo.discover( str( start_path ) )
    except Exception: return None
    # Only the control directory path is needed; release the repository
    # handle rather than leaving it open until garbage collection.
    try: git_dir_path = __.Path( repo.controldir( ) )
    finally: repo.close( )
    return _discover_common_git_directory( git_dir_path )

176 

177 

def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
    ''' Finds common git directory, following a commondir file if present.

        If ``git_dir`` contains a ``commondir`` file, its stripped text
        is joined onto ``git_dir`` and the resolved result is returned.
        If the file is absent or cannot be read, ``git_dir`` is returned
        unchanged.
    '''
    pointer_file = git_dir / 'commondir'
    if pointer_file.exists( ):
        try: pointer = pointer_file.read_text( encoding = 'utf-8' ).strip( )
        # IOError is an alias of OSError, so catching OSError covers both.
        except OSError: pass
        else: return ( git_dir / pointer ).resolve( )
    return git_dir