Coverage for sources/mimeogram/create.py: 73%

70 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 19:15 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Creation of mimeograms. ''' 

22# TODO? Use BSD sysexits. 

23 

24 

25from . import __ 

26from . import interfaces as _interfaces 

27from . import tokenizers as _tokenizers 

28 

29 

# Module-level scribe, named after this module. Used below to emit
# informational messages and passed to '__.report_exceptions'.
_scribe = __.produce_scribe( __name__ )

31 

32 

class Command(
    _interfaces.CliCommand,
    decorators = ( __.standard_tyro_class, ),
):
    ''' Creates mimeogram from filesystem locations or URLs. '''

    # Positional arguments: where to read mimeogram parts from.
    sources: __.typx.Annotated[
        __.tyro.conf.Positional[ list[ str ] ],
        __.typx.Doc( ''' Filesystem locations or URLs. ''' ),
        __.tyro.conf.arg( prefix_name = False ),
    ]
    # Tri-state options default to None, meaning "not given on the command
    # line"; only non-None values become configuration edits (see
    # 'provide_configuration_edits').
    clip: __.typx.Annotated[
        __.typx.Optional[ bool ],
        __.typx.Doc( ''' Copy mimeogram to clipboard. ''' ),
        __.tyro.conf.arg( aliases = ( '--clipboard', '--to-clipboard' ) ),
    ] = None
    count_tokens: __.typx.Annotated[
        __.typx.Optional[ bool ],
        __.typx.Doc( ''' Count total tokens in mimeogram. ''' ),
    ] = None
    edit: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Spawn editor to capture introductory message. ''' ),
        __.tyro.conf.arg( aliases = ( '-e', '--edit-message' ) ),
    ] = False
    prepend_prompt: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Prepend mimeogram format instructions. ''' ),
    ] = False
    recurse: __.typx.Annotated[
        __.typx.Optional[ bool ],
        __.typx.Doc( ''' Recurse into directories. ''' ),
        __.tyro.conf.arg(
            aliases = ( '-r', '--recurse-directories', '--recursive' ) ),
    ] = None
    strict: __.typx.Annotated[
        __.typx.Optional[ bool ],
        __.typx.Doc(
            ''' Fail on invalid contents? True, fail. False, skip. ''' ),
        __.tyro.conf.arg( aliases = ( '--fail-on-invalid', ) ),
    ] = None
    tokenizer: __.typx.Annotated[
        __.typx.Optional[ _tokenizers.Tokenizers ],
        __.typx.Doc( ''' Which tokenizer to use for counting? ''' ),
    ] = None
    tokenizer_variant: __.typx.Annotated[
        __.typx.Optional[ str ],
        __.typx.Doc(
            ''' Which tokenizer variant to use for counting?

                'tiktoken': 'cl100k_base', 'o200k_base', etc....

                Not all tokenizers have variants.
                If not specified, then the default variant is used.
            ''' ),
    ] = None
    deterministic_boundary: __.typx.Annotated[
        __.typx.Optional[ bool ],
        __.typx.Doc(
            ''' Use deterministic boundary for reproducible output.

                When enabled, the MIME boundary marker will be a hash of the
                content, making output reproducible and diff-friendly.
                Useful for testing, CI, and batch processing.
            ''' ),
        __.tyro.conf.arg( aliases = ( '--deterministic-boundary', ) ),
    ] = None

    async def __call__( self, auxdata: __.Globals ) -> None:
        ''' Runs mimeogram creation with this command's options. '''
        await create( auxdata, self )

    def provide_configuration_edits( self ) -> __.DictionaryEdits:
        ''' Translates explicitly-given options into configuration edits.

            Each option which is not ``None`` yields one edit targeting its
            configuration address. Options left at ``None`` yield nothing.
            Edits are returned as a tuple in declaration order below.
        '''
        # ( configuration address, option value ) pairs, in emission order.
        candidates = (
            ( ( 'create', 'to-clipboard' ), self.clip ),
            ( ( 'create', 'count-tokens' ), self.count_tokens ),
            ( ( 'acquire-parts', 'recurse-directories' ), self.recurse ),
            ( ( 'acquire-parts', 'fail-on-invalid' ), self.strict ),
            ( ( 'tokenizers', 'default' ), self.tokenizer ),
            ( ( 'create', 'deterministic-boundary' ),
              self.deterministic_boundary ),
        )
        return tuple(
            __.SimpleDictionaryEdit( # pyright: ignore
                address = address, value = value )
            for address, value in candidates
            if value is not None )

132 

133 

async def _acquire_prompt( auxdata: __.Globals ) -> str:
    ''' Default prompter: defers to 'acquire_prompt' from '.prompt'. '''
    # Imported at call time rather than at module import.
    from .prompt import acquire_prompt as _acquire
    prompt = await _acquire( auxdata )
    return prompt

