public class Fiber<T> extends Object
A fiber executes an async task. A fiber is started upon construction (i.e. the constructor starts the execution of the task).
Async<String> fooTask(){ ... }

// start a fiber to execute fooTask
Fiber<String> fiber = new Fiber<>( this::fooTask );
A fiber is completed when the task is completed.
See isCompleted(), join().
fiber.block(); // block the current thread until the fiber is completed.
The current fiber can be obtained by Fiber.current(), for example
Async<String> fooTask()
{
    ...
    System.out.println( Fiber.current().getName() );
}
The fiber name is for diagnostic purposes; see getName() and setName(String).
Each fiber is associated with an Executor, for executing the fiber task and its sub-tasks.
The executor can be specified when creating a new fiber; see the constructor Fiber(executor, name, task).
There are default executors that can be used for fibers.
However, applications often have customized executors.
For example, HttpServer creates a fiber for each HTTP connection, using TcpConnection's executor, which dispatches tasks to the selector thread of the connection.
As another example, if you use Async in Swing, you may consider an executor based on EventQueue::invokeLater.
Requirements for any fiber executor:
If Fiber.enableTrace==true, fibers keep track of execution paths of async actions and sub-actions.
In the following example, the `foo` action invokes the `bar` sub-action:
public static void main(String[] args)
{
    System.setProperty("bayou.async.Fiber.enableTrace", "true");
    new Fiber<>( () -> foo() )
        .block();
}
static Async<Void> foo()
{
    return Async.VOID
        .then( v -> bar() );
}
static Async<Void> bar()
{
    Fiber.dumpStack(); // compare to Thread.dumpStack()
    return Async.VOID;
}
The fiber stack trace will show main()-foo()-bar().
Here's a utility to dump all live fibers to System.out:
public static void dumpAllFibers()
{
    for(Fiber<?> fiber : Fiber.getAll())
    {
        System.out.println(fiber.getName());
        for(StackTraceElement st : fiber.getStackTrace())
            System.out.println(" "+st);
    }
}
Enabling fiber stack traces has a non-trivial cost. Whether to enable them on a production system should be determined by profiling.
See also: FiberLocal, an analogue of ThreadLocal
Modifier and Type | Field and Description |
---|---|
static boolean | enableTrace - Whether to enable fiber stack traces. |

Constructor and Description |
---|
Fiber(Callable<Async<T>> task) - Create a new fiber for the task, with the current executor and a generated name. |
Fiber(Executor executor, String name, Callable<Async<T>> task) - Create a new fiber, with the specified executor, name, and task. |

Instance Methods | |
---|---|
Executor | getExecutor() - Get the executor of this fiber. |
String | getName() - Get the name of this fiber. |
void | setName(String name) - Set the name of this fiber. |
boolean | isCompleted() - Whether this fiber is completed. |
Async<T> | join() - Return an Async that completes when this fiber completes. |
Result<T> | block() - Block the current thread until this fiber completes. |
StackTraceElement[] | getStackTrace() - Get the fiber stack trace. |

Static Methods | |
---|---|
Fiber<?> | current() - Return the current fiber, or null if there is none. |
Collection<Fiber<?>> | getAll() - Get all fibers that are not completed. |
Executor | defaultExecutor() - Get a default executor implementation that can be used for fibers. |
Executor | currentExecutor() - Get the executor of the current fiber. |
Async<Void> | sleep(Duration duration) - Start an async action that idles for the specified duration, then succeeds with `null`. |
<T> Async<T> | sleep(Duration duration, T value) - Start an async action that idles for the specified duration, then succeeds with `value`. |
void | dumpStack() - Dump the stack trace of the current fiber to System.err. |
public static final boolean enableTrace
This flag is true iff system property "bayou.async.Fiber.enableTrace" is "true" when Fiber class is initialized. For example, you can place the following code in the beginning of your application:
System.setProperty("bayou.async.Fiber.enableTrace", "true");
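Because the flag is read once, when the class is initialized, setting the property afterwards has no effect. The following is a minimal JDK-only sketch of that general behavior; the property name `demo.enableTrace` and the `Flags` holder class are hypothetical stand-ins and are not part of this library.

```java
class TraceFlagDemo {
    // Hypothetical property name, used only for this illustration.
    static final String PROP = "demo.enableTrace";

    // Holder class: its static initializer runs once, on first access.
    static class Flags {
        static final boolean ENABLE_TRACE = "true".equals(System.getProperty(PROP));
    }

    // Sets the property before first touching Flags, then changes it afterwards.
    static boolean[] run() {
        System.setProperty(PROP, "true");
        boolean first = Flags.ENABLE_TRACE;   // triggers class initialization
        System.setProperty(PROP, "false");    // too late: the flag is already fixed
        boolean second = Flags.ENABLE_TRACE;
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

This is why the property should be set before any code touches the class that reads it, for example on the first line of `main` or with a `-D` option on the command line.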
public Fiber(Callable<Async<T>> task)
See Fiber(executor, name, task) for details.
public Fiber(Executor executor, String name, Callable<Async<T>> task)
The fiber is started upon construction, submitting the task to the executor.
If `executor==null`, the currentExecutor() is used.
See also requirements for the executor.
If `name==null`, an automatically generated name is used.
The `task` must be non-null.
public static Fiber<?> current()
If the current code is being executed in a fiber executor, this method returns that very fiber.
See getExecutor().
public static Collection<Fiber<?>> getAll()
The returned collection is unmodifiable by caller, but it may appear to be changing, because new fibers are being created, and existing fibers are being completed.
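The combination of "unmodifiable by the caller" and "appears to be changing" is the usual behavior of a read-only view over a live collection. Here is a small JDK-only sketch of that idea; the `live` set and the use of strings in place of fibers are assumptions made for illustration, and nothing here claims to be how this class stores its fibers.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class LiveViewDemo {
    // Stand-in registry of "live" items; strings take the place of fibers here.
    private static final Set<String> live = ConcurrentHashMap.newKeySet();

    // Read-only view: callers cannot modify it, but it reflects later changes.
    static Collection<String> getAll() {
        return Collections.unmodifiableCollection(live);
    }

    static String run() {
        Collection<String> view = getAll();
        int before = view.size();
        live.add("fiber-1");            // a new item starts
        int during = view.size();       // the same view now shows it
        boolean rejected;
        try {
            view.add("x");              // callers may not modify the view
            rejected = false;
        } catch (UnsupportedOperationException e) {
            rejected = true;
        }
        live.remove("fiber-1");         // the item completes
        int after = view.size();
        return before + "," + during + "," + after + "," + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

When a stable list is needed, for example while printing a report, copying the view into a new `ArrayList` first gives a snapshot that no longer changes.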
public static Executor defaultExecutor()
public static Executor currentExecutor()
If Fiber.current()==null, return defaultExecutor() instead.
Note that this method never returns null.
The current executor is used for executing user code like `onSuccess` in Async.then(onSuccess), `action` in AsyncIterator.forEach(action), etc. The purpose is to execute fiber-related code in the proper fiber environment.
Clarification on the meaning of "current": in this example
action1 .then( func ) ...
The current executor for `then()` is captured at the time `then()` is invoked, not when/where `action1` is completed. When `action1` is completed, `func` will be submitted to the previously captured executor.
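The capture-at-registration rule described above can be modeled in a few lines. This is only a sketch under stated assumptions: `Pending`, `CURRENT`, and `labeled` are hypothetical names invented for the example, and the code does not reproduce the actual Async implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class CaptureDemo {
    // Stand-in for "the current executor" of the running code.
    static final ThreadLocal<Executor> CURRENT = new ThreadLocal<>();
    static final List<String> log = new ArrayList<>();

    // Executor that records its label, then runs the task inline.
    static Executor labeled(String label) {
        return task -> { log.add(label); task.run(); };
    }

    // Minimal pending action with one callback (hypothetical, not the library's Async).
    static class Pending<T> {
        private Executor captured;
        private Consumer<T> callback;

        void then(Consumer<T> func) {
            captured = CURRENT.get();   // captured when then() is invoked
            callback = func;
        }

        void complete(T value) {
            // func goes to the captured executor, wherever completion happens
            captured.execute(() -> callback.accept(value));
        }
    }

    static List<String> run() {
        log.clear();
        Pending<String> action1 = new Pending<>();
        CURRENT.set(labeled("registering-executor"));
        action1.then(v -> log.add("func got " + v));
        CURRENT.set(labeled("completing-executor"));
        action1.complete("ok");         // completed under a different executor
        CURRENT.remove();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the sketch, `func` runs on the executor that was current when `then()` was called, even though the action is completed while a different executor is current.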
public Executor getExecutor()
The returned executor is not the same as the executor passed in the constructor; it's a wrapper of the latter.
Any code submitted to this executor will see `Fiber.current()` equal to this fiber.
fiber0.getExecutor().execute( ()-> { assert Fiber.current()==fiber0; });
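One common way to get this behavior is for the wrapper to set a thread-local around each task it forwards to the underlying executor. The sketch below shows that general technique with JDK classes only; `MiniFiber` is a hypothetical miniature written for this example and is not the library's implementation.

```java
import java.util.concurrent.Executor;

class WrapperDemo {
    // Hypothetical miniature fiber; only the executor-wrapping idea is modeled.
    static class MiniFiber {
        private static final ThreadLocal<MiniFiber> CURRENT = new ThreadLocal<>();
        private final Executor wrapper;

        MiniFiber(Executor underlying) {
            // Wrap each task so that it sees this fiber as current while it runs.
            this.wrapper = task -> underlying.execute(() -> {
                MiniFiber previous = CURRENT.get();
                CURRENT.set(this);
                try {
                    task.run();
                } finally {
                    CURRENT.set(previous);  // restore whatever was current before
                }
            });
        }

        static MiniFiber current() { return CURRENT.get(); }

        Executor getExecutor() { return wrapper; }
    }

    static boolean run() {
        MiniFiber fiber0 = new MiniFiber(Runnable::run); // inline executor for the demo
        boolean[] seen = new boolean[1];
        fiber0.getExecutor().execute(() -> seen[0] = (MiniFiber.current() == fiber0));
        // Inside the task the fiber is current; outside it, nothing is.
        return seen[0] && MiniFiber.current() == null;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```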
public String getName()
Methods get/setName() are thread-safe, with volatile read/write semantics.
public void setName(String name)
Methods get/setName() are thread-safe, with volatile read/write semantics.
name - must be non-null
public boolean isCompleted()
public Async<T> join()
Example Usage:
anotherFiber.join()
    .timeout( Duration.ofSeconds(5) )
    .then(...)
    ...
Any cancellation request to the returned Async will be forwarded to the fiber task.
Note: this method is an async analogue of `Thread.join()`; it does not block the current thread.
You can do join().sync() instead; see Async.sync().
public Result<T> block()
If the current thread is interrupted while it's being blocked by this method, the fiber task will receive a cancellation request with `InterruptedException` as reason. Hopefully the fiber task will abort quickly, so that this method can return quickly.
This method does not have a timeout parameter; try `join().timeout(duration).sync()` instead.
Caution: This is a blocking method, which usually should not be called in an async application. Because this method blocks the current thread, deadlock is possible if it's called in a fiber executor, preventing this or other fibers from advancing to completion. Be very careful if you use this method on a production system.
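The deadlock risk is easy to reproduce with a plain single-threaded executor, which plays a role similar to an executor that dispatches all of a fiber's tasks to one thread. The following JDK-only sketch is an analogy rather than library code; the 200 ms timeout is an assumption added so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingHazardDemo {
    static String run() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = single.submit(() -> {
                // The inner task is queued behind this one on the same thread.
                Future<String> inner = single.submit(() -> "inner done");
                try {
                    // Blocking here starves the inner task; without the
                    // timeout this call would never return.
                    return inner.get(200, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timed out";
                }
            });
            return outer.get();
        } catch (Exception e) {
            return "error: " + e;
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "timed out"
    }
}
```

The outer task occupies the only thread while it waits, so the inner task it waits for can never start. Blocking on a fiber from inside the executor that the fiber needs leads to the same situation.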
public static Async<Void> sleep(Duration duration)
If the action is canceled with `reason=e` before the duration has passed, the action fails with `e`.
This method is equivalent to `sleep(duration, (Void)null)`.
public static <T> Async<T> sleep(Duration duration, T value)
If the action is canceled with `reason=e` before the duration has passed, the action fails with `e`.
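For readers who know `CompletableFuture`, a rough JDK-only analogue of a non-blocking sleep that can be failed early looks like the sketch below. It is an illustration under assumptions, not this method's implementation: the `TIMER` scheduler is hypothetical, and `completeExceptionally` stands in for a cancellation request with a reason.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SleepDemo {
    // Daemon timer thread so the JVM can exit without an explicit shutdown.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sleep-timer");
            t.setDaemon(true);
            return t;
        });

    // Completes with `value` after `duration`, without blocking the caller.
    static <T> CompletableFuture<T> sleep(Duration duration, T value) {
        CompletableFuture<T> result = new CompletableFuture<>();
        ScheduledFuture<?> timer = TIMER.schedule(
            () -> { result.complete(value); },
            duration.toMillis(), TimeUnit.MILLISECONDS);
        // If the result is settled early (for example failed), drop the timer.
        result.whenComplete((v, e) -> timer.cancel(false));
        return result;
    }

    static String run() {
        try {
            String normal = sleep(Duration.ofMillis(50), "done").get();
            CompletableFuture<String> canceled = sleep(Duration.ofSeconds(30), "never");
            // Stand-in for canceling with reason e: the action fails with e.
            canceled.completeExceptionally(new IllegalStateException("stop"));
            try {
                canceled.get();
                return normal + ", no failure";
            } catch (ExecutionException e) {
                return normal + ", failed with " + e.getCause().getMessage();
            }
        } catch (Exception e) {
            return "error: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "done, failed with stop"
    }
}
```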
public StackTraceElement[] getStackTrace()
The returned array is a snapshot copy of the current stack trace; the caller may modify it.
It's safe to invoke this method on any fiber and any thread.
public static void dumpStack()
If Fiber.enableTrace==false or Fiber.current()==null, this method is equivalent to Thread.dumpStack().