Coverage for sources/librovore/xtnsmgr/importation.py: 28%

115 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-17 23:43 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Import path management and .pth file processing for extension packages. ''' 

22 

23 

24import importlib as _importlib 

25 

26from . import __ 

27 

28 

# Paths which this module itself inserted into sys.path; consulted when
# entries are later removed or cleaned up.
_added_paths: __.cabc.MutableSequence[ str ] = [ ]
# Module-level scribe used for the debug/info/error messages below.
_scribe = __.acquire_scribe( __name__ )

31 

32 

def add_package_to_import_path(
    package_path: __.Path, *,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent,
    pth_processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Registers package directory for import, then handles its .pth files.

        The path adder is invoked first with ``package_path``, followed by
        the .pth processor with the same argument. An absent ``path_adder``
        falls back to ``_add_path_to_sys_path``; an absent
        ``pth_processor`` falls back to ``process_pth_files``.
    '''
    adder = (
        _add_path_to_sys_path if __.is_absent( path_adder )
        else path_adder )
    handler = (
        process_pth_files if __.is_absent( pth_processor )
        else pth_processor )
    adder( package_path )
    handler( package_path )

49 

50 

def _add_path_to_sys_path( package_path: __.Path ) -> None:
    ''' Prepends path to sys.path unless it is already listed there.

        A newly inserted path is also recorded in ``_added_paths``.
    '''
    path_str = str( package_path )
    if path_str in __.sys.path:
        _scribe.debug( f"Path already in sys.path: {path_str}." )
        return
    __.sys.path.insert( 0, path_str )
    _added_paths.append( path_str )
    _scribe.debug( f"Added to sys.path: {path_str}." )

60 

61 

def remove_from_import_path( package_path: __.Path ) -> None:
    ''' Drops package path from sys.path when it is present.

        The matching entry in ``_added_paths``, if any, is dropped as well.
        Nothing happens when the path is not in sys.path.
    '''
    path_str = str( package_path )
    if path_str not in __.sys.path: return
    __.sys.path.remove( path_str )
    if path_str in _added_paths:
        _added_paths.remove( path_str )
    _scribe.debug( f"Removed from sys.path: {path_str}." )

70 

71 

def cleanup_import_paths( ) -> None:
    ''' Undoes every sys.path insertion recorded in ``_added_paths``.

        The tracking list is empty afterwards.
    '''
    # Drain from the front so that paths are handled in recorded order.
    while _added_paths:
        path_str = _added_paths.pop( 0 )
        if path_str in __.sys.path:
            __.sys.path.remove( path_str )
        _scribe.debug( f"Cleaned up sys.path: {path_str}." )

80 

81 

def import_processor_module( module_name: str ) -> __.types.ModuleType:
    ''' Imports processor module by its dotted name and returns it.

        Relies on the standard import machinery. Builtin processors are
        addressed as f"{__.package_name}.structures.{name}"; external
        processors are addressed by their own module name.

        An ``ImportError`` is logged and then re-raised unchanged.
    '''
    _scribe.debug( f"Importing processor module: {module_name}." )
    try: module = _importlib.import_module( module_name )
    except ImportError as exc:
        _scribe.error( f"Failed to import {module_name}: {exc}." )
        raise
    _scribe.info( f"Successfully imported: {module_name}." )
    return module

98 

99 

def reload_processor_module(
    module_name: str, *,
    importer: __.Absential[
        __.cabc.Callable[ [ str ], __.types.ModuleType ]
    ] = __.absent,
    reloader: __.Absential[ __.cabc.Callable[
        [ __.types.ModuleType ], __.types.ModuleType
    ] ] = __.absent
) -> __.types.ModuleType:
    ''' Reloads processor module, or imports it if not yet loaded.

        When ``module_name`` is a key of sys.modules, the stored module is
        handed to ``reloader`` (default: ``importlib.reload``). Otherwise,
        ``module_name`` is handed to ``importer`` (default:
        ``import_processor_module``). The callable's result is returned.
    '''
    if module_name not in __.sys.modules:
        _scribe.debug(
            f"Module not yet imported, importing: {module_name}." )
        import_module = (
            import_processor_module if __.is_absent( importer )
            else importer )
        return import_module( module_name )
    _scribe.debug( f"Reloading processor module: {module_name}." )
    reload_module = (
        _importlib.reload if __.is_absent( reloader ) else reloader )
    return reload_module( __.sys.modules[ module_name ] )

120 

121 

def list_registered_processors( ) -> tuple[ str, ... ]:
    ''' Returns names found in the inventory and structure registries.

        Inventory names come first, followed by structure names not already
        seen; a name present in both registries is reported once.
    '''
    merged: dict[ str, __.Processor ] = { }
    merged.update( __.inventory_processors )
    merged.update( __.structure_processors )
    return tuple( merged )

129 

130 

def get_module_info( module_name: str ) -> dict[ str, __.typx.Any ]:
    ''' Summarizes an entry of sys.modules as a plain dictionary.

        Returns ``{ 'imported': False }`` when ``module_name`` is not a key
        of sys.modules. Otherwise, the result holds ``'imported': True``,
        the module's ``__name__``, and the ``file``, ``package``,
        ``version``, and ``doc`` entries, each ``None`` when the module
        lacks the corresponding dunder attribute.
    '''
    try: module = __.sys.modules[ module_name ]
    except KeyError: return { 'imported': False }
    info: dict[ str, __.typx.Any ] = {
        'imported': True, 'name': module.__name__ }
    optional_attributes = (
        ( 'file', '__file__' ),
        ( 'package', '__package__' ),
        ( 'version', '__version__' ),
        ( 'doc', '__doc__' ),
    )
    for key, attribute in optional_attributes:
        info[ key ] = getattr( module, attribute, None )
    return info