137 

138 

async def _copy_to_clipboard( mimeogram: str ) -> None:
    ''' Default clipboard copier: copies text via 'pyperclip' and logs. '''
    # Imported at call time rather than at module import.
    import pyperclip as _pyperclip
    _pyperclip.copy( mimeogram )
    _scribe.info( "Copied mimeogram to clipboard." )

143 

144 

async def _edit_message( ) -> str:
    ''' Default editor: returns result of 'edit_content' from '.edit'. '''
    # Imported at call time rather than at module import.
    from .edit import edit_content as _edit
    # Note: 'edit_content' is invoked synchronously (not awaited).
    content = _edit( )
    return content

148 

149 

async def create(
    auxdata: __.Globals,
    command: Command, *,
    editor: __.cabc.Callable[
        [ ], __.cabc.Coroutine[ None, None, str ] ] = _edit_message,
    clipcopier: __.cabc.Callable[
        [ str ], __.cabc.Coroutine[ None, None, None ] ] = _copy_to_clipboard,
    prompter: __.cabc.Callable[
        [ __.Globals ],
        __.cabc.Coroutine[ None, None, str ] ] = _acquire_prompt,
) -> __.typx.Never:
    ''' Creates mimeogram and emits it to clipboard or standard output.

        Steps, in order:

        * Acquires parts from the command sources.
        * If the command requests editing, obtains an introductory message
          from ``editor``; otherwise uses no message.
        * Formats the mimeogram, resolving the deterministic boundary flag
          from the command first and the 'create' configuration second.
        * If the command requests it, prepends the text from ``prompter``.
        * If the 'create' configuration enables 'count-tokens', logs the
          token count.
        * If the 'create' configuration enables 'to-clipboard', hands the
          mimeogram to ``clipcopier``; otherwise prints it.

        The ``editor``, ``clipcopier``, and ``prompter`` callables are
        keyword-only and replaceable by the caller.

        Always finishes by raising ``SystemExit( 0 )``.
    '''
    from .acquirers import acquire
    from .formatters import format_mimeogram
    with __.report_exceptions(
        _scribe, "Could not acquire mimeogram parts."
    ):
        parts = await acquire( auxdata, command.sources )
    if command.edit:
        with __.report_exceptions(
            _scribe, "Could not acquire user message."
        ):
            message = await editor( )
    else:
        message = None
    options = auxdata.configuration.get( 'create', { } )
    # Explicit command option wins; configuration is the fallback.
    use_deterministic = command.deterministic_boundary
    if use_deterministic is None:
        use_deterministic = options.get( 'deterministic-boundary', False )
    mimeogram = format_mimeogram(
        parts,
        message = message,
        deterministic_boundary = use_deterministic )
    # TODO? Pass prompt to 'format_mimeogram'.
    if command.prepend_prompt:
        preamble = await prompter( auxdata )
        mimeogram = f"{preamble}\n\n{mimeogram}"
    wants_count = options.get( 'count-tokens', False )
    if wants_count:
        with __.report_exceptions(
            _scribe, "Could not count mimeogram tokens."
        ):
            tokenizer = await _tokenizer_from_command( auxdata, command )
            tokens_count = await tokenizer.count( mimeogram )
        _scribe.info( f"Total mimeogram size is {tokens_count} tokens." )
    wants_clipboard = options.get( 'to-clipboard', False )
    if wants_clipboard:
        with __.report_exceptions(
            _scribe, "Could not copy mimeogram to clipboard."
        ):
            await clipcopier( mimeogram )
    else:
        print( mimeogram )
    raise SystemExit( 0 )

197 

198 

async def _tokenizer_from_command(
    auxdata: __.Globals, command: Command
) -> _tokenizers.Tokenizer:
    ''' Produces tokenizer chosen by command options or configuration.

        The tokenizer name comes from the command's ``tokenizer`` option when
        set; otherwise from the 'default' entry of the 'tokenizers'
        configuration section, falling back to 'tiktoken'. The ``variant``
        argument is forwarded only when the command supplies a non-empty
        ``tokenizer_variant``.
    '''
    settings = auxdata.configuration.get( 'tokenizers', { } )
    if command.tokenizer:
        name = command.tokenizer.value
    else:
        name = settings.get( 'default', 'tiktoken' )
    nomargs = { }
    if command.tokenizer_variant:
        nomargs[ 'variant' ] = command.tokenizer_variant
    return await _tokenizers.Tokenizers.produce( name, **nomargs )

206 variant = command.tokenizer_variant 

207 args = dict( variant = variant ) if variant else { } 

208 return await _tokenizers.Tokenizers.produce( name, **args )