Coverage for sources/agentsmgr/sources/git.py: 11%

260 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 02:08 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Git-based source handler with Dulwich. 

22 

23 This module provides source resolution for Git repositories, supporting 

24 various URL schemes and subdirectory specifications via fragment syntax. 

25''' 

26 

27 

28import dulwich.porcelain as _dulwich_porcelain 

29 

30from . import __ 

31from . import base as _base 

32 

33 

# Type of a single tag entry as returned by the hosting-provider tag
# listing helpers in this module (string keys mapped to arbitrary values).
GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]


# Module-level scribe used for the informational messages emitted while
# resolving tags and cloning repositories.
_scribe = __.provide_scribe( __name__ )

38 

39 

class GitLocation( __.immut.DataclassObject ):
    ''' Git source location with URL, optional ref, and optional subdir. '''
    # URL which is handed to Dulwich for cloning.
    git_url: str
    # Ref taken from the ``@ref`` part of a source specification; None when
    # the specification carried no ref.
    ref: __.typx.Optional[ str ] = None
    # Subdirectory taken from the ``#subdir`` fragment of a source
    # specification; None when the specification carried no fragment.
    subdir: __.typx.Optional[ str ] = None

45 

46 

class GitCloneFailure( __.Omnierror, OSError ):
    ''' Failure of an attempt to clone a Git repository.

        Keeps the offending URL and the optional reason available as
        attributes (``git_url``, ``reason``) for programmatic inspection.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        # Reason, when supplied, is appended in parentheses.
        suffix = f" ({reason})" if reason else ''
        super( ).__init__(
            f"Failed to clone Git repository: {git_url}{suffix}" )

56 

57 

class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Requested subdirectory does not exist within cloned repository.

        Exposes ``subdir`` and ``source_spec`` as attributes.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir, self.source_spec = subdir, source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: {source_spec}" )

67 

68 

class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Requested Git reference does not exist within repository.

        Exposes ``ref`` and ``git_url`` as attributes.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref, self.git_url = ref, git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )

77 

78 

