Coverage for sources/mimeogram/differences.py: 67%
78 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-16 02:11 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-16 02:11 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Content differences management. '''
24from __future__ import annotations
26from . import __
27from . import interfaces as _interfaces
28from . import parts as _parts
31_scribe = __.produce_scribe( __name__ )
class ConsoleDisplay(
    _interfaces.DifferencesDisplay, decorators = ( __.standard_dataclass, )
):
    ''' Console renderer for difference hunks.

        A hunk whose line count does not exceed ``inline_threshold`` is
        printed directly, one line per ``print`` call. A longer hunk is
        joined with newlines and passed to ``display_content`` from the
        sibling ``display`` module with a ``.diff`` suffix.
    '''

    async def __call__( self, lines: __.cabc.Sequence[ str ] ) -> None:
        ''' Shows the given diff lines inline or via ``display_content``. '''
        from .display import display_content
        fits_inline = self.inline_threshold >= len( lines )
        if not fits_inline:
            # Too long for inline output: hand over as a single text blob.
            display_content( '\n'.join( lines ), suffix = '.diff' )
            return
        for text in lines: print( text )
class ConsoleInteractor( _interfaces.DifferencesInteractor ):
    ''' Console prompt which asks whether to apply each change. '''

    async def __call__(
        self,
        lines: __.cabc.Sequence[ str ],
        display: _interfaces.DifferencesDisplay
    ) -> bool:
        ''' Shows a hunk and reads single-key answers until one decides.

            Returns ``True`` on 'y' and ``False`` on 'n'. The 'v' key
            shows the hunk again and re-prompts. ``EOFError`` or
            ``KeyboardInterrupt`` while reading a key is treated as a
            refusal. Any other key reports an invalid choice and
            re-prompts. Keys are lowercased before comparison.
        '''
        # TODO: Display hunk number.
        from readchar import readkey
        await display( lines )
        question = "Apply this change? (y)es, (n)o, (v)iew"
        prompt = f"\n{question} > "
        while True:
            print( prompt, end = '' )
            try: key = readkey( ).lower( )
            except ( EOFError, KeyboardInterrupt ):
                print( ) # Finish the prompt line to avoid output mangling.
                return False
            print( key ) # Echo.
            if key == 'y': return True
            if key == 'n': return False
            if key == 'v':
                await display( lines )
                continue
            # Only echo the offending key back if it is safe to print.
            if key.isprintable( ): print( f"Invalid choice: {key}" )
            else: print( "Invalid choice." )
async def select_segments(
    target: _parts.Target, revision: str,
    display: __.Absential[ _interfaces.DifferencesDisplay ] = __.absent,
    interactor: __.Absential[ _interfaces.DifferencesInteractor ] = __.absent,
) -> str:
    ''' Selects which diff hunks to apply.

        Reads the text at ``target.destination`` using the part's charset
        and normalizes it through the part's ``linesep``. If the result
        equals ``revision``, prints "No changes" and returns ``revision``.
        Otherwise, delegates to ``_select_segments`` and returns its result.
        If that delegation raises, the exception is logged and ``revision``
        is returned unchanged.

        Absent ``display`` and ``interactor`` arguments default to
        ``ConsoleDisplay`` and ``ConsoleInteractor`` instances.
    '''
    # TODO: Use global state for instance configuration.
    display = ConsoleDisplay( ) if __.is_absent( display ) else display
    interactor = (
        ConsoleInteractor( ) if __.is_absent( interactor ) else interactor )
    # TODO: Acquire destination content from cache.
    part = target.part
    content = await __.acquire_text_file_async(
        target.destination, charset = part.charset )
    current = part.linesep.normalize( content )
    if current == revision:
        print( "No changes" )
        return revision
    try:
        selection = await _select_segments(
            current, revision, display = display, interactor = interactor )
    except Exception: # pylint: disable=broad-exception-caught
        # Best effort: report the failure and fall back to whole revision.
        _scribe.exception( "Could not process changes" )
        return revision
    else: return selection
106def _format_segment( # pylint: disable=too-many-arguments,too-many-locals
107 current_lines: list[ str ],
108 revision_lines: list[ str ],
109 i1: int, i2: int,
110 j1: int, j2: int,
111 context_lines: int = 3,
112) -> list[ str ]:
113 ''' Formats change block with context lines. '''
114 # Calculate context ranges with bounds checking
115 start = max( 0, i1 - context_lines )
116 end = min( len( current_lines ), i2 + context_lines )
117 # Build diff display
118 # TODO? Convert non-printables into printable sequences.
119 diff: list[ str ] = [ ]
120 diff.append(
121 f"@@ -{i1 + 1},{i2 - i1} +{j1 + 1},{j2 - j1} @@" )
122 for idx in range( start, i1 ):
123 diff.append( f" {current_lines[ idx ]}" )
124 for idx in range( i1, i2 ):
125 diff.append( f"-{current_lines[ idx ]}" )
126 for idx in range( j1, j2 ):
127 diff.append( f"+{revision_lines[ idx ]}" )
128 for idx in range( i2, end ):
129 diff.append( f" {current_lines[ idx ]}" )
130 return diff
async def _select_segments( # pylint: disable=too-many-locals
    current: str,
    revision: str,
    display: _interfaces.DifferencesDisplay,
    interactor: _interfaces.DifferencesInteractor,
) -> str:
    ''' Builds merged text by asking the interactor about each change.

        Both texts are split on newlines and compared with
        ``PatienceSequenceMatcher``. Ranges tagged ``equal`` are copied
        from the current text. For each other range, a formatted hunk is
        passed to ``interactor`` along with ``display``. A truthy answer
        takes the revision lines, while a falsy answer keeps the current
        lines. The collected lines are rejoined with newlines.
    '''
    from patiencediff import PatienceSequenceMatcher # pyright: ignore
    old_lines = current.split( '\n' )
    new_lines = revision.split( '\n' )
    opcodes = PatienceSequenceMatcher( # pyright: ignore
        None, old_lines, new_lines ).get_opcodes( )
    merged: list[ str ] = [ ]
    for tag, i1, i2, j1, j2 in opcodes:
        accepted = False
        if tag != 'equal': # pylint: disable=magic-value-comparison
            hunk = _format_segment(
                old_lines, new_lines,
                i1, i2, j1, j2,
                context_lines = display.context_lines )
            accepted = await interactor( hunk, display )
        # Unchanged and declined ranges both retain the current lines.
        merged.extend(
            new_lines[ j1:j2 ] if accepted else old_lines[ i1:i2 ] )
    return '\n'.join( merged )