Coverage for sources/mimeogram/parsers.py: 100%
93 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:46 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:46 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Parsers for mimeograms and their constituents. '''
24from . import __
25from . import parts as _parts
_scribe = __.produce_scribe( __name__ )


def parse( mgtext: str ) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Parses mimeogram text into its constituent parts.

        The first boundary marker found in the text is used to split it
        into segments, each of which is parsed independently. A segment
        which fails to parse is logged and skipped, so the result may
        contain fewer parts than the text has segments.

        Raises ``MimeogramParseFailure`` if the text is empty or consists
        only of whitespace. Boundary extraction raises the same exception
        if no boundary marker is present.
    '''
    # TODO? Accept 'strict' flag.
    from .exceptions import MimeogramParseFailure
    if not mgtext.strip( ):
        raise MimeogramParseFailure( reason = "Empty mimeogram." )
    segments = _separate_parts( mgtext, _extract_boundary( mgtext ) )
    results: list[ _parts.Part ] = [ ]
    # Number segments from 1 so that log messages match human counting.
    for index, segment in enumerate( segments, start = 1 ):
        try: parsed = parse_part( segment )
        except MimeogramParseFailure:
            # Best effort: report the bad segment and keep going.
            _scribe.exception( f"Parse failure on part {index}." )
        else:
            results.append( parsed )
            _scribe.debug(
                f"Parsed part {index} with location '{parsed.location}'." )
    _scribe.debug( "Parsed {} parts.".format( len( results ) ) )
    return results
def parse_part( ptext: str ) -> _parts.Part:
    ''' Parses a single mimeogram part into a ``Part`` object.

        The text is split into headers and content, the headers are
        validated, and the ``Content-Type`` header is decomposed into
        MIME type, charset, and line separator.

        Raises ``MimeogramParseFailure`` (from validation) if a required
        header is absent.
    '''
    headers, body = _parse_descriptor_and_content( ptext )
    # Validation guarantees that the two lookups below cannot fail.
    _validate_descriptor( headers )
    content_type = headers[ 'Content-Type' ]
    mimetype, charset, linesep = _parse_mimetype( content_type )
    location = headers[ 'Content-Location' ]
    return _parts.Part(
        location = location,
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = body )
_BOUNDARY_REGEX = __.re.compile(
    r'''^--====MIMEOGRAM_[0-9a-fA-F]{16,}====\s*$''',
    __.re.IGNORECASE | __.re.MULTILINE )
def _extract_boundary( content: str ) -> str:
    ''' Locates first boundary marker line in mimeogram text.

        Returns the matched marker, including any trailing whitespace
        which the pattern captured, with a newline appended.

        Raises ``MimeogramParseFailure`` if no marker line is found.
    '''
    found = _BOUNDARY_REGEX.search( content )
    if found is None:
        from .exceptions import MimeogramParseFailure
        raise MimeogramParseFailure( reason = "No mimeogram boundary found." )
    marker = found.group( )
    # Text from a Windows clipboard has CRLF newlines, which leaves a CR on
    # the end of the match. Drop it for the log message only; the returned
    # value keeps it so that it still matches the text being split.
    displayable = marker.rstrip( '\r' )
    _scribe.debug( f"Found boundary: {displayable}" )
    # The trailing newline makes each part begin on the line after a marker.
    return f"{marker}\n"
_DESCRIPTOR_REGEX = __.re.compile(
    r'''^(?P<name>[\w\-]+)\s*:\s*(?P<value>.*)$''' )
def _parse_descriptor_and_content(
    content: str
) -> tuple[ __.cabc.Mapping[ str, str ], str ]:
    ''' Splits part text into header mapping and content text.

        Leading lines of the form ``Name: value`` are collected as
        headers, with each hyphen-separated word of the name capitalized
        (e.g., ``content-type`` becomes ``Content-Type``). The first blank
        line ends the headers and is discarded. If a line which is not a
        header is encountered before any blank line, a warning is logged
        and that line becomes the first line of the content.

        Content lines are rejoined with LF, regardless of the line
        endings in the input.
    '''
    headers: dict[ str, str ] = { }
    body: list[ str ] = [ ]
    rows = content.splitlines( )
    for index, row in enumerate( rows ):
        stripped = row.strip( )
        if not stripped:
            # Blank separator: everything after it is content.
            body = rows[ index + 1 : ]
            break
        matched = _DESCRIPTOR_REGEX.fullmatch( stripped )
        if matched is None:
            # Headers ran straight into content; keep the current row.
            _scribe.warning( "No blank line after headers." )
            body = rows[ index : ]
            break
        words = matched.group( 'name' ).split( '-' )
        name = '-'.join( word.capitalize( ) for word in words )
        # TODO: Detect duplicates.
        headers[ name ] = matched.group( 'value' )
    _scribe.debug( f"Descriptor: {headers}" )
    return headers, '\n'.join( body )
