Coverage for sources/mimeogram/interactions.py: 52%
89 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:15 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:15 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' User interactions and automations. '''
24from . import __
25from . import interfaces as _interfaces
26from . import parts as _parts
29async def _display_content( target: _parts.Target, content: str ) -> None:
30 ''' Displays content in system pager. '''
31 from .display import display_content
32 # Suffix from location for proper syntax highlighting.
33 suffix = __.Path( target.part.location ).suffix or '.txt'
34 display_content( content, suffix = suffix )
37async def _display_differences(
38 target: _parts.Target, revision: str
39) -> None:
40 ''' Displays differences between content and target file. '''
41 original = ''
42 destination = target.destination
43 part = target.part
44 if destination.exists( ):
45 from .exceptions import ContentAcquireFailure
46 try:
47 original = (
48 await __.acquire_text_file_async(
49 destination, charset = part.charset ) )
50 except Exception as exc:
51 raise ContentAcquireFailure( destination ) from exc
52 original = part.linesep.normalize( original )
53 diff = _calculate_differences( part, revision, original )
54 if not diff:
55 print( "No changes" )
56 return
57 from .display import display_content as display
58 display( '\n'.join( diff ), suffix = '.diff' )
61async def _edit_content( target: _parts.Target, content: str ) -> str:
62 ''' Edits content in system editor. '''
63 from .edit import edit_content
64 # Suffix from location for proper syntax highlighting.
65 suffix = __.Path( target.destination ).suffix or '.txt'
66 return edit_content( content, suffix = suffix )
69def _prompt_action(
70 target: _parts.Target, content: str, protect: bool
71) -> str:
72 from readchar import readkey
73 from .exceptions import UserOperateCancellation
74 menu = _produce_actions_menu( target.part, content, protect )
75 print( f"\n{menu} > ", end = '' )
76 __.sys.stdout.flush( )
77 try: choice = readkey( ).lower( )
78 except ( EOFError, KeyboardInterrupt ) as exc:
79 print( ) # Add newline to avoid output mangling.
80 raise UserOperateCancellation( exc ) from exc
81 print( choice ) # Echo.
82 return choice
85async def _select_segments( target: _parts.Target, content: str ) -> str:
86 from .differences import select_segments
87 return await select_segments( target, content )
90def _validate_choice(
91 target: _parts.Target, choice: str
92) -> None:
93 if choice.isprintable( ):
94 print( f"Invalid choice: {choice}" )
95 else: print( "Invalid choice." )
98class GenericInteractor( _interfaces.PartInteractor ):
99 ''' Default console-based interaction handler. '''
101 prompter: __.cabc.Callable[
102 [ _parts.Target, str, bool ], str ] = _prompt_action
103 cdisplayer: __.cabc.Callable[
104 [ _parts.Target, str ],
105 __.cabc.Coroutine[ None, None, None ] ] = _display_content
106 ddisplayer: __.cabc.Callable[
107 [ _parts.Target, str ],
108 __.cabc.Coroutine[ None, None, None ] ] = _display_differences
109 editor: __.cabc.Callable[
110 [ _parts.Target, str ],
111 __.cabc.Coroutine[ None, None, str ] ] = _edit_content
112 sselector: __.cabc.Callable[
113 [ _parts.Target, str ],
114 __.cabc.Coroutine[ None, None, str ] ] = _select_segments
115 validator: __.cabc.Callable[
116 [ _parts.Target, str ], None ] = _validate_choice
118 async def __call__(
119 self, target: _parts.Target
120 ) -> tuple[ _parts.Resolutions, str ]:
121 # TODO? Track revision history.
122 # TODO: Use copies of target object with updated content.
123 content = target.part.content
124 protect = target.protection.active
125 while True:
126 choice = self.prompter( target, content, protect )
127 match choice:
128 case 'a' if not protect:
129 return _parts.Resolutions.Apply, content
130 case 'd': await self.ddisplayer( target, content )
131 case 'e' if not protect:
132 content = await self.editor( target, content )
133 case 'i': return _parts.Resolutions.Ignore, content
134 case 'p' if protect: protect = False
135 case 's' if not protect:
136 content = await self.sselector( target, content )
137 case 'v': await self.cdisplayer( target, content )
138 case _: self.validator( target, choice )
141async def interact(
142 target: _parts.Target,
143 interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
144) -> tuple[ _parts.Resolutions, str ]:
145 ''' Performs interaction for part. '''
146 if __.is_absent( interactor ): interactor = GenericInteractor( )
147 return await interactor( target )
150def _calculate_differences(
151 part: _parts.Part,
152 revision: str,
153 original: __.Absential[ str ] = __.absent,
154) -> list[ str ]:
155 ''' Generates unified diff between contents. '''
156 from patiencediff import (
157 unified_diff, PatienceSequenceMatcher ) # pyright: ignore
158 from_lines = (
159 original.split( '\n' ) if not __.is_absent( original ) else [ ] )
160 to_lines = revision.split( '\n' )
161 from_file = (
162 part.location if not __.is_absent( original ) else '/dev/null' )
163 to_file = part.location
164 return list( unified_diff( # pyright: ignore
165 from_lines, to_lines,
166 fromfile = from_file, tofile = to_file,
167 lineterm = '', sequencematcher = PatienceSequenceMatcher ) )
170def _produce_actions_menu(
171 part: _parts.Part, content: str, protect: bool
172) -> str:
173 size = len( content )
174 size_str = (
175 "{:.1f}K".format( size / 1024 )
176 if 1024 <= size # noqa: PLR2004
177 else f"{size}B" )
178 status = "[PROTECTED]" if protect else ""
179 info = f"{part.location} [{size_str}] {status}"
180 if protect:
181 return (
182 f"{info}\n"
183 "Action? (d)iff, (i)gnore, (p)ermit changes, (v)iew" )
184 return (
185 f"{info}\n"
186 "Action? (a)pply, (d)iff, (e)dit, (i)gnore, (s)elect hunks, (v)iew" )