Async Programming

Async Action

In the async programming model, methods do not block the current thread. Instead, an async method returns an Async<Foo> object, representing an async action running in the background. An Async<Foo> action eventually completes, either successfully with a Foo value or in failure with an Exception. For example:

static Async<Void> fooAction()
{
    return Fiber.sleep(Duration.ofSeconds(1));
}

Here, Fiber.sleep() does not block the current thread; instead it immediately returns an Async<Void> which will complete after some time.

You may use Async.execute() to convert a legacy blocking action to an async action. For example:

Async<byte[]> read(Path filePath)
{
    return Async.execute( ()-> Files.readAllBytes(filePath) );
}
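For comparison, the same idea can be sketched with only the JDK. This is a rough analogue, not the Async API described on this page: it assumes CompletableFuture as a stand-in for Async, and the class name BlockingToAsync is made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

final class BlockingToAsync
{
    // JDK analogue (assumption: CompletableFuture stands in for Async).
    // The blocking read runs on the common ForkJoinPool; the caller gets a future immediately.
    static CompletableFuture<byte[]> read(Path filePath)
    {
        return CompletableFuture.supplyAsync( ()->
        {
            try
            {
                return Files.readAllBytes(filePath);
            }
            catch(IOException e)
            {
                // a Supplier cannot throw checked exceptions, so wrap it
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

In this sketch the blocking call still occupies a pool thread while it runs; only the caller is freed from waiting.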

Fiber

Fiber is an analogue of Thread for async actions. A fiber is responsible for executing an async task and its sub-tasks. For example:

static Async<Integer> task() { ... }

public static void main(String[] args)
{
    Fiber<?> fiber = new Fiber<>( ()->task() );
    fiber.block(); // block the main thread till task completion
}

FiberLocal is an analogue of ThreadLocal.

Fiber stack traces can be enabled to record the execution paths of async actions.

An example usage of Fiber: HttpServer creates a fiber for each HTTP connection. The fiber executes the application code that handles HTTP requests from this connection. FiberLocal is used to store the current request. An application can dump all fibers to review the state of its connections.

Sequential Actions

After an action completes, we often want to start a subsequent action, and so on. This can be modeled by sequencing methods like map() and then(). For example:

 Async<Integer> action1(){...}
 Async<String>  action2(Integer x){...}

 Async<String> sequence =
     action1()
     .peek( System.out::println )
     .map ( x->x*2 )
     .then( x->action2(x) ); // then() is "flat map"

We can also simulate Java's try-catch-finally:

     .then( x->action2(x) )
     .catch_( IOException.class, e->"default string" )
     .finally_( resource::close )
     .then( string->... )
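Readers who know the JDK may find it helpful to compare this with CompletableFuture. The sketch below is only an analogue under that assumption, not this library's API; SequenceSketch and its method bodies are hypothetical, and failedFuture() needs Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;

final class SequenceSketch
{
    static CompletableFuture<Integer> action1()
    {
        return CompletableFuture.supplyAsync( ()->21 );
    }

    // Fails for negative input so that the error path can be exercised.
    static CompletableFuture<String> action2(Integer x)
    {
        if(x < 0)
            return CompletableFuture.failedFuture(new IllegalArgumentException("negative"));
        return CompletableFuture.completedFuture("value=" + x);
    }

    static CompletableFuture<String> sequence(CompletableFuture<Integer> start, Runnable cleanup)
    {
        return start
            .thenApply    ( x->x*2 )                  // roughly map()
            .thenCompose  ( x->action2(x) )           // roughly then(), i.e. flat map
            .exceptionally( e->"default string" )     // roughly catch_(), but for any failure type
            .whenComplete ( (s, e)->cleanup.run() );  // roughly finally_(): runs on success or failure
    }
}
```

One difference worth noting: exceptionally() does not filter by exception class, so this sketch recovers from every failure, whereas the catch_() call above names IOException specifically.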

Iteration and Looping

AsyncIterator<T> is an async analogue of Iterator; it contains a single abstract method

Async<T> next();

which yields the next element asynchronously. The returned Async<T> eventually succeeds with a T value as the next element, or fails with an End exception to end the iteration.

For example, an async iterator that yields one Instant per second for about ten seconds:

Instant stop = Instant.now().plusSeconds(10);

AsyncIterator<Instant> ticks = ()->
{
    if(Instant.now().isAfter(stop))
        return End.async();  // end iteration

    return Fiber
        .sleep(Duration.ofSeconds(1))
        .map( v -> Instant.now() );
};

An AsyncIterator can be transformed by map(), filter(), etc. to create an iterator of new elements:

AsyncIterator<String> iter = AsyncIterator.of(1, 2, 3, 4)
    .filter(x -> x % 2 == 1)
    .map( String::valueOf );  // map( T -> R )

AsyncIterator<Instant> ticks2 = AsyncIterator.ofInts(0, 10)
    .map_( i -> Fiber.sleep(Duration.ofSeconds(1)) )     // map_( T -> Async<R> )
    .map ( v -> Instant.now() );

Loops involving async actions can often be modeled by an AsyncIterator with a terminal operation such as forEach() or reduce():

Async<Void> p = iter.forEach(System.out::println);
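To make the looping idea concrete, here is a minimal sketch of how a forEach() over a next()-style iterator could be driven. It is not the library's implementation: it assumes CompletableFuture in place of Async, and NextSketch, EndSketch and IteratorSketch are hypothetical names standing in for AsyncIterator and End.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

final class IteratorSketch
{
    // Hypothetical stand-in for the End exception that terminates an iteration.
    static final class EndSketch extends RuntimeException {}

    // Hypothetical stand-in for AsyncIterator<T>: a single abstract method, next().
    interface NextSketch<T>
    {
        CompletableFuture<T> next();
    }

    // Wraps a plain Iterator; next() fails with EndSketch when the elements run out.
    static <T> NextSketch<T> of(Iterator<T> source)
    {
        return ()->
        {
            if(!source.hasNext())
            {
                CompletableFuture<T> end = new CompletableFuture<>();
                end.completeExceptionally(new EndSketch());
                return end;
            }
            return CompletableFuture.completedFuture(source.next());
        };
    }

    // Calls next() repeatedly. Completes normally once next() fails with EndSketch,
    // and fails if next() or the action fails with anything else.
    static <T> CompletableFuture<Void> forEach(NextSketch<T> iter, Consumer<T> action)
    {
        CompletableFuture<Void> done = new CompletableFuture<>();
        step(iter, action, done);
        return done;
    }

    private static <T> void step(NextSketch<T> iter, Consumer<T> action, CompletableFuture<Void> done)
    {
        iter.next().whenComplete( (value, error)->
        {
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if(cause instanceof EndSketch)
            {
                done.complete(null);              // normal end of iteration
                return;
            }
            if(cause != null)
            {
                done.completeExceptionally(cause);
                return;
            }
            try
            {
                action.accept(value);
            }
            catch(RuntimeException e)
            {
                done.completeExceptionally(e);
                return;
            }
            step(iter, action, done);             // recursive; acceptable for a short sketch only
        });
    }
}
```

The recursion in step() can grow the call stack when many elements are already available, so a production loop would need a different driving strategy.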

Some async APIs are designed to be compatible with AsyncIterator.next(). For example, ByteSource.read() returns Async<ByteBuffer>, which either succeeds with the next ByteBuffer, or fails with End for EOF. It's easy to create an async iterator for the data buffers from the source:

ByteSource source = new FileByteSource("tmp.txt");

AsyncIterator<ByteBuffer> bbs = source::read;
bbs.forEach( this::parse );

// 3 alternative ways:
( (AsyncIterator<ByteBuffer>)source::read ).forEach( this::parse );
AsyncIterator.by( source::read ).forEach( this::parse );
AsyncIterator.forEach( source::read, this::parse );

Parallel Actions

We can run parallel actions and treat them as one collective action, which completes when all or some of the actions complete. See AsyncBundle. In the following example, we query 3 servers simultaneously, and we only need one successful response from any of the servers:

Async<String> queryServer(int serverId, String term) { ... }

Async<String> queryParallel(String term)
{
    Stream<Async<String>> queries = IntStream.range(0, 3)
        .mapToObj( id -> queryServer(id, term) );

    return AsyncBundle.anyOf( queries );
}
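The "one successful response from any server" semantics can be sketched with the JDK as follows. This is an illustration under assumptions, not AsyncBundle itself: CompletableFuture stands in for Async, and AnyOfSketch.firstSuccess is a hypothetical helper.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class AnyOfSketch
{
    // Completes with the first successful result; fails only if every input fails.
    // CompletableFuture.anyOf() is not used because it also completes on the first failure.
    static <T> CompletableFuture<T> firstSuccess(List<CompletableFuture<T>> actions)
    {
        CompletableFuture<T> result = new CompletableFuture<>();
        if(actions.isEmpty())
        {
            result.completeExceptionally(new IllegalArgumentException("no actions"));
            return result;
        }

        AtomicInteger remaining = new AtomicInteger(actions.size());
        for(CompletableFuture<T> action : actions)
        {
            action.whenComplete( (value, error)->
            {
                if(error == null)
                    result.complete(value);               // first success wins; later ones are ignored
                else if(remaining.decrementAndGet() == 0)
                    result.completeExceptionally(error);  // every action failed: report the last failure
            });
        }
        return result;
    }
}
```

This sketch leaves the losing actions running after the first success; whether and how to cancel them is a separate design choice.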

Async Args

You may prefer to formulate an algorithm as functions calling functions, all the way down. If an argument to a function is the eventual success value of an async action, try Async.invoke:

String func(boolean arg1, int arg2){ ... }

Async<String> x = Async.invoke( this::func, async1(), async2() );

Async<Boolean> async1(){ ... }
Async<Integer> async2(){ ... }

Currently, as an experiment, Async.invoke supports only functions of arity 2. If you are interested in this topic, please provide feedback and let's discuss how to design a more general API.
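For the two-argument case, the JDK offers a comparable building block in CompletableFuture.thenCombine(). The sketch below uses it as an analogue only; InvokeSketch and the body of func are made up for illustration and are not part of this library.

```java
import java.util.concurrent.CompletableFuture;

final class InvokeSketch
{
    // An ordinary synchronous function of two plain arguments.
    static String func(boolean arg1, int arg2)
    {
        return arg1 ? "yes:" + arg2 : "no:" + arg2;
    }

    // Waits for both async arguments, then applies func to their success values.
    // If either argument fails, the combined result fails too.
    static CompletableFuture<String> invoke(CompletableFuture<Boolean> a, CompletableFuture<Integer> b)
    {
        return a.thenCombine(b, InvokeSketch::func);
    }
}
```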

Goto

See Goto and Tail Call for how to program a flowchart containing async actions.

Cancel and Timeout

We can send a cancellation request to any async action by calling action.cancel(reason). If the action has not completed yet, it should abort immediately and fail with reason.

A common usage is to cancel an action after a certain timeout. This can be done conveniently with timeout(duration):

queryParallel("kittens").timeout(Duration.ofSeconds(3))
    .then( ... )
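A JDK-only comparison, assuming CompletableFuture as the stand-in for Async and Java 9 or later for orTimeout(). TimeoutSketch is a hypothetical name, and this is a sketch of the general idea rather than of how timeout(duration) is implemented here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class TimeoutSketch
{
    // Fails the given future with a TimeoutException if it has not completed within millis.
    // orTimeout() acts on the future itself and returns that same future.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> action, long millis)
    {
        return action.orTimeout(millis, TimeUnit.MILLISECONDS);
    }
}
```

Note that orTimeout() only completes the future exceptionally; it does not stop whatever work was going to complete it, which is where an explicit cancellation hook becomes useful.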

Promise

Promise is an implementation of Async for result producers. In the following example, we spawn a thread to do some heavy work asynchronously, using a Promise to represent the async action. The promise is eventually completed by calling either succeed(value) or fail(exception).

Async<Integer> foo()
{
    Promise<Integer> promise = new Promise<>();

    Thread thread = new Thread( ()->
    {
        try
        {
            Integer x = someHeavyWork();
            promise.succeed(x);
        }
        catch(Exception e)
        {
            promise.fail(e);
        }
    });
    thread.start();

    promise.onCancel( ex->thread.interrupt() );

    return promise;
}
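The producer-side pattern above has a close JDK counterpart, sketched here for comparison. It assumes CompletableFuture in the role of Promise, with complete() and completeExceptionally() in place of succeed() and fail(); PromiseSketch is a hypothetical name, and the heavy work is passed in as a Callable so the example is self-contained.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

final class PromiseSketch
{
    static CompletableFuture<Integer> foo(Callable<Integer> someHeavyWork)
    {
        CompletableFuture<Integer> promise = new CompletableFuture<>();

        Thread thread = new Thread( ()->
        {
            try
            {
                promise.complete(someHeavyWork.call());
            }
            catch(Exception e)
            {
                promise.completeExceptionally(e);
            }
        });

        // CompletableFuture.cancel() does not interrupt any thread by itself,
        // so the interruption is wired up by hand, similar to onCancel() above.
        promise.whenComplete( (value, error)->
        {
            if(promise.isCancelled())
                thread.interrupt();
        });

        thread.start();
        return promise;
    }
}
```

In this sketch the cancellation hook is registered before the thread starts, so a cancel request that arrives right after foo() returns still reaches a running thread.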