Coverage for sources/mimeogram/interactions.py: 52%

89 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 19:15 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' User interactions and automations. ''' 

22 

23 

24from . import __ 

25from . import interfaces as _interfaces 

26from . import parts as _parts 

27 

28 

async def _display_content( target: _parts.Target, content: str ) -> None:
    ''' Displays content in system pager.

        The suffix handed to the display helper comes from the part
        location, so that syntax highlighting can be chosen; ``.txt`` is
        used when the location has no suffix.
    '''
    from .display import display_content as _show
    extension = __.Path( target.part.location ).suffix
    if not extension: extension = '.txt'
    _show( content, suffix = extension )

35 

36 

async def _display_differences(
    target: _parts.Target, revision: str
) -> None:
    ''' Displays differences between revision and target file.

        An existing destination is read with the part charset and has its
        line separators normalized before comparison. Prints "No changes"
        when the diff is empty; otherwise displays the diff.

        Raises ``ContentAcquireFailure`` if an existing destination cannot
        be read.
    '''
    part = target.part
    destination = target.destination
    # NOTE(review): A missing destination is compared as empty text, not as
    # an absent original, so the '/dev/null' label which
    # _calculate_differences supports is never used from here.
    # Confirm that this is intended.
    current = ''
    if destination.exists( ):
        from .exceptions import ContentAcquireFailure
        try:
            current = await __.acquire_text_file_async(
                destination, charset = part.charset )
        except Exception as exc:
            raise ContentAcquireFailure( destination ) from exc
        current = part.linesep.normalize( current )
    lines = _calculate_differences( part, revision, current )
    if lines:
        from .display import display_content as display
        display( '\n'.join( lines ), suffix = '.diff' )
        return
    print( "No changes" )

59 

60 

async def _edit_content( target: _parts.Target, content: str ) -> str:
    ''' Edits content in system editor and returns the edited text.

        The suffix handed to the edit helper comes from the target
        destination, so that syntax highlighting can be chosen; ``.txt`` is
        used when the destination has no suffix.
    '''
    from .edit import edit_content as _edit
    extension = __.Path( target.destination ).suffix
    return _edit( content, suffix = extension if extension else '.txt' )

67 

68 

def _prompt_action(
    target: _parts.Target, content: str, protect: bool
) -> str:
    ''' Prints actions menu and reads one keypress.

        Returns the key, lowercased, after echoing it.

        Raises ``UserOperateCancellation`` on end-of-file or keyboard
        interrupt while waiting for the key.
    '''
    from readchar import readkey
    from .exceptions import UserOperateCancellation
    prompt = _produce_actions_menu( target.part, content, protect )
    print( f"\n{prompt} > ", end = '' )
    # Prompt has no trailing newline; flush so that it appears before read.
    __.sys.stdout.flush( )
    try: key = readkey( ).lower( )
    except ( EOFError, KeyboardInterrupt ) as exc:
        print( ) # Add newline to avoid output mangling.
        raise UserOperateCancellation( exc ) from exc
    else:
        print( key ) # Echo.
        return key

83 

84 

async def _select_segments( target: _parts.Target, content: str ) -> str:
    ''' Delegates to ``select_segments`` from the differences module.

        Returns whatever that coroutine produces for the target and content.
    '''
    from .differences import select_segments as _select
    selection = await _select( target, content )
    return selection

88 

89 

def _validate_choice(
    target: _parts.Target, choice: str
) -> None:
    ''' Reports an unrecognized choice on standard output.

        The choice itself is echoed only when it is printable. The target
        is accepted for signature compatibility but is not consulted.
    '''
    message = (
        f"Invalid choice: {choice}" if choice.isprintable( )
        else "Invalid choice." )
    print( message )

96 

97 

98class GenericInteractor( _interfaces.PartInteractor ): 

99 ''' Default console-based interaction handler. ''' 

100 

101 prompter: __.cabc.Callable[ 

102 [ _parts.Target, str, bool ], str ] = _prompt_action 

