Coverage for sources/mimeogram/interactions.py: 53%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-03 00:13 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' User interactions and automations. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import __ 

27from . import interfaces as _interfaces 

28from . import parts as _parts 

29 

30 

async def _display_content( target: _parts.Target, content: str ) -> None:
    ''' Shows content through the display helper.

        The file suffix of the part location is forwarded to the helper
        (the original comment says this drives syntax highlighting);
        '.txt' is used when the location has no suffix.
    '''
    from .display import display_content as _show
    extension = __.Path( target.part.location ).suffix
    if not extension: extension = '.txt'
    _show( content, suffix = extension )

37 

38 

async def _display_differences(
    target: _parts.Target, revision: str
) -> None:
    ''' Shows unified diff between destination file and revision.

        If the destination exists, its text is read with the charset of
        the part and passed through the line separator normalizer of the
        part before comparison. Otherwise, the revision is compared
        against an empty string.

        Prints "No changes" instead of opening the display helper when
        the diff is empty.

        Raises ``ContentAcquireFailure`` if the destination exists but
        reading it fails for any reason.
    '''
    current = ''
    destination = target.destination
    part = target.part
    if destination.exists( ):
        from .exceptions import ContentAcquireFailure
        try:
            text = await __.acquire_text_file_async(
                destination, charset = part.charset )
        except Exception as exc:
            raise ContentAcquireFailure( destination ) from exc
        current = part.linesep.normalize( text )
    # NOTE(review): A missing destination is compared as '' rather than
    #               as an absent original, so the '/dev/null' labeling in
    #               _calculate_differences is not reached from here.
    #               Confirm that this is intended.
    lines = _calculate_differences( part, revision, current )
    if lines:
        from .display import display_content as display
        display( '\n'.join( lines ), suffix = '.diff' )
        return
    print( "No changes" )

61 

62 

async def _edit_content( target: _parts.Target, content: str ) -> str:
    ''' Passes content to the edit helper and returns its result.

        The file suffix of the target destination is forwarded to the
        helper (the original comment says this drives syntax
        highlighting); '.txt' is used when the destination has no suffix.
    '''
    from .edit import edit_content as _edit
    extension = __.Path( target.destination ).suffix
    if not extension: extension = '.txt'
    return _edit( content, suffix = extension )

69 

70 

def _prompt_action(
    target: _parts.Target, content: str, protect: bool
) -> str:
    ''' Prints actions menu and reads one key as the choice.

        The key is lowercased and echoed back on standard output before
        being returned.

        Raises ``UserOperateCancellation`` if reading the key raises
        ``EOFError`` or ``KeyboardInterrupt``.
    '''
    from readchar import readkey
    from .exceptions import UserOperateCancellation
    menu = _produce_actions_menu( target.part, content, protect )
    print( f"\n{menu} > ", end = '' )
    # Prompt has no trailing newline; flush so it shows before the read.
    __.sys.stdout.flush( )
    try:
        selection = readkey( ).lower( )
    except ( EOFError, KeyboardInterrupt ) as exc:
        print( ) # Add newline to avoid output mangling.
        raise UserOperateCancellation( exc ) from exc
    print( selection ) # Echo.
    return selection

85 

86 

async def _select_segments( target: _parts.Target, content: str ) -> str:
    ''' Delegates to ``differences.select_segments``; returns its result. '''
    from .differences import select_segments as _select
    selection = await _select( target, content )
    return selection

90 

91 

def _validate_choice(
    target: _parts.Target, choice: str # pylint: disable=unused-argument
) -> None:
    ''' Reports an unrecognized choice on standard output.

        The choice itself is included in the message only when it is
        printable, so that control characters are not written out.
    '''
    message = (
        f"Invalid choice: {choice}" if choice.isprintable( )
        else "Invalid choice." )
    print( message )

98 

99 

