Coverage for sources / agentsmgr / sources / git.py: 42%

312 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 02:32 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Git-based source handler with Dulwich. 

22 

23 This module provides source resolution for Git repositories, supporting 

24 various URL schemes and subdirectory specifications via fragment syntax. 

25''' 

26 

27 

28import dulwich.porcelain as _dulwich_porcelain 

29 

30from . import __ 

31from . import base as _base 

32 

33 

# One tag entry as decoded from a hosting provider's JSON tag listing.
# The selection logic in this module reads only its 'name' key.
GitApiTag: __.typx.TypeAlias = __.cabc.Mapping[ str, __.typx.Any ]


# Scribe for this module; only its `.info` method is used here.
_scribe = __.provide_scribe( __name__ )

# Minimum number of characters in a drive-qualified absolute prefix such
# as 'C:\' or 'C:/'; checked before indexing the first three characters.
_WINDOWS_ABSOLUTE_PREFIX_LENGTH = 3

40 

41 

class GitLocation( __.immut.DataclassObject ):
    ''' Git source location with URL, optional ref, and optional subdir.

        Built by the source specification parser and consumed by the
        clone routines of the Git source handler.
    '''
    # Clone URL, after shorthand and web URL normalization.
    git_url: str
    # Text which followed the '@' ref separator; None if no ref was given.
    ref: __.typx.Optional[ str ] = None
    # Text which followed '#'; None if no subdirectory fragment was given.
    subdir: __.typx.Optional[ str ] = None

47 

48 

class GitCloneFailure( __.Omnierror, OSError ):
    ''' Failure to clone a Git repository.

        Keeps the repository URL and the optional reason as attributes so
        that handlers can inspect them without parsing the message.
    '''

    def __init__( self, git_url: str, reason: str = '' ):
        self.git_url = git_url
        self.reason = reason
        # The parenthesized reason is appended only when one was supplied.
        detail = f" ({reason})" if reason else ''
        super( ).__init__(
            f"Failed to clone Git repository: {git_url}{detail}" )

58 

59 

class GitSubdirectoryAbsence( __.DataSourceNoSupport ):
    ''' Requested subdirectory does not exist in the cloned repository.

        Keeps the subdirectory and the original source specification as
        attributes.
    '''

    def __init__( self, subdir: str, source_spec: str ):
        self.subdir = subdir
        self.source_spec = source_spec
        super( ).__init__(
            f"Subdirectory '{subdir}' not found in repository: {source_spec}" )

69 

70 

class GitRefAbsence( __.DataSourceNoSupport ):
    ''' Requested Git reference does not exist in the repository.

        Keeps the reference and the repository URL (or path) as attributes.
    '''

    def __init__( self, ref: str, git_url: str ):
        self.ref = ref
        self.git_url = git_url
        super( ).__init__(
            f"Git ref '{ref}' not found in repository: {git_url}" )

79 

80 

81@_base.source_handler([ 

82 'github', 'gitlab', 'git+https', 'https', 'git@' 

83]) 

84class GitSourceHandler: 

85 ''' Handles Git repository source resolution with Dulwich. 

86 

87 Supports multiple URL schemes and converts them to Git URLs for 

88 cloning. Implements fragment syntax for subdirectory specification. 