103 cdisplayer: __.cabc.Callable[ 

104 [ _parts.Target, str ], 

105 __.cabc.Coroutine[ None, None, None ] ] = _display_content 

106 ddisplayer: __.cabc.Callable[ 

107 [ _parts.Target, str ], 

108 __.cabc.Coroutine[ None, None, None ] ] = _display_differences 

109 editor: __.cabc.Callable[ 

110 [ _parts.Target, str ], 

111 __.cabc.Coroutine[ None, None, str ] ] = _edit_content 

112 sselector: __.cabc.Callable[ 

113 [ _parts.Target, str ], 

114 __.cabc.Coroutine[ None, None, str ] ] = _select_segments 

115 validator: __.cabc.Callable[ 

116 [ _parts.Target, str ], None ] = _validate_choice 

117 

118 async def __call__( 

119 self, target: _parts.Target 

120 ) -> tuple[ _parts.Resolutions, str ]: 

121 # TODO? Track revision history. 

122 # TODO: Use copies of target object with updated content. 

123 content = target.part.content 

124 protect = target.protection.active 

125 while True: 

126 choice = self.prompter( target, content, protect ) 

127 match choice: 

128 case 'a' if not protect: 

129 return _parts.Resolutions.Apply, content 

130 case 'd': await self.ddisplayer( target, content ) 

131 case 'e' if not protect: 

132 content = await self.editor( target, content ) 

133 case 'i': return _parts.Resolutions.Ignore, content 

134 case 'p' if protect: protect = False 

135 case 's' if not protect: 

136 content = await self.sselector( target, content ) 

137 case 'v': await self.cdisplayer( target, content ) 

138 case _: self.validator( target, choice ) 

139 

140 

async def interact(
    target: _parts.Target,
    interactor: __.Absential[ _interfaces.PartInteractor ] = __.absent,
) -> tuple[ _parts.Resolutions, str ]:
    ''' Performs interaction for part.

        Uses a fresh ``GenericInteractor`` when no interactor is supplied
        and returns whatever the interactor returns for the target.
    '''
    handler = (
        GenericInteractor( ) if __.is_absent( interactor ) else interactor )
    return await handler( target )

148 

149 

def _calculate_differences(
    part: _parts.Part,
    revision: str,
    original: __.Absential[ str ] = __.absent,
) -> list[ str ]:
    ''' Generates unified diff between contents.

        Both texts are split on newline characters. When the original is
        absent, the "from" side is empty and is labeled ``/dev/null``;
        otherwise both sides are labeled with the part location. Diff lines
        are returned without line terminators.
    '''
    from patiencediff import (
        unified_diff, PatienceSequenceMatcher ) # pyright: ignore
    if __.is_absent( original ):
        old_lines, old_label = [ ], '/dev/null'
    else:
        old_lines, old_label = original.split( '\n' ), part.location
    new_lines = revision.split( '\n' )
    hunks = unified_diff( # pyright: ignore
        old_lines, new_lines,
        fromfile = old_label, tofile = part.location,
        lineterm = '', sequencematcher = PatienceSequenceMatcher )
    return list( hunks )

168 

169 

def _produce_actions_menu(
    part: _parts.Part, content: str, protect: bool
) -> str:
    ''' Produces two-line menu text: part information, then actions.

        The information line holds the part location, the content length
        (as ``<n>B`` below 1024, else in units of 1024 with one decimal and
        a ``K`` suffix) and a protection marker. The actions offered depend
        on whether protection is in effect.
    '''
    length = len( content )
    if length >= 1024: # noqa: PLR2004
        magnitude = f"{length / 1024:.1f}K"
    else: magnitude = f"{length}B"
    marker = "[PROTECTED]" if protect else ""
    header = f"{part.location} [{magnitude}] {marker}"
    if protect:
        menu = "Action? (d)iff, (i)gnore, (p)ermit changes, (v)iew"
        return f"{header}\n{menu}"
    menu = "Action? (a)pply, (d)iff, (e)dit, (i)gnore, (s)elect hunks, (v)iew"
    return f"{header}\n{menu}"