
How to write Java Streams Custom Collector

A hands-on tutorial on writing a Java Streams custom Collector and using it on Streams to collect data. It also gives an overview of the Collector interface.

Overview

Java Stream Collectors are used with the collect() terminal operation to gather the elements of a Stream into a specific data structure. If you are new to Stream Collectors, please visit Overview of Java Stream Collectors, where we discuss various predefined Java Stream Collectors.

In this tutorial we will focus on understanding the Java Collector interface and learn to write a custom Collector for Streams. We will start with the Collector interface and its methods, then cover how the collection process is divided into four stages. Finally, we will create a custom collector and use it.

To learn more about Java Streams data collection, please read Java 8 Stream Collectors.

Java Stream Collector Interface

The Collector interface was introduced in Java 8 as part of the then-new Streams API. It defines methods for performing mutable reduction operations, also known as mutable folds, on the data in a stream. In other words, a mutable reduction operation accumulates the elements of a Stream and transforms them into a result or represents them in a different form.

The mutable reduction operations may include applying mathematical functions on the numerical data in a Stream to find min, max or average, or accumulating elements of a Stream into a Set or simply concatenating all the String elements from the Stream.
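The three kinds of reduction mentioned above can be tried with collectors that ship with the JDK. The following is a minimal sketch; the class name BuiltInReductionsSketch and its helper methods are made up for illustration, while the Collectors factory methods are part of the standard library.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper class, only for illustration.
class BuiltInReductionsSketch {

    // Numeric reduction: min, max and average gathered in a single pass.
    static IntSummaryStatistics stats(List<Integer> numbers) {
        return numbers.stream().collect(Collectors.summarizingInt(Integer::intValue));
    }

    // Accumulates the elements into a Set, which drops duplicates.
    static Set<String> distinct(List<String> words) {
        return words.stream().collect(Collectors.toSet());
    }

    // Concatenates all String elements, separated by ", ".
    static String concatenate(List<String> words) {
        return words.stream().collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        IntSummaryStatistics s = stats(List.of(4, 8, 15));
        System.out.println(s.getMin() + " " + s.getMax() + " " + s.getAverage()); // 4 15 9.0
        System.out.println(distinct(List.of("a", "b", "a")).size());              // 2
        System.out.println(concatenate(List.of("a", "b", "c")));                  // a, b, c
    }
}
```

Each of these collectors hides a mutable container (a statistics object, a Set, a string joiner) that is filled element by element, which is exactly the pattern the Collector interface formalises.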

Next is the definition of Collector interface and its generic parameters.

public interface Collector<T, A, R> {
  • T – Type of the elements in the Stream. This is the input type of the reduction operation.
  • A – Type of the mutable accumulator used by the reduction operation.
  • R – Type of the result. In simple words, it is the type of the output produced by the reduction operation.
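To make the three type parameters concrete, here is a small sketch in which all three differ. It uses the standard Collector.of factory; the class name TypeParametersSketch and the method concatenating() are assumptions made for this example only.

```java
import java.util.List;
import java.util.stream.Collector;

// Hypothetical example class, only for illustration.
class TypeParametersSketch {

    // T = String        (type of the stream elements)
    // A = StringBuilder (mutable accumulator)
    // R = String        (type of the final result)
    static Collector<String, StringBuilder, String> concatenating() {
        return Collector.of(
                StringBuilder::new,                 // creates an A
                (sb, s) -> sb.append(s),            // folds one T into an A
                (left, right) -> left.append(right), // merges two A containers
                StringBuilder::toString);           // converts A into R
    }

    public static void main(String[] args) {
        String joined = List.of("a", "b", "c").stream().collect(concatenating());
        System.out.println(joined); // abc
    }
}
```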

Understand Stream Collector Flow

Collection happens in four steps, which are driven by the Streams API. Let’s have a look at these four steps and understand them.

  • Supply :
    This is the first step of the collection process. Here a container is created to hold the elements from the Stream.
  • Accumulate :
    In this step each element in the stream is added to the container created in the Supply step.
  • Combine :
    This is an optional step, which is executed only if the stream is processed in parallel. In a parallel stream the elements are split into chunks that are processed simultaneously, so there will be multiple containers for the same stream. The Combine step merges these partially filled containers into a single container. As stated, if the Stream is sequential this step is skipped.
  • Finish :
    This is the last step in the collection process. It is executed once all the elements in the Stream have been accumulated in the supplied container. In the Finish step we transform the container into the defined result type.

Methods in Collector Interface

All the steps described in the above section are facilitated by 4 methods in the Collector interface. In order to write custom collectors we need to implement the interface along with these 4 methods.

supplier() : Return type of this method is the Supplier of the container type. We need to return a Supplier function that supplies a container to hold Stream elements.

accumulator() : In this method we have to return a BiConsumer function which accepts the container, and a single element from the Stream. The consumer defines the strategy, how the element should be added to the container.

combiner() : As mentioned earlier, this method or step is invoked only if the stream elements are processed in parallel. The combiner function is to combine results of different parallel calculations. In this method we need to return a BinaryOperator function that combines the two accumulated containers.

finisher() : Finisher is the last bit in the flow. In this method we can return a function to transform the accumulated and combined container into the final output.

Apart from these 4 methods, there is one more method that we need to implement.
characteristics() : This method is to specify characteristics of the Collector. We have to return a Set of Characteristics enum values. This enum has three values.

CONCURRENT : Indicates that multiple threads can invoke accumulators on the same container.
UNORDERED : If this characteristic is set, the collector won’t depend on the order of the element in the Stream.
IDENTITY_FINISH : Set this characteristic to indicate that finisher() is just an identity function, so it can be omitted during processing. This means the finisher performs no action and returns its input as is.

Now that we have had an overview of the Collector interface and its methods, we are ready to write our own custom collector.
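Characteristics can be inspected on any collector through its characteristics() method. The sketch below looks at two JDK collectors whose documentation describes them as unordered or concurrent, and at a collector built with Collector.of, which adds IDENTITY_FINISH by itself when no finisher is passed. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Collectors;

// Hypothetical example class, only for illustration.
class CharacteristicsSketch {

    // Collectors.toSet() is documented as an unordered collector.
    static Set<Characteristics> ofToSet() {
        return Collectors.<String>toSet().characteristics();
    }

    // Collectors.toConcurrentMap() is documented as concurrent and unordered.
    static Set<Characteristics> ofToConcurrentMap() {
        Collector<String, ?, ConcurrentMap<String, Integer>> collector =
                Collectors.toConcurrentMap(s -> s, String::length);
        return collector.characteristics();
    }

    // Without a finisher argument, Collector.of adds IDENTITY_FINISH itself.
    static Set<Characteristics> ofCustom() {
        Collector<String, List<String>, List<String>> collector = Collector.of(
                ArrayList::new,
                (list, s) -> list.add(s),
                (left, right) -> { left.addAll(right); return left; },
                Characteristics.UNORDERED);
        return collector.characteristics();
    }

    public static void main(String[] args) {
        System.out.println(ofToSet().contains(Characteristics.UNORDERED));            // true
        System.out.println(ofToConcurrentMap().contains(Characteristics.CONCURRENT)); // true
        System.out.println(ofCustom().contains(Characteristics.IDENTITY_FINISH));     // true
    }
}
```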

Write a Streams Custom Collector

Let’s say we have a list of Student objects, and we want to create a stream from it and collect the Student objects into an unmodifiable list of Triplets. Each triplet in the list represents one student and holds the year, first name, and last name of that student.

Next is our Student.java class.

public class Student {
    private long id;
    private String firstName;
    private String lastName;
    private int year;

    public Student(long id, String firstName, String lastName, int year) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
        this.year = year;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

}

Implementation of Collector interface

We will create a class of Collector type whose definition will look like this.

public class StudentTripletCollector implements
        Collector<Student, List<Triplet<Integer, String, String>>, List<Triplet<Integer, String, String>>> {

Notice the generic type arguments T, A, and R here. The input type is Student, and we accumulate the elements into a List of Triplet. The result type is the same as the accumulator type, with the difference that we will be returning an unmodifiable list.

Implement supplier() method

Next, we will be writing the supplier method.

@Override
public Supplier<List<Triplet<Integer, String, String>>> supplier() {
    return ArrayList::new;
}

The method returns a supplier which supplies a new ArrayList when it is invoked.

Implement accumulator() method

The function returned by this method receives the container, which is an ArrayList, and a Student object from the stream.

@Override
public BiConsumer<List<Triplet<Integer, String, String>>, Student> accumulator() {
    return (list, student) -> list.add(Triplet.with(student.getYear(), student.getFirstName(), student.getLastName()));
}

The consumer function creates a new Triplet instance and adds it to the container.

Implement combiner() method

The combiner returns a BinaryOperator function which takes two containers as arguments and returns one.

@Override
public BinaryOperator<List<Triplet<Integer, String, String>>> combiner() {
    return (list1, list2) -> {
        list1.addAll(list2);
        return list1;
    };
}

In the combiner function we add all the elements from the second list to the first list and return the first list.

Implement finisher() method

The finisher function receives the container which is an ArrayList of student triplets and returns an Unmodifiable List.

@Override
public Function<List<Triplet<Integer, String, String>>, List<Triplet<Integer, String, String>>> finisher() {
    return Collections::unmodifiableList;
}

Here, we are returning a method reference of Collections::unmodifiableList.
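It is worth seeing what Collections.unmodifiableList actually guarantees. It returns a read-only view of the list it wraps rather than a copy, as the small sketch below shows. The class name and the rejectsAdd helper are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class, only for illustration.
class UnmodifiableFinisherSketch {

    // Returns true when the given list refuses to accept a new element.
    static boolean rejectsAdd(List<String> list) {
        try {
            list.add("extra");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> container = new ArrayList<>(List.of("a", "b"));
        List<String> view = Collections.unmodifiableList(container);

        System.out.println(rejectsAdd(view));      // true
        System.out.println(rejectsAdd(container)); // false, the backing list is still mutable
        System.out.println(view.size());           // 3, the view reflects the backing list
    }
}
```

In the collector this is not a problem in practice, because the backing ArrayList is created by the supplier and no reference to it is handed out once the finisher has run.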

Specify Collector Characteristics

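The order of the students does not matter for our result, hence we can specify the UNORDERED characteristic. Note that Set.of requires Java 9 or later; on Java 8, Collections.singleton(Characteristics.UNORDERED) is an alternative.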

@Override
public Set<Characteristics> characteristics() {
    return Set.of(Characteristics.UNORDERED);
}

StudentTripletCollector Class

Now that we have prepared the individual method implementations, it is time to put them all together in our class.

package com.amitph.java.tutorialsamples;

import org.javatuples.Triplet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class StudentTripletCollector implements
        Collector<Student, List<Triplet<Integer, String, String>>, List<Triplet<Integer, String, String>>> {


    public static StudentTripletCollector toStudentsTriplesList() {
        return new StudentTripletCollector();
    }

    @Override
    public Supplier<List<Triplet<Integer, String, String>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<Triplet<Integer, String, String>>, Student> accumulator() {
        return (list, student) -> list.add(Triplet.with(student.getYear(), student.getFirstName(), student.getLastName()));
    }

    @Override
    public BinaryOperator<List<Triplet<Integer, String, String>>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

    @Override
    public Function<List<Triplet<Integer, String, String>>, List<Triplet<Integer, String, String>>> finisher() {
        return Collections::unmodifiableList;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of(Characteristics.UNORDERED);
    }
}

Also notice the static factory method, which returns an instance of this class.
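For a collector this small, a dedicated class is not strictly required. As an alternative sketch, the same five pieces can be handed to the standard Collector.of factory. To keep the example free of external dependencies, a List<Object> holding three values stands in for the javatuples Triplet, and a trimmed-down Student class stands in for the one shown earlier; both stand-ins, and the names CollectorOfSketch and toStudentRows, are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;

// Hypothetical example class, only for illustration.
class CollectorOfSketch {

    // Trimmed-down stand-in for the tutorial's Student class.
    static final class Student {
        final String firstName;
        final String lastName;
        final int year;

        Student(String firstName, String lastName, int year) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.year = year;
        }
    }

    // A three-element List<Object> stands in for Triplet<Integer, String, String>.
    static Collector<Student, List<List<Object>>, List<List<Object>>> toStudentRows() {
        return Collector.of(
                ArrayList::new,                                                          // supplier
                (rows, s) -> rows.add(List.<Object>of(s.year, s.firstName, s.lastName)), // accumulator
                (left, right) -> { left.addAll(right); return left; },                   // combiner
                Collections::unmodifiableList,                                           // finisher
                Collector.Characteristics.UNORDERED);                                    // characteristics
    }

    public static void main(String[] args) {
        List<List<Object>> rows = List.of(
                        new Student("Strong", "Belwas", 2020),
                        new Student("Arthur", "Dayne", 2022))
                .stream()
                .collect(toStudentRows());
        rows.forEach(System.out::println);
    }
}
```

A dedicated class remains the better fit when the collector carries its own state or configuration, or when it should be reusable under a descriptive type name.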

Use Custom Collector on Streams

Next, we will use our collector on Student streams. We will use the toStudentsTriplesList() function in the collect() method.

public static void main(String[] args) {
    List<Student> students = List.of(
            new Student(1111, "Strong", "Belwas", 2020),
            new Student(1123, "Arthur", "Dayne", 2022),
            new Student(1125, "Jory", "Cassel", 2024)
    );

    List<Triplet<Integer, String, String>> listOfTriples =
            students
                    .stream()
                    .collect(StudentTripletCollector.toStudentsTriplesList());

    listOfTriples.forEach(System.out::println);
}

When we execute this, we get the following result.

[2020, Strong, Belwas]
[2022, Arthur, Dayne]
[2024, Jory, Cassel]
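The example above runs on a sequential stream, so the combiner is never invoked. To observe the Combine step, the following sketch collects the same numbers once sequentially and once in parallel and counts the combiner calls. It deliberately uses a simpler collector over Integer values and leaves out UNORDERED, so that both runs are guaranteed to produce the elements in encounter order. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical example class, only for illustration.
class ParallelCollectSketch {

    // Counts combiner invocations so that the Combine step becomes observable.
    static final AtomicInteger combinerCalls = new AtomicInteger();

    static Collector<Integer, List<Integer>, List<Integer>> toUnmodifiableListCounting() {
        return Collector.of(
                ArrayList::new,
                (list, e) -> list.add(e),
                (left, right) -> {
                    combinerCalls.incrementAndGet();
                    left.addAll(right);
                    return left;
                },
                Collections::unmodifiableList);
    }

    // Collects the numbers 1..1000, optionally on a parallel stream.
    static List<Integer> collect(boolean parallel) {
        IntStream range = IntStream.rangeClosed(1, 1_000);
        if (parallel) {
            range = range.parallel();
        }
        return range.boxed().collect(toUnmodifiableListCounting());
    }

    public static void main(String[] args) {
        combinerCalls.set(0);
        List<Integer> sequential = collect(false);
        System.out.println("sequential combiner calls: " + combinerCalls.get()); // 0

        combinerCalls.set(0);
        List<Integer> parallel = collect(true);
        // The number of calls depends on how the runtime splits the work.
        System.out.println("parallel combiner calls: " + combinerCalls.get());

        System.out.println(sequential.equals(parallel)); // true
    }
}
```

With the UNORDERED characteristic set, as in StudentTripletCollector, a parallel run would still contain the same elements, but their order in the resulting list would no longer be guaranteed.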

Summary

In this tutorial we covered writing a Java Streams custom Collector. We began by understanding the concept of collectors, their flow, and the methods of the Collector interface.

We then learned that the collection process is divided into four stages, namely Supply, Accumulate, Combine and Finish, of which the Combine step is executed only for streams processed in parallel.

Finally, we wrote our custom Streams Collector and used it to collect a List of Student objects.