Coverage for sources/agentsmgr/sources/git.py: 11%

260 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-23 02:37 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Git-based source handler with Dulwich. 

22 

23 This module provides source resolution for Git repositories, supporting 

24 various URL schemes and subdirectory specifications via fragment syntax. 

25''' 

26 

27 

28import dulwich.porcelain as _dulwich_porcelain 

29 

30from . import __ 

31from . import base as _base 

32 

33 

# One tag entry as decoded from a hosting provider's JSON tag listing.
GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]


# Module-level scribe used for informational messages about clone decisions.
_scribe = __.provide_scribe( __name__ )

38 

39 

class GitLocation( __.immut.DataclassObject ):
    ''' Parsed Git source: clone URL plus optional ref and subdirectory. '''

    # URL which is handed to the clone operation.
    git_url: str
    # Reference taken from ``@ref`` in the specification; None if not given.
    ref: __.typx.Optional[ str ] = None
    # Path taken from ``#subdir`` in the specification; None if not given.
    subdir: __.typx.Optional[ str ] = None

45 

46 

class GitCloneFailure( __.Omnierror, OSError ):
    ''' Failure to clone a Git repository.

        Keeps the repository URL and the optional reason as attributes and
        appends the reason, in parentheses, to the message when one is given.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        suffix = f" ({reason})" if reason else ''
        super( ).__init__(
            f"Failed to clone Git repository: {git_url}{suffix}" )

56 

57 

class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Requested subdirectory does not exist in the repository.

        Keeps the subdirectory and the originating source specification as
        attributes for callers which want to inspect them.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir = subdir
        self.source_spec = source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: {source_spec}" )

67 

68 

class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Requested Git reference does not exist in the repository.

        Keeps the reference and the repository URL as attributes.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref = ref
        self.git_url = git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )

77 

78 

79@_base.source_handler([ 

80 'github', 'gitlab', 'git+https', 'https', 'git@' 

81]) 

82class GitSourceHandler: 

83 ''' Handles Git repository source resolution with Dulwich. 

84 

85 Supports multiple URL schemes and converts them to Git URLs for 

86 cloning. Implements fragment syntax for subdirectory specification. 

87 ''' 

88 

def resolve(
    self,
    source_spec: str,
    tag_prefix: __.typx.Annotated[
        __.Absential[ str ],
        __.ddoc.Doc(
            "Prefix for filtering version tags when no explicit ref "
            "is specified. Only tags starting with this prefix will be "
            "considered, and the prefix will be stripped before version "
            "parsing." ),
    ] = __.absent,
) -> __.Path:
    ''' Resolves Git source to local temporary directory.

        Clones the repository into a fresh temporary directory and returns
        the path of the requested subdirectory, or of the repository root
        when the specification names no subdirectory.

        On any failure the temporary directory is removed. Errors which are
        already ``DataSourceNoSupport`` or ``GitCloneFailure`` instances
        propagate unchanged; every other exception is wrapped in a
        ``GitCloneFailure`` which carries the repository URL.
    '''
    location = self._parse_git_url( source_spec )
    temp_dir = self._create_temp_directory( )
    try:
        self._clone_repository( location, temp_dir, tag_prefix )
        if location.subdir:
            subdir_path = temp_dir / location.subdir
            if not subdir_path.exists( ):
                self._raise_subdir_not_found(
                    location.subdir, source_spec )
            result_path = subdir_path
        else:
            result_path = temp_dir
    except Exception as exception:
        # Never leave a partial clone behind.
        __.shutil.rmtree( temp_dir, ignore_errors = True )
        # Domain errors already describe the problem; wrapping a
        # GitCloneFailure again would only nest its message inside itself.
        if isinstance(
            exception, ( __.DataSourceNoSupport, GitCloneFailure )
        ):
            raise
        raise GitCloneFailure(
            location.git_url, str( exception ) ) from exception
    else:
        return result_path

127 

