Coverage for sources / agentsmgr / sources / git.py: 11%
260 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 21:55 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 21:55 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Git-based source handler with Dulwich.
23 This module provides source resolution for Git repositories, supporting
24 various URL schemes and subdirectory specifications via fragment syntax.
25'''
28import dulwich.porcelain as _dulwich_porcelain
30from . import __
31from . import base as _base
# JSON object describing a single tag as returned by a hosting provider
# API. Code in this module reads its 'name' entry.
GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]


# Module-level scribe; this module uses it for informational messages.
_scribe = __.provide_scribe( __name__ )
class GitLocation( __.immut.DataclassObject ):
    ''' Parsed Git source: clone URL plus optional ref and subdirectory.

        Instances are produced by parsing a source specification; ``ref``
        and ``subdir`` stay ``None`` when the specification omits them.
    '''

    # URL handed to the Git client for cloning.
    git_url: str
    # Tag, branch, or commit requested via ``@ref`` syntax, if any.
    ref: __.typx.Optional[ str ] = None
    # Path inside the repository requested via ``#subdir`` syntax, if any.
    subdir: __.typx.Optional[ str ] = None
class GitCloneFailure( __.Omnierror, OSError ):
    ''' Failure to clone a Git repository.

        Keeps the offending URL and an optional human-readable reason as
        attributes so that callers can inspect them.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        # Reason is appended in parentheses only when one was supplied.
        detail = f" ({reason})" if reason else ''
        super( ).__init__(
            f"Failed to clone Git repository: {git_url}{detail}" )
class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Requested subdirectory does not exist in the repository.

        Keeps the subdirectory and the original source specification as
        attributes.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir, self.source_spec = subdir, source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: "
            f"{source_spec}" )
class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Requested Git reference does not exist in the repository.

        Keeps the reference and the repository location as attributes.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref, self.git_url = ref, git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )
