Coverage for sources/mimeogram/acquirers.py: 94%

109 statements  

coverage.py v7.13.4, created at 2026-02-18 17:27 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Content acquisition from various sources. ''' 

22 

23 

24import aiofiles as _aiofiles 

25import httpx as _httpx 

26 

27from . import __ 

28from . import exceptions as _exceptions 

29from . import parts as _parts 

30 

31 

32_scribe = __.produce_scribe( __name__ ) 

33_decode_inform_behaviors = __.dcls.replace( 

34 __.detextive.BEHAVIORS_DEFAULT, 

35 trial_decode_confidence = 0.75 ) 

36 

37 

38async def acquire( 

39 auxdata: __.appcore.state.Globals, 

40 sources: __.cabc.Sequence[ str | __.Path ], 

41) -> __.cabc.Sequence[ _parts.Part ]: 

42 ''' Acquires content from multiple sources. ''' 

43 from urllib.parse import urlparse 

44 options = auxdata.configuration.get( 'acquire-parts', { } ) 

45 strict = options.get( 'fail-on-invalid', False ) 

46 recursive = options.get( 'recurse-directories', False ) 

47 no_ignores = options.get( 'no-ignores', False ) 

48 tasks: list[ __.cabc.Coroutine[ None, None, _parts.Part ] ] = [ ] 

49 for source in sources: 

50 path = __.Path( source ) 

51 url_parts = ( 

52 urlparse( source ) if isinstance( source, str ) 

53 else urlparse( str( source ) ) ) 

54 scheme = 'file' if path.drive else url_parts.scheme 

55 match scheme: 

56 case '' | 'file': 

57 fs_tasks = _produce_fs_tasks( source, recursive, no_ignores ) 

58 tasks.extend( fs_tasks ) 

59 case 'http' | 'https': 

60 tasks.append( _produce_http_task( str( source ) ) ) 

61 case _: 

62 raise _exceptions.UrlSchemeNoSupport( str( source ) ) 

63 if strict: return await __.asyncf.gather_async( *tasks ) 

64 results: tuple[ __.generics.GenericResult, ... ] = ( 

65 await __.asyncf.gather_async( 

66 *tasks, return_exceptions = True 

67 ) 

68 ) 

69 # TODO: Factor into '__.generics.extract_results_filter_errors'. 

70 values: list[ _parts.Part ] = [ ] 

71 for result in results: 

72 if __.generics.is_error( result ): 

73 _scribe.warning( str( result.error ) ) 

74 continue 

75 values.append( result.extract( ) ) 

76 return tuple( values ) 

77 
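
# Illustrative usage sketch, not part of the covered module: drives the
# public 'acquire' coroutine over a mix of filesystem and HTTP sources and
# inspects the returned parts. The source names are examples, and the
# application-state object 'auxdata' (an '__.appcore.state.Globals') is
# assumed to have been prepared elsewhere.
async def _demo_acquire( auxdata: __.appcore.state.Globals ) -> None:
    parts = await acquire(
        auxdata, ( 'README.md', 'https://example.com/notes.txt' ) )
    for part in parts:
        print( part.location, part.mimetype, part.charset )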

78async def _acquire_from_file( location: __.Path ) -> _parts.Part: 

79 ''' Acquires content from a text file. '''

80 from .exceptions import ContentAcquireFailure, ContentDecodeFailure 

81 try: 

82 async with _aiofiles.open( location, 'rb' ) as f: # pyright: ignore 

83 content_bytes = await f.read( ) 

84 except Exception as exc: raise ContentAcquireFailure( location ) from exc 

85 try: 

86 result = __.detextive.decode_inform( 

87 content_bytes, 

88 location = str( location ), 

89 behaviors = _decode_inform_behaviors ) 

90 except Exception as exc: 

91 raise ContentDecodeFailure( location, '???' ) from exc 

92 mimetype = result.mimetype.mimetype 

93 charset = result.charset.charset 

94 if charset is None: raise ContentDecodeFailure( location, '???' )    [94 ↛ exit: line 94 didn't except from function '_acquire_from_file' because the raise on line 94 wasn't executed]

95 linesep = result.linesep 

96 if linesep is None: 

97 _scribe.warning( f"No line separator detected in '{location}'." ) 

98 linesep = __.detextive.LineSeparators( __.os.linesep ) 

99 _scribe.debug( f"Read file: {location}" ) 

100 return _parts.Part( 

101 location = str( location ), 

102 mimetype = mimetype, 

103 charset = charset, 

104 linesep = linesep, 

105 content = linesep.normalize( result.text ) ) 

106 

107 
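
# Illustrative sketch, not part of the covered module: reads a single local
# file through '_acquire_from_file' and reports the detected charset, line
# separator, and normalized content length. The 'pyproject.toml' path is
# only an example.
async def _demo_file_part( ) -> None:
    part = await _acquire_from_file( __.Path( 'pyproject.toml' ) )
    print( part.charset, part.linesep, len( part.content ) )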

108async def _acquire_via_http( 

109 client: _httpx.AsyncClient, url: str 

110) -> _parts.Part: 

111 ''' Acquires content via HTTP/HTTPS. ''' 

112 from .exceptions import ContentAcquireFailure, ContentDecodeFailure 

113 try: 

114 response = await client.get( url ) 

115 response.raise_for_status( ) 

116 except Exception as exc: raise ContentAcquireFailure( url ) from exc 

117 http_content_type = response.headers.get( 'content-type' ) 

118 content_bytes = response.content 

119 try: 

120 result = __.detextive.decode_inform( 

121 content_bytes, 

122 location = url, 

123 behaviors = _decode_inform_behaviors, 

124 http_content_type = http_content_type or __.absent ) 

125 except Exception as exc: 

126 raise ContentDecodeFailure( url, '???' ) from exc 

127 mimetype = result.mimetype.mimetype 

128 charset = result.charset.charset 

129 if charset is None: raise ContentDecodeFailure( url, '???' )    [129 ↛ exit: line 129 didn't except from function '_acquire_via_http' because the raise on line 129 wasn't executed]

130 linesep = result.linesep 

131 if linesep is None:    [131 ↛ 132: line 131 didn't jump to line 132 because the condition on line 131 was never true]

132 _scribe.warning( f"No line separator detected in '{url}'." ) 

133 linesep = __.detextive.LineSeparators( __.os.linesep ) 

134 _scribe.debug( f"Fetched URL: {url}" ) 

135 return _parts.Part( 

136 location = url, 

137 mimetype = mimetype, 

138 charset = charset, 

139 linesep = linesep, 

140 content = linesep.normalize( result.text ) ) 

141 

142 

143_files_to_ignore = frozenset( ( '.DS_Store', '.env' ) ) 

144_directories_to_ignore = frozenset( ( '.bzr', '.git', '.hg', '.svn' ) ) 

145def _collect_directory_files( 

146 directory: __.Path, recursive: bool, no_ignores: bool = False 

147) -> list[ __.Path ]: 

148 ''' Collects and filters files from directory hierarchy. 

149 

150 When no_ignores is True, gitignore filtering is disabled. 

151 When gitignore filtering is enabled, warnings are emitted for 

152 filtered paths. 

153 ''' 

154 import gitignorefile 

155 cache = gitignorefile.Cache( ) 

156 paths: list[ __.Path ] = [ ] 

157 _scribe.debug( f"Collecting files in directory: {directory}" ) 

158 for entry in directory.iterdir( ): 

159 if entry.is_dir( ) and entry.name in _directories_to_ignore: 

160 _scribe.debug( f"Ignoring directory: {entry}" ) 

161 continue 

162 if entry.is_file( ) and entry.name in _files_to_ignore:    [162 ↛ 163: line 162 didn't jump to line 163 because the condition on line 162 was never true]

163 _scribe.debug( f"Ignoring file: {entry}" ) 

164 continue 

165 if not no_ignores and cache( str( entry ) ): 

166 _scribe.warning( 

167 f"Skipping path (matched by .gitignore): {entry}. " 

168 "Use --no-ignores to include." ) 

169 continue 

170 if entry.is_dir( ) and recursive: 

171 collected = _collect_directory_files( 

172 entry, recursive, no_ignores ) 

173 paths.extend( collected ) 

174 elif entry.is_file( ): paths.append( entry ) 

175 return paths 

176 

177 
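
# Illustrative sketch, not part of the covered module: walks a directory
# recursively with .gitignore filtering active and prints the collected
# paths. The 'docs' directory name is only an example.
def _demo_collect_files( ) -> None:
    paths = _collect_directory_files(
        __.Path( 'docs' ), recursive = True, no_ignores = False )
    for path in paths: print( path )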

178def _produce_fs_tasks( 

179 location: str | __.Path, recursive: bool = False, no_ignores: bool = False 

180) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ... ]:

181 location_ = __.Path( location ) 

182 if location_.is_file( ) or location_.is_symlink( ): 

183 return ( _acquire_from_file( location_ ), ) 

184 if location_.is_dir( ): 

185 files = _collect_directory_files( location_, recursive, no_ignores ) 

186 return tuple( _acquire_from_file( f ) for f in files ) 

187 raise _exceptions.ContentAcquireFailure( location ) 

188 

189 

190def _produce_http_task( 

191 url: str 

192) -> __.cabc.Coroutine[ None, None, _parts.Part ]: 

193 # TODO: URL object rather than string. 

194 # TODO: Reuse clients for common hosts. 

195 

196 async def _execute_session( ) -> _parts.Part: 

197 async with _httpx.AsyncClient( # nosec B113 

198 follow_redirects = True 

199 ) as client: return await _acquire_via_http( client, url ) 

200 

201 return _execute_session( )
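
# Illustrative sketch, not part of the covered module: combines the
# filesystem and HTTP task producers and awaits the resulting coroutines
# together. Plain 'asyncio.gather' is used here only for brevity; 'acquire'
# itself routes the same coroutines through '__.asyncf.gather_async'. The
# directory and URL are examples.
async def _demo_producers( ) -> tuple[ _parts.Part, ... ]:
    import asyncio
    tasks = [ *_produce_fs_tasks( __.Path( 'docs' ), recursive = True ) ]
    tasks.append( _produce_http_task( 'https://example.com/notes.txt' ) )
    return tuple( await asyncio.gather( *tasks ) )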