Coverage for sources/mimeogram/updaters.py: 93%

102 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-07 04:07 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' File content updates. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import __ 

27from . import fsprotect as _fsprotect 

28from . import interactions as _interactions 

29from . import interfaces as _interfaces 

30from . import parts as _parts 

31 

32 

# Module-level scribe. Code in this module uses it to emit warnings
# (skipped protected paths, undeletable temporary files) and to report
# failed restores. Presumably a standard logger -- confirm against
# `__.produce_scribe`.
_scribe = __.produce_scribe( __name__ )

34 

35 

class ReviewModes( __.enum.Enum ): # TODO: Python 3.11: StrEnum
    ''' Selects the review strategy used when applying updates.

        Each member's value is its lowercase name.
    '''

    # Apply parts without review.
    Silent = 'silent'

    # Git-style aggregated diff for all parts. (Not yet enabled.)
    # Aggregate = 'aggregate'

    # Interactively review each part.
    Partitive = 'partitive'

42 

43 

class Reverter(
    metaclass = __.ImmutableStandardDataclass,
    decorators = ( __.standard_dataclass, ),
):
    ''' Backup and restore filesystem state.

        Holds the text of files as they were before being updated and
        the list of paths which have been revised, so that revisions can
        later be undone.
    '''

    # Maps a path to the text it contained when it was saved.
    originals: dict[ __.Path, str ] = (
        __.dataclass_declare( default_factory = dict[ __.Path, str ] ) )
    # Paths which have been revised, in the order they were recorded.
    revisions: list[ __.Path ] = (
        __.dataclass_declare( default_factory = list[ __.Path ] ) )

    async def save( self, part: _parts.Part, path: __.Path ) -> None:
        ''' Saves original file content if it exists.

            Does nothing when the path does not exist. Otherwise, reads
            the file using the charset of the part and records the text
            under the path.

            Raises ``ContentAcquireFailure``, chained to the underlying
            error, if the file cannot be read.
        '''
        from .exceptions import ContentAcquireFailure
        if not path.exists( ): return
        try:
            content = (
                await __.acquire_text_file_async(
                    path, charset = part.charset ) )
        except Exception as exc: raise ContentAcquireFailure( path ) from exc
        self.originals[ path ] = content

    async def restore( self ) -> None:
        ''' Restores files to original contents in reverse order.

            Walks the recorded revisions from last to first. A path with
            saved original text is rewritten with that text; a path
            without saved text is removed.

            A failure to rewrite a file is logged and does not stop the
            remaining restorations.
        '''
        # TODO: async parallel fanout
        from .exceptions import ContentUpdateFailure
        for path in reversed( self.revisions ):
            if path in self.originals:
                # NOTE(review): Original text is rewritten with the default
                #               charset of the writer rather than the charset
                #               with which it was read -- confirm intent.
                try:
                    await _update_content_atomic(
                        path, self.originals[ path ] )
                except ContentUpdateFailure:
                    # Must be an f-string; otherwise the literal text
                    # '{path}' is logged instead of the actual path.
                    _scribe.exception( f"Failed to restore {path}" )
            else: path.unlink( )

78 

79 

class Queue(
    metaclass = __.ImmutableStandardDataclass,
    decorators = ( __.standard_dataclass, ),
):
    ''' Collects file updates and applies them together as a batch. '''

    # Pending updates as (part, target path, new content) triples.
    updates: list[ tuple[ _parts.Part, __.Path, str ] ] = (
        __.dataclass_declare(
            default_factory = list[ tuple[ _parts.Part, __.Path, str ] ] ) )
    # Records original contents and revised paths for this queue.
    reverter: Reverter = (
        __.dataclass_declare( default_factory = Reverter ) )

    def enqueue(
        self, part: _parts.Part, target: __.Path, content: str
    ) -> None:
        ''' Appends one pending update to the queue. '''
        entry = ( part, target, content )
        self.updates.append( entry )

    async def apply( self ) -> None:
        ''' Backs up, then writes, every queued update.

            Backups for all targets are gathered first. Writes for all
            targets are gathered second, each with the charset of its
            part. If either phase raises, the reverter is asked to
            restore and the exception propagates. On success, every
            target is recorded as a revision on the reverter.
        '''
        # NOTE(review): Targets are only recorded as revisions after both
        #               phases succeed, so the restore below can only revert
        #               revisions recorded by earlier calls -- confirm intent.
        try:
            backups = [
                self.reverter.save( part, target )
                for part, target, _ in self.updates ]
            await __.gather_async(
                *backups, error_message = "Failed to backup files." )
            writes = [
                _update_content_atomic(
                    target, content, charset = part.charset )
                for part, target, content in self.updates ]
            await __.gather_async(
                *writes, error_message = "Failed to apply updates." )
        except Exception:
            await self.reverter.restore( )
            raise
        self.reverter.revisions.extend(
            target for _, target, _ in self.updates )

115 

116 