_QUOTES = '"\''
def _parse_mimetype( header: str ) -> tuple[ str, str, _parts.LineSeparators ]:
    ''' Extracts MIME type, charset, and line separator from Content-Type.

        The header is split on semicolons. The first field is the MIME
        type; the remaining fields are ``name=value`` parameters, of which
        ``charset`` and ``linesep`` are recognized and all others are
        ignored. Parameter names are matched case-insensitively and values
        may be wrapped in single or double quotes.

        Defaults are ``utf-8`` for the charset and ``LF`` for the line
        separator.

        Raises ``MimeogramParseFailure`` if the ``linesep`` parameter does
        not name a member of ``LineSeparators``. This lets the caller skip
        the offending part instead of aborting on a bare ``KeyError``.
    '''
    from .exceptions import MimeogramParseFailure
    mimetype, *parameters = ( p.strip( ) for p in header.split( ';' ) )
    charset = 'utf-8'
    linesep = _parts.LineSeparators.LF
    for parameter in parameters:
        name, separator, value = parameter.partition( '=' )
        # A field without '=' is not a parameter; ignore it.
        if not separator: continue
        name = name.strip( ).lower( )
        value = value.strip( ).strip( _QUOTES )
        if name == 'charset': charset = value
        elif name == 'linesep':
            try: linesep = _parts.LineSeparators[ value.upper( ) ]
            except KeyError as exc:
                reason = f"Unrecognized line separator: {value!r}"
                raise MimeogramParseFailure( reason = reason ) from exc
    return mimetype, charset, linesep
def _separate_parts( content: str, boundary: str ) -> list[ str ]:
    ''' Splits mimeogram text into raw part texts on the boundary.

        The final boundary is the boundary, stripped of trailing
        whitespace, followed by ``--``. Text after the final boundary is
        ignored, as is text before the first regular boundary. Absence of
        a final boundary is logged as a warning but is not an error.
    '''
    terminator = f"{boundary.rstrip( )}--"
    # Deal with the final boundary first so that it and anything after it
    # cannot end up inside the last part.
    pieces = content.split( terminator )
    if len( pieces ) == 1:
        _scribe.warning( "No final boundary found." )
        body = content
    else:
        _scribe.debug( "Found final boundary." )
        body, epilogue = pieces[ 0 ], pieces[ 1 ]
        if epilogue.strip( ): _scribe.debug( "Found trailing text." )
    # The element before the first boundary is leading text; drop it.
    segments = body.split( boundary )[ 1: ]
    _scribe.debug( "Found {} parts to parse.".format( len( segments ) ) )
    return segments
_DESCRIPTOR_INDICES_REQUISITE = frozenset( (
    'Content-Location', 'Content-Type' ) )
def _validate_descriptor(
    descriptor: __.cabc.Mapping[ str, str ]
) -> __.cabc.Mapping[ str, str ]:
    ''' Ensures that descriptor contains every requisite header.

        Returns the descriptor unchanged if it has both
        ``Content-Location`` and ``Content-Type``.

        Raises ``MimeogramParseFailure``, after logging a warning, with a
        reason which lists the missing header names in sorted order.
    '''
    from .exceptions import MimeogramParseFailure
    missing = _DESCRIPTOR_INDICES_REQUISITE - descriptor.keys( )
    if missing:
        # Set iteration order is arbitrary and can differ between runs;
        # sort the names so that the message is reproducible.
        reason = (
            "Missing required headers: {awol}".format(
                awol = ', '.join( sorted( missing ) ) ) )
        _scribe.warning( reason )
        raise MimeogramParseFailure( reason = reason )
    return descriptor # TODO: Return immutable.