Coverage for sources/agentsmgr/sources/git.py: 13%
136 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 00:43 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 00:43 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Git-based source handler with Dulwich.
23 This module provides source resolution for Git repositories, supporting
24 various URL schemes and subdirectory specifications via fragment syntax.
25'''
28import dulwich.porcelain as _dulwich_porcelain
30from . import __
31from . import base as _base
34class GitLocation( __.immut.DataclassObject ):
35 ''' Git source location with URL, optional ref, and optional subdir. '''
36 git_url: str
37 ref: __.typx.Optional[ str ] = None
38 subdir: __.typx.Optional[ str ] = None
41class GitCloneFailure( __.Omnierror, OSError ):
42 ''' Git repository cloning operation failure. '''
44 def __init__( self, git_url: str, reason: str = '' ):
45 self.git_url = git_url
46 self.reason = reason
47 message = f"Failed to clone Git repository: {git_url}"
48 if reason: message = f"{message} ({reason})"
49 super( ).__init__( message )
52class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
53 ''' Git repository subdirectory absence. '''
55 def __init__( self, subdir: str, source_spec: str ):
56 self.subdir = subdir
57 self.source_spec = source_spec
58 message = (
59 f"Subdirectory '{subdir}' not found in repository: {source_spec}" )
60 super( ).__init__( message )
63class GitRefAbsence( __.DataSourceNoSupport ):
64 ''' Git reference absence in repository. '''
66 def __init__( self, ref: str, git_url: str ):
67 self.ref = ref
68 self.git_url = git_url
69 message = f"Git ref '{ref}' not found in repository: {git_url}"
70 super( ).__init__( message )
73@_base.source_handler([
74 'github:', 'gitlab:', 'git+https:',
75 'https://github.com/', 'https://gitlab.com/', 'git@'
76])
77class GitSourceHandler:
78 ''' Handles Git repository source resolution with Dulwich.
80 Supports multiple URL schemes and converts them to Git URLs for
81 cloning. Implements fragment syntax for subdirectory specification.
82 '''
84 def resolve( self, source_spec: str ) -> __.Path:
85 ''' Resolves Git source to local temporary directory.
87 Clones the repository to a temporary location and returns the
88 path to the specified subdirectory or repository root.
89 '''
90 location = self._parse_git_url( source_spec )
91 temp_dir = self._create_temp_directory( )
92 try:
93 self._clone_repository( location, temp_dir )
94 if location.subdir:
95 subdir_path = temp_dir / location.subdir
96 if not subdir_path.exists( ):
97 self._raise_subdir_not_found(
98 location.subdir, source_spec )
99 result_path = subdir_path
100 else:
101 result_path = temp_dir
102 except Exception as exception:
103 # Clean up on failure
104 __.shutil.rmtree( temp_dir, ignore_errors = True )
105 if isinstance( exception, __.DataSourceNoSupport ):
106 raise
107 raise GitCloneFailure(
108 location.git_url, str( exception ) ) from exception
109 else:
110 return result_path
112 def _parse_git_url( self, source_spec: str ) -> GitLocation:
113 ''' Parses source specification into Git URL, ref, and subdirectory.
115 Supports URL scheme mapping and fragment syntax for subdirectory
116 specification. Also supports @ref syntax for Git references.
117 '''
118 url_part = source_spec
119 ref = None
120 subdir = None
121 if '#' in url_part:
122 url_part, subdir = url_part.split( '#', 1 )
123 if '@' in url_part:
124 url_part, ref = url_part.split( '@', 1 )
125 # Map URL schemes to Git URLs
126 if url_part.startswith( 'github:' ):
127 repo_path = url_part[ len( 'github:' ): ]
128 git_url = f"https://github.com/{repo_path}.git"
129 elif url_part.startswith( 'gitlab:' ):
130 repo_path = url_part[ len( 'gitlab:' ): ]
131 git_url = f"https://gitlab.com/{repo_path}.git"
132 elif url_part.startswith( 'git+https:' ):
133 git_url = url_part[ len( 'git+' ): ]
134 elif url_part.startswith( 'https://github.com/' ):
135 # Convert GitHub web URLs to Git URLs
136 if url_part.endswith( '.git' ):
137 git_url = url_part
138 else:
139 git_url = f"{url_part.rstrip( '/' )}.git"
140 elif url_part.startswith( 'https://gitlab.com/' ):
141 # Convert GitLab web URLs to Git URLs
142 if url_part.endswith( '.git' ):
143 git_url = url_part
144 else:
145 git_url = f"{url_part.rstrip( '/' )}.git"
146 else:
147 # Direct git URLs (git@github.com:user/repo.git)
148 git_url = url_part
150 return GitLocation( git_url = git_url, ref = ref, subdir = subdir )
152 def _create_temp_directory( self ) -> __.Path:
153 ''' Creates temporary directory for repository cloning. '''
154 temp_dir = __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' )
155 return __.Path( temp_dir )
157 def _clone_repository(
158 self, location: GitLocation, target_dir: __.Path
159 ) -> None:
160 ''' Clones Git repository using Dulwich.
162 Performs shallow clone for default branch or full clone for refs,
163 then checks out the specified reference if provided.
164 '''
165 try:
166 _dulwich_porcelain.clone(
167 location.git_url,
168 str( target_dir ),
169 bare = False,
170 depth = None,
171 )
172 if location.ref is None:
173 latest_tag = self._get_latest_tag( target_dir )
174 if latest_tag:
175 self._checkout_ref( target_dir, latest_tag )
176 else:
177 # Checkout specified ref
178 self._checkout_ref( target_dir, location.ref )
179 except Exception as exception:
180 error_msg = str( exception ).lower( )
181 if location.ref is not None and (
182 'not found' in error_msg or 'does not exist' in error_msg
183 ):
184 raise GitRefAbsence(
185 location.ref, location.git_url ) from exception
186 raise GitCloneFailure(
187 location.git_url, str( exception ) ) from exception
189 def _get_latest_tag( self, repo_dir: __.Path ) -> __.typx.Optional[ str ]:
190 ''' Gets the latest tag from the repository by commit date. '''
191 from dulwich.repo import Repo
192 try:
193 repo = Repo( str( repo_dir ) )
194 except Exception:
195 return None
196 try:
197 tag_refs = repo.refs.as_dict( b"refs/tags" )
198 except Exception:
199 return None
200 if not tag_refs:
201 return None
202 tag_times: list[ tuple[ int, str ] ] = [ ]
203 for tag_name_bytes, commit_sha in tag_refs.items( ):
204 commit = self._get_tag_commit( repo, commit_sha )
205 if commit is not None:
206 tag_name = tag_name_bytes.decode( 'utf-8' )
207 tag_times.append( ( commit.commit_time, tag_name ) )
208 if not tag_times:
209 return None
210 tag_times.sort( reverse = True )
211 return tag_times[ 0 ][ 1 ]
213 def _get_tag_commit(
214 self, repo: __.typx.Any, commit_sha: bytes
215 ) -> __.typx.Any:
216 ''' Gets commit object for a tag, handling annotated tags. '''
217 try:
218 commit = repo[ commit_sha ]
219 while hasattr( commit, 'object' ):
220 commit = repo[ commit.object ]
221 except Exception:
222 return None
223 else:
224 return commit
226 def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
227 ''' Checks out a specific reference by cloning with branch param. '''
228 from dulwich.repo import Repo
229 try:
230 repo = Repo( str( repo_dir ) )
231 except Exception as exception:
232 raise GitRefAbsence( ref, str( repo_dir ) ) from exception
233 ref_bytes = ref.encode( )
234 tag_ref = f"refs/tags/{ref}".encode( )
235 branch_ref = f"refs/heads/{ref}".encode( )
236 if tag_ref in repo.refs or branch_ref in repo.refs:
237 return
238 try:
239 repo[ ref_bytes ]
240 except KeyError:
241 self._raise_ref_not_found( ref, str( repo_dir ) )
243 def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
244 ''' Raises GitRefAbsence for invalid reference. '''
245 raise GitRefAbsence( ref, repo_dir )
247 def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
248 ''' Raises GitSubdirectoryAbsence for missing subdirectory. '''
249 raise GitSubdirectoryAbsence( subdir, source_spec )