Coverage for aiostream/stream/create.py: 96%

81 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-05-04 00:10 +0000

1"""Non-pipable creation operators.""" 

2from __future__ import annotations 

3 

4import sys 

5import asyncio 

6import inspect 

7import builtins 

8import itertools 

9 

10from typing import ( 

11 AsyncIterable, 

12 Awaitable, 

13 Iterable, 

14 Protocol, 

15 TypeVar, 

16 AsyncIterator, 

17 cast, 

18) 

19from typing_extensions import ParamSpec 

20 

21from ..stream import time 

22from ..core import operator, streamcontext 

23 

24__all__ = [ 

25 "iterate", 

26 "preserve", 

27 "just", 

28 "call", 

29 "throw", 

30 "empty", 

31 "never", 

32 "repeat", 

33 "range", 

34 "count", 

35] 

36 

37T = TypeVar("T") 

38P = ParamSpec("P") 

39 

40# Hack for Python 3.8 compatibility: P falls back to a plain TypeVar there

41if sys.version_info < (3, 9): 

42 P = TypeVar("P") 

43 

44# Convert regular iterables 

45 

46 

@operator
async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
    """Turn a regular iterable into an asynchronous stream of its items.

    A zero-length sleep is awaited before each item is produced.
    """
    for element in it:
        # Awaited once per item, ahead of the yield.
        await asyncio.sleep(0)
        yield element

53 

54 

@operator
def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
    """Generate values from an asynchronous iterable.

    Note: the corresponding iterator will be explicitly closed
    when leaving the context manager.
    """
    wrapped = streamcontext(ait)
    return wrapped

62 

63 

@operator
def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
    """Generate values from a synchronous or asynchronous iterable.

    The asynchronous check is performed first; a ``TypeError`` is raised
    when the argument is neither kind of iterable.
    """
    if isinstance(it, AsyncIterable):
        stream = from_async_iterable.raw(it)
    elif isinstance(it, Iterable):
        stream = from_iterable.raw(it)
    else:
        raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
    return stream

72 

73 

@operator
async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
    """Relay every value of an asynchronous iterable without
    explicitly closing the corresponding iterator."""
    async for value in ait:
        yield value

80 

81 

82# Simple operators 

83 

84 

@operator
async def just(value: T) -> AsyncIterator[T]:
    """Generate a single value, awaiting it first when it is awaitable."""
    result = (await value) if inspect.isawaitable(value) else value
    yield result

92 

93 

94Y = TypeVar("Y", covariant=True) 

95 

96 

class SyncCallable(Protocol[P, Y]):
    """Protocol for a callable taking parameters ``P`` and returning ``Y``."""

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y: ...

100 

101 

class AsyncCallable(Protocol[P, Y]):
    """Protocol for a callable taking parameters ``P`` and returning an
    awaitable of ``Y``."""

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]: ...

105 

106 

@operator
async def call(
    func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
) -> AsyncIterator[T]:
    """Invoke ``func`` with the given arguments and generate its result.

    When ``func`` is a coroutine function, the call is awaited and the
    awaited result is generated instead.
    """
    if not asyncio.iscoroutinefunction(func):
        plain_func = cast("SyncCallable[P, T]", func)
        result = plain_func(*args, **kwargs)
    else:
        coro_func = cast("AsyncCallable[P, T]", func)
        result = await coro_func(*args, **kwargs)
    yield result

121 

122 

@operator
async def throw(exc: Exception) -> AsyncIterator[None]:
    """Raise the given exception instead of generating any value."""
    raise exc
    # Unreachable: the bare ``yield`` only makes this an async generator.
    yield

129 

130 

@operator
async def empty() -> AsyncIterator[None]:
    """Finish right away, producing no value at all."""
    return
    # Unreachable: the bare ``yield`` only makes this an async generator.
    yield

136 

137 

@operator
async def never() -> AsyncIterator[None]:
    """Hang forever without generating any value.

    The generator awaits a future that nothing in this function ever
    resolves. The future is cancelled in the ``finally`` clause when the
    wait is interrupted.
    """
    if False:
        # Unreachable: the bare ``yield`` only makes this an async generator.
        yield
    # ``loop.create_future()`` is the creation path recommended by the
    # asyncio documentation; it lets alternative event loops provide their
    # own Future implementation.
    future: asyncio.Future[None] = asyncio.get_running_loop().create_future()
    try:
        await future
    finally:
        future.cancel()

148 

149 

@operator
def repeat(
    value: T, times: int | None = None, *, interval: float = 0.0
) -> AsyncIterator[T]:
    """Generate the same value over and over.

    ``times`` bounds the number of repetitions; when it is ``None``, the
    value is repeated indefinitely. A non-zero ``interval`` spaces the
    values out.
    """
    if times is None:
        source = itertools.repeat(value)
    else:
        source = itertools.repeat(value, times)
    stream = from_iterable.raw(source)
    if not interval:
        return stream
    return time.spaceout.raw(stream, interval)

163 

164 

165# Counting operators 

166 

167 

@operator
def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
    """Generate a given range of numbers.

    The positional arguments are forwarded unchanged to the builtin
    ``range``. A non-zero ``interval`` spaces the values out.
    """
    numbers = builtins.range(*args)
    stream = from_iterable.raw(numbers)
    if not interval:
        return stream
    return time.spaceout.raw(stream, interval)

177 

178 

@operator
def count(
    start: int = 0, step: int = 1, *, interval: float = 0.0
) -> AsyncIterator[int]:
    """Generate consecutive numbers indefinitely.

    ``start`` is the first number and ``step`` the increment; they
    default to ``0`` and ``1`` respectively. A non-zero ``interval``
    spaces the values out.
    """
    numbers = itertools.count(start, step)
    stream = from_iterable.raw(numbers)
    if not interval:
        return stream
    return time.spaceout.raw(stream, interval)