Coverage for sources/mimeogram/updaters.py: 93%
102 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-02 23:41 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-02 23:41 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' File content updates. '''
24from __future__ import annotations
26from . import __
27from . import fsprotect as _fsprotect
28from . import interactions as _interactions
29from . import interfaces as _interfaces
30from . import parts as _parts
33_scribe = __.produce_scribe( __name__ )
36class ReviewModes( __.enum.Enum ): # TODO: Python 3.11: StrEnum
37 ''' Controls how updates are reviewed and applied. '''
39 Silent = 'silent' # Apply parts without review.
40 # Aggregate = 'aggregate' # Git-style aggregated diff for all parts
41 Partitive = 'partitive' # Interactively review each part.
44# pylint: disable=bad-reversed-sequence,unsubscriptable-object
45# pylint: disable=unsupported-assignment-operation,unsupported-membership-test
46class Reverter(
47 metaclass = __.ImmutableStandardDataclass,
48 decorators = ( __.standard_dataclass, ),
49):
50 ''' Backup and restore filesystem state. '''
52 originals: dict[ __.Path, str ] = (
53 __.dataclass_declare( default_factory = dict ) )
54 revisions: list[ __.Path ] = (
55 __.dataclass_declare( default_factory = list ) )
57 async def save( self, part: _parts.Part, path: __.Path ) -> None:
58 ''' Saves original file content if it exists. '''
59 from .exceptions import ContentAcquireFailure
60 if not path.exists( ): return
61 try:
62 content = (
63 await __.acquire_text_file_async(
64 path, charset = part.charset ) )
65 except Exception as exc: raise ContentAcquireFailure( path ) from exc
66 self.originals[ path ] = content
68 async def restore( self ) -> None:
69 ''' Restores files to original contents in reverse order. '''
70 # TODO: async parallel fanout
71 from .exceptions import ContentUpdateFailure
72 for path in reversed( self.revisions ):
73 if path in self.originals:
74 try:
75 await _update_content_atomic(
76 path, self.originals[ path ] )
77 except ContentUpdateFailure:
78 _scribe.exception( "Failed to restore {path}" )
79 else: path.unlink( )
80# pylint: enable=bad-reversed-sequence,unsubscriptable-object
81# pylint: enable=unsupported-assignment-operation,unsupported-membership-test
84# pylint: disable=no-member,not-an-iterable
85class Queue(
86 metaclass = __.ImmutableStandardDataclass,
87 decorators = ( __.standard_dataclass, ),
88):
89 ''' Manages queued file updates for batch application. '''
91 updates: list[ tuple[ _parts.Part, __.Path, str ] ] = (
92 __.dataclass_declare( default_factory = list ) )
93 reverter: Reverter = (
94 __.dataclass_declare( default_factory = Reverter ) )
96 def enqueue(
97 self, part: _parts.Part, target: __.Path, content: str
98 ) -> None:
99 ''' Adds a file update to queue. '''
100 self.updates.append( ( part, target, content ) )
102 async def apply( self ) -> None:
103 ''' Applies all queued updates with parallel async fanout. '''
104 try:
105 await __.gather_async(
106 *( self.reverter.save( part, target )
107 for part, target, _ in self.updates ),
108 error_message = "Failed to backup files." )
109 await __.gather_async(
110 *( _update_content_atomic(
111 target, content, charset = part.charset )
112 for part, target, content in self.updates ),
113 error_message = "Failed to apply updates." )
114 except Exception:
115 await self.reverter.restore( )
116 raise
117 for _, target, _ in self.updates:
118 self.reverter.revisions.append( target )
119# pylint: enable=no-member,not-an-iterable
122async def update( # pylint: disable=too-many-arguments,too-many-locals
123 auxdata: __.Globals,
124 parts: __.cabc.Sequence[ _parts.Part ],
125 mode: ReviewModes,
126 base: __.Absential[ __.Path ] = __.absent,
127 interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
128 protector: __.Absential[ _fsprotect.Protector ] = __.absent,
129) -> None:
130 ''' Updates filesystem locations from mimeogram. '''
131 if __.is_absent( base ): base = __.Path( )
132 if __.is_absent( protector ):
133 protector = _fsprotect.Cache.from_configuration( auxdata = auxdata )
134 queue = Queue( ) # pyright: ignore
135 for part in parts:
136 if part.location.startswith( 'mimeogram://' ): continue
137 destination = _derive_location( part.location, base = base )
138 target = _parts.Target(
139 part = part,
140 destination = destination,
141 protection = protector.verify( destination ) )
142 action, content = await update_part(
143 auxdata, target, mode = mode, interactor = interactor )
144 if _parts.Resolutions.Ignore is action: continue
145 queue.enqueue( target.part, target.destination, content )
146 await queue.apply( )
149async def update_part(
150 auxdata: __.Globals,
151 target: _parts.Target,
152 mode: ReviewModes,
153 interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
154) -> tuple[ _parts.Resolutions, str ]:
155 ''' Updates filesystem location from mimeogram part. '''
156 content = target.part.content
157 if ReviewModes.Partitive is mode:
158 return await _interactions.interact( target, interactor = interactor )
159 options = auxdata.configuration.get( 'update-parts', { } )
160 if target.protection and not options.get( 'disable-protections', False ):
161 _scribe.warning(
162 f"Skipping protected path: {target.destination} "
163 f"Reason: {target.protection.description}" )
164 return _parts.Resolutions.Ignore, content
165 return _parts.Resolutions.Apply, content
168def _derive_location(
169 location: __.typx.Annotated[
170 str, __.typx.Doc( "Part location (URL or filesystem path)." ) ],
171 base: __.typx.Annotated[
172 __.Absential[ __.Path ],
173 __.typx.Doc(
174 "Base path for relative locations. "
175 "Defaults to current directory." )
176 ] = __.absent,
177) -> __.Path:
178 ''' Resolves part location to filesystem path. '''
179 import os.path as ospath
180 from urllib.parse import urlparse
181 from .exceptions import LocationInvalidity
182 try: url = urlparse( location )
183 except Exception as exc: raise LocationInvalidity( location ) from exc
184 path = __.Path( location )
185 scheme = 'file' if path.drive else url.scheme
186 match scheme:
187 case '' | 'file': pass
188 case _: raise LocationInvalidity( location )
189 location_ = __.Path( ospath.expanduser( ospath.expandvars( url.path ) ) )
190 if location_.is_absolute( ): return location_
191 if not __.is_absent( base ): return ( base / location_ ).resolve( )
192 return __.Path( ) / location_
195async def _update_content_atomic(
196 location: __.Path,
197 content: str,
198 charset: str = 'utf-8',
199 linesep: _parts.LineSeparators = _parts.LineSeparators.LF
200) -> None:
201 ''' Updates file content atomically, if possible. '''
202 import aiofiles.os as os # pylint: disable=consider-using-from-import
203 from aiofiles.tempfile import NamedTemporaryFile
204 location.parent.mkdir( parents = True, exist_ok = True )
205 content = linesep.nativize( content )
206 has_error = False
207 async with NamedTemporaryFile(
208 delete = False,
209 dir = location.parent,
210 suffix = f"{location.suffix}.tmp",
211 ) as stream:
212 filename = str( stream.name )
213 try: await stream.write( content.encode( charset ) )
214 except Exception: # pylint: disable=broad-exception-caught
215 has_error = True
216 # Windows: Replace must happen after file handle is closed.
217 if not has_error: 217 ↛ 221line 217 didn't jump to line 221 because the condition on line 217 was always true
218 try: await os.replace( filename, str( location ) )
219 except Exception: # pylint: disable=broad-exception-caught
220 has_error = True
221 if await os.path.exists( filename ):
222 try: await os.remove( filename )
223 except Exception: # pylint: disable=broad-exception-caught
224 _scribe.warning( f"Could not remove temporary file: {filename}" )
225 if has_error:
226 from .exceptions import ContentUpdateFailure
227 raise ContentUpdateFailure( location )