Coverage for sources/mimeogram/acquirers.py: 92%
123 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-16 03:28 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Content acquisition from various sources. '''
24from __future__ import annotations
26import aiofiles as _aiofiles
27import httpx as _httpx
29from . import __
30from . import exceptions as _exceptions
31from . import parts as _parts
# Module-level scribe (logger) for this module.
_scribe = __.produce_scribe( __name__ )
async def acquire(
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources.

        Dispatches each source to a filesystem or HTTP acquirer based on
        its URL scheme; raises for unsupported schemes.
    '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    recursive = options.get( 'recurse-directories', False )
    coroutines: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        # Paths and strings both parse uniformly after stringification.
        scheme = urlparse( str( source ) ).scheme
        if scheme in ( '', 'file' ):
            coroutines.extend( _produce_fs_tasks( source, recursive ) )
        elif scheme in ( 'http', 'https' ):
            coroutines.append( _produce_http_task( str( source ) ) )
        else: raise _exceptions.UrlSchemeNoSupport( str( source ) )
    return await __.gather_async( *coroutines )
async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as stream:
            content_bytes = await stream.read( )
    except Exception as exc:
        raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( content_bytes, location )
    if charset is None:
        raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # TODO? Separate error for newline issues.
    if linesep is None:
        raise ContentDecodeFailure( location, charset )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )
async def _acquire_via_http( # pylint: disable=too-many-locals
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc:
        raise ContentAcquireFailure( url ) from exc
    # Strip any charset parameter from the Content-Type header value.
    content_type = response.headers.get(
        'content-type', 'application/octet-stream' )
    mimetype = content_type.split( ';' )[ 0 ].strip( )
    content_bytes = response.content
    charset = response.encoding or _detect_charset( content_bytes )
    if charset is None:
        raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        # Header claimed non-textual; re-detect from content instead.
        mimetype, _ = _detect_mimetype_and_charset(
            content_bytes, url, charset = charset )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # TODO? Separate error for newline issues.
    if linesep is None:
        raise ContentDecodeFailure( url, charset )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )
# Version-control metadata directories to skip during directory traversal.
_VCS_DIRS = frozenset( ( '.git', '.svn', '.hg', '.bzr' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy.

        Skips VCS metadata directories and paths matched by .gitignore.
    '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    results: list[ __.Path ] = [ ]
    for entry in directory.iterdir( ):
        if entry.is_dir( ) and entry.name in _VCS_DIRS:
            _scribe.debug( f"Ignoring VCS directory: {entry}" )
            continue
        resolved = entry.resolve( )
        if cache( str( resolved ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {entry}" )
            continue
        if entry.is_file( ): results.append( resolved )
        elif recursive and entry.is_dir( ):
            results.extend( _collect_directory_files( resolved, recursive ) )
    return results
def _detect_charset( content: bytes ) -> str | None:
    ''' Guesses character set of content; prefers UTF-8 when plausible. '''
    # NOTE: Pyright bug: `None is charset` != `charset is None`
    from chardet import detect
    charset = detect( content )[ 'encoding' ]
    if charset is None: return None
    if charset.startswith( 'utf' ): return charset
    if 'ascii' == charset: return 'utf-8' # Assume superset.
    # Shake out false positives, like 'MacRoman'.
    try: content.decode( 'utf-8' )
    except UnicodeDecodeError: return charset
    return 'utf-8'
def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    ''' Determines MIME type from content magic, else from filename. '''
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try: mimetype = from_string( content, mime = True )
    except PureError:
        mimetype = guess_type( str( location ) )[ 0 ]
    return mimetype
def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    ''' Resolves MIME type and charset for content.

        Detects whichever of the two was not supplied; raises when the
        resolved MIME type is not textual.
    '''
    from .exceptions import TextualMimetypeInvalidity
    mimetype_ = (
        _detect_mimetype( content, location )
        if __.is_absent( mimetype ) else mimetype )
    charset_ = (
        _detect_charset( content )
        if __.is_absent( charset ) else charset )
    if not mimetype_:
        # Decodable content defaults to plain text; otherwise opaque bytes.
        mimetype_ = 'text/plain' if charset_ else 'application/octet-stream'
    if not _is_textual_mimetype( mimetype_ ):
        raise TextualMimetypeInvalidity( location, mimetype_ )
    return mimetype_, charset_
# MIME types that are considered textual beyond those starting with 'text/'.
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/javascript',
    'image/svg+xml',
) )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content.

        Textual: any 'text/*' type, any 'application/x-*' type, or a
        member of the supplemental allowlist of structured-text types.
    '''
    _scribe.debug( f"MIME type: {mimetype}" )
    # 'text/x-' was redundant alongside 'text/': every 'text/x-...' value
    # already matches the 'text/' prefix.
    if mimetype.startswith( ( 'text/', 'application/x-' ) ):
        return True
    return mimetype in _TEXTUAL_MIME_TYPES
def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ...]:
    ''' Creates acquisition coroutines for a filesystem location. '''
    path = __.Path( location ) if isinstance( location, str ) else location
    # Symlink check also covers links whose targets need resolution.
    if path.is_file( ) or path.is_symlink( ):
        return ( _acquire_from_file( path ), )
    if path.is_dir( ):
        return tuple(
            _acquire_from_file( f )
            for f in _collect_directory_files( path, recursive ) )
    raise _exceptions.ContentAcquireFailure( location )
def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
    ''' Creates acquisition coroutine for an HTTP/HTTPS URL. '''
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.
    async def _fetch( ) -> _parts.Part:
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client:
            return await _acquire_via_http( client, url )
    return _fetch( )