def _parse_git_url( self, source_spec: str ) -> GitLocation:
    ''' Parses source specification into Git URL, ref, and subdirectory.

        Recognized syntax is ``<url>[@<ref>][#<subdir>]``. Everything after
        the first ``#`` is the subdirectory. The first ``@`` which is not
        part of a leading ``git@`` user prefix separates the ref, so that
        SCP-style URLs such as ``git@github.com:owner/repo.git`` remain
        intact.

        The ``github:`` and ``gitlab:`` shorthands and GitHub/GitLab web
        URLs are mapped onto ``https://.../<path>.git`` clone URLs;
        ``git+https:`` loses its ``git+`` prefix; anything else is used
        verbatim.
    '''
    url_part, separator, fragment = source_spec.partition( '#' )
    subdir = fragment if separator else None
    ref = None
    # The '@' of a leading 'git@' is user information, not a ref separator.
    search_start = len( 'git@' ) if url_part.startswith( 'git@' ) else 0
    ref_index = url_part.find( '@', search_start )
    if ref_index >= 0:
        ref = url_part[ ref_index + 1 : ]
        url_part = url_part[ : ref_index ]
    # Map URL schemes to Git URLs
    if url_part.startswith( 'github:' ):
        # Drop an explicit '.git' so that it does not get appended twice.
        repo_path = url_part[ len( 'github:' ): ].removesuffix( '.git' )
        git_url = f"https://github.com/{repo_path}.git"
    elif url_part.startswith( 'gitlab:' ):
        repo_path = url_part[ len( 'gitlab:' ): ].removesuffix( '.git' )
        git_url = f"https://gitlab.com/{repo_path}.git"
    elif url_part.startswith( 'git+https:' ):
        git_url = url_part[ len( 'git+' ): ]
    elif url_part.startswith(
        ( 'https://github.com/', 'https://gitlab.com/' )
    ):
        # Convert GitHub and GitLab web URLs to Git URLs
        if url_part.endswith( '.git' ):
            git_url = url_part
        else:
            git_url = f"{url_part.rstrip( '/' )}.git"
    else:
        # Direct git URLs (git@github.com:user/repo.git)
        git_url = url_part
    return GitLocation( git_url = git_url, ref = ref, subdir = subdir )

167 