89 ''' 

90 

def resolve(
    self,
    source_spec: str,
    tag_prefix: __.typx.Annotated[
        __.Absential[ str ],
        __.ddoc.Doc(
            "Prefix for filtering version tags when no explicit ref "
            "is specified. Only tags starting with this prefix will be "
            "considered, and the prefix will be stripped before version "
            "parsing." ),
    ] = __.absent,
) -> __.Path:
    ''' Resolves Git source to local temporary directory.

        Parses the source specification, clones the repository into a
        fresh temporary directory, and returns the path of the requested
        subdirectory or of the repository root if none was requested.

        Raises ``DataSourceNoSupport`` (including its subclasses for an
        absent ref or subdirectory) unchanged, and ``GitCloneFailure``
        for any other error. The temporary directory is removed whenever
        resolution does not complete.
    '''
    location = self._parse_git_url( source_spec )
    temp_dir = self._create_temp_directory( )
    try:
        self._clone_repository( location, temp_dir, tag_prefix )
        result_path = temp_dir
        if location.subdir:
            result_path = temp_dir / location.subdir
            if not result_path.exists( ):
                self._raise_subdir_not_found(
                    location.subdir, source_spec )
    except BaseException as exception:
        # Clean up on every failure, including interrupts, so that an
        # aborted resolution does not leave a stray clone behind.
        __.shutil.rmtree( temp_dir, ignore_errors = True )
        if not isinstance( exception, Exception ):
            raise
        # Both types already describe the failure; wrapping a clone
        # failure again would only nest its message inside itself.
        if isinstance(
            exception, ( __.DataSourceNoSupport, GitCloneFailure )
        ):
            raise
        raise GitCloneFailure(
            location.git_url, str( exception ) ) from exception
    return result_path

129 

def _parse_git_url( self, source_spec: str ) -> GitLocation:
    ''' Splits source specification into Git URL, ref, and subdirectory.

        The text after the first '#' is the subdirectory fragment. The
        text after a trailing '@' which is recognized as a ref separator
        is the Git reference. What remains is normalized into a clone
        URL. Malformed specifications are reported through
        ``_raise_invalid_source_spec``.
    '''
    remainder, hash_mark, fragment = source_spec.partition( '#' )
    subdir = None
    if hash_mark:
        subdir = fragment
        self._validate_subdirectory_fragment( subdir, source_spec )
    ref = None
    at_index = self._locate_ref_separator( remainder )
    if at_index is not None:
        ref = remainder[ at_index + 1 : ]
        remainder = remainder[ : at_index ]
        if not ref:
            self._raise_invalid_source_spec(
                source_spec, "empty ref after '@'" )
    if not remainder:
        self._raise_invalid_source_spec(
            source_spec, "missing repository URL" )
    return GitLocation(
        git_url = self._normalize_git_url( remainder ),
        ref = ref,
        subdir = subdir )

154 

155 def _is_absolute_subdirectory_fragment( self, subdir: str ) -> bool: 

156 ''' Detects absolute path fragments across POSIX and Windows forms. ''' 

157 if subdir.startswith( ( '/', '\\' ) ): 

158 return True 

159 return ( 

160 len( subdir ) >= _WINDOWS_ABSOLUTE_PREFIX_LENGTH 

161 and subdir[ 0 ].isalpha( ) 

162 and subdir[ 1 ] == ':' 

163 and subdir[ 2 ] in ( '/', '\\' ) 

164 ) 

165 

166 def _locate_ref_separator( 

167 self, url_part: str 

168 ) -> __.typx.Optional[ int ]: 

169 ''' Locates @ref separator while preserving URL authentication syntax. 

170 

171 Distinguishes ref suffixes from '@' used in SSH URLs 

172 (git@host:path) and URL authority userinfo (user@host). 

