Maximizing Efficiency with Concurrent Execution
Running Listing 2 yields results comparable to the synchronous version, but it finishes much faster because the requests are issued simultaneously. Compare the sc() method, which uses multithreading through structured concurrency, with the sync() method, which relies on synchronous code. The structured concurrency approach is not much harder to reason about, yet it is considerably faster.
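To make the speed difference concrete, here is a minimal sketch that runs the same five lookups sequentially and then concurrently. It is not the article's Listing 2: fakeGetPlanet() is a hypothetical stand-in that sleeps instead of calling the network, and a plain ExecutorService is swapped in for StructuredTaskScope so the sketch runs without preview flags. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TimingSketch {
    // Hypothetical stand-in for getPlanet(): sleeps to simulate network latency.
    static String fakeGetPlanet(int planetId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return "planet-" + planetId;
    }

    // One request after another, in the spirit of sync().
    static List<String> fetchSequentially(int[] planetIds) {
        List<String> results = new ArrayList<>();
        for (int planetId : planetIds) {
            results.add(fakeGetPlanet(planetId));
        }
        return results;
    }

    // All requests in flight at once; invokeAll() blocks until every task is done.
    static List<String> fetchConcurrently(int[] planetIds) {
        ExecutorService pool = Executors.newFixedThreadPool(planetIds.length);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int planetId : planetIds) {
                tasks.add(() -> fakeGetPlanet(planetId));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("lookup failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] planetIds = {1, 2, 3, 4, 5};

        long start = System.nanoTime();
        fetchSequentially(planetIds);
        long sequentialMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        fetchConcurrently(planetIds);
        long concurrentMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sequential: " + sequentialMs + " ms, concurrent: " + concurrentMs + " ms");
    }
}
```

With a simulated delay of 100 ms per lookup, the sequential pass should take roughly five times as long as the concurrent one, though exact timings vary by machine.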
Navigating the Task and Subtask Landscape
Creating a StructuredTaskScope starts a workflow that uses virtual threads by default. Traditional multithreading allocates an operating system thread for each task; with virtual threads, the Java Virtual Machine (JVM) schedules the work itself and orchestrates the requests efficiently. The StructuredTaskScope constructor can also accept a ThreadFactory if you need one.
Listing 2 creates the StructuredTaskScope object in a try-with-resources block, which is how it is designed to be used. The fork() method lets you spawn as many jobs as required. It accepts anything that implements Callable, which includes lambdas and method references. A lambda such as () -> getPlanet(planetId) provides a concise syntax for passing arguments into the target function.
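The different shapes a Callable can take are easier to see side by side. The sketch below builds the same kind of task three ways: as a lambda, as a method reference, and as an anonymous class. The getPlanet() here is a hypothetical stand-in that makes no network call, and getDefaultPlanet(), buildTasks(), and runAll() are names invented for illustration; the tasks are run directly on the calling thread rather than forked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class CallableForms {
    // Hypothetical stand-in for the article's getPlanet(); makes no network call.
    static String getPlanet(int planetId) {
        return "planet-" + planetId;
    }

    // A zero-argument method, so it can be passed directly as a method reference.
    static String getDefaultPlanet() {
        return getPlanet(1);
    }

    // Three ways to produce a Callable of the kind fork() accepts.
    static List<Callable<String>> buildTasks(int planetId) {
        List<Callable<String>> tasks = new ArrayList<>();
        // Lambda: captures planetId, so the Callable itself takes no arguments.
        tasks.add(() -> getPlanet(planetId));
        // Method reference: fits when the target method needs no arguments.
        tasks.add(CallableForms::getDefaultPlanet);
        // Anonymous class: the long form that the lambda abbreviates.
        tasks.add(new Callable<String>() {
            @Override
            public String call() {
                return getPlanet(planetId);
            }
        });
        return tasks;
    }

    // Runs each task on the calling thread, purely to show what it returns.
    static List<String> runAll(List<Callable<String>> tasks) {
        List<String> results = new ArrayList<>();
        for (Callable<String> task : tasks) {
            try {
                results.add(task.call());
            } catch (Exception e) {
                throw new IllegalStateException("task failed", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(buildTasks(3))); // prints [planet-3, planet-1, planet-3]
    }
}
```

The lambda form is usually the most convenient because it can capture a local variable such as planetId, whereas a method reference only fits when the target method already takes no arguments.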
When join() is called, the scope waits for the forked jobs to finish (or for the scope to be shut down), after which execution continues in synchronous mode. The forked jobs run according to the configuration of the StructuredTaskScope.
Graceful Termination of Task Scope
Because the StructuredTaskScope is created in a try-with-resources block, it is closed automatically when the block ends. Closing triggers the shutdown() process for the scope, which can be customized to handle the disposal of threads as needed. You can also call shutdown() manually if you need to shut down the scope before it is closed.
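The automatic closing comes from try-with-resources itself rather than from anything specific to task scopes. As a rough illustration, the sketch below uses a hypothetical DemoScope class (not a real StructuredTaskScope) that records each step, showing that close() runs when the block ends normally and also when the body throws.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCloseSketch {
    // Hypothetical resource that records what happens to it; not a real task scope.
    static class DemoScope implements AutoCloseable {
        private final List<String> events;

        DemoScope(List<String> events) {
            this.events = events;
            events.add("open");
        }

        void work(boolean fail) {
            events.add("work");
            if (fail) {
                throw new IllegalStateException("subtask failed");
            }
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Returns the ordered list of events for one pass through the block.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (DemoScope scope = new DemoScope(events)) {
            scope.work(fail);
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this runs.
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [open, work, close]
        System.out.println(run(true));  // prints [open, work, close, caught]
    }
}
```

Note that in the failing case the resource is closed before the catch block runs, which is why cleanup does not need to be repeated in the exception handler.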
StructuredTaskScope includes two nested classes, ShutdownOnSuccess and ShutdownOnFailure, that implement built-in shutdown policies. ShutdownOnSuccess shuts the scope down as soon as one subtask succeeds, and ShutdownOnFailure shuts it down as soon as one subtask fails; in both cases the remaining running threads are canceled. Using the existing setup, these classes can be applied as shown in Listing 3.
Listing 3. Built-in shutdown policies
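void failFast() throws ExecutionException, InterruptedException {
    int[] planetIds = {1,2,3,-1,4};
    // ShutdownOnFailure cancels the remaining subtasks as soon as one fails
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        for (int planetId : planetIds) {
            scope.fork(() -> getPlanet(planetId));
        }
        scope.join();
        // scope.throwIfFailed() could be called here to rethrow the first failure
    }
}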
void succeedFast() throws ExecutionException, InterruptedException {
    int[] planetIds = {1,2};
    // ShutdownOnSuccess cancels the remaining subtasks as soon as one succeeds
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Object>()) {
        for (int planetId : planetIds) {
            scope.fork(() -> getPlanet(planetId));
        }
        scope.join();
    } catch (Exception e) {
        System.out.println("Error: " + e);
    }
}
public static void main(String[] args) {
    var myApp = new App();
    System.out.println("\n\r-- BEGIN succeedFast");
    try {
        myApp.succeedFast();
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
    System.out.println("\n\r-- BEGIN failFast");
    try {
        myApp.failFast();
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}
These policies produce output similar to the following:
-- BEGIN succeedFast
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
-- BEGIN failFast
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Error fetching planet information for ID: -1
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
So we have a simple mechanism for initiating all the requests concurrently and then canceling the rest as soon as one succeeds or one fails with an exception. From here, any customizations can be made. The structured concurrency documentation includes an example of a custom scope that collects subtask results as they succeed or fail and then returns them. This is accomplished fairly simply by overriding the handleComplete() method, which is called as each subtask completes, along with the join() method.
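The "first success wins" policy has a long-standing counterpart outside the preview API: ExecutorService.invokeAny() returns the result of one task that completed without throwing and cancels the tasks that have not finished. The sketch below uses it as a stand-in for ShutdownOnSuccess so that it runs without preview flags. It is an analogue, not the StructuredTaskScope API itself, and fakeGetPlanet(), firstSuccess(), and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FirstSuccessSketch {
    // Hypothetical stand-in for getPlanet(): negative IDs fail at once,
    // and larger IDs respond more slowly.
    static String fakeGetPlanet(int planetId) throws InterruptedException {
        if (planetId < 0) {
            throw new IllegalArgumentException("bad id " + planetId);
        }
        Thread.sleep(50L * planetId);
        return "planet-" + planetId;
    }

    static String firstSuccess(int[] planetIds) {
        ExecutorService pool = Executors.newFixedThreadPool(planetIds.length);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int planetId : planetIds) {
                tasks.add(() -> fakeGetPlanet(planetId));
            }
            // Returns the first successful result and cancels tasks still running.
            return pool.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            // ExecutionException here means every task failed.
            throw new IllegalStateException("no lookup succeeded", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccess(new int[] {-1, 1, 10}));
    }
}
```

In this run the lookup for -1 fails immediately and is ignored, the lookup for 1 finishes after about 50 ms, and the much slower lookup for 10 is canceled, so the program should print planet-1.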
StructuredTaskScope.Subtask
One thing we have not seen in our example is watching the return values of subtasks. Each call to StructuredTaskScope.fork() returns a StructuredTaskScope.Subtask object. We can use it to watch the state of the tasks. For example, in our sc() method, we could do the following:
Listing 4. Using StructuredTaskScope.Subtask to watch state
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.ArrayList;

void sc() throws Exception {
    int[] planetIds = {1,2,3,4,5};
    // Keep a handle on every subtask so its state can be inspected after join()
    ArrayList<Subtask<Object>> tasks = new ArrayList<>(planetIds.length);
    try (var scope = new StructuredTaskScope<Object>()) {
        for (int planetId : planetIds) {
            tasks.add(scope.fork(() -> getPlanet(planetId)));
        }
        scope.join();
    } catch (Exception e) {
        System.out.println("Error: " + e);
    }
    for (Subtask<Object> t : tasks) {
        System.out.println("Task: " + t.state());
    }
}
In this example, we hold each task in an ArrayList, then output its state after join(). Note that the available states for Subtask are defined in its nested State enum: UNAVAILABLE, SUCCESS, and FAILED. This new method will output something similar to this:
-- BEGIN Structured Concurrency
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Got a Planet: {"name":"Yavin IV"}
Got a Planet: {"name":"Alderaan"}
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS