Coverage for sources/mimeogram/acquirers.py: 89%

141 statements  

coverage.py v7.7.1, created at 2025-03-29 22:32 +0000

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and       #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' Content acquisition from various sources. '''


from __future__ import annotations

import aiofiles as _aiofiles
import httpx as _httpx

from . import __
from . import exceptions as _exceptions
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )


async def acquire( # pylint: disable=too-many-locals
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources. '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    strict = options.get( 'fail-on-invalid', False )
    recursive = options.get( 'recurse-directories', False )
    tasks: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        path = __.Path( source )
        url_parts = (
            urlparse( source ) if isinstance( source, str )
            else urlparse( str( source ) ) )
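        # Note: a Windows drive letter (e.g. 'C:') would parse as a URL
        # scheme, so any path which carries a drive is treated as a file.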
        scheme = 'file' if path.drive else url_parts.scheme
        match scheme:
            case '' | 'file':
                tasks.extend( _produce_fs_tasks( source, recursive ) )
            case 'http' | 'https':
                tasks.append( _produce_http_task( str( source ) ) )
            case _:
                raise _exceptions.UrlSchemeNoSupport( str( source ) )
    if strict: return await __.gather_async( *tasks )
    results = await __.gather_async( *tasks, return_exceptions = True )
    # TODO: Factor into '__.generics.extract_results_filter_errors'.
    values: list[ _parts.Part ] = [ ]
    for result in results:
        if result.is_error( ):
            _scribe.warning( str( result.error ) )
            continue
        values.append( result.extract( ) )
    return tuple( values )


async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as f:
            content_bytes = await f.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( content_bytes, location )
    # coverage: partial branch; the raise below was never executed.
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # coverage: partial branch; this condition was never true.
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{location}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )


async def _acquire_via_http( # pylint: disable=too-many-locals
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS. '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
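    # The Content-Type header may carry parameters (e.g. 'charset=...');
    # only the media type before the first ';' is retained.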
    mimetype = (
        response.headers.get( 'content-type', 'application/octet-stream' )
        .split( ';' )[ 0 ].strip( ) )
    content_bytes = response.content
    charset = response.encoding or _detect_charset( content_bytes )
    # coverage: partial branch; the raise below was never executed.
    if charset is None: raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        mimetype, _ = (
            _detect_mimetype_and_charset(
                content_bytes, url, charset = charset ) )
    linesep = _parts.LineSeparators.detect_bytes( content_bytes )
    # coverage: partial branch; this condition was never true.
    if linesep is None:
        _scribe.warning( f"No line separator detected in '{url}'." )
        linesep = _parts.LineSeparators( __.os.linesep )
    try: content = content_bytes.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( content ) )

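# Entries always skipped when scanning directories, in addition to any
# applicable .gitignore rules.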
_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) )
_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy. '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    paths: list[ __.Path ] = [ ]
    _scribe.debug( f"Collecting files in directory: {directory}" )
    for entry in directory.iterdir( ):
        if entry.is_dir( ) and entry.name in _directories_to_ignore:
            _scribe.debug( f"Ignoring directory: {entry}" )
            continue
        # coverage: partial branch; this condition was never true.
        if entry.is_file( ) and entry.name in _files_to_ignore:
            _scribe.debug( f"Ignoring file: {entry}" )
            continue
        if cache( str( entry ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {entry}" )
            continue
        if entry.is_dir( ) and recursive:
            paths.extend( _collect_directory_files( entry, recursive ) )
        elif entry.is_file( ): paths.append( entry )
    return paths


def _detect_charset( content: bytes ) -> str | None:
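    ''' Detects character set of content, preferring UTF-8. '''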

    from chardet import detect
    charset = detect( content )[ 'encoding' ]
    # coverage: partial branch; the return below was never executed.
    if charset is None: return charset
    if charset.startswith( 'utf' ): return charset
    match charset:
        case 'ascii': return 'utf-8' # Assume superset.
        case _: pass
    # Shake out false positives, like 'MacRoman'.
    try: content.decode( 'utf-8' )
    except UnicodeDecodeError: return charset
    return 'utf-8'


def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
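    ''' Detects MIME type from content, falling back to filename. '''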

    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try: return from_string( content, mime = True )
    except ( PureError, ValueError ):
        return guess_type( str( location ) )[ 0 ]


def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
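    ''' Detects MIME type and character set, requiring textual content. '''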

    from .exceptions import TextualMimetypeInvalidity
    # coverage: partial branch; this condition was always true.
    if __.is_absent( mimetype ):
        mimetype_ = _detect_mimetype( content, location )
    else: mimetype_ = mimetype
    if __.is_absent( charset ):
        charset_ = _detect_charset( content )
    else: charset_ = charset
    if not mimetype_:
        # coverage: partial branch; this condition was always true.
        if charset_: mimetype_ = 'text/plain' # pylint: disable=redefined-variable-type
        else: mimetype_ = 'application/octet-stream'
    if not _is_textual_mimetype( mimetype_ ):
        raise TextualMimetypeInvalidity( location, mimetype_ )
    return mimetype_, charset_


# MIME types that are considered textual beyond those starting with 'text/'
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/javascript',
    'image/svg+xml',
) )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content. '''
    _scribe.debug( f"MIME type: {mimetype}" )
    if mimetype.startswith( ( 'text/', 'application/x-', 'text/x-' ) ):
        return True
    return mimetype in _TEXTUAL_MIME_TYPES


def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ...]:
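    ''' Produces acquisition coroutines for file or directory location. '''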

    location_ = __.Path( location )
    if location_.is_file( ) or location_.is_symlink( ):
        return ( _acquire_from_file( location_ ), )
    if location_.is_dir( ):
        files = _collect_directory_files( location_, recursive )
        return tuple( _acquire_from_file( f ) for f in files )
    raise _exceptions.ContentAcquireFailure( location )


def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
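    ''' Produces acquisition coroutine for HTTP or HTTPS URL. '''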

    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _execute_session( ) -> _parts.Part:
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client: return await _acquire_via_http( client, url )

    return _execute_session( )
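
For orientation, a hypothetical caller of the public entrypoint might look like the sketch below. It assumes an already-initialized 'auxdata' Globals object (constructed elsewhere in the package) and uses placeholder source names; it is illustrative only, not part of the module.

async def _example( auxdata ):
    # Mix of a local path and an HTTP URL; both are acquired concurrently.
    parts = await acquire(
        auxdata, ( 'README.md', 'https://example.com/notes.txt' ) )
    for part in parts:
        print( part.location, part.mimetype, part.charset )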