Coverage for sources/mimeogram/acquirers.py: 89% (141 statements)

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and      #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Content acquisition from various sources. '''

from __future__ import annotations

import aiofiles as _aiofiles
import httpx as _httpx

from . import __
from . import exceptions as _exceptions
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )


async def acquire( # pylint: disable=too-many-locals
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources. '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    strict = options.get( 'fail-on-invalid', False )
    recursive = options.get( 'recurse-directories', False )
    tasks: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        path = __.Path( source )
        url_parts = (
            urlparse( source ) if isinstance( source, str )
            else urlparse( str( source ) ) )
        scheme = 'file' if path.drive else url_parts.scheme
        match scheme:
            case '' | 'file':
                tasks.extend( _produce_fs_tasks( source, recursive ) )
            case 'http' | 'https':
                tasks.append( _produce_http_task( str( source ) ) )
            case _:
                raise _exceptions.UrlSchemeNoSupport( str( source ) )
    if strict: return await __.gather_async( *tasks )
    results = await __.gather_async( *tasks, return_exceptions = True )
    # TODO: Factor into '__.generics.extract_results_filter_errors'.
    values: list[ _parts.Part ] = [ ]
    for result in results:
        if result.is_error( ):
            _scribe.warning( str( result.error ) )
            continue
        values.append( result.extract( ) )
    return tuple( values )
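
# A minimal usage sketch (assumption: 'auxdata' is the application's
# '__.Globals' instance, constructed elsewhere in the package; the sources
# are illustrative):
#
#     parts = await acquire(
#         auxdata, ( 'README.md', 'https://example.com/data.json' ) )
#     for part in parts: print( part.location, part.mimetype )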


async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as f:
            content_bytes = await f.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( content_bytes, location )
    # coverage: raise never executed during tests
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    if linesep is None:  # coverage: condition never true during tests
        _scribe.warning( f"No line separator detected in '{location}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )


async def _acquire_via_http( # pylint: disable=too-many-locals
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
    mimetype = (
        response.headers.get( 'content-type', 'application/octet-stream' )
        .split( ';' )[ 0 ].strip( ) )
    content_bytes = response.content
    charset = response.encoding or _detect_charset( content_bytes )
    # coverage: raise never executed during tests
    if charset is None: raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        mimetype, _ = (
            _detect_mimetype_and_charset(
                content_bytes, url, charset = charset ) )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    if linesep is None:  # coverage: condition never true during tests
        _scribe.warning( f"No line separator detected in '{url}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )
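
# Worked example (hypothetical response): a 'Content-Type' header of
# 'text/html; charset=ISO-8859-1' yields mimetype 'text/html' after the
# split above; the charset comes from 'response.encoding', so
# '_detect_charset' is consulted only when the header omits an encoding.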


_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) )
_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy. '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    paths: list[ __.Path ] = [ ]
    _scribe.debug( f"Collecting files in directory: {directory}" )
    for entry in directory.iterdir( ):
        if entry.is_dir( ) and entry.name in _directories_to_ignore:
            _scribe.debug( f"Ignoring directory: {entry}" )
            continue
        # coverage: condition never true during tests
        if entry.is_file( ) and entry.name in _files_to_ignore:
            _scribe.debug( f"Ignoring file: {entry}" )
            continue
        if cache( str( entry ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {entry}" )
            continue
        if entry.is_dir( ) and recursive:
            paths.extend( _collect_directory_files( entry, recursive ) )
        elif entry.is_file( ): paths.append( entry )
    return paths
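
# Illustration (hypothetical tree): given 'project/' containing '.git/',
# 'src/app.py', and 'build/out.bin', with 'build/' listed in .gitignore,
# a recursive collection returns only the source file:
#
#     files = _collect_directory_files( __.Path( 'project' ), recursive = True )
#     # -> [ PosixPath( 'project/src/app.py' ) ]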


def _detect_charset( content: bytes ) -> str | None:
    from chardet import detect
    charset = detect( content )[ 'encoding' ]
    # coverage: return never executed during tests
    if charset is None: return charset
    if charset.startswith( 'utf' ): return charset
    match charset:
        case 'ascii': return 'utf-8' # Assume superset.
        case _: pass
    # Shake out false positives, like 'MacRoman'.
    try: content.decode( 'utf-8' )
    except UnicodeDecodeError: return charset
    return 'utf-8'
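
# Example of the false-positive shakeout: chardet may label short UTF-8
# samples as 'MacRoman'; because such bytes also decode cleanly as UTF-8,
# the function returns 'utf-8' rather than the dubious guess.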


def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try: return from_string( content, mime = True )
    except ( PureError, ValueError ):
        return guess_type( str( location ) )[ 0 ]


def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    from .exceptions import TextualMimetypeInvalidity
    if __.is_absent( mimetype ):  # coverage: condition always true during tests
        mimetype_ = _detect_mimetype( content, location )
    else: mimetype_ = mimetype
    if __.is_absent( charset ):
        charset_ = _detect_charset( content )
    else: charset_ = charset
    if not mimetype_:
        # coverage: condition always true during tests
        if charset_: mimetype_ = 'text/plain' # pylint: disable=redefined-variable-type
        else: mimetype_ = 'application/octet-stream'
    if not _is_textual_mimetype( mimetype_ ):
        raise TextualMimetypeInvalidity( location, mimetype_ )
    return mimetype_, charset_
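
# Fallback behavior, by example: when no MIME type can be detected but a
# charset was, the content is presumed 'text/plain'; with neither, it
# becomes 'application/octet-stream', which then fails the textual check
# and raises TextualMimetypeInvalidity.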


# MIME types that are considered textual beyond those starting with 'text/'
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/javascript',
    'image/svg+xml',
) )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content. '''
    _scribe.debug( f"MIME type: {mimetype}" )
    if mimetype.startswith( ( 'text/', 'application/x-', 'text/x-' ) ):
        return True
    return mimetype in _TEXTUAL_MIME_TYPES
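
# For example: 'text/markdown', 'application/x-yaml', and 'application/json'
# count as textual; 'image/png' and 'application/pdf' do not.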


def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ... ]:
    location_ = __.Path( location )
    if location_.is_file( ) or location_.is_symlink( ):
        return ( _acquire_from_file( location_ ), )
    if location_.is_dir( ):
        files = _collect_directory_files( location_, recursive )
        return tuple( _acquire_from_file( f ) for f in files )
    raise _exceptions.ContentAcquireFailure( location )


def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _execute_session( ) -> _parts.Part:
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client: return await _acquire_via_http( client, url )

    return _execute_session( )
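
# Note on '# nosec B113': the Bandit warning about a request without an
# explicit timeout is suppressed; httpx applies a default 5-second timeout,
# and a custom one could be passed, e.g.:
#
#     _httpx.AsyncClient( follow_redirects = True, timeout = 30.0 )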