Coverage for sources/librovore/xtnsmgr/cachemgr.py: 53%

98 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 18:40 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Cache management for extension packages. ''' 

22 

23 

24from . import __ 

25from . import importation as _importation 

26from . import installation as _installation 

27 

28 

29_scribe = __.acquire_scribe( __name__ ) 

30 

31 

32class CacheInfo( __.immut.DataclassObject ): 

33 ''' Information about cached extension package. ''' 

34 

35 specification: str 

36 location: __.Path 

37 ctime: __.datetime.datetime 

38 ttl: int # hours 

39 platform_id: str 

40 

41 @property 

42 def is_expired( self ) -> bool: 

43 ''' Checks if cache entry has expired. ''' 

44 return ( 

45 __.datetime.datetime.now( ) - self.ctime 

46 > __.datetime.timedelta( hours = self.ttl ) ) 

47 

48 

49def calculate_cache_path( specification: str ) -> __.Path: 

50 ''' Calculates cache path for package specification. ''' 

51 base_dir = __.Path( '.auxiliary/caches/extensions' ) 

52 hasher = __.hashlib.sha256( ) 

53 hasher.update( specification.encode( 'utf-8' ) ) 

54 digest = hasher.hexdigest( ) 

55 platform_id = calculate_platform_id( ) 

56 return base_dir / digest / platform_id 

57 

58 

59def calculate_platform_id( ) -> str: 

60 ''' Calculates platform identifier for package cache paths. 

61 

62 Format: {python_impl}-{python_ver}--{os_name}--{cpu_arch} 

63 

64 Examples: 

65 cpython-3.10--linux--x86_64 

66 pypy-3.10-7.3--darwin--arm64 

