Coverage for sources/mimeogram/interactions.py: 53%

90 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-07 04:07 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' User interactions and automations. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import __ 

27from . import interfaces as _interfaces 

28from . import parts as _parts 

29 

30 

async def _display_content( target: _parts.Target, content: str ) -> None:
    ''' Shows content through the display module.

        The file extension of the part location is passed along as suffix,
        with '.txt' as the fallback when the location has no extension.
    '''
    from .display import display_content as _show
    # Extension is forwarded so that the viewer can pick syntax highlighting.
    extension = __.Path( target.part.location ).suffix
    _show( content, suffix = extension or '.txt' )

37 

38 

async def _display_differences(
    target: _parts.Target, revision: str
) -> None:
    ''' Displays differences between content and target file.

        Reads the destination (if it exists) with the charset of the part,
        normalizes its line separators, and shows a unified diff against
        the revision. Prints "No changes" when the diff is empty.

        Raises ``ContentAcquireFailure`` if the destination exists but
        cannot be read.
    '''
    destination = target.destination
    part = target.part
    # Stays absent when there is no destination file, so that the diff is
    # produced against '/dev/null' with no source lines, rather than against
    # a phantom file which consists of a single empty line.
    original: __.Absential[ str ] = __.absent
    if destination.exists( ):
        from .exceptions import ContentAcquireFailure
        try:
            original = (
                await __.acquire_text_file_async(
                    destination, charset = part.charset ) )
        except Exception as exc:
            raise ContentAcquireFailure( destination ) from exc
        original = part.linesep.normalize( original )
    diff = _calculate_differences( part, revision, original )
    if not diff:
        print( "No changes" )
        return
    from .display import display_content as display
    display( '\n'.join( diff ), suffix = '.diff' )

61 

62 

async def _edit_content( target: _parts.Target, content: str ) -> str:
    ''' Passes content to the edit module and returns the edited result.

        The file extension of the destination is passed along as suffix,
        with '.txt' as the fallback when the destination has no extension.
    '''
    from .edit import edit_content as _edit
    # Extension is forwarded so that the editor can pick syntax highlighting.
    extension = __.Path( target.destination ).suffix
    return _edit( content, suffix = extension or '.txt' )

69 

70 

def _prompt_action(
    target: _parts.Target, content: str, protect: bool
) -> str:
    ''' Shows the actions menu and reads a single-key choice.

        Returns the lowercased key, after echoing it to standard output.

        Raises ``UserOperateCancellation`` on end-of-file or keyboard
        interrupt while waiting for the key.
    '''
    from readchar import readkey
    from .exceptions import UserOperateCancellation
    prompt = _produce_actions_menu( target.part, content, protect )
    print( f"\n{prompt} > ", end = '' )
    # No newline was written, so force the prompt out before blocking.
    __.sys.stdout.flush( )
    try: selection = readkey( ).lower( )
    except ( EOFError, KeyboardInterrupt ) as exc:
        print( ) # Add newline to avoid output mangling.
        raise UserOperateCancellation( exc ) from exc
    else:
        print( selection ) # Echo.
        return selection

85 

86 

async def _select_segments( target: _parts.Target, content: str ) -> str:
    ''' Delegates segment selection to the differences module.

        Returns whatever content the selection produces.
    '''
    from .differences import select_segments as _selector
    selection = await _selector( target, content )
    return selection

90 

91 

def _validate_choice(
    target: _parts.Target, choice: str
) -> None:
    ''' Reports an unrecognized choice on standard output.

        The choice is echoed back only if it is printable. The target is
        accepted for interface compatibility but is not used here.
    '''
    if not choice.isprintable( ):
        # Avoid writing unprintable characters to the terminal.
        print( "Invalid choice." )
        return
    print( f"Invalid choice: {choice}" )

98 

99 

