In async programming model, methods do not block the current thread. Instead, an async method returns an Async<Foo>
object, representing an async action running in the background. An Async<Foo>
action eventually completes, either in a success with a Foo
value, or in a failure with an Exception
. For example
static Async<Void> fooAction() { return Fiber.sleep(Duration.ofSeconds(1)); }
Here, Fiber.sleep()
does not block the current thread; instead it immediately returns an Async<Void>
which will complete after some time.
You may use Async.execute()
to convert a legacy blocking action to an async action. For example
Async<byte[]> read(Path filePath) { return Async.execute( ()-> Files.readAllBytes(filePath) ); }
Fiber
is an analogue of Thread
for async actions. A fiber is responsible for executing an async task and sub-tasks. For example
static Async<Integer> task() { ... } public static void main(String[] args) { Fiber<?> fiber = new Fiber<>( ()->task() ); fiber.block(); // block the main thread till task completion }
FiberLocal
is an analogue of ThreadLocal
.
Fiber stack trace
can be enabled, which records execution paths of async actions.
An example usage of Fiber: HttpServer
creates a fiber for each http connection. The fiber executes application code that handles http requests from this connection. FiberLocal
is used to store the current request. Application can dump all fibers to review the state of connections.
After an action completes, often we want to start a subsequent action, and so on. This can be modeled by sequencing methods
like map()
and then()
. For example
Async<Integer> action1(){...} Async<String> action2(Integer x){...} Async<String> sequence = action1() .peek( System.out::println ) .map ( x->x*2 ) .then( x->action2(x) ); // then() is "flat map"
We can also simulate Java's try-catch-finally:
.then( x->action2(x) ) .catch_( IOException.class, e->"default string" ) .finally_( resource::close ) .then( string->... )
AsyncIterator<T>
is an async analogue of Iterator
; it contains a single abstract method
Async<T> next();
which yields the next element asynchronously. The returned Async<T>
eventually succeeds with a T
value as the next element, or fails with an End
exception to end the iteration.
For example, an async iterator that yields ten discrete seconds:
Instant stop = Instant.now().plusSeconds(10); AsyncIterator<Instant> ticks = ()-> { if(Instant.now().isAfter(stop)) return End.async(); // end iteration return Fiber .sleep(Duration.ofSeconds(1)) .map( v -> Instant.now() ); };
An AsyncIterator
can be transformed by map(), filter(),
etc. to create an iterator of new elements:
AsyncIterator<String> iter = AsyncIterator.of(1, 2, 3, 4) .filter(x -> x % 2 == 1) .map( String::valueOf ); // map( T -> R ) AsyncIterator<Instant> ticks2 = AsyncIterator.ofInts(0, 10) .map_( i -> Fiber.sleep(Duration.ofSeconds(1)) ) // map_( T -> Async<R> ) .map ( v -> Instant.now() );
Loops involving async actions can often be modeled by an AsyncIterator
with a terminal operation like forEach(), reduce()
.
Async<Void> p = iter.forEach(System.out::println);
Some async APIs are designed to be compatible with AsyncIterator.next()
. For example, ByteSource.read()
returns Async<ByteBuffer>
, which either succeeds with the next ByteBuffer
, or fails with End
for EOF. It's easy to create an async iterator for the data buffers from the source:
ByteSource source = new FileByteSource("tmp.txt"); AsyncIterator<ByteBuffer> bbs = source::read; bbs.forEach( this::parse ); // 3 alternative ways: ( (AsyncIterator<ByteBuffer>)source::read ).forEach( this::parse ); AsyncIterator.by( source::read ).forEach( this::parse ); AsyncIterator.forEach( source::read, this::parse );
We can run parallel actions and treat them as one collective action, which completes when all or some of the actions complete. See AsyncBundle
. In the following example, we query 3 servers simultaneously, and we only need one successful response from any of the servers:
Async<String> queryServer(int serverId, String term) { ... } Async<String> queryParallel(String term) { Stream<Async<String>> queries = IntStream.range(0, 3) .mapToObj( id -> queryServer(id, term) ); return AsyncBundle.anyOf( queries ); }
You may prefer to formulate an algorithm as functions calling functions... all the way down. If an argument to a function is the eventual success value of an async action, try Async.invoke
String func(boolean arg1, int arg2){ ... } Async<String> x = Async.invoke( this::func, async1(), async2() ); Async<Boolean> async1(){ ... } Async<Integer> async2(){ ... }
Currently Async.invoke
supports only arity of 2, as an experiment. If you are interested in this topic, please provide feedback and let's discuss how to design a more general API.
See Goto and Tail Call on how to program a flowchart containing async actions.
We can send a cancellation request to any async action by action.cancel(reason)
. If the action has not completed yet, it should abort immediately and fail with reason
.
A common usage is to cancel an action after a certain timeout. This can be done conveniently by timeout(duration)
.
queryParallel("kittens").timeout(Duration.ofSeconds(3)) .then( ... )
Promise
is an implementation of Async
for result producers. In the following example, we spawn a thread to do some heavy work asynchronously, using a Promise
to represent the async action. The promise is eventually completed by calling either succeed(value)
or fail(exception)
.
Async<Integer> foo() { Promise<Integer> promise = new Promise<>(); Thread thread = new Thread( ()-> { try { Integer x = someHeavyWork(); promise.succeed(x); } catch(Exception e) { promise.fail(e); } }); thread.start(); promise.onCancel( ex->thread.interrupt() ); return promise; }