Coverage for sources/agentsmgr/sources/git.py: 11%
260 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 02:08 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 02:08 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Git-based source handler with Dulwich.
23 This module provides source resolution for Git repositories, supporting
24 various URL schemes and subdirectory specifications via fragment syntax.
25'''
28import dulwich.porcelain as _dulwich_porcelain
30from . import __
31from . import base as _base
34GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]
37_scribe = __.provide_scribe( __name__ )
class GitLocation( __.immut.DataclassObject ):
    ''' Parsed location of a Git source.

        Bundles the clone URL with the optional reference and optional
        subdirectory taken from a source specification.
    '''

    # URL which is handed to the Git client for cloning.
    git_url: str
    # Tag, branch, or commit named in the specification; None if omitted.
    ref: __.typx.Optional[ str ] = None
    # Path inside the repository; None selects the repository root.
    subdir: __.typx.Optional[ str ] = None
class GitCloneFailure( __.Omnierror, OSError ):
    ''' Failure to clone a Git repository.

        Keeps the repository URL and the optional reason as attributes
        and folds both into the exception message.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        message = f"Failed to clone Git repository: {git_url}"
        # Reason is appended in parentheses only when one was supplied.
        super( ).__init__( f"{message} ({reason})" if reason else message )
class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Requested subdirectory is missing from a Git repository.

        Records the subdirectory and the source specification which
        named it.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir, self.source_spec = subdir, source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: {source_spec}" )
class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Requested Git reference is missing from a repository.

        Records the reference and the repository in which it was sought.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref, self.git_url = ref, git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )
79@_base.source_handler([
80 'github:', 'gitlab:', 'git+https:',
81 'https://github.com/', 'https://gitlab.com/', 'git@'
82])
83class GitSourceHandler:
84 ''' Handles Git repository source resolution with Dulwich.
86 Supports multiple URL schemes and converts them to Git URLs for
87 cloning. Implements fragment syntax for subdirectory specification.
88 '''
90 def resolve(
91 self,
92 source_spec: str,
93 tag_prefix: __.typx.Annotated[
94 __.Absential[ str ],
95 __.ddoc.Doc(
96 "Prefix for filtering version tags when no explicit ref "
97 "is specified. Only tags starting with this prefix will be "
98 "considered, and the prefix will be stripped before version "
99 "parsing." ),
100 ] = __.absent,
101 ) -> __.Path:
102 ''' Resolves Git source to local temporary directory.
104 Clones the repository to a temporary location and returns the
105 path to the specified subdirectory or repository root.
106 '''
107 location = self._parse_git_url( source_spec )
108 temp_dir = self._create_temp_directory( )
109 try:
110 self._clone_repository( location, temp_dir, tag_prefix )
111 if location.subdir:
112 subdir_path = temp_dir / location.subdir
113 if not subdir_path.exists( ):
114 self._raise_subdir_not_found(
115 location.subdir, source_spec )
116 result_path = subdir_path
117 else:
118 result_path = temp_dir
119 except Exception as exception:
120 # Clean up on failure
121 __.shutil.rmtree( temp_dir, ignore_errors = True )
122 if isinstance( exception, __.DataSourceNoSupport ):
123 raise
124 raise GitCloneFailure(
125 location.git_url, str( exception ) ) from exception
126 else:
127 return result_path
129 def _parse_git_url( self, source_spec: str ) -> GitLocation:
130 ''' Parses source specification into Git URL, ref, and subdirectory.
132 Supports URL scheme mapping and fragment syntax for subdirectory
133 specification. Also supports @ref syntax for Git references.
134 '''
135 url_part = source_spec
136 ref = None
137 subdir = None
138 if '#' in url_part:
139 url_part, subdir = url_part.split( '#', 1 )
140 if '@' in url_part:
141 url_part, ref = url_part.split( '@', 1 )
142 # Map URL schemes to Git URLs
143 if url_part.startswith( 'github:' ):
144 repo_path = url_part[ len( 'github:' ): ]
145 git_url = f"https://github.com/{repo_path}.git"
146 elif url_part.startswith( 'gitlab:' ):
147 repo_path = url_part[ len( 'gitlab:' ): ]
148 git_url = f"https://gitlab.com/{repo_path}.git"
149 elif url_part.startswith( 'git+https:' ):
150 git_url = url_part[ len( 'git+' ): ]
151 elif url_part.startswith( 'https://github.com/' ):
152 # Convert GitHub web URLs to Git URLs
153 if url_part.endswith( '.git' ):
154 git_url = url_part
155 else:
156 git_url = f"{url_part.rstrip( '/' )}.git"
157 elif url_part.startswith( 'https://gitlab.com/' ):
158 # Convert GitLab web URLs to Git URLs
159 if url_part.endswith( '.git' ):
160 git_url = url_part
161 else:
162 git_url = f"{url_part.rstrip( '/' )}.git"
163 else:
164 # Direct git URLs (git@github.com:user/repo.git)
165 git_url = url_part
167 return GitLocation( git_url = git_url, ref = ref, subdir = subdir )
169 def _create_temp_directory( self ) -> __.Path:
170 ''' Creates temporary directory for repository cloning. '''
171 temp_dir = __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' )
172 return __.Path( temp_dir )
174 def _clone_repository(
175 self,
176 location: GitLocation,
177 target_dir: __.Path,
178 tag_prefix: __.Absential[ str ] = __.absent,
179 ) -> None:
180 ''' Clones Git repository using Dulwich with optimizations.
182 For GitHub/GitLab repositories without explicit ref, attempts
183 API-based tag resolution followed by shallow clone. Falls back
184 to standard full clone on any failure.
185 '''
186 try:
187 if location.ref is None:
188 cloned = self._attempt_optimized_clone(
189 location, target_dir, tag_prefix )
190 if cloned: return
191 self._perform_standard_clone( location, target_dir, tag_prefix )
192 except Exception as exception:
193 error_msg = str( exception ).lower( )
194 if location.ref is not None and (
195 'not found' in error_msg or 'does not exist' in error_msg
196 ):
197 raise GitRefAbsence(
198 location.ref, location.git_url ) from exception
199 raise GitCloneFailure(
200 location.git_url, str( exception ) ) from exception
202 def _attempt_optimized_clone(
203 self,
204 location: GitLocation,
205 target_dir: __.Path,
206 tag_prefix: __.Absential[ str ] = __.absent,
207 ) -> bool:
208 ''' Attempts optimized clone using API and shallow clone.
210 Returns True if successful, False if optimization should fall
211 back to standard clone.
212 '''
213 latest_tag = self._resolve_latest_tag_via_api(
214 location.git_url, tag_prefix )
215 if latest_tag is None: return False
216 _scribe.info(
217 f"Resolved latest tag '{latest_tag}' via API for repository: "
218 f"{location.git_url}" )
219 try:
220 self._perform_shallow_clone(
221 location.git_url, target_dir, latest_tag )
222 except Exception:
223 _scribe.info(
224 f"Shallow clone failed, falling back to standard clone for "
225 f"repository: {location.git_url}" )
226 return False
227 else:
228 _scribe.info(
229 f"Performed shallow clone for tag '{latest_tag}' in "
230 f"repository: {location.git_url}" )
231 return True
233 def _perform_shallow_clone(
234 self, git_url: str, target_dir: __.Path, ref: str
235 ) -> None:
236 ''' Performs shallow clone of specific ref using Dulwich.
238 Uses depth=1 and branch parameters for efficient cloning.
239 '''
240 with open( __.os.devnull, 'wb' ) as devnull:
241 _dulwich_porcelain.clone(
242 git_url,
243 str( target_dir ),
244 bare = False,
245 depth = 1,
246 branch = ref.encode( ),
247 errstream = devnull,
248 )
250 def _perform_standard_clone(
251 self,
252 location: GitLocation,
253 target_dir: __.Path,
254 tag_prefix: __.Absential[ str ] = __.absent,
255 ) -> None:
256 ''' Performs standard full clone with optional ref checkout.
258 This is the fallback path for repositories that cannot use
259 API optimization or when explicit ref is provided.
260 '''
261 with open( __.os.devnull, 'wb' ) as devnull:
262 _dulwich_porcelain.clone(
263 location.git_url,
264 str( target_dir ),
265 bare = False,
266 depth = None,
267 errstream = devnull,
268 )
269 if location.ref is None:
270 latest_tag = self._get_latest_tag( target_dir, tag_prefix )
271 if latest_tag:
272 _scribe.info(
273 f"Selected latest tag '{latest_tag}' for repository: "
274 f"{location.git_url}" )
275 self._checkout_ref( target_dir, latest_tag )
276 else:
277 _scribe.info(
278 f"No version tags found, using default branch for "
279 f"repository: {location.git_url}" )
280 else:
281 _scribe.info(
282 f"Using explicit ref '{location.ref}' for repository: "
283 f"{location.git_url}" )
284 self._checkout_ref( target_dir, location.ref )
286 def _extract_version(
287 self,
288 tag_name: str,
289 prefix: __.Absential[ str ] = __.absent,
290 ) -> __.typx.Optional[ __.Version ]:
291 ''' Extracts and parses semantic version from tag name.
293 If prefix is provided, only processes tags that start with the
294 prefix and strips it before parsing. If prefix is absent, tries
295 parsing the tag name directly. Returns None if tag cannot be
296 parsed as a valid semantic version.
297 '''
298 version_string = tag_name
299 if not __.is_absent( prefix ):
300 if not tag_name.startswith( prefix ):
301 return None
302 version_string = tag_name[ len( prefix ): ]
303 try:
304 return __.Version( version_string )
305 except __.InvalidVersion:
306 return None
308 def _get_latest_tag(
309 self,
310 repo_dir: __.Path,
311 tag_prefix: __.Absential[ str ] = __.absent,
312 ) -> __.typx.Optional[ str ]:
313 ''' Gets the latest tag from the repository by semantic version.
315 Optionally filters tags by prefix before selecting latest.
316 Uses packaging.version.Version for semantic comparison. If no
317 tags can be parsed as versions, returns None (falls back to
318 default branch).
319 '''
320 from dulwich.repo import Repo
321 try:
322 repo = Repo( str( repo_dir ) )
323 except Exception:
324 return None
325 try:
326 tag_refs = repo.refs.as_dict( b"refs/tags" )
327 except Exception:
328 return None
329 if not tag_refs:
330 return None
331 versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
332 for tag_name_bytes, commit_sha in tag_refs.items( ):
333 commit = self._get_tag_commit( repo, commit_sha )
334 if commit is not None:
335 tag_name = tag_name_bytes.decode( 'utf-8' )
336 version = self._extract_version( tag_name, tag_prefix )
337 if version is not None:
338 versioned_tags.append( ( version, tag_name ) )
339 if versioned_tags:
340 versioned_tags.sort( reverse = True )
341 return versioned_tags[ 0 ][ 1 ]
342 return None
344 def _get_tag_commit(
345 self, repo: __.typx.Any, commit_sha: bytes
346 ) -> __.typx.Any:
347 ''' Gets commit object for a tag, handling annotated tags. '''
348 try:
349 commit = repo[ commit_sha ]
350 while hasattr( commit, 'object' ):
351 # object attribute is a tuple (class, sha)
352 commit = repo[ commit.object[ 1 ] ]
353 except Exception:
354 return None
355 else:
356 return commit
358 def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
359 ''' Checks out a specific reference by cloning with branch param. '''
360 from dulwich.repo import Repo
361 try:
362 repo = Repo( str( repo_dir ) )
363 except Exception as exception:
364 raise GitRefAbsence( ref, str( repo_dir ) ) from exception
365 ref_bytes = ref.encode( )
366 tag_ref = f"refs/tags/{ref}".encode( )
367 branch_ref = f"refs/heads/{ref}".encode( )
368 if tag_ref in repo.refs or branch_ref in repo.refs:
369 return
370 try:
371 repo[ ref_bytes ]
372 except KeyError:
373 self._raise_ref_not_found( ref, str( repo_dir ) )
375 def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
376 ''' Raises GitRefAbsence for invalid reference. '''
377 raise GitRefAbsence( ref, repo_dir )
379 def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
380 ''' Raises GitSubdirectoryAbsence for missing subdirectory. '''
381 raise GitSubdirectoryAbsence( subdir, source_spec )
383 def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]:
384 ''' Detects Git hosting provider from URL.
386 Returns 'github', 'gitlab', or None for other providers.
387 '''
388 if git_url.startswith( 'git@' ):
389 parts = git_url.split( '@', 1 )
390 if len( parts ) > 1:
391 host_part = parts[ 1 ].split( ':', 1 )[ 0 ]
392 if 'github.com' in host_part: return 'github'
393 if 'gitlab.com' in host_part: return 'gitlab'
394 else:
395 parsed = __.urlparse.urlparse( git_url )
396 hostname = parsed.netloc.lower( )
397 if 'github.com' in hostname: return 'github'
398 if 'gitlab.com' in hostname: return 'gitlab'
399 return None
401 def _acquire_github_authentication_token(
402 self
403 ) -> __.typx.Optional[ str ]:
404 ''' Acquires GitHub authentication token from environment or gh CLI.
406 Checks GITHUB_TOKEN environment variable first, then attempts
407 to retrieve token from gh CLI. Returns None if neither source
408 is available.
409 '''
410 token = __.os.environ.get( 'GITHUB_TOKEN' )
411 if token: return token
412 try:
413 result = __.subprocess.run(
414 [ 'gh', 'auth', 'token' ],
415 capture_output = True,
416 text = True,
417 timeout = 5,
418 check = False )
419 if result.returncode == 0:
420 return result.stdout.strip( )
421 except ( FileNotFoundError, __.subprocess.TimeoutExpired ):
422 pass
423 return None
425 def _acquire_gitlab_authentication_token(
426 self
427 ) -> __.typx.Optional[ str ]:
428 ''' Acquires GitLab authentication token from environment.
430 Checks GITLAB_TOKEN environment variable. Returns None if not
431 available.
432 '''
433 return __.os.environ.get( 'GITLAB_TOKEN' )
435 def _retrieve_github_tags(
436 self, owner: str, repository: str
437 ) -> __.typx.Optional[ list[ GitApiTag ] ]:
438 ''' Retrieves tags from GitHub API.
440 Returns list of tag dictionaries or None on failure. Each tag
441 contains 'name' and 'commit' fields.
442 '''
443 token = self._acquire_github_authentication_token( )
444 url = f"https://api.github.com/repos/{owner}/{repository}/tags"
445 request = __.urlreq.Request( url )
446 if token:
447 request.add_header( 'Authorization', f"token {token}" )
448 request.add_header( 'Accept', 'application/vnd.github.v3+json' )
449 try:
450 with __.urlreq.urlopen( request, timeout = 10 ) as response:
451 return __.json.loads( response.read( ) )
452 except ( __.urlerr.URLError, __.urlerr.HTTPError, Exception ):
453 return None
455 def _retrieve_gitlab_tags(
456 self, owner: str, repository: str
457 ) -> __.typx.Optional[ list[ GitApiTag ] ]:
458 ''' Retrieves tags from GitLab API.
460 Returns list of tag dictionaries or None on failure. Each tag
461 contains 'name' and 'commit' fields.
462 '''
463 token = self._acquire_gitlab_authentication_token( )
464 project_path = f"{owner}%2F{repository}"
465 url = (
466 f"https://gitlab.com/api/v4/projects/{project_path}/"
467 f"repository/tags" )
468 request = __.urlreq.Request( url )
469 if token:
470 request.add_header( 'PRIVATE-TOKEN', token )
471 try:
472 with __.urlreq.urlopen( request, timeout = 10 ) as response:
473 return __.json.loads( response.read( ) )
474 except ( __.urlerr.URLError, __.urlerr.HTTPError, Exception ):
475 return None
477 def _extract_repository_information(
478 self, git_url: str
479 ) -> __.typx.Optional[ tuple[ str, str ] ]:
480 ''' Extracts owner and repository name from Git URL.
482 Returns tuple of (owner, repository) or None if URL format is
483 not recognized. Handles both SSH (git@host:owner/repo) and
484 HTTPS (https://host/owner/repo) formats.
485 '''
486 host = self._detect_git_host( git_url )
487 if host is None: return None
488 path = None
489 if git_url.startswith( 'git@' ):
490 parts = git_url.split( ':', maxsplit = 1 )
491 path = parts[ 1 ] if len( parts ) > 1 else None
492 else:
493 parsed = __.urlparse.urlparse( git_url )
494 path = parsed.path.lstrip( '/' )
495 if path is None: return None
496 path = path.removesuffix( '.git' )
497 path_parts = path.split( '/', maxsplit = 1 )
498 if len( path_parts ) > 1:
499 return ( path_parts[ 0 ], path_parts[ 1 ] )
500 return None
502 def _select_latest_tag_from_api(
503 self,
504 tags: list[ GitApiTag ],
505 tag_prefix: __.Absential[ str ] = __.absent,
506 ) -> __.typx.Optional[ str ]:
507 ''' Selects latest tag from API results by semantic version.
509 Filters by tag prefix if provided, then selects tag with
510 highest semantic version. Returns None if no valid version
511 tags are found.
512 '''
513 versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
514 for tag in tags:
515 tag_name = tag[ 'name' ]
516 version = self._extract_version( tag_name, tag_prefix )
517 if version is not None:
518 versioned_tags.append( ( version, tag_name ) )
519 if versioned_tags:
520 versioned_tags.sort( reverse = True )
521 return versioned_tags[ 0 ][ 1 ]
522 return None
524 def _resolve_latest_tag_via_api(
525 self,
526 git_url: str,
527 tag_prefix: __.Absential[ str ] = __.absent,
528 ) -> __.typx.Optional[ str ]:
529 ''' Resolves latest tag using GitHub or GitLab API.
531 Returns tag name or None if API resolution fails or is not
532 applicable.
533 '''
534 host = self._detect_git_host( git_url )
535 if host is None: return None
536 repo_info = self._extract_repository_information( git_url )
537 if repo_info is None: return None
538 owner, repository = repo_info
539 if host == 'github':
540 tags = self._retrieve_github_tags( owner, repository )
541 elif host == 'gitlab':
542 tags = self._retrieve_gitlab_tags( owner, repository )
543 else:
544 return None
545 if tags is None: return None
546 return self._select_latest_tag_from_api( tags, tag_prefix )