144 

145 

def process_pth_files(
    package_path: __.Path, *,
    processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Processes .pth files in package directory to update sys.path.

        Every entry directly inside ``package_path`` whose suffix is
        ``.pth`` and whose name does not start with a dot is passed, in
        sorted order, to ``processor`` (default: ``_process_pth_file``).

        Returns without doing anything when ``package_path`` is not a
        directory or when listing it raises ``OSError``.
    '''
    if not package_path.is_dir( ): return
    try:
        # Materialize and sort the listing inside the ``try``. A bare
        # generator expression defers iteration, so an OSError raised while
        # the directory is actually read could otherwise surface later,
        # outside of this handler.
        pth_files = sorted(
            file for file in package_path.iterdir( )
            if '.pth' == file.suffix
            and not file.name.startswith( '.' ) )
    except OSError: return
    if __.is_absent( processor ):
        processor = _process_pth_file
    for pth_file in pth_files:
        processor( pth_file )

167 

168 

def _acquire_pth_file_content(
    pth_file: __.Path, *,
    encoding_provider: __.Absential[
        __.cabc.Callable[ [ ], str ]
    ] = __.absent
) -> str:
    ''' Returns decoded text of .pth file.

        The bytes are decoded as UTF-8, tolerating a leading BOM as with
        source files. If that fails, they are decoded with the encoding
        named by ``encoding_provider`` (default:
        ``locale.getpreferredencoding``).
    '''
    with __.io.open_code( str( pth_file ) ) as stream:
        raw = stream.read( )
    try: return raw.decode( 'utf-8-sig' )
    except UnicodeDecodeError:
        provider = (
            __.locale.getpreferredencoding
            if __.is_absent( encoding_provider ) else encoding_provider )
        return raw.decode( provider( ) )

184 

185 

def _is_hidden(
    path: __.Path, *, platform: __.Absential[ str ] = __.absent
) -> bool:
    ''' Reports whether path carries the platform's hidden attribute.

        ``platform`` defaults to ``sys.platform``. Only ``'darwin'`` and
        ``'win32'`` are inspected; any other platform yields ``False``,
        as does a path whose ``lstat`` raises ``OSError``.
    '''
    try: inode = path.lstat( )
    except OSError: return False
    if __.is_absent( platform ):
        platform = __.sys.platform
    if 'darwin' == platform:
        flags = getattr( inode, 'st_flags', 0 )
        return bool( flags & __.stat.UF_HIDDEN )
    if 'win32' == platform:
        attributes = getattr( inode, 'st_file_attributes', 0 )
        # Falls back to the literal FILE_ATTRIBUTE_HIDDEN value (0x2) when
        # the stat module does not expose the constant.
        hidden_bit = getattr( __.stat, 'FILE_ATTRIBUTE_HIDDEN', 0x2 )
        return bool( attributes & hidden_bit )
    return False

203 

204 

def _process_pth_file(
    pth_file: __.Path, *,
    hidden_checker: __.Absential[
        __.cabc.Callable[ [ __.Path ], bool ]
    ] = __.absent,
    content_reader: __.Absential[
        __.cabc.Callable[ [ __.Path ], str ]
    ] = __.absent,
    line_processor: __.Absential[
        __.cabc.Callable[ [ __.Path, str ], None ]
    ] = __.absent
) -> None:
    ''' Reads one .pth file and hands its content to the line processor.

        The file is skipped when it does not exist, when
        ``hidden_checker`` reports it as hidden, or when ``content_reader``
        raises ``OSError``. Absent collaborators default to ``_is_hidden``,
        ``_acquire_pth_file_content``, and ``_process_pth_file_lines``.
    '''
    is_hidden = (
        _is_hidden if __.is_absent( hidden_checker ) else hidden_checker )
    read_content = (
        _acquire_pth_file_content if __.is_absent( content_reader )
        else content_reader )
    process_lines = (
        _process_pth_file_lines if __.is_absent( line_processor )
        else line_processor )
    if not pth_file.exists( ): return
    if is_hidden( pth_file ): return
    try: content = read_content( pth_file )
    except OSError: return
    process_lines( pth_file, content )

228 

229 

def _process_pth_file_lines(
    pth_file: __.Path, content: str, *,
    executor: __.Absential[ __.cabc.Callable[ [ str ], None ] ] = __.absent,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Interprets .pth file content line by line.

        Lines starting with ``#`` and blank lines are skipped. Lines
        starting with ``import`` followed by a space or tab are passed to
        ``executor`` (default: ``exec``); the first such line to raise is
        logged and ends processing of the remaining lines. Any other line
        is taken as a path relative to the .pth file's directory and, if
        it exists, passed to ``path_adder`` (default:
        ``_add_path_to_sys_path``).
    '''
    # NOTE(security): import lines are executed verbatim. Only process .pth
    # files which come from trusted package directories.
    # Local names are kept stable on purpose: the default executor runs the
    # line with this function's frame as its namespace.
    if __.is_absent( executor ):
        executor = exec
    if __.is_absent( path_adder ):
        path_adder = _add_path_to_sys_path
    for n, line in enumerate( content.splitlines( ), 1 ):
        if line.startswith( '#' ): continue
        if not line.strip( ): continue
        if not line.startswith( ( 'import ', 'import\t' ) ):
            # Plain entry: directory relative to the .pth file's location.
            path_to_add = pth_file.parent / line.rstrip( )
            if path_to_add.exists( ):
                path_adder( path_to_add )
            continue
        _scribe.debug( f"Executing import from {pth_file.name}: {line}" )
        try: executor( line )
        except Exception:
            _scribe.exception( f"Error on line {n} of {pth_file}." )
            break