Coverage for sources/mimeogram/parsers.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-03 00:13 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Parsers for mimeograms and their constituents. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import __ 

27from . import parts as _parts 

28 

29 

30_scribe = __.produce_scribe( __name__ ) 

31 

32 

def parse( mgtext: str ) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Parses mimeogram text into sequence of parts.

        Raises ``MimeogramParseFailure`` if text is blank or if boundary
        extraction fails. A part which fails to parse is logged and left
        out of the result; remaining parts are still processed.
    '''
    # TODO? Accept 'strict' flag.
    from .exceptions import MimeogramParseFailure
    if not mgtext.strip( ):
        raise MimeogramParseFailure( reason = "Empty mimeogram." )
    ptexts = _separate_parts( mgtext, _extract_boundary( mgtext ) )
    parts: list[ _parts.Part ] = [ ]
    # Parts are numbered from one in log messages.
    for index, ptext in enumerate( ptexts, start = 1 ):
        try: part = parse_part( ptext )
        except MimeogramParseFailure:
            _scribe.exception( f"Parse failure on part {index}." )
        else:
            parts.append( part )
            _scribe.debug(
                f"Parsed part {index} with location '{part.location}'." )
    _scribe.debug( "Parsed {} parts.".format( len( parts ) ) )
    return parts

51 

52 

def parse_part( ptext: str ) -> _parts.Part:
    ''' Parses text of single mimeogram part.

        Separates headers from content, checks that required headers are
        present, and interprets the ``Content-Type`` header before
        building the part.
    '''
    headers, body = _parse_descriptor_and_content( ptext )
    _validate_descriptor( headers )
    content_type = headers[ 'Content-Type' ]
    mimetype, charset, linesep = _parse_mimetype( content_type )
    return _parts.Part(
        location = headers[ 'Content-Location' ],
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = body )

63 

64 

_BOUNDARY_REGEX = __.re.compile(
    r'''^--====MIMEOGRAM_[0-9a-fA-F]{16,}====\s*$''',
    __.re.IGNORECASE | __.re.MULTILINE )
def _extract_boundary( content: str ) -> str:
    ''' Finds first mimeogram boundary and returns it with newline.

        Raises ``MimeogramParseFailure`` if no boundary line is present.
    '''
    found = _BOUNDARY_REGEX.search( content )
    if found is None:
        from .exceptions import MimeogramParseFailure
        raise MimeogramParseFailure( reason = "No mimeogram boundary found." )
    marker = found.group( 0 )
    # Windows clipboard has CRLF newlines. The CR stays in the returned
    # marker; it is only stripped from the text which is logged.
    marker_shown = marker.rstrip( '\r' )
    _scribe.debug( f"Found boundary: {marker_shown}" )
    # Trailing newline ensures that parts are split on whole lines.
    return f"{marker}\n"

80 

81 

_DESCRIPTOR_REGEX = __.re.compile(
    r'''^(?P<name>[\w\-]+)\s*:\s*(?P<value>.*)$''' )
# Only CRLF, CR, and LF terminate lines. ``str.splitlines`` additionally
# breaks on form feeds, vertical tabs, and Unicode separators, which are
# legitimate content characters and must survive parsing.
_LINE_BREAK_REGEX = __.re.compile( r'''\r\n|\r|\n''' )
def _parse_descriptor_and_content(
    content: str
) -> tuple[ __.cabc.Mapping[ str, str ], str ]:
    ''' Splits part text into header descriptor and content.

        Header lines have the form ``Name: value`` and run until the first
        blank line, which is consumed. If a non-header line appears before
        any blank line, a warning is logged and that line becomes the
        first line of content.

        Header names are normalized so that each hyphen-separated word is
        capitalized (``content-type`` becomes ``Content-Type``). A repeated
        header overwrites the earlier value.

        Returns the descriptor and the content lines joined with ``\\n``.
        A terminal line break in the part text is not reproduced.
    '''
    descriptor: dict[ str, str ] = { }
    lines: list[ str ] = [ ]
    in_matter = False
    source_lines = _LINE_BREAK_REGEX.split( content )
    # Match ``str.splitlines``: a terminal line break does not begin an
    # additional, empty line and empty text has no lines at all.
    if source_lines and not source_lines[ -1 ]: source_lines.pop( )
    for line in source_lines:
        if in_matter:
            lines.append( line )
            continue
        line_s = line.strip( )
        if not line_s:
            # Blank line separates headers from content.
            in_matter = True
            continue
        mobject = _DESCRIPTOR_REGEX.fullmatch( line_s )
        if not mobject:
            _scribe.warning( "No blank line after headers." )
            in_matter = True
            lines.append( line )
            continue
        name = '-'.join( map(
            str.capitalize, mobject.group( 'name' ).split( '-' ) ) )
        value = mobject.group( 'value' )
        # TODO: Detect duplicates.
        descriptor[ name ] = value
    _scribe.debug( f"Descriptor: {descriptor}" )
    return descriptor, '\n'.join( lines )

111 

112 

_QUOTES = '"\''
def _parse_mimetype( header: str ) -> tuple[ str, str, _parts.LineSeparators ]:
    ''' Extracts MIME type, charset, and line separator from Content-Type.

        The header is split on semicolons. The first field is the MIME
        type; the remaining fields are ``name=value`` parameters, of which
        ``charset`` and ``linesep`` are recognized. Parameter names are
        matched without regard to case and values may be quoted.

        Charset defaults to ``utf-8`` and line separator to ``LF``.

        Raises ``MimeogramParseFailure`` if the ``linesep`` value does not
        name a member of ``LineSeparators``, so that a bad part is
        reported in the same manner as other malformed parts.
    '''
    from .exceptions import MimeogramParseFailure
    mimetype, *parameters = ( p.strip( ) for p in header.split( ';' ) )
    charset = 'utf-8'
    linesep = _parts.LineSeparators.LF
    for parameter in parameters:
        name, separator, value = parameter.partition( '=' )
        # Field without '=' is not a parameter.
        if not separator: continue
        name = name.strip( ).lower( )
        value = value.strip( ).strip( _QUOTES )
        if 'charset' == name: charset = value
        elif 'linesep' == name:
            # Lookup by member name raises KeyError for unknown names.
            try: linesep = _parts.LineSeparators[ value.upper( ) ]
            except KeyError as exc:
                raise MimeogramParseFailure(
                    reason = f"Unknown line separator: {value!r}"
                ) from exc
    return mimetype, charset, linesep

127 

128 

def _separate_parts( content: str, boundary: str ) -> list[ str ]:
    ''' Breaks mimeogram text into part texts at boundary occurrences.

        Text after the final boundary (boundary followed by ``--``) and
        text ahead of the first boundary are both excluded. Absence of a
        final boundary is logged as a warning and the whole text is used.
    '''
    terminator_base = boundary.rstrip( )
    terminator = f"{terminator_base}--"
    # Final boundary must be handled before regular boundaries.
    segments = content.split( terminator )
    if 1 == len( segments ):
        _scribe.warning( "No final boundary found." )
        body = content
    else:
        _scribe.debug( "Found final boundary." )
        body = segments[ 0 ]
        if segments[ 1 ].strip( ): _scribe.debug( "Found trailing text." )
    # First piece precedes initial boundary and is not a part.
    _, *parts = body.split( boundary )
    _scribe.debug( "Found {} parts to parse.".format( len( parts ) ) )
    return parts

147 

148 

_DESCRIPTOR_INDICES_REQUISITE = frozenset( (
    'Content-Location', 'Content-Type' ) )
def _validate_descriptor(
    descriptor: __.cabc.Mapping[ str, str ]
) -> __.cabc.Mapping[ str, str ]:
    ''' Ensures that descriptor contains every required header.

        Returns the same descriptor when it is valid. Otherwise, logs a
        warning and raises ``MimeogramParseFailure`` with the missing
        header names listed in alphabetical order.
    '''
    from .exceptions import MimeogramParseFailure
    # Set iteration order is not stable across interpreter runs; sort so
    # that the message is reproducible.
    names = sorted( _DESCRIPTOR_INDICES_REQUISITE - descriptor.keys( ) )
    if names:
        reason = (
            "Missing required headers: {awol}".format(
                awol = ', '.join( names ) ) )
        _scribe.warning( reason )
        raise MimeogramParseFailure( reason = reason )
    return descriptor # TODO: Return immutable.