Coverage for sources/mimeogram/differences.py: 67%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-19 23:01 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Content differences management. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import __ 

27from . import interfaces as _interfaces 

28from . import parts as _parts 

29 

30 

31_scribe = __.produce_scribe( __name__ ) 

32 

33 

34class ConsoleDisplay( 

35 _interfaces.DifferencesDisplay, decorators = ( __.standard_dataclass, ) 

36): 

37 ''' Default display of differences to console. ''' 

38 

39 async def __call__( self, lines: __.cabc.Sequence[ str ] ) -> None: 

40 from .display import display_content 

41 if self.inline_threshold >= len( lines ): 

42 for line in lines: print( line ) 

43 return 

44 diff = '\n'.join( lines ) 

45 display_content( diff, suffix = '.diff' ) 

46 

47 

48class ConsoleInteractor( _interfaces.DifferencesInteractor ): 

49 ''' Default console-based interaction handler. ''' 

50 

51 async def __call__( 

52 self, 

53 lines: __.cabc.Sequence[ str ], 

54 display: _interfaces.DifferencesDisplay 

55 ) -> bool: 

56 # TODO: Display hunk number. 

57 from readchar import readkey 

58 await display( lines ) 

59 menu = "Apply this change? (y)es, (n)o, (v)iew" 

60 while True: 

61 print( f"\n{menu} > ", end = '' ) 

62 try: choice = readkey( ).lower( ) 

63 except ( EOFError, KeyboardInterrupt ): 

64 print( ) # Add newline to avoid output mangling 

65 return False 

66 print( choice ) # Echo. 

67 match choice: 

68 case 'y': return True 

69 case 'n': return False 

70 case 'v': await display( lines ) 

71 case _: 

72 if choice.isprintable( ): 

73 print( f"Invalid choice: {choice}" ) 

74 else: print( "Invalid choice." ) 

75 

76 

77async def select_segments( 

78 target: _parts.Target, revision: str, 

79 display: __.Absential[ _interfaces.DifferencesDisplay ] = __.absent, 

80 interactor: __.Absential[ _interfaces.DifferencesInteractor ] = __.absent, 

81) -> str: 

82 ''' Selects which diff hunks to apply. ''' 

83 # TODO: Use global state for instance configuration. 

84 if __.is_absent( display ): display = ConsoleDisplay( ) 

85 if __.is_absent( interactor ): interactor = ConsoleInteractor( ) 

86 # TODO: Acquire destination content from cache. 

87 part = target.part 

88 original = ( 

89 await __.acquire_text_file_async( 

90 target.destination, charset = part.charset ) ) 

91 original = part.linesep.normalize( original ) 

92 if original == revision: 

93 print( "No changes" ) 

94 return revision 

95 try: 

96 revision_ = ( 

97 await _select_segments( 

98 original, revision, 

99 display = display, interactor = interactor ) ) 

100 except Exception: # pylint: disable=broad-exception-caught 

101 _scribe.exception( "Could not process changes" ) 

102 return revision 

103 return revision_ 

104 

105 

106def _format_segment( # pylint: disable=too-many-arguments,too-many-locals 

107 current_lines: list[ str ], 

108 revision_lines: list[ str ], 

109 i1: int, i2: int, 

110 j1: int, j2: int, 

111 context_lines: int = 3, 

112) -> list[ str ]: 

113 ''' Formats change block with context lines. ''' 

114 # Calculate context ranges with bounds checking 

115 start = max( 0, i1 - context_lines ) 

116 end = min( len( current_lines ), i2 + context_lines ) 

117 # Build diff display 

118 # TODO? Convert non-printables into printable sequences. 

119 diff: list[ str ] = [ ] 

120 diff.append( 

121 f"@@ -{i1 + 1},{i2 - i1} +{j1 + 1},{j2 - j1} @@" ) 

122 for idx in range( start, i1 ): 

123 diff.append( f" {current_lines[ idx ]}" ) 

124 for idx in range( i1, i2 ): 

125 diff.append( f"-{current_lines[ idx ]}" ) 

126 for idx in range( j1, j2 ): 

127 diff.append( f"+{revision_lines[ idx ]}" ) 

128 for idx in range( i2, end ): 

129 diff.append( f" {current_lines[ idx ]}" ) 

130 return diff 

131 

132 

133async def _select_segments( # pylint: disable=too-many-locals 

134 current: str, 

135 revision: str, 

136 display: _interfaces.DifferencesDisplay, 

137 interactor: _interfaces.DifferencesInteractor, 

138) -> str: 

139 from patiencediff import PatienceSequenceMatcher # pyright: ignore 

140 current_lines = current.split( '\n' ) 

141 revision_lines = revision.split( '\n' ) 

142 matcher = PatienceSequenceMatcher( # pyright: ignore 

143 None, current_lines, revision_lines ) 

144 result: list[ str ] = [ ] 

145 for op, i1, i2, j1, j2 in matcher.get_opcodes( ): 

146 if op == 'equal': # pylint: disable=magic-value-comparison 

147 result.extend( current_lines[ i1:i2 ] ) 

148 continue 

149 diff_lines = _format_segment( 

150 current_lines, revision_lines, 

151 i1, i2, j1, j2, 

152 context_lines = display.context_lines ) 

153 if not await interactor( diff_lines, display ): 

154 result.extend( current_lines[ i1:i2 ] ) 

155 continue 

156 result.extend( revision_lines[ j1:j2 ] ) 

157 return '\n'.join( result )