def _create_temp_directory( self ) -> __.Path:
    ''' Makes a fresh temporary directory to clone into.

        The directory name starts with ``agentsmgr-git-``.
    '''
    return __.Path( __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' ) )

172 

def _clone_repository(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> None:
    ''' Clones repository into target directory, preferring the fast path.

        Without an explicit ref, the optimized clone is tried first; if it
        declines or an explicit ref was given, the standard clone runs.

        Any exception is translated: to ``GitRefAbsence`` when an explicit
        ref was given and the error text mentions 'not found' or
        'does not exist', otherwise to ``GitCloneFailure``.
    '''
    try:
        optimized = (
            location.ref is None
            and self._attempt_optimized_clone(
                location, target_dir, tag_prefix ) )
        if not optimized:
            self._perform_standard_clone(
                location, target_dir, tag_prefix )
    except Exception as exception:
        detail = str( exception )
        lowered = detail.lower( )
        ref_missing = (
            'not found' in lowered or 'does not exist' in lowered )
        if location.ref is not None and ref_missing:
            raise GitRefAbsence(
                location.ref, location.git_url ) from exception
        raise GitCloneFailure( location.git_url, detail ) from exception

200 

def _attempt_optimized_clone(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> bool:
    ''' Attempts optimized clone using API and shallow clone.

        Resolves the latest version tag through the hosting provider API
        and shallow-clones that tag. Returns True on success and False when
        the caller should fall back to the standard clone: either no tag
        could be resolved or the shallow clone failed.

        After a failed shallow clone the target directory is emptied (the
        directory itself is kept), so that the fallback clone starts from a
        clean directory.
    '''
    latest_tag = self._resolve_latest_tag_via_api(
        location.git_url, tag_prefix )
    if latest_tag is None: return False
    _scribe.info(
        f"Resolved latest tag '{latest_tag}' via API for repository: "
        f"{location.git_url}" )
    try:
        self._perform_shallow_clone(
            location.git_url, target_dir, latest_tag )
    except Exception:
        _scribe.info(
            f"Shallow clone failed, falling back to standard clone for "
            f"repository: {location.git_url}" )
        # A failed attempt can leave partial repository data behind, which
        # would make a second clone into the same directory fail.
        try:
            if target_dir.is_dir( ):
                for entry in target_dir.iterdir( ):
                    if entry.is_dir( ) and not entry.is_symlink( ):
                        __.shutil.rmtree( entry, ignore_errors = True )
                    else:
                        entry.unlink( missing_ok = True )
        except OSError:
            # Best effort: the fallback clone reports any real problem.
            _scribe.info(
                f"Could not clear partial clone in: {target_dir}" )
        return False
    else:
        _scribe.info(
            f"Performed shallow clone for tag '{latest_tag}' in "
            f"repository: {location.git_url}" )
        return True

231 

def _perform_shallow_clone(
    self, git_url: str, target_dir: __.Path, ref: str
) -> None:
    ''' Clones a single ref with history depth of one via Dulwich.

        The ref is passed, encoded to bytes, as the ``branch`` argument.
        Dulwich error output is sent to the null device.
    '''
    clone_options = dict(
        bare = False, depth = 1, branch = ref.encode( ) )
    with open( __.os.devnull, 'wb' ) as sink:
        _dulwich_porcelain.clone(
            git_url, str( target_dir ), errstream = sink, **clone_options )

248 

def _perform_standard_clone(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> None:
    ''' Performs full-history clone, then selects the ref to use.

        With an explicit ref, that ref is handed to ``_checkout_ref``.
        Otherwise the latest version tag found in the clone is handed to
        ``_checkout_ref``; if there is none, the clone is left as it is.
        Dulwich error output is sent to the null device.
    '''
    with open( __.os.devnull, 'wb' ) as sink:
        _dulwich_porcelain.clone(
            location.git_url,
            str( target_dir ),
            bare = False,
            depth = None,
            errstream = sink )
    if location.ref is not None:
        _scribe.info(
            f"Using explicit ref '{location.ref}' for repository: "
            f"{location.git_url}" )
        self._checkout_ref( target_dir, location.ref )
        return
    latest_tag = self._get_latest_tag( target_dir, tag_prefix )
    if not latest_tag:
        _scribe.info(
            f"No version tags found, using default branch for "
            f"repository: {location.git_url}" )
        return
    _scribe.info(
        f"Selected latest tag '{latest_tag}' for repository: "
        f"{location.git_url}" )
    self._checkout_ref( target_dir, latest_tag )

284 

def _extract_version(
    self,
    tag_name: str,
    prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ __.Version ]:
    ''' Parses a version out of a tag name.

        With a prefix, tags which do not start with it yield None and the
        prefix is removed before parsing. Without one, the whole tag name
        is parsed. Returns None when the text is not a valid version.
    '''
    if __.is_absent( prefix ):
        candidate = tag_name
    elif tag_name.startswith( prefix ):
        candidate = tag_name[ len( prefix ): ]
    else:
        return None
    try:
        version = __.Version( candidate )
    except __.InvalidVersion:
        return None
    return version

306 

def _get_latest_tag(
    self,
    repo_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Gets the latest tag from the repository by version.

        Considers only tags which resolve to an object in the repository
        and whose name, after optional prefix filtering, parses as a
        version. Returns the name of the tag with the highest version, or
        None when the repository cannot be opened, its tags cannot be
        listed, or no tag qualifies.
    '''
    from dulwich.repo import Repo
    try:
        repo = Repo( str( repo_dir ) )
    except Exception:
        return None
    versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
    try:
        try:
            tag_refs = repo.refs.as_dict( b"refs/tags" )
        except Exception:
            return None
        for tag_name_bytes, tag_sha in tag_refs.items( ):
            if self._get_tag_commit( repo, tag_sha ) is None: continue
            try:
                tag_name = tag_name_bytes.decode( 'utf-8' )
            except UnicodeDecodeError:
                # A tag name which is not UTF-8 cannot be a version tag.
                continue
            version = self._extract_version( tag_name, tag_prefix )
            if version is not None:
                versioned_tags.append( ( version, tag_name ) )
    finally:
        # Release the file handles which the repository object holds.
        repo.close( )
    if not versioned_tags: return None
    return max( versioned_tags )[ 1 ]

342 

def _get_tag_commit(
    self, repo: __.typx.Any, commit_sha: bytes
) -> __.typx.Any:
    ''' Resolves a tag target to its underlying object.

        Looks the SHA up in the repository and, for as long as the result
        has an ``object`` attribute, follows it to the object it points at.
        Returns None if any lookup fails.
    '''
    try:
        target = repo[ commit_sha ]
        while hasattr( target, 'object' ):
            # The object attribute is a (class, sha) pair.
            pointee = target.object
            target = repo[ pointee[ 1 ] ]
    except Exception:
        return None
    return target

356 

def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
    ''' Verifies that a reference exists in the cloned repository.

        Accepts the ref if it names a tag or a branch, or if the repository
        can look it up directly. Raises ``GitRefAbsence`` if the repository
        cannot be opened or the ref cannot be found.

        NOTE(review): Despite the name, nothing here updates ``HEAD`` or
        the working tree; the clone stays at whatever the clone operation
        checked out. Confirm whether an actual checkout is intended.
    '''
    from dulwich.repo import Repo
    try:
        repo = Repo( str( repo_dir ) )
    except Exception as exception:
        raise GitRefAbsence( ref, str( repo_dir ) ) from exception
    try:
        tag_ref = f"refs/tags/{ref}".encode( )
        branch_ref = f"refs/heads/{ref}".encode( )
        if tag_ref in repo.refs or branch_ref in repo.refs:
            return
        try:
            repo[ ref.encode( ) ]
        except KeyError:
            self._raise_ref_not_found( ref, str( repo_dir ) )
    finally:
        # Release the file handles which the repository object holds.
        repo.close( )

373 

def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
    ''' Always raises ``GitRefAbsence`` for the given ref and repository. '''
    absence = GitRefAbsence( ref, repo_dir )
    raise absence

377 

def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
    ''' Always raises ``GitSubdirectoryAbsence`` for the given subdir. '''
    absence = GitSubdirectoryAbsence( subdir, source_spec )
    raise absence

381 

def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]:
    ''' Detects Git hosting provider from URL.

        Returns 'github' or 'gitlab' when the host of the URL is exactly
        ``github.com`` or ``gitlab.com`` (an optional ``www.`` prefix is
        ignored), else None. Handles both SCP-style ``git@host:path`` and
        regular URLs; user information and port are not part of the match.
    '''
    if git_url.startswith( 'git@' ):
        # SCP-style syntax: git@<host>:<path>
        hostname = git_url[ len( 'git@' ): ].split( ':', 1 )[ 0 ]
    else:
        hostname = __.urlparse.urlparse( git_url ).hostname or ''
    hostname = hostname.lower( ).removeprefix( 'www.' )
    # Exact comparison: a substring test would also accept unrelated hosts
    # such as 'github.com.example.org'.
    if hostname == 'github.com': return 'github'
    if hostname == 'gitlab.com': return 'gitlab'
    return None

399 

def _acquire_github_authentication_token(
    self
) -> __.typx.Optional[ str ]:
    ''' Acquires GitHub authentication token from environment or gh CLI.

        Prefers a non-empty ``GITHUB_TOKEN`` environment variable. Else
        runs ``gh auth token`` with a five second timeout and returns its
        stripped output. Returns None if the command cannot be launched,
        times out, exits with non-zero status, or prints nothing.
    '''
    token = __.os.environ.get( 'GITHUB_TOKEN' )
    if token: return token
    try:
        result = __.subprocess.run(
            [ 'gh', 'auth', 'token' ],
            capture_output = True,
            text = True,
            timeout = 5,
            check = False )
    except ( OSError, __.subprocess.TimeoutExpired ):
        # OSError covers a missing executable as well as one which cannot
        # be executed; either way the token is simply unavailable.
        return None
    if result.returncode != 0: return None
    return result.stdout.strip( ) or None

423 

def _acquire_gitlab_authentication_token(
    self
) -> __.typx.Optional[ str ]:
    ''' Reads GitLab authentication token from the environment.

        Returns the value of ``GITLAB_TOKEN``, or None if it is not set.
    '''
    environment = __.os.environ
    return environment.get( 'GITLAB_TOKEN' )

433 

def _retrieve_github_tags(
    self, owner: str, repository: str
) -> __.typx.Optional[ list[ GitApiTag ] ]:
    ''' Retrieves tags from GitHub API.

        Requests up to 100 tags in a single page, authenticated when a
        token is available. Returns the decoded list of tag entries, or
        None if the request fails or the response is not a JSON list.
    '''
    token = self._acquire_github_authentication_token( )
    url = (
        f"https://api.github.com/repos/{owner}/{repository}/tags"
        f"?per_page=100" )
    request = __.urlreq.Request( url )
    if token:
        request.add_header( 'Authorization', f"token {token}" )
    request.add_header( 'Accept', 'application/vnd.github.v3+json' )
    try:
        with __.urlreq.urlopen( request, timeout = 10 ) as response:
            payload = __.json.loads( response.read( ) )
    except Exception:
        # Best effort: any failure means the caller falls back to a clone.
        return None
    # An error document is a JSON object rather than a list of tags.
    if not isinstance( payload, list ): return None
    return payload

453 

def _retrieve_gitlab_tags(
    self, owner: str, repository: str
) -> __.typx.Optional[ list[ GitApiTag ] ]:
    ''' Retrieves tags from GitLab API.

        Addresses the project by its path with every slash encoded as
        ``%2F``, which also covers repositories nested in subgroups.
        Requests up to 100 tags in a single page, authenticated when a
        token is available. Returns the decoded list of tag entries, or
        None if the request fails or the response is not a JSON list.
    '''
    token = self._acquire_gitlab_authentication_token( )
    # The repository part may itself contain slashes (subgroups).
    project_path = f"{owner}/{repository}".replace( '/', '%2F' )
    url = (
        f"https://gitlab.com/api/v4/projects/{project_path}/"
        f"repository/tags?per_page=100" )
    request = __.urlreq.Request( url )
    if token:
        request.add_header( 'PRIVATE-TOKEN', token )
    try:
        with __.urlreq.urlopen( request, timeout = 10 ) as response:
            payload = __.json.loads( response.read( ) )
    except Exception:
        # Best effort: any failure means the caller falls back to a clone.
        return None
    # An error document is a JSON object rather than a list of tags.
    if not isinstance( payload, list ): return None
    return payload

475 

def _extract_repository_information(
    self, git_url: str
) -> __.typx.Optional[ tuple[ str, str ] ]:
    ''' Extracts owner and repository name from Git URL.

        Returns ``(owner, repository)`` or None if the host is not a
        recognized provider or the path lacks either part. Handles both SSH
        (git@host:owner/repo) and HTTPS (https://host/owner/repo) formats.
        Surrounding slashes and a trailing ``.git`` are ignored. Everything
        after the first slash is the repository, so it may contain further
        slashes.
    '''
    if self._detect_git_host( git_url ) is None: return None
    if git_url.startswith( 'git@' ):
        _, separator, path = git_url.partition( ':' )
        if not separator: return None
    else:
        path = __.urlparse.urlparse( git_url ).path
    path = path.strip( '/' ).removesuffix( '.git' )
    owner, separator, repository = path.partition( '/' )
    # Both parts are needed to address a repository.
    if not ( separator and owner and repository ): return None
    return ( owner, repository )

500 

def _select_latest_tag_from_api(
    self,
    tags: list[ GitApiTag ],
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Picks the tag with the highest version from API tag entries.

        Reads the 'name' field of each entry and ignores names which do not
        parse as a version (after optional prefix filtering). Returns the
        winning tag name, or None if no entry qualifies.
    '''
    best: __.typx.Optional[ tuple[ __.Version, str ] ] = None
    for entry in tags:
        name = entry[ 'name' ]
        version = self._extract_version( name, tag_prefix )
        if version is None: continue
        candidate = ( version, name )
        if best is None or candidate > best:
            best = candidate
    return None if best is None else best[ 1 ]

522 

def _resolve_latest_tag_via_api(
    self,
    git_url: str,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Finds the latest version tag through the hosting provider API.

        Returns the tag name, or None if the host is not GitHub or GitLab,
        owner and repository cannot be determined, the tag listing cannot
        be retrieved, or no tag parses as a version.
    '''
    host = self._detect_git_host( git_url )
    if host is None: return None
    repo_info = self._extract_repository_information( git_url )
    if repo_info is None: return None
    retrievers = {
        'github': self._retrieve_github_tags,
        'gitlab': self._retrieve_gitlab_tags,
    }
    retriever = retrievers.get( host )
    if retriever is None: return None
    tags = retriever( *repo_info )
    if tags is None: return None
    return self._select_latest_tag_from_api( tags, tag_prefix )