Coverage for sources/mimeogram/acquirers.py: 92%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-16 01:42 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Content acquisition from various sources. ''' 

22 

23 

24from __future__ import annotations 

25 

26import aiofiles as _aiofiles 

27import httpx as _httpx 

28 

29from . import __ 

30from . import exceptions as _exceptions 

31from . import parts as _parts 

32 

33 

34_scribe = __.produce_scribe( __name__ ) 

35 

36 

async def acquire(
    auxdata: __.Globals, sources: __.cabc.Sequence[ str | __.Path ]
) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Acquires content from multiple sources.

        Dispatches each source to a filesystem or HTTP acquirer based on
        its URL scheme and gathers all resulting parts concurrently.
    '''
    from urllib.parse import urlparse
    options = auxdata.configuration.get( 'acquire-parts', { } )
    recursive = options.get( 'recurse-directories', False )
    coroutines: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ]
    for source in sources:
        # Paths and strings alike parse uniformly via their string form.
        scheme = urlparse( str( source ) ).scheme
        if scheme in ( '', 'file' ):
            coroutines.extend( _produce_fs_tasks( source, recursive ) )
        elif scheme in ( 'http', 'https' ):
            coroutines.append( _produce_http_task( str( source ) ) )
        else: raise _exceptions.UrlSchemeNoSupport( str( source ) )
    return await __.gather_async( *coroutines )

58 

59 

