public interface AsyncIterator<T>
The single abstract method of AsyncIterator<T> is `Async<T> next()`, which is an async action that yields the next element.
If the async action fails with End, the iterator has reached its end and there are no more elements.
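The contract above can be modeled with JDK types alone. The following is a minimal sketch, not the library's implementation: `CompletableFuture` stands in for `Async`, and `MiniEnd`, `MiniAsyncIterator`, `over` and `drain` are hypothetical names introduced only for illustration. The driver blocks on each result for simplicity, which a real async loop would not do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the End control exception.
final class MiniEnd extends RuntimeException {
    MiniEnd() { super("end of iteration", null, false, false); }
}

// Hypothetical model of the interface: one abstract method, so a lambda
// or a method reference can implement it.
@FunctionalInterface
interface MiniAsyncIterator<T> {
    CompletableFuture<T> next();
}

final class MiniAsyncIteratorDemo {
    // Build an iterator whose next() fails with MiniEnd once the data runs out.
    static <T> MiniAsyncIterator<T> over(List<T> data) {
        Iterator<T> it = data.iterator();
        return () -> {
            if (it.hasNext()) return CompletableFuture.completedFuture(it.next());
            return CompletableFuture.failedFuture(new MiniEnd());
        };
    }

    // Call next() repeatedly, one call at a time, until it fails with MiniEnd.
    static <T> List<T> drain(MiniAsyncIterator<T> iter) {
        List<T> out = new ArrayList<>();
        while (true) {
            try {
                out.add(iter.next().join());
            } catch (CompletionException e) {
                if (e.getCause() instanceof MiniEnd) return out; // normal end
                throw e; // any other failure is a real error
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(over(List.of("a", "b", "c")))); // prints [a, b, c]
    }
}
```

Failing with a dedicated exception, rather than returning a sentinel value, keeps the element type unrestricted and lets the end-of-iteration signal travel through the same failure channel as real errors.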
Some APIs have methods that are semantically compatible with AsyncIterator.next(), for example, ByteSource.read() and WebSocketChannel.readMessage(). It's easy to create an AsyncIterator for them:
    AsyncIterator<ByteBuffer> bbIter = byteSource::read;
    bbIter.forEach( bb->... );

    AsyncIterator.by( webSocketChannel::readMessage )
        .forEach( msg->... );
AsyncIterator is usually used to model loops containing async actions. For example:
    ByteSource source = ...;
    AsyncIterator.by(source::read)
        .map( ByteBuffer::remaining )
        .forEach( System.out::println )
        .finally_( source::close );
There are intermediary operations that transform an AsyncIterator to another AsyncIterator, for example, map, flatMap, filter.
There are terminal operations that visit all elements and compute a result, for example, reduce, forEach, toList.
After an intermediary/terminal operation on an iterator, the iterator should not be used again, or there could be multiple parties trying to iterate its elements.
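To see why reuse is a problem, consider the same situation with a plain `java.util.Iterator`, which is single-pass in the same way. This is only an analogy under that assumption; `mapped` and `interleave` are hypothetical helpers, not part of any library.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class SharedSourceDemo {
    // A lazy mapping view: it pulls from `source` only when asked.
    static <T, R> Iterator<R> mapped(Iterator<T> source, Function<T, R> func) {
        return new Iterator<R>() {
            public boolean hasNext() { return source.hasNext(); }
            public R next() { return func.apply(source.next()); }
        };
    }

    // Alternate between the original and the derived iterator; each element
    // is delivered to exactly one of them.
    static List<String> interleave() {
        Iterator<Integer> original = List.of(0, 1, 2, 3, 4, 5).iterator();
        Iterator<String> derived = mapped(original, i -> "mapped:" + i);
        List<String> seen = new ArrayList<>();
        while (original.hasNext()) {
            seen.add("original:" + original.next());
            if (derived.hasNext()) seen.add(derived.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        // The derived iterator only ever sees 1, 3 and 5.
        System.out.println(interleave());
    }
}
```

Both parties drain the same underlying source, so neither sees the full sequence.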
Sometimes we need to break a loop early, ignoring the rest of the elements. This can be done by throwing `End` from the function that processes elements. For example
    Stream<Integer> stream = IntStream.range(0, 100).boxed();
    AsyncIterator<Integer> iter = AsyncIterator.wrap(stream); // {0, 1, ... 99 }

    iter = iter.filter( (integer) ->
    {
        if(integer > 50)
            throw End.instance(); // filter out all remaining elements
        return integer%2==0;
    });

    iter.forEach( (integer) ->
    {
        if(integer > 30)
            throw End.instance(); // break
        print(integer);
    });
Breaking does not cause an error condition. In the example, `filter()` will create a new iterator of {0, 2, ... 50}, and `forEach()` will complete successfully after printing {0, 2, ... 30}.
If the function returns an Async, it can also break the loop by returning an Async that (eventually) fails with an End.
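The breaking behavior can be traced with a small synchronous model. This is a sketch under stated assumptions, not the library's code: `Stop` is a hypothetical stand-in for `End`, a `Supplier` that throws `Stop` stands in for `next()`, and `wrap`, `filter` and `forEach` are simplified re-creations of the operations named above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;

final class BreakDemo {
    // Hypothetical stand-in for End: a control exception, not an error.
    static final class Stop extends RuntimeException {
        Stop() { super(null, null, false, false); }
    }

    // Pull-style source: get() returns the next element or throws Stop.
    static <T> Supplier<T> wrap(Iterator<T> it) {
        return () -> {
            if (!it.hasNext()) throw new Stop();
            return it.next();
        };
    }

    // Skip rejected elements; a Stop thrown by the predicate propagates to
    // the caller, so the filtered source ends at that point.
    static <T> Supplier<T> filter(Supplier<T> source, Predicate<T> predicate) {
        return () -> {
            while (true) {
                T t = source.get();
                if (predicate.test(t)) return t;
            }
        };
    }

    // Visit elements until the source or the action throws Stop; either way
    // the loop completes normally.
    static <T> void forEach(Supplier<T> source, Consumer<T> action) {
        try {
            while (true) action.accept(source.get());
        } catch (Stop end) {
            // normal completion, not an error
        }
    }

    static List<Integer> run() {
        List<Integer> printed = new ArrayList<>();
        Supplier<Integer> iter = wrap(IntStream.range(0, 100).boxed().iterator());
        iter = filter(iter, i -> {
            if (i > 50) throw new Stop(); // drop all remaining elements
            return i % 2 == 0;
        });
        forEach(iter, i -> {
            if (i > 30) throw new Stop(); // break
            printed.add(i);
        });
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // the even numbers 0, 2, ... 30
    }
}
```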
End -- the control exception for signaling end-of-iteration
Abstract Method | |
---|---|
Async<T> | next() -- Yield the next element asynchronously. |
Default Methods | |
<R> AsyncIterator<R> | map(FunctionX<T,R> func) -- Create a new iterator by mapping each element of this iterator to a new element. |
<R> AsyncIterator<R> | map_(FunctionX<T,Async<R>> func) -- Similar to map(), except that `func` returns Async<R> instead of R. |
<R> AsyncIterator<R> | flatMap(FunctionX<T,AsyncIterator<R>> func) -- Create a new iterator by mapping each element of this iterator to multiple new elements. |
<R> AsyncIterator<R> | flatMap(FunctionX<T,AsyncIterator<R>> func, FunctionX<End,AsyncIterator<R>> endFunc) -- Similar to flatMap(func), except that the end-of-iteration event is also used to generate new elements. |
AsyncIterator<T> | filter(FunctionX<T,Boolean> predicate) -- Create a new iterator that retains only elements approved by predicate. |
AsyncIterator<T> | filter_(FunctionX<T,Async<Boolean>> predicate) -- Similar to filter(), except that predicate returns Async<Boolean> instead of Boolean. |
AsyncIterator<T> | peek(ConsumerX<T> action) -- Perform `action` on each element as it is being iterated. |
Async<Void> | forEach(ConsumerX<T> action) -- Perform action on each element. |
Async<Void> | forEach_(FunctionX<T,Async<?>> asyncAction) -- Similar to forEach(), except that the action is async. |
<R> Async<R> | reduce(R r0, BiFunctionX<R,T,R> func) -- Compute a value from the elements. |
<R> Async<R> | reduce_(R r0, BiFunctionX<R,T,Async<R>> func) -- Similar to reduce(), except func returns Async<R> instead of R. |
Async<List<T>> | toList() -- Gather all elements into a List. |
Static Methods | |
<T> AsyncIterator<T> | empty() -- Create an empty AsyncIterator<T>, with no elements. |
<T> AsyncIterator<T> | of(T... elements) -- Create an AsyncIterator<T> of the elements. |
AsyncIterator<Integer> | ofInts(int start, int end) -- Create an AsyncIterator of integers from `start` (inclusive) to `end` (exclusive). |
<T> AsyncIterator<T> | wrap(Iterator<T> iterator) -- Wrap an Iterator<T> as an AsyncIterator<T>. |
<T> AsyncIterator<T> | wrap(Stream<T> stream) -- Wrap a Stream<T> as an AsyncIterator<T>. |
<T> AsyncIterator<T> | by(AsyncIterator<T> asyncIterator) -- Syntactic sugar to create an AsyncIterator from a lambda expression or a method reference. |
<T> Async<Void> | forEach(AsyncIterator<T> elements, ConsumerX<T> action) -- Perform action on each element. |
<T> Async<Void> | forEach_(AsyncIterator<T> elements, FunctionX<T,Async<?>> asyncAction) -- Perform asyncAction on each element. |
Async<T> next()
This method returns an Async<T> that yields the next element, or fails with End if there are no more elements.
Caution: concurrent next() actions are not supported; an application must not call next() until the previous next() action is completed.
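One way to catch violations of this rule during development is a guard that rejects overlapping calls. The following is a hypothetical sketch, not something the library is known to provide: `SerialNextGuard` is an invented name, `CompletableFuture` stands in for `Async`, and a `Supplier` stands in for `next()`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class SerialNextGuard<T> {
    private final Supplier<CompletableFuture<T>> source; // stands in for next()
    private final AtomicBoolean pending = new AtomicBoolean(false);

    SerialNextGuard(Supplier<CompletableFuture<T>> source) { this.source = source; }

    // Reject a call that overlaps with a next() action still in flight.
    CompletableFuture<T> next() {
        if (!pending.compareAndSet(false, true))
            throw new IllegalStateException("previous next() has not completed");
        CompletableFuture<T> result = source.get();
        result.whenComplete((value, error) -> pending.set(false));
        return result;
    }

    static String demo() {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        Iterator<CompletableFuture<String>> futures = List.of(first, second).iterator();
        SerialNextGuard<String> guard = new SerialNextGuard<>(futures::next);

        guard.next();                  // the first action is now in flight
        String overlapping;
        try {
            guard.next();              // overlaps with the first action
            overlapping = "accepted";
        } catch (IllegalStateException e) {
            overlapping = "rejected";
        }

        first.complete("a");           // the first action completes
        boolean nextIsSecond = guard.next() == second; // allowed again
        return overlapping + ", then " + (nextIsSecond ? "accepted" : "unexpected");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: rejected, then accepted
    }
}
```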
default <R> AsyncIterator<R> map(FunctionX<T,R> func)
If the elements of this iterator are { e_0, e_1, ... e_{n-1} }, the elements of the new iterator are { func(e_0), func(e_1), ... func(e_{n-1}) }.
Breaking can be done if `func` throws `End`.
The `func` will be invoked in the current executor.
default <R> AsyncIterator<R> map_(FunctionX<T,Async<R>> func)
Similar to map(), except that `func` returns Async<R> instead of R.
Breaking can be done if `func` throws `End` or returns an Async that eventually fails with `End`.
The `func` will be invoked in the current executor.
default <R> AsyncIterator<R> flatMap(FunctionX<T,AsyncIterator<R>> func)
Breaking can be done if `func` throws `End`.
The `func` will be invoked in the current executor.
default <R> AsyncIterator<R> flatMap(FunctionX<T,AsyncIterator<R>> func, FunctionX<End,AsyncIterator<R>> endFunc)
When this iterator next() fails with End, or when func throws End, the End exception is fed to endFunc to generate more new elements.
Breaking can be done if `func` throws `End`.
The `func` and `endFunc` will be invoked in the current executor.
default AsyncIterator<T> filter(FunctionX<T,Boolean> predicate)
Breaking can be done if `predicate` throws `End`.
The `predicate` will be invoked in the current executor.
default AsyncIterator<T> filter_(FunctionX<T,Async<Boolean>> predicate)
Similar to filter(), except that predicate returns Async<Boolean> instead of Boolean.
Breaking can be done if `predicate` throws `End` or returns an Async that eventually fails with `End`.
The `predicate` will be invoked in the current executor.
default AsyncIterator<T> peek(ConsumerX<T> action)
This is an intermediary operation equivalent to
    map( t->{ action.accept(t); return t; } );
It is usually used for performing actions without altering the elements.
Example usage:
    AsyncIterator
        .ofInts(0, 10)
        .peek( System.out::println )
        .filter( x->x%3==0 )
        .peek( System.err::println )
        ...
CAUTION: this is not a terminal operation; this method alone will not trigger iteration of elements.
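The JDK's `java.util.stream.Stream.peek` is lazy in the same way, so it can illustrate the caution with standard classes only. This is an analogy, not the AsyncIterator API; the class and method names below are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class PeekLazinessDemo {
    // Build a pipeline with peek() but never run a terminal operation on it.
    static List<Integer> seenWithoutTerminal() {
        List<Integer> seen = new ArrayList<>();
        Stream<Integer> pipeline = IntStream.range(0, 10).boxed()
                .peek(seen::add)
                .filter(x -> x % 3 == 0);
        return seen; // nothing was iterated
    }

    // The same pipeline, followed by a terminal operation.
    static List<Integer> seenWithTerminal() {
        List<Integer> seen = new ArrayList<>();
        IntStream.range(0, 10).boxed()
                .peek(seen::add)
                .filter(x -> x % 3 == 0)
                .collect(Collectors.toList());
        return seen; // every element passed through peek()
    }

    public static void main(String[] args) {
        System.out.println(seenWithoutTerminal()); // prints []
        System.out.println(seenWithTerminal());    // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```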
default Async<Void> forEach(ConsumerX<T> action)
The overall action succeeds after all elements are visited and action is applied on each of them.
Breaking can be done if `action` throws `End`.
The `action` will be invoked in the current executor.
WARNING: if the action is async, use forEach_(asyncAction) instead.
Unfortunately, it's easy to accidentally pass an async action to this method, for example:
    iter.forEach( elem->Fiber.sleep(duration) ); // the action is async!
This is because the type of parameter `action` is T->void, which is compatible with any lambda expression T->expr, regardless of the type of `expr`. It also matches a method reference, regardless of its return type.
At this point, we don't have a good solution to prevent the problem at compile time. We do some runtime detection and issue a warning if the action is suspected to be async.
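The same pitfall can be reproduced with the JDK's `Consumer`, which is also a T->void shape. In this sketch `sendLater` is a hypothetical async action and `CompletableFuture` stands in for `Async`; the point is only that the compiler accepts the lambda and silently drops the returned future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class DiscardedResultDemo {
    static final List<CompletableFuture<Void>> started = new ArrayList<>();

    // Hypothetical async action: returns a future that is not yet complete.
    static CompletableFuture<Void> sendLater(String msg) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        started.add(f);
        return f;
    }

    static long unfinishedAfterLoop() {
        started.clear();
        // This compiles: the lambda body is an expression statement, so the
        // returned future is discarded and nobody waits for it.
        Consumer<String> action = msg -> sendLater(msg);
        List.of("a", "b", "c").forEach(action);
        // The loop is over, yet none of the async actions has completed.
        return started.stream().filter(f -> !f.isDone()).count();
    }

    public static void main(String[] args) {
        System.out.println(unfinishedAfterLoop()); // prints 3
    }
}
```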
default Async<Void> forEach_(FunctionX<T,Async<?>> asyncAction)
Similar to forEach(), except that the action is async.
The `asyncAction` returns Async<?> instead of void.
Breaking can be done if `asyncAction` throws `End` or returns an Async that eventually fails with `End`.
The `asyncAction` will be invoked in the current executor.
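The sketch below models a loop with an async action, assuming that the next element is requested only after the previous async action has completed; the text above does not spell this out, so treat it as an assumption. `CompletableFuture` stands in for `Async`, and `forEachAsync` is a hypothetical helper, not the library's method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialAsyncLoop {
    // Apply asyncAction to one element, then continue with the rest only
    // after the returned future completes. Recursion is fine for a short demo.
    static <T> CompletableFuture<Void> forEachAsync(
            Iterator<T> elements, Function<T, CompletableFuture<Void>> asyncAction) {
        if (!elements.hasNext()) return CompletableFuture.completedFuture(null);
        return asyncAction.apply(elements.next())
                .thenCompose(done -> forEachAsync(elements, asyncAction));
    }

    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        forEachAsync(List.of("a", "b", "c").iterator(), msg -> {
            log.add("start " + msg);
            // The async part runs on another thread.
            return CompletableFuture.runAsync(() -> log.add("end " + msg));
        }).join(); // block only at the very end, for the demo
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [start a, end a, start b, end b, start c, end c]
        System.out.println(run());
    }
}
```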
default <R> Async<R> reduce(R r0, BiFunctionX<R,T,R> func)
If the elements are { e_0, e_1, ... e_{n-1} }, define r_{i+1} = func(r_i, e_i); reduce() will yield the value r_n.
Breaking can be done if `func` throws `End`.
The `func` will be invoked in the current executor.
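As a worked example of the recurrence, take r0 = 0, addition as `func`, and the elements {1, 2, 3}: the intermediate values are 0, 1, 3, and the result is 6. The synchronous fold below shows the same computation; `ReduceDemo.reduce` is a hypothetical helper over a plain `List`, not the library's async version.

```java
import java.util.List;
import java.util.function.BiFunction;

final class ReduceDemo {
    // Start from r0 and feed each element, in order, into func.
    static <T, R> R reduce(List<T> elements, R r0, BiFunction<R, T, R> func) {
        R r = r0;
        for (T e : elements) r = func.apply(r, e);
        return r;
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of(1, 2, 3), 0, Integer::sum));          // prints 6
        System.out.println(reduce(List.of("a", "b", "c"), "", (r, e) -> r + e)); // prints abc
    }
}
```

With no elements, the result is simply r0.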
default <R> Async<R> reduce_(R r0, BiFunctionX<R,T,Async<R>> func)
Similar to reduce(), except func returns Async<R> instead of R.
Breaking can be done if `func` throws `End` or returns an Async that eventually fails with `End`.
The `func` will be invoked in the current executor.
static <T> AsyncIterator<T> empty()
Create an empty AsyncIterator<T>, with no elements.
@SafeVarargs static <T> AsyncIterator<T> of(T... elements)
Create an AsyncIterator<T> of the elements.
static AsyncIterator<Integer> ofInts(int start, int end)
static <T> AsyncIterator<T> wrap(Iterator<T> iterator)
Wrap an Iterator<T> as an AsyncIterator<T>.
static <T> AsyncIterator<T> wrap(Stream<T> stream)
Wrap a Stream<T> as an AsyncIterator<T>.
static <T> AsyncIterator<T> by(AsyncIterator<T> asyncIterator)
This method simply returns the argument `asyncIterator`, which seems a little odd. Explanation:
Since AsyncIterator is a functional interface, an instance can be created by a lambda expression or a method reference, in 3 contexts:
    // Assignment Context
    AsyncIterator<ByteBuffer> asyncIter = source::read;
    asyncIter.forEach(...);

    // Casting Context
    ((AsyncIterator<ByteBuffer>)source::read)
        .forEach(...);

    // Invocation Context
    AsyncIterator.by(source::read)
        .forEach(...);
The 3rd option looks better than the other two, and that's the purpose of this method.
If by() is followed by forEach(), it's simpler to use static AsyncIterator.forEach():
AsyncIterator.forEach(source::read, action)
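The identity-method trick works for any functional interface. The sketch below reproduces the three contexts with a hypothetical interface `Pull` and a hypothetical source class `Lines`, both invented for this illustration, so that the pattern can be compiled without the library.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical functional interface with one abstract method.
@FunctionalInterface
interface Pull<T> {
    T read(); // simplification: null marks the end

    default int count() {
        int n = 0;
        while (read() != null) n++;
        return n;
    }

    // Returns its argument; exists only to give a lambda or method
    // reference a target type in an invocation context.
    static <T> Pull<T> by(Pull<T> pull) { return pull; }
}

// Hypothetical source whose read() is compatible with Pull.read().
final class Lines {
    private final Iterator<String> it;
    Lines(String... lines) { this.it = List.of(lines).iterator(); }
    String read() { return it.hasNext() ? it.next() : null; }
}

final class ByDemo {
    static int assignment() {
        Lines source = new Lines("a", "b");
        Pull<String> pull = source::read;             // assignment context
        return pull.count();
    }

    static int casting() {
        Lines source = new Lines("a", "b");
        return ((Pull<String>) source::read).count(); // casting context
    }

    static int invocation() {
        Lines source = new Lines("a", "b");
        return Pull.by(source::read).count();         // invocation context
    }

    public static void main(String[] args) {
        System.out.println(assignment() + " " + casting() + " " + invocation()); // prints 2 2 2
    }
}
```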
static <T> Async<Void> forEach(AsyncIterator<T> elements, ConsumerX<T> action)
This method is syntactic sugar for `elements.forEach(action)`. Example usage:
    ByteSource source = ...;
    AsyncIterator.forEach( source::read, System.out::println )
        .finally_( source::close );
WARNING: if the action is async, use forEach_(elements, asyncAction) instead.
Read the explanation in forEach(action).
static <T> Async<Void> forEach_(AsyncIterator<T> elements, FunctionX<T,Async<?>> asyncAction)
This method is syntactic sugar for `elements.forEach_(asyncAction)`. Example usage:
    // echo websocket messages
    WebSocketChannel chann = ...;
    AsyncIterator.forEach_( chann::readMessage, chann::writeMessage )
        .finally_( chann::close );