Coverage for sources/mimeogram/acquirers.py: 92%

168 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-27 13:03 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Content acquisition from various sources. ''' 

22 

23 

24import aiofiles as _aiofiles 

25import httpx as _httpx 

26 

27from . import __ 

28from . import exceptions as _exceptions 

29from . import parts as _parts 

30 

31 

32_scribe = __.produce_scribe( __name__ ) 

33 

34 

async def acquire(
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources. '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    strict = options.get( 'fail-on-invalid', False )
    recursive = options.get( 'recurse-directories', False )
    coroutines: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        # A Windows drive letter would otherwise parse as a URL scheme.
        has_drive = bool( __.Path( source ).drive )
        parsed_scheme = urlparse( str( source ) ).scheme
        scheme = 'file' if has_drive else parsed_scheme
        if scheme in ( '', 'file' ):
            coroutines.extend( _produce_fs_tasks( source, recursive ) )
        elif scheme in ( 'http', 'https' ):
            coroutines.append( _produce_http_task( str( source ) ) )
        else: raise _exceptions.UrlSchemeNoSupport( str( source ) )
    if strict: return await __.gather_async( *coroutines )
    results = await __.gather_async( *coroutines, return_exceptions = True )
    # TODO: Factor into '__.generics.extract_results_filter_errors'.
    collected: list[ _parts.Part ] = [ ]
    for outcome in results:
        if outcome.is_error( ):
            # Best-effort mode: log failures and keep the successes.
            _scribe.warning( str( outcome.error ) )
        else: collected.append( outcome.extract( ) )
    return tuple( collected )

67 

68 

async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as stream: # pyright: ignore
            raw = await stream.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( raw, location )
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( raw )
    if linesep is None:
        # Fall back to the platform separator when none is detectable.
        _scribe.warning( f"No line separator detected in '{location}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: text = raw.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( text ) )

92 

93 

async def _acquire_via_http(
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
    content_type = response.headers.get(
        'content-type', 'application/octet-stream' )
    mimetype = content_type.split( ';' )[ 0 ].strip( )
    raw = response.content
    charset = response.encoding or _detect_charset( raw )
    if charset is None: raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        # Header claims non-textual content; re-examine the actual bytes.
        mimetype, _ = (
            _detect_mimetype_and_charset( raw, url, charset = charset ) )
    linesep = _parts.LineSeparators.detect_bytes( raw )
    if linesep is None:
        # Fall back to the platform separator when none is detectable.
        _scribe.warning( f"No line separator detected in '{url}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: text = raw.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( text ) )

127 

128 

# Well-known junk files and VCS metadata directories to skip outright.
_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) )
_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy. '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    collected: list[ __.Path ] = [ ]
    _scribe.debug( f"Collecting files in directory: {directory}" )
    for child in directory.iterdir( ):
        child_is_dir = child.is_dir( )
        child_is_file = child.is_file( )
        if child_is_dir and child.name in _directories_to_ignore:
            _scribe.debug( f"Ignoring directory: {child}" )
            continue
        if child_is_file and child.name in _files_to_ignore:
            _scribe.debug( f"Ignoring file: {child}" )
            continue
        if cache( str( child ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {child}" )
            continue
        if child_is_dir and recursive:
            collected.extend( _collect_directory_files( child, recursive ) )
        elif child_is_file: collected.append( child )
    return collected

153 

154 

def _detect_charset( content: bytes ) -> str | None:
    ''' Guesses charset of raw bytes, preferring UTF-8 where it decodes. '''
    from chardet import detect
    guess = detect( content )[ 'encoding' ]
    if guess is None or guess.startswith( 'utf' ): return guess
    if guess == 'ascii': return 'utf-8' # Assume superset.
    # Shake out false positives, like 'MacRoman'.
    try:
        content.decode( 'utf-8' )
    except UnicodeDecodeError: return guess
    return 'utf-8'

167 

168 

def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    ''' Detects MIME type from content magic, else from filename. '''
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try:
        detection = from_string( content, mime = True )
    except ( PureError, ValueError ):
        # Magic-number detection failed; fall back to the file extension.
        return guess_type( str( location ) )[ 0 ]
    return detection

175 

176 

def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    ''' Determines and validates MIME type and charset for content.

        Detects either value when the caller does not supply it. Content
        with no detectable MIME type is treated as 'text/plain' when a
        charset is known (subject to a trial decode), else as
        'application/octet-stream'. A non-textual MIME type is only
        accepted if the content survives a trial decode; otherwise
        TextualMimetypeInvalidity is raised.
    '''
    from .exceptions import TextualMimetypeInvalidity
    if __.is_absent( mimetype ):
        mimetype_ = _detect_mimetype( content, location )
    else: mimetype_ = mimetype
    if __.is_absent( charset ): # noqa: SIM108
        charset_ = _detect_charset( content )
    else: charset_ = charset
    if not mimetype_:
        if charset_:
            # No MIME type but a charset: assume plain text if it decodes.
            mimetype_ = 'text/plain'
            _validate_mimetype_with_trial_decode(
                content, location, mimetype_, charset_ )
            return mimetype_, charset_
        mimetype_ = 'application/octet-stream'
    if _is_textual_mimetype( mimetype_ ):
        return mimetype_, charset_
    # Non-textual MIME type: acceptable only if content decodes as text.
    if charset_ is None:
        raise TextualMimetypeInvalidity( location, mimetype_ )
    _validate_mimetype_with_trial_decode(
        content, location, mimetype_, charset_ )
    return mimetype_, charset_

204 

205 

206def _is_reasonable_text_content( content: str ) -> bool: 

207 ''' Checks if decoded content appears to be meaningful text. ''' 

208 if not content: return False 208 ↛ exitline 208 didn't return from function '_is_reasonable_text_content' because the return on line 208 wasn't executed

209 # Check for excessive repetition of single characters (likely binary) 

210 if len( set( content ) ) == 1: return False 

211 # Check for excessive control characters (excluding common whitespace) 

212 common_whitespace = '\t\n\r' 

213 ascii_control_limit = 32 

214 control_chars = sum( 

215 1 for c in content 

216 if ord( c ) < ascii_control_limit and c not in common_whitespace ) 

217 if control_chars > len( content ) * 0.1: return False # >10% control chars 

218 # Check for reasonable printable character ratio 

219 printable_chars = sum( 

220 1 for c in content if c.isprintable( ) or c in common_whitespace ) 

221 return printable_chars >= len( content ) * 0.8 # >=80% printable 

222 

223 

# MIME types that are considered textual beyond those starting with 'text/'.
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/x-perl',
    'application/x-python',
    'application/x-php',
    'application/x-ruby',
    'application/x-shell',
    'application/javascript',
    'image/svg+xml',
) )
# MIME type suffixes that indicate textual content.
_TEXTUAL_SUFFIXES = ( '+xml', '+json', '+yaml', '+toml' )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content.

        Accepts any 'text/*' type, an explicit allowlist of textual
        application types, and structured-syntax suffixes ('+xml', etc.).
    '''
    _scribe.debug( f"MIME type: {mimetype}" )
    # 'text/' already covers 'text/x-*'; redundant prefix check removed.
    if mimetype.startswith( 'text/' ): return True
    if mimetype in _TEXTUAL_MIME_TYPES: return True
    if mimetype.endswith( _TEXTUAL_SUFFIXES ):
        _scribe.debug(
            f"MIME type '{mimetype}' accepted due to textual suffix." )
        return True
    return False