173 ''' 

174 separator = url_part.rfind( '@' ) 

175 if separator < 0: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 return None 

177 if url_part.startswith( 'git@' ): 

178 host_separator = url_part.find( ':' ) 

179 if host_separator < 0 or separator < host_separator: 

180 return None 

181 return separator 

182 scheme_separator = url_part.find( '://' ) 

183 if scheme_separator >= 0: 183 ↛ 190line 183 didn't jump to line 190 because the condition on line 183 was always true

184 authority_start = scheme_separator + 3 

185 authority_end = url_part.find( '/', authority_start ) 

186 if authority_end < 0: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true

187 authority_end = len( url_part ) 

188 if separator < authority_end: 

189 return None 

190 return separator 

191 

192 def _normalize_git_url( self, url_part: str ) -> str: 

193 ''' Normalizes shorthand and web URLs to clone-ready Git URLs. ''' 

194 if url_part.startswith( 'github:' ): 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true

195 repo_path = url_part[ len( 'github:' ): ] 

196 return f"https://github.com/{repo_path}.git" 

197 if url_part.startswith( 'gitlab:' ): 197 ↛ 198line 197 didn't jump to line 198 because the condition on line 197 was never true

198 repo_path = url_part[ len( 'gitlab:' ): ] 

199 return f"https://gitlab.com/{repo_path}.git" 

200 if url_part.startswith( 'git+https:' ): 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 return url_part[ len( 'git+' ): ] 

202 if url_part.startswith( 'https://github.com/' ): 

203 return self._normalize_git_web_url( url_part ) 

204 if url_part.startswith( 'https://gitlab.com/' ): 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 return self._normalize_git_web_url( url_part ) 

206 # Direct git URLs (git@github.com:user/repo.git) 

207 return url_part 

208 

209 def _normalize_git_web_url( self, url_part: str ) -> str: 

210 ''' Converts GitHub/GitLab web URLs to clone URL form. ''' 

211 if url_part.endswith( '.git' ): 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 return url_part 

213 return f"{url_part.rstrip( '/' )}.git" 

214 

def _raise_invalid_source_spec(
    self, source_spec: str, reason: str
) -> None:
    ''' Raises DataSourceNoSupport for a malformed source specification.

        The exception argument is the specification followed by the
        reason in parentheses. This method never returns normally.
    '''
    raise __.DataSourceNoSupport(
        f"{source_spec} (invalid Git source specification: {reason})" )

223 

def _validate_subdirectory_fragment(
    self, subdir: str, source_spec: str
) -> None:
    ''' Rejects empty, absolute, or parent-traversing '#' fragments.

        The first violated rule is reported through
        ``_raise_invalid_source_spec``; a valid fragment returns None.
    '''
    reason = None
    if not subdir:
        reason = "empty subdirectory fragment after '#'"
    elif self._is_absolute_subdirectory_fragment( subdir ):
        reason = "absolute subdirectory fragments are not allowed"
    elif '..' in __.Path( subdir ).parts:
        reason = (
            "parent-path traversal in subdirectory is "
            "not allowed" )
    if reason is not None:
        self._raise_invalid_source_spec( source_spec, reason )

241 

def _create_temp_directory( self ) -> __.Path:
    ''' Makes a new 'agentsmgr-git-' temporary directory for a clone. '''
    return __.Path( __.tempfile.mkdtemp( prefix = 'agentsmgr-git-' ) )

246 

def _clone_repository(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> None:
    ''' Clones repository into target directory.

        Without an explicit ref, the optimized (API plus shallow clone)
        path is tried first; if it declines or a ref was given, the
        standard clone runs. Any error is converted: to ``GitRefAbsence``
        when a ref was requested and the error text indicates something
        missing, otherwise to ``GitCloneFailure``.
    '''
    try:
        if location.ref is None and self._attempt_optimized_clone(
            location, target_dir, tag_prefix
        ):
            return
        self._perform_standard_clone( location, target_dir, tag_prefix )
    except Exception as exception:
        detail = str( exception )
        lowered = detail.lower( )
        ref_missing = location.ref is not None and any(
            marker in lowered
            for marker in ( 'not found', 'does not exist' ) )
        if ref_missing:
            raise GitRefAbsence(
                location.ref, location.git_url ) from exception
        raise GitCloneFailure( location.git_url, detail ) from exception

274 

def _attempt_optimized_clone(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> bool:
    ''' Attempts optimized clone using API and shallow clone.

        Resolves the latest version tag through the hosting API and
        shallow-clones that tag. Returns True on success and False when
        the caller should fall back to a standard clone, either because
        no tag could be resolved or because the shallow clone failed.

        After a failed shallow clone, whatever was written into the
        target directory is removed, so that the fallback clone starts
        from an empty directory.
    '''
    latest_tag = self._resolve_latest_tag_via_api(
        location.git_url, tag_prefix )
    if latest_tag is None: return False
    _scribe.info(
        f"Resolved latest tag '{latest_tag}' via API for repository: "
        f"{location.git_url}" )
    try:
        self._perform_shallow_clone(
            location.git_url, target_dir, latest_tag )
    except Exception:
        _scribe.info(
            f"Shallow clone failed, falling back to standard clone for "
            f"repository: {location.git_url}" )
        # A failed clone may leave partial repository data behind; the
        # fallback clones into this same directory. Removal is
        # best-effort: leftovers surface as an error in the fallback.
        try:
            for entry in target_dir.iterdir( ):
                if entry.is_dir( ) and not entry.is_symlink( ):
                    __.shutil.rmtree( entry, ignore_errors = True )
                else:
                    entry.unlink( missing_ok = True )
        except OSError:
            pass
        return False
    _scribe.info(
        f"Performed shallow clone for tag '{latest_tag}' in "
        f"repository: {location.git_url}" )
    return True

305 

def _perform_shallow_clone(
    self, git_url: str, target_dir: __.Path, ref: str
) -> None:
    ''' Clones a single ref at depth 1 with Dulwich.

        The ref is passed as the ``branch`` argument and the clone's
        error stream is directed to the null device.
    '''
    destination = str( target_dir )
    branch_name = ref.encode( )
    with open( __.os.devnull, 'wb' ) as sink:
        _dulwich_porcelain.clone(
            git_url,
            destination,
            bare = False,
            depth = 1,
            branch = branch_name,
            errstream = sink )

322 

def _perform_standard_clone(
    self,
    location: GitLocation,
    target_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> None:
    ''' Performs full clone and then checks out the appropriate ref.

        With an explicit ref, that ref is checked out. Without one, the
        latest version tag found in the clone is checked out; if there
        is none, the clone is left as Dulwich produced it.
    '''
    with open( __.os.devnull, 'wb' ) as sink:
        _dulwich_porcelain.clone(
            location.git_url,
            str( target_dir ),
            bare = False,
            depth = None,
            errstream = sink )
    if location.ref is not None:
        _scribe.info(
            f"Using explicit ref '{location.ref}' for repository: "
            f"{location.git_url}" )
        self._checkout_ref( target_dir, location.ref )
        return
    latest_tag = self._get_latest_tag( target_dir, tag_prefix )
    if not latest_tag:
        _scribe.info(
            f"No version tags found, using default branch for "
            f"repository: {location.git_url}" )
        return
    _scribe.info(
        f"Selected latest tag '{latest_tag}' for repository: "
        f"{location.git_url}" )
    self._checkout_ref( target_dir, latest_tag )

358 

def _extract_version(
    self,
    tag_name: str,
    prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ __.Version ]:
    ''' Parses version from tag name, honoring optional prefix.

        With a prefix, a tag which lacks it yields None and a tag which
        has it is parsed after the prefix is removed. Without a prefix,
        the whole tag name is parsed. Returns None if the text is not a
        valid version.
    '''
    if __.is_absent( prefix ):
        candidate = tag_name
    elif tag_name.startswith( prefix ):
        candidate = tag_name[ len( prefix ): ]
    else:
        return None
    try:
        return __.Version( candidate )
    except __.InvalidVersion:
        return None

380 

def _get_latest_tag(
    self,
    repo_dir: __.Path,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Gets the latest tag from the repository by version ordering.

        Considers only tags whose target can be resolved, whose name
        decodes as UTF-8, and which parse as a version (after optional
        prefix filtering). Returns the name of the tag with the highest
        version, or None if the repository cannot be opened, its tags
        cannot be listed, or no tag qualifies.
    '''
    from dulwich.repo import Repo
    try:
        repo = Repo( str( repo_dir ) )
    except Exception:
        return None
    versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
    try:
        try:
            tag_refs = repo.refs.as_dict( b"refs/tags" )
        except Exception:
            return None
        for tag_name_bytes, commit_sha in tag_refs.items( ):
            if self._get_tag_commit( repo, commit_sha ) is None:
                continue
            try:
                tag_name = tag_name_bytes.decode( 'utf-8' )
            except UnicodeDecodeError:
                # An undecodable tag name cannot be a version tag.
                continue
            version = self._extract_version( tag_name, tag_prefix )
            if version is not None:
                versioned_tags.append( ( version, tag_name ) )
    finally:
        # Release the repository's open file handles.
        repo.close( )
    if not versioned_tags:
        return None
    return max( versioned_tags )[ 1 ]

