Coverage for sources/mimeogram/parsers.py: 100%
94 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-22 20:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-22 20:12 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Parsers for mimeograms and their constituents. '''
24from __future__ import annotations
26from . import __
27from . import parts as _parts
# Module-level scribe through which the parsers below emit diagnostics.
_scribe = __.produce_scribe( __name__ )


def parse( mgtext: str ) -> __.cabc.Sequence[ _parts.Part ]:
    ''' Parses mimeogram text into sequence of parts.

        Raises ``MimeogramParseFailure`` if the text is blank. The boundary
        is taken from the text itself and the text is split on it. A part
        which raises ``MimeogramParseFailure`` while being parsed is logged
        and skipped; the remaining parts are still returned.
    '''
    # TODO? Accept 'strict' flag.
    from .exceptions import MimeogramParseFailure
    if not mgtext.strip( ):
        raise MimeogramParseFailure( reason = "Empty mimeogram." )
    ptexts = _separate_parts( mgtext, _extract_boundary( mgtext ) )
    parts: list[ _parts.Part ] = [ ]
    # Parts are numbered from 1 in log messages.
    for i, ptext in enumerate( ptexts, start = 1 ):
        try: part = parse_part( ptext )
        except MimeogramParseFailure:
            _scribe.exception( f"Parse failure on part {i}." )
        else:
            parts.append( part )
            _scribe.debug(
                f"Parsed part {i} with location '{part.location}'." )
    _scribe.debug( "Parsed {} parts.".format( len( parts ) ) )
    return parts
def parse_part( ptext: str ) -> _parts.Part:
    ''' Parses text of single mimeogram part.

        Separates the descriptor headers from the content, validates that
        the required headers are present, and builds the part from the
        ``Content-Location`` and ``Content-Type`` headers and the content.
    '''
    headers, body = _parse_descriptor_and_content( ptext )
    _validate_descriptor( headers )
    content_type = headers[ 'Content-Type' ]
    mimetype, charset, linesep = _parse_mimetype( content_type )
    return _parts.Part(
        location = headers[ 'Content-Location' ],
        mimetype = mimetype,
        charset = charset,
        linesep = linesep,
        content = body )
# Matches a whole boundary line: the marker, at least 16 hexadecimal digits,
# and optional trailing whitespace. Case is ignored.
_BOUNDARY_REGEX = __.re.compile(
    r'''^--====MIMEOGRAM_[0-9a-fA-F]{16,}====\s*$''',
    __.re.IGNORECASE | __.re.MULTILINE )


def _extract_boundary( content: str ) -> str:
    ''' Extracts first mimeogram boundary, with newline appended.

        Raises ``MimeogramParseFailure`` if no boundary line is found.
    '''
    mobject = _BOUNDARY_REGEX.search( content )
    if mobject is None:
        from .exceptions import MimeogramParseFailure
        raise MimeogramParseFailure(
            reason = "No mimeogram boundary found." )
    boundary = mobject.group( )
    # Windows clipboard has CRLF newlines, so the match may end with CR.
    # The CR is stripped only for the log message, not from the result.
    boundary_s = boundary.rstrip( '\r' )
    _scribe.debug( f"Found boundary: {boundary_s}" )
    # Trailing newline is included so that parts are properly split.
    return boundary + '\n'
# Matches a 'Name: value' header line.
_DESCRIPTOR_REGEX = __.re.compile(
    r'''^(?P<name>[\w\-]+)\s*:\s*(?P<value>.*)$''' )


def _parse_descriptor_and_content(
    content: str
) -> tuple[ __.cabc.Mapping[ str, str ], str ]:
    ''' Splits part text into descriptor headers and content.

        Leading lines of the form ``Name: value`` are collected as headers,
        with each hyphen-separated segment of the name capitalized. The
        first blank line ends the headers and is discarded. A line which is
        not a header also ends them, but is kept as the first content line.
        Content lines are rejoined with ``\\n``.
    '''
    descriptor: dict[ str, str ] = { }
    matter: list[ str ] = [ ]
    # One iterator is shared by the header loop and the content collection,
    # so whatever the loop leaves unconsumed is the content.
    remainder = iter( content.splitlines( ) )
    for line in remainder:
        line_s = line.strip( )
        if not line_s: break
        mobject = _DESCRIPTOR_REGEX.fullmatch( line_s )
        if mobject is None:
            _scribe.warning( "No blank line after headers." )
            matter.append( line )
            break
        segments = mobject.group( 'name' ).split( '-' )
        name = '-'.join( segment.capitalize( ) for segment in segments )
        # TODO: Detect duplicates.
        descriptor[ name ] = mobject.group( 'value' )
    matter.extend( remainder )
    _scribe.debug( f"Descriptor: {descriptor}" )
    return descriptor, '\n'.join( matter )
# Quote characters which may surround a parameter value.
_QUOTES = '"\''


def _parse_mimetype( header: str ) -> tuple[ str, str, _parts.LineSeparators ]:
    ''' Extracts MIME type, charset, and line separator from Content-Type.

        The header is split on ``;``. The first field is the MIME type and
        the remaining fields are ``name=value`` parameters, of which
        ``charset`` and ``linesep`` are recognized. Parameter names are
        matched case-insensitively and whitespace around ``=`` is ignored.
        Values may be wrapped in single or double quotes. The charset
        defaults to ``utf-8`` and the line separator to ``LF``.

        Raises ``MimeogramParseFailure`` if the ``linesep`` value does not
        name a member of ``LineSeparators``.
    '''
    mimetype, *parameters = ( p.strip( ) for p in header.split( ';' ) )
    charset = 'utf-8'
    linesep = _parts.LineSeparators.LF
    for parameter in parameters:
        name, separator, value = parameter.partition( '=' )
        if not separator: continue # Not a 'name=value' parameter.
        name = name.strip( ).lower( )
        value = value.strip( ).strip( _QUOTES )
        if 'charset' == name: charset = value
        elif 'linesep' == name:
            # Lookup is by member name. A failed lookup is converted into a
            # parse failure so that callers which skip unparseable parts
            # can skip this one rather than abort on a stray KeyError.
            try: linesep = _parts.LineSeparators[ value.upper( ) ]
            except KeyError as exc:
                from .exceptions import MimeogramParseFailure
                raise MimeogramParseFailure(
                    reason = f"Unknown line separator: {value}" ) from exc
    return mimetype, charset, linesep
def _separate_parts( content: str, boundary: str ) -> list[ str ]:
    ''' Splits content into part texts on boundary.

        Everything from the final boundary (the boundary without trailing
        whitespace, followed by ``--``) onward is discarded, if one is
        present. Text before the first boundary is also discarded.
    '''
    final_boundary = boundary.rstrip( ) + '--'
    # Detect final boundary and trailing text first.
    segments = content.split( final_boundary )
    if 1 == len( segments ):
        _scribe.warning( "No final boundary found." )
        content_with_parts = content
    else:
        _scribe.debug( "Found final boundary." )
        content_with_parts, trailing_text = segments[ 0 ], segments[ 1 ]
        if trailing_text.strip( ): _scribe.debug( "Found trailing text." )
    # Split remaining content on regular boundary and skip leading text.
    _, *parts = content_with_parts.split( boundary )
    _scribe.debug( "Found {} parts to parse.".format( len( parts ) ) )
    return parts
# Names of headers which every part descriptor must have.
_DESCRIPTOR_INDICES_REQUISITE = frozenset( (
    'Content-Location', 'Content-Type' ) )


def _validate_descriptor(
    descriptor: __.cabc.Mapping[ str, str ]
) -> __.cabc.Mapping[ str, str ]:
    ''' Ensures descriptor has all required headers.

        Returns the descriptor unchanged if every required header is
        present. Otherwise, logs a warning and raises
        ``MimeogramParseFailure`` with the missing header names.
    '''
    names = _DESCRIPTOR_INDICES_REQUISITE - descriptor.keys( )
    if not names: return descriptor # TODO: Return immutable.
    # Exception class is only needed on the failure path.
    from .exceptions import MimeogramParseFailure
    # Set iteration order is arbitrary; sort so that the message is the
    # same on every run.
    reason = "Missing required headers: {awol}".format(
        awol = ', '.join( sorted( names ) ) )
    _scribe.warning( reason )
    raise MimeogramParseFailure( reason = reason )