249 

250 

def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ...]:
    ''' Produces acquisition coroutines for a filesystem location. '''
    path = __.Path( location )
    if path.is_file( ) or path.is_symlink( ):
        return ( _acquire_from_file( path ), )
    if path.is_dir( ):
        return tuple(
            _acquire_from_file( entry )
            for entry in _collect_directory_files( path, recursive ) )
    # Neither file nor directory: nothing acquirable at this location.
    raise _exceptions.ContentAcquireFailure( location )

261 

262 

def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
    ''' Produces coroutine which fetches URL within its own client session. '''
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _fetch( ) -> _parts.Part:
        client = _httpx.AsyncClient( follow_redirects = True ) # nosec B113
        async with client:
            return await _acquire_via_http( client, url )

    return _fetch( )

275 

276 

def _validate_mimetype_with_trial_decode(
    content: bytes, location: str | __.Path, mimetype: str, charset: str
) -> None:
    ''' Raises unless content decodes with charset and resembles text. '''
    from .exceptions import TextualMimetypeInvalidity
    try:
        text = content.decode( charset )
    except ( UnicodeDecodeError, LookupError ) as exc:
        raise TextualMimetypeInvalidity( location, mimetype ) from exc
    if not _is_reasonable_text_content( text ):
        raise TextualMimetypeInvalidity( location, mimetype )
    _scribe.debug(
        f"MIME type '{mimetype}' accepted after successful "
        f"decode test with charset '{charset}' for '{location}'." )