416 

417 def _get_tag_commit( 

418 self, repo: __.typx.Any, commit_sha: bytes 

419 ) -> __.typx.Any: 

420 ''' Gets commit object for a tag, handling annotated tags. ''' 

421 try: 

422 commit = repo[ commit_sha ] 

423 while hasattr( commit, 'object' ): 423 ↛ 425line 423 didn't jump to line 425 because the condition on line 423 was never true

424 # object attribute is a tuple (class, sha) 

425 commit = repo[ commit.object[ 1 ] ] 

426 except Exception: 

427 return None 

428 else: 

429 return commit 

430 

def _checkout_ref( self, repo_dir: __.Path, ref: str ) -> None:
    ''' Checks out a specific branch, tag, or commit in cloned repo.

        Uses ``dulwich.porcelain.checkout`` when the installed Dulwich
        provides it; otherwise only verifies that the ref exists. Raises
        ``GitRefAbsence`` if the repository cannot be opened or the ref
        cannot be found. The repository handle is closed in all cases.
    '''
    from dulwich.repo import Repo
    try:
        repo = Repo( str( repo_dir ) )
    except Exception as exception:
        raise GitRefAbsence( ref, str( repo_dir ) ) from exception
    try:
        checkout = getattr( _dulwich_porcelain, 'checkout', None )
        if checkout is None:
            self._validate_ref_presence( repo, ref, str( repo_dir ) )
            return
        try:
            checkout( repo, ref )
        except ( KeyError, ValueError ):
            self._raise_ref_not_found( ref, str( repo_dir ) )
    finally:
        # Release the repository's open file handles.
        repo.close( )

