Coverage for sources/librovore/xtnsmgr/importation.py: 34%
91 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-02 00:02 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-02 00:02 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Import path management and .pth file processing for extension packages. '''
24import importlib as _importlib
26from . import __
29_added_paths: __.cabc.MutableSequence[ str ] = [ ]
30_scribe = __.acquire_scribe( __name__ )
33def add_package_to_import_path(
34 package_path: __.Path, *,
35 path_adder: __.Absential[
36 __.cabc.Callable[ [ __.Path ], None ]
37 ] = __.absent,
38 pth_processor: __.Absential[
39 __.cabc.Callable[ [ __.Path ], None ]
40 ] = __.absent
41) -> None:
42 ''' Add package to sys.path and process any .pth files. '''
43 if __.is_absent( path_adder ): 43 ↛ 45line 43 didn't jump to line 45 because the condition on line 43 was always true
44 path_adder = _add_path_to_sys_path
45 if __.is_absent( pth_processor ): 45 ↛ 47line 45 didn't jump to line 47 because the condition on line 45 was always true
46 pth_processor = process_pth_files
47 path_adder( package_path )
48 pth_processor( package_path )
51def _add_path_to_sys_path( package_path: __.Path ) -> None:
52 ''' Add single path to sys.path if not already present. '''
53 path_str = str( package_path )
54 if path_str not in __.sys.path:
55 __.sys.path.insert( 0, path_str )
56 _added_paths.append( path_str )
57 _scribe.debug( f"Added to sys.path: {path_str}." )
58 else:
59 _scribe.debug( f"Path already in sys.path: {path_str}." )
62def import_processor_module( module_name: str ) -> __.types.ModuleType:
63 ''' Import a processor module by name.
65 Uses standard Python import machinery. For builtin processors,
66 pass f"{__.package_name}.structures.{name}". For external processors,
67 pass the module name directly.
68 '''
69 try:
70 _scribe.debug( f"Importing processor module: {module_name}." )
71 module = _importlib.import_module( module_name )
72 except ImportError as exc:
73 _scribe.error( f"Failed to import {module_name}: {exc}." )
74 raise
75 else:
76 _scribe.info( f"Successfully imported: {module_name}." )
77 return module
80def list_registered_processors( ) -> tuple[ str, ... ]:
81 ''' List all currently registered processor names from both registries. '''
82 all_processors: dict[ str, __.Processor ] = {
83 **__.inventory_processors,
84 **__.structure_processors
85 }
86 return tuple( all_processors.keys( ) )
89def get_module_info( module_name: str ) -> dict[ str, __.typx.Any ]:
90 ''' Get information about an imported module. '''
91 if module_name not in __.sys.modules:
92 return { 'imported': False }
93 module = __.sys.modules[ module_name ]
94 return {
95 'imported': True,
96 'name': module.__name__,
97 'file': getattr( module, '__file__', None ),
98 'package': getattr( module, '__package__', None ),
99 'version': getattr( module, '__version__', None ),
100 'doc': getattr( module, '__doc__', None ),
101 }
104def process_pth_files(
105 package_path: __.Path, *,
106 processor: __.Absential[
107 __.cabc.Callable[ [ __.Path ], None ]
108 ] = __.absent
109) -> None:
110 ''' Process .pth files in package directory to update sys.path.
112 Handles proper encoding, hidden file detection, and security.
113 '''
114 if not package_path.is_dir( ): return 114 ↛ exitline 114 didn't return from function 'process_pth_files' because the return on line 114 wasn't executed
115 try:
116 pth_files = (
117 file for file in package_path.iterdir( )
118 if '.pth' == file.suffix
119 and not file.name.startswith( '.' ) )
120 except OSError: return
121 if __.is_absent( processor ): 121 ↛ 123line 121 didn't jump to line 123 because the condition on line 121 was always true
122 processor = _process_pth_file
123 for pth_file in sorted( pth_files ): 123 ↛ 124line 123 didn't jump to line 124 because the loop on line 123 never started
124 processor( pth_file )
127def _acquire_pth_file_content(
128 pth_file: __.Path, *,
129 encoding_provider: __.Absential[
130 __.cabc.Callable[ [ ], str ]
131 ] = __.absent
132) -> str:
133 ''' Read .pth file content with proper encoding handling. '''
134 with __.io.open_code( str( pth_file ) ) as stream:
135 content_bytes = stream.read( )
136 # Accept BOM markers in .pth files - same as with source files
137 try: return content_bytes.decode( 'utf-8-sig' )
138 except UnicodeDecodeError:
139 if __.is_absent( encoding_provider ):
140 encoding_provider = __.locale.getpreferredencoding
141 return content_bytes.decode( encoding_provider( ) )
144def _is_hidden(
145 path: __.Path, *, platform: __.Absential[ str ] = __.absent
146) -> bool:
147 ''' Check if path is hidden via system attributes. '''
148 try: inode = path.lstat( )
149 except OSError: return False
150 if __.is_absent( platform ):
151 platform = __.sys.platform
152 match platform:
153 case 'darwin':
154 return bool( getattr( inode, 'st_flags', 0 ) & __.stat.UF_HIDDEN )
155 case 'win32':
156 # Windows FILE_ATTRIBUTE_HIDDEN constant (0x2)
157 return bool(
158 getattr( inode, 'st_file_attributes', 0 )
159 & getattr( __.stat, 'FILE_ATTRIBUTE_HIDDEN', 0x2 ) )
160 case _: return False
163def _process_pth_file(
164 pth_file: __.Path, *,
165 hidden_checker: __.Absential[
166 __.cabc.Callable[ [ __.Path ], bool ]
167 ] = __.absent,
168 content_reader: __.Absential[
169 __.cabc.Callable[ [ __.Path ], str ]
170 ] = __.absent,
171 line_processor: __.Absential[
172 __.cabc.Callable[ [ __.Path, str ], None ]
173 ] = __.absent
174) -> None:
175 ''' Process single .pth file. '''
176 if __.is_absent( hidden_checker ):
177 hidden_checker = _is_hidden
178 if __.is_absent( content_reader ):
179 content_reader = _acquire_pth_file_content
180 if __.is_absent( line_processor ):
181 line_processor = _process_pth_file_lines
182 if not pth_file.exists( ) or hidden_checker( pth_file ): return
183 try: content = content_reader( pth_file )
184 except OSError: return
185 line_processor( pth_file, content )
188def _process_pth_file_lines(
189 pth_file: __.Path, content: str, *,
190 executor: __.Absential[ __.cabc.Callable[ [ str ], None ] ] = __.absent,
191 path_adder: __.Absential[
192 __.cabc.Callable[ [ __.Path ], None ]
193 ] = __.absent
194) -> None:
195 ''' Process lines in .pth file content. '''
196 if __.is_absent( executor ):
197 executor = exec
198 if __.is_absent( path_adder ):
199 path_adder = _add_path_to_sys_path
200 for n, line in enumerate( content.splitlines( ), 1 ):
201 if line.startswith( '#' ) or '' == line.strip( ): continue
202 if line.startswith( ( 'import ', 'import\t' ) ):
203 _scribe.debug( f"Executing import from {pth_file.name}: {line}" )
204 try: executor( line )
205 except Exception:
206 _scribe.exception( f"Error on line {n} of {pth_file}." )
207 break
208 continue
209 # Add directory path relative to .pth file location
210 path_to_add = pth_file.parent / line.rstrip( )
211 if path_to_add.exists( ):
212 path_adder( path_to_add )