async def _acquire_from_file( location: __.Path ) -> _parts.Part:
    ''' Acquires content from text file.

        Reads raw bytes, resolves MIME type, charset, and line separator,
        then decodes and normalizes the content into a part.
    '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        async with _aiofiles.open( location, 'rb' ) as stream:
            raw = await stream.read( )
    except Exception as exc: raise ContentAcquireFailure( location ) from exc
    mimetype, charset = _detect_mimetype_and_charset( raw, location )
    if charset is None: raise ContentDecodeFailure( location, '???' )
    linesep = _parts.LineSeparators.detect_bytes( raw )
    # TODO? Separate error for newline issues.
    if linesep is None: raise ContentDecodeFailure( location, charset )
    try: text = raw.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( location, charset ) from exc
    _scribe.debug( f"Read file: {location}" )
    return _parts.Part(
        location = str( location ),
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( text ) )

82 

83 

async def _acquire_via_http( # pylint: disable=too-many-locals
    client: _httpx.AsyncClient, url: str
) -> _parts.Part:
    ''' Acquires content via HTTP/HTTPS.

        Fetches the URL, derives MIME type and charset from response
        headers (falling back to content sniffing), then decodes and
        normalizes the body into a part.
    '''
    from .exceptions import ContentAcquireFailure, ContentDecodeFailure
    try:
        response = await client.get( url )
        response.raise_for_status( )
    except Exception as exc: raise ContentAcquireFailure( url ) from exc
    declared = response.headers.get(
        'content-type', 'application/octet-stream' )
    mimetype = declared.split( ';' )[ 0 ].strip( )
    raw = response.content
    charset = response.encoding or _detect_charset( raw )
    if charset is None: raise ContentDecodeFailure( url, '???' )
    if not _is_textual_mimetype( mimetype ):
        # Server's declared type is not textual; sniff content instead.
        mimetype, _ = (
            _detect_mimetype_and_charset( raw, url, charset = charset ) )
    linesep = _parts.LineSeparators.detect_bytes( raw )
    # TODO? Separate error for newline issues.
    if linesep is None: raise ContentDecodeFailure( url, charset )
    try: text = raw.decode( charset )
    except Exception as exc:
        raise ContentDecodeFailure( url, charset ) from exc
    _scribe.debug( f"Fetched URL: {url}" )
    return _parts.Part(
        location = url,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = linesep.normalize( text ) )

116 

117 

# Version control metadata directories excluded from traversal.
_VCS_DIRS = frozenset( ( '.git', '.svn', '.hg', '.bzr' ) )
def _collect_directory_files(
    directory: __.Path, recursive: bool
) -> list[ __.Path ]:
    ''' Collects and filters files from directory hierarchy.

        Skips VCS metadata directories and any path matched by Gitignore
        rules; descends into subdirectories only when recursive.
    '''
    import gitignorefile
    cache = gitignorefile.Cache( )
    collected: list[ __.Path ] = [ ]
    for child in directory.iterdir( ):
        if child.name in _VCS_DIRS and child.is_dir( ):
            _scribe.debug( f"Ignoring VCS directory: {child}" )
            continue
        resolved = child.resolve( )
        if cache( str( resolved ) ):
            _scribe.debug( f"Ignoring path (matched by .gitignore): {child}" )
            continue
        if child.is_file( ): collected.append( resolved )
        elif child.is_dir( ) and recursive:
            collected.extend( _collect_directory_files( resolved, recursive ) )
    return collected

140 

141 

def _detect_charset( content: bytes ) -> str | None:
    ''' Guesses character set of content, preferring UTF-8 when viable. '''
    from chardet import detect
    guess = detect( content )[ 'encoding' ]
    if guess is None: return None
    if guess.startswith( 'utf' ): return guess
    if guess == 'ascii': return 'utf-8' # Assume superset.
    # Shake out false positives, like 'MacRoman'.
    try:
        content.decode( 'utf-8' )
        return 'utf-8'
    except UnicodeDecodeError: return guess

155 

156 

def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    ''' Determines MIME type from content magic, else from location name. '''
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try: mimetype = from_string( content, mime = True )
    except PureError: mimetype = guess_type( str( location ) )[ 0 ]
    return mimetype

163 

164 

def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    ''' Resolves MIME type and charset, detecting whichever is absent.

        Raises invalidity error for MIME types which are not textual.
    '''
    from .exceptions import TextualMimetypeInvalidity
    mimetype_ = (
        _detect_mimetype( content, location )
        if __.is_absent( mimetype ) else mimetype )
    charset_ = (
        _detect_charset( content )
        if __.is_absent( charset ) else charset )
    if not mimetype_:
        # Decodable content is presumed plain text; otherwise opaque bytes.
        mimetype_ = 'text/plain' if charset_ else 'application/octet-stream'
    if not _is_textual_mimetype( mimetype_ ):
        raise TextualMimetypeInvalidity( location, mimetype_ )
    return mimetype_, charset_

184 

185 

# MIME types that are considered textual beyond those starting with 'text/'
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/javascript',
    'image/svg+xml',
) )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content.

        Accepts any 'text/' type, experimental 'application/x-' types,
        and a small allowlist of other known-textual types.
    '''
    _scribe.debug( f"MIME type: {mimetype}" )
    # NOTE: 'text/x-' is subsumed by 'text/'; no separate prefix needed.
    if mimetype.startswith( ( 'text/', 'application/x-' ) ): return True
    return mimetype in _TEXTUAL_MIME_TYPES

200 

201 

def _produce_fs_tasks(
    location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ...]:
    ''' Produces acquisition coroutines for a filesystem location.

        A file yields one coroutine; a directory yields one per collected
        file. Anything else is an acquisition failure.
    '''
    path = __.Path( location ) if isinstance( location, str ) else location
    if path.is_file( ) or path.is_symlink( ):
        return ( _acquire_from_file( path ), )
    if path.is_dir( ):
        return tuple(
            _acquire_from_file( fspath )
            for fspath in _collect_directory_files( path, recursive ) )
    raise _exceptions.ContentAcquireFailure( location )

213 

214 

def _produce_http_task(
    url: str
) -> __.cabc.Coroutine[ None, None, _parts.Part ]:
    ''' Produces coroutine which fetches content from URL. '''
    # TODO: URL object rather than string.
    # TODO: Reuse clients for common hosts.

    async def _fetch( ) -> _parts.Part:
        # Fresh client per request; closed by the context manager.
        async with _httpx.AsyncClient( # nosec B113
            follow_redirects = True
        ) as client:
            return await _acquire_via_http( client, url )

    return _fetch( )