Since its introduction in JDK 8, the Stream API has profoundly changed how we process collections and data sequences in Java. With its functional approach and support for pipeline operations, it allows for more expressive and often more efficient code.
The Stream API is very rich and offers numerous possibilities for in-memory data processing based on the map-filter-reduce model. It relies on the Spliterator API, and the versatility of these two models provides many opportunities. However, the downside is that the Spliterator API is not easy to use and does not produce simple, readable code. Additionally, if you need to leverage parallel streams, its use can become very complex.
With the Gatherer API, finalized in JDK 24 after previews in JDK 22 and JDK 23, Java offers a mechanism for defining custom intermediate operations. This innovation unlocks unprecedented flexibility and expressiveness in stream pipelines. Its design is similar to that of the Collector API for terminal stream operations.
As described in JEP 485, this API enhances stream pipelines by introducing advanced intermediate operations that were previously unsupported.
A Solution to Stream Transformation Limitations
The Stream API provides essential operations such as map, filter, reduce, and sorted. However, some advanced transformations remain difficult to achieve with these standard tools, such as:
- Applying a distinct operation based on an object attribute (e.g., filtering strings by their unique length).
- Grouping elements into fixed-size windows as the stream progresses.
- Detecting patterns or sequences in an event stream, requiring persistent state management.
These operations cannot be expressed with the built-in intermediate operations of the Stream API alone; before Gatherers, they required writing a custom Spliterator. The Gatherer API offers a solution to these issues by enabling the modeling of custom intermediate operations, similar to how the Collector API allows defining custom terminal operations.
It provides a simpler model than the Spliterator API, with excellent parallelism support: even with a sequential Gatherer implementation, you can benefit from the performance of parallel streams both upstream and downstream. The Spliterator API does not offer this advantage.
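To make the first case in the list above concrete, here is a minimal sketch of a "distinct by attribute" gatherer. The class name `DistinctByDemo` and the helper `distinctBy` are illustrative assumptions, not part of the JDK; only `Gatherer.ofSequential` and `Stream.gather` come from the API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DistinctByDemo {

    // Hypothetical helper: forwards only the first element seen for each key.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyExtractor) {
        return Gatherer.<T, Set<K>, T>ofSequential(
                // Initializer: the mutable state is the set of keys already seen.
                HashSet::new,
                // Integrator: push the element only if its key is new.
                (seen, element, downstream) -> {
                    if (seen.add(keyExtractor.apply(element))) {
                        return downstream.push(element);
                    }
                    return true; // element skipped, but keep accepting input
                });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("a", "bb", "cc", "ddd", "e")
                .gather(distinctBy(String::length))
                .toList();
        System.out.println(result); // [a, bb, ddd]
    }
}
```

Because the set of seen keys depends on encounter order, this sketch uses `ofSequential` rather than providing a combiner.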
What is a Gatherer?
A Gatherer is an object defining a custom intermediate operation on a stream. It receives elements from an upstream stream, performs a transformation, and transmits (or not) the elements to a downstream stream.
A Gatherer can take several forms:
- 1-to-1: each input element produces a single output element (like map).
- 1-to-N: each input element produces zero or more output elements (like flatMap).
- N-to-1: multiple input elements are combined into one (like fold).
- N-to-M: complex transformations involving multiple inputs and outputs (e.g., windowing operations).
Gatherers offer several advanced features:
- Maintaining a mutable state to retain information from previous operations.
- Interrupting processing to handle infinite streams or optimize computations (short-circuiting).
- Supporting parallel execution via a state-combining function (combiner).
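Short-circuiting is perhaps the least intuitive of these features, so here is a small sketch. The helper `takeUntil` and the class `TakeUntilDemo` are hypothetical names chosen for illustration; the gatherer forwards elements up to and including the first one matching a predicate, then returns `false` from its integrator so that an infinite stream becomes finite.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TakeUntilDemo {

    // Hypothetical helper: stateless, sequential, short-circuiting gatherer.
    static <T> Gatherer<T, ?, T> takeUntil(Predicate<? super T> stop) {
        return Gatherer.<T, T>ofSequential(
                // Returning false tells the upstream that no more elements are wanted.
                (unused, element, downstream) ->
                        downstream.push(element) && !stop.test(element));
    }

    public static void main(String[] args) {
        // Stream.iterate is infinite; the gatherer ends it after the first value above 100.
        List<Integer> powers = Stream.iterate(1, n -> n * 2)
                .gather(takeUntil(n -> n > 100))
                .toList();
        System.out.println(powers); // [1, 2, 4, 8, 16, 32, 64, 128]
    }
}
```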
Understanding the Gatherer Interface
A Gatherer is defined through the java.util.stream.Gatherer interface, which relies on four key methods:
- initializer() (optional): provides an initial mutable state to maintain information across elements.
- integrator() (mandatory): defines the transformation to apply to each element using an Integrator. It receives the current state, the input element, and a Downstream for pushing transformed elements.
- combiner() (optional): combines states in case of parallel processing.
- finisher() (optional): executes a final operation after processing all elements.
A Gatherer can be instantiated by directly implementing this interface or using factory methods like Gatherer.of() and Gatherer.ofSequential().
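As a rough illustration of how the four functions fit together, the sketch below builds a parallel-capable gatherer with `Gatherer.of()` that emits a single element: the number of elements it received. The names `CountingDemo`, `Counter`, and `counting` are assumptions made for this example.

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class CountingDemo {

    // Hypothetical mutable state: a plain counter.
    static final class Counter {
        long value;
    }

    static <T> Gatherer<T, ?, Long> counting() {
        return Gatherer.<T, Counter, Long>of(
                // initializer: creates a fresh state (one per parallel segment)
                Counter::new,
                // integrator: updates the state, pushes nothing, never stops early
                Gatherer.Integrator.ofGreedy((counter, element, downstream) -> {
                    counter.value++;
                    return true;
                }),
                // combiner: merges the states of two parallel segments
                (left, right) -> {
                    left.value += right.value;
                    return left;
                },
                // finisher: pushes the final result once all input is consumed
                (counter, downstream) -> downstream.push(counter.value));
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b", "c").gather(counting()).toList()); // [3]
        System.out.println(IntStream.range(0, 1_000).boxed().parallel()
                .gather(counting()).toList()); // [1000]
    }
}
```

Supplying a combiner is what allows the gatherer itself to be evaluated in parallel; with `Gatherer.ofSequential()` the combiner is omitted and the gatherer runs sequentially.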
Example Usage of Gatherer
The Gatherer API exists to simplify complex operations, making it difficult to write simple examples that showcase its full potential. However, here are two trivial examples.
map
This operation performs the same task as a map operation, transforming all strings in a stream to uppercase.
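A minimal sketch of such a gatherer could look like this; the class name `UpperCaseDemo` and the method name `toUpperCase` are assumptions made for the example.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class UpperCaseDemo {

    // Stateless 1-to-1 gatherer: the first lambda parameter (the state) is unused.
    static Gatherer<String, ?, String> toUpperCase() {
        return Gatherer.of(
                (unused, element, downstream) -> downstream.push(element.toUpperCase()));
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("one", "two", "three")
                .gather(toUpperCase())
                .toList();
        System.out.println(result); // [ONE, TWO, THREE]
    }
}
```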
Here, Gatherer.of() is used with a lambda to transform each element to uppercase before passing it to the next stage. This is the simplest available version: it does not require maintaining a state.
💡It is important to note that the integrator must return a boolean indicating whether the stream accepts more data or not. In this example, we simply propagate the information provided by the push operation on the downstream stream, thereby allowing a possible termination to be signaled upstream (short-circuiting).
join
The following Gatherer performs a similar task to Collectors.joining(), using a sequential operation on stream elements.
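One possible way to write it is sketched below. The class name `JoinDemo` and the fields of `State` are assumptions, as is the choice to emit nothing for an empty stream (unlike Collectors.joining(), which returns an empty string).

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class JoinDemo {

    static Gatherer<String, ?, String> join(String delimiter) {
        // Local mutable state: not visible outside this method.
        class State {
            final StringBuilder joined = new StringBuilder();
            boolean hasElements;
        }
        return Gatherer.<String, State, String>ofSequential(
                State::new,
                // Greedy integrator: accumulates every element, never pushes, never stops.
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    if (state.hasElements) {
                        state.joined.append(delimiter);
                    }
                    state.joined.append(element);
                    state.hasElements = true;
                    return true;
                }),
                // Finisher: pushes the joined string, only if at least one element was seen.
                (state, downstream) -> {
                    if (state.hasElements) {
                        downstream.push(state.joined.toString());
                    }
                });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("a", "b", "c").gather(join(", ")).toList();
        System.out.println(result); // [a, b, c] (a single element: "a, b, c")
    }
}
```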
Here, we maintain a mutable State object to accumulate the joined string efficiently across successive elements. Note that the integrator never pushes to its downstream in this particular case and never asks to stop, so we can wrap it with Integrator.ofGreedy() to indicate that it will consume the entire stream. Only the finisher will push a result (if necessary).
💡The ? wildcard in the Gatherer type declaration is noteworthy here: the Gatherer type must include the type of the mutable state, which is not accessible outside the join function.
Ready-to-Use Gatherers
JDK 24 provides several predefined Gatherers via java.util.stream.Gatherers, including:
- fold( initial, folder ): incremental aggregation of all elements into a single result, emitted once the input is exhausted.
- scan( initial, scanner ): incremental accumulation of state over the stream, emitting each intermediate result.
- windowFixed( windowSize ): grouping elements into fixed-size windows.
- windowSliding( windowSize ): sliding windows over the stream.
- mapConcurrent( maxConcurrency, mapper ): concurrent application of a transformation while preserving order.
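The difference between fold and scan is easiest to see side by side. The following sketch (the class name `FoldScanDemo` and its two helper methods are illustrative assumptions) applies both to the same input with the same accumulation function.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class FoldScanDemo {

    // scan emits every intermediate accumulation (the initial value itself is not emitted).
    static List<Integer> runningSums(List<Integer> input) {
        return input.stream()
                .gather(Gatherers.scan(() -> 0, (sum, n) -> sum + n))
                .toList();
    }

    // fold emits a single element once the input is exhausted.
    static List<Integer> total(List<Integer> input) {
        return input.stream()
                .gather(Gatherers.fold(() -> 0, (sum, n) -> sum + n))
                .toList();
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(runningSums(input)); // [1, 3, 6, 10]
        System.out.println(total(input));       // [10]
    }
}
```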
As an illustration, here is an example of using fixed-size windowing:
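A minimal sketch, with a sliding window added for contrast; the class name `WindowDemo` and its helper methods are assumptions made for this example.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class WindowDemo {

    // Non-overlapping windows; the last one may be smaller than the requested size.
    static List<List<Integer>> fixed(int size) {
        return Stream.of(1, 2, 3, 4, 5, 6, 7)
                .gather(Gatherers.windowFixed(size))
                .toList();
    }

    // Overlapping windows, each shifted by one element.
    static List<List<Integer>> sliding(int size) {
        return Stream.of(1, 2, 3, 4)
                .gather(Gatherers.windowSliding(size))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(fixed(3));   // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(sliding(2)); // [[1, 2], [2, 3], [3, 4]]
    }
}
```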
Why Adopt the Gatherer API?
The Gatherer API fills a significant gap in the Stream API, offering unprecedented flexibility for developers working with data streams. While its learning curve may require some initial effort, it allows for:
- Expressing complex transformations directly in a Stream pipeline.
- Simplifying and improving the readability of data processing code.
- Optimizing performance through advanced state management and parallelism.
If you have ever been limited by standard intermediate operations in Java Streams, it’s time to explore gather() and push the boundaries of data transformations in your Java 24 applications!
References
- JEP 485: Stream Gatherers – https://openjdk.org/jeps/485
- Java API – Gatherer<T,A,R> – https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherer.html
- Java API – Gatherer.Integrator<A,T,R> – https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherer.Integrator.html
- Java API – Gatherer.Downstream<T> – https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherer.Downstream.html
- Java API – Gatherers – https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherers.html
- Oracle: Stream Gatherers – https://docs.oracle.com/en/java/javase/24/core/stream-gatherers.html
- The Gatherer API – https://dev.java/learn/api/streams/gatherers/
- To Gather or not to Gather? That is the question. – https://dev.to/onepoint/to-gather-or-not-to-gather-that-is-the-question-36oo
- Stream Gatherers: Intro to Intermediate Operations Modeler – https://dzone.com/articles/stream-gatherers-intermediate-operations-modeler