Coverage for sources/librovore/xtnsmgr/importation.py: 28%
115 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 22:48 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 22:48 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Import path management and .pth file processing for extension packages. '''
24import importlib as _importlib
26from . import __
# Paths which this module itself inserted into sys.path. Entries are appended
# by _add_path_to_sys_path and dropped again by remove_from_import_path and
# cleanup_import_paths.
_added_paths: __.cabc.MutableSequence[ str ] = [ ]
# Logger-like object for this module; used below for debug, info, error and
# exception messages.
_scribe = __.acquire_scribe( __name__ )
def add_package_to_import_path(
    package_path: __.Path, *,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent,
    pth_processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Registers package directory for import and honors its .pth files.

        The directory is handed to ``path_adder`` first and to
        ``pth_processor`` second. Whenever one of these callables is absent,
        the module default is substituted: ``_add_path_to_sys_path`` for the
        adder and ``process_pth_files`` for the processor.
    '''
    register = (
        _add_path_to_sys_path if __.is_absent( path_adder )
        else path_adder )
    scan = (
        process_pth_files if __.is_absent( pth_processor )
        else pth_processor )
    register( package_path )
    scan( package_path )
def _add_path_to_sys_path( package_path: __.Path ) -> None:
    ''' Prepends path to sys.path unless it is already listed there.

        A newly inserted path is also recorded in ``_added_paths`` so that
        it can be withdrawn later.
    '''
    path_str = str( package_path )
    if path_str in __.sys.path:
        _scribe.debug( f"Path already in sys.path: {path_str}." )
        return
    # Front insertion gives the new path precedence over existing entries.
    __.sys.path.insert( 0, path_str )
    _added_paths.append( path_str )
    _scribe.debug( f"Added to sys.path: {path_str}." )
def remove_from_import_path( package_path: __.Path ) -> None:
    ''' Withdraws package path from sys.path and from module bookkeeping.

        Only the first matching entry of each list is removed. The debug
        message is emitted regardless of whether anything was removed.
    '''
    path_str = str( package_path )
    # Same treatment for the interpreter path and our own record of it.
    for registry in ( __.sys.path, _added_paths ):
        if path_str in registry: registry.remove( path_str )
    _scribe.debug( f"Removed from sys.path: {path_str}." )
def cleanup_import_paths( ) -> None:
    ''' Withdraws every path which this module added to sys.path.

        Recorded paths are processed in the order in which they were added;
        the record is empty afterwards.
    '''
    # Drain from the front rather than iterating over a snapshot.
    while _added_paths:
        path_str = _added_paths[ 0 ]
        if path_str in __.sys.path:
            __.sys.path.remove( path_str )
        del _added_paths[ 0 ]
        _scribe.debug( f"Cleaned up sys.path: {path_str}." )
def import_processor_module( module_name: str ) -> __.types.ModuleType:
    ''' Imports processor module by its fully-qualified name.

        Relies on the standard Python import machinery. Builtin processors
        are addressed as f"{__.package_name}.structures.{name}"; external
        processors are addressed by their module name directly.

        An import failure is logged and then re-raised unchanged.
    '''
    _scribe.debug( f"Importing processor module: {module_name}." )
    try: module = _importlib.import_module( module_name )
    except ImportError as exc:
        _scribe.error( f"Failed to import {module_name}: {exc}." )
        raise
    _scribe.info( f"Successfully imported: {module_name}." )
    return module
def reload_processor_module(
    module_name: str, *,
    importer: __.Absential[
        __.cabc.Callable[ [ str ], __.types.ModuleType ]
    ] = __.absent,
    reloader: __.Absential[ __.cabc.Callable[
        [ __.types.ModuleType ], __.types.ModuleType
    ] ] = __.absent
) -> __.types.ModuleType:
    ''' Reloads processor module, importing it first if necessary.

        A module already present in sys.modules is passed to ``reloader``
        (default: ``importlib.reload``). Otherwise, the name is passed to
        ``importer`` (default: ``import_processor_module``). The result of
        whichever callable ran is returned.
    '''
    modules = __.sys.modules
    if module_name not in modules:
        _scribe.debug( f"Module not yet imported, importing: {module_name}." )
        load = (
            import_processor_module if __.is_absent( importer )
            else importer )
        return load( module_name )
    _scribe.debug( f"Reloading processor module: {module_name}." )
    module = modules[ module_name ]
    refresh = _importlib.reload if __.is_absent( reloader ) else reloader
    return refresh( module )
def list_registered_processors( ) -> tuple[ str, ... ]:
    ''' Names of processors across inventory and structure registries.

        Inventory names come first. A name present in both registries
        appears once, at its inventory position.
    '''
    merged: dict[ str, __.Processor ] = { }
    for registry in ( __.inventory_processors, __.structure_processors ):
        merged.update( registry )
    return tuple( merged )
def get_module_info( module_name: str ) -> dict[ str, __.typx.Any ]:
    ''' Summarizes what is known about a module in sys.modules.

        For a module which has not been imported, only
        ``{ 'imported': False }`` is returned. Otherwise, the result carries
        the module name plus its file, package, version and docstring, with
        ``None`` standing in for any attribute which the module lacks.
    '''
    try: module = __.sys.modules[ module_name ]
    except KeyError: return { 'imported': False }
    # Result key paired with the module attribute which supplies it.
    optional_attributes = (
        ( 'file', '__file__' ),
        ( 'package', '__package__' ),
        ( 'version', '__version__' ),
        ( 'doc', '__doc__' ),
    )
    info: dict[ str, __.typx.Any ] = {
        'imported': True, 'name': module.__name__ }
    info.update(
        ( key, getattr( module, attribute, None ) )
        for key, attribute in optional_attributes )
    return info
