Coverage for sources/mimeogram/differences.py: 67%
77 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:46 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:46 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Content differences management. '''
24from . import __
25from . import interfaces as _interfaces
26from . import parts as _parts
29_scribe = __.produce_scribe( __name__ )
32class ConsoleDisplay( _interfaces.DifferencesDisplay ):
33 ''' Default display of differences to console. '''
35 async def __call__( self, lines: __.cabc.Sequence[ str ] ) -> None:
36 from .display import display_content
37 if self.inline_threshold >= len( lines ):
38 for line in lines: print( line )
39 return
40 diff = '\n'.join( lines )
41 display_content( diff, suffix = '.diff' )
44class ConsoleInteractor( _interfaces.DifferencesInteractor ):
45 ''' Default console-based interaction handler. '''
47 async def __call__(
48 self,
49 lines: __.cabc.Sequence[ str ],
50 display: _interfaces.DifferencesDisplay
51 ) -> bool:
52 # TODO: Display hunk number.
53 from readchar import readkey
54 await display( lines )
55 menu = "Apply this change? (y)es, (n)o, (v)iew"
56 while True:
57 print( f"\n{menu} > ", end = '' )
58 try: choice = readkey( ).lower( )
59 except ( EOFError, KeyboardInterrupt ):
60 print( ) # Add newline to avoid output mangling
61 return False
62 print( choice ) # Echo.
63 match choice:
64 case 'y': return True
65 case 'n': return False
66 case 'v': await display( lines )
67 case _:
68 if choice.isprintable( ):
69 print( f"Invalid choice: {choice}" )
70 else: print( "Invalid choice." )
73async def select_segments(
74 target: _parts.Target, revision: str,
75 display: __.Absential[ _interfaces.DifferencesDisplay ] = __.absent,
76 interactor: __.Absential[ _interfaces.DifferencesInteractor ] = __.absent,
77) -> str:
78 ''' Selects which diff hunks to apply. '''
79 # TODO: Use global state for instance configuration.
80 if __.is_absent( display ): display = ConsoleDisplay( )
81 if __.is_absent( interactor ): interactor = ConsoleInteractor( )
82 # TODO: Acquire destination content from cache.
83 part = target.part
84 original = (
85 await __.acquire_text_file_async(
86 target.destination, charset = part.charset ) )
87 original = part.linesep.normalize( original )
88 if original == revision:
89 print( "No changes" )
90 return revision
91 try:
92 revision_ = (
93 await _select_segments(
94 original, revision,
95 display = display, interactor = interactor ) )
96 except Exception:
97 _scribe.exception( "Could not process changes" )
98 return revision
99 return revision_
102def _format_segment( # noqa: PLR0913
103 current_lines: list[ str ],
104 revision_lines: list[ str ],
105 i1: int, i2: int,
106 j1: int, j2: int,
107 context_lines: int = 3,
108) -> list[ str ]:
109 ''' Formats change block with context lines. '''
110 # Calculate context ranges with bounds checking
111 start = max( 0, i1 - context_lines )
112 end = min( len( current_lines ), i2 + context_lines )
113 # Build diff display
114 # TODO? Convert non-printables into printable sequences.
115 diff: list[ str ] = [ ]
116 diff.append(
117 f"@@ -{i1 + 1},{i2 - i1} +{j1 + 1},{j2 - j1} @@" )
118 for idx in range( start, i1 ):
119 diff.append( f" {current_lines[ idx ]}" ) # noqa: PERF401
120 for idx in range( i1, i2 ):
121 diff.append( f"-{current_lines[ idx ]}" ) # noqa: PERF401
122 for idx in range( j1, j2 ):
123 diff.append( f"+{revision_lines[ idx ]}" ) # noqa: PERF401
124 for idx in range( i2, end ):
125 diff.append( f" {current_lines[ idx ]}" ) # noqa: PERF401
126 return diff
129async def _select_segments(
130 current: str,
131 revision: str,
132 display: _interfaces.DifferencesDisplay,
133 interactor: _interfaces.DifferencesInteractor,
134) -> str:
135 from patiencediff import PatienceSequenceMatcher # pyright: ignore
136 current_lines = current.split( '\n' )
137 revision_lines = revision.split( '\n' )
138 matcher = PatienceSequenceMatcher( # pyright: ignore
139 None, current_lines, revision_lines )
140 result: list[ str ] = [ ]
141 for op, i1, i2, j1, j2 in matcher.get_opcodes( ):
142 if op == 'equal':
143 result.extend( current_lines[ i1:i2 ] )
144 continue
145 diff_lines = _format_segment(
146 current_lines, revision_lines,
147 i1, i2, j1, j2,
148 context_lines = display.context_lines )
149 if not await interactor( diff_lines, display ):
150 result.extend( current_lines[ i1:i2 ] )
151 continue
152 result.extend( revision_lines[ j1:j2 ] )
153 return '\n'.join( result )