Coverage for sources/mimeogram/acquirers.py: 92% (168 statements)
coverage.py v7.10.0, created at 2025-07-27 13:03 +0000

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");          #
#  you may not use this file except in compliance with the License.         #
#  You may obtain a copy of the License at                                  #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                           #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software      #
#  distributed under the License is distributed on an "AS IS" BASIS,        #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
#  See the License for the specific language governing permissions and      #
#  limitations under the License.                                           #
#                                                                            #
#============================================================================#


''' Content acquisition from various sources. '''


import aiofiles as _aiofiles
import httpx as _httpx

from . import __
from . import exceptions as _exceptions
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )


async def acquire(
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources. '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    strict = options.get( 'fail-on-invalid', False )
    recursive = options.get( 'recurse-directories', False )
    tasks: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        path = __.Path( source )
        url_parts = (
            urlparse( source ) if isinstance( source, str )
            else urlparse( str( source ) ) )
        scheme = 'file' if path.drive else url_parts.scheme
        match scheme:
            case '' | 'file':
                tasks.extend( _produce_fs_tasks( source, recursive ) )
            case 'http' | 'https':
                tasks.append( _produce_http_task( str( source ) ) )
            case _:
                raise _exceptions.UrlSchemeNoSupport( str( source ) )
    if strict: return await __.gather_async( *tasks )
    results = await __.gather_async( *tasks, return_exceptions = True )
    # TODO: Factor into '__.generics.extract_results_filter_errors'.
    values: list[ _parts.Part ] = [ ]
    for result in results:
        if result.is_error( ):
            _scribe.warning( str( result.error ) )
            continue
        values.append( result.extract( ) )
    return tuple( values )
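
# NOTE: A minimal sketch of the 'extract_results_filter_errors' helper
# mentioned in the TODO above. The helper name, its '__.generics' home, and
# the result protocol ( 'is_error' / 'error' / 'extract' ) are assumptions
# inferred from the loop it would replace, not a confirmed API.
def _extract_results_filter_errors_sketch(
    results
) -> tuple[ _parts.Part, ... ]:
    values: list[ _parts.Part ] = [ ]
    for result in results:
        if result.is_error( ):  # Log failures, but keep going.
            _scribe.warning( str( result.error ) )
            continue
        values.append( result.extract( ) )  # Unwrap successful results.
    return tuple( values )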


async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as f: # pyright: ignore
            content_bytes = await f.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( content_bytes, location )
    # Coverage: partial branch; this raise was never executed during tests.
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # Coverage: partial branch; this condition was never true during tests.
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{location}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )
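
# Usage sketch for '_acquire_from_file' ( illustrative; the path here is
# hypothetical and an event loop is required ):
#
#     >>> import asyncio
#     >>> part = asyncio.run( _acquire_from_file( __.Path( 'README.md' ) ) )
#     >>> part.location, part.charset  # e.g. ( 'README.md', 'utf-8' )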


async def _acquire_via_http(
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
    mimetype = (
        response.headers.get( 'content-type', 'application/octet-stream' )
        .split( ';' )[ 0 ].strip( ) )
    content_bytes = response.content
    charset = response.encoding or _detect_charset( content_bytes )
    # Coverage: partial branch; this raise was never executed during tests.
    if charset is None: raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        mimetype, _ = (
            _detect_mimetype_and_charset(
                content_bytes, url, charset = charset ) )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # Coverage: partial branch; this condition was never true during tests.
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{url}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )


_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) )
_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy. '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    paths: list[ __.Path ] = [ ]
    _scribe.debug( f"Collecting files in directory: {directory}" )
    for entry in directory.iterdir( ):
        if entry.is_dir( ) and entry.name in _directories_to_ignore:
            _scribe.debug( f"Ignoring directory: {entry}" )
            continue
        # Coverage: partial branch; this condition was never true during tests.
        if entry.is_file( ) and entry.name in _files_to_ignore:
            _scribe.debug( f"Ignoring file: {entry}" )
            continue
        if cache( str( entry ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {entry}" )
            continue
        if entry.is_dir( ) and recursive:
            paths.extend( _collect_directory_files( entry, recursive ) )
        elif entry.is_file( ): paths.append( entry )
    return paths
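
# 'gitignorefile.Cache' ( above ) produces a callable which answers whether a
# path is excluded by any applicable '.gitignore' files; this is the actual
# package API. Illustrative check ( the path is hypothetical ):
#
#     >>> import gitignorefile
#     >>> matches = gitignorefile.Cache( )
#     >>> matches( '/repo/build/artifact.o' )  # True when a .gitignore matches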


def _detect_charset( content: bytes ) -> str | None:
    from chardet import detect
    charset = detect( content )[ 'encoding' ]
    if charset is None: return charset
    if charset.startswith( 'utf' ): return charset
    match charset:
        case 'ascii': return 'utf-8' # Assume superset.
        case _: pass
    # Shake out false positives, like 'MacRoman'.
    try: content.decode( 'utf-8' )
    except UnicodeDecodeError: return charset
    return 'utf-8'
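
# Behavior sketch for '_detect_charset': pure ASCII promotes to 'utf-8' ( a
# superset ), and a non-UTF chardet guess is kept only when the bytes truly
# fail to decode as UTF-8. Doctest-style illustration:
#
#     >>> _detect_charset( b'plain ascii text' )
#     'utf-8'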


def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try: return from_string( content, mime = True )
    except ( PureError, ValueError ):
        return guess_type( str( location ) )[ 0 ]
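
# JSON has no magic-number signature, so 'puremagic' raises 'PureError' and
# the extension-based fallback applies ( illustrative filename ):
#
#     >>> _detect_mimetype( b'{"key": 1}', 'data.json' )
#     'application/json'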


def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    from .exceptions import TextualMimetypeInvalidity
    # Coverage: partial branch; this condition was always true during tests.
    if __.is_absent( mimetype ):
        mimetype_ = _detect_mimetype( content, location )
    else: mimetype_ = mimetype
    if __.is_absent( charset ): # noqa: SIM108
        charset_ = _detect_charset( content )
    else: charset_ = charset
    if not mimetype_:
        if charset_:
            mimetype_ = 'text/plain'
            _validate_mimetype_with_trial_decode(
                content, location, mimetype_, charset_ )
            return mimetype_, charset_
        mimetype_ = 'application/octet-stream'
    if _is_textual_mimetype( mimetype_ ):
        return mimetype_, charset_
    if charset_ is None:
        raise TextualMimetypeInvalidity( location, mimetype_ )
    _validate_mimetype_with_trial_decode(
        content, location, mimetype_, charset_ )
    return mimetype_, charset_


def _is_reasonable_text_content( content: str ) -> bool:
    ''' Checks if decoded content appears to be meaningful text. '''
    # Coverage: partial branch; this return was never executed during tests.
    if not content: return False
    # Check for excessive repetition of single characters (likely binary).
    if len( set( content ) ) == 1: return False
    # Check for excessive control characters (excluding common whitespace).
    common_whitespace = '\t\n\r'
    ascii_control_limit = 32
    control_chars = sum(
        1 for c in content
        if ord( c ) < ascii_control_limit and c not in common_whitespace )
    if control_chars > len( content ) * 0.1: return False # >10% control chars
    # Check for reasonable printable character ratio.
    printable_chars = sum(
        1 for c in content if c.isprintable( ) or c in common_whitespace )
    return printable_chars >= len( content ) * 0.8 # >=80% printable
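
# Threshold illustration for '_is_reasonable_text_content':
#
#     >>> _is_reasonable_text_content( 'hello\nworld\n' )
#     True
#     >>> _is_reasonable_text_content( 'aaaa' )             # one distinct char
#     False
#     >>> _is_reasonable_text_content( '\x00\x01\x02abc' )  # >10% control chars
#     False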


# MIME types that are considered textual beyond those starting with 'text/'.
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/x-perl',
    'application/x-python',
    'application/x-php',
    'application/x-ruby',
    'application/x-shell',
    'application/javascript',
    'image/svg+xml',
) )
# MIME type suffixes that indicate textual content.
_TEXTUAL_SUFFIXES = ( '+xml', '+json', '+yaml', '+toml' )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content. '''
    _scribe.debug( f"MIME type: {mimetype}" )
    if mimetype.startswith( ( 'text/', 'text/x-' ) ): return True
    if mimetype in _TEXTUAL_MIME_TYPES: return True
    if mimetype.endswith( _TEXTUAL_SUFFIXES ):
        _scribe.debug(
            f"MIME type '{mimetype}' accepted due to textual suffix." )
        return True
    return False
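
# Classification examples for '_is_textual_mimetype':
#
#     >>> _is_textual_mimetype( 'image/svg+xml' )  # listed, plus '+xml' suffix
#     True
#     >>> _is_textual_mimetype( 'application/octet-stream' )
#     False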


def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ... ]:
    location_ = __.Path( location )
    if location_.is_file( ) or location_.is_symlink( ):
        return ( _acquire_from_file( location_ ), )
    if location_.is_dir( ):
        files = _collect_directory_files( location_, recursive )
        return tuple( _acquire_from_file( f ) for f in files )
    raise _exceptions.ContentAcquireFailure( location )


def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _execute_session( ) -> _parts.Part:
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client: return await _acquire_via_http( client, url )

    return _execute_session( )
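
# NOTE: A sketch for the "reuse clients for common hosts" TODO above; this is
# a hypothetical design, not the module's confirmed direction. Clients are
# keyed by URL host, so repeated fetches share one connection pool.
def _client_for_host_sketch(
    clients: dict[ str, _httpx.AsyncClient ], url: str
) -> _httpx.AsyncClient:
    from urllib.parse import urlparse
    host = urlparse( url ).netloc
    if host not in clients:  # Lazily create one pooled client per host.
        clients[ host ] = _httpx.AsyncClient( follow_redirects = True )
    return clients[ host ]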


def _validate_mimetype_with_trial_decode(
    content: bytes, location: str | __.Path, mimetype: str, charset: str
) -> None:
    ''' Validates, via trial decode, that content is reasonable text. '''
    from .exceptions import TextualMimetypeInvalidity
    try: text = content.decode( charset )
    except ( UnicodeDecodeError, LookupError ) as exc:
        raise TextualMimetypeInvalidity( location, mimetype ) from exc
    if _is_reasonable_text_content( text ):
        _scribe.debug(
            f"MIME type '{mimetype}' accepted after successful "
            f"decode test with charset '{charset}' for '{location}'." )
        return
    raise TextualMimetypeInvalidity( location, mimetype )