Coverage for sources/mimeogram/apply.py: 87%

92 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-16 01:42 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Application of mimeograms. ''' 

22# TODO? Use BSD sysexits. 

23 

24 

25from __future__ import annotations 

26 

27from . import __ 

28from . import interfaces as _interfaces 

29from . import parts as _parts 

30from . import updaters as _updaters 

31 

32 

33_scribe = __.produce_scribe( __name__ ) 

34 

35 

36class Command( 

37 _interfaces.CliCommand, 

38 decorators = ( __.standard_dataclass, __.standard_tyro_class ), 

39): 

40 ''' Applies mimeogram to filesystem locations. ''' 

41 

42 source: __.typx.Annotated[ 

43 str, # TODO: str | Path 

44 __.tyro.conf.arg( 

45 help = ( 

46 "Source file for mimeogram. " 

47 "Defaults to stdin if '--clip' not specified." ) ), 

48 ] = '-' 

49 clip: __.typx.Annotated[ 

50 __.typx.Optional[ bool ], 

51 __.tyro.conf.arg( 

52 aliases = ( '--clipboard', '--from-clipboard' ), 

53 help = "Read mimeogram from clipboard instead of file or stdin." ), 

54 ] = None 

55 mode: __.typx.Annotated[ 

56 __.typx.Optional[ _updaters.ReviewModes ], 

57 __.tyro.conf.arg( 

58 aliases = ( '--review-mode', ), 

59 help = ( 

60 "Controls how changes are reviewed. " 

61 "'silent': Apply without review. " 

62 "'partitive': Review each change interactively. " 

63 "Partitive, if not specified and on a terminal. " 

64 "Silent, if not specified and not on a terminal." ) ), 

65 ] = None 

66 base: __.typx.Annotated[ 

67 __.typx.Optional[ __.Path ], 

68 __.tyro.conf.arg( 

69 aliases = ( '--base-directory', ), 

70 help = ( 

71 "Base directory for relative locations. " 

72 "Defaults to current working directory." ) ), 

73 ] = None 

74 # preview: __.typx.Annotated[ 

75 # __.typx.Optional[ bool ], 

76 # __.tyro.conf.arg( 

77 # help = "Show what would be changed without making changes." ), 

78 # ] = None 

79 force: __.typx.Annotated[ 

80 __.typx.Optional[ bool ], 

81 __.tyro.conf.arg( 

82 help = 'Override protected path checks.' ), 

83 ] = None 

84 

85 async def __call__( self, auxdata: __.Globals ) -> None: 

86 ''' Executes command to apply mimeogram. ''' 

87 await apply( auxdata, self ) 

88 

89 def provide_configuration_edits( self ) -> __.DictionaryEdits: 

90 ''' Provides edits against configuration from options. ''' 

91 edits: list[ __.DictionaryEdit ] = [ ] 

92 if None is not self.clip: 

93 edits.append( __.SimpleDictionaryEdit( # pyright: ignore 

94 address = ( 'apply', 'from-clipboard' ), value = self.clip ) ) 

95 if None is not self.force: 

96 edits.append( __.SimpleDictionaryEdit( # pyright: ignore 

97 address = ( 'update-parts', 'disable-protections' ), 

98 value = self.force ) ) 

99 return tuple( edits ) 

100 

101 

102class ContentAcquirer( # pylint: disable=invalid-metaclass 

103 __.typx.Protocol, 

104 metaclass = __.ImmutableStandardProtocolDataclass, 

105 decorators = ( __.standard_dataclass, __.typx.runtime_checkable ), 

106): 

107 ''' Acquires content for apply command. ''' 

108 

109 @__.abc.abstractmethod 

110 def stdin_is_tty( self ) -> bool: 

111 ''' Checks if input is from a terminal. ''' 

112 raise NotImplementedError 

113 

114 @__.abc.abstractmethod 

115 async def acquire_clipboard( self ) -> str: 

116 ''' Acquires content from clipboard. ''' 

117 raise NotImplementedError 

118 

119 @__.abc.abstractmethod 

120 async def acquire_file( self, path: str | __.Path ) -> str: 

121 ''' Acquires content from file. ''' 

122 raise NotImplementedError 

123 

124 @__.abc.abstractmethod 

125 async def acquire_stdin( self ) -> str: 

126 ''' Acquires content from standard input. ''' 

127 raise NotImplementedError 

128 

129 

130class StandardContentAcquirer( 

131 ContentAcquirer, decorators = ( __.standard_dataclass, ) 

132): 

133 ''' Standard implementation of content acquisition. ''' 

134 

135 def stdin_is_tty( self ) -> bool: 

136 return __.sys.stdin.isatty( ) 

137 

138 async def acquire_clipboard( self ) -> str: 

139 from pyperclip import paste 

140 return paste( ) 

141 

142 async def acquire_file( self, path: str | __.Path ) -> str: 

143 return await __.acquire_text_file_async( path ) 

144 

145 async def acquire_stdin( self ) -> str: 

146 return __.sys.stdin.read( ) 

147 

148 

149async def apply( # pylint: disable=too-complex,too-many-statements 

150 auxdata: __.Globals, 

151 command: Command, 

152 *, 

153 acquirer: __.Absential[ ContentAcquirer ] = __.absent, 

154 parser: __.Absential[ 

155 __.cabc.Callable[ [ str ], __.cabc.Sequence[ _parts.Part ] ] 

156 ] = __.absent, 

157 updater: __.Absential[ 

158 __.cabc.Callable[ 

159 [ __.Globals, 

160 __.cabc.Sequence[ _parts.Part ], 

161 _updaters.ReviewModes ], 

162 __.cabc.Coroutine[ None, None, None ] 

163 ] 

164 ] = __.absent, 

165) -> __.typx.Never: 

166 ''' Applies mimeogram. ''' 

167 if __.is_absent( acquirer ): 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 acquirer = StandardContentAcquirer( ) 

169 if __.is_absent( parser ): 

170 from .parsers import parse as parser 

171 if __.is_absent( updater ): 

172 from .updaters import update as updater 

173 review_mode = _determine_review_mode( command, acquirer ) 

174 try: mgtext = await _acquire( auxdata, command, acquirer ) 

175 except Exception as exc: 

176 _scribe.exception( "Could not acquire mimeogram to apply." ) 

177 raise SystemExit( 1 ) from exc 

178 if not mgtext: 

179 _scribe.error( "Cannot apply empty mimeogram." ) 

180 raise SystemExit( 1 ) 

181 try: parts = parser( mgtext ) 

182 except Exception as exc: 

183 _scribe.exception( "Could not parse mimeogram." ) 

184 raise SystemExit( 1 ) from exc 

185 nomargs: dict[ str, __.typx.Any ] = { } 

186 if command.base: nomargs[ 'base' ] = command.base 

187 try: await updater( auxdata, parts, review_mode, **nomargs ) 

188 except Exception as exc: 

189 _scribe.exception( "Could not apply mimeogram." ) 

190 raise SystemExit( 1 ) from exc 

191 _scribe.info( "Successfully applied mimeogram" ) 

192 raise SystemExit( 0 ) 

193 

194 

195async def _acquire( 

196 auxdata: __.Globals, cmd: Command, acquirer: ContentAcquirer 

197) -> str: 

198 ''' Acquires content to parse from clipboard, file, or stdin. ''' 

199 options = auxdata.configuration.get( 'apply', { } ) 

200 if options.get( 'from-clipboard', False ): 

201 content = await acquirer.acquire_clipboard( ) 

202 if not content: 

203 _scribe.error( "Clipboard is empty." ) 

204 raise SystemExit( 1 ) 

205 _scribe.debug( 

206 "Read {} characters from clipboard.".format( len( content ) ) ) 

207 return content 

208 match cmd.source: 

209 case '-': return await acquirer.acquire_stdin( ) 

210 case _: return await acquirer.acquire_file( cmd.source ) 

211 

212 

213def _determine_review_mode( 

214 command: Command, acquirer: ContentAcquirer 

215) -> _updaters.ReviewModes: 

216 on_tty = acquirer.stdin_is_tty( ) 

217 if command.mode is None: 

218 if on_tty: return _updaters.ReviewModes.Partitive 

219 return _updaters.ReviewModes.Silent 

220 if not on_tty and command.mode is not _updaters.ReviewModes.Silent: 

221 _scribe.error( "Cannot use an interactive mode without terminal." ) 

222 raise SystemExit( 1 ) 

223 return command.mode