Welcome Gatherers!

We go – to the gathering
We all go to the gathering

The Gathering – Killing Joke

With Java 22, the Gatherers (JEP 461) have joined stream processing. Until now, stream processing consisted of three parts: the source, intermediate operations and a terminal operation. While the source and the termination operation were already quite flexible since Java 8, there were few possibilities to break out of the existing set for the intermediate operators.

If something special was to happen to the stream, this either worked via complicated intermediate steps or specially implemented Collectors as a terminal operation.

Gatherers are now revolutionizing stream processing. With the Stream::gather(Gatherer) method, you can use your own intermediate operators. All you need to do is implement the Gatherer interface.

A Gatherer can manipulate elements in various ways. It can transform them, insert additional elements and even limit a stream as a short-circuit. Similar to a collector, the gatherer provides methods to fulfill its tasks.

The initializer method creates an object to hold the internal state of the Gatherer, if necessary.
The integrater method that receives a new element from the stream and processes it. The integrator can push elements into the stream or terminate the stream processing prematurely.
The combiner method , like its counterpart in the Collector, takes care of merging during parallel processing.
The finisher method is called when there are no more elements in the stream. It can be used to perform final tasks on the stream.

To demonstrate the possibilities of the Gatherers, the next example shows the EllipsisGatherer, which emulates the functionality of the EllipsisCollector from the Stream Collector Utilities project.

List<String> maxBrothers= List.of("Chico","Harpo","Groucho","Gummo","Zeppo");
assertEquals("Chico, Harpo, Groucho, …", maxBrothers.stream().collect(EllipsisCollector.ellipsis(16)));  
assertEquals("Chico, Harpo, Groucho, …", 
maxBrothers.stream().gather(EllipsisGatherers.ellipsis(16)).findFirst().orElse(""));

In the two examples, the result "Chico, Harpo, Groucho, …" is generated from the list "Chico", "Harpo", "Groucho", "Gummo", "Zeppo". The first time with a Collector and the second time with a Gatherer.

public final class EllipsisGatherers {
    public static Gatherer<String, ?, String> ellipsis(int maxLength) {
        return ellipsis(", ", "…", maxLength);
    }
    
    public static Gatherer<String, ?, String> ellipsis(CharSequence delimiter, String ellipsis, int maxLength) {
        if (maxLength < 1)
            throw new IllegalArgumentException("'maxLength' must be greater than zero");

        class Ellipsis {

            private final StringJoiner joiner;
            private final String ellipsis;
            private final int maxLength;
            private final CharSequence delimiter;

            public Ellipsis(CharSequence delimiter, String ellipsis, int maxLength) {
                joiner = new StringJoiner(delimiter);
                this.delimiter = delimiter;
                this.ellipsis = ellipsis;
                this.maxLength = maxLength - ellipsis.length() - delimiter.length();
            }

            boolean integrate(String element, Gatherer.Downstream<? super String> downstream) {
                System.out.println(element);
                joiner.add(element);
                if (joiner.length() < maxLength) {
                    return true;
                }
                downstream.push(joiner + delimiter.toString() + ellipsis);
                return false;
            }

            void finish(Gatherer.Downstream<? super String> downstream) {
                String result = joiner.length() < maxLength ? joiner.toString() : joiner + delimiter.toString() + ellipsis;
                if (!result.isEmpty() && !downstream.isRejecting()) {
                    downstream.push(result);
                }
            }
        }

        return Gatherer.<String, Ellipsis, String>ofSequential(
                () -> new Ellipsis(delimiter, ellipsis, maxLength),
                Gatherer.Integrator.<Ellipsis, String, String>ofGreedy(Ellipsis::integrate),
                Ellipsis::finish
        );
    }
}

The Gatherer concatenates String elements from the Stream until the maximum desired length has been reached. As long as the length is less than maxLength, the String is appended, otherwise the concatenated String is output to the Stream and the processing of the Stream is stopped with return false.

String results longer than maxLength can actually be created, but this is not relevant for this example.

In the article Aufzählungen und andere String-Konkatenationen, a shortcoming of the existing EllipsisCollector was named. No matter how many elements the Stream contains, all elements must be touched by the Collector. In this example, all five Marx Brothers, but it could also have been 5000 elements. The Gatherer solution touches only three of the Brothers.

The Stream Collector Utilities project suffers a little from the introduction of the Gatherers, because even the PortionCollectors become obsolete. With Gathereres.windowFixed, portions can now be created with JDK on-board resources.

// will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List numbers = List.of(1,2,3,4,5,6,7,8);
List<List<Integer>> windows = numbers.stream().gather(Gatherers.windowSliding(2)).toList();
Collection<List<Integer>> portions = numbers.stream().collect(PortionCollector.toList(2));

At the moment JEP 461 still has the status of a preview feature. Let’s hope that changes soon!

Leave a Comment