Coverage for sources/mimeogram/acquirers.py: 94% (109 statements)
# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# #
#============================================================================#


''' Content acquisition from various sources. '''


import aiofiles as _aiofiles
import httpx as _httpx

from . import __
from . import exceptions as _exceptions
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )
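# Decode behaviors shared by the file and HTTP acquirers: the 'detextive'
# defaults with 'trial_decode_confidence' pinned at 0.75 (presumably the
# minimum confidence accepted when trial-decoding bytes).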
_decode_inform_behaviors = __.dcls.replace(
    __.detextive.BEHAVIORS_DEFAULT,
    trial_decode_confidence = 0.75 )


async def acquire(
    auxdata: __.appcore.state.Globals,
    sources: __.cabc.Sequence[ str | __.Path ],
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources. '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    strict = options.get( 'fail-on-invalid', False )
    recursive = options.get( 'recurse-directories', False )
    no_ignores = options.get( 'no-ignores', False )
    tasks: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        path = __.Path( source )
        url_parts = (
            urlparse( source ) if isinstance( source, str )
            else urlparse( str( source ) ) )
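        # Windows drive letters (e.g., 'C:') would parse as a URL scheme, so
        # any path with a drive component is treated as a filesystem source.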
        scheme = 'file' if path.drive else url_parts.scheme
        match scheme:
            case '' | 'file':
                fs_tasks = _produce_fs_tasks( source, recursive, no_ignores )
                tasks.extend( fs_tasks )
            case 'http' | 'https':
                tasks.append( _produce_http_task( str( source ) ) )
            case _:
                raise _exceptions.UrlSchemeNoSupport( str( source ) )
    if strict: return await __.asyncf.gather_async( *tasks )
    results: tuple[ __.generics.GenericResult, ... ] = (
        await __.asyncf.gather_async(
            *tasks, return_exceptions = True
        )
    )
    # TODO: Factor into '__.generics.extract_results_filter_errors'.
    values: list[ _parts.Part ] = [ ]
    for result in results:
        if __.generics.is_error( result ):
            _scribe.warning( str( result.error ) )
            continue
        values.append( result.extract( ) )
    return tuple( values )
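
# Example (illustrative sketch only, not part of the module): 'acquire'
# accepts a mix of filesystem paths and URLs, given initialized application
# globals; the source names below are hypothetical.
#
#     parts = await acquire(
#         auxdata, ( 'README.rst', __.Path( 'docs' ), 'https://example.com' ) )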

async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from a text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as f: # pyright: ignore
            content_bytes = await f.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    try:
        result = __.detextive.decode_inform(
            content_bytes,
            location = str( location ),
            behaviors = _decode_inform_behaviors )
    except Exception as exc:
        raise ContentDecodeFailure( location, '???' ) from exc
    mimetype = result.mimetype.mimetype
    charset = result.charset.charset
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = result.linesep
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{location}'." )
        linesep = __.detextive.LineSeparators( __.os.linesep )
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( result.text ) )


async def _acquire_via_http(
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
    http_content_type = response.headers.get( 'content-type' )
    content_bytes = response.content
    try:
        result = __.detextive.decode_inform(
            content_bytes,
            location = url,
            behaviors = _decode_inform_behaviors,
            http_content_type = http_content_type or __.absent )
    except Exception as exc:
        raise ContentDecodeFailure( url, '???' ) from exc
    mimetype = result.mimetype.mimetype
    charset = result.charset.charset
    if charset is None: raise ContentDecodeFailure( url, '???' )
    linesep = result.linesep
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{url}'." )
        linesep = __.detextive.LineSeparators( __.os.linesep )
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( result.text ) )


_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) )
_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool, no_ignores: bool = False
) -> list[ __.Path ]:
    ''' Collects and filters files from a directory hierarchy.

        When no_ignores is True, gitignore filtering is disabled.
        When gitignore filtering is enabled, warnings are emitted for
        filtered paths.
    '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    paths: list[ __.Path ] = [ ]
    _scribe.debug( f"Collecting files in directory: {directory}" )
    for entry in directory.iterdir( ):
        if entry.is_dir( ) and entry.name in _directories_to_ignore:
            _scribe.debug( f"Ignoring directory: {entry}" )
            continue
        if entry.is_file( ) and entry.name in _files_to_ignore:
            _scribe.debug( f"Ignoring file: {entry}" )
            continue
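        # A truthy cache lookup means some applicable '.gitignore' rule
        # matches this path.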
        if not no_ignores and cache( str( entry ) ):
            _scribe.warning(
                f"Skipping path (matched by .gitignore): {entry}. "
                "Use --no-ignores to include." )
            continue
        if entry.is_dir( ) and recursive:
            collected = _collect_directory_files(
                entry, recursive, no_ignores )
            paths.extend( collected )
        elif entry.is_file( ): paths.append( entry )
    return paths


def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False, no_ignores: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ... ]:
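    ''' Produces acquisition coroutines for a file or directory location. '''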
    location_ = __.Path( location )
    if location_.is_file( ) or location_.is_symlink( ):
        return ( _acquire_from_file( location_ ), )
    if location_.is_dir( ):
        files = _collect_directory_files( location_, recursive, no_ignores )
        return tuple( _acquire_from_file( f ) for f in files )
    raise _exceptions.ContentAcquireFailure( location )


def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
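    ''' Produces acquisition coroutine for an HTTP or HTTPS URL. '''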
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _execute_session( ) -> _parts.Part:
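        # 'nosec B113' below: Bandit flags HTTP requests without an explicit
        # timeout; presumably acceptable here because httpx clients apply a
        # default timeout.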
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client: return await _acquire_via_http( client, url )

    return _execute_session( )