class GenericInteractor(
    _interfaces.PartInteractor,
    decorators = ( __.standard_dataclass, ),
):
    ''' Default console-based interaction handler.

        Each step of the interaction is a replaceable callable field, with
        the module-level helper as its default.
    '''

    # Produces the menu choice from target, current content, protect flag.
    prompter: __.cabc.Callable[
        [ _parts.Target, str, bool ], str ] = _prompt_action
    # Displays current content.
    cdisplayer: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, None ] ] = _display_content
    # Displays differences for current content.
    ddisplayer: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, None ] ] = _display_differences
    # Returns edited replacement for current content.
    editor: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, str ] ] = _edit_content
    # Returns replacement content from segment selection.
    sselector: __.cabc.Callable[
        [ _parts.Target, str ],
        __.cabc.Coroutine[ None, None, str ] ] = _select_segments
    # Handles any choice which is not acted upon.
    validator: __.cabc.Callable[
        [ _parts.Target, str ], None ] = _validate_choice

    async def __call__(
        self, target: _parts.Target
    ) -> tuple[ _parts.Resolutions, str ]:
        ''' Prompts in a loop until the part is applied or ignored.

            Returns the resolution together with the current content,
            which may have been replaced by the editor or the segment
            selector along the way.

            While protection is active, apply, edit, and select are not
            acted upon; they go to the validator like any other
            unrecognized choice. Permit lifts protection for the
            remainder of this call only.
        '''
        # TODO? Track revision history.
        # TODO: Use copies of target object with updated content.
        content = target.part.content
        protect = target.protection.active
        while True:
            # pylint: disable=too-many-function-args
            choice = self.prompter( target, content, protect )
            # Terminal choices first; everything else loops back to prompt.
            if choice == 'a' and not protect:
                return _parts.Resolutions.Apply, content
            if choice == 'i':
                return _parts.Resolutions.Ignore, content
            if choice == 'd':
                await self.ddisplayer( target, content )
            elif choice == 'v':
                await self.cdisplayer( target, content )
            elif choice == 'p' and protect:
                protect = False
            elif choice == 'e' and not protect:
                content = await self.editor( target, content )
            elif choice == 's' and not protect:
                content = await self.sselector( target, content )
            else:
                self.validator( target, choice )
            # pylint: enable=too-many-function-args

146 

147 

async def interact(
    target: _parts.Target,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Performs interaction for part.

        Uses a fresh ``GenericInteractor`` when no interactor is supplied
        and returns whatever the interactor returns for the target.
    '''
    handler = (
        GenericInteractor( ) if __.is_absent( interactor ) else interactor )
    outcome = await handler( target )
    return outcome

155 

156 

def _calculate_differences(
    part: _parts.Part,
    revision: str,
    original: __.Absential[ str ] = __.absent,
) -> list[ str ]:
    ''' Generates unified diff lines from original to revision.

        Both texts are split on newline. An absent original contributes
        no lines and is labeled '/dev/null'; otherwise, both sides are
        labeled with the part location. Lines are produced without
        terminators, using the patience sequence matcher.
    '''
    from patiencediff import (
        unified_diff, PatienceSequenceMatcher ) # pyright: ignore
    if __.is_absent( original ):
        before, before_label = [ ], '/dev/null'
    else:
        before, before_label = original.split( '\n' ), part.location
    after = revision.split( '\n' )
    hunks = unified_diff( # pyright: ignore
        before, after,
        fromfile = before_label, tofile = part.location,
        lineterm = '', sequencematcher = PatienceSequenceMatcher )
    return list( hunks )

175 

176 

def _produce_actions_menu(
    part: _parts.Part, content: str, protect: bool
) -> str:
    ''' Renders two-line menu: part information, then available actions.

        The information line holds the part location, the content size,
        and a '[PROTECTED]' marker when protect is set. Size is
        ``len( content )``, shown with a 'B' suffix below 1024 and as a
        one-decimal multiple of 1024 with a 'K' suffix otherwise.

        With protect set, only diff, ignore, permit, and view are
        offered; otherwise apply, diff, edit, ignore, select, and view.
    '''
    length = len( content )
    if length < 1024: # pylint: disable=magic-value-comparison
        size_str = f"{length}B"
    else:
        size_str = "{:.1f}K".format( length / 1024 )
    if protect:
        status = "[PROTECTED]"
        actions = "Action? (d)iff, (i)gnore, (p)ermit changes, (v)iew"
    else:
        status = ""
        actions = (
            "Action? (a)pply, (d)iff, (e)dit, (i)gnore, "
            "(s)elect hunks, (v)iew" )
    info = f"{part.location} [{size_str}] {status}"
    return f"{info}\n" + actions

193 "Action? (a)pply, (d)iff, (e)dit, (i)gnore, (s)elect hunks, (v)iew" )