Coverage for sources/mimeogram/updaters.py: 93%

101 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 19:15 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' File content updates. ''' 

22 

23 

24from . import __ 

25from . import fsprotect as _fsprotect 

26from . import interactions as _interactions 

27from . import interfaces as _interfaces 

28from . import parts as _parts 

29 

30 

# Module-level scribe; used below to report warnings and exceptions.
31_scribe = __.produce_scribe( __name__ ) 

32 

33 

class ReviewModes( __.enum.Enum ): # TODO: Python 3.11: StrEnum
    ''' Strategies for reviewing updates before they are applied. '''

    # Apply parts without review.
    Silent = 'silent'
    # Git-style aggregated diff for all parts. (Currently disabled.)
    # Aggregate = 'aggregate'
    # Interactively review each part.
    Partitive = 'partitive'

40 

41 

class Reverter( __.immut.DataclassObject ):
    ''' Backup and restore filesystem state.

        Original file contents are captured with :py:meth:`save` and are
        written back by :py:meth:`restore` for every path recorded in
        ``revisions``.
    '''

    # Text content of files which existed when saved, keyed by path.
    originals: dict[ __.Path, str ] = (
        __.dcls.field( default_factory = dict[ __.Path, str ] ) )
    # Paths to roll back on restore. Nothing in this class appends to it;
    # it is populated by whoever owns the reverter.
    revisions: list[ __.Path ] = (
        __.dcls.field( default_factory = list[ __.Path ] ) )

    async def save( self, part: _parts.Part, path: __.Path ) -> None:
        ''' Saves original file content if it exists.

            Does nothing when ``path`` does not exist. Content is read with
            the charset of ``part``.

            Raises ``ContentAcquireFailure``, chained to the underlying
            error, if the content cannot be read.
        '''
        from .exceptions import ContentAcquireFailure
        if not path.exists( ): return
        try:
            content = (
                await __.acquire_text_file_async(
                    path, charset = part.charset ) )
        except Exception as exc: raise ContentAcquireFailure( path ) from exc
        self.originals[ path ] = content

    async def restore( self ) -> None:
        ''' Restores files to original contents in reverse order.

            Paths with a saved original are rewritten with that content;
            a failure to rewrite is logged and the remaining paths are
            still processed. Paths without a saved original are unlinked
            (presumably because the update created them).

            NOTE: content is rewritten with the default charset of
            ``_update_content_atomic`` rather than the charset used when
            the original was saved.
        '''
        # TODO: async parallel fanout
        from .exceptions import ContentUpdateFailure
        for path in reversed( self.revisions ):
            if path in self.originals:
                try:
                    await _update_content_atomic(
                        path, self.originals[ path ] )
                except ContentUpdateFailure:
                    # Must be an f-string so that the failing path is
                    # actually interpolated into the log message.
                    _scribe.exception( f"Failed to restore {path}" )
            else: path.unlink( )

73 

74 

class Queue( __.immut.DataclassObject ):
    ''' Collects file updates and applies them together as a batch. '''

    # Pending updates as ( part, target path, new content ) triples.
    updates: list[ tuple[ _parts.Part, __.Path, str ] ] = (
        __.dcls.field(
            default_factory = list[ tuple[ _parts.Part, __.Path, str ] ] ) )
    # Holds backups of original content and the list of revised paths.
    reverter: Reverter = ( __.dcls.field( default_factory = Reverter ) )

    def enqueue(
        self, part: _parts.Part, target: __.Path, content: str
    ) -> None:
        ''' Appends one pending update to the queue. '''
        entry = ( part, target, content )
        self.updates.append( entry )

    async def apply( self ) -> None:
        ''' Backs up all targets and then writes all queued updates.

            Both phases are dispatched through ``gather_async``. If either
            phase raises, the reverter is asked to restore and the
            exception propagates. On success, every target is recorded in
            the revisions of the reverter.

            NOTE(review): restoration walks ``reverter.revisions``, which
            this method only extends after success; confirm whether
            targets from a failing batch are expected to be rolled back.
        '''
        reverter = self.reverter
        try:
            backups = tuple(
                reverter.save( part, target )
                for part, target, _ in self.updates )
            await __.gather_async(
                *backups, error_message = "Failed to backup files." )
            # Write operations are only created once backups have finished.
            writes = tuple(
                _update_content_atomic(
                    target, content, charset = part.charset )
                for part, target, content in self.updates )
            await __.gather_async(
                *writes, error_message = "Failed to apply updates." )
        except Exception:
            await reverter.restore( )
            raise
        self.reverter.revisions.extend(
            target for _, target, _ in self.updates )

106 

107 