@_base.source_handler([
    'github', 'gitlab', 'git+https', 'https', 'git@'
])
class GitSourceHandler:
    ''' Handles Git repository source resolution with Dulwich.

        Supports multiple URL schemes and converts them to Git URLs for
        cloning. Implements fragment syntax (``#subdir``) for subdirectory
        specification and ``@ref`` syntax for selecting a Git reference.
    '''

    def resolve(
        self,
        source_spec: str,
        tag_prefix: __.typx.Annotated[
            __.Absential[ str ],
            __.ddoc.Doc(
                "Prefix for filtering version tags when no explicit ref "
                "is specified. Only tags starting with this prefix will be "
                "considered, and the prefix will be stripped before version "
                "parsing." ),
        ] = __.absent,
    ) -> __.Path:
        ''' Resolves Git source to local temporary directory.

            Clones the repository to a temporary location and returns the
            path to the specified subdirectory or repository root. The
            temporary directory is removed again if resolution fails; on
            success it is left in place for the caller.

            Raises :class:`GitSubdirectoryAbsence` or
            :class:`GitRefAbsence` when the requested subdirectory or ref
            is missing and :class:`GitCloneFailure` for any other failure.
        '''
        location = self._parse_git_url( source_spec )
        temp_dir = self._create_temp_directory( )
        try:
            self._clone_repository( location, temp_dir, tag_prefix )
            if location.subdir:
                subdir_path = temp_dir / location.subdir
                if not subdir_path.exists( ):
                    self._raise_subdir_not_found(
                        location.subdir, source_spec )
                result_path = subdir_path
            else:
                result_path = temp_dir
        except Exception as exception:
            # Clean up on failure
            __.shutil.rmtree( temp_dir, ignore_errors = True )
            # Exceptions which already carry a precise diagnosis are passed
            # through; re-wrapping a GitCloneFailure would only nest the
            # same message inside itself.
            if isinstance(
                exception, ( __.DataSourceNoSupport, GitCloneFailure )
            ): raise
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception
        else:
            return result_path

    def _parse_git_url( self, source_spec: str ) -> GitLocation:
        ''' Parses source specification into Git URL, ref, and subdirectory.

            Supports URL scheme mapping and fragment syntax for subdirectory
            specification. Also supports @ref syntax for Git references.
        '''
        url_part, separator, fragment = source_spec.partition( '#' )
        subdir = fragment if separator else None
        url_part, ref = self._split_ref( url_part )
        # Map URL schemes to Git URLs
        if url_part.startswith( 'github:' ):
            repo_path = url_part[ len( 'github:' ): ]
            git_url = f"https://github.com/{repo_path}.git"
        elif url_part.startswith( 'gitlab:' ):
            repo_path = url_part[ len( 'gitlab:' ): ]
            git_url = f"https://gitlab.com/{repo_path}.git"
        elif url_part.startswith( 'git+https:' ):
            git_url = url_part[ len( 'git+' ): ]
        elif url_part.startswith(
            ( 'https://github.com/', 'https://gitlab.com/' )
        ):
            # Convert GitHub/GitLab web URLs to Git URLs
            if url_part.endswith( '.git' ):
                git_url = url_part
            else:
                git_url = f"{url_part.rstrip( '/' )}.git"
        else:
            # Direct git URLs (git@github.com:user/repo.git)
            git_url = url_part
        return GitLocation( git_url = git_url, ref = ref, subdir = subdir )

    def _split_ref(
        self, url_part: str
    ) -> tuple[ str, __.typx.Optional[ str ] ]:
        ''' Splits trailing ``@ref`` from URL without touching user info.

            An ``@`` which belongs to the SSH user prefix (``git@host:...``)
            or to the authority of a ``scheme://user@host/...`` URL is not a
            ref separator, so the search starts after that portion.
            Returns the URL and the ref, or ``None`` for the ref when no
            separator is present.
        '''
        if url_part.startswith( 'git@' ):
            start = len( 'git@' )
        elif '://' in url_part:
            authority = url_part.index( '://' ) + len( '://' )
            path = url_part.find( '/', authority )
            start = path if path >= 0 else len( url_part )
        else:
            start = 0
        position = url_part.find( '@', start )
        if position < 0: return url_part, None
        return url_part[ : position ], url_part[ position + 1 : ]

    def _create_temp_directory( self ) -> __.Path:
        ''' Creates temporary directory for repository cloning. '''
        temp_dir = __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' )
        return __.Path( temp_dir )

    def _clone_repository(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Clones Git repository using Dulwich with optimizations.

            For GitHub/GitLab repositories without explicit ref, attempts
            API-based tag resolution followed by shallow clone. Falls back
            to standard full clone on any failure.

            Raises :class:`GitRefAbsence` when the requested ref cannot be
            found and :class:`GitCloneFailure` for any other failure.
        '''
        try:
            if location.ref is None:
                cloned = self._attempt_optimized_clone(
                    location, target_dir, tag_prefix )
                if cloned: return
            self._perform_standard_clone( location, target_dir, tag_prefix )
        except GitRefAbsence as exception:
            # Local checkout reports the clone directory; report the
            # repository URL to callers instead.
            raise GitRefAbsence(
                exception.ref, location.git_url ) from exception
        except Exception as exception:
            error_msg = str( exception ).lower( )
            if location.ref is not None and (
                'not found' in error_msg or 'does not exist' in error_msg
            ):
                raise GitRefAbsence(
                    location.ref, location.git_url ) from exception
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception

    def _attempt_optimized_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> bool:
        ''' Attempts optimized clone using API and shallow clone.

            Returns True if successful, False if optimization should fall
            back to standard clone.
        '''
        latest_tag = self._resolve_latest_tag_via_api(
            location.git_url, tag_prefix )
        if latest_tag is None: return False
        _scribe.info(
            f"Resolved latest tag '{latest_tag}' via API for repository: "
            f"{location.git_url}" )
        try:
            self._perform_shallow_clone(
                location.git_url, target_dir, latest_tag )
        except Exception:
            _scribe.info(
                f"Shallow clone failed, falling back to standard clone for "
                f"repository: {location.git_url}" )
            # The fallback clones into the same directory, so anything the
            # failed attempt wrote there is removed first.
            # NOTE(review): presumably Dulwich can leave a partial
            # repository behind in a pre-existing target - confirm.
            self._clear_directory( target_dir )
            return False
        else:
            _scribe.info(
                f"Performed shallow clone for tag '{latest_tag}' in "
                f"repository: {location.git_url}" )
            return True

    def _clear_directory( self, directory: __.Path ) -> None:
        ''' Removes contents of directory, keeping directory itself.

            Best effort: entries which cannot be removed are left in place.
        '''
        try: entries = list( directory.iterdir( ) )
        except OSError: return
        for entry in entries:
            if entry.is_dir( ) and not entry.is_symlink( ):
                __.shutil.rmtree( entry, ignore_errors = True )
                continue
            try: entry.unlink( )
            except OSError: pass  # deliberate best-effort cleanup

    def _perform_shallow_clone(
        self, git_url: str, target_dir: __.Path, ref: str
    ) -> None:
        ''' Performs shallow clone of specific ref using Dulwich.

            Uses depth=1 and branch parameters for efficient cloning.
        '''
        # Client progress output is discarded.
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                git_url,
                str( target_dir ),
                bare = False,
                depth = 1,
                branch = ref.encode( ),
                errstream = devnull,
            )

    def _perform_standard_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Performs standard full clone with optional ref checkout.

            This is the fallback path for repositories that cannot use
            API optimization or when explicit ref is provided. Without an
            explicit ref, the latest version tag is checked out if one
            exists; otherwise the default branch is kept.
        '''
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                location.git_url,
                str( target_dir ),
                bare = False,
                depth = None,
                errstream = devnull,
            )
        if location.ref is None:
            latest_tag = self._get_latest_tag( target_dir, tag_prefix )
            if latest_tag:
                _scribe.info(
                    f"Selected latest tag '{latest_tag}' for repository: "
                    f"{location.git_url}" )
                self._checkout_ref( target_dir, latest_tag )
            else:
                _scribe.info(
                    f"No version tags found, using default branch for "
                    f"repository: {location.git_url}" )
        else:
            _scribe.info(
                f"Using explicit ref '{location.ref}' for repository: "
                f"{location.git_url}" )
            self._checkout_ref( target_dir, location.ref )

    def _extract_version(
        self,
        tag_name: str,
        prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ __.Version ]:
        ''' Extracts and parses semantic version from tag name.

            If prefix is provided, only processes tags that start with the
            prefix and strips it before parsing. If prefix is absent, tries
            parsing the tag name directly. Returns None if tag cannot be
            parsed as a valid semantic version.
        '''
        version_string = tag_name
        if not __.is_absent( prefix ):
            if not tag_name.startswith( prefix ):
                return None
            version_string = tag_name[ len( prefix ): ]
        try:
            return __.Version( version_string )
        except __.InvalidVersion:
            return None

    def _get_latest_tag(
        self,
        repo_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Gets the latest tag from the repository by semantic version.

            Optionally filters tags by prefix before selecting latest.
            Compares parsed versions. If the repository cannot be opened
            or no tags can be parsed as versions, returns None (falls back
            to default branch).
        '''
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception:
            return None
        try:
            try:
                tag_refs = repo.refs.as_dict( b"refs/tags" )
            except Exception:
                return None
            versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
            for tag_name_bytes, commit_sha in tag_refs.items( ):
                # Skip tags whose target cannot be loaded.
                if self._get_tag_commit( repo, commit_sha ) is None:
                    continue
                try: tag_name = tag_name_bytes.decode( 'utf-8' )
                except UnicodeDecodeError: continue
                version = self._extract_version( tag_name, tag_prefix )
                if version is not None:
                    versioned_tags.append( ( version, tag_name ) )
        finally:
            # Release files held open by the repository object.
            repo.close( )
        if not versioned_tags: return None
        return max( versioned_tags )[ 1 ]

    def _get_tag_commit(
        self, repo: __.typx.Any, commit_sha: bytes
    ) -> __.typx.Any:
        ''' Gets commit object for a tag, handling annotated tags.

            Returns None if any object along the way cannot be loaded.
        '''
        try:
            commit = repo[ commit_sha ]
            while hasattr( commit, 'object' ):
                # object attribute is a tuple (class, sha)
                commit = repo[ commit.object[ 1 ] ]
        except Exception:
            return None
        else:
            return commit

    def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
        ''' Checks out a specific reference in the cloned repository.

            Resolves the reference to a commit and hard-resets the working
            tree to it. Raises :class:`GitRefAbsence` if the repository
            cannot be opened or the reference cannot be resolved.
        '''
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception as exception:
            raise GitRefAbsence( ref, str( repo_dir ) ) from exception
        try:
            commit = self._resolve_ref_commit( repo, ref )
            if commit is None:
                raise GitRefAbsence( ref, str( repo_dir ) )
            # Merely validating the ref would leave the default branch
            # checked out, so the working tree is switched explicitly.
            # NOTE(review): relies on porcelain.reset( repo, 'hard',
            # treeish ) accepting a commit id - confirm against the
            # pinned Dulwich version.
            _dulwich_porcelain.reset( repo, 'hard', commit.id )
        finally:
            repo.close( )

    def _resolve_ref_commit(
        self, repo: __.typx.Any, ref: str
    ) -> __.typx.Any:
        ''' Resolves tag, branch, or object name to commit object.

            Tries tags first, then local branches, then branches of the
            ``origin`` remote, and finally treats the name as an object
            identifier. Returns None if nothing matches.
        '''
        candidates = (
            f"refs/tags/{ref}".encode( ),
            f"refs/heads/{ref}".encode( ),
            # NOTE(review): assumes clone registers remote branches
            # under 'origin' - confirm.
            f"refs/remotes/origin/{ref}".encode( ),
        )
        for name in candidates:
            if name in repo.refs:
                return self._get_tag_commit( repo, repo.refs[ name ] )
        return self._get_tag_commit( repo, ref.encode( ) )

    def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
        ''' Raises GitRefAbsence for invalid reference. '''
        raise GitRefAbsence( ref, repo_dir )

    def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
        ''' Raises GitSubdirectoryAbsence for missing subdirectory. '''
        raise GitSubdirectoryAbsence( subdir, source_spec )

    def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]:
        ''' Detects Git hosting provider from URL.

            Returns 'github', 'gitlab', or None for other providers. The
            host must be the provider domain itself or a subdomain of it;
            a mere substring match (e.g. ``notgithub.com``) is rejected.
        '''
        if git_url.startswith( 'git@' ):
            hostname = git_url[ len( 'git@' ): ].split( ':', 1 )[ 0 ]
        else:
            # ``hostname`` excludes user info and port, unlike ``netloc``.
            hostname = __.urlparse.urlparse( git_url ).hostname or ''
        hostname = hostname.lower( )
        for provider in ( 'github', 'gitlab' ):
            domain = f"{provider}.com"
            if hostname == domain or hostname.endswith( f".{domain}" ):
                return provider
        return None

    def _acquire_github_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitHub authentication token from environment or gh CLI.

            Checks GITHUB_TOKEN environment variable first, then attempts
            to retrieve token from gh CLI. Returns None if neither source
            yields a non-empty token.
        '''
        token = __.os.environ.get( 'GITHUB_TOKEN' )
        if token: return token
        try:
            result = __.subprocess.run(
                [ 'gh', 'auth', 'token' ],
                capture_output = True,
                text = True,
                timeout = 5,
                check = False )
        except ( OSError, __.subprocess.TimeoutExpired ):
            # gh is missing, not executable, or too slow: no token.
            return None
        if result.returncode != 0: return None
        return result.stdout.strip( ) or None

    def _acquire_gitlab_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitLab authentication token from environment.

            Checks GITLAB_TOKEN environment variable. Returns None if not
            available.
        '''
        return __.os.environ.get( 'GITLAB_TOKEN' )

    def _request_tag_list(
        self, request: __.typx.Any
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Performs API request and decodes JSON list of tags.

            Returns None on any failure or if the response is not a JSON
            list. Failures are deliberately swallowed because callers fall
            back to cloning without the API.
        '''
        try:
            with __.urlreq.urlopen( request, timeout = 10 ) as response:
                payload = __.json.loads( response.read( ) )
        except Exception:
            return None
        if not isinstance( payload, list ): return None
        return payload

    def _retrieve_github_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitHub API.

            Returns list of tag dictionaries or None on failure. Requests
            the maximum page size so that fewer tags are missed than with
            the default page size.
        '''
        token = self._acquire_github_authentication_token( )
        url = (
            f"https://api.github.com/repos/{owner}/{repository}/tags"
            f"?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'Authorization', f"token {token}" )
        request.add_header( 'Accept', 'application/vnd.github.v3+json' )
        return self._request_tag_list( request )

    def _retrieve_gitlab_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitLab API.

            Returns list of tag dictionaries or None on failure. The
            project path is fully URL-encoded so that projects in nested
            groups (``group/subgroup/project``) are addressed correctly.
        '''
        from urllib.parse import quote
        token = self._acquire_gitlab_authentication_token( )
        project_path = quote( f"{owner}/{repository}", safe = '' )
        url = (
            f"https://gitlab.com/api/v4/projects/{project_path}/"
            f"repository/tags?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'PRIVATE-TOKEN', token )
        return self._request_tag_list( request )

    def _extract_repository_information(
        self, git_url: str
    ) -> __.typx.Optional[ tuple[ str, str ] ]:
        ''' Extracts owner and repository name from Git URL.

            Returns tuple of (owner, repository) or None if URL format is
            not recognized. Handles both SSH (git@host:owner/repo) and
            HTTPS (https://host/owner/repo) formats. Everything after the
            first path segment is treated as the repository.
        '''
        if self._detect_git_host( git_url ) is None: return None
        if git_url.startswith( 'git@' ):
            _, separator, path = git_url.partition( ':' )
            if not separator: return None
        else:
            path = __.urlparse.urlparse( git_url ).path
        path = path.strip( '/' ).removesuffix( '.git' )
        owner, _, repository = path.partition( '/' )
        if not owner or not repository: return None
        return ( owner, repository )

    def _select_latest_tag_from_api(
        self,
        tags: list[ GitApiTag ],
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Selects latest tag from API results by semantic version.

            Filters by tag prefix if provided, then selects tag with
            highest semantic version. Entries without a string 'name' are
            ignored. Returns None if no valid version tags are found.
        '''
        versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
        for tag in tags:
            tag_name = tag.get( 'name' ) if hasattr( tag, 'get' ) else None
            if not isinstance( tag_name, str ): continue
            version = self._extract_version( tag_name, tag_prefix )
            if version is not None:
                versioned_tags.append( ( version, tag_name ) )
        if not versioned_tags: return None
        return max( versioned_tags )[ 1 ]

    def _resolve_latest_tag_via_api(
        self,
        git_url: str,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Resolves latest tag using GitHub or GitLab API.

            Returns tag name or None if API resolution fails or is not
            applicable.
        '''
        host = self._detect_git_host( git_url )
        if host is None: return None
        repo_info = self._extract_repository_information( git_url )
        if repo_info is None: return None
        owner, repository = repo_info
        if host == 'github':
            tags = self._retrieve_github_tags( owner, repository )
        elif host == 'gitlab':
            tags = self._retrieve_gitlab_tags( owner, repository )
        else:
            return None
        if tags is None: return None
        return self._select_latest_tag_from_api( tags, tag_prefix )