async def update( # noqa: PLR0913
    auxdata: __.Globals,
    parts: __.cabc.Sequence[ _parts.Part ],
    mode: ReviewModes,
    base: __.Absential[ __.Path ] = __.absent,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
    protector: __.Absential[ _fsprotect.Protector ] = __.absent,
) -> None:
    ''' Applies mimeogram parts to filesystem locations.

        When absent, the base defaults to an empty path and the
        protector is built from configuration. Parts whose location
        starts with 'mimeogram://' are skipped. Every other part is
        resolved to a destination, checked by the protector, and passed
        to ``update_part``. Parts not resolved as Ignore are queued, and
        the queue is applied once all parts have been considered.
    '''
    if __.is_absent( base ): base = __.Path( )
    if __.is_absent( protector ):
        protector = _fsprotect.Cache.from_configuration( auxdata = auxdata )
    pending = Queue( ) # pyright: ignore
    for part in parts:
        if part.location.startswith( 'mimeogram://' ): continue
        destination = _derive_location( part.location, base = base )
        protection = protector.verify( destination )
        target = _parts.Target(
            part = part, destination = destination, protection = protection )
        resolution, text = await update_part(
            auxdata, target, mode = mode, interactor = interactor )
        if resolution is not _parts.Resolutions.Ignore:
            pending.enqueue( target.part, target.destination, text )
    await pending.apply( )

142 

143 

async def update_part(
    auxdata: __.Globals,
    target: _parts.Target,
    mode: ReviewModes,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Decides how a single mimeogram part is to be handled.

        In partitive mode, the decision is delegated to the interaction
        layer and its result is returned as-is. Otherwise, the part is
        ignored, with a warning, when its destination is protected and
        protections have not been disabled under the 'update-parts'
        configuration; in every other case it is applied.

        Returns the resolution together with the content to write.
    '''
    content = target.part.content
    if mode is ReviewModes.Partitive:
        return await _interactions.interact( target, interactor = interactor )
    options = auxdata.configuration.get( 'update-parts', { } )
    # Unprotected destinations, or disabled protections, mean apply.
    if not target.protection or options.get( 'disable-protections', False ):
        return _parts.Resolutions.Apply, content
    _scribe.warning(
        f"Skipping protected path: {target.destination} "
        f"Reason: {target.protection.description}" )
    return _parts.Resolutions.Ignore, content

161 

162 

163def _derive_location( 

164 location: __.typx.Annotated[ 

165 str, __.typx.Doc( "Part location (URL or filesystem path)." ) ], 

166 base: __.typx.Annotated[ 

167 __.Absential[ __.Path ], 

168 __.typx.Doc( 

169 "Base path for relative locations. " 

170 "Defaults to current directory." ) 

171 ] = __.absent, 

172) -> __.Path: 

173 ''' Resolves part location to filesystem path. ''' 

174 import os.path as ospath 

175 from urllib.parse import urlparse 

176 from .exceptions import LocationInvalidity 

177 try: url = urlparse( location ) 

178 except Exception as exc: raise LocationInvalidity( location ) from exc 

179 path = __.Path( location ) 

180 scheme = 'file' if path.drive else url.scheme 

181 match scheme: 

182 case '' | 'file': pass 

183 case _: raise LocationInvalidity( location ) 

184 location_ = __.Path( ospath.expanduser( ospath.expandvars( url.path ) ) ) 

185 if location_.is_absolute( ): return location_ 

186 if not __.is_absent( base ): return ( base / location_ ).resolve( ) 

187 return __.Path( ) / location_ 

188 

189 

async def _update_content_atomic(
    location: __.Path,
    content: str,
    charset: str = 'utf-8',
    linesep: _parts.LineSeparators = _parts.LineSeparators.LF
) -> None:
    ''' Updates file content atomically, if possible.

        Creates the parent directory if necessary, passes the content
        through ``linesep.nativize``, encodes it with the charset, and
        writes it to a temporary file alongside the destination. The
        temporary file then replaces the destination. If the temporary
        file still exists afterwards, its removal is attempted and a
        warning is logged if that fails.

        Raises ``ContentUpdateFailure``, chained to the underlying
        error, if the write or the replacement fails.
    '''
    import aiofiles.os as os # noqa: PLR0402
    from aiofiles.tempfile import NamedTemporaryFile # pyright: ignore
    location.parent.mkdir( parents = True, exist_ok = True )
    content = linesep.nativize( content )
    # Retain the first failure so that it can be reported as the cause
    # instead of being discarded.
    failure: Exception | None = None
    async with NamedTemporaryFile(
        delete = False,
        dir = location.parent,
        suffix = f"{location.suffix}.tmp",
    ) as stream:
        filename = str( stream.name )
        try: await stream.write( content.encode( charset ) )
        except Exception as exc: failure = exc
    # Windows: Replace must happen after file handle is closed.
    if failure is None:
        try: await os.replace( filename, str( location ) )
        except Exception as exc: failure = exc
    # Temporary file remains only if it was not moved onto the destination.
    if await os.path.exists( filename ):
        try: await os.remove( filename )
        except Exception:
            _scribe.warning( f"Could not remove temporary file: {filename}" )
    if failure is not None:
        from .exceptions import ContentUpdateFailure
        raise ContentUpdateFailure( location ) from failure