67 ''' 

68 implementation = __.sys.implementation.name 

69 version = '.'.join( map( str, __.sys.version_info[ : 2 ] ) ) 

70 suffix = '' 

71 match implementation: 

72 case 'pypy': 72 ↛ 73line 72 didn't jump to line 73 because the pattern on line 72 never matched

73 suffix = '-' + '.'.join( 

74 map( str, __.sys.pypy_version_info[ : 2 ] ) ) # pyright: ignore 

75 case 'graalpy': 75 ↛ 77line 75 didn't jump to line 77 because the pattern on line 75 never matched

76 # TODO: Add GraalVM version when available 

77 pass 

78 case _: 

79 pass 

80 os_name = __.platform.system( ).lower( ) 

81 architecture = __.platform.machine( ).lower( ) 

82 return ( 

83 f"{implementation}-{version}{suffix}" 

84 f"--{os_name}--{architecture}" ) 

85 

86 

87def acquire_cache_info( specification: str ) -> CacheInfo | None: 

88 ''' Acquires cache information for a package, if it exists. ''' 

89 cache_path = calculate_cache_path( specification ) 

90 metafile = cache_path / '.cache_metadata.json' 

91 if not metafile.exists( ): return None 

92 try: 

93 with metafile.open( 'r', encoding = 'utf-8' ) as f: 

94 metadata = __.json.load( f ) 

95 return CacheInfo( 

96 specification = metadata[ 'package_spec' ], 

97 ctime = __.datetime.datetime.fromisoformat( 

98 metadata[ 'installed_at' ] 

99 ), 

100 ttl = metadata[ 'ttl_hours' ], 

101 platform_id = metadata[ 'platform_id' ], 

102 location = cache_path ) 

103 except ( __.json.JSONDecodeError, KeyError, ValueError ) as exc: 

104 _scribe.warning( 

105 f"Invalid cache metadata for {specification}: {exc}" ) 

106 return None 

107 

108 

109def save_cache_info( cache_info: CacheInfo ) -> None: 

110 ''' Saves cache information to metadata file. ''' 

111 metafile = cache_info.location / '.cache_metadata.json' 

112 metafile.parent.mkdir( parents = True, exist_ok = True ) 

113 metadata: __.cabc.Mapping[ str, str | int ] = __.immut.Dictionary( { 

114 'package_spec': cache_info.specification, 

115 'installed_at': cache_info.ctime.isoformat( ), 

116 'ttl_hours': cache_info.ttl, 

117 'platform_id': cache_info.platform_id 

118 } ) 

119 with metafile.open( 'w', encoding = 'utf-8' ) as f: 

120 __.json.dump( dict( metadata ), f, indent = 2 ) 

121 

122 

123def cleanup_expired_caches( ttl: int = 24 ) -> None: 

124 ''' Removes expired cache entries. ''' 

125 base_dir = __.Path( '.auxiliary/caches/extensions' ) 

126 if not base_dir.exists( ): return 

127 for package_dir in base_dir.iterdir( ): 

128 if not package_dir.is_dir( ): continue 

129 for platform_dir in package_dir.iterdir( ): 

130 if not platform_dir.is_dir( ): continue 

131 metafile = platform_dir / '.cache_metadata.json' 

132 if not metafile.exists( ): continue 

133 try: 

134 with metafile.open( 'r', encoding = 'utf-8' ) as f: 

135 metadata = __.json.load( f ) 

136 installed_at = __.datetime.datetime.fromisoformat( 

137 metadata[ 'installed_at' ] ) 

138 cache_ttl = metadata.get( 'ttl_hours', ttl ) 

139 if ( __.datetime.datetime.now( ) - installed_at 

140 > __.datetime.timedelta( hours = cache_ttl ) 

141 ): 

142 _scribe.info( f"Removing expired cache: {platform_dir}" ) 

143 __.shutil.rmtree( platform_dir ) 

144 except ( 

145 KeyError, ValueError, 

146 __.json.JSONDecodeError, 

147 OSError, 

148 ) as exc: 

149 _scribe.warning( 

150 f"Error processing cache {platform_dir}: {exc}" ) 

151 

152 

153def clear_package_cache( specification: str ) -> bool: 

154 ''' Clears cache for specific package. Returns True if found. ''' 

155 cache_path = calculate_cache_path( specification ) 

156 if cache_path.exists( ): 

157 try: 

158 __.shutil.rmtree( cache_path ) 

159 except OSError as exc: 

160 _scribe.error( 

161 f"Failed to clear cache for {specification}: {exc}" ) 

162 return False 

163 else: 

164 _scribe.info( f"Cleared cache for package: {specification}" ) 

165 return True 

166 return False 

167 

168 

169async def ensure_package( 

170 specification: str, *, 

171 cache_ttl: int = 24, 

172 retries_max: int = 3 

173) -> __.typx.Annotated[ 

174 None, 

175 __.ddoc.Raises( __.ExtensionConfigurationInvalidity, 'Invalid spec.' ), 

176 __.ddoc.Raises( __.ExtensionInstallFailure, 'Install fails.' ), 

177]: 

178 ''' Ensures package is installed and importable. ''' 

179 cache_info = acquire_cache_info( specification ) 

180 if cache_info and not cache_info.is_expired: 

181 _scribe.debug( f"Using cached package: {specification}." ) 

182 package_path = cache_info.location 

183 else: 

184 if cache_info and cache_info.is_expired: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true

185 _scribe.debug( f"Clearing expired cache for: {specification}." ) 

186 clear_package_cache( specification ) 

187 cache_path = calculate_cache_path( specification ) 

188 package_path = await _installation.install_package( 

189 specification, cache_path, retries_max = retries_max ) 

190 cache_info = CacheInfo( 

191 specification = specification, 

192 ctime = __.datetime.datetime.now( ), 

193 ttl = cache_ttl, 

194 platform_id = calculate_platform_id( ), 

195 location = package_path ) 

196 save_cache_info( cache_info ) 

197 _importation.add_package_to_import_path( package_path ) 

198 

199 

200def invalidate( 

201 specification: str, *, 

202 clearer: __.Absential[ 

203 __.cabc.Callable[ [ str ], bool ] 

204 ] = __.absent 

205) -> None: 

206 ''' Removes package from cache, forcing reinstall on next ensure. ''' 

207 if __.is_absent( clearer ): 

208 clearer = clear_package_cache 

209 clearer( specification )