Coverage for sources / agentsmgr / sources / git.py: 11%
260 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-30 00:03 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-30 00:03 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Git-based source handler with Dulwich.
23 This module provides source resolution for Git repositories, supporting
24 various URL schemes and subdirectory specifications via fragment syntax.
25'''
28import dulwich.porcelain as _dulwich_porcelain
30from . import __
31from . import base as _base
# Alias for one tag entry as decoded from a hosting provider's tags API
# response. Only the 'name' key is read by the code in this module.
GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]

# Module-level logging handle; used below exclusively via '.info( ... )'.
# NOTE(review): presumably a standard logger keyed by module name - confirm
# against the definition of 'provide_scribe'.
_scribe = __.provide_scribe( __name__ )
class GitLocation( __.immut.DataclassObject ):
    ''' Git source location with URL, optional ref, and optional subdir.

        Instances are built by ``GitSourceHandler._parse_git_url`` from a
        source specification and then consumed by the clone helpers.
    '''
    # URL handed to Dulwich for cloning (after any scheme mapping).
    git_url: str
    # Explicit Git reference requested via '@ref' syntax; None when the
    # source specification did not name one.
    ref: __.typx.Optional[ str ] = None
    # Path inside the repository requested via '#subdir' fragment syntax;
    # None when the repository root is wanted.
    subdir: __.typx.Optional[ str ] = None
class GitCloneFailure( __.Omnierror, OSError ):
    ''' Git repository cloning operation failure.

        Carries the offending URL as ``git_url`` and an optional
        human-readable ``reason`` which, when non-empty, is appended to
        the message in parentheses.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        summary = f"Failed to clone Git repository: {git_url}"
        # Only decorate the summary when there is something to add.
        super( ).__init__( f"{summary} ({reason})" if reason else summary )
class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Git repository subdirectory absence.

        Raised when the subdirectory named in a source specification does
        not exist in the cloned repository. Exposes ``subdir`` and
        ``source_spec`` for callers which want the details.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir, self.source_spec = subdir, source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: {source_spec}" )
class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Git reference absence in repository.

        Raised when a requested ref cannot be found. Exposes ``ref`` and
        ``git_url`` for callers which want the details.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref, self.git_url = ref, git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )
@_base.source_handler([
    'github', 'gitlab', 'git+https', 'https', 'git@'
])
class GitSourceHandler:
    ''' Handles Git repository source resolution with Dulwich.

        Supports multiple URL schemes and converts them to Git URLs for
        cloning. Implements fragment syntax for subdirectory specification.
    '''

    def resolve(
        self,
        source_spec: str,
        tag_prefix: __.typx.Annotated[
            __.Absential[ str ],
            __.ddoc.Doc(
                "Prefix for filtering version tags when no explicit ref "
                "is specified. Only tags starting with this prefix will be "
                "considered, and the prefix will be stripped before version "
                "parsing." ),
        ] = __.absent,
    ) -> __.Path:
        ''' Resolves Git source to local temporary directory.

            Clones the repository to a temporary location and returns the
            path to the specified subdirectory or repository root. The
            temporary directory is removed again if anything fails.

            Raises ``GitSubdirectoryAbsence`` or ``GitRefAbsence`` for
            missing subdirectories or refs and ``GitCloneFailure`` for
            every other failure.
        '''
        location = self._parse_git_url( source_spec )
        temp_dir = self._create_temp_directory( )
        try:
            self._clone_repository( location, temp_dir, tag_prefix )
            if location.subdir:
                subdir_path = temp_dir / location.subdir
                if not subdir_path.exists( ):
                    self._raise_subdir_not_found(
                        location.subdir, source_spec )
                result_path = subdir_path
            else:
                result_path = temp_dir
        except Exception as exception:
            # Clean up on failure
            __.shutil.rmtree( temp_dir, ignore_errors = True )
            # Domain errors already carry a precise message; wrapping a
            # GitCloneFailure again would only nest the same text twice.
            if isinstance(
                exception, ( __.DataSourceNoSupport, GitCloneFailure )
            ): raise
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception
        else:
            return result_path

    def _parse_git_url( self, source_spec: str ) -> GitLocation:
        ''' Parses source specification into Git URL, ref, and subdirectory.

            Supports URL scheme mapping and fragment syntax for subdirectory
            specification. Also supports @ref syntax for Git references.

            The ``@`` which introduces a ref is only looked for after the
            ``git@`` prefix of SSH URLs and after the authority part of
            ``scheme://`` URLs, so that user information in the URL is not
            mistaken for a ref. An empty ref or fragment is treated as
            absent.
        '''
        url_part, _, fragment = source_spec.partition( '#' )
        subdir = fragment or None
        search_start = self._locate_ref_search_start( url_part )
        head, tail = url_part[ : search_start ], url_part[ search_start : ]
        # First '@' wins so that refs which themselves contain '@'
        # (e.g. 'package@1.2.3') survive intact.
        tail, _, ref_part = tail.partition( '@' )
        url_part = f"{head}{tail}"
        ref = ref_part or None
        # Map URL schemes to Git URLs
        if url_part.startswith( 'github:' ):
            repo_path = url_part[ len( 'github:' ): ]
            git_url = f"https://github.com/{repo_path}.git"
        elif url_part.startswith( 'gitlab:' ):
            repo_path = url_part[ len( 'gitlab:' ): ]
            git_url = f"https://gitlab.com/{repo_path}.git"
        elif url_part.startswith( 'git+https:' ):
            git_url = url_part[ len( 'git+' ): ]
        elif url_part.startswith(
            ( 'https://github.com/', 'https://gitlab.com/' )
        ):
            # Convert GitHub/GitLab web URLs to Git URLs
            if url_part.endswith( '.git' ):
                git_url = url_part
            else:
                git_url = f"{url_part.rstrip( '/' )}.git"
        else:
            # Direct git URLs (git@github.com:user/repo.git)
            git_url = url_part
        return GitLocation( git_url = git_url, ref = ref, subdir = subdir )

    def _locate_ref_search_start( self, url_part: str ) -> int:
        ''' Finds index from which an ``@`` may introduce a Git ref.

            Skips the ``git@`` prefix of SSH-style URLs and the authority
            (``user@host:port``) of ``scheme://`` URLs. Returns 0 for
            shorthand specifications such as ``github:owner/repo``.
        '''
        if url_part.startswith( 'git@' ): return len( 'git@' )
        scheme_end = url_part.find( '://' )
        if scheme_end < 0: return 0
        path_start = url_part.find( '/', scheme_end + len( '://' ) )
        # No path at all: nothing after the authority can be a ref.
        return len( url_part ) if path_start < 0 else path_start

    def _create_temp_directory( self ) -> __.Path:
        ''' Creates temporary directory for repository cloning. '''
        temp_dir = __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' )
        return __.Path( temp_dir )

    def _clone_repository(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Clones Git repository using Dulwich with optimizations.

            For GitHub/GitLab repositories without explicit ref, attempts
            API-based tag resolution followed by shallow clone. Falls back
            to standard full clone on any failure.

            Raises ``GitRefAbsence`` when an explicit ref was requested and
            the failure message indicates that something was not found;
            raises ``GitCloneFailure`` otherwise.
        '''
        try:
            if location.ref is None:
                cloned = self._attempt_optimized_clone(
                    location, target_dir, tag_prefix )
                if cloned: return
            self._perform_standard_clone( location, target_dir, tag_prefix )
        except Exception as exception:
            # Classification is by message text because the underlying
            # errors do not share a dedicated 'missing ref' type here.
            error_msg = str( exception ).lower( )
            if location.ref is not None and (
                'not found' in error_msg or 'does not exist' in error_msg
            ):
                raise GitRefAbsence(
                    location.ref, location.git_url ) from exception
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception

    def _attempt_optimized_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> bool:
        ''' Attempts optimized clone using API and shallow clone.

            Returns True if successful, False if optimization should fall
            back to standard clone. On a failed shallow clone the target
            directory is emptied again so that the fallback clone starts
            from a clean directory.
        '''
        try:
            latest_tag = self._resolve_latest_tag_via_api(
                location.git_url, tag_prefix )
        except Exception:
            # The optimization is best-effort; never let it abort the
            # overall clone.
            _scribe.info(
                f"Tag resolution via API failed, falling back to standard "
                f"clone for repository: {location.git_url}" )
            return False
        if latest_tag is None: return False
        _scribe.info(
            f"Resolved latest tag '{latest_tag}' via API for repository: "
            f"{location.git_url}" )
        try:
            self._perform_shallow_clone(
                location.git_url, target_dir, latest_tag )
        except Exception:
            _scribe.info(
                f"Shallow clone failed, falling back to standard clone for "
                f"repository: {location.git_url}" )
            # A half-initialized repository would make the fallback clone
            # into the same directory fail.
            self._reset_directory( target_dir )
            return False
        else:
            _scribe.info(
                f"Performed shallow clone for tag '{latest_tag}' in "
                f"repository: {location.git_url}" )
            return True

    def _reset_directory( self, directory: __.Path ) -> None:
        ''' Empties directory by removing and recreating it. '''
        __.shutil.rmtree( directory, ignore_errors = True )
        # Same owner-only permissions which 'tempfile.mkdtemp' applies.
        directory.mkdir( mode = 0o700, parents = True, exist_ok = True )

    def _perform_shallow_clone(
        self, git_url: str, target_dir: __.Path, ref: str
    ) -> None:
        ''' Performs shallow clone of specific ref using Dulwich.

            Uses depth=1 and branch parameters for efficient cloning.
            Progress output from Dulwich is discarded.
        '''
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                git_url,
                str( target_dir ),
                bare = False,
                depth = 1,
                branch = ref.encode( ),
                errstream = devnull,
            )

    def _perform_standard_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Performs standard full clone with optional ref validation.

            This is the fallback path for repositories that cannot use
            API optimization or when explicit ref is provided. Without an
            explicit ref, the latest version tag (if any) is selected.
        '''
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                location.git_url,
                str( target_dir ),
                bare = False,
                depth = None,
                errstream = devnull,
            )
        if location.ref is None:
            latest_tag = self._get_latest_tag( target_dir, tag_prefix )
            if latest_tag:
                _scribe.info(
                    f"Selected latest tag '{latest_tag}' for repository: "
                    f"{location.git_url}" )
                self._checkout_ref( target_dir, latest_tag )
            else:
                _scribe.info(
                    f"No version tags found, using default branch for "
                    f"repository: {location.git_url}" )
        else:
            _scribe.info(
                f"Using explicit ref '{location.ref}' for repository: "
                f"{location.git_url}" )
            self._checkout_ref( target_dir, location.ref )

    def _extract_version(
        self,
        tag_name: str,
        prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ __.Version ]:
        ''' Extracts and parses semantic version from tag name.

            If prefix is provided, only processes tags that start with the
            prefix and strips it before parsing. If prefix is absent, tries
            parsing the tag name directly. Returns None if tag cannot be
            parsed as a valid semantic version.
        '''
        version_string = tag_name
        if not __.is_absent( prefix ):
            if not tag_name.startswith( prefix ):
                return None
            version_string = tag_name[ len( prefix ): ]
        try:
            return __.Version( version_string )
        except __.InvalidVersion:
            return None

    def _select_latest_version_tag(
        self,
        tag_names: __.cabc.Iterable[ str ],
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Selects tag name with the highest parseable version.

            Names which do not parse as versions (after optional prefix
            filtering) are ignored. Returns None if nothing remains. Ties
            between equal versions are broken by the greater tag name.
        '''
        candidates: list[ tuple[ __.Version, str ] ] = [ ]
        for tag_name in tag_names:
            version = self._extract_version( tag_name, tag_prefix )
            if version is not None:
                candidates.append( ( version, tag_name ) )
        if not candidates: return None
        return max( candidates )[ 1 ]

    def _get_latest_tag(
        self,
        repo_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Gets the latest tag from the repository by semantic version.

            Optionally filters tags by prefix before selecting latest.
            Uses packaging.version.Version for semantic comparison. If no
            tags can be parsed as versions, returns None (falls back to
            default branch). Also returns None if the repository or its
            tags cannot be read.
        '''
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception:
            return None
        try:
            try:
                tag_refs = repo.refs.as_dict( b"refs/tags" )
            except Exception:
                return None
            # Only tags which resolve to an object in the repository are
            # considered. Undecodable names cannot parse as versions, so
            # replacement characters merely keep them from raising.
            tag_names = [
                tag_name_bytes.decode( 'utf-8', errors = 'replace' )
                for tag_name_bytes, commit_sha in tag_refs.items( )
                if self._get_tag_commit( repo, commit_sha ) is not None ]
        finally:
            # Release file handles held by the repository object.
            repo.close( )
        return self._select_latest_version_tag( tag_names, tag_prefix )

    def _get_tag_commit(
        self, repo: __.typx.Any, commit_sha: bytes
    ) -> __.typx.Any:
        ''' Gets commit object for a tag, handling annotated tags.

            Returns None if any lookup along the way fails.
        '''
        try:
            commit = repo[ commit_sha ]
            while hasattr( commit, 'object' ):
                # object attribute is a tuple (class, sha)
                commit = repo[ commit.object[ 1 ] ]
        except Exception:
            return None
        else:
            return commit

    def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
        ''' Verifies that a reference exists in the cloned repository.

            Accepts the reference if it names a tag, a local branch, or
            anything else the repository can look up directly. Raises
            ``GitRefAbsence`` otherwise, or if the repository cannot be
            opened.

            NOTE(review): despite the name, this does not modify the
            working tree; it only validates the reference. Confirm whether
            an actual checkout is intended here.
        '''
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception as exception:
            raise GitRefAbsence( ref, str( repo_dir ) ) from exception
        try:
            tag_ref = f"refs/tags/{ref}".encode( )
            branch_ref = f"refs/heads/{ref}".encode( )
            if tag_ref in repo.refs or branch_ref in repo.refs:
                return
            try:
                repo[ ref.encode( ) ]
            except KeyError:
                self._raise_ref_not_found( ref, str( repo_dir ) )
        finally:
            # Release file handles held by the repository object.
            repo.close( )

    def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
        ''' Raises GitRefAbsence for invalid reference. '''
        raise GitRefAbsence( ref, repo_dir )

    def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
        ''' Raises GitSubdirectoryAbsence for missing subdirectory. '''
        raise GitSubdirectoryAbsence( subdir, source_spec )

    def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]:
        ''' Detects Git hosting provider from URL.

            Returns 'github', 'gitlab', or None for other providers. The
            host must be the provider domain itself or one of its
            subdomains; merely containing the domain name is not enough.
        '''
        if git_url.startswith( 'git@' ):
            hostname = git_url[ len( 'git@' ): ].split( ':', 1 )[ 0 ]
        else:
            # 'hostname' excludes user information and port, unlike
            # 'netloc'.
            hostname = __.urlparse.urlparse( git_url ).hostname or ''
        hostname = hostname.lower( )
        providers = ( ( 'github', 'github.com' ), ( 'gitlab', 'gitlab.com' ) )
        for provider, domain in providers:
            if hostname == domain or hostname.endswith( f".{domain}" ):
                return provider
        return None

    def _acquire_github_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitHub authentication token from environment or gh CLI.

            Checks GITHUB_TOKEN environment variable first, then attempts
            to retrieve token from gh CLI. Returns None if neither source
            yields a non-empty token.
        '''
        token = __.os.environ.get( 'GITHUB_TOKEN' )
        if token: return token
        try:
            result = __.subprocess.run(
                [ 'gh', 'auth', 'token' ],
                capture_output = True,
                text = True,
                timeout = 5,
                check = False )
        except ( OSError, __.subprocess.TimeoutExpired ):
            # Missing, non-executable, or hung CLI: proceed anonymously.
            return None
        if result.returncode != 0: return None
        return result.stdout.strip( ) or None

    def _acquire_gitlab_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitLab authentication token from environment.

            Checks GITLAB_TOKEN environment variable. Returns None if not
            available.
        '''
        return __.os.environ.get( 'GITLAB_TOKEN' )

    def _retrieve_tags_payload(
        self, request: __.typx.Any
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Performs API request and decodes JSON list from response.

            Returns None on any failure, including responses which decode
            to something other than a list (such as an error object).
        '''
        try:
            with __.urlreq.urlopen( request, timeout = 10 ) as response:
                payload = __.json.loads( response.read( ) )
        except Exception:
            # Best-effort optimization: network, HTTP, and decoding
            # problems all mean 'no API result'.
            return None
        if not isinstance( payload, list ): return None
        return payload

    def _retrieve_github_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitHub API.

            Returns list of tag dictionaries or None on failure. Each tag
            contains 'name' and 'commit' fields. Requests the maximum page
            size; further pages are not followed.
        '''
        token = self._acquire_github_authentication_token( )
        url = (
            f"https://api.github.com/repos/{owner}/{repository}/tags"
            f"?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'Authorization', f"token {token}" )
        request.add_header( 'Accept', 'application/vnd.github.v3+json' )
        return self._retrieve_tags_payload( request )

    def _retrieve_gitlab_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitLab API.

            Returns list of tag dictionaries or None on failure. Each tag
            contains 'name' and 'commit' fields. Requests the maximum page
            size; further pages are not followed.
        '''
        token = self._acquire_gitlab_authentication_token( )
        # Every path separator must be encoded, including those of nested
        # groups which end up inside 'repository'.
        project_path = f"{owner}/{repository}".replace( '/', '%2F' )
        url = (
            f"https://gitlab.com/api/v4/projects/{project_path}/"
            f"repository/tags?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'PRIVATE-TOKEN', token )
        return self._retrieve_tags_payload( request )

    def _extract_repository_information(
        self, git_url: str
    ) -> __.typx.Optional[ tuple[ str, str ] ]:
        ''' Extracts owner and repository name from Git URL.

            Returns tuple of (owner, repository) or None if URL format is
            not recognized. Handles both SSH (git@host:owner/repo) and
            HTTPS (https://host/owner/repo) formats. Everything after the
            first path segment is returned as the repository.
        '''
        host = self._detect_git_host( git_url )
        if host is None: return None
        if git_url.startswith( 'git@' ):
            _, separator, path = git_url.partition( ':' )
            if not separator: return None
        else:
            path = __.urlparse.urlparse( git_url ).path
        path = path.strip( '/' ).removesuffix( '.git' )
        owner, _, repository = path.partition( '/' )
        if owner and repository: return ( owner, repository )
        return None

    def _select_latest_tag_from_api(
        self,
        tags: list[ GitApiTag ],
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Selects latest tag from API results by semantic version.

            Filters by tag prefix if provided, then selects tag with
            highest semantic version. Returns None if no valid version
            tags are found. Entries without a string 'name' are skipped.
        '''
        tag_names = [
            tag[ 'name' ] for tag in tags
            if isinstance( tag, __.cabc.Mapping )
            and isinstance( tag.get( 'name' ), str ) ]
        return self._select_latest_version_tag( tag_names, tag_prefix )

    def _resolve_latest_tag_via_api(
        self,
        git_url: str,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Resolves latest tag using GitHub or GitLab API.

            Returns tag name or None if API resolution fails or is not
            applicable.
        '''
        host = self._detect_git_host( git_url )
        if host is None: return None
        repo_info = self._extract_repository_information( git_url )
        if repo_info is None: return None
        owner, repository = repo_info
        if host == 'github':
            tags = self._retrieve_github_tags( owner, repository )
        elif host == 'gitlab':
            tags = self._retrieve_gitlab_tags( owner, repository )
        else:
            return None
        if tags is None: return None
        return self._select_latest_tag_from_api( tags, tag_prefix )
545 return self._select_latest_tag_from_api( tags, tag_prefix )