def process_pth_files(
    package_path: __.Path, *,
    processor: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Process .pth files in package directory to update sys.path.

        Does nothing if ``package_path`` is not a directory or if it cannot
        be listed. Otherwise, every entry with a ``.pth`` suffix whose name
        does not begin with a dot is passed, in sorted order, to
        ``processor`` (default: ``_process_pth_file``).
    '''
    if not package_path.is_dir( ): return
    try:
        # The directory listing may be produced lazily (``Path.iterdir`` is
        # a generator on some Python versions), in which case an OSError is
        # raised only once the entries are consumed. Consume them here,
        # inside the guard, so that an unreadable directory is skipped
        # rather than raising from the loop below.
        pth_files = sorted(
            file for file in package_path.iterdir( )
            if '.pth' == file.suffix
            and not file.name.startswith( '.' ) )
    except OSError as exc:
        _scribe.debug( f"Could not list directory {package_path}: {exc}." )
        return
    if __.is_absent( processor ):
        processor = _process_pth_file
    for pth_file in pth_files:
        processor( pth_file )
def _acquire_pth_file_content(
    pth_file: __.Path, *,
    encoding_provider: __.Absential[
        __.cabc.Callable[ [ ], str ]
    ] = __.absent
) -> str:
    ''' Reads .pth file and decodes its bytes into text.

        UTF-8 is attempted first, with a leading BOM tolerated as it is for
        source files. If that fails, the bytes are decoded with the encoding
        named by ``encoding_provider`` (default:
        ``locale.getpreferredencoding``).
    '''
    with __.io.open_code( str( pth_file ) ) as stream:
        raw = stream.read( )
    try: text = raw.decode( 'utf-8-sig' )
    except UnicodeDecodeError:
        provider = (
            __.locale.getpreferredencoding
            if __.is_absent( encoding_provider ) else encoding_provider )
        text = raw.decode( provider( ) )
    return text
def _is_hidden(
    path: __.Path, *, platform: __.Absential[ str ] = __.absent
) -> bool:
    ''' Reports whether system attributes mark the path as hidden.

        Only the 'darwin' and 'win32' platforms are examined; any other
        platform, and any path which cannot be stat'ed, yields ``False``.
        ``platform`` defaults to ``sys.platform``.
    '''
    try: inode = path.lstat( )
    except OSError: return False
    system = __.sys.platform if __.is_absent( platform ) else platform
    if 'darwin' == system:
        flags = getattr( inode, 'st_flags', 0 )
        return bool( flags & __.stat.UF_HIDDEN )
    if 'win32' == system:
        attributes = getattr( inode, 'st_file_attributes', 0 )
        # Fall back to the literal FILE_ATTRIBUTE_HIDDEN value (0x2) when
        # the stat module does not expose the constant.
        mask = getattr( __.stat, 'FILE_ATTRIBUTE_HIDDEN', 0x2 )
        return bool( attributes & mask )
    return False
def _process_pth_file(
    pth_file: __.Path, *,
    hidden_checker: __.Absential[
        __.cabc.Callable[ [ __.Path ], bool ]
    ] = __.absent,
    content_reader: __.Absential[
        __.cabc.Callable[ [ __.Path ], str ]
    ] = __.absent,
    line_processor: __.Absential[
        __.cabc.Callable[ [ __.Path, str ], None ]
    ] = __.absent
) -> None:
    ''' Reads one .pth file and hands its content to the line processor.

        Files which do not exist, which are reported as hidden, or which
        cannot be read because of an OSError are skipped silently. Absent
        collaborators default to ``_is_hidden``,
        ``_acquire_pth_file_content`` and ``_process_pth_file_lines``.
    '''
    is_hidden = (
        _is_hidden if __.is_absent( hidden_checker ) else hidden_checker )
    read = (
        _acquire_pth_file_content if __.is_absent( content_reader )
        else content_reader )
    handle_lines = (
        _process_pth_file_lines if __.is_absent( line_processor )
        else line_processor )
    if not pth_file.exists( ): return
    if is_hidden( pth_file ): return
    try: content = read( pth_file )
    except OSError: return
    handle_lines( pth_file, content )
def _process_pth_file_lines(
    pth_file: __.Path, content: str, *,
    executor: __.Absential[ __.cabc.Callable[ [ str ], None ] ] = __.absent,
    path_adder: __.Absential[
        __.cabc.Callable[ [ __.Path ], None ]
    ] = __.absent
) -> None:
    ''' Interprets each line of .pth file content.

        Blank lines and lines starting with '#' are ignored. Lines starting
        with 'import' followed by a space or tab are passed to ``executor``
        (default: builtin ``exec``); if execution raises, the error is
        logged and the remaining lines are abandoned. Every other line is
        taken as a path relative to the directory of the .pth file and is
        passed to ``path_adder`` (default: ``_add_path_to_sys_path``) if it
        exists.
    '''
    # NOTE: With the default executor, import lines are run via ``exec``.
    #       Content of .pth files is therefore executed as code; only
    #       process files from trusted locations.
    run = exec if __.is_absent( executor ) else executor
    register = (
        _add_path_to_sys_path if __.is_absent( path_adder ) else path_adder )
    for n, line in enumerate( content.splitlines( ), 1 ):
        if '' == line.strip( ) or line.startswith( '#' ): continue
        if not line.startswith( ( 'import ', 'import\t' ) ):
            # Plain entry: directory relative to the .pth file location.
            path_to_add = pth_file.parent / line.rstrip( )
            if path_to_add.exists( ): register( path_to_add )
            continue
        _scribe.debug( f"Executing import from {pth_file.name}: {line}" )
        try: run( line )
        except Exception:
            _scribe.exception( f"Error on line {n} of {pth_file}." )
            break