446 

447 def _validate_ref_presence( 

448 self, repo: __.typx.Any, ref: str, repo_dir: str 

449 ) -> None: 

450 ''' Validates branch, tag, or commit reference exists in repo. ''' 

451 ref_bytes = ref.encode( ) 

452 tag_ref = f"refs/tags/{ref}".encode( ) 

453 branch_ref = f"refs/heads/{ref}".encode( ) 

454 if tag_ref in repo.refs or branch_ref in repo.refs: 

455 return 

456 try: 

457 repo[ ref_bytes ] 

458 except KeyError: 

459 self._raise_ref_not_found( ref, repo_dir ) 

460 

def _raise_ref_not_found( self, ref: str, repo_dir: str ) -> None:
    ''' Raises GitRefAbsence for the ref and repository location.

        This method never returns normally.
    '''
    absence = GitRefAbsence( ref, repo_dir )
    raise absence

464 

def _raise_subdir_not_found( self, subdir: str, source_spec: str ) -> None:
    ''' Raises GitSubdirectoryAbsence for the subdirectory and source.

        This method never returns normally.
    '''
    absence = GitSubdirectoryAbsence( subdir, source_spec )
    raise absence

468 

469 def _detect_git_host( self, git_url: str ) -> __.typx.Optional[ str ]: 

470 ''' Detects Git hosting provider from URL. 

471 

472 Returns 'github', 'gitlab', or None for other providers. 

473 ''' 

474 if git_url.startswith( 'git@' ): 

475 parts = git_url.split( '@', 1 ) 

476 if len( parts ) > 1: 

477 host_part = parts[ 1 ].split( ':', 1 )[ 0 ] 

478 if 'github.com' in host_part: return 'github' 

479 if 'gitlab.com' in host_part: return 'gitlab' 

480 else: 

481 parsed = __.urlparse.urlparse( git_url ) 

482 hostname = parsed.netloc.lower( ) 

483 if 'github.com' in hostname: return 'github' 

484 if 'gitlab.com' in hostname: return 'gitlab' 

485 return None 

486 

def _acquire_github_authentication_token(
    self
) -> __.typx.Optional[ str ]:
    ''' Acquires GitHub authentication token from environment or gh CLI.

        Returns a non-empty GITHUB_TOKEN environment variable if set.
        Otherwise runs ``gh auth token`` with a 5 second timeout and
        returns its trimmed output. Returns None if the command cannot
        be started, times out, exits non-zero, or prints nothing.
    '''
    token = __.os.environ.get( 'GITHUB_TOKEN' )
    if token: return token
    try:
        result = __.subprocess.run(
            [ 'gh', 'auth', 'token' ],
            capture_output = True,
            text = True,
            timeout = 5,
            check = False )
    except ( OSError, __.subprocess.TimeoutExpired ):
        # OSError covers a missing executable as well as one which
        # exists but cannot be executed.
        return None
    if result.returncode != 0:
        return None
    # Treat empty output as no token rather than as an empty token.
    return result.stdout.strip( ) or None

510 

def _acquire_gitlab_authentication_token(
    self
) -> __.typx.Optional[ str ]:
    ''' Reads GitLab authentication token from GITLAB_TOKEN variable.

        Returns the variable's value, or None if it is not set.
    '''
    environment = __.os.environ
    return environment.get( 'GITLAB_TOKEN' )

520 

def _retrieve_github_tags(
    self, owner: str, repository: str
) -> __.typx.Optional[ list[ GitApiTag ] ]:
    ''' Retrieves tags from GitHub API.

        Requests up to 100 tags in a single page, sending an
        authorization header when a token is available. Returns the
        decoded list of tag entries, or None on any failure, including
        a response which is not a JSON list.
    '''
    token = self._acquire_github_authentication_token( )
    # Ask for the largest page size, so that the newest version is less
    # likely to fall outside of the single page which is fetched.
    url = (
        f"https://api.github.com/repos/{owner}/{repository}/tags"
        f"?per_page=100" )
    request = __.urlreq.Request( url )
    if token:
        request.add_header( 'Authorization', f"token {token}" )
    request.add_header( 'Accept', 'application/vnd.github.v3+json' )
    try:
        with __.urlreq.urlopen( request, timeout = 10 ) as response:
            payload = __.json.loads( response.read( ) )
    except Exception:
        # Best-effort optimization: every failure (network, HTTP status,
        # invalid JSON) means that the caller falls back.
        return None
    return payload if isinstance( payload, list ) else None

