Coverage for sources / mimeogram / create.py: 73%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-18 17:27 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Creation of mimeograms. ''' 

22# TODO? Use BSD sysexits. 

23 

24 

25from . import __ 

26from . import exceptions as _exceptions 

27from . import interfaces as _interfaces 

28from . import tokenizers as _tokenizers 

29 

30 

# Module-level scribe produced for this module's name. Functions below use it
# for informational messages and hand it to the exception reporters.
_scribe = __.produce_scribe( __name__ )

32 

33 

class Command(
    _interfaces.CliCommand,
    decorators = ( __.standard_tyro_class, ),
):
    ''' Creates mimeogram from filesystem locations or URLs. '''
    # NOTE(review): The class docstring and the 'Doc' strings below are kept
    #               verbatim; presumably they surface as CLI help. Confirm
    #               before rewording any of them.

    # Positional arguments: what to gather into the mimeogram.
    sources: __.typx.Annotated[
        __.tyro.conf.Positional[ list[ str ] ],
        __.typx.Doc( ''' Filesystem locations or URLs. ''' ),
        __.tyro.conf.arg( prefix_name = False ),
    ]
    # Tri-state flags: 'None' means "not given on the command line", in which
    # case no configuration edit is produced for the option.
    clip: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc( ''' Copy mimeogram to clipboard. ''' ),
        __.tyro.conf.arg( aliases = ( '--clipboard', '--to-clipboard' ) ),
    ] = None
    count_tokens: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc( ''' Count total tokens in mimeogram. ''' ),
    ] = None
    # Plain booleans: consulted directly by 'create', never written to the
    # configuration.
    edit: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Spawn editor to capture introductory message. ''' ),
        __.tyro.conf.arg( aliases = ( '-e', '--edit-message' ) ),
    ] = False
    prepend_prompt: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Prepend mimeogram format instructions. ''' ),
    ] = False
    # Options which edit the 'acquire-parts' configuration table.
    recurse: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc( ''' Recurse into directories. ''' ),
        __.tyro.conf.arg(
            aliases = ( '-r', '--recurse-directories', '--recursive' ) ),
    ] = None
    no_ignores: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc(
            ''' Disable gitignore filtering for file collection. ''' ),
    ] = None
    strict: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc(
            ''' Fail on invalid contents? True, fail. False, skip. ''' ),
        __.tyro.conf.arg( aliases = ( '--fail-on-invalid', ) ),
    ] = None
    # Tokenizer selection for token counting.
    tokenizer: __.typx.Annotated[
        __.typx.Optional[ _tokenizers.Tokenizers ],
        __.typx.Doc( ''' Which tokenizer to use for counting? ''' ),
    ] = None
    tokenizer_variant: __.typx.Annotated[
        __.typx.Optional[ str ],
        __.typx.Doc(
            ''' Which tokenizer variant to use for counting?

                'tiktoken': 'cl100k_base', 'o200k_base', etc....

                Not all tokenizers have variants.
                If not specified, then the default variant is used.
            ''' ),
    ] = None
    deterministic_boundary: __.typx.Annotated[
        __.tyro.conf.DisallowNone[ bool | None ],
        __.typx.Doc(
            ''' Use deterministic boundary for reproducible output.

                When enabled, the MIME boundary marker will be a hash of the
                content, making output reproducible and diff-friendly.
                Useful for testing, CI, and batch processing.
            ''' ),
        __.tyro.conf.arg( aliases = ( '--deterministic-boundary', ) ),
    ] = None

    async def __call__(
        self, auxdata: __.appcore.state.Globals
    ) -> None:
        ''' Runs mimeogram creation with options from this command. '''
        await create( auxdata, self )

    def provide_configuration_edits(
        self,
    ) -> __.appcore.dictedits.Edits:
        ''' Translates explicitly-given options into configuration edits.

            An option which is still ``None`` yields no edit. Edits appear
            in the same order as the table below.
        '''
        # Configuration address paired with the option which overrides it.
        overrides = (
            ( ( 'create', 'to-clipboard' ), self.clip ),
            ( ( 'create', 'count-tokens' ), self.count_tokens ),
            ( ( 'acquire-parts', 'recurse-directories' ), self.recurse ),
            ( ( 'acquire-parts', 'no-ignores' ), self.no_ignores ),
            ( ( 'acquire-parts', 'fail-on-invalid' ), self.strict ),
            ( ( 'tokenizers', 'default' ), self.tokenizer ),
            ( ( 'create', 'deterministic-boundary' ),
              self.deterministic_boundary ),
        )
        return tuple(
            __.appcore.dictedits.SimpleEdit( # pyright: ignore
                address = address, value = value )
            for address, value in overrides
            if value is not None )

146 

147 

async def _acquire_prompt(
    auxdata: __.appcore.state.Globals,
) -> str:
    ''' Default prompter: awaits prompt text from the prompt module. '''
    # Import is deferred to call time rather than done at module load.
    from .prompt import acquire_prompt
    prompt = await acquire_prompt( auxdata )
    return prompt

153 

154 

async def _copy_to_clipboard( mimeogram: str ) -> None:
    ''' Default clipboard copier: copies text, then logs confirmation. '''
    # Import is deferred to call time rather than done at module load.
    from . import clipboard as _clipboard
    _clipboard.copy_to_clipboard( mimeogram )
    _scribe.info( "Copied mimeogram to clipboard." )

159 

160 

async def _edit_message( ) -> str:
    ''' Default editor: returns content captured by the edit module. '''
    # Import is deferred to call time rather than done at module load.
    from .edit import edit_content
    message = edit_content( )
    return message

164 

165 

async def create(
    auxdata: __.appcore.state.Globals,
    command: Command,
    *,
    editor: __.cabc.Callable[
        [ ], __.cabc.Coroutine[ None, None, str ]
    ] = _edit_message,
    clipcopier: __.cabc.Callable[
        [ str ], __.cabc.Coroutine[ None, None, None ]
    ] = _copy_to_clipboard,
    prompter: __.cabc.Callable[
        [ __.appcore.state.Globals ],
        __.cabc.Coroutine[ None, None, str ]
    ] = _acquire_prompt,
) -> __.typx.Never:
    ''' Creates mimeogram and delivers it to clipboard or stdout.

        Acquires parts from the command sources, optionally captures an
        introductory message via ``editor``, formats the mimeogram,
        optionally prepends the prompt from ``prompter``, optionally logs
        a token count, and then either copies the result via ``clipcopier``
        or prints it.

        Never returns normally: always finishes by raising
        ``SystemExit( 0 )``.
    '''
    from .acquirers import acquire
    from .formatters import format_mimeogram
    with _exceptions.report_exceptions(
        _scribe, "Could not acquire mimeogram parts."
    ):
        parts = await acquire( auxdata, command.sources )
    if command.edit:
        with _exceptions.report_exceptions(
            _scribe, "Could not acquire user message."
        ):
            message = await editor( )
    else:
        message = None
    options = auxdata.configuration.get( 'create', { } )
    # Explicit command option wins; otherwise, fall back to configuration.
    deterministic_boundary = command.deterministic_boundary
    if deterministic_boundary is None:
        deterministic_boundary = options.get(
            'deterministic-boundary', False )
    mimeogram = format_mimeogram(
        parts,
        message = message,
        deterministic_boundary = deterministic_boundary )
    # TODO? Pass prompt to 'format_mimeogram'.
    if command.prepend_prompt:
        prompt = await prompter( auxdata )
        mimeogram = f"{prompt}\n\n{mimeogram}"
    if options.get( 'count-tokens', False ):
        with _exceptions.report_exceptions(
            _scribe, "Could not count mimeogram tokens."
        ):
            tokenizer = await _tokenizer_from_command( auxdata, command )
            tokens_count = await tokenizer.count( mimeogram )
        _scribe.info( f"Total mimeogram size is {tokens_count} tokens." )
    # Output goes to exactly one destination: stdout or clipboard.
    if not options.get( 'to-clipboard', False ):
        print( mimeogram )
    else:
        with _exceptions.report_exceptions(
            _scribe, "Could not copy mimeogram to clipboard."
        ):
            await clipcopier( mimeogram )
    raise SystemExit( 0 )

217 

218 

async def _tokenizer_from_command(
    auxdata: __.appcore.state.Globals,
    command: Command,
) -> _tokenizers.Tokenizer:
    ''' Produces tokenizer chosen by command, else by configuration.

        The tokenizer name comes from the command option when given;
        otherwise from the 'default' entry of the 'tokenizers'
        configuration table, with 'tiktoken' as the final fallback.
        The variant is only forwarded when the command supplies one.
    '''
    settings = auxdata.configuration.get( 'tokenizers', { } )
    if command.tokenizer:
        name = command.tokenizer.value
    else:
        name = settings.get( 'default', 'tiktoken' )
    variant = command.tokenizer_variant
    nomargs = { }
    if variant:
        nomargs[ 'variant' ] = variant
    return await _tokenizers.Tokenizers.produce( name, **nomargs )