Coverage for sources/mimeogram/interactions.py: 53%
90 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-16 02:11 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-16 02:11 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' User interactions and automations. '''
24from __future__ import annotations
26from . import __
27from . import interfaces as _interfaces
28from . import parts as _parts
31async def _display_content( target: _parts.Target, content: str ) -> None:
32 ''' Displays content in system pager. '''
33 from .display import display_content
34 # Suffix from location for proper syntax highlighting.
35 suffix = __.Path( target.part.location ).suffix or '.txt'
36 display_content( content, suffix = suffix )
39async def _display_differences(
40 target: _parts.Target, revision: str
41) -> None:
42 ''' Displays differences between content and target file. '''
43 original = ''
44 destination = target.destination
45 part = target.part
46 if destination.exists( ):
47 from .exceptions import ContentAcquireFailure
48 try:
49 original = (
50 await __.acquire_text_file_async(
51 destination, charset = part.charset ) )
52 except Exception as exc:
53 raise ContentAcquireFailure( destination ) from exc
54 original = part.linesep.normalize( original )
55 diff = _calculate_differences( part, revision, original )
56 if not diff:
57 print( "No changes" )
58 return
59 from .display import display_content as display
60 display( '\n'.join( diff ), suffix = '.diff' )
63async def _edit_content( target: _parts.Target, content: str ) -> str:
64 ''' Edits content in system editor. '''
65 from .edit import edit_content
66 # Suffix from location for proper syntax highlighting.
67 suffix = __.Path( target.destination ).suffix or '.txt'
68 return edit_content( content, suffix = suffix )
71def _prompt_action(
72 target: _parts.Target, content: str, protect: bool
73) -> str:
74 from readchar import readkey
75 from .exceptions import UserOperateCancellation
76 menu = _produce_actions_menu( target.part, content, protect )
77 print( f"\n{menu} > ", end = '' )
78 __.sys.stdout.flush( )
79 try: choice = readkey( ).lower( )
80 except ( EOFError, KeyboardInterrupt ) as exc:
81 print( ) # Add newline to avoid output mangling.
82 raise UserOperateCancellation( exc ) from exc
83 print( choice ) # Echo.
84 return choice
87async def _select_segments( target: _parts.Target, content: str ) -> str:
88 from .differences import select_segments
89 return await select_segments( target, content )
92def _validate_choice(
93 target: _parts.Target, choice: str # pylint: disable=unused-argument
94) -> None:
95 if choice.isprintable( ):
96 print( f"Invalid choice: {choice}" )
97 else: print( "Invalid choice." )
100class GenericInteractor(
101 _interfaces.PartInteractor,
102 decorators = ( __.standard_dataclass, ),
103):
104 ''' Default console-based interaction handler. '''
106 prompter: __.cabc.Callable[
107 [ _parts.Target, str, bool ], str ] = _prompt_action
108 cdisplayer: __.cabc.Callable[
109 [ _parts.Target, str ],
110 __.cabc.Coroutine[ None, None, None ] ] = _display_content
111 ddisplayer: __.cabc.Callable[
112 [ _parts.Target, str ],
113 __.cabc.Coroutine[ None, None, None ] ] = _display_differences
114 editor: __.cabc.Callable[
115 [ _parts.Target, str ],
116 __.cabc.Coroutine[ None, None, str ] ] = _edit_content
117 sselector: __.cabc.Callable[
118 [ _parts.Target, str ],
119 __.cabc.Coroutine[ None, None, str ] ] = _select_segments
120 validator: __.cabc.Callable[
121 [ _parts.Target, str ], None ] = _validate_choice
123 async def __call__(
124 self, target: _parts.Target
125 ) -> tuple[ _parts.Resolutions, str ]:
126 # TODO? Track revision history.
127 # TODO: Use copies of target object with updated content.
128 content = target.part.content
129 protect = target.protection.active
130 while True:
131 # pylint: disable=too-many-function-args
132 choice = self.prompter( target, content, protect )
133 match choice:
134 case 'a' if not protect:
135 return _parts.Resolutions.Apply, content
136 case 'd': await self.ddisplayer( target, content )
137 case 'e' if not protect:
138 content = await self.editor( target, content )
139 case 'i': return _parts.Resolutions.Ignore, content
140 case 'p' if protect: protect = False
141 case 's' if not protect:
142 content = await self.sselector( target, content )
143 case 'v': await self.cdisplayer( target, content )
144 case _: self.validator( target, choice )
145 # pylint: enable=too-many-function-args
148async def interact(
149 target: _parts.Target,
150 interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
151) -> tuple[ _parts.Resolutions, str ]:
152 ''' Performs interaction for part. '''
153 if __.is_absent( interactor ): interactor = GenericInteractor( )
154 return await interactor( target )
157def _calculate_differences(
158 part: _parts.Part,
159 revision: str,
160 original: __.Absential[ str ] = __.absent,
161) -> list[ str ]:
162 ''' Generates unified diff between contents. '''
163 from patiencediff import (
164 unified_diff, PatienceSequenceMatcher ) # pyright: ignore
165 from_lines = (
166 original.split( '\n' ) if not __.is_absent( original ) else [ ] )
167 to_lines = revision.split( '\n' )
168 from_file = (
169 part.location if not __.is_absent( original ) else '/dev/null' )
170 to_file = part.location
171 return list( unified_diff( # pyright: ignore
172 from_lines, to_lines,
173 fromfile = from_file, tofile = to_file,
174 lineterm = '', sequencematcher = PatienceSequenceMatcher ) )
177def _produce_actions_menu(
178 part: _parts.Part, content: str, protect: bool
179) -> str:
180 size = len( content )
181 size_str = (
182 "{:.1f}K".format( size / 1024 )
183 if 1024 <= size # pylint: disable=magic-value-comparison
184 else f"{size}B" )
185 status = "[PROTECTED]" if protect else ""
186 info = f"{part.location} [{size_str}] {status}"
187 if protect:
188 return (
189 f"{info}\n"
190 "Action? (d)iff, (i)gnore, (p)ermit changes, (v)iew" )
191 return (
192 f"{info}\n"
193 "Action? (a)pply, (d)iff, (e)dit, (i)gnore, (s)elect hunks, (v)iew" )