Coverage for sources/mimeogram/__/asyncf.py: 100%
42 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-03 00:13 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-03 00:13 +0000
1# vim: set filetype=python fileencoding=utf-8:
2# -*- coding: utf-8 -*-
4#============================================================================#
5# #
6# Licensed under the Apache License, Version 2.0 (the "License"); #
7# you may not use this file except in compliance with the License. #
8# You may obtain a copy of the License at #
9# #
10# http://www.apache.org/licenses/LICENSE-2.0 #
11# #
12# Unless required by applicable law or agreed to in writing, software #
13# distributed under the License is distributed on an "AS IS" BASIS, #
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15# See the License for the specific language governing permissions and #
16# limitations under the License. #
17# #
18#============================================================================#
21''' Helper functions for async execution. '''
24from __future__ import annotations
26from . import imports as __
27from . import exceptions as _exceptions
28from . import generics as _generics
async def gather_async(
    *operands: __.typx.Any,
    return_exceptions: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Raw or wrapped results. Wrapped, if true. ''' )
    ] = False,
    error_message: str = 'Failure of async operations.',
    ignore_nonawaitables: __.typx.Annotated[
        bool,
        __.typx.Doc(
            ''' Ignore or error on non-awaitables. Ignore, if true. ''' )
    ] = False,
) -> tuple[ __.typx.Any, ... ]:
    ''' Gathers results from awaitables concurrently and asynchronously.

        Operands are handed to the permissive gatherer if
        ``ignore_nonawaitables`` is true and to the strict gatherer
        otherwise.

        If ``return_exceptions`` is true, then the wrapped results are
        returned as a tuple without further inspection. Otherwise, the
        errors from all failed results are raised together in an exception
        group, which carries ``error_message``. Absent any errors, the
        extracted values are returned as a tuple.
    '''
    # TODO: Overload signature for 'return_exceptions'.
    from exceptiongroup import ExceptionGroup # TODO: Python 3.11: builtin
    gatherer = (
        _gather_async_permissive if ignore_nonawaitables
        else _gather_async_strict )
    outcomes = await gatherer( operands )
    if return_exceptions: return tuple( outcomes )
    failures = tuple(
        outcome.error for outcome in outcomes if outcome.is_error( ) )
    if failures: raise ExceptionGroup( error_message, failures )
    return tuple( outcome.extract( ) for outcome in outcomes )
async def intercept_error_async(
    awaitable: __.cabc.Awaitable[ __.typx.Any ]
) -> _generics.Result[ object, Exception ]:
    ''' Awaits operand and captures any raised exception as error result.

        A normal completion is wrapped as a value result. Anything raised,
        which is an instance of :py:exc:`Exception`, is wrapped as an error
        result rather than propagated.

        Exceptions outside of the :py:exc:`Exception` hierarchy, such as
        :py:exc:`KeyboardInterrupt` and :py:exc:`SystemExit`, are not
        intercepted. They must propagate to remain consistent with the
        behavior of :py:class:`asyncio.TaskGroup`.

        Useful alongside :py:func:`asyncio.gather`, for example, since
        failures remain distinguishable from computed values and can later
        be bundled into an exception group.

        Swallowing exceptions is generally a bad idea. Here, the purpose is
        to defer them so that they can continue propagating as part of an
        exception group.
    '''
    try:
        # Result construction stays inside the guarded region.
        result = _generics.Value( await awaitable )
    except Exception as exc: # pylint: disable=broad-exception-caught
        result = _generics.Error( exc )
    return result
async def _gather_async_permissive(
    operands: __.cabc.Sequence[ __.typx.Any ]
) -> __.cabc.Sequence[ __.typx.Any ]:
    ''' Gathers awaitable operands and passes other operands through.

        Each awaitable operand is awaited concurrently, with its outcome
        captured as a result by :py:func:`intercept_error_async`. Each
        non-awaitable operand is wrapped directly as a value result.
        Results are returned in the same order as the operands.

        Awaitables are detected with :py:func:`inspect.isawaitable`, the
        same test that the strict gatherer uses, so that generator-based
        coroutines are awaited rather than passed through as plain values.
    '''
    from asyncio import gather # TODO? Python 3.11: TaskGroup
    from inspect import isawaitable
    # Maps operand position to wrapped awaitable. Insertion order is
    # relied upon to pair gathered results with their positions.
    awaitables: dict[ int, __.cabc.Awaitable[ __.typx.Any ] ] = { }
    results: list[ _generics.GenericResult ] = [ ]
    for i, operand in enumerate( operands ):
        if isawaitable( operand ):
            awaitables[ i ] = (
                intercept_error_async( __.typx.cast(
                    __.cabc.Awaitable[ __.typx.Any ], operand ) ) )
            # Placeholder, which is overwritten after gathering.
            results.append( _generics.Value( None ) )
        else: results.append( _generics.Value( operand ) )
    results_ = await gather( *awaitables.values( ) )
    for i, result in zip( awaitables, results_ ):
        results[ i ] = result
    return results
async def _gather_async_strict(
    operands: __.cabc.Sequence[ __.typx.Any ]
) -> __.cabc.Sequence[ __.typx.Any ]:
    ''' Gathers operands, all of which must be awaitable.

        If any operand is not awaitable, then every coroutine among the
        operands is closed and an assertion failure is raised for the
        first non-awaitable operand. Otherwise, all operands are awaited
        concurrently, with each outcome captured as a result by
        :py:func:`intercept_error_async`.
    '''
    from asyncio import gather # TODO? Python 3.11: TaskGroup
    from inspect import isawaitable, iscoroutine
    # Sanity check.
    nonawaitables = [
        operand for operand in operands if not isawaitable( operand ) ]
    if nonawaitables:
        # Cleanup. Presumably prevents 'never awaited' warnings for
        # coroutines which will now not be run.
        for candidate in operands:
            if iscoroutine( candidate ): candidate.close( )
        raise _exceptions.AsyncAssertionFailure( nonawaitables[ 0 ] )
    interceptors = [
        intercept_error_async( __.typx.cast(
            __.cabc.Awaitable[ __.typx.Any ], operand ) )
        for operand in operands ]
    return await gather( *interceptors )