class GenericInteractor(
    _interfaces.PartInteractor,
    decorators = ( __.standard_dataclass, ),
):
    ''' Default console-based interaction handler.

        Every step of the interaction loop is a replaceable callable field,
        which defaults to the corresponding module-level implementation.
    '''

    # Reads the action choice for the current content and protection state.
    prompter: __.cabc.Callable[
        [ _parts.Target, str, bool ], str ] = _prompt_action
    # Displays the current content.
    cdisplayer: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, None ] ] = _display_content
    # Displays differences between the current content and the destination.
    ddisplayer: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, None ] ] = _display_differences
    # Edits the current content and returns the replacement.
    editor: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, str ] ] = _edit_content
    # Selects segments and returns the resulting content.
    sselector: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, str ] ] = _select_segments
    # Handles any choice which is not valid in the current state.
    validator: __.cabc.Callable[
        [ _parts.Target, str ], None ] = _validate_choice

    async def __call__(
        self, target: _parts.Target
    ) -> tuple[ _parts.Resolutions, str ]:
        ''' Loops over prompts until the part is applied or ignored.

            Returns the resolution together with the (possibly revised)
            content. Applying, editing, and segment selection are only
            available once protection has been lifted.
        '''
        # TODO? Track revision history.
        # TODO: Use copies of target object with updated content.
        revision = target.part.content
        locked = target.protection.active
        while True:
            choice = self.prompter( target, revision, locked )
            if choice == 'a' and not locked:
                return _parts.Resolutions.Apply, revision
            elif choice == 'd':
                await self.ddisplayer( target, revision )
            elif choice == 'e' and not locked:
                revision = await self.editor( target, revision )
            elif choice == 'i':
                return _parts.Resolutions.Ignore, revision
            elif choice == 'p' and locked:
                locked = False
            elif choice == 's' and not locked:
                revision = await self.sselector( target, revision )
            elif choice == 'v':
                await self.cdisplayer( target, revision )
            else:
                # Includes known keys which the protection state disallows.
                self.validator( target, choice )

144 

145 

async def interact(
    target: _parts.Target,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Performs interaction for part.

        Uses a default-constructed ``GenericInteractor`` when no interactor
        is supplied. Returns the resolution and content from the interactor.
    '''
    handler = (
        GenericInteractor( ) if __.is_absent( interactor ) else interactor )
    return await handler( target )

153 

154 

def _calculate_differences(
    part: _parts.Part,
    revision: str,
    original: __.Absential[ str ] = __.absent,
) -> list[ str ]:
    ''' Generates unified diff between contents.

        With an absent original, the diff is taken from '/dev/null' with no
        source lines; otherwise both sides are labeled with the part
        location. Lines are returned without terminators.
    '''
    from patiencediff import (
        unified_diff, PatienceSequenceMatcher ) # pyright: ignore
    location = part.location
    before_lines: list[ str ]
    if __.is_absent( original ):
        before_lines, before_name = [ ], '/dev/null'
    else:
        before_lines, before_name = original.split( '\n' ), location
    after_lines = revision.split( '\n' )
    hunks = unified_diff( # pyright: ignore
        before_lines, after_lines,
        fromfile = before_name, tofile = location,
        lineterm = '', sequencematcher = PatienceSequenceMatcher )
    return list( hunks )

173 

174 

def _produce_actions_menu(
    part: _parts.Part, content: str, protect: bool
) -> str:
    ''' Renders the information line and the actions prompt for a part.

        The information line shows the part location, the content size,
        and a protection marker. The offered actions depend on whether
        protection is active.
    '''
    # NOTE(review): len counts characters of the string, not encoded bytes;
    #               confirm that the 'B' and 'K' labels are intended.
    count = len( content )
    if count < 1024: # noqa: PLR2004
        magnitude = f"{count}B"
    else:
        magnitude = "{:.1f}K".format( count / 1024 )
    marker = "[PROTECTED]" if protect else ""
    header = f"{part.location} [{magnitude}] {marker}"
    if protect:
        options = "Action? (d)iff, (i)gnore, (p)ermit changes, (v)iew"
    else:
        options = (
            "Action? (a)pply, (d)iff, (e)dit, (i)gnore, (s)elect hunks, (v)iew" )
    return f"{header}\n{options}"