540 

def _retrieve_gitlab_tags(
    self, owner: str, repository: str
) -> __.typx.Optional[ list[ GitApiTag ] ]:
    ''' Retrieves tags from GitLab API.

        Requests up to 100 tags in a single page, sending a private
        token header when a token is available. Returns the decoded
        list of tag entries, or None on any failure, including a
        response which is not a JSON list.
    '''
    token = self._acquire_gitlab_authentication_token( )
    # The project is addressed by its URL-encoded full path. The
    # repository part may itself contain '/' (subgroups), so the whole
    # path is encoded rather than only the owner/repository separator.
    # NOTE(review): assumes `__.urlparse` is `urllib.parse`.
    project_path = __.urlparse.quote( f"{owner}/{repository}", safe = '' )
    url = (
        f"https://gitlab.com/api/v4/projects/{project_path}/"
        f"repository/tags?per_page=100" )
    request = __.urlreq.Request( url )
    if token:
        request.add_header( 'PRIVATE-TOKEN', token )
    try:
        with __.urlreq.urlopen( request, timeout = 10 ) as response:
            payload = __.json.loads( response.read( ) )
    except Exception:
        # Best-effort optimization: every failure (network, HTTP status,
        # invalid JSON) means that the caller falls back.
        return None
    return payload if isinstance( payload, list ) else None

562 

def _extract_repository_information(
    self, git_url: str
) -> __.typx.Optional[ tuple[ str, str ] ]:
    ''' Splits Git URL into owner and repository name.

        Applies only to URLs on a detected hosting provider. The path is
        what follows the first ':' for ``git@`` addresses and the URL
        path otherwise. A trailing '.git' is removed and the path is
        split at its first '/'. Returns (owner, repository), or None if
        the host is not recognized or the path has no '/'.
    '''
    if self._detect_git_host( git_url ) is None: return None
    if git_url.startswith( 'git@' ):
        _, colon, path = git_url.partition( ':' )
        if not colon: return None
    else:
        path = __.urlparse.urlparse( git_url ).path.lstrip( '/' )
    owner, slash, repository = path.removesuffix( '.git' ).partition( '/' )
    if not slash: return None
    return ( owner, repository )

587 

def _select_latest_tag_from_api(
    self,
    tags: list[ GitApiTag ],
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Selects latest tag from API results by version ordering.

        Entries which are not mappings, which lack a string 'name', or
        whose name does not parse as a version (after optional prefix
        filtering) are ignored. Returns the name with the highest
        version, or None if no entry qualifies.
    '''
    versioned_tags: list[ tuple[ __.Version, str ] ] = [ ]
    for tag in tags:
        # API payloads are external data: skip malformed entries rather
        # than raise, so that the caller can still fall back.
        if not isinstance( tag, __.cabc.Mapping ): continue
        tag_name = tag.get( 'name' )
        if not isinstance( tag_name, str ): continue
        version = self._extract_version( tag_name, tag_prefix )
        if version is not None:
            versioned_tags.append( ( version, tag_name ) )
    if not versioned_tags:
        return None
    return max( versioned_tags )[ 1 ]

609 

def _resolve_latest_tag_via_api(
    self,
    git_url: str,
    tag_prefix: __.Absential[ str ] = __.absent,
) -> __.typx.Optional[ str ]:
    ''' Resolves latest version tag through GitHub or GitLab API.

        Returns None if the host is not a recognized provider, if owner
        and repository cannot be extracted from the URL, if the tag
        listing cannot be retrieved, or if no listed tag is a version.
    '''
    retrievers = {
        'github': self._retrieve_github_tags,
        'gitlab': self._retrieve_gitlab_tags,
    }
    retriever = retrievers.get( self._detect_git_host( git_url ) )
    if retriever is None: return None
    repo_info = self._extract_repository_information( git_url )
    if repo_info is None: return None
    owner, repository = repo_info
    tags = retriever( owner, repository )
    if tags is None: return None
    return self._select_latest_tag_from_api( tags, tag_prefix )