@_base.source_handler([
    'github:', 'gitlab:', 'git+https:',
    'https://github.com/', 'https://gitlab.com/', 'git@'
])
class GitSourceHandler:
    ''' Handles Git repository source resolution with Dulwich.

        Supports multiple URL schemes and converts them to Git URLs for
        cloning. Implements fragment syntax (``#subdir``) for subdirectory
        specification and ``@ref`` syntax for Git references.
    '''

    def resolve(
        self,
        source_spec: str,
        tag_prefix: __.typx.Annotated[
            __.Absential[ str ],
            __.ddoc.Doc(
                "Prefix for filtering version tags when no explicit ref "
                "is specified. Only tags starting with this prefix will be "
                "considered, and the prefix will be stripped before version "
                "parsing." ),
        ] = __.absent,
    ) -> __.Path:
        ''' Resolves Git source to local temporary directory.

            Clones the repository to a freshly created temporary directory
            and returns the path to the requested subdirectory or to the
            repository root. On success the temporary directory is left in
            place for the caller; on failure it is removed.

            Raises :class:`GitSubdirectoryAbsence` or
            :class:`GitRefAbsence` (both pass through unchanged) and
            :class:`GitCloneFailure` for every other failure.
        '''
        location = self._parse_git_url( source_spec )
        temp_dir = self._create_temp_directory( )
        try:
            self._clone_repository( location, temp_dir, tag_prefix )
            if location.subdir:
                subdir_path = temp_dir / location.subdir
                if not subdir_path.exists( ):
                    self._raise_subdir_not_found(
                        location.subdir, source_spec )
                result_path = subdir_path
            else:
                result_path = temp_dir
        except Exception as exception:
            # Clean up on failure.
            __.shutil.rmtree( temp_dir, ignore_errors = True )
            # Domain errors already carry a complete message; wrapping a
            # GitCloneFailure again would nest "Failed to clone ..." twice.
            if isinstance(
                exception, ( __.DataSourceNoSupport, GitCloneFailure )
            ): raise
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception
        else:
            return result_path

    def _parse_git_url( self, source_spec: str ) -> GitLocation:
        ''' Parses source specification into Git URL, ref, and subdirectory.

            Everything after the first ``#`` is the subdirectory. The ref
            is everything after the first ``@`` which is not part of the
            URL authority, so the ``git@`` of SSH URLs and any
            ``user@host`` of scheme URLs are left intact. The remaining
            text is mapped onto a clone URL according to its scheme.
        '''
        url_part = source_spec
        ref = None
        subdir = None
        if '#' in url_part:
            url_part, subdir = url_part.split( '#', 1 )
        separator = url_part.find(
            '@', self._find_ref_search_offset( url_part ) )
        if separator != -1:
            ref = url_part[ separator + 1 : ]
            url_part = url_part[ : separator ]
        # Map URL schemes to Git URLs.
        if url_part.startswith( 'github:' ):
            repo_path = url_part[ len( 'github:' ): ].removesuffix( '.git' )
            git_url = f"https://github.com/{repo_path}.git"
        elif url_part.startswith( 'gitlab:' ):
            repo_path = url_part[ len( 'gitlab:' ): ].removesuffix( '.git' )
            git_url = f"https://gitlab.com/{repo_path}.git"
        elif url_part.startswith( 'git+https:' ):
            git_url = url_part[ len( 'git+' ): ]
        elif url_part.startswith(
            ( 'https://github.com/', 'https://gitlab.com/' )
        ):
            # Convert GitHub/GitLab web URLs to Git URLs.
            if url_part.endswith( '.git' ):
                git_url = url_part
            else:
                git_url = f"{url_part.rstrip( '/' )}.git"
        else:
            # Direct Git URLs (git@github.com:user/repo.git).
            git_url = url_part
        return GitLocation( git_url = git_url, ref = ref, subdir = subdir )

    def _find_ref_search_offset( self, url_part: str ) -> int:
        ''' Returns index from which an ``@ref`` separator may appear.

            Skips the leading ``git@`` of SSH URLs and the authority
            (``user@host``) of ``scheme://`` URLs, because an ``@`` in
            those positions belongs to the URL itself.
        '''
        if url_part.startswith( 'git@' ): return len( 'git@' )
        scheme_end = url_part.find( '://' )
        if scheme_end == -1: return 0
        path_start = url_part.find( '/', scheme_end + len( '://' ) )
        # Without a path there is nothing after the authority to search.
        return len( url_part ) if path_start == -1 else path_start

    def _create_temp_directory( self ) -> __.Path:
        ''' Creates temporary directory for repository cloning. '''
        temp_dir = __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' )
        return __.Path( temp_dir )

    def _clone_repository(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Clones Git repository using Dulwich with optimizations.

            Without an explicit ref, first attempts API-based tag
            resolution followed by a shallow clone; if that does not
            succeed, or if a ref was given, performs a standard full
            clone.

            Raises :class:`GitRefAbsence` when an explicit ref was given
            and the failure message indicates that something was not
            found; raises :class:`GitCloneFailure` otherwise.
        '''
        try:
            if location.ref is None:
                cloned = self._attempt_optimized_clone(
                    location, target_dir, tag_prefix )
                if cloned: return
            self._perform_standard_clone( location, target_dir, tag_prefix )
        except Exception as exception:
            error_msg = str( exception ).lower( )
            if location.ref is not None and (
                'not found' in error_msg or 'does not exist' in error_msg
            ):
                raise GitRefAbsence(
                    location.ref, location.git_url ) from exception
            raise GitCloneFailure(
                location.git_url, str( exception ) ) from exception

    def _attempt_optimized_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> bool:
        ''' Attempts optimized clone using API and shallow clone.

            Returns True if successful, False if optimization should fall
            back to standard clone. On a failed shallow clone the target
            directory is emptied so that the fallback starts clean.
        '''
        latest_tag = self._resolve_latest_tag_via_api(
            location.git_url, tag_prefix )
        if latest_tag is None: return False
        _scribe.info(
            f"Resolved latest tag '{latest_tag}' via API for repository: "
            f"{location.git_url}" )
        try:
            self._perform_shallow_clone(
                location.git_url, target_dir, latest_tag )
        except Exception:
            _scribe.info(
                f"Shallow clone failed, falling back to standard clone for "
                f"repository: {location.git_url}" )
            # The target directory existed before the clone, so leftovers
            # of the failed attempt (such as a partial '.git') may remain
            # and would get in the way of the fallback clone.
            self._clear_directory( target_dir )
            return False
        else:
            _scribe.info(
                f"Performed shallow clone for tag '{latest_tag}' in "
                f"repository: {location.git_url}" )
            return True

    def _clear_directory( self, directory: __.Path ) -> None:
        ''' Removes every entry inside directory; keeps directory itself.

            Best effort: entries which cannot be removed are skipped.
        '''
        try: entries = list( directory.iterdir( ) )
        except OSError: return
        for entry in entries:
            if entry.is_dir( ) and not entry.is_symlink( ):
                __.shutil.rmtree( entry, ignore_errors = True )
                continue
            try: entry.unlink( )
            except OSError:
                # Deliberately ignored; a subsequent clone will report
                # any problem which actually matters.
                pass

    def _perform_shallow_clone(
        self, git_url: str, target_dir: __.Path, ref: str
    ) -> None:
        ''' Performs shallow clone of specific ref using Dulwich.

            Uses depth=1 and branch parameters for efficient cloning.
            Dulwich progress output is discarded.
        '''
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                git_url,
                str( target_dir ),
                bare = False,
                depth = 1,
                branch = ref.encode( ),
                errstream = devnull,
            )

    def _perform_standard_clone(
        self,
        location: GitLocation,
        target_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> None:
        ''' Performs standard full clone with optional ref handling.

            This is the fallback path for repositories that cannot use
            API optimization or when explicit ref is provided. Without an
            explicit ref, the latest version tag found in the clone (if
            any) is passed to :meth:`_checkout_ref`; otherwise the
            explicit ref is.
        '''
        with open( __.os.devnull, 'wb' ) as devnull:
            _dulwich_porcelain.clone(
                location.git_url,
                str( target_dir ),
                bare = False,
                depth = None,
                errstream = devnull,
            )
        if location.ref is None:
            latest_tag = self._get_latest_tag( target_dir, tag_prefix )
            if latest_tag:
                _scribe.info(
                    f"Selected latest tag '{latest_tag}' for repository: "
                    f"{location.git_url}" )
                self._checkout_ref( target_dir, latest_tag )
            else:
                _scribe.info(
                    f"No version tags found, using default branch for "
                    f"repository: {location.git_url}" )
        else:
            _scribe.info(
                f"Using explicit ref '{location.ref}' for repository: "
                f"{location.git_url}" )
            self._checkout_ref( target_dir, location.ref )

    def _extract_version(
        self,
        tag_name: str,
        prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ __.Version ]:
        ''' Extracts and parses version from tag name.

            If prefix is provided, only processes tags that start with
            the prefix and strips it before parsing. If prefix is absent,
            tries parsing the tag name directly. Returns None if the tag
            does not carry the prefix or cannot be parsed as a version.
        '''
        version_string = tag_name
        if not __.is_absent( prefix ):
            if not tag_name.startswith( prefix ):
                return None
            version_string = tag_name[ len( prefix ): ]
        try:
            return __.Version( version_string )
        except __.InvalidVersion:
            return None

    def _get_latest_tag(
        self,
        repo_dir: __.Path,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Gets the latest tag from the repository by version.

            Optionally filters tags by prefix before selecting latest.
            Tags whose target cannot be resolved, whose names are not
            valid UTF-8, or which do not parse as versions are skipped.
            Returns None if the repository cannot be opened or no tag
            qualifies (caller then stays on the default branch).
        '''
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception:
            return None
        try:
            try:
                tag_refs = repo.refs.as_dict( b"refs/tags" )
            except Exception:
                return None
            versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
            for tag_name_bytes, commit_sha in tag_refs.items( ):
                if self._get_tag_commit( repo, commit_sha ) is None:
                    continue
                try: tag_name = tag_name_bytes.decode( 'utf-8' )
                except UnicodeDecodeError: continue
                version = self._extract_version( tag_name, tag_prefix )
                if version is not None:
                    versioned_tags.append( ( version, tag_name ) )
        finally:
            # Release file handles held by the repository object.
            repo.close( )
        if not versioned_tags: return None
        return max( versioned_tags )[ 1 ]

    def _get_tag_commit(
        self, repo: __.typx.Any, commit_sha: bytes
    ) -> __.typx.Any:
        ''' Gets commit object for a tag, handling annotated tags.

            Follows ``object`` attributes until an object without one is
            reached. Returns None if any lookup fails.
        '''
        try:
            commit = repo[ commit_sha ]
            while hasattr( commit, 'object' ):
                # object attribute is a tuple (class, sha)
                commit = repo[ commit.object[ 1 ] ]
        except Exception:
            return None
        else:
            return commit

    def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
        ''' Verifies that reference exists in cloned repository.

            Accepts the ref if it names a tag or a local branch, or if
            the repository can look it up directly. Raises
            :class:`GitRefAbsence` if the repository cannot be opened or
            the ref is unknown.
        '''
        # NOTE(review): despite the name, nothing here switches the
        # working tree to the ref; only existence is validated. Confirm
        # whether an actual checkout is intended.
        from dulwich.repo import Repo
        try:
            repo = Repo( str( repo_dir ) )
        except Exception as exception:
            raise GitRefAbsence( ref, str( repo_dir ) ) from exception
        try:
            tag_ref = f"refs/tags/{ref}".encode( )
            branch_ref = f"refs/heads/{ref}".encode( )
            if tag_ref in repo.refs or branch_ref in repo.refs:
                return
            try:
                repo[ ref.encode( ) ]
            except KeyError:
                self._raise_ref_not_found( ref, str( repo_dir ) )
        finally:
            # Release file handles held by the repository object.
            repo.close( )

    def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
        ''' Raises GitRefAbsence for invalid reference. '''
        raise GitRefAbsence( ref, repo_dir )

    def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
        ''' Raises GitSubdirectoryAbsence for missing subdirectory. '''
        raise GitSubdirectoryAbsence( subdir, source_spec )

    def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]:
        ''' Detects Git hosting provider from URL.

            Returns 'github', 'gitlab', or None for other providers. The
            host must equal ``github.com`` / ``gitlab.com`` or be a
            subdomain thereof; user information and ports are ignored.
        '''
        if git_url.startswith( 'git@' ):
            hostname = git_url[ len( 'git@' ): ].split( ':', 1 )[ 0 ]
        else:
            hostname = __.urlparse.urlparse( git_url ).hostname or ''
        hostname = hostname.lower( )
        providers = (
            ( 'github.com', 'github' ),
            ( 'gitlab.com', 'gitlab' ),
        )
        for domain, provider in providers:
            # Exact or subdomain match only; a plain substring test would
            # also accept hosts such as 'notgithub.com'.
            if hostname == domain or hostname.endswith( f".{domain}" ):
                return provider
        return None

    def _acquire_github_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitHub authentication token from environment or gh CLI.

            Checks GITHUB_TOKEN environment variable first, then attempts
            to retrieve token from ``gh auth token``. Returns None if
            neither source yields a non-empty token.
        '''
        token = __.os.environ.get( 'GITHUB_TOKEN' )
        if token: return token
        try:
            result = __.subprocess.run(
                [ 'gh', 'auth', 'token' ],
                capture_output = True,
                text = True,
                timeout = 5,
                check = False )
        except ( OSError, __.subprocess.TimeoutExpired ):
            # gh missing, not executable, or unresponsive: no token.
            return None
        if result.returncode != 0: return None
        return result.stdout.strip( ) or None

    def _acquire_gitlab_authentication_token(
        self
    ) -> __.typx.Optional[ str ]:
        ''' Acquires GitLab authentication token from environment.

            Checks GITLAB_TOKEN environment variable. Returns None if not
            available.
        '''
        return __.os.environ.get( 'GITLAB_TOKEN' )

    def _retrieve_github_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitHub API.

            Requests up to 100 tags in a single page. Returns list of tag
            entries or None on failure.
        '''
        token = self._acquire_github_authentication_token( )
        url = (
            f"https://api.github.com/repos/{owner}/{repository}/tags"
            "?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'Authorization', f"token {token}" )
        request.add_header( 'Accept', 'application/vnd.github.v3+json' )
        return self._fetch_tag_listing( request )

    def _retrieve_gitlab_tags(
        self, owner: str, repository: str
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Retrieves tags from GitLab API.

            Requests up to 100 tags in a single page. Returns list of tag
            entries or None on failure.
        '''
        token = self._acquire_gitlab_authentication_token( )
        # Project path is a single URL path segment; every slash,
        # including those of nested groups, must be percent-encoded.
        project_path = f"{owner}/{repository}".replace( '/', '%2F' )
        url = (
            f"https://gitlab.com/api/v4/projects/{project_path}/"
            f"repository/tags?per_page=100" )
        request = __.urlreq.Request( url )
        if token:
            request.add_header( 'PRIVATE-TOKEN', token )
        return self._fetch_tag_listing( request )

    def _fetch_tag_listing(
        self, request: __.typx.Any
    ) -> __.typx.Optional[ list[ GitApiTag ] ]:
        ''' Executes prepared API request and decodes JSON tag listing.

            Returns None if the request or decoding fails for any reason
            or if the decoded payload is not a list; callers treat None
            as "fall back to standard clone".
        '''
        try:
            with __.urlreq.urlopen( request, timeout = 10 ) as response:
                payload = __.json.loads( response.read( ) )
        except Exception as exception:
            _scribe.debug( f"Tag listing request failed: {exception}" )
            return None
        if not isinstance( payload, list ): return None
        return payload

    def _extract_repository_information(
        self, git_url: str
    ) -> __.typx.Optional[ tuple[ str, str ] ]:
        ''' Extracts owner and repository name from Git URL.

            Returns tuple of (owner, repository) or None if URL format is
            not recognized. Handles both SSH (git@host:owner/repo) and
            HTTPS (https://host/owner/repo) formats. Surrounding slashes
            and a trailing ``.git`` are removed; everything after the
            first remaining slash is the repository.
        '''
        if self._detect_git_host( git_url ) is None: return None
        if git_url.startswith( 'git@' ):
            _, colon, path = git_url.partition( ':' )
            if not colon: return None
        else:
            path = __.urlparse.urlparse( git_url ).path
        path = path.strip( '/' ).removesuffix( '.git' )
        owner, slash, repository = path.partition( '/' )
        if not ( slash and owner and repository ): return None
        return ( owner, repository )

    def _select_latest_tag_from_api(
        self,
        tags: list[ GitApiTag ],
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Selects latest tag from API results by version.

            Filters by tag prefix if provided, then selects tag with
            highest version. Entries without a string 'name' are skipped.
            Returns None if no valid version tags are found.
        '''
        versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
        for tag in tags:
            if not isinstance( tag, __.cabc.Mapping ): continue
            tag_name = tag.get( 'name' )
            if not isinstance( tag_name, str ): continue
            version = self._extract_version( tag_name, tag_prefix )
            if version is not None:
                versioned_tags.append( ( version, tag_name ) )
        if not versioned_tags: return None
        return max( versioned_tags )[ 1 ]

    def _resolve_latest_tag_via_api(
        self,
        git_url: str,
        tag_prefix: __.Absential[ str ] = __.absent,
    ) -> __.typx.Optional[ str ]:
        ''' Resolves latest tag using GitHub or GitLab API.

            Returns tag name or None if API resolution fails or is not
            applicable.
        '''
        host = self._detect_git_host( git_url )
        if host is None: return None
        repo_info = self._extract_repository_information( git_url )
        if repo_info is None: return None
        owner, repository = repo_info
        if host == 'github':
            tags = self._retrieve_github_tags( owner, repository )
        elif host == 'gitlab':
            tags = self._retrieve_gitlab_tags( owner, repository )
        else:
            return None
        if tags is None: return None
        return self._select_latest_tag_from_api( tags, tag_prefix )