Coverage for sources/mimeogram/__/asyncf.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-22 03:16 +0000

1# vim: set filetype=python fileencoding=utf-8: 

2# -*- coding: utf-8 -*- 

3 

4#============================================================================# 

5# # 

6# Licensed under the Apache License, Version 2.0 (the "License"); # 

7# you may not use this file except in compliance with the License. # 

8# You may obtain a copy of the License at # 

9# # 

10# http://www.apache.org/licenses/LICENSE-2.0 # 

11# # 

12# Unless required by applicable law or agreed to in writing, software # 

13# distributed under the License is distributed on an "AS IS" BASIS, # 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

15# See the License for the specific language governing permissions and # 

16# limitations under the License. # 

17# # 

18#============================================================================# 

19 

20 

21''' Helper functions for async execution. ''' 

22 

23 

24from __future__ import annotations 

25 

26from . import imports as __ 

27from . import exceptions as _exceptions 

28from . import generics as _generics 

29 

30 

async def gather_async(
    *operands: __.typx.Any,
    return_exceptions: __.typx.Annotated[
        bool,
        __.typx.Doc( ''' Raw or wrapped results. Wrapped, if true. ''' )
    ] = False,
    error_message: str = 'Failure of async operations.',
    ignore_nonawaitables: __.typx.Annotated[
        bool,
        __.typx.Doc(
            ''' Ignore or error on non-awaitables. Ignore, if true. ''' )
    ] = False,
) -> tuple[ __.typx.Any, ... ]:
    ''' Runs operands concurrently and returns their outcomes as a tuple.

        The operands are handed to the permissive helper when
        ``ignore_nonawaitables`` is true and to the strict helper otherwise.

        When ``return_exceptions`` is true, the result wrappers produced by
        the helper are returned as-is. Otherwise, if any wrapper reports an
        error, all such errors are raised together in an
        :py:exc:`ExceptionGroup` carrying ``error_message``; if none does,
        the extracted values are returned.
    '''
    # TODO: Overload signature for 'return_exceptions'.
    from exceptiongroup import ExceptionGroup # TODO: Python 3.11: builtin
    gatherer = (
        _gather_async_permissive if ignore_nonawaitables
        else _gather_async_strict )
    outcomes = await gatherer( operands )
    # Caller asked for wrapped results: no unwrapping and no raising.
    if return_exceptions: return tuple( outcomes )
    failures = tuple(
        outcome.error for outcome in outcomes if outcome.is_error( ) )
    if failures: raise ExceptionGroup( error_message, failures )
    return tuple( outcome.extract( ) for outcome in outcomes )

55 

56 

async def intercept_error_async(
    awaitable: __.cabc.Awaitable[ __.typx.Any ]
) -> _generics.Result[ object, Exception ]:
    ''' Awaits operand and captures outcome as value or error result.

        Only instances of :py:exc:`Exception` (and its subclasses) are
        captured. Anything else, notably :py:exc:`KeyboardInterrupt` and
        :py:exc:`SystemExit`, is left to propagate, which keeps behavior
        consistent with :py:class:`asyncio.TaskGroup`.

        Useful alongside :py:func:`asyncio.gather`: failures stay
        distinguishable from computed values and can later be bundled into
        an exception group.

        Swallowing exceptions is generally a bad idea. Here, they are only
        held temporarily so that they can be added to an exception group
        and continue propagating.
    '''
    try:
        value = await awaitable
        # Wrapper construction stays inside the 'try' so that an exception
        # from it is captured in the same way as one from the awaitable.
        outcome = _generics.Value( value )
    except Exception as exc: # pylint: disable=broad-exception-caught
        outcome = _generics.Error( exc )
    return outcome

79 

80 

async def _gather_async_permissive(
    operands: __.cabc.Sequence[ __.typx.Any ]
) -> __.cabc.Sequence[ __.typx.Any ]:
    ''' Gathers results, passing non-awaitable operands through as values.

        Each awaitable operand is wrapped with
        :py:func:`intercept_error_async` and awaited concurrently. Each
        non-awaitable operand is wrapped directly as a value result.

        Returns a list of result wrappers in the same order as ``operands``.
    '''
    from asyncio import gather # TODO? Python 3.11: TaskGroup
    from inspect import isawaitable
    awaitables: dict[ int, __.cabc.Awaitable[ __.typx.Any ] ] = { }
    results: list[ _generics.GenericResult ] = [ ]
    for i, operand in enumerate( operands ):
        # Use 'inspect.isawaitable' rather than an 'isinstance' check
        # against the 'Awaitable' ABC: generator-based coroutines are
        # awaitable but lack '__await__', so the ABC check rejects them.
        # This also matches the detection used by the strict gatherer.
        if isawaitable( operand ):
            awaitables[ i ] = (
                intercept_error_async( __.typx.cast(
                    __.cabc.Awaitable[ __.typx.Any ], operand ) ) )
            # Placeholder keeps positions aligned; replaced after gather.
            results.append( _generics.Value( None ) )
        else: results.append( _generics.Value( operand ) )
    results_ = await gather( *awaitables.values( ) )
    # Dictionaries preserve insertion order, so keys pair up with the
    # gathered results positionally.
    for i, result in zip( awaitables.keys( ), results_ ):
        results[ i ] = result
    return results

98 

99 

async def _gather_async_strict(
    operands: __.cabc.Sequence[ __.typx.Any ]
) -> __.cabc.Sequence[ __.typx.Any ]:
    ''' Gathers results, rejecting any operand which is not awaitable.

        If a non-awaitable operand is found, every coroutine among the
        operands is closed and an ``AsyncAssertionFailure`` is raised with
        the first offending operand. Otherwise, each operand is wrapped
        with :py:func:`intercept_error_async` and all are awaited
        concurrently; the gathered result wrappers are returned.
    '''
    from asyncio import gather # TODO? Python 3.11: TaskGroup
    from inspect import isawaitable, iscoroutine
    # Sanity check. A sentinel is used because an operand may itself be
    # any object, including 'None'.
    absent = object( )
    offender = next(
        ( item for item in operands if not isawaitable( item ) ), absent )
    if offender is not absent:
        # Cleanup: close coroutines which will never be awaited.
        for item in operands:
            if iscoroutine( item ): item.close( )
        raise _exceptions.AsyncAssertionFailure( offender )
    interceptors: list[ __.cabc.Awaitable[ __.typx.Any ] ] = [
        intercept_error_async( __.typx.cast(
            __.cabc.Awaitable[ __.typx.Any ], item ) )
        for item in operands ]
    return await gather( *interceptors )