Coverage for sources/mimeogram/updaters.py: 93%
101 statements
coverage.py v7.9.2, created at 2025-07-05 19:46 +0000
# vim: set filetype=python fileencoding=utf-8:
# -*- coding: utf-8 -*-

#============================================================================#
#                                                                            #
#  Licensed under the Apache License, Version 2.0 (the "License");           #
#  you may not use this file except in compliance with the License.          #
#  You may obtain a copy of the License at                                   #
#                                                                            #
#      http://www.apache.org/licenses/LICENSE-2.0                            #
#                                                                            #
#  Unless required by applicable law or agreed to in writing, software       #
#  distributed under the License is distributed on an "AS IS" BASIS,         #
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  #
#  See the License for the specific language governing permissions and       #
#  limitations under the License.                                            #
#                                                                            #
#============================================================================#


''' File content updates. '''


from . import __
from . import fsprotect as _fsprotect
from . import interactions as _interactions
from . import interfaces as _interfaces
from . import parts as _parts


_scribe = __.produce_scribe( __name__ )


class ReviewModes( __.enum.Enum ): # TODO: Python 3.11: StrEnum
    ''' Controls how updates are reviewed and applied. '''

    Silent = 'silent' # Apply parts without review.
    # Aggregate = 'aggregate' # Git-style aggregated diff for all parts
    Partitive = 'partitive' # Interactively review each part.


class Reverter( __.immut.DataclassObject ):
    ''' Backup and restore filesystem state. '''

    originals: dict[ __.Path, str ] = (
        __.dcls.field( default_factory = dict[ __.Path, str ] ) )
    revisions: list[ __.Path ] = (
        __.dcls.field( default_factory = list[ __.Path ] ) )

    async def save( self, part: _parts.Part, path: __.Path ) -> None:
        ''' Saves original file content if it exists. '''
        from .exceptions import ContentAcquireFailure
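        # Nothing to back up when the target does not yet exist.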
        if not path.exists( ): return
        try:
            content = (
                await __.acquire_text_file_async(
                    path, charset = part.charset ) )
        except Exception as exc: raise ContentAcquireFailure( path ) from exc
        self.originals[ path ] = content

    async def restore( self ) -> None:
        ''' Restores files to original contents in reverse order. '''
        # TODO: async parallel fanout
        from .exceptions import ContentUpdateFailure
        for path in reversed( self.revisions ):
            if path in self.originals:
                try:
                    await _update_content_atomic(
                        path, self.originals[ path ] )
                except ContentUpdateFailure:
                    _scribe.exception( f"Failed to restore {path}" )
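            # No saved original means the file was newly created; remove it.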
            else: path.unlink( )


class Queue( __.immut.DataclassObject ):
    ''' Manages queued file updates for batch application. '''

    updates: list[ tuple[ _parts.Part, __.Path, str ] ] = (
        __.dcls.field(
            default_factory = list[ tuple[ _parts.Part, __.Path, str ] ] ) )
    reverter: Reverter = ( __.dcls.field( default_factory = Reverter ) )

    def enqueue(
        self, part: _parts.Part, target: __.Path, content: str
    ) -> None:
        ''' Adds a file update to queue. '''
        self.updates.append( ( part, target, content ) )

    async def apply( self ) -> None:
        ''' Applies all queued updates with parallel async fanout. '''
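        # Back up originals before writing; on any failure, attempt
        # restoration and re-raise.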
        try:
            await __.gather_async(
                *( self.reverter.save( part, target )
                   for part, target, _ in self.updates ),
                error_message = "Failed to backup files." )
            await __.gather_async(
                *( _update_content_atomic(
                    target, content, charset = part.charset )
                   for part, target, content in self.updates ),
                error_message = "Failed to apply updates." )
        except Exception:
            await self.reverter.restore( )
            raise
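        # Record written targets so a later restore can revert them.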
        for _, target, _ in self.updates:
            self.reverter.revisions.append( target )


async def update( # noqa: PLR0913
    auxdata: __.Globals,
    parts: __.cabc.Sequence[ _parts.Part ],
    mode: ReviewModes,
    base: __.Absential[ __.Path ] = __.absent,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
    protector: __.Absential[ _fsprotect.Protector ] = __.absent,
) -> None:
    ''' Updates filesystem locations from mimeogram. '''
    if __.is_absent( base ): base = __.Path( )
    if __.is_absent( protector ):
        protector = _fsprotect.Cache.from_configuration( auxdata = auxdata )
    queue = Queue( ) # pyright: ignore
    for part in parts:
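        # Parts addressed via the mimeogram:// scheme have no filesystem
        # destination; skip them.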
        if part.location.startswith( 'mimeogram://' ): continue
        destination = _derive_location( part.location, base = base )
        target = _parts.Target(
            part = part,
            destination = destination,
            protection = protector.verify( destination ) )
        action, content = await update_part(
            auxdata, target, mode = mode, interactor = interactor )
        if _parts.Resolutions.Ignore is action: continue
        queue.enqueue( target.part, target.destination, content )
    await queue.apply( )


async def update_part(
    auxdata: __.Globals,
    target: _parts.Target,
    mode: ReviewModes,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Updates filesystem location from mimeogram part. '''
    content = target.part.content
    if ReviewModes.Partitive is mode:
        return await _interactions.interact( target, interactor = interactor )
    options = auxdata.configuration.get( 'update-parts', { } )
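    # Honor filesystem protections unless disabled in configuration.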
    if target.protection and not options.get( 'disable-protections', False ):
        _scribe.warning(
            f"Skipping protected path: {target.destination} "
            f"Reason: {target.protection.description}" )
        return _parts.Resolutions.Ignore, content
    return _parts.Resolutions.Apply, content


def _derive_location(
    location: __.typx.Annotated[
        str, __.typx.Doc( "Part location (URL or filesystem path)." ) ],
    base: __.typx.Annotated[
        __.Absential[ __.Path ],
        __.typx.Doc(
            "Base path for relative locations. "
            "Defaults to current directory." )
    ] = __.absent,
) -> __.Path:
    ''' Resolves part location to filesystem path. '''
    import os.path as ospath
    from urllib.parse import urlparse
    from .exceptions import LocationInvalidity
    try: url = urlparse( location )
    except Exception as exc: raise LocationInvalidity( location ) from exc
    path = __.Path( location )
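    # A Windows drive letter (e.g. 'C:') would otherwise be parsed as a URL
    # scheme; treat drive-prefixed paths as local files.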
    scheme = 'file' if path.drive else url.scheme
    match scheme:
        case '' | 'file': pass
        case _: raise LocationInvalidity( location )
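    # Expand '~' and environment variables, then resolve relative locations
    # against the base path.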
    location_ = __.Path( ospath.expanduser( ospath.expandvars( url.path ) ) )
    if location_.is_absolute( ): return location_
    if not __.is_absent( base ): return ( base / location_ ).resolve( )
    return __.Path( ) / location_


async def _update_content_atomic(
    location: __.Path,
    content: str,
    charset: str = 'utf-8',
    linesep: _parts.LineSeparators = _parts.LineSeparators.LF
) -> None:
    ''' Updates file content atomically, if possible. '''
    import aiofiles.os as os # noqa: PLR0402
    from aiofiles.tempfile import NamedTemporaryFile # pyright: ignore
    location.parent.mkdir( parents = True, exist_ok = True )
    content = linesep.nativize( content )
    has_error = False
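    # Write to a temporary file in the target directory, then replace the
    # target, so readers never observe a partially written file.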
    async with NamedTemporaryFile(
        delete = False,
        dir = location.parent,
        suffix = f"{location.suffix}.tmp",
    ) as stream:
        filename = str( stream.name )
        try: await stream.write( content.encode( charset ) )
        except Exception:
            has_error = True
    # Windows: Replace must happen after file handle is closed.
    if not has_error:
        try: await os.replace( filename, str( location ) )
        except Exception:
            has_error = True
    if await os.path.exists( filename ):
        try: await os.remove( filename )
        except Exception:
            _scribe.warning( f"Could not remove temporary file: {filename}" )
    if has_error:
        from .exceptions import ContentUpdateFailure
        raise ContentUpdateFailure( location )