Coverage for sources/agentsmgr/operations.py: 8%

86 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-24 01:49 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Core operations for content generation and directory population. 

22 

23 This module provides functions for orchestrating content generation, 

24 including directory population and file writing operations with 

25 simulation support. 

26''' 

27 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import generator as _generator 

32 

33 

def populate_directory(
    generator: _generator.ContentGenerator,
    target: __.Path,
    simulate: bool = False
) -> tuple[ int, int ]:
    ''' Renders commands and agents for every configured coder into target.

        Visits each coder listed under the 'coders' key of the generator
        configuration and, for each coder, the item types 'commands' and
        then 'agents'. The work for each pair is delegated to
        generate_coder_item_type and the per-call counts are summed.

        Returns tuple of (items_attempted, items_written).
    '''
    outcomes = [
        generate_coder_item_type(
            generator, coder_name, item_type, target, simulate )
        for coder_name in generator.configuration[ 'coders' ]
        for item_type in ( 'commands', 'agents' ) ]
    attempted_total = sum( attempted for attempted, _ in outcomes )
    written_total = sum( written for _, written in outcomes )
    return ( attempted_total, written_total )

54 

55 

def _content_exists(
    generator: _generator.ContentGenerator,
    item_type: str,
    item_name: str,
    coder: str
) -> bool:
    ''' Reports whether content for an item is present on disk.

        Asks the generator to resolve the primary and fallback content
        paths for the item, then probes them in that order without
        reading either file. The fallback may be absent (falsy), in which
        case only the primary path counts.
    '''
    primary, fallback = generator.resolve_content_paths(
        item_type, item_name, coder )
    if primary.exists( ):
        return True
    if not fallback:
        return False
    return bool( fallback.exists( ) )

72 

73 

def generate_coder_item_type(
    generator: _generator.ContentGenerator,
    coder: str,
    item_type: str,
    target: __.Path,
    simulate: bool
) -> tuple[ int, int ]:
    ''' Generates items of specific type for a coder.

        Iterates over the '*.toml' files found in the
        'configurations/<item_type>' directory under the generator
        location; each file stem is taken as an item name. Files are
        visited in file name order so that write order and warning order
        are reproducible regardless of filesystem enumeration order.

        Items whose content cannot be found are skipped with a warning
        and are not counted as attempted. Every other item is rendered
        and handed to save_content.

        Returns tuple of (items_attempted, items_written). Both counts
        are zero when generator mode is 'nowhere' or the configuration
        directory does not exist.
    '''
    items_attempted = 0
    items_written = 0
    if generator.mode == 'nowhere':
        return ( items_attempted, items_written )
    configuration_directory = (
        generator.location / 'configurations' / item_type )
    if not configuration_directory.exists( ):
        return ( items_attempted, items_written )
    # Glob results arrive in filesystem order; sort for deterministic runs.
    configuration_files = sorted(
        configuration_directory.glob( '*.toml' ),
        key = lambda file: file.name )
    for configuration_file in configuration_files:
        item_name = configuration_file.stem
        if not _content_exists( generator, item_type, item_name, coder ):
            __.provide_scribe( __name__ ).warning(
                f"Skipping {item_type}/{item_name} for {coder}: "
                "content not found" )
            continue
        items_attempted += 1
        result = generator.render_single_item(
            item_type, item_name, coder, target )
        # save_content reports False when simulating, so such items
        # count as attempted but not written.
        if save_content( result.content, result.location, simulate ):
            items_written += 1
    return ( items_attempted, items_written )

109 

110 

def save_content(
    content: str, location: __.Path, simulate: bool = False
) -> bool:
    ''' Writes content to location as UTF-8 unless simulating.

        Ensures the parent directory of location exists (creating any
        missing ancestors) and then writes the text. When simulate is
        true, nothing is touched on disk.

        Returns True if file was written, False if simulated. Raises
        FileOperationFailure, chained to the underlying OS error, when
        the directory cannot be created or the file cannot be written.
    '''
    if simulate:
        return False
    directory = location.parent
    try:
        directory.mkdir( parents = True, exist_ok = True )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            directory, "create directory" ) from exception
    try:
        location.write_text( content, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            location, "save content" ) from exception
    return True

130 

131 

def update_git_exclude(
    target: __.Path,
    symlinks: __.cabc.Sequence[ str ],
    simulate: bool = False
) -> int:
    ''' Updates .git/info/exclude with symlink names if not already present.

        Adds symlink names to git exclude file to prevent accidental
        commits of generated symlinks. Existing lines are preserved
        verbatim. A name is appended only if no existing line equals it
        exactly, and each name is appended at most once even if it
        appears repeatedly in symlinks.

        The git directory is obtained from _resolve_git_directory. The
        exclude file is never created: if it, or the git directory,
        cannot be found, nothing is done.

        Returns count of symlink names added to exclude file, which is
        zero when simulating, when symlinks is empty, or when nothing
        needed to be added. Raises FileOperationFailure, chained to the
        underlying OS error, if the exclude file cannot be read or
        written.
    '''
    if simulate or not symlinks: return 0
    git_dir = _resolve_git_directory( target )
    if not git_dir: return 0
    exclude_file = git_dir / 'info' / 'exclude'
    if not exclude_file.exists( ): return 0
    try: content = exclude_file.read_text( encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "read git exclude file" ) from exception
    existing_lines = content.splitlines( )
    existing_patterns = frozenset( existing_lines )
    # dict.fromkeys drops repeated names while keeping first-seen order,
    # so a name passed twice is neither written nor counted twice.
    additions = [
        symlink for symlink in dict.fromkeys( symlinks )
        if symlink not in existing_patterns ]
    if not additions: return 0
    new_content_lines = list( existing_lines )
    # Separate additions from prior content with a single blank line,
    # unless the file is empty or already ends with a blank line.
    if new_content_lines and new_content_lines[ -1 ].strip( ):
        new_content_lines.append( '' )
    new_content_lines.extend( additions )
    new_content = '\n'.join( new_content_lines )
    if not new_content.endswith( '\n' ): new_content += '\n'
    try: exclude_file.write_text( new_content, encoding = 'utf-8' )
    except OSError as exception:
        raise _exceptions.FileOperationFailure(
            exclude_file, "update git exclude file" ) from exception
    return len( additions )

178 

179 

def _resolve_git_directory(
    start_path: __.Path
) -> __.typx.Optional[ __.Path ]:
    ''' Resolves git directory location, handling GIT_DIR and worktrees.

        Checks GIT_DIR environment variable first; if it names an
        existing directory, that directory is used. Otherwise uses
        Dulwich to discover the repository from start_path. In both
        cases the result is passed through
        _discover_common_git_directory so that the common git directory
        (shared across worktrees) is returned, giving access to shared
        resources like info/exclude.

        Returns None if repository discovery fails for any reason, such
        as start_path not being inside a git repository.
    '''
    git_dir_env = __.os.environ.get( 'GIT_DIR' )
    if git_dir_env:
        git_dir_path = __.Path( git_dir_env )
        # is_dir is already false for a nonexistent path.
        if git_dir_path.is_dir( ):
            return _discover_common_git_directory( git_dir_path )
    # Imported only when discovery is needed; GIT_DIR path avoids it.
    from dulwich.repo import Repo
    # Deliberately broad: any discovery failure means "no repository".
    try: repo = Repo.discover( str( start_path ) )
    except Exception: return None
    # Only the control directory path is needed; release the repository
    # handle as soon as it has been read.
    try: git_dir_path = __.Path( repo.controldir( ) )
    finally: repo.close( )
    return _discover_common_git_directory( git_dir_path )

201 

202 

def _discover_common_git_directory( git_dir: __.Path ) -> __.Path:
    ''' Discovers common git directory, handling worktree commondir.

        For worktrees, reads the 'commondir' file inside git_dir to find
        shared resources: its stripped content is joined onto git_dir
        (so a relative path is taken relative to git_dir) and the result
        is resolved. For standard repos, which have no 'commondir' file,
        returns git_dir unchanged.

        Also returns git_dir unchanged when the file cannot be read,
        is not valid UTF-8, or is blank.
    '''
    commondir_file = git_dir / 'commondir'
    # A missing file raises FileNotFoundError, an OSError subclass, so
    # no separate existence check is needed.
    try: common_path = commondir_file.read_text( encoding = 'utf-8' ).strip( )
    except ( OSError, UnicodeDecodeError ): return git_dir
    if not common_path: return git_dir
    return ( git_dir / common_path ).resolve( )