async def update( # noqa: PLR0913
    auxdata: __.Globals,
    parts: __.cabc.Sequence[ _parts.Part ],
    mode: ReviewModes,
    base: __.Absential[ __.Path ] = __.absent,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
    protector: __.Absential[ _fsprotect.Protector ] = __.absent,
) -> None:
    ''' Applies mimeogram parts to filesystem locations.

        Each part, except those whose location starts with
        ``mimeogram://``, is resolved against ``base``, checked by the
        protector, and passed to :py:func:`update_part`. Parts which are
        not ignored are queued; the queue is applied once at the end.

        An absent ``base`` becomes an empty path; an absent ``protector``
        is built from configuration.
    '''
    root = __.Path( ) if __.is_absent( base ) else base
    if __.is_absent( protector ):
        guard = _fsprotect.Cache.from_configuration( auxdata = auxdata )
    else: guard = protector
    pending = Queue( ) # pyright: ignore
    for part in parts:
        # Parts with internal mimeogram locations are never written out.
        if part.location.startswith( 'mimeogram://' ): continue
        destination = _derive_location( part.location, base = root )
        protection = guard.verify( destination )
        target = _parts.Target(
            part = part, destination = destination, protection = protection )
        resolution, text = await update_part(
            auxdata, target, mode = mode, interactor = interactor )
        if resolution is not _parts.Resolutions.Ignore:
            pending.enqueue( target.part, target.destination, text )
    await pending.apply( )

133 

134 

async def update_part(
    auxdata: __.Globals,
    target: _parts.Target,
    mode: ReviewModes,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Decides how a single mimeogram part should be handled.

        In partitive mode, the decision is delegated to the interactor.
        Otherwise, a protected target is ignored with a warning, unless
        protections are disabled via the ``update-parts`` configuration,
        and any other target is applied.

        Returns the resolution together with the content to write.
    '''
    content = target.part.content
    if mode is ReviewModes.Partitive:
        return await _interactions.interact( target, interactor = interactor )
    options = auxdata.configuration.get( 'update-parts', { } )
    # Unprotected targets, or disabled protections, mean apply as-is.
    if not target.protection or options.get( 'disable-protections', False ):
        return _parts.Resolutions.Apply, content
    _scribe.warning(
        f"Skipping protected path: {target.destination} "
        f"Reason: {target.protection.description}" )
    return _parts.Resolutions.Ignore, content

152 

153 

def _derive_location(
    location: __.typx.Annotated[
        str, __.typx.Doc( "Part location (URL or filesystem path)." ) ],
    base: __.typx.Annotated[
        __.Absential[ __.Path ],
        __.typx.Doc(
            "Base path for relative locations. "
            "Defaults to current directory." )
    ] = __.absent,
) -> __.Path:
    ''' Converts a part location into a filesystem path.

        Only locations with no scheme or with the ``file`` scheme are
        accepted; a location with a drive is treated as a file. Environment
        variables and the user home marker are expanded. Absolute results
        are returned as they are; relative ones are joined to ``base``
        and resolved or, when ``base`` is absent, joined to an empty path.

        Raises ``LocationInvalidity`` for unparseable locations and for
        any other scheme.
    '''
    import os.path as ospath
    from urllib.parse import urlparse
    from .exceptions import LocationInvalidity
    try: url = urlparse( location )
    except Exception as exc: raise LocationInvalidity( location ) from exc
    # A drive prefix would otherwise be mistaken for a URL scheme.
    has_drive = bool( __.Path( location ).drive )
    scheme = 'file' if has_drive else url.scheme
    if scheme not in ( '', 'file' ): raise LocationInvalidity( location )
    expanded = ospath.expandvars( url.path )
    expanded = ospath.expanduser( expanded )
    location_ = __.Path( expanded )
    if location_.is_absolute( ): return location_
    if __.is_absent( base ): return __.Path( ) / location_
    return ( base / location_ ).resolve( )

179 

180 

async def _update_content_atomic(
    location: __.Path,
    content: str,
    charset: str = 'utf-8',
    linesep: _parts.LineSeparators = _parts.LineSeparators.LF
) -> None:
    ''' Updates file content atomically, if possible.

        The parent directory of ``location`` is created if necessary.
        Content is passed through ``linesep.nativize``, encoded with
        ``charset``, and written to a temporary file in that directory.
        The temporary file then replaces ``location``. Any temporary file
        which is left over is removed on a best-effort basis.

        Raises ``ContentUpdateFailure``, chained to the underlying error,
        if the write or the replacement fails.
    '''
    import aiofiles.os as os # noqa: PLR0402
    from aiofiles.tempfile import NamedTemporaryFile # pyright: ignore
    location.parent.mkdir( parents = True, exist_ok = True )
    content = linesep.nativize( content )
    # Remember the first failure so that it can be chained as the cause
    # instead of being discarded.
    failure: Exception | None = None
    async with NamedTemporaryFile(
        delete = False,
        dir = location.parent,
        suffix = f"{location.suffix}.tmp",
    ) as stream:
        filename = str( stream.name )
        try: await stream.write( content.encode( charset ) )
        except Exception as exc: failure = exc
    # Windows: Replace must happen after file handle is closed.
    if failure is None:
        try: await os.replace( filename, str( location ) )
        except Exception as exc: failure = exc
    # After a successful replace the temporary name no longer exists.
    if await os.path.exists( filename ):
        try: await os.remove( filename )
        except Exception:
            _scribe.warning( f"Could not remove temporary file: {filename}" )
    if failure is not None:
        from .exceptions import ContentUpdateFailure
        raise ContentUpdateFailure( location ) from failure