Coverage for sources/mimeogram/updaters.py: 97%

93 statements  

coverage.py v7.6.12, created at 2025-02-16 02:11 +0000

# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and       #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' File content updates. '''


from __future__ import annotations

from . import __
from . import fsprotect as _fsprotect
from . import interactions as _interactions
from . import interfaces as _interfaces
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )


class ReviewModes( __.enum.Enum ): # TODO: Python 3.11: StrEnum
    ''' Controls how updates are reviewed and applied. '''

    Silent = 'silent' # Apply parts without review.
    # Aggregate = 'aggregate' # Git-style aggregated diff for all parts
    Partitive = 'partitive' # Interactively review each part.


# pylint: disable=bad-reversed-sequence,unsubscriptable-object
# pylint: disable=unsupported-assignment-operation,unsupported-membership-test
class Reverter(
    metaclass = __.ImmutableStandardDataclass,
    decorators = ( __.standard_dataclass, ),
):
    ''' Backup and restore filesystem state. '''

    originals: dict[ __.Path, str ] = (
        __.dataclass_declare( default_factory = dict ) )
    revisions: list[ __.Path ] = (
        __.dataclass_declare( default_factory = list ) )

    async def save( self, part: _parts.Part, path: __.Path ) -> None:
        ''' Saves original file content if it exists. '''
        from .exceptions import ContentAcquireFailure
        if not path.exists( ): return
        try:
            content = (
                await __.acquire_text_file_async(
                    path, charset = part.charset ) )
        except Exception as exc: raise ContentAcquireFailure( path ) from exc
        self.originals[ path ] = content

    async def restore( self ) -> None:
        ''' Restores files to original contents in reverse order. '''
        # TODO: async parallel fanout
        from .exceptions import ContentUpdateFailure
        for path in reversed( self.revisions ):
            if path in self.originals:
                try:
                    await _update_content_atomic(
                        path, self.originals[ path ] )
                except ContentUpdateFailure:
                    _scribe.exception( f"Failed to restore {path}" )
            else: path.unlink( )
# pylint: enable=bad-reversed-sequence,unsubscriptable-object
# pylint: enable=unsupported-assignment-operation,unsupported-membership-test


# pylint: disable=no-member,not-an-iterable
class Queue(
    metaclass = __.ImmutableStandardDataclass,
    decorators = ( __.standard_dataclass, ),
):
    ''' Manages queued file updates for batch application. '''

    updates: list[ tuple[ _parts.Part, __.Path, str ] ] = (
        __.dataclass_declare( default_factory = list ) )
    reverter: Reverter = (
        __.dataclass_declare( default_factory = Reverter ) )

    def enqueue(
        self, part: _parts.Part, target: __.Path, content: str
    ) -> None:
        ''' Adds a file update to queue. '''
        self.updates.append( ( part, target, content ) )

    async def apply( self ) -> None:
        ''' Applies all queued updates with parallel async fanout. '''
        try:
            await __.gather_async(
                *( self.reverter.save( part, target )
                   for part, target, _ in self.updates ),
                error_message = "Failed to backup files." )
            await __.gather_async(
                *( _update_content_atomic(
                    target, content, charset = part.charset )
                   for part, target, content in self.updates ),
                error_message = "Failed to apply updates." )
        except Exception:
            await self.reverter.restore( )
            raise
        for _, target, _ in self.updates:
            self.reverter.revisions.append( target )
# pylint: enable=no-member,not-an-iterable
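
# Illustrative sketch (editorial addition, not part of updaters.py): the
# backup / apply / rollback pattern which Queue and Reverter implement above.
# The part and its destination path are hypothetical placeholders.
async def _example_batch_apply( some_part: _parts.Part ) -> None:
    ''' Queues a single update and applies it with automatic rollback. '''
    queue = Queue( )
    queue.enqueue( some_part, __.Path( 'docs/readme.md' ), some_part.content )
    # On backup or write failure, apply raises and prior contents are restored.
    await queue.apply( )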

async def update( # pylint: disable=too-many-arguments,too-many-locals
    auxdata: __.Globals,
    parts: __.cabc.Sequence[ _parts.Part ],
    mode: ReviewModes,
    base: __.Absential[ __.Path ] = __.absent,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
    protector: __.Absential[ _fsprotect.Protector ] = __.absent,
) -> None:
    ''' Updates filesystem locations from mimeogram. '''
    if __.is_absent( base ): base = __.Path( )
    if __.is_absent( protector ):
        protector = _fsprotect.Cache.from_configuration( auxdata = auxdata )
    queue = Queue( )
    for part in parts:
        if part.location.startswith( 'mimeogram://' ): continue
        destination = _derive_location( part.location, base = base )
        target = _parts.Target(
            part = part,
            destination = destination,
            protection = protector.verify( destination ) )
        action, content = await update_part(
            auxdata, target, mode = mode, interactor = interactor )
        if _parts.Resolutions.Ignore is action: continue
        queue.enqueue( target.part, target.destination, content )
    await queue.apply( )
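
# Illustrative sketch (editorial addition, not part of updaters.py): a minimal
# non-interactive invocation of update( ). Assumes 'auxdata' and 'parts' come
# from the application's normal bootstrap and mimeogram parsing, respectively.
async def _example_apply_silently(
    auxdata: __.Globals, parts: __.cabc.Sequence[ _parts.Part ]
) -> None:
    ''' Applies every part without interactive review. '''
    await update( auxdata, parts, ReviewModes.Silent )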

async def update_part(
    auxdata: __.Globals,
    target: _parts.Target,
    mode: ReviewModes,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Updates filesystem location from mimeogram part. '''
    content = target.part.content
    if ReviewModes.Partitive is mode:
        return await _interactions.interact( target, interactor = interactor )
    options = auxdata.configuration.get( 'update-parts', { } )
    if target.protection and not options.get( 'disable-protections', False ):
        _scribe.warning(
            f"Skipping protected path: {target.destination} "
            f"Reason: {target.protection.description}" )
        return _parts.Resolutions.Ignore, content
    return _parts.Resolutions.Apply, content


def _derive_location(
    location: __.typx.Annotated[
        str, __.typx.Doc( "Part location (URL or filesystem path)." ) ],
    base: __.typx.Annotated[
        __.Absential[ __.Path ],
        __.typx.Doc(
            "Base path for relative locations. "
            "Defaults to current directory." )
    ] = __.absent,
) -> __.Path:
    ''' Resolves part location to filesystem path. '''
    import os.path as ospath
    from urllib.parse import urlparse
    from .exceptions import LocationInvalidity
    try: url = urlparse( location )
    except Exception as exc: raise LocationInvalidity( location ) from exc
    match url.scheme:
        case '' | 'file': pass
        case _: raise LocationInvalidity( location )
    location_ = __.Path( ospath.expanduser( ospath.expandvars( url.path ) ) )
    if location_.is_absolute( ): return location_
    if not __.is_absent( base ): return ( base / location_ ).resolve( )
    return __.Path( ) / location_
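
# Illustrative sketch (editorial addition, not part of updaters.py): how
# _derive_location resolves part locations, per the code above. Results assume
# a POSIX filesystem with no intervening symlinks.
def _example_resolve_locations( ) -> None:
    ''' Maps example part locations onto filesystem paths. '''
    # Absolute 'file://' URLs and bare absolute paths pass through unchanged.
    assert __.Path( '/etc/hosts' ) == _derive_location( 'file:///etc/hosts' )
    # Relative locations resolve against the supplied base directory.
    assert __.Path( '/project/docs/notes.txt' ) == _derive_location(
        'docs/notes.txt', base = __.Path( '/project' ) )
    # Any other scheme (e.g. 'https') raises LocationInvalidity.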

async def _update_content_atomic(
    location: __.Path,
    content: str,
    charset: str = 'utf-8',
    linesep: _parts.LineSeparators = _parts.LineSeparators.LF
) -> None:
    ''' Updates file content atomically, if possible. '''
    import aiofiles.os as os # pylint: disable=consider-using-from-import
    from aiofiles.tempfile import NamedTemporaryFile
    location.parent.mkdir( parents = True, exist_ok = True )
    content = linesep.nativize( content )
    async with NamedTemporaryFile(
        delete = False,
        dir = location.parent,
        suffix = f"{location.suffix}.tmp",
    ) as stream:
        filename = str( stream.name )
        try:
            await stream.write( content.encode( charset ) )
            await os.replace( filename, str( location ) )
        except Exception as exc:
            from .exceptions import ContentUpdateFailure
            raise ContentUpdateFailure( location ) from exc
        finally:
            if await os.path.exists( filename ):
                await os.remove( filename )
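
# Illustrative sketch (editorial addition, not part of updaters.py): driving
# the atomic writer directly. The output filename is a hypothetical example.
async def _example_write_atomically( ) -> None:
    ''' Writes content through a sibling temporary file and os.replace. '''
    await _update_content_atomic(
        __.Path( 'example-output.txt' ), 'hello world\n',
        charset = 'utf-8', linesep = _parts.LineSeparators.LF )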