Coverage for sources/librovore/xtnsmgr/importation.py: 34%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 21:59 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Import path management and .pth file processing for extension packages. ''' 

22 

23 

24import importlib as _importlib 

25 

26from . import __ 

27 

28 

# Paths which this module has inserted into sys.path; entries are appended
# by _add_path_to_sys_path each time it performs an insertion.
29_added_paths: __.cabc.MutableSequence[ str ] = [ ] 

# Logger-like object for this module; used below for debug, info, error and
# exception messages.
30_scribe = __.acquire_scribe( __name__ ) 

31 

32 

def add_package_to_import_path(
    package_path: __.Path, *,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent,
    pth_processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Registers package directory for imports, then handles .pth files.

        The ``path_adder`` and ``pth_processor`` callables are injection
        points; when absent, ``_add_path_to_sys_path`` and
        ``process_pth_files`` from this module are used. Each is invoked
        once with ``package_path``, path addition first.
    '''
    add_path = (
        _add_path_to_sys_path if __.is_absent( path_adder )
        else path_adder )
    handle_pth_files = (
        process_pth_files if __.is_absent( pth_processor )
        else pth_processor )
    add_path( package_path )
    handle_pth_files( package_path )

49 

50 

def _add_path_to_sys_path( package_path: __.Path ) -> None:
    ''' Prepends path to sys.path unless it is already listed there.

        Paths which are newly inserted are also recorded in the
        module-level ``_added_paths`` list.
    '''
    entry = str( package_path )
    # Guard clause: nothing to insert when the entry is already present.
    if entry in __.sys.path:
        _scribe.debug( f"Path already in sys.path: {entry}." )
        return
    __.sys.path.insert( 0, entry )
    _added_paths.append( entry )
    _scribe.debug( f"Added to sys.path: {entry}." )

60 

61 

def import_processor_module( module_name: str ) -> __.types.ModuleType:
    ''' Imports processor module by its fully-qualified name.

        Delegates to ``importlib.import_module``. For builtin processors,
        pass f"{__.package_name}.structures.{name}". For external
        processors, pass the module name directly.

        Logs the attempt and its outcome. An ``ImportError`` is logged and
        then propagated unchanged to the caller.
    '''
    try:
        _scribe.debug( f"Importing processor module: {module_name}." )
        imported = _importlib.import_module( module_name )
    except ImportError as error:
        _scribe.error( f"Failed to import {module_name}: {error}." )
        raise
    _scribe.info( f"Successfully imported: {module_name}." )
    return imported

78 

79 

def list_registered_processors( ) -> tuple[ str, ... ]:
    ''' Returns names of processors from inventory and structure registries.

        Both registries are merged into one dictionary, so a name which
        appears in both is listed only once. Names from the inventory
        registry precede names found only in the structure registry.
    '''
    return tuple( {
        **__.inventory_processors,
        **__.structure_processors,
    } )

87 

88 

def get_module_info( module_name: str ) -> dict[ str, __.typx.Any ]:
    ''' Reports import status and basic metadata for a module name.

        Returns ``{ 'imported': False }`` when ``sys.modules`` has no usable
        entry for the name. Otherwise, returns a dictionary with the keys
        ``imported``, ``name``, ``file``, ``package``, ``version`` and
        ``doc``. Attributes which the module object lacks are reported as
        ``None``, except for ``name``, which falls back to ``module_name``.
    '''
    # Single lookup instead of check-then-index: avoids a KeyError if the
    # entry disappears between the two operations. A ``None`` entry marks a
    # module whose import is blocked, so it is reported as not imported
    # rather than dereferenced.
    module = __.sys.modules.get( module_name )
    if module is None: return { 'imported': False }
    return {
        'imported': True,
        'name': getattr( module, '__name__', module_name ),
        'file': getattr( module, '__file__', None ),
        'package': getattr( module, '__package__', None ),
        'version': getattr( module, '__version__', None ),
        'doc': getattr( module, '__doc__', None ),
    }

102 

103 

def process_pth_files(
    package_path: __.Path, *,
    processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Processes .pth files found directly inside package directory.

        Files are selected by the ``.pth`` suffix, excluding names which
        begin with a dot, and are handed to ``processor`` in sorted order.
        When ``processor`` is absent, ``_process_pth_file`` is used.

        Returns silently if ``package_path`` is not a directory or if the
        directory cannot be listed.
    '''
    if not package_path.is_dir( ): return
    try:
        # Materialize inside the ``try``: directory iteration may be lazy,
        # in which case a listing error would otherwise surface later,
        # outside of this handler.
        pth_files = sorted(
            file for file in package_path.iterdir( )
            if '.pth' == file.suffix
            and not file.name.startswith( '.' ) )
    except OSError: return
    if __.is_absent( processor ):
        processor = _process_pth_file
    for pth_file in pth_files:
        processor( pth_file )

125 

126 

def _acquire_pth_file_content(
    pth_file: __.Path, *,
    encoding_provider: __.Absential[
        __.cabc.Callable[ [ ], str ]
    ] = __.absent
) -> str:
    ''' Reads .pth file and decodes its bytes to text.

        Decoding is attempted as UTF-8 first, with an optional BOM accepted
        (same as with source files). If that fails, the bytes are decoded
        with the encoding named by ``encoding_provider``, which defaults to
        ``locale.getpreferredencoding`` when absent.
    '''
    with __.io.open_code( str( pth_file ) ) as source:
        raw = source.read( )
    try: text = raw.decode( 'utf-8-sig' )
    except UnicodeDecodeError:
        provider = (
            __.locale.getpreferredencoding
            if __.is_absent( encoding_provider )
            else encoding_provider )
        text = raw.decode( provider( ) )
    return text

142 

143 

def _is_hidden(
    path: __.Path, *, platform: __.Absential[ str ] = __.absent
) -> bool:
    ''' Determines whether filesystem attributes mark path as hidden.

        Only the ``darwin`` and ``win32`` platforms are examined; any other
        platform yields ``False``, as does a path which cannot be stat'ed.
        When ``platform`` is absent, ``sys.platform`` is consulted.
    '''
    try: info = path.lstat( )
    except OSError: return False
    system = __.sys.platform if __.is_absent( platform ) else platform
    if 'darwin' == system:
        flags = getattr( info, 'st_flags', 0 )
        return bool( flags & __.stat.UF_HIDDEN )
    if 'win32' == system:
        attributes = getattr( info, 'st_file_attributes', 0 )
        # Windows FILE_ATTRIBUTE_HIDDEN constant (0x2)
        hidden_bit = getattr( __.stat, 'FILE_ATTRIBUTE_HIDDEN', 0x2 )
        return bool( attributes & hidden_bit )
    return False

161 

162 

def _process_pth_file(
    pth_file: __.Path, *,
    hidden_checker: __.Absential[
        __.cabc.Callable[ [ __.Path ], bool ]
    ] = __.absent,
    content_reader: __.Absential[
        __.cabc.Callable[ [ __.Path ], str ]
    ] = __.absent,
    line_processor: __.Absential[
        __.cabc.Callable[ [ __.Path, str ], None ]
    ] = __.absent
) -> None:
    ''' Reads one .pth file and dispatches its content for processing.

        Files which do not exist, which the hidden checker reports as
        hidden, or whose read raises ``OSError`` are skipped silently.
        Absent collaborators default to ``_is_hidden``,
        ``_acquire_pth_file_content`` and ``_process_pth_file_lines``.
    '''
    check_hidden = (
        _is_hidden if __.is_absent( hidden_checker )
        else hidden_checker )
    read_content = (
        _acquire_pth_file_content if __.is_absent( content_reader )
        else content_reader )
    handle_lines = (
        _process_pth_file_lines if __.is_absent( line_processor )
        else line_processor )
    if not pth_file.exists( ): return
    if check_hidden( pth_file ): return
    try: text = read_content( pth_file )
    except OSError: return
    handle_lines( pth_file, text )

186 

187 

def _process_pth_file_lines(
    pth_file: __.Path, content: str, *,
    executor: __.Absential[ __.cabc.Callable[ [ str ], None ] ] = __.absent,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Processes lines of .pth file content.

        For each line, in order:

        * Lines which start with ``#`` and blank lines are ignored.
        * Lines which start with ``import`` followed by a space or tab are
          passed to ``executor`` (``exec`` when absent). If execution
          raises, the error is logged and the remaining lines are
          abandoned.
        * Any other line is taken as a path relative to the directory of
          ``pth_file`` and, if it exists, passed to ``path_adder``
          (``_add_path_to_sys_path`` when absent). A line whose existence
          cannot be probed is skipped.
    '''
    if __.is_absent( executor ):
        # SECURITY: Import lines from the .pth file are executed as Python
        # code. Only process .pth files from trusted packages.
        executor = exec
    if __.is_absent( path_adder ):
        path_adder = _add_path_to_sys_path
    for n, line in enumerate( content.splitlines( ), 1 ):
        if line.startswith( '#' ) or '' == line.strip( ): continue
        if line.startswith( ( 'import ', 'import\t' ) ):
            _scribe.debug( f"Executing import from {pth_file.name}: {line}" )
            try: executor( line )
            except Exception:
                _scribe.exception( f"Error on line {n} of {pth_file}." )
                break
            continue
        # Add directory path relative to .pth file location
        path_to_add = pth_file.parent / line.rstrip( )
        # The existence probe can itself raise (e.g., over-long names on
        # some Python versions); treat that as a nonexistent path so that
        # one malformed line does not abort processing.
        try: path_exists = path_to_add.exists( )
        except ( OSError, ValueError ):
            _scribe.debug(
                f"Skipping unusable path on line {n} of {pth_file}." )
            continue
        if path_exists:
            path_adder( path_to_add )

212 path_adder( path_to_add )